-- File: NetworkStreamMgr.mesa - last edit:
-- AOF 9-Sep-87 9:00:59
-- SMA 22-May-86 18:13:27
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
--Function: The implementation module for the manager of Pilot Network Streams.
DIRECTORY
NSBuffer USING [
Body, Buffer, Dequeue, Enqueue, QueueCleanup, QueueInitialize, ReturnBuffer],
CommFlags USING [doDebug],
CommHeap USING [zone],
Environment USING [Block, nullBlock],
Driver USING [Glitch],
HostNumbers USING [IsMulticastID],
NetworkStream USING [
ConnectionSuspended, WaitTime, ClassOfService, CloseStatus, closeSST,
closeReplySST, ListenErrorReason, ConnectionFailed],
NetworkStreamImpl USING [startByteStream, stopByteStream],
NetworkStreamInternal USING [
CloseState, ControlHandle, ListenerHandle, ListenerObject],
NSTypes USING [ConnectionID],
PacketStream USING [ConnectionAlreadyThere, FindAddresses, Handle, Make],
RouterInternal USING [SendErrorPacket],
Process USING [DisableTimeout, EnableAborts],
Runtime USING [GlobalFrame],
SpecialSystem USING [HostNumber],
Socket USING [ChannelHandle, Delete, SetWaitTime],
SocketInternal USING [CreateListen, ListenerProcType],
SppOps USING [GlobalFrameFromPacketStream, PacketStreamFromByteStream],
Stream USING [
Byte, CompletionCode, defaultInputOptions, Handle, SubSequenceType, TimeOut],
System USING [NetworkAddress, nullNetworkAddress];
NetworkStreamMgr: MONITOR
IMPORTS
NSBuffer, CommHeap, Driver, NetworkStream, netStrmInst: NetworkStreamImpl,
HostNumbers, Stream, PacketStream, Process, RouterInternal, Runtime, Socket,
SocketInternal, SppOps
EXPORTS NetworkStream, SppOps, System =
BEGIN
--EXPORTED TYPE(S) and READONLY Variables
--Exported types, bound here to the definitions this module works with.
HostNumber: PUBLIC TYPE = SpecialSystem.HostNumber;
ConnectionID: PUBLIC TYPE = NSTypes.ConnectionID;
--Both connection ID values start out as [0].  Listen insists that an incoming
--request carry unknownConnID as its destination connection ID and something
--other than unknownConnID as its source connection ID.
unknownConnID, uniqueConnID: PUBLIC ConnectionID ← [0];
ListenerHandle: PUBLIC TYPE = NetworkStreamInternal.ListenerHandle;
--Used as the local address by Create and ApproveConnection.
uniqueNetworkAddr: PUBLIC System.NetworkAddress ← System.nullNetworkAddress;
--Raised by ApproveConnection, DeleteListener and Listen with illegalHandle
--(NIL or unsealed handle) or illegalState.
ListenError: PUBLIC ERROR [reason: NetworkStream.ListenErrorReason] = CODE;
--NOTE(review): nothing in this module raises ListenTimeout; presumably it is
--raised elsewhere on behalf of Listen.  Confirm.
ListenTimeout: PUBLIC SIGNAL = CODE;
--Handed to Driver.Glitch by CreateTransducer when debugging is enabled and the
--remote host is a multicast ID.  The spelling is (sic); the name is referenced
--from CreateTransducer.
BadDestintationAddress: ERROR = CODE; --for glitching
--Turn the connection request held by a pending listener into a stream.
--Raises ListenError[illegalHandle] for a NIL or unsealed handle, and
--ListenError[illegalState] unless the listener is in the pending state.
--The stream is built by CreateTransducer from the source address and source
--connection ID found in the held request buffer.  Whether or not that
--succeeds, the buffer is returned and the listener goes back to idle.
ApproveConnection: PUBLIC PROC[listenerH: ListenerHandle,
  streamTimeout: NetworkStream.WaitTime,
  classOfService: NetworkStream.ClassOfService]
  RETURNS [sH: Stream.Handle] =
  BEGIN
  --hand the request buffer back and make the listener reusable
  Retire: PROC = {
    NSBuffer.ReturnBuffer[listenerH.buffer]; listenerH.state ← idle};
  IF listenerH = NIL THEN ERROR ListenError[illegalHandle];
  IF listenerH.seal # listenerSeal THEN ERROR ListenError[illegalHandle];
  IF listenerH.state # pending THEN ERROR ListenError[illegalState];
  sH ← CreateTransducer[
    uniqueNetworkAddr, listenerH.buffer.ns.source,
    Environment.nullBlock,
    unknownConnID, listenerH.buffer.ns.sourceConnectionID,
    TRUE, streamTimeout, classOfService !
    UNWIND => Retire[]];
  Retire[];
  END;
<<
Close communication over a network stream at the client's level of protocol.
The caller is saying that it will transmit no more data and only wants to know
that the remote end is aware of the close; input arriving meanwhile is read
into a two byte scratch block and ignored.

The procedure sends closeSST and then reads until it sees a subsequence change:
  closeReplySST - answer with closeReplySST; status is good.
  closeSST      - both ends closed at once; the status is whatever CloseReply
                  returns.
If Stream.TimeOut or NetworkStream.ConnectionSuspended is raised anywhere in
the body, the status is noReply.
>>
Close: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [status: NetworkStream.CloseStatus] =
  BEGIN
  ENABLE
    Stream.TimeOut, NetworkStream.ConnectionSuspended =>
      {status ← noReply; CONTINUE};
  scratch: PACKED ARRAY [0..2) OF Stream.Byte;
  sink: Environment.Block ← [@scratch, 0, LENGTH[scratch]];
  why: Stream.CompletionCode;
  newSST: Stream.SubSequenceType;
  phase: NetworkStreamInternal.CloseState ← sendClose;
  status ← good;
  --announce the close
  sH.setSST[sH, NetworkStream.closeSST];
  sH.sendNow[sH, FALSE];
  phase ← waitCloseReply;
  UNTIL (phase = closed) OR (phase = sendCloseReply) DO
    [, why, newSST] ← sH.get[sH, sink, Stream.defaultInputOptions];
    IF why # sstChange THEN LOOP; --anything but a subsequence change is ignored
    SELECT newSST FROM
      NetworkStream.closeReplySST => {
        --the other end acknowledged; send the final closeReply
        sH.setSST[sH, NetworkStream.closeReplySST];
        sH.sendNow[sH, FALSE];
        phase ← closed};
      NetworkStream.closeSST => phase ← sendCloseReply; --simultaneous close
      ENDCASE;
    ENDLOOP;
  IF phase = sendCloseReply THEN status ← CloseReply[sH];
  END; --Close
<<
This procedure conforms to the close protocol and is invoked by the client of
a network stream when it receives a subsequence change to closeSST (Close also
calls it when both ends close at once).  The caller knows that the other end
wants to close the communication; it will not transmit any more data and
replies with a closeReplySST.

The procedure sends closeReplySST and then reads, ignoring input, until the
stream reports a subsequence change to closeReplySST.

The status can be good or incomplete.  It is incomplete when Stream.TimeOut or
NetworkStream.ConnectionSuspended is raised, i.e. the answer to the
closeReplySST did not make it back and the other end is gone, or the
closeReply was never acked.  noReply is not a possible outcome here: the other
end has already spoken, it is only the end of the handshake that is missing.
>>
CloseReply: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [status: NetworkStream.CloseStatus] =
  BEGIN
  ENABLE
    Stream.TimeOut, NetworkStream.ConnectionSuspended =>
      {status ← incomplete; CONTINUE};
  sst: Stream.SubSequenceType;
  compCode: Stream.CompletionCode;
  data: PACKED ARRAY [0..2) OF Stream.Byte; --scratch for discarded input
  block: Environment.Block ← [@data, 0, LENGTH[data]];
  status ← good;
  sH.setSST[sH, NetworkStream.closeReplySST];
  sH.sendNow[sH, FALSE];
  DO --until the matching closeReplySST arrives
    [, compCode, sst] ← sH.get[sH, block, Stream.defaultInputOptions];
    IF (compCode = sstChange) AND (sst = NetworkStream.closeReplySST) THEN EXIT;
    ENDLOOP;
  END; --CloseReply
--Open a network stream to the given remote address.
--This is CreateTransducer with the local address set to uniqueNetworkAddr,
--both connection IDs set to unknownConnID, and active establishment on.
--Whatever CreateTransducer raises is passed through to the caller.
Create: PUBLIC PROC[
  remote: System.NetworkAddress, connectData: Environment.Block,
  timeout: NetworkStream.WaitTime, classOfService: NetworkStream.ClassOfService]
  RETURNS [Stream.Handle] =
  {RETURN[
    CreateTransducer[
      local: uniqueNetworkAddr, remote: remote, connectData: connectData,
      localConnID: unknownConnID, remoteConnID: unknownConnID,
      activelyEstablish: TRUE, timeout: timeout,
      classOfService: classOfService]]}; --Create
--Create a listener at the specified local Network Address.
--Only addr.socket is used; per the original author, the generation of
--IllegalAddress "will become more sophisticated".
--Arriving packets are handed to NewConnection, which queues them on the
--listener and notifies its condition; Listen picks them up from there.
--
--The condition variable and the queue are set up BEFORE the socket is asked to
--start calling NewConnection, so a packet that shows up immediately never sees
--an uninitialized queue.  If CreateListen unwinds, the half built listener is
--torn down the same way DeleteListener does it, so nothing is leaked.
CreateListener: PUBLIC PROC[addr: System.NetworkAddress]
  RETURNS [listenerH: ListenerHandle] =
  BEGIN
  listenerH ← CommHeap.zone.NEW[NetworkStreamInternal.ListenerObject ← [
    seal: listenerSeal, state: idle, buffer: NIL, socket: , queue: ]];
  --waiters block until notified or aborted, never by timeout of the condition
  Process.DisableTimeout[@listenerH.condition];
  Process.EnableAborts[@listenerH.condition];
  NSBuffer.QueueInitialize[@listenerH.queue];
  --only now may the callback start firing
  listenerH.socket ← SocketInternal.CreateListen[
    socket: addr.socket, type: sequencedPacket,
    clientData: listenerH, callback: NewConnection !
    UNWIND => {
      NSBuffer.QueueCleanup[@listenerH.queue];
      CommHeap.zone.FREE[@listenerH]}];
  END; --CreateListener
--Build a sequenced packet transducer and the byte stream that sits on it.
--A multicast remote host is refused: with CommFlags.doDebug the procedure
--glitches with BadDestintationAddress, otherwise it keeps raising
--NetworkStream.ConnectionFailed[noTranslationForDestination] for as long as
--the signal is resumed.
--The packet stream comes from PacketStream.Make, the byte stream from the
--startByteStream of the NetworkStreamImpl instance that
--SppOps.GlobalFrameFromPacketStream returns for it.  The stream's delete
--procedure is replaced by this module's Delete.
--connectData is accepted but not referenced here.
CreateTransducer: PUBLIC PROC[
  local, remote: System.NetworkAddress, connectData: Environment.Block,
  localConnID, remoteConnID: ConnectionID,
  activelyEstablish: BOOLEAN, timeout: NetworkStream.WaitTime,
  classOfService: NetworkStream.ClassOfService]
  RETURNS [sH: Stream.Handle] =
  BEGIN
  packetStream: PacketStream.Handle;
  frame: LONG POINTER TO FRAME[NetworkStreamImpl];
  IF HostNumbers.IsMulticastID[@remote.host] THEN
    BEGIN
    IF CommFlags.doDebug THEN Driver.Glitch[BadDestintationAddress]
    ELSE --DON'T LET HIM GET AWAY WITH THIS-- DO
      SIGNAL NetworkStream.ConnectionFailed[noTranslationForDestination];
      ENDLOOP;
    END;
  packetStream ← PacketStream.Make[
    local, remote, localConnID, remoteConnID, activelyEstablish,
    timeout, classOfService];
  frame ← SppOps.GlobalFrameFromPacketStream[packetStream];
  sH ← frame.startByteStream[packetStream];
  sH.delete ← Delete; --don't use instance's version of delete
  END; --CreateTransducer
--Delete procedure that CreateTransducer installs in every stream it makes.
--Both the owning instance and the packet stream are looked up before anything
--is shut down; then the byte stream is stopped and the packet stream
--underneath it is destroyed.
Delete: PROC[sH: Stream.Handle] =
  {
  --instance associated with this stream
  frame: LONG POINTER TO FRAME[NetworkStreamImpl] =
    GlobalFrameFromByteStream[sH];
  packetStream: PacketStream.Handle =
    SppOps.PacketStreamFromByteStream[sH];
  frame.stopByteStream[]; --shut down the bytestream
  packetStream.destroy[packetStream]; --then the underlying packet stream
  };
--Dispose of a listener.
--Raises ListenError[illegalHandle] for a NIL or unsealed handle.
--A connection request that is still pending is answered with a listenerReject
--error packet.  The handle is then unsealed, its queue cleaned up, its socket
--deleted, and its storage returned to CommHeap.zone.
DeleteListener: PUBLIC PROC[listenerH: ListenerHandle] =
  {
  IF listenerH = NIL THEN ERROR ListenError[illegalHandle];
  IF listenerH.seal # listenerSeal THEN ERROR ListenError[illegalHandle];
  IF listenerH.state = pending THEN
    RouterInternal.SendErrorPacket[listenerH.buffer, listenerReject];
  listenerH.seal ← unsealed; --later use of the handle fails the seal check
  NSBuffer.QueueCleanup[@listenerH.queue];
  Socket.Delete[listenerH.socket];
  CommHeap.zone.FREE[@listenerH];
  }; --DeleteListener
--Report the local and remote addresses of a Network Stream.
--The stream handle is viewed as a NetworkStreamInternal.ControlHandle and the
--question is put to the packet stream recorded in it.
FindAddresses: PUBLIC PROC[sH: Stream.Handle]
  RETURNS [local, remote: System.NetworkAddress] =
  {
  control: NetworkStreamInternal.ControlHandle =
    LOOPHOLE[sH, NetworkStreamInternal.ControlHandle];
  [local, remote] ← PacketStream.FindAddresses[control.psH];
  }; --FindAddresses
--Find the module instance that implements a byte stream: the global frame
--that Runtime.GlobalFrame reports for the stream's get procedure.
--The result is untyped; callers in this module treat it as a
--LONG POINTER TO FRAME[NetworkStreamImpl].
GlobalFrameFromByteStream: PUBLIC PROC[sH: Stream.Handle]
  RETURNS[LONG POINTER --TO FRAME[NetworkStreamImpl]--] =
  BEGIN
  RETURN[LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[sH.get]]]];
  END;
<<
Listen on the specified listener for a sequenced packet that opens a new
connection.  This procedure checks to make sure that the arriving packet is
not a duplicate.

Raises ListenError[illegalHandle] for a NIL or unsealed handle and
ListenError[illegalState] if the listener is already listening.  A request
left pending by an earlier Listen is rejected with a listenerReject error
packet before the new wait begins.

On return the accepted packet is held in listenerH.buffer and the listener is
pending; remote is the packet's source address.  connectData is not referenced
and bytes is always 0.  If the wait is unwound the listener goes back to idle.
>>
Listen: PUBLIC PROC[
listenerH: ListenerHandle, connectData: Environment.Block,
listenTimeout: NetworkStream.WaitTime]
RETURNS [remote: System.NetworkAddress, bytes: CARDINAL] =
BEGIN
--Wait, inside the monitor, until NewConnection has queued a buffer; the
--result is left in b.
GetPacket: ENTRY PROC =
BEGIN
ENABLE UNWIND => NULL;
WHILE (b ← NSBuffer.Dequeue[@listenerH.queue]) = NIL DO
WAIT listenerH.condition; ENDLOOP;
END; --GetPacket
b: NSBuffer.Buffer;
IF (listenerH = NIL) OR (listenerH.seal # listenerSeal)
THEN ERROR ListenError[illegalHandle];
b ← listenerH.buffer;
SELECT listenerH.state FROM
--a request nobody approved is turned away
pending => RouterInternal.SendErrorPacket[b, listenerReject];
listening => ERROR ListenError[illegalState];
ENDCASE;
listenerH.state ← listening;
Socket.SetWaitTime[listenerH.socket, listenTimeout];
DO
--until the packet is a sequenced packet and is not a duplicate,
--or timeout, or aborted socket
body: NSBuffer.Body;
GetPacket[ ! UNWIND => listenerH.state ← idle];
--GetPacket only comes back with a buffer, so this test is purely defensive
IF b = NIL THEN LOOP; --nothing happening here
body ← b.ns; --makes for cheaper access
--Discard any packet with something wrong.  The arms are tried in order; the
--last explicit arm accepts the packet, and ENDCASE drops what is left, i.e.
--a request for which ConnectionAlreadyThere answered TRUE.
SELECT TRUE FROM
(b.fo.status # goodCompletion) =>
NSBuffer.ReturnBuffer[b]; --don't chance it with bad buffers
(HostNumbers.IsMulticastID[@body.destination.host]) =>
NSBuffer.ReturnBuffer[b]; --he's insane, but can't error broadcast
(HostNumbers.IsMulticastID[@body.source.host]) =>
NSBuffer.ReturnBuffer[b]; --he's busted, and I don't care
(body.packetType # sequencedPacket) =>
RouterInternal.SendErrorPacket[b, invalidPacketType];
--an opening packet must not name a destination connection yet
(body.destinationConnectionID # unknownConnID) =>
RouterInternal.SendErrorPacket[b, protocolViolation];
--but it must identify the sender's end of the connection
(body.sourceConnectionID = unknownConnID) =>
RouterInternal.SendErrorPacket[b, protocolViolation];
(body.sequenceNumber # 0) =>
RouterInternal.SendErrorPacket[b, protocolViolation];
(~body.systemPacket) =>
RouterInternal.SendErrorPacket[b, protocolViolation];
(~PacketStream.ConnectionAlreadyThere[
body.source, body.sourceConnectionID]) =>
BEGIN
--keep the buffer for ApproveConnection, DeleteListener or the next Listen
listenerH.buffer ← b;
listenerH.state ← pending;
RETURN [body.source, 0];
END;
ENDCASE => NSBuffer.ReturnBuffer[b];
ENDLOOP;
END; --Listen
--Callback given to SocketInternal.CreateListen by CreateListener.
--clientData is the listener; the arriving buffer is queued on it and its
--condition is notified so that a waiting Listen can pick the buffer up.
NewConnection: ENTRY SocketInternal.ListenerProcType =
  BEGIN
  lH: NetworkStreamInternal.ListenerHandle =
    NARROW[clientData, NetworkStreamInternal.ListenerHandle];
  NSBuffer.Enqueue[@lH.queue, b];
  NOTIFY lH.condition;
  END; --NewConnection
--Change the wait time of a stream by calling the stream's own setTimeout.
SetWaitTime: PUBLIC PROC[sH: Stream.Handle, time: NetworkStream.WaitTime] =
  BEGIN
  sH.setTimeout[sH, time];
  END;
--initialization (Cold)
END. --NetworkStreamMgr module
LOG
17-May-84 10:57:14 - AOF - Post Klamath
5-Nov-85 11:34:19 - AOF - Removing LOOPHOLEs
10-Dec-85 18:30:02 - AOF - Listener restructuring
17-Apr-86 16:46:21 - AOF - Trap use of multicast address
22-May-86 11:47:18 - SMA - Remove dependencies on Buffer.
14-Apr-87 11:50:12 - AOF - Catch multicast sources in Listen.