DIRECTORY Basics USING [bytesPerWord, UnsafeBlock], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses], Booting USING [RegisterProcs, RollbackProc], CommDriver USING [sendPriority], Endian USING [CardFromF, FFromCard, FWORD], IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs], PrincOpsUtils USING [ByteBlt, LongCopy], Process USING [Abort, Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, priorityForeground, SecondsToTicks, SetPriority, SetTimeout, Ticks], Pup USING [Address, nullAddress, Socket], PupBuffer USING [Buffer, BufferObject, ByteIndex, maxDataBytes, maxNewGatewayBytes], PupHop USING [GetHop, Hop], PupName USING [IsWellKnown], PupSocket USING [AllocBuffer, AppendRope, CreateServer, CreateEphemeral, Destroy, ExtractAbortRope, ExtractErrorRope, FixupSourceAndDest, FreeBuffer, Get, GetLocalAddress, GetRemoteAddress, GetUserBytes, GetUniqueID, Kick, Put, SetGetTimeout, SetRemoteAddress, SetUserBytes, SetUserHWords, SetUserSize, Socket, waitForever], PupSocketBackdoor USING [AllocRecvBuffer, PutAgain, PutFirst, ReturnToSenderNoFree, SetDirectReceive, SetSoftwareChecksumming], PupStream USING [CloseReason, FilterProc, ListenerProc, MARK, Milliseconds, waitForever], PupStreamBackdoor USING [], PupType USING [Type], Rope USING [ROPE], SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ]; PupStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle IMPORTS BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, PupHop, PupName, PupSocket, PupSocketBackdoor, SafeStorage EXPORTS PupStream, PupStreamBackdoor = { Buffer: TYPE = PupBuffer.Buffer; ByteIndex: TYPE = PupBuffer.ByteIndex; CloseReason: TYPE = PupStream.CloseReason; MARK: TYPE = PupStream.MARK; Milliseconds: TYPE = PupStream.Milliseconds; ROPE: TYPE = Rope.ROPE; Socket: TYPE = PupSocket.Socket; STREAM: TYPE = IO.STREAM; WordAligned: PROC [index: NAT] RETURNS [yes: BOOL] = INLINE { 
-- Tail of WordAligned: TRUE when index is a multiple of Basics.bytesPerWord.
RETURN[(index MOD Basics.bytesPerWord) = 0];
  };

-- Errors and signals raised to clients of this module.
SocketNotWellKnown: PUBLIC ERROR = CODE;
Timeout: PUBLIC SIGNAL = CODE;
StreamClosing: PUBLIC ERROR [why: CloseReason, text: ROPE] = CODE;

-- Tuning switches.
-- useDirectInput: Puller installs Receive through PupSocketBackdoor.SetDirectReceive.
-- useInLineCopy: block transfers use PrincOpsUtils.LongCopy when every index is word aligned.
-- disableSoftwareChecksums: NewObject turns software checksumming off on the socket.
useDirectInput: BOOL _ TRUE;
useInLineCopy: BOOL _ FALSE;
disableSoftwareChecksums: BOOL _ FALSE;
defaultNumberOfBuffers: NAT _ 32; -- These are >1500 bytes, not single pages
maxBufferSize: NAT _ maxDataBytes;
SetMaxBufferSize: PUBLIC PROC [size: NAT] ~ { maxBufferSize _ size };
maxDataBytes: NAT = PupBuffer.maxDataBytes; -- Max bytes in a packet
maxNewGatewayBytes: NAT = PupBuffer.maxNewGatewayBytes;

-- Procedure vector handed to IO.CreateStream by HandleToStream.
streamProcs: REF IO.StreamProcs = IO.CreateStreamProcs [
  variety: $inputOutput,
  class: $Pup,
  getChar: GetChar,
  endOf: EndOf,
  charsAvail: CharsAvail,
  unsafeGetBlock: UnsafeGetBlock,
  putChar: PutChar,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  close: Close ];

-- Intervals expressed in clock pulses: 1 s, 10 s, 3 s and 60 s respectively.
oneSecond: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1D6];
tenSeconds: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[10D6];
retransmitPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[3D6];
probePulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1*60D6];

-- Event counters.  Nothing in the visible code reads them back.
-- NOTE(review): oldAcks, clumpsRetransmitted, packetsRetransmitted, droppedStreams and
-- finishedStreams are presumably bumped by code outside this view; confirm.
duplicateRfcReceived: LONG CARDINAL _ 0;
funnyPacketType: LONG CARDINAL _ 0;
acksDefered: LONG CARDINAL _ 0;
oldAcks: LONG CARDINAL _ 0;
oldPackets: LONG CARDINAL _ 0;
futurePackets: LONG CARDINAL _ 0;
clumpsRetransmitted: LONG CARDINAL _ 0;
packetsRetransmitted: LONG CARDINAL _ 0;
droppedStreams: LONG CARDINAL _ 0;
finishedStreams: LONG CARDINAL _ 0;

-- A Finger is one slot in a list of packet buffers linked through next.
-- index is the next byte to read or write, bytes is the limit for this slot,
-- mark is TRUE when the slot carries a mark byte rather than data.
Finger: TYPE = REF FingerObject;
FingerObject: TYPE = RECORD [
  state: {empty, halfFull, halfEmpty, full} _ empty,
  index: ByteIndex _ 0,
  bytes: ByteIndex _ 0,
  next: Finger _ NIL,
  mark: BOOL _ FALSE,
  ack: BOOL _ FALSE,
  inProgress: BOOL _ FALSE,
  buffer: Buffer ];

-- Per-connection state; the monitor lock of this module is the lock of this record.
Handle: TYPE = REF Object;
Object: TYPE = MONITORED RECORD [
  outputWrite, outputRead, outputToAck: Finger _ NIL,
  inputWrite, inputRead: Finger _ NIL,
  socket: Socket _ NIL,
  remote: Pup.Address _ NULL,
  connectionId: Endian.FWORD,
  outputFlushed:
CONDITION,
  outputSpace: CONDITION,
  outputReady: CONDITION,
  -- Background processes forked by SetupObject and joined by CloseHandle.
  retransmitter, push1, push2, push3: PROCESS,
  -- Output side: byte sequence numbers and buffer accounting.
  pushId: LONG CARDINAL,
  ackedId: LONG CARDINAL,
  pushPackets: CARDINAL _ 0,
  pushBytes: CARDINAL _ 0,
  pushPacketSize: CARDINAL _ 0,
  outputBuffersToUse: CARDINAL _ 0,
  outputBuffersAllocated: CARDINAL _ 0,
  outputBuffersReady: CARDINAL _ 0,
  outputBuffersInFlight: CARDINAL _ 0,
  timeOfLastPush: BasicTime.Pulses _ BasicTime.GetClockPulses[],
  retransmitting: BOOL _ FALSE,
  flush: BOOL _ FALSE,
  -- Input side.
  inputReady: CONDITION,
  pull: PROCESS,
  pullId: LONG CARDINAL,
  pullPackets: CARDINAL _ 0,
  pullBytes: CARDINAL _ 0,
  inputBuffersToUse: CARDINAL _ 0,
  inputBuffersAllocated: CARDINAL _ 0,
  inputBuffersAvailable: CARDINAL _ 0,
  timeOfLastArrival: BasicTime.Pulses _ BasicTime.GetClockPulses[],
  stateChange: CONDITION,
  alloc: CONDITION,
  ack: CONDITION,
  allocNeeded: BOOL _ FALSE,
  -- Attention (interrupt) sequence numbers, seeded from the connection id by NewObject.
  sendAttenIdSent: LONG CARDINAL,
  sendAttenIdAcked: LONG CARDINAL,
  sendAttention: CONDITION, -- ABORTs NOT ENABLED
  recvAttenIdArrived: LONG CARDINAL,
  recvAttenIdSeen: LONG CARDINAL,
  recvAttention: CONDITION,
  err: ROPE _ NIL, -- # NIL => StreamClosed
  why: CloseReason _ NULL,
  bufferSize: NAT,
  next: Handle _ NIL,
  dead: BOOL _ FALSE ];

-- State of one listener created by CreateListener; listen is the forked Listen process.
Listener: TYPE = REF ListenerRep;
ListenerRep: PUBLIC TYPE = RECORD [
  local: Pup.Socket,
  worker: PupStream.ListenerProc,
  clientData: REF ANY,
  getTimeout: Milliseconds,
  putTimeout: Milliseconds,
  filter: PupStream.FilterProc, -- NIL => Accept all requests
  echoFilter: PupStream.FilterProc, -- NIL => Answer all echos
  listen: PROCESS ];

-- A pre-allocated socket for the rendezvous procedures; used guards against reuse.
Sockets: TYPE = REF SocketsRep;
SocketsRep: PUBLIC TYPE = RECORD [
  used: BOOL,
  socket: Socket ];

-- Opens a connection to remote and returns it as an IO stream.
-- Steps: build the handle, exchange RFCs, start the background processes, obtain an allocation.
Create: PUBLIC PROC [remote: Pup.Address, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = {
  handle: Handle _ NewObject[remote, PupSocket.GetUniqueID[], NIL, getTimeout, putTimeout];
  OpenConnection[handle];
  SetupObject[handle];
  ExchangeAcks[handle];
  RETURN[HandleToStream[handle]];
  };

-- Sends an abort carrying err unless the stream is already closed (handle.err # NIL).
Abort: PUBLIC PROC [self: STREAM, err: ROPE] = {
  handle: Handle =
NARROW[self.streamData];
  IF handle.err # NIL THEN RETURN;
  SendAbort[handle, err];
  };

-- Sends a one-byte mark packet.  Any partly filled data packet is pushed out first
-- so that the mark travels in a packet of its own.
SendMark: PUBLIC PROC [self: STREAM, mark: MARK] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger;
  DO
    CheckForClosed[handle];
    finger _ handle.outputWrite;
    SELECT finger.state FROM
      empty => {
        finger.state _ halfFull;
        finger.mark _ TRUE;
        finger.index _ 1; -- the mark byte occupies byte 0
        finger.bytes _ handle.pushPacketSize;
        EXIT;
        };
      halfFull => FinishOutputFinger[handle];
      ENDCASE => WaitOutputSpace[handle];
    ENDLOOP;
  finger.buffer.byte[0] _ mark;
  FinishOutputFinger[handle];
  };

-- Discards input up to the next mark packet and returns its mark byte.
ConsumeMark: PUBLIC PROC [self: STREAM] RETURNS [mark: MARK] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  DO
    SELECT finger.state FROM
      full => {
        finger.state _ halfEmpty;
        finger.index _ 0;
        };
      halfEmpty => NULL;
      ENDCASE => {
        -- Nothing has arrived in this slot yet.  Re-examine the state after the wait
        -- instead of falling through: releasing a slot that is still empty would hand
        -- back a buffer that was never filled.  GetChar re-checks in the same way.
        WaitInputReady[handle];
        LOOP;
        };
    IF finger.mark THEN EXIT;
    -- A data packet: throw it away and move on to the next slot.
    finger _ NotifyInputSpace[handle];
    MaybeSendAck[handle];
    ENDLOOP;
  mark _ finger.buffer.byte[0];
  [] _ NotifyInputSpace[handle];
  MaybeSendAck[handle];
  };

-- Returns the next data byte; raises IO.EndOfStream when the current packet is a mark.
GetChar: PROC [self: STREAM] RETURNS [char: CHAR] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  index: ByteIndex;
  DO
    SELECT finger.state FROM
      full => {
        finger.state _ halfEmpty;
        finger.index _ 0;
        EXIT;
        };
      halfEmpty => EXIT;
      ENDCASE => WaitInputReady[handle];
    ENDLOOP;
  IF finger.mark THEN ERROR IO.EndOfStream[self];
  index _ finger.index;
  char _ finger.buffer.char[index];
  index _ index.SUCC;
  finger.index _ index;
  -- Packet exhausted: release the slot and possibly advertise the new allocation.
  IF finger.index = finger.bytes THEN {
    [] _ NotifyInputSpace[handle];
    MaybeSendAck[handle];
    };
  };

-- TRUE when the packet at the read position is a mark.  With no packet waiting it
-- returns FALSE, after CheckForClosed has had its chance to raise.
EndOf: PROC [self: STREAM] RETURNS [BOOL] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  SELECT finger.state FROM
    empty => NULL;
    halfEmpty, full => RETURN[finger.mark];
    ENDCASE => NULL;
  CheckForClosed[handle];
  RETURN[FALSE];
  };

-- Number of bytes that can be read without waiting (continues on the next source line).
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  DO
    SELECT finger.state FROM
      full => {
finger.state _ halfEmpty;
        finger.index _ 0;
        EXIT;
        };
      halfEmpty => EXIT;
      ENDCASE => IF wait THEN WaitInputReady[handle] ELSE RETURN[0];
    ENDLOOP;
  -- A pending mark is reported as INT.LAST.
  IF finger.mark THEN RETURN[INT.LAST];
  RETURN[finger.bytes-finger.index];
  };

-- Copies up to block.count bytes from the input packets into block.
-- Stops early at a mark, or when the stream has closed after some bytes were delivered.
UnsafeGetBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  base: LONG POINTER = LOOPHOLE[block.base];
  startIndex: INT _ block.startIndex;
  stop: INT = block.startIndex + block.count;
  nBytes: NAT;
  WHILE startIndex < stop DO
    -- Wait until the current slot holds unread bytes.
    DO
      SELECT finger.state FROM
        full => {
          finger.state _ halfEmpty;
          finger.index _ 0;
          EXIT;
          };
        halfEmpty => EXIT;
        ENDCASE => {
          IF handle.err # NIL AND nBytesRead # 0 THEN RETURN;
          WaitInputReady[handle];
          };
      ENDLOOP;
    IF finger.mark THEN RETURN; -- EOF
    IF useInLineCopy
      AND WordAligned[startIndex]
      AND WordAligned[stop]
      AND WordAligned[finger.index]
      AND WordAligned[finger.bytes] THEN {
      -- Word-at-a-time copy, possible only when every index is word aligned.
      toBytes: NAT = stop-startIndex;
      fromBytes: NAT = finger.bytes-finger.index;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: base + startIndex / Basics.bytesPerWord,
          nwords: words,
          from: @finger.buffer.byte + finger.index / Basics.bytesPerWord];
        };
      nBytes _ words * Basics.bytesPerWord;
      }
    ELSE TRUSTED {
      -- General case: ByteBlt returns the number of bytes it moved.
      nBytes _ PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop],
        from: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes] ];
      };
    startIndex _ startIndex + nBytes;
    finger.index _ finger.index + nBytes;
    -- Packet exhausted: release it and continue with the next slot.
    IF finger.index = finger.bytes THEN {
      finger _ NotifyInputSpace[handle];
      MaybeSendAck[handle];
      };
    nBytesRead _ nBytesRead + nBytes;
    ENDLOOP;
  };

-- REF TEXT variant of UnsafeGetBlock; it also keeps block.length up to date.
-- It is not entered in streamProcs.
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT _ 0, count: NAT _ NAT.LAST] RETURNS [nBytesRead: NAT _ 0] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.inputRead;
  base: LONG POINTER = LOOPHOLE[block, LONG
POINTER]+TEXT[0].SIZE;
  stop: NAT = MIN[(startIndex+count), block.maxLength];
  nBytes: NAT;
  WHILE startIndex < stop DO
    -- Wait until the current slot holds unread bytes.
    DO
      SELECT finger.state FROM
        full => {
          finger.state _ halfEmpty;
          finger.index _ 0;
          EXIT;
          };
        halfEmpty => EXIT;
        ENDCASE => {
          IF handle.err # NIL AND nBytesRead # 0 THEN RETURN;
          WaitInputReady[handle];
          };
      ENDLOOP;
    IF finger.mark THEN RETURN; -- EOF
    IF useInLineCopy
      AND WordAligned[startIndex]
      AND WordAligned[stop]
      AND WordAligned[finger.index]
      AND WordAligned[finger.bytes] THEN {
      toBytes: NAT = stop-startIndex;
      fromBytes: NAT = finger.bytes-finger.index;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: base + startIndex / Basics.bytesPerWord,
          nwords: words,
          from: @finger.buffer.byte + finger.index / Basics.bytesPerWord];
        };
      nBytes _ words * Basics.bytesPerWord;
      }
    ELSE TRUSTED {
      nBytes _ PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop],
        from: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes] ];
      };
    startIndex _ startIndex + nBytes;
    block.length _ startIndex;
    finger.index _ finger.index + nBytes;
    IF finger.index = finger.bytes THEN {
      finger _ NotifyInputSpace[handle];
      MaybeSendAck[handle];
      };
    nBytesRead _ nBytesRead + nBytes;
    ENDLOOP;
  };

-- Appends one byte to the current output packet and hands the packet over once it is full.
PutChar: PROC [self: STREAM, char: CHAR] = {
  handle: Handle = NARROW[self.streamData];
  finger: Finger _ handle.outputWrite;
  index: ByteIndex;
  DO
    CheckForClosed[handle];
    SELECT finger.state FROM
      empty => {
        -- Start a fresh data packet sized to pushPacketSize.
        finger.state _ halfFull;
        finger.mark _ FALSE;
        finger.index _ 0;
        finger.bytes _ handle.pushPacketSize;
        EXIT;
        };
      halfFull => EXIT;
      ENDCASE => WaitOutputSpace[handle];
    ENDLOOP;
  index _ finger.index;
  finger.buffer.char[finger.index] _ char;
  index _ index.SUCC;
  finger.index _ index;
  IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
  };

-- Copies block into output packets, handing over each packet as it fills.
UnsafePutBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] = {
  handle: Handle = NARROW[self.streamData];
  base: LONG POINTER
= LOOPHOLE[block.base];
  startIndex: INT _ block.startIndex;
  stop: INT = block.startIndex + block.count;
  nBytes: INT;
  WHILE startIndex < stop DO
    -- The write position is re-read on every pass because FinishOutputFinger advances it.
    finger: Finger _ handle.outputWrite;
    DO
      CheckForClosed[handle];
      SELECT finger.state FROM
        empty => {
          finger.state _ halfFull;
          finger.mark _ FALSE;
          finger.index _ 0;
          finger.bytes _ handle.pushPacketSize;
          EXIT;
          };
        halfFull => EXIT;
        ENDCASE => WaitOutputSpace[handle];
      ENDLOOP;
    IF useInLineCopy
      AND WordAligned[startIndex]
      AND WordAligned[stop]
      AND WordAligned[finger.index]
      AND WordAligned[finger.bytes] THEN {
      -- Word-at-a-time copy, possible only when every index is word aligned.
      toBytes: NAT = finger.bytes-finger.index;
      fromBytes: NAT = stop-startIndex;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
          nwords: words,
          from: base + (startIndex / Basics.bytesPerWord)];
        };
      nBytes _ words * Basics.bytesPerWord;
      }
    ELSE TRUSTED {
      -- General case: ByteBlt returns the number of bytes it moved.
      nBytes _ PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes],
        from: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop] ];
      };
    startIndex _ startIndex + nBytes;
    finger.index _ finger.index + nBytes;
    IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
    ENDLOOP;
  };

-- REF TEXT variant of UnsafePutBlock; sends at most block.length bytes.
-- It is not entered in streamProcs.
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT _ 0, count: NAT _ NAT.LAST] = {
  handle: Handle = NARROW[self.streamData];
  base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE;
  stop: NAT = MIN[(startIndex+count), block.length];
  nBytes: NAT;
  WHILE startIndex < stop DO
    finger: Finger _ handle.outputWrite;
    DO
      CheckForClosed[handle];
      SELECT finger.state FROM
        empty => {
          finger.state _ halfFull;
          finger.mark _ FALSE;
          finger.index _ 0;
          finger.bytes _ handle.pushPacketSize;
          EXIT;
          };
        halfFull => EXIT;
        ENDCASE => WaitOutputSpace[handle];
      ENDLOOP;
    IF useInLineCopy
      AND WordAligned[startIndex]
      AND WordAligned[stop]
      AND WordAligned[finger.index]
      AND WordAligned[finger.bytes] THEN {
toBytes: NAT = finger.bytes-finger.index;
      fromBytes: NAT = stop-startIndex;
      words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down
      TRUSTED {
        PrincOpsUtils.LongCopy[
          to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord),
          nwords: words,
          from: base + (startIndex / Basics.bytesPerWord)];
        };
      nBytes _ words * Basics.bytesPerWord;
      }
    ELSE TRUSTED {
      nBytes _ PrincOpsUtils.ByteBlt[
        to: [
          blockPointer: @finger.buffer.byte,
          startIndex: finger.index,
          stopIndexPlusOne: finger.bytes],
        from: [
          blockPointer: base,
          startIndex: startIndex,
          stopIndexPlusOne: stop] ];
      };
    startIndex _ startIndex + nBytes;
    finger.index _ finger.index + nBytes;
    IF finger.index = finger.bytes THEN FinishOutputFinger[handle];
    ENDLOOP;
  };

-- Hands a partly filled output packet to the pushers without waiting for it to be acknowledged.
Push: PUBLIC PROC [self: STREAM] = {
  handle: Handle = NARROW[self.streamData];
  SELECT handle.outputWrite.state FROM
    halfFull => FinishOutputFinger[handle];
    ENDCASE => NULL;
  };

-- Pushes any partial packet, then waits until handle.flush has been cleared.
-- Each time WaitOutputFlushed returns with the flag still set, Timeout is signalled;
-- a client that resumes the signal simply waits again.
Flush: PROC [self: STREAM] = {
  handle: Handle = NARROW[self.streamData];
  SELECT handle.outputWrite.state FROM
    halfFull => {
      handle.flush _ TRUE;
      FinishOutputFinger[handle];
      };
    ENDCASE => NULL;
  IF handle.outputBuffersReady = 0 THEN RETURN; -- Skip closed test
  -- If no packet was just pushed with the flush flag set, send a probe before waiting.
  IF ~handle.flush THEN SendProbe[handle];
  handle.flush _ TRUE;
  WHILE handle.flush DO
    CheckForClosed[handle];
    DO
      WaitOutputFlushed[handle];
      CheckForClosed[handle];
      IF ~handle.flush THEN RETURN;
      SIGNAL Timeout;
      ENDLOOP;
    ENDLOOP;
  };

-- Closes the stream: abort or flush as requested, run the end handshake, then tear down.
-- A stream whose streamData is already NIL is ignored.
Close: PROC [self: STREAM, abort: BOOL] = {
  handle: Handle = NARROW[self.streamData];
  IF handle = NIL THEN RETURN;
  IF abort AND handle.err = NIL THEN Abort[self, "Close with Abort"];
  IF ~abort AND handle.err = NIL THEN Flush[self !
StreamClosing, Timeout => CONTINUE ];
  -- Data still queued after the flush attempt: give up and abort.
  IF handle.outputBuffersReady # 0 AND handle.err = NIL THEN
    Abort[self, "Close with unFlushed data"];
  IF handle.err = NIL THEN CloseConnection[handle];
  self.streamData _ NIL;
  CloseHandle[handle];
  };

-- Marks the handle dead, joins the five background processes, then frees the buffers
-- and the socket.  Does nothing if the handle was already dead.
CloseHandle: PROC [handle: Handle] = {
  IF CloseHandleInternal[handle] THEN RETURN;
  TRUSTED {
    JOIN handle.retransmitter;
    JOIN handle.push1;
    JOIN handle.push2;
    JOIN handle.push3;
    JOIN handle.pull;
    };
  FreeBuffers[handle];
  PupSocket.Destroy[handle.socket];
  handle.socket _ NIL;
  };

-- Sets handle.dead under the monitor lock and reports whether it was already set.
-- NOTE(review): NOTIFY wakes a single waiter on outputReady; any others presumably
-- notice dead only when the 2000 ms timeout set in NewObject expires.  Confirm.
CloseHandleInternal: ENTRY PROC [handle: Handle] RETURNS [alreadyDead: BOOL] = {
  alreadyDead _ handle.dead;
  handle.dead _ TRUE;
  NOTIFY handle.outputReady;
  };

-- Sends an interrupt and repeats it until sendAttenIdAcked moves away from sendAttenIdSent.
SendAttention: PUBLIC PROC [self: IO.STREAM] = {
  handle: Handle = NARROW[self.streamData];
  WHILE handle.sendAttenIdSent = handle.sendAttenIdAcked DO
    CheckForClosed[handle];
    SendInt[handle];
    WaitSendAttention[handle];
    ENDLOOP;
  handle.sendAttenIdSent _ handle.sendAttenIdSent.SUCC;
  };

-- Blocks until an interrupt that has not yet been consumed has arrived, then consumes it.
WaitAttention: PUBLIC PROC [self: IO.STREAM] = {
  handle: Handle = NARROW[self.streamData];
  WHILE handle.recvAttenIdArrived = handle.recvAttenIdSeen DO
    CheckForClosed[handle];
    WaitRecvAttention[handle];
    ENDLOOP;
  handle.recvAttenIdSeen _ handle.recvAttenIdSeen.SUCC;
  };

-- Starts a listener process on the well-known socket local.
-- Raises SocketNotWellKnown when PupName.IsWellKnown rejects local.
CreateListener: PUBLIC PROC [
  local: Pup.Socket,
  worker: PupStream.ListenerProc,
  getTimeout: Milliseconds,
  putTimeout: Milliseconds,
  clientData: REF ANY _ NIL,
  filter: PupStream.FilterProc _ NIL, -- NIL => Accept all requests
  echoFilter: PupStream.FilterProc _ NIL] -- NIL => Answer all echos
  RETURNS [listener: Listener] = {
  IF ~PupName.IsWellKnown[local] THEN ERROR SocketNotWellKnown;
  listener _ NEW[ListenerRep _ [
    local: local,
    worker: worker,
    clientData: clientData,
    getTimeout: getTimeout,
    putTimeout: putTimeout,
    filter: filter,
    echoFilter: echoFilter,
    listen: NIL ]];
  listener.listen _ FORK Listen[listener];
  };

-- Aborts and joins the listener process; a listener already destroyed is ignored.
DestroyListener: PUBLIC PROC [listener: Listener] = {
  IF listener.listen = NIL THEN RETURN;
  TRUSTED {
    Process.Abort[listener.listen];
    JOIN listener.listen;
};
  listener.listen _ NIL;
  };

-- Listener process: answers echo requests and turns each new RFC into a connection
-- served by a detached ListenInit process.  Runs until the process is aborted.
Listen: PROC [listener: Listener] = {
  socket: Socket _ PupSocket.CreateServer[
    local: listener.local,
    sendBuffers: 0,
    recvBuffers: 2,
    getTimeout: PupSocket.waitForever];
  DO
    ENABLE ABORTED => EXIT;
    b: PupBuffer.Buffer _ PupSocket.Get[socket];
    remote: Pup.Address _ b.source;
    -- Answers the RFC in b with the address of the socket belonging to handle.
    LocalSendRFCReply: PROC [handle: Handle, b: PupBuffer.Buffer] = BEGIN
      b.address _ PupSocket.GetLocalAddress[handle.socket]; -- The ephemeral socket
      PupSocket.SetUserSize[b, SIZE[Pup.Address]]; -- Just because.
      PupSocketBackdoor.ReturnToSenderNoFree[b];
      END; -- LocalSendRFCReply
    SELECT b.type FROM
      echoMe => {
        err: ROPE _ NIL;
        IF listener.echoFilter # NIL THEN
          err _ listener.echoFilter[listener.clientData, remote];
        b.type _ iAmEcho;
        -- A non-NIL rope from the filter turns the echo reply into an abort.
        IF err # NIL THEN {
          b.type _ abort;
          b.abort.code _ 0;
          PupSocket.SetUserHWords[b, 1];
          PupSocket.AppendRope[b, err];
          };
        PupSocketBackdoor.ReturnToSenderNoFree[b];
        };
      rfc => {
        err: ROPE _ NIL;
        handle: Handle;
        -- A handle for this remote address already exists: treat the RFC as a duplicate
        -- and repeat the reply unless that handle is dead.
        FOR handle: Handle _ chain, handle.next UNTIL handle = NIL DO
          IF remote # handle.remote THEN LOOP;
          IF ~handle.dead THEN LocalSendRFCReply[handle, b];
          duplicateRfcReceived _ duplicateRfcReceived.SUCC;
          GOTO Processed;
          ENDLOOP;
        IF listener.filter # NIL THEN
          err _ listener.filter[listener.clientData, remote];
        IF err # NIL THEN { -- Reject
          b.type _ abort;
          b.abort.code _ 0;
          PupSocket.SetUserHWords[b, 1];
          PupSocket.AppendRope[b, err];
          PupSocketBackdoor.ReturnToSenderNoFree[b];
          GOTO Processed;
          };
        handle _ NewObject[remote, b.id, NIL, listener.getTimeout, listener.putTimeout];
        SetupObject[handle]; -- Moved from ListenInit in order to rfc from the listener.local socket.
LocalSendRFCReply[handle, b];
        -- The connection is served by its own detached process.
        TRUSTED { Process.Detach[FORK ListenInit[listener, handle, remote]]; };
        EXITS Processed => NULL;
        };
      ENDCASE => NULL;
    PupSocket.FreeBuffer[b];
    ENDLOOP;
  PupSocket.Destroy[socket];
  };

-- Body of the per-connection process forked by Listen: finish the handshake, then
-- call the worker procedure of the listener with the new stream.
ListenInit: PROC [listener: Listener, handle: Handle, remote: Pup.Address] = {
  ExchangeAcks[handle];
  listener.worker[HandleToStream[handle], listener.clientData, remote];
  };

-- Creates an ephemeral socket aimed at remote for a later rendezvous.
AllocateSocket: PUBLIC PROC [remote: Pup.Address] RETURNS [sockets: Sockets] = {
  socket: Socket _ PupSocket.CreateEphemeral[
    remote: remote,
    sendBuffers: defaultNumberOfBuffers + 2,
    recvBuffers: 2*defaultNumberOfBuffers + 2,
    getTimeout: 60000 ];
  sockets _ NEW[SocketsRep _ [used: FALSE, socket: socket]];
  };

-- Local address of the socket held by sockets.
LocalAddress: PUBLIC PROC [sockets: Sockets] RETURNS [local: Pup.Address] = {
  local _ PupSocket.GetLocalAddress[sockets.socket];
  };

-- Remote address of the socket held by sockets.
RemoteAddress: PUBLIC PROC [sockets: Sockets] RETURNS [remote: Pup.Address] = {
  remote _ PupSocket.GetRemoteAddress[sockets.socket];
  };

-- Wraps the socket of an existing stream; the result is marked used so that the
-- rendezvous procedures reject it.
SocketsFromStream: PUBLIC PROC [self: STREAM] RETURNS [sockets: Sockets] = {
  handle: Handle = NARROW[self.streamData];
  sockets _ NEW[SocketsRep _ [used: TRUE, socket: handle.socket]];
  };

SocketsAlreadyUsed: PUBLIC ERROR = CODE;

-- Passive rendezvous: waits up to waitTimeout for an RFC on the pre-allocated socket,
-- answers it and returns the resulting stream.
-- Raises SocketsAlreadyUsed on reuse, and StreamClosing when PupSocket.Get returns NIL.
WaitForRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout, waitTimeout: Milliseconds] RETURNS [STREAM] = {
  socket: Socket _ sockets.socket;
  IF sockets.used THEN ERROR SocketsAlreadyUsed;
  sockets.used _ TRUE;
  PupSocket.SetGetTimeout[socket, waitTimeout];
  DO
    b: PupBuffer.Buffer _ PupSocket.Get[socket];
    IF b = NIL THEN ERROR StreamClosing[transmissionTimeout, "No RFC arrived"];
    SELECT b.type FROM
      rfc => {
        handle: Handle;
        PupSocket.FixupSourceAndDest[b];
        PupSocket.SetRemoteAddress[socket, b.source];
        handle _ NewObject[b.source, b.id, sockets, getTimeout, putTimeout];
        PupSocket.FreeBuffer[b];
        PupSocket.SetGetTimeout[socket, 250]; -- also in NewObject
        SetupObject[handle];
        SendRfc[handle];
        ExchangeAcks[handle];
        RETURN[HandleToStream[handle]];
        };
      ENDCASE => NULL;
    -- Any other packet type is dropped and the wait continues.
    PupSocket.FreeBuffer[b];
ENDLOOP;
  };

-- Active rendezvous: opens a connection over the pre-allocated socket.
-- Raises SocketsAlreadyUsed when sockets has been used before.
ActivelyRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = {
  handle: Handle;
  IF sockets.used THEN ERROR SocketsAlreadyUsed;
  sockets.used _ TRUE;
  handle _ NewObject[Pup.nullAddress, PupSocket.GetUniqueID[], sockets, getTimeout, putTimeout];
  OpenConnection[handle];
  SetupObject[handle];
  ExchangeAcks[handle];
  RETURN[HandleToStream[handle]];
  };

-- Allocates a connection object, configures its condition variables, attaches a
-- socket (the one in sockets, or a new ephemeral one) and seeds every sequence number
-- from id.
NewObject: PROC [
  remote: Pup.Address,
  id: Endian.FWORD,
  sockets: Sockets,
  getTimeout, putTimeout: Milliseconds]
  RETURNS [handle: Handle] = {
  temp: LONG CARDINAL;
  handle _ NEW[Object];
  TRUSTED {
    Process.SetTimeout[@handle.outputReady, Process.MsecToTicks[2000]];
    Process.EnableAborts[@handle.outputSpace];
    -- Client-visible waits honour putTimeout and getTimeout; waitForever disables the timeout.
    IF putTimeout # PupStream.waitForever THEN
      Process.SetTimeout[@handle.outputFlushed, MillisecondsToTicks[putTimeout]]
    ELSE Process.DisableTimeout[@handle.outputFlushed];
    IF putTimeout # PupStream.waitForever THEN
      Process.SetTimeout[@handle.outputSpace, MillisecondsToTicks[putTimeout]]
    ELSE Process.DisableTimeout[@handle.outputSpace];
    Process.EnableAborts[@handle.inputReady];
    IF getTimeout # PupStream.waitForever THEN
      Process.SetTimeout[@handle.inputReady, MillisecondsToTicks[getTimeout]]
    ELSE Process.DisableTimeout[@handle.inputReady];
    Process.SetTimeout[@handle.sendAttention, Process.MsecToTicks[9000]];
    Process.EnableAborts[@handle.recvAttention];
    Process.DisableTimeout[@handle.recvAttention];
    Process.SetTimeout[@handle.stateChange, Process.MsecToTicks[4000]];
    Process.SetTimeout[@handle.alloc, Process.MsecToTicks[5000]];
    Process.SetTimeout[@handle.ack, Process.MsecToTicks[6000]];
    };
  IF sockets # NIL THEN handle.socket _ sockets.socket
  ELSE handle.socket _ PupSocket.CreateEphemeral[
    remote: remote,
    sendBuffers: defaultNumberOfBuffers + 2,
    recvBuffers: 2*defaultNumberOfBuffers + 2,
    getTimeout: 250 ]; -- also in WaitForRendezvous
  handle.remote _ PupSocket.GetRemoteAddress[handle.socket];
  IF disableSoftwareChecksums THEN
PupSocketBackdoor.SetSoftwareChecksumming[handle.socket, FALSE, FALSE];
  handle.connectionId _ id;
  -- Every sequence number starts at the numeric value of the connection id.
  temp _ Endian.CardFromF[handle.connectionId];
  handle.pullId _ handle.pushId _ handle.ackedId _ temp;
  handle.sendAttenIdSent _ handle.sendAttenIdAcked _ temp;
  handle.recvAttenIdArrived _ handle.recvAttenIdSeen _ temp;
  };

-- Allocates packet buffers, forks the retransmitter, three pushers and the puller,
-- links the handle into the global chain and enables finalization for it.
SetupObject: PROC [handle: Handle] = {
  AllocateBuffers[handle, defaultNumberOfBuffers, defaultNumberOfBuffers];
  handle.retransmitter _ FORK Retransmitter[handle];
  handle.push1 _ FORK Pusher[handle];
  handle.push2 _ FORK Pusher[handle];
  handle.push3 _ FORK Pusher[handle];
  handle.pull _ FORK Puller[handle];
  AddHandle[globalLock, handle];
  SafeStorage.EnableFinalization[handle];
  };

-- Sends the initial ack and waits until pushPacketSize has become nonzero, probing up
-- to ten times.  The connection is smashed closed if no packet size is ever learned.
ExchangeAcks: PROC [handle: Handle] = {
  SendAckFirst[handle]; -- First ack for free (IFS doesn't play this game)
  IF handle.pushPacketSize = 0 THEN Process.Pause[2]; -- Dally (a wee bit) in case of free ack
  FOR i: NAT IN [0..10) DO
    IF handle.pushPacketSize # 0 THEN EXIT;
    IF handle.err # NIL THEN EXIT;
    SendProbe[handle];
    WaitForAllocation[handle];
    ENDLOOP;
  IF handle.err = NIL THEN WaitForAllocation[handle]; -- NOTIFY only
  IF handle.pushPacketSize = 0 THEN
    SmashClosed[handle, transmissionTimeout, "No allocate (need packet size)"];
  };

-- Wraps a connection object in an IO stream that uses streamProcs.
HandleToStream: PROC [handle: Handle] RETURNS [STREAM] = {
  RETURN[IO.CreateStream[streamProcs: streamProcs, streamData: handle]];
  };

-- Active open: sends an RFC up to ten times.  The wait after each send doubles, capped
-- at tenSeconds; because the doubling happens before the first wait, that wait is two
-- seconds.  Returns once an RFC reply has supplied the remote connection address.
OpenConnection: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  retransmisson: BasicTime.Pulses _ oneSecond;
  start: BasicTime.Pulses;
  FOR i: NAT IN [0..10) DO
    SendRfc[handle];
    start _ BasicTime.GetClockPulses[];
    retransmisson _ MIN[retransmisson*2, tenSeconds];
    UNTIL ElapsedPulses[start] > retransmisson DO
      b: Buffer _ PupSocket.Get[socket];
      IF b = NIL THEN LOOP;
      SELECT b.type FROM
        rfc => {
          handle.remote _ b.address;
          PupSocket.SetRemoteAddress[socket, handle.remote];
          PupSocket.FreeBuffer[b];
          RETURN;
          };
        error => GotError[handle, b];
        abort => GotAbort[handle, b];
        ENDCASE => NULL;
      PupSocket.FreeBuffer[b];
CheckForClosed[handle];
      ENDLOOP;
    CheckForClosed[handle];
    ENDLOOP;
  -- Ten attempts went unanswered.
  SmashClosed[handle, transmissionTimeout, "No response from remote site"];
  CheckForClosed[handle];
  };

-- Local close: sends End up to ten times, waiting on stateChange after each, and
-- aborts if the connection has still not been marked closed.
CloseConnection: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  IF handle.err # NIL THEN RETURN;
  FOR i: NAT IN [0..10) DO
    SendEnd[handle];
    WaitStateChange[handle];
    IF handle.err # NIL THEN RETURN;
    ENDLOOP;
  SendAbort[handle, "No Response to end"];
  };

-- Clock pulses elapsed since startTime.
ElapsedPulses: PROC [startTime: BasicTime.Pulses] RETURNS [BasicTime.Pulses] = INLINE {
  RETURN[BasicTime.GetClockPulses[] - startTime];
  };

-- Difference to-from reinterpreted as a signed number, so that from being ahead of to
-- yields a negative result.
Offset: PROC [to, from: LONG CARDINAL] RETURNS [INT] = TRUSTED INLINE {
  RETURN[LOOPHOLE[(to-from), INT]];
  };

-- Number of words needed to hold bytes bytes, rounding up.
EnoughWordsForBytes: PROC [bytes: NAT] RETURNS [NAT] = TRUSTED INLINE {
  RETURN[CARDINAL[(bytes+Basics.bytesPerWord-1)]/Basics.bytesPerWord];
  };

-- Converts a timeout to ticks.  Values of CARDINAL.LAST and above are converted by way
-- of whole seconds.  waitForever is not a legal argument.
MillisecondsToTicks: PROC [ms: Milliseconds] RETURNS [Process.Ticks] = {
  SELECT ms FROM
    PupStream.waitForever => ERROR;
    < CARDINAL.LAST => RETURN[Process.MsecToTicks[ms]];
    ENDCASE => RETURN[Process.SecondsToTicks[ms/1000]];
  };

-- Input process: receives packets until the connection is closed, probing the remote
-- end once nothing has arrived for probePulses and giving up after more than ten probes.
Puller: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  probes: CARDINAL _ 0;
  timeOfLastArrival: BasicTime.Pulses _ handle.timeOfLastArrival;
  Process.SetPriority[Process.priorityForeground];
  IF useDirectInput THEN PupSocketBackdoor.SetDirectReceive[socket, Receive, handle];
  UNTIL handle.err # NIL DO
    b: Buffer _ PupSocket.Get[socket];
    IF b = NIL THEN { -- Nothing arrived recently
      IF ElapsedPulses[handle.timeOfLastArrival] < probePulses THEN {
        -- Data arrived within the probe interval: start counting afresh.
        probes _ 0;
        timeOfLastArrival _ handle.timeOfLastArrival;
        }
      ELSE {
        -- One probe for each further probePulses of silence.
        IF ElapsedPulses[timeOfLastArrival] > (probes+1)*probePulses THEN {
          SendProbe[handle];
          probes _ probes.SUCC;
          };
        IF probes > 10 THEN
          SmashClosed[handle, transmissionTimeout, "no response from remote host"];
        };
      LOOP;
      };
    b _ Receive[socket, b, handle];
    PupSocket.FreeBuffer[b];
    ENDLOOP;
  PupSocketBackdoor.SetDirectReceive[socket, NIL, NIL];
  IF handle.why = remoteClose THEN { -- Dally in case he retransmits
    DO
      b: Buffer _
PupSocket.Get[socket];
      IF b = NIL THEN EXIT;
      SELECT b.type FROM
        end => GotEnd[handle, b];
        endRep => {
          PupSocket.FreeBuffer[b];
          EXIT;
          };
        ENDCASE => NULL;
      PupSocket.FreeBuffer[b];
      ENDLOOP;
    };
  handle _ NIL; -- Allow Finalization
  };

-- Dispatches one incoming packet.  The buffer returned is the one the caller should
-- free; it differs from the argument when DataPacket has swapped it into the input list.
Receive: PROC [socket: Socket, b: Buffer, user: REF ANY] RETURNS [Buffer] = {
  handle: Handle = NARROW[user];
  SELECT b.type FROM
    ack => ProcessAck[handle, b];
    data, aData, mark, aMark => {
      bytes: ByteIndex = PupSocket.GetUserBytes[b];
      -- The type is saved first because b may be replaced by DataPacket.
      type: PupType.Type _ b.type;
      IF bytes # 0 THEN b _ DataPacket[handle, b, bytes];
      -- aData and aMark ask for an acknowledgement.
      IF type = aData OR type = aMark THEN {
        oldAllocNeeded: BOOL _ handle.allocNeeded;
        handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse;
        SELECT TRUE FROM
          oldAllocNeeded => SendAckFirst[handle];
          (bytes = 0) => SendAckFirst[handle];
          ~handle.allocNeeded => SendAckAgain[handle, b];
          ENDCASE => acksDefered _ acksDefered.SUCC;
        };
      };
    end => GotEnd[handle, b];
    endRep => GotEndReply[handle, b];
    error => GotError[handle, b];
    abort => GotAbort[handle, b];
    int => GotInt[handle, b];
    intRep => GotIntReply[handle, b];
    rfc => NULL;
    ENDCASE => funnyPacketType _ funnyPacketType.SUCC;
  RETURN[b];
  };

-- Accepts a data packet only if it is the next one in sequence and an input slot is
-- empty.  On acceptance the packet buffer is exchanged with the buffer of the slot and
-- the old slot buffer is returned; otherwise b itself is returned.
DataPacket: ENTRY PROC [handle: Handle, b: Buffer, bytes: ByteIndex] RETURNS [swap: Buffer] = {
  offset: INT _ Offset[Endian.CardFromF[b.id], handle.pullId];
  swap _ b;
  SELECT offset FROM
    < 0 => oldPackets _ oldPackets.SUCC;
    > 0 => futurePackets _ futurePackets.SUCC; -- Missed something
    0 => {
      finger: Finger _ handle.inputWrite;
      IF finger.state # empty THEN RETURN;
      swap _ finger.buffer;
      finger.buffer _ b;
      finger.mark _ b.type = mark OR b.type = aMark;
      finger.bytes _ bytes;
      finger.state _ full;
      handle.pullId _ handle.pullId + bytes;
      handle.inputBuffersAvailable _ handle.inputBuffersAvailable.PRED;
      handle.inputWrite _ finger.next;
      NOTIFY handle.inputReady;
      handle.timeOfLastArrival _ BasicTime.GetClockPulses[];
      };
    ENDCASE => ERROR;
  };

-- Handles an End packet from the remote side of this connection.
GotEnd: PROC [handle: Handle, b: Buffer] = {
  IF b.id # handle.connectionId THEN RETURN;
  IF b.source #
handle.remote THEN RETURN;
  PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  IF handle.err = NIL THEN SmashClosed[handle, remoteClose, "Remote close"];
  SELECT handle.why FROM
    remoteClose => IF handle.outputBuffersReady # 0 THEN RETURN; -- Can't accept yet;
    ENDCASE;
  SendEndReplyFirst[handle, b];
  };

-- Handles an EndReply: the remote side has answered our End.  Marks the connection
-- locally closed and echoes the reply.  Ignored when the connection is already closed.
GotEndReply: PROC [handle: Handle, b: Buffer] = {
  IF b.id # handle.connectionId THEN RETURN;
  IF b.source # handle.remote THEN RETURN;
  PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL];
  IF handle.err # NIL THEN RETURN;
  SmashClosed[handle, localClose, "Local close"];
  SendEndReplyFirst[handle, b];
  };

-- Handles an Error packet from the remote address.
-- noSocket closes the connection.  The two resource-limit codes shrink the output
-- window by one buffer, never below one.  Every other code is ignored.
GotError: PROC [handle: Handle, b: Buffer] = {
  IF b.source # handle.remote THEN RETURN;
  SELECT b.error.code FROM
    noSocket => SmashClosed[handle, remoteReject, PupSocket.ExtractErrorRope[b]];
    resourceLimits, gatewayResourceLimits =>
      -- Back off by one buffer but keep at least one in use.  The previous form,
      -- MIN[1, n-1], dropped straight to 1, then to 0 on the next error (which stalls
      -- output), and underflowed the CARDINAL when n was already 0.
      handle.outputBuffersToUse _
        IF handle.outputBuffersToUse > 1 THEN handle.outputBuffersToUse-1 ELSE 1;
    ENDCASE => RETURN; -- Not a fatal error
  };

-- Handles an Abort for this connection: closes it with the text carried by the packet.
GotAbort: PROC [handle: Handle, b: Buffer] = {
  IF b.id # handle.connectionId THEN RETURN;
  IF b.source # handle.remote THEN RETURN;
  SmashClosed[handle, remoteReject, PupSocket.ExtractAbortRope[b]];
  };

-- Handles an Interrupt: advances recvAttenIdArrived when the id is exactly one ahead,
-- always replies, and wakes any process waiting for attention.
-- NOTE(review): SendInt transmits sendAttenIdSent unchanged, and both ends seed their
-- counters from the connection id, so an offset of 1 looks unreachable between two
-- copies of this code.  Confirm the intended numbering before changing either side.
GotInt: PROC [handle: Handle, b: Buffer] = {
  id: LONG CARDINAL = Endian.CardFromF[b.id];
  IF b.source # handle.remote THEN RETURN;
  IF Offset[id, handle.recvAttenIdArrived] = 1 THEN handle.recvAttenIdArrived _ id;
  SendIntReplyFirst[handle, b];
  NotifyRecvAttention[handle];
  };

-- Handles an InterruptReply: records the acknowledgement when the id is exactly one
-- ahead of sendAttenIdSent and wakes the sender.
GotIntReply: PROC [handle: Handle, b: Buffer] = {
  id: LONG CARDINAL = Endian.CardFromF[b.id];
  IF b.source # handle.remote THEN RETURN;
  IF Offset[id, handle.sendAttenIdSent] = 1 THEN handle.sendAttenIdAcked _ id;
  NotifySendAttention[handle];
  };

-- Sends an RFC carrying the connection id and the local socket address.
SendRfc: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  b.type _ rfc;
  b.id _ handle.connectionId;
  b.address _ PupSocket.GetLocalAddress[socket];
  PupSocket.SetUserSize[b, SIZE[Pup.Address]];
  PupSocketBackdoor.PutFirst[socket, b];
  PupSocket.FreeBuffer[b];
  };

SendAbort:
PROC [handle: Handle, err: ROPE] = {
  -- Sends an Abort carrying err, then marks the connection locally aborted unless it
  -- was already closed.
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  b.type _ abort;
  b.id _ handle.connectionId;
  b.abort.code _ 0;
  PupSocket.SetUserHWords[b, 1];
  PupSocket.AppendRope[b, err];
  -- NOTE(review): no FreeBuffer follows Put here or in SendEnd and SendInt, unlike the
  -- PutFirst callers; presumably Put takes over the buffer.  Confirm.
  PupSocket.Put[socket, b];
  IF handle.err # NIL THEN RETURN;
  SmashClosed[handle, localAbort, err];
  };

-- Sends a deferred ack once every input buffer is available again.
MaybeSendAck: PROC [handle: Handle] = {
  socket: Socket;
  b: Buffer;
  IF ~handle.allocNeeded THEN RETURN;
  IF handle.inputBuffersAvailable < handle.inputBuffersToUse THEN RETURN;
  socket _ handle.socket;
  b _ PupSocket.AllocBuffer[socket];
  SendAckAgain[handle, b];
  PupSocket.FreeBuffer[b];
  };

-- Builds an ack in a freshly allocated buffer and sends it with PutFirst.
-- The ack advertises bufferSize, the available buffer count, and a byte allowance
-- capped at 32000.
SendAckFirst: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow
  maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
  handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse;
  b.type _ ack;
  b.id _ Endian.FFromCard[handle.pullId];
  TRUSTED {
    b.body _ ack[
      maxBytesPerPup: handle.bufferSize,
      maxPupsAhead: handle.inputBuffersAvailable,
      maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ];
    };
  PupSocket.SetUserHWords[b, 3];
  PupSocketBackdoor.PutFirst[socket, b];
  PupSocket.FreeBuffer[b];
  };

-- Same ack as SendAckFirst, but built in the buffer supplied by the caller and sent
-- with PutAgain; the caller remains responsible for freeing b.
SendAckAgain: PROC [handle: Handle, b: Buffer] = {
  socket: Socket _ handle.socket;
  bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow
  maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST
  handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse;
  b.type _ ack;
  b.id _ Endian.FFromCard[handle.pullId];
  TRUSTED {
    b.body _ ack[
      maxBytesPerPup: handle.bufferSize,
      maxPupsAhead: handle.inputBuffersAvailable,
      maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ];
    };
  PupSocket.SetUserHWords[b, 3];
  PupSocketBackdoor.PutAgain[socket, b];
  };

-- Sends an empty aData packet at the current push position; Receive on the other
-- side answers a zero-length aData with an ack.
SendProbe: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  b.type _ aData;
  b.id _ Endian.FFromCard[handle.pushId];
  PupSocket.SetUserBytes[b, 0];
  PupSocketBackdoor.PutFirst[socket, b];
  PupSocket.FreeBuffer[b];
  };

-- Sends an End packet for this connection.
SendEnd: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  b.type _ end;
  b.id _ handle.connectionId;
  PupSocket.SetUserBytes[b, 0];
  PupSocket.Put[socket, b];
  };

-- Reuses the caller's buffer b to send an EndReply.
SendEndReplyFirst: PROC [handle: Handle, b: Buffer] = {
  socket: Socket _ handle.socket;
  b.type _ endRep;
  b.id _ handle.connectionId;
  PupSocket.SetUserBytes[b, 0];
  PupSocketBackdoor.PutFirst[socket, b];
  };

-- Sends an Interrupt whose id is sendAttenIdSent.
-- NOTE(review): GotInt and GotIntReply accept only ids exactly one ahead of their own
-- counters; confirm that sending the counter unchanged is what is intended.
SendInt: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  b: Buffer _ PupSocket.AllocBuffer[socket];
  b.type _ int;
  b.id _ Endian.FFromCard[handle.sendAttenIdSent];
  PupSocket.SetUserBytes[b, 0];
  PupSocket.Put[socket, b];
  };

-- Reuses the caller's buffer b to send an InterruptReply whose id is recvAttenIdArrived.
SendIntReplyFirst: PROC [handle: Handle, b: Buffer] = {
  socket: Socket _ handle.socket;
  b.type _ intRep;
  b.id _ Endian.FFromCard[handle.recvAttenIdArrived];
  PupSocket.SetUserBytes[b, 0];
  PupSocketBackdoor.PutFirst[socket, b];
  };

-- Seals the current output packet: stamps it with the push position, advances pushId
-- by the bytes written, marks the slot full and signals that output is ready.
FinishOutputFinger: PROC [handle: Handle] = {
  finger: Finger _ handle.outputWrite;
  b: Buffer _ finger.buffer;
  b.id _ Endian.FFromCard[handle.pushId];
  handle.pushId _ handle.pushId + finger.index;
  PupSocket.SetUserBytes[b, finger.index];
  finger.inProgress _ TRUE;
  finger.state _ full;
  handle.outputWrite _ finger.next;
  NotifyOutputReady[handle];
  };

Pusher: PROC [handle: Handle] = {
  socket: Socket _ handle.socket;
  Process.SetPriority[CommDriver.sendPriority]; -- 
priorityClient3 = priorityForeground+1 DO finger: Finger _ WaitOutputReady[handle]; b: Buffer; IF handle.err # NIL THEN EXIT; b _ finger.buffer; IF finger.ack THEN b.type _ IF finger.mark THEN aMark ELSE aData ELSE b.type _ IF finger.mark THEN mark ELSE data; PupSocketBackdoor.PutAgain[socket, b]; IF finger.ack THEN handle.timeOfLastPush _ BasicTime.GetClockPulses[]; finger.inProgress _ FALSE; ENDLOOP; handle _ NIL; -- Allow Finalization }; Retransmitter: PROC [handle: Handle] = { socket: Socket _ handle.socket; GetValidOutputToAck: PROC RETURNS [Finger] ~ { finger: Finger _ handle.outputToAck; IF finger.state # full THEN ERROR; WHILE finger.inProgress DO Process.Pause[1]; IF handle.err # NIL THEN RETURN[NIL]; ENDLOOP; RETURN [finger] }; Process.SetPriority[Process.priorityForeground]; UNTIL handle.err # NIL DO WaitAck[handle]; -- 6 seconds IF handle.err # NIL THEN EXIT; IF NotifyOutputSpace[handle] THEN LOOP; -- Should calculate retransmit time ??? IF handle.outputBuffersReady = 0 THEN LOOP; -- Nothing to send IF handle.pushPackets = 0 THEN { SendProbe[handle]; LOOP; }; -- Waiting for Alloc handle.retransmitting _ TRUE; clumpsRetransmitted _ clumpsRetransmitted.SUCC; UNTIL handle.err # NIL DO -- Retransmit each packet until it gets acked finger: Finger; IF NotifyOutputSpace[handle] THEN EXIT; IF (finger _ GetValidOutputToAck[]) = NIL THEN EXIT; Process.Pause[2]; IF NotifyOutputSpace[handle] THEN EXIT; IF (finger _ GetValidOutputToAck[]) = NIL THEN EXIT; finger.buffer.type _ IF finger.mark THEN aMark ELSE aData; PupSocketBackdoor.PutFirst[socket, finger.buffer]; packetsRetransmitted _ packetsRetransmitted.SUCC; handle.timeOfLastPush _ BasicTime.GetClockPulses[]; WaitAck[handle]; -- 6 seconds ENDLOOP; ENDLOOP; handle _ NIL; -- Allow Finalization }; WaitInputReady: PROC [handle: Handle] = { DO CheckForClosed[handle]; WaitInputReadyInner[handle]; CheckForClosed[handle]; -- Avoid AddressFault if Closed IF handle.inputRead.state = full THEN RETURN; SIGNAL Timeout; 
ENDLOOP; }; WaitInputReadyInner: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.inputRead.state = full THEN RETURN; WAIT handle.inputReady; }; NotifyInputSpace: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = { finger _ handle.inputRead; finger.state _ empty; finger _ finger.next; handle.inputRead _ finger; handle.inputBuffersAvailable _ handle.inputBuffersAvailable.SUCC; }; WaitOutputSpace: PROC [handle: Handle] = { DO WaitOutputSpaceInner[handle]; CheckForClosed[handle]; -- Avoid AddressFault if Closed IF handle.outputWrite.state = empty THEN RETURN; SIGNAL Timeout; ENDLOOP; }; WaitOutputSpaceInner: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.outputWrite.state = empty THEN RETURN; WAIT handle.outputSpace; }; WaitOutputFlushed: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.outputBuffersReady = 0 THEN handle.flush _ FALSE; IF ~handle.flush THEN RETURN; WAIT handle.outputFlushed; }; NotifyOutputSpace: ENTRY PROC [handle: Handle] RETURNS [progress: BOOL _ FALSE] = { DO finger: Finger _ handle.outputToAck; IF finger.state # full THEN EXIT; IF finger.inProgress THEN EXIT; IF Offset[handle.ackedId, Endian.CardFromF[finger.buffer.id]] <= 0 THEN EXIT; IF finger.ack THEN progress _ ~handle.retransmitting; finger.state _ empty; handle.outputToAck _ finger.next; handle.outputBuffersReady _ handle.outputBuffersReady.PRED; handle.outputBuffersInFlight _ handle.outputBuffersInFlight.PRED; NOTIFY handle.outputSpace; ENDLOOP; IF handle.retransmitting AND handle.outputBuffersInFlight = 0 THEN { progress _ TRUE; handle.retransmitting _ FALSE; }; IF handle.outputBuffersReady = 0 THEN { handle.flush _ FALSE; NOTIFY handle.outputFlushed; }; IF progress THEN { BROADCAST handle.alloc; BROADCAST handle.outputReady; }; }; WaitOutputReady: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = { DO UNTIL handle.pushPackets > handle.outputBuffersInFlight AND handle.pushBytes # 0 DO IF handle.err # NIL THEN 
RETURN[NIL]; WAIT handle.alloc; ENDLOOP; IF handle.err # NIL THEN RETURN[NIL]; finger _ handle.outputRead; SELECT finger.state FROM full => IF finger.inProgress AND ~handle.retransmitting THEN EXIT; ENDCASE => NULL; WAIT handle.outputReady; ENDLOOP; handle.outputRead _ handle.outputRead.next; handle.outputBuffersInFlight _ handle.outputBuffersInFlight.SUCC; SELECT TRUE FROM handle.pushPackets = handle.outputBuffersInFlight => finger.ack _ TRUE; handle.flush AND finger.next.state # full => finger.ack _ TRUE; handle.outputBuffersToUse = handle.outputBuffersInFlight => finger.ack _ TRUE; handle.outputBuffersAllocated = handle.outputBuffersInFlight => finger.ack _ TRUE; ENDCASE => finger.ack _ FALSE; }; NotifyOutputReady: ENTRY PROC [handle: Handle] = { handle.outputBuffersReady _ handle.outputBuffersReady.SUCC; NOTIFY handle.outputReady; }; WaitStateChange: ENTRY PROC [handle: Handle] = { IF handle.err # NIL THEN RETURN; WAIT handle.stateChange; }; ProcessAck: ENTRY PROC [handle: Handle, b: Buffer] = { offset: INT _ Offset[Endian.CardFromF[b.id], handle.ackedId]; SELECT offset FROM < 0 => oldAcks _ oldAcks.SUCC; ENDCASE => { handle.ackedId _ handle.ackedId + offset; handle.pushPacketSize _ MIN[b.maxBytesPerPup, handle.bufferSize]; handle.pushPackets _ b.maxPupsAhead; handle.pushBytes _ b.maxBytesAhead; IF handle.pushPackets # 0 THEN BROADCAST handle.ack; -- ExchangeAcks too IF offset # 0 AND handle.pushPackets > handle.outputBuffersInFlight THEN BROADCAST handle.alloc; handle.timeOfLastArrival _ BasicTime.GetClockPulses[]; }; }; WaitAck: ENTRY PROC [handle: Handle] = { finger: Finger _ handle.outputToAck; fingerId: LONG CARDINAL = Endian.CardFromF[finger.buffer.id]; IF finger.state = full AND Offset[handle.ackedId, fingerId] > 0 THEN RETURN; WAIT handle.ack; }; WaitForAllocation: ENTRY PROC [handle: Handle] = { IF handle.pushPackets = 0 OR handle.pushBytes = 0 THEN WAIT handle.ack; NOTIFY handle.alloc; }; NotifyRecvAttention: ENTRY PROC [handle: Handle] = { NOTIFY 
handle.recvAttention; }; WaitRecvAttention: ENTRY PROC [handle: Handle] = { ENABLE UNWIND => NULL; WAIT handle.recvAttention; }; NotifySendAttention: ENTRY PROC [handle: Handle] = { NOTIFY handle.sendAttention; }; WaitSendAttention: ENTRY PROC [handle: Handle] = { ENABLE UNWIND => NULL; WAIT handle.sendAttention; }; CheckForClosed: PROC [handle: Handle] = { IF handle.err # NIL THEN ERROR StreamClosing[handle.why, handle.err]; }; SmashClosed: ENTRY PROC [handle: Handle, why: PupStream.CloseReason, err: ROPE] = { IF handle.err # NIL THEN RETURN; handle.why _ why; handle.err _ err; BROADCAST handle.stateChange; BROADCAST handle.outputReady; BROADCAST handle.outputFlushed; BROADCAST handle.outputSpace; BROADCAST handle.inputReady; BROADCAST handle.alloc; BROADCAST handle.ack; BROADCAST handle.sendAttention; BROADCAST handle.recvAttention; PupSocket.Kick[handle.socket]; }; GetBufferSize: PROC [remote: Pup.Address] RETURNS [bufferSizes: NAT] = { hop: PupHop.Hop = PupHop.GetHop[remote.net]; IF hop # 0 THEN RETURN[MIN[maxBufferSize, maxNewGatewayBytes]]; RETURN[MIN[maxBufferSize, maxDataBytes]]; }; AllocateBuffers: PROC [handle: Handle, outputBuffers, inputBuffers: NAT] = TRUSTED { socket: PupSocket.Socket _ handle.socket; handle.bufferSize _ GetBufferSize[handle.remote]; FOR i: NAT IN [0..outputBuffers) DO finger: Finger _ NEW[FingerObject _ []]; finger.buffer _ PupSocket.AllocBuffer[socket]; finger.buffer.id _ handle.connectionId; IF handle.outputRead = NIL THEN handle.outputRead _ finger; finger.next _ handle.outputWrite; handle.outputWrite _ finger; ENDLOOP; handle.outputRead.next _ handle.outputWrite; handle.outputRead _ handle.outputWrite; handle.outputToAck _ handle.outputWrite; FOR i: NAT IN [0..inputBuffers) DO finger: Finger _ NEW[FingerObject _ []]; finger.buffer _ PupSocketBackdoor.AllocRecvBuffer[socket]; IF handle.inputRead = NIL THEN handle.inputRead _ finger; finger.next _ handle.inputWrite; handle.inputWrite _ finger; ENDLOOP; handle.inputRead.next _ 
handle.inputWrite; handle.inputRead _ handle.inputWrite; handle.outputBuffersToUse _ (outputBuffers+1)/2; -- Double Buffering handle.outputBuffersAllocated _ outputBuffers; handle.outputBuffersReady _ 0; handle.inputBuffersToUse _ (inputBuffers+1)/2; -- Double Buffering handle.inputBuffersAllocated _ inputBuffers; handle.inputBuffersAvailable _ inputBuffers; }; FreeBuffers: ENTRY PROC [handle: Handle] = TRUSTED { socket: PupSocket.Socket _ handle.socket; FOR i: NAT IN [0..handle.outputBuffersAllocated) DO finger: Finger _ handle.outputWrite; handle.outputWrite _ finger.next; IF finger.buffer # NIL THEN PupSocket.FreeBuffer[finger.buffer]; finger.next _ NIL; ENDLOOP; FOR i: NAT IN [0..handle.inputBuffersAllocated) DO finger: Finger _ handle.inputWrite; handle.inputWrite _ finger.next; IF finger.buffer # NIL THEN PupSocket.FreeBuffer[finger.buffer]; finger.next _ NIL; ENDLOOP; handle.outputToAck _ NIL; handle.outputRead _ NIL; handle.outputWrite _ NIL; handle.inputRead _ NIL; handle.inputWrite _ NIL; }; globalLock: Handle _ NEW[Object]; chain: Handle _ NIL; AddHandle: ENTRY PROC [handle: Handle, new: Handle] = { new.next _ chain; chain _ new; }; RemoveHandle: ENTRY PROC [handle: Handle, old: Handle] = { IF chain = old THEN { chain _ chain.next; old.next _ NIL; -- Help Finalization RETURN; }; FOR finger: Handle _ chain, finger.next DO -- NIL fault if not on chain IF finger.next = old THEN { finger.next _ old.next; old.next _ NIL; -- Help Finalization RETURN; }; ENDLOOP; }; Rollback: Booting.RollbackProc = { FOR handle: Handle _ chain, handle.next UNTIL handle = NIL DO IF handle.err # NIL THEN LOOP; SmashClosed[handle, localClose, "Rollback"]; ENDLOOP; }; StreamFinalizer: PROC = { Process.SetPriority[Process.priorityBackground]; DO handle: Handle _ NARROW[SafeStorage.FQNext[sfq]]; IF ~handle.dead THEN { -- User forgot to call Destroy SafeStorage.EnableFinalization[handle]; IF handle.err = NIL THEN SendAbort[handle, "Client dropped Stream"]; CloseHandle[handle]; 
droppedStreams _ droppedStreams.SUCC; } ELSE { -- Normal end of life RemoveHandle[globalLock, handle]; finishedStreams _ finishedStreams.SUCC; }; handle _ NIL; ENDLOOP; }; DropTest: PROC [n: NAT _ 25] = { where: Pup.Address _ [[3],[17B],[0,0,0,1]]; --Ivy FOR i: NAT IN [0..3) DO foo: STREAM _ Create[where, 1000, 1000]; IO.Close[foo]; IO.Close[foo]; Process.Pause[Process.MsecToTicks[10000]]; ENDLOOP; FOR i: NAT IN [0..n) DO foo: STREAM _ Create[where, 1000, 1000]; Abort[foo," Testing..."]; Process.Pause[Process.MsecToTicks[10000]]; ENDLOOP; }; sfq: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; SafeStorage.EstablishFinalization[type: Object.CODE, npr: 1, fq: sfq]; TRUSTED { Process.Detach[FORK StreamFinalizer[]]; }; Booting.RegisterProcs[r: Rollback]; }. žPupStreamImpl.mesa Copyright c 1986 by Xerox Corporation. All rights reserved. Hal Murray, November 4, 1986 3:04:52 pm PST Doug Wyatt, June 10, 1986 6:08:40 pm PDT Demers, June 12, 1986 8:26:52 am PDT Tim Diebert: October 13, 1986 7:07:47 pm PDT Copied Types Should move to Endian ?? ERRORs Performance tuning Copied/Derived Values Max bytes through Old Gateways. Beware: There may be a lot of old ones out there! getBlock: GetBlock, putBlock: PutBlock, Counters Our Types Attentions: Out of band signaling Byte Stream Interface handle.flush _ TRUE; Give user tail of data before StreamClosing Special check for the normal/easy case. Give user tail of data before StreamClosing Special check for the normal/easy case. Special check for the normal/easy case. Special check for the normal/easy case. Listeners Moved from ListenInit in order to rfc from the listener.local socket. (Diebert, et al Demers) This RFC corresponds to a known connection. Retransmit our answer. Can't use this buffer or accounting gets confused. New RFC Accept SetupObject[handle]; SendRfc[handle]; Rendezvous Arg, Socket left dangling. Connection management 2+4+8+10+10+10+10+10+10+10 => 84 seconds before timeout Arg, Socket left dangling. 
Utilities Round up so allocation or LongCopy gets everything. The CARDINAL avoids a KFCB for a signed divide. Recv Side We could actually get here by accident if we are receiving a steady stream of traffic long enough for the pulses clock to wrap around. That shouldn't do anything worse than inject an extra probe. Send Side Beware: The fan gets real dirty if the Retransmitter and a Pusher try to send a packet at the same time. Synchronization The main cause of the complexity here is that this routine does the NOTIFY to the pushing processes. Progress has 2 slightly different meanings. Normally, it indicates that an acked buffer was an aData or aMark (we asked for the ack). During retransmissions, it means that the last buffer to be sent was acked. There may be more buffers ready to send, but they haven't been sent yet. Kill the stream and wakeup everybody waiting for anything connected with this stream. All processes that WAIT should check handle.err when they wakeup and arrange to go away if it is not NIL. Buffer Allocation Assume that all Cedar machines are going through new Gateways Chain of Streams There are two times when we need to be able to locate all of the streams: 1) Smashing them after a rollback, and 2) processing duplicate RFCs. Beware: handle is the global lock, new is the real handle Beware: handle is the global lock, old is the real handle Finalization Making dropped streams get finalized is a bit tricky. In order to get the use count to drop to 0, Push and Pull must 1) notice when the stream dies, and 2) NIL out their copy of handle as they return. (The frame doesn't get recycled until after the JOIN.) The current code won't do anything with dropped streams unless they get closed by the other end. 
Ê9»˜codešœ™Kšœ Ïmœ1™KšžœžœT˜zKšžœ!˜(—head™ Kšœžœ˜ Kšœ žœ˜&Kšœ žœ˜*Kšžœžœ žœ˜Kšœžœ˜,Kšžœžœžœ˜Kšœžœ˜ šžœžœžœžœ˜K˜——™š Ÿ œžœ žœžœžœžœ˜=Kšžœžœ˜/K˜——šœ™KšŸœžœžœžœ˜(KšŸœžœžœžœ˜š Ÿ œžœžœžœžœ˜BK˜——™Kšœžœžœ˜Kšœžœžœ˜šœžœžœ˜'K™—KšœžœÏc*˜LKšœžœ˜"K˜šŸœžœžœžœ˜EK˜——™Kšœžœ ˜Dšœžœ ˜7KšœQ™Q——˜šœ žœžœžœ˜8Kšœ#˜#K˜K˜ K˜K™Kšœ˜K˜Kšœ™Kšœ˜K˜ K˜K˜—KšœB˜BKšœD˜DKšœI˜IšœG˜GK˜——™Kšœžœžœ˜(K˜Kšœžœžœ˜#Kšœ žœžœ˜Kšœ žœžœ˜Kšœ žœžœ˜Kšœžœžœ˜!K˜Kšœžœžœ˜'Kšœžœžœ˜(K˜Kšœžœžœ˜"Kšœžœžœ˜#—™ Kšœžœžœ˜ šœžœžœ˜K˜2Kšœ˜Kšœ˜Kšœžœ˜Kšœžœžœ˜Kšœžœžœ˜Kšœ žœžœ˜Kšœ˜—K˜Kšœžœžœ˜šœžœž œžœ˜!Kšœ/žœ˜3Kšœ žœ˜$Kšœžœ˜Kšœžœ˜Kšœžœ˜Kšœž œ˜Kšœ ž œ˜Kšœ ž œ˜Kšœ$žœ˜,Kšœž œ˜Kšœ ž œ˜Kšœ žœ˜Kšœ žœ˜Kšœžœ˜Kšœžœ˜!Kšœžœ˜%Kšœžœ˜!Kšœžœ˜$Kšœ>˜>Kšœžœžœ˜Kšœžœžœ˜Kšœ ž œ˜Kšœžœ˜Kšœžœžœ˜Kšœ žœ˜Kšœ žœ˜Kšœžœ˜ Kšœžœ˜$Kšœžœ˜$KšœA˜AKšœ ž œ˜Kšœž œ˜Kšœž œ˜Kšœ žœžœ˜šœ!™!Kšœž œ˜Kšœž œ˜ Kšœž œ ˜/Kšœž œ˜"Kšœž œ˜Kšœž œ˜—Kšœžœžœ ˜)Kšœžœ˜Kšœ žœ˜Kšœžœ˜Kšœžœžœ˜K˜—Kšœ žœžœ ˜!šœ žœžœžœ˜#Kšœ˜Kšœ˜Kšœ žœžœ˜Kšœ˜Kšœ˜Kšœ ˜;Kšœ" ˜—Kšžœ˜—Kš žœ žœžœžœžœ˜%Kšžœ˜"Kšœ˜K˜—š Ÿœžœžœžœžœ ˜`Kšœžœ˜)K˜"Kšœžœžœžœ ˜*Kšœ žœ˜#Kšœžœ"˜+Kšœž˜ šžœž˜šž˜šžœž˜šœ ˜ Kšœ˜Kšœ˜Kšžœ˜—Kšœ žœ˜šžœ˜ š žœžœžœžœžœ˜3Kšœ+™+—Kšœ˜——Kšžœ˜—Kšžœ žœžœ ˜"Kšžœ˜Kšžœžœ˜1šžœžœžœ˜BK™'Kšœ žœ˜Kšœ žœ˜+Kšœžœžœ,  ˜Išžœ˜ šœ˜Kšœ,˜,Kšœ˜KšœC˜C——Kšœ'˜'—šž œ˜šœ˜šœ˜Kšœ˜Kšœ˜Kšœ˜—šœ˜Kšœ"˜"Kšœ˜Kšœ%˜%———Kšœ!˜!Kšœ%˜%šžœžœ˜%Kšœ"˜"Kšœ˜—Kšœ!˜!Kšžœ˜—Kšœ˜K˜—šŸœžœžœ žœžœžœ žœžœžœžœžœ ˜|Kšœžœ˜)K˜"Kšœžœžœžœžœžœžœžœ˜@KšœžœžœÏrœ˜5Kšœž˜ šžœž˜šž˜šžœž˜šœ ˜ Kšœ˜Kšœ˜Kšžœ˜—Kšœ žœ˜šžœ˜ š žœžœžœžœžœ˜3Kšœ+™+—Kšœ˜——Kšžœ˜—Kšžœ žœžœ ˜"Kšžœ˜Kšžœžœ˜1šžœžœžœ˜BK™'Kšœ žœ˜Kšœ žœ˜+Kšœžœžœ,  ˜Išžœ˜ šœ˜Kšœ,˜,Kšœ˜KšœC˜C——Kšœ'˜'—šž œ˜šœ˜šœ˜Kšœ˜Kšœ˜Kšœ˜—šœ˜Kšœ"˜"Kšœ˜Kšœ%˜%———Kšœ!˜!Kš¡˜Kšœ%˜%šžœžœ˜%Kšœ"˜"Kšœ˜—Kšœ!˜!Kšžœ˜—Kšœ˜K˜—šŸœžœžœžœ˜,Kšœžœ˜)K˜$K˜šž˜Kšœ˜šžœž˜šœ ˜ Kšœ˜Kšœžœ˜Kšœ˜Kšœ%˜%Kšžœ˜—Kšœ žœ˜Kšžœ˜#—Kšžœ˜—Kšœ˜Kšœ(˜(Kšœžœ˜Kšœ˜Kšžœžœ˜?Kšœ˜K˜—šŸœžœžœ ˜BKšœžœ˜)Kšœžœžœžœ ˜*Kšœ žœ˜#Kšœžœ"˜+Kšœžœ˜ šžœž˜K˜$šž˜Kšœ˜šžœž˜šœ ˜ Kšœ˜Kšœžœ˜Kšœ˜Kšœ%˜%Kšžœ˜—Kšœ žœ˜Kšžœ˜#—Kšžœ˜—Kšžœ˜Kšžœžœ˜1šžœžœžœ˜BK™'Kšœ žœ˜)Kšœ žœ˜!Kšœžœžœ,  ˜Išžœ˜ šœ˜Kšœ?˜?Kšœ˜Kšœ4˜4——Kšœ'˜'—šž œ˜šœ˜šœ˜Kšœ"˜"Kšœ˜Kšœ ˜ 
—šœ˜Kšœ˜Kšœ˜Kšœ˜———Kšœ!˜!Kšœ%˜%Kšžœžœ˜?Kšžœ˜—Kšœ˜K˜—šŸœžœžœ žœžœžœžœ žœžœ˜gKšœžœ˜)Kšœžœžœžœžœžœžœžœ˜@Kšœžœžœ¡ œ˜2Kšœžœ˜ šžœž˜K˜$šž˜Kšœ˜šžœž˜šœ ˜ Kšœ˜Kšœžœ˜Kšœ˜Kšœ%˜%Kšžœ˜—Kšœ žœ˜Kšžœ˜#—Kšžœ˜—Kšžœ˜Kšžœžœ˜1šžœžœžœ˜BK™'Kšœ žœ˜)Kšœ žœ˜!Kšœžœžœ,  ˜Išžœ˜ šœ˜Kšœ?˜?Kšœ˜Kšœ4˜4——Kšœ'˜'—šž œ˜šœ˜šœ˜Kšœ"˜"Kšœ˜Kšœ ˜ —šœ˜Kšœ˜Kšœ˜Kšœ˜———Kšœ!˜!Kšœ%˜%Kšžœžœ˜?Kšžœ˜—Kšœ˜K˜—šŸœžœžœžœ˜$Kšœžœ˜)šžœž˜$Kšœ'˜'Kšžœžœ˜—Kšœ˜K˜—šŸœžœžœ˜Kšœžœ˜)šžœž˜$Kšœžœ ˜AKšžœžœ˜—Kšžœžœžœ ˜AKšžœžœ˜(Kšœžœ˜šžœ ž˜Kšœ˜šž˜Kšœ˜Kšœ˜Kšžœžœžœ˜Kšžœ ˜Kšžœ˜—Kšžœ˜—Kšœ˜K˜—šŸœžœžœ žœ˜+Kšœžœ˜)Kšžœ žœžœžœ˜Kšžœžœžœžœ!˜CKš žœžœžœžœ(žœ˜Všžœžœžœž˜:Kšœ)˜)—Kšžœžœžœ˜1Kšœžœ˜Kšœ˜K˜K˜—šŸ œžœ˜&Kšžœžœžœ˜+šžœ˜ Kšžœ˜Kšžœ˜Kšžœ˜Kšžœ˜Kšžœ˜—Kšœ˜K˜!Kšœžœ˜K˜K˜—š Ÿœžœžœžœžœ˜PKšœ žœ ž˜Kšœžœ˜Kšžœ˜K˜K˜—š Ÿ œžœžœžœžœ˜0Kšœžœ˜)šžœ2ž˜9Kšœ˜K˜K˜Kšžœ˜—Kšœ0žœ˜5K˜K˜—šŸ œž œžœžœ˜0Kšœžœ˜)šžœ4ž˜;Kšœ˜Kšœ˜Kšžœ˜—Kšœ0žœ˜5K˜K˜——™ šŸœžœžœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ žœžœžœ˜Kšœžœ ˜AKšœ#žœ ˜BKšžœ˜ Kšžœžœžœ˜=šœ žœ˜Kšœ ˜ Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœžœ˜—Kšœžœ˜(Kšœ˜K˜—šŸœžœžœ˜5Kšžœžœžœžœ˜%šžœ˜ Kšœ˜Kšžœ˜—Kšœžœ˜Kšœ˜K˜—šŸœžœ˜%šœ(˜(Kšœ˜Kšœ˜Kšœ˜Kšœ#˜#—šžœžœžœžœ˜Kšœ,˜,K˜šŸœžœ)ž˜EKšœ^™^Kšœ7 ˜NKšœžœ ˜=Kšœ*˜*Kšžœ ˜—šžœž˜šœ ˜ Kšœžœžœ˜Kšžœžœžœ8˜YJšœ˜šžœžœžœ˜K˜Kšœ˜Kšœ˜Kšœ ˜ —Jšœ-˜-—šœ˜Kšœžœžœ˜Kšœ˜šžœ%žœ žœž˜=Kšžœžœžœ˜$Kšœžœm™uKšžœžœ˜2Kšœ,žœ˜1Kšžœ ˜Kšžœ˜—Kšœž™Kšžœžœžœ4˜Qšžœžœžœ  ˜K˜Kšœ˜Kšœ˜Kšœ˜Kšœ*˜*Kšžœ˜—Kšœ™Kšœ!žœ,˜PKšœ H˜^Kšœ˜Kšžœžœ*˜GKšžœžœ˜—Kšžœžœ˜—Kšœ˜Kšžœ˜—Kšœ˜Kšœ˜K˜—šŸ œžœ>˜NKšœ™Kšœ™Kšœ˜KšœE˜EK˜——™ šŸœžœžœžœ˜Pšœ+˜+Kšœ˜Kšœ(˜(Kšœ*˜*Kšœ˜—Kšœ žœžœ˜:Kšœ˜K˜—šŸ œžœžœžœ˜MK˜2K˜K˜—šŸ œžœžœžœ˜OKšœ4˜4K˜K˜—š Ÿœžœžœžœžœ˜LKšœžœ˜)Kšœ žœžœ˜@K˜K˜—KšŸœž œžœ˜(š ŸœžœžœGžœžœ˜yKšœ ˜ Kšžœžœžœ˜.Kšœžœ˜Kšœ-˜-šž˜Kšœ,˜,šžœžœžœžœ6˜KKšÏb™—šžœž˜šœ˜Kšœ˜Kšœ ˜ Kšœ-˜-KšœD˜DKšœ˜Kšœ& ˜:Kšœ˜Kšœ˜Kšœ˜Kšžœ˜"—Kšžœžœ˜—Kšœ˜Kšžœ˜—K˜K˜—š Ÿœžœžœ:žœžœ˜mKšœ˜Kšžœžœžœ˜.Kšœžœ˜Kšœ^˜^K˜Kšœ˜Kšœ˜Kšžœ˜K˜——™šŸ œžœ˜Kšœ žœ9˜^Kšžœ˜Kšœžœžœ˜Kšœ žœ ˜šžœ˜ KšœC˜CKšœ*˜*šžœ$žœ˜+KšœJ˜J—Kšžœ/˜3šžœ$žœ˜+KšœH˜H—Kšžœ-˜1Kšœ)˜)šžœ$žœ˜+KšœG˜G—Kšžœ,˜0KšœE˜EKšœ-˜-Kšœ.˜.KšœC˜CKšœ=˜=Kšœ>˜>—Kšžœ žœžœ˜4šžœ+˜/Kšœ˜Kšœ(˜(Kšœ*˜*Kšœ ˜/—Kšœ:˜:šžœž˜ 
Kšœ9žœžœ˜G—Kšœ˜Kšœ-˜-Kšœ6˜6Kšœ8˜8Kšœ:˜:Kšœ˜K˜—šŸ œžœ˜&KšœH˜HKšœžœ˜2Kšœžœ˜#Kšœžœ˜#Kšœžœ˜#Kšœžœ˜"Kšœ˜Kšœ'˜'Kšœ˜K˜—šŸ œžœ˜'Kšœ 2˜HKšžœžœ (˜\šžœžœžœ ž˜Kšžœžœžœ˜'Kšžœžœžœžœ˜K˜Kšœ˜Kšžœ˜—Kšžœžœžœ ˜Bšžœž˜!KšœK˜K—Kšœ˜K˜—šŸœžœžœžœ˜:Kšžœžœ=˜FKšœ˜K˜—šŸœžœ˜)Kšœ˜Kšœ,˜,Kšœ˜šžœžœžœ ž˜Kšœ˜K˜#Kšœžœ˜1Kšœ7™7šžœ&ž˜-K˜"Kšžœžœžœžœ˜šžœž˜˜Kšœ˜Kšœ2˜2Kšœ˜Kšžœ˜ —K˜K˜Kšžœžœ˜—Kšœ˜Kšœ˜Kšžœ˜—Kšœ˜Kšžœ˜—KšœI˜Išœ˜Kš¢™—Kšœ˜K˜—šŸœžœ˜*Kšœ˜Kšžœ ž˜ šžœžœžœ ž˜Kšœ˜Kšœ˜Kšžœžœžœžœ˜ Kšžœ˜—Kšœ(˜(Kšœ˜K˜——™ šŸ œžœžœžœ˜WKšžœ,˜2—šŸœžœ žœžœžœžœžœžœ˜GKšžœžœ žœ˜$—šŸœžœ žœžœžœžœžœ˜GK™3Kšœžœ žœ™/Kšžœžœ8˜G—šŸœžœžœ˜Hšžœž˜Kšœžœ˜Kšœžœžœžœ˜3Kšžœžœ"˜3—K˜——™ šŸœžœ˜!Kšœ˜Kšœžœ˜Kšœ?˜?Kšœ0˜0Kšžœžœ=˜Sšžœžœž˜Kšœ"˜"šžœžœžœ ˜-šžœ7žœ˜?Kšœ ˜ Kšœ/˜/—šžœ˜šžœ;žœ˜CK™ÃKšœ˜Kšœžœ˜—šžœ ž˜KšœL˜L——Kšžœ˜—Kšœ˜Kšœ˜Kšžœ˜—Kšœ+žœžœ˜5šžœžœ ˜Bšž˜Kšœ"˜"Kšžœžœžœžœ˜šžœž˜Kšœ˜Kšœ%žœ˜-Kšžœžœ˜—Kšœ˜Kšžœ˜ ——Kšœ žœ ˜#Kšœ˜K˜—š Ÿœžœ#žœžœžœ ˜MKšœžœ˜šžœž˜Kšœ˜˜Kšœ-˜-Kšœ˜Kšžœ žœ"˜3šžœžœžœ˜&Kšœžœ˜*KšœM˜Mšžœžœž˜Kšœ'˜'Kšœ$˜$Kšœ/˜/Kšžœžœ˜0———Kšœ˜Kšœ!˜!Kšœ˜K˜Kšœ˜K˜!Kšœžœ˜ Kšžœ&žœ˜2—Kšžœ˜ Kšœ˜K˜—šŸ œžœžœ/žœ˜_Kšœžœ1˜šœ˜Kšœ#˜#Kšžœžœžœ˜$K˜K˜Kšœžœ˜.K˜K˜Kšœ&˜&Kšœ<žœ˜AKšœ ˜ Kšžœ˜Kšœ9˜9—Kšžœžœ˜—K˜K˜—šŸœžœ ˜,Kšžœžœžœ˜*Kšžœžœžœ˜(Kšœ2žœžœ˜Kšžœžœžœ ˜QKšœžœ˜Kšœ*žœ˜/šžœžœžœ -˜GKšœ˜Kšžœžœžœ˜'Kšžœ$žœžœžœ˜4K˜Kšžœžœžœ˜'Kšžœ$žœžœžœ˜4Kšœžœ žœžœ˜:Kšœ2˜2Kšœ,žœ˜1Kšœ3˜3Kšœ  ˜Kšžœ˜—Kšžœ˜—Kšœ žœ ˜#Kšœ˜K˜——™šŸœžœ˜)šž˜Kšœ˜K˜Kšœ ˜7Kšžœžœžœ˜-Kšžœ ˜Kšžœ˜—Kšœ˜K˜—šŸœžœžœžœ˜;Kšžœžœžœ˜Kšžœžœžœ˜-Kšžœ˜Kšœ˜K˜—šŸœžœžœžœ˜JKšœ˜K˜Kšœ˜Kšœ˜Kšœ<žœ˜AKšœ˜K˜—šŸœžœ˜*šž˜Kšœ˜Kšœ ˜7Kšžœ"žœžœ˜0Kšžœ ˜Kšžœ˜—Kšœ˜K˜—šŸœžœžœžœ˜