-- NetworkStreamInstance.mesa (last edited by: BLyon on: March 21, 1981 12:33 PM) -- Function: The implementation module for an instance of the Network Stream front end. DIRECTORY BufferDefs USING [OisBuffer], ByteBlt USING [ByteBlt], Environment USING [Byte], NetworkStreamInternal USING [ControlObject], OISCP USING [ GetOisPacketTextLength, ReturnFreeOisBuffer, SetOisPacketTextLength], OISCPTypes USING [bytesPerLevel2SppHeader], PacketStream USING [ ConnectionSuspended, Destroy, Get, GetSendSppBuffer, Handle, Put, ReturnGetSppDataBuffer, WaitForAttention], Runtime USING [SelfDestruct], Stream USING [ Byte, Word, Handle, Block, CompletionCode, defaultInputOptions, InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType, TimeOut]; NetworkStreamInstance: MONITOR [psH: PacketStream.Handle] RETURNS [Stream.Handle] IMPORTS ByteBlt, OISCP, Stream, PacketStream, Runtime = BEGIN -- debugging (only reason for MONITOR) sanityChecking: BOOLEAN = FALSE; sendInProgress: BOOLEAN ← FALSE; TestAndSetSendInProgress: ENTRY PROCEDURE = INLINE BEGIN IF sendInProgress THEN ERROR; sendInProgress ← TRUE; END; TestAndResetSendInProgress: ENTRY PROCEDURE = INLINE BEGIN IF NOT sendInProgress THEN ERROR; sendInProgress ← FALSE; END; -- the vector of procedures for operating on, and controlling the network stream controlObject: NetworkStreamInternal.ControlObject ← [ -- the vector of procedures as per the standard Pilot Stream interface streamObject: Stream.Object[ options: Stream.defaultInputOptions, getByte: GetByte, putByte: PutByte, getWord: GetWord, putWord: PutWord, get: GetBlock, put: PutBlock, setSST: SetSST, sendAttention: SendAttention, waitAttention: WaitAttention, delete: Delete], -- handle for the packet stream psH: psH]; LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte]; -- A client will typicaly have three processes accessing this module. The first -- receives data, the second waits for attentions, and the third transmits data, sends -- attentions and changes the subsequence type. As a consequence it is not necessary -- for this module to be a monitor, since there is no interaction between the three -- processes in this module. The interaction occurs in the PktStreamInstance module, -- which is a monitor. Multiple client processes must not perform data transfer in one -- direction; the result is unpredicatble. Care should be taken when deleting the stream -- and therefore this module. -- input inputBuffer: BufferDefs.OisBuffer ← NIL; inputFinger: CARDINAL; inputSST: Stream.SubSequenceType ← 0; -- output outputBuffer: BufferDefs.OisBuffer ← NIL; outputFinger: CARDINAL; outputSST: Stream.SubSequenceType ← 0; outputSSTSent: BOOLEAN ← TRUE; outputBufferSize: CARDINAL ← 0; -- Hot Procedures GetByte: PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] = BEGIN IF inputBuffer # NIL AND inputFinger + 2 < GetSppDataLength[inputBuffer] THEN BEGIN -- "+2" lets GetBlock give back the buffer if we take the last byte byte ← inputBuffer.ois.sppBytes[inputFinger]; inputFinger ← inputFinger + 1; RETURN; END ELSE BEGIN array: PACKED ARRAY [0..1] OF Stream.Byte; [] ← sH.get[sH, [@array, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE]]; RETURN[array[0]]; END; END; GetWord: PROCEDURE [sH: Stream.Handle] RETURNS [word: Stream.Word] = BEGIN OPEN w: LOOPHOLE[word, LeftAndRight]; w.left ← GetByte[sH]; w.right ← GetByte[sH]; END; -- This procedure fills a client's block with data from an incoming packet GetBlock: PROCEDURE [ sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions] RETURNS [ bytesTransferred: CARDINAL, why: Stream.CompletionCode, sst: Stream.SubSequenceType] = -- block has been passed by value, so we are upadting our copy, not the clients BEGIN input: Stream.Block; moved: CARDINAL; endOfMessageArrived: BOOLEAN ← FALSE; bytesTransferred ← 0; why ← normal; sst ← inputSST; WHILE block.startIndex < block.stopIndexPlusOne DO UNTIL inputBuffer # NIL DO inputFinger ← 0; inputBuffer ← PacketStream.Get[psH]; IF inputBuffer = NIL THEN SIGNAL Stream.TimeOut[block.startIndex] ELSE BEGIN sst ← inputBuffer.ois.subtype; IF inputSST # sst THEN BEGIN inputSST ← sst; IF options.signalSSTChange THEN SIGNAL Stream.SSTChange[inputSST, block.startIndex] ELSE BEGIN why ← sstChange; RETURN; END; END; END; ENDLOOP; input ← [blockPointer: @inputBuffer.ois.sppBytes, startIndex: inputFinger, stopIndexPlusOne: GetSppDataLength[inputBuffer]]; moved ← ByteBlt.ByteBlt[block, input]; bytesTransferred ← bytesTransferred + moved; block.startIndex ← block.startIndex + moved; inputFinger ← inputFinger + moved; -- if the packet buffer is empty return it IF inputFinger = input.stopIndexPlusOne THEN BEGIN -- exhausted the packet contents endOfMessageArrived ← inputBuffer.ois.endOfMessage; PacketStream.ReturnGetSppDataBuffer[psH, inputBuffer]; inputBuffer ← NIL; END; -- if there is no packet buffer and the block is still empty maybe signal IF inputBuffer = NIL AND block.startIndex < block.stopIndexPlusOne AND options.signalLongBlock THEN BEGIN SIGNAL Stream.LongBlock[block.startIndex]; END; -- exit if client wants to terminate when endOfMessage bit on IF endOfMessageArrived AND options.terminateOnEndPhysicalRecord THEN BEGIN why ← endRecord; EXIT; END; ENDLOOP; -- if there is data in the packet buffer then the block was short IF inputBuffer # NIL AND options.signalShortBlock THEN BEGIN ERROR Stream.ShortBlock; END; END; -- GetBlock -- The strategy on the transmission side, is to allocate a buffer only when there is -- data to be copied into the buffer, or if an empty packet must be transmitted. -- State information is kept around for things like whether a new SST has been sent -- to the other end or not, incase it is changed without sending any intervening data. -- This procedure sends a client's block of data in one or more packets. -- Since SendNow isn't a procedure all by itself, we try to know when the client -- did a SendNow, so that we can ask the other end for an ack. If we have no -- buffer, we send a data packet with zero data. PutBlock: PROCEDURE [ sH: Stream.Handle, block: Stream.Block, endPhysicalRecord: BOOLEAN] = -- block has been passed by value, so we are updating our copy, not the clients BEGIN sendNowBlock: Stream.Block = [NIL, 0, 0]; output: Stream.Block; moved: CARDINAL; IF sanityChecking THEN TestAndSetSendInProgress[]; IF (block = sendNowBlock AND endPhysicalRecord) THEN BEGIN -- this must be a SendNow operation, or something like it sigh... SendNow[]; IF sanityChecking THEN TestAndResetSendInProgress[]; RETURN; END; -- see whether this is a no-op or not. IF (block.stopIndexPlusOne - block.startIndex) = 0 AND outputBuffer = NIL AND outputSSTSent THEN BEGIN IF sanityChecking THEN TestAndResetSendInProgress[]; RETURN; END; IF outputBufferSize = 0 THEN outputBufferSize ← psH.getSenderSizeLimit[]; WHILE block.startIndex < block.stopIndexPlusOne DO IF outputBuffer = NIL THEN BEGIN outputBuffer ← PacketStream.GetSendSppBuffer[psH]; outputBuffer.ois.sendAck ← FALSE; outputBuffer.ois.attention ← FALSE; outputFinger ← 0; END; output ← [blockPointer: @outputBuffer.ois.sppBytes, startIndex: outputFinger, stopIndexPlusOne: outputBufferSize]; moved ← ByteBlt.ByteBlt[output, block]; block.startIndex ← block.startIndex + moved; outputFinger ← outputFinger + moved; IF outputFinger = outputBufferSize THEN BEGIN outputBuffer.ois.endOfMessage ← endPhysicalRecord AND block.startIndex >= block.stopIndexPlusOne; FlushOutputBuffer[]; END; ENDLOOP; IF sanityChecking THEN TestAndResetSendInProgress[]; END; -- PutBlock PutByte: PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] = BEGIN IF sanityChecking THEN TestAndSetSendInProgress[]; IF outputBuffer # NIL AND outputFinger + 2 < outputBufferSize THEN BEGIN -- "+2" lets PutBlock flush the buffer if we fill the last byte outputBuffer.ois.sppBytes[outputFinger] ← byte; outputFinger ← outputFinger + 1; IF sanityChecking THEN TestAndResetSendInProgress[]; END ELSE BEGIN array: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ]; IF sanityChecking THEN TestAndResetSendInProgress[]; PutBlock[sH, [@array, 0, 1], FALSE]; END; END; PutWord: PROCEDURE [sH: Stream.Handle, word: Stream.Word] = BEGIN OPEN w: LOOPHOLE[word, LeftAndRight]; sH.putByte[sH, w.left]; sH.putByte[sH, w.right]; END; SendNow: PROCEDURE = BEGIN IF outputBuffer = NIL THEN BEGIN outputBuffer ← PacketStream.GetSendSppBuffer[psH]; outputBuffer.ois.attention ← FALSE; outputFinger ← 0; END; outputBuffer.ois.sendAck ← TRUE; outputBuffer.ois.endOfMessage ← TRUE; FlushOutputBuffer[]; END; -- SendNow -- This procedure sets the SST to the specified value and has some side effects. -- We assume that the SST is initially = 0, and the first change causes no empty packet -- to be sent SetSST: PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] = BEGIN IF sanityChecking THEN TestAndSetSendInProgress[]; IF sst # outputSST THEN BEGIN FlushOutputBuffer[]; -- flush the last buffer if there was one IF NOT outputSSTSent THEN -- there was no buffer to flush for the old SST so send an empty packet BEGIN outputBuffer ← PacketStream.GetSendSppBuffer[psH]; outputBuffer.ois.sendAck ← FALSE; outputBuffer.ois.attention ← FALSE; outputFinger ← 0; FlushOutputBuffer[]; END; -- remember the new SST outputSST ← sst; outputSSTSent ← FALSE END; IF sanityChecking THEN TestAndResetSendInProgress[]; END; -- SetSSt -- This procedure sends one byte of data in a packet with the attention bit set. SendAttention: PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] = BEGIN IF sanityChecking THEN TestAndSetSendInProgress[]; FlushOutputBuffer[]; -- flush the last buffer if there was one outputBuffer ← PacketStream.GetSendSppBuffer[psH]; outputBuffer.ois.sppBytes[0] ← byte; outputFinger ← 1; outputBuffer.ois.sendAck ← FALSE; outputBuffer.ois.attention ← TRUE; FlushOutputBuffer[]; IF sanityChecking THEN TestAndResetSendInProgress[]; END; -- SendAttention -- This procedure waits indefinately until an attention arrives, or an ERROR is raised. WaitAttention: PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] = BEGIN b: BufferDefs.OisBuffer; DO b ← PacketStream.WaitForAttention[psH]; -- Get the first data byte, if there is one, otherwise discard this attention packet. -- Discarding the attention packet upsets the timeout mechanism, but then this is -- a situation that is not supposed to happen, and so we pay the price. IF GetSppDataLength[b] # 0 THEN BEGIN byte ← b.ois.sppBytes[0]; OISCP.ReturnFreeOisBuffer[b]; EXIT; END ELSE OISCP.ReturnFreeOisBuffer[b]; ENDLOOP; END; -- WaitAttention -- This procedure flushes (i.e. sends out) the outputBuffer if there is one. FlushOutputBuffer: PROCEDURE = BEGIN b: BufferDefs.OisBuffer; -- don't leave outputBuffer dangling in case of Stream deletion IF outputBuffer = NIL THEN RETURN; b ← outputBuffer; outputBuffer ← NIL; SetSppDataLength[b, outputFinger]; b.ois.subtype ← outputSST; outputSSTSent ← TRUE; -- now put the buffer to the packet stream. -- if the stream is PacketStream.ConnectionSuspended, the buffer is returned and the -- data it contained is lost. This is ok since no more useful can be done with this -- stream anyway. NOTE that the signal is allowed to continue down the stack s.t. -- the client can catch it also. PacketStream.Put[psH, b ! PacketStream.ConnectionSuspended => BEGIN OISCP.ReturnFreeOisBuffer[b]; IF sanityChecking THEN TestAndResetSendInProgress[]; END ]; END; -- FlushOutputBuffer -- This procedure sets the length of the sequenced packet given the length of data. SetSppDataLength: PROCEDURE [b: BufferDefs.OisBuffer, length: CARDINAL] = INLINE BEGIN OISCP.SetOisPacketTextLength[ b, length + OISCPTypes.bytesPerLevel2SppHeader]; END; -- SetSppDataLength -- This procedure returns the amount of data in the sequenced packet. GetSppDataLength: PROCEDURE [b: BufferDefs.OisBuffer] RETURNS [CARDINAL] = INLINE BEGIN RETURN[ OISCP.GetOisPacketTextLength[b] - OISCPTypes.bytesPerLevel2SppHeader]; END; -- GetSppDataLength -- Cool Procedures -- This procedure is instrumental in deleting this transducer. Delete: PUBLIC PROCEDURE [sH: Stream.Handle] = BEGIN IF sanityChecking THEN TestAndSetSendInProgress[]; controlObject.streamObject ← [options: controlObject.streamObject.options, getByte: NIL, putByte: NIL, getWord: NIL, putWord: NIL, get: NIL, put: NIL, setSST: NIL, sendAttention: NIL, waitAttention: NIL, delete: NIL]; IF inputBuffer # NIL THEN PacketStream.ReturnGetSppDataBuffer[psH, inputBuffer]; IF outputBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[outputBuffer]; PacketStream.Destroy[psH]; Runtime.SelfDestruct[]; -- no resetting of send sanity check here! END; -- Delete -- initialization (Cool) RETURN[@controlObject.streamObject]; END. -- of NetworkStreamInstance module LOG Time: May 26, 1978 11:22 AM By: Dalal Action: created file. Time: November 9, 1978 9:07 AM By: Dalal Action: modified GetBlock and PutBlock. Time: March 13, 1979 6:03 PM By: Dalal Action: modified SendAttention and WaitAttention. Time: August 31, 1979 12:39 PM By: Dalal Action: made two INLINEs. Time: January 31, 1980 4:33 AM By: Forrest Action: Added Mandatory fields to Stream.Object, using Stream Defaults. Time: July 8, 1980 5:07 PM By: BLyon Action: removed default procs and replaced with GetByte, GetWord, PutByte, PutWord. Time: August 11, 1980 4:57 PM By: BLyon Action: Changed ByteBltDefs to ByteBlt Time: September 26, 1980 2:19 PM By: Garlick Action: In GetBlock, initialized sst to inputSST so it always gets returned with proper value. Time: January 23, 1981 1:41 PM By: Garlick Action: Added setting of the EndOfMessage bit in the packet when endPhysicalRecord set in PutBlock. Added interpretation of endPhysicalRecord (as propogated from the sender) in GetBlock. Also made WaitAttention generate the resumeable Stream.Timeout. Time: March 18, 1981 3:47 PM By: BLyon Action: Put NIL's in Delete