-- SequinImplB.mesa
-- Loosely derived (after extensive discussions with Wobber) from Butterfield's
-- Sequin.mesa of August 27, 1979 2:49 PM.
-- Levin: 6-Jul-81 16:15:00
-- Russ Atkinson, November 22, 1983 11:12 am
--
-- Formatting note: every end-of-line comment in this module must stay on its own
-- physical line end.  A Mesa comment runs to the next double hyphen or newline, so
-- joining lines lets a comment swallow the statements that follow it.

DIRECTORY
  BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer],
  PrincOpsUtils USING [GetReturnLink, MyLocalFrame],
  PupDefs USING [
    DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer,
    ReturnFreePupBuffer, SetPupContentsBytes],
  PupTypes USING [PupType],
  Sequin USING [Buffer, noBuffer],
  SequinPrivate USING [
    maxAllocate, maxBytes, maxPings, RequeueClosure, SequenceNumber, SequinID,
    SequinControl, SequinRep],
  BasicTime USING [Now, Period];

SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: Handle
  IMPORTS BufferDefs, PrincOpsUtils, PupDefs, SequinPrivate, BasicTime
  EXPORTS Sequin, SequinPrivate =

  BEGIN OPEN PupDefs, Sequin, SequinPrivate;

  -- Types --

  Handle: TYPE = REF SequinRep;
  SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep;

  -- Miscellaneous Declarations --

  BogusBuffer: ERROR = CODE;             -- raised by SequinToPupBuffer for a NIL data pointer
  ServerIsSpeakingGreek: ERROR = CODE;   -- raised by ProcessPacket for an unexpected control code
  WhereDidIPutThatBuffer: ERROR = CODE;  -- raised by Retransmit when the retransmit queue yields NIL

  -- Compared against BasicTime.Period[lastPacketTime, Now[]] in CheckForPing.
  -- NOTE(review): presumably seconds; confirm against the BasicTime interface.
  idleLatency: INT = 10;

  -- Procedures and Signals Exported to Sequin --

  Broken: PUBLIC ERROR = CODE;

  -- Blocks on sequin.goAhead until a packet can be dequeued from sequin.getQueue,
  -- then returns it as a Sequin Buffer.  Raises Broken (releasing the monitor via
  -- RETURN WITH ERROR) whenever sequin.broken is found set.
  Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer;
    DO
      IF sequin.broken THEN RETURN WITH ERROR Broken;
      IF (packet _ DequeuePup[sequin.getQueue]) ~= NIL THEN EXIT;
      WAIT sequin.goAhead;
      ENDLOOP;
    RETURN[PupToSequinBuffer[packet]]
    END;

  -- Non-blocking variant of Get: returns noBuffer when sequin.getQueue is empty.
  -- Raises Broken if sequin.broken is set.
  GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer;
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    IF (packet _ DequeuePup[sequin.getQueue]) = NIL THEN RETURN[noBuffer];
    RETURN[PupToSequinBuffer[packet]]
    END;

  -- Sends the caller's buffer as a data packet.  The Pup length is taken from
  -- buffer.nBytes; Send may raise Broken.
  Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] =
    BEGIN
    packet: PupBuffer = SequinToPupBuffer[buffer];
    SetPupContentsBytes[packet, buffer.nBytes];
    Send[sequin, packet, data];
    END;

  -- Obtains a free Pup buffer, sets its contents length to zero, and returns it
  -- as a Sequin Buffer.
  GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer = GetFreePupBuffer[];
    SetPupContentsBytes[packet, 0];
    RETURN[PupToSequinBuffer[packet]]
    END;

  -- Returns the Pup buffer underlying the given Buffer to the free pool.
  -- noBuffer is accepted and ignored.
  ReleaseBuffer: PUBLIC PROC [buffer: Buffer] =
    BEGIN
    IF buffer = noBuffer THEN RETURN;
    ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
    END;

  -- Procedures exported to SequinPrivate --

  -- Stamps the packet with the sequin ID and hands it to SendRequeueable.
  -- Waits on sequin.goAhead until either the sequin is broken or the window
  -- position (retransmitSequence advanced by MIN[allocate, maxAllocate]) compares
  -- as ahead of the current send sequence.  If broken, the packet is returned to
  -- the free pool and Broken is raised.
  Send: PUBLIC ENTRY PROC [
    sequin: Handle, packet: PupBuffer, control: SequinControl] = {
    DO
      retrans: SequenceNumber =
        AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]];
      IF sequin.broken OR Compare[retrans, sequin.id.sendSequence] = ahead THEN EXIT;
      WAIT sequin.goAhead;
      ENDLOOP;
    IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
    -- The first data packet sent while still in the init state goes out as open.
    IF control = data AND sequin.state = init THEN
      {control _ open; sequin.state _ open};
    sequin.id.control _ control;
    LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ _ sequin.id;
    -- Only packets with non-empty contents advance the send sequence number.
    IF GetPupContentsBytes[packet] ~= 0 THEN
      sequin.id.sendSequence _ AddWrap[sequin.id.sendSequence];
    packet.pupType _ sequin.pupType;
    SendRequeueable[sequin, packet];
    };

  SocketWarmer: PUBLIC PROC [sequin: Handle] =
    -- This procedure is forked as a separate process which tries to keep the socket
    -- as "clean" as possible.  It absorbs incoming packets and initiates
    -- retransmissions as necessary.  Good packets are moved to the getQueue.
    BEGIN
    -- packet and packetID are shared by all the nested procedures below: packet is
    -- the buffer currently being processed (NIL when none is held) and packetID
    -- points at its pupID field viewed as a SequinID.
    packet: PupBuffer;
    packetID: LONG POINTER TO SequinID;
    alive: BOOLEAN _ TRUE;

    -- Handles the result of one socket.get.  Returns FALSE when the main loop
    -- should mark the sequin dead; TRUE otherwise.
    ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
      BEGIN
      alive _ TRUE;
      IF packet = NIL THEN
        BEGIN
        -- Nothing was received: decide whether to ping, retransmit, or give up.
        SELECT CheckForPing[sequin] FROM
          no => NULL;
          retransmit => Retransmit[sequin];
          check => RespondWithNewPacket[check];
          dead => RETURN[FALSE];
          ENDCASE;
        RETURN
        END;
      -- Oversized packets are ignored.
      IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
      SELECT packet.pupType FROM
        error =>
          BEGIN OPEN PupTypes;
          -- The listed error codes end the connection; other errors are ignored.
          SELECT packet.errorCode FROM
            noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode,
            eightHopsPupErrorCode => RETURN[FALSE];
            ENDCASE => RETURN;
          END;
        sequin.pupType => NULL;
        ENDCASE => RETURN;
      packetID _ LOOPHOLE[@packet.pupID];
      IF packetID.control = broken THEN RETURN[FALSE];
      SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
        duplicate => RETURN;
        ahead =>
          BEGIN
          -- Out-of-sequence arrival: still honour its acknowledgement, and ask for
          -- a restart at most once until restartRequested is cleared again.
          DiscardAckedPackets[sequin];
          IF packetID.control = restart THEN Retransmit[sequin];
          IF ~sequin.restartRequested THEN
            BEGIN
            sequin.restartRequested _ TRUE;
            RespondWithCurrentPacket[sequin, restart];
            END;
          RETURN
          END;
        ENDCASE;
      -- we've seen everything from the server
      -- (Only the equal outcome of Compare falls through to here.)
      ResetPinging[];
      DiscardAckedPackets[sequin];
      SELECT packetID.control FROM
        data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
        dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
        restart => Retransmit[sequin];
        check => RespondWithCurrentPacket[sequin, ack];
        ack, nop => NULL;
        ENDCASE => ERROR ServerIsSpeakingGreek;
      END;

    -- Called when socket.get produced no packet.  Returns:
    --   no          while idle (no pings outstanding, last packet within idleLatency,
    --               retransmit queue empty) or while still in the init state;
    --   dead        when pings has reached maxPings;
    --   retransmit  on the first ping when exactly one buffer awaits acknowledgement;
    --   check       otherwise.
    -- The last two cases also count the ping and clear both restart flags.
    CheckForPing: ENTRY PROC [sequin: Handle]
      RETURNS [{no, retransmit, check, dead}] = INLINE
      BEGIN
      SELECT sequin.pings FROM
        0 =>
          IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]] <= idleLatency
            AND sequin.retransmitQueue.length = 0 THEN RETURN[no];
        maxPings => RETURN[dead];
        ENDCASE;
      IF sequin.state = init THEN RETURN[no];  -- don't ping until first real packet
      sequin.pings _ sequin.pings + 1;
      sequin.recentRestart _ sequin.restartRequested _ FALSE;
      RETURN[
        IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
        ELSE check]
      END;

    -- Zeroes the ping count and records the current time as the last packet time.
    -- Not an ENTRY procedure: it updates the sequin without taking the monitor lock.
    ResetPinging: PROC = INLINE
      {sequin.pings _ 0; sequin.lastPacketTime _ BasicTime.Now[]};

    -- Adopts the allocation carried by the current packet and, if the packet's
    -- receiveSequence is ahead of retransmitSequence, frees acknowledged buffers
    -- from the retransmit queue in sequence order.  Buffers found out of order are
    -- put back on the queue; the scan stops after a full pass without a match.
    -- Waiters on goAhead are then woken.
    DiscardAckedPackets: ENTRY PROC [sequin: Handle] =
      BEGIN
      sequin.allocate _ packetID.allocate;
      IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
        BEGIN
        skipped: CARDINAL _ 0;
        UNTIL packetID.receiveSequence = sequin.retransmitSequence
          OR skipped = sequin.retransmitQueue.length DO
          b: PupBuffer _ DequeuePup[sequin.retransmitQueue];
          IF b = NIL THEN EXIT;  -- buffer hasn't made it to queue yet
          IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
            {EnqueuePup[sequin.retransmitQueue, b]; skipped _ skipped + 1; LOOP};
          ReturnFreePupBuffer[b];
          sequin.retransmitSequence _ AddWrap[sequin.retransmitSequence, 1];
          skipped _ 0;
          ENDLOOP;
        BROADCAST sequin.goAhead;
        END;
      END;

    -- Moves the current packet onto getQueue, advances the receive sequence,
    -- clears both restart flags, and wakes one waiter on goAhead.
    EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE
      BEGIN
      EnqueuePup[sequin.getQueue, packet];
      sequin.id.receiveSequence _ AddWrap[sequin.id.receiveSequence, 1];
      sequin.recentRestart _ sequin.restartRequested _ FALSE;
      NOTIFY sequin.goAhead;
      END;

    -- Replaces the current packet with a fresh free buffer and sends it as a
    -- zero-length control packet.  Any previous value of packet is overwritten.
    RespondWithNewPacket: PROC [control: SequinControl] =
      BEGIN
      packet _ GetFreePupBuffer[];
      packetID _ LOOPHOLE[@packet.pupID];
      RespondWithCurrentPacket[sequin, control];
      END;

    -- Reuses the current packet as a zero-length control packet carrying the
    -- sequin ID, puts it on the socket, and sets packet to NIL so that the main
    -- loop does not also return it to the free pool.
    RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] =
      BEGIN
      sequin.id.control _ control;
      packetID^ _ sequin.id;
      packet.pupType _ sequin.pupType;
      SetPupContentsBytes[packet, 0];
      sequin.socket.put[packet];
      packet _ NIL;
      END;

    -- Marks the sequin broken and destroyed and wakes every waiter on goAhead
    -- (Get and Send both re-test sequin.broken after waking).
    MarkDead: ENTRY PROC [sequin: Handle] =
      BEGIN
      sequin.broken _ TRUE;
      sequin.state _ destroyed;
      BROADCAST sequin.goAhead;
      END;

    -- Resends the buffers on the retransmit queue in sequence order, starting at
    -- retransmitSequence.  Does nothing if recentRestart is already set, and sets
    -- it once a buffer has been resent.  Out-of-order buffers are put back on the
    -- queue; the loop ends after a full pass without a match.
    Retransmit: ENTRY PROC [sequin: Handle] =
      BEGIN
      skipped: CARDINAL _ 0;
      seq: SequenceNumber _ sequin.retransmitSequence;
      IF sequin.recentRestart THEN RETURN;
      UNTIL skipped = sequin.retransmitQueue.length DO
        buffer: PupBuffer = DequeuePup[sequin.retransmitQueue];
        bufferID: LONG POINTER TO SequinID;
        -- The queue length said there was a buffer, so NIL is an internal error.
        IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
        IF (bufferID _ LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
          {EnqueuePup[sequin.retransmitQueue, buffer]; skipped _ skipped + 1; LOOP};
        -- Refresh the inbound half of the ID before resending.
        bufferID.inPart _ sequin.id.inPart;
        SendRequeueable[sequin, buffer];
        sequin.recentRestart _ TRUE;
        skipped _ 0;
        seq _ AddWrap[seq, 1];
        ENDLOOP;
      END;

    -- Main loop of the socket warmer process.  Runs until ProcessPacket reports
    -- the connection dead; any packet still held after processing is freed.
    ResetPinging[];
    WHILE alive DO
      packet _ sequin.socket.get[];
      IF ~(alive _ ProcessPacket[sequin]) THEN MarkDead[sequin];
      IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
      ENDLOOP;
    END;

  -- Installs Requeue as sequin.requeuer and then transfers control through the
  -- caller's return link, passing this procedure's own local frame as the result.
  -- Requeue is nested so that it can reach sequin in that frame.
  MakeRequeueClosure: PUBLIC PROC [sequin: Handle]
    RETURNS [closure: RequeueClosure] =
    BEGIN OPEN PrincOpsUtils;
    Return: PROC [RequeueClosure] _ LOOPHOLE[GetReturnLink[]];

    -- Requeue procedure attached to outgoing buffers by SendRequeueable.
    Requeue: PROC [buffer: BufferDefs.Buffer] =
      BEGIN
      -- Under the monitor: count the buffer as returned, then either free it (if
      -- the sequin is destroyed, notifying goAhead when the last one comes back)
      -- or put it on the retransmit queue.
      DoRequeue: ENTRY PROC [sequin: Handle] = INLINE
        BEGIN
        sequin.buffersToRequeue _ sequin.buffersToRequeue - 1;
        IF sequin.state = destroyed THEN
          BEGIN
          BufferDefs.ReturnFreeBuffer[buffer];
          IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
          END
        ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer];
        END;
      DoRequeue[sequin];
      END;

    sequin.requeuer _ Requeue;
    Return[MyLocalFrame[]];
    -- never gets here; see SequinImplA.Destroy
    -- NOTE(review): the position of the preceding original comment was
    -- reconstructed from the detached comment text; confirm against the original.
    END;

  -- Internal Procedures --

  -- Attaches the sequin's requeue procedure to the buffer, counts it as
  -- outstanding, and puts it on the socket.  INTERNAL: the caller must already
  -- hold the monitor lock.
  SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] =
    BEGIN
    buffer.requeueProcedure _ sequin.requeuer;
    sequin.buffersToRequeue _ sequin.buffersToRequeue + 1;
    sequin.socket.put[buffer];
    END;

  -- Orders two wrapping sequence numbers.  Values that differ by at most
  -- maxGetAhead are taken at face value; a larger gap is treated as wraparound
  -- and the sense is reversed.
  Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] =
    BEGIN
    maxGetAhead: CARDINAL = 128;
    RETURN[
      SELECT TRUE FROM
        a = b => equal,
        a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate,
        ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
    END;

  -- Adds delta (default 1) to seq, wrapping modulo the size of SequenceNumber.
  AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL _ 1]
    RETURNS [SequenceNumber] = INLINE {
    RETURN [(seq+delta) MOD (LAST[SequenceNumber]+1)];
    };

  -- Builds a Sequin Buffer describing the body of a Pup buffer: data points at
  -- pupBody, nBytes is the current Pup contents length, maxBytes is the limit.
  PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = INLINE
    {RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b],
      maxBytes: maxBytes]]};

  -- Recovers the Pup buffer from a Sequin Buffer by backing the data pointer up
  -- by SIZE[pupBytes BufferDefs.PupBufferObject].  Raises BogusBuffer for a NIL
  -- data pointer.
  -- NOTE(review): assumes b.data was produced by PupToSequinBuffer; confirm callers.
  SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] =
    BEGIN
    IF b.data = NIL THEN ERROR BogusBuffer;
    RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]]
    END;

  END.
ŒSequinImplB.mesa Loosely derived (after extensive discussions with Wobber) from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM. Levin: 6-Jul-81 16:15:00 Russ Atkinson, November 22, 1983 11:12 am Types -- Miscellaneous Declarations -- Procedures and Signals Exported to Sequin -- Procedures exported to SequinPrivate -- This procedure is forked as a separate process which tries to keep the socket as "clean" as possible. It absorbs incoming packets and initiates retransmissions as necessary. Good packets are moved to the getQueue. 
we've seen everything from the server never gets here; see SequinImplA.Destroy Internal Procedures -- Ê Ä˜šœ™Jšœu™uJšœ™J™)—J˜šÏk ˜ Jšœ œ6˜FJšœœ˜2šœœ˜J˜IJ˜*—Jšœ œ ˜Jšœœ˜ šœ˜Jšœf˜f—Jšœ œ˜J˜—šœ ˜Jšœœœ˜&Jšœ=˜DJšœ˜J˜Jšœœ ˜*J˜J˜Jšœ™J˜Jšœœœ ˜Jšœ œœ˜1J˜J˜Jšœ™J˜Jšœ œœ˜Jšœœœ˜$Jšœœœ˜%J˜Jšœ œ˜J˜J˜Jšœ,™,J˜Jšœœœœ˜J˜š Ïnœœœœœ ˜:Jš˜J˜š˜Jš œœœœœ˜/Jšœ+œœœ˜;Jšœ˜Jšœ˜—Jšœ˜!Jšœ˜J˜—š žœœœœœ ˜EJš˜J˜Jš œœœœœ˜/Jšœ*œœœ ˜FJšœ˜!Jšœ˜J˜—šžœœœ#˜3Jš˜J˜.J˜+J˜Jšœ˜J˜—šžœœœœ ˜.Jš˜J˜'J˜Jšœ˜!Jšœ˜J˜—šž œœœ˜-Jš˜Jšœœœ˜!J˜/Jšœ˜J˜J˜—Jšœ'™'J˜šžœœœœ˜J˜>š˜šœ˜Jšœ#œ ˜F—Jšœœ2œœ˜OJšœ˜Jšœ˜—Jš œœœœœ ˜NJšœœœ'˜UJ˜Jšœœœœ˜?šœ"˜(Jšœ9˜9—J˜ J˜ Jšœ˜J˜—šž œœœ˜,Jšœ×™×Jš˜J˜Jšœ œœœ ˜#Jšœœœ˜J˜š ž œœœ œ˜FJš˜Jšœœ˜ šœ œ˜Jš˜šœ˜ Jšœœ˜ J˜!J˜%Jšœœœ˜Jšœ˜—Jš˜Jšœ˜—Jšœ(œœ˜6šœ˜˜Jšœœ ˜šœ˜J˜0Jšœ/œœ˜=Jšœœ˜—Jšœ˜—Jšœœ˜Jšœœ˜—Jšœ œ˜#Jšœœœœ˜0šœ;˜EJšœ œ˜˜Jš˜J˜Jšœœ˜6šœ˜ Jš˜Jšœœ˜J˜*Jšœ˜—Jš˜Jšœ˜—Jšœ˜—Jšœ%™%J˜J˜šœ˜J˜