ComputeControllerImpl:
CEDAR
MONITOR
IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVNames, IO, Process, Real, Rope, RPC, SymTab, UserCredentials, UserProfile, WorldVM
EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
= BEGIN
-- Shorthand for the rope (string) type used throughout this module.
ROPE: TYPE = Rope.ROPE;
-- NoticeNewPackage and RemoveOldPackage WAIT on this while this machine has held the controller role for less than 22 seconds.
NoticeNewPackageCondition: CONDITION;
-- Set TRUE just before ExportInterface is attempted; reset to FALSE if the export fails or the interface is unexported.
InterfaceExported: BOOL ← FALSE;
-- Set by StartUpController, cleared by ShutDownController.
ControllerEnabled: BOOL ← FALSE;
-- LatentSummonerController leaves its polling loop once this is FALSE.
LatentControllerActive: BOOL ← TRUE;
-- TRUE while this machine believes it holds the controller role.
IAmTheController: PUBLIC BOOL ← FALSE;
-- Handed back by GetSomeInfo as tryDifferentController; set when Grapevine names another machine that answers while this one still has servers registered.
ControllerMovingAway: PUBLIC BOOL ← FALSE;
-- When TRUE, LatentSummonerController takes over the controller role on its next pass and clears the flag.
GrabController: PUBLIC BOOL ← FALSE;
-- Time at which IAmTheController was last set TRUE.
TimeIBecameTheController: PUBLIC BasicTime.GMT ← BasicTime.nullGMT;
-- Cleared by NoticeControllerIsDown, set by NoticeControllerIsUp.
ControllerUp: BOOL ← TRUE;
-- Grapevine name of the controller; also used as the RPC instance name when exporting and importing the controller interface.
ControllerGVName: ROPE ← NIL;
-- Which kind of failure was reported last (see ControllerCannnotBeImported and ControllerCallFailed).
ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError ← none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
-- Time at which the controller was first noticed to be down; only updated on the up-to-down transition.
ControllerFailedTime: BasicTime.GMT;
-- Last time ReBuildQueuing actually called setNewQueueing.
LastReBuildQueueTime: BasicTime.GMT ← BasicTime.nullGMT;
ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
-- Status records of the known servers; entries older than 30 seconds are dropped by TossOldStatus.
ServerStatusList: PUBLIC LIST OF ServerStatus ← NIL;
BrokenCommandTable: SymTab.Ref; -- knowing a command, find where it is broken
ExtraCommandTable: SymTab.Ref; -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref; -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"
-- One server on which a command has been reported unavailable (see CommandUnavailable), together with the versions reported.
-- BrokenCommandTable maps a command name to a LIST OF BrokenCommand.
BrokenCommandObject:
TYPE =
RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPE ← NIL
];
BrokenCommand: TYPE = REF BrokenCommandObject;
-- One server that has reported an extra command as available (see ExtraCommandAvailable), together with the versions reported.
-- ExtraCommandTable maps a command name to a LIST OF ExtraCommand.
ExtraCommandObject:
TYPE =
RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPE ← NIL
];
ExtraCommand: TYPE = REF ExtraCommandObject;
-- When TRUE, the next call of ReBuildQueuing calls setNewQueueing regardless of elapsed time; ReBuildQueuing clears it.
forceReBuildQueuing: BOOL ← FALSE;
-- NOTE(review): presumably one entry per service that has had requests queued; maintained by procedures outside this view.
WaitingQueue: LIST OF QueueEntry ← NIL;
QueueEntry: TYPE = REF QueueEntryRec;
-- Queueing state for one service.
QueueEntryRec:
TYPE =
RECORD [
service: ROPE ← NIL, -- service (command) name passed to AskForService
doQueueing: BOOL ← FALSE, -- TRUE once FindServiceWithQueueing has decided requests for this service should queue
waiters: LIST OF WaitingRequest ← NIL, -- NOTE(review): presumably the requests waiting for a server; not touched in this view
servers: LIST OF ROPE ← NIL, -- NOTE(review): presumably the candidate servers; not touched in this view
changed: CONDITION
];
WaitingRequest: TYPE = REF WaitingRequestRec;
-- One client request handled by FindServiceWithQueueing.
WaitingRequestRec:
TYPE =
RECORD [
process: PROCESS, -- the requesting process (Process.GetCurrent at the time queueing starts)
timeout: BasicTime.GMT, -- now + timeToWait, computed when queueing starts
waiting: waitType ← init,
version: ROPE,
clientMachineName: RPC.ShortROPE,
streamPupAddress: PupDefs.PupAddress,
needListener: BOOL,
serverAssigned: BOOL ← FALSE, -- cleared by FindServiceWithQueueing when the assigned server is broken or unusable
response: ComputeServer.AskResponce, -- last answer obtained from a server for this request
instance: RPC.ShortROPE, -- server chosen for this request
serverPupAddress: PupDefs.PupAddress
];
-- State of a WaitingRequest. In this view only `true' (set while waiting for a server) and `timedOut' (tested after queueItemReady) are used;
-- NOTE(review): the meaning of the other values is defined by the queueing procedures outside this view.
waitType: TYPE = {init, true, validating, done, timedOut};
-- One slot of the cache of imported server interfaces.
serverInterfaceItem:
TYPE =
RECORD [
serverInstance: ROPE ← NIL,
interface: ComputeServerRpcControl.InterfaceRecord ← NIL,
lastUsed: BasicTime.GMT ← BasicTime.earliestGMT
];
-- Number of slots in the server interface cache.
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
-- NOTE(review): allocated and searched by getServerInterfaceFromCache / deleteServerInterfaceFromCache, which are outside this view.
serverInterfaceCache: REF serverInterfaceArray;
-- Progress of the process forked by CheckItem.
CheckState: TYPE = {checking, aborted, done};
-- Mailbox shared between CheckItem and the CheckItemProcess it forks; the forked process fills in the results and sets check.
CheckItemProcessObj:
TYPE =
RECORD[
check: CheckState ← checking,
communicationsOK: BOOL ← TRUE,
askFound: ComputeServer.AskResponce,
serverPupAddress: PupDefs.PupAddress,
errMsg: ROPE
];
-- Guard used by LatentSummonerController so that at most one polling process runs.
NoOfLatentSummonerControllerProcesses: INT ← 0 ;
StartUpController: PUBLIC PROC [controllerName: ROPE ← NIL, forceSelfAsPrimaryController: BOOL ← FALSE] RETURNS [] = {
  -- Enables the controller machinery on this machine.
  -- controllerName: Grapevine name of the controller; when empty the user profile entry
  --   "Summoner.ControllerName" is used, and failing that "PaloAlto.summoner".
  -- forceSelfAsPrimaryController: claim the controller role without consulting the Grapevine connect site.
  -- If the interface is not yet exported and this machine should be the controller, the controller
  -- interface is exported.  A LatentSummonerController process is always forked before returning.
  holdingLock: BOOL ← FALSE;
  {
    ENABLE UNWIND => IF holdingLock THEN UnLock[];
    user, password: ROPE;
    gvName: ROPE ← controllerName;
    IF Rope.IsEmpty[gvName] THEN gvName ← UserProfile.Token["Summoner.ControllerName"];
    ControllerGVName ← gvName;
    IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName ← "PaloAlto.summoner";
    ControllerEnabled ← TRUE;
    LatentControllerActive ← TRUE;
    ControllerUp ← TRUE;
    Lock[];
    holdingLock ← TRUE;
    IF ~InterfaceExported THEN {
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      [info: info, connect: connect] ← GVNames.GetConnect[ControllerGVName];
      SELECT info FROM
        individual, group => {
          -- Grapevine knows the name: claim the role when forced, when Grapevine points at this
          -- machine, or when the user profile names this machine as primary controller.
          claimRole: BOOL = forceSelfAsPrimaryController
            OR Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]
            OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope];
          IF claimRole THEN {
            IAmTheController ← TRUE;
            TimeIBecameTheController ← BasicTime.Now[];
            [user, password] ← UserCredentials.Get[];
            InterfaceExported ← TRUE;
            BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
            ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
            MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
            ComputeServerControllerRpcControl.ExportInterface[
              interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
              user: user,
              password: RPC.MakeKey[password]
              ! RPC.ExportFailed => {
                  -- the export did not take: give the role back
                  InterfaceExported ← FALSE;
                  IAmTheController ← FALSE;
                  CONTINUE;
                  };
              ];
            };
          };
        ENDCASE => NULL;
      };
    UnLock[];
    holdingLock ← FALSE;
    TRUSTED {Process.Detach[FORK LatentSummonerController[]] ;};
    };
  };
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
  -- Records that importing the controller interface failed for `reason' and marks the controller as down.
  LastImportFailure ← reason;
  LastControllerError ← import;
  NoticeControllerIsDown[];
  };
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
  -- Records that a call on the controller failed for `reason' and marks the controller as down.
  LastCallFailure ← reason;
  LastControllerError ← callFailed;
  NoticeControllerIsDown[];
  };
NoticeControllerIsDown: PROC = {
  -- Marks the controller as down.  The failure time is stamped only on the up-to-down
  -- transition, so repeated failure reports do not move ControllerFailedTime.
  wasUp: BOOL = ControllerUp;
  IF wasUp THEN ControllerFailedTime ← BasicTime.Now[];
  ControllerUp ← FALSE;
  };
NoticeControllerIsUp: PUBLIC PROC = {
  -- Marks the controller as reachable again.
  ControllerUp ← TRUE;
  };
TryThisController: PROC RETURNS [importedOK: BOOL ← TRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
  -- Attempts to import the controller interface registered under ControllerGVName.
  -- importedOK is FALSE when the import fails; controllerInterface is then not assigned here.
  controllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
    ! RPC.ImportFailed => {importedOK ← FALSE; CONTINUE};
    ];
  };
LatentSummonerController: PROC [] = {
  -- Background process body (forked by StartUpController).  About every 127 seconds it either
  --   (a) takes over the controller role when the controller is down or GrabController is set, or
  --   (b) when this machine believes it is the controller, reconciles that belief with Grapevine,
  --       then ages out stale server status and rebuilds queueing if due.
  -- At most one such process runs at a time; extra instances return immediately.
  MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOL ← FALSE] = {
    IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses ← 1
    ELSE RETURN[TRUE];
    };
  TimeToStop: ENTRY PROC RETURNS [stop: BOOL ← FALSE] = {
    -- Tests the shutdown flag and releases the "only one" claim under the monitor, so that a
    -- process forked by a later StartUpController is not turned away by a stale count.
    -- StartUpController sets LatentControllerActive before it forks, so either this process sees
    -- the flag TRUE and keeps running, or the new process sees the count back at zero.
    IF ~LatentControllerActive THEN {
      NoOfLatentSummonerControllerProcesses ← 0;
      RETURN[TRUE];
      };
    };
  locked: BOOL ← FALSE;
  IF MakeSureThereIsOnlyOne[] THEN RETURN;
  DO
    ENABLE UNWIND => IF locked THEN UnLock[];
    IF TimeToStop[] THEN EXIT;
    IF ~ControllerUp
      OR GrabController
    THEN {
      -- controller is down or I should grab it - see if I should acquire the controller function
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      ControllerMovingAway ← FALSE;
      IF GrabController
        OR ~TryThisController[].importedOK
      THEN {
        -- told to grab the role, or the controller interface cannot be imported
        downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
        [info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
        -- Take over when told to, when the controller has been down more than 83 seconds, when it
        -- has been down more than 13 seconds and Grapevine names this machine, or when the user
        -- profile names this machine as the primary controller.
        IF GrabController
          OR downTime > 83
          OR
          ((downTime > 13
            AND (info = individual
              OR info = group)
            AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope])
          OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope])
        THEN {
          user, password: ROPE;
          Lock[];
          locked ← TRUE;
          GrabController ← FALSE;
          IAmTheController ← TRUE;
          TimeIBecameTheController ← BasicTime.Now[];
          [user, password] ← UserCredentials.Get[];
          IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
          InterfaceExported ← TRUE;
          BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user,
            password: RPC.MakeKey[password]
            !
            RPC.ExportFailed => {
              InterfaceExported ← FALSE;
              IAmTheController ← FALSE;
              CONTINUE;
              };
            ];
          UnLock[];
          locked ← FALSE;
          };
        };
      }
    ELSE {
      -- Controller is up. If I think that I am the controller, but grapevine does not think so, then fix the situation
      IF IAmTheController
      THEN {
        info: GVNames.ConnectInfo;
        connect: GVBasics.Connect;
        [info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
        IF (info = individual
          OR info = group)
        THEN {
          IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]
          THEN {
            -- Grapevine says I am not the Controller
            triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
            tryDifferentController: BOOL ← FALSE;
            importedOK: BOOL;
            [importedOK: importedOK, controllerInterface: triedControllerInterface] ← TryThisController[];
            -- See if this Controller thinks everything is fine
            IF importedOK
            THEN [tryDifferentController: tryDifferentController] ← triedControllerInterface.GetSomeInfo[ !
              RPC.CallFailed => {
                importedOK ← FALSE;
                CONTINUE;
                };
              ];
            IF importedOK
              AND ~tryDifferentController
            THEN {
              -- new controller answers
              IF ServerStatusList =
                NIL
              THEN {
                -- no servers are registered with me any more: step down
                Lock[];
                locked ← TRUE;
                IAmTheController ← FALSE;
                ControllerMovingAway ← FALSE;
                ComputeServerControllerRpcControl.UnexportInterface[];
                InterfaceExported ← FALSE;
                ServerStatusList ← NIL;
                BrokenCommandTable ← NIL;
                ExtraCommandTable ← NIL;
                UnLock[];
                locked ← FALSE;
                }
              ELSE {
                -- servers are still registered here: tell callers (via GetSomeInfo) to try a different controller
                ControllerMovingAway ← TRUE;
                };
              }
            ELSE {
              -- new controller does not answer or thinks it is moving too
              ControllerMovingAway ← FALSE;
              IF ~importedOK
              THEN {
                -- new controller does not answer — re-assert my controllership
                user, password: ROPE;
                [user, password] ← UserCredentials.Get[];
                [] ← GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
                };
              };
            }
          ELSE {
            -- Grapevine says I am the Controller
            ControllerMovingAway ← FALSE;
            };
          }
        ELSE {
          -- NOT info = individual OR info = group (grapevine troubles)
          ControllerMovingAway ← FALSE;
          };
        TossOldStatus[];
        ReBuildQueuing[];
        };
      };
    Process.Pause[Process.SecondsToTicks[127]];
    ENDLOOP;
  };
ShutDownController: PUBLIC PROC [] RETURNS [] = {
  -- Withdraws the controller interface (if exported), discards all controller state and
  -- tells the latent controller process to stop on its next pass.
  IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
  InterfaceExported ← FALSE;
  ControllerEnabled ← FALSE;
  -- forget everything learned about the servers
  ServerStatusList ← NIL;
  BrokenCommandTable ← NIL;
  ExtraCommandTable ← NIL;
  MachineToStatusTable ← NIL;
  -- stop the background process and give up the role
  LatentControllerActive ← FALSE;
  ControllerUp ← FALSE;
  IAmTheController ← FALSE;
  };
TossOldStatus: ENTRY PROC [] RETURNS [] = {
  -- Removes from ServerStatusList every status record more than 30 seconds old, and deletes the
  -- matching entries (keyed by machine name and by pup address) from MachineToStatusTable.
  ENABLE UNWIND => NULL;
  now: BasicTime.GMT = BasicTime.Now[];
  -- Last cell that is known to remain in the list.  It must NOT advance onto a cell that has
  -- just been unlinked; otherwise the next removal would splice the dead cell instead of the
  -- live list, and the second of two consecutive stale entries would survive.
  lastKept: LIST OF ServerStatus ← NIL;
  FOR cell: LIST OF ServerStatus ← ServerStatusList, cell.rest UNTIL cell = NIL DO
    status: ServerStatus = cell.first;
    IF BasicTime.Period[status.timeOfStatus, now] > 30
    THEN {
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
      -- unlink; cell.rest is left intact so the iteration can continue from the removed cell
      IF lastKept = NIL THEN ServerStatusList ← cell.rest
      ELSE lastKept.rest ← cell.rest;
      }
    ELSE lastKept ← cell;
    ENDLOOP;
  };
ReBuildQueuing: PROC [] RETURNS [] = {
  -- Calls setNewQueueing when a rebuild has been forced or more than 300 seconds have passed
  -- since the last one; always clears the force flag.
  rightNow: BasicTime.GMT = BasicTime.Now[];
  rebuildDue: BOOL = forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, rightNow] > 300;
  IF rebuildDue THEN {
    setNewQueueing[];
    LastReBuildQueueTime ← rightNow;
    };
  forceReBuildQueuing ← FALSE;
  };
GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL, serverList: LIST OF ROPE ← NIL, bestFOM: REAL ← 0.0] = {
  -- Reports the controller's view of the world.
  -- error: always FALSE here.
  -- tryDifferentController: TRUE when this machine is not the controller, otherwise ControllerMovingAway.
  -- msg: "controller has moved", "no servers up", or NIL.
  -- serverList: one descriptive rope per known server (name and per cent busy; the controller's own machine is flagged).
  -- bestFOM: figure of merit of the best server, or 1.0 when there are no servers.
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus ← NIL;
  statusList: LIST OF ServerStatus;
  IF ~IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved", NIL, 0.0];
  FOR statusList ← ServerStatusList, statusList.rest
  UNTIL statusList = NIL
  DO
    status: ServerStatus = statusList.first;
    perCentBusy: INT ← Real.Round[status.CPULoad*100.0];
    -- Exactly one entry per server.  (An older name-only form of this entry used to be added
    -- as well, so every server appeared twice in the list.)
    IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName]
    THEN {
      serverList ← CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
      }
    ELSE serverList ← CONS[IO.PutFR["%g (%g%%)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
    IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
    ENDLOOP;
  IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
  RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
  };
BestServerStats: PUBLIC ENTRY PROC RETURNS[instance: RPC.ShortROPE, FOM: REAL] = {
  -- Returns the machine name and figure of merit of the server that IsStatusBetter ranks best,
  -- or [NIL, 1.0] when no server status is known.
  ENABLE UNWIND => NULL;
  winner: ServerStatus ← NIL;
  FOR each: LIST OF ServerStatus ← ServerStatusList, each.rest UNTIL each = NIL DO
    candidate: ServerStatus = each.first;
    IF IsStatusBetter[candidate, winner] THEN winner ← candidate;
    ENDLOOP;
  IF winner # NIL THEN RETURN [winner.serverMachineName, winner.FOM];
  RETURN [NIL, 1.0];
  };
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
  -- Pessimistically charges the named server for work just sent its way: raises its CPU load
  -- by 0.5 (capped at 1.0) and its reclamation rate by 30, then recomputes its figure of merit.
  -- Does nothing when the server is not in MachineToStatusTable.
  ENABLE UNWIND => NULL;
  known: BOOL;
  entryRef: REF ANY;
  [found: known, val: entryRef] ← SymTab.Fetch[x: MachineToStatusTable, key: instance];
  IF ~known THEN RETURN;
  {
    server: ServerStatus = NARROW[entryRef];
    server.CPULoad ← MIN[ 1.0, server.CPULoad + 0.5];
    server.reclamationRate ← server.reclamationRate + 30 ;
    server.FOM ← figureOfMerit[server];
    };
  };
FindService: PUBLIC PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Public wrapper around InnerFindService: picks the best server for `service'.
  ok: BOOL;
  where: RPC.ShortROPE;
  [ok, where] ← InnerFindService[service];
  RETURN [ok, where];
  };
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: PupDefs.PupAddress, needListener: BOOL]
  RETURNS [found: ComputeServer.AskResponce, instance: RPC.ShortROPE, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  -- Finds a server willing to run `service' for the given client.
  -- If the service is already in queueing mode the request waits (up to timeToWait seconds) for a
  -- server to be assigned.  Otherwise servers are probed one after another; as soon as the same
  -- server would be probed a second time, the service is switched to queueing mode.
  -- found: the answer of the chosen server (or notFound when no server offers the service).
  -- instance / serverPupAddress: the chosen server.
  entry: QueueEntry;
  item: WaitingRequest ← NIL;
  tryForServer: PROC [] = {
    -- Queueing loop: wait until a server is assigned to item, skip servers on which the command
    -- is known to be broken, then ask the server.  Ends when the request times out or a server accepts.
    DO
      commOK: BOOL;
      foundBroken: BOOL;
      brokenVal: REF ANY;
      brokenList, brokenListLoop: LIST OF BrokenCommand;
      askFound: ComputeServer.AskResponce;
      queueItemReady[entry, item];
      IF item.waiting = timedOut THEN EXIT;
      [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
      brokenList ← NARROW[brokenVal];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest
      UNTIL brokenListLoop = NIL
      DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance, FALSE]
        THEN {
          -- the assigned server cannot run this command: go back to waiting
          item.serverAssigned ← FALSE;
          item.waiting ← true;
          EXIT;
          };
        ENDLOOP;
      IF ~item.serverAssigned THEN LOOP;
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem [entry, item];
      IF ~commOK
        OR (askFound # foundOK)
      THEN {
        item.serverAssigned ← FALSE;
        item.waiting ← true;
        LOOP;
        };
      item.response ← askFound;
      item.serverPupAddress ← serverPupAddress;
      -- NOTE(review): askFound = foundOK is guaranteed by the test above, so the notFound and
      -- foundButTooBusy arms below cannot be reached; left as found pending confirmation of intent.
      SELECT askFound
      FROM
        notFound => {
          removeQueueItem[entry, item];
          errMsg ← NIL;
          RETURN;
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN;
          };
        ENDCASE;
      ENDLOOP;
    -- timed out
    removeQueueItem[entry, item];
    };
  [entry, item] ← findOrCreateQueueEntry[service];
  item.version ← version;
  item.clientMachineName ← clientMachineName;
  item.streamPupAddress ← streamPupAddress;
  item.needListener ← needListener;
  IF entry.doQueueing
  THEN {
    -- queueing already being done
    item.process ← LOOPHOLE[Process.GetCurrent[]];
    item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
    item.waiting ← true;
    tryForServer[];
    bumpBestServerStats[item.instance];
    RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
    }
  ELSE {
    -- queueing not started
    serversTried: LIST OF RPC.ShortROPE ← NIL;
    DO
      commOK: BOOL;
      serviceFound: BOOL;
      instance: RPC.ShortROPE; -- NB: hides the result variable of the same name within this loop body
      askFound: ComputeServer.AskResponce;
      -- NOTE(review): this turns queueing on for the service before the first probe; the same two
      -- statements appear again below at the point where probing is abandoned — confirm both are intended.
      entry.doQueueing ← TRUE;
      setNewQueueing[];
      [serviceFound, instance] ← InnerFindService[service];
      item.instance ← instance;
      IF ~serviceFound
      THEN {
        found ← notFound;
        removeQueueItem[entry, item];
        RETURN;
        };
      FOR oldServers: LIST OF RPC.ShortROPE ← serversTried, oldServers.rest
      UNTIL oldServers = NIL
      DO
        IF Rope.Equal[oldServers.first, instance, FALSE]
        THEN {
          -- We are about to retry the same server again. Give up probing, and start queueing.
          serversTried ← NIL; -- help garbage collector
          entry.doQueueing ← TRUE;
          setNewQueueing[];
          item.process ← LOOPHOLE[Process.GetCurrent[]];
          item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
          item.waiting ← true;
          tryForServer[];
          RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
          };
        ENDLOOP;
      serversTried ← CONS[instance, serversTried];
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem[entry, item];
      IF ~commOK THEN LOOP;
      SELECT askFound
      FROM
        notFound => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        ENDCASE;
      ENDLOOP;
    };
  };
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest]
  RETURNS [communicationsOK: BOOL ← TRUE, askFound: ComputeServer.AskResponce, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  -- Asks the server recorded in item.instance whether it will run entry.service, without letting
  -- a hung remote call block the caller: the call is made in a forked CheckItemProcess and this
  -- procedure polls for its completion.  If no answer has arrived after more than 9 seconds the
  -- forked process is aborted and [FALSE, notFound, PupTypes.fillInPupAddress, ""] is returned.
  checkProcess: PROCESS;
  checkObject: REF CheckItemProcessObj ← NEW[CheckItemProcessObj];
  startTime: BasicTime.GMT ← BasicTime.Now[];
  checkProcess ← FORK CheckItemProcess[entry, item, checkObject];
  DO
    IF BasicTime.Period[startTime, BasicTime.Now[]] > 9
    THEN
      TRUSTED {
        -- Remote machine is not unresponsive, but will not answer the call
        checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
        -- Detach so we can return. Freeze, abort and thaw the process. After a long time (minutes if ever), the process will finally wake up and go away. Also, if the remote host stops answering the ping, the call will fail and the process will exit.
        Process.Detach[checkProcess];
        AMProcess.Freeze[LIST[checkProcessTV]];
        AMProcess.Abort[checkProcessTV];
        AMProcess.Thaw[LIST[checkProcessTV]];
        checkObject.check ← aborted;
        RETURN[FALSE, notFound, PupTypes.fillInPupAddress, ""];
        };
    IF checkObject.check = done
    THEN
      TRUSTED {
        -- JOIN first, so the forked process has finished writing its results before they are read
        [] ← JOIN checkProcess;
        RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
        };
    Process.Pause[2];
    ENDLOOP;
  };
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj] = {
  -- Body of the process forked by CheckItem.  Calls AskForService on the server named by
  -- item.instance and deposits the outcome in checkObject.  A failed call is retried once with a
  -- freshly obtained interface; a second failure, or the lack of any interface, is reported as
  -- communicationsOK = FALSE.  If the process is aborted, checkObject.check is set to aborted.
  ENABLE ABORTED => GOTO aborted;
  communicationsOK: BOOL ← TRUE;
  askFound: ComputeServer.AskResponce;
  serverPupAddress: PupDefs.PupAddress;
  errMsg: ROPE;
  firstTime: BOOL ← TRUE;
  serverInstance: ROPE ← item.instance;
  DO
    serverInterface: ComputeServerRpcControl.InterfaceRecord;
    serverInterface ← getServerInterfaceFromCache[serverInstance];
    -- No interface: leave the loop (rather than the procedure) so that the failure is published
    -- below.  Returning here would leave check = checking, and CheckItem would poll until its
    -- timeout and then try to abort a process that had already finished.
    IF serverInterface = NIL THEN {communicationsOK ← FALSE; EXIT};
    [found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] ← serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, streamPupAddress: item.streamPupAddress, needListener: item.needListener
      !
      RPC.CallFailed => {
        [] ← deleteServerInterfaceFromCache[serverInterface];
        IF firstTime THEN {firstTime ← FALSE; LOOP;};
        communicationsOK ← FALSE;
        EXIT;
        };
      ];
    EXIT;
    ENDLOOP;
  -- publish the results first and the completion flag last
  checkObject.communicationsOK ← communicationsOK;
  checkObject.askFound ← askFound;
  checkObject.serverPupAddress ← serverPupAddress;
  checkObject.errMsg ← errMsg;
  checkObject.check ← done;
  EXITS
    aborted => {
      checkObject.check ← aborted;
      RETURN;
      };
  };
InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Picks the server that IsStatusBetter ranks best among those able to run `service'.
  -- For a command in ComputeServerInternal.CommandTable every known server is a candidate except
  -- those recorded in BrokenCommandTable; for any other command only the servers recorded in
  -- ExtraCommandTable are candidates.
  -- On success the chosen server is charged in advance (CPU load +0.5 capped at 1.0, reclamation
  -- rate +30, figure of merit recomputed) and its pup address is returned as `instance'.
  -- Returns [FALSE, ""] when there is no candidate.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  bestStatus: ServerStatus ← NIL;
  foundCommand: BOOL;
  [found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
  IF foundCommand
  THEN {
    -- command should be out on (almost) every server
    foundBroken: BOOL;
    brokenVal: REF ANY;
    brokenList: LIST OF BrokenCommand;
    [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
    brokenList ← NARROW[brokenVal];
    FOR statusList ← ServerStatusList, statusList.rest
    UNTIL statusList = NIL
    DO
      skipIt: BOOL ← FALSE;
      serverMachineName: ROPE ← statusList.first.serverMachineName;
      serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
      brokenListLoop: LIST OF BrokenCommand;
      -- a broken record may name the server either by machine name or by pup address
      FOR brokenListLoop ← brokenList, brokenListLoop.rest
      UNTIL brokenListLoop = NIL
      DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE]
          OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE]
        THEN {
          skipIt ← TRUE;
          EXIT;
          };
        ENDLOOP;
      IF skipIt THEN LOOP;
      IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus ← statusList.first;
      ENDLOOP;
    }
  ELSE {
    -- command should be out on (almost) no servers
    foundExtra: BOOL;
    extraVal: REF ANY;
    extraListLoop: LIST OF ExtraCommand;
    [found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: service];
    FOR extraListLoop ← NARROW[extraVal], extraListLoop.rest
    UNTIL extraListLoop = NIL
    DO
      statusFound: BOOL;
      val: REF ANY;
      status: ServerStatus;
      [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
      IF statusFound
      THEN {
        status ← NARROW[val];
        IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
        };
      ENDLOOP;
    };
  IF bestStatus = NIL
  THEN {
    RETURN[FALSE, ""]
    }
  ELSE {
    -- assume the request will land on this server, so the next request prefers another one
    bestStatus.CPULoad ← MIN[ 1.0, bestStatus.CPULoad + 0.5];
    bestStatus.reclamationRate ← bestStatus.reclamationRate + 30 ;
    bestStatus.FOM ← figureOfMerit[bestStatus];
    RETURN[TRUE, bestStatus.serverMachinePupAddress];
    };
  };
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
  -- Combines a server's CPU load and reclamation rate into one number: the Euclidean norm of
  -- the two terms.  Machines that are not dorados have their load mapped into [0.8 .. 1.0];
  -- the reclamation term is the rate divided by 90, capped at 1.0.
  cpuTerm: REAL ← 0.8 + (status.CPULoad/5.0);
  reclaimTerm: REAL = MIN[1.0, status.reclamationRate/90.0];
  IF status.machineType = dorado THEN cpuTerm ← status.CPULoad;
  RETURN [Real.SqRt[reclaimTerm*reclaimTerm + cpuTerm*cpuTerm]];
  };
CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Records in BrokenCommandTable that `commandName' (in `version') cannot be run on
  -- `serverMachineName'.  A version already recorded for that server is not recorded twice.
  ENABLE UNWIND => NULL;
  known: BOOL;
  stored: REF ANY;
  reports: LIST OF BrokenCommand ← NIL; -- what the table holds for this command, if anything
  fresh: BrokenCommand;
  updated: LIST OF BrokenCommand;
  [found: known, val: stored] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF known THEN {
    reports ← NARROW[stored];
    FOR each: LIST OF BrokenCommand ← reports, each.rest UNTIL each = NIL DO
      report: BrokenCommand = each.first;
      IF Rope.Equal[report.serverMachineName, serverMachineName] THEN {
        -- server already has a record: add the version unless it is there already
        FOR v: LIST OF ROPE ← report.versions, v.rest UNTIL v = NIL DO
          IF Rope.Equal[v.first, version] THEN RETURN;
          ENDLOOP;
        report.versions ← CONS[version, report.versions];
        RETURN;
        };
      ENDLOOP;
    };
  -- first report for this server: put a new record at the front of the (possibly empty) list
  fresh ← NEW[BrokenCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
  updated ← CONS[fresh, reports];
  [] ← SymTab.Store[x: BrokenCommandTable, key: commandName, val: updated];
  };
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Records in ExtraCommandTable that `serverMachineName' can run the non-standard command
  -- `commandName' in `version'.  A version already recorded for that server is not recorded twice.
  -- Any BrokenCommandTable entry for the command is discarded first.
  ENABLE UNWIND => NULL;
  found: BOOL;
  oldList: LIST OF ExtraCommand ← NIL;
  newExtra: ExtraCommand;
  newList: LIST OF ExtraCommand;
  machineList: LIST OF ExtraCommand;
  val: REF ANY;
  -- See if this "Extra" reverses a previous broken
  -- NOTE(review): this drops the broken records of every server for the command, not only those
  -- of serverMachineName — confirm that is intended.
  [found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found
  THEN {
    [] ← SymTab.Delete[x: BrokenCommandTable, key: commandName];
    };
  [found: found, val: val] ← SymTab.Fetch[x: ExtraCommandTable, key: commandName];
  IF found
  THEN {
    oldList ← NARROW[val];
    FOR machineList ← oldList, machineList.rest
    UNTIL machineList = NIL
    DO
      extra: ExtraCommand = machineList.first;
      IF Rope.Equal[extra.serverMachineName, serverMachineName]
      THEN {
        -- server already has a record: add the version unless it is there already
        versionList: LIST OF ROPE;
        FOR versionList ← extra.versions, versionList.rest
        UNTIL versionList = NIL
        DO
          IF Rope.Equal[versionList.first, version] THEN RETURN;
          ENDLOOP;
        extra.versions ← CONS[version, extra.versions];
        RETURN;
        };
      ENDLOOP;
    };
  -- first report for this server: put a new record at the front of the (possibly empty) list
  newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
  newList ← CONS[newExtra, oldList];
  [] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
  };
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE]
  RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL]= {
  -- Registers (or refreshes) a package DF file in the shared package list and flags every
  -- known server as having a new package to pick up.
  -- The package list is read from RemoteCommandDir, rewritten into a local temporary file with
  -- the entry for `package' carrying the DF file's current create date, and copied back.
  -- error: TRUE when the request could not be carried out; msg then says why.
  -- tryDifferentController: TRUE when this machine is not (or is no longer) the controller.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE ← NIL;
  foundItem: BOOL ← FALSE;
  fullFName: ROPE;
  fsErrMsg: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE ← NIL;
  shortPackageAndDF: ROPE ← NIL;
  packageDFDate: BasicTime.GMT ← BasicTime.nullGMT;
  lastDir: REF DFUtilities.DirectoryItem ← NIL;
  wroteDir: BOOL ← FALSE;
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];
  closeStreams: PROC [] = {
    -- Closes whichever package list streams are still open; safe to call more than once.
    IF packageListInStream # NIL THEN {
      packageListInStream.Close[! FS.Error => CONTINUE];
      packageListInStream ← NIL;
      };
    IF packageListOutStream # NIL THEN {
      packageListOutStream.Close[! FS.Error => CONTINUE];
      packageListOutStream ← NIL;
      };
    };
  discardTemp: PROC [] = {
    -- Abandons the rewrite: closes the streams and removes the temporary file, if any.
    closeStreams[];
    IF packageListTempName # NIL THEN FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };
  writeUpdatedItem: PROC[] = {
    -- Writes the entry for the package, preceded by its directory line when the directory
    -- differs from the one most recently seen or written.
    file: REF DFUtilities.FileItem;
    file ←
      NEW[DFUtilities.FileItem ← [
        name: shortPackageAndDF, -- a short name
        date: [explicit, packageDFDate],
        verifyRoot: FALSE]];
    IF lastDir = NIL
      OR ~Rope.Equal[lastDir.path1, dfPrefix, FALSE]
    THEN {
      dir: REF DFUtilities.DirectoryItem ←
        NEW [DFUtilities.DirectoryItem ← [
          path1: dfPrefix,
          path2: NIL,
          path2IsCameFrom: FALSE,
          exported: FALSE,
          readOnly: FALSE
          ]];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      lastDir ← dir;
      };
    DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
    };
  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copies one item of the old package list to the new one, substituting the updated entry
    -- when the item is the package being registered.  Directory lines are written lazily, just
    -- before the first file that follows them; white space items are dropped.
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL
          OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE]
        THEN {
          wroteDir ← FALSE;
          lastDir ← dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        };
      file: REF DFUtilities.FileItem => {
        -- file name with any "!version" suffix removed
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
        IF ~wroteDir
        THEN {
          wroteDir ← TRUE;
          IF lastDir = NIL THEN lastDir ← defaultDirectory;
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE]
        THEN {
          writeUpdatedItem[];
          foundItem ← TRUE;
          }
        ELSE {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };
  {
    ENABLE
      UNWIND => {
        closeStreams[];
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
        };
    IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    -- hold the request back while this machine has been the controller for less than 22 seconds
    WHILE ComputeControllerInternal.IAmTheController
      AND
      BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22
    DO
      WAIT NoticeNewPackageCondition;
      ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ←
      NEW [DFUtilities.DirectoryItem ← [
        path1: dfPrefix,
        path2: NIL,
        path2IsCameFrom: FALSE,
        exported: FALSE,
        readOnly: FALSE
        ]];
    [created: packageDFDate] ← FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => GOTO noDFFile];
    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName
      ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    IF ~foundItem THEN writeUpdatedItem[];
    closeStreams[];
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE ! FS.Error => { fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest
    UNTIL statusList = NIL
    DO
      statusList.first.newPackage ← TRUE;
      ENDLOOP;
    EXITS
      cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
      noDFFile => RETURN [ TRUE, FALSE, "controller could not open specified df file - "];
      syntaxErrorInPackageList => {
        -- both streams are still open and the temporary file is half written
        discardTemp[];
        RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
        };
      badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
      cantCopyToServer => {
        -- do not leave the temporary file behind
        discardTemp[];
        RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
        };
    };
  };
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  -- Removes the file entry for `package` (matched as "<base>.df", ignoring case and any "!version"
  -- suffix) from the PackageList in ComputeServerInternal.RemoteCommandDir.
  -- The list is rewritten into a local temporary file and then copied back over the global one.
  -- On success every known server's newPackage flag is set.
  -- Results:
  --   error = TRUE                  => the request failed; msg explains why.
  --   tryDifferentController = TRUE => this machine is not (or is no longer) the controller.
  --   [FALSE, FALSE, NIL]           => the package was removed.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM ← NIL;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM ← NIL;
  packageListTempName: ROPE ← NIL;
  fsErrMsg: ROPE ← NIL;
  foundItem: BOOL ← FALSE;
  fullFName: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE ← NIL;
  shortPackageAndDF: ROPE ← NIL;
  wroteDir: BOOL ← FALSE;
  lastDir: REF DFUtilities.DirectoryItem ← NIL;
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL;
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];

  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copies each parsed item to the temporary list, except the file item naming the package.
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
          -- The directory line is now in the output; recording that keeps the file arm below
          -- from writing the same directory (plus two blank lines) a second time.
          wroteDir ← TRUE;
          lastDir ← dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
        };
      file: REF DFUtilities.FileItem => {
        -- file.name up to (not including) the first "!"
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          foundItem ← TRUE; -- this is the entry being removed: do not copy it
          }
        ELSE {
          IF ~wroteDir THEN {
            -- a file appeared before any directory line was written: supply one
            wroteDir ← TRUE;
            IF lastDir = NIL THEN lastDir ← defaultDirectory;
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            };
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };

  CleanUp: PROC = {
    -- Closes whichever streams are still open and deletes the temporary file; FS errors are ignored.
    -- Streams are set to NIL once closed so that calling this twice is harmless.
    IF packageListInStream # NIL THEN {
      packageListInStream.Close[! FS.Error => CONTINUE];
      packageListInStream ← NIL;
      };
    IF packageListOutStream # NIL THEN {
      packageListOutStream.Close[! FS.Error => CONTINUE];
      packageListOutStream ← NIL;
      };
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };

  {
    ENABLE UNWIND => CleanUp[];
    IF package.IsEmpty[] THEN RETURN [TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    -- Hold off until this machine has been the controller for at least 22 (BasicTime.Period units).
    WHILE ComputeControllerInternal.IAmTheController
      AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
      WAIT NoticeNewPackageCondition;
      ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];

    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ← NEW[DFUtilities.DirectoryItem ← [
      path1: dfPrefix,
      path2: NIL,
      path2IsCameFrom: FALSE,
      exported: FALSE,
      readOnly: FALSE
      ]];

    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create
      ! FS.Error => {fsErrMsg ← error.explanation; GOTO cantCreateTempFile}];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    packageListInStream.Close[! FS.Error => CONTINUE];
    packageListInStream ← NIL;
    packageListOutStream.Close[! FS.Error => CONTINUE];
    packageListOutStream ← NIL;

    IF ~foundItem THEN {
      FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
      RETURN [TRUE, FALSE, "package not found"]
      };
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE
      ! FS.Error => {fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [FALSE, TRUE, "controller has moved"];
    -- Flag every known server; NewStats hands the flag back to each server on its next report.
    FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      statusList.first.newPackage ← TRUE;
      ENDLOOP;
    RETURN [FALSE, FALSE, NIL];
    EXITS
      -- The ENABLE UNWIND above does not cover these exits, so each one cleans up for itself.
      cantOpenPackageList => RETURN [TRUE, FALSE, "controller could not open package list"];
      cantCreateTempFile => {
        CleanUp[];
        RETURN [TRUE, FALSE, Rope.Concat["controller could not create temporary package list because ", fsErrMsg]];
        };
      syntaxErrorInPackageList => {
        CleanUp[];
        RETURN [TRUE, FALSE, "controller could not parse the package list - call an implementor"];
        };
      cantCopyToServer => {
        CleanUp[];
        RETURN [TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
        };
      badPackageName => RETURN [TRUE, FALSE, "bad package df file name"];
    };
  };
NewStats: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL] RETURNS [terminateService, newPackage: BOOL ← FALSE, queueingCommands: LIST OF ROPE ← NIL] = {
  -- Records a status report from one server.
  --   * Known server, serverUP = FALSE: it is unlinked from ServerStatusList.
  --   * Known server, serverUP = TRUE: its dynamic statistics and figure of merit are refreshed.
  --   * Unknown server, serverUP = TRUE: a new status record is created and registered.
  -- Results:
  --   newPackage        = the server's pending newPackage flag (cleared once reported).
  --   queueingCommands  = services in WaitingQueue with doQueueing set, returned only when the
  --                       server's newQueueing flag was set (the flag is then cleared).
  --   terminateService  is never assigned here, so it is always its default, FALSE.
  -- When firstCall is TRUE, entries recorded against serverMachinePupAddress are purged from
  -- BrokenCommandTable and ExtraCommandTable.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  status: ServerStatus ← NIL;
  lastStatusList: LIST OF ServerStatus ← NIL; -- cell preceding statusList, for unlinking

  FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    IF Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN {
      status ← statusList.first;
      IF ~serverUP THEN {
        -- server reports it is going down: drop it from the list
        -- NOTE(review): its MachineToStatusTable entries are left in place here — confirm that is intended.
        IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
        ELSE lastStatusList.rest ← statusList.rest;
        }
      ELSE {
        newPackage ← status.newPackage;
        status.newPackage ← FALSE;
        status.freePagesOnDisk ← freePagesOnDisk;
        status.freeboard ← freeboard;
        status.freeGFI ← freeGFI;
        status.freeMDS ← freeMDS;
        status.freeVM ← freeVM;
        status.oldestLRUFileDate ← oldestLRUFileDate;
        status.CPULoad ← CPULoad;
        status.reclamationRate ← reclamationRate;
        status.freeProcesses ← freeProcesses;
        status.FOM ← figureOfMerit[status: status];
        status.timeOfStatus ← BasicTime.Now[];
        -- re-register under both keys if the name lookup no longer finds the record
        IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN {
          [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
          [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
          };
        };
      EXIT;
      };
    lastStatusList ← statusList;
    REPEAT
      FINISHED => status ← NIL;
    ENDLOOP;

  IF status = NIL AND serverUP THEN {
    -- first report from this server: build and register a fresh record
    status ← NEW[ServerStatusRecord ← [
      serverMachineName: serverMachineName,
      serverMachinePupAddress: serverMachinePupAddress,
      timeOfStatus: BasicTime.Now[],
      machineType: machineType,
      mainMemory: mainMemory,
      numberCPUs: numberCPUs,
      diskPartionSize: diskPartionSize,
      freePagesOnDisk: freePagesOnDisk,
      freeboard: freeboard,
      freeGFI: freeGFI,
      freeMDS: freeMDS,
      freeVM: freeVM,
      oldestLRUFileDate: oldestLRUFileDate,
      CPULoad: CPULoad,
      reclamationRate: reclamationRate,
      freeProcesses: freeProcesses
      ]];
    status.FOM ← figureOfMerit[status: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
    ServerStatusList ← CONS[status, ServerStatusList];
    };

  IF status # NIL AND status.newQueueing THEN {
    status.newQueueing ← FALSE;
    FOR l: LIST OF QueueEntry ← WaitingQueue, l.rest UNTIL l = NIL DO
      IF l.first.doQueueing THEN queueingCommands ← CONS[l.first.service, queueingCommands];
      ENDLOOP;
    };

  IF firstCall THEN {
    CleanBroken: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Rebuilds this command's broken list without the entries for the reporting server.
      -- The rebuilt list is in reverse order and is stored only if something was dropped.
      ENABLE UNWIND => NULL;
      brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand ← NIL;
      changed: BOOL ← FALSE;
      brokenList ← NARROW[val];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
          changed ← TRUE;
          LOOP;
          }
        ELSE {
          newBrokenList ← CONS[brokenListLoop.first, newBrokenList];
          };
        ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList];
      RETURN[FALSE];
      };
    CleanExtra: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Same as CleanBroken, applied to ExtraCommandTable.
      ENABLE UNWIND => NULL;
      extraList, newExtraList, extraListLoop: LIST OF ExtraCommand ← NIL;
      changed: BOOL ← FALSE;
      extraList ← NARROW[val];
      FOR extraListLoop ← extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO
        IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
          changed ← TRUE;
          LOOP;
          }
        ELSE {
          newExtraList ← CONS[extraListLoop.first, newExtraList];
          };
        ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList];
      RETURN[FALSE];
      };
    [] ← SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken];
    [] ← SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra];
    };
  };
getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE] RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord ← NIL] = {
  -- Returns the RPC interface for serverInstance.
  -- A cache hit refreshes the slot's lastUsed time and returns the cached interface.
  -- On a miss a new interface is imported; if that succeeds it replaces the slot with the
  -- earliest lastUsed time.  Returns NIL when the import fails (the cache is then untouched).
  ENABLE UNWIND => NULL;
  victimSlot: INT ← 0;
  oldestUse: BasicTime.GMT ← BasicTime.latestGMT;
  stamp: BasicTime.GMT = BasicTime.Now[];

  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF Rope.Equal[serverInstance, serverInterfaceCache[slot].serverInstance] THEN {
      serverInterfaceCache[slot].lastUsed ← stamp;
      RETURN [serverInterfaceCache[slot].interface];
      };
    -- remember the slot whose lastUsed is strictly earlier than any seen so far
    IF BasicTime.Period[oldestUse, serverInterfaceCache[slot].lastUsed] < 0 THEN {
      victimSlot ← slot;
      oldestUse ← serverInterfaceCache[slot].lastUsed;
      };
    ENDLOOP;

  -- miss: try to import; on ImportFailed the result simply stays NIL
  serverInterface ← ComputeServerRpcControl.ImportNewInterface[
    interfaceName: [type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]]
    ! RPC.ImportFailed => CONTINUE];
  IF serverInterface # NIL THEN {
    serverInterfaceCache[victimSlot].serverInstance ← serverInstance;
    serverInterfaceCache[victimSlot].interface ← serverInterface;
    serverInterfaceCache[victimSlot].lastUsed ← stamp;
    };
  RETURN [serverInterface];
  };
deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord] RETURNS [gotIt: BOOL ← FALSE] = {
  -- Clears the first cache slot holding serverInterface and returns TRUE;
  -- returns FALSE when no slot holds it.
  ENABLE UNWIND => NULL;
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF serverInterfaceCache[slot].interface # serverInterface THEN LOOP;
    -- reset the slot; earliestGMT makes it the first candidate for reuse
    serverInterfaceCache[slot].serverInstance ← NIL;
    serverInterfaceCache[slot].interface ← NIL;
    serverInterfaceCache[slot].lastUsed ← BasicTime.earliestGMT;
    RETURN [TRUE];
    ENDLOOP;
  };
findOrCreateQueueEntry: ENTRY PROC [service: ROPE] RETURNS [entry: QueueEntry ← NIL, item: WaitingRequest] = {
  -- Allocates a new WaitingRequest and appends it to the waiters of the queue entry for
  -- service (names compared ignoring case).  If no such entry exists, one is created at the
  -- head of WaitingQueue with the new request as its only waiter, and its `changed`
  -- condition is initialized with a one-second timeout.
  ENABLE UNWIND => NULL;
  item ← NEW[WaitingRequestRec];

  FOR q: LIST OF QueueEntry ← WaitingQueue, q.rest WHILE q # NIL DO
    IF ~Rope.Equal[q.first.service, service, FALSE] THEN LOOP;
    entry ← q.first;
    IF entry.waiters = NIL THEN entry.waiters ← CONS[item, NIL]
    ELSE {
      -- walk to the last cell and hang the new request after it
      tail: LIST OF WaitingRequest ← entry.waiters;
      WHILE tail.rest # NIL DO tail ← tail.rest ENDLOOP;
      tail.rest ← CONS[item, NIL];
      };
    RETURN; -- existing queue
    ENDLOOP;

  -- no queue for this service yet
  entry ← NEW[QueueEntryRec ← [service: service, waiters: LIST[item]]];
  WaitingQueue ← CONS[entry, WaitingQueue];
  TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]]};
  };
setNewQueueing: ENTRY PROC [] = {
  -- Sets newQueueing on every server in ServerStatusList and stamps LastReBuildQueueTime
  -- with the current time.
  ENABLE UNWIND => NULL;
  servers: LIST OF ServerStatus ← ServerStatusList;
  WHILE servers # NIL DO
    servers.first.newQueueing ← TRUE;
    servers ← servers.rest;
    ENDLOOP;
  LastReBuildQueueTime ← BasicTime.Now[];
  };
queueItemReady: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
  -- Blocks until item has been given a server or its timeout has passed.
  -- Outcomes, reported through item:
  --   waiting = timedOut   : item.timeout passed before a server was assigned.
  --   waiting = validating : a server was assigned; item.instance names it and
  --                          item.serverAssigned is TRUE.
  -- A server is assigned either here (item is the first waiter and entry.servers is
  -- non-empty: the offered server that IsStatusBetter ranks best is taken and removed from
  -- entry.servers) or by MightAcceptQueuedCommand, which sets waiting = done and item.instance.
  ENABLE UNWIND => NULL;
  firstTime: BOOL ← TRUE;
  item.waiting ← true;
  DO
    IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting ← timedOut; RETURN};
    SELECT item.waiting FROM
      true => {
        IF entry.servers # NIL AND entry.waiters.first = item THEN {
          bestStatus: ServerStatus ← NIL;
          prev, prevBest: LIST OF ROPE ← NIL; -- cells preceding the current / best cell
          best: LIST OF ROPE ← NIL;
          serv: ROPE;
          FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
            statusFound: BOOL;
            val: REF ANY;
            status: ServerStatus;
            serv ← l.first;
            [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: serv];
            IF statusFound THEN {
              status ← NARROW[val];
              IF IsStatusBetter[status, bestStatus] THEN {
                bestStatus ← status;
                best ← l;
                prevBest ← prev;
                };
              };
            prev ← l;
            ENDLOOP;
          IF prevBest = NIL THEN {
            -- best is the head cell, or no offered server has a known status: take the head
            item.instance ← entry.servers.first;
            entry.servers ← entry.servers.rest;
            }
          ELSE {
            -- unlink the best cell from the middle or end of the list
            item.instance ← best.first;
            prevBest.rest ← best.rest;
            };
          -- The selection above is final.  The former second, unconditional
          -- "take entry.servers.first" overrode it, consumed an extra server, and
          -- dereferenced NIL when only one server had been offered.
          item.serverAssigned ← TRUE;
          item.waiting ← validating;
          RETURN;
          };
        };
      done => {
        -- MightAcceptQueuedCommand already filled in item.instance
        item.serverAssigned ← TRUE;
        item.waiting ← validating;
        RETURN;
        };
      ENDCASE;
    IF firstTime THEN {forceReBuildQueuing ← TRUE};
    firstTime ← FALSE;
    WAIT entry.changed;
    ENDLOOP;
  };
removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
  -- Broadcasts entry.changed, then unlinks the first occurrence of item from entry.waiters.
  -- Does nothing further when item is not in the list.
  ENABLE UNWIND => NULL;
  prev: LIST OF WaitingRequest ← NIL;
  BROADCAST entry.changed;
  SELECT TRUE FROM
    entry.waiters = NIL => RETURN; -- actually, this is a bug
    entry.waiters.first = item => {entry.waiters ← entry.waiters.rest; RETURN};
    ENDCASE;
  -- item is not at the head: look for the cell just before it
  prev ← entry.waiters;
  WHILE prev.rest # NIL DO
    IF prev.rest.first = item THEN {prev.rest ← prev.rest.rest; RETURN};
    prev ← prev.rest;
    ENDLOOP;
  };
MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = {
  -- Offers serverMachineAddress to the first WaitingQueue entry whose service matches
  -- commandName (ignoring case).
  --   * If the entry's first waiter is in state `true`, the server is handed to it directly
  --     (waiting ← done, instance ← serverMachineAddress).
  --   * Otherwise the server is appended to entry.servers unless already listed.
  -- Either way entry.changed is broadcast.  Nothing happens when no entry matches.
  ENABLE UNWIND => NULL;
  FOR q: LIST OF QueueEntry ← WaitingQueue, q.rest UNTIL q = NIL DO
    entry: QueueEntry = q.first;
    IF ~Rope.Equal[commandName, entry.service, FALSE] THEN LOOP;
    IF entry.waiters # NIL AND entry.waiters.first.waiting = true THEN {
      entry.waiters.first.waiting ← done;
      entry.waiters.first.instance ← serverMachineAddress;
      }
    ELSE {
      -- nobody waiting now or top waiter is busy
      IF entry.servers = NIL THEN entry.servers ← CONS[serverMachineAddress, NIL]
      ELSE {
        s: LIST OF ROPE ← entry.servers;
        DO
          IF Rope.Equal[s.first, serverMachineAddress, FALSE] THEN EXIT; -- already offered
          IF s.rest = NIL THEN {s.rest ← CONS[serverMachineAddress, NIL]; EXIT};
          s ← s.rest;
          ENDLOOP;
        };
      };
    BROADCAST entry.changed;
    EXIT;
    ENDLOOP;
  };
IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus] RETURNS [itsBetter: BOOL ← FALSE] = {
  -- Decides whether status should replace bestStatus as the preferred server.
  -- The tests are applied in this order:
  --   1. there is no best yet;
  --   2. the best's reclamationRate exceeds 5 and, tripled, exceeds the candidate's, while the
  --      candidate's FOM is under twice the best's;
  --   3. otherwise, the candidate's FOM is strictly lower.
  IF bestStatus = NIL THEN RETURN [TRUE];
  IF ((3*bestStatus.reclamationRate) > status.reclamationRate)
    AND (bestStatus.reclamationRate > 5)
    AND (status.FOM < 2*bestStatus.FOM) THEN RETURN [TRUE];
  RETURN [status.FOM < bestStatus.FOM];
  };
locked: BOOL ← FALSE; -- TRUE while some caller of Lock has not yet called UnLock
lockCondition: CONDITION; -- waited on in Lock, notified in UnLock
Lock: ENTRY PROC = {
  -- Waits until `locked` is FALSE, then sets it.
  -- WAIT can raise ABORTED; without an UNWIND catch the monitor lock would stay held as the
  -- signal unwinds, so follow the convention used by every other ENTRY procedure here.
  ENABLE UNWIND => NULL;
  WHILE locked DO
    WAIT lockCondition;
    ENDLOOP;
  locked ← TRUE;
  };
UnLock: ENTRY PROC = {
  -- Clears `locked` and notifies lockCondition so that one process waiting in Lock can proceed.
  locked ← FALSE;
  NOTIFY lockCondition;
  };