-- ComputeControllerImpl.mesa
-- Implement the controller.  This runs on any machine that is also a compute server.
-- Last Edited by: Bob Hagmann, December 24, 1985 2:20:51 pm PST
-- Copyright © 1984 by Xerox Corporation.  All rights reserved.
DIRECTORY
AMProcess,
BasicTime,
Commander,
ComputeControllerInternal,
ComputeServerControl,
ComputeServerController,
ComputeServerControllerRpcControl,
ComputeServer,
ComputeServerInternal,
ComputeServerRpcControl,
DFUtilities,
FS,
GVBasics,
GVNames,
IO,
PrincOps,
PrincOpsUtils,
Process,
PupDefs,
PupTypes,
Real,
Rope,
RPC,
SummonerControllerControl,
SymTab,
UserCredentials,
UserProfile,
WorldVM;
ComputeControllerImpl: CEDAR MONITOR
IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVNames, IO, Process, Real, Rope, RPC, SymTab, UserCredentials, UserProfile, WorldVM
EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
= BEGIN
-- Module-level state for the controller.

ROPE: TYPE = Rope.ROPE;

NoticeNewPackageCondition: CONDITION;  -- waited on by NoticeNewPackage/RemoveOldPackage while this machine has only just become the controller

InterfaceExported: BOOL ← FALSE;  -- TRUE while the ComputeServerController RPC interface is exported from this machine
ControllerEnabled: BOOL ← FALSE;  -- set by StartUpController, cleared by ShutDownController
LatentControllerActive: BOOL ← TRUE;  -- the LatentSummonerController loop exits once this is FALSE
IAmTheController: PUBLIC BOOL ← FALSE;  -- TRUE while this machine believes it is the controller
ControllerMovingAway: PUBLIC BOOL ← FALSE;  -- reported to callers of GetSomeInfo as tryDifferentController
GrabController: PUBLIC BOOL ← FALSE;  -- request that the latent process take over the controller function
TimeIBecameTheController: PUBLIC BasicTime.GMT ← BasicTime.nullGMT;

ControllerUp: BOOL ← TRUE;  -- cleared by NoticeControllerIsDown, set by NoticeControllerIsUp
ControllerGVName: ROPE ← NIL;  -- Grapevine name of the controller; chosen in StartUpController

ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError ← none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
ControllerFailedTime: BasicTime.GMT;  -- when the controller was first noticed to be down
LastReBuildQueueTime: BasicTime.GMT ← BasicTime.nullGMT;  -- maintained by ReBuildQueuing

ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
ServerStatusList: PUBLIC LIST OF ServerStatus ← NIL;

BrokenCommandTable: SymTab.Ref;  -- knowing a command, find where it's broken
ExtraCommandTable: SymTab.Ref;  -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref;  -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"
-- One entry of a BrokenCommandTable value list: a server on which the command is broken, and for which versions.
BrokenCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE ← NIL
  ];
BrokenCommand: TYPE = REF BrokenCommandObject;

-- One entry of an ExtraCommandTable value list: a server that can run a non-standard command, and which versions.
ExtraCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE ← NIL
  ];
ExtraCommand: TYPE = REF ExtraCommandObject;
-- Queueing state and the records that describe queued service requests.

forceReBuildQueuing: BOOL ← FALSE;  -- when TRUE, ReBuildQueuing rebuilds on its next call regardless of elapsed time

WaitingQueue: LIST OF QueueEntry ← NIL;

QueueEntry: TYPE = REF QueueEntryRec;
QueueEntryRec: TYPE = RECORD [
  service: ROPE ← NIL,
  doQueueing: BOOL ← FALSE,  -- TRUE once requests for this service are being queued
  waiters: LIST OF WaitingRequest ← NIL,
  servers: LIST OF ROPE ← NIL,
  changed: CONDITION
  ];

WaitingRequest: TYPE = REF WaitingRequestRec;
WaitingRequestRec: TYPE = RECORD [
  process: PROCESS,
  timeout: BasicTime.GMT,  -- computed as Now + timeToWait by FindServiceWithQueueing
  waiting: waitType ← init,
  version: ROPE,
  clientMachineName: RPC.ShortROPE,
  streamPupAddress: PupDefs.PupAddress,
  needListener: BOOL,
  serverAssigned: BOOL ← FALSE,
  response: ComputeServer.AskResponce,
  instance: RPC.ShortROPE,
  serverPupAddress: PupDefs.PupAddress
  ];

waitType: TYPE = {init, true, validating, done, timedOut};
-- Cache of imported compute-server interfaces, and the record shared between CheckItem and CheckItemProcess.

serverInterfaceItem: TYPE = RECORD [
  serverInstance: ROPE ← NIL,
  interface: ComputeServerRpcControl.InterfaceRecord ← NIL,
  lastUsed: BasicTime.GMT ← BasicTime.earliestGMT
  ];

serverInterfaceCacheSize: INT = 20;

serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;

serverInterfaceCache: REF serverInterfaceArray;

CheckState: TYPE = {checking, aborted, done};

-- Filled in by the forked CheckItemProcess and polled by CheckItem.
CheckItemProcessObj: TYPE = RECORD [
  check: CheckState ← checking,
  communicationsOK: BOOL ← TRUE,
  askFound: ComputeServer.AskResponce,
  serverPupAddress: PupDefs.PupAddress,
  errMsg: ROPE
  ];

NoOfLatentSummonerControllerProcesses: INT ← 0;  -- used by LatentSummonerController to keep a single background process
StartUpController: PUBLIC PROC [controllerName: ROPE ← NIL, forceSelfAsPrimaryController: BOOL ← FALSE] RETURNS [] = {
  -- Enable the controller machinery on this machine and fork the latent controller process.
  -- controllerName: Grapevine name of the controller; when empty, the "Summoner.ControllerName" profile
  --   entry is used, and failing that "PaloAlto.summoner".
  -- forceSelfAsPrimaryController: when TRUE this machine exports the controller interface regardless of
  --   what Grapevine's connect entry or the user profile says.
  locked: BOOL ← FALSE;  -- TRUE while Lock[] is held, so that UNWIND can release it
  {
    ENABLE UNWIND => IF locked THEN UnLock[];
    user, password: ROPE;
    ControllerGVName ← IF Rope.IsEmpty[controllerName] THEN UserProfile.Token["Summoner.ControllerName"] ELSE controllerName;
    IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName ← "PaloAlto.summoner";
    ControllerEnabled ← TRUE;
    LatentControllerActive ← TRUE;
    ControllerUp ← TRUE;
    Lock[];
    locked ← TRUE;
    IF ~InterfaceExported THEN {
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      [info: info, connect: connect] ← GVNames.GetConnect[ControllerGVName];
      IF info = individual OR info = group THEN {
        -- Become the controller if forced, if Grapevine's connect entry is this machine, or if the profile names this machine as primary.
        IF forceSelfAsPrimaryController OR Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope] THEN {
          IAmTheController ← TRUE;
          TimeIBecameTheController ← BasicTime.Now[];
          [user, password] ← UserCredentials.Get[];
          InterfaceExported ← TRUE;
          BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user,
            password: RPC.MakeKey[password]
            ! RPC.ExportFailed => {
              -- could not export: back out of being the controller
              InterfaceExported ← FALSE;
              IAmTheController ← FALSE;
              CONTINUE;
              };
            ];
          };
        };
      };
    UnLock[];
    locked ← FALSE;
    TRUSTED {Process.Detach[FORK LatentSummonerController[]];};
    };
  };
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
  -- Record that importing the controller interface failed with the given reason, and mark the controller as down.
  LastImportFailure ← reason;
  LastControllerError ← import;
  NoticeControllerIsDown[];
  };
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
  -- Record that a call to the controller failed with the given reason, and mark the controller as down.
  LastCallFailure ← reason;
  LastControllerError ← callFailed;
  NoticeControllerIsDown[];
  };
NoticeControllerIsDown: PROC = {
  -- Mark the controller as down.  The failure time is stamped only on the up-to-down transition,
  -- so repeated failures do not move ControllerFailedTime forward.
  IF ControllerUp THEN {
    ControllerFailedTime ← BasicTime.Now[];
    ControllerUp ← FALSE;
    };
  };
NoticeControllerIsUp: PUBLIC PROC = {
  -- Mark the controller as reachable again.
  ControllerUp ← TRUE;
  };
TryThisController: PROC RETURNS [importedOK: BOOL ← TRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
  -- Try to import the controller interface registered under ControllerGVName.
  -- importedOK is FALSE when RPC.ImportFailed is raised; controllerInterface is then not assigned.
  controllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[
    interfaceName: [type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
    ! RPC.ImportFailed => {
      importedOK ← FALSE;
      CONTINUE;
      };
    ];
  };
LatentSummonerController: PROC [] = {
  -- Background loop (forked by StartUpController) that wakes every 127 seconds and
  --   (a) takes over the controller function when the controller is down or GrabController is set, and
  --   (b) while this machine is the controller, checks that Grapevine agrees, discards stale server
  --       status and rebuilds the queueing information.
  -- Only one instance runs at a time; extra instances return immediately.

  MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOL ← FALSE] = {
    -- Claim the single latent-controller slot; exit is TRUE when another process already holds it.
    IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses ← 1
    ELSE RETURN[TRUE];
    };

  TimeToQuit: ENTRY PROC RETURNS [quit: BOOL ← FALSE] = {
    -- Decide atomically whether to leave the loop.  The slot is released in the same monitor entry,
    -- so a later StartUpController can fork a fresh latent process; previously the count stayed at 1
    -- after the loop exited and no latent process could ever be started again.
    IF ~LatentControllerActive THEN {
      NoOfLatentSummonerControllerProcesses ← 0;
      RETURN[TRUE];
      };
    };

  locked: BOOL ← FALSE;  -- TRUE while Lock[] is held, so that UNWIND can release it
  IF MakeSureThereIsOnlyOne[] THEN RETURN;
  DO
    ENABLE UNWIND => IF locked THEN UnLock[];
    IF TimeToQuit[] THEN EXIT;
    IF ~ControllerUp OR GrabController THEN {  -- controller is down or I should grab it - see if I should acquire the controller function
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      ControllerMovingAway ← FALSE;
      IF GrabController OR ~TryThisController[].importedOK THEN {  -- grabbing, or the controller cannot be imported right now
        downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
        [info: info, connect: connect] ← GVNames.GetConnect[ControllerGVName];
        -- Take over when asked to, when the controller has been down more than 83 seconds, when it has been
        -- down more than 13 seconds and Grapevine's connect entry is this machine, or when the profile names
        -- this machine as the primary controller.
        IF GrabController OR downTime > 83 OR
          ((downTime > 13 AND (info = individual OR info = group) AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]) OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope])
        THEN {
          user, password: ROPE;
          Lock[];
          locked ← TRUE;
          GrabController ← FALSE;
          IAmTheController ← TRUE;
          TimeIBecameTheController ← BasicTime.Now[];
          [user, password] ← UserCredentials.Get[];
          IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
          InterfaceExported ← TRUE;
          BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user,
            password: RPC.MakeKey[password]
            ! RPC.ExportFailed => {
              -- could not export: back out of being the controller
              InterfaceExported ← FALSE;
              IAmTheController ← FALSE;
              CONTINUE;
              };
            ];
          UnLock[];
          locked ← FALSE;
          };
        };
      }
    ELSE {  -- Controller is up.  If I think that I am the controller, but grapevine does not think so, then fix the situation
      IF IAmTheController THEN {
        info: GVNames.ConnectInfo;
        connect: GVBasics.Connect;
        [info: info, connect: connect] ← GVNames.GetConnect[ControllerGVName];
        IF (info = individual OR info = group) THEN {
          IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] THEN {  -- Grapevine says I am not the Controller
            triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
            tryDifferentController: BOOL ← FALSE;
            importedOK: BOOL;
            [importedOK: importedOK, controllerInterface: triedControllerInterface] ← TryThisController[];
            -- See if this Controller thinks everything is fine
            IF importedOK THEN [tryDifferentController: tryDifferentController] ← triedControllerInterface.GetSomeInfo[ ! RPC.CallFailed => {
              importedOK ← FALSE;
              CONTINUE;
              };
              ];
            IF importedOK AND ~tryDifferentController THEN {  -- new controller answers
              IF ServerStatusList = NIL THEN {
                -- no servers are registered with me any more: relinquish the controller function
                Lock[];
                locked ← TRUE;
                IAmTheController ← FALSE;
                ControllerMovingAway ← FALSE;
                ComputeServerControllerRpcControl.UnexportInterface[];
                InterfaceExported ← FALSE;
                ServerStatusList ← NIL;
                BrokenCommandTable ← NIL;
                ExtraCommandTable ← NIL;
                UnLock[];
                locked ← FALSE;
                }
              ELSE {
                -- servers still report to me; tell them (via GetSomeInfo) to try a different controller
                ControllerMovingAway ← TRUE;
                };
              }
            ELSE {  -- new controller does not answer or thinks it is moving too
              ControllerMovingAway ← FALSE;
              IF ~importedOK THEN {  -- new controller does not answer — re-assert my controllership
                user, password: ROPE;
                [user, password] ← UserCredentials.Get[];
                [] ← GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
                };
              };
            }
          ELSE {  -- Grapevine says I am the Controller
            ControllerMovingAway ← FALSE;
            };
          }
        ELSE {  -- NOT info = individual OR info = group (grapevine troubles)
          ControllerMovingAway ← FALSE;
          };
        TossOldStatus[];
        ReBuildQueuing[];
        };
      };
    Process.Pause[Process.SecondsToTicks[127]];
    ENDLOOP;
  };
ShutDownController: PUBLIC PROC [] RETURNS [] = {
  -- Stop acting as (or standing by as) the controller: withdraw the RPC interface if it is
  -- exported, drop all server bookkeeping, and ask the latent controller loop to exit.

  -- withdraw the exported interface
  IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
  InterfaceExported ← FALSE;
  ControllerEnabled ← FALSE;

  -- discard what is known about servers and their commands
  ServerStatusList ← NIL;
  BrokenCommandTable ← NIL;
  ExtraCommandTable ← NIL;
  MachineToStatusTable ← NIL;

  -- the LatentSummonerController loop tests this flag each time round
  LatentControllerActive ← FALSE;
  ControllerUp ← FALSE;
  IAmTheController ← FALSE;
  };
TossOldStatus: ENTRY PROC [] RETURNS [] = {
  -- Remove from ServerStatusList (and from MachineToStatusTable, under both the machine name and
  -- the Pup address keys) every server whose status is more than 30 seconds old.
  ENABLE UNWIND => NULL;
  lastKept: LIST OF ServerStatus ← NIL;  -- most recent cell that is still on the list
  now: BasicTime.GMT = BasicTime.Now[];
  FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    IF BasicTime.Period[status.timeOfStatus, now] > 30 THEN {
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
      -- unlink this cell; its own rest field is untouched, so the FOR step still advances correctly
      IF lastKept = NIL THEN ServerStatusList ← statusList.rest
      ELSE lastKept.rest ← statusList.rest;
      }
    ELSE {
      -- Only a cell that stays on the list may become the predecessor.  Advancing onto a cell that
      -- was just unlinked would make the next removal splice out of a dead cell and leave the
      -- stale entry on ServerStatusList.
      lastKept ← statusList;
      };
    ENDLOOP;
  };
ReBuildQueuing: PROC [] RETURNS [] = {
  -- Recompute the queueing information (via setNewQueueing) when a rebuild has been forced or
  -- when more than 300 seconds have passed since the last rebuild.  Always clears the force flag.
  currentTime: BasicTime.GMT = BasicTime.Now[];
  IF forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, currentTime] > 300 THEN {
    setNewQueueing[];
    LastReBuildQueueTime ← currentTime;
    };
  forceReBuildQueuing ← FALSE;
  };
GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL, serverList: LIST OF ROPE ← NIL, bestFOM: REAL ← 0.0] = {
  -- Report the servers known to this controller.
  -- serverList holds one rope per server ("name (nn%)"; the controller's own machine is marked).
  -- bestFOM is the FOM of the best server according to IsStatusBetter, or 1.0 when no server is known.
  -- tryDifferentController is TRUE when this machine is not the controller, and otherwise mirrors
  -- ControllerMovingAway.
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus ← NIL;
  IF ~IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved", NIL, 0.0];
  FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    perCentBusy: INT = Real.Round[status.CPULoad*100.0];
    -- Exactly one entry per server.  The older name-only entry used to be consed on as well,
    -- which listed every server twice.
    IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN {
      serverList ← CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
      }
    ELSE serverList ← CONS[IO.PutFR["%g (%g%%)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
    IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
    ENDLOOP;
  IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
  RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
  };
BestServerStats: PUBLIC ENTRY PROC RETURNS [instance: RPC.ShortROPE, FOM: REAL] = {
  -- Return the machine name and FOM of the best server on ServerStatusList (as judged by
  -- IsStatusBetter), or [NIL, 1.0] when the list is empty.
  ENABLE UNWIND => NULL;
  winner: ServerStatus ← NIL;
  FOR cell: LIST OF ServerStatus ← ServerStatusList, cell.rest UNTIL cell = NIL DO
    candidate: ServerStatus = cell.first;
    IF IsStatusBetter[candidate, winner] THEN winner ← candidate;
    ENDLOOP;
  IF winner # NIL THEN RETURN [winner.serverMachineName, winner.FOM];
  RETURN [NIL, 1.0];
  };
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
  -- Make the named server look busier: raise its CPU load by 0.5 (capped at 1.0), raise its
  -- reclamation rate by 30, and recompute its FOM.  Does nothing when the machine is not in
  -- MachineToStatusTable.
  ENABLE UNWIND => NULL;
  known: BOOL;
  entry: REF ANY;
  [found: known, val: entry] ← SymTab.Fetch[x: MachineToStatusTable, key: instance];
  IF known THEN {
    server: ServerStatus = NARROW[entry];
    server.CPULoad ← MIN[1.0, server.CPULoad + 0.5];
    server.reclamationRate ← server.reclamationRate + 30;
    server.FOM ← figureOfMerit[server];
    };
  };
FindService: PUBLIC PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Exported, non-queueing lookup: simply forwards to the monitored InnerFindService.
  [found: found, instance: instance] ← InnerFindService[service: service];
  };
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: PupDefs.PupAddress, needListener: BOOL] RETURNS [found: ComputeServer.AskResponce, instance: RPC.ShortROPE, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  -- Find a server for the service on behalf of a client.
  -- When queueing is already on for the service, the request waits in the queue (tryForServer).
  -- Otherwise servers are probed directly, and the request falls back to queueing as soon as
  -- InnerFindService proposes a server that has already been tried.
  -- timeToWait is added to the current time to form the request's timeout.
  entry: QueueEntry;
  item: WaitingRequest ← NIL;

  tryForServer: PROC [] = {
    -- Queued path: wait until the request is handed a server (queueItemReady), reject servers on
    -- which the command is known to be broken, then confirm with the server through CheckItem.
    DO
      commOK: BOOL;
      foundBroken: BOOL;
      brokenVal: REF ANY;
      brokenList, brokenListLoop: LIST OF BrokenCommand;
      askFound: ComputeServer.AskResponce;
      queueItemReady[entry, item];
      IF item.waiting = timedOut THEN EXIT;
      [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
      brokenList ← NARROW[brokenVal];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance, FALSE] THEN {
          -- the assigned server cannot run this command: give the assignment back and wait again
          item.serverAssigned ← FALSE;
          item.waiting ← true;
          EXIT;
          };
        ENDLOOP;
      IF ~item.serverAssigned THEN LOOP;
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem[entry, item];
      -- NOTE(review): this guard loops on every answer other than foundOK, so the notFound and
      -- foundButTooBusy arms of the SELECT below cannot be reached — confirm which was intended.
      IF ~commOK OR (askFound # foundOK) THEN {
        item.serverAssigned ← FALSE;
        item.waiting ← true;
        LOOP;
        };
      item.response ← askFound;
      item.serverPupAddress ← serverPupAddress;
      SELECT askFound FROM
        notFound => {
          removeQueueItem[entry, item];
          errMsg ← NIL;
          RETURN;
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN;
          };
        ENDCASE;
      ENDLOOP;
    -- reached only through the timedOut EXIT above
    removeQueueItem[entry, item];
    };

  [entry, item] ← findOrCreateQueueEntry[service];
  item.version ← version;
  item.clientMachineName ← clientMachineName;
  item.streamPupAddress ← streamPupAddress;
  item.needListener ← needListener;
  IF entry.doQueueing THEN {  -- queueing already being done
    item.process ← LOOPHOLE[Process.GetCurrent[]];
    item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
    item.waiting ← true;
    tryForServer[];
    bumpBestServerStats[item.instance];
    RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
    }
  ELSE {  -- queueing not started
    serversTried: LIST OF RPC.ShortROPE ← NIL;
    DO
      commOK: BOOL;
      serviceFound: BOOL;
      instance: RPC.ShortROPE;  -- NOTE(review): shadows the result of the same name; every RETURN below that needs it passes it explicitly
      askFound: ComputeServer.AskResponce;
      -- NOTE(review): queueing is switched on here at the top of every probe, although this is the
      -- "queueing not started" branch — confirm this is intended.
      entry.doQueueing ← TRUE;
      setNewQueueing[];
      [serviceFound, instance] ← InnerFindService[service];
      item.instance ← instance;
      IF ~serviceFound THEN {
        found ← notFound;
        removeQueueItem[entry, item];
        RETURN;
        };
      FOR oldServers: LIST OF RPC.ShortROPE ← serversTried, oldServers.rest UNTIL oldServers = NIL DO
        IF Rope.Equal[oldServers.first, instance, FALSE] THEN {
          -- We are about to retry the same server again.  Give up probing, and start queueing.
          serversTried ← NIL;  -- help garbage collector
          entry.doQueueing ← TRUE;
          setNewQueueing[];
          item.process ← LOOPHOLE[Process.GetCurrent[]];
          item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
          item.waiting ← true;
          tryForServer[];
          RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
          };
        ENDLOOP;
      serversTried ← CONS[instance, serversTried];
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem[entry, item];
      IF ~commOK THEN LOOP;  -- could not talk to that server; ask for another one
      SELECT askFound FROM
        notFound => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        ENDCASE;
      ENDLOOP;
    };
  };
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest] RETURNS [communicationsOK: BOOL ← TRUE, askFound: ComputeServer.AskResponce, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  -- Ask the server assigned to item whether it will run the service, without letting a hung
  -- remote call block the caller.  The call is made by a forked CheckItemProcess; this procedure
  -- polls the shared record every 2 ticks and gives up once more than 9 seconds have elapsed.
  -- On timeout it returns [FALSE, notFound, PupTypes.fillInPupAddress, ""].
  checkProcess: PROCESS;
  checkObject: REF CheckItemProcessObj ← NEW[CheckItemProcessObj];
  startTime: BasicTime.GMT ← BasicTime.Now[];
  checkProcess ← FORK CheckItemProcess[entry, item, checkObject];
  DO
    IF BasicTime.Period[startTime, BasicTime.Now[]] > 9 THEN TRUSTED {
      -- Remote machine is not unresponsive, but will not answer the call
      checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
      -- Detach so we can return.  Freeze, abort and thaw the process.  After a long time (minutes if ever), the process will finally wake up and go away.  Also, if the remote host stops answering the ping, the call will fail and the process will exit.
      Process.Detach[checkProcess];
      AMProcess.Freeze[LIST[checkProcessTV]];
      AMProcess.Abort[checkProcessTV];
      AMProcess.Thaw[LIST[checkProcessTV]];
      checkObject.check ← aborted;
      RETURN[FALSE, notFound, PupTypes.fillInPupAddress, ""];
      };
    IF checkObject.check = done THEN TRUSTED {
      -- JOIN before reading, so every field written by the forked process is complete
      [] ← JOIN checkProcess;
      RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
      };
    Process.Pause[2];
    ENDLOOP;
  };
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj] = {
  -- Body of the process forked by CheckItem: call AskForService on the server named by
  -- item.instance and leave the outcome in checkObject.
  -- A failed call is retried once with a fresh interface (the failed one is dropped from the cache).
  -- If the process is aborted, checkObject.check is set to aborted and nothing else is stored.
  ENABLE ABORTED => GOTO aborted;
  communicationsOK: BOOL ← TRUE;
  -- Defaults match what CheckItem returns on timeout, so a failure path never publishes
  -- uninitialized values.
  askFound: ComputeServer.AskResponce ← notFound;
  serverPupAddress: PupDefs.PupAddress ← PupTypes.fillInPupAddress;
  errMsg: ROPE ← NIL;
  firstTime: BOOL ← TRUE;
  serverInstance: ROPE ← item.instance;
  DO
    serverInterface: ComputeServerRpcControl.InterfaceRecord;
    serverInterface ← getServerInterfaceFromCache[serverInstance];
    -- No interface available: report a communications failure.  This must leave through the
    -- normal exit so that check becomes done; returning directly left CheckItem polling until
    -- its timeout and then trying to abort a process that had already finished.
    IF serverInterface = NIL THEN {communicationsOK ← FALSE; EXIT};
    [found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] ← serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, streamPupAddress: item.streamPupAddress, needListener: item.needListener
      ! RPC.CallFailed => {
        [] ← deleteServerInterfaceFromCache[serverInterface];
        IF firstTime THEN {firstTime ← FALSE; LOOP;};
        communicationsOK ← FALSE;
        EXIT;
        };
      ];
    EXIT;
    ENDLOOP;
  -- Store the results first and the completion flag last, so the poller never sees done ahead of the data.
  checkObject.communicationsOK ← communicationsOK;
  checkObject.askFound ← askFound;
  checkObject.serverPupAddress ← serverPupAddress;
  checkObject.errMsg ← errMsg;
  checkObject.check ← done;
  EXITS
    aborted => {
      checkObject.check ← aborted;
      RETURN;
      };
  };
InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Pick the best server (by IsStatusBetter) able to run the service.
  -- For a command in ComputeServerInternal.CommandTable every known server is a candidate except
  -- those listed for it in BrokenCommandTable; for any other command only the servers listed in
  -- ExtraCommandTable are candidates.
  -- Returns [FALSE, ""] when there is no candidate.  Otherwise the chosen server's load figures are
  -- bumped, so that successive requests spread out, and its Pup address rope is returned as instance.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  bestStatus: ServerStatus ← NIL;
  foundCommand: BOOL;
  [found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
  IF foundCommand THEN {
    -- command should be out on (almost) every server
    foundBroken: BOOL;
    brokenVal: REF ANY;
    brokenList: LIST OF BrokenCommand;
    [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
    brokenList ← NARROW[brokenVal];
    FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      skipIt: BOOL ← FALSE;
      serverMachineName: ROPE ← statusList.first.serverMachineName;
      serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
      brokenListLoop: LIST OF BrokenCommand;
      -- a broken entry may name the server either by machine name or by Pup address
      FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE] OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
          skipIt ← TRUE;
          EXIT;
          };
        ENDLOOP;
      IF skipIt THEN LOOP;
      IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus ← statusList.first;
      ENDLOOP;
    }
  ELSE {
    -- command should be out on (almost) no servers
    foundExtra: BOOL;
    extraVal: REF ANY;
    extraListLoop: LIST OF ExtraCommand;
    [found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: service];
    FOR extraListLoop ← NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
      statusFound: BOOL;
      val: REF ANY;
      status: ServerStatus;
      [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
      IF statusFound THEN {
        status ← NARROW[val];
        IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
        };
      ENDLOOP;
    };
  IF bestStatus = NIL THEN {
    RETURN[FALSE, ""]
    }
  ELSE {
    -- make the chosen server look busier until its next status report arrives
    bestStatus.CPULoad ← MIN[1.0, bestStatus.CPULoad + 0.5];
    bestStatus.reclamationRate ← bestStatus.reclamationRate + 30;
    bestStatus.FOM ← figureOfMerit[bestStatus];
    RETURN[TRUE, bestStatus.serverMachinePupAddress];
    };
  };
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
  -- Combine CPU load and reclamation rate into one figure: the Euclidean norm of the two components.
  -- A dorado contributes its CPU load directly; any other machine type is mapped to 0.8 + load/5.
  -- The reclamation component is rate/90, capped at 1.0.
  cpuPart: REAL = IF status.machineType = dorado THEN status.CPULoad ELSE 0.8 + (status.CPULoad/5.0);
  thrashPart: REAL = MIN[status.reclamationRate/90.0, 1.0];
  RETURN [Real.SqRt[thrashPart*thrashPart + cpuPart*cpuPart]];
  };
CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Record in BrokenCommandTable that the given version of commandName cannot be run on serverMachineName.
  -- An existing record for the machine gains the version (if not already listed); otherwise a new
  -- record for the machine is added at the front of the command's list.
  ENABLE UNWIND => NULL;
  found: BOOL;
  val: REF ANY;
  oldList: LIST OF BrokenCommand ← NIL;
  newBroken: BrokenCommand;
  [found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found THEN oldList ← NARROW[val];
  FOR machineList: LIST OF BrokenCommand ← oldList, machineList.rest UNTIL machineList = NIL DO
    broken: BrokenCommand = machineList.first;
    IF Rope.Equal[broken.serverMachineName, serverMachineName] THEN {
      FOR versionList: LIST OF ROPE ← broken.versions, versionList.rest UNTIL versionList = NIL DO
        vers: ROPE ← NARROW[versionList.first];
        IF Rope.Equal[vers, version] THEN RETURN;  -- already recorded
        ENDLOOP;
      broken.versions ← CONS[version, broken.versions];
      RETURN;
      };
    ENDLOOP;
  -- No record for this machine yet.  With oldList = NIL this builds a one-element list, so the
  -- found and not-found cases share one tail.
  newBroken ← NEW[BrokenCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
  [] ← SymTab.Store[x: BrokenCommandTable, key: commandName, val: CONS[newBroken, oldList]];
  };
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Record in ExtraCommandTable that serverMachineName can run the given version of commandName.
  -- An existing record for the machine gains the version (if not already listed); otherwise a new
  -- record for the machine is added at the front of the command's list.
  ENABLE UNWIND => NULL;
  found: BOOL;
  val: REF ANY;
  oldList: LIST OF ExtraCommand ← NIL;
  newExtra: ExtraCommand;
  -- See if this "Extra" reverses a previous broken
  -- NOTE(review): this deletes the command's whole BrokenCommandTable entry, i.e. the broken
  -- records of every machine and not only serverMachineName — confirm that is intended.
  [found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found THEN {
    [] ← SymTab.Delete[x: BrokenCommandTable, key: commandName];
    };
  [found: found, val: val] ← SymTab.Fetch[x: ExtraCommandTable, key: commandName];
  IF found THEN oldList ← NARROW[val];
  FOR machineList: LIST OF ExtraCommand ← oldList, machineList.rest UNTIL machineList = NIL DO
    extra: ExtraCommand = machineList.first;
    IF Rope.Equal[extra.serverMachineName, serverMachineName] THEN {
      FOR versionList: LIST OF ROPE ← extra.versions, versionList.rest UNTIL versionList = NIL DO
        vers: ROPE ← NARROW[versionList.first];
        IF Rope.Equal[vers, version] THEN RETURN;  -- already recorded
        ENDLOOP;
      extra.versions ← CONS[version, extra.versions];
      RETURN;
      };
    ENDLOOP;
  -- No record for this machine yet.  With oldList = NIL this builds a one-element list, so the
  -- found and not-found cases share one tail.
  newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
  [] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: CONS[newExtra, oldList]];
  };
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL] = {
  -- Add the package's df file to the shared PackageList, or refresh its create date there, and
  -- flag every known server (newPackage ← TRUE).
  -- The list is rewritten into a local temporary file and then copied over the global one.
  -- Results: error is TRUE with an explanatory msg when the update could not be made;
  -- tryDifferentController is TRUE when this machine is not (or is no longer) the controller.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE ← NIL;
  foundItem: BOOL ← FALSE;
  fullFName: ROPE;
  fsErrMsg: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;  -- directory part of the package's full name
  shortPackage: ROPE ← NIL;
  shortPackageAndDF: ROPE ← NIL;
  packageDFDate: BasicTime.GMT ← BasicTime.nullGMT;
  lastDir: REF DFUtilities.DirectoryItem ← NIL;  -- directory item currently in force in the input list
  wroteDir: BOOL ← FALSE;  -- TRUE when the output's most recent Directory header is the one for lastDir
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL;  -- Directory item for dfPrefix
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];

  closeStreams: PROC [] = {
    -- Close whichever streams are open; each variable is cleared so that a second call is harmless.
    IF packageListInStream # NIL THEN {
      packageListInStream.Close[! FS.Error => CONTINUE];
      packageListInStream ← NIL;
      };
    IF packageListOutStream # NIL THEN {
      packageListOutStream.Close[! FS.Error => CONTINUE];
      packageListOutStream ← NIL;
      };
    };

  deleteTemp: PROC [] = {
    -- Best-effort removal of the temporary list file.
    IF packageListTempName # NIL THEN FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };

  writeUpdatedItem: PROC [] = {
    -- Write the entry for the package itself, under a Directory header for dfPrefix.
    file: REF DFUtilities.FileItem = NEW[DFUtilities.FileItem ← [
      name: shortPackageAndDF,  -- a short name
      date: [explicit, packageDFDate],
      verifyRoot: FALSE]];
    underRightDir: BOOL = wroteDir AND lastDir # NIL AND Rope.Equal[lastDir.path1, dfPrefix, FALSE];
    IF ~underRightDir THEN {
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: defaultDirectory];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      -- The header just written is not the header for lastDir, so the next ordinary file must
      -- re-emit its own Directory header.  Previously lastDir was overwritten while wroteDir stayed
      -- TRUE, and the files that followed in the input were written under the package's directory.
      wroteDir ← FALSE;
      };
    DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
    };

  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copy one item of the old list to the new list, substituting the updated entry for the package.
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        -- headers are written lazily, just before the first file that needs them
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          wroteDir ← FALSE;
          lastDir ← dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        -- white space is regenerated around Directory headers, so the input's is dropped
        };
      file: REF DFUtilities.FileItem => {
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];  -- name without the !version part
        IF ~wroteDir THEN {
          wroteDir ← TRUE;
          IF lastDir = NIL THEN lastDir ← defaultDirectory;
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          writeUpdatedItem[];
          foundItem ← TRUE;
          }
        ELSE {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };

  {
    ENABLE UNWIND => {
      closeStreams[];
      deleteTemp[];
      };
    IF package.IsEmpty[] THEN RETURN [TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    -- hold off while this machine has been the controller for less than 22 seconds
    WHILE ComputeControllerInternal.IAmTheController AND
      BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
      WAIT NoticeNewPackageCondition;
      ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ← NEW[DFUtilities.DirectoryItem ← [
      path1: dfPrefix,
      path2: NIL,
      path2IsCameFrom: FALSE,
      exported: FALSE,
      readOnly: FALSE
      ]];
    [created: packageDFDate] ← FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => GOTO noDFFile];
    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName
      ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    IF ~foundItem THEN writeUpdatedItem[];  -- the package was not in the list: append it
    closeStreams[];
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE ! FS.Error => {fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    deleteTemp[];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      statusList.first.newPackage ← TRUE;
      ENDLOOP;
    EXITS
      cantOpenPackageList => RETURN [TRUE, FALSE, "controller could not open package list"];
      noDFFile => RETURN [TRUE, FALSE, Rope.Concat["controller could not open specified df file - ", package]];
      syntaxErrorInPackageList => {
        -- both streams are still open on this path and the half-written temporary file is useless
        closeStreams[];
        deleteTemp[];
        RETURN [TRUE, FALSE, "controller could not parse the package list - call an implementor"];
        };
      badPackageName => RETURN [TRUE, FALSE, "bad package df file name"];
      cantCopyToServer => {
        deleteTemp[];
        RETURN [TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
        };
    };
  };
-- RemoveOldPackage: delete one package's df entry from the shared PackageList.
-- The list is parsed item by item into a local temporary file with the matching file item left out;
-- the temporary is then copied over the global list and every server in ServerStatusList is flagged newPackage.
-- Returns [error, tryDifferentController, msg]:
--   [TRUE, FALSE, ...] for a bad or unknown package or a file-system/parse failure,
--   [FALSE, TRUE, "controller has moved"] when this machine is not (or stops being) the controller.
-- The normal completion path assigns no results explicitly.
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  ENABLE UNWIND => NULL;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE ← NIL;
  fsErrMsg: ROPE ← NIL;  -- explanation captured when the copy back to the file server fails
  foundItem: BOOL ← FALSE;  -- set once the package's file item has been seen (and dropped)
  fullFName: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE ← NIL;
  shortPackageAndDF: ROPE ← NIL;
  wroteDir: BOOL ← FALSE;
  lastDir: REF DFUtilities.DirectoryItem ← NIL;
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL;
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];

  CloseAndDiscard: PROC = {
    -- Best effort: close whichever streams exist and remove the temporary copy.
    IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
    IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
  };

  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copies every item to the output stream except the file item naming the package being removed.
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        -- a directory whose path1 equals the previous one (case-insensitive) is not repeated
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
          -- NOTE(review): wroteDir is reset here, so this directory is written again ahead of the
          -- first file that is kept below; confirm the duplicate directory line is intended.
          wroteDir ← FALSE;
          lastDir ← dir;
        };
      };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
      };
      white: REF DFUtilities.WhiteSpaceItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
      };
      file: REF DFUtilities.FileItem => {
        -- the file name up to (not including) the first "!"
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          foundItem ← TRUE;  -- this is the entry being removed: do not copy it
        }
        ELSE {
          IF ~wroteDir THEN {
            wroteDir ← TRUE;
            IF lastDir = NIL THEN lastDir ← defaultDirectory;
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
        };
      };
    ENDCASE;
  };

  {
    ENABLE UNWIND => CloseAndDiscard[];
    IF package.IsEmpty[] THEN RETURN [TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    -- hold off until this machine has been the controller for at least 22 seconds
    WHILE ComputeControllerInternal.IAmTheController AND
      BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
      WAIT NoticeNewPackageCondition;
    ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];

    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ← NEW [DFUtilities.DirectoryItem ← [
      path1: dfPrefix,
      path2: NIL,
      path2IsCameFrom: FALSE,
      exported: FALSE,
      readOnly: FALSE
      ]];

    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName
      ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    packageListInStream.Close[! FS.Error => CONTINUE];
    packageListOutStream.Close[! FS.Error => CONTINUE];

    IF ~foundItem THEN {
      FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
      RETURN [TRUE, FALSE, "package not found"]
    };
    -- report a failed copy to the caller instead of letting FS.Error escape
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE
      ! FS.Error => {fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    -- tell every known server that the package list changed
    FOR servers: LIST OF ServerStatus ← ComputeControllerInternal.ServerStatusList, servers.rest UNTIL servers = NIL DO
      servers.first.newPackage ← TRUE;
    ENDLOOP;
    EXITS
      cantOpenPackageList => RETURN [TRUE, FALSE, "controller could not open package list"];
      syntaxErrorInPackageList => {
        -- both streams are still open and the temporary file is half written at this point
        CloseAndDiscard[];
        RETURN [TRUE, FALSE, "controller could not parse the package list - call an implementor"];
      };
      badPackageName => RETURN [TRUE, FALSE, "bad package df file name"];
      cantCopyToServer => {
        -- streams were already closed before the copy; only the temporary file remains
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
        RETURN [TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
      };
  };
};
-- NewStats: a compute server reports its current statistics to the controller.
-- The server's record is looked up in ServerStatusList by machine name:
--   a server reporting ~serverUP is unlinked from the list;
--   a known, live server has its volatile statistics, figure of merit and time stamp refreshed;
--   an unknown, live server gets a new record, entered in the list and in MachineToStatusTable under both its name and its Pup address.
-- Results: newPackage is the pending flag from the server's record (cleared once reported);
-- queueingCommands lists the services in WaitingQueue marked doQueueing, built only when the record's newQueueing flag was set;
-- terminateService is never assigned here and so keeps its FALSE default.
-- On firstCall, entries naming this server are purged from BrokenCommandTable and ExtraCommandTable.
NewStats: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL] RETURNS [terminateService, newPackage: BOOL ← FALSE, queueingCommands: LIST OF ROPE ← NIL] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  status: ServerStatus ← NIL;
  lastStatusList: LIST OF ServerStatus ← NIL;  -- cell preceding statusList, needed to unlink

  FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    IF Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN {
      status ← statusList.first;
      IF ~serverUP THEN {
        -- server is going away: unlink its cell from ServerStatusList
        IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
        ELSE lastStatusList.rest ← statusList.rest;
      }
      ELSE {
        newPackage ← status.newPackage;
        status.newPackage ← FALSE;
        status.freePagesOnDisk ← freePagesOnDisk;
        status.freeboard ← freeboard;
        status.freeGFI ← freeGFI;
        status.freeMDS ← freeMDS;
        status.freeVM ← freeVM;
        status.oldestLRUFileDate ← oldestLRUFileDate;
        status.CPULoad ← CPULoad;
        status.reclamationRate ← reclamationRate;
        status.freeProcesses ← freeProcesses;
        status.FOM ← figureOfMerit[status: status];
        status.timeOfStatus ← BasicTime.Now[];
        -- re-enter the record if the table has no entry under the machine name
        IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN {
          [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
          [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
        };
      };
      EXIT;
    };
    lastStatusList ← statusList;
    REPEAT
      FINISHED => status ← NIL;
  ENDLOOP;

  IF status = NIL AND serverUP THEN {
    -- first report from this server: build its record
    status ← NEW[ServerStatusRecord ← [
      serverMachineName: serverMachineName,
      serverMachinePupAddress: serverMachinePupAddress,
      timeOfStatus: BasicTime.Now[],
      machineType: machineType,
      mainMemory: mainMemory,
      numberCPUs: numberCPUs,
      diskPartionSize: diskPartionSize,
      freePagesOnDisk: freePagesOnDisk,
      freeboard: freeboard,
      freeGFI: freeGFI,
      freeMDS: freeMDS,
      freeVM: freeVM,
      oldestLRUFileDate: oldestLRUFileDate,
      CPULoad: CPULoad,
      reclamationRate: reclamationRate,
      freeProcesses: freeProcesses
      ]];
    status.FOM ← figureOfMerit[status: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
    ServerStatusList ← CONS[status, ServerStatusList];
  };

  IF status # NIL AND status.newQueueing THEN {
    status.newQueueing ← FALSE;
    FOR l: LIST OF QueueEntry ← WaitingQueue, l.rest UNTIL l = NIL DO
      IF l.first.doQueueing THEN queueingCommands ← CONS[l.first.service, queueingCommands];
    ENDLOOP;
  };

  IF firstCall THEN {
    CleanBroken: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Rebuilds the list under key without the entries whose serverMachineName equals this server's
      -- Pup address (case-insensitive); the table is rewritten only if something was dropped.
      -- The surviving entries end up in reverse order.
      ENABLE UNWIND => NULL;
      brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand ← NIL;
      changed: BOOL ← FALSE;
      brokenList ← NARROW[val];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE]
          THEN changed ← TRUE
          ELSE newBrokenList ← CONS[brokenListLoop.first, newBrokenList];
      ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList];
      RETURN[FALSE];
    };
    CleanExtra: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Same treatment as CleanBroken, applied to ExtraCommandTable.
      ENABLE UNWIND => NULL;
      extraList, newExtraList, extraListLoop: LIST OF ExtraCommand ← NIL;
      changed: BOOL ← FALSE;
      extraList ← NARROW[val];
      FOR extraListLoop ← extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO
        IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE]
          THEN changed ← TRUE
          ELSE newExtraList ← CONS[extraListLoop.first, newExtraList];
      ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList];
      RETURN[FALSE];
    };
    [] ← SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken];
    [] ← SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra];
  };
};
-- getServerInterfaceFromCache: return an RPC interface record for serverInstance.
-- A cache slot whose serverInstance matches is returned directly and its lastUsed stamp refreshed.
-- Otherwise a new interface is imported; if that succeeds it replaces the slot with the earliest lastUsed.
-- A failed import (RPC.ImportFailed) yields NIL and leaves the cache untouched.
getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE] RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord ← NIL] = {
  ENABLE UNWIND => NULL;
  imported: ComputeServerRpcControl.InterfaceRecord ← NIL;
  victim: INT ← 0;  -- slot to overwrite on a miss
  oldestUse: BasicTime.GMT ← BasicTime.latestGMT;
  stamp: BasicTime.GMT ← BasicTime.Now[];

  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF Rope.Equal[serverInstance, serverInterfaceCache[slot].serverInstance] THEN {
      -- hit
      serverInterfaceCache[slot].lastUsed ← stamp;
      RETURN[serverInterfaceCache[slot].interface];
    };
    -- remember the slot used longest ago
    IF BasicTime.Period[oldestUse, serverInterfaceCache[slot].lastUsed] < 0 THEN {
      victim ← slot;
      oldestUse ← serverInterfaceCache[slot].lastUsed;
    };
  ENDLOOP;

  -- miss: try to import; on failure the assignment is abandoned and imported stays NIL
  imported ← ComputeServerRpcControl.ImportNewInterface[
    interfaceName: [type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]]
    ! RPC.ImportFailed => CONTINUE
    ];
  IF imported = NIL THEN RETURN[NIL];
  serverInterfaceCache[victim].interface ← imported;
  serverInterfaceCache[victim].serverInstance ← serverInstance;
  serverInterfaceCache[victim].lastUsed ← stamp;
  RETURN[imported];
};
-- deleteServerInterfaceFromCache: empty the first cache slot holding serverInterface.
-- The slot's instance name and interface are set to NIL and its lastUsed to BasicTime.earliestGMT.
-- Returns gotIt = TRUE iff such a slot was found.
deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord] RETURNS [gotIt: BOOL ← FALSE] = {
  ENABLE UNWIND => NULL;
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF serverInterfaceCache[slot].interface # serverInterface THEN LOOP;
    serverInterfaceCache[slot].serverInstance ← NIL;
    serverInterfaceCache[slot].interface ← NIL;
    serverInterfaceCache[slot].lastUsed ← BasicTime.earliestGMT;
    RETURN[TRUE];
  ENDLOOP;
};
-- findOrCreateQueueEntry: make a fresh WaitingRequest and enqueue it for service.
-- If WaitingQueue already has an entry for the service (case-insensitive match) the request is appended
-- to the end of that entry's waiters; otherwise a new entry holding just this request is pushed on the
-- front of WaitingQueue and its changed condition is initialised with a one-second timeout.
-- Returns the entry and the new request.
findOrCreateQueueEntry: ENTRY PROC [service: ROPE] RETURNS [entry: QueueEntry ← NIL, item: WaitingRequest] = {
  ENABLE UNWIND => NULL;
  item ← NEW[WaitingRequestRec];
  FOR q: LIST OF QueueEntry ← WaitingQueue, q.rest UNTIL q = NIL DO
    IF ~Rope.Equal[q.first.service, service, FALSE] THEN LOOP;
    -- existing queue: add the request at the tail
    entry ← q.first;
    IF entry.waiters = NIL THEN entry.waiters ← CONS[item, NIL]
    ELSE {
      tail: LIST OF WaitingRequest ← entry.waiters;
      UNTIL tail.rest = NIL DO tail ← tail.rest ENDLOOP;
      tail.rest ← CONS[item, NIL];
    };
    RETURN;
  ENDLOOP;
  -- no queue for this service yet
  entry ← NEW[QueueEntryRec ← [service: service, waiters: LIST[item]]];
  WaitingQueue ← CONS[entry, WaitingQueue];
  TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]];};
};
-- setNewQueueing: set newQueueing on every record in ServerStatusList and stamp LastReBuildQueueTime with the current time.
setNewQueueing: ENTRY PROC [] = {
  ENABLE UNWIND => NULL;
  servers: LIST OF ServerStatus ← ServerStatusList;
  UNTIL servers = NIL DO
    servers.first.newQueueing ← TRUE;
    servers ← servers.rest;
  ENDLOOP;
  LastReBuildQueueTime ← BasicTime.Now[];
};
-- queueItemReady: block until the queued request item can proceed or has timed out.
-- On return item.waiting is either
--   validating: a server was assigned (item.instance names it and item.serverAssigned is TRUE), or
--   timedOut: item.timeout passed, or the state was already timedOut.
-- A server is taken from entry.servers only when item is at the head of entry.waiters; among the offered
-- servers the one whose status in MachineToStatusTable wins IsStatusBetter is chosen, falling back to the
-- head of the list when no status is known.  Exactly one server is removed from entry.servers.
-- The first time the request has to wait, forceReBuildQueuing is set.
queueItemReady: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
  ENABLE UNWIND => NULL;
  firstTime: BOOL ← TRUE;
  item.waiting ← true;
  DO
    IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting ← timedOut; RETURN};
    SELECT item.waiting FROM
      true => {
        IF entry.servers # NIL AND entry.waiters.first = item THEN {
          bestStatus: ServerStatus ← NIL;
          prev, prevBest: LIST OF ROPE ← NIL;  -- cells preceding the current / best cell
          best: LIST OF ROPE ← NIL;
          serv: ROPE;
          FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
            statusFound: BOOL;
            val: REF ANY;
            status: ServerStatus;
            serv ← l.first;
            [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: serv];
            IF statusFound THEN {
              status ← NARROW[val];
              IF IsStatusBetter[status, bestStatus] THEN {
                bestStatus ← status;
                best ← l;
                prevBest ← prev;
              };
            };
            prev ← l;
          ENDLOOP;
          IF prevBest = NIL THEN {
            -- best is the head of the list, or no server had a known status: take the head
            item.instance ← entry.servers.first;
            entry.servers ← entry.servers.rest;
          }
          ELSE {
            -- unlink the best cell from the middle or end of the list
            item.instance ← best.first;
            prevBest.rest ← best.rest;
          };
          -- The head must not be dequeued a second time here: that would overwrite the chosen instance,
          -- discard another server, and dereference NIL when only one server had been offered.
          item.serverAssigned ← TRUE;
          item.waiting ← validating;
          RETURN;
        };
      };
      done => {
        -- MightAcceptQueuedCommand handed a server to this request directly
        item.serverAssigned ← TRUE;
        item.waiting ← validating;
        RETURN;
      };
      timedOut => {
        RETURN;
      };
    ENDCASE;
    IF firstTime THEN {forceReBuildQueuing ← TRUE};
    firstTime ← FALSE;
    WAIT entry.changed;
  ENDLOOP;
};
-- removeQueueItem: wake everyone waiting on entry.changed, then unlink the first occurrence of item from entry.waiters.
-- Nothing is removed when the item is not on the list.
removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
  ENABLE UNWIND => NULL;
  prev: LIST OF WaitingRequest;
  BROADCAST entry.changed;
  IF entry.waiters = NIL THEN RETURN; -- actually, this is a bug
  IF entry.waiters.first = item THEN {
    entry.waiters ← entry.waiters.rest;
    RETURN;
  };
  -- walk with a trailing pointer so the matching cell can be spliced out
  prev ← entry.waiters;
  UNTIL prev.rest = NIL DO
    IF prev.rest.first = item THEN {prev.rest ← prev.rest.rest; RETURN};
    prev ← prev.rest;
  ENDLOOP;
};
-- MightAcceptQueuedCommand: a server offers to run commandName.
-- Only the first WaitingQueue entry whose service matches (case-insensitive) is considered:
--   if nobody is waiting, or the head waiter's state is not `true`, the server is appended to entry.servers
--   unless it is already listed;
--   otherwise the head waiter is handed the server directly (state done, instance set).
-- Either way entry.changed is broadcast.  Nothing happens when no entry matches.
MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = {
  ENABLE UNWIND => NULL;
  FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
    entry: QueueEntry = queue.first;
    IF ~Rope.Equal[commandName, entry.service, FALSE] THEN LOOP;
    IF entry.waiters = NIL OR entry.waiters.first.waiting # true THEN { -- nobody waiting now or top waiter is busy
      IF entry.servers = NIL THEN entry.servers ← CONS[serverMachineAddress, NIL]
      ELSE FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
        IF Rope.Equal[l.first, serverMachineAddress, FALSE] THEN EXIT;  -- already offered
        IF l.rest = NIL THEN {l.rest ← CONS[serverMachineAddress, NIL]; EXIT};
      ENDLOOP;
    }
    ELSE {
      entry.waiters.first.waiting ← done;
      entry.waiters.first.instance ← serverMachineAddress;
    };
    BROADCAST entry.changed;
    EXIT;
  ENDLOOP;
};
-- IsStatusBetter: decide whether status should replace bestStatus as the preferred server.
-- TRUE when there is no current best; TRUE when all three of
--   3*bestStatus.reclamationRate > status.reclamationRate, bestStatus.reclamationRate > 5, status.FOM < 2*bestStatus.FOM
-- hold; otherwise TRUE exactly when status.FOM < bestStatus.FOM.
IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus] RETURNS [itsBetter: BOOL ← FALSE] = {
  IF bestStatus = NIL THEN RETURN[TRUE];
  IF ((3*bestStatus.reclamationRate) > status.reclamationRate)
    AND (bestStatus.reclamationRate > 5)
    AND (status.FOM < 2*bestStatus.FOM) THEN RETURN[TRUE];
  RETURN[status.FOM < bestStatus.FOM];
};
-- TRUE while some process holds the lock taken by Lock and released by UnLock
locked: BOOL ← FALSE;
-- waited on in Lock, notified in UnLock
lockCondition: CONDITION;
-- Lock: wait until `locked` is FALSE, then set it.
Lock: ENTRY PROC = {
  -- Release the monitor if the WAIT below is unwound (for example by an abort); without this the
  -- monitor lock would stay held, unlike the other ENTRY procedures of this module.
  ENABLE UNWIND => NULL;
  WHILE locked DO
    WAIT lockCondition;
  ENDLOOP;
  locked ← TRUE;
};
-- UnLock: clear `locked` and notify one process waiting on lockCondition.
UnLock: ENTRY PROC = {
locked ← FALSE;
NOTIFY lockCondition
};
-- Grab Controller
-- SetGrabController: command procedure that sets ComputeControllerInternal.GrabController to TRUE.
-- It is registered under "///Commands/SummonerBecomeController" in the module initialization.
SetGrabController: Commander.CommandProc = {
ComputeControllerInternal.GrabController ← TRUE;
};
-- Debugging Process
-- JustAProcessThatYouCanFreeze: never returns; repeatedly calls Process.Pause[200].
-- It is forked and detached by the module initialization.
JustAProcessThatYouCanFreeze: PROC = {
  WHILE TRUE DO
    Process.Pause[200];
  ENDLOOP;
};
-- Initialize
-- Module start-up code.
-- Fork the idle debugging process and detach it.
TRUSTED {Process.Detach[FORK JustAProcessThatYouCanFreeze[]];};
-- Register the command whose procedure sets GrabController.
Commander.Register[key: "///Commands/SummonerBecomeController", proc: SetGrabController, doc: "Try to force the Compute Server Controller to this machine"];
-- Give NoticeNewPackageCondition a one-second timeout.
TRUSTED {Process.InitializeCondition[@NoticeNewPackageCondition, Process.SecondsToTicks[1]];};
-- Allocate the server interface cache.
serverInterfaceCache ← NEW[serverInterfaceArray];
END.
Bob Hagmann December 24, 1985 2:21:09 pm PST
Made sure in NewStats that status was not NIL before doing newQueueing processing
Bob Hagmann January 15, 1986 5:43:15 pm PST
Queuing of commands did not drive up the usage numbers
changes to: bumpBestServerStats, FindService, FindServiceWithQueueing