<> <> <> <> <> <> <> <> <> <> <> <> DIRECTORY Basics USING [LowHalf], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds], Booting USING [RegisterProcs, RollbackProc], Endian USING [CardFromH, HFromCard, HWORD], IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock], PrincOpsUtils USING [ByteBlt], Process USING [Abort, ConditionPointer, Detach, DisableTimeout, EnableAborts, Milliseconds, MsecToTicks, priorityBackground, priorityForeground, Seconds, SecondsToTicks, SetPriority, SetTimeout, Ticks], Rope USING [ROPE], SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization], XNS USING [Address, Socket], XNSBuf USING [Buffer], XNSEchoBuf USING [Buffer, hdrBytes], XNSErrorBuf USING [Buffer], XNSErrorTypes USING [badChecksumErr, cantGetThereErr, invalidPacketTypeErr, listenerRejectErr, noSocketErr, protocolViolationErr, resourceLimitsErr, unspecifiedErr], XNSSocket USING [AllocBuffer, Create, Destroy, FreeBuffer, Get, GetRemoteAddress, GetUserBytes, Handle, Kick, Milliseconds, Put, ReturnError, ReturnToSender, SetGetTimeout, SetRemoteAddress, SetUserBytes, waitForever], XNSSocketBackdoor USING [FlushCache, PutCached, ReceiveProc, SetDirectReceive], XNSSPPBuf USING [Buffer, ConnCtl, hdrBytes], XNSSPPTypes USING [endReplySST, endSST], XNSStream USING [AttentionType, CloseReason, FilterProc, ListenerProc, Milliseconds, State, SubSequenceType, waitForever], XNSStreamPrivate USING [AttentionType, BufIndex, Finger, FingerObject, Handle, HWORD, OBAttn, OBAttnObject, Object, SignedDiff, SPPBuffer, SubSequenceType, unknownConnectionID, XNSBuffer]; XNSStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle IMPORTS Basics, BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, SafeStorage, XNSStreamPrivate, XNSSocket, XNSSocketBackdoor EXPORTS XNSStream ~ { OPEN XNSStreamPrivate; <> CheckUserBytes: PROC [buffer: XNSBuffer, bytes: CARDINAL] ~ { IF bytes # XNSSocket.GetUserBytes[buffer] THEN ERROR }; <> ConnectionClosed: PUBLIC ERROR [why: XNSStream.CloseReason, text: Rope.ROPE] ~ CODE; Timeout: PUBLIC SIGNAL ~ CODE; CompletionCode: TYPE ~ { normal, timedOut }; <> STREAM: TYPE ~ IO.STREAM; streamProcs: REF IO.StreamProcs ~ IO.CreateStreamProcs [ variety~$inputOutput, class~$XNS, getChar~GetChar, getBlock~GetBlock, unsafeGetBlock~UnsafeGetBlock, endOf~EndOf, charsAvail~CharsAvail, putChar~PutChar, putBlock~PutBlock, unsafePutBlock~UnsafePutBlock, flush~Flush, close~Close ]; <> Milliseconds: TYPE ~ XNSStream.Milliseconds; waitForever: Milliseconds ~ XNSStream.waitForever; Pulses: TYPE ~ BasicTime.Pulses; oneSecondOfPulses: Pulses ~ BasicTime.MicrosecondsToPulses[1000000]; MsecToPulses: PROC [msec: Milliseconds] RETURNS [Pulses] ~ { IF msec >= waitForever THEN RETURN [Pulses.LAST] ELSE RETURN [BasicTime.MicrosecondsToPulses[1000*msec]] }; PulsesSince: PROC [then: Pulses] RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[] - then] }; TimeoutTicksFromPulses: PROC[pulses: Pulses] RETURNS[ticks: Process.Ticks] ~ { <> usec: LONG CARDINAL ~ BasicTime.PulsesToMicroseconds[pulses]; msec: Milliseconds; msec _ (usec + usec/10) / 1000; IF msec < Process.Milliseconds.LAST THEN RETURN[ Process.MsecToTicks[Process.Milliseconds[msec]] ]; RETURN[ Process.SecondsToTicks[Process.Seconds[msec/1000]] ]; }; Now: PROC RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[]] }; IsTimedOut: PROC [timeout, then: Pulses] RETURNS [BOOL] ~ INLINE { RETURN [(BasicTime.GetClockPulses[] - then) > timeout] }; <> defaultConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [ system~TRUE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~TRUE, endOfMsg~FALSE, filler~0]; AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ { <> <> nsb _ XNSSocket.AllocBuffer[handle.socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.sourceConnID _ handle.connectionID; b.hdr2.destConnID _ handle.remoteConnectionID; }; OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ { <> <> <> finger: Finger ~ handle.outputEnqueue; buffer: SPPBuffer _ finger.buffer; IF finger.state # empty THEN ERROR; -- Can't happen. IF buffer = NIL THEN finger.buffer _ buffer _ AllocateOutputBuffer[handle].b; buffer.hdr2.connCtl _ IF finger.attention THEN defaultAttnConnCtl ELSE defaultConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Endian.HFromCard[handle.outputEnqueueNum]; finger.state _ halfFull; finger.index _ finger.bytes _ 0; }; InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ { <> buffer.hdr2.connCtl _ defaultSystemConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Endian.HFromCard[handle.outputSendNum]; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes] }; }; OutputEnqueue: INTERNAL PROC [handle: Handle, endOfMessage: BOOL] ~ { <> <> finger: Finger ~ handle.outputEnqueue; IF finger.state # halfFull THEN ERROR; -- Can't happen. finger.buffer.hdr2.connCtl.endOfMsg _ endOfMessage; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[finger.buffer], XNSSPPBuf.hdrBytes + finger.bytes] }; finger.state _ full; IF finger.index # 0 THEN ERROR; -- Can't happen. handle.outputEnqueue _ handle.outputEnqueue.next; handle.outputEnqueueNum _ handle.outputEnqueueNum + 1; NOTIFY handle.outputReady; }; ReadyOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer, requestAck: BOOL] ~ { <> <> expectedAckNum, allocNum: CARDINAL; <> IF requestAck THEN { expectedAckNum _ Endian.CardFromH[buffer.hdr2.seqNum]; IF (NOT buffer.hdr2.connCtl.system) AND (expectedAckNum = handle.outputSendNum) THEN expectedAckNum _ expectedAckNum + 1; handle.expectedAckNum _ expectedAckNum; buffer.hdr2.connCtl.sendAck _ TRUE; handle.sentAckReqTime _ Now[] }; <> buffer.hdr2.ackNum _ Endian.HFromCard[handle.inputAckNum]; handle.mustSendAck _ FALSE; <> allocNum _ handle.inputDequeueNum + handle.inputBuffersAllocated - 1; buffer.hdr2.allocNum _ Endian.HFromCard[allocNum]; IF allocNum # handle.sentAllocNum THEN { <> handle.sentAllocNum _ allocNum }; <> <> IF buffer.hdr1.type # spp THEN ERROR; <> IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR; <> IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR; }; MakeCopyOfXNSBuffer: PROC [sH: XNSSocket.Handle, nsb: XNSBuffer] RETURNS[copy: XNSBuffer] ~ { bytes: CARDINAL ~ Endian.CardFromH[nsb.hdr1.length]; copy _ XNSSocket.AllocBuffer[sH]; TRUSTED { [] _ PrincOpsUtils.ByteBlt[ to ~ [ blockPointer ~ @copy.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes], from ~ [ blockPointer ~ @nsb.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes] ]; }; }; InputDequeue: INTERNAL PROC [handle: Handle] ~ -- INLINE -- { finger: Finger; sendAllocNow, cantPiggyback: BOOL; finger _ handle.inputDequeue; finger.state _ empty; finger.attention _ FALSE; handle.inputDequeue _ finger.next; handle.inputDequeueNum _ handle.inputDequeueNum + 1; <> sendAllocNow _ <> handle.heWantsAlloc AND <> (SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2)); IF sendAllocNow THEN handle.heWantsAlloc _ FALSE; cantPiggyback _ <> (handle.outputSendNum = handle.outputEnqueueNum) OR <> (SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0); IF sendAllocNow AND cantPiggyback THEN { nsb: XNSBuffer; b: SPPBuffer; [nsb, b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, nsb]; XNSSocket.FreeBuffer[handle.socket, nsb]; }; }; NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { IF NOT handle.closed THEN { handle.closeReason _ closeReason; handle.closeText _ closeText; handle.closed _ TRUE }; BROADCAST handle.inputReady; <> BROADCAST handle.outputSpace; BROADCAST handle.outputReady; BROADCAST handle.recvdNewAlloc; <> <> BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup; BROADCAST handle.flusherWakeup; BROADCAST handle.recvdOBAttn; XNSSocket.Kick[handle.socket]; }; EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { NotifyClosed[handle, closeReason, closeText] }; <> MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL] RETURNS [finger: Finger _ NIL] ~ { tail: Finger _ NIL; THROUGH [1..nBuffers] DO finger _ NEW[ FingerObject _ [next~finger] ]; IF tail = NIL THEN tail _ finger; ENDLOOP; tail.next _ finger; }; FreeBufferRing: PROC [sH: XNSSocket.Handle, finger: Finger] ~ { WHILE finger # NIL DO IF finger.buffer # NIL THEN { nsb: XNSBuffer; TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[sH, nsb]; finger.buffer _ NIL }; { temp: Finger ~ finger.next; finger.next _ NIL; finger _ temp }; ENDLOOP; }; InitConditions: PROC [handle: Handle] ~ { TRUSTED { Process.EnableAborts[@handle.inputReady]; <> Process.EnableAborts[@handle.outputSpace]; Process.EnableAborts[@handle.outputReady]; Process.EnableAborts[@handle.recvdNewAlloc]; <> <> <> <> Process.EnableAborts[@handle.flusherWakeup]; Process.EnableAborts[@handle.recvdOBAttn]; }; }; InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { handle.getTimeout _ getTimeout; handle.getPulseOut _ MsecToPulses[getTimeout]; IF getTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.inputReady] } ELSE TRUSTED { Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]] }; handle.putTimeout _ putTimeout; handle.putPulseOut _ MsecToPulses[putTimeout]; IF putTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.outputSpace] } ELSE TRUSTED { Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]] }; handle.cacheFlushPulseOut _ 30 * oneSecondOfPulses; handle.roundTripPulses _ oneSecondOfPulses; UpdateTimeouts[handle]; }; UpdateTimeouts: PROC [handle: Handle] ~ { <> handle.waitForAckPulseOut _ 3 * handle.roundTripPulses; handle.waitForAllocPulseOut _ handle.waitForAckPulseOut; TRUSTED { Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]] }; handle.probePulseOut _ 30 * handle.roundTripPulses; TRUSTED { Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]] }; handle.noActivityPulseOut _ 100 * handle.roundTripPulses; }; MakeHandle: PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID: HWORD, sendBuffers, recvBuffers: CARDINAL, getTimeout, putTimeout: Milliseconds] RETURNS [handle: Handle] ~ { handle _ NEW[Object]; handle.socket _ socket; handle.remote _ XNSSocket.GetRemoteAddress[socket]; handle.recvdTime _ Now[]; handle.cacheFlushedTime _ Now[]; handle.sentAckReqTime _ Now[]; handle.connectionID _ connectionID; handle.remoteConnectionID _ remoteConnectionID; InitConditions[handle]; InitTimeouts[handle, getTimeout, putTimeout]; { f: Finger ~ MakeBufferRing[handle.socket, sendBuffers]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ f; handle.outputBuffersAllocated _ sendBuffers }; { f: Finger ~ MakeBufferRing[handle.socket, recvBuffers]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ f; handle.inputBuffersAllocated _ recvBuffers }; handle.pull _ FORK Pull[handle]; handle.mgr _ FORK Mgr[handle]; handle.push1 _ FORK Push[handle]; handle.push2 _ FORK Push[handle]; AddNewHandle[newHandle~handle]; SafeStorage.EnableFinalization[handle]; }; FinishHandle: PROC [handle: Handle] ~ { IF NOT handle.closed THEN ERROR; IF handle.finished THEN RETURN; TRUSTED { JOIN handle.push1; handle.push1 _ NIL; JOIN handle.push2; handle.push2 _ NIL; JOIN handle.mgr; handle.mgr _ NIL; JOIN handle.pull; handle.pull _ NIL }; FreeBufferRing[handle.socket, handle.outputEnqueue]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ NIL; FreeBufferRing[handle.socket, handle.inputEnqueue]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ NIL; XNSSocket.Destroy[handle.socket]; handle.finished _ TRUE }; SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ { nsb: XNSBuffer; b: SPPBuffer; nsb _ XNSSocket.AllocBuffer[socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.connCtl _ defaultSystemConnCtl; b.hdr2.connCtl.sendAck _ TRUE; b.hdr2.sst _ 0; b.hdr2.sourceConnID _ connectionID; b.hdr2.destConnID _ unknownConnectionID; b.hdr2.seqNum _ Endian.HFromCard[0]; b.hdr2.ackNum _ Endian.HFromCard[0]; b.hdr2.allocNum _ Endian.HFromCard[0]; XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes]; XNSSocket.Put[handle~socket, b~nsb]; }; Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds _ waitForever] RETURNS [STREAM] ~ { nsb: XNSBuffer _ NIL; socket: XNSSocket.Handle _ NIL; handle: Handle _ NIL; retransmissionTime: Milliseconds _ 500; connID: HWORD; getTimeout _ MIN[getTimeout, waitForever]; putTimeout _ MIN[putTimeout, waitForever]; BEGIN ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb _ NIL }; XNSSocket.Destroy[socket] }; socket _ XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ???? connID _ GetUniqueConnID[]; FOR nTries: CARDINAL IN [1..10] DO IF nTries > 5 THEN retransmissionTime _ 2*retransmissionTime; XNSSocket.SetGetTimeout[socket, retransmissionTime]; SendRFC[socket, connID]; nsb _ XNSSocket.Get[socket]; IF nsb = NIL THEN LOOP; SELECT nsb.hdr1.type FROM spp => { b: SPPBuffer; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.destConnID # connID THEN { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF b.hdr1.source.host # remote.host THEN { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; XNSSocket.SetRemoteAddress[socket, b.hdr1.source]; handle _ MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; nsb _ Receive[socket, nsb, handle]; IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb _ NIL }; RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]]; }; error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM badChecksumErr, resourceLimitsErr => LOOP; noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr => ERROR ConnectionClosed[remoteReject, "rejected"]; cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"]; ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"]; }; ENDCASE => { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; LOOP }; ENDLOOP; ERROR ConnectionClosed[noResponse, "no response"]; END; }; Listener: TYPE ~ REF ListenerObject; ListenerObject: PUBLIC TYPE ~ RECORD [ next: Listener, destroyed: BOOL _ FALSE, listenProcess: PROCESS ]; CreateListener: PUBLIC PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds _ waitForever, filter: XNSStream.FilterProc _ NIL, -- NIL => Accept all requests echoFilter: XNSStream.FilterProc _ NIL] -- NIL => Answer all echos RETURNS [listener: Listener] ~ { listener _ NEW[ ListenerObject _ [listenProcess~ FORK Listen[socket, worker, getTimeout, putTimeout, filter, echoFilter]] ]; AddNewListener[newListener~listener]; SafeStorage.EnableFinalization[listener]; }; DestroyListener: PUBLIC PROC [listener: Listener] ~ { IF listener.destroyed THEN RETURN; TRUSTED { Process.Abort[listener.listenProcess]; JOIN listener.listenProcess }; listener.destroyed _ TRUE; }; Listen: PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds, filter: XNSStream.FilterProc, echoFilter: XNSStream.FilterProc] ~ { socketHandle: XNSSocket.Handle _ XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket]; nsb: XNSBuffer _ NIL; bytes: NAT; { ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb _ NIL }; XNSSocket.Destroy[socketHandle] }; DO IF nsb # NIL THEN { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb _ NIL }; IF (nsb _ XNSSocket.Get[socketHandle]) = NIL THEN LOOP; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM echo => { b: XNSEchoBuf.Buffer; IF bytes < XNSEchoBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.type # request THEN LOOP; IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source] THEN LOOP; b.hdr2.type _ reply; { XNSSocket.ReturnToSender[handle~socketHandle, b~nsb]; nsb _ NIL }; }; spp => { b: SPPBuffer; streamSocketHandle: XNSSocket.Handle; stream: STREAM; handle: Handle; IF bytes < XNSSPPBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF (b.hdr2.seqNum # 0) OR (b.hdr2.destConnID # unknownConnectionID) OR (b.hdr2.sourceConnID = unknownConnectionID) THEN { XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID] # NIL THEN LOOP; IF (filter # NIL) AND NOT filter[nsb.hdr1.source] THEN { XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.listenerRejectErr]; nsb _ NIL; LOOP }; streamSocketHandle _ XNSSocket.Create[remote~nsb.hdr1.source, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ???? handle _ MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ???? stream _ IO.CreateStream[streamProcs~streamProcs, streamData~handle]; b.hdr2.connCtl.sendAck _ TRUE; -- force us to reply nsb _ Receive[streamSocketHandle, nsb, handle]; TRUSTED { Process.Detach[FORK worker[stream~stream, remote~b.hdr1.source]] }; }; ENDCASE; ENDLOOP; }; }; <> GetTimeouts: PUBLIC PROC [self: IO.STREAM] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { handle: Handle = NARROW[self.streamData]; [getTimeout, putTimeout] _ EntryGetTimeouts[handle] }; EntryGetTimeouts: ENTRY PROC [handle: Handle] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; getTimeout _ handle.getTimeout; putTimeout _ handle.putTimeout; }; SetTimeouts: PUBLIC PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds _ waitForever] ~ { handle: Handle = NARROW[self.streamData]; EntrySetTimeouts[handle, getTimeout, putTimeout]; }; EntrySetTimeouts: ENTRY PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; InitTimeouts[handle, getTimeout, putTimeout]; }; SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySetSSType[handle, ssType]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType, sendNow: BOOL _ FALSE] RETURNS [CompletionCode _ normal] ~ { <> ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; IF ssType = handle.outputSSType THEN RETURN; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => EXIT; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; ENDCASE => ERROR; ENDLOOP; handle.outputSSType _ ssType; OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySendEndOfMessage[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySendEndOfMessage: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~TRUE]; RETURN [normal]; }; SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; nsb: XNSBuffer; code: CompletionCode; DO [nsb, code] _ EntrySendAttention[handle, attentionType]; IF code = normal THEN EXIT; IF nsb # NIL THEN ERROR; -- can't happen SIGNAL Timeout[]; ENDLOOP; IF nsb # NIL THEN XNSSocket.Put[handle.socket, nsb]; }; EntrySendAttention: ENTRY PROC [handle: Handle, attentionType: AttentionType] RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ { <> ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; attnNum: CARDINAL; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF handle.outputIBAttnCnt >= 3 THEN { -- How many really ???? IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; <> WHILE finger.next # handle.outputEnqueue DO finger _ finger.next ENDLOOP; finger.next _ NEW [FingerObject _ [next~handle.outputEnqueue, attention~TRUE]]; finger _ finger.next; handle.outputEnqueue _ finger; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.SUCC; attnNum _ handle.outputEnqueueNum; <> OutputMakeNotEmpty[handle]; TRUSTED { finger.buffer.body.bytes[0] _ LOOPHOLE[attentionType] }; finger.bytes _ 1; OutputEnqueue[handle~handle, endOfMessage~FALSE]; <> IF SignedDiff[handle.recvdAllocNum, attnNum] < 0 THEN { nsb: XNSBuffer; TRUSTED { nsb _ MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]]; ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] }; RETURN [nsb, normal]; }; RETURN [NIL, normal]; }; WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds _ XNSStream.waitForever] RETURNS [type: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [type, code] _ EntryWaitAttention[handle, MsecToPulses[waitTimeout]]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryWaitAttention: ENTRY PROC [handle: Handle, pulseOut: Pulses] RETURNS [AttentionType, CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; p: OBAttn _ NIL; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF (p _ handle.recvdOBAttnList) # NIL THEN { handle.recvdOBAttnList _ p.next; RETURN [p.attentionType, normal] }; IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut]; TRUSTED { IF pulseOut < Pulses.LAST THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]] ELSE Process.DisableTimeout[@handle.recvdOBAttn]; }; WAIT handle.recvdOBAttn; ENDLOOP; }; GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; [state, ssType, attentionType] _ EntryGetStatus[handle, reset]; }; EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; <> IF finger.attention THEN { state _ attention; ssType _ handle.inputSSType; TRUSTED { attentionType _ LOOPHOLE[finger.buffer.body.bytes[0]] }; IF reset THEN { p: OBAttn ~ handle.recvdOBAttnList; IF (p # NIL) AND (p.seqNum = handle.inputDequeueNum) THEN handle.recvdOBAttnList _ p.next; InputDequeue[handle] }; RETURN }; IF buffer.hdr2.sst # handle.inputSSType THEN { IF reset THEN handle.inputSSType _ buffer.hdr2.sst; RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] }; IF finger.index < finger.bytes THEN RETURN [state~open, ssType~handle.inputSSType, attentionType~0]; IF buffer.hdr2.connCtl.endOfMsg THEN { IF reset THEN InputDequeue[handle]; RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] }; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { RETURN[state~open, ssType~handle.inputSSType, attentionType~0] }; ENDCASE => ERROR; ENDLOOP; }; FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOL _ FALSE] RETURNS [bytesSkipped: LONG CARDINAL _ 0] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [bytesSkipped, code] _ EntryFlushInput[handle, wait, bytesSkipped]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlushInput: ENTRY PROC [handle: Handle, wait: BOOL _ FALSE, bytesSkipped: LONG CARDINAL] RETURNS [newSkipped: LONG CARDINAL, code: CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention THEN RETURN [bytesSkipped, normal]; buffer _ finger.buffer; IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal]; IF finger.index < finger.bytes THEN { bytesSkipped _ bytesSkipped + (finger.bytes - finger.index); finger.index _ finger.bytes }; IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [bytesSkipped, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [bytesSkipped, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; SendNow: PUBLIC PROC [self: IO.STREAM] ~ { handle: Handle = NARROW[self.streamData]; EntrySendNow[handle]; }; EntrySendNow: ENTRY PROC [handle: Handle] ~ { finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; }; SendClose: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endSST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) OR (sst = XNSSPPTypes.endSST) THEN { [] _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; RETURN [TRUE] }; ENDLOOP; }; SendCloseReply: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) THEN RETURN [TRUE]; ENDLOOP; }; <> GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [char, code] _ EntryGetChar[handle, self]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryGetChar: ENTRY PROC [handle: Handle, self: STREAM] RETURNS [char: CHAR, code: CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN WITH ERROR IO.EndOfStream[self]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN WITH ERROR IO.EndOfStream[self]; InputDequeue[handle]; LOOP }; TRUSTED { char _ buffer.body.chars[finger.index] }; finger.index _ finger.index + 1; RETURN [char, normal]; }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [char~, code~timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.maxLength]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; block.length _ startIndex; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; GetBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; buffer: SPPBuffer; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; buffer _ finger.buffer; SELECT finger.state FROM full, halfEmpty => { IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN[NIL, normal]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal]; InputDequeue[handle]; LOOP }; finger.state _ emptying; RETURN[finger, normal] }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlockLast: ENTRY PROC [handle: Handle, oldFinger: Finger] ~ { IF oldFinger = NIL THEN ERROR; -- can't happen oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; PutChar: PROC [self: STREAM, char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryPutChar[handle, char]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryPutChar: ENTRY PROC [handle: Handle, char: CHAR] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; TRUSTED { finger.buffer.body.chars[finger.bytes] _ char }; finger.bytes _ finger.bytes + 1; RETURN [normal]; ENDLOOP; }; PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.length]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; PutBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfFull; BROADCAST handle.doneFilling; OutputEnqueue[handle~handle, endOfMessage~FALSE] }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; finger.state _ filling; RETURN[finger, normal]; ENDLOOP; }; PutBlockLast: ENTRY PROC [handle: Handle, finger: Finger] ~ { ENABLE UNWIND => NULL; finger.state _ halfFull; BROADCAST handle.doneFilling; }; EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ { handle: Handle ~ NARROW[self.streamData]; RETURN [EntryEndOf[handle]]; }; EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ { ENABLE UNWIND => NULL; finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full => { finger.state _ halfEmpty; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; halfEmpty => RETURN [ finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg) ]; empty => RETURN [ FALSE ]; ENDCASE => ERROR; ENDLOOP; }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [nChars, code] _ EntryCharsAvail[handle, wait]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL] RETURNS [INT, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) THEN RETURN[0, normal]; IF finger.index < finger.bytes THEN RETURN [(finger.bytes - finger.index), normal]; IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [0, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; -- Can't happen. ENDLOOP; }; Flush: PROC [self: STREAM] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryFlush[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlush: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; flushNum: CARDINAL; startTime: Pulses _ Now[]; <> DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; empty, full, sending => EXIT; ENDCASE => ERROR; ENDLOOP; <> IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal]; <> IF handle.outputSendNum = handle.outputEnqueueNum THEN { DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; <> flushNum _ handle.outputEnqueueNum; handle.flusherCnt _ handle.flusherCnt + 1; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT; IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.flusherWakeup; ENDLOOP; handle.flusherCnt _ handle.flusherCnt - 1; RETURN [normal]; }; Close: PROC [self: STREAM, abort: BOOL] = { handle: Handle ~ NARROW[self.streamData]; EntryNotifyClosed[handle, localClose, "close called"]; FinishHandle[handle]; }; <> nSent: INT _ 0; EntryPush: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { <> <> <> ENABLE UNWIND => NULL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO <> IF handle.closed THEN RETURN; <> IF handle.outputSendNum = handle.outputEnqueueNum THEN { WAIT handle.outputReady; LOOP }; <> IF SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0 THEN { WAIT handle.recvdNewAlloc; LOOP }; <> { halfMyBuffers, windowLeft, nSentNoAckReq: INTEGER; sendAckReq: BOOL; halfMyBuffers _ INTEGER[handle.outputBuffersAllocated / 2]; windowLeft _ SignedDiff[handle.recvdAllocNum, handle.outputSendNum]; nSentNoAckReq _ SignedDiff[handle.outputSendNum, handle.expectedAckNum]; sendAckReq _ ((windowLeft = halfMyBuffers) OR (windowLeft = 0) OR (nSentNoAckReq >= halfMyBuffers) OR (handle.flusherCnt > 0)); sendFinger _ handle.outputSend; ReadyOutputBuffer[handle~handle, buffer~sendFinger.buffer, requestAck~sendAckReq]; sendFinger.state _ sending; TRUSTED { sendBuffer _ LOOPHOLE[sendFinger.buffer] }; handle.outputSendNum _ handle.outputSendNum + 1; handle.outputSend _ sendFinger.next; IF sendAckReq THEN BROADCAST handle.mgrLongWakeup; RETURN; }; ENDLOOP; }; Push: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryPush[handle, sendFinger]; IF sendFinger = NIL THEN EXIT; CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; ENDLOOP; IF sendBuffer # NIL THEN ERROR; -- Can't happen ???? }; <> nResent: INT _ 0; MaybeFlushCache: INTERNAL PROC [handle: Handle, pulseOut: Pulses] ~ { IF PulsesSince[handle.cacheFlushedTime] > pulseOut THEN { handle.cacheFlushedTime _ Now[]; XNSSocketBackdoor.FlushCache[handle.socket] }; }; EntryMgr: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { ENABLE UNWIND => NULL; ackReqOutstanding, allocReqOutstanding, timedOutAck, timedOutProbe: BOOL; b: SPPBuffer _ NIL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO <> IF PulsesSince[handle.recvdTime] >= handle.noActivityPulseOut THEN NotifyClosed[handle, unknown, "no activity timeout"]; IF handle.closed THEN RETURN; <> <> MaybeFlushCache[handle, handle.cacheFlushPulseOut]; ackReqOutstanding _ ( <> (handle.outputDequeueNum # handle.outputSendNum) <> AND (SignedDiff[handle.expectedAckNum, handle.outputDequeueNum] > 0)); allocReqOutstanding _ ( <> (handle.outputSendNum # handle.outputEnqueueNum) <> AND (SignedDiff[handle.outputSendNum, handle.recvdAllocNum] > 0)); timedOutAck _ ( PulsesSince[handle.sentAckReqTime] > handle.waitForAckPulseOut); timedOutProbe _ ( (PulsesSince[handle.recvdTime] > handle.probePulseOut) AND (PulsesSince[handle.sentAckReqTime] > handle.probePulseOut)); SELECT TRUE FROM (ackReqOutstanding AND timedOutAck) => { <> SELECT handle.outputDequeue.state FROM full => NULL; sending => { WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; MaybeFlushCache[handle, 3*handle.waitForAckPulseOut]; -- What's the right timeout to use here ???? nResent _ nResent.SUCC; sendFinger _ handle.outputDequeue; sendFinger.state _ sending; b _ sendFinger.buffer; TRUSTED { sendBuffer _ LOOPHOLE[b] }; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; (timedOutProbe OR (allocReqOutstanding AND timedOutAck)) => { <> [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; handle.mustSendAck => { <> [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; RETURN }; (ackReqOutstanding OR allocReqOutstanding) => { <> WAIT handle.mgrShortWakeup; LOOP }; ENDCASE => { <> WAIT handle.mgrLongWakeup; LOOP }; ENDLOOP; }; Mgr: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryMgr[handle, sendFinger]; SELECT TRUE FROM (sendFinger # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; }; (sendBuffer # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; XNSSocket.FreeBuffer[handle.socket, sendBuffer]; }; ENDCASE => EXIT; ENDLOOP; }; <> nRecvd: INT _ 0; nTooShort: INT _ 0; nBadConnID: INT _ 0; nBadRemoteConnID: INT _ 0; GotNewAck: INTERNAL PROC [handle: Handle, newAckNum: CARDINAL] ~ { <> <> finger: Finger; <> IF SignedDiff[newAckNum, handle.outputSendNum] > 0 THEN { <> RETURN }; WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0 DO IF handle.closed THEN RETURN; finger _ handle.outputDequeue; SELECT finger.state FROM full => NULL; sending => { -- extremely unlikely; occurs only when resending. WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; IF finger.attention THEN { <> temp: Finger; nsb: XNSBuffer; FOR temp _ finger, temp.next WHILE temp.next # finger DO NULL ENDLOOP; temp.next _ finger.next; IF finger.buffer = NIL THEN ERROR; -- Can't happen ???? TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[handle.socket, nsb]; finger.buffer _ NIL; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.PRED; } ELSE { <> finger.state _ empty; BROADCAST handle.outputSpace; }; handle.outputDequeueNum _ handle.outputDequeueNum + 1; handle.outputDequeue _ finger.next; ENDLOOP; IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup; }; GotAckReq: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, systemPacket: BOOL] ~ -- INLINE -- { <> IF systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0) THEN { handle.mustSendAck _ TRUE; BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup } ELSE { handle.heWantsAlloc _ TRUE }; }; GotNewAlloc: INTERNAL PROC [handle: Handle, newAllocNum: CARDINAL] ~ -- INLINE -- { handle.recvdAllocNum _ newAllocNum; BROADCAST handle.recvdNewAlloc; }; GotOBAttn: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, type: AttentionType] ~ { p, prev, new: OBAttn; seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum]; IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ???? prev _ NIL; p _ handle.recvdOBAttnList; DO IF p = NIL THEN EXIT; IF p.seqNum = seqNum THEN RETURN; IF p.seqNum > seqNum THEN EXIT; prev _ p; p _ p.next; ENDLOOP; new _ NEW[OBAttnObject_[next~p, seqNum~seqNum, attentionType~type]]; IF prev = NIL THEN handle.recvdOBAttnList _ new ELSE prev.next _ new; BROADCAST handle.recvdOBAttn; }; GotData: INTERNAL PROC [handle: Handle, newSeqNum: CARDINAL, b: SPPBuffer, bytes: NAT] RETURNS [swapb: SPPBuffer] ~ -- INLINE -- { <> finger: Finger; <> IF SignedDiff[newSeqNum, handle.inputAckNum] < 0 THEN { RETURN[b] }; <> IF SignedDiff[newSeqNum, handle.inputDequeueNum] >= INT[handle.inputBuffersAllocated] THEN { <> RETURN[b] }; WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0) DO handle.inputEnqueue _ handle.inputEnqueue.next; handle.inputEnqueueNum _ handle.inputEnqueueNum + 1; ENDLOOP; <> finger _ handle.inputAck; THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum]) DO finger _ finger.next; ENDLOOP; <> swapb _ finger.buffer; finger.buffer _ b; finger.bytes _ bytes; finger.index _ 0; finger.attention _ b.hdr2.connCtl.attn; finger.state _ full; <> WHILE (handle.inputAck.state = full) AND (handle.inputAckNum # handle.inputEnqueueNum) DO BROADCAST handle.inputReady; handle.inputAckNum _ handle.inputAckNum + 1; handle.inputAck _ handle.inputAck.next; ENDLOOP; }; Receive: XNSSocketBackdoor.ReceiveProc <<[handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer]>> ~ { nRecvd _ nRecvd.SUCC; -- DEBUG RETURN [ EntryReceive[NARROW[clientData], b] ] }; EntryReceive: ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer] RETURNS [XNSBuf.Buffer] ~ { <> ENABLE UNWIND => NULL; bytes: NAT; IF handle.closed THEN RETURN [nsb]; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM noSocketErr, listenerRejectErr, protocolViolationErr => { NotifyClosed[handle, remoteClose, "remote close"] }; ENDCASE => { XNSSocketBackdoor.FlushCache[handle.socket] }; RETURN [nsb] }; spp => { b: XNSSPPBuf.Buffer; newSeqNum: CARDINAL; newAckNum: CARDINAL; newAllocNum: CARDINAL; TRUSTED { b _ LOOPHOLE[nsb] }; IF bytes < XNSSPPBuf.hdrBytes THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.unspecifiedErr]; nsb _ NIL; nTooShort _ nTooShort.SUCC; -- DEBUG RETURN [nsb] }; IF b.hdr2.destConnID # handle.connectionID THEN { IF b.hdr2.destConnID # unknownConnectionID THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadConnID _ nBadConnID.SUCC; -- DEBUG RETURN [nsb] }; }; IF b.hdr2.sourceConnID # handle.remoteConnectionID THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadRemoteConnID _ nBadRemoteConnID.SUCC; -- DEBUG RETURN [nsb] }; newSeqNum _ Endian.CardFromH[b.hdr2.seqNum]; newAckNum _ Endian.CardFromH[b.hdr2.ackNum]; newAllocNum _ Endian.CardFromH[b.hdr2.allocNum]; handle.recvdTime _ Now[]; <<>> IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0 THEN GotNewAlloc[handle, newAllocNum]; <<>> IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0 THEN GotNewAck[handle, newAckNum]; <<>> IF b.hdr2.connCtl.sendAck THEN GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system]; <> IF NOT b.hdr2.connCtl.system THEN { IF b.hdr2.connCtl.attn THEN TRUSTED { GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] }; b _ GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)]; TRUSTED { nsb _ LOOPHOLE[b] }; }; RETURN [nsb]; }; ENDCASE => { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; RETURN [nsb]; }; }; Pull: PROC [handle: Handle] ~ { nsb: XNSBuf.Buffer _ NIL; XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever]; XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle]; WHILE NOT handle.closed DO IF (nsb _ XNSSocket.Get[handle.socket]) = NIL THEN LOOP; IF (nsb _ Receive[handle.socket, nsb, handle]) = NIL THEN LOOP; XNSSocket.FreeBuffer[handle.socket, nsb]; nsb _ NIL; ENDLOOP; XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL]; handle _ NIL; -- for finalization }; <> <> <<>> globalLockHandle: Handle _ NEW[Object]; handleChain: Handle _ NIL; AddNewHandle: ENTRY PROC [handle: Handle _ globalLockHandle, newHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; newHandle.next _ handleChain; handleChain _ newHandle; }; RemoveOldHandle: ENTRY PROC [handle: Handle _ globalLockHandle, oldHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; IF handleChain = oldHandle THEN handleChain _ handleChain.next ELSE FOR h: Handle _ handleChain, h.next DO IF h.next = oldHandle THEN { h.next _ oldHandle.next; EXIT }; ENDLOOP; -- NIL fault if not on chain oldHandle.next _ NIL; -- Help Finalization }; FindExistingHandle: ENTRY PROC [handle: Handle _ globalLockHandle, remote: XNS.Address, remoteConnID: HWORD] RETURNS [existing: Handle] ~ { IF handle # globalLockHandle THEN ERROR; FOR existing _ handleChain, existing.next WHILE existing # NIL DO IF (existing.remote = remote) AND (existing.remoteConnectionID = remoteConnID) THEN EXIT; ENDLOOP; }; Rollback: Booting.RollbackProc = { FOR handle: Handle _ handleChain, handle.next WHILE handle # NIL DO IF NOT handle.closed THEN EntryNotifyClosed[handle, localClose, "Rollback"]; ENDLOOP; }; <> uniqueConnID: CARDINAL _ Basics.LowHalf[Now[]]; GetUniqueConnID: ENTRY PROC [handle: Handle _ globalLockHandle] RETURNS [HWORD] ~ { h: Handle; IF handle # globalLockHandle THEN ERROR; DO uniqueConnID _ uniqueConnID + 1; FOR h _ handleChain, h.next WHILE (h # NIL) AND (h.connectionID # uniqueConnID) DO NULL ENDLOOP; IF h = NIL THEN RETURN [Endian.HFromCard[uniqueConnID]]; ENDLOOP; }; <> listenerChain: Listener _ NIL; AddNewListener: ENTRY PROC [handle: Handle _ globalLockHandle, newListener: Listener] ~ { newListener.next _ listenerChain; listenerChain _ newListener }; RemoveOldListener: ENTRY PROC [handle: Handle _ globalLockHandle, oldListener: Listener] ~ { IF oldListener = listenerChain THEN listenerChain _ listenerChain.next ELSE FOR p: Listener _ listenerChain, p.next DO IF p.next = oldListener THEN { p.next _ oldListener.next; EXIT } ENDLOOP; -- NIL fault if not on chain oldListener.next _ NIL; }; <> <> <> <<>> droppedStreams: INT _ 0; finalizedStreams: INT _ 0; streamFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; StreamFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO handle: Handle _ NARROW[SafeStorage.FQNext[streamFinalizerQueue]]; IF NOT handle.finished THEN { -- User forgot to call Destroy IF NOT handle.closed THEN ERROR; -- Can't happen FinishHandle[handle]; SafeStorage.EnableFinalization[handle]; droppedStreams _ droppedStreams.SUCC; } ELSE { -- Normal end of life RemoveOldHandle[oldHandle~handle]; finalizedStreams _ finalizedStreams.SUCC; }; handle _ NIL; ENDLOOP; }; <> <> <<>> droppedListeners: INT _ 0; finalizedListeners: INT _ 0; listenerFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; ListenerFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO listener: Listener _ NARROW[SafeStorage.FQNext[listenerFinalizerQueue]]; IF NOT listener.destroyed THEN { -- User forgot to call Destroy DestroyListener[listener]; SafeStorage.EnableFinalization[listener]; droppedListeners _ droppedListeners.SUCC; } ELSE { -- Normal end of life RemoveOldListener[oldListener~listener]; finalizedListeners _ finalizedListeners.SUCC; }; listener _ NIL; ENDLOOP; }; <> <> { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK StreamFinalizer[]]; }; <> { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK ListenerFinalizer[]]; }; <> Booting.RegisterProcs[r: Rollback]; }.