-- File: StreamCoupleImpl.mesa - last edit:
-- AOF                  8-Dec-87  9:35:05
-- Copyright (C) 1983, 1984, 1987 by Xerox Corporation. All rights reserved. 

DIRECTORY
  ByteBlt USING [ByteBlt],
  Heap USING [systemZone],
  Process USING [EnableAborts],
  Stream USING [
    Attention, Block, Byte, CompletionCode, defaultObject, EndOfStream, Handle,
    InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType],
  StreamCouple USING [];

StreamCoupleImpl: MONITOR
  IMPORTS ByteBlt, Heap, Process, Stream EXPORTS StreamCouple =

  BEGIN

  --types
  HalfStreamPair: TYPE = LONG POINTER TO HSPObject;

  HSPObject: TYPE = MACHINE DEPENDENT RECORD [
    stream: Stream.Object, data: DataRecord];

  DataRecord: TYPE = RECORD [
    otherStream: Stream.Handle ← NIL,  --the other half of the stream pair
    sstReceived: CONDITION ← [timeout: 100],  --TRUE if sst change rcvd by other
    sstChange: BOOLEAN ← FALSE,
    currentSST: Stream.SubSequenceType ← 1,
    attn: CONDITION ← [timeout: 100],  -- attention flag
    attnQueue: AttnQueue ← [NIL, NIL],  -- queue for attn bytes
    attnStatus: AttnStatus ← none,
    attnAnnounced: BOOLEAN ← FALSE,
    waitForPut: CONDITION ← [timeout: 100],
    endRecord: BOOLEAN ← FALSE,
    blockCondition: CONDITION ← [timeout: 100],  -- change in block status
    blockStatus: {available, inUse, emptied} ← available,  -- status of block
    block: Stream.Block];  -- block for put operation goes here

  AttnQueue: TYPE = RECORD [
    first: AttnList,  -- head of the queue
    last: AttnList];  -- tail of the queue

  AttnList: TYPE = LONG POINTER TO AttnRec;

  AttnRec: TYPE = RECORD [
    next: AttnList,  -- next attn byte in list
    byte: Stream.Byte];  -- attn byte

  AttnStatus: TYPE = {none, awaited, arrived, deliveredOutOfBand, delivered};

  -- constants
  blockNil: Stream.Block = [NIL, 0, 0];

  -- public procedures

  Create: PUBLIC ENTRY PROCEDURE RETURNS [sH1, sH2: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1: HalfStreamPair ← Heap.systemZone.NEW[HSPObject];
    hsp2: HalfStreamPair ← Heap.systemZone.NEW[HSPObject];

    Process.EnableAborts[@hsp1.data.attn];
    Process.EnableAborts[@hsp2.data.attn];
    Process.EnableAborts[@hsp1.data.blockCondition];
    Process.EnableAborts[@hsp2.data.blockCondition];

    hsp1.stream ← Stream.defaultObject;
    hsp1.stream.get ← GetBlock;
    hsp1.stream.put ← PutBlock;
    hsp1.stream.setSST ← SetSST;
    hsp1.stream.sendAttention ← SendAttention;
    hsp1.stream.waitAttention ← WaitAttention;
    hsp1.stream.delete ← Delete;

    hsp2.stream ← hsp1.stream;
    hsp1.data.otherStream ← @hsp2.stream;
    hsp2.data.otherStream ← @hsp1.stream;

    RETURN[hsp2.data.otherStream, hsp1.data.otherStream];
    END;  -- Create

  -- required stream procedures

  Delete: ENTRY PROCEDURE [sH: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    attn, nextAttn: AttnList;

    [hsp1, hsp2] ← ConvertHandle[sH];

    IF hsp1 # NIL THEN {
      FOR attn ← hsp1.data.attnQueue.first, nextAttn WHILE attn # NIL DO
        nextAttn ← attn.next; Heap.systemZone.FREE[@attn] ENDLOOP;
      Heap.systemZone.FREE[@hsp1]};
    IF hsp2 # NIL THEN hsp2.data.otherStream ← NIL;

    END;  -- Delete

  GetBlock: ENTRY PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    blockBytes, inputBytes, bytesMoved: CARDINAL ← 0;
    done: BOOLEAN ← FALSE;
    hsp1, hsp2: HalfStreamPair;

    [hsp1, hsp2] ← ConvertHandle[sH];
    bytesTransferred ← 0;
    why ← normal;
    sst ← hsp2.data.currentSST;

    WHILE NOT done DO

      UNTIL hsp2.data.blockStatus = inUse OR hsp2.data.sstChange
        OR hsp2.data.endRecord OR hsp2.data.attnStatus > awaited DO
        WAIT hsp1.data.waitForPut ENDLOOP;

      sst ← hsp2.data.currentSST;
      IF hsp2.data.sstChange THEN
        BEGIN
        hsp2.data.sstChange ← FALSE;
        BROADCAST hsp1.data.sstReceived;
        IF options.signalSSTChange THEN
          SIGNAL Stream.SSTChange[sst, block.startIndex];
        why ← sstChange;
        done ← TRUE;
        END;

      IF hsp2.data.attnStatus > awaited THEN
        BEGIN
        oldStatus: AttnStatus ← hsp2.data.attnStatus;
        hsp2.data.attnStatus ← delivered;
        IF oldStatus = arrived THEN [] ← WaitForAttention[sH];
        hsp2.data.blockStatus ← emptied;
        BROADCAST hsp2.data.blockCondition;
        why ← attention;
        done ← TRUE
        END;

      IF NOT done THEN
        BEGIN
        blockBytes ← block.stopIndexPlusOne - block.startIndex;
        inputBytes ←
          hsp2.data.block.stopIndexPlusOne - hsp2.data.block.startIndex;

        IF inputBytes > 0 THEN
          BEGIN
          bytesMoved ← ByteBlt.ByteBlt[to: block, from: hsp2.data.block];
          block.startIndex ← block.startIndex + bytesMoved;
          hsp2.data.block.startIndex ← hsp2.data.block.startIndex + bytesMoved;
          bytesTransferred ← bytesTransferred + bytesMoved
          END;

        IF bytesMoved = blockBytes THEN {why ← normal; done ← TRUE};

        IF hsp2.data.endRecord THEN BEGIN why ← endOfStream; done ← TRUE; END;

        IF bytesMoved = inputBytes OR hsp2.data.endRecord THEN
          BEGIN
          hsp2.data.blockStatus ← emptied;
          BROADCAST hsp2.data.blockCondition;
          END;

        IF bytesMoved < blockBytes AND options.signalLongBlock THEN
          SIGNAL Stream.LongBlock[block.startIndex];
        IF bytesMoved < inputBytes AND options.signalShortBlock THEN
          ERROR Stream.ShortBlock;

        END;

      ENDLOOP;

    IF why = endOfStream AND options.signalEndOfStream THEN
      SIGNAL Stream.EndOfStream[bytesTransferred]
    END;  -- GetBLock

  PutBlock: ENTRY PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;

    -- IF block=blockNil THEN RETURN; --  -- stop gap solution to nullify SendNow function

    [hsp1, hsp2] ← ConvertHandle[sH];
    InternalPutBlock[
      hsp1, hsp2, block, endRecord ! Stream.Attention => GOTO ReturnAttention]
    EXITS ReturnAttention => RETURN WITH ERROR Stream.Attention[0]
    END;  -- PutBlock

  InternalPutBlock: INTERNAL PROCEDURE [
    hsp1, hsp2: HalfStreamPair, block: Stream.Block,
    endRecord: BOOLEAN ← FALSE] =
    BEGIN
    attentionReceived: BOOLEAN ← FALSE;
    UNTIL hsp1.data.blockStatus = available DO
      WAIT hsp1.data.blockCondition ENDLOOP;

    hsp1.data.block ← block;
    hsp1.data.blockStatus ← inUse;
    IF endRecord THEN hsp1.data.endRecord ← TRUE;
    BROADCAST hsp2.data.waitForPut;

    UNTIL hsp1.data.blockStatus = emptied OR hsp2.data.attnStatus > awaited DO
      WAIT hsp1.data.blockCondition ENDLOOP;

    IF hsp2.data.attnStatus > awaited AND NOT hsp2.data.attnAnnounced THEN
      BEGIN hsp2.data.attnAnnounced ← TRUE; attentionReceived ← TRUE; END;

    hsp1.data.blockStatus ← available;
    BROADCAST hsp2.data.blockCondition;

    IF attentionReceived THEN SIGNAL Stream.Attention[0];

    END;  -- InternalPutBlock

  SetSST: ENTRY PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;

    [hsp1, hsp2] ← ConvertHandle[sH];
    WHILE hsp1.data.sstChange DO WAIT hsp2.data.sstReceived ENDLOOP;

    IF sst # hsp1.data.currentSST THEN
      BEGIN
      hsp1.data.currentSST ← sst;
      hsp1.data.sstChange ← TRUE;

      BROADCAST hsp2.data.waitForPut;
      END;

    END;  -- SetSST

  SendAttention: ENTRY PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    attnList: AttnList ← Heap.systemZone.NEW[AttnRec];
    bytes: PACKED ARRAY [0..2) OF Stream.Byte ← [byte, 0];
    hsp1, hsp2: HalfStreamPair;

    [hsp1, hsp2] ← ConvertHandle[sH];

    attnList.next ← NIL;
    attnList.byte ← byte;

    IF hsp1.data.attnQueue.first = NIL THEN
      hsp1.data.attnQueue.first ← hsp1.data.attnQueue.last ← attnList
    ELSE {
      hsp1.data.attnQueue.last.next ← attnList;
      hsp1.data.attnQueue.last ← attnList};
    hsp1.data.attnStatus ← arrived;
    BROADCAST hsp2.data.attn;

    InternalPutBlock[hsp1, hsp2, [@bytes, 0, 1] ! Stream.Attention => CONTINUE];

    END;  -- SendAttention

  WaitAttention: ENTRY PROCEDURE [sH: Stream.Handle]
    RETURNS [byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    [hsp1, hsp2] ← ConvertHandle[sH];
    SELECT hsp2.data.attnStatus FROM
      none => hsp2.data.attnStatus ← awaited;
      arrived => NULL;
      ENDCASE => RETURN[0];
    byte ← WaitForAttention[sH]
    END;  -- WaitAttention

  WaitForAttention: INTERNAL PROCEDURE [sH: Stream.Handle]
    RETURNS [byte: Stream.Byte] =
    BEGIN
    attnList: AttnList;
    hsp1, hsp2: HalfStreamPair;

    [hsp1, hsp2] ← ConvertHandle[sH];

    WHILE hsp2.data.attnQueue.first = NIL DO WAIT hsp1.data.attn ENDLOOP;

    byte ← hsp2.data.attnQueue.first.byte;

    IF hsp2.data.attnQueue.first.next = NIL THEN {
      Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
      hsp2.data.attnQueue ← [NIL, NIL]}
    ELSE
      BEGIN
      attnList ← hsp2.data.attnQueue.first.next;
      Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
      hsp2.data.attnQueue.first ← attnList;
      END;

    IF hsp2.data.attnStatus <= arrived THEN
      hsp2.data.attnStatus ← deliveredOutOfBand;
    BROADCAST hsp1.data.blockCondition  -- notify a Put that is in progress.

    END;  -- WaitForAttention


  -- private procedures

  ConvertHandle: PROCEDURE [sH: Stream.Handle]
    RETURNS [hsp1, hsp2: HalfStreamPair] =
    BEGIN
    hsp1 ← LOOPHOLE[sH, HalfStreamPair];
    IF hsp1 = NIL THEN ERROR;

    IF hsp1.data.otherStream = NIL THEN hsp2 ← NIL
    ELSE hsp2 ← LOOPHOLE[hsp1.data.otherStream, HalfStreamPair];

    END;  -- ConvertHandle

  END...

LOG

20-Dec-84 16:23:19  AOF  Post Klamath
 8-Dec-87  9:34:52  AOF  No changes - just changing the time stamp