-- Copyright (C) 1981, 1984 by Xerox Corporation. All rights reserved. -- File: SequinImplB.mesa -- HGM: 10-Dec-84 21:24:46 -- last edited by Hankins: 6-Aug-84 13:38:30 -- Klamath update (pup changes, sequinClosure change) -- Last edited by Levin: 6-Jul-81 16:15:00 -- Loosely derived (after extensive discussions with Wobber) -- from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM. DIRECTORY Buffer USING [Buffer, BufferObject, Enqueue, GetBuffer, ReturnBuffer], PupDefs USING [ DequeuePup, EnqueuePup, GetPupContentsBytes, PupBuffer, SetPupContentsBytes], PupTypes USING [BufferBody, PupType], Sequin USING [Buffer, noBuffer], SequinPrivate USING [ bufferPool, Handle, LongHandle, maxAllocate, maxBytes, maxPings, Seconds, SequenceNumber, SequinID, SequinControl], Time USING [Current]; SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: SequinPrivate.Handle IMPORTS Buffer, PupDefs, SequinPrivate, Time EXPORTS Sequin, SequinPrivate = BEGIN -- Types -- Handle: PUBLIC TYPE = SequinPrivate.Handle; -- Miscellaneous Declarations -- BogusBuffer: ERROR = CODE; ServerIsSpeakingGreek: ERROR = CODE; WhereDidIPutThatBuffer: ERROR = CODE; idleLatency: SequinPrivate.Seconds = 10; -- Procedures and Signals Exported to Sequin -- Broken: PUBLIC ERROR = CODE; Get: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Sequin.Buffer] = BEGIN packet: PupDefs.PupBuffer; DO IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet ← PupDefs.DequeuePup[@sequin.getQueue]) ~= NIL THEN EXIT; WAIT sequin.goAhead; ENDLOOP; RETURN[PupToSequinBuffer[packet]] END; GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Sequin.Buffer] = BEGIN packet: PupDefs.PupBuffer; IF sequin.broken THEN RETURN WITH ERROR Broken; IF (packet ← PupDefs.DequeuePup[@sequin.getQueue]) = NIL THEN RETURN[Sequin.noBuffer]; RETURN[PupToSequinBuffer[packet]] END; Put: PUBLIC PROCEDURE [sequin: Handle, buffer: Sequin.Buffer] = BEGIN packet: PupDefs.PupBuffer = SequinToPupBuffer[buffer]; PupDefs.SetPupContentsBytes[packet, buffer.nBytes]; Send[sequin, packet, data]; END; GetEmptyBuffer: PUBLIC PROCEDURE RETURNS [Sequin.Buffer] = BEGIN packet: PupDefs.PupBuffer = Buffer.GetBuffer[ type: pup, aH: SequinPrivate.bufferPool, function: send]; --is send correct? PupDefs.SetPupContentsBytes[packet, 0]; RETURN[PupToSequinBuffer[packet]] END; ReleaseBuffer: PUBLIC PROCEDURE [buffer: Sequin.Buffer] = BEGIN IF buffer = Sequin.noBuffer THEN RETURN; Buffer.ReturnBuffer[SequinToPupBuffer[buffer]]; END; -- Procedures exported to SequinPrivate -- Send: PUBLIC ENTRY PROCEDURE [ sequin: Handle, packet: PupDefs.PupBuffer, control: SequinPrivate.SequinControl] = BEGIN UNTIL Compare[ sequin.retransmitSequence + MIN[sequin.allocate, SequinPrivate.maxAllocate], sequin.id.sendSequence] = ahead OR sequin.broken DO WAIT sequin.goAhead; ENDLOOP; IF sequin.broken THEN {Buffer.ReturnBuffer[packet]; RETURN WITH ERROR Broken}; IF control = data AND sequin.state = init THEN { control ← open; sequin.state ← open}; sequin.id.control ← control; LOOPHOLE[@packet.pup.pupID, LONG POINTER TO SequinPrivate.SequinID]↑ ← sequin.id; IF PupDefs.GetPupContentsBytes[packet] ~= 0 THEN sequin.id.sendSequence ← sequin.id.sendSequence + 1; packet.pup.pupType ← sequin.pupType; SendRequeueable[sequin, packet]; END; SocketWarmer: PUBLIC PROCEDURE [sequin: Handle] = -- This procedure is forked as a separate process which tries to keep the -- socket as "clean" as possible. It absorbs incoming packets and initiates -- retransmissions as necessary. Good packets are moved to the getQueue. BEGIN packet: PupDefs.PupBuffer; packetID: LONG POINTER TO SequinPrivate.SequinID; alive: BOOLEAN ← TRUE; ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE BEGIN alive ← TRUE; IF packet = NIL THEN BEGIN SELECT CheckForPing[sequin] FROM no => NULL; retransmit => Retransmit[sequin]; check => RespondWithNewPacket[check]; dead => RETURN[FALSE]; ENDCASE; RETURN END; IF PupDefs.GetPupContentsBytes[packet] > SequinPrivate.maxBytes THEN RETURN; SELECT packet.pup.pupType FROM error => BEGIN OPEN PupTypes; SELECT packet.pup.errorCode FROM noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE]; ENDCASE => RETURN; END; sequin.pupType => NULL; ENDCASE => RETURN; packetID ← LOOPHOLE[@packet.pup.pupID]; IF packetID.control = broken THEN RETURN[FALSE]; SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM duplicate => RETURN; ahead => BEGIN DiscardAckedPackets[sequin]; IF packetID.control = restart THEN Retransmit[sequin]; IF ~sequin.restartRequested THEN BEGIN sequin.restartRequested ← TRUE; RespondWithCurrentPacket[sequin, restart]; END; RETURN END; ENDCASE; -- we've seen everything from the server ResetPinging[]; DiscardAckedPackets[sequin]; SELECT packetID.control FROM data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]}; dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]}; restart => Retransmit[sequin]; check => RespondWithCurrentPacket[sequin, ack]; ack, nop => NULL; ENDCASE => ERROR ServerIsSpeakingGreek; END; CheckForPing: ENTRY PROCEDURE [sequin: Handle] RETURNS [{no, retransmit, check, dead}] = INLINE BEGIN SELECT sequin.pings FROM 0 => IF Time.Current[] - sequin.lastPacketTime <= idleLatency AND sequin.retransmitQueue.length = 0 THEN RETURN[no]; SequinPrivate.maxPings => RETURN[dead]; ENDCASE; IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet sequin.pings ← sequin.pings + 1; sequin.recentRestart ← sequin.restartRequested ← FALSE; RETURN[ IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit ELSE check] END; ResetPinging: PROCEDURE = INLINE { sequin.pings ← 0; sequin.lastPacketTime ← Time.Current[]}; DiscardAckedPackets: ENTRY PROCEDURE [sequin: Handle] = BEGIN sequin.allocate ← packetID.allocate; IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN BEGIN skipped: CARDINAL ← 0; UNTIL packetID.receiveSequence = sequin.retransmitSequence OR skipped = sequin.retransmitQueue.length DO b: PupDefs.PupBuffer ← PupDefs.DequeuePup[@sequin.retransmitQueue]; IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet IF sequin.retransmitSequence ~= LOOPHOLE[b.pup.pupID, SequinPrivate.SequinID].sendSequence THEN { PupDefs.EnqueuePup[@sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP}; Buffer.ReturnBuffer[b]; sequin.retransmitSequence ← sequin.retransmitSequence + 1; skipped ← 0; ENDLOOP; BROADCAST sequin.goAhead; END; END; EnqueueArrival: ENTRY PROCEDURE [sequin: Handle] = INLINE BEGIN PupDefs.EnqueuePup[@sequin.getQueue, packet]; sequin.id.receiveSequence ← sequin.id.receiveSequence + 1; sequin.recentRestart ← sequin.restartRequested ← FALSE; NOTIFY sequin.goAhead; END; RespondWithNewPacket: PROCEDURE [control: SequinPrivate.SequinControl] = BEGIN packet ← Buffer.GetBuffer[ type: pup, aH: SequinPrivate.bufferPool, function: send]; --is send correct? packetID ← LOOPHOLE[@packet.pup.pupID]; RespondWithCurrentPacket[sequin, control]; END; RespondWithCurrentPacket: ENTRY PROCEDURE [ sequin: Handle, control: SequinPrivate.SequinControl] = BEGIN sequin.id.control ← control; packetID↑ ← sequin.id; packet.pup.pupType ← sequin.pupType; PupDefs.SetPupContentsBytes[packet, 0]; sequin.socket.put[packet]; packet ← NIL; END; MarkDead: ENTRY PROCEDURE [sequin: Handle] = BEGIN sequin.broken ← TRUE; sequin.state ← destroyed; BROADCAST sequin.goAhead; END; Retransmit: ENTRY PROCEDURE [sequin: Handle] = BEGIN skipped: CARDINAL ← 0; seq: SequinPrivate.SequenceNumber ← sequin.retransmitSequence; IF sequin.recentRestart THEN RETURN; UNTIL skipped = sequin.retransmitQueue.length DO buffer: PupDefs.PupBuffer = PupDefs.DequeuePup[@sequin.retransmitQueue]; bufferID: LONG POINTER TO SequinPrivate.SequinID; IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer; IF (bufferID ← LOOPHOLE[@buffer.pup.pupID]).sendSequence ~= seq THEN { PupDefs.EnqueuePup[@sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP}; bufferID.inPart ← sequin.id.inPart; SendRequeueable[sequin, buffer]; sequin.recentRestart ← TRUE; skipped ← 0; seq ← IF seq = LAST[SequinPrivate.SequenceNumber] THEN 0 ELSE seq + 1; ENDLOOP; END; ResetPinging[]; WHILE alive DO packet ← sequin.socket.get[]; IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin]; IF packet ~= NIL THEN Buffer.ReturnBuffer[packet]; ENDLOOP; END; Requeue: PROCEDURE [buffer: Buffer.Buffer] = BEGIN longUnspecified: SequinPrivate.LongHandle ← LOOPHOLE[buffer.requeueData]; DoRequeue: ENTRY PROCEDURE [sequin: Handle] = INLINE BEGIN sequin.buffersToRequeue ← sequin.buffersToRequeue - 1; IF sequin.state = destroyed THEN BEGIN Buffer.ReturnBuffer[buffer]; IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN}; END ELSE Buffer.Enqueue[@sequin.retransmitQueue, buffer]; END; DoRequeue[longUnspecified.shortHandle]; END; -- Internal Procedures -- SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupDefs.PupBuffer] = BEGIN longUnspecified: SequinPrivate.LongHandle ← [sequin, 0]; buffer.requeueProcedure ← Requeue; buffer.requeueData ← LOOPHOLE[longUnspecified]; sequin.buffersToRequeue ← sequin.buffersToRequeue + 1; sequin.socket.put[buffer]; END; Compare: PROCEDURE [a, b: SequinPrivate.SequenceNumber] RETURNS [{equal, duplicate, ahead}] = BEGIN maxGetAhead: CARDINAL = 128; RETURN[ SELECT TRUE FROM a = b => equal, a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate, ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead] END; PupToSequinBuffer: PROC [b: PupDefs.PupBuffer] RETURNS [Sequin.Buffer] = INLINE { RETURN[ [ data: LOOPHOLE[@b.pup.pupBody], nBytes: PupDefs.GetPupContentsBytes[b], maxBytes: SequinPrivate.maxBytes]]}; positionInBuffer: CARDINAL = 42; -- data offset into PupDefs.PupBuffer SequinToPupBuffer: PROC [b: Sequin.Buffer] RETURNS [PupDefs.PupBuffer] = BEGIN IF b.data = NIL THEN ERROR BogusBuffer; RETURN[LOOPHOLE[b.data - positionInBuffer]]; -- old: RETURN[LOOPHOLE[b.data-SIZE[pupBytes Buffer.PupBufferObject]]] -- **** it is possible for the defn of PupDefs.PupBuffer and -- PupTypes.BufferBody to change such that this var positionInBuffer is -- no longer correct even if check in mainline code does not fail. END; -- mainline code: IF SIZE[pupBytes PupTypes.BufferBody] + SIZE[Buffer.BufferObject] # (67B + 12B) THEN ERROR; -- defn of these two types has changed and the positionInBuffer var must be altered. END.