Buffer Enqueue and Dequeue Utilities
-- Connection-control templates for outgoing packets.
-- All three leave sendAck and endOfMsg clear; those bits are set per packet elsewhere in this file.
defaultConnCtl: XNSSPPBuf.ConnCtl ~ [  -- ordinary packet
  system~FALSE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0];
defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [  -- system packet
  system~TRUE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0];
defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [  -- attention packet
  system~FALSE, sendAck~FALSE, attn~TRUE, endOfMsg~FALSE, filler~0];
AllocateOutputBuffer: INTERNAL PROC [handle: Handle]
  RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ {
  -- Obtain a buffer from handle's socket and return it under both of its types
  -- (nsb and b designate the same storage; b is the LOOPHOLEd SPP view).
  -- The packet type and both connection IDs are filled in before returning.
  nsb ← XNSSocket.AllocBuffer[handle.socket];
  TRUSTED { b ← LOOPHOLE[nsb] };
  b.hdr2.destConnID ← handle.remoteConnectionID;
  b.hdr2.sourceConnID ← handle.connectionID;
  b.hdr1.type ← spp;
  RETURN [nsb, b];
  };
OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ {
  -- Turn the empty finger at handle.outputEnqueue into a halfFull one.
  -- A buffer is allocated on demand, and its connCtl, sst and seqNum header fields are set.
  -- PRECONDITION: handle.outputEnqueue.state = empty
  f: Finger ~ handle.outputEnqueue;
  buf: SPPBuffer;
  IF f.state # empty THEN ERROR; -- Can't happen.
  IF f.buffer = NIL THEN f.buffer ← AllocateOutputBuffer[handle].b;
  buf ← f.buffer;
  IF f.attention
    THEN buf.hdr2.connCtl ← defaultAttnConnCtl
    ELSE buf.hdr2.connCtl ← defaultConnCtl;
  buf.hdr2.sst ← handle.outputSSType;
  buf.hdr2.seqNum ← Endian.HFromCard[handle.outputEnqueueNum];
  -- No client bytes yet: both the read index and the byte count start at zero.
  f.index ← 0;
  f.bytes ← 0;
  f.state ← halfFull;
  };
InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ {
  -- Prepare buffer as a system packet: same header setup as OutputMakeNotEmpty,
  -- except that the sequence number is handle.outputSendNum and the user byte count
  -- is set to the bare SPP header size.
  buffer.hdr2.seqNum ← Endian.HFromCard[handle.outputSendNum];
  buffer.hdr2.sst ← handle.outputSSType;
  buffer.hdr2.connCtl ← defaultSystemConnCtl;
  TRUSTED {
    XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes];
    };
  };
OutputEnqueue: INTERNAL PROC [handle: Handle, endOfMessage: BOOL] ~ {
  -- Hand the halfFull finger at handle.outputEnqueue to the sender:
  -- record endOfMessage, set the buffer length, mark the finger full,
  -- advance the enqueue pointer and count, and NOTIFY handle.outputReady.
  -- PRECONDITION: handle.outputEnqueue.state = halfFull
  f: Finger ~ handle.outputEnqueue;
  IF f.state # halfFull THEN ERROR; -- Can't happen.
  f.buffer.hdr2.connCtl.endOfMsg ← endOfMessage;
  TRUSTED {
    XNSSocket.SetUserBytes[LOOPHOLE[f.buffer], XNSSPPBuf.hdrBytes + f.bytes];
    };
  f.state ← full;
  IF f.index # 0 THEN ERROR; -- Can't happen.
  -- f still equals handle.outputEnqueue here, so f.next is the successor finger.
  handle.outputEnqueue ← f.next;
  handle.outputEnqueueNum ← handle.outputEnqueueNum + 1;
  NOTIFY handle.outputReady;
  };
-- ReadyOutputBuffer stamps buffer with the fields that depend on the current state of handle:
-- the optional acknowledgement request, the acknowledgement number and the allocation number.
-- It also updates handle.expectedAckNum, sentAckReqTime, mustSendAck, heWantsAlloc and sentAllocNum.
ReadyOutputBuffer:
INTERNAL
PROC [handle: Handle, buffer: SPPBuffer,
requestAck:
BOOL] ~ {
Fill in time-dependent fields of output buffer before sending it.
Called before handle.outputSend pointer is advanced.
expectedAckNum, allocNum: CARDINAL;
Acknowledgement request ...
IF requestAck
THEN {
-- Start from the packet's own sequence number; a non-system packet whose number
-- equals handle.outputSendNum expects an ack for the following number.
expectedAckNum ← Endian.CardFromH[buffer.hdr2.seqNum];
IF (
NOT buffer.hdr2.connCtl.system)
AND (expectedAckNum = handle.outputSendNum)
THEN expectedAckNum ← expectedAckNum + 1;
handle.expectedAckNum ← expectedAckNum;
buffer.hdr2.connCtl.sendAck ← TRUE;
handle.sentAckReqTime ← Now[] };
Acknowledgement ...
-- Every outgoing packet carries the current input ack number, so no separate ack is owed afterwards.
buffer.hdr2.ackNum ← Endian.HFromCard[handle.inputAckNum];
handle.mustSendAck ← FALSE;
Allocation ...
-- allocNum is computed from the dequeue count and the number of input buffers.
allocNum ← handle.inputDequeueNum + handle.inputBuffersAllocated - 1;
buffer.hdr2.allocNum ← Endian.HFromCard[allocNum];
IF allocNum # handle.sentAllocNum
THEN {
handle.heWantsAlloc ← FALSE; -- I THINK THIS IS BOGUS ???? Do we need sentAllocNum at all ????
handle.sentAllocNum ← allocNum };
THESE CAN'T HURT, BUT WHAT ARE THEY DOING HERE ???? I'll find out!
-- NOTE(review): as written, each ERROR test below directly follows a store of the same value,
-- so the tests look unreachable. Confirm whether the stores were meant to be commented out.
buffer.hdr1.type ← spp;
IF buffer.hdr1.type # spp THEN ERROR;
buffer.hdr2.sourceConnID ← handle.connectionID;
IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR;
buffer.hdr2.destConnID ← handle.remoteConnectionID;
IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR;
};
MakeCopyOfXNSBuffer: PROC [sH: XNSSocket.Handle, nsb: XNSBuffer]
  RETURNS [copy: XNSBuffer] ~ {
  -- Allocate a fresh buffer from sH and byte-copy nsb into it, starting at hdr1.
  -- The number of bytes copied is the length recorded in nsb.hdr1.length.
  nBytes: CARDINAL ~ Endian.CardFromH[nsb.hdr1.length];
  copy ← XNSSocket.AllocBuffer[sH];
  TRUSTED {
    [] ← PrincOpsUtils.ByteBlt[
      from ~ [blockPointer ~ @nsb.hdr1, startIndex ~ 0, stopIndexPlusOne ~ nBytes],
      to ~ [blockPointer ~ @copy.hdr1, startIndex ~ 0, stopIndexPlusOne ~ nBytes]];
    };
  };
-- InputDequeue releases the finger at handle.inputDequeue (state empty, attention cleared)
-- and advances the dequeue pointer and count. If the peer has asked for allocation, enough
-- input buffers are free, and no outgoing data packet can carry the news, it sends a system packet.
InputDequeue:
INTERNAL
PROC [handle: Handle] ~
-- INLINE -- {
finger: Finger;
sendAllocNow, cantPiggyback: BOOL;
finger ← handle.inputDequeue;
finger.state ← empty;
finger.attention ← FALSE;
handle.inputDequeue ← finger.next;
handle.inputDequeueNum ← handle.inputDequeueNum + 1;
Check whether the other side has asked for allocation and a reasonable number of input buffers have become available. If so, make sure the other side gets the allocation by sending a system packet if necessary:
sendAllocNow ←
He wants some allocation AND
handle.heWantsAlloc AND
I have enough input buffers to give him a reasonable allocation (i.e., at most half my receive buffers are occupied)
(SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2));
-- The request is considered satisfied as soon as we decide to send, whichever packet carries it.
IF sendAllocNow THEN handle.heWantsAlloc ← FALSE;
cantPiggyback ←
There's no data packet at all OR
(handle.outputSendNum = handle.outputEnqueueNum) OR
I'm waiting for allocation from him
(SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0);
IF sendAllocNow
AND cantPiggyback
THEN {
-- Build a header-only system packet, send it with PutCached, then free the buffer here.
nsb: XNSBuffer;
b: SPPBuffer;
[nsb, b] ← AllocateOutputBuffer[handle];
InitSystemOutputBuffer[handle, b];
ReadyOutputBuffer[handle, b, FALSE];
CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG ????
XNSSocketBackdoor.PutCached[nsb];
XNSSocket.FreeBuffer[nsb];
};
};
NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
  -- Mark handle closed (recording the reason and text only on the first call),
  -- then wake every waiter on every condition variable of handle and kick the socket.
  -- The broadcasts and the kick happen on every call, even if handle was already closed.
  IF NOT handle.closed THEN {
    handle.closeReason ← closeReason;
    handle.closeText ← closeText;
    handle.closed ← TRUE };
  BROADCAST handle.inputReady;
  BROADCAST handle.doneEmptying; -- ????
  BROADCAST handle.outputSpace;
  BROADCAST handle.outputReady;
  BROADCAST handle.recvdNewAlloc;
  BROADCAST handle.doneFilling; -- ????
  BROADCAST handle.doneSending; -- ????
  BROADCAST handle.mgrShortWakeup;
  BROADCAST handle.mgrLongWakeup;
  BROADCAST handle.flusherWakeup;
  BROADCAST handle.recvdOBAttn;
  XNSSocket.Kick[handle.socket];
  };  -- closes the procedure body, which was left open so that the next declaration nested inside it
EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ {
  -- Monitor-entry wrapper around the INTERNAL procedure NotifyClosed.
  NotifyClosed[handle~handle, closeReason~closeReason, closeText~closeText];
  };
Creation
MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL]
  RETURNS [finger: Finger ← NIL] ~ {
  -- Build a circular list of nBuffers fresh FingerObjects and return one of them.
  -- The fingers are created without buffers. sH is not used by this procedure.
  -- Returns NIL when nBuffers = 0.
  tail: Finger ← NIL;
  THROUGH [1..nBuffers] DO
    -- Each new finger is pushed on the front; the first one created stays the tail.
    finger ← NEW[ FingerObject ← [next~finger] ];
    IF tail = NIL THEN tail ← finger;
    ENDLOOP;
  -- Close the ring. With nBuffers = 0 there is no tail, so there is nothing to link
  -- (the unguarded store used to dereference NIL in that case).
  IF tail # NIL THEN tail.next ← finger;
  };
FreeBufferRing: PROC [finger: Finger] ~ {
  -- Walk the ring starting at finger, freeing any attached buffer and
  -- clearing each next link. Clearing the links is what terminates the walk
  -- once it comes back around to the starting finger.
  UNTIL finger = NIL DO
    successor: Finger;
    IF finger.buffer # NIL THEN {
      nsb: XNSBuffer;
      TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
      XNSSocket.FreeBuffer[nsb];
      finger.buffer ← NIL;
      };
    successor ← finger.next;
    finger.next ← NIL;
    finger ← successor;
    ENDLOOP;
  };
InitConditions: PROC [handle: Handle] ~ {
  -- Enable aborts on every condition variable of handle, so that a WAIT on any of them can be aborted.
  TRUSTED {
    -- input side
    Process.EnableAborts[@handle.inputReady];
    Process.EnableAborts[@handle.doneEmptying]; -- ????
    Process.EnableAborts[@handle.recvdOBAttn];
    -- output side
    Process.EnableAborts[@handle.outputSpace];
    Process.EnableAborts[@handle.outputReady];
    Process.EnableAborts[@handle.recvdNewAlloc];
    Process.EnableAborts[@handle.doneFilling]; -- ????
    Process.EnableAborts[@handle.doneSending]; -- ????
    -- wakeups
    Process.EnableAborts[@handle.mgrShortWakeup]; -- ????
    Process.EnableAborts[@handle.mgrLongWakeup]; -- ????
    Process.EnableAborts[@handle.flusherWakeup];
    };
  };
InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ {
  -- Record the get and put timeouts (in milliseconds and in pulses) and apply them
  -- to handle.inputReady and handle.outputSpace respectively; waitForever disables the
  -- condition timeout. Also resets the cache-flush interval and the round-trip estimate,
  -- then recomputes the derived timeouts through UpdateTimeouts.
  handle.getTimeout ← getTimeout;
  handle.putTimeout ← putTimeout;
  handle.getPulseOut ← MsecToPulses[getTimeout];
  handle.putPulseOut ← MsecToPulses[putTimeout];
  TRUSTED {
    IF getTimeout # waitForever
      THEN Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]]
      ELSE Process.DisableTimeout[@handle.inputReady];
    IF putTimeout # waitForever
      THEN Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]]
      ELSE Process.DisableTimeout[@handle.outputSpace];
    };
  handle.cacheFlushPulseOut ← 30 * oneSecondOfPulses;
  handle.roundTripPulses ← oneSecondOfPulses;
  UpdateTimeouts[handle];
  };
UpdateTimeouts: PROC [handle: Handle] ~ {
  -- Derive the ack, allocation, probe and no-activity intervals from handle.roundTripPulses
  -- (3x, 3x, 30x and 100x respectively), then set the two manager wakeup timeouts.
  -- TODO: None of this stuff is very good. Better retransmission heuristics are needed.
  handle.waitForAckPulseOut ← 3 * handle.roundTripPulses;
  handle.waitForAllocPulseOut ← handle.waitForAckPulseOut;
  handle.probePulseOut ← 30 * handle.roundTripPulses;
  handle.noActivityPulseOut ← 100 * handle.roundTripPulses;
  TRUSTED {
    Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]];
    Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]];
    };
  };
-- MakeHandle builds the per-connection Object for an already-created socket:
-- it records the connection IDs and remote address, initializes conditions, timeouts and
-- both buffer rings, forks the worker processes, registers the handle and enables finalization.
MakeHandle:
PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID:
HWORD, sendBuffers, recvBuffers:
CARDINAL, getTimeout, putTimeout: Milliseconds]
RETURNS [handle: Handle] ~ {
handle ← NEW[Object];
handle.socket ← socket;
handle.remote ← XNSSocket.GetRemoteAddress[socket];
-- All three timestamps start at the creation time.
handle.recvdTime ← Now[];
handle.cacheFlushedTime ← Now[];
handle.sentAckReqTime ← Now[];
handle.connectionID ← connectionID;
handle.remoteConnectionID ← remoteConnectionID;
InitConditions[handle];
InitTimeouts[handle, getTimeout, putTimeout];
-- Output ring: the enqueue, send and dequeue pointers all start on the same finger.
{ f: Finger ~ MakeBufferRing[handle.socket, sendBuffers];
handle.outputEnqueue ← handle.outputSend ← handle.outputDequeue ← f;
handle.outputBuffersAllocated ← sendBuffers };
-- Input ring: the enqueue, ack and dequeue pointers all start on the same finger.
{ f: Finger ~ MakeBufferRing[handle.socket, recvBuffers];
handle.inputEnqueue ← handle.inputAck ← handle.inputDequeue ← f;
handle.inputBuffersAllocated ← recvBuffers };
-- The processes are forked only after the state above is in place. Two Push processes are started.
handle.pull ← FORK Pull[handle];
handle.mgr ← FORK Mgr[handle];
handle.push1 ← FORK Push[handle];
handle.push2 ← FORK Push[handle];
AddNewHandle[newHandle~handle];
SafeStorage.EnableFinalization[handle];
};
FinishHandle: PROC [handle: Handle] ~ {
  -- Tear down a closed handle: join its four processes, free both buffer rings,
  -- destroy the socket and mark the handle finished. Calling it on a handle that
  -- is not closed is an ERROR; calling it again after it has finished is a no-op.
  IF NOT handle.closed THEN ERROR;
  IF handle.finished THEN RETURN;
  TRUSTED { JOIN handle.push1 };
  handle.push1 ← NIL;
  TRUSTED { JOIN handle.push2 };
  handle.push2 ← NIL;
  TRUSTED { JOIN handle.mgr };
  handle.mgr ← NIL;
  TRUSTED { JOIN handle.pull };
  handle.pull ← NIL;
  -- Output ring, then input ring; every pointer into a freed ring is cleared.
  FreeBufferRing[handle.outputEnqueue];
  handle.outputEnqueue ← NIL;
  handle.outputSend ← NIL;
  handle.outputDequeue ← NIL;
  FreeBufferRing[handle.inputEnqueue];
  handle.inputEnqueue ← NIL;
  handle.inputAck ← NIL;
  handle.inputDequeue ← NIL;
  XNSSocket.Destroy[handle.socket];
  handle.finished ← TRUE;
  };
SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ {
  -- Send a connection request on socket: a header-only system packet with sendAck set,
  -- our connectionID as source, unknownConnectionID as destination, and zero
  -- sequence, acknowledgement and allocation numbers.
  nsb: XNSBuffer ~ XNSSocket.AllocBuffer[socket];
  b: SPPBuffer;
  TRUSTED { b ← LOOPHOLE[nsb] };
  b.hdr1.type ← spp;
  -- connection IDs
  b.hdr2.sourceConnID ← connectionID;
  b.hdr2.destConnID ← unknownConnectionID;
  -- control bits and subsequence type
  b.hdr2.connCtl ← defaultSystemConnCtl;
  b.hdr2.connCtl.sendAck ← TRUE;
  b.hdr2.sst ← 0;
  -- counters
  b.hdr2.seqNum ← Endian.HFromCard[0];
  b.hdr2.ackNum ← Endian.HFromCard[0];
  b.hdr2.allocNum ← Endian.HFromCard[0];
  XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes];
  XNSSocket.Put[b~nsb];
  };
-- Initial wait for a reply to a connection request, in milliseconds.
defaultInitialRetransmissionTime: Milliseconds ← 500;

Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds ← waitForever]
  RETURNS [STREAM] ~ {
  -- Open a stream to remote. A connection request is sent up to 10 times; the reply wait
  -- starts at defaultInitialRetransmissionTime and doubles on every try after the fifth.
  -- Raises ConnectionClosed with noRoute, remoteReject, unknown or noResponse when the
  -- connection cannot be established.
  -- On UNWIND the reply buffer still held (if any) and the socket are released.
  nsb: XNSBuffer ← NIL;
  socket: XNSSocket.Handle ← NIL;
  handle: Handle ← NIL;
  retransmissionTime: Milliseconds;
  connID: HWORD;
  hops: XNSRouter.Hops;
  getTimeout ← MIN[getTimeout, waitForever];
  putTimeout ← MIN[putTimeout, waitForever];
  IF (hops ← XNSRouter.GetHops[remote.net]) = XNSRouter.unreachable
    THEN ERROR ConnectionClosed[noRoute, "no route"];
  -- This should depend on hops:
  retransmissionTime ← defaultInitialRetransmissionTime;
  BEGIN
  ENABLE UNWIND => {
    IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
    IF socket # NIL THEN { XNSSocket.Destroy[socket]; socket ← NIL } };
  socket ← XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ????
  connID ← GetUniqueConnID[];
  FOR nTries: CARDINAL IN [1..10] DO
    IF nTries > 5 THEN retransmissionTime ← 2*retransmissionTime;
    XNSSocket.SetGetTimeout[socket, retransmissionTime];
    SendRFC[socket, connID];
    nsb ← XNSSocket.Get[socket];
    IF nsb = NIL THEN LOOP;
    SELECT nsb.hdr1.type FROM
      spp => {
        b: SPPBuffer;
        TRUSTED { b ← LOOPHOLE[nsb] };
        -- A reply must be addressed to our connection ID and come from the host we called.
        -- nsb is cleared after each ReturnError call so it is not freed again later.
        IF b.hdr2.destConnID # connID THEN {
          XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
          nsb ← NIL;
          LOOP };
        IF b.hdr1.source.host # remote.host THEN {
          XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
          nsb ← NIL;
          LOOP };
        XNSSocket.SetRemoteAddress[socket, b.hdr1.source];
        handle ← MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout];
        -- Receive may hand a buffer back; if so it is ours to free.
        nsb ← Receive[socket, nsb, handle];
        IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
        RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]];
        };
      error => {
        OPEN XNSErrorTypes;
        eb: XNSErrorBuf.Buffer;
        TRUSTED { eb ← LOOPHOLE[nsb] };
        SELECT eb.hdr2.type FROM
          badChecksumErr, resourceLimitsErr => {
            -- Retryable. Release the reply first: the next Get overwrites nsb,
            -- and the buffer used to be leaked on this path.
            XNSSocket.FreeBuffer[nsb];
            nsb ← NIL;
            LOOP };
          noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr =>
            ERROR ConnectionClosed[remoteReject, "rejected"];
          cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"];
          ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"];
        };
      ENDCASE => {
        XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr];
        nsb ← NIL;
        LOOP };
    ENDLOOP;
  ERROR ConnectionClosed[noResponse, "no response"];
  END;
  };
-- A Listener is the client's reference to a listening registration; see CreateListener and DestroyListener.
Listener: TYPE ~ REF ListenerObject;
ListenerObject:
PUBLIC
TYPE ~
RECORD [
-- next: link to another Listener (not used by the procedures visible in this part of the file)
next: Listener,
-- destroyed: set by DestroyListener, which makes repeated destroys a no-op
destroyed: BOOL ← FALSE,
-- socketHandle: socket created when the listener is created; NIL after it has been destroyed
socketHandle: XNSSocket.Handle,
-- listenProcess: the forked Listen process, aborted and joined by DestroyListener
listenProcess: PROCESS ];
CreateListener: PUBLIC PROC [
  socket: XNS.Socket,
  worker: XNSStream.ListenerProc,
  getTimeout, putTimeout: Milliseconds ← waitForever,
  filter: XNSStream.FilterProc ← NIL, -- NIL => Accept all requests
  echoFilter: XNSStream.FilterProc ← NIL] -- NIL => Answer all echos
  RETURNS [listener: Listener] ~ {
  -- Start listening on the given local socket number. A Listen process is forked with
  -- the plain (non-X) worker and filters; the X variants and clientData are passed as NIL.
  -- The listener is registered and enabled for finalization before it is returned.
  -- NOTE(review): a socket is created here and Listen creates another one for the same
  -- local socket number; confirm that this is intended.
  listener ← NEW[ ListenerObject ];
  listener.socketHandle ← XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
  listener.listenProcess ← FORK Listen[
    socket~socket,
    getTimeout~getTimeout, putTimeout~putTimeout,
    worker~worker, filter~filter, echoFilter~echoFilter,
    xWorker~NIL, xFilter~NIL, xEchoFilter~NIL,
    clientData~NIL];
  AddNewListener[newListener~listener];
  SafeStorage.EnableFinalization[listener];
  };
CreateXListener: PUBLIC PROC [
  socket: XNS.Socket,
  worker: XNSStreamExtras.XListenerProc,
  getTimeout, putTimeout: Milliseconds ← waitForever,
  clientData: REF ← NIL,
  filter: XNSStreamExtras.XFilterProc ← NIL, -- NIL => Accept all requests
  echoFilter: XNSStreamExtras.XFilterProc ← NIL] -- NIL => Answer all echos
  RETURNS [listener: Listener] ~ {
  -- Same as CreateListener, but for the extended procedure types that also receive clientData.
  -- The Listen process is forked with the X worker and filters; the plain variants are NIL.
  -- NOTE(review): a socket is created here and Listen creates another one for the same
  -- local socket number; confirm that this is intended.
  listener ← NEW[ ListenerObject ];
  listener.socketHandle ← XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
  listener.listenProcess ← FORK Listen[
    socket~socket,
    getTimeout~getTimeout, putTimeout~putTimeout,
    worker~NIL, filter~NIL, echoFilter~NIL,
    xWorker~worker, xFilter~filter, xEchoFilter~echoFilter,
    clientData~clientData];
  AddNewListener[newListener~listener];
  SafeStorage.EnableFinalization[listener];
  };
GetLocalFromListener: PUBLIC PROC [listener: Listener]
  RETURNS [local: XNS.Address] ~ {
  -- Return the local address of the listener's socket,
  -- or XNS.unknownAddress when the listener has no socket.
  sh: XNSSocket.Handle ~ listener.socketHandle;
  IF sh = NIL THEN RETURN [XNS.unknownAddress];
  RETURN [XNSSocket.GetLocalAddress[sh]];
  };
DestroyListener: PUBLIC PROC [listener: Listener] ~ {
  -- Stop a listener: destroy its socket, then abort and join the Listen process.
  -- Destroying an already-destroyed listener does nothing.
  IF listener.destroyed THEN RETURN;
  listener.destroyed ← TRUE;
  -- A NIL socket handle means CreateListener was ABORTed; there is nothing to tear down.
  IF listener.socketHandle # NIL THEN {
    XNSSocket.Destroy[listener.socketHandle];
    listener.socketHandle ← NIL;
    TRUSTED {
      Process.Abort[listener.listenProcess];
      JOIN listener.listenProcess };
    };
  };
-- Listen is the body of a listener process. It loops forever receiving packets on a socket
-- bound to the given local socket number: echo requests are answered, connection requests
-- are filtered and turned into new streams handed to forked worker processes, and any
-- other packet type is ignored.
-- Either the plain trio (worker, filter, echoFilter) or the X trio (xWorker, xFilter,
-- xEchoFilter, with clientData) is supplied by the callers in this file; the other is NIL.
-- NOTE(review): the socket created below is never destroyed in the code visible here,
-- not even on UNWIND. Confirm who owns it.
Listen:
PROC [
socket:
XNS.Socket,
getTimeout, putTimeout: Milliseconds,
worker: XNSStream.ListenerProc,
filter: XNSStream.FilterProc,
echoFilter: XNSStream.FilterProc,
xWorker: XNSStreamExtras.XListenerProc,
xFilter: XNSStreamExtras.XFilterProc,
xEchoFilter: XNSStreamExtras.XFilterProc,
clientData:
REF]
~ {
socketHandle: XNSSocket.Handle ←
XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket];
nsb: XNSBuffer ← NIL;
bytes: NAT;
{
ENABLE
UNWIND => {
-- Release the packet being processed, if any, when the process is unwound.
IF nsb #
NIL
THEN
{ XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
};
DO
-- A packet left over from the previous iteration is freed here, so the arms below
-- may simply LOOP. Arms that give the buffer away set nsb to NIL first.
IF nsb #
NIL
THEN
{ XNSSocket.FreeBuffer[nsb]; nsb ← NIL };
IF (nsb ← XNSSocket.Get[socketHandle]) = NIL THEN LOOP;
bytes ← XNSSocket.GetUserBytes[nsb];
SELECT nsb.hdr1.type
FROM
echo => {
-- Echo: short packets and non-requests are dropped, as are requests rejected by a filter.
b: XNSEchoBuf.Buffer;
IF bytes < XNSEchoBuf.hdrBytes THEN LOOP;
TRUSTED { b ← LOOPHOLE[nsb] };
IF b.hdr2.type # request THEN LOOP;
IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source]
THEN LOOP;
IF (xEchoFilter # NIL) AND NOT xEchoFilter[nsb.hdr1.source, clientData]
THEN LOOP;
b.hdr2.type ← reply;
{ XNSSocket.ReturnToSender[nsb]; nsb ← NIL };
};
spp => {
b: SPPBuffer;
streamSocketHandle: XNSSocket.Handle;
stream: STREAM;
handle: Handle;
IF bytes < XNSSPPBuf.hdrBytes THEN LOOP;
TRUSTED { b ← LOOPHOLE[nsb] };
-- A well-formed connection request has sequence number 0, an unknown destination
-- connection ID and a known source connection ID; anything else is a protocol violation.
IF (b.hdr2.seqNum # 0)
OR (b.hdr2.destConnID # unknownConnectionID)
OR (b.hdr2.sourceConnID = unknownConnectionID)
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
LOOP };
-- A request from a remote and connection ID that already has a handle is passed to that
-- handle instead of creating a second stream.
IF (handle ← FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID]) #
NIL
THEN {
ReceiveRFC[handle, nsb];
LOOP };
-- New request: give whichever filter was supplied the chance to reject it.
IF (filter #
NIL)
AND
NOT filter[nsb.hdr1.source]
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.listenerRejectErr];
nsb ← NIL;
LOOP };
IF (xFilter #
NIL)
AND
NOT xFilter[nsb.hdr1.source, clientData]
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.listenerRejectErr];
nsb ← NIL;
LOOP };
-- Accepted: create a dedicated socket and handle for the stream, feed the request to it,
-- and start a detached worker process for whichever worker procedure was supplied.
streamSocketHandle ← XNSSocket.Create[remote~nsb.hdr1.source, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ????
handle ← MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ????
ReceiveRFC[handle, nsb];
stream ← IO.CreateStream[streamProcs~streamProcs, streamData~handle];
IF worker #
NIL
THEN
TRUSTED {
Process.Detach[FORK worker[stream~stream, remote~b.hdr1.source]] };
IF xWorker #
NIL
THEN
TRUSTED {
Process.Detach[FORK xWorker[stream~stream, remote~b.hdr1.source, clientData~clientData]] };
};
ENDCASE;
ENDLOOP;
};
};
ReceiveRFC: PROC [handle: Handle, nsb: XNSBuffer] ~ {
  -- Deliver a connection request to handle. The request is copied into a buffer from
  -- handle's own socket so that buffer accounting works out; a request is short, so the
  -- copy is cheap. nsb itself is left untouched and remains the caller's to free.
  copy: XNSBuffer ← MakeCopyOfXNSBuffer[handle.socket, nsb];
  TRUSTED { (LOOPHOLE[copy, SPPBuffer]).hdr2.connCtl.sendAck ← TRUE }; -- force a reply
  copy ← EntryReceive[handle, copy];
  -- EntryReceive hands back whatever buffer it did not keep.
  IF copy = NIL THEN RETURN;
  XNSSocket.FreeBuffer[copy];
  };
Byte Stream Interface
GetTimeouts: PUBLIC PROC [self: IO.STREAM]
  RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
  -- Report the stream's current get and put timeouts.
  -- Raises ConnectionClosed (from EntryGetTimeouts) if the stream is closed.
  h: Handle ~ NARROW[self.streamData];
  [getTimeout, putTimeout] ← EntryGetTimeouts[h];
  RETURN;
  };
EntryGetTimeouts: ENTRY PROC [handle: Handle]
  RETURNS [getTimeout, putTimeout: Milliseconds] ~ {
  -- Read both timeouts under the monitor lock; a closed handle raises ConnectionClosed.
  IF handle.closed THEN
    RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  RETURN [getTimeout~handle.getTimeout, putTimeout~handle.putTimeout];
  };
SetTimeouts: PUBLIC PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds ← waitForever] ~ {
  -- Change the stream's get and put timeouts.
  -- Raises ConnectionClosed (from EntrySetTimeouts) if the stream is closed.
  h: Handle ~ NARROW[self.streamData];
  EntrySetTimeouts[handle~h, getTimeout~getTimeout, putTimeout~putTimeout];
  };
EntrySetTimeouts: ENTRY PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ {
  -- Apply new timeouts under the monitor lock via InitTimeouts;
  -- a closed handle raises ConnectionClosed.
  IF handle.closed THEN
    RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  InitTimeouts[handle, getTimeout, putTimeout];
  };
SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ {
  -- Change the subsequence type of outgoing data. Every time the attempt times out,
  -- Timeout is signalled; if the signal is resumed, the attempt is repeated.
  h: Handle ~ NARROW[self.streamData];
  UNTIL EntrySetSSType[h, ssType] = normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
-- EntrySetSSType changes handle.outputSSType under the monitor lock. It first waits until the
-- finger at handle.outputEnqueue is empty (enqueuing a halfFull one), then records the new type
-- and starts a fresh halfFull buffer that carries it. With sendNow, that buffer is enqueued
-- immediately. Returns timedOut if output space does not appear within handle.putPulseOut;
-- raises ConnectionClosed if the handle is closed.
EntrySetSSType:
ENTRY
PROC [handle: Handle, ssType: SubSequenceType,
sendNow:
BOOL ←
FALSE]
RETURNS [CompletionCode ← normal] ~ {
Note: After changing ssType it's necessary to have a halfFull buffer as handle.inputEnqueue^; this ensures that the sequence
SetSSType[h, t1]; SetSSType[h, t2];
will work correctly.
-- NOTE(review): the note above names handle.inputEnqueue, but the code below works on handle.outputEnqueue.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
-- Setting the type that is already in effect is a no-op (the result defaults to normal).
IF ssType = handle.outputSSType THEN RETURN;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => EXIT;
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
-- Data written under the old type is pushed out before the type changes.
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP };
ENDCASE => ERROR;
ENDLOOP;
handle.outputSSType ← ssType;
OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType
IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE];
};
SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ {
  -- Mark the end of the current outgoing message. Every time the attempt times out,
  -- Timeout is signalled; if the signal is resumed, the attempt is repeated.
  h: Handle ~ NARROW[self.streamData];
  UNTIL EntrySendEndOfMessage[h] = normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
-- EntrySendEndOfMessage enqueues the current output buffer with endOfMessage set.
-- If the finger at handle.outputEnqueue is empty, a buffer with no data is made first, so an
-- end-of-message packet is sent in any case. Returns timedOut if output space does not appear
-- within handle.putPulseOut; raises ConnectionClosed if the handle is closed.
EntrySendEndOfMessage:
ENTRY
PROC [handle: Handle]
RETURNS [CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
empty => { OutputMakeNotEmpty[handle]; EXIT };
sending, full => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
filling => { WAIT handle.doneFilling; LOOP };
halfFull => EXIT;
ENDCASE => ERROR;
ENDLOOP;
-- Both exits from the loop leave a halfFull finger at handle.outputEnqueue.
OutputEnqueue[handle~handle, endOfMessage~TRUE];
RETURN [normal];
};
SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] ~ {
  -- Send an attention. EntrySendAttention queues it in-band; if it also returns a buffer,
  -- that buffer is an out-of-band copy, which is put on the socket here, outside the monitor.
  -- Every time the attempt times out, Timeout is signalled; if resumed, the attempt is repeated.
  h: Handle ~ NARROW[self.streamData];
  oob: XNSBuffer;
  code: CompletionCode;
  DO
    [oob, code] ← EntrySendAttention[h, attentionType];
    IF code # normal THEN {
      IF oob # NIL THEN ERROR; -- can't happen
      SIGNAL Timeout[];
      LOOP };
    EXIT;
    ENDLOOP;
  IF oob # NIL THEN XNSSocket.Put[oob];
  };
-- EntrySendAttention queues a one-byte in-band attention packet on a newly created finger that is
-- spliced into the output ring. If the peer's allocation does not yet cover the attention's
-- sequence number, it also returns a readied copy for the caller to send out-of-band; otherwise
-- nsb is NIL. Returns [NIL, timedOut] if three or more in-band attentions stay outstanding past
-- handle.putPulseOut; raises ConnectionClosed if the handle is closed.
EntrySendAttention:
ENTRY
PROC [handle: Handle, attentionType: AttentionType]
RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ {
Note this should only time out if there are too many outstanding output in-band attentions queued.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
attnNum: CARDINAL;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF handle.outputIBAttnCnt >= 3
THEN {
-- How many really ????
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
filling => { WAIT handle.doneFilling; LOOP };
-- Pending data is enqueued first, so the attention follows it in sequence.
halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
ENDCASE => EXIT;
ENDLOOP;
Allocate a new finger and buffer for the in-band attention (Note we need to do this before sending it out-of-band, to determine its sequence number).
-- Find the predecessor of handle.outputEnqueue in the ring and splice the new finger in before it.
WHILE finger.next # handle.outputEnqueue DO finger ← finger.next ENDLOOP;
finger.next ← NEW [FingerObject ← [next~handle.outputEnqueue, attention~TRUE]];
handle.outputEnqueue ← finger ← finger.next;
-- The send and dequeue pointers are moved to the new finger when their counts equal the enqueue count.
IF handle.outputSendNum = handle.outputEnqueueNum
THEN
handle.outputSend ← handle.outputEnqueue;
IF handle.outputDequeueNum = handle.outputEnqueueNum
THEN
handle.outputDequeue ← handle.outputEnqueue;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.SUCC;
attnNum ← handle.outputEnqueueNum;
Fill the in-band attention buffer and enqueue it.
OutputMakeNotEmpty[handle];
-- The packet body is the single attention-type byte.
TRUSTED { finger.buffer.body.bytes[0] ← LOOPHOLE[attentionType] };
finger.bytes ← 1;
OutputEnqueue[handle~handle, endOfMessage~FALSE];
If there's no allocation for the attention, make a copy of it to be send it out-of-band.
IF SignedDiff[handle.recvdAllocNum, attnNum] < 0
THEN {
nsb: XNSBuffer;
TRUSTED {
nsb ← MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]];
ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] };
RETURN [nsb, normal];
};
RETURN [NIL, normal];
};
WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds ← XNSStream.waitForever]
  RETURNS [type: AttentionType] ~ {
  -- Wait for an out-of-band attention and return its type. Every time the wait times out,
  -- Timeout is signalled; if the signal is resumed, the wait starts over.
  h: Handle ~ NARROW[self.streamData];
  code: CompletionCode;
  DO
    [type, code] ← EntryWaitAttention[h, MsecToPulses[waitTimeout]];
    IF code # normal THEN { SIGNAL Timeout[]; LOOP };
    RETURN;
    ENDLOOP;
  };
-- EntryWaitAttention removes and returns the first entry of handle.recvdOBAttnList, waiting on
-- handle.recvdOBAttn while the list is empty. Returns [0, timedOut] once pulseOut has elapsed
-- since entry; raises ConnectionClosed if the handle is closed.
EntryWaitAttention:
ENTRY
PROC [handle: Handle, pulseOut: Pulses]
RETURNS [AttentionType, CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
p: OBAttn ← NIL;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
IF (p ← handle.recvdOBAttnList) #
NIL
THEN {
handle.recvdOBAttnList ← p.next;
RETURN [p.attentionType, normal] };
IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut];
-- The condition's timeout is reset on every pass, from the caller's pulseOut;
-- the largest Pulses value means wait without a timeout.
TRUSTED {
IF pulseOut < Pulses.
LAST
THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]]
ELSE Process.DisableTimeout[@handle.recvdOBAttn];
};
WAIT handle.recvdOBAttn;
ENDLOOP;
};
GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL]
  RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
  -- Report the state of the input side of the stream; see EntryGetStatus for the meaning of reset.
  -- Raises ConnectionClosed if the stream is closed.
  h: Handle ~ NARROW[self.streamData];
  [state, ssType, attentionType] ← EntryGetStatus[handle~h, reset~reset];
  RETURN;
  };
-- EntryGetStatus inspects the finger at handle.inputDequeue and reports what the reader would
-- meet next: attention, ssTypeChange, endOfMessage, or open (data available or nothing queued).
-- With reset, the reported condition is also consumed: an attention or end-of-message packet is
-- dequeued, and a subsequence-type change is adopted as the new handle.inputSSType.
-- Raises ConnectionClosed if the handle is closed.
EntryGetStatus:
ENTRY
PROC [handle: Handle, reset:
BOOL]
RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
Note: we test finger.attention before anything else. In effect, this means the sst of an attention packet is ignored.
IF finger.attention
THEN {
state ← attention; ssType ← handle.inputSSType;
TRUSTED { attentionType ← LOOPHOLE[finger.buffer.body.bytes[0]] };
IF reset
THEN {
-- If the head of the out-of-band attention list has this packet's sequence number,
-- it is dropped together with the in-band packet.
p: OBAttn ~ handle.recvdOBAttnList;
IF (p #
NIL)
AND (p.seqNum = handle.inputDequeueNum)
THEN
handle.recvdOBAttnList ← p.next;
InputDequeue[handle] };
RETURN };
IF buffer.hdr2.sst # handle.inputSSType
THEN {
IF reset THEN handle.inputSSType ← buffer.hdr2.sst;
RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] };
IF finger.index < finger.bytes
THEN
RETURN [state~open, ssType~handle.inputSSType, attentionType~0];
IF buffer.hdr2.connCtl.endOfMsg
THEN {
IF reset THEN InputDequeue[handle];
RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] };
-- A fully consumed packet that ends nothing is discarded, and the next one is examined.
InputDequeue[handle];
LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
empty => {
RETURN[state~open, ssType~handle.inputSSType, attentionType~0] };
ENDCASE => ERROR;
ENDLOOP;
};
FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOL ← FALSE]
  RETURNS [bytesSkipped: LONG CARDINAL ← 0] ~ {
  -- Skip pending input and return the number of bytes skipped; see EntryFlushInput for
  -- where skipping stops. The running count is passed back in on every retry, so bytes
  -- skipped before a timeout are not lost. Every time the attempt times out, Timeout is
  -- signalled; if the signal is resumed, the attempt is repeated.
  handle: Handle = NARROW[self.streamData];
  code: CompletionCode;
  DO
    [bytesSkipped, code] ← EntryFlushInput[handle, wait, bytesSkipped];
    IF code = normal THEN RETURN;
    SIGNAL Timeout[];
    ENDLOOP;
  };  -- closes the procedure body, which was left open so that the next declaration nested inside it
-- EntryFlushInput discards unread input bytes, adding their count to bytesSkipped, until it
-- reaches an attention packet, a packet of a different subsequence type, or the end of a message;
-- the packet it stops at is left in place. When the input queue is empty it returns at once
-- unless wait is TRUE, in which case it waits for input and returns timedOut after
-- handle.getPulseOut. Raises ConnectionClosed if the handle is closed.
EntryFlushInput:
ENTRY
PROC [handle: Handle, wait:
BOOL ←
FALSE, bytesSkipped:
LONG
CARDINAL]
RETURNS [newSkipped:
LONG
CARDINAL, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
startTime: Pulses ← Now[];
finger: Finger;
buffer: SPPBuffer;
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
IF finger.attention THEN RETURN [bytesSkipped, normal];
buffer ← finger.buffer;
IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal];
-- Count and skip whatever is still unread in this packet.
IF finger.index < finger.bytes
THEN {
bytesSkipped ← bytesSkipped + (finger.bytes - finger.index);
finger.index ← finger.bytes };
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal];
InputDequeue[handle];
LOOP };
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [bytesSkipped, normal];
IF IsTimedOut[handle.getPulseOut, startTime]
THEN
RETURN [bytesSkipped, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
SendNow: PUBLIC PROC [self: IO.STREAM] ~ {
  -- Push out any partially filled output buffer without marking an end of message.
  -- Raises ConnectionClosed (from EntrySendNow) if the stream is closed.
  h: Handle ~ NARROW[self.streamData];
  EntrySendNow[handle~h];
  };
EntrySendNow: ENTRY PROC [handle: Handle] ~ {
  -- Enqueue the finger at handle.outputEnqueue if it is halfFull; in any other state except
  -- filling there is nothing to send. A finger being filled is waited for.
  -- Raises ConnectionClosed if the handle is closed.
  -- The WAIT below is on a condition with aborts enabled (see InitConditions), so the
  -- monitor lock must be released when the procedure is unwound, as in the other waiting
  -- ENTRY procedures of this file.
  ENABLE UNWIND => NULL;
  finger: Finger;
  DO
    IF handle.closed THEN
      RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    finger ← handle.outputEnqueue;
    SELECT finger.state FROM
      filling => { WAIT handle.doneFilling; LOOP };
      halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
      ENDCASE => EXIT;
    ENDLOOP;
  };
SendClose: PUBLIC PROC [self: IO.STREAM]
  RETURNS [ok: BOOL ← FALSE] ~ {
  -- Active side of the close handshake: send a packet of type endSST, then flush input until
  -- a packet of type endSST or endReplySST shows up, answer it with endReplySST and return TRUE.
  -- Returns FALSE if any step times out or the connection is (or becomes) closed.
  ENABLE ConnectionClosed => CONTINUE;
  h: Handle ~ NARROW[self.streamData];
  sst: SubSequenceType;
  IF EntrySetSSType[handle~h, ssType~XNSSPPTypes.endSST, sendNow~TRUE] # normal
    THEN RETURN [FALSE];
  DO
    IF EntryFlushInput[handle~h, wait~TRUE, bytesSkipped~0].code # normal
      THEN RETURN [FALSE];
    -- reset~TRUE adopts a changed subsequence type, so that the next flush can move past it.
    sst ← EntryGetStatus[handle~h, reset~TRUE].ssType;
    IF (sst = XNSSPPTypes.endReplySST) OR (sst = XNSSPPTypes.endSST) THEN {
      [] ← EntrySetSSType[handle~h, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE];
      RETURN [TRUE] };
    ENDLOOP;
  };
SendCloseReply: PUBLIC PROC [self: IO.STREAM]
  RETURNS [ok: BOOL ← FALSE] ~ {
  -- Passive side of the close handshake: send a packet of type endReplySST, then flush input
  -- until a packet of type endReplySST shows up and return TRUE.
  -- Returns FALSE if any step times out or the connection is (or becomes) closed.
  ENABLE ConnectionClosed => CONTINUE;
  h: Handle ~ NARROW[self.streamData];
  sst: SubSequenceType;
  IF EntrySetSSType[handle~h, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE] # normal
    THEN RETURN [FALSE];
  DO
    IF EntryFlushInput[handle~h, wait~TRUE, bytesSkipped~0].code # normal
      THEN RETURN [FALSE];
    -- reset~TRUE adopts a changed subsequence type, so that the next flush can move past it.
    sst ← EntryGetStatus[handle~h, reset~TRUE].ssType;
    IF sst = XNSSPPTypes.endReplySST THEN RETURN [TRUE];
    ENDLOOP;
  };
Stream Procs
GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ {
  -- Stream procedure: return the next input character. Every time the read times out,
  -- Timeout is signalled; if the signal is resumed, the read is repeated.
  -- IO.EndOfStream and ConnectionClosed come from EntryGetChar.
  h: Handle ~ NARROW[self.streamData];
  code: CompletionCode;
  DO
    [char, code] ← EntryGetChar[h, self];
    IF code # normal THEN { SIGNAL Timeout[]; LOOP };
    RETURN;
    ENDLOOP;
  };
-- EntryGetChar returns the next unread character of the packet at handle.inputDequeue.
-- It raises IO.EndOfStream[self] at an attention packet, at a packet of a different subsequence
-- type, or at the end of a message; raises ConnectionClosed if the handle is closed; and returns
-- code timedOut (char unspecified) when no input arrives within handle.getPulseOut.
EntryGetChar:
ENTRY
PROC [handle: Handle, self:
STREAM]
RETURNS [char:
CHAR, code: CompletionCode] ~ {
ENABLE UNWIND => NULL;
finger: Finger;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
buffer ← finger.buffer;
IF finger.attention
OR (buffer.hdr2.sst # handle.inputSSType)
THEN
RETURN WITH ERROR IO.EndOfStream[self];
IF finger.index >= finger.bytes
THEN {
-- Packet exhausted: either the message ends here, or the packet is discarded
-- and the next one is tried.
IF buffer.hdr2.connCtl.endOfMsg
THEN
RETURN WITH ERROR IO.EndOfStream[self];
InputDequeue[handle];
LOOP };
TRUSTED { char ← buffer.body.chars[finger.index] };
finger.index ← finger.index + 1;
RETURN [char, normal];
};
emptying => {
WAIT handle.doneEmptying;
LOOP };
empty => {
IF IsTimedOut[handle.getPulseOut, startTime]
THEN
RETURN [char~, code~timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
-- GetBlock is the stream procedure that reads up to count bytes into block, starting at
-- startIndex and never going beyond block.maxLength. The copy itself is done outside the monitor:
-- GetBlockNext claims an input finger, ByteBlt moves the bytes, and GetBlockLast releases the
-- last finger claimed. Reading stops early when GetBlockNext returns NIL.
-- A timeout signals Timeout and, if resumed, retries.
GetBlock:
PROC [self:
STREAM, block:
REF
TEXT, startIndex:
NAT, count:
NAT]
RETURNS [nBytesRead:
NAT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
-- base is computed as the address of block plus the size of an empty TEXT.
base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
stop: NAT ~ MIN[(startIndex+count), block.maxLength];
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets GetBlockNext release it before claiming the next one.
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
-- block.length follows the write position after every copy.
block.length ← startIndex;
ENDLOOP;
IF finger # NIL THEN GetBlockLast[handle, finger];
};
-- UnsafeGetBlock is the stream procedure that reads up to block.count bytes into the storage
-- at block.base, starting at block.startIndex. It uses the same scheme as GetBlock:
-- GetBlockNext claims an input finger, ByteBlt copies outside the monitor, and GetBlockLast
-- releases the last finger claimed. Reading stops early when GetBlockNext returns NIL.
-- A timeout signals Timeout and, if resumed, retries.
UnsafeGetBlock:
UNSAFE PROC [self:
STREAM, block:
IO.UnsafeBlock]
RETURNS [nBytesRead:
INT ← 0] ~ {
handle: Handle ~ NARROW[self.streamData];
finger: Finger ← NIL;
startIndex: INT ← block.startIndex;
stop: INT ~ block.startIndex+block.count;
nBytes: NAT;
code: CompletionCode;
WHILE startIndex < stop
DO
-- Passing the previous finger lets GetBlockNext release it before claiming the next one.
[finger, code] ← GetBlockNext[self, handle, finger];
SELECT code
FROM
timedOut => { SIGNAL Timeout[]; LOOP };
normal => IF finger = NIL THEN EXIT;
ENDCASE => ERROR;
TRUSTED {
nBytes ← PrincOpsUtils.ByteBlt[
to~[
blockPointer~block.base,
startIndex~startIndex,
stopIndexPlusOne~stop],
from~[
blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ????
startIndex~finger.index,
stopIndexPlusOne~finger.bytes] ];
};
finger.index ← finger.index + nBytes;
nBytesRead ← nBytesRead + nBytes;
startIndex ← startIndex + nBytes;
ENDLOOP;
IF finger # NIL THEN GetBlockLast[handle, finger];
};
GetBlockNext:
ENTRY
PROC [self:
STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
-- Monitor entry called by GetBlock and UnsafeGetBlock between copies.
-- First releases oldFinger (if not NIL), then looks for the next input finger with data.
-- Results:
--   [finger, normal]  finger has unread bytes and has been put in state emptying for the caller.
--   [NIL, normal]     nothing more to deliver now: attention packet, sst change, or end of message.
--   [NIL, timedOut]   the input queue was empty and the get timeout had expired.
-- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
-- The self parameter is not referenced in the body.
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
buffer: SPPBuffer;
startTime: Pulses ← Now[];
-- Give back the finger the caller was copying from and wake anyone waiting for it.
IF oldFinger #
NIL
THEN {
oldFinger.state ← halfEmpty;
BROADCAST handle.doneEmptying;
};
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
buffer ← finger.buffer;
SELECT finger.state
FROM
full, halfEmpty => {
-- Attention packets and packets whose sst differs from handle.inputSSType are not handed out.
IF finger.attention
OR (buffer.hdr2.sst # handle.inputSSType)
THEN
RETURN[NIL, normal];
-- Finger exhausted: stop at an end-of-message packet, otherwise dequeue it and look again.
IF finger.index >= finger.bytes
THEN {
IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal];
InputDequeue[handle];
LOOP };
-- Mark the finger busy so other readers wait on doneEmptying while the caller copies.
finger.state ← emptying;
RETURN[finger, normal] };
emptying => {
-- Another reader holds this finger; wait until it is released.
WAIT handle.doneEmptying;
LOOP };
empty => {
-- No input yet: give up if the timeout measured from startTime has expired, else wait for input.
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR;
ENDLOOP;
};
GetBlockLast:
ENTRY
PROC [handle: Handle, oldFinger: Finger] ~ {
  -- Monitor entry called at the end of GetBlock and UnsafeGetBlock.
  -- Releases the finger the caller was copying from: its state goes back to halfEmpty
  -- and every process waiting on handle.doneEmptying is woken.
  -- oldFinger must not be NIL; a NIL argument raises ERROR.
  -- The UNWIND clause makes sure the monitor lock is released if that ERROR is unwound;
  -- PutBlockLast, the output-side counterpart, carries the same clause.
  ENABLE UNWIND => NULL;
  IF oldFinger = NIL THEN ERROR; -- can't happen
  oldFinger.state ← halfEmpty;
  BROADCAST handle.doneEmptying;
  };
PutChar:
PROC [self: STREAM, char: CHAR] ~ {
  -- Append one character to the output stream.
  -- EntryPutChar does the work under the monitor; each time it reports anything other
  -- than normal, Timeout is raised as a SIGNAL and, if resumed, the put is attempted again.
  handle: Handle ~ NARROW[self.streamData];
  WHILE EntryPutChar[handle, char] # normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
EntryPutChar:
ENTRY
PROC [handle: Handle, char:
CHAR]
RETURNS [CompletionCode] ~ {
-- Monitor entry behind PutChar: store char in the buffer at handle.outputEnqueue.
-- Returns normal once the character is stored, or timedOut when no output buffer became
-- available within the put timeout measured from entry.
-- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
-- empty: OutputMakeNotEmpty prepares the buffer and leaves the finger halfFull with zero bytes.
empty => OutputMakeNotEmpty[handle];
-- filling: a block writer is copying into this finger; wait until it is done.
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
-- full or sending: no room to enqueue; wait for space unless the timeout has expired.
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
-- The buffer holds LAST[BufIndex] bytes already: enqueue it and retry with the next finger.
IF finger.bytes =
LAST[BufIndex]
THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
TRUSTED { finger.buffer.body.chars[finger.bytes] ← char };
finger.bytes ← finger.bytes + 1;
RETURN [normal];
ENDLOOP;
};
PutBlock:
PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ {
  -- Write characters block[startIndex ..] to the output stream.
  -- At most count characters are written, and never past block.length.
  -- Each pass obtains an output finger from PutBlockNext and copies as much as fits,
  -- up to position BufIndex.LAST of the buffer body.
  -- A timeout is reported with SIGNAL Timeout; if the signal is resumed the request is retried.
  handle: Handle ~ NARROW[self.streamData];
  current: Finger ← NIL;
  textBase: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ????
  limit: NAT ~ MIN[(startIndex+count), block.length];
  moved: NAT;
  status: CompletionCode;
  WHILE startIndex < limit DO
    -- Hand back the finger filled on the previous pass (if any) and obtain the next one.
    [current, status] ← PutBlockNext[self, handle, current];
    IF status = timedOut THEN { SIGNAL Timeout[]; LOOP };
    IF status # normal THEN ERROR;
    IF current = NIL THEN ERROR; -- can't happen
    -- The copy runs outside the monitor; PutBlockNext left the finger in state filling.
    TRUSTED {
      moved ← PrincOpsUtils.ByteBlt[
        to~[
          blockPointer~@current.buffer.body, -- WORD SIZE DEPENDENT ????
          startIndex~current.bytes,
          stopIndexPlusOne~BufIndex.LAST],
        from~[blockPointer~textBase, startIndex~startIndex, stopIndexPlusOne~limit] ];
      };
    current.bytes ← current.bytes + moved;
    startIndex ← startIndex + moved;
    ENDLOOP;
  -- Release the finger still held from the final pass.
  IF current # NIL THEN PutBlockLast[handle, current];
  };
UnsafePutBlock:
PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  -- Write the bytes at positions [block.startIndex .. block.startIndex+block.count),
  -- relative to block.base, to the output stream.
  -- Each pass obtains an output finger from PutBlockNext and copies as much as fits,
  -- up to position BufIndex.LAST of the buffer body.
  -- A timeout is reported with SIGNAL Timeout; if the signal is resumed the request is retried.
  handle: Handle ~ NARROW[self.streamData];
  current: Finger ← NIL;
  cursor: INT ← block.startIndex;
  limit: INT ~ block.startIndex+block.count;
  moved: NAT;
  status: CompletionCode;
  WHILE cursor < limit DO
    -- Hand back the finger filled on the previous pass (if any) and obtain the next one.
    [current, status] ← PutBlockNext[self, handle, current];
    IF status = timedOut THEN { SIGNAL Timeout[]; LOOP };
    IF status # normal THEN ERROR;
    IF current = NIL THEN ERROR; -- can't happen
    -- The copy runs outside the monitor; PutBlockNext left the finger in state filling.
    TRUSTED {
      moved ← PrincOpsUtils.ByteBlt[
        to~[
          blockPointer~@current.buffer.body, -- WORD SIZE DEPENDENT ????
          startIndex~current.bytes,
          stopIndexPlusOne~BufIndex.LAST],
        from~[blockPointer~block.base, startIndex~cursor, stopIndexPlusOne~limit] ];
      };
    current.bytes ← current.bytes + moved;
    cursor ← cursor + moved;
    ENDLOOP;
  -- Release the finger still held from the final pass.
  IF current # NIL THEN PutBlockLast[handle, current];
  };
PutBlockNext:
ENTRY
PROC [self:
STREAM, handle: Handle, oldFinger: Finger]
RETURNS [Finger, CompletionCode] ~ {
-- Monitor entry called by PutBlock and UnsafePutBlock between copies.
-- First releases and enqueues oldFinger (if not NIL), then finds an output finger with room.
-- Results:
--   [finger, normal]  finger has been put in state filling for the caller to copy into.
--   [NIL, timedOut]   the output queue stayed full past the put timeout.
-- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
-- The self parameter is not referenced in the body.
ENABLE UNWIND => NULL;
finger: Finger ← NIL;
startTime: Pulses ← Now[];
-- The caller only comes back with a finger when it has more to write, so the old one is sent.
-- NOTE(review): OutputEnqueue acts on handle.outputEnqueue, presumably still oldFinger; confirm.
IF oldFinger #
NIL
THEN {
oldFinger.state ← halfFull;
BROADCAST handle.doneFilling;
OutputEnqueue[handle~handle, endOfMessage~FALSE] };
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.outputEnqueue;
SELECT finger.state
FROM
-- empty: OutputMakeNotEmpty prepares the buffer and leaves the finger halfFull with zero bytes.
empty => OutputMakeNotEmpty[handle];
-- filling: another writer is copying into this finger; wait until it is done.
filling => { WAIT handle.doneFilling; LOOP };
halfFull => NULL;
-- full or sending: no room to enqueue; wait for space unless the timeout has expired.
full, sending => {
IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut];
WAIT handle.outputSpace; LOOP };
ENDCASE => ERROR;
-- The buffer holds LAST[BufIndex] bytes already: enqueue it and retry with the next finger.
IF finger.bytes =
LAST[BufIndex]
THEN {
OutputEnqueue[handle~handle, endOfMessage~FALSE];
LOOP };
-- Mark the finger busy so other writers wait on doneFilling while the caller copies.
finger.state ← filling;
RETURN[finger, normal];
ENDLOOP;
};
PutBlockLast:
ENTRY PROC [handle: Handle, finger: Finger] ~ {
  -- Monitor entry called at the end of PutBlock and UnsafePutBlock.
  -- Returns the finger the caller was copying into to state halfFull and wakes
  -- every process waiting on handle.doneFilling. The buffer is not enqueued here.
  ENABLE UNWIND => NULL;
  finger.state ← halfFull;
  BROADCAST handle.doneFilling;
  };
EndOf:
PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- Stream procedure: report whether the input side is at an end-of-stream condition.
  -- The test itself is made under the monitor by EntryEndOf.
  RETURN [EntryEndOf[NARROW[self.streamData]]];
  };
EntryEndOf:
ENTRY
PROC [handle: Handle]
RETURNS [
BOOL] ~ {
-- Monitor entry behind EndOf.
-- Returns TRUE when the finger at handle.inputDequeue is an attention packet, carries an sst
-- different from handle.inputSSType, or is fully read and has the endOfMsg bit set.
-- Returns FALSE otherwise, including when the input queue is empty (this procedure never
-- waits for input, only for a block reader to release the finger).
-- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
ENABLE UNWIND => NULL;
finger: Finger;
DO
IF handle.closed THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
-- A full finger is first marked halfEmpty, then handled by the halfEmpty arm on the next pass.
full => { finger.state ← halfEmpty; LOOP };
emptying => { WAIT handle.doneEmptying; LOOP };
halfEmpty =>
RETURN [
finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType)
OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg)
];
empty => RETURN [ FALSE ];
ENDCASE => ERROR;
ENDLOOP;
};
CharsAvail:
PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ {
  -- Stream procedure: number of input characters that can be read right now.
  -- EntryCharsAvail computes the answer under the monitor. Whenever it reports anything
  -- other than normal, Timeout is raised as a SIGNAL and, if resumed, the query is repeated.
  handle: Handle ~ NARROW[self.streamData];
  status: CompletionCode;
  DO
    [nChars, status] ← EntryCharsAvail[handle, wait];
    IF status # normal THEN { SIGNAL Timeout[]; LOOP };
    RETURN;
    ENDLOOP;
  };
EntryCharsAvail:
ENTRY
PROC [handle: Handle, wait:
BOOL]
RETURNS [
INT, CompletionCode] ~ {
-- Monitor entry behind CharsAvail.
-- Returns [n, normal] where n is the count of unread bytes in the finger at handle.inputDequeue,
-- or 0 for an attention packet, an sst change, an exhausted end-of-message packet, or an empty
-- queue when wait is FALSE.
-- Returns [0, timedOut] when wait is TRUE, the queue is empty and the get timeout has expired.
-- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
ENABLE UNWIND => NULL;
finger: Finger;
startTime: Pulses ← Now[];
DO
IF handle.closed
THEN
RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
finger ← handle.inputDequeue;
SELECT finger.state
FROM
full, halfEmpty => {
finger.state ← halfEmpty;
IF finger.attention
OR (finger.buffer.hdr2.sst # handle.inputSSType)
THEN RETURN[0, normal];
IF finger.index < finger.bytes
THEN RETURN [(finger.bytes - finger.index), normal];
IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal];
-- Exhausted packet that does not end a message: drop it and examine the next finger.
InputDequeue[handle];
LOOP };
emptying => {
-- A block reader holds this finger; wait until it is released.
WAIT handle.doneEmptying;
LOOP };
empty => {
IF NOT wait THEN RETURN [0, normal];
IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut];
WAIT handle.inputReady;
LOOP };
ENDCASE => ERROR; -- Can't happen.
ENDLOOP;
};
Flush:
PROC [self: STREAM] ~ {
  -- Stream procedure: push out buffered output and wait for it to be acknowledged.
  -- EntryFlush does the work under the monitor; each time it reports anything other
  -- than normal, Timeout is raised as a SIGNAL and, if resumed, the flush is attempted again.
  handle: Handle ~ NARROW[self.streamData];
  WHILE EntryFlush[handle] # normal DO
    SIGNAL Timeout[];
    ENDLOOP;
  };
EntryFlush:
ENTRY
PROC [handle: Handle]
RETURNS [CompletionCode] ~ {
  -- Monitor entry behind Flush.
  -- Enqueues any partly filled output buffer, makes sure at least one unsent buffer exists,
  -- then waits until handle.outputDequeueNum has caught up with the enqueue number
  -- recorded when the wait began.
  -- Returns normal when everything is acknowledged, timedOut when the put timeout
  -- (measured from entry) expires first.
  -- Raises ConnectionClosed (via RETURN WITH ERROR) when handle.closed is set.
  -- handle.flusherCnt counts processes in the final wait; GotNewAck broadcasts
  -- flusherWakeup only while it is positive. It is decremented on the timed-out and closed
  -- exits as well as the normal one, so a Timeout/retry cycle does not inflate it.
  -- NOTE(review): an aborted WAIT (UNWIND) still leaves the count incremented; confirm
  -- whether that path needs the same treatment.
  ENABLE UNWIND => NULL;
  flushNum: CARDINAL;
  startTime: Pulses ← Now[];
  -- Get rid of partly filled buffer.
  DO
    IF handle.closed
      THEN
        RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    SELECT handle.outputEnqueue.state
      FROM
      filling => { WAIT handle.doneFilling; LOOP };
      halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT };
      empty, full, sending => EXIT;
      ENDCASE => ERROR;
    ENDLOOP;
  -- Check for already acknowledged.
  IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal];
  -- Make sure there's at least one unsent buffer to get its ackMe bit set.
  IF handle.outputSendNum = handle.outputEnqueueNum
    THEN {
      DO
        IF handle.closed
          THEN
            RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
        SELECT handle.outputEnqueue.state
          FROM
          empty => { OutputMakeNotEmpty[handle]; EXIT };
          filling => { WAIT handle.doneFilling; LOOP };
          halfFull => EXIT;
          full, sending => {
            IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut];
            WAIT handle.outputSpace; LOOP };
          ENDCASE => ERROR;
        ENDLOOP;
      OutputEnqueue[handle~handle, endOfMessage~FALSE];
      };
  -- Wait until everything is acknowledged.
  flushNum ← handle.outputEnqueueNum;
  handle.flusherCnt ← handle.flusherCnt + 1;
  DO
    IF handle.closed
      THEN {
        -- Leaving the wait: undo the registration before raising the error.
        handle.flusherCnt ← handle.flusherCnt - 1;
        RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText] };
    IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT;
    IF IsTimedOut[handle.putPulseOut, startTime]
      THEN {
        -- Leaving the wait: Flush re-enters and registers again if the Timeout is resumed.
        handle.flusherCnt ← handle.flusherCnt - 1;
        RETURN [timedOut] };
    WAIT handle.flusherWakeup;
    ENDLOOP;
  handle.flusherCnt ← handle.flusherCnt - 1;
  RETURN [normal];
  };
Close:
PROC [self: STREAM, abort: BOOL] = {
  -- Stream procedure: close the connection from the local side.
  -- Marks the handle closed with reason localClose, then calls FinishHandle on it.
  -- The abort argument is not consulted.
  sppHandle: Handle ~ NARROW[self.streamData];
  EntryNotifyClosed[sppHandle, localClose, "close called"];
  FinishHandle[sppHandle];
  };
Input (Pull) Process
-- Debugging counters for the input process; they are only ever incremented in this part of the file.
nRecvd: INT ← 0; -- calls to Receive (incremented outside the monitor)
nTooShort: INT ← 0; -- SPP packets shorter than XNSSPPBuf.hdrBytes
nBadConnID: INT ← 0; -- SPP packets whose destConnID matched neither our ID nor unknownConnectionID
nBadRemoteConnID: INT ← 0; -- SPP packets whose sourceConnID differed from handle.remoteConnectionID
GotNewAck:
INTERNAL
PROC [handle: Handle, newAckNum:
CARDINAL] ~ {
Release sent packets that have been acknowledged.
NOTE: This messes with the OUTPUT queues from an INPUT process.
-- Advances handle.outputDequeue and handle.outputDequeueNum until the latter reaches newAckNum.
-- Ordinary fingers are marked empty (waking writers on outputSpace); attention fingers are
-- unlinked from the ring and their buffers freed.
-- Finally wakes flushers if handle.flusherCnt is positive.
-- Returns without doing anything if newAckNum is beyond handle.outputSendNum, and returns
-- early (without the flusher wakeup) if the handle becomes closed.
finger: Finger;
Sanity check.
IF SignedDiff[newAckNum, handle.outputSendNum] > 0
THEN {
This is a protocol error
RETURN };
WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0
DO
IF handle.closed THEN RETURN;
finger ← handle.outputDequeue;
SELECT finger.state
FROM
full => NULL;
sending => {
-- extremely unlikely; occurs only when resending.
WAIT handle.doneSending;
LOOP };
ENDCASE => ERROR;
IF finger.attention
THEN {
Discard the attention finger.
-- Walk the circular list to find the predecessor of finger, then splice finger out.
temp: Finger;
nsb: XNSBuffer;
FOR temp ← finger, temp.next WHILE temp.next # finger DO NULL ENDLOOP;
temp.next ← finger.next;
IF finger.buffer = NIL THEN ERROR; -- Can't happen ????
TRUSTED { nsb ← LOOPHOLE[finger.buffer] };
XNSSocket.FreeBuffer[nsb];
finger.buffer ← NIL;
handle.outputIBAttnCnt ← handle.outputIBAttnCnt.PRED;
}
ELSE {
Advance the finger pointer, make the data buffer available.
finger.state ← empty;
BROADCAST handle.outputSpace;
};
-- finger.next is still valid here even when finger was just unlinked.
handle.outputDequeueNum ← handle.outputDequeueNum + 1;
handle.outputDequeue ← finger.next;
ENDLOOP;
IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup;
};
GotAckReq:
INTERNAL
PROC [handle: Handle, seqNum: CARDINAL, systemPacket: BOOL] ~
-- INLINE -- {
  -- React to a received packet that has its sendAck bit set.
  -- For a system packet, or a packet whose sequence number is below handle.inputEnqueueNum,
  -- handle.mustSendAck is set and both manager wakeup conditions are broadcast.
  -- For any other packet only handle.heWantsAlloc is set.
  -- TODO: Work on "heWantsAlloc" heuristics ???? Do I want to sent an ack immediately if he's used up his allocation? ????
  ackNow: BOOL ~ systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0);
  IF NOT ackNow THEN { handle.heWantsAlloc ← TRUE; RETURN };
  handle.mustSendAck ← TRUE;
  BROADCAST handle.mgrShortWakeup;
  BROADCAST handle.mgrLongWakeup;
  };
GotNewAlloc:
INTERNAL PROC [handle: Handle, newAllocNum: CARDINAL] ~
-- INLINE -- {
  -- Record a newly received allocation number in handle.recvdAllocNum and wake
  -- every process waiting on handle.recvdNewAlloc.
  -- The caller decides whether newAllocNum is actually newer; no check is made here.
  handle.recvdAllocNum ← newAllocNum;
  BROADCAST handle.recvdNewAlloc;
  };
GotOBAttn:
INTERNAL
PROC [handle: Handle, seqNum: CARDINAL, type: AttentionType] ~ {
  -- Record an out-of-band attention carried by the packet with sequence number seqNum.
  -- The attention is ignored when seqNum is behind handle.inputDequeueNum or more than
  -- 50 ahead of it, and when an entry with the same seqNum is already on the list.
  -- Otherwise a new entry is inserted into handle.recvdOBAttnList, which is kept in
  -- ascending sequence order, and handle.recvdOBAttn is broadcast.
  -- Ordering uses SignedDiff, like every other sequence comparison in this file, so the
  -- list stays correctly ordered when sequence numbers wrap around.
  p, prev, new: OBAttn;
  seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum];
  IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ????
  -- Find the insertion point: prev is the last entry before seqNum, p the first one after it.
  prev ← NIL; p ← handle.recvdOBAttnList;
  DO
    IF p = NIL THEN EXIT;
    IF p.seqNum = seqNum THEN RETURN;
    IF SignedDiff[p.seqNum, seqNum] > 0 THEN EXIT;
    prev ← p; p ← p.next;
    ENDLOOP;
  new ← NEW[OBAttnObject←[next~p, seqNum~seqNum, attentionType~type]];
  IF prev = NIL THEN handle.recvdOBAttnList ← new ELSE prev.next ← new;
  BROADCAST handle.recvdOBAttn;
  };
GotData:
INTERNAL
PROC [handle: Handle, newSeqNum:
CARDINAL, b: SPPBuffer, bytes:
NAT]
RETURNS [swapb: SPPBuffer] ~
-- INLINE -- {
PRECONDITION: NOT b.hdr2.connCtl.system
-- Place a received data packet b (sequence number newSeqNum, bytes data bytes) in the input ring.
-- Returns the buffer the caller should dispose of: b itself when the packet is dropped
-- (already acknowledged, or outside the allocated window), otherwise the buffer that
-- previously occupied the slot now holding b.
finger: Finger;
Check for duplicate.
IF SignedDiff[newSeqNum, handle.inputAckNum] < 0
THEN {
RETURN[b] };
Push inputEnqueue so there's room for new packet.
IF SignedDiff[newSeqNum, handle.inputDequeueNum] >=
INT[handle.inputBuffersAllocated]
THEN {
The new buffer won't fit, so drop it.
RETURN[b] };
-- Advance inputEnqueue until it is one past newSeqNum.
WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0)
DO
handle.inputEnqueue ← handle.inputEnqueue.next;
handle.inputEnqueueNum ← handle.inputEnqueueNum + 1;
ENDLOOP;
Set finger to the slot where the new packet should go.
-- The slot is (newSeqNum - inputAckNum) steps past inputAck.
finger ← handle.inputAck;
THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum])
DO
finger ← finger.next;
ENDLOOP;
Put the packet there. If it's a duplicate we use the new one, but so what?
swapb ← finger.buffer;
finger.buffer ← b;
finger.bytes ← bytes;
finger.index ← 0;
finger.attention ← b.hdr2.connCtl.attn;
finger.state ← full;
Mark as acknowledged as many packets as possible and pass them to the client.
-- Each consecutive full finger starting at inputAck advances inputAck and wakes readers.
WHILE (handle.inputAck.state = full)
AND (handle.inputAckNum # handle.inputEnqueueNum)
DO
BROADCAST handle.inputReady;
handle.inputAckNum ← handle.inputAckNum + 1;
handle.inputAck ← handle.inputAck.next;
ENDLOOP;
};
Receive: XNSSocketBackdoor.ReceiveProc
-- [handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer]
~ {
  -- Receive procedure registered with the socket by Pull, and also called by Pull directly.
  -- clientData is the SPP Handle; the packet is processed under the monitor by EntryReceive.
  -- The result is the buffer EntryReceive gives back (NIL when it kept or returned the packet).
  nRecvd ← nRecvd + 1; -- DEBUG
  RETURN [ EntryReceive[NARROW[clientData], b] ];
  };
EntryReceive:
ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer]
RETURNS [XNSBuf.Buffer] ~ {
Note this sends error packets itself,
ENABLE UNWIND => NULL;
bytes: NAT;
IF handle.closed THEN RETURN [nsb];
bytes ← XNSSocket.GetUserBytes[nsb];
SELECT nsb.hdr1.type
FROM
error => {
OPEN XNSErrorTypes;
eb: XNSErrorBuf.Buffer;
TRUSTED { eb ← LOOPHOLE[nsb] };
SELECT eb.hdr2.type
FROM
noSocketErr, listenerRejectErr, protocolViolationErr => {
NotifyClosed[handle, remoteClose, "remote close"] };
ENDCASE => {
XNSSocketBackdoor.FlushCache[handle.socket] };
RETURN [nsb]
};
spp => {
b: XNSSPPBuf.Buffer;
newSeqNum: CARDINAL;
newAckNum: CARDINAL;
newAllocNum: CARDINAL;
TRUSTED { b ← LOOPHOLE[nsb] };
IF bytes < XNSSPPBuf.hdrBytes
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.unspecifiedErr];
nsb ← NIL;
nTooShort ← nTooShort.SUCC; -- DEBUG
RETURN [nsb] };
IF b.hdr2.destConnID # handle.connectionID
THEN {
IF b.hdr2.destConnID # unknownConnectionID
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadConnID ← nBadConnID.SUCC; -- DEBUG
RETURN [nsb] };
};
IF b.hdr2.sourceConnID # handle.remoteConnectionID
THEN {
XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr];
nsb ← NIL;
nBadRemoteConnID ← nBadRemoteConnID.SUCC; -- DEBUG
RETURN [nsb] };
newSeqNum ← Endian.CardFromH[b.hdr2.seqNum];
newAckNum ← Endian.CardFromH[b.hdr2.ackNum];
newAllocNum ← Endian.CardFromH[b.hdr2.allocNum];
handle.recvdTime ← Now[];
IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0
THEN
GotNewAlloc[handle, newAllocNum];
IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0
THEN
GotNewAck[handle, newAckNum];
IF b.hdr2.connCtl.sendAck
THEN
GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system];
Handle a data packet.
IF
NOT b.hdr2.connCtl.system
THEN {
IF b.hdr2.connCtl.attn
THEN
TRUSTED {
GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] };
b ← GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)];
TRUSTED { nsb ← LOOPHOLE[b] };
};
RETURN [nsb];
};
ENDCASE => {
XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr];
nsb ← NIL;
RETURN [nsb];
};
};
Pull:
PROC [handle: Handle] ~ {
  -- Body of the input process for one connection.
  -- Registers Receive as the direct receive procedure of the socket, then keeps
  -- fetching packets with XNSSocket.Get and feeding them to Receive until handle.closed
  -- is set. Any buffer Receive gives back is freed here.
  -- On exit the direct receive procedure is removed again.
  packet: XNSBuf.Buffer ← NIL;
  XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever];
  XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle];
  UNTIL handle.closed DO
    packet ← XNSSocket.Get[handle.socket];
    IF packet # NIL THEN packet ← Receive[handle.socket, packet, handle];
    IF packet # NIL THEN {
      XNSSocket.FreeBuffer[packet];
      packet ← NIL };
    ENDLOOP;
  XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  handle ← NIL; -- for finalization
  };
}.