-- ComputeServerPupsImpl.mesa
-- The Compute Server side of the Summoner.
-- Last Edited by: Bob Hagmann, May 13, 1985 3:43:37 pm PDT
-- Copyright © 1984 by Xerox Corporation. All rights reserved.
DIRECTORY
Basics,
Commander,
ComputeServerClient,
ComputeServer,
ComputeServerControl,
ComputeServerDebugger,
ComputeServerInternal,
Convert,
InterpreterToolPrivate,
IO,
PrincOps,
Process,
PupDefs,
PupErrors,
PupRouterDefs,
PupStream,
PupTypes,
Rope;
ComputeServerPupsImpl:
CEDAR
PROGRAM
IMPORTS ComputeServerInternal, Convert, IO, Process, PupDefs, PupErrors, PupRouterDefs, PupStream, Rope
EXPORTS ComputeServerControl, ComputeServerInternal
= BEGIN
-- Variable Declarations
-- Short local names for the stream and rope types used throughout this module.
STREAM: TYPE = IO.STREAM;
ROPE: TYPE = Rope.ROPE;
-- Exported rope form of this machine's Pup address; NIL until the module's start code assigns it.
MyNetAddressRope: PUBLIC ROPE ← NIL;
-- This machine's Pup address; assigned by the module's start code.
myPupAddress: PupDefs.PupAddress;
-- Exported host name, initialised from PupDefs.GetMyName.
myHostName: PUBLIC ROPE ← PupDefs.GetMyName[];
-- A PupListener describes one listening socket together with the process that services it.
PupListener: TYPE = REF PupListenerObject;
PupListenerObject: PUBLIC TYPE = RECORD [
  socket: PupDefs.PupSocket,  -- socket on which incoming Pups are received
  timeout: PupDefs.Tocks,  -- handed to PupByteStreamMake for every accepted connection
  stop: BOOL,  -- set TRUE to make the Listen loop exit
  who: PROC [STREAM, PupDefs.PupAddress, PupListener, PupDefs.PupAddress],  -- forked for every accepted connection
  check: PROC [PupDefs.PupAddress],  -- filter called before a connection is accepted
  proc: PROCESS  -- the forked Listen process; joined by DestroyPupListener
  ];
-- Stream procedure record for the reading side of a buffer stream.
-- All of the procedures named here are defined further down in this module.
inStreamProcs: PUBLIC REF IO.StreamProcs ← IO.CreateStreamProcs[
  variety: $input,
  class: $ROPE,
  getChar: inBufGetChar,
  endOf: inBufEndOf,
  charsAvail: inCharsAvail,
  getIndex: inBufGetIndex,
  close: inBufClose
  ];
-- Stream procedure record for the writing side of a buffer stream.
-- All of the procedures named here are defined further down in this module.
outStreamProcs: PUBLIC REF IO.StreamProcs ← IO.CreateStreamProcs[
  variety: $output,
  class: $ROPE,
  putChar: outBufPutChar,
  getIndex: outBufGetIndex,
  close: outBufClose
  ];
-- Local names for the buffer stream types declared in ComputeServerInternal.
-- The stream procedures below NARROW self.streamData to BufStreamData.
BufStreamData: TYPE = ComputeServerInternal.BufStreamData;
bufStreamState: TYPE = ComputeServerInternal.bufStreamState;
BufStreamDataObject: TYPE = ComputeServerInternal.BufStreamDataObject;
-- PupListeners: listen for Pup stream connections
-- This code is lifted from PupListeners.mesa. It was modified so that the client procedure receives the listener and the listener's Pup address along with each new stream. This allows the Pup byte stream to be matched with the original RPC call that initiated the stream connection.
-- Filter that accepts every caller: it does nothing, so no request is ever rejected by it.
-- Suitable as the filter argument of CreatePupByteStreamListener.
DontReject: PUBLIC PROC [PupDefs.PupAddress] = { };
-- Create a listener for socket local and fork the process that services it.
--   local: the socket ID to listen on.
--   proc: forked for every accepted connection with the new stream, the caller's address, the listener and the listener's local address.
--   ticks: timeout given to every byte stream that is created.
--   filter: called with the caller's address before a connection is accepted.
-- Returns the new listener; hand it to DestroyPupListener to shut it down.
CreatePupByteStreamListener: PUBLIC PROC [
  local: PupDefs.PupSocketID,
  proc: PROC [STREAM, PupDefs.PupAddress, PupListener, PupDefs.PupAddress],
  ticks: PupDefs.Tocks,
  filter: PROC [PupDefs.PupAddress]]
  RETURNS [PupListener] = TRUSTED {
  newListener: PupListener = NEW[PupListenerObject];
  -- Record the client's parameters.
  newListener.who ← proc;
  newListener.check ← filter;
  newListener.timeout ← ticks;
  newListener.stop ← FALSE;
  newListener.socket ← PupDefs.PupSocketMake[local, PupTypes.fillInPupAddress, PupDefs.veryLongWait];
  -- Fork last: Listen reads the fields that were filled in above.
  newListener.proc ← FORK Listen[newListener, local];
  RETURN[newListener];
  };
-- Shut down a listener made by CreatePupByteStreamListener: stop its Listen process, wait for it to finish, then destroy its socket.
-- Because this JOINs listener.proc it must not be called from the Listen process itself.
DestroyPupListener:
PUBLIC PROC [listener: PupListener] =
TRUSTED BEGIN
-- Listen tests this flag at the top of each loop iteration.
listener.stop ← TRUE;
-- Presumably wakes a Listen process that is blocked in socket.get so that it sees the flag; confirm against PupDefs.
PupDefs.PupSocketKick[listener.socket];
-- Wait for Listen to exit before the socket it is using is destroyed.
JOIN listener.proc;
PupDefs.PupSocketDestroy[listener.socket];
END;
-- Client procedure run (as a detached process forked by Listen) for each accepted connection.
-- Looks up the active service registered under listenerPupAddress, stores the new stream in it, clears its listener field and destroys the listener.
--   stream: the newly created Pup byte stream.
--   connectionPupAddress: address of the remote end; not used here.
--   listener: the listener that accepted the connection; destroyed before returning.
--   listenerPupAddress: key used to find the matching active service.
-- Raises ERROR when no active service matches listenerPupAddress.
-- NOTE(review): on that path the stream is left open and the listener keeps running; confirm this is intended.
ListenerProcess: PUBLIC PROC [
  stream: STREAM, connectionPupAddress: PupDefs.PupAddress, listener: PupListener,
  listenerPupAddress: PupDefs.PupAddress] = {
  matchedItem: ComputeServerInternal.ActiveServicesItem;
  matchOK: BOOL ← FALSE;
  [found: matchOK, item: matchedItem] ← ComputeServerInternal.MatchPupAddress[listenerPupAddress];
  IF ~matchOK THEN ERROR;
  matchedItem.remoteStream ← stream;
  -- Drop the item's reference first; the listener itself is torn down next.
  matchedItem.listener ← NIL;
  DestroyPupListener[listener];
  };
-- Body of the listener process forked by CreatePupByteStreamListener.
-- Loops until listener.stop is set, taking Pups from listener.socket:
--   rfc: a duplicate of an existing connection is answered again; otherwise the request is passed to listener.check, and unless that raises RejectThisRequest a byte stream is made and listener.who is forked (detached) on it.
--   echoMe: answered with iAmEcho.
--   anything else: answered with an error Pup.
--   listener: the listener record this process serves.
--   local: the socket ID being listened on.
Listen: PROC [listener: PupListener, local: PupDefs.PupSocketID] = TRUSTED BEGIN
  soc: PupRouterDefs.PupRouterSocket;
  arg: STREAM;
  b: PupDefs.PupBuffer;
  UNTIL listener.stop DO
    b ← listener.socket.get[];
    IF b # NIL THEN
      BEGIN
      SELECT b.pupType FROM
        rfc =>
          BEGIN OPEN PupStream;
          FOR soc ← PupRouterDefs.GetFirstPupSocket[], soc.next UNTIL soc = NIL DO
            -- check for duplicate
            IF soc.remote # b.source THEN LOOP;
            IF soc.id # b.pupID THEN LOOP;
            b.address ← soc.local;
            PupDefs.SwapPupSourceAndDest[b];
            PupDefs.PupRouterSendThis[b];
            EXIT;
            ENDLOOP;
          IF soc = NIL THEN
            BEGIN  -- not a duplicate, make a new connection
            him: PupDefs.PupAddress ← b.address;
            -- Copy the connection ID now: b is handed back to the free pool below and must not be read after that.
            connectionID: PupTypes.Pair = b.pupID;
            listener.check[
              him !
              RejectThisRequest =>
                BEGIN
                -- Turn the request into an abort Pup carrying the rejection text and send it back.
                b.pupType ← abort;
                PupDefs.SwapPupSourceAndDest[b];
                PupDefs.SetPupContentsBytes[b, 2];
                PupDefs.AppendRopeToPupBuffer[b, error];
                PupDefs.PupRouterSendThis[b];
                GOTO Reject;
                END];
            PupDefs.ReturnFreePupBuffer[b];
            arg ← PupByteStreamMake[local, him, listener.timeout, alreadyOpened, connectionID];
            -- The client runs in its own detached process so that this loop can keep listening.
            Process.Detach[FORK listener.who[arg, him, listener, PupDefs.AnyLocalPupAddress[local]]];
            END;
          EXITS Reject => NULL;
          END;
        echoMe =>
          BEGIN
          b.pupType ← iAmEcho;
          PupDefs.SwapPupSourceAndDest[b];
          PupDefs.PupRouterSendThis[b];
          END;
        ENDCASE => PupDefs.SendErrorPup[b, LOOPHOLE[100B], "RFC expected"];
      END;
    ENDLOOP;
  END;
-- Internal Streams
-- input procedures
-- getChar procedure of the input buffer stream.
-- Returns the next character of the circular buffer, polling every 5 ticks while the buffer is empty.
-- Raises IO.EndOfStream once the stream is at end of file (EOF = true, or EOF = pending with the buffer drained).
-- Raises IO.Error[Failure] when the buffer is empty and the listener item reports communicationFailure.
inBufGetChar: PUBLIC PROC [self: STREAM] RETURNS [ch: CHAR] = {
  state: BufStreamData = NARROW[self.streamData];
  DO
    -- The arms are tried in order, so the shared fields are examined in a fixed sequence on every pass.
    SELECT TRUE FROM
      state.EOF = true => ERROR IO.EndOfStream[self];
      state.EOF = pending AND state.inPointer = state.outPointer => {
        -- The writer has closed and everything has been consumed: make the end of file final.
        state.EOF ← true;
        ERROR IO.EndOfStream[self];
        };
      state.inPointer # state.outPointer => {
        ch ← state.buffer[state.inPointer MOD ComputeServerInternal.BufStreamBufferSize];
        state.inPointer ← state.inPointer + 1;
        RETURN;
        };
      state.listenerItem.success = communicationFailure => {
        Process.CheckForAbort[];
        ERROR IO.Error[ec: Failure, stream: self];
        };
      ENDCASE => NULL;
    -- Nothing to read yet; wait a little and look again.
    Process.Pause[5];
    ENDLOOP;
  };
-- endOf procedure of the input buffer stream.
-- Waits, polling every 5 ticks, until there is data in the buffer or the EOF state has left false.
-- Returns TRUE when the stream is at end of file or the buffer is empty, FALSE when a character can be read.
-- Also returns TRUE when the buffer is empty and the listener item reports communicationFailure.
-- Without that check this procedure would poll forever on a failed connection; inBufGetChar and outBufPutChar already test for the same condition.
inBufEndOf: PROC [self: STREAM] RETURNS [BOOL] = {
  data: BufStreamData = NARROW[self.streamData];
  WHILE data.EOF = false AND data.inPointer = data.outPointer DO
    -- The buffer is empty here, so after a failure nothing more can be read.
    IF data.listenerItem.success = communicationFailure THEN RETURN[TRUE];
    Process.Pause[5];
    ENDLOOP;
  RETURN[data.EOF = true OR data.inPointer = data.outPointer];
  };
-- charsAvail procedure of the input buffer stream.
-- Returns the number of buffered characters (outPointer - inPointer), plus one when the EOF state is anything other than false, so the result is nonzero once the writer has closed.
-- NOTE(review): the wait argument is ignored and this never blocks; confirm callers do not rely on wait = TRUE.
inCharsAvail: PUBLIC PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  state: BufStreamData = NARROW[self.streamData];
  IF state.EOF = false
    THEN RETURN[0 + state.outPointer - state.inPointer]
    ELSE RETURN[1 + state.outPointer - state.inPointer];
  };
-- getIndex procedure of the input buffer stream.
-- Returns inPointer, which inBufGetChar advances by one for every character it delivers.
inBufGetIndex: PROC [self: STREAM] RETURNS [INT] = {
  state: BufStreamData = NARROW[self.streamData];
  RETURN[state.inPointer];
  };
-- close procedure of the input buffer stream.
-- Marks the stream as at end of file (EOF = true) so that later reads raise IO.EndOfStream.
-- The abort argument is not consulted.
inBufClose: PROC [self: STREAM, abort: BOOL] = {
  state: BufStreamData = NARROW[self.streamData];
  state.EOF ← true;
  };
-- output procedures
-- putChar procedure of the output buffer stream.
-- Appends char to the circular buffer.  While the buffer is full it polls every 5 ticks for the reader to make room.
-- If the listener item reports communicationFailure while the buffer is full, the character is silently dropped.
outBufPutChar: PUBLIC PROC [self: STREAM, char: CHAR] = {
  state: BufStreamData = NARROW[self.streamData];
  -- Full means that advancing outPointer by one would land on inPointer modulo the buffer size.
  UNTIL (state.inPointer - (state.outPointer + 1)) MOD ComputeServerInternal.BufStreamBufferSize # 0 DO
    IF state.listenerItem.success = communicationFailure THEN RETURN;  -- ignore output once communications fails
    Process.Pause[5];
    ENDLOOP;
  -- Store the character before publishing the new outPointer.
  state.buffer[state.outPointer MOD ComputeServerInternal.BufStreamBufferSize] ← char;
  state.outPointer ← state.outPointer + 1;
  };
-- getIndex procedure of the output buffer stream.
-- Returns outPointer, which outBufPutChar advances by one for every character it stores.
outBufGetIndex: PROC [self: STREAM] RETURNS [INT] = {
  state: BufStreamData = NARROW[self.streamData];
  RETURN[state.outPointer];
  };
-- close procedure of the output buffer stream.
-- Moves the EOF state from false to pending, so the reader sees end of file once it has drained the buffer.
-- Any other EOF state is left as it is.  The abort argument is not consulted.
outBufClose: PROC [self: STREAM, abort: BOOL] = {
  state: BufStreamData = NARROW[self.streamData];
  SELECT state.EOF FROM
    false => state.EOF ← pending;
    ENDCASE => NULL;
  };
-- Initialization
-- Module start code: look up this machine's Pup address by host name and publish it as a rope of the form net#host# with both numbers in octal.
myPupAddress ← PupDefs.GetPupAddress[[0,0], myHostName];
{
  netRope: ROPE = Convert.RopeFromCard[myPupAddress.net, 8, FALSE];
  hostRope: ROPE = Convert.RopeFromCard[myPupAddress.host, 8, FALSE];
  MyNetAddressRope ← Rope.Cat[netRope, "#", hostRope, "#"];
};
END.