-- File: NetworkStreamImpl.mesa - last edit:
-- AOF 9-Sep-87 8:26:44
-- SMA 9-Oct-86 9:30:26
-- MXI 3-Nov-87 11:50:45
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
-- Function: The implementation module for an instance of a SPP packet stream.
DIRECTORY
Buffer,
ByteBlt USING [ByteBlt],
NSBuffer USING [AccessHandle, Body, GetBuffer, Buffer, ReturnBuffer, Queue],
Checksums USING [SetChecksum],
CommFlags USING [doDebug, doStats],
CommPriorities USING [receiver],
CommUtil USING [PulsesToTicks],
Driver USING [GetDeviceChain, Glitch, Device],
Environment USING [Byte],
NetworkStream USING [],
NetworkStreamInternal USING [ControlObject, State],
NSTypes USING [
bytesPerIDPHeader, bytesPerSppHeader, ConnectionID, WaitTime],
PacketStream USING [
bytesPerSequencedPktHeader, ClassOfService, ConnectionFailed, unknownConnID,
ConnectionSuspended, FailureReason, Handle, Object, State, SuspendReason],
Process USING [
Abort, CancelAbort, DisableTimeout, EnableAborts, GetCurrent, MsecToTicks,
Pause, SetPriority, SetTimeout],
Protocol1 USING [EncapsulateAndTransmit, GetContext],
Router USING [GetDelayToNet, NoTableEntryForNet],
RouterInternal USING [FlushCacheEntry, SendErrorPacket],
RoutingTable USING [NetworkContext],
Socket USING [
ChannelHandle, Delete, GetPacket, SetPacketBytes, GetPacketBytes],
SppNegotiation USING [InitializeSppState,
NewInitializeSppState, NewStartPacketStreamProc],
SppOps USING [
BufferTable, BufferTableObject, AttentionTableObject, nullAttention,
StartByteStreamProc, StopByteStreamProc, StartPacketStreamProc,
StopPacketStreamProc, PSProc, PSProcess],
Stats USING [StatBump, StatIncr],
Stream USING [
Byte, Word, Handle, Block, CompletionCode, InputOptions, LongBlock, Object,
ShortBlock, SSTChange, SubSequenceType, TimeOut, Attention, defaultObject,
GetByteProcedure, PutByteProcedure, GetWordProcedure, PutWordProcedure,
GetProcedure, PutProcedure, SetSSTProcedure, GetSSTProcedure,
SendAttentionProcedure, WaitAttentionProcedure, SendNowProcedure,
SetTimeoutProcedure, GetTimeoutProcedure, EndRecord],
System USING [
GetClockPulses, MicrosecondsToPulses, NetworkAddress, Pulses];
NetworkStreamImpl: MONITOR
IMPORTS
NSBuffer, ByteBlt, Checksums, CommUtil, Driver,
PacketStream, Process, Protocol1, Router, RouterInternal,
Socket, SppNegotiation, Stream, Stats, System
EXPORTS Buffer, NetworkStream, SppOps =
BEGIN
--Concrete representations for the exported types Device and ConnectionID.
Device: PUBLIC TYPE = Driver.Device;
ConnectionID: PUBLIC TYPE = NSTypes.ConnectionID;
--Attention table of this stream: filled by InsertAttention (receiver process),
--drained by ExtractAttention (client waiting for attention).
attn: PUBLIC RECORD [table: SppOps.AttentionTableObject];
--connection control parameters
--One record describing the connection served by this module instance.
connection: PUBLIC RECORD [
state: NetworkStreamInternal.State, --connection state
socket: Socket.ChannelHandle, --the socket supporting the stream
pool: NSBuffer.AccessHandle, --socket's supporting pool
maxSppBytes: CARDINAL, --of packets supported by this stream
hops: CARDINAL, --to the remote station
--our address and the peer's; Establish may replace remoteAddr.socket
localAddr, remoteAddr: System.NetworkAddress,
--connection ids; remoteID is copied from the peer's packet in Establish
localID, remoteID: NSTypes.ConnectionID,
timeout: NSTypes.WaitTime, --specified by the client (Create|SetWaitTime)
whyFailed: PacketStream.FailureReason, --connection failures
whySuspended: PacketStream.SuspendReason, --after connection
stateBeforeSuspension: NetworkStreamInternal.State]; --connection state
--variables and parameters for transmission
--Sequence numbers here are 16 bit and wrap; compare them only with the
--Greater/Less/... helpers, never with plain relational operators.
xmtr: PUBLIC RECORD [
unackedSeq: CARDINAL, --oldest packet in retrans table
nextSeq: CARDINAL, --next sequence to use for transmission
maxSeq: CARDINAL, --max sequence number we can send
maxAlloc: CARDINAL, --upper limit on remote's window
lastXmt: LONG CARDINAL, --last time we transmitted data
interval: LONG CARDINAL, --minimum interval between data transmission
blocked: BOOLEAN, --indicates local xmtr is blocked
checksums: BOOLEAN, --are we using checksums on local net?
rateControlling: BOOLEAN, --are we doing rate control stuff?
quenchIncr: CARDINAL, --number of times we've doubled the interval
newAllocation: CONDITION, --blocks client sending process
clientProcess: PROCESS, --this is the client process
--procedure SendPacket calls to put a buffer on the wire
transmitProc: PROC[Socket.ChannelHandle, NSBuffer.Buffer],
context: RoutingTable.NetworkContext]; --used for expedited delivery
--Compile time defaults for the transmitter.
--rateControlPause is handed to Process.MsecToTicks in SendPacket, so it is in
--milliseconds. The units of the delay fields and the use of allocation and
--totalQuench are not visible in this part of the file (presumably consumed by
--setup and quench code elsewhere; confirm before changing).
xmtrDefaults: RECORD [
allocation, rateControlPause, delay1Hop, delayNHopT1,
delayNHop64, delayNHop96, totalQuench: CARDINAL] = [
1, 50, 50, 100, 120, 300, 16];
--variables and parameters for receiving
rcvr: PUBLIC RECORD [
--sequencing, duplicate suppression and flow control information
waitTime: LONG CARDINAL, --milliseconds as requested by client
timeout: LONG CARDINAL, --packet should arrive before timeout + interval
interval: LONG CARDINAL, --amount of time willing to wait for next packet
nextSeq: CARDINAL, --next sequence number expected
acksReqested: BOOLEAN, --acks have been requested by remote spp
maxSeq: CARDINAL, --max we expect (but will take one greater)
maxAlloc: NATURAL, --used to determine window that we present
blocked: BOOLEAN, --indicates remote sender is blocked
table: SppOps.BufferTableObject, --input buffer table
process: SppOps.PSProcess]; --receiver process for joining in Delete
--Receive window tolerances, in packets, used by RcvdSppPacket:
--  accept:    a sequence number more than this far beyond rcvr.maxSeq is
--             dropped as very early
--  duplicate: a sequence number more than this far behind rcvr.nextSeq is
--             dropped as very late; closer ones are treated as duplicates
rcvrDefaults: RECORD [accept, duplicate: INTEGER] = [2, 20];
--variables and parameters for retransmission
--all times (LONG CARDINALs) are in pulses
--delay and count are accumulated by InsertRexmtOrConsume as packets are
--acknowledged and are folded into interval by the Retransmitter process.
rexmtr: PUBLIC RECORD [
interval: LONG CARDINAL, --the retransmission interval (time in buffer.time)
ceiling: LONG CARDINAL, --max allowed (see rexmtrDefaults.ceiling)
floor: LONG CARDINAL, --min allowed (see rexmtrDefaults.floor)
giveUp: CARDINAL, --number of times to retransmit before giving up
count: CARDINAL, --number of data packets entered in delay calculation
delay: LONG CARDINAL, --cumulative time that packets spent in output table
calculation: LONG CARDINAL, --beginning of collection interval
calculationInterval: LONG CARDINAL, --interval between calculations
table: SppOps.BufferTableObject, --retransmitter buffer table
process: SppOps.PSProcess, --needed to rejoin process during delete
condition: CONDITION]; --block Retransmitter (based on rexmtr.interval)
--default intervals are in milliseconds, others are packets
--flushRoute: Retransmitter flushes the routing cache entry for the
--destination net every flushRoute-th try of a packet.
--The remaining fields are presumably copied (and converted to pulses) into
--rexmtr by initialization code outside this part of the file; confirm.
rexmtrDefaults: RECORD [
flushRoute, giveUp, --numbers of packets
initialLocal, initialRemote, ceiling, floor, calculationInterval: --msecs
CARDINAL] = [8, 30, 500, 6000, 24000, 200, 5000];
--variables and parameters for probing
--all times (LONG CARDINALs) are in pulses
--ProcessSppState zeroes unacked and refreshes lastTime whenever a packet for
--this connection is examined; Retransmitter bumps unacked for each probe sent.
probe: PUBLIC RECORD [
lastTime: LONG CARDINAL, --probe again at "*" + interval
inactiveInterval, allocInterval: LONG CARDINAL, --interval between probes
unacked: CARDINAL, --number of unacked probes sent
probing: BOOLEAN]; --are we probing at all?
--intervals are in milliseconds, probes in packets
--giveUp: Retransmitter suspends the stream once probe.unacked exceeds it.
--hopDelay is not referenced in this part of the file (presumably a per hop
--adjustment applied during setup; confirm).
probeDefaults: RECORD [
giveUp, inactiveInterval, allocInterval, hopDelay: CARDINAL] =
[8, 5000, 2000, 2000];
--The client visible packet stream object; its procedure fields are filled in
--by InitializePSObject.
packetStreamObject: PUBLIC PacketStream.Object;
<<
Assumptions about buffers needed to support a stream (worst case)
(These numbers are all biased by SppOps.sppWindowSize.)
n are for sending data
1 for sending a system packet
2 for sending expedited packets
n for receiving data
1 for receiving a system packet
2 for receiving expedited packets
1 for extra receive packet
>>
--Fixed buffer counts per stream: send = 3 and receive = 4 match the non data
--entries of the worst case list in the preceding comment (1 system + 2
--expedited, plus 1 extra on the receive side). xmtrAlloc and rcvrAlloc are 0
--here; how they are combined with the window size is not shown in this part
--of the file.
streamParms: RECORD [send, receive, xmtrAlloc, rcvrAlloc: CARDINAL] = [
3, 4, 0, 0];
--the following constant is used in setting wait times, etc
--(presumably the largest millisecond count that can safely be scaled to
--pulses without overflowing a LONG CARDINAL; its use is outside this view)
maxMsecToPulses: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;
--the "no connection id yet" value, as tested by Establish
unknownCID: NSTypes.ConnectionID = PacketStream.unknownConnID;
--notified by Establish once the connection state becomes established
isEstablished: CONDITION; --we only need this at the very beginning
--Exported procedure variables through which the stream is started and
--stopped; they are not assigned in this part of the file.
startByteStream: PUBLIC SppOps.StartByteStreamProc; --for starting
startPacketStream: PUBLIC SppOps.StartPacketStreamProc; --ditto
stopByteStream: PUBLIC SppOps.StopByteStreamProc; --for stopping
stopPacketStream: PUBLIC SppOps.StopPacketStreamProc; --ditto
newStartPacketStream: PUBLIC SppNegotiation.NewStartPacketStreamProc; --for starting with negotiation
--Glitchable signals
--These are never raised directly in the visible code; they are passed to
--Driver.Glitch to report an internal inconsistency.
TableScrambled: ERROR = CODE; --a buffer table failed a consistency check
StreamTerminating: ERROR = CODE; --operation attempted on a terminating stream
StreamNotEstablished: ERROR = CODE; --operation attempted before establishment
MultipleBufferClients: ERROR = CODE; --two processes inside GetOutputBuffer
--local signals
--InvalidPacketSize is exported; it is not raised in this part of the file.
InvalidPacketSize: PUBLIC ERROR = CODE;
--Used by procs that notify|wait without state checking required
--Notify: acquires the monitor and NOTIFYs the condition c points at. For use
--by procedures that are not themselves ENTRY procedures.
Notify: ENTRY PROC [c: LONG POINTER TO CONDITION] = INLINE {NOTIFY c↑};
--Wait: acquires the monitor and WAITs on the condition c points at.
--ENABLE UNWIND => NULL supplies an empty handler for an UNWIND that passes
--through the WAIT (for example after ABORTED), which is the usual Mesa idiom
--for making sure an ENTRY procedure gives up the monitor lock on the way out.
Wait: ENTRY PROC [c: LONG POINTER TO CONDITION] = INLINE
{ENABLE UNWIND => NULL; WAIT c↑};
--Wrap around arithmetic on 16 bit sequence numbers.
--Every helper forms the CARDINAL difference a - b, views it as an INTEGER and
--looks at its sign, so the answer remains right when a sequence number has
--wrapped past zero while the other has not.

--TRUE when a is later than b
Greater: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE
  BEGIN
  delta: INTEGER = INTEGER[a - b];
  RETURN[delta > 0];
  END;

--TRUE when a is earlier than b
Less: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE
  BEGIN
  delta: INTEGER = INTEGER[a - b];
  RETURN[delta < 0];
  END;

--TRUE when a is later than or the same as b
GreaterOrEqual: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE
  BEGIN
  delta: INTEGER = INTEGER[a - b];
  RETURN[delta >= 0];
  END;

--TRUE when a is earlier than or the same as b
LessOrEqual: PROC [a, b: CARDINAL] RETURNS [BOOLEAN] = INLINE
  BEGIN
  delta: INTEGER = INTEGER[a - b];
  RETURN[delta <= 0];
  END;

--the later of two sequence numbers (b when they are equal)
Max: PROC [a, b: CARDINAL] RETURNS [CARDINAL] = INLINE
  BEGIN
  IF Greater[a, b] THEN RETURN[a];
  RETURN[b];
  END;

--the earlier of two sequence numbers (b when they are equal)
Min: PROC [a, b: CARDINAL] RETURNS [CARDINAL] = INLINE
  BEGIN
  IF Less[a, b] THEN RETURN[a];
  RETURN[b];
  END;
--SANITY CHECKING AND DEBUGGING CODE
--fakeQueue is a recognizable non NIL marker (this module's own name, type
--cast to a Queue) that the debug routines store in b.fo.queue while a buffer
--sits in one of this module's tables. It is never used as a real queue.
fakeQueue: NSBuffer.Queue ← LOOPHOLE[NetworkStreamImpl];
--Plain pass through to NSBuffer.ReturnBuffer; installed by InitializePSObject
--as both returnSendBuffer and returnReceiveBuffer of the stream object.
ReturnBuffer: PROCEDURE [b: NSBuffer.Buffer] = {NSBuffer.ReturnBuffer[b]};
DebugInsertBuffer: INTERNAL PROC[b: NSBuffer.Buffer] = INLINE {
  --Debug bookkeeping for a buffer that is about to be stored in a table.
  --Glitches with TableScrambled if the buffer already claims to be on some
  --queue, then stamps it with the fakeQueue marker (not a real queue, merely
  --something that is not NIL).
  alreadyQueued: BOOLEAN = (b.fo.queue # NIL);
  IF alreadyQueued THEN Driver.Glitch[TableScrambled];
  b.fo.queue ← fakeQueue}; --DebugInsertBuffer
DebugExtractBuffer: INTERNAL PROC[b: NSBuffer.Buffer] = INLINE {
  --Debug bookkeeping for a buffer that has just been taken out of a table.
  --Glitches with TableScrambled unless the buffer carries the fakeQueue
  --marker left by DebugInsertBuffer, then clears the marker back to NIL.
  markedAsOurs: BOOLEAN = (b.fo.queue = fakeQueue);
  IF ~markedAsOurs THEN Driver.Glitch[TableScrambled];
  b.fo.queue ← NIL}; --DebugExtractBuffer
--Debug only consistency check of a buffer table (a no-op unless
--CommFlags.doDebug). Walks every slot of t and glitches with TableScrambled if
--  an occupied slot holds a buffer not marked with fakeQueue,
--  a non receive buffer does not have RequeueProcedure as its requeue
--  procedure, or
--  the number of occupied slots differs from t.length.
SanityCheck: ENTRY PROC[t: LONG POINTER TO SppOps.BufferTableObject] =
BEGIN
IF CommFlags.doDebug THEN
BEGIN
l: CARDINAL ← 0; --number of occupied slots seen so far
b: NSBuffer.Buffer;
FOR i: CARDINAL IN[0..LENGTH[t.slot]) DO
--arms are tried in order; the first TRUE one is taken
SELECT TRUE FROM
((b ← t.slot[i]) = NIL) => NULL; --no action here
(b.fo.queue # fakeQueue) => GOTO glitch; --not in my table
(b.fo.function = receive) => l ← l + 1; --these use standard requeue
(b.requeueProcedure # RequeueProcedure) => GOTO glitch;
ENDCASE => l ← l + 1; --it's ok, just count it
ENDLOOP;
IF l # t.length THEN GOTO glitch;
EXITS glitch => Driver.Glitch[TableScrambled];
END;
END; --SanityCheck
--Disposes of a packet whose sequence number is inside the acceptance range
--(called from RcvdSppPacket and Establish). System packets are returned to
--the buffer pool after noting any ack request; data packets, and attention
--packets not already recorded, are placed in the receive table; a repeated
--attention packet is returned to the pool.
AcceptablePacket: PROC [b: NSBuffer.Buffer] =
--PROCESS: RECEIVER
BEGIN
--acceptable, but may be early - be careful
--Stores b in the slot selected by its sequence number. If b is the packet
--rcvr.nextSeq was waiting for, advances rcvr.nextSeq over every consecutive
--occupied slot and wakes a client waiting in GetForUser.
InsertRecvdPacketInTable: ENTRY PROC =
BEGIN
--no possibility of signals
i: CARDINAL ← body.sequenceNumber MOD LENGTH[rcvr.table.slot];
--duplicate reception of early packet
IF rcvr.table.slot[i] # NIL THEN {DuplicatePacket[b]; RETURN};
rcvr.table.slot[i] ← b; rcvr.table.length ← rcvr.table.length + 1;
IF CommFlags.doDebug THEN DebugInsertBuffer[b];
IF body.sequenceNumber = rcvr.nextSeq THEN
BEGIN
--first loop of block is guaranteed - we just stuck in the buffer
THROUGH [0..LENGTH[rcvr.table.slot]) DO
--THIS BLOCK IS THE ONLY UPDATER OF rcvr.nextSeq!!!
rcvr.nextSeq ← rcvr.nextSeq + 1; --one more in the sequence
i ← (i + 1) MOD LENGTH[rcvr.table.slot]; --point to next slot in table
IF rcvr.table.slot[i] = NIL THEN EXIT; --first empty slot stops search
ENDLOOP;
NOTIFY rcvr.table.condition;
END;
END; --InsertRecvdPacketInTable
body: NSBuffer.Body = b.ns;
--a packet addressed to our connection id moves the connection to open
IF body.destinationConnectionID = connection.localID THEN
connection.state ← open; --he's seen us - we've seen him
IF CommFlags.doStats AND body.sendAck THEN
Stats.StatIncr[statAckRequestsReceived];
SELECT TRUE FROM
(body.systemPacket) =>
BEGIN
IF body.sendAck THEN rcvr.acksReqested ← TRUE; --ack sys packets direct
IF CommFlags.doStats THEN Stats.StatIncr[statSystemPacketsReceived];
NSBuffer.ReturnBuffer[b]; --client never sees system packets
END;
(~body.attention) =>
BEGIN
--the sender has used up the window once it reaches rcvr.maxSeq
rcvr.blocked ← INTEGER[body.sequenceNumber - rcvr.maxSeq] >= 0;
InsertRecvdPacketInTable[]; --packet MAY be saved in table
END;
--InsertAttention returns FALSE when this attention is already recorded
(body.attention AND InsertAttention[b]) =>
InsertRecvdPacketInTable[]; --packet MAY be saved in table
ENDCASE => NSBuffer.ReturnBuffer[b]; --duplicate attention packet
END; --AcceptablePacket
CleanupTable: ENTRY PROC [t: SppOps.BufferTable] =
  --PROCESS: RECEIVER (t = input) | RETRANSMITTER (t = output)
  --Empties table t: every occupied slot is cleared, the table length is
  --decremented, and the buffer goes back to the buffer manager.
  --No errors are expected here.
  BEGIN
  FOR slotIndex: CARDINAL IN [0..LENGTH[t.slot]) DO
    occupant: NSBuffer.Buffer = t.slot[slotIndex];
    IF occupant = NIL THEN LOOP; --nothing stored in this slot
    t.slot[slotIndex] ← NIL;
    t.length ← t.length - 1;
    IF CommFlags.doDebug THEN DebugExtractBuffer[occupant];
    NSBuffer.ReturnBuffer[occupant];
    ENDLOOP;
  END; --CleanupTable
DuplicatePacket: PROC [b: NSBuffer.Buffer] = {
  --PROCESS: RECEIVER
  --Handles a data packet we already hold or have already delivered: counts it,
  --asks for an acknowledgement to be sent, and frees the buffer.
  IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsReceivedAgain];
  --duplicates are always acked; a test of b.ns.sendAck used to guard this
  --assignment but has been disabled
  rcvr.acksReqested ← TRUE;
  NSBuffer.ReturnBuffer[b]}; --DuplicatePacket
--Handles a sequenced packet that arrives while the connection is in
--activeEstablish or waitEstablish (see RcvdSppPacket). The packet is rejected
--(counted as bad id and returned) unless its sequence number is 0 and its
--socket and connection ids match one of the three accepted patterns below.
--An accepted packet fixes the remote connection id, sets the initial send
--window, moves the state to established, is passed to AcceptablePacket and
--wakes anybody waiting on isEstablished.
Establish: PROC [b: NSBuffer.Buffer] =
--PROCESS: RECEIVER
BEGIN
body: NSBuffer.Body = b.ns;
SELECT TRUE FROM
(body.sequenceNumber # 0) =>
BEGIN
IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadID];
NSBuffer.ReturnBuffer[b];
RETURN;
END;
--pattern 1: known remote socket, addressed to our id, sender has an id
(body.source.socket = connection.remoteAddr.socket)
AND (body.destinationConnectionID = connection.localID)
AND (body.sourceConnectionID # unknownCID) => NULL;
--pattern 2: we are the active side and the reply comes from a different
--socket than the one we addressed; adopt the new socket
(connection.state = activeEstablish)
AND (body.source.socket # connection.remoteAddr.socket)
AND (body.destinationConnectionID = connection.localID)
AND (body.sourceConnectionID # unknownCID) =>
BEGIN
connection.remoteAddr.socket ← body.source.socket;
END;
--pattern 3: known remote socket, sender does not know our id yet
(body.source.socket = connection.remoteAddr.socket)
AND (body.destinationConnectionID = unknownCID)
AND (body.sourceConnectionID # unknownCID) =>
BEGIN
IF connection.state = waitEstablish THEN rcvr.acksReqested ← TRUE;
END;
ENDCASE =>
BEGIN
IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadID];
NSBuffer.ReturnBuffer[b];
RETURN;
END;
--initial send window: the peer's allocation, capped by our own limit
xmtr.maxSeq ← MIN[
body.allocationNumber, xmtr.unackedSeq + xmtr.maxAlloc];
connection.remoteID ← body.sourceConnectionID;
connection.state ← established; --may be 'opened' by AcceptablePacket
IF body.sendAck THEN rcvr.acksReqested ← TRUE; --need an ack on this one
AcceptablePacket[b];
Notify[@isEstablished]; --notify any waiters
END; --Establish
--Returns the next attention byte recorded in attn.table, waiting on
--attn.table.condition until one is available. Installed as waitForAttention
--of the packet stream object.
--Raises PacketStream.ConnectionSuspended when the connection is suspended.
--In a pre-established state it glitches (debug builds only) and then waits.
ExtractAttention: ENTRY PROC RETURNS [byte: Environment.Byte] =
--PROCESS: CLIENT WAIT FOR ATTENTION
BEGIN
ENABLE UNWIND => NULL;
--Extract attention byte or wait for one to arrive
i: CARDINAL;
DO
SELECT connection.state FROM
established, open =>
BEGIN
IF attn.table.length # 0 THEN
BEGIN
i ← attn.table.index MOD LENGTH[attn.table.slot];
attn.table.index ← attn.table.index + 1;
--empty slot: keep stepping the index (without waiting) until an entry is found
IF attn.table.slot[i] = SppOps.nullAttention THEN LOOP;
byte ← attn.table.slot[i].attn;
attn.table.slot[i] ← SppOps.nullAttention;
attn.table.length ← attn.table.length - 1;
RETURN;
END;
END;
suspended => RETURN WITH ERROR
PacketStream.ConnectionSuspended[connection.whySuspended];
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished];
ENDCASE;
WAIT attn.table.condition;
ENDLOOP;
END; --ExtractAttention
--Removes and returns the buffer in the retransmit table slot addressed by
--rexmtr.table.index (the oldest packet not yet consumed), or returns NIL if
--that slot is empty. The index itself is not advanced here; that happens in
--InsertRexmtOrConsume when the packet is found to be acknowledged.
--When the slot is empty the transmitter is marked unblocked and a client
--waiting on xmtr.newAllocation is notified.
ExtractRexmt: ENTRY PROC RETURNS [b: NSBuffer.Buffer] =
--PROCESS: RETRANSMITTER
BEGIN
--No possibility of signals
--extracts Buffer from table, doesn't consume
i: CARDINAL = rexmtr.table.index MOD LENGTH[rexmtr.table.slot];
IF (b ← rexmtr.table.slot[i]) # NIL THEN
BEGIN
rexmtr.table.slot[i] ← NIL;
rexmtr.table.length ← rexmtr.table.length - 1;
IF CommFlags.doDebug THEN
BEGIN
--the slot must hold exactly the sequence number the index names
IF (rexmtr.table.index # b.ns.sequenceNumber) THEN
Driver.Glitch[TableScrambled];
DebugExtractBuffer[b];
END;
END
ELSE {xmtr.blocked ← FALSE; NOTIFY xmtr.newAllocation};
END; --ExtractRexmt
FindAddresses: PROC RETURNS [local, remote: System.NetworkAddress] =
  --Reports the two end points of the connection as currently recorded in the
  --connection record. Installed as findAddresses of the packet stream object.
  BEGIN
  local ← connection.localAddr;
  remote ← connection.remoteAddr;
  END; --FindAddresses
--Delivers the next in-sequence received packet to the client, or NIL.
--Installed as get of the packet stream object.
--Returns NIL when the client asked for no waiting (rcvr.waitTime = 0) or when
--more than rcvr.interval pulses have passed since entry without the packet
--arriving. Raises PacketStream.ConnectionSuspended if the connection is
--suspended. On delivery the receive window (rcvr.maxSeq) is advanced, and a
--system packet is sent if the table is now empty and either the remote sender
--was blocked or the delivered packet asked for an ack.
GetForUser: ENTRY PROC RETURNS [b: NSBuffer.Buffer] =
--PROCESS: CLIENT RECEIVING
BEGIN
ENABLE UNWIND => NULL;
--slot of the next packet due; computed once because, in the visible code,
--rcvr.table.index is advanced only by this procedure just before it exits
i: CARDINAL = rcvr.table.index MOD LENGTH[rcvr.table.slot];
rcvr.timeout ← System.GetClockPulses[]; --start of this wait
DO --either EXITs or ERRORs out of loop
IF (b ← rcvr.table.slot[i]) # NIL THEN
BEGIN
IF CommFlags.doDebug THEN
BEGIN
IF (rcvr.table.index # b.ns.sequenceNumber) THEN
Driver.Glitch[TableScrambled];
DebugExtractBuffer[b];
END;
rcvr.table.slot[i] ← NIL;
rcvr.table.length ← rcvr.table.length - 1;
rcvr.table.index ← rcvr.table.index + 1;
IF CommFlags.doStats THEN
BEGIN
Stats.StatIncr[statDataPacketsReceived];
Stats.StatBump[statDataBytesReceived,
b.ns.pktLength - PacketStream.bytesPerSequencedPktHeader];
END;
--ONLY UPDATER OF rcvr.maxSeq!!!
--don't take away previously allocated window
rcvr.maxSeq ← Max[rcvr.table.index + rcvr.maxAlloc, rcvr.maxSeq];
IF (rcvr.table.length = 0) AND (rcvr.blocked OR b.ns.sendAck) THEN
SystemPacket[FALSE];
EXIT; --RETURN[b # NIL]
END;
SELECT connection.state FROM
established, open =>
BEGIN
time: LONG CARDINAL = System.GetClockPulses[];
SELECT TRUE FROM
(rcvr.waitTime = 0) => EXIT; --RETURN[b = NIL], didn't want to wait
((time - rcvr.timeout) > rcvr.interval) =>
BEGIN
IF CommFlags.doDebug AND (rcvr.interval = LAST[LONG CARDINAL]) THEN
Driver.Glitch[StreamTerminating];
EXIT; --RETURN[b = NIL], this is a timeout
END;
ENDCASE => WAIT rcvr.table.condition; --wait for packet, LOOP
END;
suspended => RETURN WITH ERROR
PacketStream.ConnectionSuspended[connection.whySuspended];
--in the remaining states the loop simply goes around again after the
--(debug only) glitch; there is no WAIT on these paths
terminating =>
IF CommFlags.doDebug THEN Driver.Glitch[StreamTerminating];
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished];
ENDCASE;
ENDLOOP;
END; --GetForUser
--Obtains a send buffer from the connection's pool, sized for the largest SPP
--packet plus the SPP and IDP headers, with the attention, endOfMessage and
--systemPacket bits cleared. Installed as getSendBuffer of the stream object.
--Raises PacketStream.ConnectionSuspended if the connection is, or becomes,
--suspended. While the caller may be blocked in the buffer manager its process
--is recorded in xmtr.clientProcess (presumably so that suspension code
--elsewhere can abort it; that code is not in this part of the file).
GetOutputBuffer: PROC RETURNS[b: NSBuffer.Buffer] =
BEGIN
--forget the recorded client process
ClearClientProcess: ENTRY PROC = INLINE {xmtr.clientProcess ← NIL};
--Records the calling process as the one waiting for a buffer. Returns TRUE,
--without recording, when the connection is already suspended. In debug
--builds a second concurrent caller causes a glitch.
EnterOperation: ENTRY PROC RETURNS[suspended: BOOLEAN] = INLINE
BEGIN
SELECT TRUE FROM
(CommFlags.doDebug AND (xmtr.clientProcess # NIL)) =>
Driver.Glitch[MultipleBufferClients];
(connection.state = suspended) => RETURN[TRUE];
ENDCASE => xmtr.clientProcess ← Process.GetCurrent[];
RETURN[FALSE];
END; --EnterOperation
--Clears the recorded client process; if the connection was suspended in the
--meantime, first cancels any abort still pending against that process.
ExitOperation: ENTRY PROC = INLINE
BEGIN
SELECT TRUE FROM
(xmtr.clientProcess = NIL) => NULL; --means we had multiple clients!??
(connection.state = suspended) => --we're suspended, we did the abort?
{Process.CancelAbort[xmtr.clientProcess]; xmtr.clientProcess ← NIL};
ENDCASE => xmtr.clientProcess ← NIL;
END; --ExitOperation
BEGIN --protect only the call into the buffer manager
IF EnterOperation[].suspended THEN GOTO returnSuspended;
b ← NSBuffer.GetBuffer[aH: connection.pool, function: send,
size: connection.maxSppBytes
+ NSTypes.bytesPerSppHeader + NSTypes.bytesPerIDPHeader !
UNWIND => ClearClientProcess[];
--ABORTED while suspended is turned into ConnectionSuspended; otherwise the
--handler does not take the signal and it continues to propagate
ABORTED => IF connection.state = suspended THEN GOTO suspended];
EXITS suspended => {ClearClientProcess[]; GOTO returnSuspended};
END; --protect only the call into the buffer manager
ExitOperation[]; --just to cleanup state appropriately
--suspended after the buffer was obtained: give the buffer back
IF connection.state = suspended THEN
{NSBuffer.ReturnBuffer[b]; GOTO returnSuspended};
b.ns.attention ← b.ns.endOfMessage ← b.ns.systemPacket ← FALSE;
EXITS
returnSuspended =>
RETURN WITH ERROR
PacketStream.ConnectionSuspended[connection.whySuspended];
END; --GetOutputBuffer
GetSenderSizeLimit: PROC RETURNS [CARDINAL] =
  --Reports connection.maxSppBytes, the largest number of SPP data bytes this
  --stream puts in one packet. Installed as getSenderSizeLimit of the packet
  --stream object.
  BEGIN
  RETURN[connection.maxSppBytes];
  END; --GetSenderSizeLimit
GetState: PROC RETURNS[PacketStream.State ← inactive] =
  --Maps the internal connection state onto the client visible stream state:
  --active when established or open, otherwise the default result, inactive.
  --Calling this on a terminating or not yet established stream is reported
  --through Driver.Glitch before the default result is returned.
  BEGIN
  SELECT connection.state FROM
    established, open => RETURN[active];
    suspended => NULL; --falls out with the default result
    terminating => Driver.Glitch[StreamTerminating];
    ENDCASE => Driver.Glitch[StreamNotEstablished];
  END; --GetState
GetWaitTime: PROC RETURNS [time: NSTypes.WaitTime] =
  --Reports the client's current receive wait time (rcvr.waitTime, which is
  --kept in milliseconds).
  BEGIN
  RETURN[rcvr.waitTime];
  END; --GetWaitTime
--Fills in packetStreamObject with this module's implementations of the
--packet stream operations. destroy is left NIL because destruction is driven
--from outside this module. Both buffer return operations map onto the same
--local ReturnBuffer procedure.
InitializePSObject: PROC =
BEGIN
packetStreamObject ← [
destroy: NIL, --done from the outside(Mgr)
put: TakeFromUser, get: GetForUser,
waitForAttention: ExtractAttention,
setWaitTime: SetWaitTime,
findAddresses: FindAddresses,
getSenderSizeLimit: GetSenderSizeLimit,
setSenderSizeLimit: SetSenderSizeLimit,
getSendBuffer: GetOutputBuffer,
returnSendBuffer: ReturnBuffer,
returnReceiveBuffer: ReturnBuffer,
getState: GetState, setState: SetState];
END;
--Records the attention byte carried by packet b (the first SPP data byte) in
--the attention table slot selected by b's sequence number and notifies
--attn.table.condition. Returns TRUE if the byte was recorded, FALSE if that
--slot is already occupied (the caller treats this as a duplicate).
--The buffer itself is not consumed here.
InsertAttention: ENTRY PROC [b: NSBuffer.Buffer] RETURNS [BOOLEAN] =
--PROCESS: RECEIVER
BEGIN
--No possibility of signals
--Insert the attention byte into the table
i: CARDINAL ← b.ns.sequenceNumber;
IF CommFlags.doStats THEN Stats.StatIncr[statAttentionsReceived];
--an empty table restarts its extraction index at this sequence number
IF attn.table.length = 0 THEN attn.table.index ← i;
i ← i MOD LENGTH[attn.table.slot];
IF attn.table.slot[i] # SppOps.nullAttention THEN RETURN[FALSE];
attn.table.slot[i] ← [
sequence: b.ns.sequenceNumber, mask: 377B, attn: b.ns.sppBytes[0]];
attn.table.length ← attn.table.length + 1;
NOTIFY attn.table.condition;
RETURN[TRUE];
END; --InsertAttention
--Decides the fate of a transmitted data buffer b:
--  stream terminating      the buffer is returned to the pool
--  b already acknowledged  its time in the table is added to the delay
--                          statistics, the table index is advanced past it,
--                          the buffer is returned, and the retransmitter is
--                          notified if more buffers remain
--  otherwise               b goes (back) into the retransmit table
--In debug builds an already occupied target slot causes a glitch.
--The procedure itself is not an ENTRY procedure; only the nested InsertBuffer
--runs under the monitor lock.
InsertRexmtOrConsume: PROC [b: NSBuffer.Buffer] =
--PROCESS: RETRANSMITTER
BEGIN
InsertBuffer: ENTRY PROC[] = INLINE
BEGIN
rexmtr.table.slot[i] ← b; --return the buffer to the table
rexmtr.table.length ← SUCC[rexmtr.table.length]; --adjust the length
IF CommFlags.doDebug THEN DebugInsertBuffer[b]; --sanity check
END; --InsertBuffer
i: CARDINAL = b.ns.sequenceNumber MOD LENGTH[rexmtr.table.slot];
SELECT TRUE FROM
(CommFlags.doDebug AND (rexmtr.table.slot[i] # NIL)) =>
Driver.Glitch[TableScrambled];
(connection.state = terminating) => NSBuffer.ReturnBuffer[b]; --dead stream
(INTEGER[xmtr.unackedSeq - b.ns.sequenceNumber] > 0) => --already ack'd
BEGIN
rexmtr.delay ← rexmtr.delay + (System.GetClockPulses[] - b.fo.time);
rexmtr.count ← SUCC[rexmtr.count]; --and the number of participants
rexmtr.table.index ← SUCC[rexmtr.table.index]; --consume packet
NSBuffer.ReturnBuffer[b]; --free the buffer
IF rexmtr.table.length > 0 THEN Notify[@rexmtr.condition]; --on to next
END;
ENDCASE => InsertBuffer[]; --not finished with buffer
END; --InsertRexmtOrConsume
ProcessSppState: PROC [b: NSBuffer.Buffer] =
  --PROCESS: RECEIVER
  --Applies the acknowledgement and allocation fields of received packet b to
  --the transmit side of the connection:
  --  a newer acknowledge number advances xmtr.unackedSeq and releases every
  --  buffer in the retransmit table that it covers;
  --  a larger allocation number widens the send window (capped by
  --  xmtr.maxAlloc) and wakes a client blocked on xmtr.newAllocation.
  --Any packet reaching here also proves the peer is alive, so the probe state
  --is reset. The buffer b itself is not consumed.
  BEGIN
  bPrime: NSBuffer.Buffer;
  body: NSBuffer.Body = b.ns;
  IF Greater[body.acknowledgeNumber, xmtr.unackedSeq] THEN
    BEGIN --new acknowledge field
    xmtr.unackedSeq ← body.acknowledgeNumber; --copy new ack number
    UNTIL (bPrime ← ExtractRexmt[]) = NIL DO
      --all packets up to but not including 'xmtr.unackedSeq' are acked
      --remember the sequence number first: InsertRexmtOrConsume may hand the
      --buffer back to the pool, after which bPrime must not be touched
      seq: CARDINAL = bPrime.ns.sequenceNumber;
      InsertRexmtOrConsume[bPrime]; --and put it back | consume it
      --the first packet not covered by the ack was just put back; stop there
      IF LessOrEqual[xmtr.unackedSeq, seq] THEN EXIT;
      ENDLOOP;
    END; --new acknowledge field
  IF Greater[body.allocationNumber, xmtr.maxSeq] THEN
    BEGIN
    --new allocation from remote
    xmtr.maxSeq ← Min[xmtr.unackedSeq + xmtr.maxAlloc, body.allocationNumber];
    Notify[@xmtr.newAllocation];
    END; --new allocation from remote
  probe.unacked ← 0; --other end is alive
  probe.lastTime ← System.GetClockPulses[]; --to probe for whatever
  END; --ProcessSppState
--Reacts to an error protocol packet received on the stream's socket.
--Before establishment, a noSocket or a listener side rejection suspends the
--stream. Once established, noSocket suspends the stream and the congestion
--errors slow the transmitter when rate control is on. All other combinations
--are ignored. The buffer is not returned here; the caller (Receiver) does it.
RcvdErrorPacket: PUBLIC PROCEDURE [b: NSBuffer.Buffer] =
--PROCESS: RECEIVER
BEGIN
IF CommFlags.doStats THEN Stats.StatIncr[statErrorPacketsReceived];
SELECT connection.state FROM
IN[unestablished..waitEstablish] =>
BEGIN
SELECT b.ns.errorType FROM
noSocket =>
<<no listener|transducer>>
SuspendStream[noRouteToDestination, noServiceAtDestination];
resourceLimits, listenerReject, connectionLimit =>
<<rejected by listener client>>
SuspendStream[noRouteToDestination, remoteReject];
<<excessHops|cantGetThere => NULL;>>
ENDCASE;
END;
established, open =>
SELECT b.ns.errorType FROM
noSocket =>
<<socket went away (deleted by client)>>
SuspendStream[remoteServiceDisappeared, remoteReject];
congestionWarning, congestionDiscard =>
IF xmtr.rateControlling THEN SetQuencing[up];
ENDCASE; <<rejected|excessHops|cantGetThere => NULL;>>
ENDCASE; <<terminating|suspended => NULL>>
END; --RcvdErrorPacket
RcvdSppPacket: PUBLIC PROC [b: NSBuffer.Buffer] =
  --PROCESS: RECEIVER
  --Classifies a received sequenced packet and consumes the buffer on every
  --path (it is either handed on or returned to the pool).
  --  established or open: packets from the wrong socket or carrying neither
  --    of our connection ids are counted and dropped; packets carrying only
  --    one matching id are dropped but provoke an ack; packets carrying both
  --    ids are sorted by sequence number into very early (dropped), very late
  --    (dropped), duplicate (state processed, then acked) and acceptable.
  --  activeEstablish or waitEstablish: handed to Establish.
  --  any other state: returned to the pool.
  --Rejection statistics are guarded by CommFlags.doStats like every other
  --statistic kept by this module.
  BEGIN
  body: NSBuffer.Body = b.ns;
  SELECT connection.state FROM
    established, open =>
      BEGIN
      SELECT TRUE FROM
        (body.source.socket # connection.remoteAddr.socket) =>
          BEGIN
          --out of the blue came.....
          IF CommFlags.doStats THEN
            Stats.StatIncr[statPacketsRejectedBadSource];
          NSBuffer.ReturnBuffer[b];
          END;
        (body.sourceConnectionID = connection.remoteID)
          AND (body.destinationConnectionID = connection.localID) =>
          BEGIN
          --really is for this connection
          --both values are measured relative to rcvr.maxSeq so that the
          --comparisons below survive 16 bit wrap around
          normalizedNext: INTEGER = INTEGER[rcvr.nextSeq - rcvr.maxSeq];
          normalizedSeq: INTEGER = INTEGER[body.sequenceNumber - rcvr.maxSeq];
          SELECT normalizedSeq FROM
            > rcvrDefaults.accept =>
              BEGIN
              --this is a very early packet - just ignore it
              IF CommFlags.doStats THEN
                Stats.StatIncr[statDataPacketsReceivedEarly];
              NSBuffer.ReturnBuffer[b];
              END;
            < normalizedNext =>
              IF normalizedSeq < normalizedNext - rcvrDefaults.duplicate THEN
                BEGIN
                --this is a very late packet - just ignore it
                IF CommFlags.doStats THEN
                  Stats.StatIncr[statDataPacketsReceivedVeryLate];
                NSBuffer.ReturnBuffer[b];
                END
              --duplicate, but worth looking at state, acking if sender wishes
              ELSE {ProcessSppState[b]; DuplicatePacket[b]};
            --the only case that's really productive
            ENDCASE => {ProcessSppState[b]; AcceptablePacket[b]};
          END;
        (body.sourceConnectionID = connection.remoteID)
          OR (body.destinationConnectionID = connection.localID) =>
          BEGIN
          rcvr.acksReqested ← TRUE; --hasn't seen our id yet?
          NSBuffer.ReturnBuffer[b]; --don't accept buffer though
          END;
        ENDCASE =>
          BEGIN
          IF CommFlags.doStats THEN
            Stats.StatIncr[statPacketsRejectedBadID];
          NSBuffer.ReturnBuffer[b]; --just some trash
          END;
      END;
    activeEstablish, waitEstablish => Establish[b];
    --suspended, terminating, unestablished => NSBuffer.ReturnBuffer[b];--
    ENDCASE => NSBuffer.ReturnBuffer[b];
  END; --RcvdSppPacket
--Body of the receiver process. Runs at CommPriorities.receiver and loops,
--taking packets from the connection's socket and dispatching them, until it
--is aborted:
--  wrong source host           counted and dropped
--  sequenced packet of at least header length
--                              RcvdSppPacket, then an ack if one is now owed
--  error packet                RcvdErrorPacket, then the buffer is returned
--  anything else               answered with an invalidPacketType error
--Returns its own procedure value (type cast to PSProc) when it exits.
Receiver: PROC RETURNS [SppOps.PSProc] =
--PROCESS: THIS IS THE RECEIVER
BEGIN
b: NSBuffer.Buffer;
body: NSBuffer.Body;
Process.SetPriority[CommPriorities.receiver];
<<UNTIL ABORTED>> DO
ENABLE ABORTED => EXIT;
--get next packet from socket queue
b ← Socket.GetPacket[connection.socket]; body ← b.ns;
IF CommFlags.doDebug THEN SanityCheck[@rcvr.table];
SELECT TRUE FROM
body.source.host # connection.remoteAddr.host =>
BEGIN
IF CommFlags.doStats THEN
Stats.StatIncr[statPacketsRejectedBadSource];
NSBuffer.ReturnBuffer[b];
END;
(body.packetType = sequencedPacket)
AND (body.pktLength >= PacketStream.bytesPerSequencedPktHeader) =>
BEGIN
RcvdSppPacket[b];
IF rcvr.acksReqested THEN SystemPacket[FALSE];
END;
body.packetType = error =>
BEGIN
RcvdErrorPacket[b];
NSBuffer.ReturnBuffer[b];
END;
ENDCASE =>
BEGIN
IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadType];
--the buffer is not returned on this path; presumably SendErrorPacket
--takes ownership of it (confirm against RouterInternal)
RouterInternal.SendErrorPacket[b, invalidPacketType];
END;
IF CommFlags.doDebug THEN SanityCheck[@rcvr.table];
ENDLOOP;
RETURN[LOOPHOLE[Receiver]];
END; --Receiver
--Requeue procedure attached to every sequenced packet this module sends
--(see SendSequencedPacket). If the buffer's completion status lies outside
--pending..aborted the stream is suspended with a reason derived from that
--status. System packets are then returned to the pool; data packets go back
--to the retransmit table, or are consumed if already acknowledged.
RequeueProcedure: PROC [b: NSBuffer.Buffer] =
--PROCESS: BUFFER MANAGER
BEGIN
IF (b.fo.status ~IN[pending..aborted]) THEN
SuspendStream[noRouteToDestination, SELECT b.fo.status FROM
noRouteToNetwork => noRouteToDestination,
noTranslationForDestination => noTranslationForDestination,
noAnswerOrBusy => noAnswerOrBusy,
circuitInUse => circuitInUse,
circuitNotReady => circuitNotReady,
dialerHardwareProblem => circuitNotReady,
noDialingHardware => noDialingHardware,
invalidDestAddr => noServiceAtDestination,
ENDCASE => noRouteToDestination]; --hardware problem--
IF b.ns.systemPacket THEN NSBuffer.ReturnBuffer[b] --don't reQ sys packets
ELSE InsertRexmtOrConsume[b]; --put the buffer back in the table
IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
END; --RequeueProcedure
--Body of the retransmitter process. Each pass through the loop does at most
--one of the following, then waits on rexmtr.condition:
--  nothing, if the connection is suspended;
--  suspends the stream, if too many probes or retransmissions went unanswered;
--  looks at the oldest buffer in the retransmit table and consumes it (if
--  acknowledged), puts it back (if too soon), or retransmits it;
--  sends a probe, if the peer has been silent too long or has not opened its
--  window while we have data to send.
--Unless the pass ended in one of the suspend exits, it also recomputes the
--retransmission interval once per rexmtr.calculationInterval.
--The loop ends when the process is aborted; the procedure then returns its
--own procedure value (type cast to PSProc).
Retransmitter: PROC RETURNS [SppOps.PSProc] =
--PROCESS: THIS IS THE RETRANSMITTER
BEGIN
b: NSBuffer.Buffer;
body: NSBuffer.Body;
<<UNTIL ABORTED>> DO
ENABLE ABORTED => EXIT;
lastProbe: LONG CARDINAL = probe.lastTime;
timeNow: LONG CARDINAL ← System.GetClockPulses[];
IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
BEGIN
--arms are tried in order; the first TRUE one is taken
SELECT TRUE FROM
--this connection is suspended....can it be restarted?
(connection.state = suspended) => GOTO suspended;
--we aren't dead....should we commit suicide?
(probe.unacked > probeDefaults.giveUp) => GOTO suspend;
((b ← ExtractRexmt[]) # NIL) =>
BEGIN
body ← b.ns; --make local copy
SELECT TRUE FROM
(Greater[xmtr.unackedSeq, body.sequenceNumber]) =>
BEGIN
--packet already acked
InsertRexmtOrConsume[b]; --all finished with this buffer
LOOP; --and on to bigger and better things - don't wait
END;
--the wait before the next try grows with the number of tries so far
((timeNow - b.fo.time) < (rexmtr.interval * b.fo.tries)) =>
--whoops - didn't mean to do that - too soon to retrans this one
InsertRexmtOrConsume[b]; --put it back
--note: this test increments b.fo.tries as a side effect
((b.fo.tries ← b.fo.tries + 1) > rexmtr.giveUp) =>
--this connection is almost dead
BEGIN
InsertRexmtOrConsume[b]; --put buffer back
GOTO suspend; --and quit trying
END;
ENDCASE =>
--this is a retransmission - is there anything else to do?
BEGIN
--is it time to do something rash??
IF (b.fo.tries MOD rexmtrDefaults.flushRoute) = 0 THEN
RouterInternal.FlushCacheEntry[body.destination.net];
--sending buffer with sendAck is equivalent to a probe (almost)
IF ((timeNow - lastProbe) > probe.inactiveInterval) THEN
BEGIN
probe.unacked ← probe.unacked + 1;
IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
END;
xmtr.blocked ← body.sendAck ← TRUE; --resynch stream
probe.lastTime ← timeNow; --mark new probe interval
SendPacket[b]; --retransmit the packet
IF CommFlags.doStats THEN
Stats.StatIncr[statDataPacketsRetransmitted];
END;
END;
--no traffic from other end in a long time, no buffer to rexmit
((timeNow - lastProbe) > probe.inactiveInterval) =>
BEGIN
probe.lastTime ← timeNow; --mark new interval start
IF probe.probing THEN
BEGIN
SystemPacket[TRUE];
probe.unacked ← probe.unacked + 1; --count the probe
IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
END;
END;
--remote won't open window, perhaps we missed his response
(~Less[xmtr.maxSeq, xmtr.nextSeq]) => NULL; --we're not trying to send
((timeNow - lastProbe) > probe.allocInterval) =>
BEGIN
probe.lastTime ← timeNow; --mark new allocation interval start
SystemPacket[TRUE]; --then send the probe
probe.unacked ← probe.unacked + 1; --count the probe
IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
END;
ENDCASE;
--time to recompute retransmission interval?
IF ((timeNow - rexmtr.calculation) > rexmtr.calculationInterval) THEN
BEGIN
rexmtr.calculation ← timeNow; --reset start of interval
IF (rexmtr.count # 0) THEN
BEGIN
ri: LONG CARDINAL;
<<
The new retransmission interval (ri) at time (n+1) is ri(n+1).
ri(n+1) ← floor <= INT[f[ri(t)]] (t in[0..n])] <= ceiling;
This uses a crude Euler integration of the approximate form
f(n+1) ← (3 * f(n-1) + f(n)) / 4;
The contribution of ri(n) is 1.5 (2.0) * the average time packets
spent in the retransmit table waiting to be ack'd.
>>
ri ← rexmtr.delay / rexmtr.count; --that's the avg over the interval
ri ← ri + (ri / 2); --times 1.5 so we don't drive too hard
<<ri ← 2 * ri; --times 2.0 so we don't drive too hard>>
--weight old interval more than new (3 to 1)
ri ← (((2 * rexmtr.interval) + rexmtr.interval) + ri) / 4; --Euler
--interval is bounded both low and high
rexmtr.interval ← MAX[rexmtr.floor, MIN[rexmtr.ceiling, ri]];
rexmtr.delay ← 0; rexmtr.count ← 0; --reset the collectors
END;
IF xmtr.rateControlling THEN SetQuencing[down]; --check out this
END;
EXITS
suspend => SuspendStream[transmissionTimeout, timeout];
suspended => NULL; --it has already been done
END;
IF CommFlags.doDebug THEN SanityCheck[@rexmtr.table];
Wait[@rexmtr.condition];
ENDLOOP;
RETURN[LOOPHOLE[Retransmitter]];
END; --Retransmitter
--Stamps buffer b with the current receive side state (acknowledge and
--allocation numbers), clears the pending ack request, and transmits it via
--xmtr.transmitProc. When rate control is on, a data packet is first held
--back, in steps of xmtrDefaults.rateControlPause milliseconds, until more
--than xmtr.interval pulses have passed since the last data transmission.
--System packets are never delayed and do not update xmtr.lastXmt.
SendPacket: PROC [b: NSBuffer.Buffer] =
--PROCESS: RETRANSMITTER AND CLIENT SENDING
BEGIN
body: NSBuffer.Body = b.ns;
IF CommFlags.doStats AND body.sendAck THEN
Stats.StatIncr[statAckRequestsSent];
<<
Any packet going out carries the latest state information. That means
that if there have been any acks requested, that this transmission
satisfies that request. Also, we may have consumed some data that had
our receiver in a blocked state. This is an opportunity to check for
that too.
>>
body.unusedType ← 0; --unused field (at present)
rcvr.acksReqested ← FALSE; --this is sufficient for any ack
rcvr.blocked ← INTEGER[rcvr.nextSeq - rcvr.maxSeq] >= 0; --still blocked?
body.acknowledgeNumber ← rcvr.nextSeq; --latest acknowledgement
body.allocationNumber ← rcvr.maxSeq; --and latest allocation
IF xmtr.rateControlling AND ~body.systemPacket THEN
BEGIN
<<
We don't control the rate of transmission of system packets. They
go out in response to probes, duplicates, etc. And, we don't log
the time we sent them. So we might transmit a system packet followed
very closely by a data packet. If we registered the time we sent
system packets, then we might get into a rut that prohibited us from
sending anything but system packets.
If we get ABORT'd (UNWIND) while dallying, just send the packet. That will
get it back via the requeue machinery and we know how to handle that.
>>
UNTIL (System.GetClockPulses[] - xmtr.lastXmt) > xmtr.interval DO
ENABLE UNWIND => xmtr.transmitProc[connection.socket, b]; --send it!
Process.Pause[Process.MsecToTicks[xmtrDefaults.rateControlPause]];
ENDLOOP;
xmtr.lastXmt ← System.GetClockPulses[]; --remember last time we xmt'd
END;
xmtr.transmitProc[connection.socket, b];
END; --SendPacket
SendSequencedPacket: PROC [b: NSBuffer.Buffer] =
  --PROCESS: RETRANSMITTER, RECEIVER, CLIENT SENDER
  <<
  Mark b as a sequenced packet of this connection, initialize its
  retransmission bookkeeping, and pass it to SendPacket.
  >>
  BEGIN
  hdr: NSBuffer.Body = b.ns;
  --header: packet type and the connection's addressing
  hdr.packetType ← sequencedPacket;
  hdr.sourceConnectionID ← connection.localID;
  hdr.destinationConnectionID ← connection.remoteID;
  hdr.destination ← connection.remoteAddr;
  --bookkeeping carried in the buffer itself
  b.requeueProcedure ← RequeueProcedure;
  b.fo.tries ← 1; --this is the first transmission of the packet
  b.fo.time ← System.GetClockPulses[]; --when it was sent
  SendPacket[b];
  END; --SendSequencedPacket
SendToLocal: PROC[cH: Socket.ChannelHandle, b: NSBuffer.Buffer] =
  <<
  Transmit procedure used as an optimization for local ethernet transmission.
  Fills in the buffer and header fields of b and gives it directly to
  Protocol1.EncapsulateAndTransmit, addressed to the remote host.
  The cH argument is not referenced in this procedure.
  >>
  BEGIN
  pkt: NSBuffer.Body = b.ns;
  --buffer-level fields
  b.fo.type ← ns;
  b.fo.status ← goodCompletion;
  b.fo.context ← xmtr.context;
  b.fo.network ← xmtr.context.network;
  --packet header fields
  pkt.source ← connection.localAddr;
  pkt.transportControl ← [FALSE, 0, 0];
  --checksum goes in after the header fields; 177777B is stored when
  --checksums are switched off
  SELECT xmtr.checksums FROM
    TRUE => Checksums.SetChecksum[pkt];
    ENDCASE => pkt.checksum ← 177777B;
  Protocol1.EncapsulateAndTransmit[LOOPHOLE[b], @connection.remoteAddr.host];
  END; --SendToLocal
SetQuencing: ENTRY PROC[action: {up, down}] =
<<
Adjust the rate-control interval xmtr.interval.
up: if xmtr.quenchIncr is below xmtrDefaults.totalQuench, add 4 to it and
double xmtr.interval.
down: step xmtr.quenchIncr back toward zero, halving xmtr.interval on every
4th step, and derive the interval from rexmtr.interval and connection.hops
once the count is zero.
>>
BEGIN
SELECT action FROM
up =>
BEGIN
<<
PROCESS: REQUEUE PROCEDURE (DISPATCHER)
Routers are in trouble - start slowing transmit.
How much? Double the current value, whatever that is.
For how long? Some multiple of the rexmtr.calculation interval.
This will up xmtr.quenchIncr by 4. The clock that decrements it
is the RETRANSMITTER process and is running every 5 seconds. That
gives us 20 seconds of quench before we drop by half.
>>
IF xmtr.quenchIncr < xmtrDefaults.totalQuench THEN
BEGIN
xmtr.quenchIncr ← xmtr.quenchIncr + 4; --notice the times
xmtr.interval ← 2 * xmtr.interval; --double it
END;
END;
down =>
BEGIN
<<
PROCESS: RETRANSMITTER
Check to see if it's time to try speeding up again. The "up" code
will only double 4 times. The "down" code will only halve every
4th time through. The two are in competition with each other.
If xmtr.quenchIncr = 0 then normalize using the current retransmission
interval and the distance to the remote.
If xmtr.quenchIncr goes to zero, find out how far away the remote
is and normalize using the current retransmission interval.
If the value of xmtr.quenchIncr MOD 4 = 0, then half the interval.
>>
--NOTE(review): both normalizing arms divide by 2 * connection.hops; this
--assumes connection.hops is never 0 here - confirm where it is initialized.
SELECT TRUE FROM
(xmtr.quenchIncr = 0) => --we're running free and easy
xmtr.interval ← rexmtr.interval / (2 * connection.hops);
--the next guard decrements xmtr.quenchIncr as a side effect of being tested
((xmtr.quenchIncr ← PRED[xmtr.quenchIncr]) = 0) => --just cleared
BEGIN
--if the router has no entry, connection.hops keeps its previous value
connection.hops ← Router.GetDelayToNet[connection.remoteAddr.net !
Router.NoTableEntryForNet => CONTINUE]; --it shouldn't fail, but
xmtr.interval ← rexmtr.interval / (2 * connection.hops);
END;
((xmtr.quenchIncr MOD 4) = 0) => --still backing out
xmtr.interval ← xmtr.interval / 2; --halve the interval
ENDCASE; --do nothing this time
END;
ENDCASE;
END; --SetQuencing
SetSenderSizeLimit: PROC [size: CARDINAL] =
  --Record size as the connection's maxSppBytes.
  BEGIN
  connection.maxSppBytes ← size;
  END; --SetSenderSizeLimit
StartPacketStream: PUBLIC SppOps.StartPacketStreamProc =
  <<
  PROC [local, remote: System.NetworkAddress,
  localID, remoteID: NSTypes.ConnectionID,
  timeout: NSTypes.WaitTime, class: PacketStream.ClassOfService,
  establish: BOOLEAN] RETURNS [
  PacketStream.Handle, System.NetworkAddress, NSTypes.ConnectionID];
  >>
  --PROCESS: CLIENT
  --Three steps: initialize the packet stream object, initialize the SPP state
  --from the caller's addresses and connection IDs, then start the stream.
  --Hands back the packet stream object together with the remote address and
  --remote connection ID as they stand once the start has completed.
  {
  InitializePSObject[];
  SppNegotiation.InitializeSppState[local, remote, localID, remoteID, timeout];
  StartPacketStreamInternal[timeout, establish];
  RETURN[@packetStreamObject, connection.remoteAddr, connection.remoteID];
  }; --StartPacketStream
NewStartPacketStream: PUBLIC SppNegotiation.NewStartPacketStreamProc =
  <<
  PROC [local, remote: NewNetworkStream.TSap,
  negotiationParameters: NewNetworkStream.NegotiationParameters,
  hints: SppNegotiation.NegotiationHints, timeout: NSTypes.WaitTime]
  RETURNS [PacketStream.Handle];
  >>
  --PROCESS: CLIENT
  --Same sequence as StartPacketStream, but the SPP state is initialized from
  --negotiation parameters and hints; hints.activeEstablish selects active
  --establishment. The remote address and connection ID in effect afterwards
  --are stored through the caller's remote pointer.
  {
  InitializePSObject[];
  SppNegotiation.NewInitializeSppState[
    local, remote, negotiationParameters, hints, timeout];
  StartPacketStreamInternal[timeout, hints.activeEstablish];
  remote↑ ← [connection.remoteAddr, connection.remoteID];
  RETURN[@packetStreamObject];
  }; --NewStartPacketStream
StartPacketStreamInternal: PROCEDURE [timeout: NSTypes.WaitTime, establish: BOOLEAN] =
<<
Common tail of StartPacketStream and NewStartPacketStream. Forks the
retransmitter and receiver processes. If the connection is unestablished it
also waits here until the state becomes established or open, signalling
PacketStream.ConnectionFailed on suspension or when more than rcvr.interval
pulses pass without success. Finally installs the client's timeout and selects
SendToLocal as the transmit procedure when the remote net is on an attached
ethernet.
>>
BEGIN
time: LONG CARDINAL ← System.GetClockPulses[];
SELECT connection.state FROM
(unestablished) =>
BEGIN
Process.EnableAborts[@isEstablished];
Process.SetTimeout[@isEstablished, Process.MsecToTicks[2000]];
connection.state ← IF establish THEN activeEstablish ELSE waitEstablish;
SetWaitTime[MAX[4000, timeout]]; --force at least two xmts of request
rexmtr.process ← FORK Retransmitter[];
rcvr.process ← FORK Receiver[];
DO --UNTIL ConnectionFailed | established | open
IF establish THEN SystemPacket[TRUE, TRUE];
Wait[@isEstablished];
SELECT TRUE FROM
(connection.state = established), (connection.state = open) => EXIT;
(connection.state = suspended) =>
SIGNAL PacketStream.ConnectionFailed[connection.whyFailed];
((System.GetClockPulses[] - time) > rcvr.interval) =>
BEGIN
--restart the clock first, so a RESUMEd signal gets a fresh interval
time ← System.GetClockPulses[];
SIGNAL PacketStream.ConnectionFailed[timeout];
END;
ENDCASE;
ENDLOOP;
END;
ENDCASE =>
BEGIN
--any other state: send one system packet and start the two processes
SystemPacket[FALSE, TRUE];
rexmtr.process ← FORK Retransmitter[];
rcvr.process ← FORK Receiver[];
END;
SetWaitTime[timeout]; --now set the user's timeout request
--LOCAL NETWORK EXPEDITED DELIVERY
--In effect for directly connected ethernets only (and not ME)
IF (connection.localAddr.host # connection.remoteAddr.host) THEN
BEGIN
--walk the device chain looking for an ethernet on the remote's net
FOR device: Device ← Driver.GetDeviceChain[], device.next
UNTIL device = NIL DO
IF device.device # ethernet THEN LOOP;
xmtr.context ← Protocol1.GetContext[device, ns];
IF xmtr.context.netNumber = connection.remoteAddr.net THEN
{xmtr.transmitProc ← SendToLocal; EXIT};
ENDLOOP;
END;
END; --StartPacketStreamInternal
StopPacketStream: PUBLIC SppOps.StopPacketStreamProc =
  --PROCESS: CLIENT STREAM DELETION
  <<
  Tear the packet stream down: mark the connection as terminating, abort and
  JOIN the receiver and retransmitter processes if they were started, empty
  both packet tables, and delete the socket.
  (An OPEN alias of connection.pool that was never referenced in the body has
  been removed.)
  >>
  BEGIN
  connection.state ← terminating;
  --stop each process (if it exists) and wait for it to finish
  IF rcvr.process # NIL THEN
    {Process.Abort[rcvr.process]; [] ← JOIN rcvr.process};
  IF rexmtr.process # NIL THEN
    {Process.Abort[rexmtr.process]; [] ← JOIN rexmtr.process};
  --Flush the retransmitter and receiver tables
  CleanupTable[@rexmtr.table];
  CleanupTable[@rcvr.table];
  Socket.Delete[connection.socket];
  END; --StopPacketStream
SuspendStream: ENTRY PROCEDURE [
  suspend: PacketStream.SuspendReason, fail: PacketStream.FailureReason] =
  --PROCESS: RECEIVER OR RETRANSMITTER
  <<
  Move the connection into the suspended state, recording the state it came
  from and the two reasons, unless it is already suspended or terminating.
  In every case the waiters are then woken and a client recorded in
  xmtr.clientProcess is aborted.
  >>
  BEGIN
  alreadyDown: BOOLEAN =
    (connection.state = suspended) OR (connection.state = terminating);
  IF ~alreadyDown THEN
    BEGIN
    --No possibility of signals
    connection.stateBeforeSuspension ← connection.state;
    connection.state ← suspended;
    connection.whySuspended ← suspend;
    connection.whyFailed ← fail;
    END;
  <<
  Processes may be waiting on the conditions below; without a NOTIFY they
  could wait for a very long time, possibly forever. Each of them sits in a
  loop that re-tests the current state before continuing and will signal
  "ConnectionSuspended" in this case.
  >>
  NOTIFY rcvr.table.condition;
  NOTIFY xmtr.newAllocation;
  NOTIFY attn.table.condition;
  <<
  A client waiting for buffers has to be aborted. The GetBuffer code traps
  the abort and translates it into ConnectionSuspended; otherwise the client
  might wait forever in the BufferMgr.
  >>
  IF xmtr.clientProcess # NIL THEN Process.Abort[xmtr.clientProcess];
  END; --SuspendStream
SystemPacket: PROC[sendAck, wait: BOOLEAN ← FALSE] =
  --PROCESS: RECEIVER OR RETRANSMITTER
  <<
  Build a header-only system packet carrying the current transmit sequence
  number and send it with SendSequencedPacket. Nothing is sent when
  NSBuffer.GetBuffer returns NIL.
  >>
  BEGIN
  pkt: NSBuffer.Buffer ← NSBuffer.GetBuffer[
    aH: connection.pool, function: send, wait: wait,
    size: NSTypes.bytesPerSppHeader + NSTypes.bytesPerIDPHeader];
  IF pkt = NIL THEN RETURN; --no buffer, no packet
  pkt.ns.pktLength ← PacketStream.bytesPerSequencedPktHeader;
  pkt.ns.systemPacket ← TRUE; --the whole point of this packet
  --an ack is always requested while the connection is still being established
  pkt.ns.sendAck ← sendAck OR (connection.state < open);
  pkt.ns.endOfMessage ← FALSE;
  pkt.ns.attention ← FALSE;
  pkt.ns.subtype ← 0; --system packets always carry SST 0
  pkt.ns.sequenceNumber ← xmtr.nextSeq; --current number; it is not consumed
  IF CommFlags.doStats THEN Stats.StatIncr[statSystemPacketsSent];
  SendSequencedPacket[pkt]; --picks up current state information and ships
  END; --SystemPacket
SetState: PROC[newState: PacketStream.State] =
  --Map the client-visible state onto the connection state: active becomes
  --established, anything else becomes suspended.
  BEGIN
  [] ← GetState[]; --called only because it might glitch
  IF newState = active THEN connection.state ← established
  ELSE connection.state ← suspended;
  END; --SetState
SetWaitTime: ENTRY PROCEDURE [time: NSTypes.WaitTime] =
  <<
  Record the client's wait time and convert it to a pulse interval for the
  receiver. A time greater than maxMsecToPulses is stored as
  LAST[LONG CARDINAL], and that value disables the timeout on
  rcvr.table.condition altogether (such a client may NEVER wake up).
  >>
  BEGIN
  --No possibility of signals
  forever: LONG CARDINAL = LAST[LONG CARDINAL];
  rcvr.waitTime ← time; --kept in case he forgets
  IF time > maxMsecToPulses THEN rcvr.interval ← forever
  ELSE rcvr.interval ← System.MicrosecondsToPulses[time*1000];
  IF rcvr.interval # forever THEN
    Process.SetTimeout[
      @rcvr.table.condition, CommUtil.PulsesToTicks[[rcvr.interval]]]
  ELSE Process.DisableTimeout[@rcvr.table.condition];
  END; --SetWaitTime
TakeFromUser: PROC [b: NSBuffer.Buffer] =
--PROCESS: CLIENT SENDING
<<
Accept a filled buffer from the client: assign it the next transmit sequence
number, wait (inside the monitor) until the allocation permits sending it,
decide whether it should request an ack, and then send it via
SendSequencedPacket.
>>
BEGIN
AssignSeqAndAssertSend: ENTRY PROC =
BEGIN
ENABLE UNWIND => NULL;
assertion: INTEGER;
sequenceNumber: CARDINAL;
--assign current sequence number
sequenceNumber ← body.sequenceNumber ← xmtr.nextSeq; --assign seq #
xmtr.nextSeq ← xmtr.nextSeq + 1; --consume seq #
DO --EXITs (success) or ERRORs (failure)
SELECT connection.state FROM
established, open =>
BEGIN
--assertion: signed distance from this packet to the end of the allocation
assertion ← INTEGER[xmtr.maxSeq - sequenceNumber];
SELECT TRUE FROM
--way past end of allocation
(assertion < -rcvrDefaults.accept) => GOTO wait;
--attention packet past end of allocation...expedited transmission
(body.attention) => body.sendAck ← TRUE;
--is xmtr blocked due to retransmission in progress?
(xmtr.blocked) => GOTO wait;
--at end of allocation...request ack this time, wait next time
(assertion = 0) => body.sendAck ← TRUE;
--past end of allocation with non-expedited packet..wait
(assertion < 0) => GOTO wait;
--pipe ~50% full...request ack this time, keep xmt'ng
((sequenceNumber - xmtr.unackedSeq) = SUCC[xmtr.maxAlloc]/2) =>
body.sendAck ← xmtr.transmitProc # SendToLocal;
--pipe not full & not at end of allocation...go, go, go
ENDCASE --(assertion > 0)-- => body.sendAck ← FALSE;
RETURN; --in all cases that didn't GOTO wait
EXITS wait => WAIT xmtr.newAllocation;
END;
suspended => RETURN WITH ERROR
PacketStream.ConnectionSuspended[connection.whySuspended];
--NOTE(review): when CommFlags.doDebug is FALSE the next two arms do nothing
--and the enclosing DO repeats without waiting - confirm these states cannot
--be reached here in non-debug builds.
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Driver.Glitch[StreamNotEstablished];
terminating =>
IF CommFlags.doDebug THEN Driver.Glitch[StreamTerminating];
ENDCASE;
ENDLOOP;
END; --AssignSeqAndAssertSend
<<
Even if AssignSeqAndAssertSend fails, send the packet. This gets it into
the retrans table since we already consumed the sequence number. This is
most appropriate when the operation is ABORTED, but also works for suspended.
>>
body: NSBuffer.Body = b.ns;
AssignSeqAndAssertSend[! UNWIND => SendSequencedPacket[b]];
SendSequencedPacket[b]; --so send the packet already
IF CommFlags.doStats THEN
BEGIN
Stats.StatIncr[statDataPacketsSent];
Stats.StatBump[statDataBytesSent,
body.pktLength - PacketStream.bytesPerSequencedPktHeader];
END;
END; --TakeFromUser
--BEGINNING OF BYTE STREAM IMPLEMENTATION
--Two-byte overlay that GetWord and PutWord LOOPHOLE a word into.
LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte];
<<
A client will typically have three processes accessing this module. The first
receives data, the second waits for attentions, and the third transmits data,
sends attentions and changes the subsequence type. As a consequence it is not
necessary for this module to be a monitor, since there is no interaction
between the three processes in this module. The interaction occurs in the
PktStreamInstance module, which is a monitor. Multiple client processes must
not perform data transfer in one direction; the result is unpredictable.
Care should be taken when deleting the stream and therefore this module.
>>
--The vector of procedures for operating on, and controlling the network stream
--(filled in by StartByteStream; its streamObject is reset by StopByteStream)
controlObject: PUBLIC NetworkStreamInternal.ControlObject;
--input: state of the receiving side of the byte stream
input: PUBLIC RECORD [
body: NSBuffer.Body, --pointer to interesting data
b: NSBuffer.Buffer, --current input buffer in use|NIL
finger: CARDINAL, --bytes consumed from input.b
length: CARDINAL, --bytes of data in input.b
sst: Stream.SubSequenceType, --current input sst
attn, end: BOOLEAN]; --delayed status to report
--output: state of the sending side of the byte stream
output: PUBLIC RECORD [
b: NSBuffer.Buffer, --current output buffer in use|NIL
finger: CARDINAL, --bytes placed in output.b so far
sst: Stream.SubSequenceType, --current output sst
sstSent: BOOLEAN, --has sst been reported to remote?
bufferSize: CARDINAL]; --max bytes permitted in output
GetByte: Stream.GetByteProcedure =
  <<
  Return the next received byte. The byte is taken straight from the current
  input buffer while at least two more bytes would remain after it; in every
  other case the request is passed to GetBlock as a one-byte block.
  >>
  BEGIN
  oneByte: PACKED ARRAY [0..1] OF Stream.Byte;
  IF input.b = NIL OR input.finger + 2 >= input.length THEN
    BEGIN --let GetBlock give back the buffer if we take the last byte
    [] ← GetBlock[sH, [@oneByte, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE, TRUE]];
    RETURN[oneByte[0]];
    END;
  --fast path: plenty left in the current buffer
  byte ← input.b.ns.sppBytes[input.finger];
  input.finger ← input.finger + 1;
  END; --GetByte
GetWord: Stream.GetWordProcedure =
  <<
  Return the next two received bytes as one word: the first byte read becomes
  the left half and the second the right half.
  The two GetByte calls are written as separate statements, as PutWord does
  with PutByte, so the order in which the bytes are consumed does not depend
  on the order in which a record constructor's fields are evaluated.
  >>
  BEGIN
  OPEN w: LOOPHOLE[word, LeftAndRight];
  w.left ← GetByte[sH]; --first byte off the stream
  w.right ← GetByte[sH]; --second byte off the stream
  END; --GetWord
GetBlock: Stream.GetProcedure =
<<
Copy received bytes into block until it is full or a reportable condition
(attention, end of record, SST change, timeout) comes up. Each condition is
either SIGNALled or returned as the completion code, as selected by options.
Results are bytesTransferred, why and sst. The current input buffer is given
back as soon as its last byte has been consumed.
>>
BEGIN
moved: CARDINAL;
iBlock: Stream.Block;
why ← normal; sst ← input.sst; bytesTransferred ← 0;
<<
NOTE: gets with block.startIndex = block.stopIndexPlusOne. They
do not alter the stream's state, i.e., they return the same results
for multiple calls.
>>
WHILE block.startIndex < block.stopIndexPlusOne DO
<<
PENDING ATTENTION
Caused by a packet with an sst change and attention and the
sst was returned or signalled and not resumed.
>>
IF input.attn THEN
BEGIN
input.attn ← FALSE;
SELECT options.signalAttention FROM
TRUE => SIGNAL Stream.Attention[block.startIndex];
ENDCASE => RETURN[bytesTransferred, attention, sst];
END;
--get a packet to work on, if one is not already in progress
UNTIL (input.b # NIL) DO
<<
PENDING END RECORD
Caused by an empty packet with an sst change and an endRecord status
where the sst was returned or signalled and not resumed.
>>
IF input.end THEN
BEGIN
input.end ← FALSE;
IF options.terminateOnEndRecord THEN
BEGIN
SELECT options.signalEndRecord FROM
TRUE => SIGNAL Stream.EndRecord[block.startIndex];
ENDCASE => RETURN[bytesTransferred, endRecord, sst];
END;
END;
<<TIMEOUT>>
IF (input.b ← GetForUser[]) = NIL THEN --timeout
BEGIN
SELECT options.signalTimeout FROM
TRUE => {SIGNAL Stream.TimeOut[block.startIndex]; LOOP};
FALSE => RETURN[bytesTransferred, timeout, sst];
ENDCASE;
END;
input.body ← input.b.ns; --make local frame copy
input.finger ← 0; --reset starting index
input.length ← GetSppDataLength[input.b]; --won't change until next get
input.attn ← input.body.attention; --in case both in one packet
sst ← input.body.subtype;
<<EMPTY PACKET>>
IF input.finger = input.length THEN --not much data
BEGIN
input.end ← input.body.endOfMessage AND options.terminateOnEndRecord;
ReturnBuffer[input.b]; input.b ← NIL;
END;
<<SST CHANGE - may be an empty packet>>
IF (sst # input.sst) THEN --sst change
BEGIN
input.sst ← sst; --record for later
SELECT options.signalSSTChange FROM
TRUE => SIGNAL Stream.SSTChange[sst, block.startIndex];
ENDCASE => RETURN[bytesTransferred, sstChange, sst];
END;
ENDLOOP; --UNTIL (input.b # NIL)
<<ATTENTION PACKET - never an empty packet>>
IF input.attn THEN
BEGIN
input.attn ← FALSE;
SELECT options.signalAttention FROM
TRUE => SIGNAL Stream.Attention[block.startIndex];
ENDCASE => RETURN[bytesTransferred, attention, sst];
END;
<<MOVE THE BYTES>>
iBlock ← [@input.body.sppBytes, input.finger, input.length];
moved ← ByteBlt.ByteBlt[block, iBlock];
bytesTransferred ← bytesTransferred + moved;
block.startIndex ← block.startIndex + moved;
input.finger ← input.finger + moved;
<<END OF PACKET PROCESSING>>
IF input.finger = input.length THEN
BEGIN
input.end ← input.body.endOfMessage AND options.terminateOnEndRecord;
ReturnBuffer[input.b]; input.b ← NIL;
<<LONG BLOCK>>
IF (options.signalLongBlock AND
(block.startIndex < block.stopIndexPlusOne)) THEN
SIGNAL Stream.LongBlock[block.startIndex];
<<END RECORD - options.terminateOnEndRecord already asserted>>
IF input.end THEN
BEGIN
input.end ← FALSE;
SELECT options.signalEndRecord FROM
TRUE => SIGNAL Stream.EndRecord[block.startIndex];
ENDCASE => RETURN[bytesTransferred, endRecord, sst];
END;
END;
ENDLOOP; --block.startIndex < block.stopIndexPlusOne
<<SHORT BLOCK>>
--the client's block is full but the current packet still holds unread bytes
IF (options.signalShortBlock AND (input.b # NIL)) THEN
ERROR Stream.ShortBlock;
END; --GetBlock
<<
The strategy on the transmission side is to allocate a buffer only when
there is data to be copied into the buffer, or if an empty packet must be
transmitted. State information is kept around for things like whether a
new SST has been sent to the other end or not, in case it is changed without
sending any intervening data.
>>
--Send a client's block of data in one or more packets.
--Bytes are copied into output.b (allocated on demand); each time a packet
--fills up it is sent. endRecord is applied to the packet that carries the
--last byte of the block, or to an empty packet if the block itself is empty.
PutBlock: Stream.PutProcedure =
BEGIN
moved: CARDINAL;
oBlock: Stream.Block;
--client specified empty block?
SELECT TRUE FROM
(block.startIndex # block.stopIndexPlusOne) =>
WHILE block.startIndex < block.stopIndexPlusOne DO
IF output.b = NIL THEN
BEGIN
output.b ← GetOutputBuffer[];
output.finger ← 0;
output.bufferSize ← connection.maxSppBytes;
END;
--destination: the unused part of the current output packet
oBlock ← [
blockPointer: @output.b.ns.sppBytes,
startIndex: output.finger,
stopIndexPlusOne: output.bufferSize];
moved ← ByteBlt.ByteBlt[oBlock, block];
block.startIndex ← block.startIndex + moved;
--did we just fill the packet?
IF (output.finger ← output.finger + moved) = output.bufferSize THEN
--watch out for packet and client buffer ending at same time
SendNow[sH,
(block.startIndex >= block.stopIndexPlusOne) AND endRecord];
REPEAT
<<
Check for client requested endRecord, watching out for packet and
client buffer ending at same time. IF output.b = NIL then the packet
was sent from inside the loop.
>>
FINISHED => IF endRecord AND (output.b # NIL) THEN SendNow[sH, TRUE];
ENDLOOP;
--empty block with endRecord: SendNow sends a packet marked end of message
(endRecord) => SendNow[sH, TRUE];
ENDCASE;
END; --PutBlock
PutByte: Stream.PutByteProcedure =
  <<
  Append one byte to the output stream. The byte is stored directly in the
  current output buffer while at least two more bytes would still fit after
  it; in every other case it is handed to PutBlock as a one-byte block.
  >>
  BEGIN
  IF output.b = NIL OR output.finger + 2 >= output.bufferSize THEN
    BEGIN --let PutBlock flush the buffer if we fill the last byte
    single: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ];
    PutBlock[sH, [@single, 0, 1], FALSE];
    END
  ELSE
    BEGIN --fast path: room to spare in the current buffer
    output.b.ns.sppBytes[output.finger] ← byte;
    output.finger ← output.finger + 1;
    END;
  END; --PutByte
PutWord: Stream.PutWordProcedure =
  --Send word as two bytes, left half first and right half second.
  BEGIN
  halves: LeftAndRight = LOOPHOLE[word];
  PutByte[sH, halves.left];
  PutByte[sH, halves.right];
  END; --PutWord
SendNow: Stream.SendNowProcedure =
  <<
  Send whatever is in the output buffer right away, with its endOfMessage
  flag set from endRecord. If no buffer is in progress an empty one is
  obtained first, so a packet goes out in either case.
  >>
  BEGIN
  IF output.b = NIL THEN
    BEGIN
    output.finger ← 0; --nothing in the new buffer yet
    output.b ← GetOutputBuffer[];
    END;
  output.b.ns.endOfMessage ← endRecord;
  FlushOutputBuffer[];
  END; --SendNow
<<
Change the output SST. Setting the value already in effect does nothing.
Otherwise any buffer in progress is flushed under the old SST; if the old SST
has still not been sent after that, an empty packet is sent to carry it. The
new SST is then recorded as not yet sent.
We assume that the SST is initially = 0.
>>
SetSST: Stream.SetSSTProcedure =
  BEGIN
  IF sst = output.sst THEN RETURN; --no change, no side effects
  FlushOutputBuffer[]; --send the last buffer of the old SST, if any
  IF ~output.sstSent THEN
    BEGIN --nothing was flushed for the old SST; send an empty packet
    output.finger ← 0;
    output.b ← GetOutputBuffer[];
    FlushOutputBuffer[];
    END;
  --the new SST takes effect with the next packet
  output.sst ← sst;
  output.sstSent ← FALSE;
  END; --SetSST
--Send a single byte in its own packet with the attention bit set. Any data
--already buffered is sent ahead of it.
SendAttention: Stream.SendAttentionProcedure =
  BEGIN
  FlushOutputBuffer[]; --earlier data goes out first
  output.b ← GetOutputBuffer[];
  output.b.ns.attention ← TRUE; --marks the packet as an attention
  output.b.ns.sppBytes[0] ← byte;
  output.finger ← 1; --an attention packet is always one byte long
  FlushOutputBuffer[]; --now the attention packet itself
  END; --SendAttention
--Return the result of ExtractAttention; waits until an attention arrives or
--an ERROR is raised.
WaitAttention: Stream.WaitAttentionProcedure =
  BEGIN
  RETURN[ExtractAttention[]];
  END; --WaitAttention
--Send out output.b if there is one; does nothing when no buffer is pending.
FlushOutputBuffer: PROCEDURE =
  BEGIN
  pending: NSBuffer.Buffer;
  --take the buffer away from the output record while inside the monitor
  Detach: ENTRY PROC = INLINE {pending ← output.b; output.b ← NIL};
  Detach[];
  IF pending = NIL THEN RETURN;
  pending.ns.subtype ← output.sst; --packet goes out under the current SST
  output.sstSent ← TRUE;
  SetSppDataLength[pending, output.finger];
  TakeFromUser[pending];
  END; --FlushOutputBuffer
--Set the packet length of b to length data bytes plus the SPP header.
SetSppDataLength: PROCEDURE [b: NSBuffer.Buffer, length: CARDINAL] = INLINE
  BEGIN
  Socket.SetPacketBytes[b, length + NSTypes.bytesPerSppHeader];
  END; --SetSppDataLength
--Stream-level entry point for changing the wait time; defers to SetWaitTime.
SetTimeout: Stream.SetTimeoutProcedure =
  BEGIN
  SetWaitTime[waitTime];
  END; --SetTimeout
--Stream-level entry point for reading the wait time; defers to GetWaitTime.
GetTimeout: Stream.GetTimeoutProcedure =
  BEGIN
  RETURN[GetWaitTime[]];
  END; --GetTimeout
--Return the number of data bytes in sequenced packet b: its packet byte
--count less the SPP header.
GetSppDataLength: PROCEDURE [b: NSBuffer.Buffer] RETURNS [CARDINAL] = INLINE
  BEGIN
  RETURN[Socket.GetPacketBytes[b] - NSTypes.bytesPerSppHeader];
  END; --GetSppDataLength
--Return the current output SST.
GetSST: Stream.GetSSTProcedure =
  BEGIN
  RETURN[output.sst];
  END; --GetSST
StopByteStream: PUBLIC SppOps.StopByteStreamProc =
  <<
  Shut the byte stream down: put the default stream object back in place so
  the stream is no longer usable, then give back any input or output buffer
  still held.
  >>
  BEGIN
  controlObject.streamObject ← Stream.defaultObject;
  IF input.b # NIL THEN
    BEGIN
    ReturnBuffer[input.b];
    input.b ← NIL;
    END;
  IF output.b # NIL THEN
    BEGIN
    NSBuffer.ReturnBuffer[output.b];
    output.b ← NIL;
    END;
  END; --StopByteStream
StartByteStream: PUBLIC SppOps.StartByteStreamProc =
  <<
  Bring the byte stream up on top of packet stream psH: clear the input and
  output state, build the control object from the default stream object with
  this module's procedures installed, and return the stream object.
  >>
  BEGIN
  --receiving side: no buffer, SST 0, nothing pending
  input.b ← NIL;
  input.sst ← 0;
  input.end ← FALSE;
  input.attn ← FALSE;
  --sending side: no buffer, SST 0 and counted as already sent
  output.b ← NIL;
  output.bufferSize ← 0;
  output.sst ← 0;
  output.sstSent ← TRUE;
  --start from the defaults, then fill in the NetworkStream specifics
  controlObject ← [streamObject: Stream.defaultObject, psH: psH];
  --data in
  controlObject.streamObject.get ← GetBlock;
  controlObject.streamObject.getByte ← GetByte;
  controlObject.streamObject.getWord ← GetWord;
  --data out
  controlObject.streamObject.put ← PutBlock;
  controlObject.streamObject.putByte ← PutByte;
  controlObject.streamObject.putWord ← PutWord;
  controlObject.streamObject.sendNow ← SendNow;
  --subsequence type
  controlObject.streamObject.setSST ← SetSST;
  controlObject.streamObject.getSST ← GetSST;
  --attentions
  controlObject.streamObject.sendAttention ← SendAttention;
  controlObject.streamObject.waitAttention ← WaitAttention;
  --timeouts
  controlObject.streamObject.setTimeout ← SetTimeout;
  controlObject.streamObject.getTimeout ← GetTimeout;
  RETURN[@controlObject.streamObject];
  END; --StartByteStream
--MAINLINE CODE
--Store this module's start and stop procedures in the procedure variables
--declared outside this section of the file.
startByteStream ← StartByteStream;
startPacketStream ← StartPacketStream;
stopByteStream ← StopByteStream;
stopPacketStream ← StopPacketStream;
newStartPacketStream ← NewStartPacketStream;
END. --of NetworkStreamImpl module
LOG
17-May-84 10:49:49 AOF Post Klamath
5-Mar-85 17:59:29 AOF Treat error packet[resourceLimits] like listenerReject
20-Mar-85 11:23:17 AOF Fixing PutBlock of no bytes with endRecord = TRUE
20-Mar-85 17:02:58 AOF Deliver rec'd packet even if stream suspended
25-Mar-85 17:15:21 AOF Put in a 11.0 fix that was missed 'cause of fork
12-Apr-85 11:09:50 AOF Acceptance of early packets, 16 bit modulo arithmetic
25-Apr-85 9:30:35 AOF Delete LOOPHOLEs of TransferStatus
3-Jun-85 14:51:47 AOF Another shot at the 20-Mar-85 11:23:17 fix
25-Nov-85 8:13:32 AOF Support for INR's congestionWarning & congestionDiscard
25-Nov-85 8:13:32 AOF Cut rexmtr.ceiling from 36 to 24 seconds.
4-Dec-85 15:45:02 AOF 16 buffer rexmt and rcvr tables
4-Dec-85 16:55:05 AOF variable length send buffers, toggle idle probing
12-Dec-85 10:24:13 AOF Set ackReq on system packets when establishing
9-Jan-86 15:37:29 AOF Better test for 'opening' a connection
11-Apr-86 9:19:46 AOF Switch to disable checksums on local net
18-Apr-86 16:41:29 AOF Get buffer's length from Buffer.DataWordsPerRawBuffer[]
22-May-86 11:41:24 SMA Remove dependencies on Buffer, changes for new encap.
18-Aug-86 17:03:33 AOF Local caching of b.ns.
28-Aug-86 10:47:40 AOF Use priorityPilotRealtimeSwappable for receiver.
5-Oct-86 18:53:31 AOF Look'n at extending the buffer sizes.
15-Oct-86 15:06:18 AOF Reorder status checks in RequeueProcedure
2-Nov-86 16:18:10 AOF More large packet stuff
17-Nov-86 13:41:00 AOF Renaming priorities
20-Nov-86 9:45:03 AOF Toying with HGM's half step increase in acking
7-Jan-87 16:49:53 AOF MDS Tweeks (POINTERS got longer)
26-Mar-87 17:30:15 AOF Fixed AR#10142 (assign values to PutWord and GetTimeout)
13-Apr-87 18:39:31 AOF Tightened up monitoring logic in GetSendBuffer
7-May-87 11:20:07 AOF Copy value of lastProbe in rexmtr in case rcvr overwrites
13-May-87 12:12:41 AOF Rate controlled transmission
9-Aug-87 15:54:20 AOF Always ack duplicate packets (VAX, etc).
9-Sep-87 8:20:53 AOF User proc variables to start instance
3-Nov-87 11:50:37 MXI Added NewStartPacketStream. Moved InitializeSppState to NewNetworkStreamImpl