<> <> <> <> <> DIRECTORY BasicTime USING [Now, Period], BufferDefs USING [Buffer, Enqueue, PupBufferObject, QueueCleanup, QueueInitialize, QueueObject, ReturnFreeBuffer], PrincOpsUtils USING [Free, GetReturnLink, MyLocalFrame], Process USING [Detach], PupDefs USING [DataWordsPerPupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, GetHopsToNetwork, GetPupContentsBytes, MsToTocks, PupAddress, PupBuffer, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsBytes, Tocks, UniqueLocalPupSocketID], PupTypes USING [maxDataWordsPerGatewayPup, PupType], Sequin USING [Buffer, noBuffer], SequinPrivate USING [maxPings, RequeueClosure, SequenceNumber, SequinControl, SequinID, SequinRep]; SequinImpl: MONITOR LOCKS sequin.LOCK USING sequin: Handle IMPORTS BasicTime, BufferDefs, PrincOpsUtils, Process, PupDefs EXPORTS Sequin, SequinPrivate = BEGIN OPEN PupDefs, Sequin, SequinPrivate; <> Handle: TYPE = REF SequinRep; SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep; <> maxBytes: PUBLIC CARDINAL; maxAllocate: PUBLIC CARDINAL; <> nSequins: CARDINAL _ 0; <> SequinsInUse: ERROR = CODE; BogusBuffer: ERROR = CODE; ServerIsSpeakingGreek: ERROR = CODE; WhereDidIPutThatBuffer: ERROR = CODE; idleLatency: INT = 10; <> Create: PUBLIC PROC [dest: PupAddress, pupType: PupTypes.PupType] RETURNS [sequin: Handle] = { GetTicks: PROC RETURNS [Tocks] = { hops: CARDINAL _ GetHopsToNetwork[dest.net]; IF hops = LAST[CARDINAL] THEN hops _ 0; -- no route RETURN [[MsToTocks[hops * 750 + 500]]] }; sequin _ NEW[SequinRep _ [pupType: pupType]]; BufferDefs.QueueInitialize[sequin.retransmitQueue _ NEW[BufferDefs.QueueObject]]; BufferDefs.QueueInitialize[sequin.getQueue _ NEW[BufferDefs.QueueObject]]; sequin.socket _ PupSocketMake[UniqueLocalPupSocketID[], dest, GetTicks[]]; sequin.id.allocate _ maxAllocate; sequin.closure _ MakeRequeueClosure[sequin]; sequin.opened _ TRUE; Process.Detach[FORK SocketWarmer[sequin]]; nSequins _ nSequins + 1; }; WasOpened: ENTRY PROC [sequin: Handle] RETURNS [wasOpened: BOOL] = { IF sequin = NIL THEN RETURN [FALSE]; IF wasOpened _ sequin.opened THEN sequin.opened _ FALSE; }; WaitUntilAllQuiet: ENTRY PROC [sequin: Handle] = { UNTIL sequin.state = destroyed DO WAIT sequin.goAhead ENDLOOP; UNTIL sequin.buffersToRequeue = 0 DO WAIT sequin.goAhead ENDLOOP; }; Destroy: PUBLIC PROC [sequin: Handle] = { IF WasOpened[sequin] THEN { buffer: PupBuffer _ GetFreePupBuffer[]; SetPupContentsBytes[buffer, 0]; Send[sequin, buffer, destroy ! Broken => CONTINUE]; WaitUntilAllQuiet[sequin]; BufferDefs.QueueCleanup[sequin.retransmitQueue]; BufferDefs.QueueCleanup[sequin.getQueue]; PupSocketDestroy[sequin.socket]; PrincOpsUtils.Free[sequin.closure]; nSequins _ nSequins - 1; }; }; Broken: PUBLIC ERROR = CODE; Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = { packet: PupBuffer; DO IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) ~= NIL THEN EXIT; WAIT sequin.goAhead; ENDLOOP; RETURN[PupToSequinBuffer[packet]] }; GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = { packet: PupBuffer; IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) = NIL THEN RETURN[noBuffer]; RETURN[PupToSequinBuffer[packet]] }; Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = { packet: PupBuffer = SequinToPupBuffer[buffer]; SetPupContentsBytes[packet, buffer.nBytes]; Send[sequin, packet, data]; }; GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = { packet: PupBuffer = GetFreePupBuffer[]; SetPupContentsBytes[packet, 0]; RETURN[PupToSequinBuffer[packet]] }; ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = { IF buffer = noBuffer THEN RETURN; ReturnFreePupBuffer[SequinToPupBuffer[buffer]]; }; <> Send: PUBLIC ENTRY PROC [sequin: Handle, packet: PupBuffer, control: SequinControl] = { DO retrans: SequenceNumber = AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]]; IF sequin.broken OR Compare[retrans, sequin.id.sendSequence] = ahead THEN EXIT; WAIT sequin.goAhead; ENDLOOP; IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken}; IF control = data AND sequin.state = init THEN {control _ open; sequin.state _ open}; sequin.id.control _ control; LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ _ sequin.id; IF GetPupContentsBytes[packet] ~= 0 THEN sequin.id.sendSequence _ AddWrap[sequin.id.sendSequence]; packet.pupType _ sequin.pupType; SendRequeueable[sequin, packet]; }; SocketWarmer: PUBLIC PROC [sequin: Handle] = { <> packet: PupBuffer; packetID: LONG POINTER TO SequinID; alive: BOOLEAN _ TRUE; ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE { alive _ TRUE; IF packet = NIL THEN { SELECT CheckForPing[sequin] FROM no => NULL; retransmit => Retransmit[sequin]; check => RespondWithNewPacket[check]; dead => RETURN[FALSE]; ENDCASE; RETURN }; IF GetPupContentsBytes[packet] > maxBytes THEN RETURN; SELECT packet.pupType FROM error => { OPEN PupTypes; SELECT packet.errorCode FROM noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE]; ENDCASE => RETURN; }; sequin.pupType => NULL; ENDCASE => RETURN; packetID _ LOOPHOLE[@packet.pupID]; IF packetID.control = broken THEN RETURN[FALSE]; SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM duplicate => RETURN; ahead => { DiscardAckedPackets[sequin]; IF packetID.control = restart THEN Retransmit[sequin]; IF ~sequin.restartRequested THEN { sequin.restartRequested _ TRUE; RespondWithCurrentPacket[sequin, restart]; }; RETURN }; ENDCASE; <> ResetPinging[]; DiscardAckedPackets[sequin]; SELECT packetID.control FROM data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]}; dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]}; restart => Retransmit[sequin]; check => RespondWithCurrentPacket[sequin, ack]; ack, nop => NULL; ENDCASE => ERROR ServerIsSpeakingGreek; }; CheckForPing: ENTRY PROC [sequin: Handle] RETURNS [{no, retransmit, check, dead}] = INLINE { SELECT sequin.pings FROM 0 => IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]] <= idleLatency AND sequin.retransmitQueue.length = 0 THEN RETURN[no]; maxPings => RETURN[dead]; ENDCASE; IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet sequin.pings _ sequin.pings + 1; sequin.recentRestart _ sequin.restartRequested _ FALSE; RETURN[ IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit ELSE check] }; ResetPinging: PROC = { sequin.pings _ 0; sequin.lastPacketTime _ BasicTime.Now[]; }; DiscardAckedPackets: ENTRY PROC [sequin: Handle] = { sequin.allocate _ packetID.allocate; IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN { skipped: CARDINAL _ 0; UNTIL packetID.receiveSequence = sequin.retransmitSequence OR skipped = sequin.retransmitQueue.length DO b: PupBuffer _ DequeuePup[sequin.retransmitQueue]; IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN {EnqueuePup[sequin.retransmitQueue, b]; skipped _ skipped + 1; LOOP}; ReturnFreePupBuffer[b]; sequin.retransmitSequence _ AddWrap[sequin.retransmitSequence, 1]; skipped _ 0; ENDLOOP; BROADCAST sequin.goAhead; }; }; EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE { EnqueuePup[sequin.getQueue, packet]; sequin.id.receiveSequence _ AddWrap[sequin.id.receiveSequence, 1]; sequin.recentRestart _ sequin.restartRequested _ FALSE; NOTIFY sequin.goAhead; }; RespondWithNewPacket: PROC [control: SequinControl] = { packet _ GetFreePupBuffer[]; packetID _ LOOPHOLE[@packet.pupID]; RespondWithCurrentPacket[sequin, control]; }; RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] = { sequin.id.control _ control; packetID^ _ sequin.id; packet.pupType _ sequin.pupType; SetPupContentsBytes[packet, 0]; sequin.socket.put[packet]; packet _ NIL; }; MarkDead: ENTRY PROC [sequin: Handle] = { sequin.broken _ TRUE; sequin.state _ destroyed; BROADCAST sequin.goAhead; }; Retransmit: ENTRY PROC [sequin: Handle] = { skipped: CARDINAL _ 0; seq: SequenceNumber _ sequin.retransmitSequence; IF sequin.recentRestart THEN RETURN; UNTIL skipped = sequin.retransmitQueue.length DO buffer: PupBuffer = DequeuePup[sequin.retransmitQueue]; bufferID: LONG POINTER TO SequinID; IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer; IF (bufferID _ LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN {EnqueuePup[sequin.retransmitQueue, buffer]; skipped _ skipped + 1; LOOP}; bufferID.inPart _ sequin.id.inPart; SendRequeueable[sequin, buffer]; sequin.recentRestart _ TRUE; skipped _ 0; seq _ AddWrap[seq, 1]; ENDLOOP; }; ResetPinging[]; WHILE alive DO packet _ sequin.socket.get[]; IF ~(alive _ ProcessPacket[sequin]) THEN MarkDead[sequin]; IF packet ~= NIL THEN ReturnFreePupBuffer[packet]; ENDLOOP; }; MakeRequeueClosure: PUBLIC PROC [sequin: Handle] RETURNS [closure: RequeueClosure] = { OPEN PrincOpsUtils; Return: PROC [RequeueClosure] _ LOOPHOLE[GetReturnLink[]]; Requeue: PROC [buffer: BufferDefs.Buffer] = { DoRequeue: ENTRY PROC [sequin: Handle] = INLINE { sequin.buffersToRequeue _ sequin.buffersToRequeue - 1; IF sequin.state = destroyed THEN { BufferDefs.ReturnFreeBuffer[buffer]; IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN}; } ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer]; }; DoRequeue[sequin]; }; sequin.requeuer _ Requeue; Return[MyLocalFrame[]]; <> }; <> SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = { buffer.requeueProcedure _ sequin.requeuer; sequin.buffersToRequeue _ sequin.buffersToRequeue + 1; sequin.socket.put[buffer]; }; Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = { maxGetAhead: CARDINAL = 128; RETURN[ SELECT TRUE FROM a = b => equal, a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate, ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead] }; AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL _ 1] RETURNS [SequenceNumber] = INLINE { RETURN [(seq+delta) MOD (LAST[SequenceNumber]+1)]; }; PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = { RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b], maxBytes: maxBytes]]; }; SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = { IF b.data = NIL THEN ERROR BogusBuffer; RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]] }; <> Initialize: PROC = { maxBytes _ 2*MIN[DataWordsPerPupBuffer[], PupTypes.maxDataWordsPerGatewayPup]; <<10 = leaf overhead/packet; +1 roundup; +1 for parallelism>> maxAllocate _ 511/(maxBytes-10) + 1 + 1; }; <
> Initialize[]; END.