DIRECTORY
Process USING [InitializeCondition, DisableTimeout, SetTimeout, MsecToTicks],
ProcessorFace USING [microsecondsPerHundredPulses],
CommUtilDefs USING [EnableAborts],
PupPktOps,
PupStream USING [CloseReason, PupOpenMode],
PupPktDefs USING [PupPktStream],
PupDefs USING [Pair, defaultPupsToAllocate, DataWordsPerPupBuffer, PupAddress, PupBuffer, PupSocketDestroy, PupSocketKick, PupSocketID, Tocks, veryLongWait, veryShortWait],
BufferDefs USING [QueueCleanup, QueueInitialize, QueueObject];
-- PupPktMgr: creates and destroys Pup packet stream instances and holds the
-- module-wide defaults (pinging, allocation limits, buffer size, retransmit
-- timing) that are applied to each newly made stream.
PupPktMgr:
PROGRAM
IMPORTS ProcessorFace, Process, CommUtilDefs, PupPktOps, PupDefs, BufferDefs
EXPORTS PupPktOps, PupStream, PupPktDefs =
-- The three interfaces below are opened, so their names are used unqualified
-- throughout the module body.
BEGIN OPEN PupPktOps, PupPktDefs, PupDefs;
-- Binds the exported opaque stream type to the instance record declared in PupPktOps.
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;

-- Module-wide defaults.  PupPktStreamMake copies these into every new stream;
-- the Set... procedures of this module change them.
myPing: BOOLEAN ← TRUE;
myMaxAllocate: CARDINAL ← defaultPupsToAllocate;
myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate;
-- Zero means "no override": PupPktStreamMake only replaces a stream's
-- dataBytesPerPup when this value is nonzero.
myMaxBufferSize: CARDINAL ← 0;

-- Errors exported by this module.
NoBufferToSend: PUBLIC ERROR = CODE;
StreamAlreadyOpen: PUBLIC ERROR = CODE;

-- Timing parameters, expressed in pulses.  Each one has the form
--   100*(t/ProcessorFace.microsecondsPerHundredPulses)
-- where t reads as a duration in microseconds (going by the name of the
-- ProcessorFace constant).  The division is performed first and truncates.
-- t = 5,000,000
maxRetransmitPulses: PUBLIC PupPktOps.Pulses ← 100*(5000000/ProcessorFace.microsecondsPerHundredPulses);
-- t = 100,000
minRetransmitPulses: PUBLIC PupPktOps.Pulses ← 100*(100000/ProcessorFace.microsecondsPerHundredPulses);
-- t = 5,000,000 (same value as maxRetransmitPulses); this is the value that
-- PupPktStreamMake stores into a new stream's retransmitPulses.
initialRetransmitPulses: PUBLIC PupPktOps.Pulses ← 100*(5000000/ProcessorFace.microsecondsPerHundredPulses);
-- t = 2,000,000
ctlRetransmitPulses: PUBLIC PupPktOps.Pulses ← 100*(2000000/ProcessorFace.microsecondsPerHundredPulses);
-- t = 60,000,000
pingPulses: PUBLIC PupPktOps.Pulses ← 100*(60000000/ProcessorFace.microsecondsPerHundredPulses);
-- Allocates a fresh stream instance, fills in its initial state from the
-- module-wide defaults, sets up its queues and condition variables, and then
-- hands it to MakeLocal together with the addressing arguments.
--   local, remote, mode, id: passed through unchanged to MakeLocal.
--   ticks: selects the input wait policy (see the test near the end).
-- Returns the new instance.  If a signal unwinds through the MakeLocal call,
-- the instance is passed to PupPktStreamDestroy before the unwind continues.
PupPktStreamMake: PUBLIC PROCEDURE [
  local: PupSocketID, remote: PupAddress, ticks: Tocks,
  mode: PupStream.PupOpenMode, id: Pair]
  RETURNS [PupPktStream] =
  BEGIN
  handle: Instance ← NEW[PupPktOps.InstanceData];

  -- Data bytes per Pup: twice the word capacity of a buffer, unless a
  -- module-wide override has been set (nonzero myMaxBufferSize).
  handle.dataBytesPerPup ← 2*DataWordsPerPupBuffer[];
  IF myMaxBufferSize # 0 THEN handle.dataBytesPerPup ← myMaxBufferSize;

  -- Connection state and flags.
  handle.state ← idle;
  handle.whyClosed ← localClose;
  handle.pleaseDie ← FALSE;
  handle.dontWait ← FALSE;
  handle.sameNet ← FALSE;
  handle.sendAck ← FALSE;
  handle.aDataOut ← FALSE;
  handle.outIntPending ← FALSE;
  handle.ping ← myPing;

  -- Counters and allocation limits.
  handle.myMaxAllocate ← myMaxAllocate;
  handle.pathMaxAllocate ← myPathMaxAllocate;
  handle.hisMaxAllocate ← 0;
  handle.allocatedPups ← 0;
  handle.unackedPups ← 0;
  handle.throttle ← 0;
  handle.clumpsSinceBump ← 0;
  handle.probeCounter ← 0;
  handle.outEnd ← 0;
  handle.retransmitPulses ← initialRetransmitPulses;

  -- Nothing is buffered or attached yet.
  handle.c ← NIL;
  handle.text ← NIL;
  handle.sentBuffer ← NIL;

  -- Queues: allocate both, then initialize both.
  handle.inputQueue ← NEW[BufferDefs.QueueObject];
  handle.sentQueue ← NEW[BufferDefs.QueueObject];
  BufferDefs.QueueInitialize[handle.inputQueue];
  BufferDefs.QueueInitialize[handle.sentQueue];

  -- Condition variables with their initial timeouts (in milliseconds).
  Process.InitializeCondition[@handle.stateChange, Process.MsecToTicks[1000]];
  Process.InitializeCondition[@handle.retransmitterReady, Process.MsecToTicks[100]];
  Process.InitializeCondition[@handle.inputReady, Process.MsecToTicks[5000]];
  Process.InitializeCondition[@handle.waitingForInterrupt, Process.MsecToTicks[5000]];
  -- waitingForInterrupt then has its timeout disabled and aborts enabled.
  Process.DisableTimeout[@handle.waitingForInterrupt];
  CommUtilDefs.EnableAborts[@handle.waitingForInterrupt];

  -- Input wait policy: veryShortWait sets dontWait, veryLongWait disables
  -- the inputReady timeout, any other value becomes the inputReady timeout.
  IF ticks = veryShortWait THEN handle.dontWait ← TRUE
  ELSE IF ticks = veryLongWait THEN Process.DisableTimeout[@handle.inputReady]
  ELSE Process.SetTimeout[@handle.inputReady, ticks];

  MakeLocal[
    handle, local, remote, mode, id !
    UNWIND => PupPktStreamDestroy[handle]];
  RETURN[handle];
  END;
-- Tears down a stream instance: calls DestroyLocalLocked, waits for the
-- stream's two forked processes to finish, releases any buffer still held in
-- sentBuffer, cleans up both queues, and finally destroys the socket.
--   him: the instance to destroy.
PupPktStreamDestroy: PUBLIC PROCEDURE [him: Instance] =
  BEGIN
  DestroyLocalLocked[him];

  -- Wait for the retransmitter first.
  JOIN him.retransmitterFork;

  -- The socket is kicked before joining the second process.
  -- NOTE(review): presumably the kick wakes that process out of a wait on the
  -- socket so that the JOIN can complete; confirm against its body.
  PupSocketKick[him.socket];
  JOIN him.slurpFork;

  -- Release whatever is still held by the instance.
  IF him.sentBuffer # NIL THEN ReturnFreePupBuffer[him.sentBuffer];
  BufferDefs.QueueCleanup[him.inputQueue];
  BufferDefs.QueueCleanup[him.sentQueue];

  PupSocketDestroy[him.socket];
  END;
-- Sets the module-wide allocation defaults used for streams made afterwards.
--   n: becomes myMaxAllocate as given; myPathMaxAllocate becomes n as well,
--      but never more than defaultPupsToAllocate.
SetMaxAllocation: PUBLIC SAFE PROCEDURE [n: CARDINAL] = CHECKED
  BEGIN
  myPathMaxAllocate ← MIN[n, defaultPupsToAllocate];
  myMaxAllocate ← n;
  END;
-- Sets the module-wide buffer size override used for streams made afterwards.
--   n: clamped to DataWordsPerPupBuffer[] and then doubled before being
--      stored.  A result of zero means "no override" to PupPktStreamMake.
-- NOTE(review): the doubling suggests n is a word count and the stored value
-- a byte count; confirm against the interface definition.
SetMaxBufferSize: PUBLIC SAFE PROCEDURE [n: CARDINAL] = CHECKED
  BEGIN
  clamped: CARDINAL = MIN[n, DataWordsPerPupBuffer[]];
  myMaxBufferSize ← 2*clamped;
  END;
-- Sets the module-wide ping default; PupPktStreamMake copies it into the
-- ping field of each stream made afterwards.
--   ping: the new default value.
SetPinging: PUBLIC SAFE PROCEDURE [ping: BOOLEAN] = CHECKED
  BEGIN
  myPing ← ping;
  END;
END.