-- File: NSDataStreamImpl.mesa - last edit:
-- AOF                  9-Dec-87 19:59:46
-- Copyright (C) 1984, 1985, 1987 by Xerox Corporation. All rights reserved. 

DIRECTORY
  BulkDataTransfer USING [
    Descriptor, immediateDescriptor, nullDescriptor, Sink, Source],
  Courier USING [Description, Error, Handle, SystemElement],
  CourierInternal USING [longZone],
  CourierOps USING [
    StackBlockHandle, StackBlockPush, stackPageLength, StackBlockPop],
  Environment USING [Byte, bytesPerPage],
  NSDataStream USING [
    Couple, ErrorCode, Handle, SinkStream, Sink, Source, SourceStream, Ticket],
  Process USING [Abort, CancelAbort, GetCurrent, Pause, SecondsToTicks, Yield],
  Stream USING [
    Attention, Block, Byte, CompletionCode, defaultInputOptions, defaultObject,
    EndOfStream,  InputOptions, GetProcedure, Handle, Object, PutProcedure,
    SSTChange, SubSequenceType, TimeOut],
  StreamCouple USING [Create];

NSDataStreamImpl: MONITOR
  IMPORTS Courier, CourierInternal, CourierOps, Process, Stream, StreamCouple
  EXPORTS NSDataStream =
  BEGIN

  ---- ---- ---- ---- ---- ---- ----  ----
  -- Types
  ---- ---- ---- ---- ---- ---- ----  ----

  -- Which end of the transfer caused an abort, if any.  InitiateAbort records
  -- the local end's own role; ProcessAbort records the opposite role.
  AbortStatus: TYPE = {none, bySender, byReceiver};
  -- Role of a data stream object: send uses PutProcedure, receive uses
  -- GetProcedure (see AssignGetAndPutProcs).
  Direction: TYPE = {send, receive};

  -- Per-stream state.  The client's NSDataStream.Handle is the address of the
  -- object field (see InternalCreate) and DSFromHandle LOOPHOLEs that handle
  -- back to a DS, so object has to stay the first field.
  DS: TYPE = LONG POINTER TO DSObject;
  DSObject: TYPE = RECORD [
    -- Stream object handed to the client; get, put and delete are overridden.
    object: Stream.Object,
    -- Underlying stream; NIL until supplied by the creator, GetStream or
    -- FillInStream.
    baseStream: Stream.Handle,
    -- Passed back to deleteProc when the stream is deleted.
    clientData: LONG UNSPECIFIED ← LONG[NIL],
    -- Optional creator callback, called by DeleteProcedure before closing.
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL,
    -- Internal callback (FreeStream), called by DeleteProcedure after Close.
    deleteWithStatusProc: PROC [Stream.Handle, Status] ← NIL,
    -- Matches this stream against StreamRequests on srList.
    id: CARDINAL,
    -- State flags, all initially FALSE.
    activated, endOfStreamEncountered, deleted, notifiedOfAbort, local,
      attentionByteReceivedInBand, attentionByteReceivedOutOfBand,
      courierRejectSeen: BOOLEAN ← FALSE,
    aborted: AbortStatus ← none,
    direction: Direction,
    -- Forked helper processes; joined by DeleteProcedure.
    attentionWaiter: PROCESS RETURNS [BOOLEAN] ← NIL,
    rejectWatcher: PROCESS RETURNS [BOOLEAN] ← NIL,
    -- Not referenced by any procedure visible in this module.
    attentionByte: Stream.Byte ← NULL,
    -- Process currently inside a get or put (set by StartStreamOp); NIL when
    -- idle; noProcess after AnnounceAttention has seen a Courier reject.
    streamOpProcess: PROCESS ← NIL,
    -- okay while the object is live; DeleteDS sets busted before freeing.
    seal: Seal ← okay];

  -- Validity stamp checked by DSFromHandle.
  Seal: TYPE = MACHINE DEPENDENT{(0), okay(1987), busted, (LAST[NATURAL])}; 

  -- Identifier shared by the two ends of one transfer (see CreateCouple).
  ID: TYPE = CARDINAL;
  -- Index for the two ends of a locally created stream pair.
  Side: TYPE = {this, that};

  -- Final outcome of a stream, passed to deleteWithStatusProc.
  Status: TYPE = RECORD [aborted: BOOLEAN];
  StreamPair: TYPE = ARRAY Side OF Stream.Handle;
  -- One entry of the rendezvous list srList.  The variant records how far the
  -- rendezvous between the two ends has progressed; transition is broadcast
  -- whenever the variant changes so that waiters can look at it again.
  StreamRequest: TYPE = LONG POINTER TO StreamRequestObject;
  StreamRequestObject: TYPE = RECORD [
    id: ID,
    ds: DS,
    transition: CONDITION,
    variant: SELECT type: StreamRequestState FROM
      requested => [direction: Direction],
      registeredCancelled => [status: Status, cH: Courier.Handle],
      registeredFree => [status: Status],
      registered => [
        systemElement: Courier.SystemElement,
        cH: Courier.Handle,
        direction: Direction],
      registeredWaiting => [
        ticket: NSDataStream.Ticket,
        cH: Courier.Handle,
        other: StreamRequest ← NIL],
      registeredInUse => [stream: Stream.Handle, other: StreamRequest ← NIL],
      inUse, free => [stream: Stream.Handle],
      ENDCASE];
  StreamRequestState: TYPE = {
    requested,  -- one end local
    registered,  -- one end remote
    registeredWaiting,  -- one end local, the other remote, no stream announced
    registeredInUse,  -- local-remote stream established
    registeredCancelled,  -- local-remote cancelled
    registeredFree,  -- local-remote: remote abort or end-of-stream received
    inUse,  -- local stream established (both ends)
    free};  -- one end of local stream deleted
  -- Singly linked list of stream requests; SRRef addresses a link cell so
  -- that Find and Free can unlink a node in place.
  SRList: TYPE = LONG POINTER TO SRObject;
  SRObject: TYPE = RECORD [first: StreamRequest, rest: SRList];
  SRRef: TYPE = LONG POINTER TO SRList;

  -- Follow-up work that CancelStream hands back to CancelTicket, to be done
  -- outside the monitor.
  CancelStreamAction: TYPE = RECORD [
    SELECT type: * FROM
    none => NULL, waitForDeletion, terminate => [ds: DS], ENDCASE];

  ---- ---- ---- ---- ---- ---- ----  ----
  -- Constants & Variables
  ---- ---- ---- ---- ---- ---- ----  ----

  -- Last identifier handed out by NewID.
  currentID: ID ← 0;
  -- Subsequence type set on the base stream for bulk data.
  dataSST: Stream.SubSequenceType = 1;
  -- Sentinel stored in DSObject.streamOpProcess by AnnounceAttention when a
  -- Courier reject (rather than an attention) was seen during a stream op.
  noProcess: PROCESS = LOOPHOLE[LAST[INTEGER]];
  -- Timeout value applied to network streams by AnnounceStream and CreateFilter.
  defaultTimeout: LONG CARDINAL = LAST[LONG CARDINAL];
  -- Tickets are BulkDataTransfer descriptors viewed as NSDataStream.Ticket.
  immediateTicket: NSDataStream.Ticket = LOOPHOLE[
    BulkDataTransfer.immediateDescriptor];
  nullTicket: NSDataStream.Ticket = LOOPHOLE[
    BulkDataTransfer.nullDescriptor];

  -- Rendezvous list of outstanding stream requests; protected by the monitor.
  srList: SRList ← NIL;
  -- List primitives, bound to Find, Free and New by the module's
  -- initialization statement.
  srListProcs: RECORD [
    Find: PROCEDURE [list: SRRef, element: StreamRequest] RETURNS [SRRef],
    Free: PROCEDURE [list: SRRef, zone: UNCOUNTED ZONE ← NIL],
    New: PROCEDURE [element: StreamRequest, list: SRList ← NIL] RETURNS [SRList]];
  -- Sizes of the four descriptor variants, handed to noteChoice by
  -- DescribeTicket through desTicketVariantSizes.
  ticketVariantSizes: ARRAY [0..4) OF CARDINAL ← [
    SIZE[null BulkDataTransfer.Descriptor], SIZE[
    immediate BulkDataTransfer.Descriptor], SIZE[
    passive BulkDataTransfer.Descriptor], SIZE[
    active BulkDataTransfer.Descriptor]];
  desTicketVariantSizes: LONG DESCRIPTOR FOR ARRAY CARDINAL OF CARDINAL ←
    DESCRIPTOR[BASE[ticketVariantSizes], LENGTH[ticketVariantSizes]];

  ---- ---- ---- ---- ---- ---- ----  ----
  --  SIGNALS
  ---- ---- ---- ---- ---- ---- ----  ----

  -- Raised to the client (through ErrorAborted) when the transfer has been
  -- aborted by either end.
  Aborted: PUBLIC ERROR = CODE;

  -- Raised for client-visible failures, e.g. tooManyTickets from OpenSink and
  -- OpenSource for ticket variants that are not implemented.
  Error: PUBLIC ERROR [errorCode: NSDataStream.ErrorCode] = CODE;

  -- Private error: raised when the module finds its own state inconsistent.
  InternalError: ERROR = CODE;

  ---- ---- ---- ---- ---- ---- ----  ----
  --  PUBLIC PROCEDURES
  ---- ---- ---- ---- ---- ---- ----  ----

  -- Abort: client request to abort the transfer on stream.  The stream is
  -- activated first.  If activation itself raises Aborted, the CONTINUE leaves
  -- the whole statement, so InitiateAbort is not called for a stream that is
  -- already aborted.
  Abort: PUBLIC PROCEDURE [stream: NSDataStream.Handle] =
     {InitiateAbort[GetActiveDS[stream ! Aborted => CONTINUE]]};

  -- AnnounceStream: tells the module that the Courier connection cH is now
  -- available to carry the bulk data of a request registered against it.
  -- Binds the connection to the matching request (FillInStream), terminates
  -- the stream if the request was cancelled, relays data when a second
  -- registered stream is attached, then waits until the stream is deleted.
  -- Does nothing further when no request matches cH.
  AnnounceStream: PUBLIC PROCEDURE [cH: Courier.Handle] =
    BEGIN
    mine, peer: DS;
    wasCancelled: BOOLEAN;
    cH.sH.setTimeout[cH.sH, defaultTimeout];
    [mine, peer, wasCancelled] ← FillInStream[cH];
    IF mine = NIL THEN RETURN;  -- nothing registered for this connection
    IF wasCancelled THEN TerminateDataStream[mine];
    IF peer # NIL THEN
      -- Two registered ends: copy from the receiving DS to the sending DS.
      IF mine.direction = send THEN CopyStreamToStream[to: mine, from: peer]
      ELSE CopyStreamToStream[to: peer, from: mine];
    [] ← WaitForDeletion[mine]
    END;

  <<
  AssertLocal is like a Get/Put of 0 bytes, except it won't block when
  the stream is not yet activated and it won't raise Aborted.
  >>
  AssertLocal: PUBLIC PROCEDURE [stream: NSDataStream.Handle] =
    BEGIN
    -- Activate in notification-only mode; the resulting DS is not needed.
    [] ← GetActiveDS[stream: stream, notificationOnly: TRUE]
    END;

  -- CancelTicket: withdraws a ticket previously obtained for cH.  A null
  -- ticket is ignored.  The monitor-protected CancelStream decides what is
  -- left to do; that follow-up runs here, outside the monitor.
  CancelTicket: PUBLIC PROCEDURE [
    ticket: NSDataStream.Ticket, cH: Courier.Handle] =
    BEGIN
    IF ticket = nullTicket THEN RETURN;
    BEGIN
    outcome: CancelStreamAction = CancelStream[ticket, cH];
    WITH o: outcome SELECT FROM
      terminate => TerminateDataStream[o.ds];
      waitForDeletion => [] ← WaitForDeletion[o.ds];
      none => NULL;
      ENDCASE
    END
    END;

  -- CreateCouple: creates the two ends of a data stream.  Both ends carry the
  -- same fresh ID and neither has a base stream yet.  The source end is
  -- created with direction receive, the sink end with direction send.
  CreateCouple: PUBLIC PROCEDURE RETURNS [couple: NSDataStream.Couple] =
    BEGIN
    sharedID: ID = NewID[];
    couple.source ← [InternalCreate[stream: NIL, direction: receive, id: sharedID]];
    couple.sink ← [InternalCreate[stream: NIL, direction: send, id: sharedID]]
    END;

  -- DescribeTicket: Courier description routine for a ticket.  Notes the size
  -- of a BulkDataTransfer.Descriptor, then notes it as a choice whose variant
  -- sizes come from desTicketVariantSizes.
  DescribeTicket: PUBLIC Courier.Description --[notes: Notes]--  =
    BEGIN
    ticketSize: CARDINAL = SIZE[BulkDataTransfer.Descriptor];
    descriptor: LONG POINTER TO BulkDataTransfer.Descriptor =
      notes.noteSize[size: ticketSize];
    notes.noteChoice[
      site: descriptor, size: ticketSize, variant: desTicketVariantSizes]
    END;

  -- Find: returns the address of the link cell whose node holds element, or
  -- NIL when element is not on the list.  list addresses the first link cell.
  Find: INTERNAL PROC[list: SRRef, element: StreamRequest] RETURNS [SRRef] =
    BEGIN
    cursor: SRRef ← list;
    UNTIL cursor↑ = NIL DO
      IF cursor.first = element THEN RETURN[cursor];
      cursor ← @cursor.rest;  -- step to the link cell inside the current node
      ENDLOOP;
    RETURN[NIL]
    END;  --Find 
  
  -- Free: unlinks and frees the list node addressed through the link cell list.
  --   list: address of the link cell pointing at the node, as returned by Find.
  --     It is dereferenced without a NIL check.
  --   zone: when not NIL, the node's element is freed to this zone as well.
  Free: INTERNAL PROC[list: SRRef, zone: UNCOUNTED ZONE ← NIL] =
    BEGIN
    -- Remember the node itself before the link cell is overwritten.
    handle: SRList ← list↑;
    IF zone # NIL[UNCOUNTED ZONE] THEN
      zone.FREE[@LOOPHOLE[list.first, LONG POINTER]];
    -- Splice the node out of the list, then release it.
    list↑ ← list.rest;
    CourierInternal.longZone.FREE[@handle]
    END;  --Free
  
  -- New: allocates a list node holding element and links it in front of list.
  -- Returns the new head of the list.
  New: INTERNAL PROC[element: StreamRequest, list: SRList ← NIL]
    RETURNS [handle: SRList] =
    BEGIN
    handle ← CourierInternal.longZone.NEW[
      SRObject ← [first: element, rest: list]]
    END;  --New
  
  -- OpenSink: turns a sink ticket received over cH into a stream to write to.
  -- A null ticket yields a NIL stream, an immediate ticket yields a sending
  -- filter over the connection's own stream.  Any other variant raises
  -- Error[tooManyTickets].
  OpenSink: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [NSDataStream.SinkStream] =
    BEGIN
    descriptor: BulkDataTransfer.Sink ← LOOPHOLE[ticket];
    WITH d: descriptor SELECT FROM
      immediate =>
        RETURN[[
          CreateFilter[
            networkStream: cH.sH, direction: send, clientData: 0,
            deleteProc: NIL]]];
      null => RETURN[[[NIL]]];
      ENDCASE => ERROR Error[tooManyTickets]  -- third party not implemented
    END;

  -- OpenSource: turns a source ticket received over cH into a stream to read
  -- from.  A null ticket yields a NIL stream, an immediate ticket yields a
  -- receiving filter over the connection's own stream.  Any other variant
  -- raises Error[tooManyTickets].
  OpenSource: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [NSDataStream.SourceStream] =
    BEGIN
    descriptor: BulkDataTransfer.Source ← LOOPHOLE[ticket];
    WITH d: descriptor SELECT FROM
      immediate =>
        RETURN[[
          CreateFilter[
            networkStream: cH.sH, direction: receive, clientData: 0,
            deleteProc: NIL]]];
      null => RETURN[[[NIL]]];
      ENDCASE => ERROR Error[tooManyTickets]  -- third party not implemented
    END;

  -- OperateOnSink: runs operation against whatever kind of sink was supplied.
  --   none: operation gets a NIL stream.
  --   stream: operation gets the supplied stream.
  --   proc: a couple is created; operation runs in a forked process on the
  --     sink end while the client's procedure consumes the source end here.
  --     The forked process is joined on both the normal and the unwinding path.
  OperateOnSink: PUBLIC PROCEDURE [
    sink: NSDataStream.Sink, operation: PROCEDURE [NSDataStream.SinkStream]] =
    BEGIN
    WITH target: sink SELECT FROM
      none => operation[[[NIL]]];
      stream => operation[target.stream];
      proc =>
        BEGIN
        pipe: NSDataStream.Couple ← CreateCouple[];
        worker: PROCESS ← FORK operation[pipe.sink];
        target.proc[pipe.source ! UNWIND => JOIN worker];
        JOIN worker
        END;
      ENDCASE
    END;

  -- OperateOnSource: runs operation against whatever kind of source was
  -- supplied.
  --   none: operation gets a NIL stream.
  --   stream: operation gets the supplied stream.
  --   proc: a couple is created; operation runs in a forked process on the
  --     source end while the client's procedure fills the sink end here.
  --     The forked process is joined on both the normal and the unwinding path.
  OperateOnSource: PUBLIC PROCEDURE [
    source: NSDataStream.Source,
    operation: PROCEDURE [NSDataStream.SourceStream]] =
    BEGIN
    WITH origin: source SELECT FROM
      none => operation[[[NIL]]];
      stream => operation[origin.stream];
      proc =>
        BEGIN
        pipe: NSDataStream.Couple ← CreateCouple[];
        worker: PROCESS ← FORK operation[pipe.source];
        origin.proc[pipe.sink ! UNWIND => JOIN worker];
        JOIN worker
        END;
      ENDCASE
    END;

  -- Register: registers one end of a couple for use over the Courier
  -- connection cH and returns the ticket to send to the remote party.
  --   stream: the local end being handed to a remote system element.
  --   forUseAt: recorded in the request when no partner request exists yet.
  --   cH: the connection the data will travel over.
  --   useImmediateTicket: not examined in this procedure.
  -- Returns nullTicket for a NIL stream, otherwise an immediate ticket.
  -- Raises InternalError when a request with the same id is found in a state
  -- or direction that does not fit.
  Register: PUBLIC ENTRY PROCEDURE [
    stream: NSDataStream.Handle, forUseAt: Courier.SystemElement,
    cH: Courier.Handle, useImmediateTicket: BOOLEAN ← TRUE]
    RETURNS [ticket: NSDataStream.Ticket ← immediateTicket] =
    BEGIN
    ENABLE UNWIND => NULL;
    ds: DS ← DSFromHandle[stream];
    otherStreamRequest: StreamRequest ← NIL;
    streamRequest: StreamRequest;
    IF ds = NIL THEN RETURN[nullTicket];
    -- Look for a request already posted by the other end of the same couple.
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      IF streamRequest.id = ds.id THEN
        WITH request: streamRequest SELECT FROM
          requested =>
            -- The other end is local and already asking for its stream: turn
            -- its request into a local-remote one, wake it, and discard this
            -- DS.  The default result, immediateTicket, is returned.
            BEGIN
            IF request.direction = ds.direction THEN ERROR InternalError[];
            streamRequest.variant ← registeredWaiting[immediateTicket, cH];
            BROADCAST streamRequest.transition;
            DeleteDS[@ds];
            RETURN
            END;
          registered =>
            -- The other end was registered too: remember it so the two
            -- requests can be cross-linked below.
            BEGIN
            IF request.direction = ds.direction THEN ERROR InternalError[];
            otherStreamRequest ← streamRequest
            END;
          ENDCASE => ERROR InternalError[];
      REPEAT
        -- Reached whenever the scan completes without the RETURN above.
        FINISHED =>
          BEGIN
          streamRequest ← NewStreamRequest[ds.id, ds];
          srList ← srListProcs.New[streamRequest, srList];
          IF otherStreamRequest = NIL THEN
            srList.first.variant ← registered[forUseAt, cH, ds.direction]
          ELSE
            -- Both ends registered: link the two requests to each other and
            -- move both to registeredWaiting.
            BEGIN
            srList.first.variant ← registeredWaiting[
              immediateTicket, cH, otherStreamRequest];
            WITH request: otherStreamRequest SELECT FROM
              registered =>
                BEGIN
                otherCH: Courier.Handle ← request.cH;
                otherStreamRequest.variant ← registeredWaiting[
                  immediateTicket, otherCH, streamRequest];
                BROADCAST otherStreamRequest.transition
                END;
              ENDCASE => ERROR InternalError[]
            END
          END;
      ENDLOOP;
    -- Wait until the new request has left the registered state.
    DO
      WITH request: streamRequest SELECT FROM
        registered => WAIT streamRequest.transition;
        registeredWaiting => RETURN[request.ticket];
        registeredCancelled => RETURN[immediateTicket];  -- he'll find out later
        ENDCASE => InternalError[];
      ENDLOOP
    END;

  -- SetStreamTimeout: sets the timeout of the network stream beneath stream.
  --   stream: the data stream; it is activated first.
  --   waitTimeInSeconds: timeout in seconds.  It is converted to the
  --     milliseconds passed to the base stream.  Values too large to convert
  --     without overflowing a LONG CARDINAL saturate to defaultTimeout
  --     (LAST[LONG CARDINAL]) instead of wrapping to a short timeout.
  -- Local streams are left alone.  Aborted raised during activation is
  -- swallowed and the call then has no effect.
  SetStreamTimeout: PUBLIC PROCEDURE [
    stream: NSDataStream.Handle, waitTimeInSeconds: LONG CARDINAL] =
    BEGIN
    ENABLE Aborted => CONTINUE;
    msPerSecond: LONG CARDINAL = 1000;
    -- Largest number of seconds whose millisecond value still fits.
    maxSeconds: LONG CARDINAL = LAST[LONG CARDINAL] / msPerSecond;
    ds: DS ← GetActiveDS[stream];
    IF NOT ds.local THEN
      ds.baseStream.setTimeout[
        ds.baseStream,
        IF waitTimeInSeconds > maxSeconds THEN defaultTimeout
        ELSE waitTimeInSeconds * msPerSecond];
    END;


  ---- ---- ---- ---- ---- ---- ----  ----
  --  PRIVATE PROCEDURES
  ---- ---- ---- ---- ---- ---- ----  ----

  -- ActivateDS: performs the first-use setup of a data stream and reports a
  -- pending abort.
  --   ds: the stream object.
  --   notificationOnly: when TRUE, GetStream is asked not to wait for a base
  --     stream and a pending abort returns quietly instead of raising Aborted.
  -- Raises Aborted (through ErrorAborted) when the stream was aborted by the
  -- sender, or by the receiver while this end is sending, unless
  -- notificationOnly is set.
  ActivateDS: PROCEDURE [ds: DS, notificationOnly: BOOLEAN ← FALSE] =
    BEGIN
    IF ~ds.activated THEN
      BEGIN
      inputOptions: Stream.InputOptions ← Stream.defaultInputOptions;
      ds.activated ← TRUE;
      IF ds.baseStream = NIL THEN
        BEGIN
        -- No stream supplied at creation: rendezvous with the other end.
        ds.baseStream ← GetStream[ds, notificationOnly];
        ds.deleteWithStatusProc ← FreeStream;
        END;
      IF ds.baseStream = NIL THEN ds.activated ← FALSE  -- AssertLocal case
      ELSE {
        inputOptions.terminateOnEndRecord ← TRUE;
	ds.baseStream.options ← inputOptions;
        IF ds.direction = send THEN
          BEGIN
          ds.baseStream.setSST[ds.baseStream, dataSST];
          -- Only a sender on a network stream gets a reject watcher.
          IF NOT ds.local THEN ds.rejectWatcher ← FORK RejectWatcher[ds]
          END;
        ds.attentionWaiter ← FORK AttentionWaiter[ds]}
      END;
    -- Checked on every call, not just on the first activation.
    IF ds.aborted = bySender
      OR (ds.aborted = byReceiver AND ds.direction = send) THEN
      IF notificationOnly THEN RETURN ELSE ErrorAborted[ds];
    END;

  -- AnnounceAttention: called by the helper processes to record an
  -- asynchronous event on ds and to interrupt a stream operation in progress.
  --   actuallySeen: TRUE when an out-of-band attention was received
  --     (AttentionWaiter); FALSE when RejectWatcher read a byte, which is
  --     recorded as a Courier reject.
  AnnounceAttention: ENTRY PROCEDURE [ds: DS, actuallySeen: BOOLEAN] =
    BEGIN
    IF actuallySeen THEN ds.attentionByteReceivedOutOfBand ← TRUE
    ELSE ds.courierRejectSeen ← TRUE;
    -- Abort the process inside a get or put, but only on non-local streams.
    SELECT TRUE FROM
     (ds.streamOpProcess = NIL), (ds.streamOpProcess = noProcess) => NULL;
     (~ds.local) => Process.Abort[ds.streamOpProcess];
     ENDCASE;
    -- EndStreamOp treats both NIL and noProcess as an interrupted operation.
    -- noProcess is stored only when an operation was in progress and the
    -- event was a reject.
    ds.streamOpProcess ←
      IF ds.streamOpProcess # NIL AND NOT actuallySeen THEN noProcess ELSE NIL
    END;

  -- AssignGetAndPutProcs: picks the get and put procedures for a stream of
  -- the given direction.  The procedure for the stream's own direction is
  -- this module's; the other one is taken from Stream.defaultObject.
  AssignGetAndPutProcs: PROCEDURE [direction: Direction]
    RETURNS [Stream.GetProcedure, Stream.PutProcedure] =
    BEGIN
    IF direction = send THEN
      RETURN[
        Stream.defaultObject.get, LOOPHOLE[PutProcedure, Stream.PutProcedure]]
    ELSE
      RETURN[
        LOOPHOLE[GetProcedure, Stream.GetProcedure], Stream.defaultObject.put]
    END;

  -- AttentionWaiter: body of the process forked by ActivateDS.  Blocks in
  -- waitAttention on the base stream and, when an attention arrives, records
  -- it through AnnounceAttention.  Returns TRUE if an attention was received,
  -- FALSE when ended by ABORTED or Courier.Error.
  AttentionWaiter: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE ABORTED, Courier.Error => CONTINUE;
    [] ← ds.baseStream.waitAttention[ds.baseStream];
    received ← TRUE;
    AnnounceAttention[ds: ds, actuallySeen: TRUE]
    END;

  -- CancelStream: monitor-protected half of CancelTicket.  Scans srList for
  -- requests bound to cH and cancels them.
  --   ticket: not examined here; CancelTicket filters out nullTicket before
  --     calling.
  --   cH: Courier handle identifying the request or requests to cancel.
  -- Returns the follow-up the caller must perform outside the monitor:
  --   none: nothing more to do.
  --   waitForDeletion[ds]: a local-remote request was marked cancelled; wait
  --     for its stream to be deleted.
  --   terminate[ds]: the partner of a remote-remote transfer already has a
  --     stream in use and must be terminated.
  -- Raises InternalError when the partner of a remote-remote request is in
  -- an unexpected state.
  CancelStream: ENTRY PROCEDURE [
    ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [action: CancelStreamAction ← [none[]]] =
    BEGIN
    ENABLE UNWIND => NULL;
    DeleteStreamAndRequest: PROCEDURE [streamRequest: StreamRequest] =
      BEGIN DeleteDS[@streamRequest.ds]; FreeStreamRequest[streamRequest]; END;
    streamRequest: StreamRequest;
    -- Successor of the node being examined.  It is captured before the loop
    -- body runs because DeleteStreamAndRequest frees the current list node;
    -- stepping through list.rest afterwards would read freed storage.
    next: SRList ← NIL;
    FOR list: SRList ← srList, next UNTIL list = NIL[SRList] DO
      next ← list.rest;
      streamRequest ← list.first;
      WITH request: streamRequest SELECT FROM
        registeredWaiting =>
          BEGIN
          otherStreamRequest: StreamRequest ← request.other;
          IF request.cH # cH THEN LOOP;
          IF otherStreamRequest = NIL THEN  -- local-remote data transfer
            BEGIN
            streamRequest.ds.aborted ← bySender;
            streamRequest.variant ← registeredCancelled[[TRUE], cH];
            BROADCAST streamRequest.transition;
            RETURN[[waitForDeletion[streamRequest.ds]]]
            END
          ELSE  -- remote-remote data transfer
            BEGIN
            DeleteStreamAndRequest[streamRequest];
            WITH otherRequest: otherStreamRequest SELECT FROM
              registeredWaiting => {
                otherCH: Courier.Handle ← otherRequest.cH;
                otherStreamRequest.variant ← registeredCancelled[
                  [TRUE], otherCH];
                BROADCAST otherStreamRequest.transition};
              registeredInUse => RETURN[[terminate[otherRequest.ds]]];
              ENDCASE => ERROR InternalError[];
            END;
          EXIT
          END;
        registeredCancelled =>
          -- Already cancelled from the partner's side: just clean up.  The
          -- scan continues through next, which is still valid.
          IF request.cH = cH THEN DeleteStreamAndRequest[streamRequest];
        ENDCASE
      ENDLOOP
    END;

  -- CheckForAbortBefore: prologue of a get or put.  Marks the calling process
  -- as the one inside a stream operation (StartStreamOp).  If an attention or
  -- reject had already been recorded, treats it as an abort by the other end,
  -- which raises Aborted through ProcessAbort.
  CheckForAbortBefore: PROCEDURE [ds: DS] =
    BEGIN
    alreadyAborted: BOOLEAN = StartStreamOp[ds].aborted;
    IF alreadyAborted THEN ProcessAbort[ds: ds, receivedInBand: FALSE]
    END;

  -- CheckForAbortAfter: epilogue of a get or put.  Ends the stream operation
  -- and raises Aborted (through ProcessAbort) if an attention or a Courier
  -- reject was announced while it ran.
  --   abortSeen: TRUE when the caller already caught ABORTED during the
  --     operation, in which case no pause is needed.
  CheckForAbortAfter: PROCEDURE [ds: DS, abortSeen: BOOLEAN] =
    BEGIN
    aborted: BOOLEAN;
    Process.Yield[];  -- Give the attention waiter a chance to run.
    aborted ← EndStreamOp[ds].aborted;
    -- Pause to allow the ABORTED signal to be flushed.
    -- AnnounceAttention issues Process.Abort only for non-local streams, hence
    -- the ds.local test.  The pause is bounded at 60 seconds.
    IF aborted AND NOT abortSeen AND NOT ds.local THEN {
      Process.Pause[Process.SecondsToTicks[60] ! ABORTED => CONTINUE]};
    IF aborted OR ds.courierRejectSeen THEN ProcessAbort[ds, FALSE]
    END;

  -- CheckForOutOfBandAttention: polls the base stream for an out-of-band
  -- attention.  Returns TRUE, and records it in
  -- ds.attentionByteReceivedOutOfBand, when waitAttention returned normally.
  -- The flag is overwritten with the result, so FALSE clears it.
  CheckForOutOfBandAttention: PROCEDURE [ds: DS]
    RETURNS [receivedIt: BOOLEAN ← FALSE] =
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    self: PROCESS = Process.GetCurrent[];
    -- Abort ourselves before waiting.  NOTE(review): presumably this makes
    -- waitAttention raise ABORTED rather than block when no attention is
    -- pending; confirm against the Stream implementation.
    Process.Abort[self];

    BEGIN
    ENABLE
      BEGIN
      ABORTED => {abortSeen ← TRUE; CONTINUE};
      Stream.TimeOut, Courier.Error => CONTINUE;
      END;
    [] ← ds.baseStream.waitAttention[ds.baseStream];
    receivedIt ← TRUE
    END;

    -- If ABORTED was never raised the self-abort is still pending: withdraw it.
    IF NOT abortSeen THEN Process.CancelAbort[self];
    ds.attentionByteReceivedOutOfBand ← receivedIt;
    END;

  -- Close: final protocol exchange on the base stream, called from
  -- DeleteProcedure.  Brings the in-band and out-of-band attention state into
  -- agreement, then, for a sending stream, either sends the end of the data
  -- (no abort) or sends an attention (abort by receiver).
  -- Courier.Error and Stream.TimeOut are routed to HandleProblem with
  -- streamOK = FALSE, which raises Aborted.
  Close: PROCEDURE [ds: DS] =
    BEGIN
    ENABLE
      Courier.Error, Stream.TimeOut => HandleProblem[ds, FALSE];
    SynchronizeAttentions[ds];
    IF ds.direction = send THEN
      SELECT ds.aborted FROM
        none =>
          -- An attention arriving during the final send is recorded and
          -- resynchronized.  Aborted is raised only if the client has not
          -- already been notified of the abort.
          ds.baseStream.sendNow[ds.baseStream, TRUE !
            Stream.Attention => {
              ds.attentionByteReceivedOutOfBand ← TRUE;
              SynchronizeAttentions[ds];
              IF ds.notifiedOfAbort THEN CONTINUE ELSE ErrorAborted[ds]}];
        byReceiver => ds.baseStream.sendAttention[ds.baseStream, 1];
        ENDCASE
    END;

  -- CopyStreamToStream: relays all data from one data stream to another, then
  -- deletes both.  Used by AnnounceStream when two registered ends meet.
  --   to: the DS that is written to.
  --   from: the DS that is read from.
  -- An abort seen on one side is propagated to the other side with Abort.
  -- NOTE(review): the stack block is released only on the normal path after
  -- the loop.  A signal other than Aborted that unwinds through here would
  -- skip that release; confirm whether the block needs UNWIND protection.
  CopyStreamToStream: PROCEDURE [to, from: DS] =
    BEGIN
    abortStatus: AbortStatus ← none;
    toStream: NSDataStream.Handle ← HandleFromDS[to];
    fromStream: NSDataStream.Handle ← HandleFromDS[from];
    -- Transfer buffer borrowed from Courier's stack block pool.
    buffer: CourierOps.StackBlockHandle ← CourierOps.StackBlockPush[NIL];
    bufferSize: INTEGER = CourierOps.stackPageLength * Environment.bytesPerPage;
    block: Stream.Block ← [
      blockPointer: LOOPHOLE[buffer],
      startIndex: 0, stopIndexPlusOne: bufferSize];
    whyStop: Stream.CompletionCode ← normal;
    UNTIL whyStop = endOfStream DO
      block.stopIndexPlusOne ← bufferSize;
      -- startIndex is 0, so the byte count read doubles as the end index of
      -- the data to write.
      [bytesTransferred: block.stopIndexPlusOne, why: whyStop] ←
        fromStream.get[fromStream, block, fromStream.options !
	  Aborted => {abortStatus ← bySender; EXIT}];
      toStream.put[toStream, block, FALSE !
        Aborted => {abortStatus ← byReceiver; EXIT}];
      ENDLOOP;
    buffer.nextBlock ← NIL; [] ← CourierOps.StackBlockPop[buffer];
    IF abortStatus = bySender THEN Abort[toStream];
    toStream.delete[toStream ! Aborted => {abortStatus ← byReceiver; CONTINUE}];
    IF abortStatus = byReceiver THEN Abort[fromStream];
    fromStream.delete[fromStream]
    END;

  -- CreateFilter: wraps an existing network stream in a data stream of the
  -- given direction.  The network stream's timeout is set to defaultTimeout
  -- and the new stream gets a fresh ID.  clientData and deleteProc are kept
  -- for the deletion callback.
  CreateFilter: PROCEDURE [
    networkStream: Stream.Handle, direction: Direction,
    clientData: LONG UNSPECIFIED,
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED]]
    RETURNS [NSDataStream.Handle] =
    BEGIN
    networkStream.setTimeout[networkStream, defaultTimeout];
    RETURN[
      InternalCreate[
        stream: networkStream, direction: direction, id: NewID[],
        clientData: clientData, deleteProc: deleteProc]]
    END;

  -- DeleteProcedure: the delete operation installed in every data stream
  -- object.  Activates the stream if necessary, aborts a receiving stream that
  -- has not reached end of stream, runs the deletion callbacks, stops the
  -- helper processes, closes the base stream and frees the DS.
  -- Raises Aborted at the very end when an abort was detected during deletion
  -- and the client had not already been notified of it.
  DeleteProcedure: PROCEDURE [stream: NSDataStream.Handle] =
    BEGIN
    aborted: BOOLEAN ← FALSE;
    ds: DS ← DSFromHandle[stream];
    notifiedOfAbort: BOOLEAN ← ds.notifiedOfAbort;
    ds.deleted ← TRUE;
    -- ErrorAborted sets ds.notifiedOfAbort; an Aborted raised by this
    -- activation must not count as a notification, so the flag is restored.
    ActivateDS[
      ds ! Aborted => {ds.notifiedOfAbort ← notifiedOfAbort; CONTINUE}];
    IF ds.direction = receive AND ~ds.endOfStreamEncountered THEN Abort[stream];
    notifiedOfAbort ← ds.notifiedOfAbort;
    IF ds.baseStream # NIL THEN
      BEGIN
      IF ds.deleteProc # NIL THEN ds.deleteProc[ds.baseStream, ds.clientData];
      IF ds.rejectWatcher # NIL THEN
        BEGIN
        Process.Abort[ds.rejectWatcher];
	[] ← JOIN ds.rejectWatcher;
	ds.rejectWatcher ← NIL;
	END;
      IF ds.attentionWaiter # NIL THEN
        BEGIN
        Process.Abort[ds.attentionWaiter];
        -- The waiter's result is TRUE when it actually received an attention.
	aborted ← JOIN ds.attentionWaiter;
	ds.attentionWaiter ← NIL;
	END;
      Close[ds ! Aborted => {aborted ← TRUE; CONTINUE}];
      IF ds.deleteWithStatusProc # NIL THEN
	ds.deleteWithStatusProc[ds.baseStream, [ds.aborted # none]];
      END;
    DeleteDS[@ds];
    IF aborted AND NOT notifiedOfAbort THEN ERROR Aborted
    END;

  -- DeleteDS: releases a data stream object.  ds is the address of the
  -- caller's DS variable, which is what FREE is handed.
  DeleteDS: PROCEDURE [ds: LONG POINTER TO DS] =
    BEGIN
    -- Break the seal first; DSFromHandle checks it to detect deleted objects.
    ds↑.seal ← busted;
    CourierInternal.longZone.FREE[ds]
    END;

  -- DSFromHandle: converts a client handle back into the DS it was created
  -- from.  Returns NIL for a NIL handle.  Raises InternalError when the
  -- object's seal is not okay, i.e. the object has been deleted or the handle
  -- was never a DS.
  DSFromHandle: PROCEDURE [stream: NSDataStream.Handle] RETURNS [DS] = INLINE
    BEGIN
    -- View the handle as a DS.  This relies on the Stream.Object being the
    -- first field of DSObject.
    OPEN ds: LOOPHOLE[stream, DS];
    SELECT TRUE FROM
      (stream = NIL) => RETURN[NIL];  --that means it's a null ticket
      (ds.seal # okay) => ERROR InternalError;  --somebody deleted it!
      ENDCASE => RETURN[@ds];  --give back the coerced handle
    END;  --DSFromHandle

  -- EndStreamOp: marks the end of a get or put on ds.  Returns TRUE when the
  -- operation was interrupted: AnnounceAttention replaces the process
  -- recorded by StartStreamOp with NIL or noProcess.  The field is always
  -- left NIL.
  EndStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN] =
    BEGIN
    owner: PROCESS = ds.streamOpProcess;
    ds.streamOpProcess ← NIL;
    RETURN[owner = NIL OR owner = noProcess]
    END;

  -- ErrorAborted: raises Aborted to the client, first recording on ds that
  -- the client has been told about the abort.  Never returns normally.
  ErrorAborted: PROCEDURE [ds: DS] =
    BEGIN
    ds.notifiedOfAbort ← TRUE;
    ERROR Aborted
    END;

  -- FillInStream: monitor-protected half of AnnounceStream.  Finds the
  -- request bound to cH, installs the connection's stream as its base stream
  -- and moves the request to registeredInUse.
  -- Returns:
  --   thisDS: the DS of the matching request, or NIL when none matches.
  --   otherDS: the partner's DS when the partner request is already
  --     registeredInUse, else NIL.
  --   cancelled: TRUE when the matching request had been cancelled.
  FillInStream: ENTRY PROCEDURE [cH: Courier.Handle]
    RETURNS [thisDS, otherDS: DS ← NIL, cancelled: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    streamRequest, otherRequest: StreamRequest;
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      -- Set for every node examined; the FINISHED exit overrides it with NIL.
      thisDS ← streamRequest.ds;
      WITH request: streamRequest SELECT FROM
        registeredWaiting =>
          IF request.cH = cH THEN
            BEGIN
            otherRequest ← request.other;
            otherDS ←
              IF otherRequest # NIL AND otherRequest.type = registeredInUse THEN
              otherRequest.ds ELSE NIL;
            BROADCAST streamRequest.transition;
            EXIT
            END;
        registeredCancelled =>
          IF request.cH = cH THEN {otherRequest ← NIL; cancelled ← TRUE; EXIT};
        ENDCASE;
      REPEAT FINISHED => RETURN[thisDS: NIL]
      ENDLOOP;
    -- A cancelled request also becomes registeredInUse here; AnnounceStream
    -- terminates its stream right afterwards.
    streamRequest.variant ← registeredInUse[cH.sH, otherRequest];
    thisDS.baseStream ← cH.sH;
    thisDS.deleteWithStatusProc ← FreeStream;
    -- The DS was created as the end handed to the remote party; locally it is
    -- now driven from the opposite side, so its direction is flipped.
    IF otherRequest # NIL OR cancelled THEN ReverseDirection[thisDS];
    BROADCAST streamRequest.transition;
    END;

  -- FlushToEndOfStream: reads and discards incoming data on the base stream
  -- until end of stream or an in-band attention is seen.
  --   waitIfNecessary: when FALSE on a non-local stream, the stream timeout is
  --     set to 0 first.
  -- Courier.Error and Stream.TimeOut end the flush and are recorded as if the
  -- in-band attention had been received.
  FlushToEndOfStream: PROCEDURE [ds: DS, waitIfNecessary: BOOLEAN] =
    BEGIN
    sH: Stream.Handle ← ds.baseStream;
    IF NOT (ds.local OR waitIfNecessary) THEN sH.setTimeout[sH, 0];
    BEGIN
    ENABLE
      Courier.Error, Stream.TimeOut => {
        ds.attentionByteReceivedInBand ← TRUE; CONTINUE};
    -- Scratch buffer; each get below names a single byte of it, index 1.
    array: PACKED ARRAY [0..1] OF Environment.Byte;
    why: Stream.CompletionCode;
    sst: Stream.SubSequenceType;
    firstTime: BOOLEAN ← TRUE;  -- workaround for lost first attention byte.
    UNTIL ds.endOfStreamEncountered OR ds.attentionByteReceivedInBand DO
      [, why, sst] ← sH.get[sH, [@array, 1, 2], sH.options];
      SELECT why FROM
        normal =>
          -- After the first data byte, poll once for an out-of-band
          -- attention; if one arrived, stop waiting for further data.
          IF firstTime THEN {
            firstTime ← FALSE;
            IF NOT ds.local AND CheckForOutOfBandAttention[ds] THEN
              sH.setTimeout[sH, 0]};
        endRecord, endOfStream => ds.endOfStreamEncountered ← TRUE;
        sstChange =>
          IF sst # dataSST THEN ds.attentionByteReceivedInBand ← TRUE;
        attention => {
          -- One further get follows the attention indication.
          -- NOTE(review): presumably it consumes the attention byte; confirm.
          [] ← sH.get[sH, [@array, 1, 2], sH.options];
          ds.attentionByteReceivedInBand ← TRUE};
        ENDCASE;
      ENDLOOP
    END;
    END;

  -- FreeStream: installed as deleteWithStatusProc; called by DeleteProcedure
  -- once a base stream is no longer needed by a data stream.
  --   stream: the base stream being given up.
  --   status: whether the transfer was aborted.
  -- For a local-remote stream the request becomes registeredFree and waiters
  -- are woken.  For a local stream pair the first end to arrive waits for the
  -- second; the end that waited then deletes both base streams and frees both
  -- requests.
  -- Raises InternalError when no request refers to stream.
  FreeStream: ENTRY PROCEDURE [stream: Stream.Handle, status: Status] =
    BEGIN
    ENABLE UNWIND => NULL;
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest: StreamRequest ← list.first;
      WITH request: streamRequest SELECT FROM
        registeredInUse =>
          BEGIN
          IF request.stream # stream THEN LOOP;
          streamRequest.variant ← registeredFree[status];
          BROADCAST streamRequest.transition;
          RETURN;
          END;
        inUse =>
          BEGIN
          IF request.stream # stream THEN LOOP;
          streamRequest.variant ← free[stream];
          -- Find the partner: the other request carrying the same id.  This
          -- inner list deliberately shadows the outer loop variable.
          FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
            IF list.first # streamRequest AND list.first.id = request.id THEN
              DO
                SELECT list.first.type FROM
                  -- Partner freed first and is waiting on our condition.
                  free => {BROADCAST streamRequest.transition; RETURN};
                  inUse =>
                    -- We are first: wait for the partner to become free, then
                    -- tear down both ends.
                    DO
                      WAIT list.first.transition;
                      WITH sr: list.first SELECT FROM
                        free =>
                          BEGIN
			  stream.delete[stream]; sr.stream.delete[sr.stream];
                          FreeStreamRequest[streamRequest];
                          FreeStreamRequest[list.first];
                          RETURN
                          END;
                        ENDCASE;
                      ENDLOOP;
                  ENDCASE => EXIT
                ENDLOOP;
            ENDLOOP;
          InternalError[]
          END;
        ENDCASE;
      ENDLOOP;
    InternalError[]
    END;

  -- FreeStreamRequest: removes streamRequest from srList and frees both the
  -- list node and the request.  If the request is cross-linked with a partner
  -- request, the partner's back link is cleared first so that it does not
  -- dangle.  Callers hold the monitor lock.
  FreeStreamRequest: PROCEDURE [streamRequest: StreamRequest] =
    BEGIN
    partner: StreamRequest ← NIL;
    -- Only these two variants carry a link to a partner request.
    WITH mine: streamRequest SELECT FROM
      registeredWaiting => partner ← mine.other;
      registeredInUse => partner ← mine.other;
      ENDCASE;
    IF partner # NIL THEN
      WITH theirs: partner SELECT FROM
        registeredWaiting => theirs.other ← NIL;
        registeredInUse => theirs.other ← NIL;
        ENDCASE;
    srListProcs.Free[
      srListProcs.Find[@srList, streamRequest], CourierInternal.longZone]
    END;

  -- GetActiveDS: converts a client handle into its DS and makes sure the
  -- stream has been activated.  notificationOnly is passed to ActivateDS
  -- unchanged.  May raise Aborted or InternalError through the two procedures
  -- it calls.
  GetActiveDS: PROCEDURE [
    stream: NSDataStream.Handle, notificationOnly: BOOLEAN ← FALSE]
    RETURNS [ds: DS] = INLINE
    BEGIN
    ds ← DSFromHandle[stream];
    ActivateDS[ds, notificationOnly]
    END;

  -- GetProcedure: the get operation of a receiving data stream.
  --   stream: the data stream handle.
  --   block: where the data goes.
  --   options: only signalEndOfStream is examined here; the base stream is
  --     read with its own options.
  -- Returns the byte count, the completion code (normal or endOfStream) and
  -- the subsequence type.
  -- Raises Aborted when the transfer is aborted, Stream.EndOfStream when
  -- requested through options, and ABORTED when the calling process itself
  -- was aborted during the read.
  GetProcedure: PROCEDURE [
    stream: NSDataStream.Handle, block: Stream.Block,
    options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    ds: DS ← GetActiveDS[stream];
    -- Once the end has been seen, later gets report it without touching the
    -- base stream.
    IF ds.endOfStreamEncountered THEN {
      IF options.signalEndOfStream THEN
        ERROR Stream.EndOfStream[block.startIndex];
      bytesTransferred ← 0;
      why ← endOfStream;
      sst ← dataSST}
    ELSE
      BEGIN
      CheckForAbortBefore[ds];
      BEGIN
      ENABLE
        BEGIN
        Courier.Error --[truncatedTransfer] --, Stream.TimeOut =>
          HandleProblem[DSFromHandle[stream], FALSE];
        -- Close the stream operation on the way out; the original signal
        -- keeps propagating afterwards.
        Aborted, Stream.EndOfStream =>
          CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
        END;
      DO
        [bytesTransferred, why, sst] ← ds.baseStream.get[
          ds.baseStream, block, ds.baseStream.options !
	    ABORTED => {abortSeen ← TRUE; GOTO SeenAbort}];
        SELECT why FROM
          normal => EXIT;
          -- A change to the data subsequence type is ignored and the read is
          -- repeated; any other type is a protocol problem.
          sstChange => IF sst # dataSST THEN GOTO unexpectedPacketSubtype;
          endRecord, endOfStream =>
            -- End of the bulk data.  If the block was filled exactly, report
            -- normal now and let the next call report the end.
            IF bytesTransferred = block.stopIndexPlusOne - block.startIndex THEN
              {why ← normal; ds.endOfStreamEncountered ← TRUE; EXIT}
            ELSE {
              why ← endOfStream;
              ds.endOfStreamEncountered ← TRUE;
              IF options.signalEndOfStream THEN
                ERROR Stream.EndOfStream[block.startIndex + bytesTransferred]
              ELSE RETURN};
          attention => ProcessAbort[ds, TRUE];  -- should not return
          ENDCASE => ERROR InternalError[];
        REPEAT
          unexpectedPacketSubtype => HandleProblem[ds, TRUE];
          SeenAbort => NULL;
        ENDLOOP
      END;
      CheckForAbortAfter[ds, abortSeen];
      -- The process abort was not caused by an attention: pass it on.
      IF abortSeen THEN ERROR ABORTED;
      END
    END
    END;

  -- GetStream: rendezvous point where a data stream without a base stream
  -- obtains one.  Matches ds against the requests on srList by id.
  --   ds: the stream needing a base stream.
  --   dontWait: when TRUE, returns NIL instead of waiting for the other end.
  -- Returns the base stream to use.  When the other end is local and already
  -- waiting, a fresh stream pair is created and one side is returned at once.
  -- Raises Aborted (through ErrorAborted) when the request was cancelled or
  -- freed while waiting, and InternalError on inconsistent list state.
  GetStream: ENTRY PROCEDURE [ds: DS, dontWait: BOOLEAN ← FALSE]
    RETURNS [stream: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    streamRequest: StreamRequest;
    FOR ref: SRRef ← @srList, @ref.rest UNTIL ref↑ = NIL[SRList] DO
      streamRequest ← ref.first;
      IF streamRequest.id = ds.id THEN
        WITH request: streamRequest SELECT FROM
          registered =>
            -- The other end was registered for remote use: take over its
            -- request with this DS and wait for the stream to be announced.
            BEGIN
            cH: Courier.Handle ← request.cH;
            IF request.direction = ds.direction THEN ERROR InternalError[];
            DeleteDS[@streamRequest.ds];
            streamRequest.ds ← ds;
            streamRequest.variant ← registeredWaiting[immediateTicket, cH];
            BROADCAST streamRequest.transition;
            EXIT
            END;
          registeredWaiting =>
            IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
          registeredInUse =>
            IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
          inUse => EXIT;
          requested =>
            -- A request in our own direction is our own earlier one.
            IF request.direction = ds.direction THEN EXIT
            ELSE
              -- The other local end is already waiting: create the local
              -- stream pair, give it one side and take the other.
              BEGIN
              streamPair: StreamPair = GetStreamPair[];
              newStreamRequest: StreamRequest ← NewStreamRequest[ds.id, ds];
              streamRequest.ds.local ← ds.local ← TRUE;
              newStreamRequest.variant ← inUse[streamPair[this]];
              srList ← srListProcs.New[newStreamRequest, srList];
              streamRequest.variant ← inUse[streamPair[that]];
              BROADCAST streamRequest.transition;
              RETURN[streamPair[this]]
              END;
          registeredCancelled => EXIT;
          ENDCASE => ERROR;
      REPEAT
        -- No request with this id yet: post one and wait for the other end.
        FINISHED =>
          BEGIN
          streamRequest ← NewStreamRequest[ds.id, ds, ds.direction];
          srList ← srListProcs.New[streamRequest, srList]
          END
      ENDLOOP;
    IF dontWait THEN RETURN[NIL];
    -- Wait for the request to reach a state that carries a stream, or fail.
    DO
      WITH request: streamRequest SELECT FROM
        requested, registeredWaiting => WAIT streamRequest.transition;
        inUse => RETURN[request.stream];
        registeredInUse => RETURN[request.stream];
        registeredCancelled => {
          status: Status ← request.status;
          streamRequest.variant ← registeredFree[status];
          BROADCAST streamRequest.transition;
          ErrorAborted[request.ds]};
        registeredFree => ErrorAborted[request.ds];
        ENDCASE => InternalError[];
      ENDLOOP;
    END;

  -- GetStreamPair: creates a connected pair of local streams through
  -- StreamCouple and returns them indexed by Side.
  GetStreamPair: PROCEDURE RETURNS [streams: StreamPair] =
    BEGIN
    near, far: Stream.Handle;
    [near, far] ← StreamCouple.Create[];
    streams[this] ← near;
    streams[that] ← far
    END;

  -- HandleFromDS: the inverse of DSFromHandle.  Views a DS as the client
  -- handle for the same object, with no validity check.
  HandleFromDS: PROCEDURE [ds: DS] RETURNS [NSDataStream.Handle] = INLINE
    BEGIN
    RETURN[LOOPHOLE[ds, NSDataStream.Handle]]
    END;

  -- HandleProblem: common reaction to a transport or protocol failure.
  -- Always ends by raising Aborted through ErrorAborted.
  --   streamOK: TRUE when the base stream is still usable, so an abort can be
  --     sent to the other end.  FALSE when it is not: the stream is marked as
  --     if the in-band attention had arrived and the abort is recorded as
  --     bySender without sending anything.
  HandleProblem: PROCEDURE [ds: DS, streamOK: BOOLEAN ← TRUE] =
    BEGIN
    IF streamOK THEN {IF ds.aborted = none THEN InitiateAbort[ds]}
    ELSE {
      ds.attentionByteReceivedInBand ← TRUE;
      IF ds.aborted = none THEN ds.aborted ← bySender};
    ErrorAborted[ds]
    END;

  -- InitiateAbort: starts an abort from this end.  Does nothing when the
  -- stream is already aborted or has reached end of stream.  Otherwise
  -- records which role aborted, sets the data subsequence type on the base
  -- stream and sends an attention.  Stream.TimeOut and Courier.Error raised
  -- while doing so are ignored.
  InitiateAbort: PROCEDURE [ds: DS] =
    BEGIN
    ENABLE Stream.TimeOut, Courier.Error => CONTINUE;
    sH: Stream.Handle;
    IF ds.aborted # none THEN RETURN;
    IF ds.endOfStreamEncountered THEN RETURN;
    ds.aborted ← SELECT ds.direction FROM
      send => bySender,
      ENDCASE => byReceiver;
    sH ← ds.baseStream;
    sH.setSST[sH, dataSST];
    sH.sendAttention[sH, 1]
    END;

  -- InternalCreate: allocates and initializes a data stream object.
  --   stream: base stream, or NIL when it is to be found later by rendezvous.
  --   direction: selects which of get and put is implemented by this module.
  --   id: identifier used to match the two ends of a transfer.
  --   clientData, deleteProc: deletion callback and its argument.
  -- Returns the client handle, which is the address of the embedded
  -- Stream.Object.
  InternalCreate: PROCEDURE [
    stream: Stream.Handle, direction: Direction, id: ID,
    clientData: LONG UNSPECIFIED ← LONG[NIL],
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL]
    RETURNS [NSDataStream.Handle] =
    BEGIN
    getProc: Stream.GetProcedure;
    putProc: Stream.PutProcedure;
    ds: DS ← CourierInternal.longZone.NEW[
      DSObject ← [
      object: Stream.defaultObject, baseStream: stream, id: id,
      clientData: clientData, deleteProc: deleteProc, direction: direction]];
    [getProc, putProc] ← AssignGetAndPutProcs[direction];
    ds.object.get ← getProc;
    ds.object.put ← putProc;
    ds.object.delete ← LOOPHOLE[DeleteProcedure];
    RETURN[[@ds.object]]
    END;

  -- NewID: hands out the next stream identifier under the monitor lock.
  NewID: ENTRY PROCEDURE RETURNS [ID] =
    BEGIN
    currentID ← currentID + 1;
    RETURN[currentID]
    END;

  -- NewStreamRequest: allocates a stream request in the requested state for
  -- the given id, DS and direction.  The transition condition is left to its
  -- default.  The caller links the request into srList.
  NewStreamRequest: PROCEDURE [
    id: ID, ds: DS ← NIL, direction: Direction ← send]
    RETURNS [streamRequest: StreamRequest] =
    BEGIN
    RETURN[
      CourierInternal.longZone.NEW[
        StreamRequestObject ← [id, ds, , requested[direction]]]]
    END;

  -- ProcessAbort: records an abort that originated at the other end and
  -- raises Aborted.  The abort is attributed to the role opposite to this
  -- end's direction.
  --   receivedInBand: TRUE when the abort was seen as an in-band attention.
  ProcessAbort: PROCEDURE [ds: DS, receivedInBand: BOOLEAN] =
    BEGIN
    IF receivedInBand THEN ds.attentionByteReceivedInBand ← TRUE;
    ds.aborted ← SELECT ds.direction FROM
      send => byReceiver,
      ENDCASE => bySender;
    ErrorAborted[ds]
    END;

  -- PutProcedure: the put operation of a sending data stream.
  --   stream: the data stream handle.
  --   block: the data to send; an empty block returns immediately after
  --     activation.
  --   endPhysicalRecord: not forwarded; the base stream put is always called
  --     with FALSE.
  -- Raises Aborted when the transfer is aborted and ABORTED when the calling
  -- process itself was aborted during the write.
  PutProcedure: PROCEDURE [
    stream: NSDataStream.Handle, block: Stream.Block,
    endPhysicalRecord: BOOLEAN] =
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    ds: DS ← GetActiveDS[stream];
    IF block.stopIndexPlusOne = block.startIndex THEN RETURN;
    CheckForAbortBefore[ds];
    BEGIN
    ENABLE
      BEGIN
      Courier.Error, Stream.TimeOut =>
        HandleProblem[DSFromHandle[stream], FALSE];
      -- Close the stream operation on the way out; Aborted keeps propagating.
      Aborted => CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
      END;
    ds.baseStream.put[ds.baseStream, block, FALSE !
      Stream.Attention => ProcessAbort[ds, FALSE];
      ABORTED => {abortSeen ← TRUE; CONTINUE}]
    END;
    CheckForAbortAfter[ds, abortSeen];
    -- The process abort was not caused by an attention: pass it on.
    IF abortSeen THEN ERROR ABORTED;
    END;

  -- RejectWatcher: body of the process forked by ActivateDS for a sender on a
  -- non-local stream.  Blocks reading one byte from the base stream.  If the
  -- read completes, or fails with a Courier.Error other than
  -- transportTimeout, the event is announced as a Courier reject.
  -- Returns TRUE when a reject was announced, FALSE when the watcher ended by
  -- ABORTED, transportTimeout, Stream.SSTChange or Stream.TimeOut.
  RejectWatcher: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE ABORTED => CONTINUE;
    [] ← ds.baseStream.getByte[ds.baseStream !
      Courier.Error =>
        IF errorCode = transportTimeout THEN GOTO abort ELSE CONTINUE;
      Stream.SSTChange, Stream.TimeOut => GOTO abort];
    received ← TRUE;
    AnnounceAttention[ds, FALSE];
    EXITS abort => NULL;
    END;

  -- ReverseDirection: flips a data stream between send and receive and
  -- reinstalls the matching get and put procedures in its stream object.
  ReverseDirection: INTERNAL PROCEDURE [ds: DS] =
    BEGIN
    flipped: Direction = IF ds.direction = send THEN receive ELSE send;
    ds.direction ← flipped;
    [ds.object.get, ds.object.put] ← AssignGetAndPutProcs[flipped]
    END;

  -- StartStreamOp: marks the start of a get or put on ds.  Returns TRUE,
  -- without recording the caller, when an out-of-band attention or a Courier
  -- reject has already been seen.  Otherwise records the calling process so
  -- that AnnounceAttention can interrupt it.
  StartStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN ← FALSE] =
    BEGIN
    aborted ← ds.attentionByteReceivedOutOfBand OR ds.courierRejectSeen;
    IF NOT aborted THEN ds.streamOpProcess ← Process.GetCurrent[]
    END;

  -- SynchronizeAttentions: brings the in-band and out-of-band views of an
  -- abort into agreement before the stream is closed.  Loops until neither
  -- half is outstanding for this end's direction.
  SynchronizeAttentions: PROCEDURE [ds: DS] =
    BEGIN
    DO
      -- In-band attention seen but not the out-of-band one: poll once for it
      -- and stop, whatever the outcome.
      IF ds.attentionByteReceivedInBand
        AND NOT ds.attentionByteReceivedOutOfBand THEN {
        [] ← CheckForOutOfBandAttention[ds]; EXIT}
      ELSE
        SELECT ds.direction FROM
          send =>
            -- Out-of-band seen but not in-band: drain without waiting.
            IF ds.attentionByteReceivedOutOfBand
              AND NOT ds.attentionByteReceivedInBand THEN
              FlushToEndOfStream[ds, FALSE]
            ELSE EXIT;
          receive =>
            -- A receiver drains until the in-band attention or end of stream.
            -- It waits for data only when no out-of-band attention was seen.
            IF NOT (ds.attentionByteReceivedInBand OR ds.endOfStreamEncountered)
              THEN FlushToEndOfStream[ds, NOT ds.attentionByteReceivedOutOfBand]
            ELSE EXIT;
          ENDCASE
      ENDLOOP
    END;

  -- TerminateDataStream: aborts the stream belonging to ds and then deletes
  -- it.  An Aborted raised by the deletion is ignored.
  TerminateDataStream: PROCEDURE [ds: DS] =
    BEGIN
    handle: NSDataStream.Handle = HandleFromDS[ds];
    Abort[handle];
    handle.delete[handle ! Aborted => CONTINUE]
    END;

  -- WaitForDeletion: blocks until the request belonging to ds has reached
  -- registeredFree, then removes the request from srList and returns the
  -- status recorded in it.
  -- Raises InternalError when no request on srList refers to ds, or when the
  -- request is found in a state other than registeredInUse,
  -- registeredCancelled or registeredFree.
  WaitForDeletion: ENTRY PROCEDURE [ds: DS] RETURNS [status: Status] =
    BEGIN 
    ENABLE UNWIND => NULL;
    streamRequest: StreamRequest;
    -- Locate the request by its DS pointer; only the pointer is compared, the
    -- DS itself is not dereferenced.
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      IF ds = streamRequest.ds THEN EXIT
      REPEAT FINISHED => ERROR InternalError[]
      ENDLOOP;
    DO
      WITH request: streamRequest SELECT FROM
        registeredInUse, registeredCancelled => WAIT streamRequest.transition;
        registeredFree => {status ← request.status; EXIT};
        ENDCASE => InternalError[];
      ENDLOOP;
    FreeStreamRequest[streamRequest];
    END;

  -- Module initialization: bind the list primitives used through srListProcs.
  srListProcs ← [Find, Free, New];

  END.

LOG  ( date - person - action )

20-Dec-84 15:31:10 - AOF - Post Klamath
27-May-85 12:07:32 - AOF - Starting rewrite
25-Nov-85  7:26:41 - AOF - AR# 7888