-- File: XStreamImpl.mesa - last edit:
-- AOF                 28-Jan-88 11:23:01
-- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved.

DIRECTORY
  BulkData USING [
    attnByte, bulkSST, Descriptor, Error, Identifier, Proc, program,
    Sink, Source, version],
  Courier USING [
    Arguments, Call, Create, Delete, Error, ExportRemoteProgram, ErrorCode,
    Description, Dispatcher, Handle, NoSuchProcedureNumber, Parameters,
    RemoteErrorSignalled, SignalRemoteError, UnexportRemoteProgram,
    LocalSystemElement],
  CourierInternal USING [
    AugmentedStream, ConnectionHandle, longZone, SetBulkStream],
  CourierOps USING [NotesObject, RedirectStream],
  CourierProtocol USING [dataSST],
  Environment USING [Block],
  NSBuffer USING [Body, Buffer],
  NSTypes USING [bytesPerIDPHeader, bytesPerSppHeader],
  PacketStream USING [ConnectionSuspended, Handle],
  Process USING [
    Abort, EnableAborts, GetCurrent, priorityForeground, SetPriority,
    SetTimeout, SecondsToTicks],
  Router USING [GetDelayToNet, NoTableEntryForNet],
  SppOps USING [PacketStreamFromByteStream],
  Stream USING [
    Block, CompletionCode, defaultObject, DeleteProcedure, EndOfStream,
    GetProcedure, Handle, InputOptions, InvalidOperation, PutProcedure,
    SendNowProcedure, SubSequenceType, TimeOut],
  System USING [
    GetClockPulses, NetworkAddress, NetworkNumber, nullHostNumber,
    nullSocketNumber, nullNetworkNumber, SocketNumber],
  XStream USING [Request],
  XStreamOps USING [Handle, IndicatedUse, Object, ObjectSeal];

XStreamImpl: MONITOR
  IMPORTS
    Courier, CourierInternal, CourierOps, PacketStream, Process, Router,
    SppOps, Stream, System
  EXPORTS BulkData, XStream, XStreamOps =
  BEGIN

  -- Rendezvous timeout parameters, both in seconds.  Neither is referenced in
  -- the visible part of this file.
  -- NOTE(review): presumably consumed by WaitTime, defined further down; confirm.
  baseTMO: CARDINAL = 20;  --20 seconds
  hopTMO: CARDINAL = 2;  --seconds again

  -- Concrete representation of the ID type exported to BulkData.
  ID: PUBLIC --BulkData-- TYPE = LONG CARDINAL;

  -- Concrete representations of the Handle and Object types exported to XStream.
  Handle: PUBLIC --XStream-- TYPE = XStreamOps.Handle;
  Object: PUBLIC --XStream-- TYPE = XStreamOps.Object;

  -- Argument record of the Bulk Data "cancel" remote procedure.  The timeout
  -- is in seconds (CancelRendezvous feeds it to Process.SecondsToTicks).
  CancelArguments: TYPE = MACHINE DEPENDENT RECORD[
    identifier: BulkData.Identifier, timeout: CARDINAL];

  -- Argument record of the Bulk Data "send" and "receive" remote procedures.
  -- The descriptor field defaults to an empty immediate descriptor.
  TransferArguments: TYPE = MACHINE DEPENDENT RECORD[
    identifier: BulkData.Identifier,
    descriptor: immediate BulkData.Descriptor ← [immediate []],
    timeout: CARDINAL];

  -- Head pointer and element count of the list of XStream objects, chained
  -- through each object's link field.
  list: RECORD[first: Handle, count: CARDINAL] ← [NIL, 0];
  -- Push h on the front of the list and bump the count.  Being an ENTRY
  -- procedure, it runs with the monitor lock held.
  OnList: ENTRY PROC[h: Handle] =
    {h.link ← list.first; list.first ← h; list.count ← SUCC[list.count]};

  -- Network address of this system element, captured once at start-up.  Make
  -- uses address.host when it builds an identifier.
  address: System.NetworkAddress ← Courier.LocalSystemElement[];  --local copy

  -- Identifier counter, seeded from the clock pulses at start-up.
  currentID: ID ← System.GetClockPulses[];
  -- Increment the counter under the monitor lock and return the new value.
  NewID: ENTRY PROC RETURNS[ID] = INLINE {RETURN[(currentID ← currentID + 1)]};

  -- Internal consistency error.  In the visible code it is raised with
  -- "bug" (impossible case), "reused" (a smashed handle was described again)
  -- and "protocol" (an active object whose use is neither puts nor gets).
  BUG: ERROR[code: Code] = CODE;
  Code: TYPE = {bug, reused, lost, protocol, undeleted};

  AbortTransfer: PUBLIC <<XStream>> PROC[sH: Stream.Handle] =
    BEGIN
    <<
    Abort an immediate transfer on stream sH.  In order to abort, we need
    to ensure the SST is set, then send the attention.  Once the attention
    is sent, don't send an "end-of-data" when the stream is deleted; that is
    why the stream's delete procedure is replaced by DeleteNull.
    The three statements are order dependent.
    >>
    sH.setSST[sH, BulkData.bulkSST];  --attention has to go as sst=1
    sH.sendAttention[sH, BulkData.attnByte];  --send the abort
    sH.delete ← DeleteNull;  --no longer want to send endRecord
    END;  --AbortTransfer

  ActiveRendezvous: <<ENTRY>> PROC[active: Handle] =
    BEGIN
    OPEN aH: NARROW[active↑, active Object];
    <<
    This is the active party of a three party transfer.  We have to contact
    the passive party using the Bulk Data Remote Program.  Then connect the
    active party's immediate request to the stream made available by the
    Courier call.  The local active party will never know what happened.

    Outline of what the code does:
      1. Pick the remote procedure: "receive" if we will put, "send" if we
         will get; any other use raises BUG[protocol].
      2. Create a Courier handle to the passive party (null socket number,
         bulk class of service) and call that procedure, with
         TransferBulkData as the stream checkout procedure.
      3. A remote error is recorded as truncatedTransfer; after the handle is
         deleted it is re-raised locally as Courier.Error.
    ENTRY is commented out, so this runs without the monitor lock.
    NOTE(review): aH.courier is overwritten with the new handle, which is then
    deleted before returning; confirm nothing dereferences aH.courier later.
    >>

    <<
    Stream checkout procedure: run the client's request over the stream of
    the new call.  A request with no data (none) or a deferred request cannot
    be served here and is reported as invalidDescriptor.
    >>
    TransferBulkData: PROC[cH: Courier.Handle] =
      BEGIN
      WITH request: active.request SELECT FROM
        none, deferred => Courier.SignalRemoteError[
	  BulkData.Error[invalidDescriptor].ORD];
	stream => StreamCopy[active];  --blind transfer
	proc => request.proc[active];  --then call the client's proc
	ENDCASE;
      END;  --TransferBulkData

    <<
    Remote error handler: consume the (empty) error arguments and collapse
    every remote error number into truncatedTransfer.
    >>
    error: PROC[errorNumber: CARDINAL, arguments: Courier.Arguments] =
      BEGIN
      arguments[];  --there aren't any arguments
      --and I don't know what to do with this
      errorCode ← truncatedTransfer;
      <<
      errorCode ← SELECT errorNumber FROM
        BulkData.Error[invalidDescriptor].ORD => truncatedTransfer, 
        BulkData.Error[noSuchIdentifier].ORD => truncatedTransfer,
        BulkData.Error[identifierBusy].ORD => truncatedTransfer, 
        BulkData.Error[wrongHost].ORD => truncatedTransfer, 
        BulkData.Error[transferAborted].ORD => truncatedTransfer,
	ENDCASE => ERROR Error[protocol];
      >> 
      END;

    cH: Courier.Handle;
    arguments: TransferArguments;
    errorCode: Courier.ErrorCode ← noError; 

    WITH aH: active SELECT FROM
      active =>
        BEGIN
	procedure: CARDINAL = SELECT active.use FROM
	  puts => BulkData.Proc[receive].ORD,
	  gets => BulkData.Proc[send].ORD,
	  ENDCASE => ERROR BUG[protocol];

	cH ← Courier.Create[
	  remote: [aH.passive.net, aH.passive.host, System.nullSocketNumber],
	  programNumber: BulkData.program, versionNumber: BulkData.version,
	  zone: aH.courier.object.zone, classOfService: bulk];
	arguments ← [
	  identifier: active.identifier, timeout: WaitTime[aH.passive.net]];
    
	aH.courier ← LOOPHOLE[cH];  --replace Courier handle in request
    
	--Inside the catch phrase below, "arguments" is the signal's parameter,
	--not the local TransferArguments record.
	[] ← Courier.Call[
	  cH: cH, procedureNumber: procedure,
	  arguments: [@arguments, DescribeTransferArguments],
	  streamCheckoutProc: TransferBulkData !
	    UNWIND => Courier.Delete[cH];
	    Courier.RemoteErrorSignalled =>
	      {error[errorNumber, arguments]; CONTINUE}];
	Courier.Delete[cH];  --that just about wraps it up
	IF errorCode # noError THEN RETURN WITH ERROR Courier.Error[errorCode];
	END;
      ENDCASE;

    END;  --ActiveRendezvous

  AttentionProc: <<FORKed>> PROC[sH: Stream.Handle, handle: Handle] =
    BEGIN
    <<
    Watcher process forked by Create.  It blocks in waitAttention on the
    stream; when that returns (or the stream dies with a Courier.Error) it
    marks the object "truncated".  Copy tests for that value of handle.use.
    DeleteNull aborts and joins this process; the ABORTED catch below makes
    it exit without touching the handle.
    >>
    ENABLE ABORTED => CONTINUE;  --that's our forker telling us to go away
    Process.SetPriority[Process.priorityForeground];  --make it run fast
    [] ← sH.waitAttention[sH !  --wait for one to come in
      Courier.Error => CONTINUE];  --stream died with 'transportTimeout'
    handle.use ← truncated;  --and if it does, then drop the hammer
    END;  --AttentionProc

  BulkDataDispatcher: Courier.Dispatcher =
    BEGIN
    <<
    Courier dispatcher for the Bulk Data remote program (exported by
    StartProtocol).  For each known procedure number it allocates an argument
    record from the call's zone, deserializes the arguments into it, and
    hands it to the matching rendezvous routine:
      send    => ServerRendezvous[..., gets]
      receive => ServerRendezvous[..., puts]
      cancel  => CancelRendezvous
    Any other procedure number raises Courier.NoSuchProcedureNumber.  The
    argument record is freed on the normal path and on UNWIND; an empty
    result record is then returned to the caller.

    This dispatcher is not written in the proper style of client/consumer
    IMPORTer/EXPORTer like a good Courier program should be.  So don't bother
    trying to impress me with your understanding of software architecture by
    telling me how impure this is.
    This model doesn't care whether the transfer request is a sink or source.
    That bit of information should exist in the passive's request that we
    rendezvous with.  Nor is it going to call ServerCheckout.  Instead it's
    going to rendezvous directly.
    >>
    parameters: Courier.Parameters;  --this isn't typed yet
    --Start from NIL so that the UNWIND handler below never frees an
    --uninitialized pointer if the zone's NEW itself raises a signal.
    parameters.location ← NIL;
    BEGIN
    ENABLE UNWIND => cH.zone.FREE[@parameters.location];
    SELECT procedureNumber FROM
      BulkData.Proc[send].ORD =>
        BEGIN
	parameters ← [
	  location: cH.zone.NEW[TransferArguments],
	  description: DescribeTransferArguments];
	arguments[parameters];  --get the arguments
	ServerRendezvous[cH, parameters.location, gets];
	END;
      BulkData.Proc[receive].ORD =>
        BEGIN
	parameters ← [
	  location: cH.zone.NEW[TransferArguments],
	  description: DescribeTransferArguments];
	arguments[parameters];  --get the arguments
	ServerRendezvous[cH, parameters.location, puts];
	END;
      BulkData.Proc[cancel].ORD =>
        BEGIN
	parameters ← [
	  location: cH.zone.NEW[CancelArguments],
	  description: DescribeCancelArguments];
	arguments[parameters];  --get the arguments
	CancelRendezvous[parameters.location];
	END;
      ENDCASE => RETURN WITH ERROR Courier.NoSuchProcedureNumber;
    END;
    cH.zone.FREE[@parameters.location];  --free up the storage
    [] ← results[];  --we don't have any results to send
    END;  --BulkDataDispatcher

  CancelRendezvous: PROC[cancel: LONG POINTER TO CancelArguments] =
    BEGIN
    <<
    This operation is a remote procedure call coming in from the active
    party of a three party transfer.  This procedure has been called instead
    of Produce or Consume because of some inability of the active party.
    Since this is a Produce or Consume replacement, this code looks a lot like
    the code needed to rendezvous the incoming call with the passive object at
    the passive site (see ServerRendezvous).

    If the passive party is waiting here, tell him to go away, else hang
    a new object on the list and he'll find us, hopefully before we time out.

    Sequence: a "cancel" object carrying the incoming identifier is put on
    the list, CoupleWithPassive is tried once with wait = TRUE and, if that
    returns FALSE, once more with wait = FALSE; the object is taken off the
    list (and freed by OffList) on both the normal and the UNWIND path.
    >>

    <<
    Look for a passive object with the cancelled identifier.
      found, already cancelled => remote error transferAborted
      found otherwise          => mark it cancelled, abort its process, TRUE
      not found, wait = TRUE   => WAIT on our condition, then return FALSE
      not found, wait = FALSE  => remote error noSuchIdentifier
    >>
    CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] =
      BEGIN
      FOR passive: Handle ← list.first, passive.link UNTIL passive = NIL DO
	IF passive.identifier = cancel.identifier THEN
	  BEGIN
	  WITH pH: passive SELECT FROM
	    passive =>
	      SELECT passive.seal FROM
	        cancelled =>
		  RETURN WITH ERROR Courier.SignalRemoteError[
		    BulkData.Error[transferAborted].ORD];
		ENDCASE =>
		  BEGIN  --that's the one we wanted
		  passive.seal ← cancelled;  --well, he is now
		  Process.Abort[pH.process];  --and wake him up
		  RETURN[TRUE];  --and tell caller we found him
		  END;
	    ENDCASE;
	  END;
	ENDLOOP;
      <<
      Passive call isn't here yet, so we'll supply a cancel object and
      he'll rendezvous with it when he gets here.  If we get called with
      wait = TRUE then this is the first call.  If we don't make the rendezvous
      with wait = FALSE, then signal BulkData.Error[noSuchIdentifier].
      >>
      IF wait THEN WAIT active.rendezvous  --it isn't abortable
      ELSE RETURN WITH ERROR 
        Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD];
      END;  --CoupleWithPassive

    active: LONG POINTER TO cancel Object ← CourierInternal.longZone.NEW[
      cancel Object ← [
	seal: old,  --old before its time
	courier: NIL,  --we don't have any idea
	identifier: cancel.identifier,  --copy incoming object
	request: [none[]], use: cancel,  --to be used as one
	variant: cancel[process: Process.GetCurrent[]]]];

    --the caller's timeout is in seconds; the condition wants ticks
    Process.SetTimeout[
      @active.rendezvous, Process.SecondsToTicks[cancel.timeout]];
    OnList[active];  --insert into list

    BEGIN
    ENABLE UNWIND => OffList[active];
    SELECT TRUE FROM
      (CoupleWithPassive[TRUE]) => NULL;  --he's cancelled
      (CoupleWithPassive[FALSE]) => NULL;  --he finally got here
      ENDCASE;  --error will have been raised in CoupleWithPassive
    END;

    OffList[active];  --remove element we added for rendezvous
    END;  --CancelRendezvous

  CancelTransfer: PUBLIC <<XStream>> PROC[handle: Handle] =
    BEGIN
    <<
    Cancel the transfer described by handle, then mark the object cancelled.
      immediate => abort the transfer on the call's own stream
      active    => call the passive party's "cancel" remote procedure
      passive   => nothing is done locally (see the note in that arm)
      otherwise => ERROR BUG[bug]
    Errors raised by the remote cancel call itself (Courier.Error and
    Courier.RemoteErrorSignalled) are deliberately swallowed.
    >>
    WITH h: handle SELECT FROM
      immediate =>
        BEGIN
        <<
        This is an immediate transfer that is being aborted.  In order to
        abort, we need to ensure the SST is set, then send the attention.
        This almost has to be done no matter what state the call is in.  It
        is possible to bum this out IFF we are the called party and the
        sender (source) of the bulk data, but that's too many checks and
        just sending the thing is "correct". 
        >>
        AbortTransfer[h.courier.object.sH];
        END;
      active =>
        BEGIN
        <<
        The active party in a 3-party transfer can't produce|consume the
        data.  He calls this routine instead so that the passive party won't
        hang around forever for the rendezvous to arrive.
        Is it interesting to notify the client of any errors here?
        >>
        cancel: CancelArguments ← [handle.identifier, WaitTime[h.passive.net]];
        cH: Courier.Handle ← Courier.Create[
          remote: [h.passive.net, h.passive.host, System.nullSocketNumber],
	  programNumber: BulkData.program, versionNumber: BulkData.version,
	  zone: h.courier.object.zone, classOfService: bulk];
	--The UNWIND arm releases the handle if some signal other than the two
	--swallowed below (ABORTED, for instance) unwinds through the call.
	--It is the same arrangement ActiveRendezvous uses around its call.
        [] ← Courier.Call[
          cH: cH, procedureNumber: BulkData.Proc[cancel].ORD,
	  arguments: [@cancel, DescribeCancelArguments] !
	    UNWIND => Courier.Delete[cH];
	    Courier.Error, Courier.RemoteErrorSignalled => CONTINUE];
	Courier.Delete[cH];
	END;
     passive =>
	<<
	What do you do here?  I'd like to leave around a passive object
	marked cancelled so when (IF) the active process came in, it could
	detect the cancellation quickly.  But I don't know if the object
	is queued yet.  One end or the other is going to take a chance on
	having to time out.  I'm going to let the active rendezvous be the one.
	>>
	{};  --fill in as needed
      ENDCASE => ERROR BUG[bug];
    handle.seal ← cancelled;  --looks easy, but what does it mean?
    END;  --CancelTransfer

  GetSppLength: PROC[body: NSBuffer.Body] RETURNS[NATURAL] = INLINE
    BEGIN
    --Data bytes carried by the packet: its total length less the IDP and
    --SPP header sizes.
    headerBytes: CARDINAL =
      NSTypes.bytesPerIDPHeader + NSTypes.bytesPerSppHeader;
    RETURN[body.pktLength - headerBytes];
    END;

  SetSppLength: PROC[body: NSBuffer.Body, bytes: NATURAL] = INLINE
    BEGIN
    --Record a packet length that covers "bytes" of data plus the IDP and
    --SPP headers.
    headerBytes: CARDINAL =
      NSTypes.bytesPerIDPHeader + NSTypes.bytesPerSppHeader;
    body.pktLength ← bytes + headerBytes;
    END;

  PacketFromBulk: PROC[sH: Stream.Handle] RETURNS[PacketStream.Handle] =
    BEGIN
    <<
    Dig the packet stream out from under a bulk data stream.  Three steps:
    step from the client's stream to the transport filter's stream
    (RealStream), read the Stream.Handle stored in that stream's "context"
    field, and ask SppOps for the packet stream behind that byte stream.
    NOTE(review): this depends on the layout of CourierInternal.AugmentedStream
    and on what Courier keeps in "context"; recheck if either changes.
    >>
    sH ← RealStream[sH];  --that's okay as far as it goes
    sH ← LOOPHOLE[@LOOPHOLE[sH, CourierInternal.AugmentedStream].context,
      LONG POINTER TO Stream.Handle]↑;  --retch!!!! (copied from CourierImplN)
    RETURN[SppOps.PacketStreamFromByteStream[sH]];  --more loop-de-loops
    END;  --PacketFromBulk
   
  Copy: PUBLIC <<XStream>> PROC[sink, source: Stream.Handle] =
    BEGIN
    <<
    This procedure dives in and finds the packet stream behind Courier's
    transport. It uses that so any bulk data operation that uses copy only
    has to BLT the data once from his buffer to the ethernet buffers. Wonder
    if all this is really worth it?

    Exactly one of sink/source must be a bulk data stream made by Create; it
    is recognized by its delete procedure (DeleteNull or DeleteEnd).  If
    neither qualifies, BUG[bug] is raised.  The XStream object is recovered
    from that stream's clientData and its "use" selects the direction:
      gets      => take packets from the packet stream and put their data on
                   sink until an end-of-message packet, an attention packet,
                   or a non-empty packet whose subtype is not bulkSST
      puts      => fill send buffers from source.get and send them, until
                   the get completes with anything but "normal"
      truncated => the attention watcher fired; treat as a remote abort
    A remote abort answers with our own attention and leaves by way of
    ABORTED; a suspended connection, or a get timeout the client did not ask
    to be signalled about, raises ABORTED directly.  On both of those paths
    the delete procedure becomes DeleteNull so no endRecord is sent.
    NOTE(review): the local "options" below is never referenced; both get
    calls use source.options.  Confirm whether it was meant to be passed.
    >>
    handle: Handle;
    moved: NATURAL;
    sH: Stream.Handle;
    b: NSBuffer.Buffer;
    body: NSBuffer.Body;
    block: Environment.Block;
    psH: PacketStream.Handle;
    why: Stream.CompletionCode ← normal;
    options: Stream.InputOptions = [
      TRUE, FALSE, FALSE, FALSE, FALSE, FALSE, TRUE, FALSE];
    SELECT TRUE FROM  --figure out which stream is ours, which is theirs
      (source.delete = DeleteNull), (source.delete = DeleteEnd) => sH ← source;
      (sink.delete = DeleteNull), (sink.delete = DeleteEnd) => sH ← sink;
      ENDCASE => BUG[bug];  --we're in trouble now

    psH ← PacketFromBulk[sH];  --that's the real packet stream
    handle ← NARROW[sH.clientData];  --and that's the XStream object

    UNTIL why = endOfStream DO
      ENABLE PacketStream.ConnectionSuspended => GOTO streamFailed;
      SELECT handle.use FROM
        gets =>
	  BEGIN
	  SELECT TRUE FROM
	    ((b ← psH.get[]) # NIL) => body ← b.ns;  --got it; copy local pointer
	    (~source.options.signalTimeout) =>  GOTO streamFailed;  --worst
	    ENDCASE => {SIGNAL Stream.TimeOut[0]; LOOP};  --he wants to try again
	  IF body.endOfMessage THEN why ← endOfStream;  --to get out of loop
	  block ← [LOOPHOLE[@body.sppBody], 0, GetSppLength[body]];
	  SELECT TRUE FROM
	    (body.attention) => why ← attention;  --remote's bailing out
	    (block.stopIndexPlusOne = 0) => NULL;  --nothing going on here
	    (body.subtype = BulkData.bulkSST) =>  --this is for us
	      sink.put[sink, block, FALSE];  --scribble the bits
	    ENDCASE => why ← endOfStream;  --truncated data of some kind
	  psH.returnReceiveBuffer[b];  --give the buffer back
	  IF why = attention THEN GOTO remoteAbort;  --set earlier
	  END;
	puts =>
	  BEGIN
	  body ← (b ← psH.getSendBuffer[]).ns;  --get buffer to fill
	  block ← [LOOPHOLE[@body.sppBody], 0, psH.getSenderSizeLimit[]];
	  --if the client's get unwinds, hand the send buffer back first
	  [moved, why, ] ← source.get[source, block, source.options !
	    UNWIND => psH.returnSendBuffer[b]];
	  SetSppLength[body, moved];  --set the length of this move
	  body.subtype ← BulkData.bulkSST;  --set the sst appropriately
	  IF why # normal THEN why ← endOfStream;
	  psH.put[b];  --send out the buffer
	  END;
	truncated => GOTO remoteAbort;  --got an attention packet
	ENDCASE;
      REPEAT
        remoteAbort =>
	  BEGIN
	  <<[] ← sH.getByte[sH];  --consume inband attention>>
	  sH.sendAttention[sH, BulkData.attnByte];
	  sH.delete ← DeleteNull;  --so we don't send endRecord
	  Process.Abort[Process.GetCurrent[]];  --set to abort myself
	  --we want to raise aborted anyhow, so let this signal do it--
	  DO --ENABLE ABORTED => REJECT-- [] ← sH.waitAttention[sH]; ENDLOOP;
	  END;
	streamFailed => {sH.delete ← DeleteNull; ERROR ABORTED};
      ENDLOOP;
    END;  --Copy

  Create: PUBLIC <<XStream>> PROC[handle: Handle] RETURNS[sH: Stream.Handle] =
    BEGIN
    <<
    Turn the Courier transport stream behind handle into a bulk data stream
    and return it.

    These streams are simplex, i.e., they can't be used in both directions.
    The direction this instance of stream will be used is determined by the
    routine used to describe the BulkData.SINK | SOURCE. Whichever direction
    is defined for this stream is assigned a local procedure. The other
    direction is assigned a Stream.defaultObject value. That will result in an
    ERROR Stream.InvalidOperation if the client tries to use it incorrectly.

    In both directions an attention watcher (AttentionProc) is forked and the
    XStream object is stored in the stream's clientData.
    Raises Courier.Error[streamNotYours] when the request's access is
    deferred or the object is not an active, passive or immediate variant,
    and BUG[bug] when its use is neither puts nor gets.
    >>
    IF handle.request.access = deferred THEN GOTO nostream;  --he can't do that
    WITH h: handle SELECT FROM  --pull the stream out of the object
      active, passive, immediate =>
        BEGIN
	sH ← handle.courier.object.sH;  --that's his stream
	SELECT handle.use FROM  --what does he want to do?
	  (puts) =>
	    BEGIN
	    sH.setSST[sH, BulkData.bulkSST];  --then set the SST
	    sH.put ← XStreamPut;  --my own put procedure
	    sH.get ← Stream.defaultObject.get;  --invalid operation
	    sH.delete ← DeleteEnd;  --we have to send the endRecord
	    sH.sendNow ← SendNow;  --so we can trap client endRecords
	    handle.attentionProc ← FORK AttentionProc[sH, handle];  --watcher
	    sH.clientData ← handle;  --store so get/put can find it
	    END;
	  (gets) =>
	    BEGIN
	    sH.get ← XStreamGet;  --my own get procedure
	    sH.put ← Stream.defaultObject.put;  --invalid operation
	    sH.delete ← DeleteNull;  --we don't have to send the end
	    handle.attentionProc ← FORK AttentionProc[sH, handle];  --watcher
	    sH.clientData ← handle;  --store so get/put can find it
	    END;
	  ENDCASE => ERROR BUG[bug];  --what's going on?
	END;
      <<every remaining variant, e.g. null, deferred, cancel>>
      ENDCASE => GOTO nostream;  --and he can't do those
    EXITS nostream => ERROR Courier.Error[streamNotYours];
    END;  --Create

  DeleteEnd: Stream.DeleteProcedure =
    BEGIN
    <<
    Delete procedure installed by Create on streams we put to.  It first
    pushes an endRecord through the underlying transport stream (the
    client-visible sendNow refuses endRecord), then does the common clean-up.
    >>
    transport: Stream.Handle = RealStream[sH];
    transport.sendNow[sH: transport, endRecord: TRUE];  --mark end of data
    DeleteNull[sH];  --stop the watcher and disarm the delete procedure
    END;  --DeleteEnd

  DeleteNull: Stream.DeleteProcedure =
    BEGIN
    <<
    Delete procedure that sends nothing on the wire.  It aborts and joins the
    attention watcher forked by Create, then replaces the stream's delete
    procedure with the default one so a second delete does not repeat this.
    NOTE(review): the local "ch" is computed but never used.
    NOTE(review): this assumes sH.clientData holds the XStream object and that
    attentionProc is a live process, both of which Create sets up; confirm
    that no caller can get here for a stream Create was never applied to.
    >>
    ch: CourierInternal.ConnectionHandle = RealCourier[sH];
    handle: Handle ← NARROW[sH.clientData];  --get the object
    Process.Abort[handle.attentionProc]; JOIN handle.attentionProc;
    handle.attentionProc ← NIL;  --so we don't do it twice
    sH.delete ← Stream.defaultObject.delete;  --so he doesn't do it twice
    END;  --DeleteNull

  DescribeCancelArguments: Courier.Description =
    BEGIN
    --Courier description of CancelArguments: the record is described by its
    --size alone; the pointer that noteSize returns is not needed.
    [] ← notes.noteSize[SIZE[CancelArguments]];
    END;

  DescribeSink: PUBLIC <<XStream>> Courier.Description =
    BEGIN
    OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject];
    <<
    Courier description routine for a bulk data sink parameter.  The
    parameter slot itself holds a Handle; it is noted as dead space and the
    BulkData descriptor is noted in its place.
      fetch => build the descriptor from the client's object, tie the object
               to the Courier connection (n.ch), and mark it old
      store => read the descriptor's tag, allocate a new object for it, fill
               in the variant and identifier, and put the object on the list
      free  => destroy the object
    Clients that
      'fetch' an immediate|null 'sink' will be doing "gets"
      'store' an immediate|null 'sink' will be doing "puts"
    Describing a smashed (destroyed) object raises BUG[reused].
    >>

    descriptor: BulkData.Descriptor;  --garden variety type
    handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]];
    notes.noteDeadSpace[handle, SIZE[Handle]];  --dispense with parm area
    SELECT notes.operation FROM
      fetch =>
        BEGIN
	WITH h: handle SELECT FROM
	  null =>
	    BEGIN
	    IF handle.seal = smashed THEN ERROR BUG[reused];
	    descriptor ← [null[]];  --make the descriptor
	    notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]];
	    handle.use ← gets;  --tag it intended use
	    handle.seal ← old;  --type it and mark it ready
	    h.courier ← n.ch;  --tie Courier handle to request
	    END;
	  immediate =>
	    BEGIN
	    IF handle.seal = smashed THEN ERROR BUG[reused];
	    descriptor ← [immediate[]];  --make the descriptor
	    notes.noteSpace[@descriptor, SIZE[immediate BulkData.Sink]];
	    handle.seal ← old;  --mark it ready
	    handle.use ← gets;  --tag it intended use
	    h.courier ← n.ch;  --tie Courier handle to request
	    END;
	  deferred =>
	    BEGIN
	    <<
	    Which remote should be used for passive, which for active?
	    As coded, the sink is made passive when its net is no further
	    away (by WaitTime) than the source's net, else active.
	    NOTE(review): both descriptors carry r.sink's net and host, while
	    the store arms below record the descriptor's address as that of
	    the OTHER party.  Confirm which address r.sink denotes.
	    >>
	    OPEN r: NARROW[handle.request, deferred XStream.Request];
	    IF WaitTime[r.sink.net] <= WaitTime[r.source.net] THEN
	      BEGIN
	      descriptor ← [passive[r.sink.net, r.sink.host, handle.identifier]];
	      h.passive ← n.ch;  --tie Courier handle to request
	      END
	    ELSE
	      BEGIN
	      descriptor ← [active[r.sink.net, r.sink.host, handle.identifier]];
	      h.active ← n.ch;  --tie Courier handle to request
	      END;
	    notes.noteSpace[@descriptor, SIZE[passive BulkData.Sink]];
	    handle.seal ← old;  --type it and mark it ready
	    handle.use ← deferred;  --that's its final state
	    END;
	  ENDCASE;
        END;
      store =>
	BEGIN
	--note just the tag first; the variant body is noted once it is known
	notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]];
	handle↑ ← CourierInternal.longZone.NEW[Object ← [
	  seal: old, use: puts, identifier: TRASH,
	  courier: n.ch, variant: null[]]];
	WITH d: descriptor SELECT FROM
	  null =>
	    BEGIN
	    handle↑.variant ← null[];
	    handle.identifier ← [n.ch.object.remote.host, NewID[]];
	    END;
	  immediate =>
	    BEGIN
	    handle↑.variant ← immediate[];
	    handle.identifier ← [n.ch.object.remote.host, NewID[]];
	    END;
	  active =>
	    BEGIN
	    notes.noteSpace[@d.network,
	      SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]];
	    handle.identifier ← d.identifier;
	    handle↑.variant ← active[
	      process: Process.GetCurrent[],
	      passive: [d.network, d.host]];
	    END;
	  passive =>
	    BEGIN
	    --the size of the active variant is used here as well
	    notes.noteSpace[@d.network,
	      SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]];
	    handle.identifier ← d.identifier;
	    handle↑.variant ← passive[
	      process: Process.GetCurrent[],
	      active: [d.network, d.host]];
	    END;
	  ENDCASE;
	OnList[handle↑];  --tack it on to the list
        END;
      free => Destroy[handle↑];  --equivalent to delete
      ENDCASE;
    END;  --DescribeSink

    DescribeSource: PUBLIC <<XStream>> Courier.Description =
    BEGIN
    OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject];
    <<
    Courier description routine for a bulk data source parameter.  The
    parameter slot itself holds a Handle; it is noted as dead space and the
    BulkData descriptor is noted in its place.
      fetch => build the descriptor from the client's object, tie the object
               to the Courier connection (n.ch), and mark it old
      store => read the descriptor's tag, allocate a new object for it, fill
               in the variant and identifier, and put the object on the list
      free  => destroy the object
    Clients that
      'fetch' an immediate|null 'source' will be doing "puts"
      'store' an immediate|null 'source' will be doing "gets"
    Describing a smashed (destroyed) object raises BUG[reused].
    All sizes in this routine are taken from BulkData.Source, and each
    variant body is sized with its own variant.
    >>

    descriptor: BulkData.Descriptor;  --garden variety type
    handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]];
    notes.noteDeadSpace[handle, SIZE[Handle]];  --dispense with parm area
    SELECT notes.operation FROM
      fetch =>
        BEGIN
	WITH h: handle SELECT FROM
	  null =>
	    BEGIN
	    IF handle.seal = smashed THEN ERROR BUG[reused];
	    descriptor ← [null[]];  --build the descriptor
	    notes.noteSpace[@descriptor, SIZE[null BulkData.Source]];
	    handle.seal ← old;  --age it a bit
	    handle.use ← puts;  --type it and mark it ready
	    h.courier ← n.ch;  --tie Courier handle to request
	    END;
	  immediate =>
	    BEGIN
	    IF handle.seal = smashed THEN ERROR BUG[reused];
	    descriptor ← [immediate[]];  --build the descriptor
	    notes.noteSpace[@descriptor, SIZE[immediate BulkData.Source]];
	    handle.seal ← old;  --age it a bit
	    handle.use ← puts;  --type it and mark it ready
	    h.courier ← n.ch;  --tie Courier handle to request
	    END;
	  deferred =>
	    BEGIN
	    <<
	    Which remote should be used for passive, which for active?
	    Just make sure it's opposite of what was done in describing the
	    sink.  We don't want both parties to be active | passive.
	    >>
	    OPEN r: NARROW[handle.request, deferred XStream.Request];
	    IF WaitTime[r.sink.net] > WaitTime[r.source.net] THEN
	      BEGIN
	      descriptor ← [passive[
	        r.source.net, r.source.host, handle.identifier]];
	      h.passive ← n.ch;  --tie Courier handle to request
	      END
	    ELSE
	      BEGIN
	      descriptor ← [active[
	        r.source.net, r.source.host, handle.identifier]];
	      h.active ← n.ch;  --tie Courier handle to request
	      END;
	    handle.seal ← old;  --age it a bit
	    handle.use ← deferred;  --that's its final state
	    notes.noteSpace[@descriptor, SIZE[passive BulkData.Source]];
	    END;
	  ENDCASE;
        END;
      store =>
        BEGIN
	--note just the tag first; the variant body is noted once it is known
	notes.noteSpace[@descriptor, SIZE[null BulkData.Source]];
	handle↑ ← CourierInternal.longZone.NEW[Object ← [
	  seal: old, use: gets, identifier: TRASH,
	  courier: n.ch, variant: null[]]];
	WITH d: descriptor SELECT FROM
	  null =>
	    BEGIN
	    handle↑.variant ← null[];
	    handle.identifier ← [n.ch.object.remote.host, NewID[]];
	    END;
	  immediate =>
	    BEGIN
	    handle↑.variant ← immediate[];
	    handle.identifier ← [n.ch.object.remote.host, NewID[]];
	    END;
	  active =>
	    BEGIN
	    notes.noteSpace[@d.network,
	      SIZE[active BulkData.Source] - SIZE[null BulkData.Source]];
	    handle.identifier ← d.identifier;
	    handle↑.variant ← active[
	      process: Process.GetCurrent[],
	      passive: [d.network, d.host]];
	    END;
	  passive =>
	    BEGIN
	    notes.noteSpace[@d.network,
	      SIZE[passive BulkData.Source] - SIZE[null BulkData.Source]];
	    handle.identifier ← d.identifier;
	    handle↑.variant ← passive[
	      process: Process.GetCurrent[],
	      active: [d.network, d.host]];
	    END;
	  ENDCASE;
	OnList[handle↑];  --tack it on to the list
	END;
      free => Destroy[handle↑];  --delete the object
      ENDCASE;
    END;  --DescribeSource

  DescribeTransferArguments: Courier.Description =
    BEGIN
    --Courier description of TransferArguments: the record is described by
    --its size alone; the pointer that noteSize returns is not needed.
    [] ← notes.noteSize[SIZE[TransferArguments]];
    END;

  Destroy, OffList: PUBLIC <<XStream>> ENTRY PROC[handle: Handle] =
    BEGIN
    <<
    One procedure under two names.  Under the monitor lock, find handle on
    the list, mark its seal smashed, unlink it, free its storage and drop
    the count.  If handle is not on the list, Courier.Error[streamNotYours]
    is raised (after the monitor is released, by RETURN WITH ERROR).
    Order matters in the loop body: the element is unlinked before it is
    freed, and EXIT leaves the loop before h.link could be read again.
    >>
    p: Handle ← NIL;  --predecessor of h, NIL while h is the list head
    FOR h: Handle ← list.first, h.link UNTIL h = NIL DO
      IF h = handle THEN
        BEGIN
	handle.seal ← smashed;  --smash the object's seal
	IF p = NIL THEN list.first ← h.link ELSE p.link ← h.link;
	CourierInternal.longZone.FREE[@handle];
	list.count ← PRED[list.count];  --one less
	EXIT;  --let's not look further
	END
      ELSE p ← h;
      REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours];
      ENDLOOP;
    END;  --Destroy

  Make: PUBLIC <<XStream>> PROC[request: XStream.Request]
    RETURNS[handle: Handle] =
    BEGIN
    <<
    Allocate a fresh XStream object for a client request and put it on the
    list.  The object starts out new and undeclared, has no Courier
    connection yet, and gets an identifier made of the local host and a
    newly drawn ID.  Its variant follows the kind of request:
      stream or proc => immediate,  deferred => deferred,  none => null.
    Any other kind of request raises BUG[bug].
    >>
    handle ← CourierInternal.longZone.NEW[Object ← [
      seal: new, use: undeclared,
      request: request, courier: NIL,
      identifier: [address.host, NewID[]],
      variant: null[]]];
    WITH request SELECT FROM
      stream, proc => handle.variant ← immediate[];
      deferred => handle.variant ← deferred[];
      none => handle.variant ← null[];
      ENDCASE => ERROR BUG[bug];
    OnList[handle];  --now the rest of the module can find it
    END;  --Make

  PassiveRendezvous: ENTRY PROC[passive: Handle] =
    BEGIN
    --PASSIVE ASSERTION MADE BY CALLER--
    OPEN pH: NARROW[passive, LONG POINTER TO passive Object];
    ENABLE UNWIND => NULL;
    <<
    Check the list, checking it twice, seeing who's here and who ain't.
    If we find a match between our handle.descriptor.identifier and one that's
    already in the list, toss him out of bed.  Give him our handle.request
    and then go to sleep ourselves.
    If we find no match, our entry is already there and the active process will
    find it when he comes in from the cold. He should wake us when he goes home
    and then we can return.

    As coded: make our condition abortable, give it a timeout, mark ourselves
    "rendezvous", NOTIFY the first active object on the list that carries our
    identifier (if any), then wait until our seal leaves the range
    rendezvous..transferring or we are aborted.
    >>
    Process.EnableAborts[@pH.rendezvous];  --in case active wants to gun us
    Process.SetTimeout[
      @pH.rendezvous, Process.SecondsToTicks[  --set a condition timeout
	WaitTime[pH.courier.object.remote.net] +  --us to initiator
	WaitTime[pH.active.net]]];  --plus us to active
    passive.seal ← rendezvous;  --we're ready to link up
    FOR active: Handle ← list.first, active.link UNTIL active = NIL DO
      <<
      If the active party managed to call us before we got here, he will
      have linked up an active descriptor with a null host number.  If
      active is here, wake him up and go to sleep ourselves.  It's his 
      responsibility to wake us up when he's finished the transfer.
      >>
      IF active.identifier = passive.identifier THEN
	WITH aH: active SELECT FROM
	  active => {NOTIFY aH.rendezvous; EXIT};  --jar him loose and exit
	  ENDCASE;  --this is not the one we want; continue searching
	<<
	(This note describes the wait that follows the search loop.)
	The active isn't here yet - wait for him.  When he comes in he will
	process the bulk data and notify us when he's finished.  We need to
	wait only a reasonable amount of time for the active party to get here.
	Once the transfer starts, it may take an arbitrary amount of time.
	Setting the timeout - we're passive.  That means that we are as close
	or closer to the initiator than the active element is.  The worst case
	for distance between the initiator and the active would be the distance
	from us to active plus the distance from us to initiator (that assumes
	the path from initiator to active goes through the net we live on).
	>>
      ENDLOOP;

    <<
    In this loop, a wakeup when the seal is rendezvous | transferring
    is ignored.
    A wakeup with any other seal exits the loop.  It may be a timeout
    or the transfer may be complete.
    An ABORTED means that the active party called Cancel instead of
    Send or Receive.
    NOTE(review): nothing visible here changes the seal when the condition
    merely times out, so a timeout while the seal is still "rendezvous" just
    waits again.  Confirm that some other party moves the seal on in that case.
    >>  
    WHILE passive.seal IN[rendezvous..transferring] DO
      ENABLE ABORTED => EXIT; WAIT pH.rendezvous; ENDLOOP;

    END;  --PassiveRendezvous

  RealCourier: PROC[sH: Stream.Handle]
    RETURNS[ch: CourierInternal.ConnectionHandle] = INLINE
    BEGIN
    --View the stream as Courier's augmented stream and follow its back
    --pointer to the owning connection.
    augmented: CourierInternal.AugmentedStream = LOOPHOLE[sH];
    RETURN[augmented.back];
    END;

  RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE
    BEGIN
    --View the stream as Courier's augmented stream, follow its back pointer
    --to the connection, and return the address of the stream object held in
    --that connection's transFilter.
    augmented: CourierInternal.AugmentedStream = LOOPHOLE[sH];
    RETURN[@augmented.back.transFilter.object];
    END;

  SendNow: Stream.SendNowProcedure =
    BEGIN
    <<
    sendNow procedure installed by Create on streams we put to.  The client
    may flush but may not mark an end of record, since that would upset the
    protocol; such a request raises Stream.InvalidOperation.  A plain flush
    is passed on to the underlying transport stream.
    >>
    IF endRecord THEN ERROR Stream.InvalidOperation
    ELSE
      BEGIN
      transport: Stream.Handle = RealStream[sH];
      transport.sendNow[transport, endRecord];
      END;
    END;  --SendNow

  ServerCheckout: PUBLIC <<XStream>> PROC[
    cH: Courier.Handle, request: XStream.Request] =
    BEGIN
    <<
    This means the source/sink has been deserialized, but servers don't know
    they are playing a 3-party game, so we have to move carefully.
    When (IF) we locate the Object behind this Courier.Handle, the object's
    seal has the information about whether or not we are playing a 3rd
    party game.  If we are doing such, then this routine will call off to
    rendezvous with the other half of the party.  That means if we are passive,
    we wait for the active's incoming.  Else we initiate a call (using the
    Bulk Data Protocol procedure appropriate (Send | Receive)).

    As coded: the object is looked up by Courier handle (raising
    Courier.Error[streamNotYours] if there is none), the server's request is
    stored in it, and the object's variant decides what happens next:
      null      => nothing
      immediate => run the request (stream copy or client proc), then
                   TestForStreamDeleted, also on UNWIND
      active    => ActiveRendezvous
      passive   => PassiveRendezvous
    >>

    <<
    Under the monitor lock, leave "handle" (declared below) pointing at the
    first null, immediate, active or passive object whose Courier connection
    is cH.  Objects of other variants are skipped.
    >>
    CoupleCourierToRequest: ENTRY PROC =
      BEGIN  
      FOR handle ← list.first, handle.link UNTIL handle = NIL DO
        WITH h: handle SELECT FROM
	  null => IF @h.courier.object = cH THEN EXIT;
	  immediate => IF @h.courier.object = cH THEN EXIT;
	  active => IF @h.courier.object = cH THEN EXIT;
	  passive => IF @h.courier.object = cH THEN EXIT;
	  ENDCASE;
	REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours];
	ENDLOOP;
      END;  --CoupleCourierToRequest

    handle: Handle;  --set by CoupleCourierToRequest

    CoupleCourierToRequest[];  --search for the handle
    handle.request ← request;  --just for posterity
    WITH handle SELECT FROM
      null => {};  --I don't know what goes on here
      immediate =>
        BEGIN
	ENABLE UNWIND => TestForStreamDeleted[handle];
	WITH req: request SELECT FROM
	  none => {};  --accepts or supplies no data
	  stream => StreamCopy[handle];  --blind stream copy
	  proc => req.proc[handle];  --user supplied proc
	  ENDCASE;
	TestForStreamDeleted[handle];
        END;
      active => ActiveRendezvous[handle];
      passive => PassiveRendezvous[handle];
      ENDCASE;
    END;  --ServerCheckout

  StartProtocol: PUBLIC <<XStreamOps>> PROC = {
    <<
    Exports the Bulk Data remote program to Courier, for the single version
    BulkData.version, with BulkDataDispatcher as its dispatcher.  Any
    Courier.Error raised by the export is swallowed;  the expected cause is
    that the program has already been exported.
    >>
    Courier.ExportRemoteProgram[
      programNumber: BulkData.program,
      versionRange: [BulkData.version, BulkData.version],
      dispatcher: BulkDataDispatcher,
      serviceName: "BulkDataTransfer"L,
      zone: CourierInternal.longZone,
      classOfService: bulk
      ! Courier.Error => CONTINUE];  --should've been duplicate export
    };  --StartProtocol

  StopProtocol: PUBLIC <<XStreamOps>> PROC = {
    <<
    Withdraws the Bulk Data remote program (single version BulkData.version)
    from Courier.  Any Courier.Error raised by the unexport is swallowed;
    the expected cause is that the program was not exported.
    >>
    Courier.UnexportRemoteProgram[
      programNumber: BulkData.program,
      versionRange: [BulkData.version, BulkData.version]
      ! Courier.Error => CONTINUE];  --should've been no such export
    };  --StopProtocol

  ServerRendezvous: PROC[
    cH: Courier.Handle,
    transfer: LONG POINTER TO TransferArguments,
    use: XStreamOps.IndicatedUse] =
    BEGIN
    <<
    The active process is calling in to rendezvous with the passive party.
    If the passive party is waiting here, process his Request, else hang
    a new object on the list and he'll find us, hopefully before we time out.

      cH        Courier handle of the incoming (active) call.
      transfer  supplies the identifier to match and the wait timeout.
      use       the direction indicated by the caller.
    A new active Object is allocated from cH.zone and placed on list for the
    duration of the call.  Failures to couple are reported to the caller as
    Courier.SignalRemoteError carrying a BulkData.Error ordinal.
    >>
    <<
    Monitored scan of list for a passive Object with transfer's identifier.
    Returns TRUE once such an object passes all checks;  both seals are then
    set to transferring.  A matching object that fails a check raises a remote
    error (identifierBusy, transferAborted, wrongDirection or wrongHost).
    When nothing matches:  with wait = TRUE it waits on active.rendezvous and
    then falls out returning the default FALSE;  with wait = FALSE it raises
    noSuchIdentifier.  The outer variable passive is the loop variable, so it
    is NIL after an unsuccessful scan and points at the matching object
    otherwise (including when a remote error is raised for it).
    >>
    CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] =
      BEGIN
      ENABLE UNWIND => NULL;
      FOR passive ← list.first, passive.link UNTIL passive = NIL DO
        IF passive.identifier = transfer.identifier THEN
	  BEGIN
	  WITH pH: passive SELECT FROM
	    passive =>
	      BEGIN
	      error: NATURAL;
	      --the first arm whose test holds decides the outcome
	      SELECT TRUE FROM
	        (passive.seal = transferring) =>
		  error ← BulkData.Error[identifierBusy].ORD;
		(passive.seal = cancelled) =>
		  error ← BulkData.Error[transferAborted].ORD;
		(passive.use = use) =>
		  error ← BulkData.Error[wrongDirection].ORD;
		(pH.active.host # cH.remote.host) =>
		  error ← BulkData.Error[wrongHost].ORD;
		ENDCASE =>
		  BEGIN
		  passive.seal ← active.seal ← transferring;
		  RETURN[TRUE];  --ready to run
		  END;
	      RETURN WITH ERROR Courier.SignalRemoteError[error];
	      END;
	    ENDCASE;
	  END;
	<<
	Passive call isn't here yet, so we'll supply an active object and
	he'll rendezvous with it when he gets here.  If we get called with
	wait = TRUE then this is the first call.  If we're in the second
	call, we were either notified out of the first or we timed out.
	>>
	ENDLOOP;
      IF wait THEN WAIT active.rendezvous  --wait for passive to arrive
      ELSE RETURN WITH ERROR
        Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD];
      END;  --CoupleWithPassive

    --Monitored:  stamps the given seal on the passive Object and notifies its
    --rendezvous condition.  Callers must ensure passive is not NIL.
    WakePassive: ENTRY PROC[seal: XStreamOps.ObjectSeal] = INLINE
      BEGIN
      OPEN pH: NARROW[passive, LONG POINTER TO passive Object];
      passive.seal ← seal;  --reset the use
      NOTIFY pH.rendezvous;  --life's so simple
      END;

    passive: Handle;  --assigned by the scan in CoupleWithPassive
    active: LONG POINTER TO active Object ← cH.zone.NEW[active Object ← [
      seal: old, use: use, identifier: transfer.identifier,
      courier: LOOPHOLE[cH],  --get the handle wired in
      variant: active[
        process: Process.GetCurrent[],  --this is us
	passive: [System.nullNetworkNumber, System.nullHostNumber]]]];
    --the wait in CoupleWithPassive is bounded by the caller supplied timeout
    --and can be aborted
    Process.SetTimeout[
      @active.rendezvous, Process.SecondsToTicks[transfer.timeout]];
    Process.EnableAborts[@active.rendezvous];
    OnList[active];  --insert into list

    BEGIN
    --NOTE(review): when CoupleWithPassive raises a remote error for a matched
    --object (busy, aborted, wrong direction, wrong host), passive still points
    --at that object, so if the error is unwound through here the UNWIND arm
    --marks it cancelled and notifies it.  Confirm that is intended, in
    --particular for the identifierBusy case.
    ENABLE
      BEGIN
      Courier.Error => ERROR BUG[bug];
      UNWIND => {OffList[active]; IF passive # NIL THEN WakePassive[cancelled]};
      END;

    --first call may wait;  the second call either couples or raises
    SELECT TRUE FROM
      (CoupleWithPassive[TRUE]) => NULL;  --passive was laying in the bushes
      (CoupleWithPassive[FALSE]) => NULL;  --he finally got here (in time too)
      ENDCASE;  --too little too late - errored from CoupleWithPassive

    active.request ← passive.request;  --copy original request (NIT)
    active.use ← passive.use;  --that's not just a NIT!
    WITH req: active.request SELECT FROM
      none => {};  --accepts or supplies no data
      stream => StreamCopy[active];  --copies stream
      proc => req.proc[active];  --then 
      ENDCASE;
    OffList[active];  --get rid of active element
    WakePassive[finished];  --and tell passive we're finished
    END;

    END;  --ServerRendezvous

  StreamCopy: PROC[h: Handle] =
    BEGIN
    <<
    Runs a stream variant request for h:  makes a stream with Create[h] and
    calls Copy between it and the stream named in the request, the argument
    order being picked by h.use.  Any use other than puts or gets copies
    nothing.  The created stream is deleted on the way out, whether the exit
    is normal or an unwind.
    >>
    OPEN req: NARROW[h.request, stream XStream.Request];
    bulkSH: Stream.Handle ← Create[h];  --lot of error checks
    BEGIN
    ENABLE UNWIND => bulkSH.delete[bulkSH];  --keep track of that stream
    --choose direction of bit traffic
    IF h.use = puts THEN Copy[bulkSH, req.sH]  --bits go that way
    ELSE IF h.use = gets THEN Copy[req.sH, bulkSH];  --or bits come this way
    END;
    bulkSH.delete[bulkSH];  --we created it, so we delete it
    END;  --StreamCopy

  TestForStreamDeleted: PROC[handle: Handle] = {
    <<
    Checks that the stream object behind the handle's Courier connection has
    its delete procedure set to Stream.defaultObject.delete;  anything else
    is reported as BUG[undeleted].  When the check passes, calls
    CourierInternal.SetBulkStream with the Courier connection alone.
    >>
    notDeleted: BOOLEAN =
      (handle.courier.object.sH.delete # Stream.defaultObject.delete);
    IF notDeleted THEN ERROR BUG[undeleted];
    CourierInternal.SetBulkStream[handle.courier];
    };  --TestForStreamDeleted

  UserCheckout: PUBLIC <<XStream>> PROC[cH: Courier.Handle] =
    BEGIN
    <<
    Locates the Object on list that belongs to cH and carries out the request
    already stored in it.
      cH  the Courier handle whose Object is searched for.
    Raises Courier.Error[streamNotYours] when no Object on list matches cH.
    >>
    --Monitored search of list.  On success the outer variable handle is left
    --pointing at the matching Object.  null and immediate objects match on
    --their courier object;  a deferred object matches on either its passive
    --or its active object;  any other variant is skipped.
    FindCourierHandleMatch: ENTRY PROC = INLINE
      BEGIN
      FOR handle ← list.first, handle.link UNTIL handle = NIL DO
        WITH h: handle SELECT FROM
	  null => IF @h.courier.object = cH THEN EXIT;
	  immediate => IF @h.courier.object = cH THEN EXIT;
	  deferred =>
	    IF (@h.passive.object = cH) OR (@h.active.object = cH) THEN EXIT;
	  ENDCASE;
	--FINISHED is taken only when the loop ran off the end of the list
	REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours];
	ENDLOOP;
      END;  --FindCourierHandleMatch
    handle: Handle;  --set by FindCourierHandleMatch
    --this means the source/sink has been deserialized
    FindCourierHandleMatch[];  --see if we can locate the handle
    WITH h: handle.request SELECT FROM
      none => {};  --accepts or supplies no data
      stream => StreamCopy[handle];  --blind stream copy
      proc =>
        BEGIN
	--TestForStreamDeleted runs on both the normal and the unwinding exit
	ENABLE UNWIND => TestForStreamDeleted[handle];
	h.proc[handle];  --client supplied proc
	TestForStreamDeleted[handle];
	END;
      ENDCASE;
    END;  --UserCheckout

  WaitTime: PROC[net: System.NetworkNumber] RETURNS[seconds: CARDINAL] =
    BEGIN
    <<
    Computes a timeout, in seconds, for dealing with a party on net:
    baseTMO plus hopTMO for each unit of delay the router reports to net.
    When the router has no table entry for net the delay is taken as zero,
    so the result is just baseTMO.
    >>
    <<Sure wish this wasn't monitored in some cases.>>
    hops: CARDINAL ← 0;  --stays zero if the router knows no route to net
    <<
    The catch phrase must leave via CONTINUE.  A catch phrase that merely
    assigns and falls off its end rejects the signal, which would let
    Router.NoTableEntryForNet propagate to our caller instead of giving the
    zero-hop fallback.  CONTINUE abandons the assignment and resumes at the
    RETURN below with hops still zero.
    >>
    hops ← Router.GetDelayToNet[net !
      Router.NoTableEntryForNet => CONTINUE];
    RETURN[baseTMO + (hops * hopTMO)];
    END;  --WaitTime

  XStreamGet: Stream.GetProcedure =
    BEGIN
    <<
    Get procedure of the XStream filter.  Reads from the real stream
    underneath sH using its own input options and sorts what arrives by
    completion code and subsequence type (sst):  data carrying
    BulkData.bulkSST is returned to the caller, data carrying
    CourierProtocol.dataSST is handed to CourierOps.RedirectStream, and any
    other sst is reported as BUG[protocol].
    Once the owning handle is marked truncated, this and every later call
    raises ABORTED.
    Note that each underlying get is given the whole of block and overwrites
    bytesTransferred (nothing is accumulated), so the loop ends either on a
    completely filled block or through one of the explicit exits.
    >>
    --Only the end of stream and timeout signalling choices are taken from the
    --caller;  everything else is reported through the completion code.
    myOptions: Stream.InputOptions = [
      terminateOnEndRecord: TRUE, signalLongBlock: FALSE,
      signalShortBlock: FALSE, signalSSTChange: FALSE,
      signalEndOfStream: options.signalEndOfStream,
      signalAttention: FALSE, signalTimeout: options.signalTimeout,
      signalEndRecord: FALSE];
    handle: Handle = NARROW[sH.clientData];  --that's the owning handle
    ch: CourierInternal.ConnectionHandle = handle.courier;  --owning courier
    sH ← RealStream[sH];  --get the real stream 
    why ← normal; sst ← BulkData.bulkSST;  --in case the block is a null block
    bytesTransferred ← 0;  --to make the loop invariant work
    UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO
      BEGIN
      IF handle.use = truncated THEN GOTO truncated;  --that's all
      [bytesTransferred, why, sst] ← sH.get[sH, block, myOptions];

      --NOTE(review): options.signalTimeout is passed through above.  If the
      --underlying get can return a timeout completion code when that option
      --is FALSE, it would land on the final ENDCASE and raise BUG[bug].
      --Confirm against the Stream interface.
      SELECT why FROM
        normal =>
	  SELECT TRUE FROM
	    (sst = BulkData.bulkSST) => EXIT;  --(why = normal) AND (sst = bulk)
	    (sst # CourierProtocol.dataSST) => ERROR BUG[protocol];  --no way
	    (bytesTransferred # 0) => GOTO lostData;  --data was system data
	    ENDCASE => LOOP;  --my sst but no bytes - just ignore
        sstChange =>
	  SELECT TRUE FROM
	    (sst = CourierProtocol.dataSST) => GOTO truncated;  --short
	    (sst # BulkData.bulkSST) => ERROR BUG[protocol];  --no way
	    (bytesTransferred # 0) => GOTO lostData;  --some system data passed
	    ENDCASE => LOOP;  --retry the get again
	--End of record is presented to the client as end of stream:  either as
	--the completion code, or as a signal that is raised again every time
	--the client resumes it.
	endRecord =>
	  IF ~options.signalEndOfStream THEN {why ← endOfStream; EXIT}
	  ELSE DO SIGNAL Stream.EndOfStream[bytesTransferred]; ENDLOOP;
	attention => GOTO truncated;  --other guys bailing out
	ENDCASE => ERROR BUG[bug];  --what else is there?

      EXITS
	--Bytes that were not bulk data were read.  Give them to Courier, then
	--on a user connection that still has a version exchange procedure run
	--it;  only a noError result lets the loop go around for another get.
	lostData =>
	  BEGIN
	  CourierOps.RedirectStream[ch, block, bytesTransferred, why, sst];
	  WITH vep: ch SELECT FROM
	    user => SELECT TRUE FROM
	      (vep.versExchProc = NIL) => GOTO truncated;  --already been done
	      (vep.versExchProc[ch] # noError) => GOTO truncated;
	      ENDCASE;  --version exchange and it's okay - do .get again
	    ENDCASE => GOTO truncated;  --wasn't a user connection
	  bytesTransferred ← 0;  --reset loop invariant
	  END;
      END;

      REPEAT
        truncated =>
          BEGIN
	  handle.use ← truncated;  --record this bit of trivia
	  RETURN WITH ERROR ABORTED;  --simply abort the client
	  END;
      ENDLOOP;
    END;  --XStreamGet

  XStreamPut: Stream.PutProcedure =
    BEGIN
    <<
    Put procedure of the XStream filter.  Normally the put is passed straight
    to the real stream underneath sH.  When the owning handle has been marked
    truncated, an attention carrying BulkData.attnByte is sent on sH, the
    delete procedure of sH is replaced by DeleteNull, and ABORTED is raised.
    >>
    owner: Handle = NARROW[sH.clientData];  --that's the owning handle
    IF owner.use = truncated THEN
      BEGIN
      sH.sendAttention[sH, BulkData.attnByte];  --we see the close
      sH.delete ← DeleteNull;  --so we don't also send EOM
      ERROR ABORTED;  --never to return
      END
    ELSE
      BEGIN
      --go through the real stream;  calling put on sH itself would recurse
      realSH: Stream.Handle = RealStream[sH];
      realSH.put[realSH, block, endRecord];  --do the put for him
      END;
    END;  --XStreamPut

  END.  --XStreamImpl 

LOG  ( date - person - action )
 2-Jun-85 15:10:00  AOF  Created file
19-Oct-86 12:40:45  AOF  The store/fetch vs source/sink => gets/puts mapping
21-Oct-86 15:03:39  AOF  Aborting (swapping attn packets and other lies).
31-Oct-86 11:46:10  AOF  Reorder statements in Create to avoid race with attn.
26-Nov-86 10:14:31  AOF  Get Copy to use PacketStream.
 8-Jan-87 10:30:57  AOF  Lighten up on checking for stream being "reused".
27-Jan-87 18:42:40  AOF  AttentionProc catches Courier.Error, ~ConnectionSusp.
 4-Jun-87 11:28:00  AOF  Xlate BD error to Courier.Error in ActiveRendezvous.
 4-Jun-87 12:27:23  AOF  Allow for active to arrive earlier than passive.
 3-Sep-87 14:49:34  AOF  Watch out for NIL from psH.get in $Copy.
23-Sep-87  9:35:21  AOF  Use Stream.defaultObject.* on for invalid stream ops.
23-Sep-87 19:46:30  AOF  Help save client from him/her self.
 9-Jan-88 17:59:34  AOF  Playing the version # exchange game.
28-Jan-88  9:39:12  AOF  AR#12728 - sending eom twice using $Copy.