DIRECTORY Basics USING [Card16FromH, HFromCard16, HWORD, LowHalf], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds], Booting USING [RegisterProcs, RollbackProc], IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock], PrincOpsUtils USING [ByteBlt], Process USING [Abort, ConditionPointer, Detach, DisableTimeout, EnableAborts, Milliseconds, MsecToTicks, priorityBackground, priorityForeground, SetPriority, SetTimeout, Ticks], Rope USING [ROPE], SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization], XNS USING [Address, Socket], XNSBuf USING [Buffer], XNSEchoBuf USING [Buffer, hdrBytes], XNSErrorBuf USING [Buffer], XNSErrorTypes USING [badChecksumErr, cantGetThereErr, invalidPacketTypeErr, listenerRejectErr, noSocketErr, protocolViolationErr, resourceLimitsErr, unspecifiedErr], XNSRouter USING [GetHops, Hops, unreachable], XNSSocket USING [AllocBuffer, Create, Destroy, FreeBuffer, Get, GetRemoteAddress, GetUserBytes, Handle, Kick, Milliseconds, Put, ReturnError, ReturnToSender, SetGetTimeout, SetRemoteAddress, SetUserBytes, waitForever], XNSSocketBackdoor USING [FlushCache, PutCached, ReceiveProc, SetDirectReceive], XNSSPPBuf USING [Buffer, ConnCtl, hdrBytes], XNSSPPTypes USING [endReplySST, endSST], XNSStream USING [AttentionType, CloseReason, FilterProc, ListenerProc, Milliseconds, State, SubSequenceType, waitForever], XNSStreamPrivate USING [AttentionType, BufIndex, Finger, FingerObject, Handle, HWORD, OBAttn, OBAttnObject, Object, SignedDiff, SPPBuffer, SubSequenceType, unknownConnectionID, XNSBuffer]; XNSStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle IMPORTS Basics, BasicTime, Booting, IO, PrincOpsUtils, Process, SafeStorage, XNSRouter, XNSStreamPrivate, XNSSocket, XNSSocketBackdoor EXPORTS XNSStream ~ { OPEN XNSStreamPrivate; CheckUserBytes: PROC [buffer: XNSBuffer, bytes: CARDINAL] ~ { IF bytes # XNSSocket.GetUserBytes[buffer] THEN ERROR }; ConnectionClosed: PUBLIC ERROR [why: XNSStream.CloseReason, text: Rope.ROPE] ~ CODE; Timeout: PUBLIC SIGNAL ~ CODE; CompletionCode: TYPE ~ { normal, timedOut }; STREAM: TYPE ~ IO.STREAM; streamProcs: REF IO.StreamProcs ~ IO.CreateStreamProcs [ variety~$inputOutput, class~$XNS, getChar~GetChar, getBlock~GetBlock, unsafeGetBlock~UnsafeGetBlock, endOf~EndOf, charsAvail~CharsAvail, putChar~PutChar, putBlock~PutBlock, unsafePutBlock~UnsafePutBlock, flush~Flush, close~Close ]; Milliseconds: TYPE ~ XNSStream.Milliseconds; waitForever: Milliseconds ~ XNSStream.waitForever; Pulses: TYPE ~ BasicTime.Pulses; oneSecondOfPulses: Pulses ~ BasicTime.MicrosecondsToPulses[1000000]; MsecToPulses: PROC [msec: Milliseconds] RETURNS [Pulses] ~ { IF msec >= waitForever THEN RETURN [Pulses.LAST] ELSE RETURN [BasicTime.MicrosecondsToPulses[1000*msec]] }; PulsesSince: PROC [then: Pulses] RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[] - then] }; TimeoutTicksFromPulses: PROC[pulses: Pulses] RETURNS[ticks: Process.Ticks] ~ { usec: LONG CARDINAL ~ BasicTime.PulsesToMicroseconds[pulses]; msec: Milliseconds; msec _ (usec + usec/10) / 1000; RETURN[ Process.MsecToTicks[Process.Milliseconds[msec]] ]; }; Now: PROC RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[]] }; IsTimedOut: PROC [timeout, then: Pulses] RETURNS [BOOL] ~ INLINE { RETURN [(BasicTime.GetClockPulses[] - then) > timeout] }; defaultConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [ system~TRUE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~TRUE, endOfMsg~FALSE, filler~0]; AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ { nsb _ XNSSocket.AllocBuffer[handle.socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.sourceConnID _ handle.connectionID; b.hdr2.destConnID _ handle.remoteConnectionID; }; OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ { finger: Finger ~ handle.outputEnqueue; buffer: SPPBuffer _ finger.buffer; IF finger.state # empty THEN ERROR; -- Can't happen. IF buffer = NIL THEN finger.buffer _ buffer _ AllocateOutputBuffer[handle].b; buffer.hdr2.connCtl _ IF finger.attention THEN defaultAttnConnCtl ELSE defaultConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Basics.HFromCard16[handle.outputEnqueueNum]; finger.state _ halfFull; finger.index _ finger.bytes _ 0; }; InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ { buffer.hdr2.connCtl _ defaultSystemConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Basics.HFromCard16[handle.outputSendNum]; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes] }; }; OutputEnqueue: INTERNAL PROC [handle: Handle, endOfMessage: BOOL] ~ { finger: Finger ~ handle.outputEnqueue; IF finger.state # halfFull THEN ERROR; -- Can't happen. finger.buffer.hdr2.connCtl.endOfMsg _ endOfMessage; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[finger.buffer], XNSSPPBuf.hdrBytes + finger.bytes] }; finger.state _ full; IF finger.index # 0 THEN ERROR; -- Can't happen. handle.outputEnqueue _; handle.outputEnqueueNum _ handle.outputEnqueueNum + 1; NOTIFY handle.outputReady; }; ReadyOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer, requestAck: BOOL] ~ { expectedAckNum, allocNum: CARD16; IF requestAck THEN { expectedAckNum _ Basics.Card16FromH[buffer.hdr2.seqNum]; IF (NOT buffer.hdr2.connCtl.system) AND (expectedAckNum = handle.outputSendNum) THEN expectedAckNum _ expectedAckNum + 1; handle.expectedAckNum _ expectedAckNum; buffer.hdr2.connCtl.sendAck _ TRUE; handle.sentAckReqTime _ Now[] }; buffer.hdr2.ackNum _ Basics.HFromCard16[handle.inputAckNum]; handle.mustSendAck _ FALSE; allocNum _ handle.inputDequeueNum + handle.inputBuffersAllocated - 1; buffer.hdr2.allocNum _ Basics.HFromCard16[allocNum]; IF allocNum # handle.sentAllocNum THEN { handle.sentAllocNum _ allocNum }; IF buffer.hdr1.type # spp THEN ERROR; IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR; IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR; }; MakeCopyOfXNSBuffer: PROC [sH: XNSSocket.Handle, nsb: XNSBuffer] RETURNS[copy: XNSBuffer] ~ { bytes: CARDINAL ~ Basics.Card16FromH[nsb.hdr1.length]; copy _ XNSSocket.AllocBuffer[sH]; TRUSTED { [] _ PrincOpsUtils.ByteBlt[ to ~ [ blockPointer ~ @copy.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes], from ~ [ blockPointer ~ @nsb.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes] ]; }; }; InputDequeue: INTERNAL PROC [handle: Handle] ~ -- INLINE -- { finger: Finger; sendAllocNow, cantPiggyback: BOOL; finger _ handle.inputDequeue; finger.state _ empty; finger.attention _ FALSE; handle.inputDequeue _; handle.inputDequeueNum _ handle.inputDequeueNum + 1; sendAllocNow _ handle.heWantsAlloc AND (SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2)); IF sendAllocNow THEN handle.heWantsAlloc _ FALSE; cantPiggyback _ (handle.outputSendNum = handle.outputEnqueueNum) OR (SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0); IF sendAllocNow AND cantPiggyback THEN { nsb: XNSBuffer; b: SPPBuffer; [nsb, b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG ???? XNSSocketBackdoor.PutCached[nsb]; XNSSocket.FreeBuffer[nsb]; }; }; NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { IF NOT handle.closed THEN { handle.closeReason _ closeReason; handle.closeText _ closeText; handle.closed _ TRUE }; BROADCAST handle.inputReady; BROADCAST handle.doneEmptying; -- not necessary BROADCAST handle.outputSpace; BROADCAST handle.outputReady; BROADCAST handle.recvdNewAlloc; BROADCAST handle.doneFilling; -- not necessary BROADCAST handle.doneSending; -- not necessary BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup; BROADCAST handle.flusherWakeup; BROADCAST handle.recvdOBAttn; XNSSocket.Kick[handle.socket]; }; EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { NotifyClosed[handle, closeReason, closeText] }; MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL] RETURNS [finger: Finger _ NIL] ~ { tail: Finger _ NIL; THROUGH [1..nBuffers] DO finger _ NEW[ FingerObject _ [next~finger] ]; IF tail = NIL THEN tail _ finger; ENDLOOP; _ finger; }; FreeBufferRing: PROC [finger: Finger] ~ { WHILE finger # NIL DO IF finger.buffer # NIL THEN { nsb: XNSBuffer; TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[nsb]; finger.buffer _ NIL }; { temp: Finger ~; _ NIL; finger _ temp }; ENDLOOP; }; InitConditions: PROC [handle: Handle] ~ { TRUSTED { Process.EnableAborts[@handle.auxProcsDone]; -- ???? Process.EnableAborts[@handle.inputReady]; Process.EnableAborts[@handle.doneEmptying]; -- ???? Process.EnableAborts[@handle.outputSpace]; Process.EnableAborts[@handle.outputReady]; Process.EnableAborts[@handle.recvdNewAlloc]; Process.EnableAborts[@handle.doneFilling]; -- ???? Process.EnableAborts[@handle.doneSending]; -- ???? Process.EnableAborts[@handle.mgrShortWakeup]; -- ???? Process.EnableAborts[@handle.mgrLongWakeup]; -- ???? Process.EnableAborts[@handle.flusherWakeup]; Process.EnableAborts[@handle.recvdOBAttn]; }; }; InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { handle.getTimeout _ getTimeout; handle.getPulseOut _ MsecToPulses[getTimeout]; IF getTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.inputReady] } ELSE TRUSTED { Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]] }; handle.putTimeout _ putTimeout; handle.putPulseOut _ MsecToPulses[putTimeout]; IF putTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.outputSpace] } ELSE TRUSTED { Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]] }; handle.cacheFlushPulseOut _ 30 * oneSecondOfPulses; handle.roundTripPulses _ oneSecondOfPulses; UpdateTimeouts[handle]; }; UpdateTimeouts: PROC [handle: Handle] ~ { handle.waitForAckPulseOut _ 3 * handle.roundTripPulses; handle.waitForAllocPulseOut _ handle.waitForAckPulseOut; TRUSTED { Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]] }; handle.probePulseOut _ 30 * handle.roundTripPulses; TRUSTED { Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]] }; handle.noActivityPulseOut _ 100 * handle.roundTripPulses; }; MakeHandle: PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID: HWORD, sendBuffers, recvBuffers: CARDINAL, getTimeout, putTimeout: Milliseconds] RETURNS [handle: Handle] ~ { handle _ NEW[Object]; handle.socket _ socket; handle.remote _ XNSSocket.GetRemoteAddress[socket]; handle.recvdTime _ Now[]; handle.cacheFlushedTime _ Now[]; handle.sentAckReqTime _ Now[]; handle.connectionID _ connectionID; handle.remoteConnectionID _ remoteConnectionID; InitConditions[handle]; InitTimeouts[handle, getTimeout, putTimeout]; { f: Finger ~ MakeBufferRing[handle.socket, sendBuffers]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ f; handle.outputBuffersAllocated _ sendBuffers }; { f: Finger ~ MakeBufferRing[handle.socket, recvBuffers]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ f; handle.inputBuffersAllocated _ recvBuffers }; TRUSTED { Process.Detach[FORK Pull[handle]]; handle.auxProcCnt _ handle.auxProcCnt.SUCC; Process.Detach[FORK Mgr[handle]]; handle.auxProcCnt _ handle.auxProcCnt.SUCC; Process.Detach[FORK Push[handle]]; handle.auxProcCnt _ handle.auxProcCnt.SUCC; Process.Detach[FORK Push[handle]]; handle.auxProcCnt _ handle.auxProcCnt.SUCC; }; AddNewHandle[newHandle~handle]; SafeStorage.EnableFinalization[handle]; }; NotifyAuxProcDone: ENTRY PROC [handle: Handle] ~ { IF handle.auxProcCnt = 0 THEN ERROR; handle.auxProcCnt _ handle.auxProcCnt.PRED; IF handle.auxProcCnt = 0 THEN NOTIFY handle.auxProcsDone; }; WaitAuxProcsDone: ENTRY PROC [handle: Handle] ~ { WHILE handle.auxProcCnt > 0 DO WAIT handle.auxProcsDone; ENDLOOP; }; FinishHandle: PROC [handle: Handle] ~ { IF NOT handle.closed THEN ERROR; IF handle.finished THEN RETURN; WaitAuxProcsDone[handle]; FreeBufferRing[handle.outputEnqueue]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ NIL; FreeBufferRing[handle.inputEnqueue]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ NIL; XNSSocket.Destroy[handle.socket]; handle.finished _ TRUE }; SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ { nsb: XNSBuffer; b: SPPBuffer; nsb _ XNSSocket.AllocBuffer[socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.connCtl _ defaultSystemConnCtl; b.hdr2.connCtl.sendAck _ TRUE; b.hdr2.sst _ 0; b.hdr2.sourceConnID _ connectionID; b.hdr2.destConnID _ unknownConnectionID; b.hdr2.seqNum _ [0, 0]; b.hdr2.ackNum _ [0, 0]; b.hdr2.allocNum _ [0, 0]; XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes]; XNSSocket.Put[b~nsb]; }; defaultInitialRetransmissionTime: Milliseconds _ 500; Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds _ waitForever] RETURNS [STREAM] ~ { nsb: XNSBuffer _ NIL; socket: XNSSocket.Handle _ NIL; handle: Handle _ NIL; retransmissionTime: Milliseconds; connID: HWORD; hops: XNSRouter.Hops; getTimeout _ MIN[getTimeout, waitForever]; putTimeout _ MIN[putTimeout, waitForever]; IF (hops _ XNSRouter.GetHops[]) = XNSRouter.unreachable THEN ERROR ConnectionClosed[noRoute, "no route"]; retransmissionTime _ defaultInitialRetransmissionTime; BEGIN ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb _ NIL }; IF socket # NIL THEN { XNSSocket.Destroy[socket]; socket _ NIL } }; socket _ XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ???? connID _ GetUniqueConnID[]; FOR nTries: CARDINAL IN [1..10] DO IF nTries > 5 THEN retransmissionTime _ 2*retransmissionTime; XNSSocket.SetGetTimeout[socket, retransmissionTime]; SendRFC[socket, connID]; nsb _ XNSSocket.Get[socket]; IF nsb = NIL THEN LOOP; SELECT nsb.hdr1.type FROM spp => { b: SPPBuffer; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.destConnID # connID THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF # THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; XNSSocket.SetRemoteAddress[socket, b.hdr1.source]; handle _ MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; nsb _ Receive[socket, nsb, handle]; IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb _ NIL }; RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]]; }; error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM badChecksumErr, resourceLimitsErr => LOOP; noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr => ERROR ConnectionClosed[remoteReject, "rejected"]; cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"]; ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"]; }; ENDCASE => { XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; LOOP }; ENDLOOP; ERROR ConnectionClosed[noResponse, "no response"]; END; }; Listener: TYPE ~ REF ListenerObject; ListenerObject: PUBLIC TYPE ~ RECORD [ next: Listener, destroyed: BOOL _ FALSE, listenProcess: PROCESS ]; CreateListener: PUBLIC PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds _ waitForever, filter: XNSStream.FilterProc _ NIL, -- NIL => Accept all requests echoFilter: XNSStream.FilterProc _ NIL] -- NIL => Answer all echos RETURNS [listener: Listener] ~ { listener _ NEW[ ListenerObject _ [listenProcess~ FORK Listen[socket, worker, getTimeout, putTimeout, filter, echoFilter]] ]; AddNewListener[newListener~listener]; SafeStorage.EnableFinalization[listener]; }; DestroyListener: PUBLIC PROC [listener: Listener] ~ { IF listener.destroyed THEN RETURN; TRUSTED { Process.Abort[listener.listenProcess]; JOIN listener.listenProcess }; listener.destroyed _ TRUE; }; Listen: PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds, filter: XNSStream.FilterProc, echoFilter: XNSStream.FilterProc] ~ { socketHandle: XNSSocket.Handle _ XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket]; nsb: XNSBuffer _ NIL; bytes: NAT; { ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb _ NIL }; XNSSocket.Destroy[socketHandle] }; DO IF nsb # NIL THEN { XNSSocket.FreeBuffer[nsb]; nsb _ NIL }; IF (nsb _ XNSSocket.Get[socketHandle]) = NIL THEN LOOP; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM echo => { b: XNSEchoBuf.Buffer; IF bytes < XNSEchoBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.type # request THEN LOOP; IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source] THEN LOOP; b.hdr2.type _ reply; { XNSSocket.ReturnToSender[nsb]; nsb _ NIL }; }; spp => { b: SPPBuffer; streamSocketHandle: XNSSocket.Handle; stream: STREAM; handle: Handle; IF bytes < XNSSPPBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF (b.hdr2.seqNum # [0, 0]) OR (b.hdr2.destConnID # unknownConnectionID) OR (b.hdr2.sourceConnID = unknownConnectionID) THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF (handle _ FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID]) # NIL THEN { ReceiveRFC[handle, nsb]; LOOP }; IF (filter # NIL) AND NOT filter[nsb.hdr1.source] THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.listenerRejectErr]; nsb _ NIL; LOOP }; streamSocketHandle _ XNSSocket.Create[remote~nsb.hdr1.source, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ???? handle _ MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ???? ReceiveRFC[handle, nsb]; stream _ IO.CreateStream[streamProcs~streamProcs, streamData~handle]; TRUSTED { Process.Detach[FORK worker[stream~stream, remote~b.hdr1.source]] }; }; ENDCASE; ENDLOOP; }; }; ReceiveRFC: PROC [handle: Handle, nsb: XNSBuffer] ~ { newNsb: XNSBuffer; newNsb _ MakeCopyOfXNSBuffer[handle.socket, nsb]; TRUSTED { (LOOPHOLE[newNsb, SPPBuffer]).hdr2.connCtl.sendAck _ TRUE }; -- force a reply newNsb _ EntryReceive[handle, newNsb]; IF newNsb # NIL THEN XNSSocket.FreeBuffer[newNsb]; }; GetTimeouts: PUBLIC PROC [self: IO.STREAM] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { handle: Handle = NARROW[self.streamData]; [getTimeout, putTimeout] _ EntryGetTimeouts[handle] }; EntryGetTimeouts: ENTRY PROC [handle: Handle] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; getTimeout _ handle.getTimeout; putTimeout _ handle.putTimeout; }; SetTimeouts: PUBLIC PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds _ waitForever] ~ { handle: Handle = NARROW[self.streamData]; EntrySetTimeouts[handle, getTimeout, putTimeout]; }; EntrySetTimeouts: ENTRY PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; InitTimeouts[handle, getTimeout, putTimeout]; }; SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySetSSType[handle, ssType]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType, sendNow: BOOL _ FALSE] RETURNS [CompletionCode _ normal] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; IF ssType = handle.outputSSType THEN RETURN; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => EXIT; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; ENDCASE => ERROR; ENDLOOP; handle.outputSSType _ ssType; OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySendEndOfMessage[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySendEndOfMessage: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~TRUE]; RETURN [normal]; }; SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; nsb: XNSBuffer; code: CompletionCode; DO [nsb, code] _ EntrySendAttention[handle, attentionType]; IF code = normal THEN EXIT; IF nsb # NIL THEN ERROR; -- can't happen SIGNAL Timeout[]; ENDLOOP; IF nsb # NIL THEN XNSSocket.Put[nsb]; }; EntrySendAttention: ENTRY PROC [handle: Handle, attentionType: AttentionType] RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; attnNum: CARD16; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF handle.outputIBAttnCnt >= 3 THEN { -- How many really ???? IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; WHILE # handle.outputEnqueue DO finger _ ENDLOOP; _ NEW [FingerObject _ [next~handle.outputEnqueue, attention~TRUE]]; handle.outputEnqueue _ finger _; IF handle.outputSendNum = handle.outputEnqueueNum THEN handle.outputSend _ handle.outputEnqueue; IF handle.outputDequeueNum = handle.outputEnqueueNum THEN handle.outputDequeue _ handle.outputEnqueue; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.SUCC; attnNum _ handle.outputEnqueueNum; OutputMakeNotEmpty[handle]; TRUSTED { finger.buffer.body.bytes[0] _ LOOPHOLE[attentionType] }; finger.bytes _ 1; OutputEnqueue[handle~handle, endOfMessage~FALSE]; IF SignedDiff[handle.recvdAllocNum, attnNum] < 0 THEN { nsb: XNSBuffer; TRUSTED { nsb _ MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]]; ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] }; RETURN [nsb, normal]; }; RETURN [NIL, normal]; }; WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds _ XNSStream.waitForever] RETURNS [type: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [type, code] _ EntryWaitAttention[handle, MsecToPulses[waitTimeout]]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryWaitAttention: ENTRY PROC [handle: Handle, pulseOut: Pulses] RETURNS [AttentionType, CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; p: OBAttn _ NIL; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF (p _ handle.recvdOBAttnList) # NIL THEN { handle.recvdOBAttnList _; RETURN [p.attentionType, normal] }; IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut]; TRUSTED { IF pulseOut < Pulses.LAST THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]] ELSE Process.DisableTimeout[@handle.recvdOBAttn]; }; WAIT handle.recvdOBAttn; ENDLOOP; }; GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; [state, ssType, attentionType] _ EntryGetStatus[handle, reset]; }; EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; IF finger.attention THEN { state _ attention; ssType _ handle.inputSSType; TRUSTED { attentionType _ LOOPHOLE[finger.buffer.body.bytes[0]] }; IF reset THEN { p: OBAttn ~ handle.recvdOBAttnList; IF (p # NIL) AND (p.seqNum = handle.inputDequeueNum) THEN handle.recvdOBAttnList _; InputDequeue[handle] }; RETURN }; IF buffer.hdr2.sst # handle.inputSSType THEN { IF reset THEN handle.inputSSType _ buffer.hdr2.sst; RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] }; IF finger.index < finger.bytes THEN RETURN [state~open, ssType~handle.inputSSType, attentionType~0]; IF buffer.hdr2.connCtl.endOfMsg THEN { IF reset THEN InputDequeue[handle]; RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] }; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { RETURN[state~open, ssType~handle.inputSSType, attentionType~0] }; ENDCASE => ERROR; ENDLOOP; }; FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOL _ FALSE] RETURNS [bytesSkipped: LONG CARDINAL _ 0] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [bytesSkipped, code] _ EntryFlushInput[handle, wait, bytesSkipped]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlushInput: ENTRY PROC [handle: Handle, wait: BOOL _ FALSE, bytesSkipped: LONG CARDINAL] RETURNS [newSkipped: LONG CARDINAL, code: CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention THEN RETURN [bytesSkipped, normal]; buffer _ finger.buffer; IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal]; IF finger.index < finger.bytes THEN { bytesSkipped _ bytesSkipped + (finger.bytes - finger.index); finger.index _ finger.bytes }; IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [bytesSkipped, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [bytesSkipped, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; SendNow: PUBLIC PROC [self: IO.STREAM] ~ { handle: Handle = NARROW[self.streamData]; EntrySendNow[handle]; }; EntrySendNow: ENTRY PROC [handle: Handle] ~ { finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; }; SendClose: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endSST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) OR (sst = XNSSPPTypes.endSST) THEN { [] _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; RETURN [TRUE] }; ENDLOOP; }; SendCloseReply: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) THEN RETURN [TRUE]; ENDLOOP; }; GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [char, code] _ EntryGetChar[handle, self]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryGetChar: ENTRY PROC [handle: Handle, self: STREAM] RETURNS [char: CHAR, code: CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN WITH ERROR IO.EndOfStream[self]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN WITH ERROR IO.EndOfStream[self]; InputDequeue[handle]; LOOP }; TRUSTED { char _ buffer.body.chars[finger.index] }; finger.index _ finger.index + 1; RETURN [char, normal]; }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [char~, code~timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.maxLength]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; block.length _ startIndex; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; GetBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; buffer: SPPBuffer; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; buffer _ finger.buffer; SELECT finger.state FROM full, halfEmpty => { IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN[NIL, normal]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal]; InputDequeue[handle]; LOOP }; finger.state _ emptying; RETURN[finger, normal] }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlockLast: ENTRY PROC [handle: Handle, oldFinger: Finger] ~ { IF oldFinger = NIL THEN ERROR; -- can't happen oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; PutChar: PROC [self: STREAM, char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryPutChar[handle, char]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryPutChar: ENTRY PROC [handle: Handle, char: CHAR] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; TRUSTED { finger.buffer.body.chars[finger.bytes] _ char }; finger.bytes _ finger.bytes + 1; RETURN [normal]; ENDLOOP; }; PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.length]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; PutBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfFull; BROADCAST handle.doneFilling; OutputEnqueue[handle~handle, endOfMessage~FALSE] }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; finger.state _ filling; RETURN[finger, normal]; ENDLOOP; }; PutBlockLast: ENTRY PROC [handle: Handle, finger: Finger] ~ { ENABLE UNWIND => NULL; finger.state _ halfFull; BROADCAST handle.doneFilling; }; EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ { handle: Handle ~ NARROW[self.streamData]; RETURN [EntryEndOf[handle]]; }; EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ { ENABLE UNWIND => NULL; finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full => { finger.state _ halfEmpty; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; halfEmpty => RETURN [ finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg) ]; empty => RETURN [ FALSE ]; ENDCASE => ERROR; ENDLOOP; }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [nChars, code] _ EntryCharsAvail[handle, wait]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL] RETURNS [INT, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) THEN RETURN[0, normal]; IF finger.index < finger.bytes THEN RETURN [(finger.bytes - finger.index), normal]; IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [0, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; -- Can't happen. ENDLOOP; }; Flush: PROC [self: STREAM] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryFlush[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlush: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; flushNum: CARD16; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; empty, full, sending => EXIT; ENDCASE => ERROR; ENDLOOP; IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal]; IF handle.outputSendNum = handle.outputEnqueueNum THEN { DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; flushNum _ handle.outputEnqueueNum; handle.flusherCnt _ handle.flusherCnt + 1; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT; IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.flusherWakeup; ENDLOOP; handle.flusherCnt _ handle.flusherCnt - 1; RETURN [normal]; }; Close: PROC [self: STREAM, abort: BOOL] = { handle: Handle ~ NARROW[self.streamData]; EntryNotifyClosed[handle, localClose, "close called"]; FinishHandle[handle]; }; nSent: INT _ 0; EntryPush: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { ENABLE UNWIND => NULL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO IF handle.closed THEN RETURN; IF handle.outputSendNum = handle.outputEnqueueNum THEN { WAIT handle.outputReady; LOOP }; IF SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0 THEN { WAIT handle.recvdNewAlloc; LOOP }; { halfMyBuffers, windowLeft, nSentNoAckReq: INTEGER; sendAckReq: BOOL; halfMyBuffers _ INTEGER[handle.outputBuffersAllocated / 2]; windowLeft _ SignedDiff[handle.recvdAllocNum, handle.outputSendNum]; nSentNoAckReq _ SignedDiff[handle.outputSendNum, handle.expectedAckNum]; sendAckReq _ ((windowLeft = halfMyBuffers) OR (windowLeft = 0) OR (nSentNoAckReq >= halfMyBuffers) OR (handle.flusherCnt > 0)); sendFinger _ handle.outputSend; ReadyOutputBuffer[handle~handle, buffer~sendFinger.buffer, requestAck~sendAckReq]; sendFinger.state _ sending; TRUSTED { sendBuffer _ LOOPHOLE[sendFinger.buffer] }; handle.outputSendNum _ handle.outputSendNum + 1; handle.outputSend _; IF sendAckReq THEN BROADCAST handle.mgrLongWakeup; RETURN; }; ENDLOOP; }; Push: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryPush[handle, sendFinger]; IF sendFinger = NIL THEN EXIT; CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[sendBuffer]; ENDLOOP; IF sendBuffer # NIL THEN ERROR; -- Can't happen ???? NotifyAuxProcDone[handle]; }; nResent: INT _ 0; MaybeFlushCache: INTERNAL PROC [handle: Handle, pulseOut: Pulses] ~ { IF PulsesSince[handle.cacheFlushedTime] > pulseOut THEN { handle.cacheFlushedTime _ Now[]; XNSSocketBackdoor.FlushCache[handle.socket] }; }; EntryMgr: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { ENABLE UNWIND => NULL; ackReqOutstanding, allocReqOutstanding, timedOutAck, timedOutProbe: BOOL; b: SPPBuffer _ NIL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO IF PulsesSince[handle.recvdTime] >= handle.noActivityPulseOut THEN NotifyClosed[handle, unknown, "no activity timeout"]; IF handle.closed THEN RETURN; MaybeFlushCache[handle, handle.cacheFlushPulseOut]; ackReqOutstanding _ ( (handle.outputDequeueNum # handle.outputSendNum) AND (SignedDiff[handle.expectedAckNum, handle.outputDequeueNum] > 0)); allocReqOutstanding _ ( (handle.outputSendNum # handle.outputEnqueueNum) AND (SignedDiff[handle.outputSendNum, handle.recvdAllocNum] > 0)); timedOutAck _ ( PulsesSince[handle.sentAckReqTime] > handle.waitForAckPulseOut); timedOutProbe _ ( (PulsesSince[handle.recvdTime] > handle.probePulseOut) AND (PulsesSince[handle.sentAckReqTime] > handle.probePulseOut)); SELECT TRUE FROM (ackReqOutstanding AND timedOutAck) => { SELECT handle.outputDequeue.state FROM full => NULL; sending => { WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; MaybeFlushCache[handle, 3*handle.waitForAckPulseOut]; -- What's the right timeout to use here ???? nResent _ nResent.SUCC; sendFinger _ handle.outputDequeue; sendFinger.state _ sending; b _ sendFinger.buffer; TRUSTED { sendBuffer _ LOOPHOLE[b] }; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; (timedOutProbe OR (allocReqOutstanding AND timedOutAck)) => { [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; handle.mustSendAck => { [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; RETURN }; (ackReqOutstanding OR allocReqOutstanding) => { WAIT handle.mgrShortWakeup; LOOP }; ENDCASE => { WAIT handle.mgrLongWakeup; LOOP }; ENDLOOP; }; Mgr: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryMgr[handle, sendFinger]; SELECT TRUE FROM (sendFinger # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[sendBuffer]; }; (sendBuffer # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes]; -- DEBUG XNSSocketBackdoor.PutCached[sendBuffer]; XNSSocket.FreeBuffer[sendBuffer]; }; ENDCASE => EXIT; ENDLOOP; NotifyAuxProcDone[handle]; }; nRecvd: INT _ 0; nTooShort: INT _ 0; nBadConnID: INT _ 0; nBadRemoteConnID: INT _ 0; GotNewAck: INTERNAL PROC [handle: Handle, newAckNum: CARD16] ~ { finger: Finger; IF SignedDiff[newAckNum, handle.outputSendNum] > 0 THEN { RETURN }; WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0 DO IF handle.closed THEN RETURN; finger _ handle.outputDequeue; SELECT finger.state FROM full => NULL; sending => { -- extremely unlikely; occurs only when resending. WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; IF finger.attention THEN { temp: Finger; nsb: XNSBuffer; FOR temp _ finger, WHILE # finger DO NULL ENDLOOP; _; IF finger.buffer = NIL THEN ERROR; -- Can't happen ???? TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[nsb]; finger.buffer _ NIL; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.PRED; } ELSE { finger.state _ empty; BROADCAST handle.outputSpace; }; handle.outputDequeueNum _ handle.outputDequeueNum + 1; handle.outputDequeue _; ENDLOOP; IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup; }; GotAckReq: INTERNAL PROC [handle: Handle, seqNum: CARD16, systemPacket: BOOL] ~ -- INLINE -- { IF systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0) THEN { handle.mustSendAck _ TRUE; BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup } ELSE { handle.heWantsAlloc _ TRUE }; }; GotNewAlloc: INTERNAL PROC [handle: Handle, newAllocNum: CARD16] ~ -- INLINE -- { handle.recvdAllocNum _ newAllocNum; BROADCAST handle.recvdNewAlloc; }; GotOBAttn: INTERNAL PROC [handle: Handle, seqNum: CARD16, type: AttentionType] ~ { p, prev, new: OBAttn; seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum]; IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ???? prev _ NIL; p _ handle.recvdOBAttnList; DO IF p = NIL THEN EXIT; IF p.seqNum = seqNum THEN RETURN; IF SignedDiff[p.seqNum, seqNum] > 0 THEN EXIT; prev _ p; p _; ENDLOOP; new _ NEW[OBAttnObject_[next~p, seqNum~seqNum, attentionType~type]]; IF prev = NIL THEN handle.recvdOBAttnList _ new ELSE _ new; BROADCAST handle.recvdOBAttn; }; GotData: INTERNAL PROC [handle: Handle, newSeqNum: CARD16, b: SPPBuffer, bytes: NAT] RETURNS [swapb: SPPBuffer] ~ -- INLINE -- { finger: Finger; IF SignedDiff[newSeqNum, handle.inputAckNum] < 0 THEN { RETURN[b] }; IF SignedDiff[newSeqNum, handle.inputDequeueNum] >= INT[handle.inputBuffersAllocated] THEN { RETURN[b] }; WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0) DO handle.inputEnqueue _; handle.inputEnqueueNum _ handle.inputEnqueueNum + 1; ENDLOOP; finger _ handle.inputAck; THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum]) DO finger _; ENDLOOP; swapb _ finger.buffer; finger.buffer _ b; finger.bytes _ bytes; finger.index _ 0; finger.attention _ b.hdr2.connCtl.attn; finger.state _ full; WHILE (handle.inputAck.state = full) AND (handle.inputAckNum # handle.inputEnqueueNum) DO BROADCAST handle.inputReady; handle.inputAckNum _ handle.inputAckNum + 1; handle.inputAck _; ENDLOOP; }; Receive: XNSSocketBackdoor.ReceiveProc ~ { nRecvd _ nRecvd.SUCC; -- DEBUG RETURN [ EntryReceive[NARROW[clientData], b] ] }; EntryReceive: ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer] RETURNS [XNSBuf.Buffer] ~ { ENABLE UNWIND => NULL; bytes: NAT; IF handle.closed THEN RETURN [nsb]; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM noSocketErr, listenerRejectErr, protocolViolationErr => { NotifyClosed[handle, remoteClose, "remote close"] }; ENDCASE => { XNSSocketBackdoor.FlushCache[handle.socket] }; RETURN [nsb] }; spp => { b: XNSSPPBuf.Buffer; newSeqNum: CARD16; newAckNum: CARD16; newAllocNum: CARD16; TRUSTED { b _ LOOPHOLE[nsb] }; IF bytes < XNSSPPBuf.hdrBytes THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.unspecifiedErr]; nsb _ NIL; nTooShort _ nTooShort.SUCC; -- DEBUG RETURN [nsb] }; IF b.hdr2.destConnID # handle.connectionID THEN { IF b.hdr2.destConnID # unknownConnectionID THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadConnID _ nBadConnID.SUCC; -- DEBUG RETURN [nsb] }; }; IF b.hdr2.sourceConnID # handle.remoteConnectionID THEN { XNSSocket.ReturnError[nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadRemoteConnID _ nBadRemoteConnID.SUCC; -- DEBUG RETURN [nsb] }; newSeqNum _ Basics.Card16FromH[b.hdr2.seqNum]; newAckNum _ Basics.Card16FromH[b.hdr2.ackNum]; newAllocNum _ Basics.Card16FromH[b.hdr2.allocNum]; handle.recvdTime _ Now[]; IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0 THEN GotNewAlloc[handle, newAllocNum]; IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0 THEN GotNewAck[handle, newAckNum]; IF b.hdr2.connCtl.sendAck THEN GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system]; IF NOT b.hdr2.connCtl.system THEN { IF b.hdr2.connCtl.attn THEN TRUSTED { GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] }; b _ GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)]; TRUSTED { nsb _ LOOPHOLE[b] }; }; RETURN [nsb]; }; ENDCASE => { XNSSocket.ReturnError[nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; RETURN [nsb]; }; }; Pull: PROC [handle: Handle] ~ { nsb: XNSBuf.Buffer _ NIL; XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever]; XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle]; WHILE NOT handle.closed DO IF (nsb _ XNSSocket.Get[handle.socket]) = NIL THEN LOOP; IF (nsb _ Receive[handle.socket, nsb, handle]) = NIL THEN LOOP; XNSSocket.FreeBuffer[nsb]; nsb _ NIL; ENDLOOP; XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL]; NotifyAuxProcDone[handle]; }; globalLockHandle: Handle _ NEW[Object]; handleChain: Handle _ NIL; AddNewHandle: ENTRY PROC [handle: Handle _ globalLockHandle, newHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; _ handleChain; handleChain _ newHandle; }; RemoveOldHandle: ENTRY PROC [handle: Handle _ globalLockHandle, oldHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; IF handleChain = oldHandle THEN handleChain _ ELSE FOR h: Handle _ handleChain, DO IF = oldHandle THEN { _; EXIT }; ENDLOOP; -- NIL fault if not on chain _ NIL; -- Help Finalization }; FindExistingHandle: ENTRY PROC [handle: Handle _ globalLockHandle, remote: XNS.Address, remoteConnID: HWORD] RETURNS [existing: Handle] ~ { IF handle # globalLockHandle THEN ERROR; FOR existing _ handleChain, WHILE existing # NIL DO IF (existing.remote = remote) AND (existing.remoteConnectionID = remoteConnID) THEN EXIT; ENDLOOP; }; Rollback: Booting.RollbackProc = { FOR handle: Handle _ handleChain, WHILE handle # NIL DO IF NOT handle.closed THEN EntryNotifyClosed[handle, localClose, "Rollback"]; ENDLOOP; }; uniqueNumber: CARD16 _ Basics.LowHalf[Now[]]; GetUniqueConnID: ENTRY PROC [handle: Handle _ globalLockHandle] RETURNS [HWORD] ~ { h: Handle; newConnID: HWORD; IF handle # globalLockHandle THEN ERROR; DO uniqueNumber _ uniqueNumber + 1; newConnID _ Basics.HFromCard16[uniqueNumber]; FOR h _ handleChain, WHILE (h # NIL) AND (h.connectionID # newConnID) DO NULL ENDLOOP; IF h = NIL THEN RETURN [newConnID]; ENDLOOP; }; listenerChain: Listener _ NIL; AddNewListener: ENTRY PROC [handle: Handle _ globalLockHandle, newListener: Listener] ~ { _ listenerChain; listenerChain _ newListener }; RemoveOldListener: ENTRY PROC [handle: Handle _ globalLockHandle, oldListener: Listener] ~ { IF oldListener = listenerChain THEN listenerChain _ ELSE FOR p: Listener _ listenerChain, DO IF = oldListener THEN { _; EXIT } ENDLOOP; -- NIL fault if not on chain _ NIL; }; droppedStreams: INT _ 0; finalizedStreams: INT _ 0; streamFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; StreamFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO handle: Handle _ NARROW[SafeStorage.FQNext[streamFinalizerQueue]]; IF NOT handle.finished THEN { -- User forgot to call Destroy IF NOT handle.closed THEN ERROR; -- Can't happen FinishHandle[handle]; SafeStorage.EnableFinalization[handle]; droppedStreams _ droppedStreams.SUCC; } ELSE { -- Normal end of life RemoveOldHandle[oldHandle~handle]; finalizedStreams _ finalizedStreams.SUCC; }; handle _ NIL; ENDLOOP; }; droppedListeners: INT _ 0; finalizedListeners: INT _ 0; listenerFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; ListenerFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO listener: Listener _ NARROW[SafeStorage.FQNext[listenerFinalizerQueue]]; IF NOT listener.destroyed THEN { -- User forgot to call Destroy DestroyListener[listener]; SafeStorage.EnableFinalization[listener]; droppedListeners _ droppedListeners.SUCC; } ELSE { -- Normal end of life RemoveOldListener[oldListener~listener]; finalizedListeners _ finalizedListeners.SUCC; }; listener _ NIL; ENDLOOP; }; { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK StreamFinalizer[]]; }; { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK ListenerFinalizer[]]; }; Booting.RegisterProcs[r: Rollback]; }. tXNSStreamImpl.mesa Copyright c 1986 by Xerox Corporation. All rights reserved. Hal Murray, February 9, 1986 7:02:38 am PST Demers, January 8, 1987 0:37:38 am PST TODO: The mgrXXXXWakeup conditions don't need BROADCASTS. ??? Careful that state is never left xxxing after closed! ???? SOMEDAY experiment with making the LOCKS clause work directly on the IO.STREAM, thereby eliminating a level of procedure call from user processes. QUESTION: If there's a halfempty packet buffer AND there's allocation AND we want to send a system packet, does it make sense to send the data packet instead? Note the other end may respond differently to data vs. system packets with AckMe set. Probing strategy is wrong???? I think probing should expect a response on the same order of timeout as waitForAck; multiple probes should drive up the estimated round trip delay ???? ESTIMATE THE ROUND TRIP DELAY!  both running and initial based on hopcount. DEBUG HACKS that need to be removed eventually: CheckUserBytes the "these can't hurt" checks in ReadyOutputBuffer Temporary for debugging: Signals and Errors Byte Stream Interface Times The idea is to set the timeout of a condition variable so that the specified number of pulses (+ epsilon) will have elapsed when the condition times out. Epsilon here is 10%, which is pretty crude. I should do some experiments to see how much adjustment Cedar really requires here. Buffer Enqueue and Dequeue Utilities Allocate an SPP buffer for handle. Initialize type and connectionID fields. Make handle.outputEnqueue nonempty. Make sure the buffer is actually allocated and set its connCtl, sst and seqNum fields. PRECONDITION: handle.outputEnqueue.state = empty Initialize output buffer for system packet. Performs the same buffer initialization as OutputMakeNotEmpty above, and sets buffer length field. Enqueue the finger+buffer pointed to by handle.outputEnqueue; set its length field, set its state to full and NOTIFY that an output buffer is ready. PRECONDITION: handle.outputEnqueue.state = halfFull Fill in time-dependent fields of output buffer before sending it. Called before handle.outputSend pointer is advanced. Acknowledgement request ... Acknowledgement ... Allocation ... handle.heWantsAlloc _ FALSE; -- I THINK THIS IS BOGUS ???? Do we need sentAllocNum at all ???? THESE CAN'T HURT, BUT WHAT ARE THEY DOING HERE ???? I'll find out! buffer.hdr1.type _ spp; buffer.hdr2.sourceConnID _ handle.connectionID; buffer.hdr2.destConnID _ handle.remoteConnectionID; Check whether the other side has asked for allocation and a reasonable number of input buffers have become available. If so, make sure the other side gets the allocation by sending a system packet if necessary: He wants some allocation AND I have enough input buffers to give him a reasonable allocation (i.e., at most half my receive buffers are occupied) There's no data packet at all OR I'm waiting for allocation from him Creation TODO: None of this stuff is very good. I don't know any good retransmission heuristics, and I guess I've got to learn some. This should depend on hops: The buffer has to be copied so buffer accounting will work out right. It's only an RFC (short) so the overhead isn't something we need to worry about. Byte Stream Interface Note: After changing ssType it's necessary to have a halfFull buffer as handle.inputEnqueue^; this ensures that the sequence SetSSType[h, t1]; SetSSType[h, t2]; will work correctly. Note this should only time out if there are too many outstanding output in-band attentions queued. Allocate a new finger and buffer for the in-band attention (Note we need to do this before sending it out-of-band, to determine its sequence number). Fill the in-band attention buffer and enqueue it. If there's no allocation for the attention, make a copy of it to be send it out-of-band. Note: we test finger.attention before anything else. In effect, this means the sst of an attention packet is ignored. Stream Procs Get rid of partly filled buffer. Check for already acknowledged. Make sure there's at least one unsent buffer to get its ackMe bit set. Wait until everything is acknowledged. Output (Push) Processes If (sentFinger # NIL), we've just finished sending it. In that case, change its state from sending to full and BROADCAST handle.doneSending. Then wait for a data buffer to send and return it. Returned value of NIL indicates the handle is closed. Check for closed: Check for no buffer ready to send: Check for no allocation: All the checks succeeded, so we're now committed to sending the buffer: Retransmitter (Mgr) Process Die if no activity for a long time. When direct input is implemented, this might be the place to call get on handle.socket with a getTimeout of 0 (don't wait). NO! Don't do it holding the ML! ???? Periodically flush the routing info cache associated with the socket. I have unacknowledged packets -- THIS IS NOT NECESSARY ???? (next condition implies it) ???? AND I am expecting an acknowledgement I have unsent packets AND I have no allocation An acknowledgement is overdue. Retransmit the packet. No activity for a long time, or else I want allocation and it's overdue. In either case, send a probe. We've been asked to send an acknowledgement immediately. Send it. There's an outstanding request, so we should time out pretty soon. We don't expect anything to happen soon. Input (Pull) Process Release sent packets that have been acknowledged. NOTE: This messes with the OUTPUT queues from an INPUT process. Sanity check. This is a protocol error Discard the attention finger. Advance the finger pointer, make the data buffer available. TODO: Work on "heWantsAlloc" heuristics ???? Do I want to sent an ack immediately if he's used up his allocation? ???? PRECONDITION: NOT b.hdr2.connCtl.system Check for duplicate. Push inputEnqueue so there's room for new packet. The new buffer won't fit, so drop it. Set finger to the slot where the new packet should go. Put the packet there. If it's a duplicate we use the new one, but so what? Mark as acknowledged as many packets as possible and pass them to the client. [handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer] Note this sends error packets itself, Handle a data packet. Chain of Streams There are three times when we need to be able to locate all of the streams: 1) Smashing them after a rollback, 2) processing duplicate RFCs, and 3) making sure connectionIDs are unique. Unique Connection IDs Chain of Listeners Stream Finalization Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, Push, Pull and Mgr must notice when the stream dies and return. The current code won't do anything with dropped streams unless they get closed by the other end. Initialization appears in mainline code. Listener Finalization Initialization appears in mainline code. MainLine Code Initialization for stream finalizer Initialization for listener finalizer Initialization for rollback ΚC3˜codešœ™Kšœ Οmœ1™K™’K™υK™ΆK™L—™/K™K™2—K˜šΟk ˜ Kšœžœžœ ˜8Kšœ žœF˜UKšœ,˜,Kšžœžœ0žœ˜ZKšœžœ ˜Kšœžœ€˜±Kšœžœžœ˜Kšœ žœƒ˜”Kšžœžœ˜Kšœžœ ˜Kšœ žœ˜$Kšœ žœ ˜Kšœžœ’˜₯Kšœ žœ˜-Kšœ žœΛ˜ΪKšœžœ8˜OKšœ žœ˜,Kšœ žœ˜(Kšœ žœk˜zKšœžœ9žœh˜ΌK˜—š Πbl œžœžœžœžœ˜>Kšžœžœ`˜†Kšžœ ˜šœ˜Kšžœ˜K˜™šΟnœžœžœ˜=Kšžœ(žœžœ˜7——K˜—head™Kš  œžœžœ)žœžœ˜TKš œžœžœžœ˜K˜Kšœžœ˜,—™Kšžœžœžœžœ˜K˜šœ žœžœžœ˜8Kšœ!˜!K˜K˜K˜K˜ K˜K˜Kšœ˜K˜K˜ K˜——™Kšœžœ˜,Kšœ2˜2Kšœžœ˜ K˜KšœD˜DK˜š  œžœžœ ˜<šžœ˜Kšžœžœ žœ˜Kšžœžœ/˜:——š  œžœžœ žœ˜˜>Kšžœžœ ˜IKšœ˜—K˜š  œžœžœ žœ˜EK™”K™3K˜&Kšžœžœžœ‘˜7Kšœ3˜3Kšžœžœ6˜_K˜Kšžœžœžœ‘˜0K˜1K˜6Kšžœ˜K˜—K˜š œžœžœ1žœ˜ZK™AK™4Kšœžœ˜!™šžœ žœ˜Kšœ8˜8šžœžœžœ(˜OKšžœ%˜)—Kšœ'˜'Kšœžœ˜#Kšœ ˜ ——™Kšœ<˜žœžœ(žœ˜»Kšœ žœ ˜K˜Kšœ3˜3Kšœ˜K˜ Kšœ˜Kšœ#˜#Kšœ/˜/Kšœ˜Kšœ-˜-šœ9˜9K˜DKšœ.˜.—šœ9˜9K˜@Kšœ-˜-—šžœ˜ šœžœ˜"Kšœ&žœ˜+—šœžœ˜!Kšœ&žœ˜+—šœžœ˜"Kšœ&žœ˜+—šœžœ˜"Kšœ&žœ˜+—K˜—Kšœ˜Kšœ'˜'K˜K˜—š œžœžœ˜2Kšžœžœžœ˜$Kšœ&žœ˜+Kšžœžœžœ˜9K˜K˜—š œžœžœ˜1šžœž˜Kšžœ˜Kšžœ˜—K˜K˜—š  œžœ˜'Kšžœžœžœžœ˜ Kšžœžœžœ˜K˜Kšœ%˜%KšœBžœ˜FKšœ$˜$Kšœ>žœ˜BKšœ!˜!Kšœžœ˜—K˜š œžœ*žœ˜AK˜K˜ Kšœ$˜$Kšžœžœ˜K˜Kšœ&˜&Kšœžœ˜K˜Kšœ#˜#Kšœ(˜(K˜Kšœ˜Kšœ˜K˜0Kšœ˜K˜—K˜K˜5K˜š  œžœžœ žœ>žœžœ˜rKšœžœ˜Kšœžœ˜Kšœžœ˜K˜!Kšœžœ˜K˜Kšœ žœ˜*Kšœ žœ˜*K˜Kšžœ@žœžœ'˜s™K˜6—šž˜šžœžœ˜Kšžœžœžœ$žœ˜;Kšžœ žœžœ'žœ˜C—KšœJ‘˜YKšœ˜šžœ žœžœ ž˜"Kšžœ žœ+˜=Kšœ4˜4Kšœ˜Kšœ˜Kšžœžœžœžœ˜šžœž˜šœ˜K˜ Kšžœžœ˜šžœžœ˜$Kšœ?˜?Kšœžœ˜ Kšžœ˜—šžœ"žœ˜*Kšœ?˜?Kšœžœ˜ Kšžœ˜—Kšœ2˜2Kšœ¬˜¬Kšœ#˜#Kšžœžœžœ%žœ˜Kšœžœ˜)Kšœžœ˜Kšœ žœ˜#Kšœžœ ˜)Kšœžœ˜ K˜šžœž˜K˜4šžœž˜Kšœžœ žœ˜'Kšœ žœ˜Kšžœžœ˜—Kš žœ žœžœžœ‘˜+šžœ˜ šœ˜šœ˜Kšœ"‘˜=Kšœ˜Kšœžœ˜ —šœ˜Kšœ˜Kšœ˜Kšœ˜——K˜—K˜%Kšœ!˜!Kšžœ˜—Kšžœ žœžœ˜2K˜K˜—š   œžœžœžœ%žœ˜oKšžœžœžœ˜Kšœžœ˜K˜šžœ žœžœ˜Kšœ˜Kšž œ˜Kšœ*žœ˜3—šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜Kšœ$˜$Kšœ žœžœ˜-Kšœ žœ˜šœ˜Kšžœ+žœžœžœ ˜IKšžœžœ˜ —Kšžœžœ˜—šžœžœ žœ˜'Kšœ*žœ˜1Kšžœ˜—K˜Kšžœ˜Kšžœ˜—K˜—K˜š  œžœžœ%˜=Kšžœžœžœ˜K˜Kšž œ˜K˜—K˜š  œžœžœžœžœ˜-Kšœžœ˜)Kšžœ˜K˜—K˜š   œžœžœžœžœ˜:Kšžœžœžœ˜K˜šž˜Kš žœžœžœžœžœ8˜_K˜šžœž˜Kšœ$žœ˜+Kšœžœžœ˜/šœ žœ˜Kšœžœ/žœ žœ%˜ŒK˜—Kšœ žœžœ˜Kšžœžœ˜—Kšžœ˜—K˜—K˜š   œžœžœžœžœ žœ˜EKšœžœ˜)K˜šž˜Kšœ/˜/Kšžœžœžœ˜Kšžœ ˜Kšžœ˜—K˜—K˜š  œžœžœžœžœžœ˜ZKšžœžœžœ˜K˜K˜šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜šœ˜Kšœ˜šžœžœ.˜DKšžœžœ ˜—šžœ˜Kšžœžœ)˜4—Kšžœ%žœžœ ˜>Kšœ˜Kšžœ˜—šœ ˜ Kšžœ˜Kšžœ˜—šœ ˜ Kšžœžœžœžœ ˜$Kšžœ+žœžœ˜GKšžœ˜Kšžœ˜—Kšžœžœ‘˜"—Kšžœ˜—K˜—K˜š œžœžœ˜Kšœžœ˜)K˜šž˜K˜Kšžœžœžœ˜Kšžœ ˜Kšžœ˜—K˜—K˜š  œžœžœžœ˜DKšžœžœžœ˜Kšœ žœ˜K˜™ šž˜šžœž˜Jšžœžœžœ8˜I—šžœž˜&Jšœ žœžœ˜-Jšœ8žœžœ˜GJšœžœ˜Jšžœžœ˜—Jšžœ˜——™Kšžœ3žœžœ ˜J—™Fšžœ0žœ˜8šž˜šžœž˜Jšžœžœžœ8˜I—šžœž˜&Kšœ'žœ˜.Kšœ žœžœ˜-Kšœ žœ˜šœ˜Kšžœ+žœžœ ˜DKšžœžœ˜ —Kšžœžœ˜—Kšžœ˜—Kšœ*žœ˜1K˜——™&Kšœ#˜#K˜*šž˜šžœž˜Kšžœžœžœ8˜I—Kšžœ4žœžœ˜@Kšžœ+žœžœ ˜DKšžœ˜Kšžœ˜—Kšœ*˜*—Kšžœ ˜K˜—K˜š œžœžœ žœ˜+Kšœžœ˜)Kšœ6˜6K˜K˜——™Kšœžœ˜K˜š   œžœžœ&žœžœžœ˜~K™ŽK™2K™5Kšžœžœžœ˜šžœžœžœ˜Kšœ˜Kšž œ˜—šž˜™Kšžœžœžœ˜—™"šžœ0žœ˜8Kšžœ˜Kšžœ˜——™šžœ<žœ˜DKšžœ˜Kšžœ˜——™Gšœ,žœ˜4Kšœ žœ˜Kšœžœ$˜;KšœD˜DKšœH˜HKšœ+žœžœ"žœ˜Kšœ˜KšœR˜RKšœ˜Kšžœžœ˜5K˜0Kšœ$˜$Kšžœ žœž œ˜2Kšžœ˜K˜——Kšžœ˜—K˜—K˜š œžœ˜Kšœžœ˜Kšœžœ˜K˜Kšœ0˜0K˜šž˜Jšœ9˜9Kšžœžœžœžœ˜KšœC‘˜KKšœ(˜(Kšžœ˜—Kš žœžœžœžœ‘˜4J˜K˜——™Kšœ žœ˜K˜š œžœžœ'˜Ešžœ1žœ˜9Kšœ ˜ K˜.—K˜—K˜š  œžœžœ&žœžœžœ˜}Kšžœžœžœ˜KšœDžœ˜IKšœžœ˜K˜šžœžœž˜Kšœž œ˜:—K˜šž˜™#šžœ;˜=Kšžœ6˜:—Kšžœžœžœ˜—K˜K™‘K˜™EKšœ3˜3—K˜šœ˜™\Kšœ0˜0—šžœ"™%KšžœC˜F——šœ˜™K˜0—šžœ™Kšžœ?˜B——KšœP˜P˜Kšœ6˜6Kšžœ>˜A—K˜šžœžœž˜K˜šœžœ˜(K™6šžœž˜&Kšœžœ˜ Kšœ žœžœ˜-Kšžœžœ˜—Kšœ6‘,˜bKšœžœ˜K˜"K˜K˜Kšžœžœ˜%Kšœžœ˜#Kšžœ˜ —K˜šœžœžœ˜=K™gK˜5Kšœ"˜"Kšœžœ˜#Kšžœ˜ —K˜˜K™BK˜5Kšœ"˜"Kšœžœ˜$Kšžœ˜ —K˜šœžœ˜/K™BKšžœ˜Kšžœ˜—K˜šžœ˜ K™(Kšžœ˜Kšžœ˜——K˜Kšžœ˜—K˜—K˜š œžœ˜Kšœžœ˜Kšœžœ˜K˜Kšœ0˜0K˜šž˜Jšœ8˜8šžœžœž˜šžœ žœ˜KšœC‘˜KKšœ(˜(K˜—šœžœ˜Kšœ0‘˜8Kšœ(˜(Kšœ!˜!K˜—Kšžœžœ˜—Kšžœ˜—Kšœ˜K˜K˜——™Kšœžœ˜Kšœ žœ˜Kšœ žœ˜Kšœžœ˜K˜š  œžœžœžœ˜@K™1K™?K˜™ šžœ1žœ˜9K™Kšžœ˜ ——šžœ4ž˜;Kšžœžœžœ˜K˜šžœž˜Kšœžœ˜ šœ ‘2˜?Kšžœ˜Kšžœ˜—Kšžœžœ˜—šžœ˜šžœ˜™K˜ K˜Kš žœžœžœžœžœ˜FKšœ˜Kš žœžœžœžœ‘˜7Kšžœ žœ˜*Kšœ˜Kšœžœ˜Kšœ0žœ˜5—K˜—šžœ˜™;K˜Kšž œ˜—K˜——K˜6K˜#Kšžœ˜—Kšžœžœž œ˜=K˜—K˜š   œžœžœžœžœ‘ œ˜^K™wšžœžœ1˜Cšžœ˜Kšœžœ˜Kšž œ˜ Kšž œ˜ —šžœ˜Kšœžœ˜——K˜—K˜š   œžœžœžœ‘ œ˜QKšœ#˜#Jšž œ˜K˜—K˜š  œžœžœžœ˜RK˜Kšœ žœ.˜>Kš žœžœžœžœ‘#˜SKšœžœ˜'šž˜Kšžœžœžœžœ˜Kšžœžœžœ˜!Kšžœ"žœžœ˜.K˜Kšžœ˜—Kšœžœ;˜DKšžœžœžœžœ˜EKšž œ˜K˜—K˜š œžœžœžœžœžœ‘ œ˜€Kšž œžœ™'K˜™šžœ/žœ˜7Kšžœ˜ ——™1šžœ2žœžœ˜\K™%Kšžœ˜ —šžœ6ž˜=K˜/K˜4Kšžœ˜——™6K˜šžœ2ž˜;K˜Kšžœ˜——™KKšœ˜K˜K˜K˜K˜'K˜—™Mšžœ žœ/ž˜YKšž œ˜K˜,K˜'Kšžœ˜——K˜—K˜š œ˜&Kšœ9žœžœžœ™YKšœ˜Kšœžœ‘˜Kšžœžœ˜1K˜—š  œž œ&žœ˜YK™&Kšžœžœžœ˜Kšœžœ˜ K˜Kšžœžœžœ˜#Kšœ$˜$šžœž˜K˜˜ Kšžœ˜K˜Kšžœžœ˜šžœž˜šœ9˜9K˜4—šžœ˜ K˜.——Kšžœ˜ Kšœ˜—K˜šœ˜Kšœ˜Kšœ žœ˜Kšœ žœ˜Kšœ žœ˜K˜Kšžœžœ˜šžœžœ˜$Kšœ9˜9Kšœžœ˜ Kšœžœ‘˜$Kšžœ ˜—šžœ)žœ˜1šžœ)žœ˜1Kšœ?˜?Kšœžœ˜ Kšœžœ‘˜&Kšžœ ˜—Kšœ˜—šžœ1žœ˜9Kšœ?˜?Kšœžœ˜ Kšœ$žœ‘˜2Kšžœ ˜—K˜Kšœ.˜.Kšœ.˜.Kšœ2˜2Kšœ˜K™šžœ2ž˜9Kšœ!˜!—K™šžœ4ž˜;Kšœ˜—K™šžœž˜Kšœ4˜4—K˜™šžœžœžœ˜#šžœžœžœ˜%Kšœžœ˜;—Kšœ@˜@Kšžœ žœ˜K˜——K˜Kšžœ˜ K˜K˜—šžœ˜ Kšœ?˜?Kšœžœ˜ Kšžœ˜ Kšœ˜——Kšœ˜—K˜š œžœ˜Kšœžœ˜Kšœ>˜>K˜Cšžœžœž˜Kšžœ(žœžœžœ˜8Kšžœ/žœžœžœ˜?Kšœ˜Kšœžœ˜ Kšžœ˜—Kšœ2žœžœ˜