-- File: SequinImplB.mesa
-- Last edited by Levin:   6-Jul-81 16:15:00
-- Loosely derived (after extensive discussions with Wobber) from Butterfield's
--   Sequin.mesa of August 27, 1979  2:49 PM.

-- Overview:
--   This module implements the data-transfer half of a Sequin connection on top
--   of Pup packets.  It exports the client operations Get, GetIfAvailable, Put,
--   GetEmptyBuffer and ReleaseBuffer to Sequin, and Send, SocketWarmer and
--   MakeRequeueClosure to SequinPrivate.  The monitor lock is the one embedded
--   in each sequin object (LOCKS sequin.LOCK), so ENTRY procedures serialize
--   per connection.

DIRECTORY
  BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer],
  Frame USING [GetReturnLink, MyLocalFrame],
  PupDefs USING [
    DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer,
    ReturnFreePupBuffer, SetPupContentsBytes],
  PupTypes USING [PupType],
  Sequin USING [Buffer, noBuffer],
  SequinPrivate USING [
    Handle, maxAllocate, maxBytes, maxPings, RequeueClosure, Seconds,
    SequenceNumber, SequinID, SequinControl],
  Time USING [Current];

SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: SequinPrivate.Handle
  IMPORTS BufferDefs, Frame, PupDefs, SequinPrivate, Time
  EXPORTS Sequin, SequinPrivate =

  BEGIN OPEN PupDefs, Sequin, SequinPrivate;


  -- Types --

  Handle: PUBLIC TYPE = SequinPrivate.Handle;


  -- Miscellaneous Declarations --

  -- Raised by SequinToPupBuffer when handed a Buffer whose data pointer is NIL.
  BogusBuffer: ERROR = CODE;
  -- Raised by SocketWarmer when an in-sequence packet carries a control code
  -- that none of the handled cases covers.
  ServerIsSpeakingGreek: ERROR = CODE;
  -- Raised by Retransmit when the retransmit queue yields NIL although its
  -- length field says more buffers remain.
  WhereDidIPutThatBuffer: ERROR = CODE;

  -- CheckForPing stays quiet while the last packet is no older than this and
  -- nothing awaits acknowledgement.
  idleLatency: Seconds = 10;

  -- Size of the sequence-number space, used to reduce sums modulo that space.
  -- NOTE(review): assumes SequenceNumber is a subrange starting at 0 whose LAST
  -- is below LAST[CARDINAL] (the maxGetAhead value of 128 in Compare suggests
  -- an 8-bit range); confirm against SequinPrivate.
  sequenceModulus: CARDINAL = LAST[SequenceNumber] + 1;


  -- Procedures and Signals Exported to Sequin --

  Broken: PUBLIC ERROR = CODE;

  Get: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Buffer] =
    -- Waits on sequin.goAhead until a packet can be dequeued from
    -- sequin.getQueue, then returns it as a Sequin Buffer.
    -- Raises Broken (releasing the monitor) if the sequin is marked broken,
    -- either on entry or after any wakeup.
    BEGIN
    packet: PupBuffer;
    DO
      IF sequin.broken THEN RETURN WITH ERROR Broken;
      IF (packet _ DequeuePup[@sequin.getQueue]) ~= NIL THEN EXIT;
      WAIT sequin.goAhead;
      ENDLOOP;
    RETURN[PupToSequinBuffer[packet]]
    END;

  GetIfAvailable: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Buffer] =
    -- Non-waiting variant of Get: returns noBuffer when sequin.getQueue is
    -- empty.  Raises Broken if the sequin is marked broken.
    BEGIN
    packet: PupBuffer;
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    IF (packet _ DequeuePup[@sequin.getQueue]) = NIL THEN RETURN[noBuffer];
    RETURN[PupToSequinBuffer[packet]]
    END;

  Put: PUBLIC PROCEDURE [sequin: Handle, buffer: Buffer] =
    -- Recovers the Pup buffer underlying the client's Buffer, sets its contents
    -- length from buffer.nBytes, and hands it to Send with control = data.
    -- Not an ENTRY procedure itself; Send acquires the monitor.
    BEGIN
    packet: PupBuffer = SequinToPupBuffer[buffer];
    SetPupContentsBytes[packet, buffer.nBytes];
    Send[sequin, packet, data];
    END;

  GetEmptyBuffer: PUBLIC PROCEDURE RETURNS [Buffer] =
    -- Obtains a free Pup buffer, sets its contents length to zero, and returns
    -- it viewed as a Sequin Buffer.
    BEGIN
    packet: PupBuffer = GetFreePupBuffer[];
    SetPupContentsBytes[packet, 0];
    RETURN[PupToSequinBuffer[packet]]
    END;

  ReleaseBuffer: PUBLIC PROCEDURE [buffer: Buffer] =
    -- Returns the Pup buffer underlying a Sequin Buffer to the free pool.
    -- Passing noBuffer is a no-op.
    BEGIN
    IF buffer = noBuffer THEN RETURN;
    ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
    END;


  -- Procedures exported to SequinPrivate --

  Send: PUBLIC ENTRY PROCEDURE [
    sequin: Handle, packet: PupBuffer, control: SequinControl] =
    -- Transmits packet on the sequin with the given control code.
    -- Waits on sequin.goAhead until the end of the send window
    -- (retransmitSequence plus the smaller of sequin.allocate and maxAllocate)
    -- compares as ahead of the next send sequence number, or the sequin breaks.
    -- If the sequin is broken the packet is returned to the free pool and
    -- Broken is raised.
    BEGIN
    -- The window end is reduced modulo the sequence space before comparison so
    -- that a window straddling the wraparound point is still recognized as
    -- ahead of a sendSequence that has already wrapped.
    UNTIL
      Compare[
        WrapSequence[
          sequin.retransmitSequence + MIN[sequin.allocate, maxAllocate]],
        sequin.id.sendSequence] = ahead
      OR sequin.broken DO
      WAIT sequin.goAhead;
      ENDLOOP;
    IF sequin.broken THEN
      {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
    -- The first data packet sent while still in the init state is sent as an
    -- open instead, and moves the sequin to the open state.
    IF control = data AND sequin.state = init THEN
      {control _ open; sequin.state _ open};
    sequin.id.control _ control;
    -- The Pup ID field of the packet is overlaid with the current SequinID.
    LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ _ sequin.id;
    -- Only packets with non-empty contents consume a sequence number.
    IF GetPupContentsBytes[packet] ~= 0 THEN
      sequin.id.sendSequence _ NextSequence[sequin.id.sendSequence];
    packet.pupType _ sequin.pupType;
    SendRequeueable[sequin, packet];
    END;

  SocketWarmer: PUBLIC PROCEDURE [sequin: Handle] =
    -- This procedure is forked as a separate process which tries to keep the socket
    -- as "clean" as possible.  It absorbs incoming packets and initiates
    -- retransmissions as necessary.  Good packets are moved to the getQueue.
    -- The loop ends, after calling MarkDead, once ProcessPacket reports that
    -- the connection is no longer alive.
    BEGIN
    -- packet and packetID are shared by the nested procedures below: packet is
    -- the buffer currently being processed (NIL when the socket get returned
    -- nothing or the buffer has been passed on), and packetID points at its
    -- Pup ID field viewed as a SequinID.
    packet: PupBuffer;
    packetID: LONG POINTER TO SequinID;
    alive: BOOLEAN _ TRUE;

    ProcessPacket: PROCEDURE [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
      -- Handles one result of sequin.socket.get.  Returns FALSE when the
      -- connection should be declared dead, TRUE otherwise.
      BEGIN
      alive _ TRUE;
      -- No packet arrived: decide whether to probe the other end.
      IF packet = NIL THEN
        BEGIN
        SELECT CheckForPing[sequin] FROM
          no => NULL;
          retransmit => Retransmit[sequin];
          check => RespondWithNewPacket[check];
          dead => RETURN[FALSE];
          ENDCASE;
        RETURN
        END;
      -- Oversized packets are ignored.
      IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
      SELECT packet.pupType FROM
        error =>
          BEGIN OPEN PupTypes;
          -- These four error codes end the connection; other error Pups are
          -- ignored.
          SELECT packet.errorCode FROM
            noProcessPupErrorCode, cantGetTherePupErrorCode,
            hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
            ENDCASE => RETURN;
          END;
        sequin.pupType => NULL;
        ENDCASE => RETURN;  -- not a packet of this sequin's Pup type
      packetID _ LOOPHOLE[@packet.pupID];
      IF packetID.control = broken THEN RETURN[FALSE];
      SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
        duplicate => RETURN;
        ahead =>
          BEGIN
          -- The sender is ahead of what has been received here.  Its
          -- acknowledgement information is still used, and a restart is
          -- requested at most once until restartRequested is cleared again.
          DiscardAckedPackets[sequin];
          IF packetID.control = restart THEN Retransmit[sequin];
          IF ~sequin.restartRequested THEN
            BEGIN
            sequin.restartRequested _ TRUE;
            RespondWithCurrentPacket[sequin, restart];
            END;
          RETURN
          END;
        ENDCASE;
      -- we've seen everything from the server
      ResetPinging[];
      DiscardAckedPackets[sequin];
      SELECT packetID.control FROM
        data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
        dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
        restart => Retransmit[sequin];
        check => RespondWithCurrentPacket[sequin, ack];
        ack, nop => NULL;
        ENDCASE => ERROR ServerIsSpeakingGreek;
      END;

    CheckForPing: ENTRY PROCEDURE [sequin: Handle]
      RETURNS [{no, retransmit, check, dead}] = INLINE
      -- Called when the socket get produced no packet.  Decides between doing
      -- nothing, retransmitting, sending a check packet, and giving up.
      BEGIN
      SELECT sequin.pings FROM
        0 =>
          -- Not currently pinging: stay quiet while the connection was heard
          -- from recently and nothing awaits acknowledgement.
          IF Time.Current[] - sequin.lastPacketTime <= idleLatency
            AND sequin.retransmitQueue.length = 0 THEN RETURN[no];
        maxPings => RETURN[dead];
        ENDCASE;
      IF sequin.state = init THEN RETURN[no];  -- don't ping until first real packet
      sequin.pings _ sequin.pings + 1;
      sequin.recentRestart _ sequin.restartRequested _ FALSE;
      -- On the first ping with exactly one unacknowledged packet, resend that
      -- packet; otherwise send a check.
      RETURN[
        IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
        ELSE check]
      END;

    ResetPinging: PROCEDURE = INLINE
      -- Clears the ping count and timestamps the connection as just heard from.
      -- NOTE(review): this is not an ENTRY procedure, so these fields are
      -- written without the monitor lock; presumably only this process writes
      -- them outside the monitor.  Confirm.
      {sequin.pings _ 0; sequin.lastPacketTime _ Time.Current[]};

    DiscardAckedPackets: ENTRY PROCEDURE [sequin: Handle] =
      -- Adopts the allocation carried by the current packet and frees buffers
      -- on the retransmit queue that the packet's receiveSequence acknowledges.
      BEGIN
      sequin.allocate _ packetID.allocate;
      IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
        BEGIN
        -- skipped counts consecutive buffers put back because they were not
        -- the next one in sequence; the loop gives up after a full pass over
        -- the queue without finding it.
        skipped: CARDINAL _ 0;
        UNTIL packetID.receiveSequence = sequin.retransmitSequence
          OR skipped = sequin.retransmitQueue.length DO
          b: PupBuffer _ DequeuePup[@sequin.retransmitQueue];
          IF b = NIL THEN EXIT;  -- buffer hasn't made it to queue yet
          IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
            {EnqueuePup[@sequin.retransmitQueue, b]; skipped _ skipped + 1; LOOP};
          ReturnFreePupBuffer[b];
          sequin.retransmitSequence _ NextSequence[sequin.retransmitSequence];
          skipped _ 0;
          ENDLOOP;
        -- The send window may have opened; wake every waiter on goAhead.
        BROADCAST sequin.goAhead;
        END;
      END;

    EnqueueArrival: ENTRY PROCEDURE [sequin: Handle] = INLINE
      -- Moves the current packet onto the getQueue, advances the receive
      -- sequence number, clears the restart flags, and wakes one waiter.
      -- The buffer now belongs to the getQueue; the caller follows this with
      -- RespondWithNewPacket, which replaces packet with a fresh buffer.
      BEGIN
      EnqueuePup[@sequin.getQueue, packet];
      sequin.id.receiveSequence _ NextSequence[sequin.id.receiveSequence];
      sequin.recentRestart _ sequin.restartRequested _ FALSE;
      NOTIFY sequin.goAhead;
      END;

    RespondWithNewPacket: PROCEDURE [control: SequinControl] =
      -- Allocates a fresh Pup buffer as the current packet and sends it as an
      -- empty control packet.
      BEGIN
      packet _ GetFreePupBuffer[];
      packetID _ LOOPHOLE[@packet.pupID];
      RespondWithCurrentPacket[sequin, control];
      END;

    RespondWithCurrentPacket: ENTRY PROCEDURE [
      sequin: Handle, control: SequinControl] =
      -- Reuses the current packet buffer as an empty control packet carrying
      -- the sequin's current ID, and sends it on the socket.  packet is set to
      -- NIL afterwards so the main loop does not free a buffer already handed
      -- to the socket.
      BEGIN
      sequin.id.control _ control;
      packetID^ _ sequin.id;
      packet.pupType _ sequin.pupType;
      SetPupContentsBytes[packet, 0];
      sequin.socket.put[packet];
      packet _ NIL;
      END;

    MarkDead: ENTRY PROCEDURE [sequin: Handle] =
      -- Marks the sequin broken and destroyed and wakes every waiter on
      -- goAhead so that they can observe the broken flag.
      BEGIN
      sequin.broken _ TRUE;
      sequin.state _ destroyed;
      BROADCAST sequin.goAhead;
      END;

    Retransmit: ENTRY PROCEDURE [sequin: Handle] =
      -- Resends the buffers on the retransmit queue in sequence order,
      -- starting with sequin.retransmitSequence.  Does nothing if a restart
      -- was already performed recently (sequin.recentRestart).
      BEGIN
      -- skipped counts consecutive buffers put back because they were not the
      -- next one wanted; the loop ends after a full pass without a match.
      skipped: CARDINAL _ 0;
      seq: SequenceNumber _ sequin.retransmitSequence;
      IF sequin.recentRestart THEN RETURN;
      UNTIL skipped = sequin.retransmitQueue.length DO
        buffer: PupBuffer = DequeuePup[@sequin.retransmitQueue];
        bufferID: LONG POINTER TO SequinID;
        IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
        IF (bufferID _ LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
          {EnqueuePup[@sequin.retransmitQueue, buffer]; skipped _ skipped + 1; LOOP};
        -- Refresh the inPart field of the buffer's ID from the sequin's
        -- current ID before resending.
        bufferID.inPart _ sequin.id.inPart;
        SendRequeueable[sequin, buffer];
        sequin.recentRestart _ TRUE;
        skipped _ 0;
        seq _ NextSequence[seq];
        ENDLOOP;
      END;

    -- Main body of SocketWarmer.
    ResetPinging[];
    WHILE alive DO
      packet _ sequin.socket.get[];
      IF ~(alive _ ProcessPacket[sequin]) THEN MarkDead[sequin];
      -- Free the buffer unless ProcessPacket passed it on and cleared packet.
      IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
      ENDLOOP;
    END;

  MakeRequeueClosure: PUBLIC PROCEDURE [sequin: Handle]
    RETURNS [closure: RequeueClosure] =
    -- Installs Requeue, a procedure nested in this frame, as sequin.requeuer,
    -- then transfers to this procedure's return link passing its own local
    -- frame as the result.
    -- NOTE(review): presumably the frame is kept alive by the caller so that
    -- Requeue's enclosing context remains valid until the sequin is destroyed;
    -- confirm against SequinImplA.Destroy.
    BEGIN OPEN Frame;
    Return: PROCEDURE [RequeueClosure] _ LOOPHOLE[GetReturnLink[]];

    Requeue: PROCEDURE [buffer: BufferDefs.Buffer] =
      -- Installed as the requeueProcedure of every buffer sent through
      -- SendRequeueable.
      BEGIN

      DoRequeue: ENTRY PROCEDURE [sequin: Handle] = INLINE
        -- Accounts for one returned buffer.  On a destroyed sequin the buffer
        -- is freed, and goAhead is notified when the last outstanding buffer
        -- has come back; otherwise the buffer joins the retransmit queue.
        BEGIN
        sequin.buffersToRequeue _ sequin.buffersToRequeue - 1;
        IF sequin.state = destroyed THEN
          BEGIN
          BufferDefs.ReturnFreeBuffer[buffer];
          IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
          END
        ELSE BufferDefs.Enqueue[@sequin.retransmitQueue, buffer];
        END;

      DoRequeue[sequin];
      END;

    sequin.requeuer _ Requeue;
    Return[MyLocalFrame[]];
    -- never gets here; see SequinImplA.Destroy
    END;


  -- Internal Procedures --

  SendRequeueable: INTERNAL PROCEDURE [sequin: Handle, buffer: PupBuffer] =
    -- Sends buffer on the sequin's socket with the sequin's requeuer installed
    -- as its requeue procedure, and counts it in buffersToRequeue.
    -- INTERNAL: the caller must already hold the monitor lock.
    BEGIN
    buffer.requeueProcedure _ sequin.requeuer;
    sequin.buffersToRequeue _ sequin.buffersToRequeue + 1;
    sequin.socket.put[buffer];
    END;

  NextSequence: PROCEDURE [s: SequenceNumber] RETURNS [SequenceNumber] = INLINE
    -- Successor of s in the circular sequence-number space: LAST wraps to 0.
    -- Every sequence-number increment in this module goes through here so that
    -- wraparound never depends on how an out-of-range value happens to be
    -- stored.
    {RETURN[IF s = LAST[SequenceNumber] THEN 0 ELSE s + 1]};

  WrapSequence: PROCEDURE [n: CARDINAL] RETURNS [SequenceNumber] = INLINE
    -- Reduces an arbitrary sum of sequence numbers into the sequence space.
    {RETURN[n MOD sequenceModulus]};

  Compare: PROCEDURE [a, b: SequenceNumber]
    RETURNS [{equal, duplicate, ahead}] =
    -- Circular comparison of two sequence numbers.  a is ahead of b if it lies
    -- at most maxGetAhead positions beyond b, allowing for wraparound;
    -- otherwise a is treated as a duplicate (at or behind b).
    BEGIN
    maxGetAhead: CARDINAL = 128;
    RETURN[
      SELECT TRUE FROM
        a = b => equal,
        a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate,
        ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
    END;

  PupToSequinBuffer: PROCEDURE [b: PupBuffer] RETURNS [Buffer] = INLINE
    -- Views a Pup buffer as a Sequin Buffer: data points at the Pup body,
    -- nBytes is the current Pup contents length, maxBytes is the module limit.
    {RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b],
      maxBytes: maxBytes]]};

  SequinToPupBuffer: PROCEDURE [b: Buffer] RETURNS [PupBuffer] =
    -- Inverse of PupToSequinBuffer: backs up from the data pointer by the size
    -- of the Pup buffer header to recover the enclosing Pup buffer.
    -- Raises BogusBuffer if b.data is NIL.
    BEGIN
    IF b.data = NIL THEN ERROR BogusBuffer;
    RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]]
    END;

  END.