-- STPStreamsImpl: implementation of the STPStreams interface.
-- OpenSTP returns an output IO.STREAM. Everything written to that stream is passed,
-- through the monitored STPStreamData record below, to a second ("inverted") input
-- stream which a forked process hands to FSRemoteFile.Store.

DIRECTORY
  BasicTime, FS, FSRemoteFile, IO, Process, Rope, STPStreams;

STPStreamsImpl: CEDAR MONITOR
  IMPORTS BasicTime, IO, FSRemoteFile, Process
  EXPORTS STPStreams =
BEGIN

-- Capacity of the character ring buffer used by the single character put path.
buffSize: NAT = 200;

STPStreamDataRef: TYPE = REF STPStreamData;

-- Shared state; the same record is the streamData of both the user's output stream
-- and the inverted input stream.
STPStreamData: TYPE = RECORD [
  nonFull: CONDITION,  -- notified by the getters after they consumed data
  nonEmpty: CONDITION,  -- notified by the putters after they supplied data, and by close
  eof: BOOL _ FALSE,  -- set by UsersCloseSTP
  hasBlock: BOOL _ FALSE,  -- TRUE while a block handed over by UsersSendBlockToSTP is pending
  -- if hasBlock: the pending part of the user's block
  block: REF READONLY TEXT _ NIL,
  startIndex: NAT _ 0,  -- index of the next character of block to deliver
  count: NAT _ 0,  -- number of characters of block still to deliver
  -- if not hasBlock: ring buffer filled by UsersSendCharToSTP
  in, out, num: NAT _ 0,  -- last index written, last index read, number of buffered characters
  buff: ARRAY [0..buffSize) OF CHAR
  ];

-- Confirmation procedure handed to FSRemoteFile.Store; always answers TRUE.
Confirm: FSRemoteFile.ConfirmProc = {RETURN [TRUE]};

-- Users
-- Opens an output stream whose contents are stored as fileName on host.
-- Only accessOptions = create is implemented; every other value raises
-- IO.Error[NotImplementedForThisStream, NIL].
-- A NIL host is replaced by "indigo".
OpenSTP: PUBLIC PROC [
  fileName: Rope.ROPE, accessOptions: FS.AccessOptions _ read, host: Rope.ROPE _ NIL]
  RETURNS [self: IO.STREAM] =
  BEGIN
  -- simplified for output to Oliver servers like [vice]
  SELECT accessOptions FROM
    create => NULL;
    ENDCASE => ERROR IO.Error[NotImplementedForThisStream, NIL];
  BEGIN
  sd: STPStreamDataRef _ NEW[STPStreamData];
  userProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
    variety: $output, class: $JacobiSTPUsersOutputStreams,
    putChar: UsersSendCharToSTP,
    putBlock: UsersSendBlockToSTP,
    close: UsersCloseSTP
    ];
  invertedProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
    variety: $input, class: $JacobiSTPsGetFromUserStreams,
    getChar: STPsGetCharFromUser,
    getBlock: STPsGetBlockFromUser,
    endOf: STPsEndOf
    ];
  inverted: IO.STREAM;
  IF host=NIL THEN host _ "indigo";
  -- initialize conditions; is it necessary, or is something done automatically?
  -- DisableTimeout is not necessary really
  TRUSTED {
    Process.DisableTimeout[@sd.nonEmpty]; Process.EnableAborts[@sd.nonEmpty];
    Process.DisableTimeout[@sd.nonFull]; Process.EnableAborts[@sd.nonFull];
    };
  self _ IO.CreateStream[userProcs, sd];
  inverted _ IO.CreateStream[invertedProcs, sd];
  -- The store runs in its own detached process and pulls its data from inverted.
  TRUSTED {
    Process.Detach[
      FORK ForkedSTPStore[fileName: fileName, host: host, inStream: inverted]
      ]
    };
  END
  END;

-- Body of the forked process: stores everything readable from inStream on the server.
-- NOTE(review): no error raised by FSRemoteFile.Store is caught here, and nothing
-- tells the writing side that the store stopped reading; confirm this is acceptable.
ForkedSTPStore: PROC [fileName: Rope.ROPE, host: Rope.ROPE, inStream: IO.STREAM] =
  BEGIN
  FSRemoteFile.Store[
    server: host,
    file: fileName,
    str: inStream,
    created: BasicTime.Now[],  -- may raise TimeNotKnown
    proc: Confirm
    ];
  END;

-- putBlock of the user's stream.
-- Hands the caller's block over without copying it and returns only after the
-- reading side has consumed all count characters.
UsersSendBlockToSTP: ENTRY PROC [
  self: IO.STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  IF count=0 THEN RETURN;
  -- characters already in the ring buffer must be delivered before this block
  WHILE sd.num>0 DO WAIT sd.nonFull ENDLOOP;
  IF sd.hasBlock THEN ERROR;  -- another block is still pending
  sd.hasBlock _ TRUE;
  sd.block _ block;
  sd.startIndex _ startIndex;
  sd.count _ count;
  BROADCAST sd.nonEmpty;
  -- keep the caller here until the reader is done with its block
  WHILE sd.hasBlock DO WAIT sd.nonFull ENDLOOP
  END;

-- getBlock of the inverted stream.
-- Waits until data is available, then copies at most count characters into block,
-- starting at startIndex, and returns the number of characters copied. Data comes
-- either from the ring buffer or from the pending user block, never from both in
-- one call, so the result may be smaller than count although more data follows.
-- Raises IO.EndOfStream when no data is pending and the user's stream was closed.
-- NOTE(review): block.length is not updated here; confirm callers do not rely on it.
-- NOTE(review): confirm that raising IO.EndOfStream (instead of returning 0) is what
-- the consumer of this stream expects from a getBlock procedure.
STPsGetBlockFromUser: ENTRY PROC [
  self: IO.STREAM, block: REF TEXT, startIndex: NAT, count: NAT]
  RETURNS [nBytesRead: NAT] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  WHILE sd.num<=0 AND NOT sd.hasBlock DO
    IF sd.eof THEN RETURN WITH ERROR IO.EndOfStream[self]
    ELSE WAIT sd.nonEmpty
    ENDLOOP;
  IF sd.num>0 THEN {
    -- drain the ring buffer, but never deliver more than the caller asked for
    n: NAT = MIN[count, sd.num];
    THROUGH [0..n) DO
      sd.num _ sd.num-1;
      sd.out _ (sd.out+1) MOD buffSize;
      block[startIndex] _ sd.buff[sd.out];
      startIndex _ startIndex+1;
      ENDLOOP;
    nBytesRead _ n
    }
  ELSE IF sd.hasBlock THEN {
    -- MIN, not MAX: limited by what the caller wants and by what the user's block still holds
    n: NAT = MIN[count, sd.count];
    THROUGH [0..n) DO
      block[startIndex] _ sd.block[sd.startIndex];
      startIndex _ startIndex+1;
      sd.startIndex _ sd.startIndex+1;
      ENDLOOP;
    sd.count _ sd.count-n;
    IF sd.count=0 THEN sd.hasBlock _ FALSE;  -- releases the waiting UsersSendBlockToSTP
    nBytesRead _ n
    }
  ELSE ERROR;
  BROADCAST sd.nonFull
  END;

-- putChar of the user's stream.
-- Appends char to the ring buffer, waiting while the buffer is full or a block is pending.
UsersSendCharToSTP: ENTRY PROC [self: IO.STREAM, char: CHAR] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  IF sd.hasBlock THEN ERROR;  -- a block is still pending on entry
  WHILE sd.num>=buffSize OR sd.hasBlock DO WAIT sd.nonFull ENDLOOP;
  sd.num _ sd.num+1;
  sd.in _ (sd.in+1) MOD buffSize;
  sd.buff[sd.in] _ char;
  BROADCAST sd.nonEmpty
  END;

-- getChar of the inverted stream.
-- Waits until data is available and returns the next character, taken from the ring
-- buffer if it is not empty and from the pending user block otherwise.
-- Raises IO.EndOfStream when no data is pending and the user's stream was closed.
STPsGetCharFromUser: ENTRY PROC [self: IO.STREAM] RETURNS [char: CHAR] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  WHILE sd.num<=0 AND NOT sd.hasBlock DO
    IF sd.eof THEN RETURN WITH ERROR IO.EndOfStream[self]
    ELSE WAIT sd.nonEmpty
    ENDLOOP;
  IF sd.num>0 THEN {
    sd.num _ sd.num-1;
    sd.out _ (sd.out+1) MOD buffSize;
    char _ sd.buff[sd.out];
    }
  ELSE IF sd.hasBlock AND sd.count>0 THEN {
    char _ sd.block[sd.startIndex];
    sd.startIndex _ sd.startIndex+1;
    sd.count _ sd.count-1;
    IF sd.count=0 THEN sd.hasBlock _ FALSE;  -- releases the waiting UsersSendBlockToSTP
    }
  ELSE ERROR;
  BROADCAST sd.nonFull
  END;

-- endOf of the inverted stream.
-- TRUE only when the user's stream was closed and neither buffered characters nor a
-- handed-over block remain; this matches the condition under which the getters
-- raise IO.EndOfStream.
STPsEndOf: ENTRY PROC [self: IO.STREAM] RETURNS [BOOL] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  RETURN[sd.num<=0 AND NOT sd.hasBlock AND sd.eof]
  END;

-- close of the user's stream.
-- Marks the end of the data; abort is ignored.
UsersCloseSTP: ENTRY PROC [self: IO.STREAM, abort: BOOL] =
  BEGIN
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  sd.eof _ TRUE;
  -- A reader may already be waiting for data. Timeouts on nonEmpty are disabled,
  -- so without this notification it would never notice the end of the stream.
  BROADCAST sd.nonEmpty
  END;

END.