XNSStreamImpl.mesa
Copyright © 1986 by Xerox Corporation. All rights reserved.
Hal Murray, February 9, 1986 7:02:38 am PST
Demers, March 4, 1987 3:21:36 pm PST
TODO:
The mgrXXXXWakeup conditions don't need BROADCASTS.
??? Careful that state is never left xxxing after closed! ????
SOMEDAY experiment with making the LOCKS clause work directly on the IO.STREAM, thereby eliminating a level of procedure call from user processes.
QUESTION: If there's a halfempty packet buffer AND there's allocation AND we want to send a system packet, does it make sense to send the data packet instead? Note the other end may respond differently to data vs. system packets with AckMe set.
Probing strategy is wrong???? I think probing should expect a response on the same order of timeout as waitForAck; multiple probes should drive up the estimated round trip delay ????
ESTIMATE THE ROUND TRIP DELAY! — both running and initial based on hopcount.
DEBUG HACKS that need to be removed eventually:
CheckUserBytes
the "these can't hurt" checks in ReadyOutputBuffer
DIRECTORY
Basics USING [LowHalf],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds],
Booting USING [RegisterProcs, RollbackProc],
Endian USING [CardFromH, HFromCard, HWORD],
IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock],
PrincOpsUtils USING [ByteBlt],
Process USING [Abort, ConditionPointer, Detach, DisableTimeout, EnableAborts, Milliseconds, MsecToTicks, priorityBackground, priorityForeground, Seconds, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Rope USING [ROPE],
SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization],
XNS USING [Address, Socket, unknownAddress],
XNSBuf USING [Buffer],
XNSEchoBuf USING [Buffer, hdrBytes],
XNSErrorBuf USING [Buffer],
XNSErrorTypes USING [badChecksumErr, cantGetThereErr, invalidPacketTypeErr, listenerRejectErr, noSocketErr, protocolViolationErr, resourceLimitsErr, unspecifiedErr],
XNSRouter USING [GetHops, Hops, unreachable],
XNSSocket USING [AllocBuffer, Create, Destroy, FreeBuffer, Get, GetLocalAddress, GetRemoteAddress, GetUserBytes, Handle, Kick, Milliseconds, Put, ReturnError, ReturnToSender, SetGetTimeout, SetRemoteAddress, SetUserBytes, waitForever],
XNSSocketBackdoor USING [FlushCache, PutCached, ReceiveProc, SetDirectReceive],
XNSSPPBuf USING [Buffer, ConnCtl, hdrBytes],
XNSSPPTypes USING [endReplySST, endSST],
XNSStream USING [AttentionType, CloseReason, FilterProc, ListenerProc, Milliseconds, State, SubSequenceType, waitForever],
XNSStreamExtras USING [XFilterProc, XListenerProc],
XNSStreamPrivate USING [AttentionType, BufIndex, Finger, FingerObject, Handle, HWORD, OBAttn, OBAttnObject, Object, SignedDiff, SPPBuffer, SubSequenceType, unknownConnectionID, XNSBuffer];
XNSStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle
IMPORTS Basics, BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, SafeStorage, XNSRouter, XNSStreamPrivate, XNSSocket, XNSSocketBackdoor
EXPORTS XNSStream, XNSStreamExtras
~ {
OPEN XNSStreamPrivate;
Temporary for debugging:
CheckUserBytes: PROC [buffer: XNSBuffer, bytes: CARDINAL] ~ {
IF bytes # XNSSocket.GetUserBytes[buffer] THEN ERROR };
Signals and Errors
ConnectionClosed: PUBLIC ERROR [why: XNSStream.CloseReason, text: Rope.ROPE] ~ CODE;
Timeout: PUBLIC SIGNAL ~ CODE;
CompletionCode: TYPE ~ { normal, timedOut };
Byte Stream Interface
STREAM: TYPE ~ IO.STREAM;
streamProcs: REF IO.StreamProcs ~ IO.CreateStreamProcs [
variety~$inputOutput, class~$XNS,
getChar~GetChar,
getBlock~GetBlock,
unsafeGetBlock~UnsafeGetBlock,
endOf~EndOf,
charsAvail~CharsAvail,
putChar~PutChar,
putBlock~PutBlock,
unsafePutBlock~UnsafePutBlock,
flush~Flush,
close~Close ];
Times
Milliseconds: TYPE ~ XNSStream.Milliseconds;
waitForever: Milliseconds ~ XNSStream.waitForever;
Pulses: TYPE ~ BasicTime.Pulses;
oneSecondOfPulses: Pulses ~ BasicTime.MicrosecondsToPulses[1000000];
MsecToPulses: PROC [msec: Milliseconds] RETURNS [Pulses] ~ {
IF msec >= waitForever
THEN RETURN [Pulses.LAST]
ELSE RETURN [BasicTime.MicrosecondsToPulses[1000*msec]] };
PulsesSince: PROC [then: Pulses] RETURNS [Pulses] ~ INLINE {
RETURN [BasicTime.GetClockPulses[] - then] };
TimeoutTicksFromPulses: PROC[pulses: Pulses] RETURNS[ticks: Process.Ticks] ~ {
The idea is to set the timeout of a condition variable so that the specified number of pulses (+ epsilon) will have elapsed when the condition times out. Epsilon here is 10%, which is pretty crude. I should do some experiments to see how much adjustment Cedar really requires here.
usec: LONG CARDINAL ~ BasicTime.PulsesToMicroseconds[pulses];
msec: Milliseconds;
msec ← (usec + usec/10) / 1000;
IF msec < Process.Milliseconds.LAST THEN
RETURN[ Process.MsecToTicks[Process.Milliseconds[msec]] ];
RETURN[ Process.SecondsToTicks[Process.Seconds[msec/1000]] ];
};
Now: PROC RETURNS [Pulses] ~ INLINE {
RETURN [BasicTime.GetClockPulses[]] };
IsTimedOut: PROC [timeout, then: Pulses] RETURNS [BOOL] ~ INLINE {
RETURN [(BasicTime.GetClockPulses[] - then) > timeout] };
Buffer Enqueue and Dequeue Utilities
defaultConnCtl: XNSSPPBuf.ConnCtl ~ [
system~FALSE,
sendAck~FALSE,
attn~FALSE,
endOfMsg~FALSE,
filler~0];
defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [
system~TRUE,
sendAck~FALSE,
attn~FALSE,
endOfMsg~FALSE,
filler~0];
defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [
system~FALSE,
sendAck~FALSE,
attn~TRUE,
endOfMsg~FALSE,
filler~0];
AllocateOutputBuffer: INTERNAL PROC [handle: Handle]
RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ {
Allocate an SPP buffer for handle.
Initialize type and connectionID fields.
nsb ← XNSSocket.AllocBuffer[handle.socket];
TRUSTED { b ← LOOPHOLE[nsb] };
b.hdr1.type ← spp;
b.hdr2.sourceConnID ← handle.connectionID;
b.hdr2.destConnID ← handle.remoteConnectionID;
};
OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ {
Make handle.outputEnqueue nonempty.
Make sure the buffer is actually allocated and set its connCtl, sst and seqNum fields.
PRECONDITION: handle.outputEnqueue.state = empty
finger: Finger ~ handle.outputEnqueue;
buffer: SPPBuffer ← finger.buffer;
IF finger.state # empty THEN ERROR; -- Can't happen.
IF buffer = NIL THEN finger.buffer ← buffer ← AllocateOutputBuffer[handle].b;
buffer.hdr2.connCtl ←
IF finger.attention THEN defaultAttnConnCtl ELSE defaultConnCtl;
buffer.hdr2.sst ← handle.outputSSType;
buffer.hdr2.seqNum ← Endian.HFromCard[handle.outputEnqueueNum];
finger.state ← halfFull;
finger.index ← finger.bytes ← 0;
};
InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ {
Initialize output buffer for system packet. Performs the same buffer initialization as OutputMakeNotEmpty above, and sets buffer length field.
buffer.hdr2.connCtl ← defaultSystemConnCtl;
buffer.hdr2.sst ← handle.outputSSType;
buffer.hdr2.seqNum ← Endian.HFromCard[handle.outputSendNum];
TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes] };
};
OutputEnqueue: INTERNAL PROC [handle: Handle, endOfMessage: BOOL] ~ {
Enqueue the finger+buffer pointed to by handle.outputEnqueue; set its length field, set its state to full and NOTIFY that an output buffer is ready.
PRECONDITION: handle.outputEnqueue.state = halfFull
finger: Finger ~ handle.outputEnqueue;
IF finger.state # halfFull THEN ERROR; -- Can't happen.
finger.buffer.hdr2.connCtl.endOfMsg ← endOfMessage;
TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[finger.buffer],
XNSSPPBuf.hdrBytes + finger.bytes] };
finger.state ← full;
IF finger.index # 0 THEN ERROR; -- Can't happen.
handle.outputEnqueue ← handle.outputEnqueue.next;
handle.outputEnqueueNum ← handle.outputEnqueueNum + 1;
NOTIFY handle.outputReady;
};
ReadyOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer,
requestAck: BOOL] ~ {
Fill in time-dependent fields of output buffer before sending it.
Called before handle.outputSend pointer is advanced.
expectedAckNum, allocNum: CARDINAL;
Acknowledgement request ...
IF requestAck THEN {
expectedAckNum ← Endian.CardFromH[buffer.hdr2.seqNum];
IF (NOT buffer.hdr2.connCtl.system)
AND (expectedAckNum = handle.outputSendNum)
THEN expectedAckNum ← expectedAckNum + 1;
handle.expectedAckNum ← expectedAckNum;
buffer.hdr2.connCtl.sendAck ← TRUE;
handle.sentAckReqTime ← Now[] };
Acknowledgement ...
buffer.hdr2.ackNum ← Endian.HFromCard[handle.inputAckNum];
handle.mustSendAck ← FALSE;
Allocation ...
allocNum ← handle.inputDequeueNum + handle.inputBuffersAllocated - 1;
buffer.hdr2.allocNum ← Endian.HFromCard[allocNum];
IF allocNum # handle.sentAllocNum THEN {
handle.heWantsAlloc ← FALSE; -- I THINK THIS IS BOGUS ???? Do we need sentAllocNum at all ????
handle.sentAllocNum ← allocNum };
THESE CAN'T HURT, BUT WHAT ARE THEY DOING HERE ???? I'll find out!
buffer.hdr1.type ← spp;
IF buffer.hdr1.type # spp THEN ERROR;
buffer.hdr2.sourceConnID ← handle.connectionID;
IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR;
buffer.hdr2.destConnID ← handle.remoteConnectionID;
IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR;
};
MakeCopyOfXNSBuffer: PROC [sH: XNSSocket.Handle, nsb: XNSBuffer]
RETURNS[copy: XNSBuffer] ~ {
bytes: CARDINAL ~ Endian.CardFromH[nsb.hdr1.length];
copy ← XNSSocket.AllocBuffer[sH];
TRUSTED {
[] ← PrincOpsUtils.ByteBlt[
to ~ [
blockPointer ~ @copy.hdr1,
startIndex ~ 0, stopIndexPlusOne ~ bytes],
from ~ [
blockPointer ~ @nsb.hdr1,
startIndex ~ 0, stopIndexPlusOne ~ bytes]
];
};
};
InputDequeue: INTERNAL PROC [handle: Handle] ~ -- INLINE -- {
finger: Finger;
sendAllocNow, cantPiggyback: BOOL;
finger ← handle.inputDequeue;
finger.state ← empty;
finger.attention ← FALSE;
handle.inputDequeue ← finger.next;
handle.inputDequeueNum ← handle.inputDequeueNum + 1;
Check whether the other side has asked for allocation and a reasonable number of input buffers have become available. If so, make sure the other side gets the allocation by sending a system packet if necessary:
sendAllocNow ←
He wants some allocation AND
handle.heWantsAlloc AND
I have enough input buffers to give him a reasonable allocation (i.e., at most half my receive buffers are occupied)
(SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2));
IF sendAllocNow THEN handle.heWantsAlloc ← FALSE;
cantPiggyback ←
There's no data packet at all OR
(handle.outputSendNum = handle.outputEnqueueNum) OR
I'm waiting for allocation from him
(SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0);
IF sendAllocNow AND cantPiggyback THEN {
nsb: XNSBuffer;
b: SPPBuffer;
[nsb, b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, FALSE];
CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG ????
XNSSocketBackdoor.PutCached[nsb];
XNSSocket.FreeBuffer[nsb];
};
};
NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
IF NOT handle.closed THEN {
handle.closeReason ← closeReason;
handle.closeText ← closeText;
handle.closed ← TRUE };
BROADCAST handle.inputReady;
BROADCAST handle.doneEmptying; -- ????
BROADCAST handle.outputSpace;
BROADCAST handle.outputReady;
BROADCAST handle.recvdNewAlloc;
BROADCAST handle.doneFilling; -- ????
BROADCAST handle.doneSending; -- ????
BROADCAST handle.mgrShortWakeup;
BROADCAST handle.mgrLongWakeup;
BROADCAST handle.flusherWakeup;
BROADCAST handle.recvdOBAttn;
XNSSocket.Kick[handle.socket];
};
EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
NotifyClosed[handle, closeReason, closeText] };
Creation
MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL]
RETURNS [finger: Finger ← NIL] ~ {
tail: Finger ← NIL;
THROUGH [1..nBuffers] DO
finger ← NEW[ FingerObject ← [next~finger] ];
IF tail = NIL THEN tail ← finger;
ENDLOOP;
tail.next ← finger;
};
FreeBufferRing: PROC [finger: Finger] ~ {
WHILE finger # NIL DO
IF finger.buffer # NIL THEN {
nsb: XNSBuffer;
TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
XNSSocket.FreeBuffer[nsb];
finger.buffer ← NIL };
{ temp: Finger ~ finger.next; finger.next ← NIL; finger ← temp };
ENDLOOP;
};
InitConditions: PROC [handle: Handle] ~ {
TRUSTED {
Process.EnableAborts[@handle.inputReady];
Process.EnableAborts[@handle.doneEmptying]; -- ????
Process.EnableAborts[@handle.outputSpace];
Process.EnableAborts[@handle.outputReady];
Process.EnableAborts[@handle.recvdNewAlloc];
Process.EnableAborts[@handle.doneFilling]; -- ????
Process.EnableAborts[@handle.doneSending]; -- ????
Process.EnableAborts[@handle.mgrShortWakeup]; -- ????
Process.EnableAborts[@handle.mgrLongWakeup]; -- ????
Process.EnableAborts[@handle.flusherWakeup];
Process.EnableAborts[@handle.recvdOBAttn];
};
};
InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ {
handle.getTimeout ← getTimeout;
handle.getPulseOut ← MsecToPulses[getTimeout];
IF getTimeout = waitForever
THEN TRUSTED {
Process.DisableTimeout[@handle.inputReady] }
ELSE TRUSTED {
Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]] };
handle.putTimeout ← putTimeout;
handle.putPulseOut ← MsecToPulses[putTimeout];
IF putTimeout = waitForever
THEN TRUSTED {
Process.DisableTimeout[@handle.outputSpace] }
ELSE TRUSTED {
Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]] };
handle.cacheFlushPulseOut ← 30 * oneSecondOfPulses;
handle.roundTripPulses ← oneSecondOfPulses;
UpdateTimeouts[handle];
};
UpdateTimeouts: PROC [handle: Handle] ~ {
TODO: None of this stuff is very good. I don't know any good retransmission heuristics, and I guess I've got to learn some.
handle.waitForAckPulseOut ← 3 * handle.roundTripPulses;
handle.waitForAllocPulseOut ← handle.waitForAckPulseOut;
TRUSTED {
Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]] };
handle.probePulseOut ← 30 * handle.roundTripPulses;
TRUSTED {
Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]] };
handle.noActivityPulseOut ← 100 * handle.roundTripPulses;
};
MakeHandle: PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID: HWORD, sendBuffers, recvBuffers: CARDINAL, getTimeout, putTimeout: Milliseconds]
RETURNS [handle: Handle] ~ {
handle ← NEW[Object];
handle.socket ← socket;
handle.remote ← XNSSocket.GetRemoteAddress[socket];
handle.recvdTime ← Now[];
handle.cacheFlushedTime ← Now[];
handle.sentAckReqTime ← Now[];
handle.connectionID ← connectionID;
handle.remoteConnectionID ← remoteConnectionID;
InitConditions[handle];
InitTimeouts[handle, getTimeout, putTimeout];
{ f: Finger ~ MakeBufferRing[handle.socket, sendBuffers];
handle.outputEnqueue ← handle.outputSend ← handle.outputDequeue ← f;
handle.outputBuffersAllocated ← sendBuffers };
{ f: Finger ~ MakeBufferRing[handle.socket, recvBuffers];
handle.inputEnqueue ← handle.inputAck ← handle.inputDequeue ← f;
handle.inputBuffersAllocated ← recvBuffers };
handle.pull ← FORK Pull[handle];
handle.mgr ← FORK Mgr[handle];
handle.push1 ← FORK Push[handle];
handle.push2 ← FORK Push[handle];
AddNewHandle[newHandle~handle];
SafeStorage.EnableFinalization[handle];
};
FinishHandle: PROC [handle: Handle] ~ {
IF NOT handle.closed THEN ERROR;
IF handle.finished THEN RETURN;
TRUSTED {
JOIN handle.push1; handle.push1 ← NIL;
JOIN handle.push2; handle.push2 ← NIL;
JOIN handle.mgr; handle.mgr ← NIL;
JOIN handle.pull; handle.pull ← NIL };
FreeBufferRing[handle.outputEnqueue];
handle.outputEnqueue ← handle.outputSend ← handle.outputDequeue ← NIL;
FreeBufferRing[handle.inputEnqueue];
handle.inputEnqueue ← handle.inputAck ← handle.inputDequeue ← NIL;
XNSSocket.Destroy[handle.socket];
handle.finished ← TRUE };
SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ {
nsb: XNSBuffer;
b: SPPBuffer;
nsb ← XNSSocket.AllocBuffer[socket];
TRUSTED { b ← LOOPHOLE[nsb] };
b.hdr1.type ← spp;
b.hdr2.connCtl ← defaultSystemConnCtl;
b.hdr2.connCtl.sendAck ← TRUE;
b.hdr2.sst ← 0;
b.hdr2.sourceConnID ← connectionID;
b.hdr2.destConnID ← unknownConnectionID;
b.hdr2.seqNum ← Endian.HFromCard[0];
b.hdr2.ackNum ← Endian.HFromCard[0];
b.hdr2.allocNum ← Endian.HFromCard[0];
XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes];
XNSSocket.Put[b~nsb];
};
defaultInitialRetransmissionTime: Milliseconds ← 500;
Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds ← waitForever] RETURNS [STREAM] ~ {
nsb: XNSBuffer ← NIL;
socket: XNSSocket.Handle ← NIL;
handle: Handle ← NIL;
retransmissionTime: Milliseconds;
connID: HWORD;
hops: XNSRouter.Hops;
getTimeout ← MIN[getTimeout, waitForever];
putTimeout ← MIN[putTimeout, waitForever];
IF (hops ← XNSRouter.GetHops[remote.net]) = XNSRouter.unreachable
THEN ERROR ConnectionClosed[noRoute, "no route"];
This should depend on hops:
retransmissionTime ← defaultInitialRetransmissionTime;
BEGIN
ENABLE UNWIND => {
IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
IF socket # NIL THEN { XNSSocket.Destroy[socket]; socket ← NIL } };
socket ← XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ????
connID ← GetUniqueConnID[];
FOR nTries: CARDINAL IN [1..10] DO
IF nTries > 5 THEN retransmissionTime ← 2*retransmissionTime;
XNSSocket.SetGetTimeout[socket, retransmissionTime];
SendRFC[socket, connID];
nsb ← XNSSocket.Get[socket];
IF nsb = NIL THEN LOOP;
SELECT nsb.hdr1.type FROM
spp => {
b: SPPBuffer;
TRUSTED { b ← LOOPHOLE[nsb] };
IF b.hdr2.destConnID # connID THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
LOOP };
IF b.hdr1.source.host # remote.host THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
LOOP };
XNSSocket.SetRemoteAddress[socket, b.hdr1.source];
handle ← MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout];
nsb ← Receive[socket, nsb, handle];
IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]];
};
error => {
OPEN XNSErrorTypes;
eb: XNSErrorBuf.Buffer;
TRUSTED { eb ← LOOPHOLE[nsb] };
SELECT eb.hdr2.type FROM
badChecksumErr, resourceLimitsErr => LOOP;
noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr =>
ERROR ConnectionClosed[remoteReject, "rejected"];
cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"];
ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"];
};
ENDCASE => {
XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr];
nsb ← NIL;
LOOP };
ENDLOOP;
ERROR ConnectionClosed[noResponse, "no response"];
END;
};
Listener: TYPE ~ REF ListenerObject;
ListenerObject: PUBLIC TYPE ~ RECORD [
next: Listener,
destroyed: BOOLFALSE,
socketHandle: XNSSocket.Handle,
listenProcess: PROCESS ];
CreateListener: PUBLIC PROC [
socket: XNS.Socket,
worker: XNSStream.ListenerProc,
getTimeout, putTimeout: Milliseconds ← waitForever,
filter: XNSStream.FilterProc ← NIL, -- NIL => Accept all requests
echoFilter: XNSStream.FilterProc ← NIL] -- NIL => Answer all echos
RETURNS [listener: Listener]
~ {
listener ← NEW[ ListenerObject ];
listener.socketHandle ← XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
listener.listenProcess ← FORK Listen[socket, getTimeout, putTimeout, worker, filter, echoFilter, NIL, NIL, NIL, NIL];
AddNewListener[newListener~listener];
SafeStorage.EnableFinalization[listener];
};
CreateXListener: PUBLIC PROC [
socket: XNS.Socket,
worker: XNSStreamExtras.XListenerProc,
getTimeout, putTimeout: Milliseconds ← waitForever,
clientData: REFNIL,
filter: XNSStreamExtras.XFilterProc ← NIL, -- NIL => Accept all requests
echoFilter: XNSStreamExtras.XFilterProc ← NIL] -- NIL => Answer all echos
RETURNS [listener: Listener]
~ {
listener ← NEW[ ListenerObject ];
listener.socketHandle ← XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
listener.listenProcess ← FORK Listen[socket, getTimeout, putTimeout, NIL, NIL, NIL, worker, filter, echoFilter, clientData];
AddNewListener[newListener~listener];
SafeStorage.EnableFinalization[listener];
};
GetLocalFromListener: PUBLIC PROC [listener: Listener] RETURNS [local: XNS.Address] ~ {
socketHandle: XNSSocket.Handle ~ listener.socketHandle;
local ← IF socketHandle # NIL
THEN XNSSocket.GetLocalAddress[socketHandle]
ELSE XNS.unknownAddress;
};
DestroyListener: PUBLIC PROC [listener: Listener] ~ {
IF listener.destroyed THEN RETURN;
listener.destroyed ← TRUE;
IF listener.socketHandle = NIL THEN RETURN; -- CreateListener was ABORTed
XNSSocket.Destroy[listener.socketHandle];
listener.socketHandle ← NIL;
TRUSTED {
Process.Abort[listener.listenProcess];
JOIN listener.listenProcess };
};
Listen: PROC [
socket: XNS.Socket,
getTimeout, putTimeout: Milliseconds,
worker: XNSStream.ListenerProc,
filter: XNSStream.FilterProc,
echoFilter: XNSStream.FilterProc,
xWorker: XNSStreamExtras.XListenerProc,
xFilter: XNSStreamExtras.XFilterProc,
xEchoFilter: XNSStreamExtras.XFilterProc,
clientData: REF]
~ {
socketHandle: XNSSocket.Handle ←
XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
nsb: XNSBuffer ← NIL;
bytes: NAT;
{
ENABLE UNWIND => {
IF nsb # NIL THEN
{ XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
};
DO
IF nsb # NIL THEN
{ XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
IF (nsb ← XNSSocket.Get[socketHandle]) = NIL THEN LOOP;
bytes ← XNSSocket.GetUserBytes[nsb];
SELECT nsb.hdr1.type FROM
echo => {
b: XNSEchoBuf.Buffer;
IF bytes < XNSEchoBuf.hdrBytes THEN LOOP;
TRUSTED { b ← LOOPHOLE[nsb] };
IF b.hdr2.type # request THEN LOOP;
IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source]
THEN LOOP;
IF (xEchoFilter # NIL) AND NOT xEchoFilter[nsb.hdr1.source, clientData]
THEN LOOP;
b.hdr2.type ← reply;
{ XNSSocket.ReturnToSender[nsb]; nsb ← NIL };
};
spp => {
b: SPPBuffer;
streamSocketHandle: XNSSocket.Handle;
stream: STREAM;
handle: Handle;
IF bytes < XNSSPPBuf.hdrBytes THEN LOOP;
TRUSTED { b ← LOOPHOLE[nsb] };
IF (b.hdr2.seqNum # 0)
OR (b.hdr2.destConnID # unknownConnectionID)
OR (b.hdr2.sourceConnID = unknownConnectionID) THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
LOOP };
IF (handle ← FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID]) # NIL THEN {
ReceiveRFC[handle, nsb];
LOOP };
IF (filter # NIL) AND NOT filter[nsb.hdr1.source] THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.listenerRejectErr];
nsb ← NIL;
LOOP };
IF (xFilter # NIL) AND NOT xFilter[nsb.hdr1.source, clientData] THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.listenerRejectErr];
nsb ← NIL;
LOOP };
streamSocketHandle ← XNSSocket.Create[remote~nsb.hdr1.source, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ????
handle ← MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ????
ReceiveRFC[handle, nsb];
stream ← IO.CreateStream[streamProcs~streamProcs, streamData~handle];
IF worker # NIL THEN TRUSTED {
Process.Detach[FORK worker[stream~stream, remote~b.hdr1.source]] };
IF xWorker # NIL THEN TRUSTED {
Process.Detach[FORK xWorker[stream~stream, remote~b.hdr1.source, clientData~clientData]] };
};
ENDCASE;
ENDLOOP;
};
};
ReceiveRFC: PROC [handle: Handle, nsb: XNSBuffer] ~ {
The buffer has to be copied so buffer accounting will work out right. It's only an RFC (short) so the overhead isn't something we need to worry about.
newNsb: XNSBuffer;
newNsb ← MakeCopyOfXNSBuffer[handle.socket, nsb];
TRUSTED { (LOOPHOLE[newNsb, SPPBuffer]).hdr2.connCtl.sendAck ← TRUE }; -- force a reply
newNsb ← EntryReceive[handle, newNsb];
IF newNsb # NIL THEN XNSSocket.FreeBuffer[newNsb];
};
Byte Stream Interface
GetTimeouts: PUBLIC PROC [self: IO.STREAM]
RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
handle: Handle = NARROW[self.streamData];
[getTimeout, putTimeout] ← EntryGetTimeouts[handle] };
EntryGetTimeouts: ENTRY PROC [handle: Handle]
RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
getTimeout ← handle.getTimeout;
putTimeout ← handle.putTimeout;
};
SetTimeouts: PUBLIC PROC [self: IO.STREAM,
getTimeout, putTimeout: Milliseconds ← waitForever] ~ {
handle: Handle = NARROW[self.streamData];
EntrySetTimeouts[handle, getTimeout, putTimeout];
};
EntrySetTimeouts: ENTRY PROC [handle: Handle,
getTimeout, putTimeout: Milliseconds] ~ {
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
InitTimeouts[handle, getTimeout, putTimeout];
};
SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ {
handle: Handle = NARROW[self.streamData];
code: CompletionCode;
DO
code ← EntrySetSSType[handle, ssType];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType,
sendNow: BOOLFALSE]
RETURNS [CompletionCode ← normal] ~ {
Note: After changing ssType it's necessary to have a halfFull buffer as handle.inputEnqueue^; this ensures that the sequence
 SetSSType[h, t1]; SetSSType[h, t2];
will work correctly.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
IF ssType = handle.outputSSType THEN RETURN;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state FROM
empty => EXIT;
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP };
ENDCASE => ERROR;
ENDLOOP;
handle.outputSSType ← ssType;
OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType
IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE];
};
SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ {
handle: Handle = NARROW[self.streamData];
code: CompletionCode;
DO
code ← EntrySendEndOfMessage[handle];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntrySendEndOfMessage: ENTRY PROC [handle: Handle]
RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state FROM
empty => { OutputMakeNotEmpty[handle]; EXIT };
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
halfFull => EXIT;
ENDCASE => ERROR;
ENDLOOP;
OutputEnqueue[handle~handle, endOfMessage~TRUE];
RETURN [normal];
};
SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] ~ {
handle: Handle = NARROW[self.streamData];
nsb: XNSBuffer;
code: CompletionCode;
DO
[nsb, code] ← EntrySendAttention[handle, attentionType];
IF code = normal THEN EXIT;
IF nsb # NIL THEN ERROR; -- can't happen
SIGNAL Timeout[];
ENDLOOP;
IF nsb # NIL THEN XNSSocket.Put[nsb];
};
EntrySendAttention: ENTRY PROC [handle: Handle, attentionType: AttentionType]
RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ {
Note this should only time out if there are too many outstanding output in-band attentions queued.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
attnNum: CARDINAL;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF handle.outputIBAttnCnt >= 3 THEN { -- How many really ????
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
finger ← handle.outputEnqueue;
SELECT finger.state FROM
filling => { WAIT handle.doneFilling; LOOP };
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
ENDCASE => EXIT;
ENDLOOP;
Allocate a new finger and buffer for the in-band attention (Note we need to do this before sending it out-of-band, to determine its sequence number).
WHILE finger.next # handle.outputEnqueue DO finger ← finger.next ENDLOOP;
finger.next ← NEW [FingerObject ← [next~handle.outputEnqueue, attention~TRUE]];
handle.outputEnqueue ← finger ← finger.next;
IF handle.outputSendNum = handle.outputEnqueueNum THEN
handle.outputSend ← handle.outputEnqueue;
IF handle.outputDequeueNum = handle.outputEnqueueNum THEN
handle.outputDequeue ← handle.outputEnqueue;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.SUCC;
attnNum ← handle.outputEnqueueNum;
Fill the in-band attention buffer and enqueue it.
OutputMakeNotEmpty[handle];
TRUSTED { finger.buffer.body.bytes[0] ← LOOPHOLE[attentionType] };
finger.bytes ← 1;
OutputEnqueue[handle~handle, endOfMessage~FALSE];
If there's no allocation for the attention, make a copy of it to be send it out-of-band.
IF SignedDiff[handle.recvdAllocNum, attnNum] < 0 THEN {
nsb: XNSBuffer;
TRUSTED {
nsb ← MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]];
ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] };
RETURN [nsb, normal];
};
RETURN [NIL, normal];
};
WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds ← XNSStream.waitForever] RETURNS [type: AttentionType] ~ {
handle: Handle = NARROW[self.streamData];
code: CompletionCode;
DO
[type, code] ← EntryWaitAttention[handle, MsecToPulses[waitTimeout]];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryWaitAttention: ENTRY PROC [handle: Handle, pulseOut: Pulses]
RETURNS [AttentionType, CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
p: OBAttn ← NIL;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF (p ← handle.recvdOBAttnList) # NIL THEN {
handle.recvdOBAttnList ← p.next;
RETURN [p.attentionType, normal] };
IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut];
TRUSTED {
IF pulseOut < Pulses.LAST
THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]]
ELSE Process.DisableTimeout[@handle.recvdOBAttn];
};
WAIT handle.recvdOBAttn;
ENDLOOP;
};
GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
handle: Handle = NARROW[self.streamData];
[state, ssType, attentionType] ← EntryGetStatus[handle, reset];
};
EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
Note: we test finger.attention before anything else. In effect, this means the sst of an attention packet is ignored.
IF finger.attention THEN {
state ← attention; ssType ← handle.inputSSType;
TRUSTED { attentionType ← LOOPHOLE[finger.buffer.body.bytes[0]] };
IF reset THEN {
p: OBAttn ~ handle.recvdOBAttnList;
IF (p # NIL) AND (p.seqNum = handle.inputDequeueNum) THEN
handle.recvdOBAttnList ← p.next;
InputDequeue[handle] };
RETURN };
IF buffer.hdr2.sst # handle.inputSSType THEN {
IF reset THEN handle.inputSSType ← buffer.hdr2.sst;
RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] };
IF finger.index < finger.bytes THEN
RETURN [state~open, ssType~handle.inputSSType, attentionType~0];
IF buffer.hdr2.connCtl.endOfMsg THEN {
IF reset THEN InputDequeue[handle];
RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] };
InputDequeue[handle];
LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
empty => {
RETURN[state~open, ssType~handle.inputSSType, attentionType~0] };
ENDCASE => ERROR;
ENDLOOP;
};
FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOLFALSE]
RETURNS [bytesSkipped: LONG CARDINAL ← 0] ~ {
handle: Handle = NARROW[self.streamData];
code: CompletionCode;
DO
[bytesSkipped, code] ← EntryFlushInput[handle, wait, bytesSkipped];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryFlushInput: ENTRY PROC [handle: Handle, wait: BOOLFALSE, bytesSkipped: LONG CARDINAL] RETURNS [newSkipped: LONG CARDINAL, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state FROM
full, halfEmpty => {
finger.state ← halfEmpty;
IF finger.attention THEN RETURN [bytesSkipped, normal];
buffer ← finger.buffer;
IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal];
IF finger.index < finger.bytes THEN {
bytesSkipped ← bytesSkipped + (finger.bytes - finger.index);
finger.index ← finger.bytes };
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal];
InputDequeue[handle];
LOOP };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [bytesSkipped, normal];
IF IsTimedOut[handle.getPulseOut, startTime] THEN
RETURN [bytesSkipped, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
SendNow: PUBLIC PROC [self: IO.STREAM] ~ {
handle: Handle = NARROW[self.streamData];
EntrySendNow[handle];
};
EntrySendNow: ENTRY PROC [handle: Handle] ~ {
finger: Finger;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state FROM
filling => { WAIT handle.doneFilling; LOOP };
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
ENDCASE => EXIT;
ENDLOOP;
};
SendClose: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOLFALSE] ~ {
ENABLE ConnectionClosed => CONTINUE;
handle: Handle = NARROW[self.streamData];
sst: SubSequenceType;
code: CompletionCode;
code ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endSST, sendNow~TRUE];
IF code # normal THEN RETURN;
DO
[code~code] ← EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0];
IF code # normal THEN RETURN;
[ssType~sst] ← EntryGetStatus[handle~handle, reset~TRUE];
IF (sst = XNSSPPTypes.endReplySST)
OR (sst = XNSSPPTypes.endSST) THEN {
[] ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE];
RETURN [TRUE] };
ENDLOOP;
};
SendCloseReply: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOLFALSE] ~ {
ENABLE ConnectionClosed => CONTINUE;
handle: Handle = NARROW[self.streamData];
sst: SubSequenceType;
code: CompletionCode;
code ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE];
IF code # normal THEN RETURN;
DO
[code~code] ← EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0];
IF code # normal THEN RETURN;
[ssType~sst] ← EntryGetStatus[handle~handle, reset~TRUE];
IF (sst = XNSSPPTypes.endReplySST) THEN RETURN [TRUE];
ENDLOOP;
};
Stream Procs
GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ {
handle: Handle ~ NARROW[self.streamData];
code: CompletionCode;
DO
[char, code] ← EntryGetChar[handle, self];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryGetChar: ENTRY PROC [handle: Handle, self: STREAM]
RETURNS [char: CHAR, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN
RETURN WITH ERROR IO.EndOfStream[self];
IF finger.index >= finger.bytes THEN {
IF buffer.hdr2.connCtl.endOfMsg THEN
RETURN WITH ERROR IO.EndOfStream[self];
InputDequeue[handle];
LOOP };
TRUSTED { char ← buffer.body.chars[finger.index] };
finger.index ← finger.index + 1;
RETURN [char, normal];
};
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF IsTimedOut[handle.getPulseOut, startTime] THEN
RETURN [char~, code~timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
stop: NAT ~ MIN[(startIndex+count), block.maxLength];
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop DO
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
block.length ← startIndex;
ENDLOOP;
IF finger # NIL THEN GetBlockLast[handle, finger];
};
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
startIndex: INT ← block.startIndex;
stop: INT ~ block.startIndex+block.count;
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop DO
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~block.base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
IF finger # NIL THEN GetBlockLast[handle, finger];
};
GetBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
IF oldFinger # NIL THEN {
oldFinger.state ← halfEmpty;
BROADCAST handle.doneEmptying;
};
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
buffer ← finger.buffer;
SELECT finger.state FROM
full, halfEmpty => {
IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN
RETURN[NIL, normal];
IF finger.index >= finger.bytes THEN {
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal];
InputDequeue[handle];
LOOP };
finger.state ← emptying;
RETURN[finger, normal] };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
GetBlockLast: ENTRY PROC [handle: Handle, oldFinger: Finger] ~ {
IF oldFinger = NIL THEN ERROR; -- can't happen
oldFinger.state ← halfEmpty;
BROADCAST handle.doneEmptying;
};
PutChar: PROC [self: STREAM, char: CHAR] ~ {
handle: Handle ~ NARROW[self.streamData];
code: CompletionCode;
DO
code ← EntryPutChar[handle, char];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryPutChar: ENTRY PROC [handle: Handle, char: CHAR] RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
fingerhandle.outputEnqueue;
SELECT finger.state FROM
empty => OutputMakeNotEmpty[handle];
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
IF finger.bytes = LAST[BufIndex] THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
TRUSTED { finger.buffer.body.chars[finger.bytes] ← char };
finger.bytes ← finger.bytes + 1;
RETURN [normal];
ENDLOOP;
};
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
stop: NAT ~ MIN[(startIndex+count), block.length];
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop DO
[finger, code] ← PutBlockNext[self, handle, finger];
SELECT code FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => NULL;
ENDCASE => ERROR;
IF finger = NIL THEN ERROR; -- can't happen
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.bytes,
stopIndexPlusOne~BufIndex.LAST],
from~[
blockPointer~base,
startIndex~startIndex,
stopIndexPlusOne~stop] ];
};
finger.bytes ← finger.bytes + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
IF finger # NIL THEN PutBlockLast[handle, finger];
};
UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
startIndex: INT ← block.startIndex;
stop: INT ~ block.startIndex+block.count;
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop DO
[finger, code] ← PutBlockNext[self, handle, finger];
SELECT code FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => NULL;
ENDCASE => ERROR;
IF finger = NIL THEN ERROR; -- can't happen
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.bytes,
stopIndexPlusOne~BufIndex.LAST],
from~[
blockPointer~block.base,
startIndex~startIndex,
stopIndexPlusOne~stop] ];
};
finger.bytes ← finger.bytes + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
IF finger # NIL THEN PutBlockLast[handle, finger];
};
PutBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
startTime: Pulses ← Now[];
IF oldFinger # NIL THEN {
oldFinger.state ← halfFull;
BROADCAST handle.doneFilling;
OutputEnqueue[handle~handle, endOfMessage~FALSE] };
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state FROM
empty => OutputMakeNotEmpty[handle];
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
IF finger.bytes = LAST[BufIndex] THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
finger.state ← filling;
RETURN[finger, normal];
ENDLOOP;
};
PutBlockLast: ENTRY PROC [handle: Handle, finger: Finger] ~ {
ENABLE UNWIND => NULL;
finger.state ← halfFull;
BROADCAST handle.doneFilling;
};
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
handle: Handle ~ NARROW[self.streamData];
RETURN [EntryEndOf[handle]];
};
EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state FROM
full => { finger.state ← halfEmpty; LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
halfEmpty => RETURN [
finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType)
OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg)
];
empty => RETURN [ FALSE ];
ENDCASE => ERROR;
ENDLOOP;
};
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ {
handle: Handle ~ NARROW[self.streamData];
code: CompletionCode;
DO
[nChars, code] ← EntryCharsAvail[handle, wait];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL]
RETURNS [INT, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state FROM
full, halfEmpty => {
finger.state ← halfEmpty;
IF finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType)
THEN RETURN[0, normal];
IF finger.index < finger.bytes
THEN RETURN [(finger.bytes - finger.index), normal];
IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal];
InputDequeue[handle];
LOOP };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [0, normal];
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR; -- Can't happen.
ENDLOOP;
};
Flush: PROC [self: STREAM] ~ {
handle: Handle ~ NARROW[self.streamData];
code: CompletionCode;
DO
code ← EntryFlush[handle];
IF code = normal THEN RETURN;
SIGNAL Timeout[];
ENDLOOP;
};
EntryFlush: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
flushNum: CARDINAL;
startTime: Pulses ← Now[];
Get rid of partly filled buffer.
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
SELECT handle.outputEnqueue.state FROM
filling => { WAIT handle.doneFilling; LOOP };
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
empty, full, sending => EXIT;
ENDCASE => ERROR;
ENDLOOP;
Check for already acknowledged.
IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal];
Make sure there's at least one unsent buffer to get its ackMe bit set.
IF handle.outputSendNum = handle.outputEnqueueNum THEN {
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
SELECT handle.outputEnqueue.state FROM
empty => { OutputMakeNotEmpty[handle]; EXIT };
filling => { WAIT handle.doneFilling; LOOP };
halfFull => EXIT;
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
ENDLOOP;
OutputEnqueue[handle~handle, endOfMessage~FALSE];
};
Wait until everything is acknowledged.
flushNum ← handle.outputEnqueueNum;
handle.flusherCnt ← handle.flusherCnt + 1;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT;
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.flusherWakeup;
ENDLOOP;
handle.flusherCnt ← handle.flusherCnt - 1;
RETURN [normal];
};
Close: PROC [self: STREAM, abort: BOOL] = {
handle: Handle ~ NARROW[self.streamData];
EntryNotifyClosed[handle, localClose, "close called"];
FinishHandle[handle];
};
Output (Push) Processes
nSent: INT ← 0;
EntryPush: ENTRY PROC [handle: Handle, sentFinger: Finger]
RETURNS [sendFinger: Finger ← NIL, sendBuffer: XNSBuffer ← NIL] ~ {
If (sentFinger # NIL), we've just finished sending it. In that case, change its state from sending to full and BROADCAST handle.doneSending.
Then wait for a data buffer to send and return it.
Returned value of NIL indicates the handle is closed.
ENABLE UNWIND => NULL;
IF sentFinger # NIL THEN {
sentFinger.state ← full;
BROADCAST handle.doneSending };
DO
Check for closed:
IF handle.closed THEN RETURN;
Check for no buffer ready to send:
IF handle.outputSendNum = handle.outputEnqueueNum THEN {
WAIT handle.outputReady;
LOOP };
Check for no allocation:
IF SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0 THEN {
WAIT handle.recvdNewAlloc;
LOOP };
All the checks succeeded, so we're now committed to sending the buffer:
{ halfMyBuffers, windowLeft, nSentNoAckReq: INTEGER;
sendAckReq: BOOL;
halfMyBuffers ← INTEGER[handle.outputBuffersAllocated / 2];
windowLeft ← SignedDiff[handle.recvdAllocNum, handle.outputSendNum];
nSentNoAckReq ← SignedDiff[handle.outputSendNum, handle.expectedAckNum];
sendAckReq ← ((windowLeft = halfMyBuffers) OR (windowLeft = 0)
OR (nSentNoAckReq >= halfMyBuffers) OR (handle.flusherCnt > 0));
sendFinger ← handle.outputSend;
ReadyOutputBuffer[handle~handle, buffer~sendFinger.buffer,
requestAck~sendAckReq];
sendFinger.state ← sending;
TRUSTED { sendBuffer ← LOOPHOLE[sendFinger.buffer] };
handle.outputSendNum ← handle.outputSendNum + 1;
handle.outputSend ← sendFinger.next;
IF sendAckReq THEN BROADCAST handle.mgrLongWakeup;
RETURN;
};
ENDLOOP;
};
Push: PROC [handle: Handle] ~ {
sendBuffer: XNSBuffer ← NIL;
sendFinger: Finger ← NIL;
Process.SetPriority[Process.priorityForeground];
DO
[sendFinger, sendBuffer] ← EntryPush[handle, sendFinger];
IF sendFinger = NIL THEN EXIT;
CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG
XNSSocketBackdoor.PutCached[sendBuffer];
ENDLOOP;
IF sendBuffer # NIL THEN ERROR; -- Can't happen ????
};
Retransmitter (Mgr) Process
nResent: INT ← 0;
MaybeFlushCache: INTERNAL PROC [handle: Handle, pulseOut: Pulses] ~ {
IF PulsesSince[handle.cacheFlushedTime] > pulseOut THEN {
handle.cacheFlushedTime ← Now[];
XNSSocketBackdoor.FlushCache[handle.socket] };
};
EntryMgr: ENTRY PROC [handle: Handle, sentFinger: Finger]
RETURNS [sendFinger: Finger ← NIL, sendBuffer: XNSBuffer ← NIL] ~ {
ENABLE UNWIND => NULL;
ackReqOutstanding, allocReqOutstanding, timedOutAck, timedOutProbe: BOOL;
b: SPPBuffer ← NIL;
IF sentFinger # NIL THEN
{ sentFinger.state ← full; BROADCAST handle.doneSending };
DO
Die if no activity for a long time.
IF PulsesSince[handle.recvdTime] >= handle.noActivityPulseOut
THEN NotifyClosed[handle, unknown, "no activity timeout"];
IF handle.closed THEN RETURN;
When direct input is implemented, this might be the place to call get on handle.socket with a getTimeout of 0 (don't wait). NO! Don't do it holding the ML! ????
Periodically flush the routing info cache associated with the socket.
MaybeFlushCache[handle, handle.cacheFlushPulseOut];
ackReqOutstanding ← (
I have unacknowledged packets -- THIS IS NOT NECESSARY ???? (next condition implies it) ????
(handle.outputDequeueNum # handle.outputSendNum)
AND I am expecting an acknowledgement
AND (SignedDiff[handle.expectedAckNum, handle.outputDequeueNum] > 0));
allocReqOutstanding ← (
I have unsent packets
(handle.outputSendNum # handle.outputEnqueueNum)
AND I have no allocation
AND (SignedDiff[handle.outputSendNum, handle.recvdAllocNum] > 0));
timedOutAck ← (
PulsesSince[handle.sentAckReqTime] > handle.waitForAckPulseOut);
timedOutProbe ← (
(PulsesSince[handle.recvdTime] > handle.probePulseOut)
AND (PulsesSince[handle.sentAckReqTime] > handle.probePulseOut));
SELECT TRUE FROM
(ackReqOutstanding AND timedOutAck) => {
An acknowledgement is overdue. Retransmit the packet.
SELECT handle.outputDequeue.state FROM
full => NULL;
sending => { WAIT handle.doneSending; LOOP };
ENDCASE => ERROR;
MaybeFlushCache[handle, 3*handle.waitForAckPulseOut]; -- What's the right timeout to use here ????
nResent ← nResent.SUCC;
sendFinger ← handle.outputDequeue;
sendFinger.state ← sending;
b ← sendFinger.buffer;
TRUSTED { sendBuffer ← LOOPHOLE[b] };
ReadyOutputBuffer[handle, b, TRUE];
RETURN };
(timedOutProbe OR (allocReqOutstanding AND timedOutAck)) => {
No activity for a long time, or else I want allocation and it's overdue. In either case, send a probe.
[nsb~sendBuffer, b~b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, TRUE];
RETURN };
handle.mustSendAck => {
We've been asked to send an acknowledgement immediately. Send it.
[nsb~sendBuffer, b~b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, FALSE];
RETURN };
(ackReqOutstanding OR allocReqOutstanding) => {
There's an outstanding request, so we should time out pretty soon.
WAIT handle.mgrShortWakeup;
LOOP };
ENDCASE => {
We don't expect anything to happen soon.
WAIT handle.mgrLongWakeup;
LOOP };
ENDLOOP;
};
Mgr: PROC [handle: Handle] ~ {
sendBuffer: XNSBuffer ← NIL;
sendFinger: Finger ← NIL;
Process.SetPriority[Process.priorityForeground];
DO
[sendFinger, sendBuffer] ← EntryMgr[handle, sendFinger];
SELECT TRUE FROM
(sendFinger # NIL) => {
CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG
XNSSocketBackdoor.PutCached[sendBuffer];
};
(sendBuffer # NIL) => {
CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes]; -- DEBUG
XNSSocketBackdoor.PutCached[sendBuffer];
XNSSocket.FreeBuffer[sendBuffer];
};
ENDCASE => EXIT;
ENDLOOP;
};
Input (Pull) Process
nRecvd: INT ← 0;
nTooShort: INT ← 0;
nBadConnID: INT ← 0;
nBadRemoteConnID: INT ← 0;
GotNewAck: INTERNAL PROC [handle: Handle, newAckNum: CARDINAL] ~ {
Release sent packets that have been acknowledged.
NOTE: This messes with the OUTPUT queues from an INPUT process.
finger: Finger;
Sanity check.
IF SignedDiff[newAckNum, handle.outputSendNum] > 0 THEN {
This is a protocol error
RETURN };
WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0 DO
IF handle.closed THEN RETURN;
finger ← handle.outputDequeue;
SELECT finger.state FROM
full => NULL;
sending => { -- extremely unlikely; occurs only when resending.
WAIT handle.doneSending;
LOOP };
ENDCASE => ERROR;
IF finger.attention
THEN {
Discard the attention finger.
temp: Finger;
nsb: XNSBuffer;
FOR temp ← finger, temp.next WHILE temp.next # finger DO NULL ENDLOOP;
temp.next ← finger.next;
IF finger.buffer = NIL THEN ERROR; -- Can't happen ????
TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
XNSSocket.FreeBuffer[nsb];
finger.buffer ← NIL;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.PRED;
}
ELSE {
Advance the finger pointer, make the data buffer available.
finger.state ← empty;
BROADCAST handle.outputSpace;
};
handle.outputDequeueNum ← handle.outputDequeueNum + 1;
handle.outputDequeue ← finger.next;
ENDLOOP;
IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup;
};
GotAckReq: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, systemPacket: BOOL] ~ -- INLINE -- {
TODO: Work on "heWantsAlloc" heuristics ???? Do I want to sent an ack immediately if he's used up his allocation? ????
IF systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0)
THEN {
handle.mustSendAck ← TRUE;
BROADCAST handle.mgrShortWakeup;
BROADCAST handle.mgrLongWakeup }
ELSE {
handle.heWantsAlloc ← TRUE };
};
GotNewAlloc: INTERNAL PROC [handle: Handle, newAllocNum: CARDINAL] ~ -- INLINE -- {
handle.recvdAllocNum ← newAllocNum;
BROADCAST handle.recvdNewAlloc;
};
GotOBAttn: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, type: AttentionType] ~ {
p, prev, new: OBAttn;
seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum];
IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ????
prev ← NIL; p ← handle.recvdOBAttnList;
DO
IF p = NIL THEN EXIT;
IF p.seqNum = seqNum THEN RETURN;
IF p.seqNum > seqNum THEN EXIT;
prev ← p; p ← p.next;
ENDLOOP;
new ← NEW[OBAttnObject←[next~p, seqNum~seqNum, attentionType~type]];
IF prev = NIL THEN handle.recvdOBAttnList ← new ELSE prev.next ← new;
BROADCAST handle.recvdOBAttn;
};
GotData: INTERNAL PROC [handle: Handle, newSeqNum: CARDINAL, b: SPPBuffer, bytes: NAT] RETURNS [swapb: SPPBuffer] ~ -- INLINE -- {
PRECONDITION: NOT b.hdr2.connCtl.system
finger: Finger;
Check for duplicate.
IF SignedDiff[newSeqNum, handle.inputAckNum] < 0 THEN {
RETURN[b] };
Push inputEnqueue so there's room for new packet.
IF SignedDiff[newSeqNum, handle.inputDequeueNum] >= INT[handle.inputBuffersAllocated] THEN {
The new buffer won't fit, so drop it.
RETURN[b] };
WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0) DO
handle.inputEnqueue ← handle.inputEnqueue.next;
handle.inputEnqueueNum ← handle.inputEnqueueNum + 1;
ENDLOOP;
Set finger to the slot where the new packet should go.
finger ← handle.inputAck;
THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum]) DO
finger ← finger.next;
ENDLOOP;
Put the packet there. If it's a duplicate we use the new one, but so what?
swapb ← finger.buffer;
finger.buffer ← b;
finger.bytes ← bytes;
finger.index ← 0;
finger.attention ← b.hdr2.connCtl.attn;
finger.state ← full;
Mark as acknowledged as many packets as possible and pass them to the client.
WHILE (handle.inputAck.state = full)
AND (handle.inputAckNum # handle.inputEnqueueNum) DO
BROADCAST handle.inputReady;
handle.inputAckNum ← handle.inputAckNum + 1;
handle.inputAck ← handle.inputAck.next;
ENDLOOP;
};
Receive: XNSSocketBackdoor.ReceiveProc
[handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer]
~ {
nRecvd ← nRecvd.SUCC; -- DEBUG
RETURN [ EntryReceive[NARROW[clientData], b] ] };
EntryReceive: ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer] RETURNS [XNSBuf.Buffer] ~ {
Note this sends error packets itself,
ENABLE UNWIND => NULL;
bytes: NAT;
IF handle.closed THEN RETURN [nsb];
bytes ← XNSSocket.GetUserBytes[nsb];
SELECT nsb.hdr1.type FROM
error => {
OPEN XNSErrorTypes;
eb: XNSErrorBuf.Buffer;
TRUSTED { eb ← LOOPHOLE[nsb] };
SELECT eb.hdr2.type FROM
noSocketErr, listenerRejectErr, protocolViolationErr => {
NotifyClosed[handle, remoteClose, "remote close"] };
ENDCASE => {
XNSSocketBackdoor.FlushCache[handle.socket] };
RETURN [nsb]
};
spp => {
b: XNSSPPBuf.Buffer;
newSeqNum: CARDINAL;
newAckNum: CARDINAL;
newAllocNum: CARDINAL;
TRUSTED { b ← LOOPHOLE[nsb] };
IF bytes < XNSSPPBuf.hdrBytes THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.unspecifiedErr];
nsb ← NIL;
nTooShort ← nTooShort.SUCC; -- DEBUG
RETURN [nsb] };
IF b.hdr2.destConnID # handle.connectionID THEN {
IF b.hdr2.destConnID # unknownConnectionID THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadConnID ← nBadConnID.SUCC; -- DEBUG
RETURN [nsb] };
};
IF b.hdr2.sourceConnID # handle.remoteConnectionID THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadRemoteConnID ← nBadRemoteConnID.SUCC; -- DEBUG
RETURN [nsb] };
newSeqNum ← Endian.CardFromH[b.hdr2.seqNum];
newAckNum ← Endian.CardFromH[b.hdr2.ackNum];
newAllocNum ← Endian.CardFromH[b.hdr2.allocNum];
handle.recvdTime ← Now[];
IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0 THEN
GotNewAlloc[handle, newAllocNum];
IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0 THEN
GotNewAck[handle, newAckNum];
IF b.hdr2.connCtl.sendAck THEN
GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system];
Handle a data packet.
IF NOT b.hdr2.connCtl.system THEN {
IF b.hdr2.connCtl.attn THEN TRUSTED {
GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] };
b ← GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)];
TRUSTED { nsb ← LOOPHOLE[b] };
};
RETURN [nsb];
};
ENDCASE => {
XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr];
nsb ← NIL;
RETURN [nsb];
};
};
Pull: PROC [handle: Handle] ~ {
nsb: XNSBuf.Buffer ← NIL;
XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever];
XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle];
WHILE NOT handle.closed DO
IF (nsb ← XNSSocket.Get[handle.socket]) = NIL THEN LOOP;
IF (nsb ← Receive[handle.socket, nsb, handle]) = NIL THEN LOOP;
XNSSocket.FreeBuffer[nsb];
nsb ← NIL;
ENDLOOP;
XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
handle ← NIL; -- for finalization
};
Chain of Streams
There are three times when we need to be able to locate all of the streams: 1) Smashing them after a rollback, 2) processing duplicate RFCs, and 3) making sure connectionIDs are unique.
globalLockHandle: Handle ← NEW[Object];
handleChain: Handle ← NIL;
AddNewHandle: ENTRY PROC [handle: Handle ← globalLockHandle, newHandle: Handle] ~ {
IF handle # globalLockHandle THEN ERROR;
newHandle.next ← handleChain;
handleChain ← newHandle;
};
RemoveOldHandle: ENTRY PROC [handle: Handle ← globalLockHandle, oldHandle: Handle] ~ {
IF handle # globalLockHandle THEN ERROR;
IF handleChain = oldHandle
THEN handleChain ← handleChain.next
ELSE FOR h: Handle ← handleChain, h.next DO
IF h.next = oldHandle THEN { h.next ← oldHandle.next; EXIT };
ENDLOOP; -- NIL fault if not on chain
oldHandle.next ← NIL; -- Help Finalization
};
FindExistingHandle: ENTRY PROC [handle: Handle ← globalLockHandle, remote: XNS.Address, remoteConnID: HWORD] RETURNS [existing: Handle] ~ {
IF handle # globalLockHandle THEN ERROR;
FOR existing ← handleChain, existing.next WHILE existing # NIL DO
IF (existing.remote = remote) AND (existing.remoteConnectionID = remoteConnID)
THEN EXIT;
ENDLOOP;
};
Rollback: Booting.RollbackProc = {
FOR handle: Handle ← handleChain, handle.next WHILE handle # NIL DO
IF NOT handle.closed THEN
EntryNotifyClosed[handle, localClose, "Rollback"];
ENDLOOP;
};
Unique Connection IDs
uniqueConnID: CARDINAL ← Basics.LowHalf[Now[]];
GetUniqueConnID: ENTRY PROC [handle: Handle ← globalLockHandle]
RETURNS [HWORD] ~ {
h: Handle;
IF handle # globalLockHandle THEN ERROR;
DO
uniqueConnID ← uniqueConnID + 1;
FOR h ← handleChain, h.next
WHILE (h # NIL) AND (h.connectionID # uniqueConnID) DO NULL ENDLOOP;
IF h = NIL THEN RETURN [Endian.HFromCard[uniqueConnID]];
ENDLOOP;
};
Chain of Listeners
listenerChain: Listener ← NIL;
AddNewListener: ENTRY PROC [handle: Handle ← globalLockHandle, newListener: Listener] ~ {
newListener.next ← listenerChain;
listenerChain ← newListener };
RemoveOldListener: ENTRY PROC [handle: Handle ← globalLockHandle, oldListener: Listener] ~ {
IF oldListener = listenerChain
THEN listenerChain ← listenerChain.next
ELSE FOR p: Listener ← listenerChain, p.next DO
IF p.next = oldListener THEN { p.next ← oldListener.next; EXIT }
ENDLOOP; -- NIL fault if not on chain
oldListener.next ← NIL;
};
Stream Finalization
Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, Push, Pull and Mgr must 1) notice when the stream dies, and 2) NIL out their copy of handle as they return. (The frame doesn't get recycled until after the JOIN.) The current code won't do anything with dropped streams unless they get closed by the other end.
Initialization appears in mainline code.
droppedStreams: INT ← 0;
finalizedStreams: INT ← 0;
streamFinalizerQueue: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
StreamFinalizer: PROC ~ {
Process.SetPriority[Process.priorityBackground];
DO
handle: Handle ← NARROW[SafeStorage.FQNext[streamFinalizerQueue]];
IF NOT handle.finished THEN { -- User forgot to call Destroy
IF NOT handle.closed THEN ERROR; -- Can't happen
FinishHandle[handle];
SafeStorage.EnableFinalization[handle];
droppedStreams ← droppedStreams.SUCC; }
ELSE { -- Normal end of life
RemoveOldHandle[oldHandle~handle];
finalizedStreams ← finalizedStreams.SUCC; };
handle ← NIL;
ENDLOOP;
};
Listener Finalization
Initialization appears in mainline code.
droppedListeners: INT ← 0;
finalizedListeners: INT ← 0;
listenerFinalizerQueue: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
ListenerFinalizer: PROC ~ {
Process.SetPriority[Process.priorityBackground];
DO
listener: Listener ← NARROW[SafeStorage.FQNext[listenerFinalizerQueue]];
IF NOT listener.destroyed THEN { -- User forgot to call Destroy
DestroyListener[listener];
SafeStorage.EnableFinalization[listener];
droppedListeners ← droppedListeners.SUCC; }
ELSE { -- Normal end of life
RemoveOldListener[oldListener~listener];
finalizedListeners ← finalizedListeners.SUCC; };
listener ← NIL;
ENDLOOP;
};
MainLine Code
Initialization for stream finalizer
{ established: BOOL;
established ← TRUE;
SafeStorage.EstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue
! SafeStorage.CantEstablishFinalization => { established ← FALSE; CONTINUE }];
IF NOT established THEN {
established ← TRUE;
SafeStorage.ReEstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue
! SafeStorage.CantEstablishFinalization => { established ← FALSE; CONTINUE }];
};
IF NOT established THEN ERROR };
TRUSTED { Process.Detach[FORK StreamFinalizer[]]; };
Initialization for listener finalizer
{ established: BOOL;
established ← TRUE;
SafeStorage.EstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue
! SafeStorage.CantEstablishFinalization => { established ← FALSE; CONTINUE }];
IF NOT established THEN {
established ← TRUE;
SafeStorage.ReEstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue
! SafeStorage.CantEstablishFinalization => { established ← FALSE; CONTINUE }];
};
IF NOT established THEN ERROR };
TRUSTED { Process.Detach[FORK ListenerFinalizer[]]; };
Initialization for rollback
Booting.RegisterProcs[r: Rollback];
}.