-- File: PupByteStreams.mesa - last edit:
-- AOF                 17-Feb-88 15:26:30
-- SMA                 14-Dec-84 17:47:26
-- Copyright (C) 1983, 1986, 1988 by Xerox Corporation. All rights reserved. 

DIRECTORY
  Environment USING [Byte],
  Runtime USING [GlobalFrame],
  ByteBlt USING [ByteBlt],
  Stream USING [
    Byte, Word, Block, CompletionCode, defaultInputOptions, defaultObject,
    Handle, InputOptions, LongBlock, Object, ShortBlock, SSTChange,
    SubSequenceType, TimeOut],
  CommFlags USING [doDebug],
  Driver USING [Glitch],
  PrincOpsMinus USING [NewSelf],
  PupPktDefs USING [
    PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake],
  PupStream USING [PupOpenMode],
  PupDefs USING [
    GetBuffer, ReturnBuffer, GetPupContentsBytes, SetPupContentsBytes,
    Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
  PupPktOps USING [pupBuffers],
  PupTypes USING [fillInSocketID];

PupByteStreams: MONITOR
  IMPORTS
    PrincOpsMinus, Runtime, Stream, ByteBlt, Driver, PupPktDefs, PupDefs,
    PupPktOps
  EXPORTS PupStream =
  BEGIN OPEN PupDefs;

  -- Manager data
  -- free heads a singly linked list of module instances (global frames) that
  -- are available for reuse; the list is threaded through each instance's own
  -- "next" variable (see GetInstance and FreeInstance).
  -- It starts out holding the frame obtained from PupByteStreamCreate,
  -- presumably this very instance (confirm against Runtime.GlobalFrame).
  free: LONG POINTER TO FRAME[PupByteStreams] ←
    LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[PupByteStreamCreate]]];
  -- Link to the following free instance; NIL marks the end of the list.
  next: LONG POINTER TO FRAME[PupByteStreams] ← NIL;

  -- Hands out a module instance for a new stream: the head of the free list
  -- when there is one, otherwise a freshly made and STARTed copy.
  GetInstance: ENTRY PROC RETURNS [him: LONG POINTER TO FRAME[PupByteStreams]] =
    BEGIN
    IF free # NIL THEN
      BEGIN  -- reuse: unlink the first free instance
      him ← free;
      free ← free.next;
      END
    ELSE
      BEGIN  -- nothing to reuse: make another copy of this module
      him ← --NEW PupByteStreams-- PrincOpsMinus.NewSelf[];
      START him;  -- Do initialization code
      END;
    END;

  -- Puts him back at the front of the free list so GetInstance can reuse it.
  FreeInstance: ENTRY PROC[him: LONG POINTER TO FRAME[PupByteStreams]] =
    BEGIN
    him.next ← free;
    free ← him;
    END;

  -- The Stream.Object for this instance; PupByteStreamMake returns a pointer
  -- to it as the client's Stream.Handle.  The procedure fields are bound to
  -- this module's implementations, except getPosition and setPosition which
  -- take the Stream defaults.  options and delete are left unspecified here
  -- and are filled in by PupByteStreamMake each time the instance is used.
  myPupByteStream: Stream.Object ← [
    options:, getByte: GetByte, putByte: PutByte, getWord: GetWord,
    putWord: PutWord, get: GetBlock, put: PutBlock, setSST: SendMark,
    sendAttention: SendAttention, waitAttention: WaitAttention, delete:,
    setTimeout: SetTimeout,
    getTimeout: GetTimeout,
    getPosition: Stream.defaultObject.getPosition,
    setPosition: Stream.defaultObject.setPosition,
    sendNow: SendNow, clientData: NIL];

  -- Underlying packet stream; set by PupByteStreamMake and used for all
  -- packet level get, put, mark, attention and timeout operations.
  myPs: PupPktDefs.PupPktStream;

  -- Beware, things don't get reinitialized when a module is reused
  -- (PupByteStreamMake resets the fields it needs explicitly.)
  inputBuffer: PupBuffer;  -- packet currently being read, NIL if none
  inputFinger: CARDINAL;  -- index of the next unread byte in inputBuffer
  inputBufferSize: CARDINAL;  -- GetPupContentsBytes of inputBuffer, set in GetBlock
  inputSST: Stream.SubSequenceType;  -- SST taken from the last mark received
  outputBuffer: PupBuffer;  -- packet currently being filled, NIL if none
  outputFinger: CARDINAL;  -- 0 if aData/aMark sent
  outputBufferSize: CARDINAL;  -- myPs.getSenderSizeLimit[]; 0 = not fetched yet

  -- Handed to Driver.Glitch by PupByteStreamAbort and Destroy when debugging
  -- is on and the handle's delete procedure is not this module's Destroy.
  HandleLooksLikeGarbage: PUBLIC ERROR = CODE;

  -- Overlay used by GetWord and PutWord to treat a Stream.Word as two bytes;
  -- left is transferred first, then right.
  LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte];

  -- Convenience form of PupByteStreamMake: local socket is
  -- PupTypes.fillInSocketID, open mode is sendRfc, and the id is [0, 0].
  PupByteStreamCreate: PUBLIC PROC[remote: PupAddress, ticks: Tocks]
    RETURNS [Stream.Handle] =
    BEGIN
    zeroId: Pair = [0, 0];
    RETURN[
      PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, zeroId]];
    END;

  -- Opens a packet stream with the given parameters, attaches it to a module
  -- instance (reused or new), resets that instance's stream state, and
  -- returns the instance's Stream.Object as the handle.
  PupByteStreamMake: PUBLIC PROC[
    local: PupSocketID, remote: PupAddress, ticks: Tocks,
    mode: PupStream.PupOpenMode, id: Pair] RETURNS [Stream.Handle] =
    BEGIN
    -- The packet stream is made before an instance is claimed, so it is ok
    -- to UNWIND here if PupPktStreamMake doesn't work.
    pktStream: PupPktDefs.PupPktStream =
      PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
    instance: LONG POINTER TO FRAME[PupByteStreams] = GetInstance[];
    -- Initialization (a reused instance still holds its previous values)
    instance.myPs ← pktStream;
    instance.inputBuffer ← NIL;
    instance.inputSST ← 0;
    instance.outputBuffer ← NIL;
    instance.outputFinger ← 0;
    instance.outputBufferSize ← 0;
    instance.myPupByteStream.options ← Stream.defaultInputOptions;
    instance.myPupByteStream.delete ← Destroy;
    RETURN[@instance.myPupByteStream];
    END;

  -- Aborts the packet stream underneath bs, passing along the text e.
  -- The owning instance is found from the frame of the handle's put procedure.
  PupByteStreamAbort: PUBLIC --ENTRY-- PROC[bs: Stream.Handle, e: LONG STRING] =
    BEGIN
    ENABLE UNWIND => NULL;
    owner: LONG POINTER TO FRAME[PupByteStreams] =
      LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
    IF CommFlags.doDebug THEN
      -- sanity check: a live handle of ours has delete = Destroy
      IF bs.delete # Destroy THEN Driver.Glitch[HandleLooksLikeGarbage];
    PupPktDefs.PupPktStreamAbort[owner.myPs, e];
    END;

  -- Stream delete procedure: destroys the underlying packet stream, gives
  -- back any buffers still held, and returns the instance to the free list.
  Destroy: PUBLIC --ENTRY-- PROC[bs: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    owner: LONG POINTER TO FRAME[PupByteStreams] =
      LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
    IF CommFlags.doDebug THEN
      -- sanity check: a live handle of ours has delete = Destroy
      IF bs.delete # Destroy THEN Driver.Glitch[HandleLooksLikeGarbage];
    bs.delete ← NIL;  -- a second Destroy on this handle fails the check above
    PupPktDefs.PupPktStreamDestroy[owner.myPs];
    IF owner.inputBuffer # NIL THEN ReturnBuffer[@owner.inputBuffer];
    IF owner.outputBuffer # NIL THEN ReturnBuffer[@owner.outputBuffer];
    FreeInstance[owner];
    END;


  -- Forwards to the packet stream's sendAttention; the byte argument is not used.
  SendAttention: PROC[sH: Stream.Handle, byte: Stream.Byte] =
    {myPs.sendAttention[]};

  -- Waits on the packet stream's waitForAttention, then always returns 0.
  WaitAttention: PROC[sH: Stream.Handle] RETURNS [Stream.Byte] =
    {myPs.waitForAttention[]; RETURN[0]};

  -- Returns the next input byte.  Bytes are taken straight from inputBuffer
  -- while it has data to spare; otherwise a one byte block is read through
  -- the handle's get procedure.
  GetByte: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    IF inputBuffer = NIL OR inputFinger + 2 >= inputBufferSize THEN
      BEGIN  -- no buffer, or too near its end: go through the block path
      scratch: PACKED ARRAY [0..1] OF Stream.Byte;
      [] ← sH.get[sH, [@scratch, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE, TRUE]];
      byte ← scratch[0];
      RETURN;
      END;
    -- The "+2" margin above lets GetBlock give back the buffer if we take
    -- the last byte.
    byte ← inputBuffer.pup.pupBytes[inputFinger];
    inputFinger ← inputFinger + 1;
    END;
    
  -- Reports the timeout currently held by the underlying packet stream.
  GetTimeout: PROC [sH: Stream.Handle] RETURNS [waitTime: LONG CARDINAL] =
    BEGIN
    waitTime ← myPs.getTimeout[];
    END;

  -- Reads two bytes with GetByte and returns them as one word, the first
  -- byte in the left half and the second in the right half.
  GetWord: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [word: Stream.Word] =
    BEGIN
    halves: LeftAndRight;
    halves.left ← GetByte[sH];
    halves.right ← GetByte[sH];
    word ← LOOPHOLE[halves, Stream.Word];
    END;

  -- Stream get procedure: copies received bytes into block until it is full
  -- or one of the conditions selected by options ends the transfer.
  -- Returns:
  --   bytesTransferred: number of bytes copied into block by this call
  --   why: normal, sstChange (a mark arrived and was not signalled), or
  --        endRecord (a packet was used up and terminateOnEndRecord is set)
  --   sst: inputSST as of entry, or the new SST if a mark arrived
  -- Signals: Stream.TimeOut, Stream.SSTChange, Stream.LongBlock (all raised
  -- with SIGNAL, the loop carries on if they are resumed) and the ERROR
  -- Stream.ShortBlock.
  GetBlock: --ENTRY-- PROC[
    sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    input: Stream.Block;
    moved: CARDINAL;
    bytesTransferred ← 0;
    sst ← inputSST;
    why ← normal;
    WHILE block.startIndex < block.stopIndexPlusOne DO
      -- Make sure there is a data packet to read from.
      UNTIL inputBuffer # NIL DO
        inputFinger ← 0;
        inputBuffer ← myPs.get[];
        -- NIL from get is reported as a timeout; if the signal is resumed
        -- the enclosing UNTIL loop simply tries again.
        IF inputBuffer = NIL THEN SIGNAL Stream.TimeOut[block.startIndex]
        ELSE
          IF inputBuffer.pup.pupType = mark OR inputBuffer.pup.pupType = aMark THEN
            BEGIN
            -- A mark carries the new SST in its first byte and no data, so
            -- the buffer is given back at once.
            sst ← inputSST ← inputBuffer.pup.pupBytes[0];
            ReturnBuffer[@inputBuffer];
            IF options.signalSSTChange THEN
              SIGNAL Stream.SSTChange[inputSST, block.startIndex]
            ELSE BEGIN why ← sstChange; RETURN; END;
            END;
        ENDLOOP;
      -- Copy as much as fits from the unread part of the packet into block.
      inputBufferSize ← GetPupContentsBytes[inputBuffer];
      input ← [
        blockPointer: --Krock in Environment-- LOOPHOLE[
	  @inputBuffer.pup.pupBytes], startIndex: inputFinger,
        stopIndexPlusOne: inputBufferSize];
      moved ← ByteBlt.ByteBlt[block, input];
      bytesTransferred ← bytesTransferred + moved;
      block.startIndex ← block.startIndex + moved;
      inputFinger ← inputFinger + moved;
      -- Packet used up: give it back (this also sets inputBuffer to NIL).
      IF inputFinger = inputBufferSize THEN BEGIN ReturnBuffer[@inputBuffer]; END;
      -- Packet ran out before the client's block was filled.
      IF inputBuffer = NIL AND block.startIndex < block.stopIndexPlusOne
        AND options.signalLongBlock THEN
        BEGIN SIGNAL Stream.LongBlock[block.startIndex]; END;
      -- NOTE(review): any used up packet counts as end of record here; the
      -- packet type (data versus aData) is not examined.
      IF inputBuffer = NIL AND options.terminateOnEndRecord THEN
        BEGIN why ← endRecord; EXIT; END;
      ENDLOOP;
    -- Block is done but the current packet still has unread bytes.
    IF inputBuffer # NIL AND options.signalShortBlock THEN
      BEGIN ERROR Stream.ShortBlock; END;
    END;

  -- Appends one byte to the output.  The byte goes straight into
  -- outputBuffer while it has room to spare; otherwise it is sent through
  -- PutBlock as a one byte block (endRecord FALSE).
  PutByte: --ENTRY-- PROC[sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    IF outputBuffer = NIL OR outputFinger + 2 >= outputBufferSize THEN
      BEGIN  -- no buffer, or too near its end: go through the block path
      single: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ];
      PutBlock[sH, [@single, 0, 1], FALSE];
      RETURN;
      END;
    -- The "+2" margin above lets PutBlock flush the buffer if we fill the
    -- last byte.
    outputBuffer.pup.pupBytes[outputFinger] ← byte;
    outputFinger ← outputFinger + 1;
    END;

  -- Writes word as two bytes through the handle's putByte procedure, left
  -- half first and right half second.
  PutWord: --ENTRY-- PROC[sH: Stream.Handle, word: Stream.Word] =
    BEGIN
    halves: LeftAndRight = LOOPHOLE[word, LeftAndRight];
    sH.putByte[sH, halves.left];
    sH.putByte[sH, halves.right];
    END;

  -- Stream put procedure: copies the bytes of block into output packets,
  -- sending each packet as it fills.
  --   block: bytes startIndex up to (not including) stopIndexPlusOne are sent
  --   endRecord: when TRUE the final packet is sent at once, typed aData
  PutBlock: --ENTRY-- PROC[
    sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN] =
    BEGIN
    ENABLE UNWIND => NULL;
    output: Stream.Block;
    moved: CARDINAL;
    -- The size limit is fetched on first use; PupByteStreamMake zeroes it.
    IF outputBufferSize = 0 THEN outputBufferSize ← myPs.getSenderSizeLimit[];
    WHILE block.startIndex < block.stopIndexPlusOne DO
      -- A full packet is sent before more bytes are added.
      IF outputFinger = outputBufferSize THEN FlushOutputBuffer[];
      IF outputBuffer = NIL THEN
        BEGIN
	 outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
	outputBuffer.pup.pupType ← data;
        outputFinger ← 0;
        END;
      -- Copy as much of block as fits in the rest of the packet.
      output ← [
        blockPointer: --Krock-- LOOPHOLE[
	@outputBuffer.pup.pupBytes], startIndex: outputFinger,
        stopIndexPlusOne: outputBufferSize];
      moved ← ByteBlt.ByteBlt[output, block];
      block.startIndex ← block.startIndex + moved;
      outputFinger ← outputFinger + moved;
      ENDLOOP;
    -- Send now if the record ends with bytes outstanding since the last
    -- aData/aMark (outputFinger # 0), or if the packet is exactly full.
    IF (endRecord AND outputFinger # 0) OR outputFinger = outputBufferSize
      THEN
      BEGIN
      -- No packet at hand (the last one already went out): an empty one is
      -- obtained so that something can still be sent.
      IF outputBuffer = NIL THEN
        BEGIN 
	outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
	outputBuffer.pup.pupType ← data;
	outputFinger ← 0;
        END;
      IF endRecord THEN outputBuffer.pup.pupType ← aData;
      FlushOutputBuffer[];
      -- Keeps outputFinger at 0 once an aData has been sent.
      IF endRecord THEN outputFinger ← 0;
      END;
    END;
  
  -- Stream sendNow procedure: sends whatever is buffered right away.
  --   endRecord: TRUE sends the packet typed aData, FALSE typed data.
  -- If nothing is buffered an empty packet is obtained and sent.
  -- outputFinger is documented as "0 if aData/aMark sent"; PutBlock and
  -- SendMark maintain that, and so does this procedure.  Without the reset a
  -- later PutBlock of an empty block with endRecord TRUE would see a stale
  -- nonzero finger and send a second, empty aData packet.
  SendNow: PROC[sH: Stream.Handle, endRecord: BOOLEAN ← TRUE] =
    BEGIN
    IF outputBuffer = NIL THEN
      BEGIN
      outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
      outputBuffer.pup.pupType ← data;
      outputFinger ← 0;
      END;
    IF endRecord THEN outputBuffer.pup.pupType ← aData
    ELSE outputBuffer.pup.pupType ← data;
    FlushOutputBuffer[];  -- uses outputFinger as the packet length
    -- Reset only after the flush, and only when an aData actually went out.
    IF endRecord THEN outputFinger ← 0;
    END;

  -- Stream setSST procedure: sends any buffered data, then has the packet
  -- stream send a mark carrying sst.
  SendMark: --ENTRY-- PROC[sH: Stream.Handle, sst: Stream.SubSequenceType] =
    {
    ENABLE UNWIND => NULL;
    FlushOutputBuffer[];
    outputFinger ← 0;  -- nothing outstanding once the mark is on its way
    myPs.putMark[sst];
    };
    
  -- Passes the new timeout on to the underlying packet stream.
  SetTimeout: PROC [sH: Stream.Handle, waitTime: LONG CARDINAL] =
    {myPs.setTimeout[waitTime]};  --SetTimeout

  -- Sends the current output packet, if any, with outputFinger as its
  -- content length.  Does nothing when no packet is being filled.
  FlushOutputBuffer: PROC =
    BEGIN
    IF outputBuffer # NIL THEN
      BEGIN
      -- Detach the packet first: don't leave outputBuffer dangling in case
      -- of StreamClosing
      pending: PupBuffer = outputBuffer;
      outputBuffer ← NIL;
      SetPupContentsBytes[pending, outputFinger];
      myPs.put[pending];
      END;
    END;

  -- Gives the buffer that p points at back through PupDefs.ReturnBuffer,
  -- clearing the caller's variable to NIL before doing so.
  ReturnBuffer: PROC[p: LONG POINTER TO PupBuffer] =
    BEGIN
    taken: PupBuffer = p↑;
    p↑ ← NIL;
    PupDefs.ReturnBuffer[taken];
    END;


  END.

LOG

Time: 14-Dec-81 10:55:04  By: AOF  Action: Adjust to new stream interface
Time:  6-Apr-83 13:53:47  By: SMA  Action: Update for Klamath integration.
Time: 19-May-83 10:11:02  By: SMA  Action: Convert to new BufferMgr.
Time: 20-Jun-83 13:15:19  By: SMA  Action: Set pupType for data buffer.
Time: 12-Jul-83 11:47:03  By: AOF  Action: 32-bit procs.
Time: 13-Dec-84 13:40:10  By: SMA  Action: SendNow will flush if not endRecord.
Time: 14-Dec-84 17:47:49  By: SMA  Action: Added GetTimeout and SetTimeout.
Time:  2-Sep-86 14:47:38  By: AOF  New Buffer mgmt.
Time: 17-Feb-88 15:23:25  By: AOF  Fix for compiler bug NEWing self.