-- PupStreamImpl.mesa
-- Copyright © 1986 by Xerox Corporation. All rights reserved.
-- Hal Murray, November 4, 1986 3:04:52 pm PST
-- Doug Wyatt, June 10, 1986 6:08:40 pm PDT
-- Demers, June 12, 1986 8:26:52 am PDT
-- Tim Diebert: October 13, 1986 7:07:47 pm PDT
DIRECTORY
Basics USING [bytesPerWord, UnsafeBlock],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
Booting USING [RegisterProcs, RollbackProc],
CommDriver USING [sendPriority],
Endian USING [CardFromF, FFromCard, FWORD],
IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs],
PrincOpsUtils USING [ByteBlt, LongCopy],
Process USING [Abort, Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, priorityForeground, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Pup USING [Address, nullAddress, Socket],
PupBuffer USING [Buffer, BufferObject, ByteIndex, maxDataBytes, maxNewGatewayBytes],
PupHop USING [GetHop, Hop],
PupName USING [IsWellKnown],
PupSocket USING [AllocBuffer, AppendRope, CreateServer, CreateEphemeral, Destroy, ExtractAbortRope, ExtractErrorRope, FixupSourceAndDest, FreeBuffer, Get, GetLocalAddress, GetRemoteAddress, GetUserBytes, GetUniqueID, Kick, Put, SetGetTimeout, SetRemoteAddress, SetUserBytes, SetUserHWords, SetUserSize, Socket, waitForever],
PupSocketBackdoor USING [AllocRecvBuffer, PutAgain, PutFirst, ReturnToSenderNoFree, SetDirectReceive, SetSoftwareChecksumming],
PupStream USING [CloseReason, FilterProc, ListenerProc, MARK, Milliseconds, waitForever],
PupStreamBackdoor USING [],
PupType USING [Type],
Rope USING [ROPE],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ];
PupStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle
IMPORTS BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, PupHop, PupName, PupSocket, PupSocketBackdoor, SafeStorage
EXPORTS PupStream, PupStreamBackdoor = {
-- Copied Types
-- Local shorthands for types declared in the imported interfaces.
Buffer: TYPE = PupBuffer.Buffer;
ByteIndex: TYPE = PupBuffer.ByteIndex;
CloseReason: TYPE = PupStream.CloseReason;
MARK: TYPE = PupStream.MARK;
Milliseconds: TYPE = PupStream.Milliseconds;
ROPE: TYPE = Rope.ROPE;
Socket: TYPE = PupSocket.Socket;
STREAM: TYPE = IO.STREAM;
-- Should move to Endian ??
-- Returns TRUE iff the byte index is an exact multiple of Basics.bytesPerWord.
WordAligned: PROC [index: NAT] RETURNS [yes: BOOL] = INLINE {
  RETURN[(index MOD Basics.bytesPerWord) = 0]; };
-- ERRORs
-- Raised by CreateListener when PupName.IsWellKnown rejects the local socket.
SocketNotWellKnown: PUBLIC ERROR = CODE;
-- Raised (as a resumable SIGNAL) by Flush when a wait ends with the flush still pending.
Timeout: PUBLIC SIGNAL = CODE;
-- Carries the reason for closing and an explanatory rope.
StreamClosing: PUBLIC ERROR [why: CloseReason, text: ROPE] = CODE;
-- Performance tuning
-- The three switches below had lost their assignment arrows; restored here.
useDirectInput: BOOL ← TRUE; -- Puller registers Receive via SetDirectReceive when TRUE
useInLineCopy: BOOL ← FALSE; -- enables the word-aligned LongCopy path in the block procs
disableSoftwareChecksums: BOOL ← FALSE; -- NewObject turns checksumming off when TRUE
defaultNumberOfBuffers: NAT ← 32; -- These are >1500 bytes, not single pages
maxBufferSize: NAT ← maxDataBytes;
-- Replaces the module-wide maxBufferSize value.
SetMaxBufferSize: PUBLIC PROC [size: NAT] ~ { maxBufferSize ← size };
-- Copied/Derived Values
maxDataBytes: NAT = PupBuffer.maxDataBytes; -- Max bytes in a packet
maxNewGatewayBytes: NAT = PupBuffer.maxNewGatewayBytes;
  -- Max bytes through Old Gateways. Beware: There may be a lot of old ones out there!
-- The IO stream procedure vector shared by every Pup byte stream (see HandleToStream).
streamProcs: REF IO.StreamProcs = IO.CreateStreamProcs [
  variety: $inputOutput,
  class: $Pup,
  -- input side
  getChar: GetChar,
  getBlock: GetBlock,
  unsafeGetBlock: UnsafeGetBlock,
  endOf: EndOf,
  charsAvail: CharsAvail,
  -- output side
  putChar: PutChar,
  putBlock: PutBlock,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  -- teardown
  close: Close ];
-- Time intervals expressed in clock pulses. The arguments are microseconds.
-- oneSecond and tenSeconds bound the RFC retransmission interval in OpenConnection.
oneSecond: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1D6];
tenSeconds: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[10D6];
-- Three seconds. Not referenced in the visible part of the file.
retransmitPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[3D6];
-- One minute. Puller uses this as the quiet period before probing the remote end.
probePulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1*60D6];
-- Counters
-- Module-wide event counts. They are bumped without any lock.
duplicateRfcReceived: LONG CARDINAL ← 0; -- Listen: RFC matched an existing connection
funnyPacketType: LONG CARDINAL ← 0; -- Receive: packet type not handled
acksDefered: LONG CARDINAL ← 0; -- Receive: ack requested but not sent yet
oldAcks: LONG CARDINAL ← 0;
oldPackets: LONG CARDINAL ← 0; -- DataPacket: id behind pullId
futurePackets: LONG CARDINAL ← 0; -- DataPacket: id ahead of pullId
clumpsRetransmitted: LONG CARDINAL ← 0;
packetsRetransmitted: LONG CARDINAL ← 0;
droppedStreams: LONG CARDINAL ← 0;
finishedStreams: LONG CARDINAL ← 0;
-- Our Types
-- A Finger is one slot in a chain of packet buffers (linked through next).
-- The BOOL fields had lost their assignment arrows; restored here.
Finger: TYPE = REF FingerObject;
FingerObject: TYPE = RECORD [
  state: {empty, halfFull, halfEmpty, full} ← empty,
  index: ByteIndex ← 0, -- next byte position within buffer
  bytes: ByteIndex ← 0, -- limit for index
  next: Finger ← NIL,
  mark: BOOL ← FALSE, -- TRUE => buffer holds a mark byte rather than data
  ack: BOOL ← FALSE,
  inProgress: BOOL ← FALSE,
  buffer: Buffer ];
-- Per-connection state. The record is MONITORED: ENTRY procedures lock on the handle.
-- Several initialisers had lost their assignment arrows; restored here.
Handle: TYPE = REF Object;
Object: TYPE = MONITORED RECORD [
  -- Finger chains for the two directions
  outputWrite, outputRead, outputToAck: Finger ← NIL,
  inputWrite, inputRead: Finger ← NIL,
  socket: Socket ← NIL,
  remote: Pup.Address ← NULL,
  connectionId: Endian.FWORD,
  -- Send side
  outputFlushed: CONDITION,
  outputSpace: CONDITION,
  outputReady: CONDITION,
  retransmitter, push1, push2, push3: PROCESS, -- forked by SetupObject, joined by CloseHandle
  pushId: LONG CARDINAL,
  ackedId: LONG CARDINAL,
  pushPackets: CARDINAL ← 0,
  pushBytes: CARDINAL ← 0,
  pushPacketSize: CARDINAL ← 0, -- 0 until known; see ExchangeAcks
  outputBuffersToUse: CARDINAL ← 0,
  outputBuffersAllocated: CARDINAL ← 0,
  outputBuffersReady: CARDINAL ← 0,
  outputBuffersInFlight: CARDINAL ← 0,
  timeOfLastPush: BasicTime.Pulses ← BasicTime.GetClockPulses[],
  retransmitting: BOOL ← FALSE,
  flush: BOOL ← FALSE,
  -- Receive side
  inputReady: CONDITION,
  pull: PROCESS, -- forked by SetupObject, joined by CloseHandle
  pullId: LONG CARDINAL,
  pullPackets: CARDINAL ← 0,
  pullBytes: CARDINAL ← 0,
  inputBuffersToUse: CARDINAL ← 0,
  inputBuffersAllocated: CARDINAL ← 0,
  inputBuffersAvailable: CARDINAL ← 0,
  timeOfLastArrival: BasicTime.Pulses ← BasicTime.GetClockPulses[],
  stateChange: CONDITION,
  alloc: CONDITION,
  ack: CONDITION,
  allocNeeded: BOOL ← FALSE,
  -- Attentions: Out of band signaling
  sendAttenIdSent: LONG CARDINAL,
  sendAttenIdAcked: LONG CARDINAL,
  sendAttention: CONDITION, -- ABORTs NOT ENABLED
  recvAttenIdArrived: LONG CARDINAL,
  recvAttenIdSeen: LONG CARDINAL,
  recvAttention: CONDITION,
  -- Closing
  err: ROPE ← NIL, -- # NIL => StreamClosed
  why: CloseReason ← NULL,
  bufferSize: NAT,
  next: Handle ← NIL, -- link used when Listen walks the chain of handles
  dead: BOOL ← FALSE ]; -- set by CloseHandleInternal
-- A Listener records the arguments given to CreateListener plus the process running Listen.
Listener: TYPE = REF ListenerRep;
ListenerRep: PUBLIC TYPE = RECORD [
local: Pup.Socket,
worker: PupStream.ListenerProc,
clientData: REF ANY,
getTimeout: Milliseconds,
putTimeout: Milliseconds,
filter: PupStream.FilterProc, -- NIL => Accept all requests
echoFilter: PupStream.FilterProc, -- NIL => Answer all echos
listen: PROCESS ];
-- A Sockets value wraps one socket for the rendezvous procedures.
-- used is set TRUE once a stream has been built on the socket; a second attempt raises SocketsAlreadyUsed.
Sockets: TYPE = REF SocketsRep;
SocketsRep: PUBLIC TYPE = RECORD [
used: BOOL,
socket: Socket ];
-- Byte Stream Interface
-- Opens a byte stream connection to remote and returns it as an IO stream.
-- getTimeout and putTimeout are handed to NewObject, which applies them to the condition variables.
-- OpenConnection raises StreamClosing (via CheckForClosed) when no connection can be made.
Create: PUBLIC PROC [remote: Pup.Address, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = {
  handle: Handle ← NewObject[remote, PupSocket.GetUniqueID[], NIL, getTimeout, putTimeout];
  -- NewObject created the socket for us (sockets = NIL), and nothing else refers to it yet.
  -- If OpenConnection unwinds, nobody can ever close the handle, so release the socket here
  -- instead of leaving it dangling.
  OpenConnection[handle ! UNWIND => {
    PupSocket.Destroy[handle.socket];
    handle.socket ← NIL; }];
  SetupObject[handle];
  ExchangeAcks[handle];
  RETURN[HandleToStream[handle]];
  };
-- Sends an abort carrying err to the other end, unless the stream already has an error recorded.
Abort: PUBLIC PROC [self: STREAM, err: ROPE] = {
  handle: Handle = NARROW[self.streamData];
  IF handle.err = NIL THEN SendAbort[handle, err];
  };
-- Queues a mark byte in an output packet of its own and asks for it to be flushed.
-- A partly filled data packet is finished first, so the mark never shares a packet with data.
SendMark: PUBLIC PROC [self: STREAM, mark: MARK] = {
  handle: Handle = NARROW[self.streamData];
  slot: Finger;
  -- Find an empty output slot, re-reading outputWrite each time round.
  DO
    CheckForClosed[handle];
    slot ← handle.outputWrite;
    SELECT slot.state FROM
      empty => EXIT;
      halfFull => FinishOutputFinger[handle];
      ENDCASE => WaitOutputSpace[handle];
    ENDLOOP;
  -- Claim the slot as a one-byte mark packet.
  slot.state ← halfFull;
  slot.mark ← TRUE;
  slot.index ← 1;
  slot.bytes ← handle.pushPacketSize;
  slot.buffer.byte[0] ← mark;
  handle.flush ← TRUE;
  FinishOutputFinger[handle];
  };
-- Discards input up to the next mark packet, consumes that packet, and returns its mark byte.
ConsumeMark: PUBLIC PROC [self: STREAM] RETURNS [mark: MARK] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger ← handle.inputRead;
  DO
    SELECT finger.state FROM
      full => {
        finger.state ← halfEmpty;
        finger.index ← 0; };
      halfEmpty => NULL;
      ENDCASE => {
        -- Nothing here yet. After the wait, look at the state again, as GetChar and GetBlock do.
        -- Otherwise a wait that returns without data would test a stale mark flag and hand
        -- an empty finger to NotifyInputSpace.
        WaitInputReady[handle];
        LOOP; };
    IF finger.mark THEN EXIT;
    -- Ordinary data: throw the packet away and move to the next finger.
    finger ← NotifyInputSpace[handle];
    MaybeSendAck[handle];
    ENDLOOP;
  mark ← finger.buffer.byte[0];
  [] ← NotifyInputSpace[handle];
  MaybeSendAck[handle];
  };
-- Returns the next data character, waiting for input when none is buffered.
-- Raises IO.EndOfStream when the current packet is a mark.
GetChar: PROC [self: STREAM] RETURNS [char: CHAR] = {
  handle: Handle = NARROW[self.streamData];
  slot: Finger ← handle.inputRead;
  DO
    SELECT slot.state FROM
      halfEmpty => EXIT;
      full => {
        -- First touch of a fresh packet: start reading at its beginning.
        slot.state ← halfEmpty;
        slot.index ← 0;
        EXIT; };
      ENDCASE => WaitInputReady[handle];
    ENDLOOP;
  IF slot.mark THEN ERROR IO.EndOfStream[self];
  char ← slot.buffer.char[slot.index];
  slot.index ← slot.index.SUCC;
  IF slot.index = slot.bytes THEN {
    -- Packet exhausted: hand the slot back and maybe acknowledge.
    [] ← NotifyInputSpace[handle];
    MaybeSendAck[handle]; };
  };
-- TRUE iff the packet at the read position is a mark.
-- With no packet waiting, checks for a closed stream and otherwise answers FALSE.
EndOf: PROC [self: STREAM] RETURNS [BOOL] = {
  handle: Handle = NARROW[self.streamData];
  slot: Finger = handle.inputRead;
  SELECT slot.state FROM
    halfEmpty, full => RETURN[slot.mark];
    ENDCASE => {
      CheckForClosed[handle];
      RETURN[FALSE]; };
  };
-- Number of bytes left in the current input packet.
-- Returns 0 when nothing is buffered and wait is FALSE, and INT.LAST when the packet is a mark.
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  handle: Handle = NARROW[self.streamData];
  slot: Finger ← handle.inputRead;
  DO
    SELECT slot.state FROM
      halfEmpty => EXIT;
      full => {
        slot.state ← halfEmpty;
        slot.index ← 0;
        EXIT; };
      ENDCASE => {
        IF ~wait THEN RETURN[0];
        WaitInputReady[handle]; };
    ENDLOOP;
  IF slot.mark THEN RETURN[INT.LAST];
  RETURN[slot.bytes-slot.index];
  };
-- Copies up to block.count bytes of input into block, starting at block.startIndex.
-- Stops early at a mark, or when the stream has an error after some data was delivered.
-- Returns the number of bytes actually stored.
UnsafeGetBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] RETURNS [nBytesRead: INT ← 0] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger ← handle.inputRead;
  base: LONG POINTER = LOOPHOLE[block.base];
  startIndex: INT ← block.startIndex;
  stop: INT = block.startIndex + block.count;
  nBytes: NAT;
  WHILE startIndex < stop DO
    DO
      SELECT finger.state FROM
        full => {
          finger.state ← halfEmpty;
          finger.index ← 0;
          EXIT; };
        halfEmpty => EXIT;
        ENDCASE => {
          IF handle.err # NIL AND nBytesRead # 0 THEN RETURN;
          -- Give user tail of data before StreamClosing
          WaitInputReady[handle]; };
      ENDLOOP;
    IF finger.mark THEN RETURN; -- EOF
    IF useInLineCopy
      AND WordAligned[startIndex] AND WordAligned[stop]
      AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN {
      -- Special check for the normal/easy case.
      toBytes: NAT = stop-startIndex;
      fromBytes: NAT = finger.bytes-finger.index;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: base + startIndex / Basics.bytesPerWord,
          nwords: words,
          from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; };
      nBytes ← words * Basics.bytesPerWord; }
    ELSE TRUSTED {
      nBytes ← PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop],
        from: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes] ]; };
    startIndex ← startIndex + nBytes;
    finger.index ← finger.index + nBytes;
    IF finger.index = finger.bytes THEN {
      -- Packet drained: release it and continue with the next finger.
      finger ← NotifyInputSpace[handle];
      MaybeSendAck[handle]; };
    nBytesRead ← nBytesRead + nBytes;
    ENDLOOP;
  };
-- Copies input into block, from startIndex for at most count bytes, never past block.maxLength.
-- block.length is updated after each packet is copied. Stops early at a mark, or when the
-- stream has an error after some data was delivered. Returns the number of bytes stored.
-- The default for count had lost its assignment arrow; restored here.
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT ← 0, count: NAT ← NAT.LAST] RETURNS [nBytesRead: NAT ← 0] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger ← handle.inputRead;
  base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE;
  stop: NAT = MIN[(startIndex+count), block.maxLength];
  nBytes: NAT;
  WHILE startIndex < stop DO
    DO
      SELECT finger.state FROM
        full => {
          finger.state ← halfEmpty;
          finger.index ← 0;
          EXIT; };
        halfEmpty => EXIT;
        ENDCASE => {
          IF handle.err # NIL AND nBytesRead # 0 THEN RETURN;
          -- Give user tail of data before StreamClosing
          WaitInputReady[handle]; };
      ENDLOOP;
    IF finger.mark THEN RETURN; -- EOF
    IF useInLineCopy
      AND WordAligned[startIndex] AND WordAligned[stop]
      AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN {
      -- Special check for the normal/easy case.
      toBytes: NAT = stop-startIndex;
      fromBytes: NAT = finger.bytes-finger.index;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: base + startIndex / Basics.bytesPerWord,
          nwords: words,
          from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; };
      nBytes ← words * Basics.bytesPerWord; }
    ELSE TRUSTED {
      nBytes ← PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop],
        from: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes] ]; };
    startIndex ← startIndex + nBytes;
    block.length ← startIndex;
    finger.index ← finger.index + nBytes;
    IF finger.index = finger.bytes THEN {
      -- Packet drained: release it and continue with the next finger.
      finger ← NotifyInputSpace[handle];
      MaybeSendAck[handle]; };
    nBytesRead ← nBytesRead + nBytes;
    ENDLOOP;
  };
-- Appends one character to the current output packet, waiting for buffer space if necessary.
-- The packet is finished as soon as it becomes full.
PutChar: PROC [self: STREAM, char: CHAR] = {
  handle: Handle = NARROW[self.streamData];
  slot: Finger ← handle.outputWrite;
  DO
    CheckForClosed[handle];
    SELECT slot.state FROM
      halfFull => EXIT;
      empty => {
        -- Start a fresh data packet.
        slot.state ← halfFull;
        slot.mark ← FALSE;
        slot.index ← 0;
        slot.bytes ← handle.pushPacketSize;
        EXIT; };
      ENDCASE => WaitOutputSpace[handle];
    ENDLOOP;
  slot.buffer.char[slot.index] ← char;
  slot.index ← slot.index.SUCC;
  IF slot.index = slot.bytes THEN FinishOutputFinger[handle];
  };
-- Queues block.count bytes, starting at block.startIndex, for output.
-- Fills output packets one after another; each is finished as soon as it is full.
UnsafePutBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] = {
  handle: Handle = NARROW[self.streamData];
  base: LONG POINTER = LOOPHOLE[block.base];
  startIndex: INT ← block.startIndex;
  stop: INT = block.startIndex + block.count;
  nBytes: INT;
  WHILE startIndex < stop DO
    finger: Finger ← handle.outputWrite;
    DO
      CheckForClosed[handle];
      SELECT finger.state FROM
        empty => {
          finger.state ← halfFull;
          finger.mark ← FALSE;
          finger.index ← 0;
          finger.bytes ← handle.pushPacketSize;
          EXIT; };
        halfFull => EXIT;
        ENDCASE => WaitOutputSpace[handle];
      ENDLOOP;
    IF useInLineCopy
      AND WordAligned[startIndex] AND WordAligned[stop]
      AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN {
      -- Special check for the normal/easy case.
      toBytes: NAT = finger.bytes-finger.index;
      fromBytes: NAT = stop-startIndex;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
          nwords: words,
          from: base + (startIndex / Basics.bytesPerWord)]; };
      nBytes ← words * Basics.bytesPerWord; }
    ELSE TRUSTED {
      nBytes ← PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes],
        from: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop] ]; };
    startIndex ← startIndex + nBytes;
    finger.index ← finger.index + nBytes;
    IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
    ENDLOOP;
  };
-- Queues characters of block for output, from startIndex for at most count characters,
-- never past block.length. Each output packet is finished as soon as it is full.
-- The default for count had lost its assignment arrow; restored here.
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT ← 0, count: NAT ← NAT.LAST] = {
  handle: Handle = NARROW[self.streamData];
  base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE;
  stop: NAT = MIN[(startIndex+count), block.length];
  nBytes: NAT;
  WHILE startIndex < stop DO
    finger: Finger ← handle.outputWrite;
    DO
      CheckForClosed[handle];
      SELECT finger.state FROM
        empty => {
          finger.state ← halfFull;
          finger.mark ← FALSE;
          finger.index ← 0;
          finger.bytes ← handle.pushPacketSize;
          EXIT; };
        halfFull => EXIT;
        ENDCASE => WaitOutputSpace[handle];
      ENDLOOP;
    IF useInLineCopy
      AND WordAligned[startIndex] AND WordAligned[stop]
      AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN {
      -- Special check for the normal/easy case.
      toBytes: NAT = finger.bytes-finger.index;
      fromBytes: NAT = stop-startIndex;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
          nwords: words,
          from: base + (startIndex / Basics.bytesPerWord)]; };
      nBytes ← words * Basics.bytesPerWord; }
    ELSE TRUSTED {
      nBytes ← PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes],
        from: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop] ]; };
    startIndex ← startIndex + nBytes;
    finger.index ← finger.index + nBytes;
    IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
    ENDLOOP;
  };
-- Finishes a partly filled output packet, if there is one. Does not wait for anything.
Push: PUBLIC PROC [self: STREAM] = {
  handle: Handle = NARROW[self.streamData];
  IF handle.outputWrite.state = halfFull THEN FinishOutputFinger[handle];
  };
-- Finishes any partly filled packet and waits until the flush flag has been cleared.
-- Each wait that ends with the flush still pending raises SIGNAL Timeout; a caller that
-- resumes the signal makes Flush wait again.
Flush: PROC [self: STREAM] = {
  handle: Handle = NARROW[self.streamData];
  IF handle.outputWrite.state = halfFull THEN {
    handle.flush ← TRUE;
    FinishOutputFinger[handle]; };
  IF handle.outputBuffersReady = 0 THEN RETURN; -- Skip closed test
  IF ~handle.flush THEN SendProbe[handle];
  handle.flush ← TRUE;
  -- The flag is cleared elsewhere, so it is tested afresh here.
  IF handle.flush THEN {
    CheckForClosed[handle];
    DO
      WaitOutputFlushed[handle];
      CheckForClosed[handle];
      IF ~handle.flush THEN EXIT;
      SIGNAL Timeout;
      ENDLOOP;
    };
  };
-- Shuts the stream down and releases its handle. A stream whose streamData is already NIL is ignored.
-- With abort = TRUE an abort is sent; otherwise pending output is flushed first, and an abort
-- is sent only if data is still unflushed afterwards.
Close: PROC [self: STREAM, abort: BOOL] = {
  handle: Handle = NARROW[self.streamData];
  IF handle = NIL THEN RETURN;
  IF handle.err = NIL THEN {
    IF abort THEN Abort[self, "Close with Abort"]
    ELSE Flush[self ! StreamClosing, Timeout => CONTINUE ];
    };
  IF handle.outputBuffersReady # 0 AND handle.err = NIL THEN
    Abort[self, "Close with unFlushed data"];
  -- Still healthy: do the orderly end exchange.
  IF handle.err = NIL THEN CloseConnection[handle];
  self.streamData ← NIL;
  CloseHandle[handle];
  };
-- Tears down a handle: marks it dead, waits for its five processes, then frees buffers and socket.
-- Does nothing if the handle was already marked dead, so calling it twice is harmless.
CloseHandle: PROC [handle: Handle] = {
IF CloseHandleInternal[handle] THEN RETURN;
-- The processes joined here are the ones forked by SetupObject.
TRUSTED {
JOIN handle.retransmitter;
JOIN handle.push1;
JOIN handle.push2;
JOIN handle.push3;
JOIN handle.pull; };
-- Only after every process has finished is it safe to release what they were using.
FreeBuffers[handle];
PupSocket.Destroy[handle.socket];
handle.socket ← NIL;
};
-- Marks the handle dead under the monitor lock and notifies outputReady.
-- Returns the previous value of handle.dead, so only the first caller sees FALSE.
-- The assignment to alreadyDead had lost its arrow; restored here.
CloseHandleInternal: ENTRY PROC [handle: Handle] RETURNS [alreadyDead: BOOL] = {
  alreadyDead ← handle.dead;
  handle.dead ← TRUE;
  NOTIFY handle.outputReady;
  };
-- Sends an out of band attention and waits until the sent and acked counters differ.
-- NOTE(review): sendAttenIdAcked is presumably advanced by the reply handler (GotIntReply); confirm.
SendAttention: PUBLIC PROC [self: IO.STREAM] = {
handle: Handle = NARROW[self.streamData];
-- Resend the interrupt each time round; CheckForClosed ends the loop if the stream has closed.
WHILE handle.sendAttenIdSent = handle.sendAttenIdAcked DO
CheckForClosed[handle];
SendInt[handle];
WaitSendAttention[handle];
ENDLOOP;
-- Step the sent counter by one.
handle.sendAttenIdSent ← handle.sendAttenIdSent.SUCC;
};
-- Blocks until an attention has arrived that has not yet been seen, then marks one as seen.
-- CheckForClosed ends the wait if the stream has closed.
WaitAttention: PUBLIC PROC [self: IO.STREAM] = {
handle: Handle = NARROW[self.streamData];
WHILE handle.recvAttenIdArrived = handle.recvAttenIdSeen DO
CheckForClosed[handle];
WaitRecvAttention[handle];
ENDLOOP;
handle.recvAttenIdSeen ← handle.recvAttenIdSeen.SUCC;
};
-- Listeners
-- Starts a listener process on a well known socket and returns its record.
-- Raises SocketNotWellKnown when local is not a well known socket.
-- worker is called with a new stream for every accepted request (see ListenInit).
-- The default for clientData had lost its assignment arrow; restored here.
CreateListener: PUBLIC PROC [
  local: Pup.Socket,
  worker: PupStream.ListenerProc,
  getTimeout: Milliseconds,
  putTimeout: Milliseconds,
  clientData: REF ANY ← NIL,
  filter: PupStream.FilterProc ← NIL, -- NIL => Accept all requests
  echoFilter: PupStream.FilterProc ← NIL] -- NIL => Answer all echos
  RETURNS [listener: Listener] = {
  IF ~PupName.IsWellKnown[local] THEN ERROR SocketNotWellKnown;
  listener ← NEW[ListenerRep ← [
    local: local,
    worker: worker,
    clientData: clientData,
    getTimeout: getTimeout,
    putTimeout: putTimeout,
    filter: filter,
    echoFilter: echoFilter,
    listen: NIL ]];
  -- The record must exist before the process starts, since Listen takes it as its argument.
  listener.listen ← FORK Listen[listener];
  };
-- Stops a listener: aborts its process, waits for it to finish, and clears the process field.
-- A listener that has already been destroyed is left alone.
DestroyListener: PUBLIC PROC [listener: Listener] = {
  IF listener.listen # NIL THEN {
    TRUSTED {
      Process.Abort[listener.listen];
      JOIN listener.listen; };
    listener.listen ← NIL;
    };
  };
-- Body of the listener process. Serves echo and RFC packets on the well known socket
-- until the process is aborted, then destroys the socket.
-- The two err initialisers had lost their assignment arrows; restored here.
Listen: PROC [listener: Listener] = {
  socket: Socket ← PupSocket.CreateServer[
    local: listener.local,
    sendBuffers: 0,
    recvBuffers: 2,
    getTimeout: PupSocket.waitForever];
  DO ENABLE ABORTED => EXIT;
    b: PupBuffer.Buffer ← PupSocket.Get[socket];
    remote: Pup.Address ← b.source;
    LocalSendRFCReply: PROC [handle: Handle, b: PupBuffer.Buffer] = BEGIN
      -- Moved from ListenInit in order to rfc from the listener.local socket. (Diebert, et al Demers)
      b.address ← PupSocket.GetLocalAddress[handle.socket]; -- The ephemeral socket
      PupSocket.SetUserSize[b, SIZE[Pup.Address]]; -- Just because.
      PupSocketBackdoor.ReturnToSenderNoFree[b];
      END; -- LocalSendRFCReply
    SELECT b.type FROM
      echoMe => {
        err: ROPE ← NIL;
        IF listener.echoFilter # NIL THEN err ← listener.echoFilter[listener.clientData, remote];
        b.type ← iAmEcho;
        IF err # NIL THEN {
          -- The filter refused: answer with an abort carrying its text.
          b.type ← abort;
          b.abort.code ← 0;
          PupSocket.SetUserHWords[b, 1];
          PupSocket.AppendRope[b, err]; };
        PupSocketBackdoor.ReturnToSenderNoFree[b]; };
      rfc => {
        err: ROPE ← NIL;
        handle: Handle;
        FOR handle: Handle ← chain, handle.next UNTIL handle = NIL DO
          IF remote # handle.remote THEN LOOP;
          -- This RFC corresponds to a known connection. Retransmit our answer. Can't use this buffer or accounting gets confused.
          IF ~handle.dead THEN LocalSendRFCReply[handle, b];
          duplicateRfcReceived ← duplicateRfcReceived.SUCC;
          GOTO Processed;
          ENDLOOP;
        -- New RFC
        IF listener.filter # NIL THEN err ← listener.filter[listener.clientData, remote];
        IF err # NIL THEN { -- Reject
          b.type ← abort;
          b.abort.code ← 0;
          PupSocket.SetUserHWords[b, 1];
          PupSocket.AppendRope[b, err];
          PupSocketBackdoor.ReturnToSenderNoFree[b];
          GOTO Processed; };
        -- Accept
        handle ← NewObject[remote, b.id, NIL, listener.getTimeout, listener.putTimeout];
        SetupObject[handle]; -- Moved from ListenInit in order to rfc from the listener.local socket.
        LocalSendRFCReply[handle, b];
        TRUSTED { Process.Detach[FORK ListenInit[listener, handle, remote]]; };
        EXITS Processed => NULL; };
      ENDCASE => NULL;
    -- Every path above leaves b allocated (the replies use the NoFree variant), so free it here.
    PupSocket.FreeBuffer[b];
    ENDLOOP;
  PupSocket.Destroy[socket];
  };
-- Runs in a detached process for each accepted RFC: finishes the handshake on an already
-- prepared handle and then hands the new stream to the listener's worker.
ListenInit: PROC [listener: Listener, handle: Handle, remote: Pup.Address] = {
  -- SetupObject[handle] is NOT called here. Listen already calls it before forking this
  -- process (it was moved there so the RFC reply can come from the listener's socket).
  -- A second call would allocate buffers again, fork a second retransmitter, three more
  -- Pushers and another Puller whose PROCESS values overwrite the first set (which then
  -- can never be JOINed), and register the handle a second time.
  -- NOTE(review): Listen has also sent the RFC reply already; SendRfc is kept as in the
  -- original, but confirm that the extra RFC from the ephemeral socket is still wanted.
  SendRfc[handle];
  ExchangeAcks[handle];
  listener.worker[HandleToStream[handle], listener.clientData, remote];
  };
-- Rendezvous
-- Creates an ephemeral socket aimed at remote and wraps it in a not yet used Sockets record.
-- Buffer counts match those NewObject uses; the get timeout here is 60000.
AllocateSocket: PUBLIC PROC [remote: Pup.Address] RETURNS [sockets: Sockets] = {
  fresh: Socket = PupSocket.CreateEphemeral[
    remote: remote,
    sendBuffers: defaultNumberOfBuffers + 2,
    recvBuffers: 2*defaultNumberOfBuffers + 2,
    getTimeout: 60000 ];
  RETURN[NEW[SocketsRep ← [used: FALSE, socket: fresh]]];
  };
-- The local address of the socket held by sockets.
LocalAddress: PUBLIC PROC [sockets: Sockets] RETURNS [local: Pup.Address] = {
  RETURN[PupSocket.GetLocalAddress[sockets.socket]];
  };
-- The remote address of the socket held by sockets.
RemoteAddress: PUBLIC PROC [sockets: Sockets] RETURNS [remote: Pup.Address] = {
  RETURN[PupSocket.GetRemoteAddress[sockets.socket]];
  };
-- Wraps the socket of an existing stream in a Sockets record that is already marked used,
-- so it cannot be passed to the rendezvous procedures.
SocketsFromStream: PUBLIC PROC [self: STREAM] RETURNS [sockets: Sockets] = {
  handle: Handle = NARROW[self.streamData];
  RETURN[NEW[SocketsRep ← [used: TRUE, socket: handle.socket]]];
  };
-- Raised by WaitForRendezvous and ActivelyRendezvous when the Sockets record is already marked used.
SocketsAlreadyUsed: PUBLIC ERROR = CODE;
-- Passive side of a rendezvous: waits up to waitTimeout for an RFC on the given socket,
-- then builds a stream to whoever sent it.
-- Raises SocketsAlreadyUsed if sockets was used before, and
-- StreamClosing[transmissionTimeout] when no packet arrives in time.
WaitForRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout, waitTimeout: Milliseconds] RETURNS [STREAM] = {
  socket: Socket ← sockets.socket;
  IF sockets.used THEN ERROR SocketsAlreadyUsed;
  sockets.used ← TRUE;
  PupSocket.SetGetTimeout[socket, waitTimeout];
  DO
    b: PupBuffer.Buffer ← PupSocket.Get[socket];
    IF b = NIL THEN ERROR StreamClosing[transmissionTimeout, "No RFC arrived"];
      -- Arg, Socket left dangling.
    SELECT b.type FROM
      rfc => {
        handle: Handle;
        PupSocket.FixupSourceAndDest[b];
        PupSocket.SetRemoteAddress[socket, b.source];
        handle ← NewObject[b.source, b.id, sockets, getTimeout, putTimeout];
        PupSocket.FreeBuffer[b];
        PupSocket.SetGetTimeout[socket, 250]; -- also in NewObject
        SetupObject[handle];
        SendRfc[handle];
        ExchangeAcks[handle];
        RETURN[HandleToStream[handle]]; };
      ENDCASE => NULL;
    -- Not an RFC: drop it and keep waiting.
    PupSocket.FreeBuffer[b];
    ENDLOOP;
  };
-- Active side of a rendezvous: opens a connection over the given socket and returns the stream.
-- Raises SocketsAlreadyUsed if sockets was used before.
ActivelyRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = {
  IF sockets.used THEN ERROR SocketsAlreadyUsed;
  sockets.used ← TRUE;
  {
    -- The remote address argument is a placeholder; NewObject reads the real one from the socket.
    handle: Handle = NewObject[Pup.nullAddress, PupSocket.GetUniqueID[], sockets, getTimeout, putTimeout];
    OpenConnection[handle];
    SetupObject[handle];
    ExchangeAcks[handle];
    RETURN[HandleToStream[handle]];
    };
  };
-- Connection management
-- Allocates a connection record and initialises its condition variables, socket and sequence numbers.
-- sockets = NIL => a new ephemeral socket is created; otherwise the given one is adopted.
-- getTimeout governs inputReady; putTimeout governs outputFlushed and outputSpace.
-- PupStream.waitForever disables the corresponding timeout.
NewObject: PROC [
  remote: Pup.Address, id: Endian.FWORD, sockets: Sockets, getTimeout, putTimeout: Milliseconds]
  RETURNS [handle: Handle] = {
  seed: LONG CARDINAL;
  handle ← NEW[Object];
  TRUSTED {
    -- Output side
    Process.SetTimeout[@handle.outputReady, Process.MsecToTicks[2000]];
    Process.EnableAborts[@handle.outputSpace];
    IF putTimeout = PupStream.waitForever THEN {
      Process.DisableTimeout[@handle.outputFlushed];
      Process.DisableTimeout[@handle.outputSpace]; }
    ELSE {
      putTicks: Process.Ticks = MillisecondsToTicks[putTimeout];
      Process.SetTimeout[@handle.outputFlushed, putTicks];
      Process.SetTimeout[@handle.outputSpace, putTicks]; };
    -- Input side
    Process.EnableAborts[@handle.inputReady];
    IF getTimeout = PupStream.waitForever THEN Process.DisableTimeout[@handle.inputReady]
    ELSE Process.SetTimeout[@handle.inputReady, MillisecondsToTicks[getTimeout]];
    -- Attentions
    Process.SetTimeout[@handle.sendAttention, Process.MsecToTicks[9000]];
    Process.EnableAborts[@handle.recvAttention];
    Process.DisableTimeout[@handle.recvAttention];
    -- Housekeeping
    Process.SetTimeout[@handle.stateChange, Process.MsecToTicks[4000]];
    Process.SetTimeout[@handle.alloc, Process.MsecToTicks[5000]];
    Process.SetTimeout[@handle.ack, Process.MsecToTicks[6000]]; };
  handle.socket ← IF sockets # NIL THEN sockets.socket
    ELSE PupSocket.CreateEphemeral[
      remote: remote,
      sendBuffers: defaultNumberOfBuffers + 2,
      recvBuffers: 2*defaultNumberOfBuffers + 2,
      getTimeout: 250 ]; -- also in WaitForRendezvous
  handle.remote ← PupSocket.GetRemoteAddress[handle.socket];
  IF disableSoftwareChecksums THEN
    PupSocketBackdoor.SetSoftwareChecksumming[handle.socket, FALSE, FALSE];
  -- Every sequence counter starts from the connection id.
  handle.connectionId ← id;
  seed ← Endian.CardFromF[handle.connectionId];
  handle.pullId ← handle.pushId ← handle.ackedId ← seed;
  handle.sendAttenIdSent ← handle.sendAttenIdAcked ← seed;
  handle.recvAttenIdArrived ← handle.recvAttenIdSeen ← seed;
  };
-- Brings a new handle to life: allocates its buffers, forks its five processes, registers it, and enables finalization.
-- Must be called exactly once per handle: each call forks new processes and overwrites the PROCESS fields that CloseHandle joins.
SetupObject: PROC [handle: Handle] = {
AllocateBuffers[handle, defaultNumberOfBuffers, defaultNumberOfBuffers];
handle.retransmitter ← FORK Retransmitter[handle];
-- Three Pusher processes run against the same handle.
handle.push1 ← FORK Pusher[handle];
handle.push2 ← FORK Pusher[handle];
handle.push3 ← FORK Pusher[handle];
handle.pull ← FORK Puller[handle];
-- NOTE(review): AddHandle presumably links the handle onto the chain that Listen searches; confirm.
AddHandle[globalLock, handle];
SafeStorage.EnableFinalization[handle];
};
-- Completes connection setup by learning the packet size to use (handle.pushPacketSize).
-- Sends an ack, then probes up to ten times until the size is known or the stream has an error.
-- If the size is still 0 at the end, the stream is smashed closed with transmissionTimeout.
ExchangeAcks: PROC [handle: Handle] = {
SendAckFirst[handle]; -- First ack for free (IFS doesn't play this game)
IF handle.pushPacketSize = 0 THEN Process.Pause[2]; -- Dally (a wee bit) in case of free ack
FOR i: NAT IN [0..10) DO
IF handle.pushPacketSize # 0 THEN EXIT;
IF handle.err # NIL THEN EXIT;
SendProbe[handle];
WaitForAllocation[handle];
ENDLOOP;
IF handle.err = NIL THEN WaitForAllocation[handle]; -- NOTIFY only
IF handle.pushPacketSize = 0 THEN
SmashClosed[handle, transmissionTimeout, "No allocate (need packet size)"];
};
-- Wraps a connection handle in an IO stream that uses the shared streamProcs vector.
HandleToStream: PROC [handle: Handle] RETURNS [STREAM] = {
  stream: STREAM = IO.CreateStream[streamProcs: streamProcs, streamData: handle];
  RETURN[stream];
  };
-- Active open: sends an RFC up to ten times with a doubling (capped) retransmission interval,
-- and returns once an RFC comes back, adopting the address it carries as the remote address.
-- Error and abort packets are passed to GotError/GotAbort; CheckForClosed then raises if they closed the stream.
-- After ten tries with no answer the stream is smashed closed and CheckForClosed raises.
OpenConnection: PROC [handle: Handle] = {
  socket: Socket ← handle.socket;
  retransmission: BasicTime.Pulses ← oneSecond;
  start: BasicTime.Pulses;
  FOR i: NAT IN [0..10) DO
    SendRfc[handle];
    start ← BasicTime.GetClockPulses[];
    retransmission ← MIN[retransmission*2, tenSeconds];
    -- 2+4+8+10+10+10+10+10+10+10 => 84 seconds before timeout
    UNTIL ElapsedPulses[start] > retransmission DO
      b: Buffer ← PupSocket.Get[socket];
      IF b = NIL THEN LOOP;
      SELECT b.type FROM
        rfc => {
          handle.remote ← b.address;
          PupSocket.SetRemoteAddress[socket, handle.remote];
          PupSocket.FreeBuffer[b];
          RETURN; };
        error => GotError[handle, b];
        abort => GotAbort[handle, b];
        ENDCASE => NULL;
      PupSocket.FreeBuffer[b];
      CheckForClosed[handle];
      ENDLOOP;
    CheckForClosed[handle];
    ENDLOOP;
  SmashClosed[handle, transmissionTimeout, "No response from remote site"];
  CheckForClosed[handle];
  -- Arg, Socket left dangling.
  };
-- Orderly close: sends an end up to ten times, waiting for a state change after each.
-- Returns as soon as handle.err is set. If it never is, sends an abort instead.
-- The original declared a local copy of handle.socket that was never used; it is removed.
CloseConnection: PROC [handle: Handle] = {
  IF handle.err # NIL THEN RETURN;
  FOR i: NAT IN [0..10) DO
    SendEnd[handle];
    WaitStateChange[handle];
    IF handle.err # NIL THEN RETURN;
    ENDLOOP;
  SendAbort[handle, "No Response to end"];
  };
-- Utilities
-- Clock pulses elapsed since startTime, by plain subtraction from the current clock reading.
ElapsedPulses: PROC [startTime: BasicTime.Pulses] RETURNS [BasicTime.Pulses] = INLINE {
  now: BasicTime.Pulses = BasicTime.GetClockPulses[];
  RETURN[now - startTime]; };
-- Signed distance from one sequence number to another: the unsigned difference reinterpreted as an INT.
Offset: PROC [to, from: LONG CARDINAL] RETURNS [INT] = TRUSTED INLINE {
  delta: LONG CARDINAL = to-from;
  RETURN[LOOPHOLE[delta, INT]]; };
-- Number of whole words needed to hold the given number of bytes.
EnoughWordsForBytes: PROC [bytes: NAT] RETURNS [NAT] = TRUSTED INLINE {
  -- Round up so allocation or LongCopy gets everything.
  -- The CARDINAL avoids a KFCB for a signed divide.
  RETURN[CARDINAL[(bytes+Basics.bytesPerWord-1)]/Basics.bytesPerWord]; };
-- Converts a timeout in milliseconds to process ticks.
-- Values below CARDINAL.LAST go through MsecToTicks; larger ones are converted as whole seconds.
-- PupStream.waitForever is not a legal argument (callers test for it first) and raises ERROR.
MillisecondsToTicks: PROC [ms: Milliseconds] RETURNS [Process.Ticks] = {
  IF ms = PupStream.waitForever THEN ERROR;
  IF ms < CARDINAL.LAST THEN RETURN[Process.MsecToTicks[ms]];
  RETURN[Process.SecondsToTicks[ms/1000]];
  };
-- Recv Side
-- Body of the receive process for one connection. Runs until the stream has an error recorded.
-- While the line is quiet it probes the other end; after more than ten unanswered probes
-- the stream is smashed closed. After a remote close it lingers to answer retransmitted ends.
Puller: PROC [handle: Handle] = {
  socket: Socket ← handle.socket;
  probes: CARDINAL ← 0;
  timeOfLastArrival: BasicTime.Pulses ← handle.timeOfLastArrival;
  Process.SetPriority[Process.priorityForeground];
  IF useDirectInput THEN PupSocketBackdoor.SetDirectReceive[socket, Receive, handle];
  UNTIL handle.err # NIL DO
    b: Buffer ← PupSocket.Get[socket];
    IF b = NIL THEN { -- Nothing arrived recently
      IF ElapsedPulses[handle.timeOfLastArrival] < probePulses THEN {
        -- Data arrived within the last probe period: start counting afresh.
        probes ← 0;
        timeOfLastArrival ← handle.timeOfLastArrival; }
      ELSE {
        IF ElapsedPulses[timeOfLastArrival] > (probes+1)*probePulses THEN {
          -- We could actually get here by accident if we are receiving a steady stream of traffic long enough for the pulses clock to wrap around. That shouldn't do anything worse than inject an extra probe.
          SendProbe[handle];
          probes ← probes.SUCC; };
        IF probes > 10 THEN
          SmashClosed[handle, transmissionTimeout, "no response from remote host"]; };
      LOOP; };
    -- Receive may hand back a different buffer than it was given; free whichever comes back.
    b ← Receive[socket, b, handle];
    PupSocket.FreeBuffer[b];
    ENDLOOP;
  PupSocketBackdoor.SetDirectReceive[socket, NIL, NIL];
  IF handle.why = remoteClose THEN { -- Dally in case he retransmits
    DO
      b: Buffer ← PupSocket.Get[socket];
      IF b = NIL THEN EXIT;
      SELECT b.type FROM
        end => GotEnd[handle, b];
        endRep => { PupSocket.FreeBuffer[b]; EXIT; };
        ENDCASE => NULL;
      PupSocket.FreeBuffer[b];
      ENDLOOP; };
  handle ← NIL; -- Allow Finalization
  };
-- Dispatches one arriving packet by type and returns the buffer the caller should dispose of.
-- That buffer may differ from the one passed in, because DataPacket can swap it.
-- Called from Puller, and also registered with SetDirectReceive, with user = the connection handle.
Receive: PROC [socket: Socket, b: Buffer, user: REF ANY] RETURNS [Buffer] = {
handle: Handle = NARROW[user];
SELECT b.type FROM
ack => ProcessAck[handle, b];
data, aData, mark, aMark => {
bytes: ByteIndex = PupSocket.GetUserBytes[b];
-- Remember the type now: after DataPacket, b may be a different buffer.
type: PupType.Type ← b.type;
IF bytes # 0 THEN b ← DataPacket[handle, b, bytes];
-- aData and aMark ask for an acknowledgement.
IF type = aData OR type = aMark THEN {
oldAllocNeeded: BOOL ← handle.allocNeeded;
handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
-- Ack at once if one was already owed, if the packet was empty, or if no allocation is needed; otherwise defer.
SELECT TRUE FROM
oldAllocNeeded => SendAckFirst[handle];
(bytes = 0) => SendAckFirst[handle];
~handle.allocNeeded => SendAckAgain[handle, b];
ENDCASE => acksDefered ← acksDefered.SUCC; }; };
end => GotEnd[handle, b];
endRep => GotEndReply[handle, b];
error => GotError[handle, b];
abort => GotAbort[handle, b];
int => GotInt[handle, b];
intRep => GotIntReply[handle, b];
rfc => NULL;
ENDCASE => funnyPacketType ← funnyPacketType.SUCC;
RETURN[b];
};
DataPacket: ENTRY PROC [handle: Handle, b: Buffer, bytes: ByteIndex] RETURNS [swap: Buffer] = {
  -- Tries to file an arriving data packet into the input ring.
  -- Only a packet whose id equals handle.pullId is accepted, and only if the finger at handle.inputWrite is empty.
  -- When accepted, b is kept in the ring and the buffer previously held by that finger is returned; otherwise b itself is returned.
  offset: INT = Offset[Endian.CardFromF[b.id], handle.pullId];
  slot: Finger;
  swap ← b;
  IF offset < 0 THEN { oldPackets ← oldPackets.SUCC; RETURN; };
  IF offset > 0 THEN { futurePackets ← futurePackets.SUCC; RETURN; }; -- Missed something
  slot ← handle.inputWrite;
  IF slot.state # empty THEN RETURN; -- no room, the packet is dropped
  swap ← slot.buffer;
  slot.buffer ← b;
  slot.mark ← (b.type = mark OR b.type = aMark);
  slot.bytes ← bytes;
  slot.state ← full;
  handle.pullId ← handle.pullId + bytes;
  handle.inputBuffersAvailable ← handle.inputBuffersAvailable.PRED;
  handle.inputWrite ← slot.next;
  NOTIFY handle.inputReady;
  handle.timeOfLastArrival ← BasicTime.GetClockPulses[];
  };
GotEnd: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving end packet.
  -- Packets with the wrong connection id or source are ignored.
  -- The stream is smashed closed with remoteClose if it was still open, and b is reused to send the end reply unless a remote close still has output buffers pending.
  IF b.id # handle.connectionId OR b.source # handle.remote THEN RETURN;
  PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  IF handle.err = NIL THEN SmashClosed[handle, remoteClose, "Remote close"];
  IF handle.why = remoteClose AND handle.outputBuffersReady # 0 THEN RETURN; -- Can't accept yet
  SendEndReplyFirst[handle, b];
  };
GotEndReply: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving endRep packet.
  -- Packets with the wrong connection id or source are ignored.
  -- If the stream is still open it is smashed closed with localClose and b is reused to send an end reply back.
  IF b.id # handle.connectionId OR b.source # handle.remote THEN RETURN;
  PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  IF handle.err = NIL THEN {
    SmashClosed[handle, localClose, "Local close"];
    SendEndReplyFirst[handle, b]; };
  };
GotError: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving error packet; packets from any source other than handle.remote are ignored.
  -- noSocket closes the stream with remoteReject, using the rope extracted from the packet.
  -- resourceLimits and gatewayResourceLimits shrink the number of output buffers to use by one, but never below one.
  -- Every other error code is ignored.
  IF b.source # handle.remote THEN RETURN;
  SELECT b.error.code FROM
    noSocket => SmashClosed[handle, remoteReject, PupSocket.ExtractErrorRope[b]];
    resourceLimits, gatewayResourceLimits =>
      -- Back off one buffer at a time. The previous MIN[1, n-1] dropped straight to 1 and could reach 0 or underflow.
      IF handle.outputBuffersToUse > 1 THEN
        handle.outputBuffersToUse ← handle.outputBuffersToUse - 1;
    ENDCASE => RETURN; -- Not a fatal error
  };
GotAbort: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving abort packet.
  -- A packet carrying this connection's id from the remote address closes the stream with remoteReject and the rope extracted from the packet; anything else is ignored.
  IF b.id = handle.connectionId AND b.source = handle.remote THEN
    SmashClosed[handle, remoteReject, PupSocket.ExtractAbortRope[b]];
  };
GotInt: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving int (attention) packet from the remote address.
  -- recvAttenIdArrived advances only when the packet id is exactly one past it.
  -- A reply is always sent, reusing b, and anyone waiting on recvAttention is notified.
  arrivedId: LONG CARDINAL = Endian.CardFromF[b.id];
  IF b.source # handle.remote THEN RETURN;
  IF Offset[arrivedId, handle.recvAttenIdArrived] = 1 THEN
    handle.recvAttenIdArrived ← arrivedId;
  SendIntReplyFirst[handle, b];
  NotifyRecvAttention[handle];
  };
GotIntReply: PROC [handle: Handle, b: Buffer] = {
  -- Handles an arriving intRep packet from the remote address.
  -- sendAttenIdAcked is set to the packet id when that id is exactly one past sendAttenIdSent, then anyone waiting on sendAttention is notified.
  -- NOTE(review): the test reads sendAttenIdSent while the store goes to sendAttenIdAcked; confirm against the code that sends the int that this offset is the intended one.
  repliedId: LONG CARDINAL = Endian.CardFromF[b.id];
  IF b.source # handle.remote THEN RETURN;
  IF Offset[repliedId, handle.sendAttenIdSent] = 1 THEN
    handle.sendAttenIdAcked ← repliedId;
  NotifySendAttention[handle];
  };
SendRfc: PROC [handle: Handle] = {
  -- Builds an rfc packet carrying the connection id and the socket's local address, sends it with PutFirst, then frees the buffer.
  sock: Socket ← handle.socket;
  pup: Buffer ← PupSocket.AllocBuffer[sock];
  pup.type ← rfc;
  pup.id ← handle.connectionId;
  pup.address ← PupSocket.GetLocalAddress[sock];
  PupSocket.SetUserSize[pup, SIZE[Pup.Address]];
  PupSocketBackdoor.PutFirst[sock, pup];
  PupSocket.FreeBuffer[pup];
  };
SendAbort: PROC [handle: Handle, err: ROPE] = {
  -- Sends an abort packet with code 0 and the text err, then closes the stream with localAbort if it was still open.
  -- The buffer is handed to PupSocket.Put and is not freed here.
  sock: Socket ← handle.socket;
  pup: Buffer ← PupSocket.AllocBuffer[sock];
  pup.type ← abort;
  pup.id ← handle.connectionId;
  pup.abort.code ← 0;
  PupSocket.SetUserHWords[pup, 1];
  PupSocket.AppendRope[pup, err];
  PupSocket.Put[sock, pup];
  IF handle.err = NIL THEN SmashClosed[handle, localAbort, err];
  };
MaybeSendAck: PROC [handle: Handle] = {
  -- Sends a fresh ack when one is owed (allocNeeded) and the number of available input buffers has climbed back to at least inputBuffersToUse.
  -- Does nothing otherwise.
  IF handle.allocNeeded AND handle.inputBuffersAvailable >= handle.inputBuffersToUse THEN {
    ackPup: Buffer ← PupSocket.AllocBuffer[handle.socket];
    SendAckAgain[handle, ackPup];
    PupSocket.FreeBuffer[ackPup]; };
  };
SendAckFirst: PROC [handle: Handle] = {
  -- Allocates a buffer, fills it in as an ack describing the current receive allocation, sends it with PutFirst and frees it.
  -- Also recomputes handle.allocNeeded.
  sock: Socket ← handle.socket;
  ackPup: Buffer ← PupSocket.AllocBuffer[sock];
  window: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- LONG avoids overflow
  ceiling: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
  handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
  ackPup.type ← ack;
  ackPup.id ← Endian.FFromCard[handle.pullId];
  TRUSTED {
    ackPup.body ← ack[
      maxBytesPerPup: handle.bufferSize,
      maxPupsAhead: handle.inputBuffersAvailable,
      maxBytesAhead: CARDINAL[MIN[window, ceiling]] ]; };
  PupSocket.SetUserHWords[ackPup, 3];
  PupSocketBackdoor.PutFirst[sock, ackPup];
  PupSocket.FreeBuffer[ackPup];
  };
SendAckAgain: PROC [handle: Handle, b: Buffer] = {
  -- Overwrites the caller's buffer b with an ack describing the current receive allocation and sends it with PutAgain.
  -- Also recomputes handle.allocNeeded. The buffer is not freed here.
  sock: Socket ← handle.socket;
  window: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- LONG avoids overflow
  ceiling: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
  handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
  b.type ← ack;
  b.id ← Endian.FFromCard[handle.pullId];
  TRUSTED {
    b.body ← ack[
      maxBytesPerPup: handle.bufferSize,
      maxPupsAhead: handle.inputBuffersAvailable,
      maxBytesAhead: CARDINAL[MIN[window, ceiling]] ]; };
  PupSocket.SetUserHWords[b, 3];
  PupSocketBackdoor.PutAgain[sock, b];
  };
SendProbe: PROC [handle: Handle] = {
  -- Sends an empty aData packet stamped with the current pushId, using PutFirst, then frees the buffer.
  sock: Socket ← handle.socket;
  probe: Buffer ← PupSocket.AllocBuffer[sock];
  probe.type ← aData;
  probe.id ← Endian.FFromCard[handle.pushId];
  PupSocket.SetUserBytes[probe, 0];
  PupSocketBackdoor.PutFirst[sock, probe];
  PupSocket.FreeBuffer[probe];
  };
SendEnd: PROC [handle: Handle] = {
  -- Sends an empty end packet carrying the connection id.
  -- The buffer is handed to PupSocket.Put and is not freed here.
  sock: Socket ← handle.socket;
  pup: Buffer ← PupSocket.AllocBuffer[sock];
  pup.type ← end;
  pup.id ← handle.connectionId;
  PupSocket.SetUserBytes[pup, 0];
  PupSocket.Put[sock, pup];
  };
SendEndReplyFirst: PROC [handle: Handle, b: Buffer] = {
  -- Turns the caller's buffer b into an empty endRep packet for this connection and sends it with PutFirst.
  -- The buffer stays with the caller.
  sock: Socket = handle.socket;
  b.type ← endRep;
  b.id ← handle.connectionId;
  PupSocket.SetUserBytes[b, 0];
  PupSocketBackdoor.PutFirst[sock, b];
  };
SendInt: PROC [handle: Handle] = {
  -- Sends an empty int (attention) packet whose id is handle.sendAttenIdSent.
  -- The buffer is handed to PupSocket.Put and is not freed here.
  sock: Socket ← handle.socket;
  pup: Buffer ← PupSocket.AllocBuffer[sock];
  pup.type ← int;
  pup.id ← Endian.FFromCard[handle.sendAttenIdSent];
  PupSocket.SetUserBytes[pup, 0];
  PupSocket.Put[sock, pup];
  };
SendIntReplyFirst: PROC [handle: Handle, b: Buffer] = {
  -- Turns the caller's buffer b into an empty intRep packet whose id is handle.recvAttenIdArrived and sends it with PutFirst.
  -- The buffer stays with the caller.
  sock: Socket = handle.socket;
  b.type ← intRep;
  b.id ← Endian.FFromCard[handle.recvAttenIdArrived];
  PupSocket.SetUserBytes[b, 0];
  PupSocketBackdoor.PutFirst[sock, b];
  };
Send Side
FinishOutputFinger: PROC [handle: Handle] = {
  -- Seals the output finger currently being filled.
  -- Its buffer is stamped with the current pushId and given finger.index as its user byte count; pushId then advances by that count.
  -- The finger is marked in progress and full, outputWrite moves to the next finger, and the output-ready condition is notified.
  slot: Finger ← handle.outputWrite;
  pup: Buffer ← slot.buffer;
  pup.id ← Endian.FFromCard[handle.pushId];
  handle.pushId ← handle.pushId + slot.index;
  PupSocket.SetUserBytes[pup, slot.index];
  -- inProgress is set before the state becomes full, same order as always.
  slot.inProgress ← TRUE;
  slot.state ← full;
  handle.outputWrite ← slot.next;
  NotifyOutputReady[handle];
  };
Pusher: PROC [handle: Handle] = {
  -- Body of a sending process.
  -- Repeatedly takes the next ready output finger from WaitOutputReady, picks the packet type from the finger's ack and mark flags, and sends the buffer with PutAgain.
  -- The loop ends once handle.err is set.
  sock: Socket ← handle.socket;
  Process.SetPriority[CommDriver.sendPriority]; -- priorityClient3 = priorityForeground+1
  DO
    slot: Finger ← WaitOutputReady[handle];
    pup: Buffer;
    IF handle.err # NIL THEN EXIT;
    pup ← slot.buffer;
    pup.type ← IF slot.ack
      THEN (IF slot.mark THEN aMark ELSE aData)
      ELSE (IF slot.mark THEN mark ELSE data);
    PupSocketBackdoor.PutAgain[sock, pup];
    IF slot.ack THEN handle.timeOfLastPush ← BasicTime.GetClockPulses[];
    slot.inProgress ← FALSE; -- only now may others touch this finger
    ENDLOOP;
  handle ← NIL; -- Allow Finalization
  };
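Beware: Things go badly wrong if the Retransmitter and a Pusher try to send the same packet at the same time. The Retransmitter therefore waits until a finger is no longer marked inProgress before touching its buffer.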
Retransmitter: PROC [handle: Handle] = {
-- Body of the retransmission process for one stream. Runs until handle.err is set.
-- Each pass waits for an ack (or for the wait to end), lets NotifyOutputSpace retire acked buffers, and then decides what to do:
-- nothing if progress was made or no output is pending, a probe if the remote has granted no packets, otherwise retransmit the oldest unacked buffer until it gets acked.
socket: Socket ← handle.socket;
GetValidOutputToAck: PROC RETURNS [Finger] ~ {
-- Returns the oldest unacked finger once no Pusher is still sending it; returns NIL if the stream dies while waiting.
finger: Finger ← handle.outputToAck;
IF finger.state # full THEN ERROR;
WHILE finger.inProgress DO
Process.Pause[1];
IF handle.err # NIL THEN RETURN[NIL];
ENDLOOP;
RETURN [finger] };
Process.SetPriority[Process.priorityForeground];
UNTIL handle.err # NIL DO
WaitAck[handle]; -- 6 seconds
IF handle.err # NIL THEN EXIT;
IF NotifyOutputSpace[handle] THEN LOOP; -- Should calculate retransmit time ???
IF handle.outputBuffersReady = 0 THEN LOOP; -- Nothing to send
IF handle.pushPackets = 0 THEN { SendProbe[handle]; LOOP; }; -- Waiting for Alloc
-- Entering retransmit mode; NotifyOutputSpace clears the flag again once nothing is left in flight.
handle.retransmitting ← TRUE;
clumpsRetransmitted ← clumpsRetransmitted.SUCC;
UNTIL handle.err # NIL DO -- Retransmit each packet until it gets acked
finger: Finger;
IF NotifyOutputSpace[handle] THEN EXIT;
IF (finger ← GetValidOutputToAck[]) = NIL THEN EXIT;
-- Pause briefly and check again before resending, in case an ack arrives in the meantime.
Process.Pause[2];
IF NotifyOutputSpace[handle] THEN EXIT;
IF (finger ← GetValidOutputToAck[]) = NIL THEN EXIT;
-- Always resend as a type that requests an ack.
finger.buffer.type ← IF finger.mark THEN aMark ELSE aData;
PupSocketBackdoor.PutFirst[socket, finger.buffer];
packetsRetransmitted ← packetsRetransmitted.SUCC;
handle.timeOfLastPush ← BasicTime.GetClockPulses[];
WaitAck[handle]; -- 6 seconds
ENDLOOP;
ENDLOOP;
handle ← NIL; -- Allow Finalization
};
Synchronization
WaitInputReady: PROC [handle: Handle] = {
  -- Returns once the finger at handle.inputRead is full.
  -- Raises StreamClosing (via CheckForClosed) if the stream is closed, and raises the Timeout signal each time a wait ends without data; if that signal is resumed the wait is retried.
  DO
    CheckForClosed[handle];
    WaitInputReadyInner[handle];
    CheckForClosed[handle]; -- Avoid AddressFault if Closed
    IF handle.inputRead.state = full THEN EXIT;
    SIGNAL Timeout;
    ENDLOOP;
  };
WaitInputReadyInner: ENTRY PROC [handle: Handle] = INLINE {
  -- Monitor-protected half of WaitInputReady: waits once on inputReady unless the finger at inputRead is already full.
  ENABLE UNWIND => NULL;
  IF handle.inputRead.state # full THEN WAIT handle.inputReady;
  };
NotifyInputSpace: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = {
  -- Releases the input finger at handle.inputRead by marking it empty, advances inputRead to the next finger, and counts one more available input buffer.
  -- Returns the new inputRead finger.
  consumed: Finger = handle.inputRead;
  consumed.state ← empty;
  finger ← consumed.next;
  handle.inputRead ← finger;
  handle.inputBuffersAvailable ← handle.inputBuffersAvailable.SUCC;
  };
WaitOutputSpace: PROC [handle: Handle] = {
  -- Returns once the finger at handle.outputWrite is empty.
  -- Raises StreamClosing (via CheckForClosed) if the stream is closed, and raises the Timeout signal each time a wait ends without space; if that signal is resumed the wait is retried.
  DO
    WaitOutputSpaceInner[handle];
    CheckForClosed[handle]; -- Avoid AddressFault if Closed
    IF handle.outputWrite.state = empty THEN EXIT;
    SIGNAL Timeout;
    ENDLOOP;
  };
WaitOutputSpaceInner: ENTRY PROC [handle: Handle] = INLINE {
  -- Monitor-protected half of WaitOutputSpace: waits once on outputSpace unless the finger at outputWrite is already empty.
  ENABLE UNWIND => NULL;
  IF handle.outputWrite.state # empty THEN WAIT handle.outputSpace;
  };
WaitOutputFlushed: ENTRY PROC [handle: Handle] = INLINE {
  -- Clears handle.flush when no output buffers are pending; if a flush is still outstanding after that, waits once on outputFlushed.
  ENABLE UNWIND => NULL;
  IF handle.outputBuffersReady = 0 THEN handle.flush ← FALSE;
  IF handle.flush THEN WAIT handle.outputFlushed;
  };
NotifyOutputSpace: ENTRY PROC [handle: Handle] RETURNS [progress: BOOL ← FALSE] = {
  -- Retires every output finger that has been sent and acked, wakes writers waiting for space, and reports whether the connection made progress.
  -- The main cause of the complexity here is that this routine does the NOTIFY to the pushing processes.
  -- Progress has 2 slightly different meanings. Normally, it indicates that an acked buffer was an aData or aMark (we asked for the ack).
  -- During retransmissions, it means that the last buffer to be sent was acked. There may be more buffers ready to send, but they haven't been sent yet.
  -- (The result declaration previously read BOOLFALSE; the default initialization to FALSE is restored here.)
  DO
    finger: Finger ← handle.outputToAck;
    IF finger.state # full THEN EXIT; -- nothing outstanding
    IF finger.inProgress THEN EXIT; -- a Pusher has not finished sending it
    IF Offset[handle.ackedId, Endian.CardFromF[finger.buffer.id]] <= 0 THEN EXIT; -- not acked yet
    IF finger.ack THEN progress ← ~handle.retransmitting;
    finger.state ← empty;
    handle.outputToAck ← finger.next;
    handle.outputBuffersReady ← handle.outputBuffersReady.PRED;
    handle.outputBuffersInFlight ← handle.outputBuffersInFlight.PRED;
    NOTIFY handle.outputSpace;
    ENDLOOP;
  IF handle.retransmitting AND handle.outputBuffersInFlight = 0 THEN {
    -- Everything that had been sent is now acked, so retransmit mode ends.
    progress ← TRUE;
    handle.retransmitting ← FALSE; };
  IF handle.outputBuffersReady = 0 THEN {
    handle.flush ← FALSE;
    NOTIFY handle.outputFlushed; };
  IF progress THEN { BROADCAST handle.alloc; BROADCAST handle.outputReady; };
  };
WaitOutputReady: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = {
-- Blocks until the finger at handle.outputRead may be sent, claims it, and decides whether that packet should request an ack.
-- Returns NIL if handle.err is found set at either of the checks below.
DO
-- First wait until the remote allocation leaves room for one more packet in flight.
UNTIL handle.pushPackets > handle.outputBuffersInFlight AND handle.pushBytes # 0 DO
IF handle.err # NIL THEN RETURN[NIL];
WAIT handle.alloc;
ENDLOOP;
IF handle.err # NIL THEN RETURN[NIL];
finger ← handle.outputRead;
-- Take the finger only if it is full, still marked inProgress, and no retransmission is under way.
SELECT finger.state FROM
full => IF finger.inProgress AND ~handle.retransmitting THEN EXIT;
ENDCASE => NULL;
WAIT handle.outputReady;
ENDLOOP;
handle.outputRead ← handle.outputRead.next;
handle.outputBuffersInFlight ← handle.outputBuffersInFlight.SUCC;
-- Ask for an ack when this packet uses up the remote allocation, when a flush is pending and the next finger is not full, or when the in-flight count reaches either local buffer limit.
SELECT TRUE FROM
handle.pushPackets = handle.outputBuffersInFlight => finger.ack ← TRUE;
handle.flush AND finger.next.state # full => finger.ack ← TRUE;
handle.outputBuffersToUse = handle.outputBuffersInFlight => finger.ack ← TRUE;
handle.outputBuffersAllocated = handle.outputBuffersInFlight => finger.ack ← TRUE;
ENDCASE => finger.ack ← FALSE;
};
NotifyOutputReady: ENTRY PROC [handle: Handle] = {
-- Counts one more output buffer ready to send and wakes one process waiting on outputReady.
handle.outputBuffersReady ← handle.outputBuffersReady.SUCC;
NOTIFY handle.outputReady;
};
WaitStateChange: ENTRY PROC [handle: Handle] = {
  -- Waits once on stateChange, unless the stream has already been closed (handle.err set).
  IF handle.err = NIL THEN WAIT handle.stateChange;
  };
ProcessAck: ENTRY PROC [handle: Handle, b: Buffer] = {
  -- Absorbs an arriving ack packet.
  -- An ack older than handle.ackedId is only counted. Otherwise ackedId advances to the ack's id and the allocation carried in the packet becomes the new send allocation.
  -- Waiters on ack are woken when the allocation allows packets; waiters on alloc are woken when the ack advanced and the allocation exceeds what is in flight.
  advance: INT = Offset[Endian.CardFromF[b.id], handle.ackedId];
  IF advance < 0 THEN { oldAcks ← oldAcks.SUCC; RETURN; };
  handle.ackedId ← handle.ackedId + advance;
  handle.pushPacketSize ← MIN[b.maxBytesPerPup, handle.bufferSize];
  handle.pushPackets ← b.maxPupsAhead;
  handle.pushBytes ← b.maxBytesAhead;
  IF handle.pushPackets # 0 THEN BROADCAST handle.ack; -- ExchangeAcks too
  IF advance # 0 AND handle.pushPackets > handle.outputBuffersInFlight THEN
    BROADCAST handle.alloc;
  handle.timeOfLastArrival ← BasicTime.GetClockPulses[];
  };
WaitAck: ENTRY PROC [handle: Handle] = {
  -- Waits once on the ack condition, unless the oldest outstanding finger is full and its id is already behind handle.ackedId.
  oldest: Finger ← handle.outputToAck;
  oldestId: LONG CARDINAL = Endian.CardFromF[oldest.buffer.id];
  IF oldest.state # full OR Offset[handle.ackedId, oldestId] <= 0 THEN WAIT handle.ack;
  };
WaitForAllocation: ENTRY PROC [handle: Handle] = {
-- Waits once on the ack condition if the remote currently grants no packets or no bytes, then wakes one process waiting on alloc.
IF handle.pushPackets = 0 OR handle.pushBytes = 0 THEN WAIT handle.ack;
NOTIFY handle.alloc;
};
NotifyRecvAttention: ENTRY PROC [handle: Handle] = {
-- Wakes one process waiting on the recvAttention condition.
NOTIFY handle.recvAttention;
};
WaitRecvAttention: ENTRY PROC [handle: Handle] = {
-- Waits once on the recvAttention condition. The caller must recheck state afterwards, since the wait can end for other reasons (SmashClosed broadcasts this condition too).
ENABLE UNWIND => NULL;
WAIT handle.recvAttention;
};
NotifySendAttention: ENTRY PROC [handle: Handle] = {
-- Wakes one process waiting on the sendAttention condition.
NOTIFY handle.sendAttention;
};
WaitSendAttention: ENTRY PROC [handle: Handle] = {
-- Waits once on the sendAttention condition. The caller must recheck state afterwards, since the wait can end for other reasons (SmashClosed broadcasts this condition too).
ENABLE UNWIND => NULL;
WAIT handle.sendAttention;
};
CheckForClosed: PROC [handle: Handle] = {
-- Raises StreamClosing, carrying handle.why and handle.err, once handle.err has been set; does nothing while the stream is open.
IF handle.err # NIL THEN ERROR StreamClosing[handle.why, handle.err];
};
SmashClosed: ENTRY PROC [handle: Handle, why: PupStream.CloseReason, err: ROPE] = {
  -- Kill the stream and wake up everybody waiting for anything connected with this stream.
  -- All processes that WAIT should check handle.err when they wake up and arrange to go away if it is not NIL.
  -- Only the first call has any effect: once handle.err is set, later calls return immediately.
  IF handle.err = NIL THEN {
    handle.why ← why;
    handle.err ← err;
    BROADCAST handle.stateChange;
    BROADCAST handle.outputReady;
    BROADCAST handle.outputFlushed;
    BROADCAST handle.outputSpace;
    BROADCAST handle.inputReady;
    BROADCAST handle.alloc;
    BROADCAST handle.ack;
    BROADCAST handle.sendAttention;
    BROADCAST handle.recvAttention;
    PupSocket.Kick[handle.socket]; };
  };
Buffer Allocation
GetBufferSize: PROC [remote: Pup.Address] RETURNS [bufferSizes: NAT] = {
  -- Picks the data size to use for a connection to remote.
  -- A destination on a net with hop count 0 gets maxDataBytes, any other gets maxNewGatewayBytes; both are capped by maxBufferSize.
  hops: PupHop.Hop = PupHop.GetHop[remote.net];
  IF hops = 0
    THEN RETURN[MIN[maxBufferSize, maxDataBytes]]
    -- Assume that all Cedar machines are going through new Gateways
    ELSE RETURN[MIN[maxBufferSize, maxNewGatewayBytes]];
  };
AllocateBuffers: PROC [handle: Handle, outputBuffers, inputBuffers: NAT] = TRUSTED {
  -- Sets handle.bufferSize and builds the two circular finger lists, one of outputBuffers fingers and one of inputBuffers fingers, each finger holding a freshly allocated buffer.
  -- Afterwards the read, write and to-ack pointers of each ring all designate the same finger, and the buffer counters are initialized.
  sock: PupSocket.Socket ← handle.socket;
  handle.bufferSize ← GetBufferSize[handle.remote];

  -- Output ring: each new finger is pushed on the front; the first one made is remembered in outputRead so the ring can be closed.
  THROUGH [0..outputBuffers) DO
    slot: Finger ← NEW[FingerObject ← []];
    slot.buffer ← PupSocket.AllocBuffer[sock];
    slot.buffer.id ← handle.connectionId;
    IF handle.outputRead = NIL THEN handle.outputRead ← slot;
    slot.next ← handle.outputWrite;
    handle.outputWrite ← slot;
    ENDLOOP;
  handle.outputRead.next ← handle.outputWrite; -- close the ring
  handle.outputRead ← handle.outputWrite;
  handle.outputToAck ← handle.outputWrite;

  -- Input ring, built the same way from receive buffers.
  THROUGH [0..inputBuffers) DO
    slot: Finger ← NEW[FingerObject ← []];
    slot.buffer ← PupSocketBackdoor.AllocRecvBuffer[sock];
    IF handle.inputRead = NIL THEN handle.inputRead ← slot;
    slot.next ← handle.inputWrite;
    handle.inputWrite ← slot;
    ENDLOOP;
  handle.inputRead.next ← handle.inputWrite; -- close the ring
  handle.inputRead ← handle.inputWrite;

  -- Counters.
  handle.outputBuffersAllocated ← outputBuffers;
  handle.outputBuffersToUse ← (outputBuffers+1)/2; -- Double Buffering
  handle.outputBuffersReady ← 0;
  handle.inputBuffersAllocated ← inputBuffers;
  handle.inputBuffersToUse ← (inputBuffers+1)/2; -- Double Buffering
  handle.inputBuffersAvailable ← inputBuffers;
  };
FreeBuffers: ENTRY PROC [handle: Handle] = TRUSTED {
  -- Walks both finger rings, frees every buffer still attached to a finger, breaks the links between fingers, and clears all five ring pointers in the handle.
  sock: PupSocket.Socket ← handle.socket;
  THROUGH [0..handle.outputBuffersAllocated) DO
    slot: Finger ← handle.outputWrite;
    handle.outputWrite ← slot.next;
    IF slot.buffer # NIL THEN PupSocket.FreeBuffer[slot.buffer];
    slot.next ← NIL;
    ENDLOOP;
  THROUGH [0..handle.inputBuffersAllocated) DO
    slot: Finger ← handle.inputWrite;
    handle.inputWrite ← slot.next;
    IF slot.buffer # NIL THEN PupSocket.FreeBuffer[slot.buffer];
    slot.next ← NIL;
    ENDLOOP;
  handle.outputRead ← NIL;
  handle.outputWrite ← NIL;
  handle.outputToAck ← NIL;
  handle.inputRead ← NIL;
  handle.inputWrite ← NIL;
  };
Chain of Streams
There are two times when we need to be able to locate all of the streams: 1) Smashing them after a rollback, and 2) processing duplicate RFCs.
-- globalLock is a Handle object used only for its monitor lock: AddHandle and RemoveHandle are entered with it as their handle argument, so it serializes updates to chain.
globalLock: Handle ← NEW[Object];
-- Head of the list of streams, linked through the next field of each Handle.
chain: Handle ← NIL;
AddHandle: ENTRY PROC [handle: Handle, new: Handle] = {
  -- Pushes new on the front of the stream chain.
  -- Beware: handle is the global lock, new is the real handle.
  new.next ← chain;
  chain ← new;
  };
RemoveHandle: ENTRY PROC [handle: Handle, old: Handle] = {
  -- Unlinks old from the stream chain and clears its next pointer.
  -- Beware: handle is the global lock, old is the real handle.
  IF chain = old THEN chain ← old.next
  ELSE {
    prev: Handle ← chain;
    UNTIL prev.next = old DO prev ← prev.next ENDLOOP; -- NIL fault if not on chain
    prev.next ← old.next; };
  old.next ← NIL; -- Help Finalization
  };
Rollback: Booting.RollbackProc = {
  -- Rollback hook registered with Booting: closes every stream on the chain that is still open, with reason localClose and the text "Rollback".
  FOR stream: Handle ← chain, stream.next UNTIL stream = NIL DO
    IF stream.err = NIL THEN SmashClosed[stream, localClose, "Rollback"];
    ENDLOOP;
  };
Finalization
Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, the pushing and pulling processes must 1) notice when the stream dies, and 2) NIL out their copy of handle as they return. (The frame doesn't get recycled until after the JOIN.) The current code won't do anything with dropped streams unless they get closed by the other end.
StreamFinalizer: PROC = {
  -- Body of the background process that serves the finalization queue sfq, forever.
  -- A handle already marked dead is simply removed from the chain. A handle that is not dead was dropped by its client: finalization is re-enabled for it, an abort is sent if it is still open, and it is closed.
  Process.SetPriority[Process.priorityBackground];
  DO
    handle: Handle ← NARROW[SafeStorage.FQNext[sfq]];
    IF handle.dead THEN { -- Normal end of life
      RemoveHandle[globalLock, handle];
      finishedStreams ← finishedStreams.SUCC; }
    ELSE { -- User forgot to call Destroy
      SafeStorage.EnableFinalization[handle];
      IF handle.err = NIL THEN SendAbort[handle, "Client dropped Stream"];
      CloseHandle[handle];
      droppedStreams ← droppedStreams.SUCC; };
    handle ← NIL; -- drop our reference before blocking on the queue again
    ENDLOOP;
  };
DropTest: PROC [n: NAT ← 25] = {
  -- Debugging aid. Creates streams to a fixed address and abandons them:
  -- three streams are each closed twice, then n streams are each aborted, with a 10 second pause after every stream.
  target: Pup.Address ← [[3],[17B],[0,0,0,1]]; --Ivy
  pause: Process.Ticks = Process.MsecToTicks[10000];
  THROUGH [0..3) DO
    s: STREAM ← Create[target, 1000, 1000];
    IO.Close[s];
    IO.Close[s]; -- closing twice is deliberate
    Process.Pause[pause];
    ENDLOOP;
  THROUGH [0..n) DO
    s: STREAM ← Create[target, 1000, 1000];
    Abort[s," Testing..."];
    Process.Pause[pause];
    ENDLOOP;
  };
-- Module initialization.
-- sfq is the finalization queue that StreamFinalizer reads.
sfq: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
-- Register Object for finalization on sfq.
SafeStorage.EstablishFinalization[type: Object.CODE, npr: 1, fq: sfq];
-- Start the finalizer as a detached process; it never returns.
TRUSTED { Process.Detach[FORK StreamFinalizer[]]; };
-- Have Rollback called by Booting so existing streams get smashed after a rollback.
Booting.RegisterProcs[r: Rollback];
}.