-- File: CourierImplM.mesa - last edit:
-- AOF                 19-Jun-87 18:53:01
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved. 

DIRECTORY
  CommPriorities USING [normal],
  Courier USING [
    Arguments, Description, Dispatcher, Error, ErrorCode, ExportItem,
    Exports, Free, Handle, Parameters, SystemElement, VersionRange],
  CourierExtras USING [Missing],
  CourierInternal USING [
    AugmentedStream, ConnectionHandle, ConnectionObject, ConnectionType,
    CreateDefaultStream, Creator, DefaultListener, doStats, ExportHandle,
    ExportObject, Listener, seal, ServerConnection, SetMessageProtocolVersion,
    stats, TransportType, ttDefault, UserConnection, WaitTime],
  CourierOps USING [
    StackBlockHandle, stackCacheLength, stackPageLength],
  CourierProtocol USING [dataSST, Protocol, pvHigh],
  Environment USING [Block],
  File USING [File, nullFile],
  Heap USING [Create, Delete, Prune],
  Inline USING [LowHalf],
  NetworkStream USING [ClassOfService, FindAddresses],
  NSConstants USING [courierSocket],
  PilotSwitches USING [heapOwnerChecking],
  Process USING [
    Abort, Detach, DisableTimeout, EnableAborts, SecondsToTicks, SetTimeout,
    Priority, SetPriority, GetPriority],
  Space USING [Interval, Kill, Map, Unmap],
  Stream USING [Block, Handle, InputOptions],
  System USING [
    GetClockPulses, GetGreenwichMeanTime, GreenwichMeanTime, HostNumber,
    NetworkAddress, switches];
  
CourierImplM: MONITOR
  IMPORTS
    Heap, Inline, Courier, CourierInternal, NetworkStream, Process,
    Space, System
  EXPORTS
    Courier, CourierExtras, CourierInternal, CourierOps =
  BEGIN

  --Protocol version range used when (re)starting protocol arbitration.
  --Both bounds are bound to CourierProtocol.pvHigh, so the range holds a
  --single version.
  pvLow: CourierProtocol.Protocol = CourierProtocol.pvHigh;
  pvHigh: CourierProtocol.Protocol = CourierProtocol.pvHigh;   
  
  --EXPORTS

  --Heap used for connection objects, export objects and enumerations.
  --Created by Start and deleted (then set to NIL) by Stop.
  longZone: PUBLIC UNCOUNTED ZONE;
  --Raised by AddressesFromCourier and CourierFromAddresses when the requested
  --item cannot be located.
  NotFound: PUBLIC <<CourierExtras>> ERROR[what: CourierExtras.Missing] = CODE;
  --General Courier error; errorCode says what went wrong.
  Error: PUBLIC <<Courier>> ERROR [errorCode: Courier.ErrorCode] = CODE;
  VersionMismatch: PUBLIC <<Courier>> ERROR [range: Courier.VersionRange] = CODE;
  --raised by Courier @ user in response to an "abort" message
  RemoteErrorSignalled: PUBLIC <<Courier>> ERROR [
    errorNumber: CARDINAL, arguments: Courier.Arguments] = CODE;
  --Raised by the dispatcher.  Translated to Error[noSuchProcedureNumber] at user.
  NoSuchProcedureNumber: PUBLIC <<Courier>> ERROR = CODE;
  --Raised by client.  Translated to Error[invalidArguments] at user.
  InvalidArguments: PUBLIC <<Courier>> ERROR = CODE;
  --Generated by implementer of a remote procedure, i.e. the dispatcher.
  SignalRemoteError: PUBLIC <<Courier>> ERROR [
    errorNumber: CARDINAL, arguments: Courier.Parameters] = CODE;

  
  <<
  The listener will create 'streamClass' streams when it acknowledges
  connections.  ExportRemoteProgram will leave this 'transactional' until
  the first export requesting 'bulk' is encountered.  After that all
  streams created will be bulk.  UnexportRemoteProgram resets it to
  'transactional' when the last export is removed.
  >>
  streamClass: PUBLIC NetworkStream.ClassOfService ← transactional;
  
  --record time Courier was first started on this system element
  startTime: PUBLIC System.GreenwichMeanTime ← System.GetGreenwichMeanTime[];
  
  <<
  Streams are totalled, user + server, and only maxStreamsAllowed
  are allowed to exist at any time.  CreateNewStream reserves a slot;
  NewStreamFailed / DecrementStreamCount give one back.
  >>
  numberOfStreams: PUBLIC CARDINAL ← 0;
  maxStreamsAllowed: PUBLIC CARDINAL ← 25; 
  
  --Various time parameters {tmo ← base + (hops * hopWeight)}
  --userIdleTimeout and dallyTimeout are added directly to GreenwichMeanTime
  --values in this module (presumably seconds; confirm against System).
  --NOTE(review): units of the WaitTime values are not visible here.
  hopWeight: PUBLIC CARDINAL ← 1;
  userIdleTimeout: PUBLIC CARDINAL ← 15;  --with no activity
  dallyTimeout: PUBLIC CARDINAL ← 2;  --from time client requests delete
  activeTimeout: PUBLIC CourierInternal.WaitTime ← LAST[CourierInternal.WaitTime];
  serverIdleTimeout: PUBLIC CourierInternal.WaitTime ← 90000;  --no activity
  createTimeout: PUBLIC CourierInternal.WaitTime ← 20000;  --in formula
  
  watcher: PROCESS ← NIL;  --idle line watcher
  watch: CONDITION;  --condition variable used inside
  
  --one-time check for the default listener on this system element
  listening: BOOLEAN ← FALSE;
  
  --Cache of stack blocks kept for reuse by StackBlockPush / StackBlockPop;
  --a NIL slot is empty.
  stackCache: ARRAY CARDINAL[0..CourierOps.stackCacheLength) OF
    CourierOps.StackBlockHandle ← ALL[NIL];
  
  --List of exported programs.  'active' counts the entries on the list;
  --'total' is not updated anywhere in this file.
  export: RECORD [
    head: CourierInternal.ExportHandle, active, total: CARDINAL] ←
      [NIL, 0, 0];
  --List of connection objects.  'active' counts objects currently linked;
  --'total' counts every object ever linked by CreateInternal.
  connection: RECORD [
    head: CourierInternal.ConnectionHandle, active, total: CARDINAL] ←
      [NIL, 0, 0];

  AddressesFromCourier: PUBLIC <<CourierExtras>> ENTRY PROC[
    courier: Courier.Handle] RETURNS[local, remote: System.NetworkAddress] =
    BEGIN
    --Find the connection object that owns 'courier' and report the addresses
    --of its transport stream.  Raises NotFound[courier] if no connection owns
    --the handle and NotFound[address] if the connection has no stream.
    FOR conn: CourierInternal.ConnectionHandle ← connection.head, conn.link
      UNTIL conn = NIL DO
      transport: Stream.Handle;
      IF @conn.object # courier THEN LOOP;  --not the one we want
      IF conn.transFilter = NIL THEN RETURN WITH ERROR NotFound[address];
      --the filter's context cell is read as the underlying stream handle
      transport ←
        LOOPHOLE[@conn.transFilter.context, LONG POINTER TO Stream.Handle]↑;
      IF transport = NIL THEN RETURN WITH ERROR NotFound[address];
      RETURN NetworkStream.FindAddresses[transport];
      REPEAT FINISHED => RETURN WITH ERROR NotFound[courier];
      ENDLOOP;
    END;  --AddressesFromCourier

  AssignTransportType: PUBLIC PROC RETURNS [t: CourierInternal.TransportType] =
    BEGIN
    --Produce a non-zero transport type from the low half of the pulse clock,
    --sampling again whenever the sample happens to be zero.
    DO
      t ← Inline.LowHalf[System.GetClockPulses[]];
      IF t # 0 THEN EXIT;
      ENDLOOP;
    END;  --AssignTransportType

  CourierFromAddresses: PUBLIC <<CourierExtras>> ENTRY PROC[
    local, remote: System.NetworkAddress] RETURNS[courier: Courier.Handle] =
    BEGIN
    --Return the handle of the first connection whose transport stream joins
    --exactly 'local' and 'remote'.  Connections without a stream are skipped.
    --Raises NotFound[address] when nothing matches.
    FOR conn: CourierInternal.ConnectionHandle ← connection.head, conn.link
      UNTIL conn = NIL DO
      transport: Stream.Handle;
      here, there: System.NetworkAddress;
      IF conn.transFilter # NIL THEN
        BEGIN
        transport ←
          LOOPHOLE[@conn.transFilter.context, LONG POINTER TO Stream.Handle]↑;
        IF transport # NIL THEN
          BEGIN
          [here, there] ← NetworkStream.FindAddresses[transport];
          IF (here = local) AND (there = remote) THEN RETURN[@conn.object];
          END;
        END;
      REPEAT FINISHED => RETURN WITH ERROR NotFound[address];
      ENDLOOP;
    END;  --CourierFromAddresses
    
  Create: PUBLIC PROC[
    remote: Courier.SystemElement,
    programNumber: LONG CARDINAL, versionNumber: CARDINAL,
    zone: UNCOUNTED ZONE, classOfService: NetworkStream.ClassOfService]
    RETURNS[cH: Courier.Handle] =
    BEGIN
    --Obtain a user connection object (new or recycled) and fill in the
    --client-visible part.  CreateInternal may signal.
    conn: CourierInternal.ConnectionHandle ← CreateInternal[user, remote];
    --the remote socket is always forced to the Courier socket
    remote.socket ← NSConstants.courierSocket;
    conn.object.zone ← zone;
    conn.object.classOfService ← classOfService;
    conn.object.versionNumber ← versionNumber;
    conn.object.programNumber ← programNumber;
    conn.object.remote ← remote;
    cH ← @conn.object;
    END;  --Create
  
  CreateInternal: PUBLIC ENTRY PROC [type: CourierInternal.ConnectionType,
    remote: Courier.SystemElement]
    RETURNS[ch: CourierInternal.ConnectionHandle] =
    BEGIN
    <<ENABLE UNWIND => NULL;>>  --no non-fatal errors here
    --A user request first tries to recycle a dallying user object for the
    --same host; a recycled object is already on the list and already counted.
    IF type = user THEN
      FOR cached: CourierInternal.ConnectionHandle ← connection.head, cached.link
        UNTIL cached = NIL DO
        WITH u: cached SELECT FROM
          user =>  --only user objects are acquirable
            IF (u.streamState = dally)
              AND (u.object.remote.host = remote.host) THEN
              BEGIN
              u.streamState ← idle;  --so nobody else grabs it
              u.seal ← CourierInternal.seal;
              IF CourierInternal.doStats THEN
                CourierInternal.stats[streamsAcquired] ←
                  SUCC[CourierInternal.stats[streamsAcquired]];
              RETURN[cached];  --look no further
              END;
          ENDCASE;  --server, client => NULL;
        ENDLOOP;
    --nothing to recycle (or not a user): build a fresh object
    ch ← longZone.NEW[CourierInternal.ConnectionObject];
    SELECT type FROM
      server =>
        ch.body ← server[
          protocolRange: [pvLow, pvHigh],
          message: [messageObject: protocol3[protocol3Body: ]],
          clock: System.GetGreenwichMeanTime[],
          seal: CourierInternal.seal];
      user =>
        ch.body ← user[
          protocolRange: [pvLow, pvHigh],
          message: [messageObject: protocol3[protocol3Body: ]],
          clock: System.GetGreenwichMeanTime[],
          seal: CourierInternal.seal];
      ENDCASE;  --client => bad plan!
    --set initial pointer to stream
    WITH variant: ch SELECT FROM
      user, server => ch.object.sH ← @ch.bulkFilter.object;
      ENDCASE;
    ch.transFilter ← NIL;  --there is no transport at this time
    --push on the front of the connection list and account for it
    ch.link ← connection.head;
    connection.head ← ch;
    connection.active ← SUCC[connection.active];
    connection.total ← SUCC[connection.total];
    END;  --CreateInternal
    
  
  CreateNewStream: PUBLIC ENTRY PROC RETURNS[BOOLEAN] =
    BEGIN
    --Reserve a slot for one more stream.  Returns FALSE, reserving nothing,
    --when maxStreamsAllowed streams are already accounted for.
    IF numberOfStreams < maxStreamsAllowed THEN
      {numberOfStreams ← SUCC[numberOfStreams]; RETURN[TRUE]};
    RETURN[FALSE];
    END;  --CreateNewStream 
  
  Delete: PUBLIC ENTRY PROC[cH: Courier.Handle] =
    BEGIN
    --Client is finished with a user handle.  The object is not freed here: it
    --is put in the 'dally' state with a deadline of now + dallyTimeout and the
    --watcher reclaims it later (or CreateInternal recycles it first).
    --Raises Courier.Error[invalidHandle] for a NIL handle, a handle whose seal
    --is wrong, or a non-user object; Courier.Error[streamNotYours] when the
    --stream is busy or out.
    IF cH = NIL THEN RETURN WITH ERROR Courier.Error[invalidHandle];  --busted
    WITH ch: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
      user =>
        BEGIN
        SELECT TRUE FROM
          (ch.seal # CourierInternal.seal) =>
            RETURN WITH ERROR Courier.Error[invalidHandle];  --busted
          (ch.streamState = busy), (ch.streamState = out) =>
            RETURN WITH ERROR Courier.Error[streamNotYours];
          ENDCASE =>  --start dally and let watcher delete
            BEGIN
            --client can't use his stream any more.
            ch.streamState ← dally;  --just waiting to timeout
            ch.clock ← [System.GetGreenwichMeanTime[] + dallyTimeout];
            ch.seal ← [0, 0];  --smash the seal immediately
            --so it will register again (in Call) if re-acquired.
            ch.createTransport ← NIL;
            --The watcher may be blocked with its timeout disabled (it parks
            --that way whenever no connections exist).  Wake it so an object
            --that was created and deleted without any call is still reclaimed.
            NOTIFY watch;
            END;
        END;
      ENDCASE => RETURN WITH ERROR Courier.Error[invalidHandle];  --server, client
    END;  --Delete
    
  
  DeleteConnection: PUBLIC ENTRY PROC [ch: CourierInternal.ConnectionHandle] =
    BEGIN
    ENABLE UNWIND => NULL;
    --Unlink the object from the connection list right away, then hand
    --whatever RemoveConnection returned back to the zone.
    ch ← RemoveConnection[ch];
    longZone.FREE[@ch];
    END;  --DeleteConnection
    
  
  DeleteStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] =
    BEGIN
    --Detach the transport stream from a user or server connection while
    --holding the monitor, then delete the stream outside the monitor.
    stream: CourierInternal.AugmentedStream ← NIL;

    ExtractLocked: ENTRY PROC =
      BEGIN
      --pull the stream out of the connection and mark the object idle
      WITH c: ch SELECT FROM
        server =>
          BEGIN
          stream ← c.transFilter;
          c.transFilter ← NIL;
          c.streamState ← idle;
          END;
        user =>
          BEGIN
          --reset protocol arbitration starting point
          c.protocolRange ← [pvLow, pvHigh];
          CourierInternal.SetMessageProtocolVersion[@c, pvHigh];
          stream ← c.transFilter;
          c.transFilter ← NIL;
          c.streamState ← idle;
          END;
        ENDCASE;  --don't call this routine with client connection
      END;  --ExtractLocked

    ExtractLocked[];
    ch.endRecord ← TRUE;  --simulate that we are at the end of the message
    IF stream = NIL THEN RETURN;  --there was no stream to get rid of
    stream.object.delete[@stream.object];
    DecrementStreamCount[];
    IF CourierInternal.doStats THEN
      CourierInternal.stats[streamsDeleted] ←
        SUCC[CourierInternal.stats[streamsDeleted]];
    END;  --DeleteStream
    
  
  EnumerateExports: PUBLIC ENTRY PROC
    RETURNS[enum: LONG DESCRIPTOR FOR Courier.Exports] =
    BEGIN
    <<ENABLE UNWIND => NULL;>>  --no non-fatal errors here
    --Build, in longZone, a snapshot of the export list: one ExportItem per
    --active export, each with its own copy of the service name (NIL when the
    --export has no name).  Release the result with FreeEnumeration.
    Words: TYPE = RECORD[SEQUENCE COMPUTED CARDINAL OF WORD];
    base: LONG POINTER TO Courier.Exports;
    item: CourierInternal.ExportHandle ← export.head;
    base ← LOOPHOLE[
      longZone.NEW[Words[export.active*SIZE[Courier.ExportItem]]]];
    enum ← DESCRIPTOR[base, export.active];
    FOR slot: CARDINAL IN[0..export.active) DO
      enum[slot] ← [item.programNumber, item.versionRange, NIL, item.exportTime];
      IF item.length # 0 THEN
        BEGIN
        enum[slot].serviceName ← longZone.NEW[StringBody[item.length] ← [
          length: item.length, maxlength: item.length, text:]];
        FOR char: CARDINAL IN[0..item.length) DO
          enum[slot].serviceName[char] ← item.serviceName[char];
          ENDLOOP;
        END;
      item ← item.link;
      ENDLOOP;
    END;
    
  
  ExportRemoteProgram: PUBLIC ENTRY PROC[
    programNumber: LONG CARDINAL, versionRange: Courier.VersionRange,
    dispatcher: Courier.Dispatcher, serviceName: LONG STRING,
    zone: UNCOUNTED ZONE, classOfService: NetworkStream.ClassOfService] =
    BEGIN
    ENABLE UNWIND => NULL;
    --Record a new export at the head of the export list.  Raises
    --Courier.Error[duplicateProgramExport] when the same program number is
    --already exported with the same low and high versions.  The first export
    --also starts the default listener.
    fresh: CourierInternal.ExportHandle;
    nameLength: CARDINAL ← IF serviceName = NIL THEN 0 ELSE serviceName.length;
    FOR old: CourierInternal.ExportHandle ← export.head, old.link
      UNTIL old = NIL DO
      IF old.programNumber = programNumber
        AND old.versionRange.low = versionRange.low
        AND old.versionRange.high = versionRange.high THEN
        RETURN WITH ERROR Courier.Error[duplicateProgramExport];
      ENDLOOP;
    fresh ← longZone.NEW[
      CourierInternal.ExportObject[nameLength] ← [
      link: NIL, programNumber: programNumber, versionRange: versionRange,
      dispatcher: dispatcher, serviceName: , zone: zone,
      exportTime: System.GetGreenwichMeanTime[], classOfService: classOfService]];
    --copy the service name, if there is one, into the new object
    IF nameLength # 0 THEN
      FOR char: CARDINAL IN[0..serviceName.length) DO
        fresh.serviceName[char] ← serviceName[char];
        ENDLOOP;
    --once bulk its fixed, it stays fixed
    IF streamClass # bulk THEN streamClass ← classOfService;
    --link to beginning of list
    fresh.link ← export.head;
    export.head ← fresh;
    export.active ← SUCC[export.active];
    --set up the default listener if needed.
    IF ~listening THEN
      BEGIN
      listening ← TRUE;
      RegisterListener[NIL];
      END;
    END;  --ExportRemoteProgram
    
  
  --Releases an enumeration (as built by EnumerateExports) by passing
  --Courier.Free a description of its storage together with longZone.
  FreeEnumeration: PUBLIC PROC[enum: LONG DESCRIPTOR FOR Courier.Exports] =
    BEGIN
    
    --Description procedure handed to Courier.Free.  It notes the descriptor
    --itself, the array the descriptor designates, and the serviceName string
    --of every item.
    --NOTE(review): 'notes' is not declared in this file; presumably it is the
    --parameter named by the Courier.Description procedure type.  Confirm.
    freeExports: Courier.Description =
      BEGIN
      LongDescriptor: TYPE = LONG DESCRIPTOR FOR Courier.Exports;
      [] ← notes.noteSize[SIZE[LongDescriptor]];
      notes.noteArrayDescriptor[@enum, SIZE[Courier.ExportItem], LAST[CARDINAL]];
      FOR index: INTEGER IN[0..LENGTH[enum]) DO
        notes.noteString[@enum[index].serviceName];
	ENDLOOP;
      END;

    Courier.Free[[@enum, freeExports], longZone];
    END;
  
  NewStreamFailed, DecrementStreamCount: PUBLIC ENTRY PROC =
    BEGIN
    --Give back one stream slot previously reserved by CreateNewStream.
    numberOfStreams ← PRED[numberOfStreams];
    END;
  
  ReleaseDataStream: PUBLIC PROC[cH: Courier.Handle] =
    --Instructs Courier that the client is finished with the data stream.
    --Only meaningful for user handles whose stream is currently 'out'; the
    --connection is handed to SetIdleWatcher, and if that raises Courier.Error
    --the stream is deleted instead.
    --Raises Courier.Error[invalidHandle] for a NIL handle or a bad seal, and
    --Courier.Error[streamNotYours] when the stream is not out.
    BEGIN
    IF cH = NIL THEN ERROR Courier.Error[invalidHandle];
    WITH h: LOOPHOLE[cH, CourierInternal.ConnectionHandle] SELECT FROM
      user =>
        BEGIN
        SELECT TRUE FROM
          (h.seal # CourierInternal.seal) => ERROR Courier.Error[invalidHandle];
          (h.streamState = out) =>
            SetIdleWatcher[@h ! Courier.Error => GOTO dead];
          --raised with ERROR, like the arms above, so that control can never
          --continue past this point
          ENDCASE => ERROR Courier.Error[streamNotYours];
        EXITS dead => DeleteStream[@h];
        END;
      --server => CourierImplS.Receiver (the dispatcher) will release later
      --client => Courier doesn't know about the data stream
      ENDCASE;
    END;
  
  RemoveConnection: INTERNAL PROC [ch: CourierInternal.ConnectionHandle]
    RETURNS[c: CourierInternal.ConnectionHandle] =
    BEGIN
    --assumed to be monitored elsewhere
    --THIS DELETE IS IMMEDIATE.
    --Unlinks 'ch' from the connection list and returns it; returns NIL,
    --changing nothing, when 'ch' is not on the list.
    prev: CourierInternal.ConnectionHandle ← NIL;
    FOR c ← connection.head, c.link UNTIL c = NIL DO
      IF c # ch THEN {prev ← c; LOOP};  --keep walking
      --found it: splice the list around the entry and stop looking
      IF prev # NIL THEN prev.link ← c.link ELSE connection.head ← c.link;
      connection.active ← PRED[connection.active];
      EXIT;
      ENDLOOP;
    END;  --RemoveConnection

  RequestDataStream: PUBLIC <<CourierExtras>> ENTRY PROC[
    cH: Courier.Handle] RETURNS[Stream.Handle] =
    BEGIN
    --Hand the connection's data stream to the client and mark it 'out'.
    --Raises Courier.Error[invalidHandle] for a NIL handle or a bad seal, and
    --Courier.Error[streamNotYours] when the stream is already out, is busy,
    --or there is no transport.
    OPEN ch: LOOPHOLE[cH, CourierInternal.UserConnection];
    IF (cH = NIL) OR (ch.seal # CourierInternal.seal) THEN
      RETURN WITH ERROR Courier.Error[invalidHandle];
    IF (ch.streamState = out) OR (ch.streamState = busy)
      OR (ch.transFilter = NIL) THEN
      RETURN WITH ERROR Courier.Error[streamNotYours];
    ch.streamState ← out;  --make it look like its out
    RETURN[ch.object.sH];  --there it is
    END;  --RequestDataStream
    
  SearchForExport: PUBLIC ENTRY PROC[ch: CourierInternal.ServerConnection]
    RETURNS[dispatcher: Courier.Dispatcher, range: Courier.VersionRange] =
    BEGIN
    <<
    IF there is an export that satisfies, then return the relevant dispatcher
    and the range is copied from the export.
    IF there was any export of the program number, but no valid range, then
    the dispatcher returned is that of the last such export examined (anything
    but NIL) and the range is the union of all export ranges (beware of
    non-contiguous range exports).
    IF there was no export of the program, dispatcher is returned NIL and
    the range is not significant (or defined for that matter).
    This should probably return
      results: RECORD[
	SELECT * FROM
	  match => [Courier.Dispatcher: eH.dispatcher],
	  mismatch => [Courier.VersionRange: range],
	  none => [Courier.Dispatcher: NIL],
	  ENDCASE];
    >>
    range ← [LAST[CARDINAL], FIRST[CARDINAL]];  --to record range of exports
    dispatcher ← NIL;  --in case we find no match
    FOR candidate: CourierInternal.ExportHandle ← export.head, candidate.link
      UNTIL candidate = NIL DO
      --exports of other programs are not recorded at all
      IF ch.object.programNumber # candidate.programNumber THEN LOOP;
      IF ch.object.versionNumber IN
        [candidate.versionRange.low..candidate.versionRange.high] THEN
        BEGIN  --exact hit: adopt the export's zone and report it
        ch.object.zone ← candidate.zone;
        RETURN[candidate.dispatcher, candidate.versionRange];
        END;
      --right program, wrong version: widen the reported range
      range.high ← MAX[range.high, candidate.versionRange.high];
      range.low ← MIN[range.low, candidate.versionRange.low];
      dispatcher ← candidate.dispatcher;  --anything but NIL
      ENDLOOP;
    END;
    
  SetIdleWatcher: PUBLIC ENTRY PROC [ch: CourierInternal.UserConnection] =
    BEGIN
    ENABLE UNWIND => NULL;
    <<
    Marks a user connection idle, gives it a deadline of now + userIdleTimeout
    and wakes the watcher so the deadline is policed.
    If ch = NIL or streamState = dally, the client deleted the object
    in a catch phrase, then caused the UNWIND.  That client probably
    deserves to die a horrible death, but ....
    >>
    IF ch = NIL THEN RETURN;  --just ignore the call
    NOTIFY watch;  --get somebody to keep an eye on this
    SELECT ch.streamState FROM
      (out) =>  --old version of bulk data
        --The transport can already be gone (RegisterTransport clears it
        --without holding the monitor), so guard the dereference exactly as
        --the setTimeout call below does.
        IF ch.transFilter # NIL THEN
          ch.transFilter.object.setSST[
            @ch.transFilter.object, CourierProtocol.dataSST];
      (dally) => RETURN;  --client already asked for delete
      ENDCASE;
    IF ch.transFilter # NIL THEN
      ch.transFilter.object.setTimeout[@ch.transFilter.object, 0];
    ch.clock ← [System.GetGreenwichMeanTime[] + userIdleTimeout];
    ch.streamState ← idle;  --the stream is now idle
    END;
  
  StackBlockPush: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
    RETURNS[page: CourierOps.StackBlockHandle] =
    BEGIN
    <<
    Trying to push a new element on the stack and found the stack either NIL
    or full.  Allocate a new block and chain it to the current stack block.
    'stack' is either NIL or at the block's ceiling
      (i.e., stack  == @??.element[CourierOps.stackObjectLimit]).
    The block comes from stackCache when one is cached there; otherwise a new
    one is mapped.  The returned block's nextBlock is 'stack'.
    >>
    <<ENABLE UNWIND => NULL;>>  --safe as long as pilot not abortable
    index: CARDINAL;

    BEGIN  --searching cache
    FOR index DECREASING IN[0..CourierOps.stackCacheLength) DO
      IF stackCache[index] # NIL THEN GOTO cache;
      ENDLOOP;
    <<
    no available cached stack block found.  Do not depend on what the loop
    leaves in its control variable after running to completion: name the
    slot explicitly (every slot is empty here, so slot 0 will do).
    >>
    index ← 0;
    stackCache[index] ← Space.Map[
      window: [File.nullFile, 0, CourierOps.stackPageLength],
      class: data, swapUnits: [unitary[]]].pointer;
    IF CourierInternal.doStats THEN
      CourierInternal.stats[stackPagesMapped] ←
        SUCC[CourierInternal.stats[stackPagesMapped]];
    EXITS cache => NULL;
    END;  --searching cache

    page ← stackCache[index]; stackCache[index] ← NIL;  --remove from cache
    page.nextBlock ← stack;  --link to old if it exists
    IF CourierInternal.doStats THEN CourierInternal.stats[stackPagesGot] ←
      SUCC[CourierInternal.stats[stackPagesGot]];
    END;

  StackBlockPop: PUBLIC ENTRY PROC[stack: CourierOps.StackBlockHandle]
    RETURNS[new: CourierOps.StackBlockHandle] =
    BEGIN
    <<ENABLE UNWIND => NULL;>>  --safe as long as pilot is not abortable
    --Retire the top stack block and return the block it was chained to.
    --The retired block goes into the highest empty stackCache slot; when the
    --cache is full the block is killed and unmapped instead.
    kept: BOOLEAN ← FALSE;
    new ← stack.nextBlock;  --pick up old link

    FOR slot: CARDINAL DECREASING IN[0..CourierOps.stackCacheLength) DO
      IF stackCache[slot] = NIL THEN
        BEGIN
        stackCache[slot] ← stack;  --cache this for use later
        kept ← TRUE;
        EXIT;
        END;
      ENDLOOP;

    IF ~kept THEN
      BEGIN  --no available slot to cache this block
      Space.Kill[[stack, CourierOps.stackPageLength]];
      [] ← Space.Unmap[stack, return];
      IF CourierInternal.doStats THEN
        CourierInternal.stats[stackPagesUnmapped] ←
          SUCC[CourierInternal.stats[stackPagesUnmapped]];
      END;

    IF CourierInternal.doStats THEN
      CourierInternal.stats[stackPagesPut] ←
        SUCC[CourierInternal.stats[stackPagesPut]];
    END;
    
    
  RegisterTransport: PUBLIC PROC [cH: Courier.Handle,
    transportProc: CourierInternal.Creator,
    transportType: CourierInternal.TransportType] =
    BEGIN
    --Register the transport creation proc for the rpc user side.
    --A NIL transportProc selects CourierInternal.CreateDefaultStream with
    --type CourierInternal.ttDefault.  When the requested type differs from
    --the one currently recorded, any existing transport stream is deleted.
    ch: CourierInternal.UserConnection ← LOOPHOLE[cH];
    IF transportType # ch.transportType THEN
      BEGIN 
      --Get rid of the old stream if there is one.
      IF ch.transFilter # NIL THEN
        BEGIN
        ch.transFilter.object.delete[@ch.transFilter.object];
        ch.transFilter ← NIL;
        --Account for the deleted stream the same way DeleteStream and the
        --watcher do; otherwise numberOfStreams creeps up to
        --maxStreamsAllowed and CreateNewStream starts refusing.
        DecrementStreamCount[];
        IF CourierInternal.doStats THEN CourierInternal.stats[streamsDeleted] ←
          SUCC[CourierInternal.stats[streamsDeleted]];
        END;
      END;
    IF transportProc = NIL THEN 
      {ch.createTransport ← CourierInternal.CreateDefaultStream;
      ch.transportType ← CourierInternal.ttDefault}
    ELSE
      {ch.createTransport ← transportProc;
      ch.transportType ← transportType};
    END;  --RegisterTransport
    
    
  RegisterListener: PUBLIC PROC [listener: CourierInternal.Listener] =
     BEGIN
     --Fork and detach a listener process at CommPriorities.normal, then put
     --the caller back at its own priority.  A NIL listener selects
     --CourierInternal.DefaultListener.
     saved: Process.Priority ← Process.GetPriority[];  --caller's priority
     Process.SetPriority[CommPriorities.normal];
     IF listener = NIL THEN Process.Detach[FORK CourierInternal.DefaultListener[]]
     ELSE Process.Detach[FORK listener[]];
     Process.SetPriority[saved];
     END;  --RegisterListener
     
  
  --Background process forked by Start.  While any connection object is linked
  --it makes repeated passes over the connection list, deleting the streams of
  --idle user connections that have closed or passed their deadline and freeing
  --user objects whose dally deadline has passed.  When no connection is linked
  --it disables the timeout on 'watch' and blocks until notified.  It exits
  --only when aborted (see Stop).
  Watcher: PROC =
    BEGIN
    --YOU WANNA TOUCH THIS CODE?  YOU GONNA REGRET IT!
    
    --untimed park on 'watch' (the caller disables the timeout first)
    Wait: ENTRY PROC = INLINE {ENABLE UNWIND => NULL; WAIT watch};
    
    --One pass, under the monitor, over the connection list.  Returns as soon
    --as it finds work so the deletion itself can be done outside the monitor:
    --  [stream, NIL]    idle user connection whose stream must be deleted
    --  [stream, object] dallying user object, already unlinked, to be freed
    --                   (the stream may be NIL)
    --  [NIL, NIL]       nothing found; the pass ended with a WAIT on 'watch'
    --Compares deadlines against 'now', which the caller sets before each pass.
    WatcherLocked: ENTRY PROC RETURNS[
      aH: CourierInternal.AugmentedStream,
      ch: CourierInternal.ConnectionHandle] =
      BEGIN
      ENABLE UNWIND => NULL;
      FOR ch ← connection.head, ch.link UNTIL ch = NIL DO
        WITH h: ch SELECT FROM
	  user =>
	    BEGIN
	    SELECT h.streamState FROM
	      idle =>
		BEGIN
		b: CARDINAL;
		closing: BOOLEAN;
		notmo: Stream.InputOptions = [signalTimeout: FALSE];
		two: Environment.Block = [LOOPHOLE[LONG[@b]], 0, 2];
		IF (aH ← ch.transFilter) = NIL THEN LOOP;  --already deleted
		--probe the stream with a small get into local 'b': any
		--completion other than a timeout, or a Courier.Error, means
		--the stream is closing
		BEGIN
		ENABLE Courier.Error => {closing ← TRUE; CONTINUE};
		closing ← aH.object.get[@aH.object, two, notmo].why # timeout;
		END;
		IF closing OR (h.clock < now) THEN
		  BEGIN
		  ch.transFilter ← NIL;  --set it to nil so we skip next time
		  h.protocolRange ← [  --reset state of arbitration
		    pvLow, pvHigh];
		  CourierInternal.SetMessageProtocolVersion[@h, pvHigh];
		  RETURN[aH, NIL];  --delete the stream, not the object
		  END;
		END;
	      dally => IF h.clock < now THEN
		RETURN[ch.transFilter, RemoveConnection[ch]];  --both
	      ENDCASE;  --busy, out => NULL;
	    END;
	  ENDCASE;  --server, client => NULL;
	ENDLOOP;
      WAIT watch;  --wait 2 seconds before trying again
      RETURN[NIL, NIL];  --just bailing out of the monitor
      END;  --WatcherLocked
    
    now: System.GreenwichMeanTime;
    aH: CourierInternal.AugmentedStream;
    ch: CourierInternal.ConnectionHandle;
    DO ENABLE ABORTED => EXIT;  --exits only if aborted (Stop'd)
      --while connections exist, waits on 'watch' time out after 2 seconds
      Process.SetTimeout[@watch, Process.SecondsToTicks[2]];
      UNTIL connection.active = 0 DO
        now ← System.GetGreenwichMeanTime[];
        [aH, ch] ← WatcherLocked[];  --makes a partial or full pass
        IF aH # NIL THEN
          BEGIN
	  aH.object.delete[@aH.object];  DecrementStreamCount[];
          IF CourierInternal.doStats THEN CourierInternal.stats[streamsDeleted] ← 
            SUCC[CourierInternal.stats[streamsDeleted]];
	  END;
	IF ch # NIL THEN longZone.FREE[@ch];  --then delete the object
        ENDLOOP;
      Process.DisableTimeout[@watch];  --keeps from waking up
      Heap.Prune[longZone];  --try to reclaim extra space (noop)
      Wait[];  --this waits until someone else creates a connection
      ENDLOOP;
    END;  --Watcher

  UnexportRemoteProgram: PUBLIC ENTRY PROC[
    programNumber: LONG CARDINAL, versionRange: Courier.VersionRange] =
    BEGIN
    ENABLE UNWIND => NULL;
    --Remove and free the export with exactly this program number and version
    --range.  Removing the last export puts streamClass back to transactional.
    --Raises Courier.Error[noSuchProgramExport] when there is no such export.
    before, victim: CourierInternal.ExportHandle;
    FOR victim ← export.head, victim.link UNTIL victim = NIL DO
      IF (programNumber # victim.programNumber)
        OR (versionRange # victim.versionRange) THEN
        {before ← victim; LOOP};  --not this one, remember it and move on
      --This is the element to remove from the list
      IF victim # export.head THEN before.link ← victim.link  --middle or end
      ELSE export.head ← victim.link;  --beginning of list
      longZone.FREE[@victim];
      export.active ← PRED[export.active];
      IF export.active = 0 THEN streamClass ← transactional;
      RETURN;
      ENDLOOP;
    RETURN WITH ERROR Courier.Error[noSuchProgramExport];
    END;  --UnexportRemoteProgram

  Start: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
     BEGIN
     --Bring Courier up: create longZone and fork the watcher, both done at
     --CommPriorities.normal.  Does nothing when longZone already exists.
     --Always returns TRUE.
     saved: Process.Priority ← Process.GetPriority[];  --caller's priority
     IF longZone = NIL THEN
       BEGIN  --not started yet
       Process.SetPriority[CommPriorities.normal];
       longZone ← Heap.Create[
         initial: 5, increment: 5,
         ownerChecking: System.switches[PilotSwitches.heapOwnerChecking] = down];
       Process.EnableAborts[@watch];  --so Stop can get the watcher's attention
       watcher ← FORK Watcher[];
       Process.SetPriority[saved];  --set client back to whatever
       END;
     RETURN[TRUE];  --we're up
     END;  --Start

  Stop: PUBLIC <<CourierOps.>> PROC[] RETURNS[BOOLEAN] =
    BEGIN
    <<
    Shut Courier down: stop the watcher, unmap the cached stack blocks and
    delete longZone.  Returns FALSE, doing nothing, while any export or
    connection is still active; returns TRUE once stopped.
    This is assumed to be serial access due to the fact that some higher
    authority is shutting us down. The procedure isn't monitored.
    >>
    --Already stopped (the mirror image of Start's "already started" test).
    --Without this a repeated Stop would abort and JOIN a process that has
    --already been joined and delete a NIL heap.
    IF longZone = NIL THEN RETURN[TRUE];
    SELECT TRUE FROM
      (export.active # 0) => RETURN[FALSE];  --can't do it
      (connection.active # 0) => RETURN[FALSE];  --that either
      ENDCASE;
    Process.Abort[watcher];  --get his attention
    JOIN watcher;  --come hither, little boy
    watcher ← NIL;  --the process is gone, don't keep a stale handle
    FOR index: NATURAL IN[0..CourierOps.stackCacheLength) DO
      p: LONG POINTER = stackCache[index];
      IF p # NIL THEN {[] ← Space.Unmap[p]; stackCache[index] ← NIL};
      ENDLOOP;
    Heap.Delete[z: longZone,
      checkEmpty: System.switches[PilotSwitches.heapOwnerChecking] = down];
    longZone ← NIL;
    RETURN[TRUE];
    END;  --Stop

  --Module start code: bring Courier up as soon as the module is started.
  [] ← Start[];  --motion, motion. lots of motion
  	
  END....  --CourierImplM.mesa
  
LOG
 5-Jan-87 15:29:48  AOF  Trimmed log for Pilot 13.0
 5-Jan-87 15:25:12  AOF  Merge in Courier extras for DBMS
16-Jan-87  9:44:36  AOF  Removal of Courier Version 2.0
16-Jan-87  9:44:36  AOF  Move all ERRORs to here
19-Jun-87 13:33:59  AOF  Starting and stopping