-- SequinImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Loosely derived (after extensive discussions with Wobber) from Butterfield's Sequin.mesa of August 27, 1979 2:49 PM.
-- Levin: 3-Feb-82 9:29:58
-- Russ Atkinson, March 7, 1985 1:57:40 pm PST
DIRECTORY
BasicTime USING [Now, Period],
BufferDefs USING [Buffer, Enqueue, PupBufferObject, QueueCleanup, QueueInitialize, QueueObject, ReturnFreeBuffer],
PrincOpsUtils USING [Free, GetReturnLink, MyLocalFrame],
Process USING [Detach],
PupDefs USING [DataWordsPerPupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, GetHopsToNetwork, GetPupContentsBytes, MsToTocks, PupAddress, PupBuffer, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsBytes, Tocks, UniqueLocalPupSocketID],
PupTypes USING [maxDataWordsPerGatewayPup, PupType],
Sequin USING [Buffer, noBuffer],
SequinPrivate USING [maxPings, RequeueClosure, SequenceNumber, SequinControl, SequinID, SequinRep];
SequinImpl: MONITOR LOCKS sequin.LOCK USING sequin: Handle
IMPORTS BasicTime, BufferDefs, PrincOpsUtils, Process, PupDefs
EXPORTS Sequin, SequinPrivate =
BEGIN OPEN PupDefs, Sequin, SequinPrivate;
-- Types
-- Handle is the client's reference to a sequin; it is also the object whose LOCK the monitor uses.
Handle: TYPE = REF SequinRep;
SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep;
-- Variables exported to SequinPrivate
-- maxBytes: assigned once by Initialize; reported as Buffer.maxBytes and used by SocketWarmer to reject oversized packets.
maxBytes: PUBLIC CARDINAL;
-- maxAllocate: assigned once by Initialize; initial value of sequin.id.allocate and upper bound on the send window in Send.
maxAllocate: PUBLIC CARDINAL;
-- Global Variables
-- nSequins: incremented by Create and decremented by Destroy.
-- NOTE(review): updated outside any monitor lock (the monitor locks per-sequin) -- confirm this is acceptable.
nSequins: CARDINAL ← 0;
-- Miscellaneous Declarations
-- SequinsInUse: not raised anywhere in this part of the file.
SequinsInUse: ERROR = CODE;
-- BogusBuffer: raised by SequinToPupBuffer when handed a Buffer whose data pointer is NIL.
BogusBuffer: ERROR = CODE;
-- ServerIsSpeakingGreek: raised by SocketWarmer when an in-sequence packet carries an unexpected control code.
ServerIsSpeakingGreek: ERROR = CODE;
-- WhereDidIPutThatBuffer: raised by Retransmit when the retransmit queue yields NIL before its recorded length is exhausted.
WhereDidIPutThatBuffer: ERROR = CODE;
-- idleLatency: compared against BasicTime.Period[lastPacketTime, Now[]] in CheckForPing
-- (presumably seconds -- TODO confirm against BasicTime).
idleLatency: INT = 10;
-- Procedures exported to Sequin
Create: PUBLIC PROC [dest: PupAddress, pupType: PupTypes.PupType] RETURNS [sequin: Handle] = {
  -- Allocates a sequin for talking to dest using Pups of type pupType: sets up both queues
  -- and the socket, installs the requeue closure, marks the sequin opened and forks the
  -- SocketWarmer process that services the socket.

  RouteTimeout: PROC RETURNS [Tocks] = {
    -- Socket timeout: 500 ms plus 750 ms per hop to dest.net.
    -- A hop count of LAST[CARDINAL] (no route) is treated as zero hops.
    distance: CARDINAL = GetHopsToNetwork[dest.net];
    RETURN [[MsToTocks[(IF distance = LAST[CARDINAL] THEN 0 ELSE distance) * 750 + 500]]]
    };

  sequin ← NEW[SequinRep ← [pupType: pupType]];

  -- packets sent but not yet acknowledged
  sequin.retransmitQueue ← NEW[BufferDefs.QueueObject];
  BufferDefs.QueueInitialize[sequin.retransmitQueue];

  -- packets received and waiting for Get/GetIfAvailable
  sequin.getQueue ← NEW[BufferDefs.QueueObject];
  BufferDefs.QueueInitialize[sequin.getQueue];

  sequin.socket ← PupSocketMake[UniqueLocalPupSocketID[], dest, RouteTimeout[]];
  sequin.id.allocate ← maxAllocate;
  sequin.closure ← MakeRequeueClosure[sequin];
  sequin.opened ← TRUE;
  Process.Detach[FORK SocketWarmer[sequin]];
  nSequins ← nSequins + 1;
  };
WasOpened: ENTRY PROC [sequin: Handle] RETURNS [wasOpened: BOOL] = {
  -- Test-and-clear of sequin.opened under the monitor: reports whether the sequin was
  -- still marked opened and leaves it marked not opened.  A NIL sequin reports FALSE.
  IF sequin = NIL THEN RETURN [FALSE];
  wasOpened ← sequin.opened;
  sequin.opened ← FALSE;
  };
WaitUntilAllQuiet: ENTRY PROC [sequin: Handle] = {
  -- Blocks on goAhead, first until the sequin has reached the destroyed state and then
  -- until every buffer handed to the socket has come back through the requeue procedure.
  WHILE sequin.state # destroyed DO
    WAIT sequin.goAhead;
    ENDLOOP;
  WHILE sequin.buffersToRequeue # 0 DO
    WAIT sequin.goAhead;
    ENDLOOP;
  };
Destroy: PUBLIC PROC [sequin: Handle] = {
  -- Shuts a sequin down.  Only the first call on a given sequin does any work (WasOpened
  -- clears the opened flag atomically); later calls, and calls with NIL, are no-ops.
  -- The first call sends an empty "destroy" packet, waits until the sequin has reached the
  -- destroyed state and all outstanding buffers have been requeued, then releases the
  -- queues, the socket and the requeue closure.

  -- The NIL test must be made here, outside the monitor: WasOpened is an ENTRY procedure
  -- of a MONITOR LOCKS sequin.LOCK module, so the lock is acquired through sequin before
  -- the NIL test inside WasOpened can execute.
  IF sequin = NIL THEN RETURN;
  IF WasOpened[sequin] THEN {
    buffer: PupBuffer ← GetFreePupBuffer[];
    SetPupContentsBytes[buffer, 0];
    -- Send returns the buffer to the free pool itself when it raises Broken.
    Send[sequin, buffer, destroy ! Broken => CONTINUE];
    WaitUntilAllQuiet[sequin];
    BufferDefs.QueueCleanup[sequin.retransmitQueue];
    BufferDefs.QueueCleanup[sequin.getQueue];
    PupSocketDestroy[sequin.socket];
    -- releases the frame retained by MakeRequeueClosure
    PrincOpsUtils.Free[sequin.closure];
    nSequins ← nSequins - 1;
    };
  };
-- Broken: raised (via RETURN WITH ERROR) by Get, GetIfAvailable and Send when sequin.broken is set.
Broken: PUBLIC ERROR = CODE;
Get: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Returns the next received packet as a Buffer, waiting on goAhead while the get queue
  -- is empty.  Raises Broken (with the monitor released) if the sequin is found broken
  -- before a packet can be taken.
  arrival: PupBuffer ← NIL;
  WHILE arrival = NIL DO
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    arrival ← DequeuePup[sequin.getQueue];
    IF arrival = NIL THEN WAIT sequin.goAhead;
    ENDLOOP;
  RETURN[PupToSequinBuffer[arrival]]
  };
GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Buffer] = {
  -- Non-blocking variant of Get: returns the next received packet if one is queued,
  -- otherwise noBuffer.  Raises Broken if the sequin is broken.
  arrival: PupBuffer;
  IF sequin.broken THEN RETURN WITH ERROR Broken;
  arrival ← DequeuePup[sequin.getQueue];
  RETURN[IF arrival = NIL THEN noBuffer ELSE PupToSequinBuffer[arrival]]
  };
Put: PUBLIC PROC [sequin: Handle, buffer: Buffer] = {
  -- Sends the client's buffer as a data packet.  The Pup content length is taken from
  -- buffer.nBytes.  SequinToPupBuffer raises BogusBuffer for a buffer with NIL data;
  -- Send may raise Broken.
  outgoing: PupBuffer = SequinToPupBuffer[buffer];
  SetPupContentsBytes[outgoing, buffer.nBytes];
  Send[sequin, outgoing, data];
  };
GetEmptyBuffer: PUBLIC PROC RETURNS [Buffer] = {
  -- Takes a Pup buffer from the free pool, marks it as holding zero content bytes and
  -- hands it to the client as a Buffer.
  fresh: PupBuffer = GetFreePupBuffer[];
  SetPupContentsBytes[fresh, 0];
  RETURN[PupToSequinBuffer[fresh]]
  };
ReleaseBuffer: PUBLIC PROC [buffer: Buffer] = {
  -- Gives a client buffer back to the Pup free pool; noBuffer is ignored.
  IF buffer # noBuffer THEN ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
  };
-- Procedures exported to SequinPrivate
Send: PUBLIC ENTRY PROC [sequin: Handle, packet: PupBuffer, control: SequinControl] = {
  -- Stamps packet with the sequin's current SequinID (carrying the given control code)
  -- and hands it to the socket.  Waits on goAhead until the send sequence number falls
  -- inside the window retransmitSequence + MIN[allocate, maxAllocate].  If the sequin is
  -- broken the packet is returned to the free pool and Broken is raised.
  header: LONG POINTER TO SequinID;

  UNTIL sequin.broken DO
    windowEnd: SequenceNumber =
      AddWrap[sequin.retransmitSequence, MIN[sequin.allocate, maxAllocate]];
    IF Compare[windowEnd, sequin.id.sendSequence] = ahead THEN EXIT;
    WAIT sequin.goAhead;
    ENDLOOP;
  IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};

  -- the first data packet on a fresh sequin goes out as "open"
  IF sequin.state = init AND control = data THEN {sequin.state ← open; control ← open};
  sequin.id.control ← control;

  -- the SequinID is overlaid on the Pup's ID field
  header ← LOOPHOLE[@packet.pupID];
  header^ ← sequin.id;
  packet.pupType ← sequin.pupType;

  -- only packets with contents consume a sequence number
  IF GetPupContentsBytes[packet] # 0 THEN
    sequin.id.sendSequence ← AddWrap[sequin.id.sendSequence];
  SendRequeueable[sequin, packet];
  };
SocketWarmer: PUBLIC PROC [sequin: Handle] = {
  -- This procedure is forked as a separate process (by Create) which tries to keep the
  -- socket as "clean" as possible.  It absorbs incoming packets and initiates
  -- retransmissions as necessary.  Good packets are moved to the getQueue.
  -- The process runs until ProcessPacket reports that the connection is no longer alive,
  -- at which point MarkDead flags the sequin broken/destroyed and the loop ends.

  packet: PupBuffer;  -- packet being processed; set to NIL once ownership has passed on
  packetID: LONG POINTER TO SequinID;  -- the SequinID overlaid on packet.pupID
  alive: BOOLEAN ← TRUE;  -- main loop control

  ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE {
    -- Handles one result of socket.get (a packet, or NIL when none was obtained).
    -- Returns FALSE when the connection should be considered dead, TRUE otherwise.
    alive ← TRUE;
    IF packet = NIL THEN {
      -- nothing received: decide whether to ping, retransmit or give up
      SELECT CheckForPing[sequin] FROM
        no => NULL;
        retransmit => Retransmit[sequin];
        check => RespondWithNewPacket[check];
        dead => RETURN[FALSE];
        ENDCASE;
      RETURN
      };
    -- oversized packets are ignored
    IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
    SELECT packet.pupType FROM
      error =>
        { OPEN PupTypes;
        -- these error Pups are fatal to the connection; all others are ignored
        SELECT packet.errorCode FROM
          noProcessPupErrorCode, cantGetTherePupErrorCode,
          hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
          ENDCASE => RETURN;
        };
      sequin.pupType => NULL;
      ENDCASE => RETURN;  -- not one of ours
    packetID ← LOOPHOLE[@packet.pupID];
    IF packetID.control = broken THEN RETURN[FALSE];
    SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
      duplicate => RETURN;
      ahead => {
        -- out-of-sequence packet: take its acknowledgement information and ask (once)
        -- for a restart
        DiscardAckedPackets[sequin];
        IF packetID.control = restart THEN Retransmit[sequin];
        IF ~sequin.restartRequested THEN
          {
          sequin.restartRequested ← TRUE;
          RespondWithCurrentPacket[sequin, restart];
          };
        RETURN
        };
      ENDCASE;
    -- we've seen everything from the server
    ResetPinging[];
    DiscardAckedPackets[sequin];
    SELECT packetID.control FROM
      data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
      dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
      restart => Retransmit[sequin];
      check => RespondWithCurrentPacket[sequin, ack];
      ack, nop => NULL;
      ENDCASE => ERROR ServerIsSpeakingGreek;
    };

  CheckForPing: ENTRY PROC [sequin: Handle] RETURNS [{no, retransmit, check, dead}] = INLINE {
    -- Called when no packet arrived.  Counts a ping and says what to do about it.
    SELECT sequin.pings FROM
      0 =>
        -- recently heard from the other end and nothing awaiting acknowledgement
        IF BasicTime.Period[sequin.lastPacketTime, BasicTime.Now[]]
          <= idleLatency AND sequin.retransmitQueue.length = 0 THEN
            RETURN[no];
      maxPings => RETURN[dead];
      ENDCASE;
    IF sequin.state = init THEN RETURN[no]; -- don't ping until first real packet
    sequin.pings ← sequin.pings + 1;
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    RETURN[
      IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
      ELSE check]
    };

  ResetPinging: PROC = {
    -- Records that the other end has just been heard from.
    sequin.pings ← 0;
    sequin.lastPacketTime ← BasicTime.Now[];
    };

  DiscardAckedPackets: ENTRY PROC [sequin: Handle] = {
    -- Adopts the allocation advertised in the current packet and frees every queued
    -- packet that the packet's receiveSequence acknowledges, then wakes all waiters.
    sequin.allocate ← packetID.allocate;
    IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN {
      skipped: CARDINAL ← 0;  -- consecutive packets put back without a match
      UNTIL packetID.receiveSequence = sequin.retransmitSequence OR
        skipped = sequin.retransmitQueue.length DO
          b: PupBuffer ← DequeuePup[sequin.retransmitQueue];
          IF b = NIL THEN EXIT; -- buffer hasn't made it to queue yet
          IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
            {EnqueuePup[sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP};
          ReturnFreePupBuffer[b];
          sequin.retransmitSequence ← AddWrap[sequin.retransmitSequence, 1];
          skipped ← 0;
          ENDLOOP;
      BROADCAST sequin.goAhead;
      };
    };

  EnqueueArrival: ENTRY PROC [sequin: Handle] = INLINE {
    -- Moves the current packet to the get queue and advances the receive sequence.
    EnqueuePup[sequin.getQueue, packet];
    sequin.id.receiveSequence ← AddWrap[sequin.id.receiveSequence, 1];
    sequin.recentRestart ← sequin.restartRequested ← FALSE;
    -- BROADCAST rather than NOTIFY: goAhead is shared by Get (waiting for data) and by
    -- Send (waiting for send window).  A single NOTIFY could be consumed by a Send
    -- waiter, which would simply wait again and leave the Get waiter asleep.
    BROADCAST sequin.goAhead;
    };

  RespondWithNewPacket: PROC [control: SequinControl] = {
    -- Sends a reply in a freshly allocated packet (used when the current packet is
    -- absent or has been given to the get queue).
    packet ← GetFreePupBuffer[];
    packetID ← LOOPHOLE[@packet.pupID];
    RespondWithCurrentPacket[sequin, control];
    };

  RespondWithCurrentPacket: ENTRY PROC [sequin: Handle, control: SequinControl] = {
    -- Reuses the current packet as an empty reply carrying the given control code.
    sequin.id.control ← control;
    packetID^ ← sequin.id;
    packet.pupType ← sequin.pupType;
    SetPupContentsBytes[packet, 0];
    sequin.socket.put[packet];
    packet ← NIL;  -- handed to the socket; the main loop must not free it
    };

  MarkDead: ENTRY PROC [sequin: Handle] = {
    -- Flags the sequin broken and destroyed and wakes every waiter on goAhead.
    sequin.broken ← TRUE;
    sequin.state ← destroyed;
    BROADCAST sequin.goAhead;
    };

  Retransmit: ENTRY PROC [sequin: Handle] = {
    -- Resends, in sequence order starting at retransmitSequence, the packets found on the
    -- retransmit queue.  Does nothing if a retransmission has already happened recently.
    skipped: CARDINAL ← 0;  -- consecutive packets put back without a match
    seq: SequenceNumber ← sequin.retransmitSequence;
    IF sequin.recentRestart THEN RETURN;
    UNTIL skipped = sequin.retransmitQueue.length DO
      buffer: PupBuffer = DequeuePup[sequin.retransmitQueue];
      bufferID: LONG POINTER TO SequinID;
      IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
      IF (bufferID ← LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
        {EnqueuePup[sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP};
      -- refresh the inbound half of the ID before resending
      bufferID.inPart ← sequin.id.inPart;
      SendRequeueable[sequin, buffer];
      sequin.recentRestart ← TRUE;
      skipped ← 0;
      seq ← AddWrap[seq, 1];
      ENDLOOP;
    };

  ResetPinging[];
  WHILE alive DO
    packet ← sequin.socket.get[];
    IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
    -- free the packet unless ProcessPacket passed it on (and cleared the variable)
    IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
    ENDLOOP;
  };
-- MakeRequeueClosure installs sequin.requeuer (the procedure buffers are stamped with in
-- SendRequeueable) and returns this procedure's own local frame as the closure.
-- Requeue is a nested procedure that refers to the sequin parameter of this activation;
-- presumably the frame is kept allocated for that reason -- NOTE(review): confirm.
-- The closure is released by Destroy via PrincOpsUtils.Free[sequin.closure].
MakeRequeueClosure: PUBLIC PROC [sequin: Handle] RETURNS [closure: RequeueClosure] = {
OPEN PrincOpsUtils;
-- Return is the caller's return link, LOOPHOLEd into a procedure taking the closure value.
Return: PROC [RequeueClosure] ← LOOPHOLE[GetReturnLink[]];
-- Requeue: called with each buffer that was sent via SendRequeueable once the buffer comes back.
Requeue: PROC [buffer: BufferDefs.Buffer] = {
DoRequeue: ENTRY PROC [sequin: Handle] = INLINE {
-- one fewer buffer outstanding
sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
IF sequin.state = destroyed
THEN {
-- sequin is being torn down: free the buffer instead of keeping it for retransmission
BufferDefs.ReturnFreeBuffer[buffer];
-- last outstanding buffer: wake WaitUntilAllQuiet
IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
}
-- otherwise keep the buffer until it is acknowledged
ELSE BufferDefs.Enqueue[sequin.retransmitQueue, buffer];
};
DoRequeue[sequin];
};
sequin.requeuer ← Requeue;
-- transfer to the caller through its return link, passing this frame as the result
Return[MyLocalFrame[]];
-- never gets here; the frame handed out above is released by Destroy
};
-- Internal Procedures
SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupBuffer] = {
  -- Transmits buffer on the sequin's socket, arranging for it to be given to the
  -- sequin's requeue procedure afterwards, and counts it as outstanding.
  -- Must be called with the monitor held.
  sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
  buffer.requeueProcedure ← sequin.requeuer;
  sequin.socket.put[buffer];
  };
Compare: PROC [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] = {
  -- Classifies sequence number a relative to b on the wrapping sequence space.
  -- a counts as "ahead" when it is at most maxGetAhead beyond b (allowing for
  -- wrap-around), and as "duplicate" when it is behind b.
  maxGetAhead: CARDINAL = 128;
  IF a = b THEN RETURN[equal];
  IF a > b THEN RETURN[IF a <= b + maxGetAhead THEN ahead ELSE duplicate];
  -- a < b
  RETURN[IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
  };
AddWrap: PROC [seq: SequenceNumber, delta: CARDINAL ← 1] RETURNS [SequenceNumber] = INLINE {
  -- Advances seq by delta (default 1), wrapping around at the end of the
  -- SequenceNumber range.
  modulus: CARDINAL = LAST[SequenceNumber]+1;
  RETURN [(seq+delta) MOD modulus];
  };
PupToSequinBuffer: PROC [b: PupBuffer] RETURNS [Buffer] = {
  -- Builds the client's view of a Pup buffer: a pointer to the Pup body, the current
  -- content length, and the module-wide maximum length.
  RETURN[[
    maxBytes: maxBytes,
    nBytes: GetPupContentsBytes[b],
    data: LOOPHOLE[@b.pupBody]]]
  };
SequinToPupBuffer: PROC [b: Buffer] RETURNS [PupBuffer] = {
  -- Inverse of PupToSequinBuffer: steps back from the data pointer by the size of the
  -- pupBytes variant of PupBufferObject to recover the Pup buffer.
  -- Raises BogusBuffer if the data pointer is NIL.
  headerSize: CARDINAL = SIZE[pupBytes BufferDefs.PupBufferObject];
  IF b.data = NIL THEN ERROR BogusBuffer;
  RETURN[LOOPHOLE[b.data-headerSize]]
  };
-- Initialization
Initialize: PROC = {
  -- Computes the module-wide packet size limit and allocation, once, at start-up.
  usableWords: CARDINAL = MIN[DataWordsPerPupBuffer[], PupTypes.maxDataWordsPerGatewayPup];
  maxBytes ← 2*usableWords;
  -- 10 = leaf overhead/packet; the added 2 is +1 roundup and +1 for parallelism
  maxAllocate ← 511/(maxBytes-10) + 2;
  };
-- Main body
Initialize[];
END.