-- File: NetworkStreamMgr.mesa - last edit:
-- AOF                  9-Sep-87  9:00:59
-- SMA                 22-May-86 18:13:27

-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.

--Function: The implementation module for the manager of Pilot Network Streams.

DIRECTORY
  NSBuffer USING [
    Body, Buffer, Dequeue, Enqueue, QueueCleanup, QueueInitialize, ReturnBuffer],
  CommFlags USING [doDebug],
  CommHeap USING [zone],
  Environment USING [Block, nullBlock],
  Driver USING [Glitch],
  HostNumbers USING [IsMulticastID],
  NetworkStream USING [
    ConnectionSuspended, WaitTime, ClassOfService, CloseStatus, closeSST,
    closeReplySST, ListenErrorReason, ConnectionFailed],
  NetworkStreamImpl USING [startByteStream, stopByteStream],
  NetworkStreamInternal USING [
    CloseState, ControlHandle, ListenerHandle, ListenerObject],
  NSTypes USING [ConnectionID],
  PacketStream USING [ConnectionAlreadyThere, FindAddresses, Handle, Make],
  RouterInternal USING [SendErrorPacket],
  Process USING [DisableTimeout, EnableAborts],
  Runtime USING [GlobalFrame],
  SpecialSystem USING [HostNumber],
  Socket USING [ChannelHandle, Delete, SetWaitTime],
  SocketInternal USING [CreateListen, ListenerProcType],
  SppOps USING [GlobalFrameFromPacketStream, PacketStreamFromByteStream],
  Stream USING [
    Byte, CompletionCode, defaultInputOptions, Handle, SubSequenceType, TimeOut],
  System USING [NetworkAddress, nullNetworkAddress];

NetworkStreamMgr: MONITOR
  IMPORTS
    NSBuffer, CommHeap, Driver, NetworkStream, netStrmInst: NetworkStreamImpl,
    HostNumbers, Stream, PacketStream, Process, RouterInternal, Runtime, Socket,
    SocketInternal, SppOps
  EXPORTS NetworkStream, SppOps, System =
BEGIN

--EXPORTED TYPE(S) and READONLY Variables
HostNumber: PUBLIC TYPE = SpecialSystem.HostNumber;
ConnectionID: PUBLIC TYPE = NSTypes.ConnectionID;
unknownConnID, uniqueConnID: PUBLIC ConnectionID ← [0];
ListenerHandle: PUBLIC TYPE = NetworkStreamInternal.ListenerHandle;
uniqueNetworkAddr: PUBLIC System.NetworkAddress ← System.nullNetworkAddress;

--Raised by the listener procedures for a bad handle or a call in the wrong state.
ListenError: PUBLIC ERROR [reason: NetworkStream.ListenErrorReason] = CODE;
--Exported here; nothing in this module raises it.
ListenTimeout: PUBLIC SIGNAL = CODE;
--The spelling is kept because CreateTransducer refers to the error by this name.
BadDestintationAddress: ERROR = CODE;  --for glitching

--Complete the pending listener connection.
--  listenerH       a listener whose state is pending, i.e. Listen has accepted
--                  a request and parked its packet in listenerH.buffer.
--  streamTimeout, classOfService
--                  passed straight through to CreateTransducer.
--  sH              the stream created toward the source of the parked packet.
--Raises ListenError[illegalHandle] for a NIL or unsealed handle and
--ListenError[illegalState] when no connection is pending.  Whatever
--CreateTransducer raises is propagated after the listener has been reset.
ApproveConnection: PUBLIC PROC[
  listenerH: ListenerHandle, streamTimeout: NetworkStream.WaitTime,
  classOfService: NetworkStream.ClassOfService]
  RETURNS [sH: Stream.Handle] =
  BEGIN
  --Give the request packet back and return the listener to idle.  The
  --reference is cleared as well so that the listener never keeps pointing at
  --a buffer it no longer owns.
  ReleasePending: PROC =
    BEGIN
    NSBuffer.ReturnBuffer[listenerH.buffer];
    listenerH.buffer ← NIL;
    listenerH.state ← idle;
    END;  --ReleasePending
  IF (listenerH = NIL) OR (listenerH.seal # listenerSeal) THEN
    ERROR ListenError[illegalHandle];
  IF listenerH.state # pending THEN ERROR ListenError[illegalState];
  --The remote address and remote connection ID come from the parked packet;
  --the local address and local connection ID are left for lower levels to pick.
  sH ← CreateTransducer[
    uniqueNetworkAddr, listenerH.buffer.ns.source, Environment.nullBlock,
    unknownConnID, listenerH.buffer.ns.sourceConnectionID, TRUE,
    streamTimeout, classOfService !
    UNWIND => ReleasePending[]];
  ReleasePending[];
  END;  --ApproveConnection

<<
This procedure closes communication over a network stream at the client's
level of protocol.  The semantics are: I want to close the stream, therefore
I do not want to transmit any more data and only want to know that the remote
end is aware of my actions.  Any input data arriving on the stream may be
discarded.  The status is good, noReply if the other end just did not respond,
and incomplete if it was a simultaneous close and the third data packet did
not make it and the other end is gone.
>>
--Close: the initiating half of the close handshake.
--  sH      the stream to close.
--  status  good when the handshake finishes here, the result of CloseReply
--          when the other end sent its own closeSST first, and noReply when
--          Stream.TimeOut or NetworkStream.ConnectionSuspended cuts it short.
Close: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [status: NetworkStream.CloseStatus] =
  BEGIN
  ENABLE Stream.TimeOut, NetworkStream.ConnectionSuspended =>
    {status ← noReply; CONTINUE};
  why: Stream.CompletionCode;
  newSST: Stream.SubSequenceType;
  scratch: PACKED ARRAY [0..2) OF Stream.Byte;  --input is read here and ignored
  phase: NetworkStreamInternal.CloseState ← sendClose;
  sink: Environment.Block ← [@scratch, 0, LENGTH[scratch]];
  status ← good;
  --Announce the close to the other end.
  sH.setSST[sH, NetworkStream.closeSST];
  sH.sendNow[sH, FALSE];
  phase ← waitCloseReply;
  --Read until a subsequence change tells us how the other end reacted.
  DO
    [, why, newSST] ← sH.get[sH, sink, Stream.defaultInputOptions];
    IF why # sstChange THEN LOOP;  --anything but an SST change is skipped
    SELECT newSST FROM
      NetworkStream.closeReplySST =>
        BEGIN  --our close was answered; send the final closeReply
        sH.setSST[sH, NetworkStream.closeReplySST];
        sH.sendNow[sH, FALSE];
        phase ← closed;
        EXIT;
        END;
      NetworkStream.closeSST =>
        BEGIN  --the other end sent its own close; we become the replier
        phase ← sendCloseReply;
        EXIT;
        END;
      ENDCASE => NULL;  --some other subsequence type, keep reading
    ENDLOOP;
  IF phase = sendCloseReply THEN status ← CloseReply[sH];
  END;  --Close

<<
This procedure conforms to the close protocol and is invoked by the client of
a network stream when it receives a subsequence change to closeSST.  The
semantics of this call are that the client knows that the other end wants to
close the communication and it will not transmit any more data and will reply
with a closeReplySST.  The status can be good, or incomplete; the latter
implying that an answer to the closeReplySST did not make it back and the
other end is gone, or that the closeReply was never acked.
NOTE(review): the procedure as written only ever assigns good or noReply;
confirm which of noReply and incomplete is intended.
>>
--CloseReply: the answering half of the close handshake.
--  sH      the stream on which a closeSST was received.
--  status  good once a closeReplySST comes back, noReply when
--          Stream.TimeOut or NetworkStream.ConnectionSuspended is raised first.
CloseReply: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [status: NetworkStream.CloseStatus] =
  BEGIN
  ENABLE Stream.TimeOut, NetworkStream.ConnectionSuspended =>
    {status ← noReply; CONTINUE};
  sst: Stream.SubSequenceType;
  compCode: Stream.CompletionCode;
  data: PACKED ARRAY [0..2) OF Stream.Byte;  --input is read here and ignored
  block: Environment.Block ← [@data, 0, LENGTH[data]];
  status ← good;
  --Answer the close that the other end sent.
  sH.setSST[sH, NetworkStream.closeReplySST];
  sH.sendNow[sH, FALSE];
  DO  --until exits
    [, compCode, sst] ← sH.get[sH, block, Stream.defaultInputOptions];
    IF (compCode = sstChange) AND (sst = NetworkStream.closeReplySST) THEN EXIT;
    ENDLOOP;
  END;  --CloseReply

--Create a network stream to the specified remote address.
--The local address and both connection IDs are passed as the unique and
--unknown placeholders, and the connection is actively established.
Create: PUBLIC PROC[
  remote: System.NetworkAddress, connectData: Environment.Block,
  timeout: NetworkStream.WaitTime,
  classOfService: NetworkStream.ClassOfService]
  RETURNS [Stream.Handle] =
  BEGIN
  RETURN[
    CreateTransducer[
      uniqueNetworkAddr, remote, connectData, unknownConnID, unknownConnID,
      TRUE, timeout, classOfService]];
  END;  --Create

--Create a listener at the specified local Network Address.
--The generation of IllegalAddress will become more sophisticated.
--We enqueue all the buffers onto the socket channel.
--  addr       only addr.socket is used, to open the listening socket.
--  listenerH  a sealed, idle listener allocated from CommHeap.zone.
--The queue and the condition are set up BEFORE the socket is created.
--CreateListen is handed NewConnection as its callback, and NewConnection
--enqueues on the queue and notifies the condition, so both must be usable
--as soon as the socket exists.
--If CreateListen raises, the half built listener is released again rather
--than being left allocated in the heap.
CreateListener: PUBLIC PROC[addr: System.NetworkAddress]
  RETURNS [listenerH: ListenerHandle] =
  BEGIN
  listenerH ← CommHeap.zone.NEW[NetworkStreamInternal.ListenerObject ← [
    seal: listenerSeal, state: idle, buffer: NIL, socket: , queue: ]];
  NSBuffer.QueueInitialize[@listenerH.queue];
  Process.DisableTimeout[@listenerH.condition];
  Process.EnableAborts[@listenerH.condition];
  listenerH.socket ← SocketInternal.CreateListen[
    socket: addr.socket, type: sequencedPacket,
    clientData: listenerH, callback: NewConnection !
    UNWIND =>
      BEGIN  --same teardown steps that DeleteListener applies to the object
      listenerH.seal ← unsealed;
      NSBuffer.QueueCleanup[@listenerH.queue];
      CommHeap.zone.FREE[@listenerH];
      END];
  END;  --CreateListener

--Create a sequenced packet transducer with all its parameters.
CreateTransducer: PUBLIC PROC[
  local, remote: System.NetworkAddress, connectData: Environment.Block,
  localConnID, remoteConnID: ConnectionID, activelyEstablish: BOOLEAN,
  timeout: NetworkStream.WaitTime,
  classOfService: NetworkStream.ClassOfService]
  RETURNS [sH: Stream.Handle] =
  BEGIN
  --Builds a packet stream with PacketStream.Make, starts a byte stream on top
  --of it, and substitutes this module's Delete for the instance's own.
  --connectData is accepted but not used by this procedure.
  psH: PacketStream.Handle;
  instanceN: LONG POINTER TO FRAME[NetworkStreamImpl];
  --A multicast remote host is refused.  The first arm that is TRUE wins.
  SELECT TRUE FROM
    (~HostNumbers.IsMulticastID[@remote.host]) => NULL;  --okay
    (~CommFlags.doDebug) =>  --DON'T LET HIM GET AWAY WITH THIS
      --The signal sits in an endless loop, so it is simply raised again
      --should a catcher resume it.
      DO
        SIGNAL NetworkStream.ConnectionFailed[noTranslationForDestination];
        ENDLOOP;
    ENDCASE => Driver.Glitch[BadDestintationAddress];
  psH ← PacketStream.Make[
    local, remote, localConnID, remoteConnID, activelyEstablish, timeout,
    classOfService];
  instanceN ← SppOps.GlobalFrameFromPacketStream[psH];
  sH ← instanceN.startByteStream[psH];
  sH.delete ← Delete;  --don't use instance's version of delete
  END;  --CreateTransducer

--Installed as the delete procedure of every stream made by CreateTransducer.
Delete: PROC[sH: Stream.Handle] =
  BEGIN
  --delete instance associated with this stream
  --Both values are looked up before anything is shut down.
  instanceN: LONG POINTER TO FRAME[NetworkStreamImpl] =
    GlobalFrameFromByteStream[sH];
  psH: PacketStream.Handle = SppOps.PacketStreamFromByteStream[sH];
  instanceN.stopByteStream[];  --shut down the bytestream
  psH.destroy[psH];  --then the underlying packet stream
  END;  --Delete

--This procedure deletes a listener.
--  listenerH  the listener to destroy.  A connection still pending on it is
--             rejected with a listenerReject error packet.
--Raises ListenError[illegalHandle] for a NIL or unsealed handle.
--The socket is deleted BEFORE the queue is drained.  While the socket exists
--NewConnection can still enqueue arriving buffers, so draining first leaves
--a window in which a late buffer lands on the queue and is lost when the
--listener object is freed.
--NOTE(review): this relies on Socket.Delete stopping further calls of the
--NewConnection callback; confirm against the Socket implementation.
--NOTE(review): a process still waiting inside Listen on this listener is not
--dealt with here; callers presumably must ensure there is none.
DeleteListener: PUBLIC PROC[listenerH: ListenerHandle] =
  BEGIN
  IF (listenerH = NIL) OR (listenerH.seal # listenerSeal) THEN
    ERROR ListenError[illegalHandle];
  IF listenerH.state = pending THEN
    BEGIN
    RouterInternal.SendErrorPacket[listenerH.buffer, listenerReject];
    listenerH.buffer ← NIL;  --the buffer was handed to SendErrorPacket
    END;
  listenerH.seal ← unsealed;  --stale copies of the handle now fail the seal test
  Socket.Delete[listenerH.socket];
  NSBuffer.QueueCleanup[@listenerH.queue];
  CommHeap.zone.FREE[@listenerH];
  END;  --DeleteListener

--This procedure returns the local and remote addresses of the Network Stream.
FindAddresses: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [local, remote: System.NetworkAddress] =
  BEGIN
  --The stream handle is reinterpreted as a control handle to reach the packet
  --stream, which is then asked for its addresses.
  control: NetworkStreamInternal.ControlHandle =
    LOOPHOLE[sH, NetworkStreamInternal.ControlHandle];
  [local, remote] ← PacketStream.FindAddresses[control.psH];
  END;  --FindAddresses

--Maps a byte stream to the global frame that owns its get procedure.
GlobalFrameFromByteStream: PUBLIC PROC[sH: Stream.Handle]
  RETURNS[LONG POINTER --TO FRAME[NetworkStreamImpl]--] =
  BEGIN
  RETURN[LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[sH.get]]]];
  END;  --GlobalFrameFromByteStream

<<
Listen on the specified socket channel for a sequenced packet.  This
procedure checks to make sure that the arriving packet is not a duplicate.
>>
--  listenerH      the listener to wait on.
--  connectData    accepted but not used by this procedure.
--  listenTimeout  handed to Socket.SetWaitTime for the listener's socket.
--  remote, bytes  the source address of the accepted request; bytes is always 0.
--Raises ListenError[illegalHandle] for a NIL or unsealed handle and
--ListenError[illegalState] when the listener is already listening.  A
--connection left pending by an earlier Listen is rejected first.
--On return the request packet is parked in listenerH.buffer and the state is
--pending, ready for ApproveConnection.
Listen: PUBLIC PROC[
  listenerH: ListenerHandle, connectData: Environment.Block,
  listenTimeout: NetworkStream.WaitTime]
  RETURNS [remote: System.NetworkAddress, bytes: CARDINAL] =
  BEGIN
  pkt: NSBuffer.Buffer;  --the packet currently being examined

  --Blocks under the monitor lock until the listener's queue yields a buffer.
  NextArrival: ENTRY PROC =
    BEGIN
    ENABLE UNWIND => NULL;
    DO
      pkt ← NSBuffer.Dequeue[@listenerH.queue];
      IF pkt # NIL THEN EXIT;
      WAIT listenerH.condition;
      ENDLOOP;
    END;  --NextArrival

  IF (listenerH = NIL) OR (listenerH.seal # listenerSeal) THEN
    ERROR ListenError[illegalHandle];
  SELECT listenerH.state FROM
    listening => ERROR ListenError[illegalState];
    pending =>
      RouterInternal.SendErrorPacket[listenerH.buffer, listenerReject];
    ENDCASE;
  listenerH.state ← listening;
  Socket.SetWaitTime[listenerH.socket, listenTimeout];
  DO
    --until the packet is a sequenced packet and is not a duplicate,
    --or timeout, or aborted socket
    hdr: NSBuffer.Body;
    NextArrival[ ! UNWIND => listenerH.state ← idle];
    IF pkt = NIL THEN LOOP;  --nothing happening here
    hdr ← pkt.ns;  --makes for cheaper access
    --Discard any cases with something wrong.  The arms are tried in order and
    --only the last explicit arm accepts the packet.
    SELECT TRUE FROM
      (pkt.fo.status # goodCompletion) =>
        NSBuffer.ReturnBuffer[pkt];  --don't chance it with bad buffers
      (HostNumbers.IsMulticastID[@hdr.destination.host]) =>
        NSBuffer.ReturnBuffer[pkt];  --he's insane, but can't error broadcast
      (HostNumbers.IsMulticastID[@hdr.source.host]) =>
        NSBuffer.ReturnBuffer[pkt];  --he's busted, and I don't care
      (hdr.packetType # sequencedPacket) =>
        RouterInternal.SendErrorPacket[pkt, invalidPacketType];
      (hdr.destinationConnectionID # unknownConnID) =>
        RouterInternal.SendErrorPacket[pkt, protocolViolation];
      (hdr.sourceConnectionID = unknownConnID) =>
        RouterInternal.SendErrorPacket[pkt, protocolViolation];
      (hdr.sequenceNumber # 0) =>
        RouterInternal.SendErrorPacket[pkt, protocolViolation];
      (~hdr.systemPacket) =>
        RouterInternal.SendErrorPacket[pkt, protocolViolation];
      (~PacketStream.ConnectionAlreadyThere[
        hdr.source, hdr.sourceConnectionID]) =>
        BEGIN  --a fresh request: park it on the listener and report its source
        listenerH.buffer ← pkt;
        listenerH.state ← pending;
        RETURN [hdr.source, 0];
        END;
      ENDCASE => NSBuffer.ReturnBuffer[pkt];  --that connection already exists
    ENDLOOP;
  END;  --Listen

--Callback given to SocketInternal.CreateListen by CreateListener.  It queues
--the arriving buffer on the listener named by clientData and wakes a waiter.
NewConnection: ENTRY SocketInternal.ListenerProcType =
  BEGIN
  OPEN lH: NARROW[clientData, NetworkStreamInternal.ListenerHandle];
  NSBuffer.Enqueue[@lH.queue, b];
  NOTIFY lH.condition;
  END;  --NewConnection

--Sets the stream's timeout through the stream's own setTimeout procedure.
SetWaitTime: PUBLIC PROC[sH: Stream.Handle, time: NetworkStream.WaitTime] =
  BEGIN
  sH.setTimeout[sH, time];
  END;  --SetWaitTime

--initialization (Cold)
END.  --NetworkStreamMgr module

LOG
17-May-84 10:57:14 - AOF - Post Klamath
 5-Nov-85 11:34:19 - AOF - Removing LOOPHOLEs
10-Dec-85 18:30:02 - AOF - Listener restructuring
17-Apr-86 16:46:21 - AOF - Trap use of multicast address
22-May-86 11:47:18 - SMA - Remove dependencies on Buffer.
14-Apr-87 11:50:12 - AOF - Catch multicast sources in Listen.