DIRECTORY Basics USING [LowHalf], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds], Booting USING [RegisterProcs, RollbackProc], Endian USING [CardFromH, HFromCard, HWORD], IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock], PrincOpsUtils USING [ByteBlt], Process USING [Abort, ConditionPointer, Detach, DisableTimeout, EnableAborts, Milliseconds, MsecToTicks, priorityBackground, priorityForeground, Seconds, SecondsToTicks, SetPriority, SetTimeout, Ticks], Rope USING [ROPE], SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization], XNS USING [Address, Socket], XNSBuf USING [Buffer], XNSEchoBuf USING [Buffer, hdrBytes], XNSErrorBuf USING [Buffer], XNSErrorTypes USING [badChecksumErr, cantGetThereErr, invalidPacketTypeErr, listenerRejectErr, noSocketErr, protocolViolationErr, resourceLimitsErr, unspecifiedErr], XNSSocket USING [AllocBuffer, Create, Destroy, FreeBuffer, Get, GetRemoteAddress, GetUserBytes, Handle, Kick, Milliseconds, Put, ReturnError, ReturnToSender, SetGetTimeout, SetRemoteAddress, SetUserBytes, waitForever], XNSSocketBackdoor USING [FlushCache, PutCached, ReceiveProc, SetDirectReceive], XNSSPPBuf USING [Buffer, ConnCtl, hdrBytes], XNSSPPTypes USING [endReplySST, endSST], XNSStream USING [AttentionType, CloseReason, FilterProc, ListenerProc, Milliseconds, State, SubSequenceType, waitForever], XNSStreamPrivate USING [AttentionType, BufIndex, Finger, FingerObject, Handle, HWORD, OBAttn, OBAttnObject, Object, SignedDiff, SPPBuffer, SubSequenceType, unknownConnectionID, XNSBuffer]; XNSStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle IMPORTS Basics, BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, SafeStorage, XNSStreamPrivate, XNSSocket, XNSSocketBackdoor EXPORTS XNSStream ~ { OPEN XNSStreamPrivate; CheckUserBytes: PROC [buffer: XNSBuffer, bytes: CARDINAL] ~ { IF bytes # XNSSocket.GetUserBytes[buffer] THEN ERROR }; ConnectionClosed: PUBLIC ERROR [why: XNSStream.CloseReason, text: Rope.ROPE] ~ CODE; Timeout: PUBLIC SIGNAL ~ CODE; CompletionCode: TYPE ~ { normal, timedOut }; STREAM: TYPE ~ IO.STREAM; streamProcs: REF IO.StreamProcs ~ IO.CreateStreamProcs [ variety~$inputOutput, class~$XNS, getChar~GetChar, getBlock~GetBlock, unsafeGetBlock~UnsafeGetBlock, endOf~EndOf, charsAvail~CharsAvail, putChar~PutChar, putBlock~PutBlock, unsafePutBlock~UnsafePutBlock, flush~Flush, close~Close ]; Milliseconds: TYPE ~ XNSStream.Milliseconds; waitForever: Milliseconds ~ XNSStream.waitForever; Pulses: TYPE ~ BasicTime.Pulses; oneSecondOfPulses: Pulses ~ BasicTime.MicrosecondsToPulses[1000000]; MsecToPulses: PROC [msec: Milliseconds] RETURNS [Pulses] ~ { IF msec >= waitForever THEN RETURN [Pulses.LAST] ELSE RETURN [BasicTime.MicrosecondsToPulses[1000*msec]] }; PulsesSince: PROC [then: Pulses] RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[] - then] }; TimeoutTicksFromPulses: PROC[pulses: Pulses] RETURNS[ticks: Process.Ticks] ~ { usec: LONG CARDINAL ~ BasicTime.PulsesToMicroseconds[pulses]; msec: Milliseconds; msec _ (usec + usec/10) / 1000; IF msec < Process.Milliseconds.LAST THEN RETURN[ Process.MsecToTicks[Process.Milliseconds[msec]] ]; RETURN[ Process.SecondsToTicks[Process.Seconds[msec/1000]] ]; }; Now: PROC RETURNS [Pulses] ~ INLINE { RETURN [BasicTime.GetClockPulses[]] }; IsTimedOut: PROC [timeout, then: Pulses] RETURNS [BOOL] ~ INLINE { RETURN [(BasicTime.GetClockPulses[] - then) > timeout] }; defaultConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultSystemConnCtl: XNSSPPBuf.ConnCtl ~ [ system~TRUE, sendAck~FALSE, attn~FALSE, endOfMsg~FALSE, filler~0]; defaultAttnConnCtl: XNSSPPBuf.ConnCtl ~ [ system~FALSE, sendAck~FALSE, attn~TRUE, endOfMsg~FALSE, filler~0]; AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [nsb: XNSBuffer, b: SPPBuffer] ~ { nsb _ XNSSocket.AllocBuffer[handle.socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.sourceConnID _ handle.connectionID; b.hdr2.destConnID _ handle.remoteConnectionID; }; OutputMakeNotEmpty: INTERNAL PROC [handle: Handle] ~ { finger: Finger ~ handle.outputEnqueue; buffer: SPPBuffer _ finger.buffer; IF finger.state # empty THEN ERROR; -- Can't happen. IF buffer = NIL THEN finger.buffer _ buffer _ AllocateOutputBuffer[handle].b; buffer.hdr2.connCtl _ IF finger.attention THEN defaultAttnConnCtl ELSE defaultConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Endian.HFromCard[handle.outputEnqueueNum]; finger.state _ halfFull; finger.index _ finger.bytes _ 0; }; InitSystemOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer] ~ { buffer.hdr2.connCtl _ defaultSystemConnCtl; buffer.hdr2.sst _ handle.outputSSType; buffer.hdr2.seqNum _ Endian.HFromCard[handle.outputSendNum]; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[buffer], XNSSPPBuf.hdrBytes] }; }; OutputEnqueue: INTERNAL PROC [handle: Handle, endOfMessage: BOOL] ~ { finger: Finger ~ handle.outputEnqueue; IF finger.state # halfFull THEN ERROR; -- Can't happen. finger.buffer.hdr2.connCtl.endOfMsg _ endOfMessage; TRUSTED { XNSSocket.SetUserBytes[LOOPHOLE[finger.buffer], XNSSPPBuf.hdrBytes + finger.bytes] }; finger.state _ full; IF finger.index # 0 THEN ERROR; -- Can't happen. handle.outputEnqueue _ handle.outputEnqueue.next; handle.outputEnqueueNum _ handle.outputEnqueueNum + 1; NOTIFY handle.outputReady; }; ReadyOutputBuffer: INTERNAL PROC [handle: Handle, buffer: SPPBuffer, requestAck: BOOL] ~ { expectedAckNum, allocNum: CARDINAL; IF requestAck THEN { expectedAckNum _ Endian.CardFromH[buffer.hdr2.seqNum]; IF (NOT buffer.hdr2.connCtl.system) AND (expectedAckNum = handle.outputSendNum) THEN expectedAckNum _ expectedAckNum + 1; handle.expectedAckNum _ expectedAckNum; buffer.hdr2.connCtl.sendAck _ TRUE; handle.sentAckReqTime _ Now[] }; buffer.hdr2.ackNum _ Endian.HFromCard[handle.inputAckNum]; handle.mustSendAck _ FALSE; allocNum _ handle.inputDequeueNum + handle.inputBuffersAllocated - 1; buffer.hdr2.allocNum _ Endian.HFromCard[allocNum]; IF allocNum # handle.sentAllocNum THEN { handle.sentAllocNum _ allocNum }; IF buffer.hdr1.type # spp THEN ERROR; IF buffer.hdr2.sourceConnID # handle.connectionID THEN ERROR; IF buffer.hdr2.destConnID # handle.remoteConnectionID THEN ERROR; }; MakeCopyOfXNSBuffer: PROC [sH: XNSSocket.Handle, nsb: XNSBuffer] RETURNS[copy: XNSBuffer] ~ { bytes: CARDINAL ~ Endian.CardFromH[nsb.hdr1.length]; copy _ XNSSocket.AllocBuffer[sH]; TRUSTED { [] _ PrincOpsUtils.ByteBlt[ to ~ [ blockPointer ~ @copy.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes], from ~ [ blockPointer ~ @nsb.hdr1, startIndex ~ 0, stopIndexPlusOne ~ bytes] ]; }; }; InputDequeue: INTERNAL PROC [handle: Handle] ~ -- INLINE -- { finger: Finger; sendAllocNow, cantPiggyback: BOOL; finger _ handle.inputDequeue; finger.state _ empty; finger.attention _ FALSE; handle.inputDequeue _ finger.next; handle.inputDequeueNum _ handle.inputDequeueNum + 1; sendAllocNow _ handle.heWantsAlloc AND (SignedDiff[handle.inputEnqueueNum, handle.inputDequeueNum] <= (handle.inputBuffersAllocated/2)); IF sendAllocNow THEN handle.heWantsAlloc _ FALSE; cantPiggyback _ (handle.outputSendNum = handle.outputEnqueueNum) OR (SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0); IF sendAllocNow AND cantPiggyback THEN { nsb: XNSBuffer; b: SPPBuffer; [nsb, b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; CheckUserBytes[nsb, XNSSPPBuf.hdrBytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, nsb]; XNSSocket.FreeBuffer[handle.socket, nsb]; }; }; NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { IF NOT handle.closed THEN { handle.closeReason _ closeReason; handle.closeText _ closeText; handle.closed _ TRUE }; BROADCAST handle.inputReady; BROADCAST handle.outputSpace; BROADCAST handle.outputReady; BROADCAST handle.recvdNewAlloc; BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup; BROADCAST handle.flusherWakeup; BROADCAST handle.recvdOBAttn; XNSSocket.Kick[handle.socket]; }; EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: XNSStream.CloseReason, closeText: Rope.ROPE] ~ { NotifyClosed[handle, closeReason, closeText] }; MakeBufferRing: PROC [sH: XNSSocket.Handle, nBuffers: CARDINAL] RETURNS [finger: Finger _ NIL] ~ { tail: Finger _ NIL; THROUGH [1..nBuffers] DO finger _ NEW[ FingerObject _ [next~finger] ]; IF tail = NIL THEN tail _ finger; ENDLOOP; tail.next _ finger; }; FreeBufferRing: PROC [sH: XNSSocket.Handle, finger: Finger] ~ { WHILE finger # NIL DO IF finger.buffer # NIL THEN { nsb: XNSBuffer; TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[sH, nsb]; finger.buffer _ NIL }; { temp: Finger ~ finger.next; finger.next _ NIL; finger _ temp }; ENDLOOP; }; InitConditions: PROC [handle: Handle] ~ { TRUSTED { Process.EnableAborts[@handle.inputReady]; Process.EnableAborts[@handle.outputSpace]; Process.EnableAborts[@handle.outputReady]; Process.EnableAborts[@handle.recvdNewAlloc]; Process.EnableAborts[@handle.flusherWakeup]; Process.EnableAborts[@handle.recvdOBAttn]; }; }; InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { handle.getTimeout _ getTimeout; handle.getPulseOut _ MsecToPulses[getTimeout]; IF getTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.inputReady] } ELSE TRUSTED { Process.SetTimeout[@handle.inputReady, TimeoutTicksFromPulses[handle.getPulseOut]] }; handle.putTimeout _ putTimeout; handle.putPulseOut _ MsecToPulses[putTimeout]; IF putTimeout = waitForever THEN TRUSTED { Process.DisableTimeout[@handle.outputSpace] } ELSE TRUSTED { Process.SetTimeout[@handle.outputSpace, TimeoutTicksFromPulses[handle.putPulseOut]] }; handle.cacheFlushPulseOut _ 30 * oneSecondOfPulses; handle.roundTripPulses _ oneSecondOfPulses; UpdateTimeouts[handle]; }; UpdateTimeouts: PROC [handle: Handle] ~ { handle.waitForAckPulseOut _ 3 * handle.roundTripPulses; handle.waitForAllocPulseOut _ handle.waitForAckPulseOut; TRUSTED { Process.SetTimeout[@handle.mgrShortWakeup, TimeoutTicksFromPulses[handle.waitForAckPulseOut]] }; handle.probePulseOut _ 30 * handle.roundTripPulses; TRUSTED { Process.SetTimeout[@handle.mgrLongWakeup, TimeoutTicksFromPulses[handle.probePulseOut]] }; handle.noActivityPulseOut _ 100 * handle.roundTripPulses; }; MakeHandle: PROC [socket: XNSSocket.Handle, connectionID, remoteConnectionID: HWORD, sendBuffers, recvBuffers: CARDINAL, getTimeout, putTimeout: Milliseconds] RETURNS [handle: Handle] ~ { handle _ NEW[Object]; handle.socket _ socket; handle.remote _ XNSSocket.GetRemoteAddress[socket]; handle.recvdTime _ Now[]; handle.cacheFlushedTime _ Now[]; handle.sentAckReqTime _ Now[]; handle.connectionID _ connectionID; handle.remoteConnectionID _ remoteConnectionID; InitConditions[handle]; InitTimeouts[handle, getTimeout, putTimeout]; { f: Finger ~ MakeBufferRing[handle.socket, sendBuffers]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ f; handle.outputBuffersAllocated _ sendBuffers }; { f: Finger ~ MakeBufferRing[handle.socket, recvBuffers]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ f; handle.inputBuffersAllocated _ recvBuffers }; handle.pull _ FORK Pull[handle]; handle.mgr _ FORK Mgr[handle]; handle.push1 _ FORK Push[handle]; handle.push2 _ FORK Push[handle]; AddNewHandle[newHandle~handle]; SafeStorage.EnableFinalization[handle]; }; FinishHandle: PROC [handle: Handle] ~ { IF NOT handle.closed THEN ERROR; IF handle.finished THEN RETURN; TRUSTED { JOIN handle.push1; handle.push1 _ NIL; JOIN handle.push2; handle.push2 _ NIL; JOIN handle.mgr; handle.mgr _ NIL; JOIN handle.pull; handle.pull _ NIL }; FreeBufferRing[handle.socket, handle.outputEnqueue]; handle.outputEnqueue _ handle.outputSend _ handle.outputDequeue _ NIL; FreeBufferRing[handle.socket, handle.inputEnqueue]; handle.inputEnqueue _ handle.inputAck _ handle.inputDequeue _ NIL; XNSSocket.Destroy[handle.socket]; handle.finished _ TRUE }; SendRFC: PROC [socket: XNSSocket.Handle, connectionID: HWORD] ~ { nsb: XNSBuffer; b: SPPBuffer; nsb _ XNSSocket.AllocBuffer[socket]; TRUSTED { b _ LOOPHOLE[nsb] }; b.hdr1.type _ spp; b.hdr2.connCtl _ defaultSystemConnCtl; b.hdr2.connCtl.sendAck _ TRUE; b.hdr2.sst _ 0; b.hdr2.sourceConnID _ connectionID; b.hdr2.destConnID _ unknownConnectionID; b.hdr2.seqNum _ Endian.HFromCard[0]; b.hdr2.ackNum _ Endian.HFromCard[0]; b.hdr2.allocNum _ Endian.HFromCard[0]; XNSSocket.SetUserBytes[nsb, XNSSPPBuf.hdrBytes]; XNSSocket.Put[handle~socket, b~nsb]; }; Create: PUBLIC PROC [remote: XNS.Address, getTimeout, putTimeout: Milliseconds _ waitForever] RETURNS [STREAM] ~ { nsb: XNSBuffer _ NIL; socket: XNSSocket.Handle _ NIL; handle: Handle _ NIL; retransmissionTime: Milliseconds _ 500; connID: HWORD; getTimeout _ MIN[getTimeout, waitForever]; putTimeout _ MIN[putTimeout, waitForever]; BEGIN ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb _ NIL }; XNSSocket.Destroy[socket] }; socket _ XNSSocket.Create[remote~remote, sendBuffers~10, recvBuffers~14]; -- buffers ???? connID _ GetUniqueConnID[]; FOR nTries: CARDINAL IN [1..10] DO IF nTries > 5 THEN retransmissionTime _ 2*retransmissionTime; XNSSocket.SetGetTimeout[socket, retransmissionTime]; SendRFC[socket, connID]; nsb _ XNSSocket.Get[socket]; IF nsb = NIL THEN LOOP; SELECT nsb.hdr1.type FROM spp => { b: SPPBuffer; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.destConnID # connID THEN { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF b.hdr1.source.host # remote.host THEN { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; XNSSocket.SetRemoteAddress[socket, b.hdr1.source]; handle _ MakeHandle[socket~socket, connectionID~connID, remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; nsb _ Receive[socket, nsb, handle]; IF nsb # NIL THEN { XNSSocket.FreeBuffer[socket, nsb]; nsb _ NIL }; RETURN [IO.CreateStream[streamProcs~streamProcs, streamData~handle]]; }; error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM badChecksumErr, resourceLimitsErr => LOOP; noSocketErr, listenerRejectErr, invalidPacketTypeErr, protocolViolationErr => ERROR ConnectionClosed[remoteReject, "rejected"]; cantGetThereErr => ERROR ConnectionClosed[noRoute, "no route"]; ENDCASE => ERROR ConnectionClosed[unknown, "error reply to RFC"]; }; ENDCASE => { XNSSocket.ReturnError[socket, nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; LOOP }; ENDLOOP; ERROR ConnectionClosed[noResponse, "no response"]; END; }; Listener: TYPE ~ REF ListenerObject; ListenerObject: PUBLIC TYPE ~ RECORD [ next: Listener, destroyed: BOOL _ FALSE, listenProcess: PROCESS ]; CreateListener: PUBLIC PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds _ waitForever, filter: XNSStream.FilterProc _ NIL, -- NIL => Accept all requests echoFilter: XNSStream.FilterProc _ NIL] -- NIL => Answer all echos RETURNS [listener: Listener] ~ { listener _ NEW[ ListenerObject _ [listenProcess~ FORK Listen[socket, worker, getTimeout, putTimeout, filter, echoFilter]] ]; AddNewListener[newListener~listener]; SafeStorage.EnableFinalization[listener]; }; DestroyListener: PUBLIC PROC [listener: Listener] ~ { IF listener.destroyed THEN RETURN; TRUSTED { Process.Abort[listener.listenProcess]; JOIN listener.listenProcess }; listener.destroyed _ TRUE; }; Listen: PROC [ socket: XNS.Socket, worker: XNSStream.ListenerProc, getTimeout, putTimeout: Milliseconds, filter: XNSStream.FilterProc, echoFilter: XNSStream.FilterProc] ~ { socketHandle: XNSSocket.Handle _ XNSSocket.Create[getTimeout~XNSSocket.waitForever, local~socket]; nsb: XNSBuffer _ NIL; bytes: NAT; { ENABLE UNWIND => { IF nsb # NIL THEN { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb _ NIL }; XNSSocket.Destroy[socketHandle] }; DO IF nsb # NIL THEN { XNSSocket.FreeBuffer[handle~socketHandle, b~nsb]; nsb _ NIL }; IF (nsb _ XNSSocket.Get[socketHandle]) = NIL THEN LOOP; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM echo => { b: XNSEchoBuf.Buffer; IF bytes < XNSEchoBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF b.hdr2.type # request THEN LOOP; IF (echoFilter # NIL) AND NOT echoFilter[nsb.hdr1.source] THEN LOOP; b.hdr2.type _ reply; { XNSSocket.ReturnToSender[handle~socketHandle, b~nsb]; nsb _ NIL }; }; spp => { b: SPPBuffer; streamSocketHandle: XNSSocket.Handle; stream: STREAM; handle: Handle; IF bytes < XNSSPPBuf.hdrBytes THEN LOOP; TRUSTED { b _ LOOPHOLE[nsb] }; IF (b.hdr2.seqNum # 0) OR (b.hdr2.destConnID # unknownConnectionID) OR (b.hdr2.sourceConnID = unknownConnectionID) THEN { XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; LOOP }; IF FindExistingHandle[remote~nsb.hdr1.source, remoteConnID~b.hdr2.sourceConnID] # NIL THEN LOOP; IF (filter # NIL) AND NOT filter[nsb.hdr1.source] THEN { XNSSocket.ReturnError[socketHandle, nsb, XNSErrorTypes.listenerRejectErr]; nsb _ NIL; LOOP }; streamSocketHandle _ XNSSocket.Create[remote~nsb.hdr1.source, sendBuffers~10, recvBuffers~14]; -- how many buffers really? what should the timeouts be? ???? handle _ MakeHandle[socket~streamSocketHandle, connectionID~GetUniqueConnID[], remoteConnectionID~b.hdr2.sourceConnID, sendBuffers~6, recvBuffers~6, getTimeout~getTimeout, putTimeout~putTimeout]; -- what should buffers be really? ???? stream _ IO.CreateStream[streamProcs~streamProcs, streamData~handle]; b.hdr2.connCtl.sendAck _ TRUE; -- force us to reply nsb _ Receive[streamSocketHandle, nsb, handle]; TRUSTED { Process.Detach[FORK worker[stream~stream, remote~b.hdr1.source]] }; }; ENDCASE; ENDLOOP; }; }; GetTimeouts: PUBLIC PROC [self: IO.STREAM] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { handle: Handle = NARROW[self.streamData]; [getTimeout, putTimeout] _ EntryGetTimeouts[handle] }; EntryGetTimeouts: ENTRY PROC [handle: Handle] RETURNS [getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; getTimeout _ handle.getTimeout; putTimeout _ handle.putTimeout; }; SetTimeouts: PUBLIC PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds _ waitForever] ~ { handle: Handle = NARROW[self.streamData]; EntrySetTimeouts[handle, getTimeout, putTimeout]; }; EntrySetTimeouts: ENTRY PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; InitTimeouts[handle, getTimeout, putTimeout]; }; SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySetSSType[handle, ssType]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType, sendNow: BOOL _ FALSE] RETURNS [CompletionCode _ normal] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; IF ssType = handle.outputSSType THEN RETURN; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => EXIT; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; ENDCASE => ERROR; ENDLOOP; handle.outputSSType _ ssType; OutputMakeNotEmpty[handle]; -- gets new handle.outputSSType IF sendNow THEN OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; SendEndOfMessage: PUBLIC PROC [self: STREAM] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO code _ EntrySendEndOfMessage[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntrySendEndOfMessage: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; sending, full => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~TRUE]; RETURN [normal]; }; SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; nsb: XNSBuffer; code: CompletionCode; DO [nsb, code] _ EntrySendAttention[handle, attentionType]; IF code = normal THEN EXIT; IF nsb # NIL THEN ERROR; -- can't happen SIGNAL Timeout[]; ENDLOOP; IF nsb # NIL THEN XNSSocket.Put[handle.socket, nsb]; }; EntrySendAttention: ENTRY PROC [handle: Handle, attentionType: AttentionType] RETURNS [nsb: XNSBuffer, code: CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; attnNum: CARDINAL; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF handle.outputIBAttnCnt >= 3 THEN { -- How many really ???? IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; WHILE finger.next # handle.outputEnqueue DO finger _ finger.next ENDLOOP; finger.next _ NEW [FingerObject _ [next~handle.outputEnqueue, attention~TRUE]]; finger _ finger.next; handle.outputEnqueue _ finger; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.SUCC; attnNum _ handle.outputEnqueueNum; OutputMakeNotEmpty[handle]; TRUSTED { finger.buffer.body.bytes[0] _ LOOPHOLE[attentionType] }; finger.bytes _ 1; OutputEnqueue[handle~handle, endOfMessage~FALSE]; IF SignedDiff[handle.recvdAllocNum, attnNum] < 0 THEN { nsb: XNSBuffer; TRUSTED { nsb _ MakeCopyOfXNSBuffer[handle.socket, LOOPHOLE[finger.buffer, XNSBuffer]]; ReadyOutputBuffer[handle~handle, buffer~LOOPHOLE[nsb, SPPBuffer], requestAck~FALSE] }; RETURN [nsb, normal]; }; RETURN [NIL, normal]; }; WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds _ XNSStream.waitForever] RETURNS [type: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [type, code] _ EntryWaitAttention[handle, MsecToPulses[waitTimeout]]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryWaitAttention: ENTRY PROC [handle: Handle, pulseOut: Pulses] RETURNS [AttentionType, CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; p: OBAttn _ NIL; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF (p _ handle.recvdOBAttnList) # NIL THEN { handle.recvdOBAttnList _ p.next; RETURN [p.attentionType, normal] }; IF IsTimedOut[pulseOut, startTime] THEN RETURN[0, timedOut]; TRUSTED { IF pulseOut < Pulses.LAST THEN Process.SetTimeout[@handle.recvdOBAttn, TimeoutTicksFromPulses[pulseOut]] ELSE Process.DisableTimeout[@handle.recvdOBAttn]; }; WAIT handle.recvdOBAttn; ENDLOOP; }; GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { handle: Handle = NARROW[self.streamData]; [state, ssType, attentionType] _ EntryGetStatus[handle, reset]; }; EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL] RETURNS [state: XNSStream.State, ssType: SubSequenceType, attentionType: AttentionType] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; IF finger.attention THEN { state _ attention; ssType _ handle.inputSSType; TRUSTED { attentionType _ LOOPHOLE[finger.buffer.body.bytes[0]] }; IF reset THEN { p: OBAttn ~ handle.recvdOBAttnList; IF (p # NIL) AND (p.seqNum = handle.inputDequeueNum) THEN handle.recvdOBAttnList _ p.next; InputDequeue[handle] }; RETURN }; IF buffer.hdr2.sst # handle.inputSSType THEN { IF reset THEN handle.inputSSType _ buffer.hdr2.sst; RETURN [state~ssTypeChange, ssType~buffer.hdr2.sst, attentionType~0] }; IF finger.index < finger.bytes THEN RETURN [state~open, ssType~handle.inputSSType, attentionType~0]; IF buffer.hdr2.connCtl.endOfMsg THEN { IF reset THEN InputDequeue[handle]; RETURN [state~endOfMessage, ssType~handle.inputSSType, attentionType~0] }; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { RETURN[state~open, ssType~handle.inputSSType, attentionType~0] }; ENDCASE => ERROR; ENDLOOP; }; FlushInput: PUBLIC PROC [self: IO.STREAM, wait: BOOL _ FALSE] RETURNS [bytesSkipped: LONG CARDINAL _ 0] ~ { handle: Handle = NARROW[self.streamData]; code: CompletionCode; DO [bytesSkipped, code] _ EntryFlushInput[handle, wait, bytesSkipped]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlushInput: ENTRY PROC [handle: Handle, wait: BOOL _ FALSE, bytesSkipped: LONG CARDINAL] RETURNS [newSkipped: LONG CARDINAL, code: CompletionCode] ~ { ENABLE UNWIND => NULL; startTime: Pulses _ Now[]; finger: Finger; buffer: SPPBuffer; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention THEN RETURN [bytesSkipped, normal]; buffer _ finger.buffer; IF buffer.hdr2.sst # handle.inputSSType THEN RETURN [bytesSkipped, normal]; IF finger.index < finger.bytes THEN { bytesSkipped _ bytesSkipped + (finger.bytes - finger.index); finger.index _ finger.bytes }; IF buffer.hdr2.connCtl.endOfMsg THEN RETURN [bytesSkipped, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [bytesSkipped, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [bytesSkipped, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; SendNow: PUBLIC PROC [self: IO.STREAM] ~ { handle: Handle = NARROW[self.streamData]; EntrySendNow[handle]; }; EntrySendNow: ENTRY PROC [handle: Handle] ~ { finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; ENDCASE => EXIT; ENDLOOP; }; SendClose: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endSST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) OR (sst = XNSSPPTypes.endSST) THEN { [] _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; RETURN [TRUE] }; ENDLOOP; }; SendCloseReply: PUBLIC PROC [self: IO.STREAM] RETURNS [ok: BOOL _ FALSE] ~ { ENABLE ConnectionClosed => CONTINUE; handle: Handle = NARROW[self.streamData]; sst: SubSequenceType; code: CompletionCode; code _ EntrySetSSType[handle~handle, ssType~XNSSPPTypes.endReplySST, sendNow~TRUE]; IF code # normal THEN RETURN; DO [code~code] _ EntryFlushInput[handle~handle, wait~TRUE, bytesSkipped~0]; IF code # normal THEN RETURN; [ssType~sst] _ EntryGetStatus[handle~handle, reset~TRUE]; IF (sst = XNSSPPTypes.endReplySST) THEN RETURN [TRUE]; ENDLOOP; }; GetChar: PROC [self: STREAM] RETURNS [char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [char, code] _ EntryGetChar[handle, self]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryGetChar: ENTRY PROC [handle: Handle, self: STREAM] RETURNS [char: CHAR, code: CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; buffer: SPPBuffer; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; buffer _ finger.buffer; IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN WITH ERROR IO.EndOfStream[self]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN WITH ERROR IO.EndOfStream[self]; InputDequeue[handle]; LOOP }; TRUSTED { char _ buffer.body.chars[finger.index] }; finger.index _ finger.index + 1; RETURN [char, normal]; }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [char~, code~timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.maxLength]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; block.length _ startIndex; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ GetBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => IF finger = NIL THEN EXIT; ENDCASE => ERROR; TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop], from~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.index, stopIndexPlusOne~finger.bytes] ]; }; finger.index _ finger.index + nBytes; nBytesRead _ nBytesRead + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN GetBlockLast[handle, finger]; }; GetBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; buffer: SPPBuffer; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; buffer _ finger.buffer; SELECT finger.state FROM full, halfEmpty => { IF finger.attention OR (buffer.hdr2.sst # handle.inputSSType) THEN RETURN[NIL, normal]; IF finger.index >= finger.bytes THEN { IF buffer.hdr2.connCtl.endOfMsg THEN RETURN[NIL, normal]; InputDequeue[handle]; LOOP }; finger.state _ emptying; RETURN[finger, normal] }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; ENDLOOP; }; GetBlockLast: ENTRY PROC [handle: Handle, oldFinger: Finger] ~ { IF oldFinger = NIL THEN ERROR; -- can't happen oldFinger.state _ halfEmpty; BROADCAST handle.doneEmptying; }; PutChar: PROC [self: STREAM, char: CHAR] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryPutChar[handle, char]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryPutChar: ENTRY PROC [handle: Handle, char: CHAR] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; TRUSTED { finger.buffer.body.chars[finger.bytes] _ char }; finger.bytes _ finger.bytes + 1; RETURN [normal]; ENDLOOP; }; PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; base: LONG POINTER ~ LOOPHOLE[block, LONG POINTER]+SIZE[TEXT[0]]; -- WORD SIZE DEPENDENT ???? stop: NAT ~ MIN[(startIndex+count), block.length]; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ { handle: Handle ~ NARROW[self.streamData]; finger: Finger _ NIL; startIndex: INT _ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; code: CompletionCode; WHILE startIndex < stop DO [finger, code] _ PutBlockNext[self, handle, finger]; SELECT code FROM timedOut => { SIGNAL Timeout[]; LOOP }; normal => NULL; ENDCASE => ERROR; IF finger = NIL THEN ERROR; -- can't happen TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to~[ blockPointer~@finger.buffer.body, -- WORD SIZE DEPENDENT ???? startIndex~finger.bytes, stopIndexPlusOne~BufIndex.LAST], from~[ blockPointer~block.base, startIndex~startIndex, stopIndexPlusOne~stop] ]; }; finger.bytes _ finger.bytes + nBytes; startIndex _ startIndex + nBytes; ENDLOOP; IF finger # NIL THEN PutBlockLast[handle, finger]; }; PutBlockNext: ENTRY PROC [self: STREAM, handle: Handle, oldFinger: Finger] RETURNS [Finger, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger _ NIL; startTime: Pulses _ Now[]; IF oldFinger # NIL THEN { oldFinger.state _ halfFull; BROADCAST handle.doneFilling; OutputEnqueue[handle~handle, endOfMessage~FALSE] }; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.outputEnqueue; SELECT finger.state FROM empty => OutputMakeNotEmpty[handle]; filling => { WAIT handle.doneFilling; LOOP }; halfFull => NULL; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [NIL, timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; IF finger.bytes = LAST[BufIndex] THEN { OutputEnqueue[handle~handle, endOfMessage~FALSE]; LOOP }; finger.state _ filling; RETURN[finger, normal]; ENDLOOP; }; PutBlockLast: ENTRY PROC [handle: Handle, finger: Finger] ~ { ENABLE UNWIND => NULL; finger.state _ halfFull; BROADCAST handle.doneFilling; }; EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ { handle: Handle ~ NARROW[self.streamData]; RETURN [EntryEndOf[handle]]; }; EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ { ENABLE UNWIND => NULL; finger: Finger; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full => { finger.state _ halfEmpty; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; halfEmpty => RETURN [ finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) OR ((finger.index = finger.bytes) AND finger.buffer.hdr2.connCtl.endOfMsg) ]; empty => RETURN [ FALSE ]; ENDCASE => ERROR; ENDLOOP; }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO [nChars, code] _ EntryCharsAvail[handle, wait]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL] RETURNS [INT, CompletionCode] ~ { ENABLE UNWIND => NULL; finger: Finger; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; finger _ handle.inputDequeue; SELECT finger.state FROM full, halfEmpty => { finger.state _ halfEmpty; IF finger.attention OR (finger.buffer.hdr2.sst # handle.inputSSType) THEN RETURN[0, normal]; IF finger.index < finger.bytes THEN RETURN [(finger.bytes - finger.index), normal]; IF finger.buffer.hdr2.connCtl.endOfMsg THEN RETURN[0, normal]; InputDequeue[handle]; LOOP }; emptying => { WAIT handle.doneEmptying; LOOP }; empty => { IF NOT wait THEN RETURN [0, normal]; IF IsTimedOut[handle.getPulseOut, startTime] THEN RETURN [0, timedOut]; WAIT handle.inputReady; LOOP }; ENDCASE => ERROR; -- Can't happen. ENDLOOP; }; Flush: PROC [self: STREAM] ~ { handle: Handle ~ NARROW[self.streamData]; code: CompletionCode; DO code _ EntryFlush[handle]; IF code = normal THEN RETURN; SIGNAL Timeout[]; ENDLOOP; }; EntryFlush: ENTRY PROC [handle: Handle] RETURNS [CompletionCode] ~ { ENABLE UNWIND => NULL; flushNum: CARDINAL; startTime: Pulses _ Now[]; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM filling => { WAIT handle.doneFilling; LOOP }; halfFull => { OutputEnqueue[handle~handle, endOfMessage~FALSE]; EXIT }; empty, full, sending => EXIT; ENDCASE => ERROR; ENDLOOP; IF handle.outputDequeueNum = handle.outputEnqueueNum THEN RETURN [normal]; IF handle.outputSendNum = handle.outputEnqueueNum THEN { DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; SELECT handle.outputEnqueue.state FROM empty => { OutputMakeNotEmpty[handle]; EXIT }; filling => { WAIT handle.doneFilling; LOOP }; halfFull => EXIT; full, sending => { IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.outputSpace; LOOP }; ENDCASE => ERROR; ENDLOOP; OutputEnqueue[handle~handle, endOfMessage~FALSE]; }; flushNum _ handle.outputEnqueueNum; handle.flusherCnt _ handle.flusherCnt + 1; DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF SignedDiff[handle.outputDequeueNum, flushNum] >= 0 THEN EXIT; IF IsTimedOut[handle.putPulseOut, startTime] THEN RETURN [timedOut]; WAIT handle.flusherWakeup; ENDLOOP; handle.flusherCnt _ handle.flusherCnt - 1; RETURN [normal]; }; Close: PROC [self: STREAM, abort: BOOL] = { handle: Handle ~ NARROW[self.streamData]; EntryNotifyClosed[handle, localClose, "close called"]; FinishHandle[handle]; }; nSent: INT _ 0; EntryPush: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { ENABLE UNWIND => NULL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO IF handle.closed THEN RETURN; IF handle.outputSendNum = handle.outputEnqueueNum THEN { WAIT handle.outputReady; LOOP }; IF SignedDiff[handle.recvdAllocNum, handle.outputSendNum] < 0 THEN { WAIT handle.recvdNewAlloc; LOOP }; { halfMyBuffers, windowLeft, nSentNoAckReq: INTEGER; sendAckReq: BOOL; halfMyBuffers _ INTEGER[handle.outputBuffersAllocated / 2]; windowLeft _ SignedDiff[handle.recvdAllocNum, handle.outputSendNum]; nSentNoAckReq _ SignedDiff[handle.outputSendNum, handle.expectedAckNum]; sendAckReq _ ((windowLeft = halfMyBuffers) OR (windowLeft = 0) OR (nSentNoAckReq >= halfMyBuffers) OR (handle.flusherCnt > 0)); sendFinger _ handle.outputSend; ReadyOutputBuffer[handle~handle, buffer~sendFinger.buffer, requestAck~sendAckReq]; sendFinger.state _ sending; TRUSTED { sendBuffer _ LOOPHOLE[sendFinger.buffer] }; handle.outputSendNum _ handle.outputSendNum + 1; handle.outputSend _ sendFinger.next; IF sendAckReq THEN BROADCAST handle.mgrLongWakeup; RETURN; }; ENDLOOP; }; Push: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryPush[handle, sendFinger]; IF sendFinger = NIL THEN EXIT; CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; ENDLOOP; IF sendBuffer # NIL THEN ERROR; -- Can't happen ???? }; nResent: INT _ 0; MaybeFlushCache: INTERNAL PROC [handle: Handle, pulseOut: Pulses] ~ { IF PulsesSince[handle.cacheFlushedTime] > pulseOut THEN { handle.cacheFlushedTime _ Now[]; XNSSocketBackdoor.FlushCache[handle.socket] }; }; EntryMgr: ENTRY PROC [handle: Handle, sentFinger: Finger] RETURNS [sendFinger: Finger _ NIL, sendBuffer: XNSBuffer _ NIL] ~ { ENABLE UNWIND => NULL; ackReqOutstanding, allocReqOutstanding, timedOutAck, timedOutProbe: BOOL; b: SPPBuffer _ NIL; IF sentFinger # NIL THEN { sentFinger.state _ full; BROADCAST handle.doneSending }; DO IF PulsesSince[handle.recvdTime] >= handle.noActivityPulseOut THEN NotifyClosed[handle, unknown, "no activity timeout"]; IF handle.closed THEN RETURN; MaybeFlushCache[handle, handle.cacheFlushPulseOut]; ackReqOutstanding _ ( (handle.outputDequeueNum # handle.outputSendNum) AND (SignedDiff[handle.expectedAckNum, handle.outputDequeueNum] > 0)); allocReqOutstanding _ ( (handle.outputSendNum # handle.outputEnqueueNum) AND (SignedDiff[handle.outputSendNum, handle.recvdAllocNum] > 0)); timedOutAck _ ( PulsesSince[handle.sentAckReqTime] > handle.waitForAckPulseOut); timedOutProbe _ ( (PulsesSince[handle.recvdTime] > handle.probePulseOut) AND (PulsesSince[handle.sentAckReqTime] > handle.probePulseOut)); SELECT TRUE FROM (ackReqOutstanding AND timedOutAck) => { SELECT handle.outputDequeue.state FROM full => NULL; sending => { WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; MaybeFlushCache[handle, 3*handle.waitForAckPulseOut]; -- What's the right timeout to use here ???? nResent _ nResent.SUCC; sendFinger _ handle.outputDequeue; sendFinger.state _ sending; b _ sendFinger.buffer; TRUSTED { sendBuffer _ LOOPHOLE[b] }; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; (timedOutProbe OR (allocReqOutstanding AND timedOutAck)) => { [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, TRUE]; RETURN }; handle.mustSendAck => { [nsb~sendBuffer, b~b] _ AllocateOutputBuffer[handle]; InitSystemOutputBuffer[handle, b]; ReadyOutputBuffer[handle, b, FALSE]; RETURN }; (ackReqOutstanding OR allocReqOutstanding) => { WAIT handle.mgrShortWakeup; LOOP }; ENDCASE => { WAIT handle.mgrLongWakeup; LOOP }; ENDLOOP; }; Mgr: PROC [handle: Handle] ~ { sendBuffer: XNSBuffer _ NIL; sendFinger: Finger _ NIL; Process.SetPriority[Process.priorityForeground]; DO [sendFinger, sendBuffer] _ EntryMgr[handle, sendFinger]; SELECT TRUE FROM (sendFinger # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes + sendFinger.bytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; }; (sendBuffer # NIL) => { CheckUserBytes[sendBuffer, XNSSPPBuf.hdrBytes]; -- DEBUG XNSSocketBackdoor.PutCached[handle.socket, sendBuffer]; XNSSocket.FreeBuffer[handle.socket, sendBuffer]; }; ENDCASE => EXIT; ENDLOOP; }; nRecvd: INT _ 0; nTooShort: INT _ 0; nBadConnID: INT _ 0; nBadRemoteConnID: INT _ 0; GotNewAck: INTERNAL PROC [handle: Handle, newAckNum: CARDINAL] ~ { finger: Finger; IF SignedDiff[newAckNum, handle.outputSendNum] > 0 THEN { RETURN }; WHILE SignedDiff[handle.outputDequeueNum, newAckNum] < 0 DO IF handle.closed THEN RETURN; finger _ handle.outputDequeue; SELECT finger.state FROM full => NULL; sending => { -- extremely unlikely; occurs only when resending. WAIT handle.doneSending; LOOP }; ENDCASE => ERROR; IF finger.attention THEN { temp: Finger; nsb: XNSBuffer; FOR temp _ finger, temp.next WHILE temp.next # finger DO NULL ENDLOOP; temp.next _ finger.next; IF finger.buffer = NIL THEN ERROR; -- Can't happen ???? TRUSTED { nsb _ LOOPHOLE[finger.buffer] }; XNSSocket.FreeBuffer[handle.socket, nsb]; finger.buffer _ NIL; handle.outputIBAttnCnt _ handle.outputIBAttnCnt.PRED; } ELSE { finger.state _ empty; BROADCAST handle.outputSpace; }; handle.outputDequeueNum _ handle.outputDequeueNum + 1; handle.outputDequeue _ finger.next; ENDLOOP; IF handle.flusherCnt > 0 THEN BROADCAST handle.flusherWakeup; }; GotAckReq: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, systemPacket: BOOL] ~ -- INLINE -- { IF systemPacket OR (SignedDiff[seqNum, handle.inputEnqueueNum] < 0) THEN { handle.mustSendAck _ TRUE; BROADCAST handle.mgrShortWakeup; BROADCAST handle.mgrLongWakeup } ELSE { handle.heWantsAlloc _ TRUE }; }; GotNewAlloc: INTERNAL PROC [handle: Handle, newAllocNum: CARDINAL] ~ -- INLINE -- { handle.recvdAllocNum _ newAllocNum; BROADCAST handle.recvdNewAlloc; }; GotOBAttn: INTERNAL PROC [handle: Handle, seqNum: CARDINAL, type: AttentionType] ~ { p, prev, new: OBAttn; seqDiff: INTEGER ~ SignedDiff[seqNum, handle.inputDequeueNum]; IF (seqDiff < 0) OR (seqDiff > 50) THEN RETURN; -- what's to use instead of 50 ???? prev _ NIL; p _ handle.recvdOBAttnList; DO IF p = NIL THEN EXIT; IF p.seqNum = seqNum THEN RETURN; IF p.seqNum > seqNum THEN EXIT; prev _ p; p _ p.next; ENDLOOP; new _ NEW[OBAttnObject_[next~p, seqNum~seqNum, attentionType~type]]; IF prev = NIL THEN handle.recvdOBAttnList _ new ELSE prev.next _ new; BROADCAST handle.recvdOBAttn; }; GotData: INTERNAL PROC [handle: Handle, newSeqNum: CARDINAL, b: SPPBuffer, bytes: NAT] RETURNS [swapb: SPPBuffer] ~ -- INLINE -- { finger: Finger; IF SignedDiff[newSeqNum, handle.inputAckNum] < 0 THEN { RETURN[b] }; IF SignedDiff[newSeqNum, handle.inputDequeueNum] >= INT[handle.inputBuffersAllocated] THEN { RETURN[b] }; WHILE (SignedDiff[newSeqNum, handle.inputEnqueueNum] >= 0) DO handle.inputEnqueue _ handle.inputEnqueue.next; handle.inputEnqueueNum _ handle.inputEnqueueNum + 1; ENDLOOP; finger _ handle.inputAck; THROUGH [0 .. SignedDiff[newSeqNum, handle.inputAckNum]) DO finger _ finger.next; ENDLOOP; swapb _ finger.buffer; finger.buffer _ b; finger.bytes _ bytes; finger.index _ 0; finger.attention _ b.hdr2.connCtl.attn; finger.state _ full; WHILE (handle.inputAck.state = full) AND (handle.inputAckNum # handle.inputEnqueueNum) DO BROADCAST handle.inputReady; handle.inputAckNum _ handle.inputAckNum + 1; handle.inputAck _ handle.inputAck.next; ENDLOOP; }; Receive: XNSSocketBackdoor.ReceiveProc ~ { nRecvd _ nRecvd.SUCC; -- DEBUG RETURN [ EntryReceive[NARROW[clientData], b] ] }; EntryReceive: ENTRY PROC [handle: Handle, nsb: XNSBuf.Buffer] RETURNS [XNSBuf.Buffer] ~ { ENABLE UNWIND => NULL; bytes: NAT; IF handle.closed THEN RETURN [nsb]; bytes _ XNSSocket.GetUserBytes[nsb]; SELECT nsb.hdr1.type FROM error => { OPEN XNSErrorTypes; eb: XNSErrorBuf.Buffer; TRUSTED { eb _ LOOPHOLE[nsb] }; SELECT eb.hdr2.type FROM noSocketErr, listenerRejectErr, protocolViolationErr => { NotifyClosed[handle, remoteClose, "remote close"] }; ENDCASE => { XNSSocketBackdoor.FlushCache[handle.socket] }; RETURN [nsb] }; spp => { b: XNSSPPBuf.Buffer; newSeqNum: CARDINAL; newAckNum: CARDINAL; newAllocNum: CARDINAL; TRUSTED { b _ LOOPHOLE[nsb] }; IF bytes < XNSSPPBuf.hdrBytes THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.unspecifiedErr]; nsb _ NIL; nTooShort _ nTooShort.SUCC; -- DEBUG RETURN [nsb] }; IF b.hdr2.destConnID # handle.connectionID THEN { IF b.hdr2.destConnID # unknownConnectionID THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadConnID _ nBadConnID.SUCC; -- DEBUG RETURN [nsb] }; }; IF b.hdr2.sourceConnID # handle.remoteConnectionID THEN { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.protocolViolationErr]; nsb _ NIL; nBadRemoteConnID _ nBadRemoteConnID.SUCC; -- DEBUG RETURN [nsb] }; newSeqNum _ Endian.CardFromH[b.hdr2.seqNum]; newAckNum _ Endian.CardFromH[b.hdr2.ackNum]; newAllocNum _ Endian.CardFromH[b.hdr2.allocNum]; handle.recvdTime _ Now[]; IF SignedDiff[newAllocNum, handle.recvdAllocNum] > 0 THEN GotNewAlloc[handle, newAllocNum]; IF SignedDiff[newAckNum, handle.outputDequeueNum] >= 0 THEN GotNewAck[handle, newAckNum]; IF b.hdr2.connCtl.sendAck THEN GotAckReq[handle, newSeqNum, b.hdr2.connCtl.system]; IF NOT b.hdr2.connCtl.system THEN { IF b.hdr2.connCtl.attn THEN TRUSTED { GotOBAttn[handle, newSeqNum, LOOPHOLE[b.body.bytes[0]] ] }; b _ GotData[handle, newSeqNum, b, (bytes - XNSSPPBuf.hdrBytes)]; TRUSTED { nsb _ LOOPHOLE[b] }; }; RETURN [nsb]; }; ENDCASE => { XNSSocket.ReturnError[handle.socket, nsb, XNSErrorTypes.invalidPacketTypeErr]; nsb _ NIL; RETURN [nsb]; }; }; Pull: PROC [handle: Handle] ~ { nsb: XNSBuf.Buffer _ NIL; XNSSocket.SetGetTimeout[handle.socket, XNSSocket.waitForever]; XNSSocketBackdoor.SetDirectReceive[handle.socket, Receive, handle]; WHILE NOT handle.closed DO IF (nsb _ XNSSocket.Get[handle.socket]) = NIL THEN LOOP; IF (nsb _ Receive[handle.socket, nsb, handle]) = NIL THEN LOOP; XNSSocket.FreeBuffer[handle.socket, nsb]; nsb _ NIL; ENDLOOP; XNSSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL]; handle _ NIL; -- for finalization }; globalLockHandle: Handle _ NEW[Object]; handleChain: Handle _ NIL; AddNewHandle: ENTRY PROC [handle: Handle _ globalLockHandle, newHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; newHandle.next _ handleChain; handleChain _ newHandle; }; RemoveOldHandle: ENTRY PROC [handle: Handle _ globalLockHandle, oldHandle: Handle] ~ { IF handle # globalLockHandle THEN ERROR; IF handleChain = oldHandle THEN handleChain _ handleChain.next ELSE FOR h: Handle _ handleChain, h.next DO IF h.next = oldHandle THEN { h.next _ oldHandle.next; EXIT }; ENDLOOP; -- NIL fault if not on chain oldHandle.next _ NIL; -- Help Finalization }; FindExistingHandle: ENTRY PROC [handle: Handle _ globalLockHandle, remote: XNS.Address, remoteConnID: HWORD] RETURNS [existing: Handle] ~ { IF handle # globalLockHandle THEN ERROR; FOR existing _ handleChain, existing.next WHILE existing # NIL DO IF (existing.remote = remote) AND (existing.remoteConnectionID = remoteConnID) THEN EXIT; ENDLOOP; }; Rollback: Booting.RollbackProc = { FOR handle: Handle _ handleChain, handle.next WHILE handle # NIL DO IF NOT handle.closed THEN EntryNotifyClosed[handle, localClose, "Rollback"]; ENDLOOP; }; uniqueConnID: CARDINAL _ Basics.LowHalf[Now[]]; GetUniqueConnID: ENTRY PROC [handle: Handle _ globalLockHandle] RETURNS [HWORD] ~ { h: Handle; IF handle # globalLockHandle THEN ERROR; DO uniqueConnID _ uniqueConnID + 1; FOR h _ handleChain, h.next WHILE (h # NIL) AND (h.connectionID # uniqueConnID) DO NULL ENDLOOP; IF h = NIL THEN RETURN [Endian.HFromCard[uniqueConnID]]; ENDLOOP; }; listenerChain: Listener _ NIL; AddNewListener: ENTRY PROC [handle: Handle _ globalLockHandle, newListener: Listener] ~ { newListener.next _ listenerChain; listenerChain _ newListener }; RemoveOldListener: ENTRY PROC [handle: Handle _ globalLockHandle, oldListener: Listener] ~ { IF oldListener = listenerChain THEN listenerChain _ listenerChain.next ELSE FOR p: Listener _ listenerChain, p.next DO IF p.next = oldListener THEN { p.next _ oldListener.next; EXIT } ENDLOOP; -- NIL fault if not on chain oldListener.next _ NIL; }; droppedStreams: INT _ 0; finalizedStreams: INT _ 0; streamFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; StreamFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO handle: Handle _ NARROW[SafeStorage.FQNext[streamFinalizerQueue]]; IF NOT handle.finished THEN { -- User forgot to call Destroy IF NOT handle.closed THEN ERROR; -- Can't happen FinishHandle[handle]; SafeStorage.EnableFinalization[handle]; droppedStreams _ droppedStreams.SUCC; } ELSE { -- Normal end of life RemoveOldHandle[oldHandle~handle]; finalizedStreams _ finalizedStreams.SUCC; }; handle _ NIL; ENDLOOP; }; droppedListeners: INT _ 0; finalizedListeners: INT _ 0; listenerFinalizerQueue: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; ListenerFinalizer: PROC ~ { Process.SetPriority[Process.priorityBackground]; DO listener: Listener _ NARROW[SafeStorage.FQNext[listenerFinalizerQueue]]; IF NOT listener.destroyed THEN { -- User forgot to call Destroy DestroyListener[listener]; SafeStorage.EnableFinalization[listener]; droppedListeners _ droppedListeners.SUCC; } ELSE { -- Normal end of life RemoveOldListener[oldListener~listener]; finalizedListeners _ finalizedListeners.SUCC; }; listener _ NIL; ENDLOOP; }; { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[Object], npr~1, fq~streamFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK StreamFinalizer[]]; }; { established: BOOL; established _ TRUE; SafeStorage.EstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; IF NOT established THEN { established _ TRUE; SafeStorage.ReEstablishFinalization[type~CODE[ListenerObject], npr~1, fq~listenerFinalizerQueue ! SafeStorage.CantEstablishFinalization => { established _ FALSE; CONTINUE }]; }; IF NOT established THEN ERROR }; TRUSTED { Process.Detach[FORK ListenerFinalizer[]]; }; Booting.RegisterProcs[r: Rollback]; }. ςXNSStreamImpl.mesa Copyright c 1986 by Xerox Corporation. All rights reserved. Hal Murray, February 9, 1986 7:02:38 am PST Demers, September 4, 1986 2:21:03 pm PDT TODO: Fix CheckUserBytes. Eventually take out the "these can't hurt" checks in ReadyOutputBuffer. The mgrXXXXWakeup conditions don't need BROADCASTS. Careful that state is never left xxxing after closed! ???? SOMEDAY experiment with making the LOCKS clause work directly on the IO.STREAM, thereby eliminating a level of procedure call from user processes. QUESTION: If there's a halfempty packet buffer AND there's allocation AND we want to send a system packet, does it make sense to send the data packet instead? Note the other end may respond differently to data vs. system packets with AckMe set. Probing strategy is wrong???? I think probing should expect a response on the same order of timeout as waitForAck; multiple probes should drive up the estimated round trip delay ???? Temporary for debugging: Signals and Errors Byte Stream Interface Times The idea is to set the timeout of a condition variable so that the specified number of pulses (+ epsilon) will have elapsed when the condition times out. Epsilon here is 10%, which is pretty crude. I should do some experiments to see how much adjustment Cedar really requires here. Buffer Enqueue and Dequeue Utilities Allocate an SPP buffer for handle. Initialize type and connectionID fields. Make handle.outputEnqueue nonempty. Make sure the buffer is actually allocated and set its connCtl, sst and seqNum fields. PRECONDITION: handle.outputEnqueue.state = empty Initialize output buffer for system packet. Performs the same buffer initialization as OutputMakeNotEmpty above, and sets buffer length field. Enqueue the finger+buffer pointed to by handle.outputEnqueue; set its length field, set its state to full and NOTIFY that an output buffer is ready. PRECONDITION: handle.outputEnqueue.state = halfFull Fill in time-dependent fields of output buffer before sending it. Called before handle.outputSend pointer is advanced. Acknowledgement request ... Acknowledgement ... Allocation ... handle.heWantsAlloc _ FALSE; -- I THINK THIS IS BOGUS ???? Do we need sentAllocNum at all ???? THESE CAN'T HURT, BUT WHAT ARE THEY DOING HERE ???? I'll find out! buffer.hdr1.type _ spp; buffer.hdr2.sourceConnID _ handle.connectionID; buffer.hdr2.destConnID _ handle.remoteConnectionID; Check whether the other side has asked for allocation and a reasonable number of input buffers have become available. If so, make sure the other side gets the allocation by sending a system packet if necessary: He wants some allocation AND I have enough input buffers to give him a reasonable allocation (i.e., at most half my receive buffers are occupied) There's no data packet at all OR I'm waiting for allocation from him BROADCAST handle.doneEmptying; -- ???? BROADCAST handle.doneFilling; -- ???? BROADCAST handle.doneSending; -- ???? Creation Process.EnableAborts[@handle.doneEmptying]; -- ???? Process.EnableAborts[@handle.doneFilling]; -- ???? Process.EnableAborts[@handle.doneSending]; -- ???? Process.EnableAborts[@handle.mgrShortWakeup]; -- ???? Process.EnableAborts[@handle.mgrLongWakeup]; -- ???? TODO: None of this stuff is very good. I don't know any good retransmission heuristics, and I guess I've got to learn some. Byte Stream Interface Note: After changing ssType it's necessary to have a halfFull buffer as handle.inputEnqueue^; this ensures that the sequence SetSSType[h, t1]; SetSSType[h, t2]; will work correctly. Note this should only time out if there are too many outstanding output in-band attentions queued. Allocate a new buffer for the in-band attention (to determine its sequence number). Fill the in-band attention buffer and enqueue it. If there's no allocation for the attention, make a copy of it to be send it out-of-band. Note: we test finger.attention before anything else. In effect, this means the sst of an attention packet is ignored. Stream Procs Get rid of partly filled buffer. Check for already acknowledged. Make sure there's at least one unsent buffer to get its ackMe bit set. Wait until everything is acknowledged. Output (Push) Processes If (sentFinger # NIL), we've just finished sending it. In that case, change its state from sending to full and BROADCAST handle.doneSending. Then wait for a data buffer to send and return it. Returned value of NIL indicates the handle is closed. Check for closed: Check for no buffer ready to send: Check for no allocation: All the checks succeeded, so we're now committed to sending the buffer: Retransmitter (Mgr) Process Die if no activity for a long time. When direct input is implemented, this might be the place to call get on handle.socket with a getTimeout of 0 (don't wait). NO! Don't do it holding the ML! ???? Periodically flush the routing info cache associated with the socket. I have unacknowledged packets -- THIS IS NOT NECESSARY ???? (next condition implies it) ???? AND I am expecting an acknowledgement I have unsent packets AND I have no allocation An acknowledgement is overdue. Retransmit the packet. No activity for a long time, or else I want allocation and it's overdue. In either case, send a probe. We've been asked to send an acknowledgement immediately. Send it. There's an outstanding request, so we should time out pretty soon. We don't expect anything to happen soon. Input (Pull) Process Release sent packets that have been acknowledged. NOTE: This messes with the OUTPUT queues from an INPUT process. Sanity check. This is a protocol error Discard the attention finger. Advance the finger pointer, make the data buffer available. TODO: Work on "heWantsAlloc" heuristics ???? Do I want to sent an ack immediately if he's used up his allocation? ???? PRECONDITION: NOT b.hdr2.connCtl.system Check for duplicate. Push inputEnqueue so there's room for new packet. The new buffer won't fit, so drop it. Set finger to the slot where the new packet should go. Put the packet there. If it's a duplicate we use the new one, but so what? Mark as acknowledged as many packets as possible and pass them to the client. [handle: XNSSocket.Handle, b: XNSBuf.Buffer, clientData: REF ANY] RETURNS [XNSBuf.Buffer] Note this sends error packets itself, Handle a data packet. Chain of Streams There are three times when we need to be able to locate all of the streams: 1) Smashing them after a rollback, 2) processing duplicate RFCs, and 3) making sure connectionIDs are unique. Unique Connection IDs Chain of Listeners Stream Finalization Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, Push, Pull and Mgr must 1) notice when the stream dies, and 2) NIL out their copy of handle as they return. (The frame doesn't get recycled until after the JOIN.) The current code won't do anything with dropped streams unless they get closed by the other end. Initialization appears in mainline code. Listener Finalization Initialization appears in mainline code. MainLine Code Initialization for stream finalizer Initialization for listener finalizer Initialization for rollback ΚB˜codešœ™Kšœ Οmœ1™Kšžœ%žœU˜ƒKšžœ ˜šœ˜Kšžœ˜K˜™šΟnœžœžœ˜=Kšžœ(žœžœ˜7——K˜—head™Kš  œžœžœ)žœžœ˜TKš œžœžœžœ˜K˜Kšœžœ˜,—™Kšžœžœžœžœ˜K˜šœ žœžœžœ˜8Kšœ!˜!K˜K˜K˜K˜ K˜K˜Kšœ˜K˜K˜ K˜——™Kšœžœ˜,Kšœ2˜2Kšœžœ˜ K˜KšœD˜DK˜š  œžœžœ ˜<šžœ˜Kšžœžœ žœ˜Kšžœžœ/˜:——š  œžœžœ žœ˜žœžœ(žœ˜»Kšœ žœ ˜K˜Kšœ3˜3Kšœ˜K˜ Kšœ˜Kšœ#˜#Kšœ/˜/Kšœ˜Kšœ-˜-šœ9˜9K˜DKšœ.˜.—šœ9˜9K˜@Kšœ-˜-—Kšœžœ˜ Kšœ žœ ˜Kšœžœ˜!Kšœžœ˜!Kšœ˜Kšœ'˜'K˜K˜—š  œžœ˜'Kšžœžœžœžœ˜ Kšžœžœžœ˜šžœ˜ Kšžœžœ˜&Kšžœžœ˜&Kšžœžœ˜"Kšžœžœ˜&—Kšœ4˜4KšœBžœ˜FKšœ3˜3Kšœ>žœ˜BKšœ!˜!Kšœžœ˜—K˜š œžœ*žœ˜AK˜K˜ Kšœ$˜$Kšžœžœ˜K˜Kšœ&˜&Kšœžœ˜K˜Kšœ#˜#Kšœ(˜(K˜$Kšœ$˜$Kšœ&˜&K˜0Kšœ$˜$K˜—K˜š  œžœžœ žœ>žœžœ˜rKšœžœ˜Kšœžœ˜Kšœžœ˜K˜'Kšœžœ˜Kšœ žœ˜*Kšœ žœ˜*šž˜šžœžœ˜Kšžœžœžœ,žœ˜CK˜—KšœJ‘˜YKšœ˜šžœ žœžœ ž˜"Kšžœ žœ+˜=Kšœ4˜4Kšœ˜Kšœ˜Kšžœžœžœžœ˜šžœž˜šœ˜K˜ Kšžœžœ˜šžœžœ˜$KšœG˜GKšœžœ˜ Kšžœ˜—šžœ"žœ˜*KšœG˜GKšœžœ˜ Kšžœ˜—Kšœ2˜2Kšœ¬˜¬Kšœ#˜#Kšžœžœžœ-žœ˜DKšžœžœ;˜EKšœ˜—˜ Kšžœ˜Kšœ˜Kšžœžœ˜šžœž˜Kšœ%žœ˜*KšœNžœ,˜Kšœžœ'˜?Kšžœžœ1˜A—Kšœ˜—šžœ˜ KšœG˜GKšœžœ˜ Kšžœ˜——Kšžœ˜—Kšžœ-˜2Kšžœ˜—Kšœ˜K˜—K˜Kšœ žœžœ˜$šœžœžœžœ˜&K˜Kšœ žœžœ˜Kšœžœ˜K˜—š œž œ˜Kšœžœ˜Kšœ˜Kšœ3˜3Kšœžœ‘˜AKšœ#žœ‘˜BKšžœ˜K˜Kšœ žœ#žœG˜|Kšœ%˜%K˜)K˜K˜—š œžœžœ˜5Kšžœžœžœ˜"šžœ˜ Kšœ&˜&Kšžœ˜—Kšœžœ˜K˜K˜—š  œžœ žœk‘œ!‘œ˜¬K˜bKšœžœ˜Kšœžœ˜ šœ˜šžœžœ˜šžœžœž˜Kšœ:žœ˜@—Kšœ"˜"—šž˜šžœžœž˜Kšœ:žœ˜@—Kšžœ'žœžœžœ˜7K˜$šžœž˜˜ K˜Kšžœžœžœ˜)Kšžœžœ˜Kšžœžœžœ˜#Kš žœžœžœžœžœžœ˜DKšœ˜Kšœ?žœ˜EK˜—˜K˜ Kšœ%˜%Kšœžœ˜K˜Kšžœžœžœ˜(Kšžœžœ˜šžœžœ+žœ-žœ˜yJšœM˜MJšœžœ˜ Jšžœ˜—KšžœPžœžœžœ˜`š žœ žœžœžœžœ˜8JšœJ˜JJšœžœ˜ Kšžœ˜—Kšœ_‘=˜œKšœΔ‘&˜κKšœ žœ:˜EKšœžœ‘˜3Kšœ/˜/šžœ˜ Kšœžœ0˜C—K˜—Kšžœ˜—Kšžœ˜—K˜—K˜K˜——™š   œžœžœžœžœžœ+˜]Kšœžœ˜)Kšœ6˜6K˜—š œžœžœžœ+˜`šžœž˜Kšžœžœžœ8˜I—K˜K˜K˜K˜—š  œž œžœžœ9˜bKšœžœ˜)K˜1K˜K˜—š œžœžœ;˜Wšžœž˜Kšžœžœžœ8˜I—K˜-K˜—K˜š  œžœžœžœ˜BKšœžœ˜)Kšœ˜šž˜Kšœ&˜&Jšžœžœžœ˜Jšžœ ˜Kšžœ˜—K˜—K˜š  œžœžœ4žœžœžœ˜K™ΆKšžœžœžœ˜K˜K˜Kšžœžœžœ˜,šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜Kšœ žœ˜šœ˜Kšžœ+žœžœ ˜DKšžœžœ˜ —Kšœ žœžœ˜-Kšœ8žœžœ˜GKšžœžœ˜—Kšžœ˜—Kšœ˜Kšœ‘˜;Kšžœ žœ+žœ˜AK˜—K˜š œžœžœžœ˜0Kšœžœ˜)Kšœ˜šž˜Kšœ%˜%Kšžœžœžœ˜Jšžœ ˜Kšžœ˜—K˜—K˜š œžœžœžœ˜OKšžœžœžœ˜K˜K˜šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜Kšœ'žœ˜.šœ˜Kšžœ+žœžœ ˜DKšžœžœ˜ —Kšœ žœžœ˜-Kšœ žœ˜Kšžœžœ˜—Kšžœ˜—Kšœ*žœ˜0Kšžœ ˜K˜—K˜š  œž œžœ#˜KKšœžœ˜)K˜K˜šž˜Kšœ8˜8Kšžœžœžœ˜Kš žœžœžœžœ‘˜(Kšžœ ˜Kšžœ˜—Kšžœžœžœ#˜4K˜—K˜š œžœžœ0žœ+˜€Kšœb™bKšžœžœžœ˜Kšœ˜K˜Kšœ žœ˜šž˜šžœž˜Kšžœžœžœ8˜I—šžœžœ‘˜=Kšžœ+žœžœžœ ˜IKšžœžœ˜ —Kšœ˜šžœž˜Kšœ žœžœ˜-Kšœ8žœžœ˜GKšžœžœ˜—Kšžœ˜—™SKšžœ$žœžœ˜IKšœžœ7žœ˜OK˜K˜Kšœ0žœ˜5K˜"—™1Kšœ˜Kšžœ!žœ˜BK˜Kšœ*žœ˜1—™Xšžœ/žœ˜7K˜šžœ˜ Kšœ)žœ˜MKšœ(žœžœ˜V—Kšžœ˜K˜——Kšžœžœ ˜K˜K˜—š  œž œžœ5žœ˜~Kšœžœ˜)K˜šž˜KšœE˜EKšžœžœžœ˜Jšžœ ˜Kšžœ˜—K˜—K˜š œžœžœ$žœ$˜mKšžœžœžœ˜K˜Kšœ žœ˜šž˜šžœž˜Kšžœžœžœ8˜I—šžœ žœžœ˜,Kšœ ˜ Kšžœ˜#—Kšžœ!žœžœ˜<šžœ˜ šžœž˜KšžœJ˜NKšžœ-˜1—K˜—Kšžœ˜Kšžœ˜—K˜—K˜š   œžœžœžœ žœžœT˜ŽKšœžœ˜)Kšœ?˜?K˜—K˜š  œžœžœžœžœT˜”Kšžœžœžœ˜K˜K˜šž˜šžœž˜Kšžœžœžœ8˜I—Kšœ˜šžœž˜šœ˜Kšœ˜K˜K™všžœžœ˜Kšœ/˜/Kšžœžœ ˜Bšžœžœ˜Kšœ#˜#šžœžœžœ$ž˜9Kšœ ˜ —Kšœ˜—Kšžœ˜ —šžœ&žœ˜.Kšžœžœ&˜3KšžœA˜G—šžœž˜#Kšžœ:˜@—šžœžœ˜&Kšžœžœ˜#KšžœD˜J—Kšœ˜Kšžœ˜—Kšœ žœžœ˜0šœ ˜ Kšžœ;˜A—Kšžœžœ˜—Kšžœ˜—K˜K˜—š  œžœžœžœžœžœžœžœžœžœ ˜kKšœžœ˜)K˜šž˜KšœC˜CKšžœžœžœ˜Jšžœ ˜Kšžœ˜—˜K˜——š œžœžœžœžœžœžœžœžœžœ˜›Kšžœžœžœ˜K˜K˜K˜šž˜šžœž˜Kšžœžœžœ8˜I—Kšœ˜šžœž˜šœ˜Kšœ˜Kšžœžœžœ˜7K˜Kšžœ&žœžœ˜Kšžœžœ˜%K˜Kšœžœ˜)Kšœžœ˜Kšœ žœ˜#Kšœžœ ˜)Kšœžœ˜ K˜šžœž˜K˜4šžœž˜Kšœžœ žœ˜'Kšœ žœ˜Kšžœžœ˜—Kš žœ žœžœžœ‘˜+šžœ˜ šœ˜šœ˜Kšœ"‘˜=Kšœ˜Kšœžœ˜ —šœ˜Kšœ˜Kšœ˜Kšœ˜——K˜—K˜%Kšœ!˜!Kšžœ˜—Kšžœ žœžœ˜2K˜K˜—š   œžœžœžœ%žœ˜oKšžœžœžœ˜Kšœžœ˜K˜šžœ žœžœ˜Kšœ˜Kšž œ˜Kšœ*žœ˜3—šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜Kšœ$˜$Kšœ žœžœ˜-Kšœ žœ˜šœ˜Kšžœ+žœžœžœ ˜IKšžœžœ˜ —Kšžœžœ˜—šžœžœ žœ˜'Kšœ*žœ˜1Kšžœ˜—K˜Kšžœ˜Kšžœ˜—K˜—K˜š  œžœžœ%˜=Kšžœžœžœ˜K˜Kšž œ˜K˜—K˜š  œžœžœžœžœ˜-Kšœžœ˜)Kšžœ˜K˜—K˜š   œžœžœžœžœ˜:Kšžœžœžœ˜K˜šž˜Kš žœžœžœžœžœ8˜_K˜šžœž˜Kšœ$žœ˜+Kšœžœžœ˜/šœ žœ˜Kšœžœ/žœ žœ%˜ŒK˜—Kšœ žœžœ˜Kšžœžœ˜—Kšžœ˜—K˜—K˜š   œžœžœžœžœ žœ˜EKšœžœ˜)K˜šž˜Kšœ/˜/Kšžœžœžœ˜Kšžœ ˜Kšžœ˜—K˜—K˜š  œžœžœžœžœžœ˜ZKšžœžœžœ˜K˜K˜šž˜šžœž˜Kšžœžœžœ8˜I—K˜šžœž˜šœ˜Kšœ˜šžœžœ.˜DKšžœžœ ˜—šžœ˜Kšžœžœ)˜4—Kšžœ%žœžœ ˜>Kšœ˜Kšžœ˜—šœ ˜ Kšžœ˜Kšžœ˜—šœ ˜ Kšžœžœžœžœ ˜$Kšžœ+žœžœ˜GKšžœ˜Kšžœ˜—Kšžœžœ‘˜"—Kšžœ˜—K˜—K˜š œžœžœ˜Kšœžœ˜)K˜šž˜K˜Kšžœžœžœ˜Kšžœ ˜Kšžœ˜—K˜—K˜š  œžœžœžœ˜DKšžœžœžœ˜Kšœ žœ˜K˜™ šž˜šžœž˜Jšžœžœžœ8˜I—šžœž˜&Jšœ žœžœ˜-Jšœ8žœžœ˜GJšœžœ˜Jšžœžœ˜—Jšžœ˜——™Kšžœ3žœžœ ˜J—™Fšžœ0žœ˜8šž˜šžœž˜Jšžœžœžœ8˜I—šžœž˜&Kšœ'žœ˜.Kšœ žœžœ˜-Kšœ žœ˜šœ˜Kšžœ+žœžœ ˜DKšžœžœ˜ —Kšžœžœ˜—Kšžœ˜—Kšœ*žœ˜1K˜——™&Kšœ#˜#K˜*šž˜šžœž˜Kšžœžœžœ8˜I—Kšžœ4žœžœ˜@Kšžœ+žœžœ ˜DKšžœ˜Kšžœ˜—Kšœ*˜*—Kšžœ ˜K˜—K˜š œžœžœ žœ˜+Kšœžœ˜)Kšœ6˜6K˜K˜——™Kšœžœ˜K˜š   œžœžœ&žœžœžœ˜~K™ŽK™2K™5Kšžœžœžœ˜šžœžœžœ˜Kšœ˜Kšž œ˜—šž˜™Kšžœžœžœ˜—™"šžœ0žœ˜8Kšžœ˜Kšžœ˜——™šžœ<žœ˜DKšžœ˜Kšžœ˜——™Gšœ,žœ˜4Kšœ žœ˜Kšœžœ$˜;KšœD˜DKšœH˜HKšœ+žœžœ"žœ˜Kšœ˜KšœR˜RKšœ˜Kšžœžœ˜5K˜0Kšœ$˜$Kšžœ žœž œ˜2Kšžœ˜K˜——Kšžœ˜—K˜—K˜š œžœ˜Kšœžœ˜Kšœžœ˜K˜Kšœ0˜0K˜šž˜Jšœ9˜9Kšžœžœžœžœ˜KšœC‘˜KKšœ7˜7Kšžœ˜—Kš žœžœžœžœ‘˜4K˜——™Kšœ žœ˜K˜š œžœžœ'˜Ešžœ1žœ˜9Kšœ ˜ K˜.—K˜—K˜š  œžœžœ&žœžœžœ˜}Kšžœžœžœ˜KšœDžœ˜IKšœžœ˜K˜šžœžœž˜Kšœž œ˜:—K˜šž˜™#šžœ;˜=Kšžœ6˜:—Kšžœžœžœ˜—K˜K™‘K˜™EKšœ3˜3—K˜šœ˜™\Kšœ0˜0—šžœ"™%KšžœC˜F——šœ˜™K˜0—šžœ™Kšžœ?˜B——KšœP˜P˜Kšœ6˜6Kšžœ>˜A—K˜šžœžœž˜K˜šœžœ˜(K™6šžœž˜&Kšœžœ˜ Kšœ žœžœ˜-Kšžœžœ˜—Kšœ6‘,˜bKšœžœ˜K˜"K˜K˜Kšžœžœ˜%Kšœžœ˜#Kšžœ˜ —K˜šœžœžœ˜=K™gK˜5Kšœ"˜"Kšœžœ˜#Kšžœ˜ —K˜˜K™BK˜5Kšœ"˜"Kšœžœ˜$Kšžœ˜ —K˜šœžœ˜/K™BKšžœ˜Kšžœ˜—K˜šžœ˜ K™(Kšžœ˜Kšžœ˜——K˜Kšžœ˜—K˜—K˜š œžœ˜Kšœžœ˜Kšœžœ˜K˜Kšœ0˜0K˜šž˜Jšœ8˜8šžœžœž˜šžœ žœ˜KšœC‘˜KKšœ7˜7K˜—šœžœ˜Kšœ0‘˜8Kšœ7˜7Kšœ0˜0K˜—Kšžœžœ˜—Kšžœ˜—K˜K˜——™Kšœžœ˜Kšœ žœ˜Kšœ žœ˜Kšœžœ˜K˜š  œžœžœžœ˜BK™1K™?K˜™ šžœ1žœ˜9K™Kšžœ˜ ——šžœ4ž˜;Kšžœžœžœ˜K˜šžœž˜Kšœžœ˜ šœ ‘2˜?Kšžœ˜Kšžœ˜—Kšžœžœ˜—šžœ˜šžœ˜™K˜ K˜Kš žœžœžœžœžœ˜FKšœ˜Kš žœžœžœžœ‘˜7Kšžœ žœ˜*Kšœ)˜)Kšœžœ˜Kšœ0žœ˜5—K˜—šžœ˜™;K˜Kšž œ˜—K˜——K˜6K˜#Kšžœ˜—Kšžœžœž œ˜=K˜—K˜š   œžœžœžœžœ‘ œ˜`K™wšžœžœ1˜Cšžœ˜Kšœžœ˜Kšž œ˜ Kšž œ˜ —šžœ˜Kšœžœ˜——K˜—K˜š   œžœžœžœ‘ œ˜SKšœ#˜#Jšž œ˜K˜—K˜š  œžœžœžœ˜TK˜Kšœ žœ.˜>Kš žœžœžœžœ‘#˜SKšœžœ˜'šž˜Kšžœžœžœžœ˜Kšžœžœžœ˜!Kšžœžœžœ˜K˜Kšžœ˜—Kšœžœ;˜DKšžœžœžœžœ˜EKšž œ˜K˜—K˜š œžœžœžœžœžœ‘ œ˜‚Kšž œžœ™'K˜™šžœ/žœ˜7Kšžœ˜ ——™1šžœ2žœžœ˜\K™%Kšžœ˜ —šžœ6ž˜=K˜/K˜4Kšžœ˜——™6K˜šžœ2ž˜;K˜Kšžœ˜——™KKšœ˜K˜K˜K˜K˜'K˜—™Mšžœ žœ/ž˜YKšž œ˜K˜,K˜'Kšžœ˜——K˜—K˜š œ˜&Kšœ9žœžœžœ™YKšœ˜Kšœžœ‘˜Kšžœžœ˜1K˜—š  œž œ&žœ˜YK™&Kšžœžœžœ˜Kšœžœ˜ K˜Kšžœžœžœ˜#Kšœ$˜$šžœž˜K˜˜ Kšžœ˜K˜Kšžœžœ˜šžœž˜šœ9˜9K˜4—šžœ˜ K˜.——Kšžœ˜ Kšœ˜—K˜šœ˜Kšœ˜Kšœ žœ˜Kšœ žœ˜Kšœ žœ˜K˜Kšžœžœ˜šžœžœ˜$KšœH˜HKšœžœ˜ Kšœžœ‘˜$Kšžœ ˜—šžœ)žœ˜1šžœ)žœ˜1KšœN˜NKšœžœ˜ Kšœžœ‘˜&Kšžœ ˜—Kšœ˜—šžœ1žœ˜9KšœN˜NKšœžœ˜ Kšœ$žœ‘˜2Kšžœ ˜—K˜Kšœ,˜,Kšœ,˜,Kšœ0˜0Kšœ˜K™šžœ2ž˜9Kšœ!˜!—K™šžœ4ž˜;Kšœ˜—K™šžœž˜Kšœ4˜4—K˜™šžœžœžœ˜#šžœžœžœ˜%Kšœžœ˜;—Kšœ@˜@Kšžœ žœ˜K˜——K˜Kšžœ˜ K˜K˜—šžœ˜ KšœN˜NKšœžœ˜ Kšžœ˜ Kšœ˜——Kšœ˜—K˜š œžœ˜Kšœžœ˜Kšœ>˜>K˜Cšžœžœž˜Kšžœ(žœžœžœ˜8Kšžœ/žœžœžœ˜?Kšœ)˜)Kšœžœ˜ Kšžœ˜—Kšœ2žœžœ˜