ComputeControllerImpl.mesa
Implement the controller. This runs on any machine that is also a compute server.
Last Edited by: Bob Hagmann, August 5, 1986 11:02:55 am PDT
Hal Murray, March 22, 1986 11:16:09 pm PST
Copyright © 1984 by Xerox Corporation. All rights reserved.
DIRECTORY
AMProcess,
BasicTime,
Commander,
ComputeControllerInternal,
ComputeServerControl,
ComputeServerController,
ComputeServerControllerRpcControl,
ComputeServer,
ComputeServerInternal,
ComputeServerRpcControl,
DFUtilities,
FS,
GVBasics,
GVNames,
IO,
PrincOps,
PrincOpsUtils,
Process,
Pup,
PupName,
Real,
Rope,
RPC,
SummonerControllerControl,
SymTab,
UserCredentials,
UserProfile,
ViewerClasses,
ViewerIO,
ViewerOps,
WorldVM;
ComputeControllerImpl: CEDAR MONITOR
IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVNames, IO, Process, PupName, Real, Rope, RPC, SymTab, UserCredentials, UserProfile, ViewerIO, ViewerOps, WorldVM
EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
= BEGIN
ROPE: TYPE = Rope.ROPE;
NoticeNewPackageCondition: CONDITION;
InterfaceExported: BOOLFALSE;
ControllerEnabled: BOOLFALSE;
LatentControllerActive: BOOLTRUE;
IAmTheController: PUBLIC BOOLFALSE;
ControllerMovingAway: PUBLIC BOOLFALSE;
GrabController: PUBLIC BOOLFALSE;
TimeIBecameTheController: PUBLIC BasicTime.GMT ← BasicTime.nullGMT;
ControllerUp: BOOLTRUE;
ControllerGVName: ROPENIL;
ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError ← none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
ControllerFailedTime: BasicTime.GMT;
LastReBuildQueueTime: BasicTime.GMT ← BasicTime.nullGMT;
ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
ServerStatusList: PUBLIC LIST OF ServerStatus ← NIL;
BrokenCommandTable: SymTab.Ref; -- knowing a command, find where its broken
ExtraCommandTable: SymTab.Ref; -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref; -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"
BrokenCommandObject: TYPE = RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPENIL
];
BrokenCommand: TYPE = REF BrokenCommandObject;
ExtraCommandObject: TYPE = RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPENIL
];
ExtraCommand: TYPE = REF ExtraCommandObject;
forceReBuildQueuing: BOOLFALSE;
WaitingQueue: LIST OF QueueEntry ← NIL;
QueueEntry: TYPE = REF QueueEntryRec;
QueueEntryRec: TYPE = RECORD [
service: ROPENIL,
doQueueing: BOOLFALSE,
waiters: LIST OF WaitingRequest ← NIL,
servers: LIST OF ROPENIL,
changed: CONDITION
];
WaitingRequest: TYPE = REF WaitingRequestRec;
WaitingRequestRec: TYPE = RECORD [
process: PROCESS,
timeout: BasicTime.GMT,
waiting: waitType ← init,
version: ROPE,
clientMachineName: RPC.ShortROPE,
userName: RPC.ShortROPE,
streamPupAddress: Pup.Address,
needListener: BOOL,
serverAssigned: BOOLFALSE,
response: ATOM,
instance: RPC.ShortROPE,
serverPupAddress: Pup.Address
];
waitType: TYPE = {init, true, validating, done, timedOut};
serverInterfaceItem: TYPE = RECORD [
serverInstance: ROPENIL,
interface: ComputeServerRpcControl.InterfaceRecord ← NIL,
lastUsed: BasicTime.GMT ← BasicTime.earliestGMT
];
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
serverInterfaceCache: REF serverInterfaceArray;
CheckState: TYPE = {checking, aborted, done};
CheckItemProcessObj: TYPE = RECORD[
check: CheckState ← checking,
communicationsOK: BOOLTRUE,
askFound: ATOM,
serverPupAddress: Pup.Address,
errMsg: ROPE
];
NoOfLatentSummonerControllerProcesses: INT ← 0 ;
ControllerEventType: TYPE = {service, newPackage, newServer, lostServer, becomeController, noticeOtherController, cedeController};
ControllerEvent: TYPE = RECORD [
type: ControllerEventType, -- type of event
reason: Rope.ROPE, -- if relevant, the reason for the event
command: Rope.ROPE, -- command name (service and newPackage)
time: BasicTime.GMT,
remoteMachineName: Rope.ROPE, -- name of other machine if relevant (user's machine or controller)
remoteUser: Rope.ROPE, -- name of user if relevant
chain: REF ControllerEvent -- used internally to chain events together
];
anotherControllerEvent: CONDITION;
controllerEventListTail: REF ControllerEvent ← NIL;
watchingController: BOOLEANFALSE;
StartUpController: PUBLIC PROC [controllerName: ROPENIL, forceSelfAsPrimaryController: BOOLFALSE] RETURNS [] = {
locked: BOOLFALSE;
{
ENABLE UNWIND => IF locked THEN UnLock[];
user, password: ROPE;
ControllerGVName ← IF Rope.IsEmpty[controllerName] THEN UserProfile.Token["Summoner.ControllerName"] ELSE controllerName;
IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName ← "PaloAlto1.summoner" ;
ControllerEnabled ← TRUE;
LatentControllerActive ← TRUE;
ControllerUp ← TRUE;
Lock[];
locked ← TRUE;
IF ~InterfaceExported THEN {
info: GVNames.ConnectInfo;
connect: GVBasics.Connect;
[info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
IF info = individual OR info = group THEN {
IF forceSelfAsPrimaryController OR Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope] THEN {
ReportControllerEvent[becomeController, IF forceSelfAsPrimaryController THEN "SummonerBecomeController command" ELSE "Grapevine thinks I'm the controller but I don't (yet)", NIL, BasicTime.Now[], NIL, NIL];
IAmTheController ← TRUE;
TimeIBecameTheController ← BasicTime.Now[];
[user, password] ← UserCredentials.Get[];
InterfaceExported ← TRUE;
BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
ComputeServerControllerRpcControl.ExportInterface[
interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
user: user,
password: RPC.MakeKey[password]
! RPC.ExportFailed => {
InterfaceExported ← FALSE;
IAmTheController ← FALSE;
ReportControllerEvent[cedeController, "failed export of RPC interface", NIL, BasicTime.Now[], NIL, NIL];
CONTINUE;
};
];
};
};
};
UnLock[];
locked ← FALSE;
TRUSTED {Process.Detach[FORK LatentSummonerController[]] ;};
};
};
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
LastControllerError ← import;
LastImportFailure ← reason;
NoticeControllerIsDown[];
};
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
LastControllerError ← callFailed;
LastCallFailure ← reason;
NoticeControllerIsDown[];
};
NoticeControllerIsDown: PROC = {
IF ControllerUp THEN ControllerFailedTime ← BasicTime.Now[];
ControllerUp ← FALSE;
};
NoticeControllerIsUp: PUBLIC PROC = {
ControllerUp ← TRUE;
};
TryThisController: PROC RETURNS [importedOK: BOOLTRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
controllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[
interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
! RPC.ImportFailed => {
importedOK ← FALSE;
CONTINUE;
};
];
};
LatentSummonerController: PROC [] = {
MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOLFALSE] = {
IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses ← 1
ELSE RETURN[TRUE];
};
locked: BOOLFALSE;
IF MakeSureThereIsOnlyOne[] THEN RETURN;
DO
ENABLE UNWIND => IF locked THEN UnLock[];
IF ~LatentControllerActive THEN EXIT;
IF ~ControllerUp OR GrabController THEN { -- controller is down or I should grab it - see if I should acquire the controller function
info: GVNames.ConnectInfo;
connect: GVBasics.Connect;
ControllerMovingAway ← FALSE;
IF GrabController OR ~TryThisController[].importedOK THEN { -- down but import will work soon
downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
[info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
IF GrabController OR downTime > 83 OR
((downTime > 13 AND (info = individual OR info = group) AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]) OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope])
THEN {
user, password: ROPE;
Lock[];
locked ← TRUE;
ReportControllerEvent[becomeController, IF GrabController THEN "SummonerBecomeController command" ELSE "Grapevine thinks I'm the controller but I don't (yet)", NIL, BasicTime.Now[], NIL, NIL];
GrabController ← FALSE;
IAmTheController ← TRUE;
TimeIBecameTheController ← BasicTime.Now[];
[user, password] ← UserCredentials.Get[];
IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
InterfaceExported ← TRUE;
BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
ComputeServerControllerRpcControl.ExportInterface[
interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
user: user,
password: RPC.MakeKey[password]
! RPC.ExportFailed => {
InterfaceExported ← FALSE;
IAmTheController ← FALSE;
ReportControllerEvent[cedeController, "failed export of RPC interface", NIL, BasicTime.Now[], NIL, NIL];
CONTINUE;
};
];
UnLock[];
locked ← FALSE;
};
};
}
ELSE { -- Controller is up. If I think that I am the controller, but grapevine does not think so, then fix the situation
IF IAmTheController THEN {
info: GVNames.ConnectInfo;
connect: GVBasics.Connect;
[info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
IF (info = individual OR info = group) THEN {
IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] THEN { -- Grapevine says I am not the Controller
triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
tryDifferentController: BOOLFALSE;
importedOK: BOOL;
[importedOK: importedOK, controllerInterface: triedControllerInterface] ← TryThisController[];
See if this Controller thinks everything is fine
IF importedOK THEN [tryDifferentController: tryDifferentController] ← triedControllerInterface.GetSomeInfo[ ! RPC.CallFailed => {
importedOK ← FALSE;
CONTINUE;
};
];
IF importedOK AND ~tryDifferentController THEN { -- new controller answers
IF ServerStatusList = NIL THEN {
Lock[];
locked ← TRUE;
IAmTheController ← FALSE;
ReportControllerEvent[cedeController, "other controller has assumed role", NIL, BasicTime.Now[], PupName.HisName[PupName.NameLookup[connect, Pup.nullSocket]], NIL];
ControllerMovingAway ← FALSE;
ComputeServerControllerRpcControl.UnexportInterface[];
InterfaceExported ← FALSE;
ServerStatusList ← NIL;
BrokenCommandTable ← NIL;
ExtraCommandTable ← NIL;
UnLock[];
locked ← FALSE;
}
ELSE {
ReportControllerEvent[noticeOtherController, "new controller starting to assume role", NIL, BasicTime.Now[], PupName.HisName[PupName.NameLookup[connect, Pup.nullSocket]], NIL];
ControllerMovingAway ← TRUE;
};
}
ELSE { -- new controller does not answer or thinks it is moving too
ControllerMovingAway ← FALSE;
IF ~importedOK THEN { -- new controller does not answer — re-assert my controllership
user, password: ROPE;
[user, password] ← UserCredentials.Get[];
[] ← GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
};
};
}
ELSE { -- Grapevine says I am the Controller
ControllerMovingAway ← FALSE;
};
}
ELSE { -- NOT info = individual OR info = group (grapevine troubles)
ControllerMovingAway ← FALSE;
};
TossOldStatus[];
ReBuildQueuing[];
};
};
Process.Pause[Process.SecondsToTicks[127]];
ENDLOOP;
};
ShutDownController: PUBLIC PROC [] RETURNS [] = {
IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
InterfaceExported ← FALSE;
ControllerEnabled ← FALSE;
ServerStatusList ← NIL;
BrokenCommandTable ← NIL;
ExtraCommandTable ← NIL;
MachineToStatusTable ← NIL;
LatentControllerActive ← FALSE;
ControllerUp ← FALSE;
IF IAmTheController THEN ReportControllerEvent[cedeController, "Compute Server shutdown", NIL, BasicTime.Now[], NIL, NIL];
IAmTheController ← FALSE;
};
TossOldStatus: ENTRY PROC [] RETURNS [] = {
ENABLE UNWIND => NULL;
statusList: LIST OF ServerStatus;
lastStatusList: LIST OF ServerStatus ← NIL;
status: ServerStatus ← NIL;
now: BasicTime.GMT ← BasicTime.Now[];
FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
status ← statusList.first;
IF BasicTime.Period[status.timeOfStatus, now] > 30 THEN {
InternalReportControllerEvent[type: lostServer, reason: "no status for 30 seconds", command: NIL, time: BasicTime.Now[], remoteMachineName: status.serverMachineName, userName: status.userName];
[] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
[] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
ELSE lastStatusList.rest ← statusList.rest;
};
lastStatusList ← statusList;
ENDLOOP;
};
ReBuildQueuing: PROC [] RETURNS [] = {
now: BasicTime.GMT ← BasicTime.Now[];
IF forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, now] > 300 THEN {
setNewQueueing[];
LastReBuildQueueTime ← now;
};
forceReBuildQueuing ← FALSE;
};
GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOLFALSE, tryDifferentController: BOOLFALSE, msg: ROPENIL, serverList: LIST OF ROPENIL, bestFOM: REAL ← 0.0] = {
ENABLE UNWIND => NULL;
bestStatus: ServerStatus ← NIL;
statusList: LIST OF ServerStatus;
IF ~IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved", NIL, 0.0];
FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
status: ServerStatus = statusList.first;
perCentBusy: INT ← Real.Round[status.CPULoad*100.0];
IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN {
serverList ← CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
}
ELSE serverList ← CONS[IO.PutFR["%g (%g%%)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN {
serverList ← CONS[Rope.Concat[status.serverMachineName, " (Controller)"], serverList];
}
ELSE serverList ← CONS[status.serverMachineName, serverList];
IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
ENDLOOP;
IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
};
BestServerStats: PUBLIC ENTRY PROC RETURNS[instance: RPC.ShortROPE, FOM: REAL] = {
ENABLE UNWIND => NULL;
bestStatus: ServerStatus ← NIL;
statusList: LIST OF ServerStatus;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
status: ServerStatus = statusList.first;
IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
ENDLOOP;
IF bestStatus = NIL THEN RETURN [NIL, 1.0];
RETURN [bestStatus.serverMachineName, bestStatus.FOM];
};
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
ENABLE UNWIND => NULL;
statusFound: BOOL;
val: REF ANY;
status: ServerStatus;
tempMachineToStatusTable: SymTab.Ref ← MachineToStatusTable;
IF tempMachineToStatusTable # NIL THEN {
[found: statusFound, val: val] ← SymTab.Fetch[x: tempMachineToStatusTable, key: instance];
IF statusFound THEN {
status ← NARROW[val];
status.CPULoad ← MIN[ 1.0, status.CPULoad + 0.5];
status.nonBackgroundCPULoad ← MIN[ 1.0, status.nonBackgroundCPULoad + 0.5];
status.reclamationRate ← status.reclamationRate + 30 ;
status.numberOfCurrentRequests ← status.numberOfCurrentRequests + 1;
status.aveBackgroundLoad ← status.aveBackgroundLoad + 1;
status.FOM ← figureOfMerit[status];
};
};
};
FindService: PUBLIC PROC [service: ROPE, userName: RPC.ShortROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
serverName: RPC.ShortROPE;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
[found, instance, serverName] ← InnerFindService[service];
IF found THEN ReportControllerEvent[service, "", service, BasicTime.Now[], serverName, userName];
};
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, userName: RPC.ShortROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: Pup.Address, needListener: BOOL] RETURNS [found: ATOM, instance: RPC.ShortROPE, serverPupAddress: Pup.Address, errMsg: ROPE] = {
entry: QueueEntry;
item: WaitingRequest ← NIL;
tryForServer: PROC [] = {
DO
commOK: BOOL;
foundBroken: BOOL;
brokenVal: REF ANY;
brokenList, brokenListLoop: LIST OF BrokenCommand;
askFound: ATOM;
queueItemReady[entry, item];
IF item.waiting = timedOut THEN {
item.response ← $timeOut;
EXIT;
};
[found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
brokenList ← NARROW[brokenVal];
FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance, FALSE] THEN {
item.serverAssigned ← FALSE;
item.waiting ← true;
EXIT;
};
ENDLOOP;
IF ~item.serverAssigned THEN LOOP;
[communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem [entry, item];
IF ~commOK OR (askFound # $foundOK) THEN {
item.serverAssigned ← FALSE;
item.waiting ← true;
LOOP;
};
item.response ← askFound;
item.serverPupAddress ← serverPupAddress;
SELECT askFound FROM
$notFound => {
removeQueueItem[entry, item];
errMsg ← NIL;
RETURN;
};
$foundButTooBusy => {
LOOP;
};
$foundOK => {
removeQueueItem[entry, item];
RETURN;
};
ENDCASE;
ENDLOOP;
removeQueueItem[entry, item];
};
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
[entry, item] ← findOrCreateQueueEntry[service];
item.version ← version;
item.clientMachineName ← clientMachineName;
item.streamPupAddress ← streamPupAddress;
item.needListener ← needListener;
item.userName ← userName;
IF entry.doQueueing THEN { -- queueing already being done
item.process ← LOOPHOLE[Process.GetCurrent[]];
item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
item.waiting ← true;
tryForServer[];
bumpBestServerStats[item.instance];
IF item.response = $foundOK THEN ReportControllerEvent[service, "", service, BasicTime.Now[], item.instance, userName];
RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
}
ELSE { -- queueing not started
serversTried: LIST OF RPC.ShortROPE ← NIL;
DO
commOK: BOOL;
serviceFound: BOOL;
instance: RPC.ShortROPE;
askFound: ATOM;
entry.doQueueing ← TRUE;
setNewQueueing[];
[serviceFound, instance] ← InnerFindService[service];
item.instance ← instance;
IF ~serviceFound THEN {
found ← $notFound;
removeQueueItem[entry, item];
RETURN;
};
FOR oldServers: LIST OF RPC.ShortROPE ← serversTried, oldServers.rest UNTIL oldServers = NIL DO
IF Rope.Equal[oldServers.first, instance, FALSE] THEN {
We are about to retry the same server again. Give up probing, and start queueing.
serversTried ← NIL; -- help garbage collector
entry.doQueueing ← TRUE;
setNewQueueing[];
item.process ← LOOPHOLE[Process.GetCurrent[]];
item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
item.waiting ← true;
tryForServer[];
IF item.response = $foundOK THEN ReportControllerEvent[service, "", service, BasicTime.Now[], item.instance, userName];
RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
};
ENDLOOP;
serversTried ← CONS[instance, serversTried];
[communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem[entry, item];
IF ~commOK THEN LOOP;
SELECT askFound FROM
$notFound => {
removeQueueItem[entry, item];
RETURN [askFound, instance, serverPupAddress, NIL];
};
$foundButTooBusy => {
LOOP;
};
$foundOK => {
removeQueueItem[entry, item];
ReportControllerEvent[service, "", service, BasicTime.Now[], instance, userName];
RETURN [askFound, instance, serverPupAddress, NIL];
};
ENDCASE;
ENDLOOP;
};
};
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest] RETURNS [communicationsOK: BOOLTRUE, askFound: ATOM, serverPupAddress: Pup.Address, errMsg: ROPE] = {
checkProcess: PROCESS;
checkObject: REF CheckItemProcessObj ← NEW[CheckItemProcessObj];
startTime: BasicTime.GMT ← BasicTime.Now[];
checkProcess ← FORK CheckItemProcess[entry, item, checkObject, item.userName];
DO
IF BasicTime.Period[startTime, BasicTime.Now[]] > 9 THEN TRUSTED {
Remote machine is not unresponsive, but will not answer the call
checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
Detach so we can return. Freeze, abort and thaw the process. After a long time (minutes if ever), the process will finally wake up and go away. Also, if the remote host stops answering the ping, the call will fail and the process will exit.
Process.Detach[checkProcess];
AMProcess.Freeze[LIST[checkProcessTV]];
AMProcess.Abort[checkProcessTV];
AMProcess.Thaw[LIST[checkProcessTV]];
checkObject.check ← aborted;
RETURN[FALSE, $notFound, Pup.nullAddress, ""];
};
IF checkObject.check = done THEN TRUSTED {
[] ← JOIN checkProcess;
RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
};
Process.Pause[2];
ENDLOOP;
};
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj, userName: ROPE] = {
ENABLE ABORTED => GOTO aborted;
communicationsOK: BOOLTRUE;
askFound: ATOM;
serverPupAddress: Pup.Address;
errMsg: ROPE;
firstTime: BOOLTRUE;
serverInstance: ROPE ← item.instance;
DO
serverInterface: ComputeServerRpcControl.InterfaceRecord;
serverInterface ← getServerInterfaceFromCache[serverInstance];
IF serverInterface = NIL THEN {communicationsOK ← FALSE; RETURN};
[found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] ← serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, userName: userName
! RPC.CallFailed => {
[] ← deleteServerInterfaceFromCache[serverInterface];
IF firstTime THEN {firstTime ← FALSE; LOOP;};
communicationsOK ← FALSE;
EXIT;
};
];
EXIT;
ENDLOOP;
checkObject.check ← done;
checkObject.communicationsOK ← communicationsOK;
checkObject.askFound ← askFound;
checkObject.serverPupAddress ← serverPupAddress;
checkObject.errMsg ← errMsg;
EXITS
aborted => {
checkObject.check ← aborted;
RETURN;
};
};
InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE, serverMachineName: RPC.ShortROPE] = {
ENABLE UNWIND => NULL;
statusList: LIST OF ServerStatus;
bestStatus: ServerStatus ← NIL;
foundCommand: BOOL;
[found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
IF foundCommand THEN {
command should be out on (almost) every server
foundBroken: BOOL;
brokenVal: REF ANY;
brokenList: LIST OF BrokenCommand;
[found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
brokenList ← NARROW[brokenVal];
FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
skipIt: BOOLFALSE;
serverMachineName: ROPE ← statusList.first.serverMachineName;
serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
brokenListLoop: LIST OF BrokenCommand;
FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE] OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
skipIt ← TRUE;
EXIT;
};
ENDLOOP;
IF skipIt THEN LOOP;
IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus ← statusList.first;
ENDLOOP;
}
ELSE {
command should be out on (almost) no servers
foundExtra: BOOL;
extraVal: REF ANY;
extraListLoop: LIST OF ExtraCommand;
[found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: service];
FOR extraListLoop ← NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
statusFound: BOOL;
val: REF ANY;
status: ServerStatus;
[found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
IF statusFound THEN {
status ← NARROW[val];
IF bestStatus = NIL THEN bestStatus ← status
ELSE IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
};
ENDLOOP;
};
IF bestStatus = NIL THEN {
RETURN[FALSE, "", ""]
}
ELSE {
bestStatus.CPULoad ← MIN[ 1.0, bestStatus.CPULoad + 0.5];
bestStatus.reclamationRate ← bestStatus.reclamationRate + 30 ;
bestStatus.FOM ← figureOfMerit[bestStatus];
RETURN[TRUE, bestStatus.serverMachinePupAddress, bestStatus.serverMachineName];
};
};
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
expectedPercentAvailable: REAL;
adjustedBusyCPU: REAL = IF status.machineType = dorado THEN status.nonBackgroundCPULoad ELSE 0.8 + (status.nonBackgroundCPULoad/5.0);
expectedPercentAvailable ← (1.0 - adjustedBusyCPU) / ((IF status.aveBackgroundLoad < 0.1 THEN Real.Float[status.numberOfCurrentRequests] ELSE status.aveBackgroundLoad) + 1);
FOM ← 1.0 - expectedPercentAvailable;
};
CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
ENABLE UNWIND => NULL;
found: BOOL;
oldList: LIST OF BrokenCommand;
newBroken: BrokenCommand;
newList: LIST OF BrokenCommand;
machineList: LIST OF BrokenCommand;
val: REF ANY;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
[found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
IF found THEN {
oldList ← NARROW[val];
FOR machineList ← oldList, machineList.rest UNTIL machineList = NIL DO
broken: BrokenCommand = machineList.first;
IF Rope.Equal[broken.serverMachineName, serverMachineName] THEN {
versionList: LIST OF ROPE;
FOR versionList ← broken.versions, versionList.rest UNTIL versionList = NIL DO
vers: ROPENARROW[versionList.first];
IF Rope.Equal[vers, version] THEN RETURN;
ENDLOOP;
broken.versions ← CONS[version, broken.versions];
RETURN;
};
ENDLOOP;
newBroken ← NEW[BrokenCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
newList ← CONS[newBroken, NARROW[val]];
[] ← SymTab.Store[x: BrokenCommandTable, key: commandName, val: newList];
}
ELSE {
newBroken ← NEW[BrokenCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
newList ← LIST[newBroken];
[] ← SymTab.Store[x: BrokenCommandTable, key: commandName, val: newList];
};
};
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
ENABLE UNWIND => NULL;
found: BOOL;
oldList: LIST OF ExtraCommand;
newExtra: ExtraCommand;
newList: LIST OF ExtraCommand;
machineList: LIST OF ExtraCommand;
val: REF ANY;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
See if this "Extra" reverses a previous broken
[found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
IF found THEN {
[] ← SymTab.Delete[x: BrokenCommandTable, key: commandName];
};
[found: found, val: val] ← SymTab.Fetch[x: ExtraCommandTable, key: commandName];
IF found THEN {
oldList ← NARROW[val];
FOR machineList ← oldList, machineList.rest UNTIL machineList = NIL DO
extra: ExtraCommand = machineList.first;
IF Rope.Equal[extra.serverMachineName, serverMachineName] THEN {
versionList: LIST OF ROPE;
FOR versionList ← extra.versions, versionList.rest UNTIL versionList = NIL DO
vers: ROPENARROW[versionList.first];
IF Rope.Equal[vers, version] THEN RETURN;
ENDLOOP;
extra.versions ← CONS[version, extra.versions];
RETURN;
};
ENDLOOP;
newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
newList ← CONS[newExtra, NARROW[val]];
[] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
}
ELSE {
newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
newList ← LIST[newExtra];
[] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
};
};
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOLFALSE, tryDifferentController: BOOLFALSE, msg: ROPENIL]= {
ENABLE UNWIND => NULL;
statusList: LIST OF ServerStatus;
packageListInStream: IO.STREAM;
packageListGlobalName: ROPENIL;
packageListOutStream: IO.STREAM;
packageListTempName: ROPENIL;
foundItem: BOOLFALSE;
fullFName: ROPE;
fsErrMsg: ROPE;
cp: FS.ComponentPositions;
dirOmitted: BOOLEAN;
dfPrefix: ROPE;
shortPackage: ROPENIL;
shortPackageAndDF: ROPENIL;
packageDFDate: BasicTime.GMT ← BasicTime.nullGMT;
sawDir: BOOLFALSE;
blankAfterDir: BOOLFALSE;
lastDir: REF DFUtilities.DirectoryItem ← NIL;
wroteDir: BOOLFALSE;
defaultDirectory: REF DFUtilities.DirectoryItem ← NIL ;
whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];
writeUpdatedItem: PROC[] = {
file: REF DFUtilities.FileItem;
file ← NEW[DFUtilities.FileItem ← [
name: shortPackageAndDF, -- a short name
date: [explicit, packageDFDate],
verifyRoot: FALSE]];
IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dfPrefix, FALSE] THEN {
dir: REF DFUtilities.DirectoryItem ← NEW [DFUtilities.DirectoryItem ← [
path1: dfPrefix,
path2: NIL,
path2IsCameFrom: FALSE,
exported: FALSE,
readOnly: FALSE
]];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
lastDir ← dir;
};
DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
};
DoOneItem: DFUtilities.ProcessItemProc = {
PROC [item: REF ANY] RETURNS [stop: BOOLFALSE]
errors, warnings, filesActedUpon: INT ← 0;
WITH item SELECT FROM
dir: REF DFUtilities.DirectoryItem => {
IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
wroteDir ← FALSE;
lastDir ← dir;
};
};
comment: REF DFUtilities.CommentItem => {
DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
};
white: REF DFUtilities.WhiteSpaceItem => {
};
file: REF DFUtilities.FileItem => {
shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
packageDFDate: BasicTime.GMT ← BasicTime.nullGMT;
currentDFDate: BasicTime.GMT ← BasicTime.nullGMT;
IF ~wroteDir THEN {
wroteDir ← TRUE;
IF lastDir = NIL THEN lastDir ← defaultDirectory;
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
};
IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
writeUpdatedItem[];
foundItem ← TRUE;
}
ELSE {
DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
};
};
ENDCASE;
};
{
ENABLE UNWIND => {
IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
};
IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
WHILE ComputeControllerInternal.IAmTheController AND
BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
WAIT NoticeNewPackageCondition;
ENDLOOP;
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
[fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
dfPrefix ← fullFName.Substr[0, cp.base.start];
defaultDirectory ← NEW [DFUtilities.DirectoryItem ← [
path1: dfPrefix,
path2: NIL,
path2IsCameFrom: FALSE,
exported: FALSE,
readOnly: FALSE
]];
[created: packageDFDate] ← FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => GOTO noDFFile];
packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
packageListInStream ← FS.StreamOpen[packageListGlobalName
! FS.Error => GOTO cantOpenPackageList];
packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
IF ~foundItem THEN writeUpdatedItem[];
packageListInStream.Close[! FS.Error => CONTINUE];
packageListOutStream.Close[! FS.Error => CONTINUE];
[] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE ! FS.Error => { fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
InternalReportControllerEvent[type: newPackage, reason: NIL, command: shortPackage, time: BasicTime.Now[], remoteMachineName: NIL, userName: NIL];
FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
statusList.first.newPackage ← TRUE;
ENDLOOP;
EXITS
cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
noDFFile => RETURN [ TRUE, FALSE, "controller could not open specified df file - "];
syntaxErrorInPackageList => RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
cantCopyToServer => RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
};
};
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
ENABLE UNWIND => NULL;
statusList: LIST OF ServerStatus;
packageListInStream: IO.STREAM;
packageListGlobalName: ROPENIL;
packageListOutStream: IO.STREAM;
packageListTempName: ROPENIL;
foundItem: BOOLFALSE;
fullFName: ROPE;
cp: FS.ComponentPositions;
dirOmitted: BOOLEAN;
dfPrefix: ROPE;
shortPackage: ROPENIL;
shortPackageAndDF: ROPENIL;
wroteDir: BOOLFALSE;
lastDir: REF DFUtilities.DirectoryItem ← NIL;
defaultDirectory: REF DFUtilities.DirectoryItem ← NIL ;
whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];
DoOneItem: DFUtilities.ProcessItemProc = {
PROC [item: REF ANY] RETURNS [stop: BOOLFALSE]
errors, warnings, filesActedUpon: INT ← 0;
WITH item SELECT FROM
dir: REF DFUtilities.DirectoryItem => {
IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
wroteDir ← FALSE;
lastDir ← dir;
};
};
comment: REF DFUtilities.CommentItem => {
DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
};
white: REF DFUtilities.WhiteSpaceItem => {
DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
};
file: REF DFUtilities.FileItem => {
shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
currentDFDate: BasicTime.GMT ← BasicTime.nullGMT;
IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
foundItem ← TRUE;
}
ELSE {
IF ~wroteDir THEN {
wroteDir ← TRUE;
IF lastDir = NIL THEN lastDir ← defaultDirectory;
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
};
DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
};
};
ENDCASE;
};
{
ENABLE UNWIND => {
IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
};
IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
WHILE ComputeControllerInternal.IAmTheController AND
BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
WAIT NoticeNewPackageCondition;
ENDLOOP;
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
[fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
dfPrefix ← fullFName.Substr[0, cp.base.start];
defaultDirectory ← NEW [DFUtilities.DirectoryItem ← [
path1: dfPrefix,
path2: NIL,
path2IsCameFrom: FALSE,
exported: FALSE,
readOnly: FALSE
]];
packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
packageListInStream ← FS.StreamOpen[packageListGlobalName
! FS.Error => GOTO cantOpenPackageList];
packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
packageListInStream.Close[! FS.Error => CONTINUE];
packageListOutStream.Close[! FS.Error => CONTINUE];
IF ~foundItem THEN {
FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
RETURN [ TRUE, FALSE, "package not found"]
};
[] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE];
FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
statusList.first.newPackage ← TRUE;
ENDLOOP;
EXITS
cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
syntaxErrorInPackageList => RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
};
};
NewStats: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, nonBackgroundCPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL, userName: RPC.ShortROPE, currentRequests: LIST OF ComputeServer.Request, aveBackgroundLoad: REAL] RETURNS [terminateService, newPackage: BOOLFALSE, queueingCommands: LIST OF ROPENIL] = {
ENABLE UNWIND => NULL;
statusList: LIST OF ServerStatus;
status: ServerStatus ← NIL ;
requests: INT ← 0;
lastStatusList: LIST OF ServerStatus ← NIL ;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
FOR cr: LIST OF ComputeServer.Request ← currentRequests, cr.rest UNTIL cr = NIL DO
requests ← requests + 1;
ENDLOOP;
FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
IF Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN {
status ← statusList.first;
IF ~serverUP THEN {
IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
ELSE lastStatusList.rest ← statusList.rest;
}
ELSE {
newPackage ← status.newPackage;
status.newPackage ← FALSE;
status.freePagesOnDisk ← freePagesOnDisk;
status.freeboard ← freeboard;
status.freeGFI ← freeGFI;
status.freeMDS ← freeMDS;
status.freeVM ← freeVM;
status.oldestLRUFileDate ← oldestLRUFileDate;
status.CPULoad ← CPULoad;
status.nonBackgroundCPULoad ← nonBackgroundCPULoad;
status.reclamationRate ← reclamationRate;
status.freeProcesses ← freeProcesses;
status.userName ← userName;
status.currentRequests ← currentRequests;
status.aveBackgroundLoad ← aveBackgroundLoad;
status.numberOfCurrentRequests ← requests;
status.FOM ← figureOfMerit[status: status];
status.timeOfStatus ← BasicTime.Now[];
IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN {
[] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
[] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
};
};
EXIT;
};
lastStatusList ← statusList;
REPEAT
FINISHED => status ← NIL;
ENDLOOP;
IF status = NIL AND serverUP THEN {
status ← NEW[ServerStatusRecord ← [
serverMachineName: serverMachineName,
serverMachinePupAddress: serverMachinePupAddress,
timeOfStatus: BasicTime.Now[],
machineType: machineType,
mainMemory: mainMemory,
numberCPUs: numberCPUs,
diskPartionSize:diskPartionSize,
freePagesOnDisk: freePagesOnDisk,
freeboard: freeboard,
freeGFI: freeGFI,
freeMDS: freeMDS,
freeVM: freeVM,
oldestLRUFileDate: oldestLRUFileDate,
CPULoad: CPULoad,
nonBackgroundCPULoad: nonBackgroundCPULoad,
reclamationRate: reclamationRate,
numberOfCurrentRequests: requests,
freeProcesses: freeProcesses,
userName: userName,
currentRequests: currentRequests,
aveBackgroundLoad: aveBackgroundLoad
]];
status.FOM ← figureOfMerit[status: status];
[] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
[] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
ServerStatusList ← CONS[status, ServerStatusList];
InternalReportControllerEvent[type: newServer, reason: NIL, command: NIL, time: BasicTime.Now[], remoteMachineName: serverMachineName, userName: userName];
};
IF status # NIL AND status.newQueueing THEN {
status.newQueueing ← FALSE;
FOR l: LIST OF QueueEntry ← WaitingQueue, l.rest UNTIL l = NIL DO
IF l.first.doQueueing THEN queueingCommands ← CONS[l.first.service, queueingCommands];
ENDLOOP;
};
IF firstCall THEN {
CleanBroken: SymTab.EachPairAction = {
PROC [key: Key, val: Val] RETURNS [quit: BOOL];
ENABLE UNWIND => NULL;
brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand ← NIL;
changed: BOOLFALSE;
brokenList ← NARROW[val];
FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
changed ← TRUE;
LOOP;
}
ELSE {
newBrokenList ← CONS[brokenListLoop.first, newBrokenList];
};
ENDLOOP;
IF changed THEN [] ← SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList];
RETURN[FALSE];
};
CleanExtra: SymTab.EachPairAction = {
PROC [key: Key, val: Val] RETURNS [quit: BOOL];
ENABLE UNWIND => NULL;
extraList, newExtraList, extraListLoop: LIST OF ExtraCommand ← NIL;
changed: BOOLFALSE;
extraList ← NARROW[val];
FOR extraListLoop ← extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO
IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
changed ← TRUE;
LOOP;
}
ELSE {
newExtraList ← CONS[extraListLoop.first, newExtraList];
};
ENDLOOP;
IF changed THEN [] ← SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList];
RETURN[FALSE];
};
[] ← SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken];
[] ← SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra];
};
};
getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE] RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord ← NIL] = {
ENABLE UNWIND => NULL;
newServerInterface: ComputeServerRpcControl.InterfaceRecord ← NIL;
bestIndex: INT ← 0;
bestTime: BasicTime.GMT ← BasicTime.latestGMT;
now: BasicTime.GMT ← BasicTime.Now[];
FOR index: INT IN [0..serverInterfaceCacheSize) DO
IF Rope.Equal[serverInstance, serverInterfaceCache[index].serverInstance] THEN {
newServerInterface ← serverInterfaceCache[index].interface;
serverInterfaceCache[index].lastUsed ← now;
RETURN[serverInterfaceCache[index].interface];
};
IF BasicTime.Period[bestTime, serverInterfaceCache[index].lastUsed] < 0 THEN {
bestTime ← serverInterfaceCache[index].lastUsed;
bestIndex ← index;
};
ENDLOOP;
newServerInterface ← ComputeServerRpcControl.ImportNewInterface[
interfaceName: [ type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]]
! RPC.ImportFailed => {
CONTINUE;
};
];
IF newServerInterface # NIL THEN {
serverInterfaceCache[bestIndex].lastUsed ← now;
serverInterfaceCache[bestIndex].serverInstance ← serverInstance;
serverInterfaceCache[bestIndex].interface ← newServerInterface;
};
RETURN[newServerInterface];
};
deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord] RETURNS [gotIt: BOOLFALSE] = {
ENABLE UNWIND => NULL;
FOR index: INT IN [0..serverInterfaceCacheSize) DO
IF serverInterfaceCache[index].interface = serverInterface THEN {
serverInterfaceCache[index].serverInstance ← NIL;
serverInterfaceCache[index].interface ← NIL;
serverInterfaceCache[index].lastUsed ← BasicTime.earliestGMT;
gotIt ← TRUE;
EXIT;
};
ENDLOOP;
};
findOrCreateQueueEntry: ENTRY PROC [service: ROPE] RETURNS [entry: QueueEntry ← NIL, item: WaitingRequest] = {
ENABLE UNWIND => NULL;
item ← NEW[WaitingRequestRec ];
FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
IF Rope.Equal[queue.first.service, service, FALSE] THEN {
entry ← queue.first;
IF entry.waiters = NIL THEN entry.waiters ← CONS[item, NIL]
ELSE FOR l: LIST OF WaitingRequest ← entry.waiters, l.rest UNTIL l = NIL DO
IF l.rest = NIL THEN {l.rest ← CONS[item, NIL]; EXIT};
ENDLOOP;
RETURN; -- existing queue
};
ENDLOOP;
WaitingQueue ← CONS[
entry ← NEW[QueueEntryRec ← [service: service, waiters: LIST[item]]],
WaitingQueue
];
TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]];};
};
findQueuedServers: ENTRY PROC [service: ROPE] RETURNS [queuedCommand: BOOL, servers: LIST OF ROPE] ={
FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
IF Rope.Equal[queue.first.service, service, FALSE] THEN {
FOR lor: LIST OF ROPE ← queue.first.servers, lor.rest UNTIL lor = NIL DO
servers ← CONS[lor.first, servers];
ENDLOOP;
RETURN [TRUE, servers]; -- existing queue
};
ENDLOOP;
RETURN [FALSE, NIL];
};
setNewQueueing: ENTRY PROC [] = {
ENABLE UNWIND => NULL;
FOR l: LIST OF ServerStatus ← ServerStatusList, l.rest UNTIL l = NIL DO
l.first.newQueueing ← TRUE;
ENDLOOP;
LastReBuildQueueTime ← BasicTime.Now[];
};
queueItemReady: ENTRY PROC [entry: QueueEntry, item: WaitingRequest]= {
ENABLE UNWIND => NULL;
firstTime: BOOLTRUE;
item.waiting ← true;
DO
IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting ← timedOut; RETURN};
SELECT item.waiting FROM
true => {
IF entry.servers # NIL AND entry.waiters.first = item THEN {
bestStatus: ServerStatus ← NIL;
prev, prevBest: LIST OF ROPENIL;
best: LIST OF ROPENIL;
serv: ROPE;
FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
statusFound: BOOL;
val: REF ANY;
status: ServerStatus;
serv ← l.first;
[found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: serv];
IF statusFound THEN {
status ← NARROW[val];
IF IsStatusBetter[status, bestStatus] THEN {
bestStatus ← status;
best ← l;
prevBest ← prev;
};
};
prev ← l;
ENDLOOP;
IF bestStatus # NIL THEN {
IF prevBest = NIL THEN {
item.instance ← entry.servers.first;
entry.servers ← entry.servers.rest;
}
ELSE {
item.instance ← best.first;
prevBest.rest ← best.rest;
};
item.serverAssigned ← TRUE;
item.waiting ← validating;
RETURN;
};
};
};
done => {
item.serverAssigned ← TRUE;
item.waiting ← validating;
RETURN;
};
timedOut => {
RETURN;
};
ENDCASE;
IF firstTime THEN {forceReBuildQueuing ← TRUE};
firstTime ← FALSE;
WAIT entry.changed;
ENDLOOP;
};
removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
ENABLE UNWIND => NULL;
BROADCAST entry.changed;
IF entry.waiters = NIL THEN RETURN; -- actually, this is a bug
IF entry.waiters.first = item THEN {entry.waiters ← entry.waiters.rest; RETURN};
FOR l: LIST OF WaitingRequest ← entry.waiters, l.rest UNTIL l.rest = NIL DO
IF l.rest.first = item THEN {l.rest ← l.rest.rest; RETURN};
ENDLOOP;
};
MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = {
ENABLE UNWIND => NULL;
item: WaitingRequest ← NIL;
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
entry: QueueEntry = queue.first;
IF Rope.Equal[commandName, entry.service, FALSE] THEN {
IF entry.waiters = NIL OR entry.waiters.first.waiting # true THEN { -- nobody waiting now or top waiter is busy
IF entry.servers = NIL THEN entry.servers ← CONS[serverMachineAddress, NIL]
ELSE FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
IF Rope.Equal[l.first , serverMachineAddress, FALSE] THEN EXIT;
IF l.rest = NIL THEN {l.rest ← CONS[serverMachineAddress, NIL]; EXIT};
ENDLOOP;
BROADCAST entry.changed;
EXIT;
}
ELSE {
entry.waiters.first.waiting ← done;
entry.waiters.first.instance ← serverMachineAddress;
BROADCAST entry.changed;
EXIT;
};
};
ENDLOOP;
};
GenericToController: PUBLIC PROC [requestCode: ATOM, requestString: Rope.ROPE] RETURNS [resultCode: ATOM ← NIL, resultString: Rope.ROPE ← NIL] = {
 generic call to allow for expansion without RPC interface recompilation
IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
SELECT requestCode FROM
$CommandInfo => {
getBrokenList: PROC RETURNS [brokenNames: ROPENIL, goodNames: ROPENIL] = {
brokenVal: REF ANY;
brokenList: LIST OF BrokenCommand;
[val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: requestString];
brokenList ← NARROW[brokenVal];
FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
skipIt: BOOLFALSE;
serverMachineName: ROPE ← statusList.first.serverMachineName;
serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
brokenListLoop: LIST OF BrokenCommand;
FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE] OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
brokenNames ← IF brokenNames = NIL THEN serverMachineName ELSE Rope.Cat[brokenNames, ", ", serverMachineName];
skipIt ← TRUE;
EXIT;
};
ENDLOOP;
IF skipIt THEN LOOP;
goodNames ← IF goodNames = NIL THEN serverMachineName ELSE Rope.Cat[goodNames, ", ", serverMachineName];
ENDLOOP;
};
result: LIST OF ROPENIL;
foundCommand: BOOL;
IF Rope.IsEmpty[requestString] THEN { -- list all commands
normalCommands: ROPENIL;
extraCommands: ROPENIL;
normals: SymTab.EachPairAction = {
PROC [key: Key, val: Val] RETURNS [quit: BOOL];
ENABLE UNWIND => NULL;
normalCommands ← IF normalCommands = NIL THEN key ELSE Rope.Cat[normalCommands, ", ", key];
RETURN[FALSE];
};
extras: SymTab.EachPairAction = {
PROC [key: Key, val: Val] RETURNS [quit: BOOL];
ENABLE UNWIND => NULL;
extraCommands ← IF extraCommands = NIL THEN key ELSE Rope.Cat[extraCommands, ", ", key];
RETURN[FALSE];
};
[] ← SymTab.Pairs[x: ComputeServerInternal.CommandTable, action: normals];
[] ← SymTab.Pairs[x: ExtraCommandTable, action: extras];
resultString ← Rope.Concat["Normal commands are ", normalCommands];
IF extraCommands # NIL THEN resultString ← Rope.Cat[resultString, "; and extra commands are ", extraCommands];
RETURN [$success, resultString];
}
ELSE { -- list a particular command
[queuedCommand: foundCommand, servers: result] ← findQueuedServers[requestString];
IF foundCommand THEN { -- command is queued
brokenNames: ROPENIL;
brokenNames ← getBrokenList[].brokenNames;
IF result = NIL THEN {
resultString ← IF brokenNames = NIL THEN "Command is queued, but no servers are waiting" ELSE Rope.Concat["Command is queued, no servers are waiting, and the command is unavailable on servers ", brokenNames];
}
ELSE {
FOR lor: LIST OF ROPE ← result, lor.rest UNTIL lor = NIL DO
printName: ROPE;
printName ← PupName.HisName[PupName.NameLookup[lor.first, Pup.nullSocket]];
resultString ← IF resultString = NIL THEN printName ELSE Rope.Cat[resultString, ", ", printName];
ENDLOOP;
resultString ← Rope.Concat["Command is queued on servers ", resultString];
IF brokenNames # NIL THEN resultString ← Rope.Cat[resultString, "; and the command is unavailable on servers ", brokenNames];
};
resultCode ← $success;
}
ELSE { -- command is normal
[found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: requestString];
IF foundCommand THEN {
command should be out on (almost) every server
goodNames: ROPENIL;
brokenNames: ROPENIL;
[brokenNames, goodNames] ← getBrokenList[];
SELECT TRUE FROM
goodNames = NIL AND brokenNames = NIL => {
resultString ← "No servers are up";
};
goodNames # NIL AND brokenNames = NIL => {
resultString ← "All servers should accept the command";
};
goodNames = NIL AND brokenNames # NIL => {
resultString ← "All servers refuse the command (it is broken everywhere)";
};
goodNames # NIL AND brokenNames # NIL => {
resultString ← Rope.Cat["Command is available on servers ", goodNames, ", but is unavailable on servers ", brokenNames];
};
ENDCASE;
}
ELSE {
command should be out on (almost) no servers
foundExtra: BOOL;
extraVal: REF ANY;
extraNames: ROPENIL;
extraListLoop: LIST OF ExtraCommand;
[found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: requestString];
FOR extraListLoop ← NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
printName: ROPE;
statusFound: BOOL;
val: REF ANY;
[found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
printName ← PupName.HisName[PupName.NameLookup[extraListLoop.first.serverMachineName, Pup.nullSocket]];
IF statusFound THEN extraNames ← IF extraNames = NIL THEN printName ELSE Rope.Cat[extraNames, ", ", printName];
ENDLOOP;
IF extraNames = NIL THEN resultString ← "Command not known" ELSE resultString ← Rope.Concat["Command is available on servers ", extraNames];
};
RETURN [$success, resultString];
};
};
};
$ServerLoads => {
FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
thisString: Rope.ROPE ← NIL;
status: ServerStatus = statusList.first;
IF resultString # NIL THEN resultString ← Rope.Cat[resultString, ", "];
thisString ← IO.PutFR["(%5.2f, %5.2f, %5.2f, %5.2f, %g)", IO.real[status.aveBackgroundLoad], IO.real[status.nonBackgroundCPULoad], IO.real[status.CPULoad], IO.real[status.FOM], IO.int[status.numberOfCurrentRequests] ];
resultString ← Rope.Cat[resultString, status.serverMachineName, thisString];
ENDLOOP;
RETURN [$success, resultString];
};
$CurrentController => {
IF ~IAmTheController OR ControllerMovingAway THEN RETURN [$NotCurrentController, NIL] ELSE RETURN [$CurrentController, NIL];
};
ENDCASE => RETURN [$notImplemented, NIL];
};
IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus] RETURNS [ itsBetter: BOOLFALSE] = {
IF bestStatus = NIL THEN {
IF (status.reclamationRate < 20) AND (status.nonBackgroundCPULoad < 0.99 ) THEN itsBetter ← TRUE;
}
ELSE {
IF status.FOM < bestStatus.FOM THEN itsBetter ← TRUE;
};
};
locked: BOOLFALSE;
lockCondition: CONDITION;
Lock: ENTRY PROC = {
WHILE locked DO
WAIT lockCondition;
ENDLOOP;
locked ← TRUE;
};
UnLock: ENTRY PROC = {
locked ← FALSE;
NOTIFY lockCondition
};
Grap Controller
SetGrabController: Commander.CommandProc = {
ComputeControllerInternal.GrabController ← TRUE;
};
Controller Event Typescript
StartControllerEventTypescript: Commander.CommandProc = {
TRUSTED {Process.Detach[FORK ControllerEventTypescript[]];};
};
ControllerEventTypescript: PROC = {
ENABLE IO.Error => GOTO ViewerDestroyed;
currentViewer: ViewerClasses.Viewer ← NIL;
currentStream: IO.STREAMNIL;
currentEvent: REF ControllerEvent ← NIL;
currentViewer ← ViewerOps.CreateViewer[flavor: $Typescript, info: [iconic: FALSE, name: "Compute Server ""Controller"" Event Log"]];
currentStream ← ViewerIO.CreateViewerStreams[name: "", viewer: currentViewer].out;
currentStream.PutF["Starting log at %g\n", IO.time[BasicTime.Now[]]];
DO
currentEvent ← NextControllerEvent[currentEvent];
SELECT currentEvent.type FROM
service => {
currentStream.PutF["Service %g for %g", IO.rope[currentEvent.command], IO.rope[currentEvent.remoteUser]];
IF ~Rope.IsEmpty[currentEvent.remoteMachineName] THEN {
IF Rope.Find[currentEvent.remoteMachineName, "#"] < 0 THEN
currentStream.PutF[" to %g", IO.rope[currentEvent.remoteMachineName]]
ELSE currentStream.PutF[" to %g", IO.rope[PupName.HisName[PupName.NameLookup[ currentEvent.remoteMachineName, Pup.nullSocket]]]];
};
currentStream.PutF[" at %g\n", IO.time[currentEvent.time]];
};
newPackage => {
currentStream.PutF["New package %g at %g\n", IO.rope[currentEvent.command], IO.time[currentEvent.time]];
};
newServer => {
currentStream.PutF[" --- New server from %g on %g at %g\n", IO.rope[currentEvent.remoteUser], IO.rope[currentEvent.remoteMachineName], IO.time[currentEvent.time]];
};
lostServer => {
currentStream.PutF[" --- Droped %g from %g at %g because %g\n", IO.rope[currentEvent.remoteMachineName], IO.rope[currentEvent.remoteUser], IO.time[currentEvent.time], IO.rope[currentEvent.reason]];
};
becomeController => {
currentStream.PutF[" **** Became controller at %g ", IO.time[currentEvent.time]];
IF ~Rope.IsEmpty[currentEvent.reason] THEN currentStream.PutF["because %g", IO.rope[currentEvent.reason]];
IF ~Rope.IsEmpty[currentEvent.remoteMachineName] THEN currentStream.PutF["; previous controller was %g\n", IO.rope[currentEvent.remoteMachineName]];
currentStream.PutF["\n"];
};
noticeOtherController => {
currentStream.PutF[" **** Noticed other controller at %g", IO.time[currentEvent.time]];
IF ~Rope.IsEmpty[currentEvent.reason] THEN currentStream.PutF[" because %g", IO.rope[currentEvent.reason]];
IF ~Rope.IsEmpty[currentEvent.remoteMachineName] THEN currentStream.PutF["; potential new controller is %g\n", IO.rope[currentEvent.remoteMachineName]];
currentStream.PutF["\n"];
};
cedeController => {
currentStream.PutF[" **** Ceded controller at %g", IO.time[currentEvent.time]];
IF ~Rope.IsEmpty[currentEvent.reason] THEN currentStream.PutF[" because %g", IO.rope[currentEvent.reason]];
IF ~Rope.IsEmpty[currentEvent.remoteMachineName] THEN currentStream.PutF["; new controller is %g\n", IO.rope[currentEvent.remoteMachineName]];
currentStream.PutF["\n"];
};
ENDCASE;
ENDLOOP;
EXITS
ViewerDestroyed => {};
};
NextControllerEvent: ENTRY PROC [last: REF ControllerEvent ← NIL]
RETURNS [REF ControllerEvent] = {
IF last = NIL THEN {
first call for this client
IF controllerEventListTail = NIL THEN {
no events in list yet
watchingController ← TRUE;
UNTIL controllerEventListTail # NIL DO WAIT anotherControllerEvent ENDLOOP;
RETURN [controllerEventListTail];
}
ELSE last ← controllerEventListTail;
};
UNTIL last.chain # NIL DO WAIT anotherControllerEvent ENDLOOP; -- wait for next event
RETURN [last.chain];
};
ReportControllerEvent: ENTRY PROC [type: ControllerEventType, reason: Rope.ROPE, command: Rope.ROPE, time: BasicTime.GMT, remoteMachineName: Rope.ROPE, userName: Rope.ROPE] = {
InternalReportControllerEvent[type, reason, command, time, remoteMachineName, userName];
};
InternalReportControllerEvent: INTERNAL PROC [type: ControllerEventType, reason: Rope.ROPE, command: Rope.ROPE, time: BasicTime.GMT, remoteMachineName: Rope.ROPE, userName: Rope.ROPE] = {
IF watchingController THEN {
eventREF: REF ControllerEvent =
NEW [ ControllerEvent ← [type, reason, command, time, remoteMachineName, userName, NIL] ];
IF controllerEventListTail # NIL THEN controllerEventListTail.chain ← eventREF;
controllerEventListTail ← eventREF;
BROADCAST anotherControllerEvent;
};
};
Debugging Process
JustAProcessThatYouCanFreeze: PROC = {
DO
Process.Pause[200];
ENDLOOP;
};
Initialize
TRUSTED {Process.Detach[FORK JustAProcessThatYouCanFreeze[]];};
Commander.Register[key: "///Commands/SummonerBecomeController", proc: SetGrabController, doc: "Try to force the Compute Server Controller to this machine"];
Commander.Register[key: "///Commands/SummonerControllerTypescript", proc: StartControllerEventTypescript, doc: "Start typescript of controller events"];
TRUSTED {Process.InitializeCondition[@NoticeNewPackageCondition, Process.SecondsToTicks[1]];};
serverInterfaceCache ← NEW[serverInterfaceArray];
END.
Bob Hagmann December 24, 1985 2:21:09 pm PST
Made sure in NewStats that status was not NIL before doing newQueueing processing
Bob Hagmann January 15, 1986 5:43:15 pm PST
Queuing of commands did not drive up the usage numbers
changes to: bumpBestServerStats, FindService, FindServiceWithQueueing