-- File: CourierImplU.mesa - last edit:
-- AOF                 27-Oct-87 21:15:13
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved. 

DIRECTORY
  Courier USING [
    Arguments, Description, Error, ErrorCode, Handle, InvalidArguments,
    LocalSystemElement, Parameters, RemoteErrorSignalled, VersionMismatch,
    VersionRange],
  CourierInternal USING [
    activeTimeout, AugmentedStream, Closed, CreateDefaultStream, doStats,
    ExchWords, inputOptions, RegisterTransport, seal, VersExchProcType,
    SetBulkStream, SetMessageProtocolVersion, stats, streamDefaultWaitTime,
    ttDefault, UserConnection],
  CourierOps USING [
    DeleteStream, Fetch, FlushToEndOfMessage, GetBlock, PutBlock,
    SetIdleWatcher, Store],
  CourierProtocol USING [
    dataSST, Protocol, Protocol3Body, ProtocolRange, pvHigh, RejectCode],
  Environment USING [bytesPerWord],
  Process USING [CancelAbort, GetCurrent],
  Stream USING [Handle, TimeOut],
  System USING [GetGreenwichMeanTime, nullNetworkNumber];

CourierImplU: PROGRAM
  IMPORTS Courier, CourierInternal, CourierOps, Process, Stream, System
  EXPORTS Courier =
  BEGIN  --Implementation of the Courier User.
  
  --LOCAL VARIABLES/CONSTANTS
  --Transaction number placed in every call message built by Call. It is a
  --compile time constant, so every call carries the value zero.
  uniqueTransaction: CARDINAL = 0;
  --Bytes per machine word; multiplies SIZE[...] results to get byte counts.
  bpw: CARDINAL = Environment.bytesPerWord;
  --Largest client timeout (in seconds) that Call will multiply by 1000
  --without exceeding LAST[LONG CARDINAL]; larger values mean "wait forever".
  maxSecondsWaitable: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;
  --Protocol version this module speaks: it is set on each new transport and
  --must fall inside the range the remote returns during version exchange.
  pvEqual: CourierProtocol.Protocol = CourierProtocol.pvHigh;

  <<
  Call: perform one Courier remote procedure call on the connection cH.

  Arguments
    cH                  client handle; LOOPHOLEd to a UserConnection and
                        validated against CourierInternal.seal.
    procedureNumber     remote procedure selector placed in the call message.
    arguments           sent after the call header when both its location and
                        description are non-NIL.
    results             filled in from the return message when both its
                        location and description are non-NIL.
    timeoutInSeconds    how long to wait for the reply header; values above
                        maxSecondsWaitable are treated as "no limit".
    requestDataStream   TRUE leaves the stream checked out to the caller after
                        the return has been processed.
    streamCheckoutProc  when non-NIL, called after the call message is sent so
                        the client can use the stream before the reply is read.

  Returns
    sH  the transport stream when requestDataStream is TRUE, otherwise NIL.

  Errors raised here
    Courier.Error[invalidHandle]           cH is NIL or has a bad seal.
    Courier.Error[streamNotYours]          stream state is busy or out.
    Courier.Error[transportTimeout | returnTimedOut]
                                           translated from Stream.TimeOut.
    Courier.Error[invalidArguments]        translated from InvalidArguments.
    Courier.Error[invalidMessage]          reply was not return/abort/reject.
    Courier.Error[parameterInconsistency]  data was left after the results.
    ABORTED                                re-raised after deleting the stream.
    A non-noError code from the version exchange procedure is raised as
    Courier.Error. P3Abort and P3Reject raise their own errors for abort and
    reject replies.
  >>
  Call: PUBLIC PROC[
    cH: Courier.Handle, procedureNumber: CARDINAL,
    arguments, results: Courier.Parameters, timeoutInSeconds: LONG CARDINAL,
    requestDataStream: BOOLEAN, streamCheckoutProc: PROC[cH: Courier.Handle]]
    RETURNS[sH: Stream.Handle] =
    BEGIN
    error: Courier.ErrorCode ← noError;  --code raised by the delete exit
    ch: CourierInternal.UserConnection ← LOOPHOLE[cH];
    connectionless: BOOLEAN ← FALSE;  --delete stream after call?

    --Reject bad handles and streams in use before any state is touched, so
    --these two errors are raised outside the ENABLE scope below.
    SELECT TRUE FROM
      (ch = NIL), (ch.seal # CourierInternal.seal) =>
        RETURN WITH ERROR Courier.Error[invalidHandle];
      (ch.streamState = busy), (ch.streamState = out) =>
        RETURN WITH ERROR Courier.Error[streamNotYours];
      ENDCASE;

    BEGIN
    ENABLE
      BEGIN
      UNWIND => CourierOps.SetIdleWatcher[ch];  --start clocking the stream
      Courier.Error =>
      --delete the stream on those errors that leave a broken stream around
        SELECT errorCode FROM
          transportTimeout => CourierOps.DeleteStream[ch];
          noRouteToSystemElement => CourierOps.DeleteStream[ch];
          ENDCASE;
      Stream.TimeOut =>
      --timeout waiting for results or processing same
        BEGIN
        --ch.clock holds the deadline for the reply header. A timeout before
        --that deadline is reported as transportTimeout, one at or after it
        --as returnTimedOut.
        error ← IF (System.GetGreenwichMeanTime[] < ch.clock) THEN
          transportTimeout ELSE returnTimedOut;
        GOTO delete;  --gun the stream in any case
        END;
      --signal raised locally; translate to Error
      Courier.InvalidArguments => {error ← invalidArguments; GOTO delete};
      --cases where server closed stream | pseudo-version mismatch
      --RETRY restarts this enabled block from the top after the delete.
      CourierInternal.Closed => {CourierOps.DeleteStream[ch]; RETRY};
      END;

    ch.owner ← Process.GetCurrent[];  --record current owner
    ch.streamState ← busy;  --and lock the stream

    IF (ch.transFilter = NIL) THEN
      BEGIN
      --set up default transport if nobody has registered one for this connection.
      IF ch.createTransport = NIL THEN CourierInternal.RegisterTransport[
        cH, CourierInternal.CreateDefaultStream, CourierInternal.ttDefault];
      [ch.transFilter, ch.protocolRange, connectionless] ← ch.createTransport[ch];

      <<
      Protocol version arbitration. The transport filter tells us what our
      low and high is, because transports that are not connection-oriented need
      to piggyback the version numbers (which can only be done if low = high).
      >>
      CourierOps.PutBlock[
        ch, [@ch.protocolRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
      CourierInternal.SetMessageProtocolVersion[ch, pvEqual];  --has to be
      CourierInternal.SetBulkStream[ch];  --build up the bulk stream
      ch.versExchProc ← VersExchProc;  --proc to deal with version exchanges
      END;

    sH ← @ch.transFilter.object;

    --Transmit call message
    WITH ch.message SELECT FROM
       protocol3 =>
         BEGIN
         protocol3Body ← [call[
           transaction: uniqueTransaction,
           program: CourierInternal.ExchWords[ch.object.programNumber],
           version: ch.object.versionNumber, procedure: procedureNumber]];
         CourierOps.PutBlock[ch,
           [@protocol3Body, 0, SIZE[call CourierProtocol.Protocol3Body] * bpw]];
         END;
       ENDCASE;

    --Append client arguments
    IF (arguments.location # NIL) AND (arguments.description # NIL) THEN
      CourierOps.Fetch[ch, arguments.location, arguments.description];
    sH.sendNow[sH, TRUE];  --transmit and set endRecord

    IF CourierInternal.doStats THEN
      BEGIN
      CourierInternal.stats[callsTransmitted] ←
        CourierInternal.stats[callsTransmitted] + 1;
      --A call counts as local when the remote net is the null network number
      --or equals the net of the local system element.
      IF (ch.object.remote.net = System.nullNetworkNumber) OR
        (ch.object.remote.net = Courier.LocalSystemElement[].net) THEN
          CourierInternal.stats[localCallsTransmitted] ←
            CourierInternal.stats[localCallsTransmitted] + 1;
      END;

    --Bulk data, version 2
    IF streamCheckoutProc # NIL THEN
      BEGIN
      <<
      Bulk stream is already set in the object, here we are just allowing
      client to use it. This is really a crock!
      >>
      ch.streamState ← out; ch.lastSST ← CourierProtocol.dataSST;
      sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
      streamCheckoutProc[cH];
      --Cancel any abort request on this process before going on to wait for
      --the reply, then take the stream back from the client.
      Process.CancelAbort[Process.GetCurrent[]];
      ch.streamState ← busy;
      sH.setSST[sH, CourierProtocol.dataSST];
      END;

    sH.options ← CourierInternal.inputOptions;
    --Arm the reply deadline. The stream timeout is passed as seconds * 1000,
    --so larger requests than maxSecondsWaitable are treated as unlimited
    --to keep that product within LONG CARDINAL.
    IF timeoutInSeconds > maxSecondsWaitable THEN
      BEGIN
      ch.clock ← [LAST[LONG CARDINAL]];
      sH.setTimeout[sH, LAST[LONG CARDINAL]];
      END
    ELSE
      BEGIN
      ch.clock ← [System.GetGreenwichMeanTime[] + timeoutInSeconds];
      sH.setTimeout[sH, timeoutInSeconds * 1000];
      END;
    <<
    IF ch.versExchProc is NIL then the version exchange is complete. If it
    isn't then we still have to deal with this situation.
    >>
    SELECT TRUE FROM
      (ch.versExchProc = NIL) => NULL;  --this has already been done
      ((error ← ch.versExchProc[ch]) # noError) => GOTO delete;
      ENDCASE;  --if it endcased then it passed the version test

    BEGIN
    ENABLE ABORTED => GOTO aborted;
    --Wait for return message to arrive
    ch.endRecord ← FALSE;  --just in case it was left over
    WITH ch.message SELECT FROM
      protocol3 =>
        BEGIN
        --Read the first 4 bytes of the reply.
        --NOTE(review): presumably the message type and transaction words;
        --confirm against CourierProtocol.Protocol3Body.
        CourierOps.GetBlock[ch, [@protocol3Body, 0, 4]];
        ch.clock ← [LAST[LONG CARDINAL]];  --disallow 'returnTimedOut'
        sH.setTimeout[sH, CourierInternal.activeTimeout];
        WITH p3b: protocol3Body SELECT FROM
          return => NULL;
          abort => P3Abort[ch, @p3b];
          reject => P3Reject[ch, @p3b];
          ENDCASE => ERROR Courier.Error[invalidMessage];
        END;
      ENDCASE;
    END;  --enable aborted

    --only return message types get here...all others signal and never return

    IF CourierInternal.doStats THEN
      CourierInternal.stats[returnsReceived] ←
        CourierInternal.stats[returnsReceived] + 1;

    --now receive and store the client's results
    IF (results.location # NIL) AND (results.description # NIL) THEN
      CourierOps.Store[ch, results.location, results.description !
        UNWIND => [] ← CourierOps.FlushToEndOfMessage[ch]];
    --A TRUE result from the flush is reported as parameterInconsistency.
    IF CourierOps.FlushToEndOfMessage[ch] THEN
        ERROR Courier.Error[parameterInconsistency];

    --Bulk data, version 1
    IF requestDataStream THEN
      BEGIN  --the bulkFilter.object has already been set.
      ch.streamState ← out;
      sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
      ch.object.sH ← sH;
      END
    ELSE
      BEGIN
      --transport is not connection oriented, delete stream now.
      IF connectionless THEN CourierOps.DeleteStream[ch]
      ELSE CourierOps.SetIdleWatcher[ch];  --idles the stream
      sH ← NIL;  --client gets nothing if he doesn't ask
      END;

    EXITS
      delete => {CourierOps.DeleteStream[ch]; ERROR Courier.Error[error]};
      aborted => {CourierOps.DeleteStream[ch]; ERROR ABORTED};
    END;  --encompasses the entire enabled body of Call
    END;  --Call

  <<
  VersExchProc: finish protocol version arbitration on a fresh transport.
  Reads the protocol range sent by the remote end and reports noError when
  pvEqual lies inside it, protocolMismatch otherwise. It then removes itself
  from the connection so the exchange is not repeated.
  >>
  VersExchProc: CourierInternal.VersExchProcType =
    BEGIN
    rangeBytes: CARDINAL = SIZE[CourierProtocol.ProtocolRange] * bpw;
    peerRange: CourierProtocol.ProtocolRange;

    ch.endRecord ← FALSE;  --begin reading as if nothing had been seen yet
    CourierOps.GetBlock[ch, [@peerRange, 0, rangeBytes]];

    IF pvEqual IN[peerRange.low..peerRange.high] THEN code ← noError
    ELSE code ← protocolMismatch;

    ch.endRecord ← FALSE;  --the range may have arrived with end of record set
    NARROW[ch, CourierInternal.UserConnection].versExchProc ← NIL;  --done
    END;  --VersExchProc
  
  P3Abort: PROC [
    ch: CourierInternal.UserConnection,
    abort: LONG POINTER TO abort CourierProtocol.Protocol3Body] =
    BEGIN
    <<
    The remote procedure answered with an "abort" message. Read the error
    number that follows the header, then raise RemoteErrorSignalled with that
    number and a procedure the client can call to pull the error's arguments
    off the stream, exactly as results are pulled from a normal return.
    >>
    errorNumberBytes: CARDINAL = SIZE[CARDINAL]*bpw;

    StoreAbortArguments: Courier.Arguments =
      --PROC[argumentsRecord: Courier.Parameters];
      BEGIN
      --Nothing is stored unless the client supplied both a place to put the
      --data and a description of it.
      IF (argumentsRecord.location = NIL)
        OR (argumentsRecord.description = NIL) THEN NULL
      ELSE
        CourierOps.Store[
          ch, argumentsRecord.location, argumentsRecord.description];
      --A TRUE result from the flush is reported as parameterInconsistency.
      IF CourierOps.FlushToEndOfMessage[ch] THEN
        ERROR Courier.Error[parameterInconsistency];
      END;

    --Read the rest of the abort message: the remote error number.
    CourierOps.GetBlock[ch, [@abort.abort, 0, errorNumberBytes]];

    IF CourierInternal.doStats THEN
      CourierInternal.stats[abortsReceived] ←
        CourierInternal.stats[abortsReceived] + 1;

    ERROR Courier.RemoteErrorSignalled[abort.abort, StoreAbortArguments];
    END;

  P3Reject: PROC[
    ch: CourierInternal.UserConnection,
    reject: LONG POINTER TO reject CourierProtocol.Protocol3Body] =
    BEGIN
    <<
    The remote end answered with a "reject" message. Read the reject code
    (and the supported version range when the code is noSuchVersionNumber),
    discard whatever is left of the message, and raise the matching error.
    This procedure always exits by raising an error.
    >>
    codeBytes: CARDINAL = (SIZE[CourierProtocol.RejectCode] * bpw);
    rangeBytes: CARDINAL = (SIZE[Courier.VersionRange] * bpw);

    --Read the reject code that follows the message header.
    CourierOps.GetBlock[ch, [@reject.rejectBody, 0, codeBytes]];

    --Only a version rejection is followed by more data: the version range.
    WITH versionReject: reject SELECT FROM
      noSuchVersionNumber =>
        CourierOps.GetBlock[ch, [@versionReject.range, 0, rangeBytes]];
      ENDCASE;

    --Skip any remaining data; the result of the flush is ignored here.
    [] ← CourierOps.FlushToEndOfMessage[ch];

    IF CourierInternal.doStats THEN
      CourierInternal.stats[rejectsReceived] ←
        CourierInternal.stats[rejectsReceived] + 1;

    --Translate the reject code into the error the client sees.
    WITH why: reject SELECT FROM
      noSuchProgramNumber => ERROR Courier.Error[noSuchProgramNumber];
      noSuchVersionNumber => ERROR Courier.VersionMismatch[why.range];
      noSuchProcedureValue => ERROR Courier.Error[noSuchProcedureNumber];
      invalidArguments => ERROR Courier.Error[invalidArguments];
      ENDCASE => ERROR Courier.Error[unknownErrorInRemoteProcedure];
    END;

  END..... -- of CourierImplU.mesa

LOG

30-Jan-85 19:55:55  AOF  Post Klamath.
29-Jun-84 10:23:39  SMA  Factor Courier from NetworkStreams.
 2-Jul-84 16:33:35  SMA  Get rid of ch.sH.
 9-Oct-84 12:59:06  SMA  Remove dependency on NSTypes.WaitTime.
26-Nov-84 15:08:21  SMA  Version arbitration back into Courier.
29-Nov-84  9:32:57  SMA  Delete stream after call if transport is connectionless.
21-Dec-84 13:08:06  SMA  AugmentedStreams and new interfaces.
31-Dec-84  8:29:12  AOF  SetSST before placing call on old transport.
 7-Jan-85 17:05:04  SMA  No more callerAborted translating.
10-Jan-85 14:36:21  SMA  bulkFilter set when transport created.
30-Jan-85 19:56:07  AOF  Code byte squeezing.
 5-Feb-85 18:06:48  SMA  Don't try to delete stream when error is invalidHandle.
12-Mar-85 10:57:28  AOF  Moved createTimeout and hopWeight to CourierImplM.
 8-Apr-85 16:59:47  AOF  Delete stream when client aborts waiting for results.
31-May-85 16:50:14  AOF  Check Courier.errorCode for parmInc before delete sH.
12-Nov-86 15:30:23  AOF  P3.Abort: First get the error code, then test it
16-Jan-87 10:14:10  AOF  Removal of Courier Version 2.0
23-Oct-87 19:03:14  AOF  And then try to figure out a way to support vers exch.