-- File: NetworkStreamMgr.mesa - last edit:
-- AOF                  9-Sep-87  9:00:59
-- SMA                 22-May-86 18:13:27
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved. 
--Function: The implementation module for the manager of Pilot Network Streams.

DIRECTORY
  NSBuffer USING [
    Body, Buffer, Dequeue,  Enqueue, QueueCleanup, QueueInitialize, ReturnBuffer],
  CommFlags USING [doDebug],
  CommHeap USING [zone],
  Environment USING [Block, nullBlock],
  Driver USING [Glitch],
  HostNumbers USING [IsMulticastID],
  NetworkStream USING [
    ConnectionSuspended, WaitTime, ClassOfService, CloseStatus, closeSST,
    closeReplySST, ListenErrorReason, ConnectionFailed],
  NetworkStreamImpl USING [startByteStream, stopByteStream],
  NetworkStreamInternal USING [
    CloseState, ControlHandle, ListenerHandle, ListenerObject],
  NSTypes USING [ConnectionID],
  PacketStream USING [ConnectionAlreadyThere, FindAddresses, Handle, Make],
  RouterInternal USING [SendErrorPacket],
  Process USING [DisableTimeout, EnableAborts],
  Runtime USING [GlobalFrame],
  SpecialSystem USING [HostNumber],
  Socket USING [ChannelHandle, Delete, SetWaitTime],
  SocketInternal USING [CreateListen, ListenerProcType],
  SppOps USING [GlobalFrameFromPacketStream, PacketStreamFromByteStream],
  Stream USING [
    Byte, CompletionCode, defaultInputOptions, Handle, SubSequenceType, TimeOut],
  System USING [NetworkAddress, nullNetworkAddress];

NetworkStreamMgr: MONITOR
  IMPORTS
    NSBuffer, CommHeap, Driver, NetworkStream, netStrmInst: NetworkStreamImpl,
    HostNumbers, Stream, PacketStream, Process, RouterInternal, Runtime, Socket,
    SocketInternal, SppOps
  EXPORTS NetworkStream, SppOps, System =
  BEGIN

  --EXPORTED TYPE(S) and READONLY Variables
  HostNumber: PUBLIC TYPE = SpecialSystem.HostNumber;

  ConnectionID: PUBLIC TYPE = NSTypes.ConnectionID;
  unknownConnID, uniqueConnID: PUBLIC ConnectionID ← [0];
  
  ListenerHandle: PUBLIC TYPE = NetworkStreamInternal.ListenerHandle;
  uniqueNetworkAddr: PUBLIC System.NetworkAddress ← System.nullNetworkAddress;

  ListenError: PUBLIC ERROR [reason: NetworkStream.ListenErrorReason] = CODE;
  ListenTimeout: PUBLIC SIGNAL = CODE;

  BadDestintationAddress: ERROR = CODE;  --for glitching

  --Complete the pending listener connection
  ApproveConnection: PUBLIC PROC[listenerH: ListenerHandle,
    streamTimeout: NetworkStream.WaitTime,
    classOfService: NetworkStream.ClassOfService]
    RETURNS [sH: Stream.Handle] =
    BEGIN
    IF (listenerH = NIL) OR (listenerH.seal # listenerSeal)
      THEN ERROR ListenError[illegalHandle];
    IF listenerH.state # pending THEN ERROR ListenError[illegalState];
    sH ← CreateTransducer[
      uniqueNetworkAddr, listenerH.buffer.ns.source,
      Environment.nullBlock,
      unknownConnID, listenerH.buffer.ns.sourceConnectionID,
      TRUE, streamTimeout, classOfService !
	UNWIND => {
	  NSBuffer.ReturnBuffer[listenerH.buffer]; listenerH.state ← idle}];
    NSBuffer.ReturnBuffer[listenerH.buffer];
    listenerH.state ← idle;
    END;

  <<
  This procedure closes communication over a network stream at the client's
  level of protocol.  The semantics are I want to close the stream, therefore
  I do not want to transmit any more data and only want to know that the remote
  end is aware of my actions.  Any input data arriving on the stream may be
  discarded.  The status is good, noReply if the other end just did not
  respond, and incomplete if it was a simultaneous close and the third data
  packet did not make it and the other end is gone.
  >>
  Close: PUBLIC PROC[sH: Stream.Handle]
    RETURNS [status: NetworkStream.CloseStatus] =
    BEGIN
    ENABLE
      Stream.TimeOut, NetworkStream.ConnectionSuspended =>
        {status ← noReply; CONTINUE};
    sst: Stream.SubSequenceType;
    compCode: Stream.CompletionCode;
    data: PACKED ARRAY [0..2) OF Stream.Byte;
    state: NetworkStreamInternal.CloseState ← sendClose;
    block: Environment.Block ← [@data, 0, LENGTH[data]];
    status ← good;
    sH.setSST[sH, NetworkStream.closeSST];
    sH.sendNow[sH, FALSE];
    state ← waitCloseReply;
    UNTIL state = closed OR state = sendCloseReply DO
      [, compCode, sst] ← sH.get[sH, block, Stream.defaultInputOptions];
      IF compCode = sstChange THEN
        BEGIN
	SELECT sst FROM
	  NetworkStream.closeReplySST =>
	    BEGIN
	    sH.setSST[sH, NetworkStream.closeReplySST];
	    sH.sendNow[sH, FALSE];
	    state ← closed;
	    END;
	  NetworkStream.closeSST => state ← sendCloseReply;
	  ENDCASE;
	END;
      ENDLOOP;
    IF state = sendCloseReply THEN status ← CloseReply[sH];
    END; --Close
 
  <<
  This procedure conforms to the close protocol and is invoked by the client of
  a network stream when it receives a subsequence change to closeSST.  The
  semantics of this call are that the client knows that the other end wants to
  close the communication and it will not transmit any more data and will reply
  with a closeReplySST.  The status can be good, or incomplete; the latter
  implying that an answer to the closeReplySST did not make it back and the
  other end is gone, or that the closeReply was never acked.
  >>
  CloseReply: PUBLIC PROC[sH: Stream.Handle]
    RETURNS [status: NetworkStream.CloseStatus] =
    BEGIN
    ENABLE
      Stream.TimeOut, NetworkStream.ConnectionSuspended =>
        {status ← noReply; CONTINUE};
    sst: Stream.SubSequenceType;
    compCode: Stream.CompletionCode;
    data: PACKED ARRAY [0..2) OF Stream.Byte;
    block: Environment.Block ← [@data, 0, LENGTH[data]];
    status ← good;
    sH.setSST[sH, NetworkStream.closeReplySST];
    sH.sendNow[sH, FALSE];
    DO  --until exits
      [, compCode, sst] ← sH.get[sH, block, Stream.defaultInputOptions];
      IF (compCode = sstChange) AND (sst = NetworkStream.closeReplySST) THEN EXIT;
      ENDLOOP;
    END; --CloseReply
  
  --Create a network stream to the specified remote address.
  Create: PUBLIC PROC[
    remote: System.NetworkAddress, connectData: Environment.Block,
    timeout: NetworkStream.WaitTime, classOfService: NetworkStream.ClassOfService]
    RETURNS [Stream.Handle] =
    BEGIN
    RETURN[
      CreateTransducer[
	uniqueNetworkAddr, remote, connectData,
        unknownConnID, unknownConnID,
	TRUE, timeout, classOfService]];
    END; --Create

  --Create a listener at the specified local Network Address.
  --The generation of IllegalAddress will become more sophisticated.
  --We enqueue all the buffers onto the socket channel.
  CreateListener: PUBLIC PROC[addr: System.NetworkAddress]
    RETURNS [listenerH: ListenerHandle] =
    BEGIN
    listenerH ← CommHeap.zone.NEW[NetworkStreamInternal.ListenerObject ← [
      seal: listenerSeal, state: idle, buffer: NIL, socket: , queue: ]];
    listenerH.socket ← SocketInternal.CreateListen[
      socket: addr.socket, type: sequencedPacket,
      clientData: listenerH, callback: NewConnection];
    Process.DisableTimeout[@listenerH.condition];
    Process.EnableAborts[@listenerH.condition];
    NSBuffer.QueueInitialize[@listenerH.queue];
    END; --CreateListener
  
  --Create a sequenced packet transducer with all its parameters.
  CreateTransducer: PUBLIC PROC[
    local, remote: System.NetworkAddress, connectData: Environment.Block,
    localConnID, remoteConnID: ConnectionID,
    activelyEstablish: BOOLEAN, timeout: NetworkStream.WaitTime,
    classOfService: NetworkStream.ClassOfService]
    RETURNS [sH: Stream.Handle] =
    BEGIN
    psH: PacketStream.Handle;
    instanceN: LONG POINTER TO FRAME[NetworkStreamImpl];
    SELECT TRUE FROM
      (~HostNumbers.IsMulticastID[@remote.host]) => NULL;  --okay
      (~CommFlags.doDebug) => --DON'T LET HIM GET AWAY WITH THIS-- DO
	SIGNAL NetworkStream.ConnectionFailed[noTranslationForDestination];
	ENDLOOP;
      ENDCASE => Driver.Glitch[BadDestintationAddress];
    psH ← PacketStream.Make[
      local, remote, localConnID, remoteConnID, activelyEstablish,
      timeout, classOfService];
    instanceN ← SppOps.GlobalFrameFromPacketStream[psH];
    sH ← instanceN.startByteStream[psH];
    sH.delete ← Delete;  --don't use instance's version of delete
    END; --CreateTransducer

  Delete: PROC[sH: Stream.Handle] =
    BEGIN
    --delete instance associcated with this stream
    instanceN: LONG POINTER TO FRAME[NetworkStreamImpl] =
      GlobalFrameFromByteStream[sH];
    psH: PacketStream.Handle = SppOps.PacketStreamFromByteStream[sH];
    instanceN.stopByteStream[];  --shut down the bytestream
    psH.destroy[psH];  --then the underlying packet stream
    END;

  --This procedure deletes a listener.
  DeleteListener: PUBLIC PROC[listenerH: ListenerHandle] =
    BEGIN
    IF (listenerH = NIL) OR (listenerH.seal # listenerSeal)
      THEN ERROR ListenError[illegalHandle];
    IF listenerH.state = pending THEN
      RouterInternal.SendErrorPacket[listenerH.buffer, listenerReject];
    listenerH.seal ← unsealed;
    NSBuffer.QueueCleanup[@listenerH.queue];
    Socket.Delete[listenerH.socket];
    CommHeap.zone.FREE[@listenerH];
    END; --DeleteListener


  --This procedure returns the local and remote addresses of the Network Stream.
  FindAddresses: PUBLIC PROC[sH: Stream.Handle]
    RETURNS [local, remote: System.NetworkAddress] =
    BEGIN
    [local, remote] ← PacketStream.FindAddresses[
      LOOPHOLE[sH, NetworkStreamInternal.ControlHandle].psH];
    END; --FindAddresses

  GlobalFrameFromByteStream: PUBLIC PROC[sH: Stream.Handle]
    RETURNS[LONG POINTER --TO FRAME[NetworkStreamImpl]--] =
    {RETURN[LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[sH.get]]]]};

  <<
  Listen on the specified socket channel for a sequenced packet.
  This procedure checks to make sure that the arriving packet is not a
  duplicate.
  >>
  Listen: PUBLIC PROC[
    listenerH: ListenerHandle, connectData: Environment.Block,
    listenTimeout: NetworkStream.WaitTime]
    RETURNS [remote: System.NetworkAddress, bytes: CARDINAL] =
    BEGIN

    GetPacket: ENTRY PROC =
      BEGIN
      ENABLE UNWIND => NULL;
      WHILE (b ← NSBuffer.Dequeue[@listenerH.queue]) = NIL DO
        WAIT listenerH.condition; ENDLOOP;
      END;  --GetPacket

    b: NSBuffer.Buffer;
    IF (listenerH = NIL) OR (listenerH.seal # listenerSeal)
      THEN ERROR ListenError[illegalHandle];
    b ← listenerH.buffer;
    SELECT listenerH.state FROM
      pending => RouterInternal.SendErrorPacket[b, listenerReject];
      listening => ERROR ListenError[illegalState];
      ENDCASE;
    listenerH.state ← listening;
    Socket.SetWaitTime[listenerH.socket, listenTimeout];
    DO
    --until the packet is a sequenced packet and is not a duplicate,
    --or timeout, or aborted socket
      body: NSBuffer.Body;
      GetPacket[ ! UNWIND => listenerH.state ← idle];
      IF b = NIL THEN LOOP;  --nothing happening here
      body ← b.ns;  --makes for cheaper access
      --discard any cases with something wrong, ENDCASE is only acceptable packet
      SELECT TRUE FROM
	(b.fo.status # goodCompletion) =>
	  NSBuffer.ReturnBuffer[b];  --don't chance it with bad buffers
	(HostNumbers.IsMulticastID[@body.destination.host]) =>
	  NSBuffer.ReturnBuffer[b];  --he's insane, but can't error broadcast
	(HostNumbers.IsMulticastID[@body.source.host]) =>
	  NSBuffer.ReturnBuffer[b];  --he's busted, and I don't care
	(body.packetType # sequencedPacket) =>
	  RouterInternal.SendErrorPacket[b, invalidPacketType];
	(body.destinationConnectionID # unknownConnID) =>
	  RouterInternal.SendErrorPacket[b, protocolViolation];
	(body.sourceConnectionID = unknownConnID) =>
	  RouterInternal.SendErrorPacket[b, protocolViolation];
	(body.sequenceNumber # 0) =>
	  RouterInternal.SendErrorPacket[b, protocolViolation];
	(~body.systemPacket) =>
	  RouterInternal.SendErrorPacket[b, protocolViolation];
	(~PacketStream.ConnectionAlreadyThere[
	    body.source, body.sourceConnectionID]) =>
	  BEGIN
	  listenerH.buffer ← b;
	  listenerH.state ← pending;
	  RETURN [body.source, 0];
	  END;
	ENDCASE => NSBuffer.ReturnBuffer[b];
      ENDLOOP;
    END; --Listen

  NewConnection: ENTRY SocketInternal.ListenerProcType =
    BEGIN
    OPEN lH: NARROW[clientData, NetworkStreamInternal.ListenerHandle];
    NSBuffer.Enqueue[@lH.queue, b]; NOTIFY lH.condition;
    END;  --NewConnection

  SetWaitTime: PUBLIC PROC[sH: Stream.Handle, time: NetworkStream.WaitTime] =
    {sH.setTimeout[sH, time]};

  --initialization (Cold)

  END. --NetworkStreamMgr module

LOG

17-May-84 10:57:14  - AOF  - Post Klamath
 5-Nov-85 11:34:19  - AOF  - Removing LOOPHOLEs
10-Dec-85 18:30:02  - AOF  - Listener restructuring
17-Apr-86 16:46:21  - AOF  - Trap use of multicast address
22-May-86 11:47:18  - SMA  - Remove dependencies on Buffer.
14-Apr-87 11:50:12  - AOF  - Catch multicast sources in Listen.