DIRECTORY
BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer],
PrincOpsUtils USING [GetReturnLink, MyLocalFrame],
PupDefs
USING [
DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer,
ReturnFreePupBuffer, SetPupContentsBytes],
PupTypes USING [PupType],
Sequin USING [Buffer, noBuffer],
SequinPrivate
USING
[maxAllocate, maxBytes, maxPings, RequeueClosure, SequenceNumber, SequinID, SequinControl, SequinRep],
BasicTime USING [Now, Period];
SequinImplB:
MONITOR
LOCKS sequin.LOCK USING sequin: Handle
IMPORTS BufferDefs, PrincOpsUtils, PupDefs, SequinPrivate, BasicTime
EXPORTS Sequin, SequinPrivate =
BEGIN OPEN PupDefs, Sequin, SequinPrivate;
-- Types --
-- Handle is the reference type used for the sequin argument of every procedure here;
-- it is also the monitored object named in this module's LOCKS clause.
Handle: TYPE = REF SequinRep;
-- Binds the exported type SequinRep to the record declared in SequinPrivate.
SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep;
-- Miscellaneous Declarations --
-- Raised by SequinToPupBuffer when it is handed a Buffer whose data pointer is NIL.
BogusBuffer: ERROR = CODE;
-- Raised inside SocketWarmer when an in-sequence packet carries a control code that
-- none of the handled cases covers.
ServerIsSpeakingGreek: ERROR = CODE;
-- Raised by Retransmit when the retransmit queue yields NIL although its recorded
-- length says there are still buffers to examine.
WhereDidIPutThatBuffer: ERROR = CODE;
-- Threshold compared with BasicTime.Period[lastPacketTime, Now] in CheckForPing.
-- NOTE(review): presumably seconds; confirm against the BasicTime interface.
idleLatency: INT = 10;
-- Procedures and Signals Exported to Sequin --
-- Raised (via RETURN WITH ERROR) by Get, GetIfAvailable and Send when sequin.broken is set.
Broken: PUBLIC ERROR = CODE;
Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Blocks until a packet is available on sequin.getQueue and returns it as a Buffer.
  -- Raises Broken (releasing the monitor) whenever sequin.broken is observed, which is
  -- re-tested after every wakeup on sequin.goAhead.
  arrived: PupBuffer ← NIL;
  UNTIL arrived ~= NIL DO
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    arrived ← DequeuePup[sequin.getQueue];
    -- nothing queued yet: sleep until somebody signals goAhead, then try again
    IF arrived = NIL THEN WAIT sequin.goAhead;
    ENDLOOP;
  RETURN[PupToSequinBuffer[arrived]];
  };
GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Non-blocking variant of Get: returns the next queued packet as a Buffer, or
  -- noBuffer when sequin.getQueue is empty.  Raises Broken if sequin.broken is set.
  ready: PupBuffer;
  IF sequin.broken THEN RETURN WITH ERROR Broken;
  ready ← DequeuePup[sequin.getQueue];
  IF ready ~= NIL THEN RETURN[PupToSequinBuffer[ready]];
  RETURN[noBuffer];
  };
Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = {
  -- Transmits the client's buffer as a data packet.  The Pup that underlies the buffer
  -- is recovered, its content length is set from buffer.nBytes, and it is handed to Send.
  -- Errors raised by SequinToPupBuffer (BogusBuffer) or Send (Broken) propagate.
  pup: PupBuffer = SequinToPupBuffer[buffer];
  SetPupContentsBytes[pup, buffer.nBytes];
  Send[sequin, pup, data];
  };
GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = {
  -- Obtains a free Pup buffer, marks it as holding zero content bytes, and returns
  -- the Buffer view of it.
  fresh: PupBuffer = GetFreePupBuffer[];
  SetPupContentsBytes[fresh, 0];
  RETURN[PupToSequinBuffer[fresh]];
  };
ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = {
  -- Gives the Pup underlying buffer back to the free pool.  noBuffer is ignored.
  IF buffer ~= noBuffer THEN ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
  };
-- Procedures exported to SequinPrivate --
Send: PUBLIC ENTRY PROC [sequin: Handle, packet: PupBuffer, control: SequinControl] =
  BEGIN
  -- Stamps packet with the sequin's current ID and transmits it through
  -- SendRequeueable.  The caller is held on sequin.goAhead until the send window
  -- (retransmitSequence advanced by MIN[allocate, maxAllocate]) is ahead of the
  -- next send sequence, or until the sequin is broken.
  -- If the sequin is broken the packet is returned to the free pool and Broken is raised.
  DO
    windowEdge: SequenceNumber;
    IF sequin.broken THEN EXIT;
    windowEdge ← AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]];
    IF Compare[windowEdge, sequin.id.sendSequence] = ahead THEN EXIT;
    WAIT sequin.goAhead;
    ENDLOOP;
  IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
  -- the first data packet sent while the sequin is still in init state goes out as "open"
  IF sequin.state = init AND control = data THEN {sequin.state ← open; control ← open};
  sequin.id.control ← control;
  -- overlay the sequin ID on the Pup ID field of the outgoing packet
  LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ ← sequin.id;
  -- only packets that carry content bytes consume a send sequence number
  IF GetPupContentsBytes[packet] ~= 0 THEN
    sequin.id.sendSequence ← AddWrap[sequin.id.sendSequence, 1];
  packet.pupType ← sequin.pupType;
  SendRequeueable[sequin, packet];
  END;
SocketWarmer: PUBLIC PROC [sequin: Handle] =
  -- This procedure is forked as a separate process which tries to keep the socket as
  -- "clean" as possible.  It absorbs incoming packets and initiates retransmissions as
  -- necessary.  Good packets are moved to the getQueue.
  -- The loop at the bottom runs until ProcessPacket reports the connection dead, at
  -- which point MarkDead sets sequin.broken and wakes every waiter on sequin.goAhead.
  BEGIN
  -- packet and packetID are shared by all of the nested procedures below: packet is the
  -- buffer currently being processed (NIL once it has been handed off or when none
  -- arrived), and packetID overlays the Pup ID field of that buffer.
  packet: PupBuffer;
  packetID: LONG POINTER TO SequinID;
  alive: BOOLEAN ← TRUE;

  ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
    -- Handles one result of sequin.socket.get.  Returns FALSE when the connection must
    -- be declared dead, TRUE otherwise.
    BEGIN
    alive ← TRUE;
    IF packet = NIL THEN
      BEGIN
      -- nothing arrived: decide whether to stay quiet, retransmit, probe, or give up
      SELECT CheckForPing[sequin] FROM
        no => NULL;
        retransmit => Retransmit[sequin];
        check => RespondWithNewPacket[check];
        dead => RETURN[FALSE];
        ENDCASE;
      RETURN
      END;
    -- oversized packets are ignored
    IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
    SELECT packet.pupType FROM
      error =>
        BEGIN OPEN PupTypes;
        -- these error codes are treated as fatal; every other error Pup is ignored
        SELECT packet.errorCode FROM
          noProcessPupErrorCode, cantGetTherePupErrorCode,
          hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
          ENDCASE => RETURN;
        END;
      sequin.pupType => NULL;
      ENDCASE => RETURN;
    packetID ← LOOPHOLE[@packet.pupID];
    IF packetID.control = broken THEN RETURN[FALSE];
    SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
      duplicate => RETURN;
      ahead =>
        BEGIN
        -- the packet is beyond what we expect next: ask for a restart, at most once
        -- until restartRequested is cleared again
        DiscardAckedPackets[sequin];
        IF packetID.control = restart THEN Retransmit[sequin];
        IF ~sequin.restartRequested THEN
          BEGIN
          sequin.restartRequested ← TRUE;
          RespondWithCurrentPacket[sequin, restart];
          END;
        RETURN
        END;
      ENDCASE;
    -- we've seen everything from the server
    ResetPinging[];
    DiscardAckedPackets[sequin];
    SELECT packetID.control FROM
      data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
      dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
      restart => Retransmit[sequin];
      check => RespondWithCurrentPacket[sequin, ack];
      ack, nop => NULL;
      ENDCASE => ERROR ServerIsSpeakingGreek;
    END;

  CheckForPing: ENTRY PROC [sequin: Handle]
    RETURNS [{no, retransmit, check, dead}] = INLINE
    -- Called when no packet arrived.  Answers "no" while the connection is idle within
    -- idleLatency with nothing awaiting acknowledgement, or while still in init state;
    -- "dead" once maxPings probes have gone out; otherwise counts another ping and asks
    -- for a retransmission (first ping with exactly one queued buffer) or a check.
    BEGIN
    SELECT sequin.pings FROM
      0 =>
        IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]] <= idleLatency
          AND sequin.retransmitQueue.length = 0 THEN RETURN[no];
      maxPings => RETURN[dead];
      ENDCASE;
    IF sequin.state = init THEN RETURN[no];  -- don't ping until first real packet
    sequin.pings ← sequin.pings + 1;
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    RETURN[
      IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
      ELSE check]
    END;

  ResetPinging: PROC = INLINE
    -- Clears the ping count and records the current time as the last packet time.
    {sequin.pings ← 0; sequin.lastPacketTime ← BasicTime.Now[]};

  DiscardAckedPackets: ENTRY PROC [sequin: Handle] =
    -- Adopts the allocation carried by the current packet and frees every buffer on the
    -- retransmit queue that the packet's receiveSequence acknowledges, then wakes all
    -- waiters on goAhead.
    BEGIN
    sequin.allocate ← packetID.allocate;
    IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
      BEGIN
      -- skipped counts consecutive buffers that were rotated to the back of the queue
      -- without matching; a full rotation without a match ends the scan
      skipped: CARDINAL ← 0;
      UNTIL packetID.receiveSequence = sequin.retransmitSequence
        OR skipped = sequin.retransmitQueue.length DO
        b: PupBuffer ← DequeuePup[sequin.retransmitQueue];
        IF b = NIL THEN EXIT;  -- buffer hasn't made it to queue yet
        IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
          {EnqueuePup[sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP};
        ReturnFreePupBuffer[b];
        sequin.retransmitSequence ← AddWrap[sequin.retransmitSequence, 1];
        skipped ← 0;
        ENDLOOP;
      BROADCAST sequin.goAhead;
      END;
    END;

  EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE
    -- Moves the current packet onto getQueue, advances the expected receive sequence,
    -- clears the restart flags, and wakes the waiters on goAhead.
    BEGIN
    EnqueuePup[sequin.getQueue, packet];
    sequin.id.receiveSequence ← AddWrap[sequin.id.receiveSequence, 1];
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    -- goAhead is shared by Get (waiting for data) and Send (waiting for the send window).
    -- A NOTIFY could wake only a Send waiter, which would go straight back to sleep and
    -- leave the Get waiter unaware of the new packet, so every waiter is woken and each
    -- re-tests its own condition.
    BROADCAST sequin.goAhead;
    END;

  RespondWithNewPacket: PROC [control: SequinControl] =
    -- Replaces the current packet with a freshly allocated one and sends it as a
    -- zero-length reply carrying the given control code.
    BEGIN
    packet ← GetFreePupBuffer[];
    packetID ← LOOPHOLE[@packet.pupID];
    RespondWithCurrentPacket[sequin, control];
    END;

  RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] =
    -- Reuses the current packet as a zero-length reply stamped with the sequin's ID and
    -- the given control code.  packet is set to NIL afterwards so that the main loop
    -- does not return the buffer to the free pool after it has been given to the socket.
    BEGIN
    sequin.id.control ← control;
    packetID^ ← sequin.id;
    packet.pupType ← sequin.pupType;
    SetPupContentsBytes[packet, 0];
    sequin.socket.put[packet];
    packet ← NIL;
    END;

  MarkDead: ENTRY PROC [sequin: Handle] =
    -- Marks the sequin broken and destroyed and wakes every waiter on goAhead.
    BEGIN
    sequin.broken ← TRUE;
    sequin.state ← destroyed;
    BROADCAST sequin.goAhead;
    END;

  Retransmit: ENTRY PROC [sequin: Handle] =
    -- Resends the buffers on the retransmit queue in sequence order, starting at
    -- retransmitSequence.  Does nothing if a restart was already performed recently
    -- (recentRestart).  Each resent buffer has its inPart refreshed from the sequin's
    -- current ID before it goes out through SendRequeueable.
    BEGIN
    skipped: CARDINAL ← 0;
    seq: SequenceNumber ← sequin.retransmitSequence;
    IF sequin.recentRestart THEN RETURN;
    UNTIL skipped = sequin.retransmitQueue.length DO
      buffer: PupBuffer = DequeuePup[sequin.retransmitQueue];
      bufferID: LONG POINTER TO SequinID;
      IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
      IF (bufferID ← LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
        {EnqueuePup[sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP};
      bufferID.inPart ← sequin.id.inPart;
      SendRequeueable[sequin, buffer];
      sequin.recentRestart ← TRUE;
      skipped ← 0;
      seq ← AddWrap[seq, 1];
      ENDLOOP;
    END;

  -- main loop of the socket process
  ResetPinging[];
  WHILE alive DO
    packet ← sequin.socket.get[];
    IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
    -- a packet that nobody took ownership of goes back to the free pool
    IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
    ENDLOOP;
  END;
MakeRequeueClosure: PUBLIC PROC [sequin: Handle] RETURNS [closure: RequeueClosure] =
  -- Installs the nested procedure Requeue as sequin.requeuer and then transfers back
  -- to the caller through the return link, passing this procedure's own local frame as
  -- the closure value instead of executing an ordinary RETURN.
  -- NOTE(review): presumably this keeps the frame, and with it the context Requeue
  -- needs, alive until SequinImplA.Destroy disposes of it; confirm against Destroy.
  BEGIN OPEN PrincOpsUtils;
  Return: PROC [RequeueClosure] ← LOOPHOLE[GetReturnLink[]];

  Requeue: PROC [buffer: BufferDefs.Buffer] =
    -- Requeue procedure attached to outgoing buffers by SendRequeueable.
    BEGIN
    DoRequeue: ENTRY PROC [sequin: Handle] = INLINE
      -- Accounts for one returned buffer.  After the sequin has been destroyed the
      -- buffer is freed, and goAhead is notified when the last outstanding buffer has
      -- come back; otherwise the buffer is put on the retransmit queue.
      BEGIN
      sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
      IF sequin.state = destroyed THEN
        BEGIN
        BufferDefs.ReturnFreeBuffer[buffer];
        IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
        END
      ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer];
      END;
    DoRequeue[sequin];
    END;

  sequin.requeuer ← Requeue;
  Return[MyLocalFrame[]];
  -- never gets here; see SequinImplA.Destroy
  END;
-- Internal Procedures --
SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = {
  -- Sends buffer on the sequin's socket after counting it as outstanding and attaching
  -- the sequin's requeue procedure to it.  Must be called with the monitor held.
  sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
  buffer.requeueProcedure ← sequin.requeuer;
  sequin.socket.put[buffer];
  };
Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = {
  -- Classifies sequence number a relative to b in wrap-around arithmetic.
  -- Equal values give equal.  When a is numerically larger it is ahead if it lies within
  -- maxGetAhead of b, otherwise it is taken to be a wrapped duplicate.  When a is
  -- numerically smaller it is a duplicate if b lies within maxGetAhead of a, otherwise it
  -- is taken to be ahead across the wrap point.
  maxGetAhead: CARDINAL = 128;
  IF a = b THEN RETURN[equal];
  IF a > b THEN {
    IF a <= b + maxGetAhead THEN RETURN[ahead] ELSE RETURN[duplicate]};
  IF b <= a + maxGetAhead THEN RETURN[duplicate];
  RETURN[ahead];
  };
AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL ← 1]
  RETURNS [SequenceNumber] = INLINE {
  -- Adds delta (default 1) to seq, wrapping modulo the size of the SequenceNumber range.
  modulus: CARDINAL = LAST[SequenceNumber]+1;
  RETURN [(seq+delta) MOD modulus];
  };
PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = INLINE {
  -- Builds the client Buffer description of Pup b: data addresses the Pup body, nBytes
  -- is the Pup's current content length, and maxBytes is the SequinPrivate limit.
  view: Buffer = [
    data: LOOPHOLE[@b.pupBody],
    nBytes: GetPupContentsBytes[b],
    maxBytes: maxBytes];
  RETURN[view];
  };
SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = {
  -- Recovers the Pup buffer that contains b.data by stepping back over the
  -- SIZE[pupBytes BufferDefs.PupBufferObject] words that precede the body.
  -- Raises BogusBuffer when b.data is NIL.
  headerWords: CARDINAL = SIZE[pupBytes BufferDefs.PupBufferObject];
  IF b.data = NIL THEN ERROR BogusBuffer;
  RETURN[LOOPHOLE[b.data-headerWords]];
  };
END.