-- File: CourierImplM.mesa - last edit:
-- AOF 19-Jun-87 18:53:01
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
CommPriorities USING [normal],
Courier USING [
Arguments, Description, Dispatcher, Error, ErrorCode, ExportItem,
Exports, Free, Handle, Parameters, SystemElement, VersionRange],
CourierExtras USING [Missing],
CourierInternal USING [
AugmentedStream, ConnectionHandle, ConnectionObject, ConnectionType,
CreateDefaultStream, Creator, DefaultListener, doStats, ExportHandle,
ExportObject, Listener, seal, ServerConnection, SetMessageProtocolVersion,
stats, TransportType, ttDefault, UserConnection, WaitTime],
CourierOps USING [
StackBlockHandle, stackCacheLength, stackPageLength],
CourierProtocol USING [dataSST, Protocol, pvHigh],
Environment USING [Block],
File USING [File, nullFile],
Heap USING [Create, Delete, Prune],
Inline USING [LowHalf],
NetworkStream USING [ClassOfService, FindAddresses],
NSConstants USING [courierSocket],
PilotSwitches USING [heapOwnerChecking],
Process USING [
Abort, Detach, DisableTimeout, EnableAborts, SecondsToTicks, SetTimeout,
Priority, SetPriority, GetPriority],
Space USING [Interval, Kill, Map, Unmap],
Stream USING [Block, Handle, InputOptions],
System USING [
GetClockPulses, GetGreenwichMeanTime, GreenwichMeanTime, HostNumber,
NetworkAddress, switches];
CourierImplM: MONITOR
IMPORTS
Heap, Inline, Courier, CourierInternal, NetworkStream, Process,
Space, System
EXPORTS
Courier, CourierExtras, CourierInternal, CourierOps =
BEGIN
--Only one protocol version is supported, so the low and high ends of the
--arbitration range are both CourierProtocol.pvHigh.
pvLow: CourierProtocol.Protocol = CourierProtocol.pvHigh;
pvHigh: CourierProtocol.Protocol = CourierProtocol.pvHigh;
--EXPORTS
<<
The heap that all of this module's objects are allocated from. Start creates
it and Stop deletes it and sets it back to NIL, so NIL means "not running".
Start tests it before anything else has assigned it, so it is initialized
explicitly here rather than depending on the initial contents of the frame.
>>
longZone: PUBLIC UNCOUNTED ZONE ← NIL;
NotFound: PUBLIC <<CourierExtras>> ERROR[what: CourierExtras.Missing] = CODE;
Error: PUBLIC <<Courier>> ERROR [errorCode: Courier.ErrorCode] = CODE;
VersionMismatch: PUBLIC <<Courier>> ERROR [range: Courier.VersionRange] = CODE;
--raised by Courier @ user in response to an "abort" message
RemoteErrorSignalled: PUBLIC <<Courier>> ERROR [
errorNumber: CARDINAL, arguments: Courier.Arguments] = CODE;
--Raised by the dispatcher. Translated to Error[noSuchProcedureNumber] at user.
NoSuchProcedureNumber: PUBLIC <<Courier>> ERROR = CODE;
--Raised by client. Translated to Error[invalidArguments] at user.
InvalidArguments: PUBLIC <<Courier>> ERROR = CODE;
--Generated by implementer of a remote procedure, i.e. the dispatcher.
SignalRemoteError: PUBLIC <<Courier>> ERROR [
errorNumber: CARDINAL, arguments: Courier.Parameters] = CODE;
<<
The listener will create 'streamClass' streams when it acknowledges
connections. ExportRemoteProgram will leave this 'transactional' until
the first export requesting 'bulk' is encountered. After that all
streams created will be bulk.
>>
streamClass: PUBLIC NetworkStream.ClassOfService ← transactional;
--record time Courier was first started on this system element
startTime: PUBLIC System.GreenwichMeanTime ← System.GetGreenwichMeanTime[];
<<
Streams are totalled, user + server, and only maxStreamsAllowed
are allowed to exist at any time.
>>
numberOfStreams: PUBLIC CARDINAL ← 0;
maxStreamsAllowed: PUBLIC CARDINAL ← 25;
--Various time parameters {tmo ← base + (hops * hopWeight)}
hopWeight: PUBLIC CARDINAL ← 1;
userIdleTimeout: PUBLIC CARDINAL ← 15; --with no activity
dallyTimeout: PUBLIC CARDINAL ← 2; --from time client requests delete
activeTimeout: PUBLIC CourierInternal.WaitTime ← LAST[CourierInternal.WaitTime];
serverIdleTimeout: PUBLIC CourierInternal.WaitTime ← 90000; --no activity
createTimeout: PUBLIC CourierInternal.WaitTime ← 20000; --in formula
watcher: PROCESS ← NIL; --idle line watcher
watch: CONDITION; --condition variable used inside
--one-time check for the default listener on this system element
listening: BOOLEAN ← FALSE;
--cache of mapped stack blocks; a NIL slot is empty
stackCache: ARRAY CARDINAL[0..CourierOps.stackCacheLength) OF
CourierOps.StackBlockHandle ← ALL[NIL];
--list of exported programs with bookkeeping counters
export: RECORD [
head: CourierInternal.ExportHandle, active, total: CARDINAL] ←
[NIL, 0, 0];
--list of connection objects with bookkeeping counters
connection: RECORD [
head: CourierInternal.ConnectionHandle, active, total: CARDINAL] ←
[NIL, 0, 0];
AddressesFromCourier: PUBLIC <<CourierExtras>> ENTRY PROC[
  courier: Courier.Handle] RETURNS[local, remote: System.NetworkAddress] =
  BEGIN
  <<
  Report the addresses that NetworkStream.FindAddresses gives for the stream
  behind 'courier'.
  Raises NotFound[courier] when no connection object on the list matches the
  handle, and NotFound[address] when the matching connection has no transport
  filter or the filter holds a NIL stream handle.
  >>
  FOR conn: CourierInternal.ConnectionHandle ← connection.head, conn.link
    UNTIL conn = NIL DO
    stream: Stream.Handle;
    IF @conn.object # courier THEN LOOP; --not the connection asked about
    IF conn.transFilter = NIL THEN RETURN WITH ERROR NotFound[address];
    --the filter's context field is read as a stream handle
    stream ← LOOPHOLE[@conn.transFilter.context, LONG POINTER TO Stream.Handle]↑;
    IF stream = NIL THEN RETURN WITH ERROR NotFound[address];
    RETURN NetworkStream.FindAddresses[stream];
    REPEAT FINISHED => RETURN WITH ERROR NotFound[courier];
    ENDLOOP;
  END; --AddressesFromCourier
AssignTransportType: PUBLIC PROC RETURNS [t: CourierInternal.TransportType] =
  BEGIN
  --Produce a transport type from the low half of the pulse clock, sampling
  --again for as long as the sample comes out zero. The result is never 0.
  DO
    t ← Inline.LowHalf[System.GetClockPulses[]];
    IF t # 0 THEN EXIT;
    ENDLOOP;
  END; --AssignTransportType
CourierFromAddresses: PUBLIC <<CourierExtras>> ENTRY PROC[
  local, remote: System.NetworkAddress] RETURNS[courier: Courier.Handle] =
  BEGIN
  <<
  Inverse of AddressesFromCourier: find the connection whose stream reports
  exactly the address pair [local, remote] and return the handle of its object.
  Connections with no transport filter or a NIL stream handle are skipped.
  Raises NotFound[address] when no connection matches.
  >>
  FOR conn: CourierInternal.ConnectionHandle ← connection.head, conn.link
    UNTIL conn = NIL DO
    IF conn.transFilter # NIL THEN
      BEGIN
      --the filter's context field is read as a stream handle
      stream: Stream.Handle ←
        LOOPHOLE[@conn.transFilter.context, LONG POINTER TO Stream.Handle]↑;
      IF stream # NIL THEN
        BEGIN
        here, there: System.NetworkAddress;
        [here, there] ← NetworkStream.FindAddresses[stream];
        IF (here = local) AND (there = remote) THEN RETURN[@conn.object];
        END;
      END;
    REPEAT FINISHED => RETURN WITH ERROR NotFound[address];
    ENDLOOP;
  END; --CourierFromAddresses
Create: PUBLIC PROC[
  remote: Courier.SystemElement,
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  zone: UNCOUNTED ZONE, classOfService: NetworkStream.ClassOfService]
  RETURNS[cH: Courier.Handle] =
  BEGIN
  <<
  Obtain a user connection object from CreateInternal (which may signal),
  fill in the caller's parameters and hand back a handle on the object.
  The socket of 'remote' is overwritten with the Courier socket before it
  is stored in the object.
  >>
  conn: CourierInternal.ConnectionHandle ← CreateInternal[user, remote];
  remote.socket ← NSConstants.courierSocket;
  conn.object.classOfService ← classOfService;
  conn.object.zone ← zone;
  conn.object.versionNumber ← versionNumber;
  conn.object.programNumber ← programNumber;
  conn.object.remote ← remote;
  cH ← @conn.object;
  END; --Create
CreateInternal: PUBLIC ENTRY PROC [type: CourierInternal.ConnectionType,
remote: Courier.SystemElement]
RETURNS[ch: CourierInternal.ConnectionHandle] =
BEGIN
<<
Return a connection object of the requested type.
For type = user the connection list is searched first for a user object that
is dallying (see Delete) and whose remote host equals remote.host; such an
object is taken over and returned as is. It is already on the list, so the
list and the counters are left alone in that case.
Otherwise a new object is allocated from longZone, its variant part is
initialized, and it is linked at the head of the connection list.
Only the host of 'remote' is examined here.
>>
<<ENABLE UNWIND => NULL;>> --no non-fatal errors here
h: CourierInternal.ConnectionHandle; --loop variable for the list search below
--look for an existing object.
IF type = user THEN
BEGIN
FOR h ← connection.head, h.link UNTIL h = NIL DO
WITH c: h SELECT FROM
user => --only user objects are acquirable.
IF (c.streamState = dally) AND --and we only grab dallying ones.
(c.object.remote.host = remote.host) THEN
BEGIN --found one!
ch ← h;
c.streamState ← idle; --so nobody else grabs it.
c.seal ← CourierInternal.seal; --Delete smashed the seal; make it valid again
IF CourierInternal.doStats THEN
CourierInternal.stats[streamsAcquired] ←
SUCC[CourierInternal.stats[streamsAcquired]];
RETURN; --look no further
END; --found cached connection object
ENDCASE; --server, client => NULL;
ENDLOOP;
--didn't find a cached object - create a new one.
ch ← longZone.NEW[CourierInternal.ConnectionObject];
END
ELSE --type # user, create new object.
ch ← longZone.NEW[CourierInternal.ConnectionObject];
--initialize the variant part; the user and server arms use the same values
SELECT type FROM
user =>
BEGIN
ch.body ← user[
protocolRange: [pvLow, pvHigh],
message: [messageObject: protocol3[protocol3Body: ]],
clock: System.GetGreenwichMeanTime[],
seal: CourierInternal.seal];
END;
server =>
BEGIN
ch.body ← server[
protocolRange: [pvLow, pvHigh],
message: [messageObject: protocol3[protocol3Body: ]],
clock: System.GetGreenwichMeanTime[],
seal: CourierInternal.seal];
END;
ENDCASE; --client => bad plan!
--set initial pointer to stream
--(this 'h' is the discriminated name of the WITH, not the local declared above)
WITH h: ch SELECT FROM
user, server => ch.object.sH ← @ch.bulkFilter.object; ENDCASE;
ch.transFilter ← NIL; --there is no transport at this time
ch.link ← connection.head; connection.head ← ch; --goes at beginning of list
connection.active ← SUCC[connection.active]; --RemoveConnection decrements this
connection.total ← SUCC[connection.total]; --never decremented in this module
END; --CreateInternal
CreateNewStream: PUBLIC ENTRY PROC RETURNS[BOOLEAN] =
  BEGIN
  --Claim one of the maxStreamsAllowed stream slots. Returns TRUE and counts
  --the stream if a slot was free, FALSE (count untouched) if the limit is hit.
  IF numberOfStreams < maxStreamsAllowed THEN
    BEGIN
    numberOfStreams ← SUCC[numberOfStreams];
    RETURN[TRUE];
    END;
  RETURN[FALSE];
  END; --CreateNewStream
Delete: PUBLIC ENTRY PROC[cH: Courier.Handle] =
  BEGIN
  <<
  Client request to give up a user connection. The object is not freed here:
  it is marked as dallying with a deadline dallyTimeout seconds from now.
  Raises Courier.Error[invalidHandle] for a NIL handle, a handle that is not a
  user connection, or one whose seal is wrong, and
  Courier.Error[streamNotYours] while the stream is busy or out.
  >>
  IF cH = NIL THEN RETURN WITH ERROR Courier.Error[invalidHandle];
  WITH conn: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
    user =>
      BEGIN
      IF conn.seal # CourierInternal.seal THEN
        RETURN WITH ERROR Courier.Error[invalidHandle];
      IF (conn.streamState = busy) OR (conn.streamState = out) THEN
        RETURN WITH ERROR Courier.Error[streamNotYours];
      --from here on the client can no longer use the stream
      conn.streamState ← dally;
      conn.clock ← [System.GetGreenwichMeanTime[] + dallyTimeout];
      conn.seal ← [0, 0]; --invalidate the handle right away
      --forget the transport creator so it is registered again on reuse
      conn.createTransport ← NIL;
      END;
    ENDCASE => RETURN WITH ERROR Courier.Error[invalidHandle]; --server, client
  END; --Delete
DeleteConnection: PUBLIC ENTRY PROC [ch: CourierInternal.ConnectionHandle] =
  BEGIN
  ENABLE UNWIND => NULL;
  --Unlink the object from the connection list and free it at once.
  --RemoveConnection returns NIL when the object is not on the list, in which
  --case the FREE is handed a NIL pointer.
  ch ← RemoveConnection[ch];
  longZone.FREE[@ch];
  END; --DeleteConnection
DeleteStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] =
BEGIN
<<
Detach the transport stream from a user or server connection and delete it.
The detaching is done inside the monitor (DeleteStreamLocked); the stream's
own delete procedure is then called outside the monitor.
The connection object itself stays allocated and on the list.
>>
aH: CourierInternal.AugmentedStream ← NIL; --stays NIL if nothing was detached
DeleteStreamLocked: ENTRY PROC = --INLINE--
BEGIN
--extract stream from connection and adjust object's state
WITH h: ch SELECT FROM
user =>
BEGIN --reset protocol arbitration starting point
h.protocolRange ← [pvLow, pvHigh];
CourierInternal.SetMessageProtocolVersion[@h, pvHigh];
aH ← h.transFilter;
h.transFilter ← NIL;
h.streamState ← idle;
END;
server =>
BEGIN
aH ← h.transFilter;
h.transFilter ← NIL;
h.streamState ← idle;
END;
ENDCASE; --don't call this routine with client connection
END; --DeleteStreamLocked
DeleteStreamLocked[];
ch.endRecord ← TRUE; --simulate that we are at the end of the message
IF aH # NIL THEN
BEGIN
aH.object.delete[@aH.object]; --call the stream's own delete procedure
DecrementStreamCount[]; --one fewer stream counted in numberOfStreams
IF CourierInternal.doStats THEN CourierInternal.stats[streamsDeleted] ←
SUCC[CourierInternal.stats[streamsDeleted]];
END;
END; --DeleteStream
EnumerateExports: PUBLIC ENTRY PROC
  RETURNS[enum: LONG DESCRIPTOR FOR Courier.Exports] =
  BEGIN
  <<
  Build a snapshot of the export list: one Courier.ExportItem per active
  export, in list order, each with its own copy of the service name (NIL when
  the export has no name). Everything is allocated from longZone; see
  FreeEnumeration for the matching release.
  >>
  <<ENABLE UNWIND => NULL;>> --no non-fatal errors here
  Words: TYPE = RECORD[SEQUENCE COMPUTED CARDINAL OF WORD];
  count: CARDINAL ← export.active;
  base: LONG POINTER TO Courier.Exports ←
    LOOPHOLE[longZone.NEW[Words[count*SIZE[Courier.ExportItem]]]];
  exp: CourierInternal.ExportHandle ← export.head;
  enum ← DESCRIPTOR[base, count];
  FOR i: CARDINAL IN [0..count) DO
    nameLength: CARDINAL ← exp.length;
    enum[i] ← [exp.programNumber, exp.versionRange, NIL, exp.exportTime];
    IF nameLength # 0 THEN
      BEGIN
      --give the item a private copy of the name
      enum[i].serviceName ← longZone.NEW[StringBody[nameLength] ← [
        length: nameLength, maxlength: nameLength, text:]];
      FOR c: CARDINAL IN [0..nameLength) DO
        enum[i].serviceName[c] ← exp.serviceName[c];
        ENDLOOP;
      END;
    exp ← exp.link;
    ENDLOOP;
  END; --EnumerateExports
ExportRemoteProgram: PUBLIC ENTRY PROC[
  programNumber: LONG CARDINAL, versionRange: Courier.VersionRange,
  dispatcher: Courier.Dispatcher, serviceName: LONG STRING,
  zone: UNCOUNTED ZONE, classOfService: NetworkStream.ClassOfService] =
  BEGIN
  <<
  Register a remote program so that incoming calls can find its dispatcher.
  The export record (with a private copy of serviceName, which may be NIL) is
  allocated from longZone and linked at the head of the export list.
  Raises Courier.Error[duplicateProgramExport] when an export with the same
  program number and exactly the same version range already exists.
  The first export ever made also starts the default listener.
  >>
  ENABLE UNWIND => NULL;
  eH: CourierInternal.ExportHandle;
  length: CARDINAL ← IF serviceName = NIL THEN 0 ELSE serviceName.length;
  FOR eH ← export.head, eH.link UNTIL eH = NIL DO
    IF eH.programNumber = programNumber
      AND eH.versionRange.low = versionRange.low
      AND eH.versionRange.high = versionRange.high THEN
      RETURN WITH ERROR Courier.Error[duplicateProgramExport];
    ENDLOOP;
  eH ← longZone.NEW[
    CourierInternal.ExportObject[length] ← [
      link: NIL, programNumber: programNumber, versionRange: versionRange,
      dispatcher: dispatcher, serviceName: , zone: zone,
      exportTime: System.GetGreenwichMeanTime[], classOfService: classOfService]];
  --once bulk it's fixed, it stays fixed
  IF streamClass # bulk THEN streamClass ← classOfService;
  --length is 0 when serviceName is NIL, so the string is never touched then
  FOR index: CARDINAL IN [0..length) DO
    eH.serviceName[index] ← serviceName[index];
    ENDLOOP;
  eH.link ← export.head; export.head ← eH; --link to beginning of list
  export.active ← SUCC[export.active];
  --keep the running total in step, as CreateInternal does for connections
  export.total ← SUCC[export.total];
  --set up the default listener if needed.
  IF ~listening THEN {listening ← TRUE; RegisterListener[NIL]};
  END; --ExportRemoteProgram
FreeEnumeration: PUBLIC PROC[enum: LONG DESCRIPTOR FOR Courier.Exports] =
  BEGIN
  <<
  Release an enumeration produced by EnumerateExports. The storage is
  described to Courier.Free by the local description routine below: the
  descriptor itself, the array it designates, and the service name string of
  every item. Everything is returned to longZone.
  >>
  freeExports: Courier.Description =
    BEGIN
    LongDescriptor: TYPE = LONG DESCRIPTOR FOR Courier.Exports;
    [] ← notes.noteSize[SIZE[LongDescriptor]];
    notes.noteArrayDescriptor[@enum, SIZE[Courier.ExportItem], LAST[CARDINAL]];
    --CARDINAL index: LENGTH is unsigned and EnumerateExports indexes the same
    --array with a CARDINAL
    FOR index: CARDINAL IN [0..LENGTH[enum]) DO
      notes.noteString[@enum[index].serviceName];
      ENDLOOP;
    END;
  Courier.Free[[@enum, freeExports], longZone];
  END; --FreeEnumeration
NewStreamFailed, DecrementStreamCount: PUBLIC ENTRY PROC =
  BEGIN
  --Two names for one operation: give back a stream slot that CreateNewStream
  --counted. There is no check here that the count is above zero.
  numberOfStreams ← PRED[numberOfStreams];
  END; --NewStreamFailed, DecrementStreamCount
ReleaseDataStream: PUBLIC PROC[cH: Courier.Handle] =
  BEGIN
  <<
  Instructs Courier that the client is finished with the data stream it got
  from RequestDataStream.
  For a user connection whose stream is out, the connection is handed back to
  the idle watcher; if that raises Courier.Error the stream is deleted.
  Raises Courier.Error[invalidHandle] for a NIL handle or a bad seal, and
  Courier.Error[streamNotYours] when the stream is not out.
  Server and client connections are ignored.
  >>
  IF cH = NIL THEN ERROR Courier.Error[invalidHandle];
  WITH h: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
    user =>
      BEGIN
      SELECT TRUE FROM
        (h.seal # CourierInternal.seal) => ERROR Courier.Error[invalidHandle];
        (h.streamState = out) =>
          SetIdleWatcher[@h ! Courier.Error => GOTO dead];
        --raised explicitly as an ERROR, like the other arms
        ENDCASE => ERROR Courier.Error[streamNotYours];
      EXITS dead => DeleteStream[@h];
      END;
    --server => CourierImplS.Receiver (the dispatcher) will release later
    --client => Courier doesn't know about the data stream
    ENDCASE;
  END; --ReleaseDataStream
RemoveConnection: INTERNAL PROC [ch: CourierInternal.ConnectionHandle]
  RETURNS[c: CourierInternal.ConnectionHandle] =
  BEGIN
  <<
  Unlink 'ch' from the connection list right away. The caller must hold the
  monitor. Returns 'ch' when it was found and unlinked, or NIL when it is not
  on the list (nothing is changed in that case). The object is not freed here.
  >>
  prev: CourierInternal.ConnectionHandle ← NIL;
  c ← connection.head;
  --walk until the end of the list or the wanted object, trailing 'prev'
  UNTIL (c = NIL) OR (c = ch) DO
    prev ← c;
    c ← c.link;
    ENDLOOP;
  IF c # NIL THEN
    BEGIN
    IF prev = NIL THEN connection.head ← c.link ELSE prev.link ← c.link;
    connection.active ← PRED[connection.active];
    END;
  END; --RemoveConnection
RequestDataStream: PUBLIC <<CourierExtras>> ENTRY PROC[
  cH: Courier.Handle] RETURNS[Stream.Handle] =
  BEGIN
  <<
  Hand the connection's data stream to the client and mark it as out.
  Raises Courier.Error[invalidHandle] for a NIL handle or a bad seal, and
  Courier.Error[streamNotYours] when the stream is already out, is busy, or
  the connection has no transport filter.
  >>
  OPEN conn: LOOPHOLE[cH, CourierInternal.UserConnection];
  --the NIL test comes first so the seal is never read through a NIL handle
  IF (cH = NIL) OR (conn.seal # CourierInternal.seal) THEN
    RETURN WITH ERROR Courier.Error[invalidHandle];
  IF (conn.streamState = out) OR (conn.streamState = busy)
    OR (conn.transFilter = NIL) THEN
    RETURN WITH ERROR Courier.Error[streamNotYours];
  conn.streamState ← out; --from now on the client holds the stream
  RETURN[conn.object.sH];
  END; --RequestDataStream
SearchForExport: PUBLIC ENTRY PROC[ch: CourierInternal.ServerConnection]
  RETURNS[dispatcher: Courier.Dispatcher, range: Courier.VersionRange] =
  BEGIN
  <<
  Look up the export that serves the program and version named in ch.object.
  Match: an export of the program whose range contains the version. The
  object's zone is set from the export, and the export's dispatcher and
  range are returned.
  Mismatch: the program is exported, but no range contains the version. The
  dispatcher returned is that of the last such export examined, and the
  range spans from the lowest low to the highest high of those exports
  (beware of non-contiguous range exports).
  None: the program is not exported at all. The dispatcher is NIL and the
  range is [LAST[CARDINAL], FIRST[CARDINAL]], which is not meaningful.
  This should probably return a variant record with one arm per case.
  >>
  dispatcher ← NIL;
  range ← [LAST[CARDINAL], FIRST[CARDINAL]]; --empty, widened as exports are seen
  FOR exp: CourierInternal.ExportHandle ← export.head, exp.link
    UNTIL exp = NIL DO
    IF ch.object.programNumber # exp.programNumber THEN LOOP; --other program
    IF ch.object.versionNumber IN [exp.versionRange.low..exp.versionRange.high] THEN
      BEGIN
      ch.object.zone ← exp.zone;
      RETURN[exp.dispatcher, exp.versionRange];
      END;
    --right program, wrong version: remember that and widen the range
    dispatcher ← exp.dispatcher;
    range.low ← MIN[range.low, exp.versionRange.low];
    range.high ← MAX[range.high, exp.versionRange.high];
    ENDLOOP;
  END; --SearchForExport
SetIdleWatcher: PUBLIC ENTRY PROC [ch: CourierInternal.UserConnection] =
  BEGIN
  ENABLE UNWIND => NULL;
  <<
  Put a user connection into the idle state with a deadline userIdleTimeout
  seconds from now, and wake whoever waits on 'watch'.
  A NIL connection or one that is already dallying is left alone: that
  happens when the client deleted the object in a catch phrase and then
  caused the UNWIND.
  >>
  IF ch = NIL THEN RETURN; --nothing to do
  NOTIFY watch; --get somebody to keep an eye on this
  IF ch.streamState = dally THEN RETURN; --client already asked for delete
  IF ch.streamState = out THEN --old version of bulk data
    ch.transFilter.object.setSST[
      @ch.transFilter.object, CourierProtocol.dataSST];
  IF ch.transFilter # NIL THEN
    ch.transFilter.object.setTimeout[@ch.transFilter.object, 0];
  ch.clock ← [System.GetGreenwichMeanTime[] + userIdleTimeout];
  ch.streamState ← idle;
  END; --SetIdleWatcher
StackBlockPush: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
  RETURNS[page: CourierOps.StackBlockHandle] =
  BEGIN
  <<
  Trying to push a new element on the stack and found the stack either NIL
  or full. Supply a new block and chain it to the current stack block.
  'stack' is either NIL or at the block's ceiling
  (i.e., stack == @??.element[CourierOps.stackObjectLimit]).
  The block comes from the cache when one is available there; otherwise a
  fresh one is mapped. The block returned has 'stack' as its nextBlock.
  >>
  <<ENABLE UNWIND => NULL;>> --safe as long as pilot not abortable
  index: CARDINAL;
  BEGIN --searching cache
    FOR index DECREASING IN [0..CourierOps.stackCacheLength) DO
      IF stackCache[index] # NIL THEN GOTO cache;
      ENDLOOP;
    <<
    No cached stack block was found, so map a new one into slot 0.
    The slot number is set explicitly instead of trusting whatever value the
    loop variable is left with once the loop has run to completion.
    >>
    index ← 0;
    stackCache[index] ← Space.Map[
      window: [File.nullFile, 0, CourierOps.stackPageLength],
      class: data, swapUnits: [unitary[]]].pointer;
    IF CourierInternal.doStats THEN
      CourierInternal.stats[stackPagesMapped] ←
        SUCC[CourierInternal.stats[stackPagesMapped]];
    EXITS cache => NULL; --'index' names the occupied slot that was found
    END; --searching cache
  page ← stackCache[index]; stackCache[index] ← NIL; --remove from cache
  page.nextBlock ← stack; --link to old if it exists
  IF CourierInternal.doStats THEN CourierInternal.stats[stackPagesGot] ←
    SUCC[CourierInternal.stats[stackPagesGot]];
  END; --StackBlockPush
StackBlockPop: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
  RETURNS[new: CourierOps.StackBlockHandle] =
  BEGIN
  <<
  Give up the stack block 'stack' and return the block it was chained to.
  The block is parked in the first empty cache slot, searching from the top
  slot downwards; when the cache is full it is killed and unmapped instead.
  >>
  <<ENABLE UNWIND => NULL;>> --safe as long as pilot is not abortable
  new ← stack.nextBlock; --pick up old link before the block goes away
  FOR slot: CARDINAL DECREASING IN [0..CourierOps.stackCacheLength) DO
    IF stackCache[slot] = NIL THEN
      {stackCache[slot] ← stack; EXIT}; --cache this for use later
    REPEAT FINISHED => --no empty slot, so release the block
      BEGIN
      Space.Kill[[stack, CourierOps.stackPageLength]];
      [] ← Space.Unmap[stack, return];
      IF CourierInternal.doStats THEN
        CourierInternal.stats[stackPagesUnmapped] ←
          SUCC[CourierInternal.stats[stackPagesUnmapped]];
      END;
    ENDLOOP;
  IF CourierInternal.doStats THEN CourierInternal.stats[stackPagesPut] ←
    SUCC[CourierInternal.stats[stackPagesPut]];
  END; --StackBlockPop
RegisterTransport: PUBLIC PROC [cH: Courier.Handle,
  transportProc: CourierInternal.Creator,
  transportType: CourierInternal.TransportType] =
  BEGIN
  <<
  Register the transport creation proc for the rpc user side.
  When the requested type differs from the one recorded on the connection,
  any existing transport stream is deleted first. A NIL transportProc selects
  the default stream creator and the default transport type, whatever
  transportType says.
  NOTE(review): the stream is deleted here without DecrementStreamCount,
  unlike DeleteStream and Watcher. Confirm whether such streams are counted.
  NOTE(review): this procedure is not an ENTRY, so it runs outside the monitor.
  >>
  conn: CourierInternal.UserConnection ← LOOPHOLE[cH];
  IF (transportType # conn.transportType) AND (conn.transFilter # NIL) THEN
    BEGIN --get rid of the old stream
    conn.transFilter.object.delete[@conn.transFilter.object];
    conn.transFilter ← NIL;
    END;
  IF transportProc # NIL THEN
    BEGIN
    conn.createTransport ← transportProc;
    conn.transportType ← transportType;
    END
  ELSE
    BEGIN
    conn.createTransport ← CourierInternal.CreateDefaultStream;
    conn.transportType ← CourierInternal.ttDefault;
    END;
  END; --RegisterTransport
RegisterListener: PUBLIC PROC [listener: CourierInternal.Listener] =
  BEGIN
  <<
  Fork and detach a listener process: the one supplied, or the default
  listener when 'listener' is NIL. The caller's priority is set to
  CommPriorities.normal around the FORK and put back afterwards.
  >>
  saved: Process.Priority ← Process.GetPriority[];
  Process.SetPriority[CommPriorities.normal];
  IF listener = NIL THEN Process.Detach[FORK CourierInternal.DefaultListener[]]
  ELSE Process.Detach[FORK listener[]];
  Process.SetPriority[saved];
  END; --RegisterListener
Watcher: PROC =
BEGIN
--YOU WANNA TOUCH THIS CODE? YOU GONNA REGRET IT!
<<
Body of the process that Start forks. It repeatedly scans the connection
list for user connections whose idle or dally deadline has passed:
 idle  : the transport stream is deleted, the object is kept
 dally : the object is unlinked and freed, and its stream (if any) deleted
Each call of WatcherLocked finds at most one victim inside the monitor; the
stream delete and the FREE are then done outside the monitor by the loop at
the bottom. The process leaves its loop only when it is aborted (see Stop).
>>
--wait on 'watch' from inside the monitor
Wait: ENTRY PROC = INLINE {ENABLE UNWIND => NULL; WAIT watch};
--one pass (possibly partial) over the list; results:
-- [aH, NIL] an idle connection's stream is to be deleted
-- [aH, ch]  a dallying connection was unlinked; aH may be NIL
-- [NIL, NIL] nothing was due; returned after one WAIT on 'watch'
WatcherLocked: ENTRY PROC RETURNS[
aH: CourierInternal.AugmentedStream,
ch: CourierInternal.ConnectionHandle] =
BEGIN
ENABLE UNWIND => NULL;
FOR ch ← connection.head, ch.link UNTIL ch = NIL DO
WITH h: ch SELECT FROM
user =>
BEGIN
SELECT h.streamState FROM
idle =>
BEGIN
b: CARDINAL; --scratch word that the probing get reads into
closing: BOOLEAN; --assigned on both paths of the probe below
notmo: Stream.InputOptions = [signalTimeout: FALSE];
two: Environment.Block = [LOOPHOLE[LONG[@b]], 0, 2];
IF (aH ← ch.transFilter) = NIL THEN LOOP; --already deleted
--probe the stream: a get that completes for any reason other than a
--timeout, or that raises Courier.Error, marks the stream as closing
BEGIN
ENABLE Courier.Error => {closing ← TRUE; CONTINUE};
closing ← aH.object.get[@aH.object, two, notmo].why # timeout;
END;
--'now' is the time sampled by the outer loop before this pass
IF closing OR (h.clock < now) THEN
BEGIN
ch.transFilter ← NIL; --set it to nil so we skip next time
h.protocolRange ← [ --reset state of arbitration
pvLow, pvHigh];
CourierInternal.SetMessageProtocolVersion[@h, pvHigh];
RETURN[aH, NIL]; --delete the stream, not the object
END;
END;
dally => IF h.clock < now THEN
RETURN[ch.transFilter, RemoveConnection[ch]]; --both
ENDCASE; --busy, out => NULL;
END;
ENDCASE; --server, client => NULL;
ENDLOOP;
--the outer loop sets a 2 second timeout on 'watch' before calling here
WAIT watch; --wait 2 seconds before trying again
RETURN[NIL, NIL]; --just bailing out of the monitor
END; --WatcherLocked
now: System.GreenwichMeanTime; --written here, read inside WatcherLocked
aH: CourierInternal.AugmentedStream;
ch: CourierInternal.ConnectionHandle;
DO ENABLE ABORTED => EXIT; --exits only if aborted (Stop'd)
Process.SetTimeout[@watch, Process.SecondsToTicks[2]];
UNTIL connection.active = 0 DO
now ← System.GetGreenwichMeanTime[];
[aH, ch] ← WatcherLocked[]; --makes a partial or full pass
IF aH # NIL THEN
BEGIN
--stream delete and bookkeeping happen outside the monitor
aH.object.delete[@aH.object]; DecrementStreamCount[];
IF CourierInternal.doStats THEN CourierInternal.stats[streamsDeleted] ←
SUCC[CourierInternal.stats[streamsDeleted]];
END;
IF ch # NIL THEN longZone.FREE[@ch]; --then delete the object
ENDLOOP;
--no active connections are left: stop the periodic wakeups
Process.DisableTimeout[@watch]; --keeps from waking up
Heap.Prune[longZone]; --try to reclaim extra space (noop)
--in this module the only NOTIFY of 'watch' is in SetIdleWatcher
Wait[]; --this waits until someone else creates a connection
ENDLOOP;
END; --Watcher
UnexportRemoteProgram: PUBLIC ENTRY PROC[
  programNumber: LONG CARDINAL, versionRange: Courier.VersionRange] =
  BEGIN
  ENABLE UNWIND => NULL;
  <<
  Withdraw the export with this program number and exactly this version
  range: unlink it from the export list and free it. When the last export
  goes, streamClass reverts to transactional.
  Raises Courier.Error[noSuchProgramExport] when there is no such export.
  >>
  before, victim: CourierInternal.ExportHandle;
  FOR victim ← export.head, victim.link UNTIL victim = NIL DO
    IF (programNumber # victim.programNumber)
      OR (versionRange # victim.versionRange) THEN
      {before ← victim; LOOP}; --not this one; remember it as predecessor
    --'before' is only consulted when the victim is not the list head
    IF victim = export.head THEN export.head ← victim.link
    ELSE before.link ← victim.link;
    longZone.FREE[@victim];
    export.active ← PRED[export.active];
    IF export.active = 0 THEN streamClass ← transactional;
    RETURN;
    ENDLOOP;
  RETURN WITH ERROR Courier.Error[noSuchProgramExport];
  END; --UnexportRemoteProgram
Start: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
  BEGIN
  <<
  Bring Courier up: create the heap and fork the watcher process, both at
  CommPriorities.normal, then put the caller's priority back.
  A non-NIL longZone means this has already been done, and nothing happens.
  Always returns TRUE.
  >>
  saved: Process.Priority ← Process.GetPriority[];
  IF longZone = NIL THEN
    BEGIN
    Process.SetPriority[CommPriorities.normal];
    longZone ← Heap.Create[
      initial: 5, increment: 5,
      ownerChecking: System.switches[PilotSwitches.heapOwnerChecking] = down];
    Process.EnableAborts[@watch]; --lets Stop abort the watcher while it waits
    watcher ← FORK Watcher[];
    Process.SetPriority[saved];
    END;
  RETURN[TRUE];
  END; --Start
Stop: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
  BEGIN
  <<
  Shut Courier down: stop the watcher process, unmap the cached stack blocks
  and delete the heap.
  Returns FALSE, changing nothing, while any export or connection is still
  active. Returns TRUE once stopped, and also when Courier is not running
  (mirroring Start, which returns TRUE when already started).
  This is assumed to be serial access due to the fact that some higher
  authority is shutting us down. The procedure isn't monitored.
  >>
  --not running: there is no watcher to join and no heap to delete
  IF longZone = NIL THEN RETURN[TRUE];
  SELECT TRUE FROM
    (export.active # 0) => RETURN[FALSE]; --can't do it
    (connection.active # 0) => RETURN[FALSE]; --that either
    ENDCASE;
  Process.Abort[watcher]; --get his attention
  JOIN watcher; --come hither, little boy
  watcher ← NIL; --the process is gone; don't keep a stale handle
  FOR index: NATURAL IN [0..CourierOps.stackCacheLength) DO
    p: LONG POINTER = stackCache[index];
    IF p # NIL THEN {[] ← Space.Unmap[p]; stackCache[index] ← NIL};
    ENDLOOP;
  Heap.Delete[z: longZone,
    checkEmpty: System.switches[PilotSwitches.heapOwnerChecking] = down];
  longZone ← NIL; --lets Start run again
  RETURN[TRUE];
  END; --Stop
--module main body: bring Courier up; the BOOLEAN result of Start is discarded
[] ← Start[]; --motion, motion. lots of motion
END.... --CourierImplM.mesa
LOG
5-Jan-87 15:29:48 AOF Trimmed log for Pilot 13.0
5-Jan-87 15:25:12 AOF Merge in Courier extras for DBMS
16-Jan-87 9:44:36 AOF Removal of Courier Version 2.0
16-Jan-87 9:44:36 AOF Move all ERRORs to here
19-Jun-87 13:33:59 AOF Starting and stopping