DIRECTORY
Basics USING [LowHalf],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds],
Booting USING [RegisterProcs, RollbackProc],
Endian USING [CardFromH, HFromCard, HWORD],
IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock],
PrincOpsUtils USING [ByteBlt],
Process USING [Abort, ConditionPointer, Detach, DisableTimeout, EnableAborts, Milliseconds, MsecToTicks, priorityBackground, priorityForeground, Seconds, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Rope USING [ROPE],
SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization],
XNS USING [Address, Socket],
XNSBuf USING [Buffer],
XNSEchoBuf USING [Buffer, hdrBytes],
XNSErrorBuf USING [Buffer],
XNSErrorTypes USING [badChecksumErr, cantGetThereErr, invalidPacketTypeErr, listenerRejectErr, noSocketErr, protocolViolationErr, resourceLimitsErr, unspecifiedErr],
XNSSocket USING [AllocBuffer, Create, Destroy, FreeBuffer, Get, GetRemoteAddress, GetUserBytes, Handle, Kick, Milliseconds, Put, ReturnError, ReturnToSender, SetGetTimeout, SetRemoteAddress, SetUserBytes, waitForever],
XNSSocketBackdoor USING [FlushCache, PutCached, ReceiveProc, SetDirectReceive],
XNSSPPBuf USING [Buffer, ConnCtl, hdrBytes],
XNSSPPTypes USING [endReplySST, endSST],
XNSStream USING [AttentionType, CloseReason, FilterProc, ListenerProc, Milliseconds, State, SubSequenceType, waitForever],
XNSStreamPrivate USING [AttentionType, BufIndex, Finger, FingerObject, Handle, HWORD, OBAttn, OBAttnObject, Object, SignedDiff, SPPBuffer, SubSequenceType, unknownConnectionID, XNSBuffer];
XNSStreamImpl:
CEDAR
MONITOR
LOCKS handle
USING handle: Handle
IMPORTS Basics, BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, SafeStorage, XNSStreamPrivate, XNSSocket, XNSSocketBackdoor
EXPORTS XNSStream
~ {
OPEN XNSStreamPrivate;
-- Temporary for debugging:
CheckUserBytes: PROC [buffer: XNSBuffer, bytes: CARDINAL] ~ {
  -- Debugging check: raise an unnamed ERROR unless the user-byte count that XNSSocket reports for buffer equals bytes.
  IF XNSSocket.GetUserBytes[buffer] = bytes THEN RETURN;
  ERROR;
  };
-- Signals and Errors
-- Raised by the entry procedures in this module when they find handle.closed set; the arguments are the close reason and text recorded in the handle.
ConnectionClosed: PUBLIC ERROR [why: XNSStream.CloseReason, text: Rope.ROPE] ~ CODE;
-- Raised as a SIGNAL by the public wrappers when an entry procedure reports timedOut; if the signal is resumed the wrapper retries the operation.
Timeout: PUBLIC SIGNAL ~ CODE;
-- Result code passed from the entry procedures back to their unmonitored wrappers.
CompletionCode: TYPE ~ { normal, timedOut };
-- Byte Stream Interface
-- Local abbreviation for the IO stream type.
STREAM: TYPE ~ IO.STREAM;
-- Procedure table shared by every stream created in this module (see the IO.CreateStream calls in Create and Listen).
-- Each entry names the procedure in this module that implements the corresponding generic IO stream operation.
streamProcs:
REF
IO.StreamProcs ~
IO.CreateStreamProcs [
variety~$inputOutput, class~$XNS,
getChar~GetChar,
getBlock~GetBlock,
unsafeGetBlock~UnsafeGetBlock,
endOf~EndOf,
charsAvail~CharsAvail,
putChar~PutChar,
putBlock~PutBlock,
unsafePutBlock~UnsafePutBlock,
flush~Flush,
close~Close ];
-- Times
-- Client-visible timeout unit, re-exported from XNSStream.
Milliseconds: TYPE ~ XNSStream.Milliseconds;
-- Distinguished timeout value meaning "never time out" (see MsecToPulses and InitTimeouts).
waitForever: Milliseconds ~ XNSStream.waitForever;
-- Internal time unit: clock pulses as returned by BasicTime.GetClockPulses.
Pulses: TYPE ~ BasicTime.Pulses;
-- Number of pulses in 1,000,000 microseconds; used as the base for the default timeouts set in InitTimeouts.
oneSecondOfPulses: Pulses ~ BasicTime.MicrosecondsToPulses[1000000];
MsecToPulses: PROC [msec: Milliseconds] RETURNS [Pulses] ~ {
  -- Convert a millisecond timeout to clock pulses.
  -- Any value at or above waitForever maps to Pulses.LAST; everything else is converted via microseconds.
  IF msec < waitForever THEN RETURN [BasicTime.MicrosecondsToPulses[1000*msec]];
  RETURN [Pulses.LAST];
  };
PulsesSince: PROC [then: Pulses] RETURNS [Pulses] ~ INLINE {
  -- Number of clock pulses elapsed between then and the current clock reading.
  current: Pulses ~ BasicTime.GetClockPulses[];
  RETURN [current - then];
  };
TimeoutTicksFromPulses: PROC [pulses: Pulses] RETURNS [ticks: Process.Ticks] ~ {
  -- Convert a pulse interval into a condition-variable timeout in Process ticks.
  -- The idea is to set the timeout of a condition variable so that the specified number of pulses (+ epsilon) will have elapsed when the condition times out. Epsilon here is 10%, which is pretty crude. I should do some experiments to see how much adjustment Cedar really requires here.
  usec: LONG CARDINAL ~ BasicTime.PulsesToMicroseconds[pulses];
  -- Scale each term to milliseconds BEFORE adding. The earlier form, (usec + usec/10) / 1000, could overflow LONG CARDINAL when usec was within about 10% of its maximum, yielding a far-too-short timeout.
  -- The result may differ from the earlier formula by at most one millisecond due to truncation.
  msec: Milliseconds ~ usec/1000 + usec/10000;
  -- Short intervals keep millisecond resolution; longer ones are converted through whole seconds.
  IF msec < Process.Milliseconds.LAST THEN
    RETURN[ Process.MsecToTicks[Process.Milliseconds[msec]] ];
  RETURN[ Process.SecondsToTicks[Process.Seconds[msec/1000]] ];
  };
Now: PROC RETURNS [Pulses] ~ INLINE {
  -- Current clock reading, in pulses.
  current: Pulses ~ BasicTime.GetClockPulses[];
  RETURN [current];
  };
IsTimedOut: PROC [timeout, then: Pulses] RETURNS [BOOL] ~ INLINE {
  -- TRUE iff strictly more than timeout pulses have elapsed since the clock reading then.
  elapsed: Pulses ~ BasicTime.GetClockPulses[] - then;
  RETURN [elapsed > timeout];
  };
-- Buffer Enqueue and Dequeue Utilities
-- Connection-control templates copied into outgoing packet headers.
-- Ordinary data packet: no flags set (OutputEnqueue later sets endOfMsg, ReadyOutputBuffer may set sendAck).
defaultConnCtl: XNSSPPBuf.ConnCtl ~ [
system~FALSE,
sendAck~FALSE,
attn~FALSE,
endOfMsg~FALSE,
filler~0];
-- System packet: only the system flag set (used by InitSystemOutputBuffer and SendRFC).
defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [
system~TRUE,
sendAck~FALSE,
attn~FALSE,
endOfMsg~FALSE,
filler~0];
-- Attention packet: only the attn flag set (used by OutputMakeNotEmpty when finger.attention is TRUE).
defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [
system~FALSE,
sendAck~FALSE,
attn~TRUE,
endOfMsg~FALSE,
filler~0];
AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ {
  -- Obtain a buffer from handle's socket and return it under both its XNS and SPP views.
  -- The packet type and both connection-ID fields are filled in from handle; all other header fields are left to the caller.
  nsb ← XNSSocket.AllocBuffer[handle.socket];
  TRUSTED { b ← LOOPHOLE[nsb] };
  b.hdr2.destConnID ← handle.remoteConnectionID;
  b.hdr2.sourceConnID ← handle.connectionID;
  b.hdr1.type ← spp;
  };
OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ {
  -- Turn the finger at handle.outputEnqueue from empty into halfFull.
  -- Allocates the finger's buffer if it has none, then sets the buffer's connCtl, sst and seqNum fields and resets the finger's byte counters.
  -- PRECONDITION: handle.outputEnqueue.state = empty
  finger: Finger ~ handle.outputEnqueue;
  buffer: SPPBuffer ← finger.buffer;
  IF finger.state # empty THEN ERROR; -- Can't happen.
  IF buffer = NIL THEN {
    buffer ← AllocateOutputBuffer[handle].b;
    finger.buffer ← buffer };
  -- Attention fingers get the attn template; everything else gets the plain data template.
  IF finger.attention
    THEN buffer.hdr2.connCtl ← defaultAttnConnCtl
    ELSE buffer.hdr2.connCtl ← defaultConnCtl;
  buffer.hdr2.sst ← handle.outputSSType;
  buffer.hdr2.seqNum ← Endian.HFromCard[handle.outputEnqueueNum];
  finger.state ← halfFull;
  finger.bytes ← 0;
  finger.index ← 0;
  };
InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ {
  -- Prepare buffer as a system packet: system connCtl template, current output subsequence type, sequence number taken from handle.outputSendNum.
  -- The user-byte count is set to exactly the SPP header size, i.e. the packet carries no data.
  TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes] };
  buffer.hdr2.seqNum ← Endian.HFromCard[handle.outputSendNum];
  buffer.hdr2.sst ← handle.outputSSType;
  buffer.hdr2.connCtl ← defaultSystemConnCtl;
  };
-- Hand the half-full buffer at handle.outputEnqueue over for sending.
-- handle: connection whose monitor lock the caller holds (INTERNAL).
-- endOfMessage: value stored in the packet's endOfMsg connection-control bit.
OutputEnqueue:
INTERNAL
PROC [handle: Handle, endOfMessage:
BOOL] ~ {
Enqueue the finger+buffer pointed to by handle.outputEnqueue; set its length field, set its state to full and NOTIFY that an output buffer is ready.
PRECONDITION: handle.outputEnqueue.state = halfFull
finger: Finger ~ handle.outputEnqueue;
IF finger.state # halfFull THEN ERROR; -- Can't happen.
finger.buffer.hdr2.connCtl.endOfMsg ← endOfMessage;
-- The packet length is the SPP header plus the data bytes accumulated in this finger.
TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[finger.buffer],
XNSSPPBuf.hdrBytes + finger.bytes] };
finger.state ← full;
IF finger.index # 0 THEN ERROR; -- Can't happen.
-- Advance the enqueue pointer and sequence number to the next finger in the ring.
handle.outputEnqueue ← handle.outputEnqueue.next;
handle.outputEnqueueNum ← handle.outputEnqueueNum + 1;
NOTIFY handle.outputReady;
};
-- Stamp an outgoing packet with the connection's current acknowledgement and allocation state.
-- handle: connection whose monitor lock the caller holds (INTERNAL).
-- buffer: packet about to be sent.
-- requestAck: when TRUE, set the sendAck bit and record which ack number is expected and when the request was made.
ReadyOutputBuffer:
INTERNAL
PROC [handle: Handle, buffer: SPPBuffer,
requestAck:
BOOL] ~ {
Fill in time-dependent fields of output buffer before sending it.
Called before handle.outputSend pointer is advanced.
expectedAckNum, allocNum: CARDINAL;
Acknowledgement request ...
IF requestAck
THEN {
expectedAckNum ← Endian.CardFromH[buffer.hdr2.seqNum];
-- A non-system packet carrying the current send sequence number is expected to be acknowledged with the next number.
IF (
NOT buffer.hdr2.connCtl.system)
AND (expectedAckNum = handle.outputSendNum)
THEN expectedAckNum ← expectedAckNum + 1;
handle.expectedAckNum ← expectedAckNum;
buffer.hdr2.connCtl.sendAck ← TRUE;
handle.sentAckReqTime ← Now[] };
Acknowledgement ...
-- Every outgoing packet carries our current input ack number, so any pending obligation to ack is discharged.
buffer.hdr2.ackNum ← Endian.HFromCard[handle.inputAckNum];
handle.mustSendAck ← FALSE;
Allocation ...
-- Advertise the highest sequence number we can accept: dequeue position plus the number of receive buffers, minus one.
allocNum ← handle.inputDequeueNum + handle.inputBuffersAllocated - 1;
buffer.hdr2.allocNum ← Endian.HFromCard[allocNum];
IF allocNum # handle.sentAllocNum
THEN {
handle.heWantsAlloc ← FALSE; -- I THINK THIS IS BOGUS ???? Do we need sentAllocNum at all ????
handle.sentAllocNum ← allocNum };
THESE CAN'T HURT, BUT WHAT ARE THEY DOING HERE ???? I'll find out!
-- Each field below is assigned and then immediately compared with the value just assigned.
buffer.hdr1.type ← spp;
IF buffer.hdr1.type # spp THEN ERROR;
buffer.hdr2.sourceConnID ← handle.connectionID;
IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR;
buffer.hdr2.destConnID ← handle.remoteConnectionID;
IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR;
};
-- Allocate a fresh buffer from socket sH and copy nsb's contents into it.
-- The number of bytes copied is the length recorded in nsb.hdr1.length, starting at the hdr1 field of each buffer.
-- NOTE(review): assumes hdr1.length never exceeds the size of a freshly allocated buffer -- confirm against XNSSocket.
MakeCopyOfXNSBuffer:
PROC [sH: XNSSocket.Handle, nsb: XNSBuffer]
RETURNS[copy: XNSBuffer] ~ {
bytes: CARDINAL ~ Endian.CardFromH[nsb.hdr1.length];
copy ← XNSSocket.AllocBuffer[sH];
TRUSTED {
-- The byte count returned by ByteBlt is discarded.
[] ← PrincOpsUtils.ByteBlt[
to ~ [
blockPointer ~ @copy.hdr1,
startIndex ~ 0, stopIndexPlusOne ~ bytes],
from ~ [
blockPointer ~ @nsb.hdr1,
startIndex ~ 0, stopIndexPlusOne ~ bytes]
];
};
};
-- Release the finger at handle.inputDequeue and advance to the next one.
-- If the remote end has asked for allocation, at most half the receive buffers are occupied, and the new allocation cannot ride on a data packet, a system packet is built and sent immediately.
-- handle: connection whose monitor lock the caller holds (INTERNAL).
InputDequeue:
INTERNAL
PROC [handle: Handle] ~
-- INLINE -- {
finger: Finger;
sendAllocNow, cantPiggyback: BOOL;
finger ← handle.inputDequeue;
-- Mark the finger reusable and step the dequeue pointer and sequence number.
finger.state ← empty;
finger.attention ← FALSE;
handle.inputDequeue ← finger.next;
handle.inputDequeueNum ← handle.inputDequeueNum + 1;
Check whether the other side has asked for allocation and a reasonable number of input buffers have become available. If so, make sure the other side gets the allocation by sending a system packet if necessary:
sendAllocNow ←
He wants some allocation AND
handle.heWantsAlloc AND
I have enough input buffers to give him a reasonable allocation (i.e., at most half my receive buffers are occupied)
(SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2));
IF sendAllocNow THEN handle.heWantsAlloc ← FALSE;
cantPiggyback ←
There's no data packet at all OR
(handle.outputSendNum = handle.outputEnqueueNum) OR
I'm waiting for allocation from him
(SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0);
IF sendAllocNow
AND cantPiggyback
THEN {
nsb: XNSBuffer;
b: SPPBuffer;
-- Build a data-less system packet; ReadyOutputBuffer fills in the current ack and allocation numbers without requesting an ack.
[nsb, b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, FALSE];
CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG
-- The buffer is freed here right after PutCached, so PutCached evidently does not keep it.
XNSSocketBackdoor.PutCached[handle.socket, nsb];
XNSSocket.FreeBuffer[handle.socket, nsb];
};
};
NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
  -- Mark the connection closed (first call only) and wake every process that might be waiting on it.
  -- handle: connection whose monitor lock the caller holds (INTERNAL).
  -- closeReason, closeText: recorded in the handle the first time the connection is closed; later calls leave the recorded values alone but still wake all waiters.
  IF NOT handle.closed THEN {
    handle.closeReason ← closeReason;
    handle.closeText ← closeText;
    handle.closed ← TRUE };
  -- Wake waiters on every condition variable of the handle so each can observe handle.closed.
  BROADCAST handle.inputReady;
  BROADCAST handle.doneEmptying; -- ????
  BROADCAST handle.outputSpace;
  BROADCAST handle.outputReady;
  BROADCAST handle.recvdNewAlloc;
  BROADCAST handle.doneFilling; -- ????
  BROADCAST handle.doneSending; -- ????
  BROADCAST handle.mgrShortWakeup;
  BROADCAST handle.mgrLongWakeup;
  BROADCAST handle.flusherWakeup;
  BROADCAST handle.recvdOBAttn;
  -- Presumably this also rouses a process blocked in a socket Get -- confirm against XNSSocket.Kick.
  XNSSocket.Kick[handle.socket];
  -- The closing brace below was missing, which left the procedure body unterminated and swallowed the following definition.
  };
EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
  -- Monitor-entry wrapper: acquire handle's lock and delegate to the INTERNAL NotifyClosed.
  NotifyClosed[handle~handle, closeReason~closeReason, closeText~closeText];
  };
-- Creation
MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL] RETURNS [finger: Finger ← NIL] ~ {
  -- Build a circular list of nBuffers fresh FingerObjects and return one element of it.
  -- No packet buffers are allocated here; each finger's buffer field keeps its default. sH is not used by this procedure.
  -- Returns NIL when nBuffers = 0.
  tail: Finger ← NIL;
  THROUGH [1..nBuffers] DO
    -- Each new finger is pushed on the front; the first one created becomes the tail.
    finger ← NEW[ FingerObject ← [next~finger] ];
    IF tail = NIL THEN tail ← finger;
    ENDLOOP;
  -- Close the ring. Guarded because tail is still NIL when nBuffers = 0; the unguarded assignment dereferenced NIL in that case.
  IF tail # NIL THEN tail.next ← finger;
  };
FreeBufferRing: PROC [sH: XNSSocket.Handle, finger: Finger] ~ {
  -- Walk the finger list starting at finger, returning every attached buffer to socket sH and unlinking each finger as it is visited.
  -- Because each visited finger's next field is set to NIL, the walk also terminates on a circular ring.
  UNTIL finger = NIL DO
    IF finger.buffer # NIL THEN {
      nsb: XNSBuffer;
      TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
      XNSSocket.FreeBuffer[sH, nsb];
      finger.buffer ← NIL };
    { successor: Finger ~ finger.next;
      finger.next ← NIL;
      finger ← successor };
    ENDLOOP;
  };
-- Enable aborts on every condition variable in handle, so that a process waiting on any of them can be aborted.
InitConditions:
PROC [handle: Handle] ~ {
TRUSTED {
Process.EnableAborts[@handle.inputReady];
Process.EnableAborts[@handle.doneEmptying]; -- ????
Process.EnableAborts[@handle.outputSpace];
Process.EnableAborts[@handle.outputReady];
Process.EnableAborts[@handle.recvdNewAlloc];
Process.EnableAborts[@handle.doneFilling]; -- ????
Process.EnableAborts[@handle.doneSending]; -- ????
Process.EnableAborts[@handle.mgrShortWakeup]; -- ????
Process.EnableAborts[@handle.mgrLongWakeup]; -- ????
Process.EnableAborts[@handle.flusherWakeup];
Process.EnableAborts[@handle.recvdOBAttn];
};
};
-- Install the client's get/put timeouts in handle and reset the internally derived timeouts to their defaults.
-- getTimeout governs waits on handle.inputReady; putTimeout governs waits on handle.outputSpace.
-- A value of waitForever disables the condition's timeout; any other value is stored in both millisecond and pulse form and applied to the condition.
InitTimeouts:
PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ {
handle.getTimeout ← getTimeout;
handle.getPulseOut ← MsecToPulses[getTimeout];
IF getTimeout = waitForever
THEN
TRUSTED {
Process.DisableTimeout[@handle.inputReady] }
ELSE
TRUSTED {
Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]] };
handle.putTimeout ← putTimeout;
handle.putPulseOut ← MsecToPulses[putTimeout];
IF putTimeout = waitForever
THEN
TRUSTED {
Process.DisableTimeout[@handle.outputSpace] }
ELSE
TRUSTED {
Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]] };
-- Defaults: 30 seconds for the cache-flush interval and a one-second round-trip estimate, from which UpdateTimeouts derives the rest.
-- Note that this also resets roundTripPulses whenever a client calls SetTimeouts (via EntrySetTimeouts).
handle.cacheFlushPulseOut ← 30 * oneSecondOfPulses;
handle.roundTripPulses ← oneSecondOfPulses;
UpdateTimeouts[handle];
};
-- Recompute the timeouts derived from handle.roundTripPulses:
-- ack wait and allocation wait = 3 round trips (also the mgrShortWakeup timeout), probe = 30 round trips (also the mgrLongWakeup timeout), no-activity = 100 round trips.
UpdateTimeouts:
PROC [handle: Handle] ~ {
TODO: None of this stuff is very good. I don't know any good retransmission heuristics, and I guess I've got to learn some.
handle.waitForAckPulseOut ← 3 * handle.roundTripPulses;
handle.waitForAllocPulseOut ← handle.waitForAckPulseOut;
TRUSTED {
Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]] };
handle.probePulseOut ← 30 * handle.roundTripPulses;
TRUSTED {
Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]] };
handle.noActivityPulseOut ← 100 * handle.roundTripPulses;
};
-- Create and start a connection object on an already-created socket.
-- socket: socket the connection will use; its remote address is read and recorded.
-- connectionID, remoteConnectionID: local and remote SPP connection IDs.
-- sendBuffers, recvBuffers: number of fingers in the output and input rings.
-- getTimeout, putTimeout: client timeouts, installed by InitTimeouts.
-- Returns the new handle, with its four worker processes already forked.
MakeHandle:
PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID:
HWORD, sendBuffers, recvBuffers:
CARDINAL, getTimeout, putTimeout: Milliseconds]
RETURNS [handle: Handle] ~ {
handle ← NEW[Object];
handle.socket ← socket;
handle.remote ← XNSSocket.GetRemoteAddress[socket];
-- All three activity timestamps start at the moment of creation.
handle.recvdTime ← Now[];
handle.cacheFlushedTime ← Now[];
handle.sentAckReqTime ← Now[];
handle.connectionID ← connectionID;
handle.remoteConnectionID ← remoteConnectionID;
InitConditions[handle];
InitTimeouts[handle, getTimeout, putTimeout];
-- In each ring, all three pointers start at the same finger.
{ f: Finger ~ MakeBufferRing[handle.socket, sendBuffers];
handle.outputEnqueue ← handle.outputSend ← handle.outputDequeue ← f;
handle.outputBuffersAllocated ← sendBuffers };
{ f: Finger ~ MakeBufferRing[handle.socket, recvBuffers];
handle.inputEnqueue ← handle.inputAck ← handle.inputDequeue ← f;
handle.inputBuffersAllocated ← recvBuffers };
-- One Pull process, one Mgr process and two Push processes per connection; FinishHandle JOINs all four.
handle.pull ← FORK Pull[handle];
handle.mgr ← FORK Mgr[handle];
handle.push1 ← FORK Push[handle];
handle.push2 ← FORK Push[handle];
AddNewHandle[newHandle~handle];
SafeStorage.EnableFinalization[handle];
};
-- Tear down a closed connection: wait for its four worker processes, free both buffer rings and destroy the socket.
-- Raises an unnamed ERROR if the handle has not been closed. Does nothing if the handle is already finished.
FinishHandle:
PROC [handle: Handle] ~ {
IF NOT handle.closed THEN ERROR;
IF handle.finished THEN RETURN;
TRUSTED {
-- JOIN blocks until each forked process has returned.
JOIN handle.push1; handle.push1 ← NIL;
JOIN handle.push2; handle.push2 ← NIL;
JOIN handle.mgr; handle.mgr ← NIL;
JOIN handle.pull; handle.pull ← NIL };
FreeBufferRing[handle.socket, handle.outputEnqueue];
handle.outputEnqueue ← handle.outputSend ← handle.outputDequeue ← NIL;
FreeBufferRing[handle.socket, handle.inputEnqueue];
handle.inputEnqueue ← handle.inputAck ← handle.inputDequeue ← NIL;
XNSSocket.Destroy[handle.socket];
handle.finished ← TRUE };
SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ {
  -- Build and send the connection-request packet: a data-less SPP system packet with sendAck set,
  -- our connection ID as source, the unknown connection ID as destination, and zero sequence, ack and allocation numbers.
  nsb: XNSBuffer ~ XNSSocket.AllocBuffer[socket];
  b: SPPBuffer;
  TRUSTED { b ← LOOPHOLE[nsb] };
  b.hdr1.type ← spp;
  -- Connection identification.
  b.hdr2.sourceConnID ← connectionID;
  b.hdr2.destConnID ← unknownConnectionID;
  -- Control bits: the system-packet template, plus a request for an acknowledgement.
  b.hdr2.connCtl ← defaultSystemConnCtl;
  b.hdr2.connCtl.sendAck ← TRUE;
  b.hdr2.sst ← 0;
  -- All three counters start at zero.
  b.hdr2.seqNum ← Endian.HFromCard[0];
  b.hdr2.ackNum ← Endian.HFromCard[0];
  b.hdr2.allocNum ← Endian.HFromCard[0];
  XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes];
  XNSSocket.Put[handle~socket, b~nsb];
  };
Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds ← waitForever] RETURNS [STREAM] ~ {
  -- Actively open an SPP connection to remote and return a stream on it.
  -- getTimeout, putTimeout: client timeouts for the new stream, clamped to waitForever.
  -- Sends up to 10 connection requests. The reply wait starts at 500 ms and doubles on each try after the fifth.
  -- Raises ConnectionClosed[remoteReject], [noRoute] or [unknown] on an error reply, and [noResponse] if no acceptable reply arrives.
  nsb: XNSBuffer ← NIL;
  socket: XNSSocket.Handle ← NIL;
  handle: Handle ← NIL;
  retransmissionTime: Milliseconds ← 500;
  connID: HWORD;
  getTimeout ← MIN[getTimeout, waitForever];
  putTimeout ← MIN[putTimeout, waitForever];
  BEGIN
  ENABLE UNWIND => {
    IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb ← NIL };
    -- socket is still NIL if XNSSocket.Create itself was the operation that failed.
    IF socket # NIL THEN XNSSocket.Destroy[socket] };
  socket ← XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ????
  connID ← GetUniqueConnID[];
  FOR nTries: CARDINAL IN [1..10] DO
    IF nTries > 5 THEN retransmissionTime ← 2*retransmissionTime;
    XNSSocket.SetGetTimeout[socket, retransmissionTime];
    SendRFC[socket, connID];
    nsb ← XNSSocket.Get[socket];
    IF nsb = NIL THEN LOOP; -- nothing arrived within the timeout: retransmit
    SELECT nsb.hdr1.type FROM
      spp => {
        b: SPPBuffer;
        TRUSTED { b ← LOOPHOLE[nsb] };
        -- Reject replies not addressed to our connection ID or not from the host we called.
        -- nsb is set to NIL after each ReturnError, i.e. the buffer is treated as handed back with the error.
        IF b.hdr2.destConnID # connID THEN {
          XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr];
          nsb ← NIL;
          LOOP };
        IF b.hdr1.source.host # remote.host THEN {
          XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr];
          nsb ← NIL;
          LOOP };
        -- Accept: talk to the address the reply came from (it may differ from remote in fields other than host).
        XNSSocket.SetRemoteAddress[socket, b.hdr1.source];
        handle ← MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout];
        -- Feed the reply through the normal receive path; free whatever buffer Receive hands back.
        nsb ← Receive[socket, nsb, handle];
        IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb ← NIL };
        RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]];
        };
      error => {
        OPEN XNSErrorTypes;
        eb: XNSErrorBuf.Buffer;
        TRUSTED { eb ← LOOPHOLE[nsb] };
        SELECT eb.hdr2.type FROM
          badChecksumErr, resourceLimitsErr => {
            -- Transient failure: retry. The buffer must be freed first, because the next Get overwrites nsb; previously it was leaked on every such retry.
            XNSSocket.FreeBuffer[socket, nsb];
            nsb ← NIL;
            LOOP };
          noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr =>
            ERROR ConnectionClosed[remoteReject, "rejected"];
          cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"];
          ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"];
        };
      ENDCASE => {
        XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.invalidPacketTypeErr];
        nsb ← NIL;
        LOOP };
    ENDLOOP;
  ERROR ConnectionClosed[noResponse, "no response"];
  END;
  };
-- Handle returned by CreateListener and consumed by DestroyListener.
Listener: TYPE ~ REF ListenerObject;
ListenerObject:
PUBLIC
TYPE ~
RECORD [
-- Link field; not read or written in the visible part of this file -- presumably maintained by AddNewListener.
next: Listener,
-- Set by DestroyListener once the listen process has been aborted and joined.
destroyed: BOOL ← FALSE,
-- The forked Listen process.
listenProcess: PROCESS ];
-- Start a listener on the given local socket number.
-- Forks a Listen process with the supplied arguments, registers the listener with AddNewListener, enables finalization on it and returns it.
-- worker is forked (by Listen) for each accepted connection; filter and echoFilter may be NIL, as noted on the parameters.
CreateListener:
PUBLIC PROC [
socket: XNS.Socket,
worker: XNSStream.ListenerProc,
getTimeout, putTimeout: Milliseconds ← waitForever,
filter: XNSStream.FilterProc ← NIL, -- NIL => Accept all requests
echoFilter: XNSStream.FilterProc ← NIL] -- NIL => Answer all echos
RETURNS [listener: Listener]
~ {
listener ← NEW[ ListenerObject ← [listenProcess~
FORK Listen[socket, worker, getTimeout, putTimeout, filter, echoFilter]] ];
AddNewListener[newListener~listener];
SafeStorage.EnableFinalization[listener];
};
-- Stop a listener: abort its listen process, wait for it to finish, then mark the listener destroyed.
-- Calling it again on a listener already marked destroyed is a no-op.
-- NOTE(review): the destroyed test and the Abort/JOIN are not protected by any lock, so two concurrent callers could both reach the JOIN -- confirm callers serialize.
DestroyListener:
PUBLIC
PROC [listener: Listener] ~ {
IF listener.destroyed THEN RETURN;
TRUSTED {
Process.Abort[listener.listenProcess];
JOIN listener.listenProcess };
listener.destroyed ← TRUE;
};
Listen: PROC [
  socket: XNS.Socket,
  worker: XNSStream.ListenerProc,
  getTimeout, putTimeout: Milliseconds,
  filter: XNSStream.FilterProc,
  echoFilter: XNSStream.FilterProc]
  ~ {
  -- Body of a listener process: receive packets on the well-known socket forever.
  -- Echo requests are answered (subject to echoFilter). Valid SPP connection requests (subject to filter) get a new socket, handle and stream, and a detached worker process is forked to serve each one.
  -- getTimeout, putTimeout: client timeouts given to every stream created here.
  -- The loop has no normal exit; on UNWIND the current buffer, if any, is freed and the listening socket destroyed.
  socketHandle: XNSSocket.Handle ←
    XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
  nsb: XNSBuffer ← NIL;
  bytes: NAT;
  {
  ENABLE UNWIND => {
    IF nsb # NIL THEN
      { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb ← NIL };
    XNSSocket.Destroy[socketHandle] };
  DO
    -- Any buffer still held from the previous iteration (e.g. after a bare LOOP) is released here.
    IF nsb # NIL THEN
      { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb ← NIL };
    IF (nsb ← XNSSocket.Get[socketHandle]) = NIL THEN LOOP;
    bytes ← XNSSocket.GetUserBytes[nsb];
    SELECT nsb.hdr1.type FROM
      echo => {
        b: XNSEchoBuf.Buffer;
        IF bytes < XNSEchoBuf.hdrBytes THEN LOOP;
        TRUSTED { b ← LOOPHOLE[nsb] };
        IF b.hdr2.type # request THEN LOOP;
        IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source] THEN LOOP;
        -- Turn the request into a reply in place and send it back.
        b.hdr2.type ← reply;
        { XNSSocket.ReturnToSender[handle~socketHandle, b~nsb]; nsb ← NIL };
        };
      spp => {
        b: SPPBuffer;
        remote: XNS.Address;
        streamSocketHandle: XNSSocket.Handle;
        stream: STREAM;
        handle: Handle;
        IF bytes < XNSSPPBuf.hdrBytes THEN LOOP;
        TRUSTED { b ← LOOPHOLE[nsb] };
        -- A connection request must have sequence number 0, an unknown destination ID and a known source ID.
        IF (b.hdr2.seqNum # 0)
          OR (b.hdr2.destConnID # unknownConnectionID)
          OR (b.hdr2.sourceConnID = unknownConnectionID)
          THEN {
            XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.protocolViolationErr];
            nsb ← NIL;
            LOOP };
        -- Ignore a request for which a handle already exists (same remote address and remote connection ID).
        IF FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID] # NIL THEN LOOP;
        IF (filter # NIL) AND NOT filter[nsb.hdr1.source] THEN {
          XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.listenerRejectErr];
          nsb ← NIL;
          LOOP };
        -- Capture the requester's address NOW. The buffer is handed to Receive below, after which its header must not be read through b:
        -- Receive may keep, reuse or replace the buffer. The worker previously got b.hdr1.source read after that hand-off.
        remote ← nsb.hdr1.source;
        streamSocketHandle ← XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ????
        handle ← MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ????
        stream ← IO.CreateStream[streamProcs~streamProcs, streamData~handle];
        b.hdr2.connCtl.sendAck ← TRUE; -- force us to reply
        nsb ← Receive[streamSocketHandle, nsb, handle];
        TRUSTED {
          Process.Detach[FORK worker[stream~stream, remote~remote]] };
        };
      ENDCASE;
    ENDLOOP;
  };
  };
-- Byte Stream Interface
GetTimeouts: PUBLIC PROC [self: IO.STREAM] RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
  -- Report the get and put timeouts currently in force for the stream self.
  -- Raises ConnectionClosed (from the entry procedure) if the connection is closed.
  h: Handle ~ NARROW[self.streamData];
  [getTimeout~getTimeout, putTimeout~putTimeout] ← EntryGetTimeouts[handle~h];
  };
EntryGetTimeouts: ENTRY PROC [handle: Handle] RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
  -- Under the monitor lock, read back the client timeouts stored in handle.
  -- Raises ConnectionClosed, with the recorded reason and text, if the connection is closed.
  IF handle.closed THEN
    RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  RETURN [getTimeout~handle.getTimeout, putTimeout~handle.putTimeout];
  };
SetTimeouts: PUBLIC PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds ← waitForever] ~ {
  -- Install new get and put timeouts on the stream self.
  -- Raises ConnectionClosed (from the entry procedure) if the connection is closed.
  h: Handle ~ NARROW[self.streamData];
  EntrySetTimeouts[handle~h, getTimeout~getTimeout, putTimeout~putTimeout];
  };
-- Under the monitor lock, re-initialize handle's timeouts from the given values.
-- Raises ConnectionClosed, with the recorded reason and text, if the connection is closed.
-- Because it goes through InitTimeouts, this also resets the round-trip estimate and the timeouts derived from it.
EntrySetTimeouts:
ENTRY
PROC [handle: Handle,
getTimeout, putTimeout: Milliseconds] ~ {
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
InitTimeouts[handle, getTimeout, putTimeout];
};
SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ {
  -- Change the subsequence type of subsequent output on self.
  -- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the operation is retried.
  h: Handle ~ NARROW[self.streamData];
  UNTIL EntrySetSSType[h, ssType] = normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
-- Under the monitor lock, switch the output subsequence type to ssType.
-- Waits until the finger at handle.outputEnqueue is empty, enqueueing a half-full one along the way, then starts a new buffer that carries the new type.
-- sendNow: when TRUE, that new (data-less) buffer is enqueued immediately.
-- Returns normal on success or when ssType is already current; returns timedOut if the put timeout expires while the ring is full.
-- Raises ConnectionClosed if the connection is, or becomes, closed.
EntrySetSSType:
ENTRY
PROC [handle: Handle, ssType: SubSequenceType,
sendNow:
BOOL ←
FALSE]
RETURNS [CompletionCode ← normal] ~ {
-- NOTE(review): the note below says handle.inputEnqueue^, but the code operates on handle.outputEnqueue -- presumably a slip in the note.
Note: After changing ssType it's necessary to have a halfFull buffer as handle.inputEnqueue^; this ensures that the sequence
SetSSType[h, t1]; SetSSType[h, t2];
will work correctly.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
IF ssType = handle.outputSSType THEN RETURN;
DO
-- Re-checked on every iteration, since the connection may close while we wait.
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => EXIT;
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
-- Flush data written under the old type before switching.
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP };
ENDCASE => ERROR;
ENDLOOP;
handle.outputSSType ← ssType;
OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType
IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE];
};
SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ {
  -- Terminate the current output message on self.
  -- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the operation is retried.
  h: Handle ~ NARROW[self.streamData];
  UNTIL EntrySendEndOfMessage[h] = normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
-- Under the monitor lock, enqueue the current output buffer with its endOfMsg bit set.
-- If the current finger is empty, a fresh (data-less) buffer is started first, so an end-of-message packet is always sent.
-- Returns timedOut if the put timeout expires while the ring is full; raises ConnectionClosed if the connection is closed.
EntrySendEndOfMessage:
ENTRY
PROC [handle: Handle]
RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => { OutputMakeNotEmpty[handle]; EXIT };
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
halfFull => EXIT;
ENDCASE => ERROR;
ENDLOOP;
-- Both EXIT paths leave the finger halfFull, as OutputEnqueue requires.
OutputEnqueue[handle~handle, endOfMessage~TRUE];
RETURN [normal];
};
-- Send an attention of the given type on self.
-- The entry procedure queues the in-band attention; if it also returns a buffer (an out-of-band copy), that copy is sent here, outside the monitor.
-- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the operation is retried.
SendAttention:
PUBLIC PROC [self:
STREAM, attentionType: AttentionType] ~ {
handle: Handle = NARROW[self.streamData];
nsb: XNSBuffer;
code: CompletionCode;
DO
[nsb, code] ← EntrySendAttention[handle, attentionType];
IF code = normal THEN EXIT;
IF nsb # NIL THEN ERROR; -- can't happen
SIGNAL Timeout[];
ENDLOOP;
IF nsb # NIL THEN XNSSocket.Put[handle.socket, nsb];
};
-- Under the monitor lock, queue a one-byte in-band attention packet.
-- Returns [nsb, normal], where nsb is a ready-to-send copy of the attention packet if the remote allocation does not yet cover its sequence number, else NIL.
-- Returns [NIL, timedOut] if three or more in-band attentions are already outstanding and the put timeout expires.
-- Raises ConnectionClosed if the connection is closed.
EntrySendAttention:
ENTRY
PROC [handle: Handle, attentionType: AttentionType]
RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ {
Note this should only time out if there are too many outstanding output in-band attentions queued.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
attnNum: CARDINAL;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF handle.outputIBAttnCnt >= 3
THEN {
-- How many really ????
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
filling => { WAIT handle.doneFilling; LOOP };
-- Data already written must go out ahead of the attention.
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
ENDCASE => EXIT;
ENDLOOP;
Allocate a new buffer for the in-band attention (to determine its sequence number).
-- Find the finger that precedes handle.outputEnqueue, then splice a new attention finger in front of outputEnqueue and make it the enqueue position.
WHILE finger.next # handle.outputEnqueue DO finger ← finger.next ENDLOOP;
finger.next ← NEW [FingerObject ← [next~handle.outputEnqueue, attention~TRUE]];
finger ← finger.next;
handle.outputEnqueue ← finger;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.SUCC;
attnNum ← handle.outputEnqueueNum;
Fill the in-band attention buffer and enqueue it.
OutputMakeNotEmpty[handle];
-- The attention type is the packet's single data byte.
TRUSTED { finger.buffer.body.bytes[0] ← LOOPHOLE[attentionType] };
finger.bytes ← 1;
OutputEnqueue[handle~handle, endOfMessage~FALSE];
If there's no allocation for the attention, make a copy of it to be send it out-of-band.
IF SignedDiff[handle.recvdAllocNum, attnNum] < 0
THEN {
-- This local nsb shadows the named result; it is returned explicitly below.
nsb: XNSBuffer;
TRUSTED {
nsb ← MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]];
ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] };
RETURN [nsb, normal];
};
RETURN [NIL, normal];
};
WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds ← XNSStream.waitForever] RETURNS [type: AttentionType] ~ {
  -- Wait for, and return the type of, the next attention on the received out-of-band attention list of self.
  -- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the wait starts over.
  h: Handle ~ NARROW[self.streamData];
  pulseOut: Pulses ~ MsecToPulses[waitTimeout];
  code: CompletionCode;
  DO
    [type, code] ← EntryWaitAttention[h, pulseOut];
    IF code = normal THEN EXIT;
    SIGNAL Timeout[];
    ENDLOOP;
  };
-- Under the monitor lock, wait until the received out-of-band attention list is non-empty, then remove and return its first entry.
-- pulseOut: maximum time to wait, in pulses; Pulses.LAST disables the condition timeout.
-- Returns [type, normal] on success or [0, timedOut] once more than pulseOut pulses have elapsed.
-- Raises ConnectionClosed if the connection is, or becomes, closed.
EntryWaitAttention:
ENTRY
PROC [handle: Handle, pulseOut: Pulses]
RETURNS [AttentionType, CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
p: OBAttn ← NIL;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF (p ← handle.recvdOBAttnList) #
NIL
THEN {
handle.recvdOBAttnList ← p.next;
RETURN [p.attentionType, normal] };
IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut];
-- The condition's timeout is reprogrammed on every pass, always with the full pulseOut rather than the time remaining.
TRUSTED {
IF pulseOut < Pulses.
LAST
THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]]
ELSE Process.DisableTimeout[@handle.recvdOBAttn];
};
WAIT handle.recvdOBAttn;
ENDLOOP;
};
GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
  -- Report the input-side status of self; reset is passed through to the entry procedure, which decides what it consumes.
  -- Raises ConnectionClosed (from the entry procedure) if the connection is closed.
  h: Handle ~ NARROW[self.streamData];
  [state~state, ssType~ssType, attentionType~attentionType] ← EntryGetStatus[handle~h, reset~reset];
  };
-- Under the monitor lock, classify what is at the head of the input queue.
-- Returns, in order of precedence:
--   attention     the head packet is an attention; attentionType is its first data byte.
--   ssTypeChange  the head packet's sst differs from handle.inputSSType; ssType is the new type.
--   open          unread data remains in the head packet, or the input queue is empty.
--   endOfMessage  the head packet is exhausted and has its endOfMsg bit set.
-- reset: when TRUE the reported condition is consumed (attention or end-of-message packet dequeued, new sst adopted).
-- Exhausted packets without endOfMsg are dequeued regardless of reset. Raises ConnectionClosed if the connection is closed.
EntryGetStatus:
ENTRY
PROC [handle: Handle, reset:
BOOL]
RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
Note: we test finger.attention before anything else. In effect, this means the sst of an attention packet is ignored.
IF finger.attention
THEN {
state ← attention; ssType ← handle.inputSSType;
TRUSTED { attentionType ← LOOPHOLE[finger.buffer.body.bytes[0]] };
IF reset
THEN {
-- Drop the matching entry from the out-of-band list if it is at the head and has this packet's sequence number.
p: OBAttn ~ handle.recvdOBAttnList;
IF (p #
NIL)
AND (p.seqNum = handle.inputDequeueNum)
THEN
handle.recvdOBAttnList ← p.next;
InputDequeue[handle] };
RETURN };
IF buffer.hdr2.sst # handle.inputSSType
THEN {
IF reset THEN handle.inputSSType ← buffer.hdr2.sst;
RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] };
IF finger.index < finger.bytes
THEN
RETURN [state~open, ssType~handle.inputSSType, attentionType~0];
IF buffer.hdr2.connCtl.endOfMsg
THEN {
IF reset THEN InputDequeue[handle];
RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] };
-- Exhausted packet with nothing to report: discard it and look at the next one.
InputDequeue[handle];
LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
empty => {
RETURN[state~open, ssType~handle.inputSSType, attentionType~0] };
ENDCASE => ERROR;
ENDLOOP;
};
FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOL ← FALSE] RETURNS [bytesSkipped: LONG CARDINAL ← 0] ~ {
  -- Discard buffered input on self up to the next end-of-message, attention or subsequence-type change.
  -- wait: passed to the entry procedure; when TRUE it waits (up to the get timeout) for more input instead of returning on an empty queue.
  -- Returns the total number of data bytes skipped, accumulated across retries.
  -- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the flush continues with the running total.
  handle: Handle = NARROW[self.streamData];
  code: CompletionCode;
  DO
    [bytesSkipped, code] ← EntryFlushInput[handle, wait, bytesSkipped];
    IF code = normal THEN RETURN;
    SIGNAL Timeout[];
    ENDLOOP;
  -- The closing brace below was missing, which left the procedure body unterminated and swallowed the following definition.
  };
-- Under the monitor lock, skip input data until an attention packet, a subsequence-type change or an end-of-message packet is at the head of the queue.
-- bytesSkipped: running total carried in from earlier calls; the updated total is returned as newSkipped.
-- wait: when FALSE, an empty input queue ends the flush; when TRUE, wait for more input until the get timeout expires.
-- Returns [total, normal] or [total, timedOut]. Raises ConnectionClosed if the connection is closed.
EntryFlushInput:
ENTRY
PROC [handle: Handle, wait:
BOOL ←
FALSE, bytesSkipped:
LONG
CARDINAL]
RETURNS [newSkipped:
LONG
CARDINAL, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
-- Attention packets and packets of a different subsequence type are left in place for the client to see.
IF finger.attention THEN RETURN [bytesSkipped, normal];
buffer ← finger.buffer;
IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal];
IF finger.index < finger.bytes
THEN {
bytesSkipped ← bytesSkipped + (finger.bytes - finger.index);
finger.index ← finger.bytes };
-- An end-of-message packet is emptied but not dequeued.
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal];
InputDequeue[handle];
LOOP };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [bytesSkipped, normal];
IF IsTimedOut[handle.getPulseOut, startTime]
THEN
RETURN [bytesSkipped, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
SendNow: PUBLIC PROC [self: IO.STREAM] ~ {
  -- Push out any partially filled output buffer on self without marking an end of message.
  -- Raises ConnectionClosed (from the entry procedure) if the connection is closed.
  h: Handle ~ NARROW[self.streamData];
  EntrySendNow[handle~h];
  };
EntrySendNow: ENTRY PROC [handle: Handle] ~ {
  -- Under the monitor lock, enqueue the current output buffer if it is half full; otherwise do nothing.
  -- Waits while another process is in the middle of filling the buffer.
  -- Raises ConnectionClosed if the connection is, or becomes, closed.
  -- The WAIT below is on a condition with aborts enabled (see InitConditions). Without this handler the monitor lock would stay held
  -- when the resulting signal unwinds through this frame. Every other waiting ENTRY procedure in this module has the same clause.
  ENABLE UNWIND => NULL;
  finger: Finger;
  DO
    IF handle.closed THEN
      RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    finger ← handle.outputEnqueue;
    SELECT finger.state FROM
      filling => { WAIT handle.doneFilling; LOOP };
      halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
      ENDCASE => EXIT; -- nothing buffered, or already queued for sending
    ENDLOOP;
  };
-- Initiate the close handshake on self.
-- Sends a packet of subsequence type endSST, then discards input until a packet of type endReplySST or endSST is seen, and answers it with endReplySST.
-- Returns TRUE if the handshake completed. Returns FALSE if any step timed out or the connection was found closed (ConnectionClosed is caught here).
SendClose:
PUBLIC
PROC [self:
IO.
STREAM]
RETURNS [ok:
BOOL ←
FALSE] ~ {
ENABLE ConnectionClosed => CONTINUE;
handle: Handle = NARROW[self.streamData];
sst: SubSequenceType;
code: CompletionCode;
-- The entry procedures are called directly, so a timeout is reported as a code rather than as SIGNAL Timeout.
code ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endSST, sendNow~TRUE];
IF code # normal THEN RETURN;
DO
[code~code] ← EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0];
IF code # normal THEN RETURN;
-- reset~TRUE consumes whatever stopped the flush, so the loop makes progress.
[ssType~sst] ← EntryGetStatus[handle~handle, reset~TRUE];
IF (sst = XNSSPPTypes.endReplySST)
OR (sst = XNSSPPTypes.endSST)
THEN {
-- The result of this final send is deliberately ignored.
[] ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE];
RETURN [TRUE] };
ENDLOOP;
};
-- Answer a close request on self.
-- Sends a packet of subsequence type endReplySST, then discards input until a packet of type endReplySST is seen.
-- Returns TRUE if that reply arrived. Returns FALSE if any step timed out or the connection was found closed (ConnectionClosed is caught here).
SendCloseReply:
PUBLIC
PROC [self:
IO.
STREAM]
RETURNS [ok:
BOOL ←
FALSE] ~ {
ENABLE ConnectionClosed => CONTINUE;
handle: Handle = NARROW[self.streamData];
sst: SubSequenceType;
code: CompletionCode;
code ← EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE];
IF code # normal THEN RETURN;
DO
[code~code] ← EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0];
IF code # normal THEN RETURN;
-- reset~TRUE consumes whatever stopped the flush, so the loop makes progress.
[ssType~sst] ← EntryGetStatus[handle~handle, reset~TRUE];
IF (sst = XNSSPPTypes.endReplySST) THEN RETURN [TRUE];
ENDLOOP;
};
-- Stream Procs
GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ {
  -- Stream getChar procedure: return the next input character of self.
  -- Each time the entry procedure times out, SIGNAL Timeout is raised; if the signal is resumed the read is retried.
  h: Handle ~ NARROW[self.streamData];
  code: CompletionCode ← timedOut;
  WHILE code # normal DO
    [char, code] ← EntryGetChar[h, self];
    IF code # normal THEN SIGNAL Timeout[];
    ENDLOOP;
  };
-- Under the monitor lock, fetch the next data character from the input queue.
-- Returns [char, normal] on success, or timedOut (char unspecified) if the input queue is empty and the get timeout has expired.
-- Raises IO.EndOfStream[self] when the head packet is an attention, has a different subsequence type, or is an exhausted end-of-message packet.
-- Raises ConnectionClosed if the connection is, or becomes, closed.
EntryGetChar:
ENTRY
PROC [handle: Handle, self:
STREAM]
RETURNS [char:
CHAR, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
IF finger.attention
OR (buffer.hdr2.sst # handle.inputSSType)
THEN
RETURN WITH ERROR IO.EndOfStream[self];
IF finger.index >= finger.bytes
THEN {
IF buffer.hdr2.connCtl.endOfMsg
THEN
RETURN WITH ERROR IO.EndOfStream[self];
-- Exhausted packet in mid-message: discard it and try the next one.
InputDequeue[handle];
LOOP };
TRUSTED { char ← buffer.body.chars[finger.index] };
finger.index ← finger.index + 1;
RETURN [char, normal];
};
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF IsTimedOut[handle.getPulseOut, startTime]
THEN
RETURN [char~, code~timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
-- GetBlock: stream procedure that copies input bytes into block, starting at startIndex and
-- stopping at MIN[startIndex+count, block.maxLength].  Returns the number of bytes copied.
-- Each pass obtains a buffer from GetBlockNext (which marks it emptying), copies with ByteBlt
-- outside the monitor, and hands the buffer back on the next GetBlockNext or via GetBlockLast.
-- A timedOut completion signals Timeout and retries if the signal is resumed.
-- A NIL finger with code normal ends the transfer early.
-- NOTE(review): block.length is assigned only after a successful copy, so a call that copies
-- nothing leaves block.length untouched; confirm that this matches the IO.GetBlock contract.
GetBlock:
PROC [self:
STREAM, block:
REF
TEXT, startIndex:
NAT, count:
NAT]
RETURNS [nBytesRead:
NAT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
stop: NAT ~ MIN[(startIndex+count), block.maxLength];
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets GetBlockNext release it before choosing the next one.
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
-- Advance both the buffer read position and the destination position by the amount moved.
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
block.length ← startIndex;
ENDLOOP;
-- Release the last buffer still marked emptying, if any.
IF finger # NIL THEN GetBlockLast[handle, finger];
};
-- UnsafeGetBlock: same transfer loop as GetBlock, but the destination is an IO.UnsafeBlock
-- (base pointer, startIndex, count) and no length field is maintained.
-- Returns the number of bytes copied.  Signals Timeout on a timedOut completion and retries
-- if resumed.  Stops early when GetBlockNext returns a NIL finger with code normal.
UnsafeGetBlock:
UNSAFE PROC [self:
STREAM, block:
IO.UnsafeBlock]
RETURNS [nBytesRead:
INT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
startIndex: INT ← block.startIndex;
stop: INT ~ block.startIndex+block.count;
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets GetBlockNext release it before choosing the next one.
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~block.base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
-- Release the last buffer still marked emptying, if any.
IF finger # NIL THEN GetBlockLast[handle, finger];
};
-- GetBlockNext: monitor entry used by GetBlock and UnsafeGetBlock.
-- First releases oldFinger (if not NIL) by setting it to halfEmpty and broadcasting doneEmptying.
-- Then returns the next buffer that has unread data, marked emptying so that the caller may
-- copy from it outside the monitor.
-- Results: [finger, normal] when data is available; [NIL, normal] at an attention buffer, a
-- subsequence type change, or an exhausted endOfMsg buffer; [NIL, timedOut] when the queue is
-- empty and the get timeout has expired.
-- Raises ConnectionClosed when handle.closed is set.
GetBlockNext:
ENTRY
PROC [self:
STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
IF oldFinger #
NIL
THEN {
oldFinger.state ← halfEmpty;
BROADCAST handle.doneEmptying;
};
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
buffer ← finger.buffer;
SELECT finger.state
FROM
full, halfEmpty => {
IF finger.attention
OR (buffer.hdr2.sst # handle.inputSSType)
THEN
RETURN[NIL, normal];
IF finger.index >= finger.bytes
THEN {
-- Buffer exhausted: stop at a message boundary, otherwise advance to the next buffer.
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal];
InputDequeue[handle];
LOOP };
-- Claim the buffer for the caller; other readers wait on doneEmptying until it is released.
finger.state ← emptying;
RETURN[finger, normal] };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
GetBlockLast: ENTRY PROC [handle: Handle, oldFinger: Finger] ~ {
  -- Releases the buffer that GetBlockNext handed out: marks it halfEmpty and wakes any
  -- process waiting on doneEmptying.  oldFinger must not be NIL.
  -- The UNWIND catch phrase makes sure the monitor lock is released if the ERROR below
  -- (or a NIL handle fault) unwinds through this entry procedure, as PutBlockLast already does.
  ENABLE UNWIND => NULL;
  IF oldFinger = NIL THEN ERROR; -- can't happen
  oldFinger.state ← halfEmpty;
  BROADCAST handle.doneEmptying;
  };
PutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- Stream procedure: appends one character to the output.
  -- Whenever EntryPutChar completes with a code other than normal, Timeout is signalled;
  -- if the signal is resumed the write is attempted again.
  handle: Handle ~ NARROW[self.streamData];
  status: CompletionCode ← EntryPutChar[handle, char];
  WHILE status # normal DO
    SIGNAL Timeout[];
    status ← EntryPutChar[handle, char];
    ENDLOOP;
  };
-- EntryPutChar: monitor entry that stores one byte in the buffer at handle.outputEnqueue.
-- Returns normal once the byte is stored, or timedOut when the buffer is full or being sent
-- and IsTimedOut[handle.putPulseOut, startTime] reports true (startTime is taken on entry).
-- Raises ConnectionClosed when handle.closed is set.
EntryPutChar:
ENTRY
PROC [handle: Handle, char:
CHAR]
RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => OutputMakeNotEmpty[handle];
-- Another client is copying into this buffer (see PutBlockNext); wait for it to finish.
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
-- A buffer holding LAST[BufIndex] bytes is queued for sending and the next buffer is tried.
IF finger.bytes =
LAST[BufIndex]
THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
TRUSTED { finger.buffer.body.chars[finger.bytes] ← char };
finger.bytes ← finger.bytes + 1;
RETURN [normal];
ENDLOOP;
};
-- PutBlock: stream procedure that copies bytes from block into output buffers, starting at
-- startIndex and stopping at MIN[startIndex+count, block.length].
-- Each pass obtains a buffer from PutBlockNext (which marks it filling), copies with ByteBlt
-- outside the monitor, and hands the buffer back on the next PutBlockNext or via PutBlockLast.
-- A timedOut completion signals Timeout and retries if the signal is resumed.
PutBlock:
PROC [self:
STREAM, block:
REF
READONLY
TEXT, startIndex:
NAT, count:
NAT] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
stop: NAT ~ MIN[(startIndex+count), block.length];
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets PutBlockNext release and enqueue it before choosing the next one.
[finger, code] ← PutBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => NULL;
ENDCASE => ERROR;
IF finger = NIL THEN ERROR; -- can't happen
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.bytes,
stopIndexPlusOne~BufIndex.LAST],
from~[
blockPointer~base,
startIndex~startIndex,
stopIndexPlusOne~stop] ];
};
-- Advance both the buffer fill count and the source position by the amount moved.
finger.bytes ← finger.bytes + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
-- Release the last buffer still marked filling, if any.
IF finger # NIL THEN PutBlockLast[handle, finger];
};
-- UnsafePutBlock: same transfer loop as PutBlock, but the source is an IO.UnsafeBlock
-- (base pointer, startIndex, count).
-- Signals Timeout on a timedOut completion and retries if the signal is resumed.
UnsafePutBlock:
PROC [self:
STREAM, block:
IO.UnsafeBlock] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
startIndex: INT ← block.startIndex;
stop: INT ~ block.startIndex+block.count;
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets PutBlockNext release and enqueue it before choosing the next one.
[finger, code] ← PutBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => NULL;
ENDCASE => ERROR;
IF finger = NIL THEN ERROR; -- can't happen
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.bytes,
stopIndexPlusOne~BufIndex.LAST],
from~[
blockPointer~block.base,
startIndex~startIndex,
stopIndexPlusOne~stop] ];
};
finger.bytes ← finger.bytes + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
-- Release the last buffer still marked filling, if any.
IF finger # NIL THEN PutBlockLast[handle, finger];
};
-- PutBlockNext: monitor entry used by PutBlock and UnsafePutBlock.
-- First releases oldFinger (if not NIL): marks it halfFull, broadcasts doneFilling, and calls
-- OutputEnqueue with endOfMessage FALSE.
-- Then returns the buffer at handle.outputEnqueue marked filling, so that the caller may copy
-- into it outside the monitor.
-- Results: [finger, normal] when a buffer with room is available; [NIL, timedOut] when the
-- buffer is full or being sent and the put timeout has expired.
-- Raises ConnectionClosed when handle.closed is set.
PutBlockNext:
ENTRY
PROC [self:
STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
startTime: Pulses ← Now[];
IF oldFinger #
NIL
THEN {
oldFinger.state ← halfFull;
BROADCAST handle.doneFilling;
OutputEnqueue[handle~handle, endOfMessage~FALSE] };
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => OutputMakeNotEmpty[handle];
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
-- A buffer holding LAST[BufIndex] bytes is queued for sending and the next buffer is tried.
IF finger.bytes =
LAST[BufIndex]
THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
-- Claim the buffer for the caller; other writers wait on doneFilling until it is released.
finger.state ← filling;
RETURN[finger, normal];
ENDLOOP;
};
PutBlockLast: ENTRY PROC [handle: Handle, finger: Finger] ~ {
  -- Hands back the buffer that PutBlockNext marked filling: it becomes halfFull again and
  -- every process waiting on doneFilling is woken.  The buffer is not enqueued for sending.
  ENABLE UNWIND => NULL;
  finger.state ← halfFull;
  BROADCAST handle.doneFilling;
  };
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- Stream procedure: reports the result of EntryEndOf for the handle stored in the stream.
  atEnd: BOOL ~ EntryEndOf[NARROW[self.streamData]];
  RETURN [atEnd];
  };
-- EntryEndOf: monitor entry that reports whether the reader is at an end-of-stream position.
-- Returns TRUE when the current input buffer is an attention buffer, carries a subsequence type
-- different from handle.inputSSType, or has index = bytes and is marked endOfMsg.
-- Returns FALSE when the input queue is empty; it does not wait for input in that case.
-- Raises ConnectionClosed when handle.closed is set.
EntryEndOf:
ENTRY
PROC [handle: Handle]
RETURNS [
BOOL] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
-- A full buffer is first turned into halfEmpty, then examined on the next pass.
full => { finger.state ← halfEmpty; LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
halfEmpty =>
RETURN [
finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType)
OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg)
];
empty => RETURN [ FALSE ];
ENDCASE => ERROR;
ENDLOOP;
};
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ {
  -- Stream procedure: returns the count reported by EntryCharsAvail.
  -- Whenever EntryCharsAvail completes with a code other than normal, Timeout is signalled;
  -- if the signal is resumed the query is made again.
  handle: Handle ~ NARROW[self.streamData];
  status: CompletionCode;
  [nChars, status] ← EntryCharsAvail[handle, wait];
  WHILE status # normal DO
    SIGNAL Timeout[];
    [nChars, status] ← EntryCharsAvail[handle, wait];
    ENDLOOP;
  RETURN;
  };
-- EntryCharsAvail: monitor entry that counts the unread bytes in the current input buffer.
-- Returns [n, normal] with n = bytes - index when the buffer has unread data.
-- Returns [0, normal] at an attention buffer, a subsequence type change, an exhausted endOfMsg
-- buffer, or when the queue is empty and wait is FALSE.
-- Returns [0, timedOut] when wait is TRUE, the queue is empty and the get timeout has expired.
-- Raises ConnectionClosed when handle.closed is set.
EntryCharsAvail:
ENTRY
PROC [handle: Handle, wait:
BOOL]
RETURNS [
INT, CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
IF finger.attention
OR (finger.buffer.hdr2.sst # handle.inputSSType)
THEN RETURN[0, normal];
IF finger.index < finger.bytes
THEN RETURN [(finger.bytes - finger.index), normal];
IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal];
-- Exhausted buffer that is not a message boundary: advance and look at the next one.
InputDequeue[handle];
LOOP };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [0, normal];
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR; -- Can't happen.
ENDLOOP;
};
Flush: PROC [self: STREAM] ~ {
  -- Stream procedure: runs EntryFlush until it completes with code normal.
  -- Any other completion code signals Timeout; if the signal is resumed the flush is retried.
  handle: Handle ~ NARROW[self.streamData];
  status: CompletionCode ← EntryFlush[handle];
  WHILE status # normal DO
    SIGNAL Timeout[];
    status ← EntryFlush[handle];
    ENDLOOP;
  };
EntryFlush: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ {
  -- Monitor entry behind Flush.  Queues any partly filled output buffer, makes sure at least
  -- one unsent buffer exists, then waits until outputDequeueNum has caught up with the
  -- enqueue number recorded at that point.
  -- Returns normal when that happens, or timedOut when IsTimedOut[handle.putPulseOut, startTime]
  -- reports true first.  Raises ConnectionClosed when handle.closed is set.
  -- handle.flusherCnt counts the processes waiting in the final loop; EntryPush requests an
  -- acknowledgement on every packet while it is positive.  It is therefore decremented on
  -- every exit from that loop, including the timedOut and closed exits, so that a flush which
  -- times out (and is retried by Flush) does not leave the count permanently raised.
  -- NOTE(review): an unwind out of WAIT (for example ABORTED, if aborts are enabled on
  -- flusherWakeup) would still skip the decrement; confirm whether that can happen.
  ENABLE UNWIND => NULL;
  flushNum: CARDINAL;
  startTime: Pulses ← Now[];
  -- Get rid of partly filled buffer.
  DO
    IF handle.closed
      THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    SELECT handle.outputEnqueue.state FROM
      filling => { WAIT handle.doneFilling; LOOP };
      halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
      empty, full, sending => EXIT;
      ENDCASE => ERROR;
    ENDLOOP;
  -- Check for already acknowledged.
  IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal];
  -- Make sure there's at least one unsent buffer to get its ackMe bit set.
  IF handle.outputSendNum = handle.outputEnqueueNum
    THEN {
      DO
        IF handle.closed
          THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
        SELECT handle.outputEnqueue.state FROM
          empty => { OutputMakeNotEmpty[handle]; EXIT };
          filling => { WAIT handle.doneFilling; LOOP };
          halfFull => EXIT;
          full, sending => {
            IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
            WAIT handle.outputSpace; LOOP };
          ENDCASE => ERROR;
        ENDLOOP;
      OutputEnqueue[handle~handle, endOfMessage~FALSE];
      };
  -- Wait until everything is acknowledged.
  flushNum ← handle.outputEnqueueNum;
  handle.flusherCnt ← handle.flusherCnt + 1;
  DO
    IF handle.closed
      THEN {
        handle.flusherCnt ← handle.flusherCnt - 1;
        RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText] };
    IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT;
    IF IsTimedOut[handle.putPulseOut, startTime]
      THEN {
        handle.flusherCnt ← handle.flusherCnt - 1;
        RETURN [timedOut] };
    WAIT handle.flusherWakeup;
    ENDLOOP;
  handle.flusherCnt ← handle.flusherCnt - 1;
  RETURN [normal];
  };
Close: PROC [self: STREAM, abort: BOOL] = {
  -- Stream procedure: marks the connection closed locally (reason localClose, text
  -- "close called") and then calls FinishHandle.  The abort argument is not consulted.
  h: Handle ~ NARROW[self.streamData];
  EntryNotifyClosed[h, localClose, "close called"];
  FinishHandle[h];
  };
Output (Push) Processes
-- Counter, apparently for debugging.  NOTE(review): nothing in the nearby code updates it; confirm whether it is still needed.
nSent: INT ← 0;
-- EntryPush: monitor entry for the Push process.  Releases the buffer just sent, then blocks
-- until a queued data buffer may be sent and returns it in state sending.
-- Both results stay NIL when the handle is closed.
EntryPush:
ENTRY
PROC [handle: Handle, sentFinger: Finger]
RETURNS [sendFinger: Finger ←
NIL, sendBuffer: XNSBuffer ←
NIL] ~ {
If (sentFinger # NIL), we've just finished sending it. In that case, change its state from sending to full and BROADCAST handle.doneSending.
Then wait for a data buffer to send and return it.
Returned value of NIL indicates the handle is closed.
ENABLE UNWIND => NULL;
IF sentFinger #
NIL
THEN {
sentFinger.state ← full;
BROADCAST handle.doneSending };
DO
Check for closed:
IF handle.closed THEN RETURN;
Check for no buffer ready to send:
IF handle.outputSendNum = handle.outputEnqueueNum
THEN {
WAIT handle.outputReady;
LOOP };
Check for no allocation:
IF SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0
THEN {
WAIT handle.recvdNewAlloc;
LOOP };
All the checks succeeded, so we're now committed to sending the buffer:
{ halfMyBuffers, windowLeft, nSentNoAckReq:
INTEGER;
sendAckReq: BOOL;
halfMyBuffers ← INTEGER[handle.outputBuffersAllocated / 2];
windowLeft ← SignedDiff[handle.recvdAllocNum, handle.outputSendNum];
nSentNoAckReq ← SignedDiff[handle.outputSendNum, handle.expectedAckNum];
-- Request an acknowledgement when the remaining window is exactly half of our buffers or zero,
-- when at least half of our buffers have gone out since expectedAckNum, or while a flusher waits.
sendAckReq ← ((windowLeft = halfMyBuffers) OR (windowLeft = 0)
OR (nSentNoAckReq >= halfMyBuffers) OR (handle.flusherCnt > 0));
sendFinger ← handle.outputSend;
ReadyOutputBuffer[handle~handle, buffer~sendFinger.buffer,
requestAck~sendAckReq];
sendFinger.state ← sending;
TRUSTED { sendBuffer ← LOOPHOLE[sendFinger.buffer] };
-- Advance the send position past the buffer being returned.
handle.outputSendNum ← handle.outputSendNum + 1;
handle.outputSend ← sendFinger.next;
-- Wake the Mgr process from its long wait when an acknowledgement was requested.
IF sendAckReq THEN BROADCAST handle.mgrLongWakeup;
RETURN;
};
ENDLOOP;
};
Push: PROC [handle: Handle] ~ {
  -- Body of the output process.  Runs at foreground priority and repeatedly asks EntryPush
  -- for the next buffer to transmit, passing back the finger it has just sent.
  -- The loop ends when EntryPush returns a NIL finger.
  finger: Finger ← NIL;
  nsb: XNSBuffer ← NIL;
  Process.SetPriority[Process.priorityForeground];
  [sendFinger~finger, sendBuffer~nsb] ← EntryPush[handle~handle, sentFinger~finger];
  WHILE finger # NIL DO
    CheckUserBytes[nsb, XNSSPPBuf.hdrBytes + finger.bytes]; -- DEBUG
    XNSSocketBackdoor.PutCached[handle.socket, nsb];
    [sendFinger~finger, sendBuffer~nsb] ← EntryPush[handle~handle, sentFinger~finger];
    ENDLOOP;
  IF nsb # NIL THEN ERROR; -- Can't happen ????
  };
Retransmitter (Mgr) Process
-- Counts retransmissions; incremented in EntryMgr each time an overdue packet is resent.
nResent: INT ← 0;
MaybeFlushCache: INTERNAL PROC [handle: Handle, pulseOut: Pulses] ~ {
  -- Flushes the cache attached to handle.socket, but only when more than pulseOut pulses have
  -- passed since handle.cacheFlushedTime.  The time stamp is refreshed before flushing.
  IF PulsesSince[handle.cacheFlushedTime] <= pulseOut THEN RETURN;
  handle.cacheFlushedTime ← Now[];
  XNSSocketBackdoor.FlushCache[handle.socket];
  };
-- EntryMgr: monitor entry for the Mgr (retransmitter) process.
-- Releases the buffer just sent (sentFinger), then decides what to transmit next.
-- Results: sendFinger and sendBuffer both set for a retransmission of the oldest unacknowledged
-- data buffer; only sendBuffer set for a freshly allocated system packet (probe or
-- acknowledgement); both NIL when the handle is closed.
-- The connection is closed here when nothing has been received for noActivityPulseOut.
EntryMgr:
ENTRY
PROC [handle: Handle, sentFinger: Finger]
RETURNS [sendFinger: Finger ←
NIL, sendBuffer: XNSBuffer ←
NIL] ~ {
ENABLE UNWIND => NULL;
ackReqOutstanding, allocReqOutstanding, timedOutAck, timedOutProbe: BOOL;
b: SPPBuffer ← NIL;
IF sentFinger #
NIL
THEN
{ sentFinger.state ← full; BROADCAST handle.doneSending };
DO
Die if no activity for a long time.
IF PulsesSince[handle.recvdTime] >= handle.noActivityPulseOut
THEN NotifyClosed[handle, unknown, "no activity timeout"];
IF handle.closed THEN RETURN;
When direct input is implemented, this might be the place to call get on handle.socket with a getTimeout of 0 (don't wait). NO! Don't do it holding the ML! ????
Periodically flush the routing info cache associated with the socket.
MaybeFlushCache[handle, handle.cacheFlushPulseOut];
ackReqOutstanding ← (
I have unacknowledged packets -- THIS IS NOT NECESSARY ???? (next condition implies it) ????
(handle.outputDequeueNum # handle.outputSendNum)
AND I am expecting an acknowledgement
AND (SignedDiff[handle.expectedAckNum, handle.outputDequeueNum] > 0));
allocReqOutstanding ← (
I have unsent packets
(handle.outputSendNum # handle.outputEnqueueNum)
AND I have no allocation
AND (SignedDiff[handle.outputSendNum, handle.recvdAllocNum] > 0));
timedOutAck ← (
PulsesSince[handle.sentAckReqTime] > handle.waitForAckPulseOut);
timedOutProbe ← (
(PulsesSince[handle.recvdTime] > handle.probePulseOut)
AND (PulsesSince[handle.sentAckReqTime] > handle.probePulseOut));
-- The arms below are tried in order; the first true condition wins.
SELECT
TRUE
FROM
(ackReqOutstanding
AND timedOutAck) => {
An acknowledgement is overdue. Retransmit the packet.
SELECT handle.outputDequeue.state
FROM
full => NULL;
sending => { WAIT handle.doneSending; LOOP };
ENDCASE => ERROR;
MaybeFlushCache[handle, 3*handle.waitForAckPulseOut]; -- What's the right timeout to use here ????
nResent ← nResent.SUCC;
sendFinger ← handle.outputDequeue;
sendFinger.state ← sending;
b ← sendFinger.buffer;
TRUSTED { sendBuffer ← LOOPHOLE[b] };
ReadyOutputBuffer[handle, b, TRUE];
RETURN };
(timedOutProbe
OR (allocReqOutstanding
AND timedOutAck)) => {
No activity for a long time, or else I want allocation and it's overdue. In either case, send a probe.
[nsb~sendBuffer, b~b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, TRUE];
RETURN };
handle.mustSendAck => {
We've been asked to send an acknowledgement immediately. Send it.
[nsb~sendBuffer, b~b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, FALSE];
RETURN };
(ackReqOutstanding
OR allocReqOutstanding) => {
There's an outstanding request, so we should time out pretty soon.
WAIT handle.mgrShortWakeup;
LOOP };
ENDCASE => {
We don't expect anything to happen soon.
WAIT handle.mgrLongWakeup;
LOOP };
ENDLOOP;
};
Mgr: PROC [handle: Handle] ~ {
  -- Body of the retransmitter process.  Runs at foreground priority and transmits whatever
  -- EntryMgr hands back:
  --   finger and buffer: a data buffer is sent again; the buffer stays owned by its finger.
  --   buffer only: a system packet is sent and then returned to the socket with FreeBuffer.
  --   neither: the loop ends.
  finger: Finger ← NIL;
  nsb: XNSBuffer ← NIL;
  Process.SetPriority[Process.priorityForeground];
  DO
    [finger, nsb] ← EntryMgr[handle, finger];
    IF finger # NIL
      THEN {
        CheckUserBytes[nsb, XNSSPPBuf.hdrBytes + finger.bytes]; -- DEBUG
        XNSSocketBackdoor.PutCached[handle.socket, nsb] }
      ELSE IF nsb # NIL
        THEN {
          CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG
          XNSSocketBackdoor.PutCached[handle.socket, nsb];
          XNSSocket.FreeBuffer[handle.socket, nsb] }
        ELSE EXIT;
    ENDLOOP;
  };
Input (Pull) Process
-- Counters for the input path; each is only ever incremented.
nRecvd: INT ← 0; -- calls of Receive
nTooShort: INT ← 0; -- SPP packets rejected in EntryReceive for being shorter than XNSSPPBuf.hdrBytes
nBadConnID: INT ← 0; -- SPP packets rejected in EntryReceive for a wrong destination connection ID
nBadRemoteConnID: INT ← 0; -- SPP packets rejected in EntryReceive for a wrong source connection ID
-- GotNewAck: called with the monitor held when a packet carrying acknowledgement number
-- newAckNum arrives.  Advances outputDequeue up to newAckNum, freeing each acknowledged buffer.
-- An acknowledgement beyond outputSendNum is ignored.  Returns early if the handle closes.
-- May WAIT on doneSending when the buffer to release is currently being sent.
-- Wakes waiting flushers at the end.
GotNewAck:
INTERNAL
PROC [handle: Handle, newAckNum:
CARDINAL] ~ {
Release sent packets that have been acknowledged.
NOTE: This messes with the OUTPUT queues from an INPUT process.
finger: Finger;
Sanity check.
IF SignedDiff[newAckNum, handle.outputSendNum] > 0
THEN {
This is a protocol error
RETURN };
WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0
DO
IF handle.closed THEN RETURN;
finger ← handle.outputDequeue;
SELECT finger.state
FROM
full => NULL;
sending => {
-- extremely unlikely; occurs only when resending.
WAIT handle.doneSending;
LOOP };
ENDCASE => ERROR;
IF finger.attention
THEN {
Discard the attention finger.
temp: Finger;
nsb: XNSBuffer;
-- Walk the ring to find the predecessor of finger, then unlink finger from the ring.
FOR temp ← finger, temp.next WHILE temp.next # finger
DO NULL ENDLOOP;
temp.next ← finger.next;
IF finger.buffer = NIL THEN ERROR; -- Can't happen ????
TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
XNSSocket.FreeBuffer[handle.socket, nsb];
finger.buffer ← NIL;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.PRED;
}
ELSE {
Advance the finger pointer, make the data buffer available.
finger.state ← empty;
BROADCAST handle.outputSpace;
};
handle.outputDequeueNum ← handle.outputDequeueNum + 1;
handle.outputDequeue ← finger.next;
ENDLOOP;
IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup;
};
GotAckReq: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, systemPacket: BOOL] ~ -- INLINE -- {
  -- Called with the monitor held when a received packet has its sendAck bit set.
  -- An immediate acknowledgement is scheduled (mustSendAck, plus both Mgr wakeups) when the
  -- request came in a system packet or its sequence number is below inputEnqueueNum.
  -- Otherwise only heWantsAlloc is recorded.
  -- TODO: Work on "heWantsAlloc" heuristics ???? Do I want to send an ack immediately if he's used up his allocation? ????
  ackNow: BOOL ~ systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0);
  IF NOT ackNow THEN { handle.heWantsAlloc ← TRUE; RETURN };
  handle.mustSendAck ← TRUE;
  BROADCAST handle.mgrShortWakeup;
  BROADCAST handle.mgrLongWakeup;
  };
GotNewAlloc: INTERNAL PROC [handle: Handle, newAllocNum: CARDINAL] ~ -- INLINE -- {
  -- Called with the monitor held: records the allocation number advertised by the remote end
  -- and wakes every process waiting on recvdNewAlloc (EntryPush waits there).
  handle.recvdAllocNum ← newAllocNum;
  BROADCAST handle.recvdNewAlloc;
  };
GotOBAttn: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, type: AttentionType] ~ {
  -- Called with the monitor held when a data packet with the attention bit arrives.
  -- Records [seqNum, type] in handle.recvdOBAttnList, which is kept sorted by sequence number,
  -- and broadcasts recvdOBAttn.  Nothing is recorded when seqNum lies before inputDequeueNum,
  -- lies more than 50 ahead of it, or is already present in the list.
  -- The ordering test uses SignedDiff, like every other sequence number comparison in this
  -- module, so that the list stays correctly ordered when sequence numbers wrap around.
  p, prev, new: OBAttn;
  seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum];
  IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ????
  prev ← NIL; p ← handle.recvdOBAttnList;
  DO
    IF p = NIL THEN EXIT;
    IF p.seqNum = seqNum THEN RETURN; -- duplicate
    IF SignedDiff[p.seqNum, seqNum] > 0 THEN EXIT; -- p is later than seqNum: insert before p
    prev ← p; p ← p.next;
    ENDLOOP;
  new ← NEW[OBAttnObject←[next~p, seqNum~seqNum, attentionType~type]];
  IF prev = NIL THEN handle.recvdOBAttnList ← new ELSE prev.next ← new;
  BROADCAST handle.recvdOBAttn;
  };
-- GotData: called with the monitor held to place a received data packet in the input ring.
-- b is the received buffer, bytes its data length, newSeqNum its sequence number.
-- Returns the buffer the caller should dispose of: b itself when the packet is dropped
-- (sequence number below inputAckNum, or too far ahead of inputDequeueNum to fit), otherwise
-- the buffer that previously occupied the slot now holding b.
GotData:
INTERNAL
PROC [handle: Handle, newSeqNum:
CARDINAL, b: SPPBuffer, bytes:
NAT]
RETURNS [swapb: SPPBuffer] ~
-- INLINE -- {
PRECONDITION: NOT b.hdr2.connCtl.system
finger: Finger;
Check for duplicate.
IF SignedDiff[newSeqNum, handle.inputAckNum] < 0
THEN {
RETURN[b] };
Push inputEnqueue so there's room for new packet.
IF SignedDiff[newSeqNum, handle.inputDequeueNum] >=
INT[handle.inputBuffersAllocated]
THEN {
The new buffer won't fit, so drop it.
RETURN[b] };
-- Advance inputEnqueue until it is one past the slot for newSeqNum.
WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0)
DO
handle.inputEnqueue ← handle.inputEnqueue.next;
handle.inputEnqueueNum ← handle.inputEnqueueNum + 1;
ENDLOOP;
Set finger to the slot where the new packet should go.
finger ← handle.inputAck;
THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum])
DO
finger ← finger.next;
ENDLOOP;
Put the packet there. If it's a duplicate we use the new one, but so what?
swapb ← finger.buffer;
finger.buffer ← b;
finger.bytes ← bytes;
finger.index ← 0;
finger.attention ← b.hdr2.connCtl.attn;
finger.state ← full;
Mark as acknowledged as many packets as possible and pass them to the client.
WHILE (handle.inputAck.state = full)
AND (handle.inputAckNum # handle.inputEnqueueNum)
DO
BROADCAST handle.inputReady;
handle.inputAckNum ← handle.inputAckNum + 1;
handle.inputAck ← handle.inputAck.next;
ENDLOOP;
};
Receive: XNSSocketBackdoor.ReceiveProc
  -- [handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer]
  ~ {
  -- Receive procedure registered on the socket by Pull.  clientData is the stream handle.
  -- Counts the call, then lets EntryReceive process the packet under the stream monitor and
  -- returns whatever buffer EntryReceive gives back.
  nRecvd ← nRecvd.SUCC; -- DEBUG
  { streamHandle: Handle ~ NARROW[clientData];
    RETURN [ EntryReceive[streamHandle, b] ] };
  };
-- EntryReceive: monitor entry that processes one received packet for this stream.
-- Returns a buffer for the caller to dispose of, or NIL when the buffer was handed to
-- XNSSocket.ReturnError.  For accepted data packets the returned buffer is the one GotData
-- swapped out of the input ring.
-- Error packets of type noSocketErr, listenerRejectErr or protocolViolationErr close the
-- connection; other error packets only flush the socket cache.
EntryReceive:
ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer]
RETURNS [XNSBuf.Buffer] ~ {
Note this sends error packets itself,
ENABLE UNWIND => NULL;
bytes: NAT;
IF handle.closed THEN RETURN [nsb];
bytes ← XNSSocket.GetUserBytes[nsb];
SELECT nsb.hdr1.type
FROM
error => {
OPEN XNSErrorTypes;
eb: XNSErrorBuf.Buffer;
TRUSTED { eb ← LOOPHOLE[nsb] };
SELECT eb.hdr2.type
FROM
noSocketErr, listenerRejectErr, protocolViolationErr => {
NotifyClosed[handle, remoteClose, "remote close"] };
ENDCASE => {
XNSSocketBackdoor.FlushCache[handle.socket] };
RETURN [nsb]
};
spp => {
b: XNSSPPBuf.Buffer;
newSeqNum: CARDINAL;
newAckNum: CARDINAL;
newAllocNum: CARDINAL;
TRUSTED { b ← LOOPHOLE[nsb] };
-- Reject packets too short to hold an SPP header.
IF bytes < XNSSPPBuf.hdrBytes
THEN {
XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.unspecifiedErr];
nsb ← NIL;
nTooShort ← nTooShort.SUCC; -- DEBUG
RETURN [nsb] };
-- A destination ID equal to unknownConnectionID is let through; any other mismatch is rejected.
IF b.hdr2.destConnID # handle.connectionID
THEN {
IF b.hdr2.destConnID # unknownConnectionID
THEN {
XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadConnID ← nBadConnID.SUCC; -- DEBUG
RETURN [nsb] };
};
IF b.hdr2.sourceConnID # handle.remoteConnectionID
THEN {
XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadRemoteConnID ← nBadRemoteConnID.SUCC; -- DEBUG
RETURN [nsb] };
newSeqNum ← Endian.CardFromH[b.hdr2.seqNum];
newAckNum ← Endian.CardFromH[b.hdr2.ackNum];
newAllocNum ← Endian.CardFromH[b.hdr2.allocNum];
-- The packet is accepted as belonging to this connection: note the arrival time.
handle.recvdTime ← Now[];
IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0
THEN
GotNewAlloc[handle, newAllocNum];
IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0
THEN
GotNewAck[handle, newAckNum];
IF b.hdr2.connCtl.sendAck
THEN
GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system];
Handle a data packet.
IF
NOT b.hdr2.connCtl.system
THEN {
-- NOTE(review): the attention byte is read without checking that the packet carries any data bytes; confirm that this is safe.
IF b.hdr2.connCtl.attn
THEN
TRUSTED {
GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] };
b ← GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)];
TRUSTED { nsb ← LOOPHOLE[b] };
};
RETURN [nsb];
};
ENDCASE => {
XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.invalidPacketTypeErr];
nsb ← NIL;
RETURN [nsb];
};
};
Pull: PROC [handle: Handle] ~ {
  -- Body of the input process.  Sets the socket get timeout to waitForever, registers Receive
  -- as the direct receive procedure, then reads packets from the socket until the handle is
  -- closed.  Every packet goes through Receive; any buffer it hands back is freed.
  -- On exit the direct receive registration is removed.
  nsb: XNSBuf.Buffer ← NIL;
  XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever];
  XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle];
  UNTIL handle.closed DO
    nsb ← XNSSocket.Get[handle.socket];
    IF nsb # NIL THEN nsb ← Receive[handle.socket, nsb, handle];
    IF nsb # NIL THEN {
      XNSSocket.FreeBuffer[handle.socket, nsb];
      nsb ← NIL };
    ENDLOOP;
  XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  handle ← NIL; -- for finalization
  };
Chain of Streams
There are three times when we need to be able to locate all of the streams: 1) smashing them after a rollback, 2) processing duplicate RFCs, and 3) making sure connectionIDs are unique.
-- Handle object whose monitor lock is used by the chain procedures below (it is their default handle argument).
globalLockHandle: Handle ← NEW[Object];
-- Head of the list of stream handles, linked through the next field.
handleChain: Handle ← NIL;
AddNewHandle: ENTRY PROC [handle: Handle ← globalLockHandle, newHandle: Handle] ~ {
  -- Pushes newHandle on the front of handleChain.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR.
  -- The UNWIND catch phrase releases the monitor lock if that ERROR, or a NIL fault on
  -- newHandle, unwinds through this entry procedure.
  ENABLE UNWIND => NULL;
  IF handle # globalLockHandle THEN ERROR;
  newHandle.next ← handleChain;
  handleChain ← newHandle;
  };
RemoveOldHandle: ENTRY PROC [handle: Handle ← globalLockHandle, oldHandle: Handle] ~ {
  -- Unlinks oldHandle from handleChain and clears its next field.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR.  A handle that is not on the chain deliberately causes a NIL fault.
  -- The UNWIND catch phrase releases the monitor lock when either of those unwinds through
  -- this entry procedure, so that one bad call cannot leave the global lock held forever.
  ENABLE UNWIND => NULL;
  IF handle # globalLockHandle THEN ERROR;
  IF handleChain = oldHandle
    THEN handleChain ← handleChain.next
    ELSE
      FOR h: Handle ← handleChain, h.next DO
        IF h.next = oldHandle THEN { h.next ← oldHandle.next; EXIT };
        ENDLOOP; -- NIL fault if not on chain
  oldHandle.next ← NIL; -- Help Finalization
  };
FindExistingHandle: ENTRY PROC [handle: Handle ← globalLockHandle, remote: XNS.Address, remoteConnID: HWORD]
  RETURNS [existing: Handle] ~ {
  -- Searches handleChain for a stream whose remote address and remote connection ID both
  -- match the arguments.  Returns that handle, or NIL when no stream matches.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR.
  -- The UNWIND catch phrase releases the monitor lock if that ERROR unwinds through this
  -- entry procedure.
  ENABLE UNWIND => NULL;
  IF handle # globalLockHandle THEN ERROR;
  FOR existing ← handleChain, existing.next WHILE existing # NIL DO
    IF (existing.remote = remote)
      AND (existing.remoteConnectionID = remoteConnID)
      THEN EXIT;
    ENDLOOP;
  };
Rollback: Booting.RollbackProc = {
  -- Rollback procedure registered with Booting: walks handleChain and marks every stream
  -- that is not yet closed as closed, with reason localClose and text "Rollback".
  h: Handle ← handleChain;
  UNTIL h = NIL DO
    IF NOT h.closed THEN EntryNotifyClosed[h, localClose, "Rollback"];
    h ← h.next;
    ENDLOOP;
  };
Unique Connection IDs
-- Last connection ID considered by GetUniqueConnID; seeded from the low half of Now[] when the module starts.
uniqueConnID: CARDINAL ← Basics.LowHalf[Now[]];
GetUniqueConnID: ENTRY PROC [handle: Handle ← globalLockHandle] RETURNS [HWORD] ~ {
  -- Returns a connection ID that no stream on handleChain is using: uniqueConnID is
  -- incremented until no handle on the chain has that connectionID.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR.
  -- The UNWIND catch phrase releases the monitor lock if that ERROR unwinds through this
  -- entry procedure.
  -- NOTE(review): the chain is searched with the CARDINAL value while the result is converted
  -- with Endian.HFromCard; confirm that connectionID is stored in a comparable representation,
  -- and that a value equal to unknownConnectionID can never be handed out.
  ENABLE UNWIND => NULL;
  h: Handle;
  IF handle # globalLockHandle THEN ERROR;
  DO
    uniqueConnID ← uniqueConnID + 1;
    FOR h ← handleChain, h.next
      WHILE (h # NIL) AND (h.connectionID # uniqueConnID) DO NULL ENDLOOP;
    IF h = NIL THEN RETURN [Endian.HFromCard[uniqueConnID]];
    ENDLOOP;
  };
Chain of Listeners
-- Head of the list of listeners, linked through the next field.
listenerChain: Listener ← NIL;
AddNewListener: ENTRY PROC [handle: Handle ← globalLockHandle, newListener: Listener] ~ {
  -- Pushes newListener on the front of listenerChain.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR, as in the handle chain procedures, because the listener chain is only
  -- protected when every caller uses the same lock.
  -- The UNWIND catch phrase releases the monitor lock if that ERROR, or a NIL fault on
  -- newListener, unwinds through this entry procedure.
  ENABLE UNWIND => NULL;
  IF handle # globalLockHandle THEN ERROR;
  newListener.next ← listenerChain;
  listenerChain ← newListener;
  };
RemoveOldListener: ENTRY PROC [handle: Handle ← globalLockHandle, oldListener: Listener] ~ {
  -- Unlinks oldListener from listenerChain and clears its next field.
  -- handle selects the monitor lock and must be globalLockHandle (the default); anything
  -- else raises ERROR, as in the handle chain procedures.
  -- A listener that is not on the chain deliberately causes a NIL fault.
  -- The UNWIND catch phrase releases the monitor lock when either of those unwinds through
  -- this entry procedure.
  ENABLE UNWIND => NULL;
  IF handle # globalLockHandle THEN ERROR;
  IF oldListener = listenerChain
    THEN listenerChain ← listenerChain.next
    ELSE
      FOR p: Listener ← listenerChain, p.next DO
        IF p.next = oldListener THEN { p.next ← oldListener.next; EXIT }
        ENDLOOP; -- NIL fault if not on chain
  oldListener.next ← NIL;
  };
Stream Finalization
Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, Push, Pull and Mgr must 1) notice when the stream dies, and 2) NIL out their copy of handle as they return. (The frame doesn't get recycled until after the JOIN.) The current code won't do anything with dropped streams unless they get closed by the other end.
Initialization appears in mainline code.
-- Counts streams that reached the finalizer before being finished (see StreamFinalizer).
droppedStreams: INT ← 0;
-- Counts streams that reached the finalizer already finished and were removed from handleChain.
finalizedStreams:
INT ← 0;
-- Queue from which StreamFinalizer takes stream objects.
streamFinalizerQueue: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
StreamFinalizer: PROC ~ {
  -- Body of the detached background process that services streamFinalizerQueue.
  -- A handle that is already finished is at its normal end of life: it is taken off
  -- handleChain.  A handle that is not finished was dropped by its user: it must already be
  -- closed, is finished here, and is re-enabled for finalization so that it comes around again.
  Process.SetPriority[Process.priorityBackground];
  DO
    handle: Handle ← NARROW[SafeStorage.FQNext[streamFinalizerQueue]];
    IF handle.finished
      THEN {
        -- Normal end of life
        RemoveOldHandle[oldHandle~handle];
        finalizedStreams ← finalizedStreams.SUCC }
      ELSE {
        -- User forgot to call Destroy
        IF NOT handle.closed THEN ERROR; -- Can't happen
        FinishHandle[handle];
        SafeStorage.EnableFinalization[handle];
        droppedStreams ← droppedStreams.SUCC };
    handle ← NIL;
    ENDLOOP;
  };
Listener Finalization
Initialization appears in mainline code.
-- Counts listeners that reached the finalizer before being destroyed (see ListenerFinalizer).
droppedListeners: INT ← 0;
-- Counts listeners that reached the finalizer already destroyed and were removed from listenerChain.
finalizedListeners:
INT ← 0;
-- Queue from which ListenerFinalizer takes listener objects.
listenerFinalizerQueue: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
ListenerFinalizer: PROC ~ {
  -- Body of the detached background process that services listenerFinalizerQueue.
  -- A listener that is already destroyed is at its normal end of life: it is taken off
  -- listenerChain.  A listener that is not destroyed was dropped by its user: it is destroyed
  -- here and re-enabled for finalization so that it comes around again.
  Process.SetPriority[Process.priorityBackground];
  DO
    listener: Listener ← NARROW[SafeStorage.FQNext[listenerFinalizerQueue]];
    IF listener.destroyed
      THEN {
        -- Normal end of life
        RemoveOldListener[oldListener~listener];
        finalizedListeners ← finalizedListeners.SUCC }
      ELSE {
        -- User forgot to call Destroy
        DestroyListener[listener];
        SafeStorage.EnableFinalization[listener];
        droppedListeners ← droppedListeners.SUCC };
    listener ← NIL;
    ENDLOOP;
  };
MainLine Code
Initialization for stream finalizer
{ -- Set up finalization for Object on streamFinalizerQueue.  EstablishFinalization is tried
  -- first; if it raises CantEstablishFinalization, ReEstablishFinalization is tried instead.
  -- Failure of both is a fatal ERROR.
  ok: BOOL ← TRUE;
  SafeStorage.EstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue
    ! SafeStorage.CantEstablishFinalization => { ok ← FALSE; CONTINUE }];
  IF NOT ok THEN {
    ok ← TRUE;
    SafeStorage.ReEstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue
      ! SafeStorage.CantEstablishFinalization => { ok ← FALSE; CONTINUE }];
    };
  IF NOT ok THEN ERROR };
-- Start the finalizer as a detached process.
TRUSTED { Process.Detach[FORK StreamFinalizer[]]; };
Initialization for listener finalizer
{ -- Set up finalization for ListenerObject on listenerFinalizerQueue.  EstablishFinalization
  -- is tried first; if it raises CantEstablishFinalization, ReEstablishFinalization is tried
  -- instead.  Failure of both is a fatal ERROR.
  ok: BOOL ← TRUE;
  SafeStorage.EstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue
    ! SafeStorage.CantEstablishFinalization => { ok ← FALSE; CONTINUE }];
  IF NOT ok THEN {
    ok ← TRUE;
    SafeStorage.ReEstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue
      ! SafeStorage.CantEstablishFinalization => { ok ← FALSE; CONTINUE }];
    };
  IF NOT ok THEN ERROR };
-- Start the finalizer as a detached process.
TRUSTED { Process.Detach[FORK ListenerFinalizer[]]; };
Initialization for rollback
-- Register Rollback (defined above) with Booting as the r procedure of this module.
Booting.RegisterProcs[r: Rollback];
}.