-- File: CourierImplF.mesa - last edit:
-- AOF                 11-Jan-88 19:50:46
-- SMA                 10-Jan-85 14:40:41
-- Copyright (C) 1984, 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved. 

DIRECTORY
  ByteBlt USING [ByteBlt],
  Courier USING [Error, ErrorCode],
  CourierInternal USING [
    AugmentedStream, AugmentedObj, ConnectionHandle, ConnectionObject,
    longZone, UserConnection, Redirect, RedirectedObject],
  CourierOps USING [],
  CourierProtocol USING [dataSST],
  Stream USING [
    Attention, Block, CompletionCode, Handle, InputOptions,
    InvalidOperation, Object, Position, PutProcedure, SSTChange,
    SubSequenceType, defaultObject, GetProcedure, GetSSTProcedure,
    GetTimeoutProcedure, SendAttentionProcedure, SendNowProcedure,
    SetSSTProcedure, SetTimeoutProcedure, WaitAttentionProcedure];
  
CourierImplF: PROGRAM
  IMPORTS ByteBlt, Courier, CourierInternal, Stream
  EXPORTS CourierInternal, CourierOps =
  BEGIN

  Block: TYPE = Stream.Block;
  Handle: TYPE = Stream.Handle;
  Position: TYPE = Stream.Position;
  InputOptions: TYPE = Stream.InputOptions;
  CompletionCode: TYPE = Stream.CompletionCode;
  SubSequenceType: TYPE = Stream.SubSequenceType;

  RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE
    BEGIN
    RETURN[@LOOPHOLE[
      sH, CourierInternal.AugmentedStream].back.transFilter.object];
    END;  --RealStream  

  Get: Stream.GetProcedure =
    BEGIN
    gotSST: Stream.SubSequenceType;
    ch: CourierInternal.ConnectionHandle = LOOPHOLE[
      sH, CourierInternal.AugmentedStream].back;
    sH ← RealStream[sH];  --get the real stream
    why ← normal; sst ← ch.lastSST;  --in case the block is a null block
    bytesTransferred ← 0;  --to make the loop invariant work
    UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO
      BEGIN
      [bytesTransferred, why, gotSST] ← sH.get[sH, block, options !
	Stream.SSTChange =>
	  BEGIN
	  bytesTransferred ← nextIndex;  --moved that many bytes
	  gotSST ← sst;  --copy the new sst into local variable
	  why ← sstChange;  --that's what it will be if we process it
	  IF ch.lastSST = CourierProtocol.dataSST THEN CONTINUE;  --my data
	  ch.lastSST ← sst;  --copy this into our state
	  REJECT;  --and let the client deal with it
	  END];
      IF why = sstChange THEN
	SELECT TRUE FROM
	  (gotSST = CourierProtocol.dataSST) => GOTO truncated;  --short
	  (ch.lastSST # CourierProtocol.dataSST) => GOTO notFinished;
	  (bytesTransferred # 0) => GOTO lostData;  --that was my data
	  ENDCASE => GOTO notFinished  --no bytes of my data moved
      ELSE
	SELECT TRUE FROM
	  (gotSST # CourierProtocol.dataSST) => GOTO exit;  --it wasn't my data
	  (bytesTransferred # 0) => GOTO lostData;  --that data was mine
	  ENDCASE => LOOP;  --my sst but no bytes - just ignore
      EXITS
        notFinished => block.startIndex ← block.startIndex + bytesTransferred;
        lostData =>
	  BEGIN
	  RedirectStream[ch, block, bytesTransferred, why, gotSST];  --backtrack
	  WITH vep: ch SELECT FROM
	    user => SELECT TRUE FROM
	      (vep.versExchProc = NIL) => GOTO truncated;  --already been done
	      (vep.versExchProc[ch] # noError) => GOTO truncated;
	      ENDCASE;  --version exchange and it's okay - do .get again
	    ENDCASE => GOTO truncated;  --wasn't a user connection
	  END;
      END;
      ch.lastSST ← gotSST;  --record the last sst we've seen
      bytesTransferred ← 0;  --reset counter since we modified block

      REPEAT
        exit => sst ← ch.lastSST ← gotSST;  --this is what the client sees
	truncated =>
	  {ch.lastSST ← gotSST; ERROR Courier.Error[truncatedTransfer]};
      ENDLOOP;
    END;

  GetSST: Stream.GetSSTProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    RETURN[sH.getSST[sH]];
    END;

  GetTimeout: Stream.GetTimeoutProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    RETURN[sH.getTimeout[sH]];
    END;  --GetTimeout
    
  Put: Stream.PutProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    sH.put[sH, block, endRecord];
    END;  --Put

  SendAttn: Stream.SendAttentionProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    sH.sendAttention[sH, byte];
    END;  --SendAttn
    
  SendNow: Stream.SendNowProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    sH.sendNow[sH, endRecord];
    END;  --SendNow

  SetSST: Stream.SetSSTProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    IF sst # CourierProtocol.dataSST THEN sH.setSST[sH, sst]
    ELSE ERROR Stream.InvalidOperation;
    END;

  SetTimeout: Stream.SetTimeoutProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    sH.setTimeout[sH, waitTime];
    END;  --SetTimeout

  RedirectStream: PUBLIC <<CourierOps>> PROC [
    ch: CourierInternal.ConnectionHandle, block: Stream.Block,
    bytesTransferred: CARDINAL, why: Stream.CompletionCode,
    sst: Stream.SubSequenceType] =
    BEGIN
    WITH h: ch SELECT FROM
      user, server =>
        BEGIN
	rH: CourierInternal.Redirect ← h.redirected;
	IF rH # NIL THEN RETURN;  --we've already done this
	rH ← h.redirected ← CourierInternal.longZone.NEW[
	  CourierInternal.RedirectedObject[bytesTransferred] ← [
	    directGet: h.transFilter.object.get,  --save direct .get proc
	    connection: @h,  --the owning connection
	    why: why,  --the original completion code
	    sst: sst,  --and the observed sst
	    block: , text: ]];
	--copy the captured data into the object
	rH.block ← [LOOPHOLE[@rH.text], 0, bytesTransferred];
	block.stopIndexPlusOne ← block.startIndex + bytesTransferred;
	[] ← ByteBlt.ByteBlt[to: rH.block, from: block];
	h.transFilter.object.get ← RedirectedGet;  --replace get routine
	END;
      ENDCASE;
    END;
  
  RedirectedGet: Stream.GetProcedure =
    BEGIN
    --sH points to the direct stream object with this new Get procedure.
    ch: CourierInternal.UserConnection ← LOOPHOLE[
      LOOPHOLE[sH, CourierInternal.AugmentedStream].back];
    rH: CourierInternal.Redirect ← ch.redirected;
    bytesTransferred ← ByteBlt.ByteBlt[from: rH.block, to: block];  --move data
    sst ← CourierProtocol.dataSST; why ← normal;  --set default values
    rH.block.startIndex ← rH.block.startIndex + bytesTransferred;  --consume ours
    block.startIndex ← block.startIndex + bytesTransferred;  --fill his
    IF block.startIndex = block.stopIndexPlusOne THEN RETURN;  --all normal
    IF rH.block.startIndex = rH.block.stopIndexPlusOne THEN
      BEGIN
      why ← rH.why; sst ← rH.sst;  --he should get these
      ch.transFilter.object.get ← rH.directGet;  --the original .get proc.
      CourierInternal.longZone.FREE[@ch.redirected];  --free up and NIL
      SELECT why FROM
        normal =>
	  BEGIN
	  --his original get was shorter than this one
	  bytes: CARDINAL;
	  [bytes, why, sst] ← sH.get[sH, block, options];  --from real .get proc
	  bytesTransferred ← bytesTransferred + bytes;  --add these bytes on
	  END;
        sstChange =>
          BEGIN
	  --original get ended in sst change - so should this one
	  IF options.signalSSTChange THEN
	    SIGNAL Stream.SSTChange[sst, block.startIndex];
	  END;
	attention =>
	  BEGIN
	  --original get ended with attention - so should this one
	  IF options.signalAttention THEN
	    SIGNAL Stream.Attention[block.startIndex];
	  END;
	ENDCASE;
      END;
    END; 
    
    
  SetBulkStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] =
    BEGIN
    ch.object.sH ← @ch.bulkFilter.object;  --that's the one the client gets
    ch.bulkFilter.back ← ch;  --maintain back pointer to owning courier object
    ch.bulkFilter.object ← Stream.defaultObject;  --just fill the slots

    <<
    The following procedures are supported through the filtered stream.
    The other I/O procs (getByte, putWord, etc) are supported by the default
    stream via the Get and Put procs.
    >>
    ch.bulkFilter.object.get ← Get;
    ch.bulkFilter.object.put ← Put;
    ch.bulkFilter.object.setSST ← SetSST;
    ch.bulkFilter.object.getSST ← GetSST;
    ch.bulkFilter.object.getTimeout ← GetTimeout;
    ch.bulkFilter.object.sendNow ← SendNow;
    ch.bulkFilter.object.sendAttention ← SendAttn;
    ch.bulkFilter.object.setTimeout ← SetTimeout;
    ch.bulkFilter.object.waitAttention ← WaitAttn;

    END;  --SetBulkStream
    
  WaitAttn: Stream.WaitAttentionProcedure =
    BEGIN
    sH ← RealStream[sH];  --get the real stream
    RETURN[sH.waitAttention[sH]];
    END;  --WaitAttn
    	
  END....  --CourierImplF.mesa
  
LOG
21-Jul-82 10:09:41  AOF  Created file.
23-Jul-82 10:49:02  AOF  Use raw stream procs when possible.
 4-Aug-82 14:43:58  AOF  Add SetStreamTimeout procedure.
11-Aug-82 10:02:33  AOF  Add EXPORT Courier and pass Get's options to stream.
18-Aug-82 13:13:54  AOF  Capture Courier data consumed by client.
29-Mar-83 13:50:48  AOF  Klamath changes (default stream, ~unbound link).
 6-Apr-83 13:52:07  AOF  Klamath changes (stream.SetTimeout, etc).
29-Jun-84 10:23:39  SMA  Factor Courier from NetworkStreams.
10-Jul-84 10:43:53  SMA  Bulk data factoring.
21-Dec-84 13:12:23  SMA  AugmentedStreams and new interfaces.
10-Jan-85 14:40:51  SMA  SetBulkData does not create new object (it's in ch now).
14-Jan-86 11:37:14  AOF  Cleanup of LOOPHOLES and filtered stream
 4-Nov-87 12:23:50  AOF  Trapping sst changes vs redirected streams, etc
 8-Jan-88 15:43:39  AOF  Allowing for Adobe's version of bulk data