-- Copyright (C) 1981, 1984, 1985 by Xerox Corporation. All rights reserved.
-- SequinImplB.mesa, HGM, 7-Jul-85 6:47:57
-- last edited by Hankins: 6-Aug-84 13:38:30
-- Klamath update (pup changes, sequinClosure change)
-- Last edited by Levin: 6-Jul-81 16:15:00
-- Loosely derived (after extensive discussions with Wobber)
-- from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM.
DIRECTORY
Buffer USING [Buffer, BufferObject, Enqueue, GetBuffer, ReturnBuffer],
PupDefs USING [
DequeuePup, EnqueuePup, GetPupContentsBytes, PupBuffer, SetPupContentsBytes],
PupTypes USING [BufferBody, PupType],
Sequin USING [Buffer, noBuffer],
SequinPrivate USING [
bufferPool, Handle, LongHandle, maxAllocate, maxBytes, maxPings, Seconds,
SequenceNumber, SequinID, SequinControl],
Time USING [Current];
SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: SequinPrivate.Handle
IMPORTS Buffer, PupDefs, SequinPrivate, Time EXPORTS Sequin, SequinPrivate =
BEGIN
-- Types --
Handle: PUBLIC TYPE = SequinPrivate.Handle;
-- Miscellaneous Declarations --
BogusBuffer: ERROR = CODE;
ServerIsSpeakingGreek: ERROR = CODE;
WhereDidIPutThatBuffer: ERROR = CODE;
idleLatency: SequinPrivate.Seconds = 10;
-- Procedures and Signals Exported to Sequin --
Broken: PUBLIC ERROR = CODE;
Get: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Sequin.Buffer] =
BEGIN
packet: PupDefs.PupBuffer;
DO
IF sequin.broken THEN RETURN WITH ERROR Broken;
IF (packet ← PupDefs.DequeuePup[@sequin.getQueue]) ~= NIL THEN EXIT;
WAIT sequin.goAhead;
ENDLOOP;
RETURN[PupToSequinBuffer[packet]]
END;
GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Sequin.Buffer] =
BEGIN
packet: PupDefs.PupBuffer;
IF sequin.broken THEN RETURN WITH ERROR Broken;
IF (packet ← PupDefs.DequeuePup[@sequin.getQueue]) = NIL THEN
RETURN[Sequin.noBuffer];
RETURN[PupToSequinBuffer[packet]]
END;
Put: PUBLIC PROCEDURE [sequin: Handle, buffer: Sequin.Buffer] =
BEGIN
packet: PupDefs.PupBuffer = SequinToPupBuffer[buffer];
PupDefs.SetPupContentsBytes[packet, buffer.nBytes];
Send[sequin, packet, data];
END;
GetEmptyBuffer: PUBLIC PROCEDURE RETURNS [Sequin.Buffer] =
BEGIN
packet: PupDefs.PupBuffer = Buffer.GetBuffer[
type: pup, aH: SequinPrivate.bufferPool, function: send];
--is send correct?
PupDefs.SetPupContentsBytes[packet, 0];
RETURN[PupToSequinBuffer[packet]]
END;
ReleaseBuffer: PUBLIC PROCEDURE [buffer: Sequin.Buffer] =
BEGIN
IF buffer = Sequin.noBuffer THEN RETURN;
Buffer.ReturnBuffer[SequinToPupBuffer[buffer]];
END;
-- Procedures exported to SequinPrivate --
Send: PUBLIC ENTRY PROCEDURE [
sequin: Handle, packet: PupDefs.PupBuffer,
control: SequinPrivate.SequinControl] =
BEGIN
UNTIL Compare[
sequin.retransmitSequence + MIN[sequin.allocate, SequinPrivate.maxAllocate],
sequin.id.sendSequence] = ahead OR sequin.broken DO
WAIT sequin.goAhead; ENDLOOP;
IF sequin.broken THEN {Buffer.ReturnBuffer[packet]; RETURN WITH ERROR Broken};
IF control = data AND sequin.state = init THEN {
control ← open; sequin.state ← open};
sequin.id.control ← control;
LOOPHOLE[@packet.pup.pupID, LONG POINTER TO SequinPrivate.SequinID]↑ ←
sequin.id;
IF PupDefs.GetPupContentsBytes[packet] ~= 0 THEN
sequin.id.sendSequence ← sequin.id.sendSequence + 1;
packet.pup.pupType ← sequin.pupType;
SendRequeueable[sequin, packet];
END;
SocketWarmer: PUBLIC PROCEDURE [sequin: Handle] =
-- This procedure is forked as a separate process which tries to keep the
-- socket as "clean" as possible. It absorbs incoming packets and initiates
-- retransmissions as necessary. Good packets are moved to the getQueue.
BEGIN
packet: PupDefs.PupBuffer;
packetID: LONG POINTER TO SequinPrivate.SequinID;
alive: BOOLEAN ← TRUE;
ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
BEGIN
alive ← TRUE;
IF packet = NIL THEN
BEGIN
SELECT CheckForPing[sequin] FROM
no => NULL;
retransmit => Retransmit[sequin];
check => RespondWithNewPacket[check];
dead => RETURN[FALSE];
ENDCASE;
RETURN
END;
IF PupDefs.GetPupContentsBytes[packet] > SequinPrivate.maxBytes THEN RETURN;
SELECT packet.pup.pupType FROM
error =>
BEGIN OPEN PupTypes;
SELECT packet.pup.errorCode FROM
noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode,
eightHopsPupErrorCode => RETURN[FALSE];
ENDCASE => RETURN;
END;
sequin.pupType => NULL;
ENDCASE => RETURN;
packetID ← LOOPHOLE[@packet.pup.pupID];
IF packetID.control = broken THEN RETURN[FALSE];
SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
duplicate => RETURN;
ahead =>
BEGIN
DiscardAckedPackets[sequin];
IF packetID.control = restart THEN Retransmit[sequin];
IF ~sequin.restartRequested THEN
BEGIN
sequin.restartRequested ← TRUE;
RespondWithCurrentPacket[sequin, restart];
END;
RETURN
END;
ENDCASE;
-- we've seen everything from the server
ResetPinging[];
DiscardAckedPackets[sequin];
SELECT packetID.control FROM
data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
restart => Retransmit[sequin];
check => RespondWithCurrentPacket[sequin, ack];
ack, nop => NULL;
ENDCASE => ERROR ServerIsSpeakingGreek;
END;
CheckForPing: ENTRY PROCEDURE [sequin: Handle]
RETURNS [{no, retransmit, check, dead}] = INLINE
BEGIN
SELECT sequin.pings FROM
0 =>
IF Time.Current[] - sequin.lastPacketTime <= idleLatency
AND sequin.retransmitQueue.length = 0 THEN RETURN[no];
SequinPrivate.maxPings => RETURN[dead];
ENDCASE;
IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet
sequin.pings ← sequin.pings + 1;
sequin.recentRestart ← sequin.restartRequested ← FALSE;
RETURN[
IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
ELSE check]
END;
ResetPinging: PROCEDURE = INLINE {
sequin.pings ← 0; sequin.lastPacketTime ← Time.Current[]};
DiscardAckedPackets: ENTRY PROCEDURE [sequin: Handle] =
BEGIN
sequin.allocate ← packetID.allocate;
IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
BEGIN
skipped: CARDINAL ← 0;
UNTIL packetID.receiveSequence = sequin.retransmitSequence
OR skipped = sequin.retransmitQueue.length DO
b: PupDefs.PupBuffer ← PupDefs.DequeuePup[@sequin.retransmitQueue];
IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet
IF sequin.retransmitSequence ~= LOOPHOLE[b.pup.pupID,
SequinPrivate.SequinID].sendSequence THEN {
PupDefs.EnqueuePup[@sequin.retransmitQueue, b];
skipped ← skipped + 1;
LOOP};
Buffer.ReturnBuffer[b];
sequin.retransmitSequence ← sequin.retransmitSequence + 1;
skipped ← 0;
ENDLOOP;
BROADCAST sequin.goAhead;
END;
END;
EnqueueArrival: ENTRY PROCEDURE [sequin: Handle] = INLINE
BEGIN
PupDefs.EnqueuePup[@sequin.getQueue, packet];
sequin.id.receiveSequence ← sequin.id.receiveSequence + 1;
sequin.recentRestart ← sequin.restartRequested ← FALSE;
NOTIFY sequin.goAhead;
END;
RespondWithNewPacket: PROCEDURE [control: SequinPrivate.SequinControl] =
BEGIN
packet ← Buffer.GetBuffer[
type: pup, aH: SequinPrivate.bufferPool, function: send];
--is send correct?
packetID ← LOOPHOLE[@packet.pup.pupID];
RespondWithCurrentPacket[sequin, control];
END;
RespondWithCurrentPacket: ENTRY PROCEDURE [
sequin: Handle, control: SequinPrivate.SequinControl] =
BEGIN
sequin.id.control ← control;
packetID↑ ← sequin.id;
packet.pup.pupType ← sequin.pupType;
PupDefs.SetPupContentsBytes[packet, 0];
sequin.socket.put[packet];
packet ← NIL;
END;
MarkDead: ENTRY PROCEDURE [sequin: Handle] =
BEGIN
sequin.broken ← TRUE;
sequin.state ← destroyed;
BROADCAST sequin.goAhead;
END;
Retransmit: ENTRY PROCEDURE [sequin: Handle] =
BEGIN
skipped: CARDINAL ← 0;
seq: SequinPrivate.SequenceNumber ← sequin.retransmitSequence;
IF sequin.recentRestart THEN RETURN;
UNTIL skipped = sequin.retransmitQueue.length DO
buffer: PupDefs.PupBuffer = PupDefs.DequeuePup[@sequin.retransmitQueue];
bufferID: LONG POINTER TO SequinPrivate.SequinID;
IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
IF (bufferID ← LOOPHOLE[@buffer.pup.pupID]).sendSequence ~= seq THEN {
PupDefs.EnqueuePup[@sequin.retransmitQueue, buffer];
skipped ← skipped + 1;
LOOP};
bufferID.inPart ← sequin.id.inPart;
SendRequeueable[sequin, buffer];
sequin.recentRestart ← TRUE;
skipped ← 0;
seq ← IF seq = LAST[SequinPrivate.SequenceNumber] THEN 0 ELSE seq + 1;
ENDLOOP;
END;
ResetPinging[];
WHILE alive DO
packet ← sequin.socket.get[];
IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
IF packet ~= NIL THEN Buffer.ReturnBuffer[packet];
ENDLOOP;
END;
Requeue: PROCEDURE [buffer: Buffer.Buffer] =
BEGIN
longUnspecified: SequinPrivate.LongHandle ← LOOPHOLE[buffer.requeueData];
DoRequeue: ENTRY PROCEDURE [sequin: Handle] = INLINE
BEGIN
sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
IF sequin.state = destroyed THEN
BEGIN
Buffer.ReturnBuffer[buffer];
IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
END
ELSE Buffer.Enqueue[@sequin.retransmitQueue, buffer];
END;
DoRequeue[longUnspecified.shortHandle];
END;
-- Internal Procedures --
SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupDefs.PupBuffer] =
BEGIN
longUnspecified: SequinPrivate.LongHandle ← [sequin, 0];
buffer.requeueProcedure ← Requeue;
buffer.requeueData ← LOOPHOLE[longUnspecified];
sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
sequin.socket.put[buffer];
END;
Compare: PROCEDURE [a, b: SequinPrivate.SequenceNumber]
RETURNS [{equal, duplicate, ahead}] =
BEGIN
maxGetAhead: CARDINAL = 128;
RETURN[
SELECT TRUE FROM
a = b => equal,
a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate,
ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
END;
PupToSequinBuffer: PROC [b: PupDefs.PupBuffer] RETURNS [Sequin.Buffer] = INLINE
{
RETURN[
[
data: LOOPHOLE[@b.pup.pupBody], nBytes: PupDefs.GetPupContentsBytes[b],
maxBytes: SequinPrivate.maxBytes]]};
foo: POINTER TO Buffer.BufferObject = NIL;
positionInBuffer: CARDINAL = LOOPHOLE[@foo.pup.pupBody, CARDINAL] - LOOPHOLE[foo, CARDINAL];
SequinToPupBuffer: PROC [b: Sequin.Buffer] RETURNS [PupDefs.PupBuffer] =
BEGIN
IF b.data = NIL THEN ERROR BogusBuffer;
RETURN[LOOPHOLE[b.data - positionInBuffer]];
END;
END.