<> <> <> <> DIRECTORY BufferDefs USING [Buffer, PupBufferObject, QueueCleanup, QueueInitialize, QueueObject], PrincOpsUtils USING [Free], Process USING [Detach], PupDefs USING [ DataWordsPerPupBuffer, GetFreePupBuffer, GetHopsToNetwork, MsToTocks, PupAddress, PupBuffer, PupSocketDestroy, PupSocketMake, SetPupContentsBytes, Tocks, UniqueLocalPupSocketID], PupTypes USING [maxDataWordsPerGatewayPup, PupType], Sequin USING [Broken], SequinPrivate USING [ MakeRequeueClosure, Send, SequenceNumber, SequinID, SequinRep, SocketWarmer]; SequinImplA: MONITOR LOCKS sequin.LOCK USING sequin: Handle IMPORTS BufferDefs, PrincOpsUtils, Process, PupDefs, Sequin, SequinPrivate EXPORTS Sequin, SequinPrivate = BEGIN OPEN PupDefs, SequinPrivate; <> Handle: TYPE = REF SequinRep; SequinRep: PUBLIC TYPE = SequinPrivate.SequinRep; <> maxBytes: PUBLIC CARDINAL; maxAllocate: PUBLIC CARDINAL; <> nSequins: CARDINAL _ 0; <> SequinsInUse: ERROR = CODE; <> Create: PUBLIC PROC [dest: PupAddress, pupType: PupTypes.PupType] RETURNS [sequin: Handle] = { GetTicks: PROC RETURNS [Tocks] = BEGIN hops: CARDINAL _ GetHopsToNetwork[dest.net]; IF hops = LAST[CARDINAL] THEN hops _ 0; -- no route RETURN [[MsToTocks[hops * 750 + 500]]] END; sequin _ NEW[SequinRep _ [pupType: pupType]]; BufferDefs.QueueInitialize[sequin.retransmitQueue _ NEW[BufferDefs.QueueObject]]; BufferDefs.QueueInitialize[sequin.getQueue _ NEW[BufferDefs.QueueObject]]; sequin.socket _ PupSocketMake[UniqueLocalPupSocketID[], dest, GetTicks[]]; sequin.id.allocate _ maxAllocate; sequin.closure _ MakeRequeueClosure[sequin]; sequin.opened _ TRUE; Process.Detach[FORK SocketWarmer[sequin]]; nSequins _ nSequins + 1; }; WasOpened: ENTRY PROC [sequin: Handle] RETURNS [wasOpened: BOOL] = { IF sequin = NIL THEN RETURN [FALSE]; IF wasOpened _ sequin.opened THEN sequin.opened _ FALSE; }; WaitUntilAllQuiet: ENTRY PROC [sequin: Handle] = { UNTIL sequin.state = destroyed DO WAIT sequin.goAhead ENDLOOP; UNTIL sequin.buffersToRequeue = 0 DO WAIT sequin.goAhead ENDLOOP; }; Destroy: PUBLIC PROC [sequin: Handle] = { IF WasOpened[sequin] THEN { buffer: PupBuffer _ GetFreePupBuffer[]; SetPupContentsBytes[buffer, 0]; Send[sequin, buffer, destroy ! Sequin.Broken => CONTINUE]; WaitUntilAllQuiet[sequin]; BufferDefs.QueueCleanup[sequin.retransmitQueue]; BufferDefs.QueueCleanup[sequin.getQueue]; PupSocketDestroy[sequin.socket]; PrincOpsUtils.Free[sequin.closure]; nSequins _ nSequins - 1; }; }; <> Initialize: PROC = { maxBytes _ 2*MIN[DataWordsPerPupBuffer[], PupTypes.maxDataWordsPerGatewayPup]; <<10 = leaf overhead/packet; +1 roundup; +1 for parallelism>> maxAllocate _ 511/(maxBytes-10) + 1 + 1; }; <
> Initialize[]; END.