ComputeServerPupsImpl.mesa
The Compute Server side of the Summoner.
Last Edited by: Bob Hagmann, May 13, 1985 3:43:37 pm PDT
Copyright © 1984 by Xerox Corporation. All rights reserved.
DIRECTORY
Basics,
Commander,
ComputeServerClient,
ComputeServer,
ComputeServerControl,
ComputeServerDebugger,
ComputeServerInternal,
Convert,
InterpreterToolPrivate,
IO,
PrincOps,
Process,
PupDefs,
PupErrors,
PupRouterDefs,
PupStream,
PupTypes,
Rope;
ComputeServerPupsImpl: CEDAR PROGRAM
IMPORTS ComputeServerInternal, Convert, IO, Process, PupDefs, PupErrors, PupRouterDefs, PupStream, Rope
EXPORTS ComputeServerControl, ComputeServerInternal
= BEGIN
Variable Declarations
-- Short local names for the stream and rope types used throughout this module.
STREAM: TYPE = IO.STREAM;
ROPE: TYPE = Rope.ROPE;

-- Textual form of this machine's Pup address. It stays NIL until the module
-- initialization code at the end of this file builds it as
-- <net in octal> "#" <host in octal> "#".
MyNetAddressRope: PUBLIC ROPE ← NIL;

-- Pup address of this machine; assigned by the module initialization code.
myPupAddress: PupDefs.PupAddress;

-- Name of this machine as returned by PupDefs.GetMyName when the module starts.
myHostName: PUBLIC ROPE ← PupDefs.GetMyName[];
-- Handle for one Pup byte stream listener. CreatePupByteStreamListener allocates it
-- and DestroyPupListener tears it down; the Listen process reads its fields.
PupListener: TYPE = REF PupListenerObject;
PupListenerObject: PUBLIC TYPE = RECORD[
-- socket the Listen process takes incoming Pups from
socket: PupDefs.PupSocket,
-- timeout handed to PupByteStreamMake for each connection that is accepted
timeout: PupDefs.Tocks,
-- set TRUE by DestroyPupListener; the Listen loop runs UNTIL this is set
stop: BOOLEAN,
-- forked (detached) for each accepted connection with: the new stream, the requester's
-- address, this listener, and the local address obtained from PupDefs.AnyLocalPupAddress
who: PROCEDURE [STREAM, PupDefs.PupAddress, PupListener, PupDefs.PupAddress],
-- filter called with the requester's address before a connection is made;
-- Listen treats PupStream.RejectThisRequest raised from it as a refusal
check: PROCEDURE [PupDefs.PupAddress],
-- the forked Listen process; DestroyPupListener JOINs it
proc: PROCESS
];
-- Stream procedures for the reading end of an internal buffer stream.
-- Each operation is bound to the matching "in..." procedure defined further down in this module.
inStreamProcs: PUBLIC REF IO.StreamProcs ← IO.CreateStreamProcs[
  class: $ROPE, variety: $input,
  -- reading
  getChar: inBufGetChar,
  charsAvail: inCharsAvail,
  endOf: inBufEndOf,
  -- position and shutdown
  getIndex: inBufGetIndex,
  close: inBufClose
  ];
-- Stream procedures for the writing end of an internal buffer stream.
-- Each operation is bound to the matching "out..." procedure defined further down in this module.
outStreamProcs: PUBLIC REF IO.StreamProcs ← IO.CreateStreamProcs[
  class: $ROPE, variety: $output,
  -- writing
  putChar: outBufPutChar,
  -- position and shutdown
  getIndex: outBufGetIndex,
  close: outBufClose
  ];
-- Local aliases for the buffer stream types declared in ComputeServerInternal.
-- BufStreamData is what each stream procedure below NARROWs self.streamData to.
BufStreamData: TYPE = ComputeServerInternal.BufStreamData;
-- Type of the EOF field; the values used in this module are false, pending and true.
bufStreamState: TYPE = ComputeServerInternal.bufStreamState;
BufStreamDataObject: TYPE = ComputeServerInternal.BufStreamDataObject;
PupListeners - listen for Pup Stream connection
This code is lifted from PupListeners.mesa. It was modified to pass the listener's Pup address to the handler procedure when a connection is accepted. This allows the newly created Pup byte stream to be matched with the original RPC call that initiated the stream connection.
DontReject: PUBLIC PROCEDURE [PupDefs.PupAddress] = {
  -- Connection filter that accepts every requester: it ignores the address and does nothing.
  NULL;
  };
CreatePupByteStreamListener: PUBLIC PROC [
  local: PupDefs.PupSocketID, proc: PROCEDURE [STREAM, PupDefs.PupAddress, PupListener, PupDefs.PupAddress], ticks: PupDefs.Tocks,
  filter: PROCEDURE [PupDefs.PupAddress]] RETURNS [PupListener] = TRUSTED {
  -- Creates a listener on socket "local" and forks the Listen process that serves it.
  --   proc:   handler forked for each accepted connection
  --   ticks:  timeout given to each byte stream that is created
  --   filter: called with the requester's address before a connection is accepted
  -- Returns the listener handle, which is later passed to DestroyPupListener.
  newListener: PupListener = NEW[PupListenerObject];
  -- Fill in every field that Listen reads before the process is forked.
  newListener.stop ← FALSE;
  newListener.timeout ← ticks;
  newListener.check ← filter;
  newListener.who ← proc;
  newListener.socket ← PupDefs.PupSocketMake[local, PupTypes.fillInPupAddress, PupDefs.veryLongWait];
  newListener.proc ← FORK Listen[newListener, local];
  RETURN[newListener];
  };
DestroyPupListener: PUBLIC PROC [listener: PupListener] = TRUSTED {
  -- Stops the Listen process belonging to "listener" and releases its socket.
  -- The order matters: the stop flag is raised first, then the socket is kicked
  -- (presumably to wake a Listen process blocked in get; confirm against PupDefs),
  -- and the socket is destroyed only after the process has been joined.
  listener.stop ← TRUE;
  PupDefs.PupSocketKick[listener.socket];
  JOIN listener.proc;
  PupDefs.PupSocketDestroy[listener.socket];
  };
ListenerProcess: PUBLIC PROC [stream: STREAM, connectionPupAddress: PupDefs.PupAddress, listener: PupListener, listenerPupAddress: PupDefs.PupAddress] = {
  -- Handler run for a newly accepted Pup byte stream.
  --   stream:               the byte stream that was just created
  --   connectionPupAddress: address of the requester (not used here)
  --   listener:             the listener that accepted the connection
  --   listenerPupAddress:   local address of the listener, used as the lookup key
  -- Looks up the active service registered under listenerPupAddress, records the
  -- stream in it, and shuts the listener down.
  -- Raises ERROR if no active service matches listenerPupAddress.
  matchedItem: ComputeServerInternal.ActiveServicesItem;
  matchOK: BOOL ← FALSE;
  [found: matchOK, item: matchedItem] ← ComputeServerInternal.MatchPupAddress[listenerPupAddress];
  IF ~matchOK THEN ERROR;
  matchedItem.remoteStream ← stream;
  -- Clear the item's reference before the listener is destroyed.
  matchedItem.listener ← NIL;
  DestroyPupListener[listener];
  };
Listen: PROC [listener: PupListener, local: PupDefs.PupSocketID] =
TRUSTED BEGIN
  -- Body of the listener process forked by CreatePupByteStreamListener.
  -- Takes Pups from listener.socket until listener.stop is set, and handles each by type:
  --   rfc:    a duplicate of an existing connection gets that connection's answer re-sent;
  --           otherwise the requester is passed to listener.check and, unless rejected,
  --           a byte stream is made and listener.who is forked (detached) to handle it.
  --   echoMe: the Pup is sent back as iAmEcho.
  --   other:  an error Pup ("RFC expected") is sent.
  soc: PupRouterDefs.PupRouterSocket;
  arg: STREAM;
  b: PupDefs.PupBuffer;
  UNTIL listener.stop DO
    b ← listener.socket.get[];
    IF b # NIL THEN
      BEGIN
      SELECT b.pupType FROM
        rfc =>
          BEGIN OPEN PupStream;
          FOR soc ← PupRouterDefs.GetFirstPupSocket[], soc.next UNTIL soc = NIL DO
            -- check for duplicate
            IF soc.remote # b.source THEN LOOP;
            IF soc.id # b.pupID THEN LOOP;
            -- same requester and same id as an existing socket: answer again with its local address
            b.address ← soc.local;
            PupDefs.SwapPupSourceAndDest[b];
            PupDefs.PupRouterSendThis[b];
            EXIT;
            ENDLOOP;
          IF soc = NIL THEN
            BEGIN -- not a duplicate, make a new connection
            him: PupDefs.PupAddress ← b.address;
            listener.check[
              him !
              RejectThisRequest =>
                BEGIN
                -- the filter refused: reuse the request buffer to send an abort carrying the reason
                b.pupType ← abort;
                PupDefs.SwapPupSourceAndDest[b];
                PupDefs.SetPupContentsBytes[b, 2];
                PupDefs.AppendRopeToPupBuffer[b, error];
                PupDefs.PupRouterSendThis[b];
                GOTO Reject;
                END];
            -- Make the stream while b is still ours: b.pupID must be read before the
            -- buffer is handed back, since a returned buffer may be reused by someone else.
            -- NOTE(review): if PupByteStreamMake raises, b is not returned on this path.
            arg ← PupByteStreamMake[local, him, listener.timeout, alreadyOpened, b.pupID];
            PupDefs.ReturnFreePupBuffer[b];
            Process.Detach[FORK listener.who[arg, him, listener, PupDefs.AnyLocalPupAddress[local]]];
            END;
          EXITS Reject => NULL;
          END;
        echoMe =>
          BEGIN
          b.pupType ← iAmEcho;
          PupDefs.SwapPupSourceAndDest[b];
          PupDefs.PupRouterSendThis[b];
          END;
        ENDCASE => PupDefs.SendErrorPup[b, LOOPHOLE[100B], "RFC expected"];
      END;
    ENDLOOP;
  END;
Internal Streams
input procedures
inBufGetChar: PUBLIC PROC [self: STREAM] RETURNS [ch: CHAR] = {
  -- Returns the next character from the buffer stream, polling (Process.Pause[5])
  -- while the buffer is empty.
  -- Raises IO.EndOfStream once the stream is at end: EOF already true, or EOF pending
  -- with nothing left to read (in which case EOF is advanced to true).
  -- Raises IO.Error[Failure] if the buffer is empty and the listener item reports
  -- communicationFailure.
  state: BufStreamData = NARROW[self.streamData];
  DO
    IF state.EOF = true THEN ERROR IO.EndOfStream[self];
    -- EOF is examined before the pointers are compared; keep that order.
    IF state.EOF = pending AND state.inPointer = state.outPointer THEN {
      state.EOF ← true;
      ERROR IO.EndOfStream[self];
      };
    IF state.inPointer # state.outPointer THEN {
      -- the pointers are running counts; the slot is the count modulo the buffer size
      ch ← state.buffer[state.inPointer MOD ComputeServerInternal.BufStreamBufferSize];
      state.inPointer ← state.inPointer + 1;
      RETURN[ch];
      };
    -- nothing buffered: give up if the connection has failed, otherwise wait and retry
    IF state.listenerItem.success = communicationFailure THEN {
      Process.CheckForAbort[];
      ERROR IO.Error[ec: Failure, stream: self];
      };
    Process.Pause[5];
    ENDLOOP;
  };
inBufEndOf: PROC [self: STREAM] RETURNS [BOOL] = {
  -- Reports whether the buffer stream is at its end.
  -- Polls (Process.Pause[5]) while EOF is false and the buffer is empty, i.e. until
  -- either a character arrives or the writer closes the stream.
  -- Returns TRUE if EOF is true or the buffer is empty, FALSE if a character can be read.
  data: BufStreamData = NARROW[self.streamData];
  WHILE data.EOF = false AND data.inPointer = data.outPointer DO
    -- Stop waiting once communication has failed, as inBufGetChar and outBufPutChar do;
    -- otherwise this loop would never terminate. With an empty buffer the result is
    -- then TRUE, and a following GetChar raises IO.Error[Failure].
    IF data.listenerItem.success = communicationFailure THEN EXIT;
    Process.Pause[5];
    ENDLOOP;
  RETURN[data.EOF = true OR data.inPointer = data.outPointer];
  };
inCharsAvail: PUBLIC PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  -- Returns the number of buffered characters (outPointer - inPointer), plus one
  -- when EOF is anything other than false. The "wait" argument is ignored: this
  -- procedure never blocks.
  state: BufStreamData = NARROW[self.streamData];
  RETURN[IF state.EOF = false
    THEN 0 + state.outPointer - state.inPointer
    ELSE 1 + state.outPointer - state.inPointer];
  };
inBufGetIndex: PROC [self: STREAM] RETURNS [INT] = {
  -- Index of the input stream: the running count of characters consumed so far (inPointer).
  state: BufStreamData = NARROW[self.streamData];
  RETURN[state.inPointer];
  };
inBufClose: PROC [self: STREAM, abort: BOOL] = {
  -- Closes the reading end by forcing EOF to true. The "abort" argument is ignored.
  state: BufStreamData = NARROW[self.streamData];
  state.EOF ← true;
  };
output procedures
outBufPutChar: PUBLIC PROC [self: STREAM, char: CHAR] = {
  -- Appends "char" to the buffer stream.
  -- Polls (Process.Pause[5]) for as long as outPointer + 1 and inPointer are congruent
  -- modulo the buffer size, which is this module's "buffer full" test.
  -- If the listener item reports communicationFailure while waiting, the character
  -- is silently dropped.
  state: BufStreamData = NARROW[self.streamData];
  UNTIL (state.inPointer - (state.outPointer + 1)) MOD ComputeServerInternal.BufStreamBufferSize # 0 DO
    IF state.listenerItem.success = communicationFailure THEN RETURN; -- ignore output once communications fails
    Process.Pause[5];
    ENDLOOP;
  -- the pointers are running counts; the slot is the count modulo the buffer size
  state.buffer[state.outPointer MOD ComputeServerInternal.BufStreamBufferSize] ← char;
  state.outPointer ← state.outPointer + 1;
  };
outBufGetIndex: PROC [self: STREAM] RETURNS [INT] = {
  -- Index of the output stream: the running count of characters written so far (outPointer).
  state: BufStreamData = NARROW[self.streamData];
  RETURN[state.outPointer];
  };
outBufClose: PROC [self: STREAM, abort: BOOL] = {
  -- Closes the writing end: EOF moves from false to pending so that the reader can
  -- drain what is buffered before it sees end of stream. An EOF that is already
  -- pending or true is left alone. The "abort" argument is ignored.
  state: BufStreamData = NARROW[self.streamData];
  IF state.EOF # false THEN RETURN;
  state.EOF ← pending;
  };
Initialization
-- Look up this machine's Pup address by name, then publish it as text:
-- net and host in base 8 (no radix suffix), each followed by "#".
myPupAddress ← PupDefs.GetPupAddress[[0,0], myHostName];
{
  netPart: ROPE = Convert.RopeFromCard[myPupAddress.net, 8, FALSE];
  hostPart: ROPE = Convert.RopeFromCard[myPupAddress.host, 8, FALSE];
  MyNetAddressRope ← Rope.Cat[netPart, "#", hostPart, "#"];
  };
END.
Bob Hagmann May 3, 1985 8:41:54 am PDT
changes to: DIRECTORY, ComputeServerPupsImpl, inBufGetChar