-- File: NetworkStreamImpl.mesa - last edit: -- AOF 9-Sep-87 8:26:44 -- SMA 9-Oct-86 9:30:26 -- MXI 3-Nov-87 11:50:45 -- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved. -- Function: The implementation module for an instance of a SPP packet stream. DIRECTORY Buffer, ByteBlt USING [ByteBlt], NSBuffer USING [AccessHandle, Body, GetBuffer, Buffer, ReturnBuffer, Queue], Checksums USING [SetChecksum], CommFlags USING [doDebug, doStats], CommPriorities USING [receiver], CommUtil USING [PulsesToTicks], Driver USING [GetDeviceChain, Glitch, Device], Environment USING [Byte], NetworkStream USING [], NetworkStreamInternal USING [ControlObject, State], NSTypes USING [ bytesPerIDPHeader, bytesPerSppHeader, ConnectionID, WaitTime], PacketStream USING [ bytesPerSequencedPktHeader, ClassOfService, ConnectionFailed, unknownConnID, ConnectionSuspended, FailureReason, Handle, Object, State, SuspendReason], Process USING [ Abort, CancelAbort, DisableTimeout, EnableAborts, GetCurrent, MsecToTicks, Pause, SetPriority, SetTimeout], Protocol1 USING [EncapsulateAndTransmit, GetContext], Router USING [GetDelayToNet, NoTableEntryForNet], RouterInternal USING [FlushCacheEntry, SendErrorPacket], RoutingTable USING [NetworkContext], Socket USING [ ChannelHandle, Delete, GetPacket, SetPacketBytes, GetPacketBytes], SppNegotiation USING [InitializeSppState, NewInitializeSppState, NewStartPacketStreamProc], SppOps USING [ BufferTable, BufferTableObject, AttentionTableObject, nullAttention, StartByteStreamProc, StopByteStreamProc, StartPacketStreamProc, StopPacketStreamProc, PSProc, PSProcess], Stats USING [StatBump, StatIncr], Stream USING [ Byte, Word, Handle, Block, CompletionCode, InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType, TimeOut, Attention, defaultObject, GetByteProcedure, PutByteProcedure, GetWordProcedure, PutWordProcedure, GetProcedure, PutProcedure, SetSSTProcedure, GetSSTProcedure, SendAttentionProcedure, 
WaitAttentionProcedure, SendNowProcedure, SetTimeoutProcedure,
    GetTimeoutProcedure, EndRecord],
  System USING [
    GetClockPulses, MicrosecondsToPulses, NetworkAddress, Pulses];

NetworkStreamImpl: MONITOR
  IMPORTS
    NSBuffer, ByteBlt, Checksums, CommUtil, Driver, PacketStream, Process,
    Protocol1, Router, RouterInternal, Socket, SppNegotiation, Stream, Stats,
    System
  EXPORTS Buffer, NetworkStream, SppOps =
  BEGIN

  --types exported by this module
  Device: PUBLIC TYPE = Driver.Device;
  ConnectionID: PUBLIC TYPE = NSTypes.ConnectionID;

  --table of received attention bytes
  --(written by InsertAttention, read by ExtractAttention)
  attn: PUBLIC RECORD [table: SppOps.AttentionTableObject];

  --connection control parameters
  connection: PUBLIC RECORD [
    state: NetworkStreamInternal.State, --connection state
    socket: Socket.ChannelHandle, --the socket supporting the stream
    pool: NSBuffer.AccessHandle, --socket's supporting pool
    maxSppBytes: CARDINAL, --of packets supported by this stream
    hops: CARDINAL, --to the remote station
    localAddr, remoteAddr: System.NetworkAddress,
    localID, remoteID: NSTypes.ConnectionID,
    timeout: NSTypes.WaitTime, --specified by the client (Create|SetWaitTime)
    whyFailed: PacketStream.FailureReason, --connection failures
    whySuspended: PacketStream.SuspendReason, --after connection
    stateBeforeSuspension: NetworkStreamInternal.State]; --connection state

  --variables and parameters for transmission
  xmtr: PUBLIC RECORD [
    unackedSeq: CARDINAL, --oldest packet in retrans table
    nextSeq: CARDINAL, --next sequence to use for transmission
    maxSeq: CARDINAL, --max sequence number we can send
    maxAlloc: CARDINAL, --upper limit on remote's window
    lastXmt: LONG CARDINAL, --last time we transmitted data
    interval: LONG CARDINAL, --minimum interval between data transmission
    blocked: BOOLEAN, --indicates local xmtr is blocked
    checksums: BOOLEAN, --are we using checksums on local net?
    rateControlling: BOOLEAN, --are we doing rate control stuff?
    quenchIncr: CARDINAL, --number of times we've doubled the interval
    newAllocation: CONDITION, --blocks client sending process
    clientProcess: PROCESS, --this is the client process
    transmitProc: PROC[Socket.ChannelHandle, NSBuffer.Buffer],
    context: RoutingTable.NetworkContext]; --used for expedited delivery

  --constant defaults for the transmitter; in this section of the file only
  --rateControlPause (SendPacket) and totalQuench (SetQuencing) are referenced
  xmtrDefaults: RECORD [
    allocation, rateControlPause, delay1Hop, delayNHopT1, delayNHop64,
    delayNHop96, totalQuench: CARDINAL] = [
    1, 50, 50, 100, 120, 300, 16];

  --variables and parameters for receiving
  rcvr: PUBLIC RECORD [
    --sequencing, duplicate suppression and flow control information
    waitTime: LONG CARDINAL, --milliseconds as requested by client
    timeout: LONG CARDINAL, --packet should arrive before timeout + interval
    interval: LONG CARDINAL, --amount of time willing to wait for next packet
    nextSeq: CARDINAL, --next sequence number expected
    acksReqested: BOOLEAN, --acks have been requested by remote spp
    maxSeq: CARDINAL, --max we expect (but will take one greater)
    maxAlloc: NATURAL, --used to determine window that we present
    blocked: BOOLEAN, --indicates remote sender is blocked
    table: SppOps.BufferTableObject, --input buffer table
    process: SppOps.PSProcess]; --receiver process for joining in Delete

  --accept: how far past rcvr.maxSeq a packet may be and still be taken
  --duplicate: how far behind rcvr.nextSeq a packet may be and still be
  --treated as a duplicate rather than ignored (see RcvdSppPacket)
  rcvrDefaults: RECORD [accept, duplicate: INTEGER] = [2, 20];

  --variables and parameters for retransmission
  --all times (LONG CARDINALs) are in pulses
  rexmtr: PUBLIC RECORD [
    interval: LONG CARDINAL, --the retransmission interval (time in buffer.time)
    ceiling: LONG CARDINAL, --max allowed (see rexmtrDefaults.ceiling)
    floor: LONG CARDINAL, --min allowed (see rexmtrDefaults.floor)
    giveUp: CARDINAL, --number of times to retransmit before giving up
    count: CARDINAL, --number of data packets entered in delay calculation
    delay: LONG CARDINAL, --cumulative time that packets spent in output table
    calculation: LONG CARDINAL, --beginning of collection interval
    calculationInterval: LONG CARDINAL, --interval between calculations
    table: SppOps.BufferTableObject, --retransmitter buffer table
    process: SppOps.PSProcess, --needed to rejoin process during delete
    condition: CONDITION]; --block Retransmitter (based on rexmtr.interval)

  --default intervals are in milliseconds, others are packets
  rexmtrDefaults: RECORD [
    flushRoute, giveUp, --numbers of packets
    initialLocal, initialRemote, ceiling, floor, calculationInterval: --msecs
    CARDINAL] = [8, 30, 500, 6000, 24000, 200, 5000];

  --variables and parameters for probing
  --all times (LONG CARDINALs) are in pulses
  probe: PUBLIC RECORD [
    lastTime: LONG CARDINAL, --probe again at "*" + interval
    inactiveInterval, allocInterval: LONG CARDINAL, --interval between probes
    unacked: CARDINAL, --number of unacked probes sent
    probing: BOOLEAN]; --are we probing at all?

  --intervals are in milliseconds, probes in packets
  probeDefaults: RECORD [
    giveUp, inactiveInterval, allocInterval, hopDelay: CARDINAL] = [
    8, 5000, 2000, 2000];

  --the client-visible procedure vector; filled in by InitializePSObject
  packetStreamObject: PUBLIC PacketStream.Object;

  <<
  Assumptions about buffers needed to support a stream (worst case)
  (These numbers are all biased by SppOps.sppWindowSize.)
n are for sending data
    1 for sending a system packet
    2 for sending expedited packets
    n for receiving data
    1 for receiving a system packet
    2 for receiving expedited packets
    1 for extra receive packet
  >>
  streamParms: RECORD [send, receive, xmtrAlloc, rcvrAlloc: CARDINAL] = [
    3, 4, 0, 0];

  --the following constant is used in setting wait times, etc
  maxMsecToPulses: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;
  unknownCID: NSTypes.ConnectionID = PacketStream.unknownConnID;
  isEstablished: CONDITION; --we only need this at the very beginning

  startByteStream: PUBLIC SppOps.StartByteStreamProc; --for starting
  startPacketStream: PUBLIC SppOps.StartPacketStreamProc; --ditto
  stopByteStream: PUBLIC SppOps.StopByteStreamProc; --for stopping
  stopPacketStream: PUBLIC SppOps.StopPacketStreamProc; --ditto
  newStartPacketStream: PUBLIC SppNegotiation.NewStartPacketStreamProc;
    --for starting with negotiation

  --Glitchable signals (raised only through Driver.Glitch)
  TableScrambled: ERROR = CODE;
  StreamTerminating: ERROR = CODE;
  StreamNotEstablished: ERROR = CODE;
  MultipleBufferClients: ERROR = CODE;

  --local signals
  InvalidPacketSize: PUBLIC ERROR = CODE;

  --Used by procs that notify|wait without state checking required
  --Both take the monitor lock (ENTRY) just long enough to NOTIFY or WAIT
  Notify: ENTRY PROC [c: LONG POINTER TO CONDITION] = INLINE {NOTIFY c↑};
  Wait: ENTRY PROC [c: LONG POINTER TO CONDITION] = INLINE {
    ENABLE UNWIND => NULL; WAIT c↑};

  --these modulo functions 'hide' the ugliness of 16 bit wrap around math
  --each one looks at the sign of INTEGER[a - b], so the comparison is
  --relative to b rather than an absolute CARDINAL comparison

  -- a > b --
  Greater: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE {
    RETURN[INTEGER[a - b] > 0]};

  -- a < b --
  Less: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE {
    RETURN[INTEGER[a - b] < 0]}; -- a < b

  -- a >= b --
  GreaterOrEqual: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE {
    RETURN[INTEGER[a - b] >= 0]};

  -- a <= b --
  LessOrEqual: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE {
    RETURN[INTEGER[a - b] <= 0]};

  --returns whichever of a, b is later in the modulo ordering above
  Max: PROC [a, b: CARDINAL] RETURNS [CARDINAL] = INLINE {
    RETURN[IF INTEGER[a - b] > 0 THEN a ELSE b]};

  --returns whichever of a, b is earlier in the modulo ordering above
  Min: PROC [a, b: CARDINAL] RETURNS [CARDINAL] = INLINE {
    RETURN[IF INTEGER[a - b] < 0 THEN a ELSE b]};

  --SANITY CHECKING AND DEBUGGING CODE

  --marker stored in b.fo.queue while a buffer sits in one of our tables
  fakeQueue: NSBuffer.Queue ← LOOPHOLE[NetworkStreamImpl];

  --plain wrapper; installed as returnSendBuffer and returnReceiveBuffer
  --in InitializePSObject
  ReturnBuffer: PROCEDURE [b: NSBuffer.Buffer] = {NSBuffer.ReturnBuffer[b]};

  --marks b as held in a table; glitches if b already claims to be on a queue
  DebugInsertBuffer: INTERNAL PROC[b: NSBuffer.Buffer] = INLINE
    BEGIN
    IF b.fo.queue # NIL THEN Driver.Glitch[TableScrambled];
    b.fo.queue ← fakeQueue; --it isn't a real queue, but it isn't NIL either
    END; --DebugInsertBuffer

  --clears the mark set by DebugInsertBuffer; glitches if it was not set
  DebugExtractBuffer: INTERNAL PROC[b: NSBuffer.Buffer] = INLINE
    BEGIN
    IF b.fo.queue # fakeQueue THEN Driver.Glitch[TableScrambled];
    b.fo.queue ← NIL; --now it's NIL
    END; --DebugExtractBuffer

  --When CommFlags.doDebug is set, walks every slot of table t and glitches
  --with TableScrambled if an occupied slot is not marked with fakeQueue, if
  --a non-receive buffer has a requeue procedure other than RequeueProcedure,
  --or if the number of occupied slots differs from t.length.
  SanityCheck: ENTRY PROC[t: LONG POINTER TO SppOps.BufferTableObject] =
    BEGIN
    IF CommFlags.doDebug THEN
      BEGIN
      l: CARDINAL ← 0; --count of occupied slots seen
      b: NSBuffer.Buffer;
      FOR i: CARDINAL IN[0..LENGTH[t.slot]) DO
        SELECT TRUE FROM
          ((b ← t.slot[i]) = NIL) => NULL; --no action here
          (b.fo.queue # fakeQueue) => GOTO glitch; --not in my table
          (b.fo.function = receive) => l ← l + 1; --these use standard requeue
          (b.requeueProcedure # RequeueProcedure) => GOTO glitch;
          ENDCASE => l ← l + 1; --it's ok, just count it
        ENDLOOP;
      IF l # t.length THEN GOTO glitch;
      EXITS glitch => Driver.Glitch[TableScrambled];
      END;
    END; --SanityCheck

  --Disposes of a packet that RcvdSppPacket or Establish has decided to take.
  --System packets are returned to the pool, data packets go into the receive
  --table, and attention packets are recorded via InsertAttention and then
  --also put into the receive table.
  AcceptablePacket: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    --acceptable, but may be early - be careful
    InsertRecvdPacketInTable: ENTRY PROC =
      BEGIN
      --no possibility of signals
      i: CARDINAL ← body.sequenceNumber MOD LENGTH[rcvr.table.slot];
      --duplicate reception of early packet
      IF rcvr.table.slot[i] # NIL THEN {DuplicatePacket[b]; RETURN};
      rcvr.table.slot[i] ← b;
      rcvr.table.length ← rcvr.table.length + 1;
      IF CommFlags.doDebug THEN DebugInsertBuffer[b];
      IF body.sequenceNumber = rcvr.nextSeq THEN
        BEGIN
        --first loop of block is guaranteed - we just stuck in the buffer
        THROUGH [0..LENGTH[rcvr.table.slot]) DO
          --THIS BLOCK IS THE ONLY UPDATER OF rcvr.nextSeq!!!
rcvr.nextSeq ← rcvr.nextSeq + 1; --one more in the sequence
          i ← (i + 1) MOD LENGTH[rcvr.table.slot]; --point to next slot in table
          IF rcvr.table.slot[i] = NIL THEN EXIT; --first empty slot stops search
          ENDLOOP;
        NOTIFY rcvr.table.condition; --wake a client waiting in GetForUser
        END;
      END; --InsertRecvdPacketInTable

    body: NSBuffer.Body = b.ns;
    IF body.destinationConnectionID = connection.localID THEN
      connection.state ← open; --he's seen us - we've seen him
    IF CommFlags.doStats AND body.sendAck THEN
      Stats.StatIncr[statAckRequestsReceived];
    SELECT TRUE FROM
      (body.systemPacket) =>
        BEGIN
        IF body.sendAck THEN rcvr.acksReqested ← TRUE; --ack sys packets direct
        IF CommFlags.doStats THEN Stats.StatIncr[statSystemPacketsReceived];
        NSBuffer.ReturnBuffer[b]; --client never sees system packets
        END;
      (~body.attention) =>
        BEGIN
        --remote is blocked once its sequence number reaches our window edge
        rcvr.blocked ← INTEGER[body.sequenceNumber - rcvr.maxSeq] >= 0;
        InsertRecvdPacketInTable[]; --packet MAY be saved in table
        END;
      (body.attention AND InsertAttention[b]) =>
        InsertRecvdPacketInTable[]; --packet MAY be saved in table
      ENDCASE => NSBuffer.ReturnBuffer[b]; --duplicate attention packet
    END; --AcceptablePacket

  --Empties table t: every occupied slot is cleared, t.length is decremented
  --and the buffer is returned via NSBuffer.ReturnBuffer.
  CleanupTable: ENTRY PROC [t: SppOps.BufferTable] =
    --PROCESS: RECEIVER (t = input) | RETRANSMITTER (t = output)
    BEGIN
    b: NSBuffer.Buffer;
    --No possibility of errors
    --Get all buffers out of this table
    FOR i: CARDINAL IN[0..LENGTH[t.slot]) DO
      IF (b ← t.slot[i]) # NIL THEN
        BEGIN
        t.slot[i] ← NIL;
        t.length ← t.length - 1;
        IF CommFlags.doDebug THEN DebugExtractBuffer[b];
        NSBuffer.ReturnBuffer[b];
        END;
      ENDLOOP;
    END; --CleanupTable

  --Handles a packet we already hold: requests that an ack be sent,
  --counts it and returns the buffer.
  DuplicatePacket: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    <<IF b.ns.sendAck THEN>> rcvr.acksReqested ← TRUE; --always ack duplicates
    IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsReceivedAgain];
    NSBuffer.ReturnBuffer[b];
    END; --DuplicatePacket

  --Handles a sequenced packet that arrives while the connection state is
  --activeEstablish or waitEstablish (called from RcvdSppPacket).
  --A packet with a nonzero sequence number, or whose socket and connection
  --IDs match none of the three accepted patterns below, is counted and
  --returned. Otherwise the remote's ID and allocation are recorded, the
  --state becomes established, the packet goes to AcceptablePacket and
  --waiters on isEstablished are notified.
  Establish: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    body: NSBuffer.Body = b.ns;
    SELECT TRUE FROM
      (body.sequenceNumber # 0) =>
        BEGIN
        IF CommFlags.doStats THEN
          Stats.StatIncr[statPacketsRejectedBadID];
        NSBuffer.ReturnBuffer[b];
        RETURN;
        END;
      --expected socket, addressed to our ID, remote ID known: take it
      (body.source.socket = connection.remoteAddr.socket)
        AND (body.destinationConnectionID = connection.localID)
        AND (body.sourceConnectionID # unknownCID) => NULL;
      --reply came from a different socket while we are the active side:
      --adopt the socket the remote answered from
      (connection.state = activeEstablish)
        AND (body.source.socket # connection.remoteAddr.socket)
        AND (body.destinationConnectionID = connection.localID)
        AND (body.sourceConnectionID # unknownCID) =>
        BEGIN
        connection.remoteAddr.socket ← body.source.socket;
        END;
      --remote does not know our ID yet
      (body.source.socket = connection.remoteAddr.socket)
        AND (body.destinationConnectionID = unknownCID)
        AND (body.sourceConnectionID # unknownCID) =>
        BEGIN
        IF connection.state = waitEstablish THEN rcvr.acksReqested ← TRUE;
        END;
      ENDCASE =>
        BEGIN
        IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadID];
        NSBuffer.ReturnBuffer[b];
        RETURN;
        END;
    --note: built-in MIN here, not the modulo Min defined in this module
    xmtr.maxSeq ← MIN[
      body.allocationNumber, xmtr.unackedSeq + xmtr.maxAlloc];
    connection.remoteID ← body.sourceConnectionID;
    connection.state ← established; --may be 'opened' by AcceptablePacket
    IF body.sendAck THEN rcvr.acksReqested ← TRUE; --need an ack on this one
    AcceptablePacket[b];
    Notify[@isEstablished]; --notify any waiters
    END; --Establish

  --Returns the next attention byte from attn.table, waiting on
  --attn.table.condition until one is present. Raises
  --PacketStream.ConnectionSuspended if the connection is suspended.
  ExtractAttention: ENTRY PROC RETURNS [byte: Environment.Byte] =
    --PROCESS: CLIENT WAIT FOR ATTENTION
    BEGIN
    ENABLE UNWIND => NULL;
    --Extract attention byte or wait for one to arrive
    i: CARDINAL;
    DO
      SELECT connection.state FROM
        established, open =>
          BEGIN
          IF attn.table.length # 0 THEN
            BEGIN
            i ← attn.table.index MOD LENGTH[attn.table.slot];
            attn.table.index ← attn.table.index + 1;
            --empty slot: advance to the next index without waiting
            IF attn.table.slot[i] = SppOps.nullAttention THEN LOOP;
            byte ← attn.table.slot[i].attn;
            attn.table.slot[i] ← SppOps.nullAttention;
            attn.table.length ← attn.table.length - 1;
            RETURN;
            END;
          END;
        suspended =>
          RETURN WITH ERROR
            PacketStream.ConnectionSuspended[connection.whySuspended];
        unestablished, activeEstablish, waitEstablish =>
          IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished];
        ENDCASE;
      WAIT
attn.table.condition;
      ENDLOOP;
    END; --ExtractAttention

  --Takes the buffer at rexmtr.table.index out of the retransmit table and
  --returns it; the index itself is not advanced. If that slot is empty,
  --returns NIL, clears xmtr.blocked and notifies xmtr.newAllocation.
  ExtractRexmt: ENTRY PROC RETURNS [b: NSBuffer.Buffer] =
    --PROCESS: RETRANSMITTER
    BEGIN
    --No possibility of signals
    --extracts Buffer from table, doesn't consume
    i: CARDINAL = rexmtr.table.index MOD LENGTH[rexmtr.table.slot];
    IF (b ← rexmtr.table.slot[i]) # NIL THEN
      BEGIN
      rexmtr.table.slot[i] ← NIL;
      rexmtr.table.length ← rexmtr.table.length - 1;
      IF CommFlags.doDebug THEN
        BEGIN
        IF (rexmtr.table.index # b.ns.sequenceNumber) THEN
          Driver.Glitch[TableScrambled];
        DebugExtractBuffer[b];
        END;
      END
    ELSE {xmtr.blocked ← FALSE; NOTIFY xmtr.newAllocation};
    END; --ExtractRexmt

  --Returns the local and remote addresses recorded for this connection.
  FindAddresses: PROC RETURNS [local, remote: System.NetworkAddress] = {
    RETURN[connection.localAddr, connection.remoteAddr]};

  --Hands the client the next in-sequence received buffer.
  --Returns NIL if rcvr.waitTime is 0 and nothing is ready, or if more than
  --rcvr.interval pulses pass since entry without a packet arriving.
  --Raises PacketStream.ConnectionSuspended if the connection is suspended.
  GetForUser: ENTRY PROC RETURNS [b: NSBuffer.Buffer] =
    --PROCESS: CLIENT RECEIVING
    BEGIN
    ENABLE UNWIND => NULL;
    i: CARDINAL = rcvr.table.index MOD LENGTH[rcvr.table.slot];
    rcvr.timeout ← System.GetClockPulses[]; --start of this wait
    DO --either EXITs or ERRORs out of loop
      IF (b ← rcvr.table.slot[i]) # NIL THEN
        BEGIN
        IF CommFlags.doDebug THEN
          BEGIN
          IF (rcvr.table.index # b.ns.sequenceNumber) THEN
            Driver.Glitch[TableScrambled];
          DebugExtractBuffer[b];
          END;
        rcvr.table.slot[i] ← NIL;
        rcvr.table.length ← rcvr.table.length - 1;
        rcvr.table.index ← rcvr.table.index + 1;
        IF CommFlags.doStats THEN
          BEGIN
          Stats.StatIncr[statDataPacketsReceived];
          Stats.StatBump[statDataBytesReceived,
            b.ns.pktLength - PacketStream.bytesPerSequencedPktHeader];
          END;
        --ONLY UPDATER OF rcvr.maxSeq!!!
        --don't take away previously allocated window
        rcvr.maxSeq ← Max[rcvr.table.index + rcvr.maxAlloc, rcvr.maxSeq];
        --table drained: tell the remote now if it was blocked or asked for
        --an ack
        IF (rcvr.table.length = 0) AND (rcvr.blocked OR b.ns.sendAck) THEN
          SystemPacket[FALSE];
        EXIT; --RETURN[b # NIL]
        END;
      SELECT connection.state FROM
        established, open =>
          BEGIN
          time: LONG CARDINAL = System.GetClockPulses[];
          SELECT TRUE FROM
            (rcvr.waitTime = 0) => EXIT; --RETURN[b = NIL], didn't want to wait
            ((time - rcvr.timeout) > rcvr.interval) =>
              BEGIN
              IF CommFlags.doDebug AND (rcvr.interval = LAST[LONG CARDINAL])
                THEN Driver.Glitch[StreamTerminating];
              EXIT; --RETURN[b = NIL], this is a timeout
              END;
            ENDCASE => WAIT rcvr.table.condition; --wait for packet, LOOP
          END;
        suspended =>
          RETURN WITH ERROR
            PacketStream.ConnectionSuspended[connection.whySuspended];
        --NOTE(review): when doDebug is off the next two arms do nothing and
        --the loop repeats with neither WAIT nor EXIT; confirm this is intended
        terminating =>
          IF CommFlags.doDebug THEN Driver.Glitch[StreamTerminating];
        unestablished, activeEstablish, waitEstablish =>
          IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished];
        ENDCASE;
      ENDLOOP;
    END; --GetForUser

  --Gets a send buffer from the connection's pool for the client.
  --xmtr.clientProcess records the process that is inside the buffer manager.
  --Raises PacketStream.ConnectionSuspended if the connection is suspended
  --before, during or after the allocation.
  GetOutputBuffer: PROC RETURNS[b: NSBuffer.Buffer] =
    BEGIN
    ClearClientProcess: ENTRY PROC = INLINE {xmtr.clientProcess ← NIL};
    --registers the caller as the buffer client; TRUE means already suspended
    EnterOperation: ENTRY PROC RETURNS[suspended: BOOLEAN] = INLINE
      BEGIN
      SELECT TRUE FROM
        (CommFlags.doDebug AND (xmtr.clientProcess # NIL)) =>
          Driver.Glitch[MultipleBufferClients];
        (connection.state = suspended) => RETURN[TRUE];
        ENDCASE => xmtr.clientProcess ← Process.GetCurrent[];
      RETURN[FALSE];
      END; --EnterOperation
    --unregisters the caller once the buffer manager call has returned
    ExitOperation: ENTRY PROC = INLINE
      BEGIN
      SELECT TRUE FROM
        (xmtr.clientProcess = NIL) => NULL; --means we had multiple clients!??
        (connection.state = suspended) => --we're suspended, we did the abort?
{Process.CancelAbort[xmtr.clientProcess]; xmtr.clientProcess ← NIL};
        ENDCASE => xmtr.clientProcess ← NIL;
      END; --ExitOperation

    BEGIN --protect only the call into the buffer manager
    IF EnterOperation[].suspended THEN GOTO returnSuspended;
    b ← NSBuffer.GetBuffer[aH: connection.pool, function: send,
      size: connection.maxSppBytes + NSTypes.bytesPerSppHeader +
        NSTypes.bytesPerIDPHeader !
      UNWIND => ClearClientProcess[];
      ABORTED => IF connection.state = suspended THEN GOTO suspended];
    EXITS suspended => {ClearClientProcess[]; GOTO returnSuspended};
    END; --protect only the call into the buffer manager
    ExitOperation[]; --just to cleanup state appropriately
    --suspended while we held the buffer: give it back and report
    IF connection.state = suspended THEN
      {NSBuffer.ReturnBuffer[b]; GOTO returnSuspended};
    b.ns.attention ← b.ns.endOfMessage ← b.ns.systemPacket ← FALSE;
    EXITS returnSuspended =>
      RETURN WITH ERROR
        PacketStream.ConnectionSuspended[connection.whySuspended];
    END; --GetOutputBuffer

  --Returns the current value of connection.maxSppBytes.
  GetSenderSizeLimit: PROC RETURNS [CARDINAL] = {
    RETURN[connection.maxSppBytes]};

  --Maps the connection state onto PacketStream.State: active when
  --established or open, inactive (the default result) when suspended.
  --Any other state is reported through Driver.Glitch.
  GetState: PROC RETURNS[PacketStream.State ← inactive] =
    BEGIN
    SELECT connection.state FROM
      suspended => NULL;
      established, open => RETURN[active];
      terminating => Driver.Glitch[StreamTerminating];
      ENDCASE => Driver.Glitch[StreamNotEstablished];
    END; --GetState

  --Returns rcvr.waitTime.
  GetWaitTime: PROC RETURNS [time: NSTypes.WaitTime] = {
    time ← rcvr.waitTime};

  --Fills packetStreamObject with this module's procedures.
  InitializePSObject: PROC =
    BEGIN
    packetStreamObject ← [
      destroy: NIL, --done from the outside(Mgr)
      put: TakeFromUser,
      get: GetForUser,
      waitForAttention: ExtractAttention,
      setWaitTime: SetWaitTime,
      findAddresses: FindAddresses,
      getSenderSizeLimit: GetSenderSizeLimit,
      setSenderSizeLimit: SetSenderSizeLimit,
      getSendBuffer: GetOutputBuffer,
      returnSendBuffer: ReturnBuffer,
      returnReceiveBuffer: ReturnBuffer,
      getState: GetState,
      setState: SetState];
    END;

  --Records the attention byte carried by b in attn.table and notifies
  --attn.table.condition. Returns FALSE, recording nothing, when the slot for
  --b's sequence number is already occupied.
  InsertAttention: ENTRY PROC [b: NSBuffer.Buffer] RETURNS [BOOLEAN] =
    --PROCESS: RECEIVER
    BEGIN
    --No possibility of signals
    --Insert the attention byte into the table
    i: CARDINAL ← b.ns.sequenceNumber;
    IF CommFlags.doStats THEN Stats.StatIncr[statAttentionsReceived];
    --empty table: restart the read index at this sequence number
    IF attn.table.length = 0 THEN attn.table.index ← i;
    i ← i MOD LENGTH[attn.table.slot];
    IF attn.table.slot[i] # SppOps.nullAttention THEN RETURN[FALSE];
    attn.table.slot[i] ← [
      sequence: b.ns.sequenceNumber, mask: 377B, attn: b.ns.sppBytes[0]];
    attn.table.length ← attn.table.length + 1;
    NOTIFY attn.table.condition;
    RETURN[TRUE];
    END; --InsertAttention

  --Disposes of a sent buffer: it is returned to the pool when the stream is
  --terminating, consumed (delay statistics updated, table index advanced)
  --when it has already been acknowledged, and otherwise put back into the
  --retransmit table.
  InsertRexmtOrConsume: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RETRANSMITTER
    BEGIN
    InsertBuffer: ENTRY PROC[] = INLINE
      BEGIN
      rexmtr.table.slot[i] ← b; --return the buffer to the table
      rexmtr.table.length ← SUCC[rexmtr.table.length]; --adjust the length
      IF CommFlags.doDebug THEN DebugInsertBuffer[b]; --sanity check
      END; --InsertBuffer
    i: CARDINAL = b.ns.sequenceNumber MOD LENGTH[rexmtr.table.slot];
    SELECT TRUE FROM
      (CommFlags.doDebug AND (rexmtr.table.slot[i] # NIL)) =>
        Driver.Glitch[TableScrambled];
      (connection.state = terminating) => NSBuffer.ReturnBuffer[b]; --dead stream
      (INTEGER[xmtr.unackedSeq - b.ns.sequenceNumber] > 0) => --already ack'd
        BEGIN
        rexmtr.delay ← rexmtr.delay + (System.GetClockPulses[] - b.fo.time);
        rexmtr.count ← SUCC[rexmtr.count]; --and the number of participants
        rexmtr.table.index ← SUCC[rexmtr.table.index]; --consume packet
        NSBuffer.ReturnBuffer[b]; --free the buffer
        IF rexmtr.table.length > 0 THEN Notify[@rexmtr.condition]; --on to next
        END;
      ENDCASE => InsertBuffer[]; --not finished with buffer
    END; --InsertRexmtOrConsume

  --Applies the acknowledge and allocation numbers carried by b to the
  --transmit side: packets acknowledged by the new number are consumed from
  --the retransmit table, a larger allocation opens the send window, and the
  --probe counters are reset because the remote is evidently alive.
  --(The unused outer local 'unacked', which was shadowed by the loop-local
  --of the same name, has been removed.)
  ProcessSppState: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    bPrime: NSBuffer.Buffer;
    body: NSBuffer.Body = b.ns;
    IF INTEGER[body.acknowledgeNumber - xmtr.unackedSeq] > 0 THEN
      BEGIN --new acknowledge field
      xmtr.unackedSeq ← body.acknowledgeNumber; --copy new ack number
      UNTIL (bPrime ← ExtractRexmt[]) = NIL DO
        --all packets up to but not including 'xmtr.unackedSeq' are acked
        unacked: CARDINAL ← bPrime.ns.sequenceNumber;
InsertRexmtOrConsume[bPrime]; --and put it back | consume it
        --stop at the first packet the new ack number does not cover
        IF LessOrEqual[xmtr.unackedSeq, unacked] THEN EXIT;
        ENDLOOP;
      END; --new acknowledge field
    IF Greater[body.allocationNumber, xmtr.maxSeq] THEN
      BEGIN --new allocation from remote
      xmtr.maxSeq ← Min[xmtr.unackedSeq + xmtr.maxAlloc, body.allocationNumber];
      Notify[@xmtr.newAllocation];
      END; --new allocation from remote
    probe.unacked ← 0; --other end is alive
    probe.lastTime ← System.GetClockPulses[]; --to probe for whatever
    END; --ProcessSppState

  --Reacts to an error packet according to the connection state.
  --While establishing, noSocket and the listener-rejection errors suspend
  --the stream. Once established, noSocket suspends the stream and the
  --congestion errors slow the transmitter when rate control is on.
  --The buffer is not returned here (Receiver returns it after the call).
  RcvdErrorPacket: PUBLIC PROCEDURE [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    IF CommFlags.doStats THEN Stats.StatIncr[statErrorPacketsReceived];
    SELECT connection.state FROM
      IN[unestablished..waitEstablish] =>
        BEGIN
        SELECT b.ns.errorType FROM
          noSocket => <<no listener|transducer>>
            SuspendStream[noRouteToDestination, noServiceAtDestination];
          resourceLimits, listenerReject, connectionLimit =>
            <<rejected by listener client>>
            SuspendStream[noRouteToDestination, remoteReject];
          <<excessHops|cantGetThere => NULL;>>
          ENDCASE;
        END;
      established, open =>
        SELECT b.ns.errorType FROM
          noSocket => <<socket went away (deleted by client)>>
            SuspendStream[remoteServiceDisappeared, remoteReject];
          congestionWarning, congestionDiscard =>
            IF xmtr.rateControlling THEN SetQuencing[up];
          ENDCASE; <<rejected|excessHops|cantGetThere => NULL;>>
      ENDCASE; <<terminating|suspended => NULL>>
    END; --RcvdErrorPacket

  --Dispatches a received sequenced packet; the buffer is always consumed.
  --In established|open the packet is checked against the remote socket and
  --both connection IDs, then classified by its sequence number relative to
  --the receive window as very early, very late, duplicate or acceptable.
  --In activeEstablish|waitEstablish it is passed to Establish.
  --In every other state the buffer is returned.
  --Statistics here are gated on CommFlags.doStats, as everywhere else in
  --this module (two of them were previously gated on doDebug).
  RcvdSppPacket: PUBLIC PROC [b: NSBuffer.Buffer] =
    --PROCESS: RECEIVER
    BEGIN
    body: NSBuffer.Body = b.ns;
    SELECT connection.state FROM
      established, open =>
        BEGIN
        SELECT TRUE FROM
          (body.source.socket # connection.remoteAddr.socket) =>
            BEGIN --out of the blue came.....
            IF CommFlags.doStats THEN
              Stats.StatIncr[statPacketsRejectedBadSource];
            NSBuffer.ReturnBuffer[b];
            END;
          (body.sourceConnectionID = connection.remoteID)
            AND (body.destinationConnectionID = connection.localID) =>
            BEGIN --really is for this connection
            --both values are offsets from the window edge rcvr.maxSeq, so
            --they compare correctly across sequence-number wraparound
            normalizedNext: INTEGER = INTEGER[rcvr.nextSeq - rcvr.maxSeq];
            normalizedSeq: INTEGER =
              INTEGER[body.sequenceNumber - rcvr.maxSeq];
            SELECT normalizedSeq FROM
              > rcvrDefaults.accept =>
                BEGIN --this is a very early packet - just ignore it
                IF CommFlags.doStats THEN
                  Stats.StatIncr[statDataPacketsReceivedEarly];
                NSBuffer.ReturnBuffer[b];
                END;
              < normalizedNext =>
                IF normalizedSeq < normalizedNext - rcvrDefaults.duplicate THEN
                  BEGIN --this is a very late packet - just ignore it
                  IF CommFlags.doStats THEN
                    Stats.StatIncr[statDataPacketsReceivedVeryLate];
                  NSBuffer.ReturnBuffer[b];
                  END
                --duplicate, but worth looking at state, acking if sender wishes
                ELSE {ProcessSppState[b]; DuplicatePacket[b]};
              --the only case that's really productive
              ENDCASE => {ProcessSppState[b]; AcceptablePacket[b]};
            END;
          (body.sourceConnectionID = connection.remoteID)
            OR (body.destinationConnectionID = connection.localID) =>
            BEGIN
            rcvr.acksReqested ← TRUE; --hasn't seen our id yet?
            NSBuffer.ReturnBuffer[b]; --don't accept buffer though
            END;
          ENDCASE =>
            BEGIN
            IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadID];
            NSBuffer.ReturnBuffer[b]; --just some trash
            END;
        END;
      activeEstablish, waitEstablish => Establish[b];
      --suspended, terminating, unestablished => NSBuffer.ReturnBuffer[b];--
      ENDCASE => NSBuffer.ReturnBuffer[b];
    END; --RcvdSppPacket

  --Body of the receiver process. Loops taking packets from the connection's
  --socket until ABORTED: packets from the wrong host are dropped, sequenced
  --packets of at least header length go to RcvdSppPacket (followed by a
  --system packet if an ack was requested), error packets go to
  --RcvdErrorPacket, and anything else is answered with an invalidPacketType
  --error.
  Receiver: PROC RETURNS [SppOps.PSProc] =
    --PROCESS: THIS IS THE RECEIVER
    BEGIN
    b: NSBuffer.Buffer;
    body: NSBuffer.Body;
    Process.SetPriority[CommPriorities.receiver];
    <<UNTIL ABORTED>> DO
      ENABLE ABORTED => EXIT;
      --get next packet from socket queue
      b ← Socket.GetPacket[connection.socket];
      body ← b.ns;
      IF CommFlags.doDebug THEN SanityCheck[@rcvr.table];
      SELECT TRUE FROM
        body.source.host # connection.remoteAddr.host =>
          BEGIN
          IF CommFlags.doStats THEN
            Stats.StatIncr[statPacketsRejectedBadSource];
          NSBuffer.ReturnBuffer[b];
          END;
        (body.packetType = sequencedPacket)
          AND (body.pktLength >= PacketStream.bytesPerSequencedPktHeader) =>
          BEGIN
          RcvdSppPacket[b];
          IF rcvr.acksReqested THEN SystemPacket[FALSE];
          END;
        body.packetType = error =>
          BEGIN
          RcvdErrorPacket[b];
          NSBuffer.ReturnBuffer[b];
          END;
        ENDCASE =>
          BEGIN
          IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadType];
          RouterInternal.SendErrorPacket[b, invalidPacketType];
          END;
      IF CommFlags.doDebug THEN SanityCheck[@rcvr.table];
      ENDLOOP;
    RETURN[LOOPHOLE[Receiver]];
    END; --Receiver

  --Installed as b.requeueProcedure by SendSequencedPacket. A completion
  --status outside pending..aborted suspends the stream with a reason derived
  --from that status; then system packets are returned and data packets go
  --back through InsertRexmtOrConsume.
  RequeueProcedure: PROC [b: NSBuffer.Buffer] =
    --PROCESS: BUFFER MANAGER
    BEGIN
    IF (b.fo.status ~IN[pending..aborted]) THEN
      SuspendStream[noRouteToDestination,
        SELECT b.fo.status FROM
          noRouteToNetwork => noRouteToDestination,
          noTranslationForDestination => noTranslationForDestination,
          noAnswerOrBusy => noAnswerOrBusy,
          circuitInUse => circuitInUse,
          circuitNotReady => circuitNotReady,
          dialerHardwareProblem => circuitNotReady,
          noDialingHardware => noDialingHardware,
          invalidDestAddr => noServiceAtDestination,
          ENDCASE => noRouteToDestination]; --hardware problem--
    IF
b.ns.systemPacket THEN NSBuffer.ReturnBuffer[b] --don't reQ sys packets
    ELSE InsertRexmtOrConsume[b]; --put the buffer back in the table
    IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
    END; --RequeueProcedure

  --Body of the retransmitter process. Each pass, until ABORTED, it
  --  suspends the stream when too many probes are unanswered or a packet
  --    has been tried more than rexmtr.giveUp times,
  --  retransmits the packet at the head of the retransmit table when its
  --    interval has expired,
  --  sends probes when the remote has been silent or has not opened the
  --    window, and
  --  recomputes rexmtr.interval once per rexmtr.calculationInterval.
  --It then waits on rexmtr.condition before the next pass.
  Retransmitter: PROC RETURNS [SppOps.PSProc] =
    --PROCESS: THIS IS THE RETRANSMITTER
    BEGIN
    b: NSBuffer.Buffer;
    body: NSBuffer.Body;
    <<UNTIL ABORTED>> DO
      ENABLE ABORTED => EXIT;
      lastProbe: LONG CARDINAL = probe.lastTime;
      timeNow: LONG CARDINAL ← System.GetClockPulses[];
      IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
      BEGIN
      --the arms below are tried in order and some have side effects
      --(ExtractRexmt, the increment of b.fo.tries); do not reorder
      SELECT TRUE FROM
        --this connection is suspended....can it be restarted?
        (connection.state = suspended) => GOTO suspended;
        --we aren't dead....should we commit suicide?
        (probe.unacked > probeDefaults.giveUp) => GOTO suspend;
        ((b ← ExtractRexmt[]) # NIL) =>
          BEGIN
          body ← b.ns; --make local copy
          SELECT TRUE FROM
            (Greater[xmtr.unackedSeq, body.sequenceNumber]) =>
              BEGIN --packet already acked
              InsertRexmtOrConsume[b]; --all finished with this buffer
              LOOP; --and on to bigger and better things - don't wait
              END;
            ((timeNow - b.fo.time) < (rexmtr.interval * b.fo.tries)) =>
              --whoops - didn't mean to do that - too soon to retrans this one
              InsertRexmtOrConsume[b]; --put it back
            ((b.fo.tries ← b.fo.tries + 1) > rexmtr.giveUp) =>
              --this connection is almost dead
              BEGIN
              InsertRexmtOrConsume[b]; --put buffer back
              GOTO suspend; --and quit trying
              END;
            ENDCASE =>
              --this is a retransmission - is there anything else to do?
              BEGIN
              --is it time to do something rash??
              IF (b.fo.tries MOD rexmtrDefaults.flushRoute) = 0 THEN
                RouterInternal.FlushCacheEntry[body.destination.net];
              --sending buffer with sendAck is equivalent to a probe (almost)
              IF ((timeNow - lastProbe) > probe.inactiveInterval) THEN
                BEGIN
                probe.unacked ← probe.unacked + 1;
                IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
                END;
              xmtr.blocked ← body.sendAck ← TRUE; --resynch stream
              probe.lastTime ← timeNow; --mark new probe interval
              SendPacket[b]; --retransmit the packet
              IF CommFlags.doStats THEN
                Stats.StatIncr[statDataPacketsRetransmitted];
              END;
          END;
        --no traffic from other end in a long time, no buffer to rexmit
        ((timeNow - lastProbe) > probe.inactiveInterval) =>
          BEGIN
          probe.lastTime ← timeNow; --mark new interval start
          IF probe.probing THEN
            BEGIN
            SystemPacket[TRUE];
            probe.unacked ← probe.unacked + 1; --count the probe
            IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
            END;
          END;
        --remote won't open window, perhaps we missed his response
        (~Less[xmtr.maxSeq, xmtr.nextSeq]) => NULL; --we're not trying to send
        ((timeNow - lastProbe) > probe.allocInterval) =>
          BEGIN
          probe.lastTime ← timeNow; --mark new allocation interval start
          SystemPacket[TRUE]; --then send the probe
          probe.unacked ← probe.unacked + 1; --count the probe
          IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
          END;
        ENDCASE;
      --time to recompute retransmission interval?
      IF ((timeNow - rexmtr.calculation) > rexmtr.calculationInterval) THEN
        BEGIN
        rexmtr.calculation ← timeNow; --reset start of interval
        IF (rexmtr.count # 0) THEN
          BEGIN
          ri: LONG CARDINAL;
          <<
          The new retransmission interval (ri) at time (n+1) is ri(n+1).
              ri(n+1) ← floor <= INT[f[ri(t)]] (t in[0..n])] <= ceiling;
          This uses a crude Euler integration of the approximate form
              f(n+1) ← (3 * f(n-1) + f(n)) / 4;
          The contribution of ri(n) is 1.5 (2.0) * the average time packets
          spent in the retransmit table waiting to be ack'd.
          >>
          ri ← rexmtr.delay / rexmtr.count; --that's the avg over the interval
          ri ← ri + (ri / 2); --times 1.5 so we don't drive too hard
          <<ri ← 2 * ri; --times 2.0 so we don't drive too hard>>
          --weight old interval more than new (3 to 1)
          ri ← (((2 * rexmtr.interval) + rexmtr.interval) + ri) / 4; --Euler
          --interval is bounded both low and high
          rexmtr.interval ← MAX[rexmtr.floor, MIN[rexmtr.ceiling, ri]];
          rexmtr.delay ← 0; rexmtr.count ← 0; --reset the collectors
          END;
        IF xmtr.rateControlling THEN SetQuencing[down]; --check out this
        END;
      EXITS
        suspend => SuspendStream[transmissionTimeout, timeout];
        suspended => NULL; --it has already been done
      END;
      IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
      Wait[@rexmtr.condition];
      ENDLOOP;
    RETURN[LOOPHOLE[Retransmitter]];
    END; --Retransmitter

  --Stamps b with the current acknowledge and allocation numbers, waits out
  --the rate-control interval for data packets when rate control is on, and
  --hands b to xmtr.transmitProc.
  SendPacket: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RETRANSMITTER AND CLIENT SENDING
    BEGIN
    body: NSBuffer.Body = b.ns;
    IF CommFlags.doStats AND body.sendAck THEN
      Stats.StatIncr[statAckRequestsSent];
    <<
    Any packet going out carries the latest state information. That means
    that if there have been any acks requested, that this transmission
    satisfies that request. Also, we may have consumed some data that had
    our receiver in a blocked state. This is an opportunity to check for
    that too.
    >>
    body.unusedType ← 0; --unused field (at present)
    rcvr.acksReqested ← FALSE; --this is sufficient for any ack
    rcvr.blocked ← INTEGER[rcvr.nextSeq - rcvr.maxSeq] >= 0; --still blocked?
    body.acknowledgeNumber ← rcvr.nextSeq; --latest acknowledgement
    body.allocationNumber ← rcvr.maxSeq; --and latest allocation
    IF xmtr.rateControlling AND ~body.systemPacket THEN
      BEGIN
      <<
      We don't control the rate of transmission of system packets. They go
      out in response to probes, duplicates, etc. And, we don't log the time
      we sent them. So we might transmit a system packet followed very
      closely by a data packet.
If we registered the time we sent system packets, then we might get
      into a rut that prohibited us from sending anything but system packets.
      If we get ABORT'd (UNWIND) while dallying, just send the packet. That
      will get it back via the requeue machinery and we know how to handle
      that.
      >>
      UNTIL (System.GetClockPulses[] - xmtr.lastXmt) > xmtr.interval DO
        ENABLE UNWIND => xmtr.transmitProc[connection.socket, b]; --send it!
        Process.Pause[Process.MsecToTicks[xmtrDefaults.rateControlPause]];
        ENDLOOP;
      xmtr.lastXmt ← System.GetClockPulses[]; --remember last time we xmt'd
      END;
    xmtr.transmitProc[connection.socket, b];
    END; --SendPacket

  --Fills in the per-transmission bookkeeping (requeue procedure, send time,
  --try count) and the SPP addressing fields of b, then sends it.
  SendSequencedPacket: PROC [b: NSBuffer.Buffer] =
    --PROCESS: RETRANSMITTER, RECEIVER, CLIENT SENDER
    BEGIN
    body: NSBuffer.Body = b.ns;
    b.requeueProcedure ← RequeueProcedure;
    b.fo.time ← System.GetClockPulses[]; --record packet send time
    b.fo.tries ← 1; --number of times we transmitted this packet
    body.packetType ← sequencedPacket; --but of course
    body.destination ← connection.remoteAddr;
    body.sourceConnectionID ← connection.localID;
    body.destinationConnectionID ← connection.remoteID;
    SendPacket[b]; --da dah!
    END; --SendSequencedPacket

  --Transmit procedure that fills in the source address, context and
  --checksum itself and calls Protocol1.EncapsulateAndTransmit directly.
  --The cH argument is not used.
  SendToLocal: PROC[cH: Socket.ChannelHandle, b: NSBuffer.Buffer] =
    BEGIN
    --This is an optimization for local ethernet transmission
    body: NSBuffer.Body = b.ns;
    b.fo.status ← goodCompletion;
    body.source ← connection.localAddr;
    b.fo.type ← ns;
    b.fo.context ← xmtr.context;
    b.fo.network ← xmtr.context.network;
    body.transportControl ← [FALSE, 0, 0];
    IF xmtr.checksums THEN Checksums.SetChecksum[body]
    ELSE body.checksum ← 177777B; --all ones: no checksum computed
    Protocol1.EncapsulateAndTransmit[LOOPHOLE[b], @connection.remoteAddr.host];
    END; --SendToLocal

  --Adjusts xmtr.interval, the minimum spacing between data transmissions.
  --  up:   doubles the interval and adds 4 to xmtr.quenchIncr, unless
  --        quenchIncr has already reached xmtrDefaults.totalQuench.
  --  down: steps quenchIncr back toward 0, halving the interval on every
  --        fourth step; at 0 the interval is renormalized to
  --        rexmtr.interval / (2 * hops).
  --The hop count is clamped to at least 1 in that division so that a hop
  --count of 0 cannot cause a zero divide.
  --NOTE(review): whether connection.hops can be 0 while rate control is on
  --is not visible here; the clamp is defensive.
  SetQuencing: ENTRY PROC[action: {up, down}] =
    BEGIN
    SELECT action FROM
      up =>
        BEGIN
        <<
        PROCESS: REQUEUE PROCEDURE (DISPATCHER)
        Routers are in trouble - start slowing transmit.
        How much? Double the current value, whatever that is.
        For how long? Some multiple of the rexmtr.calculation interval.
        This will up xmtr.quenchIncr by 4. The clock that decrements it is
        the RETRANSMITTER process and is running every 5 seconds. That gives
        us 20 seconds of quench before we drop by half.
        >>
        IF xmtr.quenchIncr < xmtrDefaults.totalQuench THEN
          BEGIN
          xmtr.quenchIncr ← xmtr.quenchIncr + 4; --notice the times
          xmtr.interval ← 2 * xmtr.interval; --double it
          END;
        END;
      down =>
        BEGIN
        <<
        PROCESS: RETRANSMITTER
        Check to see if it's time to try speeding up again. The "up" code
        will only double 4 times. The "down" code will only halve every 4th
        time through. The two are in competition with each other.
        If xmtr.quenchIncr = 0 then normalize using the current
        retransmission interval and the distance to the remote.
        If xmtr.quenchIncr goes to zero, find out how far away the remote is
        and normalize using the current retransmission interval.
        If the value of xmtr.quenchIncr MOD 4 = 0, then halve the interval.
        >>
        SELECT TRUE FROM
          (xmtr.quenchIncr = 0) => --we're running free and easy
            xmtr.interval ← rexmtr.interval / (2 * MAX[1, connection.hops]);
          ((xmtr.quenchIncr ← PRED[xmtr.quenchIncr]) = 0) => --just cleared
            BEGIN
            connection.hops ← Router.GetDelayToNet[connection.remoteAddr.net !
              Router.NoTableEntryForNet => CONTINUE]; --it shouldn't fail, but
            xmtr.interval ← rexmtr.interval / (2 * MAX[1, connection.hops]);
            END;
          ((xmtr.quenchIncr MOD 4) = 0) => --still backing out
            xmtr.interval ← xmtr.interval / 2; --halve the interval
          ENDCASE; --do nothing this time
        END;
      ENDCASE;
    END; --SetQuencing

  --Sets connection.maxSppBytes to size.
  SetSenderSizeLimit: PROC [size: CARDINAL] = {connection.maxSppBytes ← size};

  --Initializes the packet stream object and SPP state, starts the stream,
  --and returns the stream handle with the remote address and connection ID.
  StartPacketStream: PUBLIC SppOps.StartPacketStreamProc =
    <<
    PROC [local, remote: System.NetworkAddress,
      localID, remoteID: NSTypes.ConnectionID,
      timeout: NSTypes.WaitTime, class: PacketStream.ClassOfService,
      establish: BOOLEAN]
      RETURNS [
        PacketStream.Handle, System.NetworkAddress, NSTypes.ConnectionID];
    >>
    --PROCESS: CLIENT
    BEGIN
    InitializePSObject[];
    SppNegotiation.InitializeSppState[local, remote, localID, remoteID, timeout];
    StartPacketStreamInternal[timeout, establish];
    RETURN[@packetStreamObject, connection.remoteAddr, connection.remoteID];
    END; --StartPacketStream

  --Negotiating variant of StartPacketStream: the remote address and
  --connection ID are stored through remote rather than returned.
  NewStartPacketStream: PUBLIC SppNegotiation.NewStartPacketStreamProc =
    <<
    PROC [local, remote: NewNetworkStream.TSap,
      negotiationParameters: NewNetworkStream.NegotiationParameters,
      hints: SppNegotiation.NegotiationHints,
      timeout: NSTypes.WaitTime]
      RETURNS [PacketStream.Handle];
    >>
    --PROCESS: CLIENT
    BEGIN
    InitializePSObject[];
    SppNegotiation.NewInitializeSppState[
      local, remote, negotiationParameters, hints, timeout];
    StartPacketStreamInternal[timeout, hints.activeEstablish];
    remote↑ ← [connection.remoteAddr, connection.remoteID];
    RETURN[@packetStreamObject];
    END; --NewStartPacketStream

  StartPacketStreamInternal: PROCEDURE [timeout: NSTypes.WaitTime, establish: BOOLEAN] =
    BEGIN
    time: LONG CARDINAL ← System.GetClockPulses[];
    SELECT connection.state FROM
      (unestablished) =>
        BEGIN
        Process.EnableAborts[@isEstablished];
        Process.SetTimeout[@isEstablished, Process.MsecToTicks[2000]];
        connection.state ← IF establish THEN activeEstablish ELSE waitEstablish;
        SetWaitTime[MAX[4000, timeout]]; --force at least two xmts of request
rexmtr.process ← FORK Retransmitter[]; rcvr.process ← FORK Receiver[]; DO --UNTIL ConnectionFailed | established | open IF establish THEN SystemPacket[TRUE, TRUE]; Wait[@isEstablished]; SELECT TRUE FROM (connection.state = established), (connection.state = open) => EXIT; (connection.state = suspended) => SIGNAL PacketStream.ConnectionFailed[connection.whyFailed]; ((System.GetClockPulses[] - time) > rcvr.interval) => BEGIN time ← System.GetClockPulses[]; SIGNAL PacketStream.ConnectionFailed[timeout]; END; ENDCASE; ENDLOOP; END; ENDCASE => BEGIN SystemPacket[FALSE, TRUE]; rexmtr.process ← FORK Retransmitter[]; rcvr.process ← FORK Receiver[]; END; SetWaitTime[timeout]; --now set the user's timeout request --LOCAL NETWORK EXPEDITED DELIVERY --In effect for directly connection ethernets only (and not ME) IF (connection.localAddr.host # connection.remoteAddr.host) THEN BEGIN FOR device: Device ← Driver.GetDeviceChain[], device.next UNTIL device = NIL DO IF device.device # ethernet THEN LOOP; xmtr.context ← Protocol1.GetContext[device, ns]; IF xmtr.context.netNumber = connection.remoteAddr.net THEN {xmtr.transmitProc ← SendToLocal; EXIT}; ENDLOOP; END; END; --StartPacketStreamInternal StopPacketStream: PUBLIC SppOps.StopPacketStreamProc = --PROCESS: CLIENT STREAM DELETION BEGIN OPEN pool: connection.pool; connection.state ← terminating; IF rcvr.process # NIL THEN {Process.Abort[rcvr.process]; [] ← JOIN rcvr.process}; IF rexmtr.process # NIL THEN {Process.Abort[rexmtr.process]; [] ← JOIN rexmtr.process}; --Flush the retransmitter and receiver tables CleanupTable[@rexmtr.table]; CleanupTable[@rcvr.table]; Socket.Delete[connection.socket]; END; --StopPacketStream SuspendStream: ENTRY PROCEDURE [ suspend: PacketStream.SuspendReason, fail: PacketStream.FailureReason] = --PROCESS: RECEIVER OR RETRANSMITTER BEGIN SELECT connection.state FROM suspended, terminating => NULL; ENDCASE => BEGIN --No possibility of signals connection.stateBeforeSuspension ← connection.state; 
connection.state ← suspended; connection.whySuspended ← suspend; connection.whyFailed ← fail; END; << The following processes might be waiting and if we don't notify them here, they may hang forever, a very long time. All of these are in loops that go back and test the current state before continuing and will signal "ConnectionSuspended" in this case. >> NOTIFY rcvr.table.condition; NOTIFY xmtr.newAllocation; NOTIFY attn.table.condition; << If there is a client waiting for buffers, then abort him. The GetBuffer code will trap the aborted and translate it into ConnectionSuspended. If we don't do this, the client may wait forever in the BufferMgr. >> IF xmtr.clientProcess # NIL THEN Process.Abort[xmtr.clientProcess]; END; --SuspendStream SystemPacket: PROC[sendAck, wait: BOOLEAN ← FALSE] = --PROCESS: RECEIVER OR RETRANSMITTER BEGIN b: NSBuffer.Buffer ← NSBuffer.GetBuffer[ aH: connection.pool, function: send, wait: wait, size: NSTypes.bytesPerSppHeader + NSTypes.bytesPerIDPHeader]; IF b # NIL THEN BEGIN body: NSBuffer.Body = b.ns; body.pktLength ← PacketStream.bytesPerSequencedPktHeader; body.systemPacket ← TRUE; --that's what this one is all about body.sendAck ← sendAck OR (connection.state < open); --still establishing body.attention ← body.endOfMessage ← FALSE; body.subtype ← 0; --all system packets have vanilla SST body.sequenceNumber ← xmtr.nextSeq; --assign sequence number IF CommFlags.doStats THEN Stats.StatIncr[statSystemPacketsSent]; SendSequencedPacket[b]; --get current state informaion and ship END; END; --SystemPacket SetState: PROC[newState: PacketStream.State] = BEGIN [] ← GetState[]; --might glitch, that's why we call it connection.state ← SELECT newState FROM active => established, ENDCASE => suspended; END; --SetState SetWaitTime: ENTRY PROCEDURE [time: NSTypes.WaitTime] = BEGIN --No possibility of signals --time IN[maxMsecToPulses..INFINITY] => silly client NEVER wake up! 
rcvr.waitTime ← time; --record in case he forgets rcvr.interval ← IF time > maxMsecToPulses THEN LAST[LONG CARDINAL] ELSE System.MicrosecondsToPulses[time*1000]; IF rcvr.interval = LAST[LONG CARDINAL] THEN Process.DisableTimeout[@rcvr.table.condition] ELSE Process.SetTimeout[ @rcvr.table.condition, CommUtil.PulsesToTicks[[rcvr.interval]]]; END; --SetWaitTime TakeFromUser: PROC [b: NSBuffer.Buffer] = --PROCESS: CLIENT SENDING BEGIN AssignSeqAndAssertSend: ENTRY PROC = BEGIN ENABLE UNWIND => NULL; assertion: INTEGER; sequenceNumber: CARDINAL; --assign current sequence number sequenceNumber ← body.sequenceNumber ← xmtr.nextSeq; --assign seq # xmtr.nextSeq ← xmtr.nextSeq + 1; --consume seq # DO --EXITs (success) or ERRORs (failure) SELECT connection.state FROM established, open => BEGIN assertion ← INTEGER[xmtr.maxSeq - sequenceNumber]; SELECT TRUE FROM --way past end of alloation (assertion < -rcvrDefaults.accept) => GOTO wait; --attention packet past end of allocation...expedited transmission (body.attention) => body.sendAck ← TRUE; --is xmtr blocked due to retransmission in progress? 
(xmtr.blocked) => GOTO wait; --at end of allocation...request ack this time, wait next time (assertion = 0) => body.sendAck ← TRUE; --past end of allocation with non-expedited packet..wait (assertion < 0) => GOTO wait; --pipe ~50% full...request ack this time, keep xmt'ng ((sequenceNumber - xmtr.unackedSeq) = SUCC[xmtr.maxAlloc]/2) => body.sendAck ← xmtr.transmitProc # SendToLocal; --pipe not full & not at end of allocation...go, go, go ENDCASE --(assertion > 0)-- => body.sendAck ← FALSE; RETURN; --in all cases that didn't GOTO wait EXITS wait => WAIT xmtr.newAllocation; END; suspended => RETURN WITH ERROR PacketStream.ConnectionSuspended[connection.whySuspended]; unestablished, activeEstablish, waitEstablish => IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished]; terminating => IF CommFlags.doDebug THEN Driver.Glitch[StreamTerminating]; ENDCASE; ENDLOOP; END; --AssignSeqAndAssertSend << Even if AssignSeqAndAssertSend fails, send the packet. This gets it into the retrans table since we already consumed the sequence number. This is most appropriate when the operation is ABORTED, but also works for suspended. >> body: NSBuffer.Body = b.ns; AssignSeqAndAssertSend[! UNWIND => SendSequencedPacket[b]]; SendSequencedPacket[b]; --so send the packet already IF CommFlags.doStats THEN BEGIN Stats.StatIncr[statDataPacketsSent]; Stats.StatBump[statDataBytesSent, body.pktLength - PacketStream.bytesPerSequencedPktHeader]; END; END; --TakeFromUser --BEGINNING OF BYTE STREAM IMPLEMENTATION LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte]; << A client will typicaly have three processes accessing this module. The first receives data, the second waits for attentions, and the third transmits data, sends attentions and changes the subsequence type. As a consequence it is not necessary for this module to be a monitor, since there is no interaction between the three processes in this module. 
The interaction occurs in the PktStreamInstance module, which is a monitor. Multiple client processes must not perform data transfer in one direction; the result is unpredicatble. Care should be taken when deleting the stream and therefore this module. >> --The vector of procedures for operating on, and controlling the network stream controlObject: PUBLIC NetworkStreamInternal.ControlObject; --input input: PUBLIC RECORD [ body: NSBuffer.Body, --pointer to interesting data b: NSBuffer.Buffer, --current input buffer in use|NIL finger: CARDINAL, --bytes consumed from input.buffer length: CARDINAL, --bytes of data in input.buffer sst: Stream.SubSequenceType, --current input sst attn, end: BOOLEAN]; --delayed status to report --output output: PUBLIC RECORD [ b: NSBuffer.Buffer, --current input buffer in use|NIL finger: CARDINAL, --bytes consumed from input.b sst: Stream.SubSequenceType, --current output sst sstSent: BOOLEAN, --has sst been reported to remote? bufferSize: CARDINAL]; --max bytes permitted in output GetByte: Stream.GetByteProcedure = BEGIN IF input.b # NIL AND input.finger + 2 < input.length THEN BEGIN --"+2" lets GetBlock give back the buffer if we take the last byte byte ← input.b.ns.sppBytes[input.finger]; input.finger ← input.finger + 1; RETURN; END ELSE BEGIN array: PACKED ARRAY [0..1] OF Stream.Byte; [] ← GetBlock[sH, [@array, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE, TRUE]]; RETURN[array[0]]; END; END; GetWord: Stream.GetWordProcedure = BEGIN OPEN w: LOOPHOLE[word, LeftAndRight]; w ← [left: GetByte[sH], right: GetByte[sH]]; END; GetBlock: Stream.GetProcedure = BEGIN moved: CARDINAL; iBlock: Stream.Block; why ← normal; sst ← input.sst; bytesTransferred ← 0; << NOTE: gets with block.startIndex = block.stopIndexPlusOne. They do not alter the stream's state, i.e., they return the same results for multiple calls. 
>> WHILE block.startIndex < block.stopIndexPlusOne DO << PENDING ATTENTION Caused by a packet with an sst change and attention and the sst was returned or signalled and not resumed. >> IF input.attn THEN BEGIN input.attn ← FALSE; SELECT options.signalAttention FROM TRUE => SIGNAL Stream.Attention[block.startIndex]; ENDCASE => RETURN[bytesTransferred, attention, sst]; END; UNTIL (input.b # NIL) DO << PENDING END RECORD Caused by an empty packet with an sst change and an endRecord status where the sst was returned or signalled and not resumed. >> IF input.end THEN BEGIN input.end ← FALSE; IF options.terminateOnEndRecord THEN BEGIN SELECT options.signalEndRecord FROM TRUE => SIGNAL Stream.EndRecord[block.startIndex]; ENDCASE => RETURN[bytesTransferred, endRecord, sst]; END; END; <<TIMEOUT>> IF (input.b ← GetForUser[]) = NIL THEN --timeout BEGIN SELECT options.signalTimeout FROM TRUE => {SIGNAL Stream.TimeOut[block.startIndex]; LOOP}; FALSE => RETURN[bytesTransferred, timeout, sst]; ENDCASE; END; input.body ← input.b.ns; --make local frame copy input.finger ← 0; --reset staring index input.length ← GetSppDataLength[input.b]; --won't change until next get input.attn ← input.body.attention; --in case both in one packet sst ← input.body.subtype; <<EMPTY PACKET>> IF input.finger = input.length THEN --not much data BEGIN input.end ← input.body.endOfMessage AND options.terminateOnEndRecord; ReturnBuffer[input.b]; input.b ← NIL; END; <<SST CHANGE - may be an empty packet>> IF (sst # input.sst) THEN --sst change BEGIN input.sst ← sst; --record for later SELECT options.signalSSTChange FROM TRUE => SIGNAL Stream.SSTChange[sst, block.startIndex]; ENDCASE => RETURN[bytesTransferred, sstChange, sst]; END; ENDLOOP; --UNTIL (input.b # NIL) <<ATTENTION PACKET - never an empty packet>> IF input.attn THEN BEGIN input.attn ← FALSE; SELECT options.signalAttention FROM TRUE => SIGNAL Stream.Attention[block.startIndex]; ENDCASE => RETURN[bytesTransferred, attention, sst]; END; <<MOVE THE 
BYTES>> iBlock ← [@input.body.sppBytes, input.finger, input.length]; moved ← ByteBlt.ByteBlt[block, iBlock]; bytesTransferred ← bytesTransferred + moved; block.startIndex ← block.startIndex + moved; input.finger ← input.finger + moved; <<END OF PACKET PROCESSING>> IF input.finger = input.length THEN BEGIN input.end ← input.body.endOfMessage AND options.terminateOnEndRecord; ReturnBuffer[input.b]; input.b ← NIL; <<LONG BLOCK>> IF (options.signalLongBlock AND (block.startIndex < block.stopIndexPlusOne)) THEN SIGNAL Stream.LongBlock[block.startIndex]; <<END RECORD - options.terminateOnEndRecord already asserted>> IF input.end THEN BEGIN input.end ← FALSE; SELECT options.signalEndRecord FROM TRUE => SIGNAL Stream.EndRecord[block.startIndex]; ENDCASE => RETURN[bytesTransferred, endRecord, sst]; END; END; ENDLOOP; --block.startIndex < block.stopIndexPlusOne <<SHORT BLOCK>> IF (options.signalShortBlock AND (input.b # NIL)) THEN ERROR Stream.ShortBlock; END; --GetBlock << The strategy on the transmission side, is to allocate a buffer only when there is data to be copied into the buffer, or if an empty packet must be transmitted. State information is kept around for things like whether a new SST has been sent to the other end or not, incase it is changed without sending any intervening data. >> --Send a client's block of data in one or more packets. PutBlock: Stream.PutProcedure = BEGIN moved: CARDINAL; oBlock: Stream.Block; --client specified empty block? SELECT TRUE FROM (block.startIndex # block.stopIndexPlusOne) => WHILE block.startIndex < block.stopIndexPlusOne DO IF output.b = NIL THEN BEGIN output.b ← GetOutputBuffer[]; output.finger ← 0; output.bufferSize ← connection.maxSppBytes; END; oBlock ← [ blockPointer: @output.b.ns.sppBytes, startIndex: output.finger, stopIndexPlusOne: output.bufferSize]; moved ← ByteBlt.ByteBlt[oBlock, block]; block.startIndex ← block.startIndex + moved; --did we just fill the packet? 
IF (output.finger ← output.finger + moved) = output.bufferSize THEN --watch out for packet and client buffer ending at same time SendNow[sH, (block.startIndex >= block.stopIndexPlusOne) AND endRecord]; REPEAT << Check for client requested endRecord, watching out for packet and client buffer ending at same time. IF output.b = NIL then the packet was sent from inside the loop. >> FINISHED => IF endRecord AND (output.b # NIL) THEN SendNow[sH, TRUE]; ENDLOOP; (endRecord) => SendNow[sH, TRUE]; ENDCASE; END; --PutBlock PutByte: Stream.PutByteProcedure = BEGIN IF output.b # NIL AND output.finger + 2 < output.bufferSize THEN BEGIN --"+2" lets PutBlock flush the buffer if we fill the last byte output.b.ns.sppBytes[output.finger] ← byte; output.finger ← output.finger + 1; END ELSE BEGIN array: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ]; PutBlock[sH, [@array, 0, 1], FALSE]; END; END; PutWord: Stream.PutWordProcedure = BEGIN OPEN w: LOOPHOLE[word, LeftAndRight]; PutByte[sH, w.left]; PutByte[sH, w.right]; END; SendNow: Stream.SendNowProcedure = BEGIN IF output.b = NIL THEN {output.finger ← 0; output.b ← GetOutputBuffer[]}; output.b.ns.endOfMessage ← endRecord; FlushOutputBuffer[]; END; --SendNow << Set the SST to the specified value and has some side effects. We assume that the SST is initially = 0, and the first change causes no empty packet to be sent. >> SetSST: Stream.SetSSTProcedure = BEGIN IF sst # output.sst THEN BEGIN FlushOutputBuffer[]; --flush the last buffer if there was one IF NOT output.sstSent THEN --there was no buffer to flush for the old SST so send an empty packet BEGIN output.finger ← 0; output.b ← GetOutputBuffer[]; FlushOutputBuffer[]; END; --remember the new SST output.sst ← sst; output.sstSent ← FALSE END; END; --SetSSt --Send one byte of data in a packet with the attention bit set. 
SendAttention: Stream.SendAttentionProcedure =
  --Anything already buffered is flushed first; then a fresh buffer carrying
  --just the attention byte is built and flushed in turn.
  BEGIN
  FlushOutputBuffer[];  --push out whatever was pending
  output.b ← GetOutputBuffer[];
  BEGIN
  attnBody: NSBuffer.Body = output.b.ns;
  attnBody.sppBytes[0] ← byte;
  output.finger ← 1;  --an attention packet is always one byte long
  attnBody.attention ← TRUE;  --the whole point of this packet
  END;
  FlushOutputBuffer[];  --ship the attention byte
  END;  --SendAttention

--Hand back whatever ExtractAttention returns.
WaitAttention: Stream.WaitAttentionProcedure =
  BEGIN
  RETURN[ExtractAttention[]];
  END;  --WaitAttention

--Transmit the current output buffer, if any, leaving output.b = NIL.
FlushOutputBuffer: PROCEDURE =
  BEGIN
  pending: NSBuffer.Buffer;
  --detach the buffer from the output record under the monitor lock
  DetachOutputBuffer: ENTRY PROC = INLINE
    {pending ← output.b; output.b ← NIL};
  DetachOutputBuffer[];
  IF pending = NIL THEN RETURN;  --nothing was buffered
  pending.ns.subtype ← output.sst;
  output.sstSent ← TRUE;  --the remote will see the current sst in this packet
  SetSppDataLength[pending, output.finger];
  TakeFromUser[pending];
  END;  --FlushOutputBuffer

--Record the packet size for `length` data bytes plus the SPP header.
SetSppDataLength: PROCEDURE [b: NSBuffer.Buffer, length: CARDINAL] = INLINE
  BEGIN
  Socket.SetPacketBytes[b, NSTypes.bytesPerSppHeader + length];
  END;  --SetSppDataLength

--Stream-level timeout setter; forwards to SetWaitTime.
SetTimeout: Stream.SetTimeoutProcedure =
  BEGIN
  SetWaitTime[waitTime];
  END;  --SetTimeout

--Stream-level timeout getter; forwards to GetWaitTime.
GetTimeout: Stream.GetTimeoutProcedure =
  BEGIN
  RETURN[GetWaitTime[]];
  END;  --GetTimeout

--This procedure returns the amount of data in the sequenced packet.
GetSppDataLength: PROCEDURE [b: NSBuffer.Buffer] RETURNS [CARDINAL] = INLINE
  BEGIN
  --data bytes = total packet bytes less the SPP header
  RETURN[Socket.GetPacketBytes[b] - NSTypes.bytesPerSppHeader];
  END;  --GetSppDataLength

--Report the subsequence type currently held in the output record.
GetSST: Stream.GetSSTProcedure =
  BEGIN
  RETURN[output.sst];
  END;  --GetSST

--Disable the byte stream: restore the default stream object and give back
--any input or output buffer still held.
StopByteStream: PUBLIC SppOps.StopByteStreamProc =
  BEGIN
  controlObject.streamObject ← Stream.defaultObject;  --no longer usable
  IF input.b # NIL THEN
    BEGIN
    ReturnBuffer[input.b];
    input.b ← NIL;
    END;
  IF output.b # NIL THEN
    BEGIN
    NSBuffer.ReturnBuffer[output.b];
    output.b ← NIL;
    END;
  END;  --StopByteStream

--Enable the byte stream: rebuild the control object from the defaults,
--install this module's procedures in it, reset the input and output state,
--and return a handle on the stream object.
StartByteStream: PUBLIC SppOps.StartByteStreamProc =
  BEGIN
  controlObject ← [streamObject: Stream.defaultObject, psH: psH];
  --Now fill in NetworkStream specifics
  BEGIN OPEN so: controlObject.streamObject;
  --byte and word transfer
  so.getByte ← GetByte;
  so.putByte ← PutByte;
  so.getWord ← GetWord;
  so.putWord ← PutWord;
  --block transfer
  so.get ← GetBlock;
  so.put ← PutBlock;
  --subsequence type
  so.setSST ← SetSST;
  so.getSST ← GetSST;
  --attentions and flushing
  so.sendAttention ← SendAttention;
  so.waitAttention ← WaitAttention;
  so.sendNow ← SendNow;
  --timeouts
  so.setTimeout ← SetTimeout;
  so.getTimeout ← GetTimeout;
  END;
  --input side: no buffer, sst 0, nothing pending
  input.b ← NIL;
  input.sst ← 0;
  input.attn ← FALSE;
  input.end ← FALSE;
  --output side: no buffer, sst 0 and treated as already sent
  output.b ← NIL;
  output.sst ← 0;
  output.sstSent ← TRUE;
  output.bufferSize ← 0;
  RETURN[@controlObject.streamObject];
  END;  --StartByteStream

--MAINLINE CODE
--install this instance's start/stop procedures in the procedure variables
startPacketStream ← StartPacketStream;
newStartPacketStream ← NewStartPacketStream;
stopPacketStream ← StopPacketStream;
startByteStream ← StartByteStream;
stopByteStream ← StopByteStream;
END.
--of NetworkStreamImpl module LOG 17-May-84 10:49:49 AOF Post Klamath 5-Mar-85 17:59:29 AOF Treat error packet[resourceLimits] like listenerReject 20-Mar-85 11:23:17 AOF Fixing PutBlock of no bytes with endRecord = TRUE 20-Mar-85 17:02:58 AOF Deliver rec'd packet even if stream suspended 25-Mar-85 17:15:21 AOF Put in a 11.0 fix that was missed 'cause of fork 12-Apr-85 11:09:50 AOF Acceptance of early packets, 16 bit modulo arithmetic 25-Apr-85 9:30:35 AOF Delete LOOPHOLEs of TransferStatus 3-Jun-85 14:51:47 AOF Another shot at the 20-Mar-85 11:23:17 fix 25-Nov-85 8:13:32 AOF Support for INR's congestionWarning & congestionDiscard 25-Nov-85 8:13:32 AOF Cut rexmtr.ceiling from 36 to 24 seconds. 4-Dec-85 15:45:02 AOF 16 buffer rexmt and rcvr tables 4-Dec-85 16:55:05 AOF variable length send buffers, toggle idle probing 12-Dec-85 10:24:13 AOF Set ackReq on system packets when establishing 9-Jan-86 15:37:29 AOF Better test for 'opening' a connection 11-Apr-86 9:19:46 AOF Switch to disable checksums on local net 18-Apr-86 16:41:29 AOF Get buffer's length from Buffer.DataWordsPerRawBuffer[] 22-May-86 11:41:24 SMA Remove dependencies on Buffer, changes for new encap. 18-Aug-86 17:03:33 AOF Local caching of b.ns. 28-Aug-86 10:47:40 AOF Use priorityPilotRealtimeSwappable for receiver. 5-Oct-86 18:53:31 AOF Look'n at extending the buffer sizes. 15-Oct-86 15:06:18 AOF Reorder status checks in RequeueProcedure 2-Nov-86 16:18:10 AOF More large packet stuff 17-Nov-86 13:41:00 AOF Renaming priorities 20-Nov-86 9:45:03 AOF Toying with HGM's half step increase in acking 7-Jan-87 16:49:53 AOF MDS Tweeks (POINTERS got longer) 26-Mar-87 17:30:15 AOF Fixed AR#10142 (assign values to PutWord and GetTimeout) 13-Apr-87 18:39:31 AOF Tightened up monitoring logic in GetSendBuffer 7-May-87 11:20:07 AOF Copy value of lastProbe in rexmtr in case rcvr overwrites 13-May-87 12:12:41 AOF Rate controlled transmission 9-Aug-87 15:54:20 AOF Always ack duplicate packets (VAX, etc). 
9-Sep-87 8:20:53 AOF User proc variables to start instance 3-Nov-87 11:50:37 MXI Added NewStartPacketStream. Moved InitializeSppState to NewNetworkStreamImpl