<> <> <> <> DIRECTORY BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer], PrincOpsUtils USING [GetReturnLink, MyLocalFrame], PupDefs USING [ DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer, ReturnFreePupBuffer, SetPupContentsBytes], PupTypes USING [PupType], Sequin USING [Buffer, noBuffer], SequinPrivate USING [maxAllocate, maxBytes, maxPings, RequeueClosure, SequenceNumber, SequinID, SequinControl, SequinRep], BasicTime USING [Now, Period]; SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: Handle IMPORTS BufferDefs, PrincOpsUtils, PupDefs, SequinPrivate, BasicTime EXPORTS Sequin, SequinPrivate = BEGIN OPEN PupDefs, Sequin, SequinPrivate; <> Handle: TYPE = REF SequinRep; SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep; <> BogusBuffer: ERROR = CODE; ServerIsSpeakingGreek: ERROR = CODE; WhereDidIPutThatBuffer: ERROR = CODE; idleLatency: INT = 10; <> Broken: PUBLIC ERROR = CODE; Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = BEGIN packet: PupBuffer; DO IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) ~= NIL THEN EXIT; WAIT sequin.goAhead; ENDLOOP; RETURN[PupToSequinBuffer[packet]] END; GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = BEGIN packet: PupBuffer; IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) = NIL THEN RETURN[noBuffer]; RETURN[PupToSequinBuffer[packet]] END; Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = BEGIN packet: PupBuffer = SequinToPupBuffer[buffer]; SetPupContentsBytes[packet, buffer.nBytes]; Send[sequin, packet, data]; END; GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = BEGIN packet: PupBuffer = GetFreePupBuffer[]; SetPupContentsBytes[packet, 0]; RETURN[PupToSequinBuffer[packet]] END; ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = BEGIN IF buffer = noBuffer THEN RETURN; ReturnFreePupBuffer[SequinToPupBuffer[buffer]]; END; <> Send: PUBLIC ENTRY PROC [ sequin: Handle, packet: PupBuffer, control: SequinControl] = { DO retrans: SequenceNumber = AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]]; IF sequin.broken OR Compare[retrans, sequin.id.sendSequence] = ahead THEN EXIT; WAIT sequin.goAhead; ENDLOOP; IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken}; IF control = data AND sequin.state = init THEN {control _ open; sequin.state _ open}; sequin.id.control _ control; LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ _ sequin.id; IF GetPupContentsBytes[packet] ~= 0 THEN sequin.id.sendSequence _ AddWrap[sequin.id.sendSequence]; packet.pupType _ sequin.pupType; SendRequeueable[sequin, packet]; }; SocketWarmer: PUBLIC PROC [sequin: Handle] = <> BEGIN packet: PupBuffer; packetID: LONG POINTER TO SequinID; alive: BOOLEAN _ TRUE; ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE BEGIN alive _ TRUE; IF packet = NIL THEN BEGIN SELECT CheckForPing[sequin] FROM no => NULL; retransmit => Retransmit[sequin]; check => RespondWithNewPacket[check]; dead => RETURN[FALSE]; ENDCASE; RETURN END; IF GetPupContentsBytes[packet] > maxBytes THEN RETURN; SELECT packet.pupType FROM error => BEGIN OPEN PupTypes; SELECT packet.errorCode FROM noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE]; ENDCASE => RETURN; END; sequin.pupType => NULL; ENDCASE => RETURN; packetID _ LOOPHOLE[@packet.pupID]; IF packetID.control = broken THEN RETURN[FALSE]; SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM duplicate => RETURN; ahead => BEGIN DiscardAckedPackets[sequin]; IF packetID.control = restart THEN Retransmit[sequin]; IF ~sequin.restartRequested THEN BEGIN sequin.restartRequested _ TRUE; RespondWithCurrentPacket[sequin, restart]; END; RETURN END; ENDCASE; <> ResetPinging[]; DiscardAckedPackets[sequin]; SELECT packetID.control FROM data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]}; dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]}; restart => Retransmit[sequin]; check => RespondWithCurrentPacket[sequin, ack]; ack, nop => NULL; ENDCASE => ERROR ServerIsSpeakingGreek; END; CheckForPing: ENTRY PROC [sequin: Handle] RETURNS [{no, retransmit, check, dead}] = INLINE BEGIN SELECT sequin.pings FROM 0 => IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]] <= idleLatency AND sequin.retransmitQueue.length = 0 THEN RETURN[no]; maxPings => RETURN[dead]; ENDCASE; IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet sequin.pings _ sequin.pings + 1; sequin.recentRestart _ sequin.restartRequested _ FALSE; RETURN[ IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit ELSE check] END; ResetPinging: PROC = INLINE {sequin.pings _ 0; sequin.lastPacketTime _ BasicTime.Now[]}; DiscardAckedPackets: ENTRY PROC [sequin: Handle] = BEGIN sequin.allocate _ packetID.allocate; IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN BEGIN skipped: CARDINAL _ 0; UNTIL packetID.receiveSequence = sequin.retransmitSequence OR skipped = sequin.retransmitQueue.length DO b: PupBuffer _ DequeuePup[sequin.retransmitQueue]; IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN {EnqueuePup[sequin.retransmitQueue, b]; skipped _ skipped + 1; LOOP}; ReturnFreePupBuffer[b]; sequin.retransmitSequence _ AddWrap[sequin.retransmitSequence, 1]; skipped _ 0; ENDLOOP; BROADCAST sequin.goAhead; END; END; EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE BEGIN EnqueuePup[sequin.getQueue, packet]; sequin.id.receiveSequence _ AddWrap[sequin.id.receiveSequence, 1]; sequin.recentRestart _ sequin.restartRequested _ FALSE; NOTIFY sequin.goAhead; END; RespondWithNewPacket: PROC [control: SequinControl] = BEGIN packet _ GetFreePupBuffer[]; packetID _ LOOPHOLE[@packet.pupID]; RespondWithCurrentPacket[sequin, control]; END; RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] = BEGIN sequin.id.control _ control; packetID^ _ sequin.id; packet.pupType _ sequin.pupType; SetPupContentsBytes[packet, 0]; sequin.socket.put[packet]; packet _ NIL; END; MarkDead: ENTRY PROC [sequin: Handle] = BEGIN sequin.broken _ TRUE; sequin.state _ destroyed; BROADCAST sequin.goAhead; END; Retransmit: ENTRY PROC [sequin: Handle] = BEGIN skipped: CARDINAL _ 0; seq: SequenceNumber _ sequin.retransmitSequence; IF sequin.recentRestart THEN RETURN; UNTIL skipped = sequin.retransmitQueue.length DO buffer: PupBuffer = DequeuePup[sequin.retransmitQueue]; bufferID: LONG POINTER TO SequinID; IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer; IF (bufferID _ LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN {EnqueuePup[sequin.retransmitQueue, buffer]; skipped _ skipped + 1; LOOP}; bufferID.inPart _ sequin.id.inPart; SendRequeueable[sequin, buffer]; sequin.recentRestart _ TRUE; skipped _ 0; seq _ AddWrap[seq, 1]; ENDLOOP; END; ResetPinging[]; WHILE alive DO packet _ sequin.socket.get[]; IF ~(alive _ ProcessPacket[sequin]) THEN MarkDead[sequin]; IF packet ~= NIL THEN ReturnFreePupBuffer[packet]; ENDLOOP; END; MakeRequeueClosure: PUBLIC PROC [sequin: Handle] RETURNS [closure: RequeueClosure] = BEGIN OPEN PrincOpsUtils; Return: PROC [RequeueClosure] _ LOOPHOLE[GetReturnLink[]]; Requeue: PROC [buffer: BufferDefs.Buffer] = BEGIN DoRequeue: ENTRY PROC [sequin: Handle] = INLINE BEGIN sequin.buffersToRequeue _ sequin.buffersToRequeue - 1; IF sequin.state = destroyed THEN BEGIN BufferDefs.ReturnFreeBuffer[buffer]; IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN}; END ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer]; END; DoRequeue[sequin]; END; sequin.requeuer _ Requeue; Return[MyLocalFrame[]]; <> END; <> SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = BEGIN buffer.requeueProcedure _ sequin.requeuer; sequin.buffersToRequeue _ sequin.buffersToRequeue + 1; sequin.socket.put[buffer]; END; Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = BEGIN maxGetAhead: CARDINAL = 128; RETURN[ SELECT TRUE FROM a = b => equal, a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate, ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead] END; AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL _ 1] RETURNS [SequenceNumber] = INLINE { RETURN [(seq+delta) MOD (LAST[SequenceNumber]+1)]; }; PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = INLINE {RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b], maxBytes: maxBytes]]}; SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = BEGIN IF b.data = NIL THEN ERROR BogusBuffer; RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]] END; END.