-- SequinImplB.mesa
-- Loosely derived (after extensive discussions with Wobber) from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM.
-- Levin: 6-Jul-81 16:15:00
-- Russ Atkinson, November 22, 1983 11:12 am
DIRECTORY
BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer],
PrincOpsUtils USING [GetReturnLink, MyLocalFrame],
PupDefs USING [
DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer,
ReturnFreePupBuffer, SetPupContentsBytes],
PupTypes USING [PupType],
Sequin USING [Buffer, noBuffer],
SequinPrivate USING
[maxAllocate, maxBytes, maxPings, RequeueClosure, SequenceNumber, SequinID, SequinControl, SequinRep],
BasicTime USING [Now, Period];
SequinImplB: MONITOR
LOCKS sequin.LOCK USING sequin: Handle
IMPORTS BufferDefs, PrincOpsUtils, PupDefs, SequinPrivate, BasicTime
EXPORTS Sequin, SequinPrivate =
BEGIN OPEN PupDefs, Sequin, SequinPrivate;
-- Types --
-- Handle is the reference type by which every procedure in this module names a sequin;
-- the module's LOCKS clause takes the monitor lock from the record it designates.
Handle: TYPE = REF SequinRep;
-- Exports SequinRep as exactly the record declared in SequinPrivate.
SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep;
-- Miscellaneous Declarations --
-- Raised by SequinToPupBuffer when it is handed a Buffer whose data pointer is NIL.
BogusBuffer: ERROR = CODE;
-- Raised by SocketWarmer when an in-sequence packet carries a control code it has no case for.
ServerIsSpeakingGreek: ERROR = CODE;
-- Raised while retransmitting if the retransmit queue yields NIL before its length is exhausted.
WhereDidIPutThatBuffer: ERROR = CODE;
-- Quiet interval tolerated before pinging begins; compared against
-- BasicTime.Period[lastPacketTime, Now].  (Presumably seconds; confirm against BasicTime.)
idleLatency: INT = 10;
-- Procedures and Signals Exported to Sequin --
-- Raised (via RETURN WITH ERROR) by Get, GetIfAvailable and Send when sequin.broken is set.
Broken: PUBLIC ERROR = CODE;
Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Blocks until a packet is available on the sequin's getQueue and returns it as a Buffer.
  -- Raises Broken (releasing the monitor) if the sequin is marked broken, either on entry
  -- or after any wakeup.
  arrival: PupBuffer ← NIL;
  WHILE arrival = NIL DO
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    arrival ← DequeuePup[sequin.getQueue];
    -- nothing queued yet: sleep until goAhead is signalled, then re-check everything
    IF arrival = NIL THEN WAIT sequin.goAhead;
    ENDLOOP;
  RETURN[PupToSequinBuffer[arrival]];
  };
GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Non-blocking variant of Get: returns the next queued packet as a Buffer, or noBuffer
  -- when the getQueue is empty.  Raises Broken if the sequin is marked broken.
  arrival: PupBuffer;
  IF sequin.broken THEN RETURN WITH ERROR Broken;
  arrival ← DequeuePup[sequin.getQueue];
  RETURN[IF arrival = NIL THEN noBuffer ELSE PupToSequinBuffer[arrival]];
  };
Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = {
  -- Sends the client's buffer as a data packet: recovers the enclosing Pup buffer, records
  -- the client's byte count as the Pup contents length, and passes it to Send.
  pup: PupBuffer = SequinToPupBuffer[buffer];
  SetPupContentsBytes[pup, buffer.nBytes];
  Send[sequin: sequin, packet: pup, control: data];
  };
GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = {
  -- Obtains a free Pup buffer, sets its contents length to zero, and returns it to the
  -- client in Buffer form.
  fresh: PupBuffer = GetFreePupBuffer[];
  SetPupContentsBytes[fresh, 0];
  RETURN[PupToSequinBuffer[fresh]];
  };
ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = {
  -- Returns the Pup buffer underlying a client Buffer to the free pool.
  -- Releasing noBuffer is a no-op.
  IF buffer ~= noBuffer THEN ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
  };
-- Procedures exported to SequinPrivate --
Send: PUBLIC ENTRY PROC [
  sequin: Handle, packet: PupBuffer, control: SequinControl] = {
  -- Transmits packet on the sequin with the given control code.
  -- Waits on goAhead until either the sequin is broken or the send sequence number is
  -- inside the window that starts at retransmitSequence and extends by
  -- MIN[allocate, maxAllocate].  If the sequin is broken the packet is returned to the
  -- free pool and Broken is raised.
  DO
    IF sequin.broken THEN EXIT;
    IF Compare[
      AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]],
      sequin.id.sendSequence] = ahead THEN EXIT;
    WAIT sequin.goAhead;
    ENDLOOP;
  IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
  -- the first data packet of a sequin in the init state goes out as an open instead
  IF sequin.state = init AND control = data THEN {sequin.state ← open; control ← open};
  sequin.id.control ← control;
  packet.pupType ← sequin.pupType;
  -- stamp the packet with the current id BEFORE the send sequence number is advanced
  LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ ← sequin.id;
  -- only packets that carry contents consume a sequence number
  IF GetPupContentsBytes[packet] # 0 THEN
    sequin.id.sendSequence ← AddWrap[sequin.id.sendSequence];
  SendRequeueable[sequin, packet];
  };
SocketWarmer: PUBLIC PROC [sequin: Handle] =
  -- This procedure is forked as a separate process which tries to keep the socket as
  -- "clean" as possible.  It absorbs incoming packets and initiates retransmissions as
  -- necessary.  Good packets are moved to the getQueue.
  -- The loop ends, after MarkDead has flagged the sequin broken, as soon as
  -- ProcessPacket reports that the connection is no longer alive.
  BEGIN
  -- packet and packetID are shared by all the nested procedures below: packet is the
  -- buffer currently being examined or built (NIL once it has been handed off), and
  -- packetID addresses the SequinID overlaid on that buffer's pupID.
  packet: PupBuffer;
  packetID: LONG POINTER TO SequinID;
  alive: BOOLEAN ← TRUE;

  ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
    -- Handles one result of socket.get (a packet, or NIL when nothing arrived).
    -- Returns FALSE when the sequin should be declared dead, TRUE otherwise.
    BEGIN
    alive ← TRUE;
    IF packet = NIL THEN
      BEGIN
      -- nothing arrived: decide whether to probe the other end
      SELECT CheckForPing[sequin] FROM
        no => NULL;
        retransmit => Retransmit[sequin];
        check => RespondWithNewPacket[check];
        dead => RETURN[FALSE];
        ENDCASE;
      RETURN
      END;
    -- oversize packets are ignored
    IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
    SELECT packet.pupType FROM
      error =>
        BEGIN OPEN PupTypes;
        -- these error codes end the connection; any other error Pup is ignored
        SELECT packet.errorCode FROM
          noProcessPupErrorCode, cantGetTherePupErrorCode,
          hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
          ENDCASE => RETURN;
        END;
      sequin.pupType => NULL;
      ENDCASE => RETURN;
    packetID ← LOOPHOLE[@packet.pupID];
    IF packetID.control = broken THEN RETURN[FALSE];
    SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
      duplicate => RETURN;
      ahead =>
        BEGIN
        -- the packet is beyond the one expected: honour its acknowledgement and ask
        -- (once) for a restart
        DiscardAckedPackets[sequin];
        IF packetID.control = restart THEN Retransmit[sequin];
        IF ~sequin.restartRequested THEN
          BEGIN
          sequin.restartRequested ← TRUE;
          RespondWithCurrentPacket[sequin, restart];
          END;
        RETURN
        END;
      ENDCASE;
    -- we've seen everything from the server
    ResetPinging[];
    DiscardAckedPackets[sequin];
    SELECT packetID.control FROM
      data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
      dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
      restart => Retransmit[sequin];
      check => RespondWithCurrentPacket[sequin, ack];
      ack, nop => NULL;
      ENDCASE => ERROR ServerIsSpeakingGreek;
    END;

  CheckForPing: ENTRY PROC [sequin: Handle]
    RETURNS [{no, retransmit, check, dead}] = INLINE
    -- Called when socket.get produced nothing.  Decides between doing nothing,
    -- retransmitting, sending a check, or giving up, and counts the ping.
    BEGIN
    SELECT sequin.pings FROM
      0 =>
        -- no ping outstanding: stay quiet while within idleLatency of the last packet
        -- and nothing awaits acknowledgement
        IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]]
          <= idleLatency AND sequin.retransmitQueue.length = 0 THEN
          RETURN[no];
      maxPings => RETURN[dead];
      ENDCASE;
    IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet
    sequin.pings ← sequin.pings + 1;
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    RETURN[
      IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
      ELSE check]
    END;

  ResetPinging: PROC = INLINE
    -- Clears the ping count and records the current time as the last packet time.
    {sequin.pings ← 0; sequin.lastPacketTime ← BasicTime.Now[]};

  DiscardAckedPackets: ENTRY PROC [sequin: Handle] =
    -- Adopts the allocation carried by the current packet and frees every buffer on the
    -- retransmit queue that the packet's receiveSequence acknowledges, then wakes
    -- everyone waiting on goAhead.
    BEGIN
    sequin.allocate ← packetID.allocate;
    IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
      BEGIN
      -- skipped counts consecutive buffers that were rotated to the back of the queue
      -- because they were not the next one in sequence; a full fruitless pass ends the loop
      skipped: CARDINAL ← 0;
      UNTIL packetID.receiveSequence = sequin.retransmitSequence OR
        skipped = sequin.retransmitQueue.length DO
        b: PupBuffer ← DequeuePup[sequin.retransmitQueue];
        IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet
        IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
          {EnqueuePup[sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP};
        ReturnFreePupBuffer[b];
        sequin.retransmitSequence ← AddWrap[sequin.retransmitSequence, 1];
        skipped ← 0;
        ENDLOOP;
      BROADCAST sequin.goAhead;
      END;
    END;

  EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE
    -- Moves the current packet onto the getQueue, advances the receive sequence number,
    -- clears the restart flags, and wakes the waiters on goAhead.
    BEGIN
    EnqueuePup[sequin.getQueue, packet];
    sequin.id.receiveSequence ← AddWrap[sequin.id.receiveSequence, 1];
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    -- goAhead is waited on by both Get (for arrivals) and Send (for window space).
    -- A single NOTIFY could be consumed by a blocked sender, which would just re-wait
    -- and leave the reader asleep, so every waiter is woken and re-checks its own condition.
    BROADCAST sequin.goAhead;
    END;

  RespondWithNewPacket: PROC [control: SequinControl] =
    -- Replaces the current packet with a fresh free buffer and sends that as the response.
    BEGIN
    packet ← GetFreePupBuffer[];
    packetID ← LOOPHOLE[@packet.pupID];
    RespondWithCurrentPacket[sequin, control];
    END;

  RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] =
    -- Reuses the current packet as an empty control packet carrying the sequin's id and
    -- puts it on the socket.  packet is set to NIL afterwards so that the main loop does
    -- not also return the buffer to the free pool.
    BEGIN
    sequin.id.control ← control;
    packetID^ ← sequin.id;
    packet.pupType ← sequin.pupType;
    SetPupContentsBytes[packet, 0];
    sequin.socket.put[packet];
    packet ← NIL;
    END;

  MarkDead: ENTRY PROC [sequin: Handle] =
    -- Flags the sequin as broken and destroyed, and wakes all waiters so they notice.
    BEGIN
    sequin.broken ← TRUE;
    sequin.state ← destroyed;
    BROADCAST sequin.goAhead;
    END;

  Retransmit: ENTRY PROC [sequin: Handle] =
    -- Resends, in sequence order starting at retransmitSequence, the buffers found on the
    -- retransmit queue.  Does nothing if recentRestart is already set; sets it once
    -- anything has been resent.
    BEGIN
    skipped: CARDINAL ← 0;
    seq: SequenceNumber ← sequin.retransmitSequence;
    IF sequin.recentRestart THEN RETURN;
    UNTIL skipped = sequin.retransmitQueue.length DO
      buffer: PupBuffer = DequeuePup[sequin.retransmitQueue];
      bufferID: LONG POINTER TO SequinID;
      -- NOTE(review): this ERROR is raised from an ENTRY procedure, i.e. with the monitor held
      IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
      -- not the next one in sequence: rotate it to the back and keep looking
      IF (bufferID ← LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
        {EnqueuePup[sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP};
      -- refresh the inPart of the stored id from the sequin's current id before resending
      bufferID.inPart ← sequin.id.inPart;
      SendRequeueable[sequin, buffer];
      sequin.recentRestart ← TRUE;
      skipped ← 0;
      seq ← AddWrap[seq, 1];
      ENDLOOP;
    END;

  ResetPinging[];
  WHILE alive DO
    packet ← sequin.socket.get[];
    IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
    -- whatever ProcessPacket did not hand off (enqueue or transmit) is freed here
    IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
    ENDLOOP;
  END;
MakeRequeueClosure: PUBLIC PROC [sequin: Handle]
  RETURNS [closure: RequeueClosure] =
  -- Installs a requeue procedure for the sequin (stored in sequin.requeuer and later
  -- attached to outgoing buffers by SendRequeueable) and hands the caller this
  -- procedure's own local frame as the closure.  Control goes back to the caller through
  -- the captured return link rather than through a normal RETURN.
  -- NOTE(review): presumably this keeps the frame, and with it Requeue's context, alive
  -- until SequinImplA.Destroy disposes of the closure; confirm there.
  BEGIN OPEN PrincOpsUtils;
  -- the caller's return link, viewed as a procedure that accepts the closure
  Return: PROC [RequeueClosure] ← LOOPHOLE[GetReturnLink[]];

  Requeue: PROC [buffer: BufferDefs.Buffer] =
    -- Called with a buffer that was sent via SendRequeueable; either parks it on the
    -- retransmit queue or, if the sequin has been destroyed, frees it.
    BEGIN
    DoRequeue: ENTRY PROC [sequin: Handle] = INLINE
      BEGIN
      sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
      IF sequin.state = destroyed THEN
        BEGIN
        BufferDefs.ReturnFreeBuffer[buffer];
        -- last outstanding buffer is back: wake whoever is waiting on goAhead
        IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
        END
      ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer];
      END;
    DoRequeue[sequin];
    END;

  sequin.requeuer ← Requeue;
  Return[MyLocalFrame[]];
  -- never gets here; see SequinImplA.Destroy
  END;
-- Internal Procedures --
SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = {
  -- Puts buffer on the sequin's socket after counting it as outstanding and attaching
  -- the sequin's requeue procedure to it.  Must be called with the monitor held.
  sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
  buffer.requeueProcedure ← sequin.requeuer;
  sequin.socket.put[buffer];
  };
Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = {
  -- Classifies sequence number a relative to b, allowing for wraparound:
  -- a numerically larger value counts as ahead only if it is within maxGetAhead of b,
  -- and a numerically smaller value counts as duplicate only if b is within
  -- maxGetAhead of it; beyond that distance the opposite answer is given.
  maxGetAhead: CARDINAL = 128;
  IF a = b THEN RETURN[equal];
  IF a > b THEN RETURN[IF a <= b + maxGetAhead THEN ahead ELSE duplicate];
  -- here a < b
  RETURN[IF b <= a + maxGetAhead THEN duplicate ELSE ahead];
  };
AddWrap: PROC
  [seq: SequenceNumber, delta: CARDINAL ← 1] RETURNS [SequenceNumber] = INLINE {
  -- Advances seq by delta (default 1), wrapping around past LAST[SequenceNumber].
  modulus: CARDINAL = LAST[SequenceNumber]+1;
  RETURN [(seq+delta) MOD modulus];
  };
PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = INLINE {
  -- Builds the client's view of a Pup buffer: data addresses the pupBody, nBytes is the
  -- Pup's current contents length, and maxBytes is the module-wide limit.
  view: Buffer = [
    data: LOOPHOLE[@b.pupBody],
    nBytes: GetPupContentsBytes[b],
    maxBytes: maxBytes];
  RETURN[view];
  };
SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = {
  -- Recovers the Pup buffer that encloses a client Buffer by backing up from the data
  -- pointer over the size of the pupBytes variant of PupBufferObject.
  -- Raises BogusBuffer if the data pointer is NIL.
  headerSize: CARDINAL = SIZE[pupBytes BufferDefs.PupBufferObject];
  IF b.data = NIL THEN ERROR BogusBuffer;
  RETURN[LOOPHOLE[b.data - headerSize]];
  };
END.