-- Declarations
-- Short local names for the rope and stream types used throughout this module.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
-- Exported tristate switch, initially false.
-- NOTE(review): presumably says whether the compute client is enabled; the code that reads it is not in view.
Enabled: PUBLIC ComputeClientInternal.TristateBool ← false;
-- Head of the list of active calls; records are chained through their next field.
serviceStateListBase: serviceState ← NIL;
-- Bookkeeping for one remote call attempt. StartServiceExtra allocates a fresh record for every attempt and
-- shares it with the forked ServiceProcess, which reports its outcome through the fields below.
serviceState: TYPE = REF serviceStateObject;
serviceStateObject:
PUBLIC
TYPE =
RECORD [
hack: BOOL ← FALSE, --hopefully adding this field changes type hash
next: serviceState ← NIL, -- NIL until added to the list of active calls (serviceStateListBase)
listenerPupAddress: Pup.Address ← StreamPupAddress, -- initially bogus; once established, this is the unique ID for the connection and hence the RPC call
socketReady: BOOL ← FALSE, -- set by ServiceProcess when a byte stream is wanted; cleared by the stream monitor after it tries to open one
streamReady: BOOL ← FALSE, -- set once remoteStream has been created
remoteStream: STREAM ← NIL, -- Pup byte stream to the server, NIL until opened
inStreamEOF: BOOL ← FALSE, -- no more local input will be forwarded to the server
outStreamEOF: BOOL ← FALSE, -- no more server output will be copied to the caller
enumerateItem: enumerationState ← NIL, -- state of the remote FS enumeration in progress, NIL when there is none
nextFile: ComputeServerCallbacks.OpenFile ← 1, -- NOTE(review): presumably the next file number to hand out; the allocator is not in view
first16OpenFiles: ARRAY [1..16] OF FS.OpenFile ← ALL[FS.nullOpenFile], -- closed and reset by DestroyServiceItem
openFileListBase: openFileItem ← NIL, -- further open files as a list; also closed by DestroyServiceItem
requestingProcess: PROCESS, -- the process running StartServiceExtra for this call
callFailed: BOOL ← FALSE, -- the attempt failed; the result fields say why
callOver: BOOL ← FALSE, -- the DoService call has returned (or the call was abandoned)
callAbandoned: BOOL ← FALSE, -- the requester gave up and detached ServiceProcess, so it must not be JOINed
serverInstance: ROPE ← NIL, -- server chosen for this attempt
command: ROPE ← NIL, -- name of the requested service
controllerAndServerOK: BOOL ← TRUE, -- result: FALSE when the controller or the server could not be reached
found: BOOL ← TRUE, -- result: whether the command was found
success: ComputeServerClient.RemoteSuccess, -- result code
msg: ROPE ← NIL -- result message
];
-- One element of the per call list of open files (serviceStateObject.openFileListBase).
openFileItem: TYPE = REF openFileItemObject;
openFileItemObject:
TYPE =
RECORD [
next: openFileItem ← NIL, -- following list element, NIL at the end
fileNumber: ComputeServerCallbacks.OpenFile, -- file number used in the callback interface
openFile: FS.OpenFile -- the local FS handle; DestroyServiceItem closes it and stores FS.nullOpenFile
];
serviceListObject: TYPE = ComputeClientInternal.ServiceListObject;
-- Monitor conditions for the enumeration handshake.
-- enumerationReady is broadcast by the enumerating process when an entry (or the end) is available.
-- enumerationNeeded is broadcast by the RPC side when it wants the next entry or wants the enumeration to stop.
enumerationReady: CONDITION;
enumerationNeeded: CONDITION;
-- Mailbox shared between a forked FS enumeration and the RPC procedures that return its entries one at a time.
enumerationState: TYPE = REF enumerationStateObject;
enumerationStateObject:
TYPE =
RECORD [
needNext: BOOL ← FALSE, -- consumer has asked for another entry
ready: BOOL ← FALSE, -- producer has filled in the fields below; the consumer clears it when it takes the entry
fullFName: ROPE ← NIL,
attachedTo: ROPE ← NIL,
created: BasicTime.GMT ← BasicTime.nullGMT ,
bytes: INT ← 0,
keep: CARDINAL ← 0,
continue: BOOLEAN ← TRUE -- FALSE once the enumeration has finished or has been told to stop
];
-- Currently imported controller interface. GetControllerInterfaceProcess stores it; ServiceProcess sets it to NIL
-- when a call to the controller fails, which makes the next request import again.
ControllerInterface: PUBLIC ComputeServerControllerRpcControl.InterfaceRecord;
-- Grapevine name of the controller, used for the connect lookup and as the RPC instance name.
ControllerGVName: PUBLIC ROPE ← NIL;
-- 1 while a GetControllerInterfaceProcess is running, otherwise 0.
NoOfGetControllerInterfaceProcesses: INT ← 0 ;
-- TRUE until the running import has completed its first attempt; TryToImportController polls it.
FirstTryToGetController: BOOL ← TRUE;
-- One slot of the cache of imported server interfaces.
serverInterfaceItem:
TYPE =
RECORD [
serverInstance: ROPE ← NIL, -- instance name the interface was imported for
interface: ComputeServerRpcControl.InterfaceRecord ← NIL,
lastUsed: BasicTime.GMT ← BasicTime.earliestGMT -- time of last use; the slot with the earliest time is replaced first
];
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
-- NOTE(review): the allocation of this array is not in view; the cache procedures assume it is not NIL.
serverInterfaceCache: REF serverInterfaceArray;
-- Name of this machine as passed to the controller and the servers.
ClientMachineName: PUBLIC RPC.ShortROPE ;
-- Pup address passed to the controller as streamPupAddress; also the initial value of listenerPupAddress.
StreamPupAddress: PUBLIC Pup.Address;
-- This machine's network address as a rope; compared with the controller's connect string to detect a local controller.
myNetAddressRope: PUBLIC ROPE;
-- File name prefixes for which rOpen may tell the server to read the global file directly.
CachePrefixesOKToRead: PUBLIC LIST OF ROPE;
-- NOTE(review): the five variables below are not referenced in the visible part of the file; from their names they
-- look like idle and power off policy settings. Confirm against the code that uses them.
disableServerIFIdle: BOOL ← FALSE;
disableIFIdleAfter: INT ← 1900;
disableIFIdleBefore: INT ← 700;
idleMinutesTilPowerOff: INT ← 10;
triedToPowerOff: BOOL ← FALSE;
-- Service Initialization and Status
-- Imports the controller interface and publishes it in ComputeClientInternal.ControllerInterface.
-- At most one instance does the work at a time, guarded by NoOfGetControllerInterfaceProcesses.
-- forceImport = FALSE: return at once if another instance is already running.
-- forceImport = TRUE: wait for any running instance to finish, then import again.
-- The procedure does not return until an import has succeeded; failed RPC imports are retried after a growing delay.
GetControllerInterfaceProcess:
PUBLIC PROC [forceImport:
BOOL ←
FALSE] = {
-- Seconds to wait after a failed RPC import; grows by 3 per failure up to 180.
delayBeforeNextProbe: INT ← 13;
-- Claims the single importer slot. Returns FALSE when the slot was free and is now ours,
-- TRUE when another importer already holds it (the caller should exit or wait).
MakeSureThereIsOne:
ENTRY
PROC
RETURNS [exit:
BOOL ←
FALSE] = {
IF NoOfGetControllerInterfaceProcesses = 0 THEN NoOfGetControllerInterfaceProcesses ← 1
ELSE RETURN[TRUE];
};
-- Releases the importer slot.
NowWeAreGone: ENTRY PROC = { NoOfGetControllerInterfaceProcesses ← 0};
importedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord ← NIL;
IF forceImport
THEN {
-- Poll every 5 ticks until the slot can be claimed.
DO
IF ~MakeSureThereIsOne[] THEN EXIT;
Process.Pause[5];
ENDLOOP;
}
ELSE {IF MakeSureThereIsOne[] THEN RETURN;};
FirstTryToGetController ← TRUE;
WHILE importedControllerInterface =
NIL
DO
info: GVNames.ConnectInfo;
connect: GVBasics.Connect;
localImportWorked: BOOL ← FALSE;
[info: info, connect: connect ] ← GVNames.GetConnect[ComputeClientInternal.ControllerGVName];
-- If the controller is registered at this machine's own address, bind to it locally instead of through RPC.
-- Each stanza below looks up one controller procedure by name with the interpreter, installs the matching
-- local stub (l...) in the interface record and stores the procedure in the corresponding My... variable.
-- NOTE(review): when a lookup yields NIL the flag is cleared but TVToProc is still applied to the NIL result;
-- confirm that AMBridge.TVToProc tolerates NIL.
IF (info = individual
OR info = group)
AND Rope.Equal[connect, ComputeClientInternal.myNetAddressRope]
THEN
TRUSTED {
result: AMTypes.TV;
localImportWorked ← TRUE;
importedControllerInterface ← NEW[ComputeServerControllerRpcControl.InterfaceRecordObject];
result ← Interpreter.Evaluate["ComputeControllerImpl.FindService"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubFindService ← lFindService;
MyFindService ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.FindServiceWithQueueing"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubFindServiceWithQueueing ← lMyFindServiceWithQueueing;
MyFindServiceWithQueueing ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.NewStats"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubNewStats ← lNewStats;
MyNewStats ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.BestServerStats"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubBestServerStats ← lBestServerStats;
MyBestServerStats ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.NoticeNewPackage"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubNoticeNewPackage ← lNoticeNewPackage;
MyNoticeNewPackage ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.RemoveOldPackage"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubRemoveOldPackage ← lRemoveOldPackage;
MyRemoveOldPackage ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.GetSomeInfo"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubGetSomeInfo ← lGetSomeInfo;
MyGetSomeInfo ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.CommandUnavailable"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubCommandUnavailable ← lCommandUnavailable;
MyCommandUnavailable ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate["ComputeControllerImpl.ExtraCommandAvailable"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubExtraCommandAvailable ← lExtraCommandAvailable;
MyExtraCommandAvailable ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate[ "ComputeControllerImpl.MightAcceptQueuedCommand"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubMightAcceptQueuedCommand ← lMightAcceptQueuedCommand;
MyMightAcceptQueuedCommand ← LOOPHOLE[AMBridge.TVToProc[result]];
result ← Interpreter.Evaluate[ "ComputeControllerImpl.GenericToController"].result;
IF result = NIL THEN localImportWorked ← FALSE;
importedControllerInterface.clientStubGenericToController ← lGenericToController;
MyGenericToController ← LOOPHOLE[AMBridge.TVToProc[result]];
};
-- Not local, or the local binding was incomplete: discard any partial record and import through RPC.
-- On ImportFailed the interface stays NIL, so the enclosing WHILE loop tries again after the pause.
IF ~localImportWorked
THEN {
importedControllerInterface ← NIL;
importedControllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[interfaceName: [ type: "ComputeServerController.summoner", instance: ComputeClientInternal.ControllerGVName, version: [1,1]]
!
RPC.ImportFailed => {
delayBeforeNextProbe ← MIN[delayBeforeNextProbe+3, 180];
Process.Pause[Process.SecondsToTicks[delayBeforeNextProbe]];
CONTINUE;
};
];
};
-- Tells TryToImportController that the first attempt is over, whether or not it succeeded.
FirstTryToGetController ← FALSE;
ENDLOOP;
ComputeClientInternal.ControllerInterface ← importedControllerInterface;
NowWeAreGone[];
};
TryToImportController: PUBLIC PROC = {
  -- Starts a background import of the controller interface unless one is already running, then
  -- waits, checking once per second, until that import has finished or has completed its first attempt.
  IF NoOfGetControllerInterfaceProcesses # 1 THEN {
    TRUSTED {Process.Detach[FORK GetControllerInterfaceProcess[]]; };
    DO
      Process.Pause[Process.SecondsToTicks[1]];
      IF NoOfGetControllerInterfaceProcesses # 1 OR ~FirstTryToGetController THEN EXIT;
      ENDLOOP;
    };
  };
BestServerStatsDisabled: PROC
  RETURNS [success: ComputeServerClient.RemoteSuccess ← false, selfIsBest: BOOL ← TRUE, FOM: REAL ← 1.0] = {
  -- Fixed answer with the same result shape as a server statistics query:
  -- the client is not running, this machine is not reported as best, figure of merit 1.0.
  success ← clientNotRunning;
  selfIsBest ← FALSE;
  FOM ← 1.0;
  };
StartServiceDisabled: PROC [service: ROPE, version: ROPE, cmdLine: ROPE, in, out: STREAM, queueService: BOOL ← FALSE, timeToWait: INT ← 3600, retries: NAT ← 3]
  RETURNS [found: BOOL ← TRUE, success: ComputeServerClient.RemoteSuccess ← false, remoteMsg: ROPE ← NIL, serverInstance: ROPE ← NIL] = {
  -- Same signature as StartService, but never contacts anything: every argument is ignored and
  -- the fixed answer "not found, client not running" is returned.
  found ← FALSE;
  success ← clientNotRunning;
  remoteMsg ← "Client Not Enabled";
  serverInstance ← NIL;
  };
StartService: PUBLIC PROC [service: ROPE, version: ROPE, cmdLine: ROPE, in, out: STREAM, queueService: BOOL ← FALSE, timeToWait: INT ← 3600, retries: NAT ← 3]
  RETURNS [found: BOOL ← TRUE, success: ComputeServerClient.RemoteSuccess ← false, remoteMsg: ROPE ← NIL, serverInstance: ROPE ← NIL] = {
  -- Runs a service remotely on whichever server the controller selects.
  -- This is StartServiceExtra with no explicit server name; see that procedure for the meaning of the results.
  [found, success, remoteMsg, serverInstance] ← StartServiceExtra[
    service: service, version: version, cmdLine: cmdLine, in: in, out: out,
    queueService: queueService, timeToWait: timeToWait, retries: retries, serverName: NIL];
  };
StartServiceExtra: PUBLIC PROC [service: ROPE, version: ROPE, cmdLine: ROPE, in, out: STREAM, queueService: BOOL ← FALSE, timeToWait: INT ← 3600, retries: NAT ← 3, serverName: ROPE ← NIL]
  RETURNS [found: BOOL ← TRUE, success: ComputeServerClient.RemoteSuccess ← false, remoteMsg: ROPE ← NIL, serverInstance: ROPE ← NIL] = {
  -- Runs the named service on a remote compute server and relays its streams.
  --   service, version, cmdLine: what to run and with which command line.
  --   in, out: local streams connected to the remote computation; NIL, IO.noInputStream and IO.noWhereStream mean "not needed".
  --   queueService, timeToWait: passed to ServiceProcess, which uses them for the queued lookup at the controller.
  --   retries: upper bound on the number of attempts.
  --   serverName: when not NIL, go straight to that server and do not consult the controller.
  -- Results: found says whether the command exists, success is the result code, remoteMsg an explanation,
  -- serverInstance the server that was used. If the call is abandoned the results keep their previous values.
  -- Each attempt forks ServiceProcess to make the RPC calls while this process copies the streams and watches for ABORTED.
  workingDirectory: ROPE ← NARROW [ ProcessProps.GetProp[$WorkingDirectory] ];
  serviceProcess: PROCESS;
  currentService: serviceState;
  tempControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord ;
  controllerAndServerOK: BOOL ← TRUE;
  count: CARDINAL ← 0 ;
  abortCount: CARDINAL ← 0;
  abandon: BOOL ← FALSE;
  needIn: BOOL ← TRUE;
  needOut: BOOL ← TRUE;

  DoAbort: PROC [serviceToAbort: serviceState, fullAbort: BOOL ← FALSE] = {
    -- The first abort of an attempt asks the server to abort the computation (in a detached process).
    -- The third abort, or fullAbort, gives up on the attempt: the forked ServiceProcess is detached
    -- instead of JOINed, the call is marked abandoned and its open files are released.
    IF abortCount = 0 THEN TRUSTED {
      Process.Detach[FORK TryToAbortProcess[serviceToAbort]];
      };
    abortCount ← abortCount + 1;
    IF abortCount = 3 OR fullAbort THEN {
      TRUSTED {Process.Detach[serviceProcess];};
      serviceToAbort.callFailed ← TRUE;
      serviceToAbort.callOver ← TRUE;
      serviceToAbort.callAbandoned ← TRUE;
      abandon ← TRUE;
      DestroyServiceItem[serviceToAbort.listenerPupAddress, serviceToAbort];
      };
    };

  {
    -- begin of block for ENABLE ABORTED
    ENABLE ABORTED => {
      DoAbort[currentService];
      };

    monitorAndBufferStreams: PROC = {
      -- Opens the Pup byte stream once ServiceProcess signals socketReady, then shuttles characters
      -- between in/out and the remote stream until the call fails or both directions are finished.
      msg: ROPE ← NIL;
      noPages: INT = 2;
      bufferBlockSize: INT = VM.BytesForPages[noPages];
      textBuffer: REF TEXT = NEW[TEXT [bufferBlockSize]];
      WHILE ~currentService.callFailed
        AND (~currentService.callOver OR ~currentService.outStreamEOF)
        AND (~currentService.inStreamEOF OR ~currentService.outStreamEOF)
        DO
        doneSomething: BOOL ← FALSE;
        Process.CheckForAbort[ ! ABORTED => {DoAbort[currentService]; CONTINUE;};];
        IF (needIn OR needOut) AND currentService.socketReady THEN {
          inOutPupStream: STREAM ← NIL;
          inOutPupStream ← PupStream.Create[remote: currentService.listenerPupAddress, getTimeout: 1000, putTimeout: PupStream.waitForever
            ! PupStream.StreamClosing => {
              msg ← Rope.Concat["Unable to open Pup Byte Stream to Server",
                SELECT why FROM
                  noRouteToNetwork => " because noRouteToNetwork",
                  transmissionTimeout => " because transmissionTimeout",
                  remoteReject => " because remoteReject",
                  ENDCASE => ""];
              GOTO streamOpenFailed;
              };
            ] ;
          currentService.remoteStream ← inOutPupStream;
          currentService.streamReady ← TRUE; -- must set streamReady before ~socketReady
          currentService.socketReady ← FALSE;
          EXITS
            streamOpenFailed => {
              currentService.msg ← msg;
              currentService.socketReady ← FALSE;
              DoAbort[currentService, TRUE];
              };
          };
        -- local input => server
        IF currentService.streamReady AND ~currentService.inStreamEOF AND in.CharsAvail[] > 0 THEN {
          WHILE in.CharsAvail[] > 0 AND ~currentService.inStreamEOF DO
            -- The inner catch phrase is only enabled while in.GetChar runs. The outer one covers
            -- PutChar on the remote stream, so a connection that closes during the write ends the
            -- input direction instead of propagating StreamClosing out of StartServiceExtra.
            currentService.remoteStream.PutChar[
              in.GetChar[ !
                IO.EndOfStream => {
                  currentService.inStreamEOF ← TRUE;
                  PupStream.SendMark[currentService.remoteStream, 26B ! PupStream.StreamClosing => CONTINUE;];
                  CONTINUE;
                  };
                PupStream.StreamClosing => {
                  currentService.inStreamEOF ← TRUE;
                  CONTINUE;
                  };
                ]
              ! PupStream.StreamClosing => {
                currentService.inStreamEOF ← TRUE;
                CONTINUE;
                };
              ];
            ENDLOOP;
          currentService.remoteStream.Flush[ ! PupStream.StreamClosing => {
            currentService.inStreamEOF ← TRUE;
            CONTINUE;
            };
            ];
          doneSomething ← TRUE;
          };
        -- server => local output
        IF currentService.callOver AND currentService.streamReady AND currentService.remoteStream.CharsAvail[] = 0 THEN currentService.outStreamEOF ← TRUE;
        IF currentService.streamReady AND ~currentService.outStreamEOF AND currentService.remoteStream.CharsAvail[] > 0 THEN {
          charsWanted: INT ← currentService.remoteStream.CharsAvail[];
          nBytesRead: INT;
          TRUSTED {
            nBytesRead ← currentService.remoteStream.GetBlock[textBuffer, 0, MIN[charsWanted, bufferBlockSize]];
            };
          IF nBytesRead = 0 AND currentService.remoteStream.EndOf[] THEN currentService.outStreamEOF ← TRUE;
          IF out # NIL AND ~currentService.outStreamEOF THEN out.PutBlock[textBuffer, 0, nBytesRead];
          WHILE currentService.remoteStream.CharsAvail[] > 0 AND ~currentService.outStreamEOF DO
            char: CHAR;
            char ← currentService.remoteStream.GetChar[! IO.EndOfStream => {currentService.outStreamEOF ← TRUE; CONTINUE;}];
            IF out # NIL AND ~currentService.outStreamEOF THEN out.PutChar[char];
            ENDLOOP;
          doneSomething ← TRUE;
          };
        IF ~doneSomething THEN Process.Pause[5 ! ABORTED => {DoAbort[currentService]; CONTINUE;};];
        ENDLOOP;
      };

    -- Start of Body for StartService
    -- The idea is to start up another process to actually do the call. This process will wake up periodically and check ABORT and copy streams around (monitorAndBufferStreams). On ABORT, we try to abort the remote process. We either Detach the forked process after three ABORTs, or we JOIN with it and recover the results.
    IF in = NIL OR in = IO.noInputStream THEN needIn ← FALSE;
    IF out = NIL OR out = IO.noWhereStream THEN needOut ← FALSE;
    -- loop to allow for retries, re-import of controller and server, stream errors and who knows what else
    WHILE found
      AND count < retries
      AND (controllerAndServerOK OR (count <= 1))
      AND success = false
      AND ~abandon
      DO
      count ← count + 1;
      abortCount ← 0;
      currentService ← NEW[serviceStateObject]; -- new object per loop => new object per fork
      currentService.requestingProcess ← LOOPHOLE[Process.GetCurrent[]];
      tempControllerInterface ← ControllerInterface;
      IF serverName = NIL AND tempControllerInterface = NIL THEN {
        -- try to get a new interface if this one is NIL; not needed if we have an explicit serverName
        msecInFourTicks: CARDINAL = Process.TicksToMsec[4];
        loopsForFiveSeconds: INT = (5*1000)/msecInFourTicks;
        ComputeClientInternal.TryToImportController[];
        FOR loopCount: INT IN [0 .. loopsForFiveSeconds) DO
          tempControllerInterface ← ControllerInterface;
          IF tempControllerInterface # NIL THEN EXIT;
          Process.Pause[4];
          ENDLOOP;
        IF tempControllerInterface = NIL THEN RETURN[FALSE, cantImportController, "Controller cannot be Imported - may be down or recovering"];
        };
      serviceProcess ← FORK ServiceProcess[tempControllerInterface, currentService, service, version, timeToWait, cmdLine, ClientMachineName, queueService, workingDirectory, needIn, needOut, serverName];
      IF ~needIn THEN currentService.inStreamEOF ← TRUE;
      IF ~needOut THEN currentService.outStreamEOF ← TRUE;
      monitorAndBufferStreams[]; -- this is where we spend almost all of the call
      IF currentService.remoteStream # NIL THEN currentService.remoteStream.Close[ ! PupStream.StreamClosing => CONTINUE;];
      -- The streams may finish before the RPC does; wait for ServiceProcess to report an outcome.
      WHILE ~currentService.callAbandoned
        AND ~currentService.callOver
        AND ~currentService.callFailed
        DO
        Process.Pause[2 ! ABORTED => {DoAbort[currentService]; CONTINUE;};];
        ENDLOOP;
      -- An abandoned call was detached in DoAbort and must not be JOINed.
      IF ~currentService.callAbandoned THEN {
        TRUSTED {[] ← JOIN serviceProcess;};
        controllerAndServerOK ← currentService.controllerAndServerOK;
        found ← currentService.found;
        success ← currentService.success;
        remoteMsg ← currentService.msg;
        serverInstance ← currentService.serverInstance;
        };
      ENDLOOP;
    }; -- end of block for ENABLE ABORTED
  };
ServiceProcess: PROC [controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord, currentService: serviceState, service: ROPE, version: ROPE, timeToWait: INT, cmdLine: ROPE, clientMachineName: RPC.ShortROPE, queueService: BOOL, workingDirectory: ROPE, needRemoteInStream: BOOL, needRemoteOutStream: BOOL, serverName: ROPE ] = {
  -- Body of the process forked by StartServiceExtra for one attempt. It makes the RPC calls:
  -- find a server (through the controller, or directly when serverName is given), ask the server
  -- for the service, then run it with DoService. The procedure has no results; everything is
  -- reported through currentService, which the requester reads after it JOINs this process.
  -- Must set serverInstance, controllerAndServerOK, found, success, and msg in currentService before returning!
  controllerAndServerOK: BOOL ← TRUE;
  found: BOOL ← TRUE;
  success: ComputeServerClient.RemoteSuccess;
  msg: ROPE ← NIL; -- message for the serverCallFailed exit, and later the message returned by DoService
  userName: RPC.ShortROPE = UserCredentials.Get[].name;
  serverInstance: RPC.ShortROPE;
  serverPupAddress: Pup.Address;
  serverInterface: ComputeServerRpcControl.InterfaceRecord;
  askForServiceErrMsg: ROPE; -- explanation from the server's AskForService (direct path only)
  errMsg: Rope.ROPE ← NIL; -- explanation from the controller's FindServiceWithQueueing (queued path only)
  askFound: ATOM;
  currentService.serverInstance ← NIL;
  currentService.command ← service;
  {
    IF queueService AND serverName = NIL THEN {
      -- Queued request: the controller picks the server and does the asking for us.
      [found: askFound, instance: serverInstance, serverPupAddress: serverPupAddress, errMsg: errMsg] ← controllerInterface.FindServiceWithQueueing[service: service, userName: userName, version: version, timeToWait: timeToWait, clientMachineName: clientMachineName, streamPupAddress: StreamPupAddress, needListener: (needRemoteInStream OR needRemoteOutStream) !
        RPC.CallFailed => {
          currentService.callFailed ← TRUE;
          ControllerInterface ← NIL; -- forces a fresh import on the next request
          GOTO controllerCallFailed;
          };
        ];
      IF askFound = $foundOK THEN {
        currentService.serverInstance ← serverInstance;
        serverInterface ← getServerInterfaceFromCache[serverInstance];
        IF serverInterface = NIL THEN {
          currentService.callFailed ← TRUE;
          currentService.controllerAndServerOK ← FALSE;
          currentService.found ← TRUE;
          currentService.success ← cantImportServer;
          currentService.msg ← "Cannot Import from Server Selected by Controller";
          RETURN;
          };
        }
      ELSE found ← FALSE;
      }
    ELSE {
      -- not queueService or specified server
      askForServiceCount: INT ← 0;
      errorMessageX: ROPE ← NIL;
      IF serverName # NIL THEN {
        askFound ← $foundOK;
        errorMessageX ← "Cannot Import from Server specified by request";
        serverPupAddress ← PupName.NameLookup[serverName, Pup.nullSocket ! PupName.Error => GOTO badName];
        serverInstance ← PupName.AddressToRope[serverPupAddress];
        }
      ELSE {
        errorMessageX ← "Cannot Import from Server Selected by Controller";
        [found: found, instance: serverInstance] ← controllerInterface.FindService[service, userName
          !
          RPC.CallFailed => {
            currentService.callFailed ← TRUE;
            ControllerInterface ← NIL;
            GOTO controllerCallFailed;
            };
          ];
        IF ~found THEN {
          currentService.callFailed ← TRUE;
          currentService.controllerAndServerOK ← TRUE;
          currentService.found ← found;
          currentService.success ← false;
          currentService.msg ← "Requested command not found";
          RETURN;
          };
        };
      currentService.serverInstance ← serverInstance;
      serverInterface ← getServerInterfaceFromCache[serverInstance];
      IF serverInterface = NIL THEN {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← FALSE;
        currentService.found ← TRUE;
        currentService.success ← cantImportServer;
        currentService.msg ← errorMessageX;
        RETURN;
        };
      [found: askFound, serverPupAddress: serverPupAddress, errMsg: askForServiceErrMsg] ← serverInterface.AskForService[service: service, version: version, clientMachineName: clientMachineName, userName: userName
        !
        RPC.CallFailed => {
          -- The cached interface may be stale: drop it, import again and retry the call once.
          askForServiceCount ← askForServiceCount + 1;
          IF askForServiceCount = 1 THEN {
            [] ← deleteServerInterfaceFromCache[serverInterface];
            serverInterface ← getServerInterfaceFromCache[serverInstance];
            IF serverInterface # NIL THEN RETRY;
            };
          currentService.callFailed ← TRUE;
          [] ← deleteServerInterfaceFromCache[serverInterface];
          -- Stored in the local msg because the serverCallFailed exit publishes msg into currentService.msg.
          msg ← "Server did not respond to 'AskForService' request";
          GOTO serverCallFailed;
          };
        ];
      };
    SELECT askFound FROM
      $notFound => {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← TRUE;
        currentService.found ← FALSE;
        currentService.success ← commandNotFound;
        -- On the queued path only the controller supplied an explanation (errMsg).
        currentService.msg ← IF askForServiceErrMsg # NIL THEN askForServiceErrMsg ELSE errMsg;
        currentService.serverInstance ← serverInstance;
        RETURN;
        };
      $foundButTooBusy => {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← TRUE;
        currentService.found ← FALSE;
        currentService.success ← serverTooBusy;
        currentService.msg ← IF askForServiceErrMsg # NIL THEN askForServiceErrMsg ELSE errMsg;
        currentService.serverInstance ← serverInstance;
        RETURN;
        };
      $foundOK => {
        doServiceCount: INT ← 0 ;
        AddPupAddress[newItem: currentService, serverPupAddress: serverPupAddress] ;
        IF needRemoteInStream OR needRemoteOutStream THEN {
          -- tells the requesting process to open the byte stream
          currentService.socketReady ← TRUE;
          };
        msg ← NIL;
        [success: success, msg: msg] ← serverInterface.DoService[serverPupAddress, myNetAddressRope, cmdLine, workingDirectory, needRemoteInStream, needRemoteOutStream
          !
          RPC.CallFailed => {
            doServiceCount ← doServiceCount + 1;
            IF doServiceCount = 1 THEN {
              [] ← deleteServerInterfaceFromCache[serverInterface];
              serverInterface ← getServerInterfaceFromCache[serverInstance];
              IF serverInterface # NIL THEN RETRY;
              };
            currentService.callFailed ← TRUE;
            [] ← deleteServerInterfaceFromCache[serverInterface];
            DestroyServiceItem[serverPupAddress, currentService];
            -- Stored in the local msg because the serverCallFailed exit publishes msg into currentService.msg.
            msg ← "Server did not respond to 'DoService' request";
            GOTO serverCallFailed;
            };
          ];
        DestroyServiceItem[serverPupAddress, currentService];
        currentService.callOver ← TRUE;
        };
      $timeOut => {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← TRUE;
        currentService.found ← TRUE;
        currentService.success ← timeOut;
        currentService.msg ← "Call timed out";
        RETURN;
        };
      ENDCASE => {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← TRUE;
        currentService.found ← TRUE;
        currentService.success ← serverTooBusy;
        currentService.msg ← Rope.Cat["Return atom ", Atom.GetPName[askFound]," not understood"];
        currentService.serverInstance ← serverInstance;
        RETURN;
        };
    -- Only the foundOK arm gets here: publish the outcome of DoService.
    currentService.controllerAndServerOK ← controllerAndServerOK;
    currentService.found ← found;
    currentService.success ← success;
    currentService.msg ← Rope.Concat[askForServiceErrMsg, msg];
    EXITS
      controllerCallFailed => {
        currentService.controllerAndServerOK ← FALSE;
        currentService.found ← FALSE;
        currentService.success ← false;
        currentService.msg ← "Call to the Controller Failed";
        currentService.serverInstance ← NIL;
        RETURN;
        };
      serverCallFailed => {
        currentService.callFailed ← TRUE;
        DestroyServiceItem[serverPupAddress, currentService];
        currentService.controllerAndServerOK ← FALSE;
        currentService.found ← TRUE;
        currentService.success ← false;
        currentService.msg ← msg; --setup by all places that do a GOTO serverCallFailed
        RETURN;
        };
      badName => {
        currentService.callFailed ← TRUE;
        currentService.controllerAndServerOK ← FALSE;
        currentService.found ← FALSE;
        currentService.success ← false;
        currentService.msg ← "Bad server name";
        currentService.serverInstance ← NIL;
        RETURN;
        };
    };
  };
DestroyServiceItem: PROC [serverPupAddress: Pup.Address, currentService: serviceState] = {
  -- Removes the call registered under serverPupAddress. If DeletePupAddress reports that it removed
  -- an entry, every file the call left open is closed; FS errors while closing are ignored.
  entry: openFileItem;
  IF ~DeletePupAddress[serverPupAddress] THEN RETURN;
  -- the fixed table of open files
  FOR slot: INT IN [1..16] DO
    FS.Close[currentService.first16OpenFiles[slot] ! FS.Error => CONTINUE;];
    currentService.first16OpenFiles[slot] ← FS.nullOpenFile;
    ENDLOOP;
  -- the overflow list
  entry ← currentService.openFileListBase;
  WHILE entry # NIL DO
    FS.Close[entry.openFile ! FS.Error => CONTINUE;];
    entry.openFile ← FS.nullOpenFile;
    entry ← entry.next;
    ENDLOOP;
  };
TryToAbortProcess: PROC [currentService: serviceState ] = {
  -- Asks the server to abort the computation belonging to currentService.
  -- Waits (polling every 7 ticks) until the byte stream is ready, and gives up silently if the call
  -- fails or finishes first. A failed AskForAbort call is ignored.
  target: ComputeServerRpcControl.InterfaceRecord;
  UNTIL currentService.streamReady DO
    IF currentService.callFailed OR currentService.callOver THEN RETURN;
    Process.Pause[7];
    ENDLOOP;
  target ← getServerInterfaceFromCache[currentService.serverInstance];
  IF target = NIL THEN RETURN;
  target.AskForAbort[currentService.listenerPupAddress ! RPC.CallFailed => CONTINUE;];
  };
getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE]
  RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord ← NIL] = {
  -- Returns the RPC interface for serverInstance, importing it if it is not cached.
  -- A cache hit refreshes the slot's time stamp. On a miss the interface is imported and, if the
  -- import succeeds, stored in the slot with the earliest time stamp. Returns NIL when the import fails.
  victim: INT ← 0;
  oldestUse: BasicTime.GMT ← BasicTime.latestGMT;
  stamp: BasicTime.GMT ← BasicTime.Now[];
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF Rope.Equal[serverInstance, serverInterfaceCache[slot].serverInstance] THEN {
      serverInterfaceCache[slot].lastUsed ← stamp;
      RETURN[serverInterfaceCache[slot].interface];
      };
    -- remember the slot that was used longest ago, in case we need to replace one
    IF BasicTime.Period[oldestUse, serverInterfaceCache[slot].lastUsed] < 0 THEN {
      oldestUse ← serverInterfaceCache[slot].lastUsed;
      victim ← slot;
      };
    ENDLOOP;
  -- On ImportFailed the assignment is skipped and serverInterface keeps its initial NIL.
  serverInterface ← ComputeServerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]]
    ! RPC.ImportFailed => CONTINUE;
    ];
  IF serverInterface # NIL THEN
    serverInterfaceCache[victim] ← [serverInstance: serverInstance, interface: serverInterface, lastUsed: stamp];
  };
deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord]
  RETURNS [gotIt: BOOL ← FALSE] = {
  -- Empties the first cache slot that holds serverInterface and reports whether one was found.
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF serverInterfaceCache[slot].interface # serverInterface THEN LOOP;
    serverInterfaceCache[slot] ← [serverInstance: NIL, interface: NIL, lastUsed: BasicTime.earliestGMT];
    RETURN[TRUE];
    ENDLOOP;
  RETURN[FALSE];
  };
-- FS from the Client Workstation - Exported to ComputeServerCallbacks
MakeUnmatchedError: PROC [] = {
  -- Raises the callback error used when a request names a listener address with no matching active call.
  explanation: ROPE = "remote FS operation performed on server that does not know about the computation";
  ERROR ComputeServerCallbacks.Error[[$environment, $serverUnmatchedComputation, explanation]];
  };
MakeEnumerationBugError: PROC [] = {
  -- Raises the callback error used when the enumeration calls arrive in an impossible order.
  explanation: ROPE = "remote FS enumeration error";
  ERROR ComputeServerCallbacks.Error[[$bug, $serverEnumerationBug, explanation]];
  };
MakeError: PROC [error: FS.ErrorDesc] = {
  -- Re-raises a local FS error as a ComputeServerCallbacks.Error with the same group and code.
  -- The explanation is cut to at most 60 characters.
  text: ROPE = error.explanation;
  limit: INT = MIN[60, Rope.Length[text]];
  ERROR ComputeServerCallbacks.Error[[error.group, error.code, Rope.Substr[text, 0, limit]]];
  };
rGeneric: PUBLIC PROC [requestCode: ATOM, requestString: ROPE]
  RETURNS [resultCode: ATOM, resultString: ROPE] = {
  -- generic call to allow for expansion without RPC interface recompilation
  -- No request codes are implemented: both arguments are ignored and the answer is always $notImplemented with a NIL string.
  RETURN [$notImplemented, NIL];
  };
rSetDefaultWDir: PUBLIC PROC [dir: RPC.ShortROPE] = {
  -- Remote version of FS.SetDefaultWDir; an FS.Error is re-raised as a ComputeServerCallbacks.Error.
  FS.SetDefaultWDir[dir ! FS.Error => {MakeError[error]}];
  };
rGetDefaultWDir: PUBLIC PROC RETURNS [RPC.ShortROPE] = {
  -- Remote version of FS.GetDefaultWDir; an FS.Error is re-raised as a ComputeServerCallbacks.Error.
  RETURN[FS.GetDefaultWDir[ ! FS.Error => {MakeError[error]}]];
  };
rExpandName: PUBLIC PROC [name: ROPE, wDir: RPC.ShortROPE]
  RETURNS [fullFName: ROPE, cp: FS.ComponentPositions, dirOmitted: BOOLEAN] = {
  -- Remote version of FS.ExpandName; an FS.Error is re-raised as a ComputeServerCallbacks.Error.
  [fullFName, cp, dirOmitted] ← FS.ExpandName[name, wDir ! FS.Error => {MakeError[error]}];
  };
rFileInfo: PUBLIC PROC [name: ROPE, wantedCreatedTime: BasicTime.GMT, remoteCheck: BOOLEAN, wDir: RPC.ShortROPE]
  RETURNS [fullFName, attachedTo: ROPE, keep: CARDINAL, bytes: INT, created: BasicTime.GMT] = {
  -- Remote version of FS.FileInfo; an FS.Error is re-raised as a ComputeServerCallbacks.Error.
  [fullFName, attachedTo, keep, bytes, created] ← FS.FileInfo[name, wantedCreatedTime, remoteCheck, wDir
    ! FS.Error => {MakeError[error]}];
  };
rStartEnumerateForInfo: PUBLIC PROC [listenerPupAddress: Pup.Address, pattern: ROPE, wDir: RPC.ShortROPE]
  RETURNS [fullFName, attachedTo: ROPE, created: BasicTime.GMT, bytes: INT, keep: CARDINAL, continue: BOOLEAN] = {
  -- Begins an FS info enumeration for the call identified by listenerPupAddress and returns its first entry.
  -- The enumeration runs in a detached process; later entries are fetched with rNextEnumerateForInfo.
  -- Raises ComputeServerCallbacks.Error when the call is unknown or already has an enumeration running.
  ENABLE FS.Error => {MakeError[error]; };
  call: serviceState;
  known: BOOL;
  [known, call] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  IF call.enumerateItem # NIL THEN MakeEnumerationBugError[];
  call.enumerateItem ← NEW[enumerationStateObject];
  TRUSTED {Process.Detach[ FORK enumerationInfoProcess[call, pattern, wDir]];};
  [fullFName, attachedTo, created, bytes, keep, continue] ← getEnumerateForInfo[call.enumerateItem];
  };
rNextEnumerateForInfo: PUBLIC PROC [listenerPupAddress: Pup.Address]
  RETURNS [fullFName, attachedTo: ROPE, created: BasicTime.GMT, bytes: INT, keep: CARDINAL, continue: BOOLEAN] = {
  -- Returns the next entry of the info enumeration running for the call identified by listenerPupAddress.
  -- Raises ComputeServerCallbacks.Error when the call is unknown or has no enumeration in progress.
  ENABLE FS.Error => {MakeError[error]; };
  call: serviceState;
  known: BOOL;
  [known, call] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  IF call.enumerateItem = NIL THEN MakeEnumerationBugError[];
  [fullFName, attachedTo, created, bytes, keep, continue] ← getEnumerateForInfo[call.enumerateItem];
  };
rStartEnumerateForNames: PUBLIC PROC [listenerPupAddress: Pup.Address, pattern: ROPE, wDir: RPC.ShortROPE]
  RETURNS [fullFName: ROPE, continue: BOOLEAN] = {
  -- Begins an FS name enumeration for the call identified by listenerPupAddress and returns its first name.
  -- The enumeration runs in a detached process; later names are fetched with rNextEnumerateForNames.
  -- Raises ComputeServerCallbacks.Error when the call is unknown or already has an enumeration running.
  ENABLE FS.Error => {MakeError[error]; };
  call: serviceState;
  known: BOOL;
  [known, call] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  IF call.enumerateItem # NIL THEN MakeEnumerationBugError[];
  call.enumerateItem ← NEW[enumerationStateObject];
  TRUSTED {Process.Detach[ FORK enumerationNameProcess[call, pattern, wDir]];};
  [fullFName, continue] ← getEnumerateForName[call.enumerateItem];
  };
rNextEnumerateForNames: PUBLIC PROC [listenerPupAddress: Pup.Address]
  RETURNS [fullFName: ROPE, continue: BOOLEAN] = {
  -- Returns the next name of the name enumeration running for the call identified by listenerPupAddress.
  -- Raises ComputeServerCallbacks.Error when the call is unknown or has no enumeration in progress.
  ENABLE FS.Error => {MakeError[error]; };
  call: serviceState;
  known: BOOL;
  [known, call] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  IF call.enumerateItem = NIL THEN MakeEnumerationBugError[];
  [fullFName, continue] ← getEnumerateForName[call.enumerateItem];
  };
rDoneEnumerate: PUBLIC PROC [listenerPupAddress: Pup.Address] = {
  -- Stops the enumeration running for the call identified by listenerPupAddress and waits until
  -- the enumerating process has cleaned up.
  -- Raises ComputeServerCallbacks.Error when the call is unknown or has no enumeration in progress.
  setDone: ENTRY PROC = {
    -- Tell the enumerator to stop, wake it, then wait until it has cleared item.enumerateItem.
    item.enumerateItem.continue ← FALSE;
    BROADCAST enumerationNeeded;
    UNTIL item.enumerateItem = NIL DO
      WAIT enumerationReady;
      ENDLOOP;
    };
  item: serviceState;
  known: BOOL;
  [known, item] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  IF item.enumerateItem = NIL THEN MakeEnumerationBugError[];
  setDone[];
  };
-- Body of the process forked by rStartEnumerateForInfo. It runs FS.EnumerateForInfo over pattern and
-- hands each entry to the RPC side through item.enumerateItem, one entry per request.
-- NOTE(review): no catch phrase surrounds FS.EnumerateForInfo here; if it raises FS.Error, innerDone is not
-- reached and a caller waiting in getEnumerateForInfo would not be woken. Confirm how such errors are handled.
enumerationInfoProcess:
PROC [item: serviceState, pattern:
ROPE, wDir:
ROPE] = {
-- Captured once, because innerDone clears item.enumerateItem at the end.
enumerateItem: enumerationState = item.enumerateItem;
-- Called by FS for every matching file; the value returned tells FS whether to go on.
inner:
FS.InfoProc = {
-- Publishes the entry, wakes the consumer, then blocks until the consumer asks for the next
-- entry or the enumeration is told to stop.
enterNext:
ENTRY
PROC = {
enumerateItem.fullFName ← fullFName;
enumerateItem.attachedTo ← attachedTo;
enumerateItem.created ← created;
enumerateItem.bytes ← bytes;
enumerateItem.keep ← keep;
enumerateItem.ready ← TRUE;
enumerateItem.needNext ← FALSE;
BROADCAST enumerationReady;
WHILE ~enumerateItem.needNext
AND enumerateItem.continue
DO
WAIT enumerationNeeded;
ENDLOOP;
};
enterNext[];
RETURN[enumerateItem.continue];
};
-- End of enumeration: report "no more entries", detach the state from the call and wake all waiters
-- (both a consumer waiting for an entry and rDoneEnumerate waiting for the cleanup).
innerDone:
ENTRY
PROC = {
enumerateItem.continue ← FALSE;
enumerateItem.ready ← TRUE;
item.enumerateItem ← NIL ;
BROADCAST enumerationReady;
};
FS.EnumerateForInfo[pattern, inner, wDir];
innerDone[];
};
getEnumerateForInfo: ENTRY PROC [enumerateItem: enumerationState]
  RETURNS [fullFName, attachedTo: ROPE, created: BasicTime.GMT, bytes: INT, keep: CARDINAL, continue: BOOL] = {
  -- Consumer side of the enumeration handshake: asks for the next entry, waits until the enumerating
  -- process has published one (or has finished), marks it as taken and returns a copy of the fields.
  enumerateItem.needNext ← TRUE;
  UNTIL enumerateItem.ready DO
    BROADCAST enumerationNeeded;
    WAIT enumerationReady;
    ENDLOOP;
  enumerateItem.ready ← FALSE;
  fullFName ← enumerateItem.fullFName;
  attachedTo ← enumerateItem.attachedTo;
  created ← enumerateItem.created;
  bytes ← enumerateItem.bytes;
  keep ← enumerateItem.keep;
  continue ← enumerateItem.continue;
  };
-- Body of the process forked by rStartEnumerateForNames. It runs FS.EnumerateForNames over pattern and
-- hands each name to the RPC side through item.enumerateItem, one name per request.
-- NOTE(review): no catch phrase surrounds FS.EnumerateForNames here; if it raises FS.Error, innerDone is not
-- reached and a caller waiting in getEnumerateForName would not be woken. Confirm how such errors are handled.
enumerationNameProcess:
PROC [item: serviceState, pattern:
ROPE, wDir:
ROPE] = {
-- Captured once, because innerDone clears item.enumerateItem at the end.
enumerateItem: enumerationState = item.enumerateItem;
-- Called by FS for every matching name; the value returned tells FS whether to go on.
inner:
FS.NameProc = {
-- Publishes the name, wakes the consumer, then blocks until the consumer asks for the next
-- name or the enumeration is told to stop.
enterNext:
ENTRY
PROC = {
enumerateItem.fullFName ← fullFName;
enumerateItem.ready ← TRUE;
enumerateItem.needNext ← FALSE;
BROADCAST enumerationReady;
WHILE ~enumerateItem.needNext
AND enumerateItem.continue
DO
WAIT enumerationNeeded;
ENDLOOP;
};
enterNext[];
RETURN[enumerateItem.continue];
};
-- End of enumeration: report "no more names", detach the state from the call and wake all waiters.
innerDone:
ENTRY
PROC = {
enumerateItem.continue ← FALSE;
enumerateItem.ready ← TRUE;
item.enumerateItem ← NIL ;
BROADCAST enumerationReady;
};
FS.EnumerateForNames[pattern, inner, wDir];
innerDone[];
};
getEnumerateForName: ENTRY PROC [enumerateItem: enumerationState]
  RETURNS [fullFName: ROPE, continue: BOOL] = {
  -- Consumer side of the name enumeration handshake: asks for the next name, waits until the
  -- enumerating process has published one (or has finished), marks it as taken and returns it.
  enumerateItem.needNext ← TRUE;
  UNTIL enumerateItem.ready DO
    BROADCAST enumerationNeeded;
    WAIT enumerationReady;
    ENDLOOP;
  enumerateItem.ready ← FALSE;
  fullFName ← enumerateItem.fullFName;
  continue ← enumerateItem.continue;
  };
rOpen: PUBLIC PROC [listenerPupAddress: Pup.Address, name: ROPE, lock: FS.Lock ← $read, wantedCreatedTime: BasicTime.GMT ← BasicTime.nullGMT, remoteCheck: BOOLEAN ← TRUE, wDir: RPC.ShortROPE, forceRemoteOpen: BOOL ← FALSE]
  RETURNS [globalNameToOpen: ROPE ← NIL, openFile: ComputeServerCallbacks.OpenFile ← 0] = {
  -- Opens a file on behalf of the call identified by listenerPupAddress.
  -- For a read of a file whose global name is in the local cache (and forceRemoteOpen is FALSE), no
  -- local open is done: globalNameToOpen is set to the global name, and openFile is -1 when that name
  -- starts with one of CachePrefixesOKToRead and the server is on the same net as this machine, else 0.
  -- In every other case the file is opened here and openFile is the number it was registered under.
  -- Raises ComputeServerCallbacks.Error for an unknown call or for any FS.Error.
  ENABLE FS.Error => {MakeError[error]; };
  noteCacheHit: FSBackdoor.NameProc = {
    inLocalCache ← TRUE;
    };
  handle: FS.OpenFile;
  call: serviceState;
  known: BOOL;
  resolvedName, globalName: ROPE;
  keepCount: CARDINAL;
  createTime: BasicTime.GMT;
  inLocalCache: BOOL ← FALSE;
  [known, call] ← MatchPupAddress[listenerPupAddress];
  IF ~known THEN MakeUnmatchedError[];
  [fullFName: resolvedName, attachedTo: globalName, keep: keepCount, created: createTime] ← FS.FileInfo[name, wantedCreatedTime, remoteCheck, wDir];
  IF keepCount = 0 THEN globalName ← resolvedName; -- global filenames return a keep of 0
  FSBackdoor.EnumerateCacheForNames[proc: noteCacheHit, volName: NIL, pattern: globalName];
  IF ~inLocalCache OR Rope.Length[globalName] = 0 OR lock # $read OR forceRemoteOpen THEN {
    handle ← FS.Open[name, lock, wantedCreatedTime, remoteCheck, wDir];
    openFile ← addOpenFileToServiceState[call, handle];
    }
  ELSE {
    trustedPrefix: BOOL ← FALSE;
    globalNameToOpen ← globalName;
    FOR prefixes: LIST OF ROPE ← CachePrefixesOKToRead, prefixes.rest UNTIL prefixes = NIL DO
      trustedPrefix ← Rope.Find[globalName, prefixes.first, 0, FALSE] = 0;
      IF trustedPrefix THEN EXIT;
      ENDLOOP;
    IF trustedPrefix AND (listenerPupAddress.net = StreamPupAddress.net) THEN openFile ← -1
    };
  };
rCreate: PUBLIC PROC [listenerPupAddress: Pup.Address, name: ROPE, setPages: BOOLEAN ← TRUE, pages: INT ← 0, setKeep: BOOLEAN ← FALSE, keep: CARDINAL ← 1, wDir: RPC.ShortROPE] RETURNS [openFile: ComputeServerCallbacks.OpenFile ← 0] = {
  -- Remote callback: create a file with FS.Create for the call identified by
  -- listenerPupAddress and return the handle number under which it was
  -- registered in that call's service state.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  openFile ← addOpenFileToServiceState[state, FS.Create[name, setPages, pages, setKeep, keep, wDir]];
  };
rOpenOrCreate: PUBLIC PROC [listenerPupAddress: Pup.Address, name: ROPE, keep: CARDINAL ← 1, pages: INT ← 5, wDir: RPC.ShortROPE] RETURNS [openFile: ComputeServerCallbacks.OpenFile ← 0] = {
  -- Remote callback: FS.OpenOrCreate for the call identified by
  -- listenerPupAddress; returns the handle number under which the file was
  -- registered in that call's service state.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  openFile ← addOpenFileToServiceState[state, FS.OpenOrCreate[name, keep, pages, wDir]];
  };
rGetClass: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile] RETURNS [ATOM] = {
  -- Remote callback: return FS.GetClass of the file that handle number `file`
  -- denotes within the call identified by listenerPupAddress.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  RETURN[FS.GetClass[findOpenFileInServiceState[state, file]]];
  };
rSameFile: PUBLIC PROC [listenerPupAddress: Pup.Address, file1, file2: ComputeServerCallbacks.OpenFile] RETURNS [BOOLEAN] = {
  -- Remote callback: report FS.SameFile for two handle numbers belonging to
  -- the call identified by listenerPupAddress.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  fileA, fileB: FS.OpenFile;
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  -- Resolve file1 before file2, one statement each.
  fileA ← findOpenFileInServiceState[state, file1];
  fileB ← findOpenFileInServiceState[state, file2];
  RETURN[FS.SameFile[fileA, fileB]];
  };
rGetName: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile] RETURNS [fullFName, attachedTo: ROPE] = {
  -- Remote callback: return the FS.GetName results for the file that handle
  -- number `file` denotes within the call identified by listenerPupAddress.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  [fullFName, attachedTo] ← FS.GetName[findOpenFileInServiceState[state, file]];
  };
rGetInfo: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile] RETURNS [keep: CARDINAL, pages, bytes: INT, created: BasicTime.GMT, lock: FS.Lock] = {
  -- Remote callback: return the FS.GetInfo results for the file that handle
  -- number `file` denotes within the call identified by listenerPupAddress.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  [keep, pages, bytes, created, lock] ← FS.GetInfo[findOpenFileInServiceState[state, file]];
  };
rSetPageCount: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile, pages: INT] = {
  -- Remote callback: apply FS.SetPageCount to the file that handle number
  -- `file` denotes within the call identified by listenerPupAddress.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  FS.SetPageCount[findOpenFileInServiceState[state, file], pages];
  };
rSetByteCountAndCreatedTime: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile, bytes: INT ← -1, created: BasicTime.GMT ← BasicTime.nullGMT] = {
  -- Remote callback: apply FS.SetByteCountAndCreatedTime to the file that
  -- handle number `file` denotes within the call identified by
  -- listenerPupAddress. The defaults are passed through to FS unchanged.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  FS.SetByteCountAndCreatedTime[findOpenFileInServiceState[state, file], bytes, created];
  };
rRead:
PUBLIC
PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile, from, nPages:
INT, pageBuffer: ComputeServerCallbacks.RESULTPageBuffer] = {
-- Remote callback: read nPages pages starting at page `from` of the file that
-- handle number `file` denotes into pageBuffer.
-- FS.Error is converted through MakeError; an unknown listenerPupAddress is
-- reported through MakeUnmatchedError.
-- NOTE(review): nothing here compares nPages with the size of pageBuffer;
-- presumably the RPC layer guarantees the buffer is large enough. Confirm.
ENABLE FS.Error => {MakeError[error]; };
localOpenFile: FS.OpenFile;
item: serviceState;
found: BOOL;
[found, item] ← MatchPupAddress[listenerPupAddress];
IF ~found THEN MakeUnmatchedError[];
localOpenFile ← findOpenFileInServiceState[item, file];
TRUSTED {
-- The buffer is page aligned exactly when rounding its base address down to
-- a page boundary gives the same address back.
IF
BASE[pageBuffer] =
VM.AddressForPageNumber[
VM.PageNumberForAddress[
BASE[pageBuffer]]]
THEN {
FS.Read[localOpenFile, from, nPages, BASE[pageBuffer]]; -- buffer page aligned
}
ELSE {
vmBuffer: VM.Interval ← VM.Allocate[nPages];
vmBufferAddress: LONG POINTER ← VM.AddressForPageNumber[vmBuffer.page];
{
-- Release the scratch pages if anything below unwinds; an address fault
-- while freeing is ignored.
ENABLE
UNWIND =>
IF vmBuffer.page # 0
THEN
VM.Free[vmBuffer !
VM.AddressFault =>
CONTINUE;];
-- Get buffer, transfer file data into it, copy it into the result buffer. All of this because the result buffer is not page aligned.
FS.Read[localOpenFile, from, nPages, vmBufferAddress];
PrincOpsUtils.LongCopy[from: vmBufferAddress, nwords: VM.WordsForPages[nPages], to: BASE[pageBuffer]];
VM.Free[vmBuffer];
};
};
};
};
rWrite:
PUBLIC
PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile, to:
INT, nPages:
INT, pageBuffer: ComputeServerCallbacks.VALUEPageBuffer] = {
-- Remote callback: write nPages pages from pageBuffer to the file that handle
-- number `file` denotes, starting at page `to`.
-- FS.Error is converted through MakeError; an unknown listenerPupAddress is
-- reported through MakeUnmatchedError.
-- NOTE(review): nothing here compares nPages with the size of pageBuffer;
-- presumably the RPC layer guarantees the buffer is large enough. Confirm.
ENABLE FS.Error => {MakeError[error]; };
localOpenFile: FS.OpenFile;
item: serviceState;
found: BOOL;
[found, item] ← MatchPupAddress[listenerPupAddress];
IF ~found THEN MakeUnmatchedError[];
localOpenFile ← findOpenFileInServiceState[item, file];
TRUSTED {
-- The buffer is page aligned exactly when rounding its base address down to
-- a page boundary gives the same address back.
IF
BASE[pageBuffer] =
VM.AddressForPageNumber[
VM.PageNumberForAddress[
BASE[pageBuffer]]]
THEN {
FS.Write[localOpenFile, to, nPages, BASE[pageBuffer]]; -- buffer page aligned
}
ELSE {
vmBuffer: VM.Interval ← VM.Allocate[nPages];
vmBufferAddress: LONG POINTER ← VM.AddressForPageNumber[vmBuffer.page];
{
-- Release the scratch pages if anything below unwinds; an address fault
-- while freeing is ignored.
ENABLE
UNWIND =>
IF vmBuffer.page # 0
THEN
VM.Free[vmBuffer !
VM.AddressFault =>
CONTINUE;];
-- Get buffer, copy the caller's data into it, write it to the file. All of this because the value buffer is not page aligned.
PrincOpsUtils.LongCopy[from: BASE[pageBuffer], nwords: VM.WordsForPages[nPages], to: vmBufferAddress];
FS.Write[localOpenFile, to, nPages, vmBufferAddress];
VM.Free[vmBuffer];
};
};
};
};
rClose: PUBLIC PROC [listenerPupAddress: Pup.Address, file: ComputeServerCallbacks.OpenFile] = {
  -- Remote callback: close the file that handle number `file` denotes within
  -- the call identified by listenerPupAddress, then drop the handle from that
  -- call's service state. The handle is dropped only after FS.Close returns
  -- normally.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  fsFile: FS.OpenFile;
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  fsFile ← findOpenFileInServiceState[state, file];
  -- NOTE(review): this catch phrase repeats what the ENABLE above already
  -- does; kept as is.
  FS.Close[fsFile ! FS.Error => MakeError[error]];
  removeOpenFileFromServiceState[state, file];
  };
rCopy: PUBLIC PROC [listenerPupAddress: Pup.Address, from, to: ROPE, setKeep: BOOLEAN ← FALSE, keep: CARDINAL ← 1, wantedCreatedTime: BasicTime.GMT ← BasicTime.nullGMT, remoteCheck: BOOLEAN ← TRUE, attach: BOOLEAN ← FALSE, wDir: RPC.ShortROPE ← NIL] RETURNS [toFName: ROPE] = {
  -- Remote callback: FS.Copy on behalf of the call identified by
  -- listenerPupAddress; returns the name FS.Copy reports for the copy.
  -- The address is only validated; no per-call state is used by the copy.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  item: serviceState;
  found: BOOL;
  [found, item] ← MatchPupAddress[listenerPupAddress];
  IF ~found THEN MakeUnmatchedError[];
  -- Extract straight into the named result. A body-level local also called
  -- toFName used to receive this value, so the result variable itself was
  -- never assigned.
  [toFName: toFName] ← FS.Copy[from, to, setKeep, keep, wantedCreatedTime, remoteCheck, attach, wDir];
  };
rDelete: PUBLIC PROC [listenerPupAddress: Pup.Address, name: ROPE, wantedCreatedTime: BasicTime.GMT ← BasicTime.nullGMT, wDir: RPC.ShortROPE ← NIL] = {
  -- Remote callback: FS.Delete on behalf of the call identified by
  -- listenerPupAddress. The address is only validated; the matched service
  -- state is not otherwise used.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  FS.Delete[name, wantedCreatedTime, wDir];
  };
rRename: PUBLIC PROC [listenerPupAddress: Pup.Address, from, to: ROPE, setKeep: BOOLEAN ← FALSE, keep: CARDINAL ← 1, wantedCreatedTime: BasicTime.GMT ← BasicTime.nullGMT, wDir: RPC.ShortROPE ← NIL] = {
  -- Remote callback: FS.Rename on behalf of the call identified by
  -- listenerPupAddress. The address is only validated; the matched service
  -- state is not otherwise used.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  FS.Rename[from, to, setKeep, keep, wantedCreatedTime, wDir];
  };
rSetKeep: PUBLIC PROC [listenerPupAddress: Pup.Address, name: ROPE, keep: CARDINAL ← 1, wDir: RPC.ShortROPE ← NIL] = {
  -- Remote callback: FS.SetKeep on behalf of the call identified by
  -- listenerPupAddress. The address is only validated; the matched service
  -- state is not otherwise used.
  -- FS.Error is converted through MakeError; an unknown address is reported
  -- through MakeUnmatchedError.
  ENABLE FS.Error => {MakeError[error]; };
  state: serviceState;
  matched: BOOL;
  [matched, state] ← MatchPupAddress[listenerPupAddress];
  IF NOT matched THEN MakeUnmatchedError[];
  FS.SetKeep[name, keep, wDir];
  };
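-- Dummies that strip off the leading interface argument for the local call: each l... procedure below drops it and forwards the remaining arguments to the corresponding My... procedure variable.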
-- Procedure variables with the controller operation signatures minus the
-- leading interface argument. The l... wrappers that follow call through them.
-- NOTE(review): no initial values are given here; presumably they are
-- assigned elsewhere before any wrapper is used. Confirm.
MyFindService: PROC [service: Rope.ROPE, userName: RPC.ShortROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE];
MyFindServiceWithQueueing: PROC [service: Rope.ROPE, userName: RPC.ShortROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: Pup.Address, needListener: BOOL] RETURNS [found: ATOM, instance: RPC.ShortROPE, serverPupAddress: Pup.Address, errMsg: Rope.ROPE];
MyNewStats: PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, nonBackgroundCPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL, userName: RPC.ShortROPE, currentRequests: LIST OF ComputeServer.Request, aveBackgroundLoad: REAL] RETURNS [terminateService, newPackage: BOOL, queueingCommands: LIST OF Rope.ROPE];
MyBestServerStats: PROC RETURNS[instance: RPC.ShortROPE, FOM: REAL] ;
MyNoticeNewPackage: PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE];
MyRemoveOldPackage: PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE];
MyGetSomeInfo: PROC RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE, serverList: LIST OF Rope.ROPE, bestFOM: REAL];
MyCommandUnavailable: PROC [serverMachineName: RPC.ShortROPE, commandName: Rope.ROPE, version: RPC.ShortROPE];
MyExtraCommandAvailable: PROC [serverMachineName: RPC.ShortROPE, commandName: Rope.ROPE, version: RPC.ShortROPE];
MyMightAcceptQueuedCommand: PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE];
MyGenericToController: PROC [requestCode: ATOM, requestString: Rope.ROPE] RETURNS [resultCode: ATOM, resultString: Rope.ROPE];
lFindService: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, service: Rope.ROPE, userName: RPC.ShortROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the remaining arguments to MyFindService.
  [found: found, instance: instance] ← MyFindService[service: service, userName: userName];
  };
lMyFindServiceWithQueueing: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, service: Rope.ROPE, userName: RPC.ShortROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: Pup.Address, needListener: BOOL] RETURNS [found: ATOM, instance: RPC.ShortROPE, serverPupAddress: Pup.Address, errMsg: Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the remaining arguments to MyFindServiceWithQueueing.
  [found: found, instance: instance, serverPupAddress: serverPupAddress, errMsg: errMsg] ← MyFindServiceWithQueueing[
    service: service,
    userName: userName,
    version: version,
    timeToWait: timeToWait,
    clientMachineName: clientMachineName,
    streamPupAddress: streamPupAddress,
    needListener: needListener];
  };
lNewStats: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, nonBackgroundCPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL, userName: RPC.ShortROPE, currentRequests: LIST OF ComputeServer.Request, aveBackgroundLoad: REAL] RETURNS [terminateService, newPackage: BOOL, queueingCommands: LIST OF Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards every other argument, unchanged, to MyNewStats.
  [terminateService: terminateService, newPackage: newPackage, queueingCommands: queueingCommands] ← MyNewStats[
    serverMachineName: serverMachineName,
    serverMachinePupAddress: serverMachinePupAddress,
    serverUP: serverUP,
    firstCall: firstCall,
    machineType: machineType,
    mainMemory: mainMemory,
    numberCPUs: numberCPUs,
    diskPartionSize: diskPartionSize,
    freePagesOnDisk: freePagesOnDisk,
    freeboard: freeboard,
    freeGFI: freeGFI,
    freeMDS: freeMDS,
    freeVM: freeVM,
    oldestLRUFileDate: oldestLRUFileDate,
    CPULoad: CPULoad,
    nonBackgroundCPULoad: nonBackgroundCPULoad,
    reclamationRate: reclamationRate,
    freeProcesses: freeProcesses,
    userName: userName,
    currentRequests: currentRequests,
    aveBackgroundLoad: aveBackgroundLoad];
  };
lBestServerStats: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord] RETURNS [instance: RPC.ShortROPE, FOM: REAL] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- returns whatever MyBestServerStats returns.
  [instance: instance, FOM: FOM] ← MyBestServerStats[];
  };
lNoticeNewPackage: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards `package` to MyNoticeNewPackage.
  [error: error, tryDifferentController: tryDifferentController, msg: msg] ← MyNoticeNewPackage[package: package];
  };
lRemoveOldPackage: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards `package` to MyRemoveOldPackage.
  [error: error, tryDifferentController: tryDifferentController, msg: msg] ← MyRemoveOldPackage[package: package];
  };
lGetSomeInfo: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE, serverList: LIST OF Rope.ROPE, bestFOM: REAL] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- returns whatever MyGetSomeInfo returns.
  [error: error, tryDifferentController: tryDifferentController, msg: msg, serverList: serverList, bestFOM: bestFOM] ← MyGetSomeInfo[];
  };
lCommandUnavailable: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, serverMachineName: RPC.ShortROPE, commandName: Rope.ROPE, version: RPC.ShortROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the remaining arguments to MyCommandUnavailable.
  MyCommandUnavailable[
    serverMachineName: serverMachineName,
    commandName: commandName,
    version: version];
  };
lExtraCommandAvailable: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, serverMachineName: RPC.ShortROPE, commandName: Rope.ROPE, version: RPC.ShortROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the remaining arguments to MyExtraCommandAvailable.
  MyExtraCommandAvailable[
    serverMachineName: serverMachineName,
    commandName: commandName,
    version: version];
  };
lMightAcceptQueuedCommand: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the remaining arguments to MyMightAcceptQueuedCommand.
  MyMightAcceptQueuedCommand[
    serverMachineAddress: serverMachineAddress,
    commandName: commandName];
  };
lGenericToController: PROC [interface: ComputeServerControllerRpcControl.InterfaceRecord, requestCode: ATOM, requestString: Rope.ROPE] RETURNS [resultCode: ATOM, resultString: Rope.ROPE] = {
  -- Local stand-in for the controller operation: ignores `interface` and
  -- forwards the request to MyGenericToController.
  [resultCode: resultCode, resultString: resultString] ← MyGenericToController[
    requestCode: requestCode,
    requestString: requestString];
  };
Initialization
-- Module start code: identifies this machine, exports the callbacks interface,
-- starts the attempts to reach the controller, and registers the commands.
name, password: ROPE;
zones: RPC.Zones ← RPC.standardZones;
serverInterfaceCache ← NEW[serverInterfaceArray];
-- Who we are: machine name, net address as a rope, and our Pup address with
-- the null socket.
ClientMachineName ← PupName.MyName[];
myNetAddressRope ← PupName.MyRope[];
StreamPupAddress ← PupName.NameLookup[ClientMachineName, Pup.nullSocket];
[name, password] ← UserCredentials.Get[];
-- Use the transient page zone for the RPC heap when that procedure is bound;
-- otherwise the standard zones are kept.
TRUSTED {
IF PrincOpsUtils.IsBound[LOOPHOLE[UnsafeStorage.GetTransientPageUZone]] THEN zones.heap ← UnsafeStorage.GetTransientPageUZone[] ; -- LOOPHOLE should not be needed, but the compiler gives a bogus type error
};
-- Export the callbacks interface under this machine's net address so that it
-- can be imported by instance name.
ComputeServerCallbacksRpcControl.ExportInterface[
interfaceName: [ type: "ComputeServerCallbacks.summoner", instance: myNetAddressRope, version: [1,1]],
user: name,
password: RPC.MakeKey[password],
parameterStorage: zones
];
-- Controller name comes from the user profile, with a built-in default.
ControllerGVName ← UserProfile.Token["Summoner.ControllerName"];
IF ControllerGVName = NIL THEN ControllerGVName ← "PaloAlto1.summoner" ;
GetProfileConstants[];
UserProfile.CallWhenProfileChanges[proc: ProfileChanged];
-- NOTE(review): a GetControllerInterfaceProcess is forked here and, if the
-- import just below fails, forked again from the catch phrase; presumably the
-- process itself copes with more than one instance. Confirm.
TRUSTED {Process.Detach[FORK ComputeClientInternal.GetControllerInterfaceProcess[]]; };
ControllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[
interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
! RPC.ImportFailed => {
TRUSTED {Process.Detach[FORK GetControllerInterfaceProcess[]]; };
CONTINUE;
};
];
TRUSTED {Process.Detach[FORK ComputeClientInternal.PokeTheController[]]; };
-- Commander commands for controlling and inspecting the client.
Commander.Register[key: "///Commands/SummonerClientOn", proc: ClientOn, doc: "Turn on use of the Compute Server Client {for the specified cluster}"];
Commander.Register[key: "///Commands/SummonerClientOff", proc: ClientOff, doc: "Turn off use of the Compute Server Client"];
Commander.Register[key: "///Commands/SummonerClientRequests", proc: ClientRequests, doc: "Print all active requests"];
Commander.Register[key: "///Commands/SummonerCommandInfo", proc: CanRunProc, doc: "Print all commands or print all servers that the Controller expects to be able to do a command"];
Commander.Register[key: "///Commands/SummonerServerLoads", proc: ServerLoadsProc, doc: "Print server load statistics"];
-- Register RollbackRecovery with Booting as its r: procedure.
Booting.RegisterProcs[r: RollbackRecovery];
END.