Send:
PUBLIC
ENTRY
PROC [sequin: Handle, packet: PupBuffer, control: SequinControl] = {
DO
retrans: SequenceNumber =
AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]];
IF sequin.broken OR Compare[retrans, sequin.id.sendSequence] = ahead THEN EXIT;
WAIT sequin.goAhead;
ENDLOOP;
IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
IF control = data AND sequin.state = init THEN {control ← open; sequin.state ← open};
sequin.id.control ← control;
LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]^ ← sequin.id;
IF GetPupContentsBytes[packet] ~= 0
THEN
sequin.id.sendSequence ← AddWrap[sequin.id.sendSequence];
packet.pupType ← sequin.pupType;
SendRequeueable[sequin, packet];
};
SocketWarmer:
PUBLIC
PROC [sequin: Handle] = {
This procedure is forked as a separate process which tries to keep the socket as "clean" as possible. It absorbs incoming packets and initiates retransmissions as necessary. Good packets are moved to the getQueue.
packet: PupBuffer;
packetID: LONG POINTER TO SequinID;
alive: BOOLEAN ← TRUE;
ProcessPacket:
PROC [sequin: Handle]
RETURNS [alive:
BOOLEAN] =
INLINE {
alive ← TRUE;
IF packet =
NIL
THEN {
SELECT CheckForPing[sequin]
FROM
no => NULL;
retransmit => Retransmit[sequin];
check => RespondWithNewPacket[check];
dead => RETURN[FALSE];
ENDCASE;
RETURN
};
IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
SELECT packet.pupType
FROM
error =>
{ OPEN PupTypes;
SELECT packet.errorCode
FROM
noProcessPupErrorCode, cantGetTherePupErrorCode,
hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
ENDCASE => RETURN;
};
sequin.pupType => NULL;
ENDCASE => RETURN;
packetID ← LOOPHOLE[@packet.pupID];
IF packetID.control = broken THEN RETURN[FALSE];
SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence]
FROM
duplicate => RETURN;
ahead => {
DiscardAckedPackets[sequin];
IF packetID.control = restart THEN Retransmit[sequin];
IF ~sequin.restartRequested
THEN
{
sequin.restartRequested ← TRUE;
RespondWithCurrentPacket[sequin, restart];
};
RETURN
};
ENDCASE;
we've seen everything from the server
ResetPinging[];
DiscardAckedPackets[sequin];
SELECT packetID.control
FROM
data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
restart => Retransmit[sequin];
check => RespondWithCurrentPacket[sequin, ack];
ack, nop => NULL;
ENDCASE => ERROR ServerIsSpeakingGreek;
};
CheckForPing:
ENTRY
PROC [sequin: Handle]
RETURNS [{no, retransmit, check, dead}] =
INLINE {
SELECT sequin.pings
FROM
0 =>
IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]]
<= idleLatency AND sequin.retransmitQueue.length = 0 THEN
RETURN[no];
maxPings => RETURN[dead];
ENDCASE;
IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet
sequin.pings ← sequin.pings + 1;
sequin.recentRestart ← sequin.restartRequested ← FALSE;
RETURN[
IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
ELSE check]
};
ResetPinging:
PROC = {
sequin.pings ← 0;
sequin.lastPacketTime ← BasicTime.Now[];
};
DiscardAckedPackets:
ENTRY
PROC [sequin: Handle] = {
sequin.allocate ← packetID.allocate;
IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead
THEN {
skipped: CARDINAL ← 0;
UNTIL packetID.receiveSequence = sequin.retransmitSequence
OR
skipped = sequin.retransmitQueue.length DO
b: PupBuffer ← DequeuePup[sequin.retransmitQueue];
IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet
IF sequin.retransmitSequence ~=
LOOPHOLE[b.pupID, SequinID].sendSequence
THEN
{EnqueuePup[sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP};
ReturnFreePupBuffer[b];
sequin.retransmitSequence ← AddWrap[sequin.retransmitSequence, 1];
skipped ← 0;
ENDLOOP;
BROADCAST sequin.goAhead;
};
};
EnqueueArrival:
ENTRY
PROC [sequin: Handle] =
INLINE {
EnqueuePup[sequin.getQueue, packet];
sequin.id.receiveSequence ← AddWrap[sequin.id.receiveSequence, 1];
sequin.recentRestart ← sequin.restartRequested ← FALSE;
NOTIFY sequin.goAhead;
};
RespondWithNewPacket:
PROC [control: SequinControl] = {
packet ← GetFreePupBuffer[];
packetID ← LOOPHOLE[@packet.pupID];
RespondWithCurrentPacket[sequin, control];
};
RespondWithCurrentPacket:
ENTRY
PROC [sequin: Handle, control: SequinControl] = {
sequin.id.control ← control;
packetID^ ← sequin.id;
packet.pupType ← sequin.pupType;
SetPupContentsBytes[packet, 0];
sequin.socket.put[packet];
packet ← NIL;
};
MarkDead:
ENTRY
PROC [sequin: Handle] = {
sequin.broken ← TRUE;
sequin.state ← destroyed;
BROADCAST sequin.goAhead;
};
Retransmit:
ENTRY
PROC [sequin: Handle] = {
skipped: CARDINAL ← 0;
seq: SequenceNumber ← sequin.retransmitSequence;
IF sequin.recentRestart THEN RETURN;
UNTIL skipped = sequin.retransmitQueue.length
DO
buffer: PupBuffer = DequeuePup[sequin.retransmitQueue];
bufferID: LONG POINTER TO SequinID;
IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
IF (bufferID ←
LOOPHOLE[@buffer.pupID]).sendSequence ~= seq
THEN
{EnqueuePup[sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP};
bufferID.inPart ← sequin.id.inPart;
SendRequeueable[sequin, buffer];
sequin.recentRestart ← TRUE;
skipped ← 0;
seq ← AddWrap[seq, 1];
ENDLOOP;
};
ResetPinging[];
WHILE alive
DO
packet ← sequin.socket.get[];
IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
ENDLOOP;
};
MakeRequeueClosure:
PUBLIC
PROC [sequin: Handle]
RETURNS [closure: RequeueClosure] =
{
OPEN PrincOpsUtils;
Return: PROC [RequeueClosure] ← LOOPHOLE[GetReturnLink[]];
Requeue:
PROC [buffer: BufferDefs.Buffer] = {
DoRequeue:
ENTRY
PROC [sequin: Handle] =
INLINE {
sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
IF sequin.state = destroyed
THEN {
BufferDefs.ReturnFreeBuffer[buffer];
IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
}
ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer];
};
DoRequeue[sequin];
};
sequin.requeuer ← Requeue;
Return[MyLocalFrame[]];
never gets here; see SequinImplA.Destroy
};