-- File: CourierImplM.mesa - last edit:
-- AOF                 19-Jun-87 18:53:01
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.

DIRECTORY
  CommPriorities USING [normal],
  Courier USING [
    Arguments, Description, Dispatcher, Error, ErrorCode, ExportItem, Exports,
    Free, Handle, Parameters, SystemElement, VersionRange],
  CourierExtras USING [Missing],
  CourierInternal USING [
    AugmentedStream, ConnectionHandle, ConnectionObject, ConnectionType,
    CreateDefaultStream, Creator, DefaultListener, doStats, ExportHandle,
    ExportObject, Listener, seal, ServerConnection, SetMessageProtocolVersion,
    stats, TransportType, ttDefault, UserConnection, WaitTime],
  CourierOps USING [
    StackBlockHandle, stackCacheLength, stackPageLength],
  CourierProtocol USING [dataSST, Protocol, pvHigh],
  Environment USING [Block],
  File USING [File, nullFile],
  Heap USING [Create, Delete, Prune],
  Inline USING [LowHalf],
  NetworkStream USING [ClassOfService, FindAddresses],
  NSConstants USING [courierSocket],
  PilotSwitches USING [heapOwnerChecking],
  Process USING [
    Abort, Detach, DisableTimeout, EnableAborts, SecondsToTicks, SetTimeout,
    Priority, SetPriority, GetPriority],
  Space USING [Interval, Kill, Map, Unmap],
  Stream USING [Block, Handle, InputOptions],
  System USING [
    GetClockPulses, GetGreenwichMeanTime, GreenwichMeanTime, HostNumber,
    NetworkAddress, switches];

CourierImplM: MONITOR
  IMPORTS
    Heap, Inline, Courier, CourierInternal, NetworkStream, Process, Space,
    System
  EXPORTS Courier, CourierExtras, CourierInternal, CourierOps =
BEGIN

--Bounds of the protocol range used when (re)starting protocol arbitration.
--Both are bound to CourierProtocol.pvHigh, so the range holds one version.
pvLow: CourierProtocol.Protocol = CourierProtocol.pvHigh;
pvHigh: CourierProtocol.Protocol = CourierProtocol.pvHigh;

--EXPORTS

--Zone for this module's allocations.  Start treats a non-NIL value as
--"already started" and Stop sets it back to NIL, so it is explicitly NIL
--here rather than relying on the initial contents of the global frame.
longZone: PUBLIC UNCOUNTED ZONE ← NIL;

NotFound: PUBLIC <<CourierExtras>> ERROR[what: CourierExtras.Missing] = CODE;
Error: PUBLIC <<Courier>> ERROR [errorCode: Courier.ErrorCode] = CODE;
VersionMismatch: PUBLIC <<Courier>> ERROR [range: Courier.VersionRange] = CODE;

--raised by Courier @ user in response to an "abort" message
RemoteErrorSignalled: PUBLIC <<Courier>> ERROR [
  errorNumber: CARDINAL, arguments: Courier.Arguments] = CODE;

--Raised by the dispatcher. Translated to Error[noSuchProcedureNumber] at user.
NoSuchProcedureNumber: PUBLIC <<Courier>> ERROR = CODE;

--Raised by client. Translated to Error[invalidArguments] at user.
InvalidArguments: PUBLIC <<Courier>> ERROR = CODE;

--Generated by implementer of a remote procedure, i.e. the dispatcher.
SignalRemoteError: PUBLIC <<Courier>> ERROR [
  errorNumber: CARDINAL, arguments: Courier.Parameters] = CODE;

<<
The listener will create 'streamClass' streams when it acknowledges
connections.  ExportRemoteProgram will leave this 'transactional' until
the first export requesting 'bulk' is encountered.  After that all streams
created will be bulk.  (UnexportRemoteProgram puts it back to
'transactional' when the last export is removed.)
>>
streamClass: PUBLIC NetworkStream.ClassOfService ← transactional;

--record time Courier was first started on this system element
startTime: PUBLIC System.GreenwichMeanTime ← System.GetGreenwichMeanTime[];

<<
Streams are totalled, user + server, and only maxStreamsAllowed are
allowed to exist at any time.
>>
numberOfStreams: PUBLIC CARDINAL ← 0;
maxStreamsAllowed: PUBLIC CARDINAL ← 25;

--Various time parameters {tmo ← base + (hops * hopWeight)}
hopWeight: PUBLIC CARDINAL ← 1;
userIdleTimeout: PUBLIC CARDINAL ← 15;  --with no activity
dallyTimeout: PUBLIC CARDINAL ← 2;  --from time client requests delete
activeTimeout: PUBLIC CourierInternal.WaitTime ← LAST[CourierInternal.WaitTime];
serverIdleTimeout: PUBLIC CourierInternal.WaitTime ← 90000;  --no activity
createTimeout: PUBLIC CourierInternal.WaitTime ← 20000;  --in formula

watcher: PROCESS ← NIL;  --idle line watcher
watch: CONDITION;  --condition variable used inside

--one-time check for the default listener on this system element
listening: BOOLEAN ← FALSE;

--Stack blocks parked by StackBlockPop for reuse by StackBlockPush.
stackCache: ARRAY CARDINAL[0..CourierOps.stackCacheLength) OF
  CourierOps.StackBlockHandle ← ALL[NIL];

--List of exported programs.  'active' is the number of entries on the list.
export: RECORD [
  head: CourierInternal.ExportHandle, active, total: CARDINAL] ← [NIL, 0, 0];

--List of connection objects.  'active' is the number of entries on the
--list; 'total' counts every object CreateInternal has allocated.
connection: RECORD [
  head: CourierInternal.ConnectionHandle, active, total: CARDINAL] ← [NIL, 0, 0];


<<
Returns the local and remote addresses of the transport stream that
belongs to 'courier'.
Raises NotFound[courier] when no connection object on the list matches the
handle, and NotFound[address] when the object has no transport filter or
the filter holds no stream.
>>
AddressesFromCourier: PUBLIC <<CourierExtras>> ENTRY PROC[
  courier: Courier.Handle]
  RETURNS[local, remote: System.NetworkAddress] =
  BEGIN
  FOR ch: CourierInternal.ConnectionHandle ← connection.head, ch.link
    UNTIL ch = NIL DO
    IF @ch.object = courier THEN
      BEGIN
      sH: Stream.Handle;
      IF ch.transFilter = NIL THEN RETURN WITH ERROR NotFound[address];
      --the filter's context is read as the underlying stream handle
      sH ← LOOPHOLE[@ch.transFilter.context, LONG POINTER TO Stream.Handle]↑;
      IF sH = NIL THEN RETURN WITH ERROR NotFound[address];
      RETURN NetworkStream.FindAddresses[sH];
      END;
    REPEAT FINISHED => RETURN WITH ERROR NotFound[courier];
    ENDLOOP;
  END;  --AddressesFromCourier


--Returns a non-zero transport type taken from the low half of the clock
--pulse counter; retries until the value is non-zero.
AssignTransportType: PUBLIC PROC RETURNS [t: CourierInternal.TransportType] =
  BEGIN
  t ← 0;
  UNTIL t # 0 DO t ← Inline.LowHalf[System.GetClockPulses[]]; ENDLOOP;
  END;  --AssignTransportType


<<
Inverse of AddressesFromCourier: returns the handle of the first connection
object whose stream reports exactly the given local and remote addresses.
Objects without a transport filter or without a stream are skipped.
Raises NotFound[address] when nothing matches.
>>
CourierFromAddresses: PUBLIC <<CourierExtras>> ENTRY PROC[
  local, remote: System.NetworkAddress]
  RETURNS[courier: Courier.Handle] =
  BEGIN
  FOR ch: CourierInternal.ConnectionHandle ← connection.head, ch.link
    UNTIL ch = NIL DO
    sH: Stream.Handle;
    l, r: System.NetworkAddress;
    IF ch.transFilter = NIL THEN LOOP;
    sH ← LOOPHOLE[@ch.transFilter.context, LONG POINTER TO Stream.Handle]↑;
    IF sH = NIL THEN LOOP;
    [l, r] ← NetworkStream.FindAddresses[sH];
    IF (l = local) AND (r = remote) THEN RETURN[@ch.object];
    REPEAT FINISHED => RETURN WITH ERROR NotFound[address];
    ENDLOOP;
  END;  --CourierFromAddresses


Create: PUBLIC PROC[
  remote: Courier.SystemElement, programNumber: LONG CARDINAL,
  versionNumber: CARDINAL, zone: UNCOUNTED ZONE,
  classOfService: NetworkStream.ClassOfService]
  RETURNS[cH: Courier.Handle] =
  BEGIN
  --Build a user connection object, returning a handle.
  --This may signal (via CreateInternal)
  ch: CourierInternal.ConnectionHandle ← CreateInternal[user, remote];
  --the socket is forced to the Courier socket after the lookup;
  --CreateInternal compares only the host part of 'remote'.
  remote.socket ← NSConstants.courierSocket;
  ch.object.remote ← remote;
  ch.object.programNumber ← programNumber;
  ch.object.versionNumber ← versionNumber;
  ch.object.zone ← zone;
  ch.object.classOfService ← classOfService;
  RETURN[@ch.object];
  END;  --Create


<<
Returns a connection object of the requested type.
For 'user' the list is searched first for a user object that is dallying
(client asked for Delete, watcher has not removed it yet) and talks to the
same host; such an object is re-sealed, marked idle and returned as is.
Otherwise a new object is allocated from longZone, initialized, and linked
at the head of the connection list.
>>
CreateInternal: PUBLIC ENTRY PROC [type: CourierInternal.ConnectionType,
  remote: Courier.SystemElement]
  RETURNS[ch: CourierInternal.ConnectionHandle] =
  BEGIN
  <<ENABLE UNWIND => NULL;>>  --no non-fatal errors here
  h: CourierInternal.ConnectionHandle;

  --look for an existing object.
  IF type = user THEN
    FOR h ← connection.head, h.link UNTIL h = NIL DO
      WITH c: h SELECT FROM
        user =>  --only user objects are acquirable.
          IF (c.streamState = dally) AND  --and we only grab dallying ones.
            (c.object.remote.host = remote.host) THEN
            BEGIN  --found one!
            ch ← h;
            c.streamState ← idle;  --so nobody else grabs it.
            c.seal ← CourierInternal.seal;
            IF CourierInternal.doStats THEN
              CourierInternal.stats[streamsAcquired] ←
                SUCC[CourierInternal.stats[streamsAcquired]];
            RETURN;  --look no further
            END;  --found cached connection object
        ENDCASE;  --server, client => NULL;
      ENDLOOP;

  --type # user, or didn't find a cached object - create a new one.
  ch ← longZone.NEW[CourierInternal.ConnectionObject];

  SELECT type FROM
    user =>
      BEGIN
      ch.body ← user[
        protocolRange: [pvLow, pvHigh],
        message: [messageObject: protocol3[protocol3Body: ]],
        clock: System.GetGreenwichMeanTime[],
        seal: CourierInternal.seal];
      END;
    server =>
      BEGIN
      ch.body ← server[
        protocolRange: [pvLow, pvHigh],
        message: [messageObject: protocol3[protocol3Body: ]],
        clock: System.GetGreenwichMeanTime[],
        seal: CourierInternal.seal];
      END;
    ENDCASE;  --client => bad plan!

  --set initial pointer to stream
  WITH h: ch SELECT FROM
    user, server => ch.object.sH ← @ch.bulkFilter.object;
    ENDCASE;

  ch.transFilter ← NIL;  --there is no transport at this time
  ch.link ← connection.head; connection.head ← ch;  --goes at beginning of list
  connection.active ← SUCC[connection.active];
  connection.total ← SUCC[connection.total];
  END;  --CreateInternal


--Reserves one slot of the stream budget.  Returns FALSE, reserving nothing,
--when numberOfStreams has already reached maxStreamsAllowed.
CreateNewStream: PUBLIC ENTRY PROC RETURNS[BOOLEAN] =
  BEGIN
  IF numberOfStreams >= maxStreamsAllowed THEN RETURN[FALSE];
  numberOfStreams ← SUCC[numberOfStreams];
  RETURN[TRUE];
  END;  --CreateNewStream


<<
Client-side delete of a user handle.  The object is not freed here: it is
marked 'dally' with a deadline of now + dallyTimeout and left on the list
for the Watcher to remove (or for CreateInternal to re-acquire).
Raises Courier.Error[invalidHandle] for NIL, a bad seal, or a handle that
is not a user connection; Courier.Error[streamNotYours] while the stream
is busy or out.
>>
Delete: PUBLIC ENTRY PROC[cH: Courier.Handle] =
  BEGIN
  IF cH = NIL THEN RETURN WITH ERROR Courier.Error[invalidHandle];  --busted
  WITH ch: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
    user =>
      BEGIN
      SELECT TRUE FROM
        (ch.seal # CourierInternal.seal) =>
          RETURN WITH ERROR Courier.Error[invalidHandle];  --busted
        (ch.streamState = busy), (ch.streamState = out) =>
          RETURN WITH ERROR Courier.Error[streamNotYours];
        ENDCASE =>  --start dally and let watcher delete
          BEGIN
          --client can't use his stream any more.
          ch.streamState ← dally;  --just waiting to timeout
          ch.clock ← [System.GetGreenwichMeanTime[] + dallyTimeout];
          ch.seal ← [0, 0];  --smash the seal immediately
          --so it will register again (in Call) if re-acquired.
          ch.createTransport ← NIL;
          END;
      END;
    ENDCASE =>
      RETURN WITH ERROR Courier.Error[invalidHandle];  --server, client
  END;  --Delete


--Immediate delete: unlinks the object and frees it.  RemoveConnection
--returns NIL when 'ch' is not on the list, in which case nothing is freed.
DeleteConnection: PUBLIC ENTRY PROC [ch: CourierInternal.ConnectionHandle] =
  {ENABLE UNWIND => NULL; ch ← RemoveConnection[ch]; longZone.FREE[@ch]};


<<
Detaches the transport stream from a user or server connection and deletes
it.  The detach (and, for user objects, the reset of protocol arbitration)
happens inside the monitor; the stream's own delete procedure is called
after the monitor has been released.
>>
DeleteStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] =
  BEGIN
  aH: CourierInternal.AugmentedStream ← NIL;

  DeleteStreamLocked: ENTRY PROC = --INLINE--
    BEGIN
    --extract stream from connection and adjust object's state
    WITH h: ch SELECT FROM
      user =>
        BEGIN
        --reset protocol arbitration starting point
        h.protocolRange ← [pvLow, pvHigh];
        CourierInternal.SetMessageProtocolVersion[@h, pvHigh];
        aH ← h.transFilter; h.transFilter ← NIL; h.streamState ← idle;
        END;
      server =>
        BEGIN
        aH ← h.transFilter; h.transFilter ← NIL; h.streamState ← idle;
        END;
      ENDCASE;  --don't call this routine with client connection
    END;  --DeleteStreamLocked

  DeleteStreamLocked[];
  ch.endRecord ← TRUE;  --simulate that we are at the end of the message
  IF aH # NIL THEN
    BEGIN
    aH.object.delete[@aH.object];
    DecrementStreamCount[];
    IF CourierInternal.doStats THEN
      CourierInternal.stats[streamsDeleted] ←
        SUCC[CourierInternal.stats[streamsDeleted]];
    END;
  END;  --DeleteStream


<<
Returns a snapshot of the export list as an array allocated from longZone,
one item per export, with each non-empty service name copied into a fresh
string.  The caller returns the storage with FreeEnumeration.
>>
EnumerateExports: PUBLIC ENTRY PROC
  RETURNS[enum: LONG DESCRIPTOR FOR Courier.Exports] =
  BEGIN
  <<ENABLE UNWIND => NULL;>>  --no non-fatal errors here
  array: LONG POINTER TO Courier.Exports;
  eH: CourierInternal.ExportHandle ← export.head;
  --raw words, sized for export.active items
  Type: TYPE = RECORD[SEQUENCE COMPUTED CARDINAL OF WORD];
  array ← LOOPHOLE[longZone.NEW[Type[export.active*SIZE[Courier.ExportItem]]]];
  enum ← DESCRIPTOR[array, export.active];
  FOR index: CARDINAL IN[0..export.active) DO
    enum[index] ← [eH.programNumber, eH.versionRange, NIL, eH.exportTime];
    IF eH.length # 0 THEN
      BEGIN
      enum[index].serviceName ← longZone.NEW[StringBody[eH.length] ← [
        length: eH.length, maxlength: eH.length, text:]];
      FOR s: CARDINAL IN[0..eH.length) DO
        enum[index].serviceName[s] ← eH.serviceName[s];
        ENDLOOP;
      END;
    eH ← eH.link;
    ENDLOOP;
  END;  --EnumerateExports


<<
Adds a program to the export list (at the head) and starts the default
listener the first time anything is exported.
Raises Courier.Error[duplicateProgramExport] when an export with the same
program number and exactly the same version range already exists.
>>
ExportRemoteProgram: PUBLIC ENTRY PROC[
  programNumber: LONG CARDINAL, versionRange: Courier.VersionRange,
  dispatcher: Courier.Dispatcher, serviceName: LONG STRING,
  zone: UNCOUNTED ZONE, classOfService: NetworkStream.ClassOfService] =
  BEGIN
  ENABLE UNWIND => NULL;
  eH: CourierInternal.ExportHandle;
  length: CARDINAL ← IF serviceName = NIL THEN 0 ELSE serviceName.length;

  FOR eH ← export.head, eH.link UNTIL eH = NIL DO
    IF eH.programNumber = programNumber
      AND eH.versionRange.low = versionRange.low
      AND eH.versionRange.high = versionRange.high THEN
      RETURN WITH ERROR Courier.Error[duplicateProgramExport];
    ENDLOOP;

  eH ← longZone.NEW[
    CourierInternal.ExportObject[length] ← [
      link: NIL, programNumber: programNumber, versionRange: versionRange,
      dispatcher: dispatcher, serviceName: , zone: zone,
      exportTime: System.GetGreenwichMeanTime[],
      classOfService: classOfService]];

  --once bulk its fixed, it stays fixed
  IF streamClass # bulk THEN streamClass ← classOfService;

  IF length # 0 THEN
    FOR index: CARDINAL IN[0..serviceName.length) DO
      eH.serviceName[index] ← serviceName[index];
      ENDLOOP;

  eH.link ← export.head; export.head ← eH;  --link to beginning of list
  export.active ← SUCC[export.active];

  --set up the default listener if needed.
  IF ~listening THEN {listening ← TRUE; RegisterListener[NIL]};
  END;  --ExportRemoteProgram


--Frees an array produced by EnumerateExports, including the service name
--strings, by describing it to Courier.Free against longZone.
FreeEnumeration: PUBLIC PROC[enum: LONG DESCRIPTOR FOR Courier.Exports] =
  BEGIN
  freeExports: Courier.Description =
    BEGIN
    LongDescriptor: TYPE = LONG DESCRIPTOR FOR Courier.Exports;
    [] ← notes.noteSize[SIZE[LongDescriptor]];
    notes.noteArrayDescriptor[@enum, SIZE[Courier.ExportItem], LAST[CARDINAL]];
    FOR index: INTEGER IN[0..LENGTH[enum]) DO
      notes.noteString[@enum[index].serviceName];
      ENDLOOP;
    END;
  Courier.Free[[@enum, freeExports], longZone];
  END;  --FreeEnumeration


--Gives back one slot of the stream budget reserved by CreateNewStream.
--The count is a CARDINAL, so an unmatched call is ignored at zero instead
--of taking PRED of 0.
NewStreamFailed, DecrementStreamCount: PUBLIC ENTRY PROC =
  {IF numberOfStreams # 0 THEN numberOfStreams ← PRED[numberOfStreams]};


ReleaseDataStream: PUBLIC PROC[cH: Courier.Handle] =
  --Instructs Courier that the client is finished with the data stream.
  --Raises Courier.Error[invalidHandle] for NIL or a bad seal, and
  --Courier.Error[streamNotYours] when the stream is not currently out.
  BEGIN
  IF cH = NIL THEN ERROR Courier.Error[invalidHandle];
  WITH h: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
    user =>
      BEGIN
      SELECT TRUE FROM
        (h.seal # CourierInternal.seal) => ERROR Courier.Error[invalidHandle];
        (h.streamState = out) =>
          SetIdleWatcher[@h ! Courier.Error => GOTO dead];
        --raised with an explicit ERROR, like the arms above
        ENDCASE => ERROR Courier.Error[streamNotYours];
      EXITS dead => DeleteStream[@h];  --stream failed while going idle
      END;
    --server => CourierImplS.Receiver (the dispatcher) will release later
    --client => Courier doesn't know about the data stream
    ENDCASE;
  END;  --ReleaseDataStream


--Unlinks 'ch' from the connection list.  Returns 'ch' when it was found
--(and connection.active was decremented), NIL when it was not on the list.
--The object itself is not freed here.
RemoveConnection: INTERNAL PROC [ch: CourierInternal.ConnectionHandle]
  RETURNS[c: CourierInternal.ConnectionHandle] =
  BEGIN
  p: CourierInternal.ConnectionHandle ← NIL;  --predecessor of c on the list
  --assumed to be monitored elsewhere
  --THIS DELETE IS IMMEDIATE.
  FOR c ← connection.head, c.link UNTIL c = NIL DO
    IF c = ch THEN
      BEGIN
      IF p = NIL THEN connection.head ← c.link
      ELSE p.link ← c.link;
      connection.active ← PRED[connection.active];
      EXIT;
      END;
    p ← c;
    ENDLOOP;
  END;  --RemoveConnection


<<
Hands the connection's data stream to the client and marks it 'out'.
Raises Courier.Error[invalidHandle] for NIL or a bad seal, and
Courier.Error[streamNotYours] when the stream is already out, is busy, or
there is no transport.
>>
RequestDataStream: PUBLIC <<CourierExtras>> ENTRY PROC[
  cH: Courier.Handle] RETURNS[Stream.Handle] =
  BEGIN OPEN ch: LOOPHOLE[cH, CourierInternal.UserConnection];
  SELECT TRUE FROM
    (cH = NIL), (ch.seal # CourierInternal.seal) =>
      RETURN WITH ERROR Courier.Error[invalidHandle];
    (ch.streamState = out), (ch.streamState = busy),
    (ch.transFilter = NIL) =>
      RETURN WITH ERROR Courier.Error[streamNotYours];
    ENDCASE => ch.streamState ← out;  --make it look like its out
  RETURN[ch.object.sH];  --there it is
  END;  --RequestDataStream


SearchForExport: PUBLIC ENTRY PROC[ch: CourierInternal.ServerConnection]
  RETURNS[dispatcher: Courier.Dispatcher, range: Courier.VersionRange] =
  BEGIN
  <<
  IF there is an export that satisfies, then return the relevant dispatcher
  and the range is copied from the export; the export's zone is also
  copied into ch.object.zone.

  IF there was any export of the program number, but no valid range, then
  the dispatcher returned is that of the last such export examined (what
  matters is that it is not NIL) and the range is the union of all export
  ranges (beware of non-contiguous range exports).
  NOTE: an earlier version of this comment said the dispatcher is NIL in
  this case; the code below has it non-NIL.

  IF there was no export of the program, dispatcher is returned NIL and the
  range is not significant (it is left at [LAST[CARDINAL], FIRST[CARDINAL]]).

  This should probably return results: RECORD[
    SELECT * FROM
      match => [Courier.Dispatcher: eH.dispatcher],
      mismatch => [Courier.VersionRange: range],
      none => [Courier.Dispatcher: NIL],
      ENDCASE];
  >>
  dispatcher ← NIL;  --in case we find no match
  range ← [LAST[CARDINAL], FIRST[CARDINAL]];  --to record range of exports
  FOR eH: CourierInternal.ExportHandle ← export.head, eH.link
    UNTIL eH = NIL DO
    SELECT TRUE FROM
      (ch.object.programNumber # eH.programNumber) => LOOP;  --don't record
      (ch.object.versionNumber IN
        [eH.versionRange.low..eH.versionRange.high]) =>
        {ch.object.zone ← eH.zone; RETURN[eH.dispatcher, eH.versionRange]};
      ENDCASE;
    --right program, wrong version: remember that and widen the range
    dispatcher ← eH.dispatcher;  --anything but NIL
    range.low ← MIN[range.low, eH.versionRange.low];
    range.high ← MAX[range.high, eH.versionRange.high];
    ENDLOOP;
  END;  --SearchForExport


SetIdleWatcher: PUBLIC ENTRY PROC [ch: CourierInternal.UserConnection] =
  BEGIN
  ENABLE UNWIND => NULL;
  <<
  If ch = NIL or streamState = dally, the client deleted the object in a
  catch phrase, then caused the UNWIND.  That client probably deserves to
  die a horrible death, but ....
  >>
  IF ch = NIL THEN RETURN;  --just ignore the call
  NOTIFY watch;  --get somebody to keep an eye on this
  SELECT ch.streamState FROM
    (out) =>  --old version of bulk data
      --same NIL guard as the setTimeout call below
      IF ch.transFilter # NIL THEN
        ch.transFilter.object.setSST[
          @ch.transFilter.object, CourierProtocol.dataSST];
    (dally) => RETURN;  --client already asked for delete
    ENDCASE;
  IF ch.transFilter # NIL THEN
    ch.transFilter.object.setTimeout[@ch.transFilter.object, 0];
  ch.clock ← [System.GetGreenwichMeanTime[] + userIdleTimeout];
  ch.streamState ← idle;  --the stream is now idle
  END;  --SetIdleWatcher


StackBlockPush: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
  RETURNS[page: CourierOps.StackBlockHandle] =
  BEGIN
  <<
  Trying to push a new element on the stack and found the stack either
  NIL or full.  Allocate a new block and chain it to the current stack
  block.  'stack' is either NIL or at the block's ceiling
  (i.e., stack == @??.element[CourierOps.stackObjectLimit]).
  >>
  <<ENABLE UNWIND => NULL;>>  --safe as long as pilot not abortable
  index: CARDINAL;
  BEGIN  --searching cache
  FOR index DECREASING IN[0..CourierOps.stackCacheLength) DO
    IF stackCache[index] # NIL THEN GOTO cache;
    ENDLOOP;
  <<
  no available cached stack block found
  The original code relied on index being 0 when the loop runs to
  completion (the reason for the DECREASING).  It is now assigned
  explicitly so that slot 0 is used whatever the loop leaves behind.
  >>
  index ← 0;
  stackCache[index] ← Space.Map[
    window: [File.nullFile, 0, CourierOps.stackPageLength],
    class: data, swapUnits: [unitary[]]].pointer;
  IF CourierInternal.doStats THEN
    CourierInternal.stats[stackPagesMapped] ←
      SUCC[CourierInternal.stats[stackPagesMapped]];
  EXITS cache => NULL;
  END;  --searching cache
  page ← stackCache[index]; stackCache[index] ← NIL;  --remove from cache
  page.nextBlock ← stack;  --link to old if it exists
  IF CourierInternal.doStats THEN
    CourierInternal.stats[stackPagesGot] ←
      SUCC[CourierInternal.stats[stackPagesGot]];
  END;  --StackBlockPush


--Pops the top stack block and returns the one beneath it.  The popped
--block goes into a free stackCache slot if there is one; otherwise its
--space is killed and unmapped.
StackBlockPop: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
  RETURNS[new: CourierOps.StackBlockHandle] =
  BEGIN
  <<ENABLE UNWIND => NULL;>>  --safe as long as pilot is not abortable
  index: CARDINAL;
  new ← stack.nextBlock;  --pick up old link
  --determine if we want to cache this block or free it.
  BEGIN  --searching cache
  FOR index DECREASING IN[0..CourierOps.stackCacheLength) DO
    IF stackCache[index] = NIL THEN GOTO cache;
    ENDLOOP;
  --no available slot to cache this block
  Space.Kill[[stack, CourierOps.stackPageLength]];
  [] ← Space.Unmap[stack, return];
  IF CourierInternal.doStats THEN
    CourierInternal.stats[stackPagesUnmapped] ←
      SUCC[CourierInternal.stats[stackPagesUnmapped]];
  EXITS cache => stackCache[index] ← stack;  --cache this for use later
  END;  --searching cache
  IF CourierInternal.doStats THEN
    CourierInternal.stats[stackPagesPut] ←
      SUCC[CourierInternal.stats[stackPagesPut]];
  END;  --StackBlockPop


RegisterTransport: PUBLIC PROC [cH: Courier.Handle,
  transportProc: CourierInternal.Creator,
  transportType: CourierInternal.TransportType] =
  BEGIN
  --Register the transport creation proc for the rpc user side.
  --A NIL transportProc selects CourierInternal.CreateDefaultStream and
  --CourierInternal.ttDefault.
  ch: CourierInternal.UserConnection ← LOOPHOLE[cH];
  IF transportType # ch.transportType THEN
    BEGIN  --Get rid of the old stream if there is one.
    IF ch.transFilter # NIL THEN
      BEGIN
      ch.transFilter.object.delete[@ch.transFilter.object];
      ch.transFilter ← NIL;
      --The stream is gone, so give its slot back, exactly as DeleteStream
      --and Watcher do after deleting a transport stream.  Without this
      --numberOfStreams never comes back down for streams deleted here.
      DecrementStreamCount[];
      IF CourierInternal.doStats THEN
        CourierInternal.stats[streamsDeleted] ←
          SUCC[CourierInternal.stats[streamsDeleted]];
      END;
    END;
  IF transportProc = NIL THEN
    {ch.createTransport ← CourierInternal.CreateDefaultStream;
    ch.transportType ← CourierInternal.ttDefault}
  ELSE
    {ch.createTransport ← transportProc; ch.transportType ← transportType};
  END;  --RegisterTransport


--Forks and detaches a listener process (the default listener when
--'listener' is NIL).  The fork is done at CommPriorities.normal; the
--caller's priority is restored afterwards.
RegisterListener: PUBLIC PROC [listener: CourierInternal.Listener] =
  BEGIN
  prio: Process.Priority ← Process.GetPriority[];  --save his
  Process.SetPriority[CommPriorities.normal];  --make sure we do this normal
  IF listener # NIL THEN Process.Detach[FORK listener[]]
  ELSE Process.Detach[FORK CourierInternal.DefaultListener[]];
  Process.SetPriority[prio];  --set client back to whatever
  END;  --RegisterListener


<<
Body of the 'watcher' process forked by Start.
While any connection objects exist it repeatedly scans the list
(WatcherLocked).  Each scan returns at most one thing to clean up: the
stream of an idle user connection that is closing or past its deadline, or
a dallying user connection past its deadline together with its stream.
The stream delete and the FREE of the object are done here, outside the
monitor.  When the list is empty the timeout on 'watch' is disabled and
the process sleeps until notified.  Exits when aborted (see Stop).
>>
Watcher: PROC =
  BEGIN
  --YOU WANNA TOUCH THIS CODE?  YOU GONNA REGRET IT!

  Wait: ENTRY PROC = INLINE {ENABLE UNWIND => NULL; WAIT watch};

  WatcherLocked: ENTRY PROC RETURNS[
    aH: CourierInternal.AugmentedStream,
    ch: CourierInternal.ConnectionHandle] =
    BEGIN
    ENABLE UNWIND => NULL;
    FOR ch ← connection.head, ch.link UNTIL ch = NIL DO
      WITH h: ch SELECT FROM
        user =>
          BEGIN
          SELECT h.streamState FROM
            idle =>
              BEGIN
              b: CARDINAL;
              closing: BOOLEAN;
              notmo: Stream.InputOptions = [signalTimeout: FALSE];
              two: Environment.Block = [LOOPHOLE[LONG[@b]], 0, 2];
              IF (aH ← ch.transFilter) = NIL THEN LOOP;  --already deleted
              --probe the stream: any completion other than a timeout, or
              --a Courier.Error, means the line is closing
              BEGIN
              ENABLE Courier.Error => {closing ← TRUE; CONTINUE};
              closing ← aH.object.get[@aH.object, two, notmo].why # timeout;
              END;
              IF closing OR (h.clock < now) THEN
                BEGIN
                ch.transFilter ← NIL;  --set it to nil so we skip next time
                h.protocolRange ← [pvLow, pvHigh];  --reset state of arbitration
                CourierInternal.SetMessageProtocolVersion[@h, pvHigh];
                RETURN[aH, NIL];  --delete the stream, not the object
                END;
              END;
            dally =>
              IF h.clock < now THEN
                RETURN[ch.transFilter, RemoveConnection[ch]];  --both
            ENDCASE;  --busy, out => NULL;
          END;
        ENDCASE;  --server, client => NULL;
      ENDLOOP;
    WAIT watch;  --wait 2 seconds before trying again
    RETURN[NIL, NIL];  --just bailing out of the monitor
    END;  --WatcherLocked

  now: System.GreenwichMeanTime;
  aH: CourierInternal.AugmentedStream;
  ch: CourierInternal.ConnectionHandle;

  DO
    ENABLE ABORTED => EXIT;  --exits only if aborted (Stop'd)
    Process.SetTimeout[@watch, Process.SecondsToTicks[2]];
    UNTIL connection.active = 0 DO
      now ← System.GetGreenwichMeanTime[];
      [aH, ch] ← WatcherLocked[];  --makes a partial or full pass
      IF aH # NIL THEN
        BEGIN
        aH.object.delete[@aH.object];
        DecrementStreamCount[];
        IF CourierInternal.doStats THEN
          CourierInternal.stats[streamsDeleted] ←
            SUCC[CourierInternal.stats[streamsDeleted]];
        END;
      IF ch # NIL THEN longZone.FREE[@ch];  --then delete the object
      ENDLOOP;
    Process.DisableTimeout[@watch];  --keeps from waking up
    Heap.Prune[longZone];  --try to reclaim extra space (noop)
    Wait[];  --this waits until someone else creates a connection
    ENDLOOP;
  END;  --Watcher


<<
Removes the export with this program number and exactly this version range
and frees it.  When the last export goes, streamClass reverts to
'transactional'.
Raises Courier.Error[noSuchProgramExport] when there is no such export.
>>
UnexportRemoteProgram: PUBLIC ENTRY PROC[
  programNumber: LONG CARDINAL, versionRange: Courier.VersionRange] =
  BEGIN
  ENABLE UNWIND => NULL;
  prev, current: CourierInternal.ExportHandle;
  FOR current ← export.head, current.link UNTIL current = NIL DO
    IF (programNumber = current.programNumber)
      AND (versionRange = current.versionRange) THEN
      BEGIN  --This is the element to remove from the list
      IF current = export.head THEN
        export.head ← current.link  --beginning of list
      ELSE prev.link ← current.link;  --middle or end of list
      longZone.FREE[@current];
      IF (export.active ← PRED[export.active]) = 0 THEN
        streamClass ← transactional;
      RETURN;
      END;  --This is the element to remove from the list
    prev ← current;
    ENDLOOP;
  RETURN WITH ERROR Courier.Error[noSuchProgramExport];
  END;  --UnexportRemoteProgram


--Creates longZone and forks the watcher process.  Does nothing when
--longZone is already non-NIL.  Always returns TRUE.
Start: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
  BEGIN
  prio: Process.Priority ← Process.GetPriority[];  --save his
  IF longZone # NIL THEN RETURN[TRUE];  --already started
  Process.SetPriority[CommPriorities.normal];  --make sure we do this normal
  longZone ← Heap.Create[
    initial: 5, increment: 5,
    ownerChecking: System.switches[PilotSwitches.heapOwnerChecking] = down];
  Process.EnableAborts[@watch];  --so we can get his attention
  watcher ← FORK Watcher[];
  Process.SetPriority[prio];  --set client back to whatever
  RETURN[TRUE];  --we're up
  END;  --Start


--Shuts the module down: aborts and joins the watcher, unmaps the cached
--stack blocks and deletes longZone.  Refuses (returns FALSE) while any
--export or connection object still exists.
Stop: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
  BEGIN
  <<
  This is assumed to be serial access due to the fact that some higher
  authority is shutting us down.  The procedure isn't monitored.
  >>
  SELECT TRUE FROM
    (export.active # 0) => RETURN[FALSE];  --can't do it
    (connection.active # 0) => RETURN[FALSE];  --that either
    ENDCASE;
  Process.Abort[watcher];  --get his attention
  JOIN watcher;  --come hither, little boy
  FOR index: NATURAL IN[0..CourierOps.stackCacheLength) DO
    p: LONG POINTER = stackCache[index];
    IF p # NIL THEN {[] ← Space.Unmap[p]; stackCache[index] ← NIL};
    ENDLOOP;
  Heap.Delete[z: longZone,
    checkEmpty: System.switches[PilotSwitches.heapOwnerChecking] = down];
  longZone ← NIL;
  RETURN[TRUE];
  END;  --Stop


[] ← Start[];  --motion, motion.  lots of motion

END....  --CourierImplM.mesa

LOG

 5-Jan-87 15:29:48  AOF  Trimmed log for Pilot 13.0
 5-Jan-87 15:25:12  AOF  Merge in Courier extras for DBMS
16-Jan-87  9:44:36  AOF  Removal of Courier Version 2.0
16-Jan-87  9:44:36  AOF  Move all ERRORs to here
19-Jun-87 13:33:59  AOF  Starting and stopping