-- File: CourierImplS.mesa - last edit:
-- AOF                 27-Oct-87 21:15:25
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved. 

DIRECTORY
  Courier USING [
    Arguments, Description, Dispatcher, Error, ErrorCode,
    InvalidArguments, LocalSystemElement, NoSuchProcedureNumber,
    Parameters, Results, SignalRemoteError, SystemElement, VersionRange],
  CourierInternal USING [
    activeTimeout, AugmentedStream, Closed, doStats, ExchWords,
    inputOptions, ServerConnection, serverIdleTimeout, SetBulkStream,
    SetMessageProtocolVersion, stats, streamDefaultWaitTime],
  CourierOps USING [
    CreateInternal, DeleteConnection, DeleteStream, Fetch,
    FlushToEndOfMessage, GetBlock, PutBlock, SearchForExport, Store],
  CourierProtocol USING [
    dataSST, Protocol, Protocol3Body, ProtocolRange, pvHigh, RejectCode],
  Environment USING [Byte, bytesPerWord],
  NetworkStream USING [closeSST],
  NSConstants USING [courierSocket],
  Process USING [Abort, CancelAbort, GetCurrent],
  Stream USING [
    Byte, CompletionCode, defaultInputOptions, Handle, InputOptions,
    SubSequenceType, TimeOut],
  System USING [NetworkAddress, nullNetworkNumber];
  
CourierImplS: PROGRAM
  IMPORTS Process, Courier, CourierInternal, CourierOps, Stream
  EXPORTS CourierInternal =
  BEGIN
  --Local Variables/Constants
  --bpw: bytes per word; every SIZE[...] in this module is multiplied by it
  --to give the byte count placed in a block descriptor.
  bpw: CARDINAL = Environment.bytesPerWord;
  --When TRUE, Receiver always sends its version range right after reading the
  --remote's range, and the results procedure never sends it.
  exchangeVersionNumbersImmediately: BOOLEAN = TRUE;
  --The protocol version tested against the remote's range and handed to
  --CourierInternal.SetMessageProtocolVersion.
  pvEqual: CourierProtocol.Protocol = CourierProtocol.pvHigh;
  
  --IntersectingRanges: TRUE when pvEqual lies inside the closed interval
  --bounded by range.low and range.high; FALSE otherwise (including the case
  --of an empty interval, where range.low is above range.high).
  IntersectingRanges: PROC[range: CourierProtocol.ProtocolRange]
    RETURNS[BOOLEAN] = INLINE
    BEGIN
    RETURN[(range.low <= pvEqual) AND (pvEqual <= range.high)];
    END;  --IntersectingRanges

  --FlushOutOfBandAttn: consume the attentions outstanding on stream sH.
  --The calling process first posts an abort against itself and then calls
  --sH.waitAttention over and over, discarding each result; the loop is left
  --when one of those calls raises ABORTED.
  --NOTE(review): presumably waitAttention returns at once while attentions
  --are queued and raises ABORTED only when it would otherwise have to wait;
  --confirm against the Stream interface.
  FlushOutOfBandAttn: PROC[sH: Stream.Handle] =
    BEGIN
    Process.Abort[Process.GetCurrent[]];  --arrange for ABORTED on this process
    --UNTIL ABORTED-- DO
      [] ← sH.waitAttention[sH ! ABORTED => EXIT]; ENDLOOP;
    END;  --FlushOutOfBandAttn
    

  --Receiver: server side of one Courier connection.
  --Creates the connection object for the supplied transport, exchanges
  --protocol version ranges with the remote, then loops reading call messages
  --and handing each one to the dispatcher found by CourierOps.SearchForExport.
  --Leaving through either exit (delete or error) deletes the stream and the
  --connection.
  --  aH: augmented transport stream; stored as ch.transFilter.
  --  remote: address of the caller; the socket kept in ch.object.remote is
  --    overwritten with NSConstants.courierSocket.
  --  myPv: protocol version range sent to the remote.
  --  connectionless: when TRUE the stream is deleted after the first call
  --    that reaches the dispatcher.
  Receiver: PUBLIC PROC [
    aH: CourierInternal.AugmentedStream, remote: System.NetworkAddress,
    myPv: CourierProtocol.ProtocolRange, connectionless: BOOLEAN] =
    BEGIN
      
    arguments: Courier.Arguments =
      --PROCEDURE [argumentsRecord: Courier.Parameters];
      --Remote program's dispatcher calls here to get the procedure arguments.
      --The arguments are stored via CourierOps.Store when both a location and
      --a description are supplied; the rest of the message is then flushed.
      --Raises Courier.Error[parameterInconsistency] when FlushToEndOfMessage
      --returns TRUE and Courier.Error[transportTimeout] on Stream.TimeOut.
      BEGIN
      ENABLE
        BEGIN
	UNWIND => [] ← CourierOps.FlushToEndOfMessage[ch];
	Stream.TimeOut => {error ← transportTimeout; GOTO stream};
	END;
        
      IF (argumentsRecord.location # NIL)
        AND (argumentsRecord.description # NIL) THEN
          CourierOps.Store[
	    ch, argumentsRecord.location, argumentsRecord.description];
      IF CourierOps.FlushToEndOfMessage[ch] THEN
        {error ← parameterInconsistency; GOTO stream};
      --the bulkFilter object is already set - here we are just allowing the
      --client to use it.
      ch.streamState ← out;  --until further notice
      ch.lastSST ← CourierProtocol.dataSST;  --to filter
      ch.transFilter.object.options ← Stream.defaultInputOptions;
      ch.transFilter.object.setTimeout[
        @ch.transFilter.object, CourierInternal.streamDefaultWaitTime];
      EXITS stream => Courier.Error[error];
      END;  --arguments

    results: Courier.Results =
      --PROCEDURE [
      --     resultsRecord: Courier.Parameters,
      --     requestDataStream: BOOLEAN]
      --     RETURNS [sH: Stream.Handle];
      --Remote program's dispatcher calls here to send the procedure results.
      --Leftover input is drained first, then the return header and (when a
      --location and description are supplied) the results are written and
      --the message is sent.  The returned sH is the transport stream when
      --requestDataStream is TRUE and NIL otherwise.
      BEGIN
      ENABLE
        --Problems with the stream.  The signal generated if the stream is
	--suspended will propagate thru here to the client.
	Stream.TimeOut=> {error ← transportTimeout; GOTO transportError};
	
      bytes: CARDINAL;
      sst: Stream.SubSequenceType;
      status: Stream.CompletionCode;
      --every signal option is off, so the outcome of a get is reported
      --through the returned status and sst instead
      specialOptions: Stream.InputOptions = [
        terminateOnEndRecord: TRUE, signalLongBlock: FALSE,
	signalShortBlock: FALSE, signalSSTChange: FALSE,
	signalEndOfStream: FALSE, signalAttention: FALSE,
	signalTimeout: FALSE, signalEndRecord:FALSE];
      trash: PACKED ARRAY INTEGER[0..2) OF Environment.Byte;  --discard buffer
      sH ← @ch.transFilter.object; ch.streamState ← busy;
      Process.CancelAbort[ch.owner];  --just in case client ...
      sH.setSST[sH, CourierProtocol.dataSST];  --in case he didn't

      --Drain loop: read into trash with a zero timeout until a read times out.
      --UNTIL EXIT | ERROR-- DO
	sH.setTimeout[sH, 0];  --set very short timeout
	[bytes, status, sst] ← sH.get[
	  sH, [@trash, 0, LENGTH[trash]], specialOptions];
	sH.setTimeout[sH, CourierInternal.activeTimeout];
	SELECT TRUE FROM
	  (status = timeout) => EXIT;  --this is as good as it gets
	  (status = attention) => FlushOutOfBandAttn[sH];  --and this is as bad
	  (sst = NetworkStream.closeSST) =>  --caller tired of waiting
	    {streamClosed ← TRUE; ERROR Courier.Error[callerAborted]};
	  (sst # CourierProtocol.dataSST) => NULL;  --leftover from bulk data?
	  (bytes > 0) => ERROR Courier.Error[parameterInconsistency];
	  ENDCASE;
	ENDLOOP;

      --not taken while exchangeVersionNumbersImmediately is TRUE
      IF ~exchangeVersionNumbersImmediately THEN
        CourierOps.PutBlock[  --send the protocol version numbers
	  ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];

      WITH ch.message SELECT FROM
	protocol3 =>
	  BEGIN
	  protocol3Body ← [return[tid]];  --return header carries the call's tid
	  CourierOps.PutBlock[
	    ch, [@protocol3Body, 0,
	    SIZE[return CourierProtocol.Protocol3Body] * bpw]]; 
	  END;
	ENDCASE;
      IF CourierInternal.doStats THEN
        CourierInternal.stats[returnsTransmitted] ←
	  CourierInternal.stats[returnsTransmitted] + 1;
      IF (resultsRecord.location # NIL) AND (resultsRecord.description # NIL) THEN
        CourierOps.Fetch[
	  ch, resultsRecord.location, resultsRecord.description];
      sH.sendNow[sH, TRUE];
      IF requestDataStream THEN
        BEGIN  --the bulkFilter.object has already been set.
	ch.streamState ← out;
	sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
	ch.object.sH ← sH;
	END
      ELSE sH ← NIL;
      EXITS transportError => Courier.Error[error];
      END;  --results

    --State shared by arguments, results and the main loop below.
    tid: CARDINAL;  --transaction id of the call being served
    streamClosed: BOOLEAN ← FALSE;  --set by results when it reads closeSST
    error: Courier.ErrorCode ← noError;
    ch: CourierInternal.ServerConnection;

    BEGIN
    ENABLE Stream.TimeOut, CourierInternal.Closed => GOTO error;
    procedureNumber: CARDINAL;
    range: Courier.VersionRange;
    dispatcher: Courier.Dispatcher;
    sH: Stream.Handle;
    
    --The transport client has created a transport for us, now create the
    --connection object and set the back pointer in the augmented stream object.
    ch ← LOOPHOLE[CourierOps.CreateInternal[server, remote]];
    ch.streamState ← busy;  --it will be real soon
    ch.transFilter ← aH;
    ch.transFilter.back ← ch;
    ch.object.remote ← remote;
    ch.owner ← Process.GetCurrent[];
    CourierInternal.SetBulkStream[ch];
    sH ← @ch.transFilter.object;
    ch.object.remote.socket ← NSConstants.courierSocket;
    
    BEGIN
    <<
    Protocol version arbitration.
    There doesn't seem to be any way to piggyback our version numbers with
    the return message safely. The version numbers have to go before the
    bulk data, but what if the bulk data is sent to us in the first call of
    a brand new stream? The server here can't tell that the client is going
    to do that. If he's sending (i.e., the other end is receiving) then he
    is obligated to change the SST, thus flushing the version numbers.
    This is the pits! Let's just force out the damn things and forget it.
    Ya, it may be the pits, and it also has to be done. [AOF 26-Oct-87  9:21:58]
    >> 
    ENABLE Courier.Error => GOTO error;  --just can't use this stream    
    CourierOps.GetBlock[  --be careful, we haven't set input options
      ch, [@ch.protocolRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];

    --our range goes out before the intersection test, so the remote gets it
    --even when the stream is about to be deleted
    IF exchangeVersionNumbersImmediately OR
      (ch.protocolRange.low # pvEqual) OR
      (ch.protocolRange.high # pvEqual) THEN
      BEGIN
      CourierOps.PutBlock[
        ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
      sH.sendNow[sH, FALSE];  --force the packet out - no EOM set
      END;
    IF ~IntersectingRanges[ch.protocolRange] THEN  GOTO delete;  --failure

    CourierInternal.SetMessageProtocolVersion[ch, pvEqual];
    END;
    
    DO  --until connection ends via close or timeout.
      --parameterInconsistency restarts the loop body; any other Courier.Error
      --ends the connection.
      ENABLE Courier.Error =>
        IF errorCode = parameterInconsistency THEN RETRY ELSE GOTO error;
      Process.CancelAbort[ch.owner];  --just in case client ...
      IF streamClosed THEN GOTO error;  --caller aborted (in receiver)
      ch.endRecord ← FALSE; ch.streamState ← busy;  --set state of stream
      sH.setTimeout[sH, CourierInternal.serverIdleTimeout];
      sH.options ← CourierInternal.inputOptions;
      sH.setSST[sH, CourierProtocol.dataSST];
      WITH ch.message SELECT FROM
	protocol3 =>
	  BEGIN
	  --read the call header
	  CourierOps.GetBlock[
	    ch, [@protocol3Body, 0,
	    SIZE[call CourierProtocol.Protocol3Body] * bpw]];
	  WITH protocol3Body SELECT FROM
	    call =>
	      BEGIN
	      ch.object.programNumber ← CourierInternal.ExchWords[program];
	      ch.object.versionNumber ← version;
	      procedureNumber ← procedure;
	      tid ← transaction;
	      END;
	    --anything else is invalid
	    ENDCASE => {[] ← CourierOps.FlushToEndOfMessage[ch]; LOOP};
	  END;
	ENDCASE;

      --now we have something to do
      sH.setTimeout[sH, CourierInternal.activeTimeout];
      
      IF CourierInternal.doStats THEN
        BEGIN
        CourierInternal.stats[callsReceived] ←
	  CourierInternal.stats[callsReceived] + 1;
	--a call from the null network or from this system's own network is
	--also counted as local
	IF (ch.object.remote.net = System.nullNetworkNumber) OR
	  (ch.object.remote.net = Courier.LocalSystemElement[].net) THEN
	  CourierInternal.stats[localCallsReceived] ←
	    CourierInternal.stats[localCallsReceived] + 1;
	END;

      --find the exported program; reject the call when there is no dispatcher
      --or the requested version is outside the exported range
      [dispatcher, range] ← CourierOps.SearchForExport[ch];
      IF dispatcher = NIL THEN
        {SendReject[ch, range, noSuchProgramNumber, tid]; LOOP};
      IF ch.object.versionNumber ~IN[range.low..range.high] THEN
        {SendReject[ch, range, noSuchVersionNumber, tid]; LOOP};

      --let client code take over
      dispatcher[@ch.object, procedureNumber, arguments, results !
        Courier.NoSuchProcedureNumber =>
	  {SendReject[ch, range, noSuchProcedureValue, tid]; CONTINUE};
	Courier.InvalidArguments =>
	  {SendReject[ch, range, invalidArguments, tid]; CONTINUE};
        --Don't UNWIND yet - client may have local frames of interest
        Courier.SignalRemoteError =>
	  {SendAbort[ch, errorNumber, tid, arguments]; CONTINUE}];
      --delete stream if transport is connectionless.
      IF connectionless THEN GOTO delete;
      ENDLOOP;
    EXITS delete, error =>
      BEGIN
      CourierOps.DeleteStream[ch];
      CourierOps.DeleteConnection[ch];
      END;
    END;
    END;  --Receiver
    

  SendReject: PROCEDURE [
    ch: CourierInternal.ServerConnection,
    range: Courier.VersionRange,
    rejectCode: CourierProtocol.RejectCode,
    tid: CARDINAL] =
    BEGIN
    --Send a reject message to the remote found in the connection specified.
    --These errors are discovered short of receiving the data from the link,
    --so the link is in an unknown state requiring we flush the input
    --stream's data.
    --  ch: connection whose message buffer and transport stream are used.
    --  range: version range reported with noSuchVersionNumber; unused by the
    --    other reject codes.
    --  rejectCode: which reject body to build.
    --  tid: transaction id echoed back in the reject.
    --Stream.TimeOut and Courier.Error raised while sending are swallowed.

    ENABLE Stream.TimeOut, Courier.Error =>  --Can only be a stream problem.
      GOTO error;

    --Both start out empty so that a message variant or reject code that is
    --not handled below can never hand PutBlock an uninitialized pointer or
    --length.
    msgSize: CARDINAL ← 0;
    msgAddr: LONG POINTER ← NIL;

    IF CourierInternal.doStats THEN
      CourierInternal.stats[rejectsTransmitted] ←
        CourierInternal.stats[rejectsTransmitted] + 1;
    WITH ch.message SELECT FROM
      protocol3 =>
        BEGIN
        msgAddr ← @protocol3Body;
        SELECT rejectCode FROM
          noSuchProgramNumber =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchProgramNumber[]]];
            msgSize ← SIZE[
              noSuchProgramNumber reject CourierProtocol.Protocol3Body] * bpw;
            END;
          noSuchProcedureValue =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchProcedureValue[]]];
            msgSize ← SIZE[
              noSuchProcedureValue reject CourierProtocol.Protocol3Body] * bpw;
            END;
          invalidArguments =>
            BEGIN
            protocol3Body ← [reject[tid, invalidArguments[]]];
            msgSize ← SIZE[
              invalidArguments reject CourierProtocol.Protocol3Body] * bpw;
            END;
          noSuchVersionNumber =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchVersionNumber[range]]];
            msgSize ← SIZE[
              noSuchVersionNumber reject CourierProtocol.Protocol3Body] * bpw;
            END;
          ENDCASE;  --unknown code: msgSize stays 0 and nothing is sent
        END;
      ENDCASE;  --unknown message variant: msgAddr stays NIL
    --The unread remainder of the call is discarded in every case.
    [] ← CourierOps.FlushToEndOfMessage[ch];
    IF (msgAddr # NIL) AND (msgSize # 0) THEN
      BEGIN  --only transmit a reject body that was actually built
      CourierOps.PutBlock[ch, [msgAddr, 0, msgSize]];
      ch.transFilter.object.sendNow[@ch.transFilter.object, TRUE];
      END;

    --Ignore problems with the stream.  They get noticed on the next get.
    EXITS error => NULL;  
    END;  --SendReject
    

  SendAbort: PROCEDURE [
    ch: CourierInternal.ServerConnection,
    abortCode: CARDINAL, tid: CARDINAL,
    arguments: Courier.Parameters] =
    BEGIN
    --Transmit an abort message on the connection ch.  The abort originates
    --in the remote program itself; the stream is left as it is.
    --  abortCode, tid: error number and transaction id placed in the header.
    --  arguments: error arguments, written after the header when both the
    --    location and the description are present.
    --Stream.TimeOut and Courier.Error raised while sending are swallowed;
    --they will be noticed on the next operation.

    ENABLE Stream.TimeOut, Courier.Error => GOTO streamTrouble;

    transport: Stream.Handle;

    IF CourierInternal.doStats THEN
      CourierInternal.stats[abortsTransmitted] ←
        CourierInternal.stats[abortsTransmitted] + 1;
    ch.streamState ← busy;
    transport ← @ch.transFilter.object;
    transport.setSST[transport, CourierProtocol.dataSST];

    --abort header first
    WITH ch.message SELECT FROM
      protocol3 => {
        protocol3Body ← [abort[tid, abortCode]];
        CourierOps.PutBlock[
          ch,
          [@protocol3Body, 0, SIZE[abort CourierProtocol.Protocol3Body] * bpw]]};
      ENDCASE;

    --then the optional error arguments
    IF (arguments.location # NIL) AND (arguments.description # NIL) THEN
      CourierOps.Fetch[ch, arguments.location, arguments.description];

    transport.sendNow[transport, TRUE];
    EXITS streamTrouble => NULL;
    END;  --SendAbort

END..... --of CourierImplS.mesa

LOG
 4-Feb-85 18:34:44  AOF  Post Klamath.
29-Jun-84 10:25:01  SMA  Factor Courier from NetworkStreams.
28-Nov-84 16:39:01  SMA  Delete stream after return if connectionless.
28-Nov-84 16:39:01  SMA  AugmentedStreams and new interfaces.
31-Dec-84  8:32:50  AOF  SetSST = dataSST before sending results.
10-Jan-85 14:24:23  SMA  bulkFilter set when connection and transport created.
10-Jan-85 17:48:06  SMA  Delete stream if protocol versions don't intersect.
 4-Feb-85 18:35:07  AOF  More care about RETRY'ng Courier.Error.
21-Sep-85 12:03:39  AOF  Allow bulk data on stream in 'results'.
20-Mar-86 15:43:28  AOF  Force version # exchange ~piggybacked
 8-Jun-86 17:21:41  AOF  Undo ~piggyback version numbers
12-Nov-86 15:36:47  AOF  FlushToEOS on UNWIND in arguments proc
16-Jan-87 10:14:31  AOF  Removal of Courier Version 2.0
26-Oct-87 11:19:49  AOF  And hopefully making it right (sorting version exchs)