<> <> <> <> <> <> DIRECTORY Basics USING [bytesPerWord, UnsafeBlock], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses], Booting USING [RegisterProcs, RollbackProc], CommDriver USING [sendPriority], Endian USING [CardFromF, FFromCard, FWORD], IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs], PrincOpsUtils USING [ByteBlt, LongCopy], Process USING [Abort, Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, priorityForeground, SecondsToTicks, SetPriority, SetTimeout, Ticks], Pup USING [Address, nullAddress, Socket], PupBuffer USING [Buffer, BufferObject, ByteIndex, maxDataBytes, maxNewGatewayBytes], PupHop USING [GetHop, Hop], PupName USING [IsWellKnown], PupSocket USING [AllocBuffer, AppendRope, CreateServer, CreateEphemeral, Destroy, ExtractAbortRope, ExtractErrorRope, FixupSourceAndDest, FreeBuffer, Get, GetLocalAddress, GetRemoteAddress, GetUserBytes, GetUniqueID, Kick, Put, SetGetTimeout, SetRemoteAddress, SetUserBytes, SetUserHWords, SetUserSize, Socket, waitForever], PupSocketBackdoor USING [AllocRecvBuffer, PutAgain, PutFirst, ReturnToSenderNoFree, SetDirectReceive, SetSoftwareChecksumming], PupStream USING [CloseReason, FilterProc, ListenerProc, MARK, Milliseconds, waitForever], PupStreamBackdoor USING [], PupType USING [Type], Rope USING [ROPE], SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ]; PupStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle IMPORTS BasicTime, Booting, Endian, IO, PrincOpsUtils, Process, PupHop, PupName, PupSocket, PupSocketBackdoor, SafeStorage EXPORTS PupStream, PupStreamBackdoor = { <> Buffer: TYPE = PupBuffer.Buffer; ByteIndex: TYPE = PupBuffer.ByteIndex; CloseReason: TYPE = PupStream.CloseReason; MARK: TYPE = PupStream.MARK; Milliseconds: TYPE = PupStream.Milliseconds; ROPE: TYPE = Rope.ROPE; Socket: TYPE = PupSocket.Socket; STREAM: TYPE = IO.STREAM; <> WordAligned: PROC [index: NAT] RETURNS [yes: BOOL] = INLINE { RETURN[(index MOD Basics.bytesPerWord) = 0]; }; <> SocketNotWellKnown: PUBLIC ERROR = CODE; Timeout: PUBLIC SIGNAL = CODE; StreamClosing: PUBLIC ERROR [why: CloseReason, text: ROPE] = CODE; <> useDirectInput: BOOL _ TRUE; useInLineCopy: BOOL _ FALSE; disableSoftwareChecksums: BOOL _ FALSE; <<>> defaultNumberOfBuffers: NAT _ 32; -- These are >1500 bytes, not single pages maxBufferSize: NAT _ maxDataBytes; SetMaxBufferSize: PUBLIC PROC [size: NAT] ~ { maxBufferSize _ size }; <> maxDataBytes: NAT = PupBuffer.maxDataBytes; -- Max bytes in a packet maxNewGatewayBytes: NAT = PupBuffer.maxNewGatewayBytes; <> streamProcs: REF IO.StreamProcs = IO.CreateStreamProcs [ variety: $inputOutput, class: $Pup, getChar: GetChar, endOf: EndOf, charsAvail: CharsAvail, <> unsafeGetBlock: UnsafeGetBlock, putChar: PutChar, <> unsafePutBlock: UnsafePutBlock, flush: Flush, close: Close ]; oneSecond: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1D6]; tenSeconds: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[10D6]; retransmitPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[3D6]; probePulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[1*60D6]; <> duplicateRfcReceived: LONG CARDINAL _ 0; funnyPacketType: LONG CARDINAL _ 0; acksDefered: LONG CARDINAL _ 0; oldAcks: LONG CARDINAL _ 0; oldPackets: LONG CARDINAL _ 0; futurePackets: LONG CARDINAL _ 0; clumpsRetransmitted: LONG CARDINAL _ 0; packetsRetransmitted: LONG CARDINAL _ 0; droppedStreams: LONG CARDINAL _ 0; finishedStreams: LONG CARDINAL _ 0; <> Finger: TYPE = REF FingerObject; FingerObject: TYPE = RECORD [ state: {empty, halfFull, halfEmpty, full} _ empty, index: ByteIndex _ 0, bytes: ByteIndex _ 0, next: Finger _ NIL, mark: BOOL _ FALSE, ack: BOOL _ FALSE, inProgress: BOOL _ FALSE, buffer: Buffer ]; Handle: TYPE = REF Object; Object: TYPE = MONITORED RECORD [ outputWrite, outputRead, outputToAck: Finger _ NIL, inputWrite, inputRead: Finger _ NIL, socket: Socket _ NIL, remote: Pup.Address _ NULL, connectionId: Endian.FWORD, outputFlushed: CONDITION, outputSpace: CONDITION, outputReady: CONDITION, retransmitter, push1, push2, push3: PROCESS, pushId: LONG CARDINAL, ackedId: LONG CARDINAL, pushPackets: CARDINAL _ 0, pushBytes: CARDINAL _ 0, pushPacketSize: CARDINAL _ 0, outputBuffersToUse: CARDINAL _ 0, outputBuffersAllocated: CARDINAL _ 0, outputBuffersReady: CARDINAL _ 0, outputBuffersInFlight: CARDINAL _ 0, timeOfLastPush: BasicTime.Pulses _ BasicTime.GetClockPulses[], retransmitting: BOOL _ FALSE, flush: BOOL _ FALSE, inputReady: CONDITION, pull: PROCESS, pullId: LONG CARDINAL, pullPackets: CARDINAL _ 0, pullBytes: CARDINAL _ 0, inputBuffersToUse: CARDINAL _ 0, inputBuffersAllocated: CARDINAL _ 0, inputBuffersAvailable: CARDINAL _ 0, timeOfLastArrival: BasicTime.Pulses _ BasicTime.GetClockPulses[], stateChange: CONDITION, alloc: CONDITION, ack: CONDITION, allocNeeded: BOOL _ FALSE, <> sendAttenIdSent: LONG CARDINAL, sendAttenIdAcked: LONG CARDINAL, sendAttention: CONDITION, -- ABORTs NOT ENABLED recvAttenIdArrived: LONG CARDINAL, recvAttenIdSeen: LONG CARDINAL, recvAttention: CONDITION, err: ROPE _ NIL, -- # NIL => StreamClosed why: CloseReason _ NULL, bufferSize: NAT, next: Handle _ NIL, dead: BOOL _ FALSE ]; Listener: TYPE = REF ListenerRep; ListenerRep: PUBLIC TYPE = RECORD [ local: Pup.Socket, worker: PupStream.ListenerProc, clientData: REF ANY, getTimeout: Milliseconds, putTimeout: Milliseconds, filter: PupStream.FilterProc, -- NIL => Accept all requests echoFilter: PupStream.FilterProc, -- NIL => Answer all echos listen: PROCESS ]; Sockets: TYPE = REF SocketsRep; SocketsRep: PUBLIC TYPE = RECORD [ used: BOOL, socket: Socket ]; <> Create: PUBLIC PROC [remote: Pup.Address, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = { handle: Handle _ NewObject[remote, PupSocket.GetUniqueID[], NIL, getTimeout, putTimeout]; OpenConnection[handle]; SetupObject[handle]; ExchangeAcks[handle]; RETURN[HandleToStream[handle]]; }; Abort: PUBLIC PROC [self: STREAM, err: ROPE] = { handle: Handle = NARROW[self.streamData]; IF handle.err # NIL THEN RETURN; SendAbort[handle, err]; }; SendMark: PUBLIC PROC [self: STREAM, mark: MARK] = { handle: Handle = NARROW[self.streamData]; finger: Finger; DO CheckForClosed[handle]; finger _ handle.outputWrite; SELECT finger.state FROM empty => { finger.state _ halfFull; finger.mark _ TRUE; finger.index _ 1; finger.bytes _ handle.pushPacketSize; EXIT; }; halfFull => FinishOutputFinger[handle]; ENDCASE => WaitOutputSpace[handle]; ENDLOOP; finger.buffer.byte[0] _ mark; <> FinishOutputFinger[handle]; }; ConsumeMark: PUBLIC PROC [self: STREAM] RETURNS [mark: MARK] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; DO SELECT finger.state FROM full => { finger.state _ halfEmpty; finger.index _ 0; }; halfEmpty => NULL; ENDCASE => WaitInputReady[handle]; IF finger.mark THEN EXIT; finger _ NotifyInputSpace[handle]; MaybeSendAck[handle]; ENDLOOP; mark _ finger.buffer.byte[0]; [] _ NotifyInputSpace[handle]; MaybeSendAck[handle]; }; GetChar: PROC [self: STREAM] RETURNS [char: CHAR] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; index: ByteIndex; DO SELECT finger.state FROM full => { finger.state _ halfEmpty; finger.index _ 0; EXIT; }; halfEmpty => EXIT; ENDCASE => WaitInputReady[handle]; ENDLOOP; IF finger.mark THEN ERROR IO.EndOfStream[self]; index _ finger.index; char _ finger.buffer.char[index]; index _ index.SUCC; finger.index _ index; IF finger.index = finger.bytes THEN { [] _ NotifyInputSpace[handle]; MaybeSendAck[handle]; }; }; EndOf: PROC [self: STREAM] RETURNS [BOOL] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; SELECT finger.state FROM empty => NULL; halfEmpty, full => RETURN[finger.mark]; ENDCASE => NULL; CheckForClosed[handle]; RETURN[FALSE]; }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; DO SELECT finger.state FROM full => { finger.state _ halfEmpty; finger.index _ 0; EXIT; }; halfEmpty => EXIT; ENDCASE => IF wait THEN WaitInputReady[handle] ELSE RETURN[0]; ENDLOOP; IF finger.mark THEN RETURN[INT.LAST]; RETURN[finger.bytes-finger.index]; }; UnsafeGetBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; base: LONG POINTER = LOOPHOLE[block.base]; startIndex: INT _ block.startIndex; stop: INT = block.startIndex + block.count; nBytes: NAT; WHILE startIndex < stop DO DO SELECT finger.state FROM full => { finger.state _ halfEmpty; finger.index _ 0; EXIT; }; halfEmpty => EXIT; ENDCASE => { IF handle.err # NIL AND nBytesRead # 0 THEN RETURN; <> WaitInputReady[handle]; }; ENDLOOP; IF finger.mark THEN RETURN; -- EOF IF useInLineCopy AND WordAligned[startIndex] AND WordAligned[stop] AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN { <> toBytes: NAT = stop-startIndex; fromBytes: NAT = finger.bytes-finger.index; words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down TRUSTED { PrincOpsUtils.LongCopy[ to: base + startIndex / Basics.bytesPerWord, nwords: words, from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; }; nBytes _ words * Basics.bytesPerWord; } ELSE TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to: [ blockPointer: base, startIndex: startIndex, stopIndexPlusOne: stop], from: [ blockPointer: @finger.buffer.byte, startIndex: finger.index, stopIndexPlusOne: finger.bytes] ]; }; startIndex _ startIndex + nBytes; finger.index _ finger.index + nBytes; IF finger.index = finger.bytes THEN { finger _ NotifyInputSpace[handle]; MaybeSendAck[handle]; }; nBytesRead _ nBytesRead + nBytes; ENDLOOP; }; GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT _ 0, count: NAT _ NAT.LAST] RETURNS [nBytesRead: NAT _ 0] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.inputRead; base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE; stop: NAT = MIN[(startIndex+count), block.maxLength]; nBytes: NAT; WHILE startIndex < stop DO DO SELECT finger.state FROM full => { finger.state _ halfEmpty; finger.index _ 0; EXIT; }; halfEmpty => EXIT; ENDCASE => { IF handle.err # NIL AND nBytesRead # 0 THEN RETURN; <> WaitInputReady[handle]; }; ENDLOOP; IF finger.mark THEN RETURN; -- EOF IF useInLineCopy AND WordAligned[startIndex] AND WordAligned[stop] AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN { <> toBytes: NAT = stop-startIndex; fromBytes: NAT = finger.bytes-finger.index; words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down TRUSTED { PrincOpsUtils.LongCopy[ to: base + startIndex / Basics.bytesPerWord, nwords: words, from: @finger.buffer.byte + finger.index / Basics.bytesPerWord]; }; nBytes _ words * Basics.bytesPerWord; } ELSE TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to: [ blockPointer: base, startIndex: startIndex, stopIndexPlusOne: stop], from: [ blockPointer: @finger.buffer.byte, startIndex: finger.index, stopIndexPlusOne: finger.bytes] ]; }; startIndex _ startIndex + nBytes; block.length _ startIndex; finger.index _ finger.index + nBytes; IF finger.index = finger.bytes THEN { finger _ NotifyInputSpace[handle]; MaybeSendAck[handle]; }; nBytesRead _ nBytesRead + nBytes; ENDLOOP; }; PutChar: PROC [self: STREAM, char: CHAR] = { handle: Handle = NARROW[self.streamData]; finger: Finger _ handle.outputWrite; index: ByteIndex; DO CheckForClosed[handle]; SELECT finger.state FROM empty => { finger.state _ halfFull; finger.mark _ FALSE; finger.index _ 0; finger.bytes _ handle.pushPacketSize; EXIT; }; halfFull => EXIT; ENDCASE => WaitOutputSpace[handle]; ENDLOOP; index _ finger.index; finger.buffer.char[finger.index] _ char; index _ index.SUCC; finger.index _ index; IF finger.index = finger.bytes THEN FinishOutputFinger[handle]; }; UnsafePutBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] = { handle: Handle = NARROW[self.streamData]; base: LONG POINTER = LOOPHOLE[block.base]; startIndex: INT _ block.startIndex; stop: INT = block.startIndex + block.count; nBytes: INT; WHILE startIndex < stop DO finger: Finger _ handle.outputWrite; DO CheckForClosed[handle]; SELECT finger.state FROM empty => { finger.state _ halfFull; finger.mark _ FALSE; finger.index _ 0; finger.bytes _ handle.pushPacketSize; EXIT; }; halfFull => EXIT; ENDCASE => WaitOutputSpace[handle]; ENDLOOP; IF useInLineCopy AND WordAligned[startIndex] AND WordAligned[stop] AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN { <> toBytes: NAT = finger.bytes-finger.index; fromBytes: NAT = stop-startIndex; words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down TRUSTED { PrincOpsUtils.LongCopy[ to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord), nwords: words, from: base + (startIndex / Basics.bytesPerWord)]; }; nBytes _ words * Basics.bytesPerWord; } ELSE TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to: [ blockPointer: @finger.buffer.byte, startIndex: finger.index, stopIndexPlusOne: finger.bytes], from: [ blockPointer: base, startIndex: startIndex, stopIndexPlusOne: stop] ]; }; startIndex _ startIndex + nBytes; finger.index _ finger.index + nBytes; IF finger.index = finger.bytes THEN FinishOutputFinger[handle]; ENDLOOP; }; PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT _ 0, count: NAT _ NAT.LAST] = { handle: Handle = NARROW[self.streamData]; base: LONG POINTER = LOOPHOLE[block, LONG POINTER]+TEXT[0].SIZE; stop: NAT = MIN[(startIndex+count), block.length]; nBytes: NAT; WHILE startIndex < stop DO finger: Finger _ handle.outputWrite; DO CheckForClosed[handle]; SELECT finger.state FROM empty => { finger.state _ halfFull; finger.mark _ FALSE; finger.index _ 0; finger.bytes _ handle.pushPacketSize; EXIT; }; halfFull => EXIT; ENDCASE => WaitOutputSpace[handle]; ENDLOOP; IF useInLineCopy AND WordAligned[startIndex] AND WordAligned[stop] AND WordAligned[finger.index] AND WordAligned[finger.bytes] THEN { <> toBytes: NAT = finger.bytes-finger.index; fromBytes: NAT = stop-startIndex; words: NAT = MIN[toBytes, fromBytes] / Basics.bytesPerWord; -- Round down TRUSTED { PrincOpsUtils.LongCopy[ to: @finger.buffer.byte + (finger.index / Basics.bytesPerWord), nwords: words, from: base + (startIndex / Basics.bytesPerWord)]; }; nBytes _ words * Basics.bytesPerWord; } ELSE TRUSTED { nBytes _ PrincOpsUtils.ByteBlt[ to: [ blockPointer: @finger.buffer.byte, startIndex: finger.index, stopIndexPlusOne: finger.bytes], from: [ blockPointer: base, startIndex: startIndex, stopIndexPlusOne: stop] ]; }; startIndex _ startIndex + nBytes; finger.index _ finger.index + nBytes; IF finger.index = finger.bytes THEN FinishOutputFinger[handle]; ENDLOOP; }; Push: PUBLIC PROC [self: STREAM] = { handle: Handle = NARROW[self.streamData]; SELECT handle.outputWrite.state FROM halfFull => FinishOutputFinger[handle]; ENDCASE => NULL; }; Flush: PROC [self: STREAM] = { handle: Handle = NARROW[self.streamData]; SELECT handle.outputWrite.state FROM halfFull => { handle.flush _ TRUE; FinishOutputFinger[handle]; }; ENDCASE => NULL; IF handle.outputBuffersReady = 0 THEN RETURN; -- Skip closed test IF ~handle.flush THEN SendProbe[handle]; handle.flush _ TRUE; WHILE handle.flush DO CheckForClosed[handle]; DO WaitOutputFlushed[handle]; CheckForClosed[handle]; IF ~handle.flush THEN RETURN; SIGNAL Timeout; ENDLOOP; ENDLOOP; }; Close: PROC [self: STREAM, abort: BOOL] = { handle: Handle = NARROW[self.streamData]; IF handle = NIL THEN RETURN; IF abort AND handle.err = NIL THEN Abort[self, "Close with Abort"]; IF ~abort AND handle.err = NIL THEN Flush[self ! StreamClosing, Timeout => CONTINUE ]; IF handle.outputBuffersReady # 0 AND handle.err = NIL THEN Abort[self, "Close with unFlushed data"]; IF handle.err = NIL THEN CloseConnection[handle]; self.streamData _ NIL; CloseHandle[handle]; }; CloseHandle: PROC [handle: Handle] = { IF CloseHandleInternal[handle] THEN RETURN; TRUSTED { JOIN handle.retransmitter; JOIN handle.push1; JOIN handle.push2; JOIN handle.push3; JOIN handle.pull; }; FreeBuffers[handle]; PupSocket.Destroy[handle.socket]; handle.socket _ NIL; }; CloseHandleInternal: ENTRY PROC [handle: Handle] RETURNS [alreadyDead: BOOL] = { alreadyDead _ handle.dead; handle.dead _ TRUE; NOTIFY handle.outputReady; }; SendAttention: PUBLIC PROC [self: IO.STREAM] = { handle: Handle = NARROW[self.streamData]; WHILE handle.sendAttenIdSent = handle.sendAttenIdAcked DO CheckForClosed[handle]; SendInt[handle]; WaitSendAttention[handle]; ENDLOOP; handle.sendAttenIdSent _ handle.sendAttenIdSent.SUCC; }; WaitAttention: PUBLIC PROC [self: IO.STREAM] = { handle: Handle = NARROW[self.streamData]; WHILE handle.recvAttenIdArrived = handle.recvAttenIdSeen DO CheckForClosed[handle]; WaitRecvAttention[handle]; ENDLOOP; handle.recvAttenIdSeen _ handle.recvAttenIdSeen.SUCC; }; <> CreateListener: PUBLIC PROC [ local: Pup.Socket, worker: PupStream.ListenerProc, getTimeout: Milliseconds, putTimeout: Milliseconds, clientData: REF ANY _ NIL, filter: PupStream.FilterProc _ NIL, -- NIL => Accept all requests echoFilter: PupStream.FilterProc _ NIL] -- NIL => Answer all echos RETURNS [listener: Listener] = { IF ~PupName.IsWellKnown[local] THEN ERROR SocketNotWellKnown; listener _ NEW[ListenerRep _ [ local: local, worker: worker, clientData: clientData, getTimeout: getTimeout, putTimeout: putTimeout, filter: filter, echoFilter: echoFilter, listen: NIL ]]; listener.listen _ FORK Listen[listener]; }; DestroyListener: PUBLIC PROC [listener: Listener] = { IF listener.listen = NIL THEN RETURN; TRUSTED { Process.Abort[listener.listen]; JOIN listener.listen; }; listener.listen _ NIL; }; Listen: PROC [listener: Listener] = { socket: Socket _ PupSocket.CreateServer[ local: listener.local, sendBuffers: 0, recvBuffers: 2, getTimeout: PupSocket.waitForever]; DO ENABLE ABORTED => EXIT; b: PupBuffer.Buffer _ PupSocket.Get[socket]; remote: Pup.Address _ b.source; LocalSendRFCReply: PROC [handle: Handle, b: PupBuffer.Buffer] = BEGIN <> b.address _ PupSocket.GetLocalAddress[handle.socket]; -- The ephemeral socket PupSocket.SetUserSize[b, SIZE[Pup.Address]]; -- Just because. PupSocketBackdoor.ReturnToSenderNoFree[b]; END; -- LocalSendRFCReply SELECT b.type FROM echoMe => { err: ROPE _ NIL; IF listener.echoFilter # NIL THEN err _ listener.echoFilter[listener.clientData, remote]; b.type _ iAmEcho; IF err # NIL THEN { b.type _ abort; b.abort.code _ 0; PupSocket.SetUserHWords[b, 1]; PupSocket.AppendRope[b, err]; }; PupSocketBackdoor.ReturnToSenderNoFree[b]; }; rfc => { err: ROPE _ NIL; handle: Handle; FOR handle: Handle _ chain, handle.next UNTIL handle = NIL DO IF remote # handle.remote THEN LOOP; <> IF ~handle.dead THEN LocalSendRFCReply[handle, b]; duplicateRfcReceived _ duplicateRfcReceived.SUCC; GOTO Processed; ENDLOOP; <> IF listener.filter # NIL THEN err _ listener.filter[listener.clientData, remote]; IF err # NIL THEN { -- Reject b.type _ abort; b.abort.code _ 0; PupSocket.SetUserHWords[b, 1]; PupSocket.AppendRope[b, err]; PupSocketBackdoor.ReturnToSenderNoFree[b]; GOTO Processed; }; <> handle _ NewObject[remote, b.id, NIL, listener.getTimeout, listener.putTimeout]; SetupObject[handle]; -- Moved from ListenInit in order to rfc from the listener.local socket. LocalSendRFCReply[handle, b]; TRUSTED { Process.Detach[FORK ListenInit[listener, handle, remote]]; }; EXITS Processed => NULL; }; ENDCASE => NULL; PupSocket.FreeBuffer[b]; ENDLOOP; PupSocket.Destroy[socket]; }; ListenInit: PROC [listener: Listener, handle: Handle, remote: Pup.Address] = { <> <> ExchangeAcks[handle]; listener.worker[HandleToStream[handle], listener.clientData, remote]; }; <> AllocateSocket: PUBLIC PROC [remote: Pup.Address] RETURNS [sockets: Sockets] = { socket: Socket _ PupSocket.CreateEphemeral[ remote: remote, sendBuffers: defaultNumberOfBuffers + 2, recvBuffers: 2*defaultNumberOfBuffers + 2, getTimeout: 60000 ]; sockets _ NEW[SocketsRep _ [used: FALSE, socket: socket]]; }; LocalAddress: PUBLIC PROC [sockets: Sockets] RETURNS [local: Pup.Address] = { local _ PupSocket.GetLocalAddress[sockets.socket]; }; RemoteAddress: PUBLIC PROC [sockets: Sockets] RETURNS [remote: Pup.Address] = { remote _ PupSocket.GetRemoteAddress[sockets.socket]; }; SocketsFromStream: PUBLIC PROC [self: STREAM] RETURNS [sockets: Sockets] = { handle: Handle = NARROW[self.streamData]; sockets _ NEW[SocketsRep _ [used: TRUE, socket: handle.socket]]; }; SocketsAlreadyUsed: PUBLIC ERROR = CODE; WaitForRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout, waitTimeout: Milliseconds] RETURNS [STREAM] = { socket: Socket _ sockets.socket; IF sockets.used THEN ERROR SocketsAlreadyUsed; sockets.used _ TRUE; PupSocket.SetGetTimeout[socket, waitTimeout]; DO b: PupBuffer.Buffer _ PupSocket.Get[socket]; IF b = NIL THEN ERROR StreamClosing[transmissionTimeout, "No RFC arrived"]; <> SELECT b.type FROM rfc => { handle: Handle; PupSocket.FixupSourceAndDest[b]; PupSocket.SetRemoteAddress[socket, b.source]; handle _ NewObject[b.source, b.id, sockets, getTimeout, putTimeout]; PupSocket.FreeBuffer[b]; PupSocket.SetGetTimeout[socket, 250]; -- also in NewObject SetupObject[handle]; SendRfc[handle]; ExchangeAcks[handle]; RETURN[HandleToStream[handle]]; }; ENDCASE => NULL; PupSocket.FreeBuffer[b]; ENDLOOP; }; ActivelyRendezvous: PUBLIC PROC [sockets: Sockets, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM] = { handle: Handle; IF sockets.used THEN ERROR SocketsAlreadyUsed; sockets.used _ TRUE; handle _ NewObject[Pup.nullAddress, PupSocket.GetUniqueID[], sockets, getTimeout, putTimeout]; OpenConnection[handle]; SetupObject[handle]; ExchangeAcks[handle]; RETURN[HandleToStream[handle]]; }; <> NewObject: PROC [ remote: Pup.Address, id: Endian.FWORD, sockets: Sockets, getTimeout, putTimeout: Milliseconds] RETURNS [handle: Handle] = { temp: LONG CARDINAL; handle _ NEW[Object]; TRUSTED { Process.SetTimeout[@handle.outputReady, Process.MsecToTicks[2000]]; Process.EnableAborts[@handle.outputSpace]; IF putTimeout # PupStream.waitForever THEN Process.SetTimeout[@handle.outputFlushed, MillisecondsToTicks[putTimeout]] ELSE Process.DisableTimeout[@handle.outputFlushed]; IF putTimeout # PupStream.waitForever THEN Process.SetTimeout[@handle.outputSpace, MillisecondsToTicks[putTimeout]] ELSE Process.DisableTimeout[@handle.outputSpace]; Process.EnableAborts[@handle.inputReady]; IF getTimeout # PupStream.waitForever THEN Process.SetTimeout[@handle.inputReady, MillisecondsToTicks[getTimeout]] ELSE Process.DisableTimeout[@handle.inputReady]; Process.SetTimeout[@handle.sendAttention, Process.MsecToTicks[9000]]; Process.EnableAborts[@handle.recvAttention]; Process.DisableTimeout[@handle.recvAttention]; Process.SetTimeout[@handle.stateChange, Process.MsecToTicks[4000]]; Process.SetTimeout[@handle.alloc, Process.MsecToTicks[5000]]; Process.SetTimeout[@handle.ack, Process.MsecToTicks[6000]]; }; IF sockets # NIL THEN handle.socket _ sockets.socket ELSE handle.socket _ PupSocket.CreateEphemeral[ remote: remote, sendBuffers: defaultNumberOfBuffers + 2, recvBuffers: 2*defaultNumberOfBuffers + 2, getTimeout: 250 ]; -- also in WaitForRendezvous handle.remote _ PupSocket.GetRemoteAddress[handle.socket]; IF disableSoftwareChecksums THEN PupSocketBackdoor.SetSoftwareChecksumming[handle.socket, FALSE, FALSE]; handle.connectionId _ id; temp _ Endian.CardFromF[handle.connectionId]; handle.pullId _ handle.pushId _ handle.ackedId _ temp; handle.sendAttenIdSent _ handle.sendAttenIdAcked _ temp; handle.recvAttenIdArrived _ handle.recvAttenIdSeen _ temp; }; SetupObject: PROC [handle: Handle] = { AllocateBuffers[handle, defaultNumberOfBuffers, defaultNumberOfBuffers]; handle.retransmitter _ FORK Retransmitter[handle]; handle.push1 _ FORK Pusher[handle]; handle.push2 _ FORK Pusher[handle]; handle.push3 _ FORK Pusher[handle]; handle.pull _ FORK Puller[handle]; AddHandle[globalLock, handle]; SafeStorage.EnableFinalization[handle]; }; ExchangeAcks: PROC [handle: Handle] = { SendAckFirst[handle]; -- First ack for free (IFS doesn't play this game) IF handle.pushPacketSize = 0 THEN Process.Pause[2]; -- Dally (a wee bit) in case of free ack FOR i: NAT IN [0..10) DO IF handle.pushPacketSize # 0 THEN EXIT; IF handle.err # NIL THEN EXIT; SendProbe[handle]; WaitForAllocation[handle]; ENDLOOP; IF handle.err = NIL THEN WaitForAllocation[handle]; -- NOTIFY only IF handle.pushPacketSize = 0 THEN SmashClosed[handle, transmissionTimeout, "No allocate (need packet size)"]; }; HandleToStream: PROC [handle: Handle] RETURNS [STREAM] = { RETURN[IO.CreateStream[streamProcs: streamProcs, streamData: handle]]; }; OpenConnection: PROC [handle: Handle] = { socket: Socket _ handle.socket; retransmisson: BasicTime.Pulses _ oneSecond; start: BasicTime.Pulses; FOR i: NAT IN [0..10) DO SendRfc[handle]; start _ BasicTime.GetClockPulses[]; retransmisson _ MIN[retransmisson*2, tenSeconds]; <<2+4+8+10+10+10+10+10+10+10 => 84 seconds before timeout>> UNTIL ElapsedPulses[start] > retransmisson DO b: Buffer _ PupSocket.Get[socket]; IF b = NIL THEN LOOP; SELECT b.type FROM rfc => { handle.remote _ b.address; PupSocket.SetRemoteAddress[socket, handle.remote]; PupSocket.FreeBuffer[b]; RETURN; }; error => GotError[handle, b]; abort => GotAbort[handle, b]; ENDCASE => NULL; PupSocket.FreeBuffer[b]; CheckForClosed[handle]; ENDLOOP; CheckForClosed[handle]; ENDLOOP; SmashClosed[handle, transmissionTimeout, "No response from remote site"]; CheckForClosed[handle]; <> }; CloseConnection: PROC [handle: Handle] = { socket: Socket _ handle.socket; IF handle.err # NIL THEN RETURN; FOR i: NAT IN [0..10) DO SendEnd[handle]; WaitStateChange[handle]; IF handle.err # NIL THEN RETURN; ENDLOOP; SendAbort[handle, "No Response to end"]; }; <> ElapsedPulses: PROC [startTime: BasicTime.Pulses] RETURNS [BasicTime.Pulses] = INLINE { RETURN[BasicTime.GetClockPulses[] - startTime]; }; Offset: PROC [to, from: LONG CARDINAL] RETURNS [INT] = TRUSTED INLINE { RETURN[LOOPHOLE[(to-from), INT]]; }; EnoughWordsForBytes: PROC [bytes: NAT] RETURNS [NAT] = TRUSTED INLINE { <> <> RETURN[CARDINAL[(bytes+Basics.bytesPerWord-1)]/Basics.bytesPerWord]; }; MillisecondsToTicks: PROC [ms: Milliseconds] RETURNS [Process.Ticks] = { SELECT ms FROM PupStream.waitForever => ERROR; < CARDINAL.LAST => RETURN[Process.MsecToTicks[ms]]; ENDCASE => RETURN[Process.SecondsToTicks[ms/1000]]; }; <> Puller: PROC [handle: Handle] = { socket: Socket _ handle.socket; probes: CARDINAL _ 0; timeOfLastArrival: BasicTime.Pulses _ handle.timeOfLastArrival; Process.SetPriority[Process.priorityForeground]; IF useDirectInput THEN PupSocketBackdoor.SetDirectReceive[socket, Receive, handle]; UNTIL handle.err # NIL DO b: Buffer _ PupSocket.Get[socket]; IF b = NIL THEN { -- Nothing arrived recently IF ElapsedPulses[handle.timeOfLastArrival] < probePulses THEN { probes _ 0; timeOfLastArrival _ handle.timeOfLastArrival; } ELSE { IF ElapsedPulses[timeOfLastArrival] > (probes+1)*probePulses THEN { <> SendProbe[handle]; probes _ probes.SUCC; }; IF probes > 10 THEN SmashClosed[handle, transmissionTimeout, "no response from remote host"]; }; LOOP; }; b _ Receive[socket, b, handle]; PupSocket.FreeBuffer[b]; ENDLOOP; PupSocketBackdoor.SetDirectReceive[socket, NIL, NIL]; IF handle.why = remoteClose THEN { -- Dally in case he retransmits DO b: Buffer _ PupSocket.Get[socket]; IF b = NIL THEN EXIT; SELECT b.type FROM end => GotEnd[handle, b]; endRep => { PupSocket.FreeBuffer[b]; EXIT; }; ENDCASE => NULL; PupSocket.FreeBuffer[b]; ENDLOOP; }; handle _ NIL; -- Allow Finalization }; Receive: PROC [socket: Socket, b: Buffer, user: REF ANY] RETURNS [Buffer] = { handle: Handle = NARROW[user]; SELECT b.type FROM ack => ProcessAck[handle, b]; data, aData, mark, aMark => { bytes: ByteIndex = PupSocket.GetUserBytes[b]; type: PupType.Type _ b.type; IF bytes # 0 THEN b _ DataPacket[handle, b, bytes]; IF type = aData OR type = aMark THEN { oldAllocNeeded: BOOL _ handle.allocNeeded; handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse; SELECT TRUE FROM oldAllocNeeded => SendAckFirst[handle]; (bytes = 0) => SendAckFirst[handle]; ~handle.allocNeeded => SendAckAgain[handle, b]; ENDCASE => acksDefered _ acksDefered.SUCC; }; }; end => GotEnd[handle, b]; endRep => GotEndReply[handle, b]; error => GotError[handle, b]; abort => GotAbort[handle, b]; int => GotInt[handle, b]; intRep => GotIntReply[handle, b]; rfc => NULL; ENDCASE => funnyPacketType _ funnyPacketType.SUCC; RETURN[b]; }; DataPacket: ENTRY PROC [handle: Handle, b: Buffer, bytes: ByteIndex] RETURNS [swap: Buffer] = { offset: INT _ Offset[Endian.CardFromF[b.id], handle.pullId]; swap _ b; SELECT offset FROM < 0 => oldPackets _ oldPackets.SUCC; > 0 => futurePackets _ futurePackets.SUCC; -- Missed something 0 => { finger: Finger _ handle.inputWrite; IF finger.state # empty THEN RETURN; swap _ finger.buffer; finger.buffer _ b; finger.mark _ b.type = mark OR b.type = aMark; finger.bytes _ bytes; finger.state _ full; handle.pullId _ handle.pullId + bytes; handle.inputBuffersAvailable _ handle.inputBuffersAvailable.PRED; handle.inputWrite _ finger.next; NOTIFY handle.inputReady; handle.timeOfLastArrival _ BasicTime.GetClockPulses[]; }; ENDCASE => ERROR; }; GotEnd: PROC [handle: Handle, b: Buffer] = { IF b.id # handle.connectionId THEN RETURN; IF b.source # handle.remote THEN RETURN; PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL]; IF handle.err = NIL THEN SmashClosed[handle, remoteClose, "Remote close"]; SELECT handle.why FROM remoteClose => IF handle.outputBuffersReady # 0 THEN RETURN; -- Can't accept yet; ENDCASE; SendEndReplyFirst[handle, b]; }; GotEndReply: PROC [handle: Handle, b: Buffer] = { IF b.id # handle.connectionId THEN RETURN; IF b.source # handle.remote THEN RETURN; PupSocketBackdoor.SetDirectReceive[handle.socket, NIL, NIL]; IF handle.err # NIL THEN RETURN; SmashClosed[handle, localClose, "Local close"]; SendEndReplyFirst[handle, b]; }; GotError: PROC [handle: Handle, b: Buffer] = { IF b.source # handle.remote THEN RETURN; SELECT b.error.code FROM noSocket => SmashClosed[handle, remoteReject, PupSocket.ExtractErrorRope[b]]; resourceLimits, gatewayResourceLimits => handle.outputBuffersToUse _ MIN[1, handle.outputBuffersToUse-1]; ENDCASE => RETURN; -- Not a fatal error }; GotAbort: PROC [handle: Handle, b: Buffer] = { IF b.id # handle.connectionId THEN RETURN; IF b.source # handle.remote THEN RETURN; SmashClosed[handle, remoteReject, PupSocket.ExtractAbortRope[b]]; }; GotInt: PROC [handle: Handle, b: Buffer] = { id: LONG CARDINAL = Endian.CardFromF[b.id]; IF b.source # handle.remote THEN RETURN; IF Offset[id, handle.recvAttenIdArrived] = 1 THEN handle.recvAttenIdArrived _ id; SendIntReplyFirst[handle, b]; NotifyRecvAttention[handle]; }; GotIntReply: PROC [handle: Handle, b: Buffer] = { id: LONG CARDINAL = Endian.CardFromF[b.id]; IF b.source # handle.remote THEN RETURN; IF Offset[id, handle.sendAttenIdSent] = 1 THEN handle.sendAttenIdAcked _ id; NotifySendAttention[handle]; }; SendRfc: PROC [handle: Handle] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; b.type _ rfc; b.id _ handle.connectionId; b.address _ PupSocket.GetLocalAddress[socket]; PupSocket.SetUserSize[b, SIZE[Pup.Address]]; PupSocketBackdoor.PutFirst[socket, b]; PupSocket.FreeBuffer[b]; }; SendAbort: PROC [handle: Handle, err: ROPE] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; b.type _ abort; b.id _ handle.connectionId; b.abort.code _ 0; PupSocket.SetUserHWords[b, 1]; PupSocket.AppendRope[b, err]; PupSocket.Put[socket, b]; IF handle.err # NIL THEN RETURN; SmashClosed[handle, localAbort, err]; }; MaybeSendAck: PROC [handle: Handle] = { socket: Socket; b: Buffer; IF ~handle.allocNeeded THEN RETURN; IF handle.inputBuffersAvailable < handle.inputBuffersToUse THEN RETURN; socket _ handle.socket; b _ PupSocket.AllocBuffer[socket]; SendAckAgain[handle, b]; PupSocket.FreeBuffer[b]; }; SendAckFirst: PROC [handle: Handle] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse; b.type _ ack; b.id _ Endian.FFromCard[handle.pullId]; TRUSTED { b.body _ ack[ maxBytesPerPup: handle.bufferSize, maxPupsAhead: handle.inputBuffersAvailable, maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ]; }; PupSocket.SetUserHWords[b, 3]; PupSocketBackdoor.PutFirst[socket, b]; PupSocket.FreeBuffer[b]; }; SendAckAgain: PROC [handle: Handle, b: Buffer] = { socket: Socket _ handle.socket; bytes: INT = handle.inputBuffersAvailable*LONG[handle.bufferSize]; -- Overflow maxBytes: NAT = 32000; -- ARG! IFS gets confused by CARDINAL.LAST handle.allocNeeded _ handle.inputBuffersAvailable < handle.inputBuffersToUse; b.type _ ack; b.id _ Endian.FFromCard[handle.pullId]; TRUSTED { b.body _ ack[ maxBytesPerPup: handle.bufferSize, maxPupsAhead: handle.inputBuffersAvailable, maxBytesAhead: CARDINAL[MIN[bytes, maxBytes]] ]; }; PupSocket.SetUserHWords[b, 3]; PupSocketBackdoor.PutAgain[socket, b]; }; SendProbe: PROC [handle: Handle] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; b.type _ aData; b.id _ Endian.FFromCard[handle.pushId]; PupSocket.SetUserBytes[b, 0]; PupSocketBackdoor.PutFirst[socket, b]; PupSocket.FreeBuffer[b]; }; SendEnd: PROC [handle: Handle] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; b.type _ end; b.id _ handle.connectionId; PupSocket.SetUserBytes[b, 0]; PupSocket.Put[socket, b]; }; SendEndReplyFirst: PROC [handle: Handle, b: Buffer] = { socket: Socket _ handle.socket; b.type _ endRep; b.id _ handle.connectionId; PupSocket.SetUserBytes[b, 0]; PupSocketBackdoor.PutFirst[socket, b]; }; SendInt: PROC [handle: Handle] = { socket: Socket _ handle.socket; b: Buffer _ PupSocket.AllocBuffer[socket]; b.type _ int; b.id _ Endian.FFromCard[handle.sendAttenIdSent]; PupSocket.SetUserBytes[b, 0]; PupSocket.Put[socket, b]; }; SendIntReplyFirst: PROC [handle: Handle, b: Buffer] = { socket: Socket _ handle.socket; b.type _ intRep; b.id _ Endian.FFromCard[handle.recvAttenIdArrived]; PupSocket.SetUserBytes[b, 0]; PupSocketBackdoor.PutFirst[socket, b]; }; <> FinishOutputFinger: PROC [handle: Handle] = { finger: Finger _ handle.outputWrite; b: Buffer _ finger.buffer; b.id _ Endian.FFromCard[handle.pushId]; handle.pushId _ handle.pushId + finger.index; PupSocket.SetUserBytes[b, finger.index]; finger.inProgress _ TRUE; finger.state _ full; handle.outputWrite _ finger.next; NotifyOutputReady[handle]; }; Pusher: PROC [handle: Handle] = { socket: Socket _ handle.socket; Process.SetPriority[CommDriver.sendPriority]; -- priorityClient3 = priorityForeground+1 DO finger: Finger _ WaitOutputReady[handle]; b: Buffer; IF handle.err # NIL THEN EXIT; b _ finger.buffer; IF finger.ack THEN b.type _ IF finger.mark THEN aMark ELSE aData ELSE b.type _ IF finger.mark THEN mark ELSE data; PupSocketBackdoor.PutAgain[socket, b]; IF finger.ack THEN handle.timeOfLastPush _ BasicTime.GetClockPulses[]; finger.inProgress _ FALSE; ENDLOOP; handle _ NIL; -- Allow Finalization }; <> <<>> Retransmitter: PROC [handle: Handle] = { socket: Socket _ handle.socket; GetValidOutputToAck: PROC RETURNS [Finger] ~ { finger: Finger _ handle.outputToAck; IF finger.state # full THEN ERROR; WHILE finger.inProgress DO Process.Pause[1]; IF handle.err # NIL THEN RETURN[NIL]; ENDLOOP; RETURN [finger] }; Process.SetPriority[Process.priorityForeground]; UNTIL handle.err # NIL DO WaitAck[handle]; -- 6 seconds IF handle.err # NIL THEN EXIT; IF NotifyOutputSpace[handle] THEN LOOP; -- Should calculate retransmit time ??? IF handle.outputBuffersReady = 0 THEN LOOP; -- Nothing to send IF handle.pushPackets = 0 THEN { SendProbe[handle]; LOOP; }; -- Waiting for Alloc handle.retransmitting _ TRUE; clumpsRetransmitted _ clumpsRetransmitted.SUCC; UNTIL handle.err # NIL DO -- Retransmit each packet until it gets acked finger: Finger; IF NotifyOutputSpace[handle] THEN EXIT; IF (finger _ GetValidOutputToAck[]) = NIL THEN EXIT; Process.Pause[2]; IF NotifyOutputSpace[handle] THEN EXIT; IF (finger _ GetValidOutputToAck[]) = NIL THEN EXIT; finger.buffer.type _ IF finger.mark THEN aMark ELSE aData; PupSocketBackdoor.PutFirst[socket, finger.buffer]; packetsRetransmitted _ packetsRetransmitted.SUCC; handle.timeOfLastPush _ BasicTime.GetClockPulses[]; WaitAck[handle]; -- 6 seconds ENDLOOP; ENDLOOP; handle _ NIL; -- Allow Finalization }; <> WaitInputReady: PROC [handle: Handle] = { DO CheckForClosed[handle]; WaitInputReadyInner[handle]; CheckForClosed[handle]; -- Avoid AddressFault if Closed IF handle.inputRead.state = full THEN RETURN; SIGNAL Timeout; ENDLOOP; }; WaitInputReadyInner: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.inputRead.state = full THEN RETURN; WAIT handle.inputReady; }; NotifyInputSpace: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = { finger _ handle.inputRead; finger.state _ empty; finger _ finger.next; handle.inputRead _ finger; handle.inputBuffersAvailable _ handle.inputBuffersAvailable.SUCC; }; WaitOutputSpace: PROC [handle: Handle] = { DO WaitOutputSpaceInner[handle]; CheckForClosed[handle]; -- Avoid AddressFault if Closed IF handle.outputWrite.state = empty THEN RETURN; SIGNAL Timeout; ENDLOOP; }; WaitOutputSpaceInner: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.outputWrite.state = empty THEN RETURN; WAIT handle.outputSpace; }; WaitOutputFlushed: ENTRY PROC [handle: Handle] = INLINE { ENABLE UNWIND => NULL; IF handle.outputBuffersReady = 0 THEN handle.flush _ FALSE; IF ~handle.flush THEN RETURN; WAIT handle.outputFlushed; }; NotifyOutputSpace: ENTRY PROC [handle: Handle] RETURNS [progress: BOOL _ FALSE] = { <> DO finger: Finger _ handle.outputToAck; IF finger.state # full THEN EXIT; IF finger.inProgress THEN EXIT; IF Offset[handle.ackedId, Endian.CardFromF[finger.buffer.id]] <= 0 THEN EXIT; IF finger.ack THEN progress _ ~handle.retransmitting; finger.state _ empty; handle.outputToAck _ finger.next; handle.outputBuffersReady _ handle.outputBuffersReady.PRED; handle.outputBuffersInFlight _ handle.outputBuffersInFlight.PRED; NOTIFY handle.outputSpace; ENDLOOP; IF handle.retransmitting AND handle.outputBuffersInFlight = 0 THEN { progress _ TRUE; handle.retransmitting _ FALSE; }; IF handle.outputBuffersReady = 0 THEN { handle.flush _ FALSE; NOTIFY handle.outputFlushed; }; IF progress THEN { BROADCAST handle.alloc; BROADCAST handle.outputReady; }; }; WaitOutputReady: ENTRY PROC [handle: Handle] RETURNS [finger: Finger] = { DO UNTIL handle.pushPackets > handle.outputBuffersInFlight AND handle.pushBytes # 0 DO IF handle.err # NIL THEN RETURN[NIL]; WAIT handle.alloc; ENDLOOP; IF handle.err # NIL THEN RETURN[NIL]; finger _ handle.outputRead; SELECT finger.state FROM full => IF finger.inProgress AND ~handle.retransmitting THEN EXIT; ENDCASE => NULL; WAIT handle.outputReady; ENDLOOP; handle.outputRead _ handle.outputRead.next; handle.outputBuffersInFlight _ handle.outputBuffersInFlight.SUCC; SELECT TRUE FROM handle.pushPackets = handle.outputBuffersInFlight => finger.ack _ TRUE; handle.flush AND finger.next.state # full => finger.ack _ TRUE; handle.outputBuffersToUse = handle.outputBuffersInFlight => finger.ack _ TRUE; handle.outputBuffersAllocated = handle.outputBuffersInFlight => finger.ack _ TRUE; ENDCASE => finger.ack _ FALSE; }; NotifyOutputReady: ENTRY PROC [handle: Handle] = { handle.outputBuffersReady _ handle.outputBuffersReady.SUCC; NOTIFY handle.outputReady; }; WaitStateChange: ENTRY PROC [handle: Handle] = { IF handle.err # NIL THEN RETURN; WAIT handle.stateChange; }; ProcessAck: ENTRY PROC [handle: Handle, b: Buffer] = { offset: INT _ Offset[Endian.CardFromF[b.id], handle.ackedId]; SELECT offset FROM < 0 => oldAcks _ oldAcks.SUCC; ENDCASE => { handle.ackedId _ handle.ackedId + offset; handle.pushPacketSize _ MIN[b.maxBytesPerPup, handle.bufferSize]; handle.pushPackets _ b.maxPupsAhead; handle.pushBytes _ b.maxBytesAhead; IF handle.pushPackets # 0 THEN BROADCAST handle.ack; -- ExchangeAcks too IF offset # 0 AND handle.pushPackets > handle.outputBuffersInFlight THEN BROADCAST handle.alloc; handle.timeOfLastArrival _ BasicTime.GetClockPulses[]; }; }; WaitAck: ENTRY PROC [handle: Handle] = { finger: Finger _ handle.outputToAck; fingerId: LONG CARDINAL = Endian.CardFromF[finger.buffer.id]; IF finger.state = full AND Offset[handle.ackedId, fingerId] > 0 THEN RETURN; WAIT handle.ack; }; WaitForAllocation: ENTRY PROC [handle: Handle] = { IF handle.pushPackets = 0 OR handle.pushBytes = 0 THEN WAIT handle.ack; NOTIFY handle.alloc; }; NotifyRecvAttention: ENTRY PROC [handle: Handle] = { NOTIFY handle.recvAttention; }; WaitRecvAttention: ENTRY PROC [handle: Handle] = { ENABLE UNWIND => NULL; WAIT handle.recvAttention; }; NotifySendAttention: ENTRY PROC [handle: Handle] = { NOTIFY handle.sendAttention; }; WaitSendAttention: ENTRY PROC [handle: Handle] = { ENABLE UNWIND => NULL; WAIT handle.sendAttention; }; CheckForClosed: PROC [handle: Handle] = { IF handle.err # NIL THEN ERROR StreamClosing[handle.why, handle.err]; }; SmashClosed: ENTRY PROC [handle: Handle, why: PupStream.CloseReason, err: ROPE] = { <> IF handle.err # NIL THEN RETURN; handle.why _ why; handle.err _ err; BROADCAST handle.stateChange; BROADCAST handle.outputReady; BROADCAST handle.outputFlushed; BROADCAST handle.outputSpace; BROADCAST handle.inputReady; BROADCAST handle.alloc; BROADCAST handle.ack; BROADCAST handle.sendAttention; BROADCAST handle.recvAttention; PupSocket.Kick[handle.socket]; }; <> GetBufferSize: PROC [remote: Pup.Address] RETURNS [bufferSizes: NAT] = { hop: PupHop.Hop = PupHop.GetHop[remote.net]; IF hop # 0 THEN RETURN[MIN[maxBufferSize, maxNewGatewayBytes]]; <> RETURN[MIN[maxBufferSize, maxDataBytes]]; }; AllocateBuffers: PROC [handle: Handle, outputBuffers, inputBuffers: NAT] = TRUSTED { socket: PupSocket.Socket _ handle.socket; handle.bufferSize _ GetBufferSize[handle.remote]; FOR i: NAT IN [0..outputBuffers) DO finger: Finger _ NEW[FingerObject _ []]; finger.buffer _ PupSocket.AllocBuffer[socket]; finger.buffer.id _ handle.connectionId; IF handle.outputRead = NIL THEN handle.outputRead _ finger; finger.next _ handle.outputWrite; handle.outputWrite _ finger; ENDLOOP; handle.outputRead.next _ handle.outputWrite; handle.outputRead _ handle.outputWrite; handle.outputToAck _ handle.outputWrite; FOR i: NAT IN [0..inputBuffers) DO finger: Finger _ NEW[FingerObject _ []]; finger.buffer _ PupSocketBackdoor.AllocRecvBuffer[socket]; IF handle.inputRead = NIL THEN handle.inputRead _ finger; finger.next _ handle.inputWrite; handle.inputWrite _ finger; ENDLOOP; handle.inputRead.next _ handle.inputWrite; handle.inputRead _ handle.inputWrite; handle.outputBuffersToUse _ (outputBuffers+1)/2; -- Double Buffering handle.outputBuffersAllocated _ outputBuffers; handle.outputBuffersReady _ 0; handle.inputBuffersToUse _ (inputBuffers+1)/2; -- Double Buffering handle.inputBuffersAllocated _ inputBuffers; handle.inputBuffersAvailable _ inputBuffers; }; FreeBuffers: ENTRY PROC [handle: Handle] = TRUSTED { socket: PupSocket.Socket _ handle.socket; FOR i: NAT IN [0..handle.outputBuffersAllocated) DO finger: Finger _ handle.outputWrite; handle.outputWrite _ finger.next; IF finger.buffer # NIL THEN PupSocket.FreeBuffer[finger.buffer]; finger.next _ NIL; ENDLOOP; FOR i: NAT IN [0..handle.inputBuffersAllocated) DO finger: Finger _ handle.inputWrite; handle.inputWrite _ finger.next; IF finger.buffer # NIL THEN PupSocket.FreeBuffer[finger.buffer]; finger.next _ NIL; ENDLOOP; handle.outputToAck _ NIL; handle.outputRead _ NIL; handle.outputWrite _ NIL; handle.inputRead _ NIL; handle.inputWrite _ NIL; }; <> <> <<>> globalLock: Handle _ NEW[Object]; chain: Handle _ NIL; AddHandle: ENTRY PROC [handle: Handle, new: Handle] = { <> new.next _ chain; chain _ new; }; RemoveHandle: ENTRY PROC [handle: Handle, old: Handle] = { <> IF chain = old THEN { chain _ chain.next; old.next _ NIL; -- Help Finalization RETURN; }; FOR finger: Handle _ chain, finger.next DO -- NIL fault if not on chain IF finger.next = old THEN { finger.next _ old.next; old.next _ NIL; -- Help Finalization RETURN; }; ENDLOOP; }; Rollback: Booting.RollbackProc = { FOR handle: Handle _ chain, handle.next UNTIL handle = NIL DO IF handle.err # NIL THEN LOOP; SmashClosed[handle, localClose, "Rollback"]; ENDLOOP; }; <> <> <<>> StreamFinalizer: PROC = { Process.SetPriority[Process.priorityBackground]; DO handle: Handle _ NARROW[SafeStorage.FQNext[sfq]]; IF ~handle.dead THEN { -- User forgot to call Destroy SafeStorage.EnableFinalization[handle]; IF handle.err = NIL THEN SendAbort[handle, "Client dropped Stream"]; CloseHandle[handle]; droppedStreams _ droppedStreams.SUCC; } ELSE { -- Normal end of life RemoveHandle[globalLock, handle]; finishedStreams _ finishedStreams.SUCC; }; handle _ NIL; ENDLOOP; }; DropTest: PROC [n: NAT _ 25] = { where: Pup.Address _ [[3],[17B],[0,0,0,1]]; --Ivy FOR i: NAT IN [0..3) DO foo: STREAM _ Create[where, 1000, 1000]; IO.Close[foo]; IO.Close[foo]; Process.Pause[Process.MsecToTicks[10000]]; ENDLOOP; FOR i: NAT IN [0..n) DO foo: STREAM _ Create[where, 1000, 1000]; Abort[foo," Testing..."]; Process.Pause[Process.MsecToTicks[10000]]; ENDLOOP; }; sfq: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[]; SafeStorage.EstablishFinalization[type: Object.CODE, npr: 1, fq: sfq]; TRUSTED { Process.Detach[FORK StreamFinalizer[]]; }; Booting.RegisterProcs[r: Rollback]; }.