DIRECTORY BasicTime USING [Now, Period], BufferDefs USING [Buffer, Enqueue, PupBufferObject, QueueCleanup, QueueInitialize, QueueObject, ReturnFreeBuffer], PrincOpsUtils USING [Free, GetReturnLink, MyLocalFrame], Process USING [Detach], PupDefs USING [DataWordsPerPupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, GetHopsToNetwork, GetPupContentsBytes, MsToTocks, PupAddress, PupBuffer, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsBytes, Tocks, UniqueLocalPupSocketID], PupTypes USING [maxDataWordsPerGatewayPup, PupType], Sequin USING [Buffer, noBuffer], SequinPrivate USING [maxPings, RequeueClosure, SequenceNumber, SequinControl, SequinID, SequinRep]; SequinImpl: MONITOR LOCKS sequin.LOCK USING sequin: Handle IMPORTS BasicTime, BufferDefs, PrincOpsUtils, Process, PupDefs EXPORTS Sequin, SequinPrivate = BEGIN OPEN PupDefs, Sequin, SequinPrivate; Handle: TYPE = REF SequinRep; SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep; maxBytes: PUBLIC CARDINAL; maxAllocate: PUBLIC CARDINAL; nSequins: CARDINAL _ 0; SequinsInUse: ERROR = CODE; BogusBuffer: ERROR = CODE; ServerIsSpeakingGreek: ERROR = CODE; WhereDidIPutThatBuffer: ERROR = CODE; idleLatency: INT = 10; Create: PUBLIC PROC [dest: PupAddress, pupType: PupTypes.PupType] RETURNS [sequin: Handle] = { GetTicks: PROC RETURNS [Tocks] = { hops: CARDINAL _ GetHopsToNetwork[dest.net]; IF hops = LAST[CARDINAL] THEN hops _ 0; -- no route RETURN [[MsToTocks[hops * 750 + 500]]] }; sequin _ NEW[SequinRep _ [pupType: pupType]]; BufferDefs.QueueInitialize[sequin.retransmitQueue _ NEW[BufferDefs.QueueObject]]; BufferDefs.QueueInitialize[sequin.getQueue _ NEW[BufferDefs.QueueObject]]; sequin.socket _ PupSocketMake[UniqueLocalPupSocketID[], dest, GetTicks[]]; sequin.id.allocate _ maxAllocate; sequin.closure _ MakeRequeueClosure[sequin]; sequin.opened _ TRUE; Process.Detach[FORK SocketWarmer[sequin]]; nSequins _ nSequins + 1; }; WasOpened: ENTRY PROC [sequin: Handle] RETURNS [wasOpened: BOOL] = { IF sequin = NIL THEN RETURN [FALSE]; IF wasOpened _ sequin.opened THEN sequin.opened _ FALSE; }; WaitUntilAllQuiet: ENTRY PROC [sequin: Handle] = { UNTIL sequin.state = destroyed DO WAIT sequin.goAhead ENDLOOP; UNTIL sequin.buffersToRequeue = 0 DO WAIT sequin.goAhead ENDLOOP; }; Destroy: PUBLIC PROC [sequin: Handle] = { IF WasOpened[sequin] THEN { buffer: PupBuffer _ GetFreePupBuffer[]; SetPupContentsBytes[buffer, 0]; Send[sequin, buffer, destroy ! Broken => CONTINUE]; WaitUntilAllQuiet[sequin]; BufferDefs.QueueCleanup[sequin.retransmitQueue]; BufferDefs.QueueCleanup[sequin.getQueue]; PupSocketDestroy[sequin.socket]; PrincOpsUtils.Free[sequin.closure]; nSequins _ nSequins - 1; }; }; Broken: PUBLIC ERROR = CODE; Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = { packet: PupBuffer; DO IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) ~= NIL THEN EXIT; WAIT sequin.goAhead; ENDLOOP; RETURN[PupToSequinBuffer[packet]] }; GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = { packet: PupBuffer; IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet _ DequeuePup[sequin.getQueue]) = NIL THEN RETURN[noBuffer]; RETURN[PupToSequinBuffer[packet]] }; Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = { packet: PupBuffer = SequinToPupBuffer[buffer]; SetPupContentsBytes[packet, buffer.nBytes]; Send[sequin, packet, data]; }; GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = { packet: PupBuffer = GetFreePupBuffer[]; SetPupContentsBytes[packet, 0]; RETURN[PupToSequinBuffer[packet]] }; ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = { IF buffer = noBuffer THEN RETURN; ReturnFreePupBuffer[SequinToPupBuffer[buffer]]; }; Send: PUBLIC ENTRY PROC [sequin: Handle, packet: PupBuffer, control: SequinControl] = { DO retrans: SequenceNumber = AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]]; IF sequin.broken OR Compare[retrans, sequin.id.sendSequence] = ahead THEN EXIT; WAIT sequin.goAhead; ENDLOOP; IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken}; IF control = data AND sequin.state = init THEN {control _ open; sequin.state _ open}; sequin.id.control _ control; LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ _ sequin.id; IF GetPupContentsBytes[packet] ~= 0 THEN sequin.id.sendSequence _ AddWrap[sequin.id.sendSequence]; packet.pupType _ sequin.pupType; SendRequeueable[sequin, packet]; }; SocketWarmer: PUBLIC PROC [sequin: Handle] = { packet: PupBuffer; packetID: LONG POINTER TO SequinID; alive: BOOLEAN _ TRUE; ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE { alive _ TRUE; IF packet = NIL THEN { SELECT CheckForPing[sequin] FROM no => NULL; retransmit => Retransmit[sequin]; check => RespondWithNewPacket[check]; dead => RETURN[FALSE]; ENDCASE; RETURN }; IF GetPupContentsBytes[packet] > maxBytes THEN RETURN; SELECT packet.pupType FROM error => { OPEN PupTypes; SELECT packet.errorCode FROM noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE]; ENDCASE => RETURN; }; sequin.pupType => NULL; ENDCASE => RETURN; packetID _ LOOPHOLE[@packet.pupID]; IF packetID.control = broken THEN RETURN[FALSE]; SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM duplicate => RETURN; ahead => { DiscardAckedPackets[sequin]; IF packetID.control = restart THEN Retransmit[sequin]; IF ~sequin.restartRequested THEN { sequin.restartRequested _ TRUE; RespondWithCurrentPacket[sequin, restart]; }; RETURN }; ENDCASE; ResetPinging[]; DiscardAckedPackets[sequin]; SELECT packetID.control FROM data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]}; dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]}; restart => Retransmit[sequin]; check => RespondWithCurrentPacket[sequin, ack]; ack, nop => NULL; ENDCASE => ERROR ServerIsSpeakingGreek; }; CheckForPing: ENTRY PROC [sequin: Handle] RETURNS [{no, retransmit, check, dead}] = INLINE { SELECT sequin.pings FROM 0 => IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]] <= idleLatency AND sequin.retransmitQueue.length = 0 THEN RETURN[no]; maxPings => RETURN[dead]; ENDCASE; IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet sequin.pings _ sequin.pings + 1; sequin.recentRestart _ sequin.restartRequested _ FALSE; RETURN[ IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit ELSE check] }; ResetPinging: PROC = { sequin.pings _ 0; sequin.lastPacketTime _ BasicTime.Now[]; }; DiscardAckedPackets: ENTRY PROC [sequin: Handle] = { sequin.allocate _ packetID.allocate; IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN { skipped: CARDINAL _ 0; UNTIL packetID.receiveSequence = sequin.retransmitSequence OR skipped = sequin.retransmitQueue.length DO b: PupBuffer _ DequeuePup[sequin.retransmitQueue]; IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN {EnqueuePup[sequin.retransmitQueue, b]; skipped _ skipped + 1; LOOP}; ReturnFreePupBuffer[b]; sequin.retransmitSequence _ AddWrap[sequin.retransmitSequence, 1]; skipped _ 0; ENDLOOP; BROADCAST sequin.goAhead; }; }; EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE { EnqueuePup[sequin.getQueue, packet]; sequin.id.receiveSequence _ AddWrap[sequin.id.receiveSequence, 1]; sequin.recentRestart _ sequin.restartRequested _ FALSE; NOTIFY sequin.goAhead; }; RespondWithNewPacket: PROC [control: SequinControl] = { packet _ GetFreePupBuffer[]; packetID _ LOOPHOLE[@packet.pupID]; RespondWithCurrentPacket[sequin, control]; }; RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] = { sequin.id.control _ control; packetID^ _ sequin.id; packet.pupType _ sequin.pupType; SetPupContentsBytes[packet, 0]; sequin.socket.put[packet]; packet _ NIL; }; MarkDead: ENTRY PROC [sequin: Handle] = { sequin.broken _ TRUE; sequin.state _ destroyed; BROADCAST sequin.goAhead; }; Retransmit: ENTRY PROC [sequin: Handle] = { skipped: CARDINAL _ 0; seq: SequenceNumber _ sequin.retransmitSequence; IF sequin.recentRestart THEN RETURN; UNTIL skipped = sequin.retransmitQueue.length DO buffer: PupBuffer = DequeuePup[sequin.retransmitQueue]; bufferID: LONG POINTER TO SequinID; IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer; IF (bufferID _ LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN {EnqueuePup[sequin.retransmitQueue, buffer]; skipped _ skipped + 1; LOOP}; bufferID.inPart _ sequin.id.inPart; SendRequeueable[sequin, buffer]; sequin.recentRestart _ TRUE; skipped _ 0; seq _ AddWrap[seq, 1]; ENDLOOP; }; ResetPinging[]; WHILE alive DO packet _ sequin.socket.get[]; IF ~(alive _ ProcessPacket[sequin]) THEN MarkDead[sequin]; IF packet ~= NIL THEN ReturnFreePupBuffer[packet]; ENDLOOP; }; MakeRequeueClosure: PUBLIC PROC [sequin: Handle] RETURNS [closure: RequeueClosure] = { OPEN PrincOpsUtils; Return: PROC [RequeueClosure] _ LOOPHOLE[GetReturnLink[]]; Requeue: PROC [buffer: BufferDefs.Buffer] = { DoRequeue: ENTRY PROC [sequin: Handle] = INLINE { sequin.buffersToRequeue _ sequin.buffersToRequeue - 1; IF sequin.state = destroyed THEN { BufferDefs.ReturnFreeBuffer[buffer]; IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN}; } ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer]; }; DoRequeue[sequin]; }; sequin.requeuer _ Requeue; Return[MyLocalFrame[]]; }; SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = { buffer.requeueProcedure _ sequin.requeuer; sequin.buffersToRequeue _ sequin.buffersToRequeue + 1; sequin.socket.put[buffer]; }; Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = { maxGetAhead: CARDINAL = 128; RETURN[ SELECT TRUE FROM a = b => equal, a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate, ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead] }; AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL _ 1] RETURNS [SequenceNumber] = INLINE { RETURN [(seq+delta) MOD (LAST[SequenceNumber]+1)]; }; PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = { RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b], maxBytes: maxBytes]]; }; SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = { IF b.data = NIL THEN ERROR BogusBuffer; RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]] }; Initialize: PROC = { maxBytes _ 2*MIN[DataWordsPerPupBuffer[], PupTypes.maxDataWordsPerGatewayPup]; maxAllocate _ 511/(maxBytes-10) + 1 + 1; }; Initialize[]; END. 8SequinImpl.mesa Copyright c 1985 by Xerox Corporation. All rights reserved. Loosely derived (after extensive discussions with Wobber) from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM. Levin: 3-Feb-82 9:29:58 Russ Atkinson, March 7, 1985 1:57:40 pm PST Types Variables exported to SequinPrivate Global Variables Miscellaneous Declarations Procedures exported to Sequin Procedures exported to SequinPrivate This procedure is forked as a separate process which tries to keep the socket as "clean" as possible. It absorbs incoming packets and initiates retransmissions as necessary. Good packets are moved to the getQueue. we've seen everything from the server never gets here; see SequinImplA.Destroy Internal Procedures Initialization 10 = leaf overhead/packet; +1 roundup; +1 for parallelism Main body Κ @˜codešœ™Kšœ Οmœ1™Kšžœ˜K˜Kšžœžœ ˜*K˜—šœ™K˜Kšœžœžœ ˜Kšœ žœžœ˜1K˜—šœ#™#K˜Kšœ žœžœ˜K˜Kšœ žœžœ˜K˜—šœ™K˜Kšœ žœ˜K˜—šœ™K˜Kšœžœžœ˜Kšœ žœžœ˜Kšœžœžœ˜$Kšœžœžœ˜%K˜Kšœ žœ˜K˜—šœ™K˜šΟnœžœžœ/žœ˜^K˜šŸœžœžœ ˜"Kšœžœ˜,Kš žœžœžœžœ Οc ˜4Kšžœ ˜&Kšœ˜K˜—Kšœ žœ!˜-Kšœ4žœ˜QKšœ-žœ˜JK˜JK˜!K˜,Kšœžœ˜Kšœžœ˜*K˜Kšœ˜K˜—š Ÿ œžœžœžœ žœ˜DKš žœ žœžœžœžœ˜$Kšžœžœžœ˜8Kšœ˜K˜—šŸœžœžœ˜2Kšžœžœžœžœ˜>Kšžœžœžœžœ˜AKšœ˜K˜—šŸœžœžœ˜)šžœžœ˜K˜'K˜Kšœ)žœ˜3K˜K˜0K˜)K˜ Kšœ#˜#K˜K˜—Kšœ˜K˜—Kšœžœžœžœ˜K˜š Ÿœžœžœžœžœ ˜˜NKšœ9™9K˜(Kšœ˜K˜——šœ ™ K˜K˜ K˜Kšžœ˜K˜K˜——…—)*9’