-- Copyright (C) 1985, 1986  by Xerox Corporation. All rights reserved. 
-- PipeImpl.mesa
-- NFS   21-Aug-85 10:39:27

DIRECTORY
  Environment USING [Byte],
  Heap USING [Create],
  Pipe USING [defaultBufferSize, NWords],
  Process USING [EnableAborts],
  Stream USING [
    defaultObject, DeleteProcedure, GetByteProcedure, GetProcedure, Handle,
    InvalidOperation, Object, PutByteProcedure];
PipeImpl: MONITOR LOCKS h USING h: Handle
  IMPORTS Heap, Process, Stream EXPORTS Pipe =
  {

  Handle: TYPE = LONG POINTER TO Object;
  Object: PUBLIC TYPE = MONITORED RECORD [
    producer, consumer: PipeStreamHandle,
    inPos, outPos: BufferPosition,
    BytesInBuffer, SpaceInBuffer: CONDITION,
    buffer: PACKED SEQUENCE bufferSize: CARDINAL OF Environment.Byte];

  BufferPosition: TYPE = CARDINAL;

  PipeStreamHandle: TYPE = LONG POINTER TO PipeStreamObject;

  PipeStreamObject: TYPE = RECORD [streamObject: Stream.Object, pipe: Handle];


  z: UNCOUNTED ZONE = Heap.Create[initial: 3];  << 2 pages for 1 default sized
              buffer, and an extra page for pipe overhead.>>

  Create: PUBLIC PROCEDURE [bufferSize: Pipe.NWords ← Pipe.defaultBufferSize]
    RETURNS [h: Handle, producer, consumer: Stream.Handle] = {
    prod: PipeStreamHandle ← z.NEW[PipeStreamObject];
    cons: PipeStreamHandle ← z.NEW[PipeStreamObject];
    h ← z.NEW[Object [bufferSize]];
    h.producer ← prod;
    h.consumer ← cons;
    h.inPos ← h.outPos ← BufferPosition.FIRST;
    prod.pipe ← cons.pipe ← h;
    prod.streamObject ← cons.streamObject ← Stream.defaultObject;
    prod.streamObject.putByte ← WriteByteToBuffer;
    prod.streamObject.getByte ← ErrorGetByte;
    prod.streamObject.delete ← cons.streamObject.delete ← NoOpDelete;
    cons.streamObject.getByte ← ReadByteFromBuffer;
    cons.streamObject.get ← ReadFromBuffer;
    cons.streamObject.putByte ← ErrorPutByte;
    Process.EnableAborts[@h.BytesInBuffer];
    Process.EnableAborts[@h.SpaceInBuffer];
    RETURN[h, LOOPHOLE[prod], LOOPHOLE[cons]];
    };

  Delete: PUBLIC PROCEDURE [h: Handle] = {
    z.FREE[@h.producer]; z.FREE[@h.consumer]; z.FREE[@h]; };

  WriteByteToBuffer: Stream.PutByteProcedure = {
    LockedWriteByte: ENTRY PROC [h: Handle] = {
      WHILE BufferFull[h] DO WAIT h.SpaceInBuffer; ENDLOOP;
      h.inPos ← (h.inPos + 1) MOD h.bufferSize;
      h[h.inPos] ← byte;
      NOTIFY h.BytesInBuffer;
      };

    LockedWriteByte[LOOPHOLE[sH, PipeStreamHandle].pipe];
    };



  ReadByteFromBuffer: Stream.GetByteProcedure = {
    LockedReadByte: ENTRY PROC [h: Handle] = {
      WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP;
      h.outPos ← (h.outPos + 1) MOD h.bufferSize;
      byte ← h[h.outPos];
      NOTIFY h.SpaceInBuffer;
      };

    LockedReadByte[LOOPHOLE[sH, PipeStreamHandle].pipe];
    };

  ReadFromBuffer: Stream.GetProcedure = {
    LockedRead: ENTRY PROC [h: Handle] = {
      WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP;
      WHILE block.startIndex < block.stopIndexPlusOne DO
        IF BufferEmpty[h] THEN {why ← endRecord; RETURN};
        h.outPos ← (h.outPos + 1) MOD h.bufferSize;
        block.blockPointer[block.startIndex] ← h[h.outPos];
        block.startIndex ← block.startIndex.SUCC;
        bytesTransferred ← bytesTransferred.SUCC;
        ENDLOOP;
      };
    bytesTransferred ← sst ← 0;
    why ← normal;
    LockedRead[LOOPHOLE[sH, PipeStreamHandle].pipe];
    };


  BufferFull: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE {
    RETURN[((h.inPos + 1) MOD h.bufferSize) = h.outPos]; };

  BufferEmpty: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE {
    RETURN[h.inPos = h.outPos]; };

  NoOpDelete: Stream.DeleteProcedure = {};
  ErrorPutByte: Stream.PutByteProcedure = {ERROR Stream.InvalidOperation; };
  ErrorGetByte: Stream.GetByteProcedure = {ERROR Stream.InvalidOperation; };

  GetProducer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = {
    RETURN[LOOPHOLE[h.producer]]; };

  GetConsumer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = {
    RETURN[LOOPHOLE[h.consumer]]; };

  }.