DIRECTORY
Basics USING [bytesPerWord, UnsafeBlock],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
Booting USING [RegisterProcs, RollbackProc],
CommDriver USING [sendPriority],
Endian USING [CardFromF, FFromCard, FWORD],
IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs],
PrincOpsUtils USING [ByteBlt, LongCopy],
Process USING [Abort, Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, priorityForeground, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Pup USING [Address, nullAddress, Socket],
PupBuffer USING [Buffer, BufferObject, ByteIndex, maxDataBytes, maxNewGatewayBytes],
PupHop USING [GetHop, Hop],
PupName USING [IsWellKnown],
PupSocket USING [AllocBuffer, AppendRope, CreateServer, CreateEphemeral, Destroy, ExtractAbortRope, ExtractErrorRope, FixupSourceAndDest, FreeBuffer, Get, GetLocalAddress, GetRemoteAddress, GetUserBytes, GetUniqueID, Kick, Put, SetGetTimeout, SetRemoteAddress, SetUserBytes, SetUserHWords, SetUserSize, Socket, waitForever],
PupSocketBackdoor USING [AllocRecvBuffer, PutAgain, PutFirst, ReturnToSenderNoFree, SetDirectReceive, SetSoftwareChecksumming],
PupStream USING [CloseReason, FilterProc, ListenerProc, MARK, Milliseconds, waitForever],
PupStreamBackdoor USING [],
PupType USING [Type],
Rope USING [ROPE],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ];
Our Types
Finger: TYPE = REF FingerObject;
FingerObject:
TYPE =
RECORD [
state: {empty, halfFull, halfEmpty, full} ← empty,
index: ByteIndex ← 0,
bytes: ByteIndex ← 0,
next: Finger ← NIL,
mark: BOOL ← FALSE,
ack: BOOL ← FALSE,
inProgress: BOOL ← FALSE,
buffer: Buffer ];
Handle: TYPE = REF Object;
Object:
TYPE =
MONITORED
RECORD [
outputWrite, outputRead, outputToAck: Finger ← NIL,
inputWrite, inputRead: Finger ← NIL,
socket: Socket ← NIL,
remote: Pup.Address ← NULL,
connectionId: Endian.FWORD,
outputFlushed: CONDITION,
outputSpace: CONDITION,
outputReady: CONDITION,
retransmitter, push1, push2, push3: PROCESS,
pushId: LONG CARDINAL,
ackedId: LONG CARDINAL,
pushPackets: CARDINAL ← 0,
pushBytes: CARDINAL ← 0,
pushPacketSize: CARDINAL ← 0,
outputBuffersToUse: CARDINAL ← 0,
outputBuffersAllocated: CARDINAL ← 0,
outputBuffersReady: CARDINAL ← 0,
outputBuffersInFlight: CARDINAL ← 0,
timeOfLastPush: BasicTime.Pulses ← BasicTime.GetClockPulses[],
retransmitting: BOOL ← FALSE,
flush: BOOL ← FALSE,
inputReady: CONDITION,
pull: PROCESS,
pullId: LONG CARDINAL,
pullPackets: CARDINAL ← 0,
pullBytes: CARDINAL ← 0,
inputBuffersToUse: CARDINAL ← 0,
inputBuffersAllocated: CARDINAL ← 0,
inputBuffersAvailable: CARDINAL ← 0,
timeOfLastArrival: BasicTime.Pulses ← BasicTime.GetClockPulses[],
stateChange: CONDITION,
alloc: CONDITION,
ack: CONDITION,
allocNeeded: BOOL ← FALSE,
Attentions: Out of band signaling
sendAttenIdSent: LONG CARDINAL,
sendAttenIdAcked: LONG CARDINAL,
sendAttention: CONDITION, -- ABORTs NOT ENABLED
recvAttenIdArrived: LONG CARDINAL,
recvAttenIdSeen: LONG CARDINAL,
recvAttention: CONDITION,
err: ROPE ← NIL, -- # NIL => StreamClosed
why: CloseReason ← NULL,
bufferSize: NAT,
next: Handle ← NIL,
dead: BOOL ← FALSE ];
Listener: TYPE = REF ListenerRep;
ListenerRep:
PUBLIC
TYPE =
RECORD [
local: Pup.Socket,
worker: PupStream.ListenerProc,
clientData: REF ANY,
getTimeout: Milliseconds,
putTimeout: Milliseconds,
filter: PupStream.FilterProc, -- NIL => Accept all requests
echoFilter: PupStream.FilterProc, -- NIL => Answer all echos
listen: PROCESS ];
Sockets: TYPE = REF SocketsRep;
SocketsRep:
PUBLIC
TYPE =
RECORD [
used: BOOL,
socket: Socket ];
Byte Stream Interface
Create:
PUBLIC
PROC [remote: Pup.Address, getTimeout, putTimeout: Milliseconds]
RETURNS [
STREAM] = {
handle: Handle ← NewObject[remote, PupSocket.GetUniqueID[], NIL, getTimeout, putTimeout];
OpenConnection[handle];
SetupObject[handle];
ExchangeAcks[handle];
RETURN[HandleToStream[handle]];
};
Abort:
PUBLIC
PROC [self:
STREAM, err:
ROPE] = {
handle: Handle = NARROW[self.streamData];
IF handle.err # NIL THEN RETURN;
SendAbort[handle, err];
};
SendMark:
PUBLIC
PROC [self:
STREAM, mark:
MARK] = {
handle: Handle = NARROW[self.streamData];
finger: Finger;
DO
CheckForClosed[handle];
finger ← handle.outputWrite;
SELECT finger.state
FROM
empty => {
finger.state ← halfFull;
finger.mark ← TRUE;
finger.index ← 1;
finger.bytes ← handle.pushPacketSize;
EXIT; };
halfFull => FinishOutputFinger[handle];
ENDCASE => WaitOutputSpace[handle];
ENDLOOP;
finger.buffer.byte[0] ← mark;
handle.flush ← TRUE;
FinishOutputFinger[handle];
};
ConsumeMark:
PUBLIC
PROC [self:
STREAM]
RETURNS [mark:
MARK] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
DO
SELECT finger.state
FROM
full => {
finger.state ← halfEmpty;
finger.index ← 0; };
halfEmpty => NULL;
ENDCASE => WaitInputReady[handle];
IF finger.mark THEN EXIT;
finger ← NotifyInputSpace[handle];
MaybeSendAck[handle];
ENDLOOP;
mark ← finger.buffer.byte[0];
[] ← NotifyInputSpace[handle];
MaybeSendAck[handle];
};
GetChar:
PROC [self:
STREAM]
RETURNS [char:
CHAR] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
index: ByteIndex;
DO
SELECT finger.state
FROM
full => {
finger.state ← halfEmpty;
finger.index ← 0;
EXIT; };
halfEmpty => EXIT;
ENDCASE => WaitInputReady[handle];
ENDLOOP;
IF finger.mark THEN ERROR IO.EndOfStream[self];
index ← finger.index;
char ← finger.buffer.char[index];
index ← index.SUCC;
finger.index ← index;
IF finger.index = finger.bytes
THEN {
[] ← NotifyInputSpace[handle];
MaybeSendAck[handle]; };
};
EndOf:
PROC [self:
STREAM]
RETURNS [
BOOL] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
SELECT finger.state
FROM
empty => NULL;
halfEmpty, full => RETURN[finger.mark];
ENDCASE => NULL;
CheckForClosed[handle];
RETURN[FALSE];
};
CharsAvail:
PROC [self:
STREAM, wait:
BOOL]
RETURNS [
INT] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
DO
SELECT finger.state
FROM
full => {
finger.state ← halfEmpty;
finger.index ← 0;
EXIT; };
halfEmpty => EXIT;
ENDCASE => IF wait THEN WaitInputReady[handle] ELSE RETURN[0];
ENDLOOP;
IF finger.mark THEN RETURN[INT.LAST];
RETURN[finger.bytes-finger.index];
};
UnsafeGetBlock:
PROC [self:
STREAM, block: Basics.UnsafeBlock]
RETURNS [nBytesRead:
INT ← 0] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
base: LONG POINTER = LOOPHOLE[block.base];
startIndex: INT ← block.startIndex;
stop: INT = block.startIndex + block.count;
nBytes: NAT;
WHILE startIndex < stop
DO
DO
SELECT finger.state
FROM
full => {
finger.state ← halfEmpty;
finger.index ← 0;
EXIT; };
halfEmpty => EXIT;
ENDCASE => {
IF handle.err #
NIL
AND nBytesRead # 0
THEN
RETURN;
Give user tail of data before StreamClosing
WaitInputReady[handle]; };
ENDLOOP;
IF finger.mark THEN RETURN; -- EOF
IF useInLineCopy
AND WordAligned[startIndex] AND WordAligned[stop]
AND WordAligned[finger.index]
AND WordAligned[finger.bytes]
THEN {
Special check for the normal/easy case.
toBytes: NAT = stop-startIndex;
fromBytes: NAT = finger.bytes-finger.index;
words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
TRUSTED {
PrincOpsUtils.LongCopy[
to: base + startIndex / Basics.bytesPerWord,
nwords: words,
from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; };
nBytes ← words * Basics.bytesPerWord; }
ELSE TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to: [
blockPointer: base,
startIndex: startIndex,
stopIndexPlusOne: stop],
from: [
blockPointer: @finger.buffer.byte,
startIndex: finger.index,
stopIndexPlusOne: finger.bytes] ]; };
startIndex ← startIndex + nBytes;
finger.index ← finger.index + nBytes;
IF finger.index = finger.bytes
THEN {
finger ← NotifyInputSpace[handle];
MaybeSendAck[handle]; };
nBytesRead ← nBytesRead + nBytes;
ENDLOOP;
};
GetBlock:
PROC [self:
STREAM, block:
REF
TEXT, startIndex:
NAT ← 0, count:
NAT ←
NAT.
LAST]
RETURNS [nBytesRead:
NAT ← 0] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.inputRead;
base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE;
stop: NAT = MIN[(startIndex+count), block.maxLength];
nBytes: NAT;
WHILE startIndex < stop
DO
DO
SELECT finger.state
FROM
full => {
finger.state ← halfEmpty;
finger.index ← 0;
EXIT; };
halfEmpty => EXIT;
ENDCASE => {
IF handle.err #
NIL
AND nBytesRead # 0
THEN
RETURN;
Give user tail of data before StreamClosing
WaitInputReady[handle]; };
ENDLOOP;
IF finger.mark THEN RETURN; -- EOF
IF useInLineCopy
AND WordAligned[startIndex] AND WordAligned[stop]
AND WordAligned[finger.index]
AND WordAligned[finger.bytes]
THEN {
Special check for the normal/easy case.
toBytes: NAT = stop-startIndex;
fromBytes: NAT = finger.bytes-finger.index;
words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
TRUSTED {
PrincOpsUtils.LongCopy[
to: base + startIndex / Basics.bytesPerWord,
nwords: words,
from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; };
nBytes ← words * Basics.bytesPerWord; }
ELSE TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to: [
blockPointer: base,
startIndex: startIndex,
stopIndexPlusOne: stop],
from: [
blockPointer: @finger.buffer.byte,
startIndex: finger.index,
stopIndexPlusOne: finger.bytes] ]; };
startIndex ← startIndex + nBytes;
block.length ← startIndex;
finger.index ← finger.index + nBytes;
IF finger.index = finger.bytes
THEN {
finger ← NotifyInputSpace[handle];
MaybeSendAck[handle]; };
nBytesRead ← nBytesRead + nBytes;
ENDLOOP;
};
PutChar:
PROC [self:
STREAM, char:
CHAR] = {
handle: Handle = NARROW[self.streamData];
finger: Finger ← handle.outputWrite;
index: ByteIndex;
DO
CheckForClosed[handle];
SELECT finger.state
FROM
empty => {
finger.state ← halfFull;
finger.mark ← FALSE;
finger.index ← 0;
finger.bytes ← handle.pushPacketSize;
EXIT; };
halfFull => EXIT;
ENDCASE => WaitOutputSpace[handle];
ENDLOOP;
index ← finger.index;
finger.buffer.char[finger.index] ← char;
index ← index.SUCC;
finger.index ← index;
IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
};
UnsafePutBlock:
PROC [self:
STREAM, block: Basics.UnsafeBlock] = {
handle: Handle = NARROW[self.streamData];
base: LONG POINTER = LOOPHOLE[block.base];
startIndex: INT ← block.startIndex;
stop: INT = block.startIndex + block.count;
nBytes: INT;
WHILE startIndex < stop
DO
finger: Finger ← handle.outputWrite;
DO
CheckForClosed[handle];
SELECT finger.state
FROM
empty => {
finger.state ← halfFull;
finger.mark ← FALSE;
finger.index ← 0;
finger.bytes ← handle.pushPacketSize;
EXIT; };
halfFull => EXIT;
ENDCASE => WaitOutputSpace[handle];
ENDLOOP;
IF useInLineCopy
AND WordAligned[startIndex] AND WordAligned[stop]
AND WordAligned[finger.index]
AND WordAligned[finger.bytes]
THEN {
Special check for the normal/easy case.
toBytes: NAT = finger.bytes-finger.index;
fromBytes: NAT = stop-startIndex;
words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
TRUSTED {
PrincOpsUtils.LongCopy[
to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
nwords: words,
from: base + (startIndex / Basics.bytesPerWord)]; };
nBytes ← words * Basics.bytesPerWord; }
ELSE TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to: [
blockPointer: @finger.buffer.byte,
startIndex: finger.index,
stopIndexPlusOne: finger.bytes],
from: [
blockPointer: base,
startIndex: startIndex,
stopIndexPlusOne: stop] ]; };
startIndex ← startIndex + nBytes;
finger.index ← finger.index + nBytes;
IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
ENDLOOP;
};
PutBlock:
PROC [self:
STREAM, block:
REF
READONLY
TEXT, startIndex:
NAT ← 0, count:
NAT ←
NAT.LAST] = {
handle: Handle = NARROW[self.streamData];
base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE;
stop: NAT = MIN[(startIndex+count), block.length];
nBytes: NAT;
WHILE startIndex < stop
DO
finger: Finger ← handle.outputWrite;
DO
CheckForClosed[handle];
SELECT finger.state
FROM
empty => {
finger.state ← halfFull;
finger.mark ← FALSE;
finger.index ← 0;
finger.bytes ← handle.pushPacketSize;
EXIT; };
halfFull => EXIT;
ENDCASE => WaitOutputSpace[handle];
ENDLOOP;
IF useInLineCopy
AND WordAligned[startIndex] AND WordAligned[stop]
AND WordAligned[finger.index]
AND WordAligned[finger.bytes]
THEN {
Special check for the normal/easy case.
toBytes: NAT = finger.bytes-finger.index;
fromBytes: NAT = stop-startIndex;
words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
TRUSTED {
PrincOpsUtils.LongCopy[
to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
nwords: words,
from: base + (startIndex / Basics.bytesPerWord)]; };
nBytes ← words * Basics.bytesPerWord; }
ELSE TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to: [
blockPointer: @finger.buffer.byte,
startIndex: finger.index,
stopIndexPlusOne: finger.bytes],
from: [
blockPointer: base,
startIndex: startIndex,
stopIndexPlusOne: stop] ]; };
startIndex ← startIndex + nBytes;
finger.index ← finger.index + nBytes;
IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
ENDLOOP;
};
Push:
PUBLIC
PROC [self:
STREAM] = {
handle: Handle = NARROW[self.streamData];
SELECT handle.outputWrite.state
FROM
halfFull => FinishOutputFinger[handle];
ENDCASE => NULL;
};
Flush:
PROC [self:
STREAM] = {
handle: Handle = NARROW[self.streamData];
SELECT handle.outputWrite.state
FROM
halfFull => { handle.flush ← TRUE; FinishOutputFinger[handle]; };
ENDCASE => NULL;
IF handle.outputBuffersReady = 0 THEN RETURN; -- Skip closed test
IF ~handle.flush THEN SendProbe[handle];
handle.flush ← TRUE;
WHILE handle.flush
DO
CheckForClosed[handle];
DO
WaitOutputFlushed[handle];
CheckForClosed[handle];
IF ~handle.flush THEN RETURN;
SIGNAL Timeout;
ENDLOOP;
ENDLOOP;
};
Close:
PROC [self:
STREAM, abort:
BOOL] = {
handle: Handle = NARROW[self.streamData];
IF handle = NIL THEN RETURN;
IF abort AND handle.err = NIL THEN Abort[self, "Close with Abort"];
IF ~abort AND handle.err = NIL THEN Flush[self ! StreamClosing, Timeout => CONTINUE ];
IF handle.outputBuffersReady # 0
AND handle.err =
NIL
THEN
Abort[self, "Close with unFlushed data"];
IF handle.err = NIL THEN CloseConnection[handle];
self.streamData ← NIL;
CloseHandle[handle];
};
CloseHandle:
PROC [handle: Handle] = {
IF CloseHandleInternal[handle] THEN RETURN;
TRUSTED {
JOIN handle.retransmitter;
JOIN handle.push1;
JOIN handle.push2;
JOIN handle.push3;
JOIN handle.pull; };
FreeBuffers[handle];
PupSocket.Destroy[handle.socket];
handle.socket ← NIL;
};
CloseHandleInternal:
ENTRY
PROC [handle: Handle]
RETURNS [alreadyDead:
BOOL] = {
alreadyDead ← handle.dead;
handle.dead ← TRUE;
NOTIFY handle.outputReady;
};
SendAttention:
PUBLIC
PROC [self:
IO.
STREAM] = {
handle: Handle = NARROW[self.streamData];
WHILE handle.sendAttenIdSent = handle.sendAttenIdAcked
DO
CheckForClosed[handle];
SendInt[handle];
WaitSendAttention[handle];
ENDLOOP;
handle.sendAttenIdSent ← handle.sendAttenIdSent.SUCC;
};
WaitAttention:
PUBLIC PROC [self:
IO.
STREAM] = {
handle: Handle = NARROW[self.streamData];
WHILE handle.recvAttenIdArrived = handle.recvAttenIdSeen
DO
CheckForClosed[handle];
WaitRecvAttention[handle];
ENDLOOP;
handle.recvAttenIdSeen ← handle.recvAttenIdSeen.SUCC;
};
Rendezvous
AllocateSocket:
PUBLIC
PROC [remote: Pup.Address]
RETURNS [sockets: Sockets] = {
socket: Socket ← PupSocket.CreateEphemeral[
remote: remote,
sendBuffers: defaultNumberOfBuffers + 2,
recvBuffers: 2*defaultNumberOfBuffers + 2,
getTimeout: 60000 ];
sockets ← NEW[SocketsRep ← [used: FALSE, socket: socket]];
};
LocalAddress:
PUBLIC
PROC [sockets: Sockets]
RETURNS [local: Pup.Address] = {
local ← PupSocket.GetLocalAddress[sockets.socket];
};
RemoteAddress:
PUBLIC
PROC [sockets: Sockets]
RETURNS [remote: Pup.Address] = {
remote ← PupSocket.GetRemoteAddress[sockets.socket];
};
SocketsFromStream:
PUBLIC
PROC [self:
STREAM]
RETURNS [sockets: Sockets] = {
handle: Handle = NARROW[self.streamData];
sockets ← NEW[SocketsRep ← [used: TRUE, socket: handle.socket]];
};
SocketsAlreadyUsed: PUBLIC ERROR = CODE;
WaitForRendezvous:
PUBLIC
PROC [sockets: Sockets, getTimeout, putTimeout, waitTimeout: Milliseconds]
RETURNS [
STREAM] = {
socket: Socket ← sockets.socket;
IF sockets.used THEN ERROR SocketsAlreadyUsed;
sockets.used ← TRUE;
PupSocket.SetGetTimeout[socket, waitTimeout];
DO
b: PupBuffer.Buffer ← PupSocket.Get[socket];
IF b =
NIL
THEN
ERROR StreamClosing[transmissionTimeout, "No RFC arrived"];
Arg, Socket left dangling.
SELECT b.type
FROM
rfc => {
handle: Handle;
PupSocket.FixupSourceAndDest[b];
PupSocket.SetRemoteAddress[socket, b.source];
handle ← NewObject[b.source, b.id, sockets, getTimeout, putTimeout];
PupSocket.FreeBuffer[b];
PupSocket.SetGetTimeout[socket, 250]; -- also in NewObject
SetupObject[handle];
SendRfc[handle];
ExchangeAcks[handle];
RETURN[HandleToStream[handle]]; };
ENDCASE => NULL;
PupSocket.FreeBuffer[b];
ENDLOOP;
};
ActivelyRendezvous:
PUBLIC
PROC [sockets: Sockets, getTimeout, putTimeout: Milliseconds]
RETURNS [
STREAM] = {
handle: Handle;
IF sockets.used THEN ERROR SocketsAlreadyUsed;
sockets.used ← TRUE;
handle ← NewObject[Pup.nullAddress, PupSocket.GetUniqueID[], sockets, getTimeout, putTimeout];
OpenConnection[handle];
SetupObject[handle];
ExchangeAcks[handle];
RETURN[HandleToStream[handle]];
};
Connection management
NewObject:
PROC [
remote: Pup.Address, id: Endian.FWORD, sockets: Sockets, getTimeout, putTimeout: Milliseconds]
RETURNS [handle: Handle] = {
temp: LONG CARDINAL;
handle ← NEW[Object];
TRUSTED {
Process.SetTimeout[@handle.outputReady, Process.MsecToTicks[2000]];
Process.EnableAborts[@handle.outputSpace];
IF putTimeout # PupStream.waitForever
THEN
Process.SetTimeout[@handle.outputFlushed, MillisecondsToTicks[putTimeout]]
ELSE Process.DisableTimeout[@handle.outputFlushed];
IF putTimeout # PupStream.waitForever
THEN
Process.SetTimeout[@handle.outputSpace, MillisecondsToTicks[putTimeout]]
ELSE Process.DisableTimeout[@handle.outputSpace];
Process.EnableAborts[@handle.inputReady];
IF getTimeout # PupStream.waitForever
THEN
Process.SetTimeout[@handle.inputReady, MillisecondsToTicks[getTimeout]]
ELSE Process.DisableTimeout[@handle.inputReady];
Process.SetTimeout[@handle.sendAttention, Process.MsecToTicks[9000]];
Process.EnableAborts[@handle.recvAttention];
Process.DisableTimeout[@handle.recvAttention];
Process.SetTimeout[@handle.stateChange, Process.MsecToTicks[4000]];
Process.SetTimeout[@handle.alloc, Process.MsecToTicks[5000]];
Process.SetTimeout[@handle.ack, Process.MsecToTicks[6000]]; };
IF sockets # NIL THEN handle.socket ← sockets.socket
ELSE handle.socket ← PupSocket.CreateEphemeral[
remote: remote,
sendBuffers: defaultNumberOfBuffers + 2,
recvBuffers: 2*defaultNumberOfBuffers + 2,
getTimeout: 250 ]; -- also in WaitForRendezvous
handle.remote ← PupSocket.GetRemoteAddress[handle.socket];
IF disableSoftwareChecksums
THEN
PupSocketBackdoor.SetSoftwareChecksumming[handle.socket, FALSE, FALSE];
handle.connectionId ← id;
temp ← Endian.CardFromF[handle.connectionId];
handle.pullId ← handle.pushId ← handle.ackedId ← temp;
handle.sendAttenIdSent ← handle.sendAttenIdAcked ← temp;
handle.recvAttenIdArrived ← handle.recvAttenIdSeen ← temp;
};
SetupObject:
PROC [handle: Handle] = {
AllocateBuffers[handle, defaultNumberOfBuffers, defaultNumberOfBuffers];
handle.retransmitter ← FORK Retransmitter[handle];
handle.push1 ← FORK Pusher[handle];
handle.push2 ← FORK Pusher[handle];
handle.push3 ← FORK Pusher[handle];
handle.pull ← FORK Puller[handle];
AddHandle[globalLock, handle];
SafeStorage.EnableFinalization[handle];
};
ExchangeAcks:
PROC [handle: Handle] = {
SendAckFirst[handle]; -- First ack for free (IFS doesn't play this game)
IF handle.pushPacketSize = 0 THEN Process.Pause[2]; -- Dally (a wee bit) in case of free ack
FOR i:
NAT
IN [0..10)
DO
IF handle.pushPacketSize # 0 THEN EXIT;
IF handle.err # NIL THEN EXIT;
SendProbe[handle];
WaitForAllocation[handle];
ENDLOOP;
IF handle.err = NIL THEN WaitForAllocation[handle]; -- NOTIFY only
IF handle.pushPacketSize = 0
THEN
SmashClosed[handle, transmissionTimeout, "No allocate (need packet size)"];
};
HandleToStream:
PROC [handle: Handle]
RETURNS [
STREAM] = {
RETURN[IO.CreateStream[streamProcs: streamProcs, streamData: handle]];
};
OpenConnection:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
retransmisson: BasicTime.Pulses ← oneSecond;
start: BasicTime.Pulses;
FOR i:
NAT
IN [0..10)
DO
SendRfc[handle];
start ← BasicTime.GetClockPulses[];
retransmisson ← MIN[retransmisson*2, tenSeconds];
2+4+8+10+10+10+10+10+10+10 => 84 seconds before timeout
UNTIL ElapsedPulses[start] > retransmisson
DO
b: Buffer ← PupSocket.Get[socket];
IF b = NIL THEN LOOP;
SELECT b.type
FROM
rfc => {
handle.remote ← b.address;
PupSocket.SetRemoteAddress[socket, handle.remote];
PupSocket.FreeBuffer[b];
RETURN; };
error => GotError[handle, b];
abort => GotAbort[handle, b];
ENDCASE => NULL;
PupSocket.FreeBuffer[b];
CheckForClosed[handle];
ENDLOOP;
CheckForClosed[handle];
ENDLOOP;
SmashClosed[handle, transmissionTimeout, "No response from remote site"];
CheckForClosed[handle];
Arg, Socket left dangling.
};
CloseConnection:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
IF handle.err # NIL THEN RETURN;
FOR i:
NAT
IN [0..10)
DO
SendEnd[handle];
WaitStateChange[handle];
IF handle.err # NIL THEN RETURN;
ENDLOOP;
SendAbort[handle, "No Response to end"];
};
Recv Side
Puller:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
probes: CARDINAL ← 0;
timeOfLastArrival: BasicTime.Pulses ← handle.timeOfLastArrival;
Process.SetPriority[Process.priorityForeground];
IF useDirectInput THEN PupSocketBackdoor.SetDirectReceive[socket, Receive, handle];
UNTIL handle.err #
NIL
DO
b: Buffer ← PupSocket.Get[socket];
IF b =
NIL
THEN {
-- Nothing arrived recently
IF ElapsedPulses[handle.timeOfLastArrival] < probePulses
THEN {
probes ← 0;
timeOfLastArrival ← handle.timeOfLastArrival; }
ELSE {
IF ElapsedPulses[timeOfLastArrival] > (probes+1)*probePulses
THEN {
We could actually get here by accident if we are receiving a steady stream of traffic long enough for the pulses clock to wrap around. That shouldn't do anything worse than inject an extra probe.
SendProbe[handle];
probes ← probes.SUCC; };
IF probes > 10
THEN
SmashClosed[handle, transmissionTimeout, "no response from remote host"]; };
LOOP; };
b ← Receive[socket, b, handle];
PupSocket.FreeBuffer[b];
ENDLOOP;
PupSocketBackdoor.SetDirectReceive[socket, NIL, NIL];
IF handle.why = remoteClose
THEN {
-- Dally in case he retransmits
DO
b: Buffer ← PupSocket.Get[socket];
IF b = NIL THEN EXIT;
SELECT b.type
FROM
end => GotEnd[handle, b];
endRep => { PupSocket.FreeBuffer[b]; EXIT; };
ENDCASE => NULL;
PupSocket.FreeBuffer[b];
ENDLOOP; };
handle ← NIL; -- Allow Finalization
};
Receive:
PROC [socket: Socket, b: Buffer, user:
REF
ANY]
RETURNS [Buffer] = {
handle: Handle = NARROW[user];
SELECT b.type
FROM
ack => ProcessAck[handle, b];
data, aData, mark, aMark => {
bytes: ByteIndex = PupSocket.GetUserBytes[b];
type: PupType.Type ← b.type;
IF bytes # 0 THEN b ← DataPacket[handle, b, bytes];
IF type = aData
OR type = aMark
THEN {
oldAllocNeeded: BOOL ← handle.allocNeeded;
handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
SELECT
TRUE
FROM
oldAllocNeeded => SendAckFirst[handle];
(bytes = 0) => SendAckFirst[handle];
~handle.allocNeeded => SendAckAgain[handle, b];
ENDCASE => acksDefered ← acksDefered.SUCC; }; };
end => GotEnd[handle, b];
endRep => GotEndReply[handle, b];
error => GotError[handle, b];
abort => GotAbort[handle, b];
int => GotInt[handle, b];
intRep => GotIntReply[handle, b];
rfc => NULL;
ENDCASE => funnyPacketType ← funnyPacketType.SUCC;
RETURN[b];
};
DataPacket:
ENTRY
PROC [handle: Handle, b: Buffer, bytes: ByteIndex]
RETURNS [swap: Buffer] = {
offset: INT ← Offset[Endian.CardFromF[b.id], handle.pullId];
swap ← b;
SELECT offset
FROM
< 0 => oldPackets ← oldPackets.SUCC;
> 0 => futurePackets ← futurePackets.SUCC; -- Missed something
0 => {
finger: Finger ← handle.inputWrite;
IF finger.state # empty THEN RETURN;
swap ← finger.buffer;
finger.buffer ← b;
finger.mark ← b.type = mark OR b.type = aMark;
finger.bytes ← bytes;
finger.state ← full;
handle.pullId ← handle.pullId + bytes;
handle.inputBuffersAvailable ← handle.inputBuffersAvailable.PRED;
handle.inputWrite ← finger.next;
NOTIFY handle.inputReady;
handle.timeOfLastArrival ← BasicTime.GetClockPulses[]; };
ENDCASE => ERROR;
};
GotEnd:
PROC [handle: Handle, b: Buffer] = {
IF b.id # handle.connectionId THEN RETURN;
IF b.source # handle.remote THEN RETURN;
PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
IF handle.err = NIL THEN SmashClosed[handle, remoteClose, "Remote close"];
SELECT handle.why
FROM
remoteClose => IF handle.outputBuffersReady # 0 THEN RETURN; -- Can't accept yet;
ENDCASE;
SendEndReplyFirst[handle, b];
};
GotEndReply:
PROC [handle: Handle, b: Buffer] = {
IF b.id # handle.connectionId THEN RETURN;
IF b.source # handle.remote THEN RETURN;
PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
IF handle.err # NIL THEN RETURN;
SmashClosed[handle, localClose, "Local close"];
SendEndReplyFirst[handle, b];
};
GotError:
PROC [handle: Handle, b: Buffer] = {
IF b.source # handle.remote THEN RETURN;
SELECT b.error.code
FROM
noSocket => SmashClosed[handle, remoteReject, PupSocket.ExtractErrorRope[b]];
resourceLimits, gatewayResourceLimits =>
handle.outputBuffersToUse ← MIN[1, handle.outputBuffersToUse-1];
ENDCASE => RETURN; -- Not a fatal error
};
GotAbort:
PROC [handle: Handle, b: Buffer] = {
IF b.id # handle.connectionId THEN RETURN;
IF b.source # handle.remote THEN RETURN;
SmashClosed[handle, remoteReject, PupSocket.ExtractAbortRope[b]];
};
GotInt:
PROC [handle: Handle, b: Buffer] = {
id: LONG CARDINAL = Endian.CardFromF[b.id];
IF b.source # handle.remote THEN RETURN;
IF Offset[id, handle.recvAttenIdArrived] = 1 THEN handle.recvAttenIdArrived ← id;
SendIntReplyFirst[handle, b];
NotifyRecvAttention[handle];
};
GotIntReply:
PROC [handle: Handle, b: Buffer] = {
id: LONG CARDINAL = Endian.CardFromF[b.id];
IF b.source # handle.remote THEN RETURN;
IF Offset[id, handle.sendAttenIdSent] = 1 THEN handle.sendAttenIdAcked ← id;
NotifySendAttention[handle];
};
SendRfc:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
b.type ← rfc;
b.id ← handle.connectionId;
b.address ← PupSocket.GetLocalAddress[socket];
PupSocket.SetUserSize[b, SIZE[Pup.Address]];
PupSocketBackdoor.PutFirst[socket, b];
PupSocket.FreeBuffer[b];
};
SendAbort:
PROC [handle: Handle, err:
ROPE] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
b.type ← abort;
b.id ← handle.connectionId;
b.abort.code ← 0;
PupSocket.SetUserHWords[b, 1];
PupSocket.AppendRope[b, err];
PupSocket.Put[socket, b];
IF handle.err # NIL THEN RETURN;
SmashClosed[handle, localAbort, err];
};
MaybeSendAck:
PROC [handle: Handle] = {
socket: Socket;
b: Buffer;
IF ~handle.allocNeeded THEN RETURN;
IF handle.inputBuffersAvailable < handle.inputBuffersToUse THEN RETURN;
socket ← handle.socket;
b ← PupSocket.AllocBuffer[socket];
SendAckAgain[handle, b];
PupSocket.FreeBuffer[b];
};
SendAckFirst:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow
maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
b.type ← ack;
b.id ← Endian.FFromCard[handle.pullId];
TRUSTED {
b.body ← ack[
maxBytesPerPup: handle.bufferSize,
maxPupsAhead: handle.inputBuffersAvailable,
maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ]; };
PupSocket.SetUserHWords[b, 3];
PupSocketBackdoor.PutFirst[socket, b];
PupSocket.FreeBuffer[b];
};
SendAckAgain:
PROC [handle: Handle, b: Buffer] = {
socket: Socket ← handle.socket;
bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow
maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
handle.allocNeeded ← handle.inputBuffersAvailable < handle.inputBuffersToUse;
b.type ← ack;
b.id ← Endian.FFromCard[handle.pullId];
TRUSTED {
b.body ← ack[
maxBytesPerPup: handle.bufferSize,
maxPupsAhead: handle.inputBuffersAvailable,
maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ]; };
PupSocket.SetUserHWords[b, 3];
PupSocketBackdoor.PutAgain[socket, b];
};
SendProbe:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
b.type ← aData;
b.id ← Endian.FFromCard[handle.pushId];
PupSocket.SetUserBytes[b, 0];
PupSocketBackdoor.PutFirst[socket, b];
PupSocket.FreeBuffer[b];
};
SendEnd:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
b.type ← end;
b.id ← handle.connectionId;
PupSocket.SetUserBytes[b, 0];
PupSocket.Put[socket, b];
};
SendEndReplyFirst:
PROC [handle: Handle, b: Buffer] = {
socket: Socket ← handle.socket;
b.type ← endRep;
b.id ← handle.connectionId;
PupSocket.SetUserBytes[b, 0];
PupSocketBackdoor.PutFirst[socket, b];
};
SendInt:
PROC [handle: Handle] = {
socket: Socket ← handle.socket;
b: Buffer ← PupSocket.AllocBuffer[socket];
b.type ← int;
b.id ← Endian.FFromCard[handle.sendAttenIdSent];
PupSocket.SetUserBytes[b, 0];
PupSocket.Put[socket, b];
};
SendIntReplyFirst:
PROC [handle: Handle, b: Buffer] = {
socket: Socket ← handle.socket;
b.type ← intRep;
b.id ← Endian.FFromCard[handle.recvAttenIdArrived];
PupSocket.SetUserBytes[b, 0];
PupSocketBackdoor.PutFirst[socket, b];
};
Synchronization
WaitInputReady:
PROC [handle: Handle] = {
DO
CheckForClosed[handle];
WaitInputReadyInner[handle];
CheckForClosed[handle]; -- Avoid AddressFault if Closed
IF handle.inputRead.state = full THEN RETURN;
SIGNAL Timeout;
ENDLOOP;
};
WaitInputReadyInner:
ENTRY
PROC [handle: Handle] =
INLINE {
ENABLE UNWIND => NULL;
IF handle.inputRead.state = full THEN RETURN;
WAIT handle.inputReady;
};
NotifyInputSpace:
ENTRY
PROC [handle: Handle]
RETURNS [finger: Finger] = {
finger ← handle.inputRead;
finger.state ← empty;
finger ← finger.next;
handle.inputRead ← finger;
handle.inputBuffersAvailable ← handle.inputBuffersAvailable.SUCC;
};
WaitOutputSpace:
PROC [handle: Handle] = {
DO
WaitOutputSpaceInner[handle];
CheckForClosed[handle]; -- Avoid AddressFault if Closed
IF handle.outputWrite.state = empty THEN RETURN;
SIGNAL Timeout;
ENDLOOP;
};
WaitOutputSpaceInner:
ENTRY
PROC [handle: Handle] =
INLINE {
ENABLE UNWIND => NULL;
IF handle.outputWrite.state = empty THEN RETURN;
WAIT handle.outputSpace;
};
WaitOutputFlushed:
ENTRY
PROC [handle: Handle] =
INLINE {
ENABLE UNWIND => NULL;
IF handle.outputBuffersReady = 0 THEN handle.flush ← FALSE;
IF ~handle.flush THEN RETURN;
WAIT handle.outputFlushed;
};
NotifyOutputSpace:
ENTRY
PROC [handle: Handle]
RETURNS [progress:
BOOL ←
FALSE] = {
The main cause of the complexity here is that this routine does the NOTIFY to the pushing processes. Progress has 2 slightly different meanings. Normally, it indicates that an acked buffer was an aData or aMark (we asked for the ack). During retransmissions, it means that the last buffer to be sent was acked. There may be more buffers ready to send, but they haven't been sent yet.
DO
finger: Finger ← handle.outputToAck;
IF finger.state # full THEN EXIT;
IF finger.inProgress THEN EXIT;
IF Offset[handle.ackedId, Endian.CardFromF[finger.buffer.id]] <= 0 THEN EXIT;
IF finger.ack THEN progress ← ~handle.retransmitting;
finger.state ← empty;
handle.outputToAck ← finger.next;
handle.outputBuffersReady ← handle.outputBuffersReady.PRED;
handle.outputBuffersInFlight ← handle.outputBuffersInFlight.PRED;
NOTIFY handle.outputSpace;
ENDLOOP;
IF handle.retransmitting
AND handle.outputBuffersInFlight = 0
THEN {
progress ← TRUE;
handle.retransmitting ← FALSE; };
IF handle.outputBuffersReady = 0
THEN {
handle.flush ← FALSE;
NOTIFY handle.outputFlushed; };
IF progress THEN { BROADCAST handle.alloc; BROADCAST handle.outputReady; };
};
WaitOutputReady:
ENTRY
PROC [handle: Handle]
RETURNS [finger: Finger] = {
DO
UNTIL handle.pushPackets > handle.outputBuffersInFlight
AND handle.pushBytes # 0
DO
IF handle.err # NIL THEN RETURN[NIL];
WAIT handle.alloc;
ENDLOOP;
IF handle.err # NIL THEN RETURN[NIL];
finger ← handle.outputRead;
SELECT finger.state
FROM
full => IF finger.inProgress AND ~handle.retransmitting THEN EXIT;
ENDCASE => NULL;
WAIT handle.outputReady;
ENDLOOP;
handle.outputRead ← handle.outputRead.next;
handle.outputBuffersInFlight ← handle.outputBuffersInFlight.SUCC;
SELECT
TRUE
FROM
handle.pushPackets = handle.outputBuffersInFlight => finger.ack ← TRUE;
handle.flush AND finger.next.state # full => finger.ack ← TRUE;
handle.outputBuffersToUse = handle.outputBuffersInFlight => finger.ack ← TRUE;
handle.outputBuffersAllocated = handle.outputBuffersInFlight => finger.ack ← TRUE;
ENDCASE => finger.ack ← FALSE;
};
NotifyOutputReady:
ENTRY
PROC [handle: Handle] = {
handle.outputBuffersReady ← handle.outputBuffersReady.SUCC;
NOTIFY handle.outputReady;
};
WaitStateChange:
ENTRY
PROC [handle: Handle] = {
IF handle.err # NIL THEN RETURN;
WAIT handle.stateChange;
};
ProcessAck:
ENTRY
PROC [handle: Handle, b: Buffer] = {
offset: INT ← Offset[Endian.CardFromF[b.id], handle.ackedId];
SELECT offset
FROM
< 0 => oldAcks ← oldAcks.SUCC;
ENDCASE => {
handle.ackedId ← handle.ackedId + offset;
handle.pushPacketSize ← MIN[b.maxBytesPerPup, handle.bufferSize];
handle.pushPackets ← b.maxPupsAhead;
handle.pushBytes ← b.maxBytesAhead;
IF handle.pushPackets # 0 THEN BROADCAST handle.ack; -- ExchangeAcks too
IF offset # 0
AND handle.pushPackets > handle.outputBuffersInFlight
THEN
BROADCAST handle.alloc;
handle.timeOfLastArrival ← BasicTime.GetClockPulses[]; };
};
WaitAck:
ENTRY
PROC [handle: Handle] = {
finger: Finger ← handle.outputToAck;
fingerId: LONG CARDINAL = Endian.CardFromF[finger.buffer.id];
IF finger.state = full AND Offset[handle.ackedId, fingerId] > 0 THEN RETURN;
WAIT handle.ack;
};
WaitForAllocation:
ENTRY
PROC [handle: Handle] = {
IF handle.pushPackets = 0 OR handle.pushBytes = 0 THEN WAIT handle.ack;
NOTIFY handle.alloc;
};
NotifyRecvAttention:
ENTRY
PROC [handle: Handle] = {
NOTIFY handle.recvAttention;
};
WaitRecvAttention:
ENTRY
PROC [handle: Handle] = {
ENABLE UNWIND => NULL;
WAIT handle.recvAttention;
};
NotifySendAttention:
ENTRY
PROC [handle: Handle] = {
NOTIFY handle.sendAttention;
};
WaitSendAttention:
ENTRY
PROC [handle: Handle] = {
ENABLE UNWIND => NULL;
WAIT handle.sendAttention;
};
CheckForClosed:
PROC [handle: Handle] = {
IF handle.err # NIL THEN ERROR StreamClosing[handle.why, handle.err];
};
SmashClosed:
ENTRY
PROC [handle: Handle, why: PupStream.CloseReason, err:
ROPE] = {
Kill the stream and wakeup everybody waiting for anything connected with this stream. All processes that WAIT should check handle.err when they wakeup and arrange to go away if it is not NIL.
IF handle.err # NIL THEN RETURN;
handle.why ← why;
handle.err ← err;
BROADCAST handle.stateChange;
BROADCAST handle.outputReady;
BROADCAST handle.outputFlushed;
BROADCAST handle.outputSpace;
BROADCAST handle.inputReady;
BROADCAST handle.alloc;
BROADCAST handle.ack;
BROADCAST handle.sendAttention;
BROADCAST handle.recvAttention;
PupSocket.Kick[handle.socket];
};