ComputeControllerImpl:
CEDAR
MONITOR
IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVNames, IO, Process, PupName, Real, Rope, RPC, SymTab, UserCredentials, UserProfile, ViewerIO, ViewerOps, WorldVM
EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
= BEGIN
-- Local shorthand for Rope.ROPE.
ROPE: TYPE = Rope.ROPE;
-- NoticeNewPackage WAITs on this condition while this machine has held the controller role for less than 22 seconds.
NoticeNewPackageCondition: CONDITION;
-- TRUE while the controller RPC interface is exported from this machine; several public procedures raise RPC.CallFailed[unbound] when it is FALSE.
InterfaceExported: BOOL ← FALSE;
-- Set TRUE by StartUpController and FALSE by ShutDownController.
ControllerEnabled: BOOL ← FALSE;
-- The LatentSummonerController loop exits once this is FALSE; ShutDownController clears it.
LatentControllerActive: BOOL ← TRUE;
-- TRUE while this machine holds the controller role.
IAmTheController: PUBLIC BOOL ← FALSE;
-- Set TRUE by the latent controller when another controller answers but servers are still listed here; GetSomeInfo returns it as tryDifferentController.
ControllerMovingAway: PUBLIC BOOL ← FALSE;
-- Request flag: when TRUE the latent controller loop takes over the controller role and then clears the flag.
GrabController: PUBLIC BOOL ← FALSE;
-- Time at which IAmTheController was most recently set TRUE.
TimeIBecameTheController: PUBLIC BasicTime.GMT ← BasicTime.nullGMT;
-- Cleared by NoticeControllerIsDown, set by NoticeControllerIsUp and StartUpController.
ControllerUp: BOOL ← TRUE;
-- Instance name used to export and import the controller interface; StartUpController falls back to "PaloAlto1.summoner".
ControllerGVName: ROPE ← NIL;
-- Kind of the most recently reported controller failure, with its details kept in the two variables that follow.
ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError ← none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
-- Time at which the controller was first noticed to be down (written only on the up-to-down transition).
ControllerFailedTime: BasicTime.GMT;
-- Time of the last setNewQueueing call made from ReBuildQueuing.
LastReBuildQueueTime: BasicTime.GMT ← BasicTime.nullGMT;
ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
-- Status records of the known servers; TossOldStatus unlinks entries whose status is more than 30 seconds old.
ServerStatusList: PUBLIC LIST OF ServerStatus ← NIL;
-- The three tables below are created when this machine becomes the controller.
BrokenCommandTable: SymTab.Ref; -- knowing a command, find where it is broken
ExtraCommandTable: SymTab.Ref; -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref; -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"
-- One server on which a command is reported broken, together with the versions reported.
-- CommandUnavailable stores lists of these in BrokenCommandTable, keyed by command name.
BrokenCommandObject:
TYPE =
RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPE ← NIL
];
BrokenCommand: TYPE = REF BrokenCommandObject;
-- One server that offers a non-standard command, together with the versions offered.
-- ExtraCommandAvailable stores lists of these in ExtraCommandTable, keyed by command name.
ExtraCommandObject:
TYPE =
RECORD [
serverMachineName: RPC.ShortROPE,
versions: LIST OF ROPE ← NIL
];
ExtraCommand: TYPE = REF ExtraCommandObject;
-- When TRUE, the next ReBuildQueuing call runs setNewQueueing regardless of elapsed time; ReBuildQueuing clears it.
forceReBuildQueuing: BOOL ← FALSE;
-- NOTE(review): presumably the list searched by findOrCreateQueueEntry, which is defined outside this view; confirm.
WaitingQueue: LIST OF QueueEntry ← NIL;
QueueEntry: TYPE = REF QueueEntryRec;
-- Queue state for one service name.
QueueEntryRec:
TYPE =
RECORD [
service: ROPE ← NIL, -- passed as the service argument of AskForService
doQueueing: BOOL ← FALSE, -- FindServiceWithQueueing queues requests when TRUE and probes servers directly when FALSE
waiters: LIST OF WaitingRequest ← NIL,
servers: LIST OF ROPE ← NIL,
changed: CONDITION
];
WaitingRequest: TYPE = REF WaitingRequestRec;
-- One client request handled by FindServiceWithQueueing.
WaitingRequestRec:
TYPE =
RECORD [
process: PROCESS, -- the requesting process (from Process.GetCurrent)
timeout: BasicTime.GMT, -- now plus the caller's timeToWait
waiting: waitType ← init,
version: ROPE,
clientMachineName: RPC.ShortROPE,
userName: RPC.ShortROPE,
streamPupAddress: Pup.Address,
needListener: BOOL,
serverAssigned: BOOL ← FALSE, -- cleared by FindServiceWithQueueing when the assigned server turns out to be unusable
response: ATOM, -- outcome atom returned to the client, e.g. $foundOK or $timeOut
instance: RPC.ShortROPE, -- server chosen for this request
serverPupAddress: Pup.Address
];
-- Progress of a WaitingRequest; only init, true and timedOut are referenced in the visible code.
waitType: TYPE = {init, true, validating, done, timedOut};
-- One slot of the server interface cache: an imported server interface, the instance it belongs to, and when it was last used.
-- NOTE(review): the cache is read and written by getServerInterfaceFromCache and deleteServerInterfaceFromCache, which are outside this view; the replacement policy is not visible here.
serverInterfaceItem:
TYPE =
RECORD [
serverInstance: ROPE ← NIL,
interface: ComputeServerRpcControl.InterfaceRecord ← NIL,
lastUsed: BasicTime.GMT ← BasicTime.earliestGMT
];
-- Number of slots in the cache.
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
serverInterfaceCache: REF serverInterfaceArray;
-- State of one forked CheckItemProcess as seen by the polling CheckItem.
CheckState: TYPE = {checking, aborted, done};
-- Result record shared between CheckItem (reader) and the forked CheckItemProcess (writer).
CheckItemProcessObj:
TYPE =
RECORD[
check: CheckState ← checking, -- CheckItem polls this field until it is done or its own timeout expires
communicationsOK: BOOL ← TRUE, -- FALSE when the RPC call to the server failed
askFound: ATOM, -- answer returned by AskForService
serverPupAddress: Pup.Address,
errMsg: ROPE
];
-- Count used by LatentSummonerController to ensure that at most one latent controller process runs.
NoOfLatentSummonerControllerProcesses: INT ← 0 ;
-- Kinds of events passed to ReportControllerEvent and InternalReportControllerEvent.
ControllerEventType: TYPE = {service, newPackage, newServer, lostServer, becomeController, noticeOtherController, cedeController};
ControllerEvent:
TYPE =
RECORD [
type: ControllerEventType, -- type of event
reason: Rope.ROPE, -- if relevant, the reason for the event
command: Rope.ROPE, -- command name (service and newPackage)
time: BasicTime.GMT,
remoteMachineName: Rope.ROPE, -- name of other machine if relevant (user's machine or controller)
remoteUser: Rope.ROPE, -- name of user if relevant
chain: REF ControllerEvent -- used internally to chain events together
];
-- NOTE(review): the three variables below are not referenced in the visible part of the file; presumably they belong to the event reporting code further down.
anotherControllerEvent: CONDITION;
controllerEventListTail: REF ControllerEvent ← NIL;
watchingController: BOOLEAN ← FALSE;
StartUpController: PUBLIC PROC [controllerName: ROPE ← NIL, forceSelfAsPrimaryController: BOOL ← FALSE] RETURNS [] = {
  -- Enables the controller machinery on this machine.
  -- controllerName: instance name of the controller; when empty, the "Summoner.ControllerName" profile entry is used, and failing that "PaloAlto1.summoner".
  -- forceSelfAsPrimaryController: take the controller role here even if the registered connect site is another machine.
  -- If the interface is not yet exported and this machine should be the controller, exports the controller RPC interface.
  -- Always finishes by forking a detached LatentSummonerController.
  haveLock: BOOL ← FALSE;
  {
    ENABLE UNWIND => IF haveLock THEN UnLock[];
    credName, credPassword: ROPE;
    IF Rope.IsEmpty[controllerName]
      THEN ControllerGVName ← UserProfile.Token["Summoner.ControllerName"]
      ELSE ControllerGVName ← controllerName;
    IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName ← "PaloAlto1.summoner";
    ControllerEnabled ← TRUE;
    LatentControllerActive ← TRUE;
    ControllerUp ← TRUE;
    Lock[];
    haveLock ← TRUE;
    IF ~InterfaceExported THEN {
      gvInfo: GVNames.ConnectInfo;
      gvConnect: GVBasics.Connect;
      [info: gvInfo, connect: gvConnect] ← GVNames.GetConnect[ControllerGVName];
      -- Only a name that resolves to an individual or a group is considered; the remaining tests are evaluated left to right.
      IF (gvInfo = individual OR gvInfo = group)
        AND (forceSelfAsPrimaryController
          OR Rope.Equal[gvConnect, ComputeServerControl.MyNetAddressRope]
          OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope])
      THEN {
        ReportControllerEvent[becomeController, IF forceSelfAsPrimaryController THEN "SummonerBecomeController command" ELSE "Grapevine thinks I'm the controller but I don't (yet)", NIL, BasicTime.Now[], NIL, NIL];
        IAmTheController ← TRUE;
        TimeIBecameTheController ← BasicTime.Now[];
        [credName, credPassword] ← UserCredentials.Get[];
        -- Marked exported before the call; the catch phrase below undoes this if the export fails.
        InterfaceExported ← TRUE;
        BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
        ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
        MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
        ComputeServerControllerRpcControl.ExportInterface[
          interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
          user: credName,
          password: RPC.MakeKey[credPassword]
          ! RPC.ExportFailed => {
            InterfaceExported ← FALSE;
            IAmTheController ← FALSE;
            ReportControllerEvent[cedeController, "failed export of RPC interface", NIL, BasicTime.Now[], NIL, NIL];
            CONTINUE;
          };
        ];
      };
    };
    UnLock[];
    haveLock ← FALSE;
    TRUSTED {Process.Detach[FORK LatentSummonerController[]]};
  };
};
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
  -- Records that importing the controller interface failed, and why, then marks the controller as down.
  LastImportFailure ← reason;
  LastControllerError ← import;
  NoticeControllerIsDown[];
};
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
  -- Records that a call to the controller failed, and why, then marks the controller as down.
  LastCallFailure ← reason;
  LastControllerError ← callFailed;
  NoticeControllerIsDown[];
};
NoticeControllerIsDown: PROC = {
  -- Marks the controller as down. The failure time is stamped only on the up-to-down transition, so repeated failures keep the time of the first one.
  IF ControllerUp THEN {
    ControllerFailedTime ← BasicTime.Now[];
  };
  ControllerUp ← FALSE;
};
NoticeControllerIsUp: PUBLIC PROC = {
  -- Marks the controller as reachable again; ControllerFailedTime is left as it was.
  ControllerUp ← TRUE;
};
TryThisController: PROC RETURNS [importedOK: BOOL ← TRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
  -- Tries to import the controller interface registered under ControllerGVName.
  -- importedOK stays TRUE unless RPC.ImportFailed is raised, in which case controllerInterface is not assigned.
  controllerInterface ← ComputeServerControllerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
    ! RPC.ImportFailed => {importedOK ← FALSE; CONTINUE};
  ];
};
LatentSummonerController: PROC [] = {
  -- Background loop, forked by StartUpController, that decides every 127 seconds whether this machine should take over or give up the controller role.
  -- At most one instance runs at a time; extra instances return immediately.
  -- When the controller is down (or GrabController is set) and the takeover conditions hold, this machine exports the controller interface.
  -- When this machine is the controller but the registered connect site names another machine, it either cedes the role, starts moving away, or re-asserts itself, depending on whether the other controller answers.
  MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOL ← FALSE] = {
    IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses ← 1
    ELSE RETURN[TRUE];
  };
  TimeToStop: ENTRY PROC RETURNS [stop: BOOL ← FALSE] = {
    -- Releases the single-instance claim in the same monitor entry that decides to stop.
    -- Without this reset, a StartUpController issued after a shutdown would fork a process that exits at once, leaving no latent controller running.
    IF ~LatentControllerActive THEN {
      NoOfLatentSummonerControllerProcesses ← 0;
      RETURN[TRUE];
    };
  };
  locked: BOOL ← FALSE;
  IF MakeSureThereIsOnlyOne[] THEN RETURN;
  DO
    ENABLE UNWIND => IF locked THEN UnLock[];
    IF TimeToStop[] THEN EXIT;
    IF ~ControllerUp
      OR GrabController
    THEN {
      -- controller is down or I should grab it: see if I should acquire the controller function
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      ControllerMovingAway ← FALSE;
      IF GrabController
        OR ~TryThisController[].importedOK
      THEN {
        -- down but import will work soon
        downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
        [info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
        -- Take over when asked to, when the controller has been down for more than 83 seconds, when it has been down for more than 13 seconds and the connect site is this machine, or when the profile names this machine as primary controller.
        IF GrabController
          OR downTime > 83
          OR
          ((downTime > 13
            AND (info = individual
              OR info = group)
            AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope])
          OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope])
        THEN {
          user, password: ROPE;
          Lock[];
          locked ← TRUE;
          ReportControllerEvent[becomeController, IF GrabController THEN "SummonerBecomeController command" ELSE "Grapevine thinks I'm the controller but I don't (yet)", NIL, BasicTime.Now[], NIL, NIL];
          GrabController ← FALSE;
          IAmTheController ← TRUE;
          TimeIBecameTheController ← BasicTime.Now[];
          [user, password] ← UserCredentials.Get[];
          IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
          InterfaceExported ← TRUE;
          BrokenCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable ← SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable ← SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user,
            password: RPC.MakeKey[password]
            !
            RPC.ExportFailed => {
              InterfaceExported ← FALSE;
              IAmTheController ← FALSE;
              ReportControllerEvent[cedeController, "failed export of RPC interface", NIL, BasicTime.Now[], NIL, NIL];
              CONTINUE;
            };
          ];
          UnLock[];
          locked ← FALSE;
        };
      };
    }
    ELSE {
      -- Controller is up. If I think that I am the controller, but grapevine does not think so, then fix the situation
      IF IAmTheController
      THEN {
        info: GVNames.ConnectInfo;
        connect: GVBasics.Connect;
        [info: info, connect: connect ] ← GVNames.GetConnect[ControllerGVName];
        IF (info = individual
          OR info = group)
        THEN {
          IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]
          THEN {
            -- Grapevine says I am not the Controller
            triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
            tryDifferentController: BOOL ← FALSE;
            importedOK: BOOL;
            [importedOK: importedOK, controllerInterface: triedControllerInterface] ← TryThisController[];
            -- See if this Controller thinks everything is fine
            IF importedOK
            THEN [tryDifferentController: tryDifferentController] ← triedControllerInterface.GetSomeInfo[ !
              RPC.CallFailed => {
                importedOK ← FALSE;
                CONTINUE;
              };
            ];
            IF importedOK
              AND ~tryDifferentController
            THEN {
              -- new controller answers
              IF ServerStatusList =
                NIL
              THEN {
                -- no servers report here any more, so the role can be given up
                Lock[];
                locked ← TRUE;
                IAmTheController ← FALSE;
                ReportControllerEvent[cedeController, "other controller has assumed role", NIL, BasicTime.Now[], PupName.HisName[PupName.NameLookup[connect, Pup.nullSocket]], NIL];
                ControllerMovingAway ← FALSE;
                ComputeServerControllerRpcControl.UnexportInterface[];
                InterfaceExported ← FALSE;
                ServerStatusList ← NIL;
                BrokenCommandTable ← NIL;
                ExtraCommandTable ← NIL;
                UnLock[];
                locked ← FALSE;
              }
              ELSE {
                ReportControllerEvent[noticeOtherController, "new controller starting to assume role", NIL, BasicTime.Now[], PupName.HisName[PupName.NameLookup[connect, Pup.nullSocket]], NIL];
                ControllerMovingAway ← TRUE;
              };
            }
            ELSE {
              -- new controller does not answer or thinks it is moving too
              ControllerMovingAway ← FALSE;
              IF ~importedOK
              THEN {
                -- new controller does not answer: re-assert my controllership
                user, password: ROPE;
                [user, password] ← UserCredentials.Get[];
                [] ← GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
              };
            };
          }
          ELSE {
            -- Grapevine says I am the Controller
            ControllerMovingAway ← FALSE;
          };
        }
        ELSE {
          -- NOT info = individual OR info = group (grapevine troubles)
          ControllerMovingAway ← FALSE;
        };
        TossOldStatus[];
        ReBuildQueuing[];
      };
    };
    Process.Pause[Process.SecondsToTicks[127]];
  ENDLOOP;
};
ShutDownController: PUBLIC PROC [] RETURNS [] = {
  -- Stops all controller activity on this machine: withdraws the exported interface, discards the controller tables, asks the latent controller loop to exit, and reports giving up the role if this machine held it.
  IF InterfaceExported THEN {
    ComputeServerControllerRpcControl.UnexportInterface[];
  };
  InterfaceExported ← FALSE;
  ControllerEnabled ← FALSE;
  -- discard everything known about servers and commands
  ServerStatusList ← NIL;
  BrokenCommandTable ← NIL;
  ExtraCommandTable ← NIL;
  MachineToStatusTable ← NIL;
  -- the latent controller loop tests this flag at the top of each pass
  LatentControllerActive ← FALSE;
  ControllerUp ← FALSE;
  IF IAmTheController THEN {
    ReportControllerEvent[cedeController, "Compute Server shutdown", NIL, BasicTime.Now[], NIL, NIL];
  };
  IAmTheController ← FALSE;
};
TossOldStatus: ENTRY PROC [] RETURNS [] = {
  -- Unlinks from ServerStatusList every server whose status is more than 30 seconds old, reports a lostServer event for it, and removes both of its keys (machine name and Pup address) from MachineToStatusTable.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  lastStatusList: LIST OF ServerStatus ← NIL; -- last cell that is still linked into ServerStatusList
  status: ServerStatus ← NIL;
  now: BasicTime.GMT ← BasicTime.Now[];
  FOR statusList ← ServerStatusList, statusList.rest
  UNTIL statusList = NIL
  DO
    status ← statusList.first;
    IF BasicTime.Period[status.timeOfStatus, now] > 30
    THEN {
      InternalReportControllerEvent[type: lostServer, reason: "no status for 30 seconds", command: NIL, time: BasicTime.Now[], remoteMachineName: status.serverMachineName, userName: status.userName];
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
      [] ← SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
      IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
      ELSE lastStatusList.rest ← statusList.rest;
      -- Do not advance lastStatusList onto the cell just unlinked; otherwise the next stale entry would be spliced out of the dead cell and remain in the live list.
      LOOP;
    };
    lastStatusList ← statusList;
  ENDLOOP;
};
ReBuildQueuing: PROC [] RETURNS [] = {
  -- Runs setNewQueueing when a rebuild has been forced or when more than 300 seconds have passed since the last rebuild made from here; always clears the force flag.
  currentTime: BasicTime.GMT ← BasicTime.Now[];
  rebuildDue: BOOL = forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, currentTime] > 300;
  IF rebuildDue THEN {
    setNewQueueing[];
    LastReBuildQueueTime ← currentTime;
  };
  forceReBuildQueuing ← FALSE;
};
GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL, serverList: LIST OF ROPE ← NIL, bestFOM: REAL ← 0.0] = {
  -- Summarizes the controller's view of the servers.
  -- error: always FALSE on the paths in this procedure.
  -- tryDifferentController: TRUE when this machine is not the controller; otherwise the current value of ControllerMovingAway.
  -- msg: "controller has moved" or "no servers up" in those cases, NIL otherwise.
  -- serverList: one entry per server in ServerStatusList, giving its name and percent CPU busy; the controller's own machine is labelled as such.
  -- bestFOM: the FOM of the best server according to IsStatusBetter; 1.0 when there are no servers, 0.0 when this machine is not the controller.
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus ← NIL;
  statusList: LIST OF ServerStatus;
  IF ~IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved", NIL, 0.0];
  FOR statusList ← ServerStatusList, statusList.rest
  UNTIL statusList = NIL
  DO
    status: ServerStatus = statusList.first;
    perCentBusy: INT ← Real.Round[status.CPULoad*100.0];
    -- Exactly one entry per server. An older name-only form of this listing was also being executed here, which put every server on the list twice.
    IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName]
    THEN serverList ← CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList]
    ELSE serverList ← CONS[IO.PutFR["%g (%g%%)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
    IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
  ENDLOOP;
  IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
  RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
};
BestServerStats: PUBLIC ENTRY PROC RETURNS [instance: RPC.ShortROPE, FOM: REAL] = {
  -- Returns the machine name and FOM of the best server in ServerStatusList according to IsStatusBetter, or [NIL, 1.0] when the list is empty.
  -- Raises RPC.CallFailed[unbound] when the controller interface is not exported.
  ENABLE UNWIND => NULL;
  winner: ServerStatus ← NIL;
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  FOR each: LIST OF ServerStatus ← ServerStatusList, each.rest
  UNTIL each = NIL
  DO
    candidate: ServerStatus = each.first;
    IF IsStatusBetter[candidate, winner] THEN winner ← candidate;
  ENDLOOP;
  IF winner # NIL
    THEN RETURN [winner.serverMachineName, winner.FOM]
    ELSE RETURN [NIL, 1.0];
};
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
  -- Raises the recorded load of the server keyed by instance in MachineToStatusTable and recomputes its FOM.
  -- Does nothing when the table is NIL or the server is not in it.
  ENABLE UNWIND => NULL;
  table: SymTab.Ref ← MachineToStatusTable; -- read the global once
  present: BOOL;
  entryVal: REF ANY;
  server: ServerStatus;
  IF table = NIL THEN RETURN;
  [found: present, val: entryVal] ← SymTab.Fetch[x: table, key: instance];
  IF ~present THEN RETURN;
  server ← NARROW[entryVal];
  -- both CPU load figures are capped at 1.0
  server.CPULoad ← MIN[ 1.0, server.CPULoad + 0.5];
  server.nonBackgroundCPULoad ← MIN[ 1.0, server.nonBackgroundCPULoad + 0.5];
  server.reclamationRate ← server.reclamationRate + 30;
  server.numberOfCurrentRequests ← server.numberOfCurrentRequests + 1;
  server.aveBackgroundLoad ← server.aveBackgroundLoad + 1;
  server.FOM ← figureOfMerit[server];
};
FindService: PUBLIC PROC [service: ROPE, userName: RPC.ShortROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  -- Picks a server for the named service through InnerFindService and, when one is found, reports a service event.
  -- Raises RPC.CallFailed[unbound] when the controller interface is not exported.
  chosenServer: RPC.ShortROPE;
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  [found: found, instance: instance, serverMachineName: chosenServer] ← InnerFindService[service];
  IF found THEN {
    ReportControllerEvent[service, "", service, BasicTime.Now[], chosenServer, userName];
  };
};
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, userName: RPC.ShortROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: Pup.Address, needListener: BOOL] RETURNS [found: ATOM, instance: RPC.ShortROPE, serverPupAddress: Pup.Address, errMsg: ROPE] = {
  -- Finds a server for the named service, queueing the request when necessary.
  -- When the service's queue entry already has doQueueing set, the request waits in the queue until a server is assigned or the timeout (now plus timeToWait) passes.
  -- Otherwise servers are probed directly via InnerFindService and CheckItem; if the same server comes up a second time the request falls back to queueing.
  -- found is an atom such as $foundOK, $notFound or $timeOut.
  -- Raises RPC.CallFailed[unbound] when the controller interface is not exported.
  entry: QueueEntry;
  item: WaitingRequest ← NIL;
  tryForServer: PROC [] = {
    -- Waits for the queue to assign a server to item, then checks with that server; loops until the request is answered or times out.
    DO
      commOK: BOOL;
      foundBroken: BOOL;
      brokenVal: REF ANY;
      brokenList, brokenListLoop: LIST OF BrokenCommand;
      askFound: ATOM;
      queueItemReady[entry, item];
      IF item.waiting = timedOut
      THEN
      {
        item.response ← $timeOut;
        EXIT;
      };
      -- reject an assigned server on which this command is known to be broken
      [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
      brokenList ← NARROW[brokenVal];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest
      UNTIL brokenListLoop = NIL
      DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance,
          FALSE]
        THEN {
          item.serverAssigned ← FALSE;
          item.waiting ← true;
          EXIT;
        };
      ENDLOOP;
      IF ~item.serverAssigned THEN LOOP;
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem [entry, item];
      -- NOTE(review): any answer other than $foundOK sends the request back to the queue here, so only the $foundOK arm of the SELECT below can be reached; confirm that this is intended.
      IF ~commOK
        OR (askFound # $foundOK)
      THEN {
        item.serverAssigned ← FALSE;
        item.waiting ← true;
        LOOP;
      };
      item.response ← askFound;
      item.serverPupAddress ← serverPupAddress;
      SELECT askFound
      FROM
        $notFound => {
          removeQueueItem[entry, item];
          errMsg ← NIL;
          RETURN;
        };
        $foundButTooBusy => {
          LOOP;
        };
        $foundOK => {
          removeQueueItem[entry, item];
          RETURN;
        };
      ENDCASE;
    ENDLOOP;
    -- reached only through the timeout EXIT above
    removeQueueItem[entry, item];
  };
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  [entry, item] ← findOrCreateQueueEntry[service];
  item.version ← version;
  item.clientMachineName ← clientMachineName;
  item.streamPupAddress ← streamPupAddress;
  item.needListener ← needListener;
  item.userName ← userName;
  IF entry.doQueueing
  THEN {
    -- queueing already being done
    item.process ← LOOPHOLE[Process.GetCurrent[]];
    item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
    item.waiting ← true;
    tryForServer[];
    bumpBestServerStats[item.instance];
    IF item.response = $foundOK THEN ReportControllerEvent[service, "", service, BasicTime.Now[], item.instance, userName];
    RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
  }
  ELSE {
    -- queueing not started
    serversTried: LIST OF RPC.ShortROPE ← NIL;
    DO
      commOK: BOOL;
      serviceFound: BOOL;
      instance: RPC.ShortROPE;
      askFound: ATOM;
      entry.doQueueing ← TRUE;
      setNewQueueing[];
      [serviceFound, instance] ← InnerFindService[service];
      item.instance ← instance;
      IF ~serviceFound
      THEN {
        found ← $notFound;
        removeQueueItem[entry, item];
        RETURN;
      };
      FOR oldServers: LIST OF RPC.ShortROPE ← serversTried, oldServers.rest
      UNTIL oldServers = NIL
      DO
        IF Rope.Equal[oldServers.first, instance,
          FALSE]
        THEN {
          -- We are about to retry the same server again. Give up probing, and start queueing.
          serversTried ← NIL; -- help garbage collector
          entry.doQueueing ← TRUE;
          setNewQueueing[];
          item.process ← LOOPHOLE[Process.GetCurrent[]];
          item.timeout ← BasicTime.Update[BasicTime.Now[], timeToWait];
          item.waiting ← true;
          tryForServer[];
          IF item.response = $foundOK THEN ReportControllerEvent[service, "", service, BasicTime.Now[], item.instance, userName];
          RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
        };
      ENDLOOP;
      serversTried ← CONS[instance, serversTried];
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] ← CheckItem[entry, item];
      IF ~commOK THEN LOOP;
      SELECT askFound
      FROM
        $notFound => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
        };
        $foundButTooBusy => {
          LOOP;
        };
        $foundOK => {
          removeQueueItem[entry, item];
          ReportControllerEvent[service, "", service, BasicTime.Now[], instance, userName];
          RETURN [askFound, instance, serverPupAddress, NIL];
        };
      ENDCASE;
    ENDLOOP;
  };
};
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest] RETURNS [communicationsOK: BOOL ← TRUE, askFound: ATOM, serverPupAddress: Pup.Address, errMsg: ROPE] = {
  -- Asks the server assigned to item whether it will run the service, with a time limit.
  -- The call is made in a forked CheckItemProcess; this procedure polls the shared result record every 2 ticks.
  -- If no result has arrived after more than 9 seconds, the forked process is detached and aborted and [FALSE, $notFound, Pup.nullAddress, ""] is returned.
  checkProcess: PROCESS;
  checkObject: REF CheckItemProcessObj ← NEW[CheckItemProcessObj];
  startTime: BasicTime.GMT ← BasicTime.Now[];
  checkProcess ← FORK CheckItemProcess[entry, item, checkObject, item.userName];
  DO
    IF BasicTime.Period[startTime, BasicTime.Now[]] > 9
    THEN
    TRUSTED {
      -- The remote machine has not failed outright, but it will not answer the call.
      checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
      -- Detach so we can return. Freeze, abort and thaw the process. After a long time (minutes if ever), the process will finally wake up and go away. Also, if the remote host stops answering the ping, the call will fail and the process will exit.
      Process.Detach[checkProcess];
      AMProcess.Freeze[LIST[checkProcessTV]];
      AMProcess.Abort[checkProcessTV];
      AMProcess.Thaw[LIST[checkProcessTV]];
      checkObject.check ← aborted;
      RETURN[FALSE, $notFound, Pup.nullAddress, ""];
    };
    IF checkObject.check = done
    THEN
    TRUSTED {
      [] ← JOIN checkProcess;
      RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
    };
    Process.Pause[2];
  ENDLOOP;
};
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj, userName: ROPE] = {
  -- Body of the process forked by CheckItem: calls AskForService on the server named by item.instance and publishes the outcome in checkObject.
  -- A failed call is retried once after dropping the cached interface; a second failure, or a missing interface, is reported as communicationsOK = FALSE.
  -- If the process is aborted, checkObject.check is set to aborted and nothing else is published.
  ENABLE ABORTED => GOTO aborted;
  communicationsOK: BOOL ← TRUE;
  askFound: ATOM;
  serverPupAddress: Pup.Address;
  errMsg: ROPE;
  firstTime: BOOL ← TRUE;
  serverInstance: ROPE ← item.instance;
  DO
    serverInterface: ComputeServerRpcControl.InterfaceRecord;
    serverInterface ← getServerInterfaceFromCache[serverInstance];
    -- Leave the loop instead of returning, so that the failure is published below; otherwise CheckItem would wait out its whole timeout and then try to abort a process that has already finished.
    IF serverInterface = NIL THEN {communicationsOK ← FALSE; EXIT};
    [found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] ← serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, userName: userName
      !
      RPC.CallFailed => {
        [] ← deleteServerInterfaceFromCache[serverInterface];
        IF firstTime THEN {firstTime ← FALSE; LOOP;};
        communicationsOK ← FALSE;
        EXIT;
      };
    ];
    EXIT;
  ENDLOOP;
  checkObject.communicationsOK ← communicationsOK;
  checkObject.askFound ← askFound;
  checkObject.serverPupAddress ← serverPupAddress;
  checkObject.errMsg ← errMsg;
  -- Set the state last: CheckItem reads the fields above as soon as it sees done.
  checkObject.check ← done;
  EXITS
    aborted => {
      checkObject.check ← aborted;
      RETURN;
    };
};
InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE, serverMachineName: RPC.ShortROPE] = {
  -- Chooses the best server for a service according to IsStatusBetter.
  -- A command listed in ComputeServerInternal.CommandTable is looked for on every known server except those recorded as broken for it; any other command is looked for only on the servers recorded in ExtraCommandTable.
  -- Returns [FALSE, "", ""] when no server qualifies. Otherwise bumps the chosen server's recorded load, recomputes its FOM, and returns its Pup address as instance together with its machine name.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  bestStatus: ServerStatus ← NIL;
  foundCommand: BOOL;
  [found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
  IF foundCommand
  THEN {
    -- command should be out on (almost) every server
    foundBroken: BOOL;
    brokenVal: REF ANY;
    brokenList: LIST OF BrokenCommand;
    [found: foundBroken, val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: service];
    brokenList ← NARROW[brokenVal];
    FOR statusList ← ServerStatusList, statusList.rest
    UNTIL statusList = NIL
    DO
      skipIt: BOOL ← FALSE;
      serverMachineName: ROPE ← statusList.first.serverMachineName;
      serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
      brokenListLoop: LIST OF BrokenCommand;
      -- a broken entry may name the server either by machine name or by Pup address
      FOR brokenListLoop ← brokenList, brokenListLoop.rest
      UNTIL brokenListLoop = NIL
      DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName,
          FALSE]
          OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress,
          FALSE]
        THEN {
          skipIt ← TRUE;
          EXIT;
        };
      ENDLOOP;
      IF skipIt THEN LOOP;
      IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus ← statusList.first;
    ENDLOOP;
  }
  ELSE {
    -- command should be out on (almost) no servers
    foundExtra: BOOL;
    extraVal: REF ANY;
    extraListLoop: LIST OF ExtraCommand;
    [found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: service];
    FOR extraListLoop ←
      NARROW[extraVal], extraListLoop.rest
    UNTIL extraListLoop = NIL
    DO
      statusFound: BOOL;
      val: REF ANY;
      status: ServerStatus;
      [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
      IF statusFound
      THEN {
        status ← NARROW[val];
        IF bestStatus = NIL THEN bestStatus ← status
        ELSE IF IsStatusBetter[status, bestStatus] THEN bestStatus ← status;
      };
    ENDLOOP;
  };
  IF bestStatus = NIL
  THEN {
    RETURN[FALSE, "", ""]
  }
  ELSE {
    -- raise the recorded load of the chosen server before handing it out
    bestStatus.CPULoad ← MIN[ 1.0, bestStatus.CPULoad + 0.5];
    bestStatus.reclamationRate ← bestStatus.reclamationRate + 30 ;
    bestStatus.FOM ← figureOfMerit[bestStatus];
    RETURN[TRUE, bestStatus.serverMachinePupAddress, bestStatus.serverMachineName];
  };
};
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
  -- Computes a server's figure of merit as 1.0 minus the share of the processor a new request can expect to get.
  -- For a dorado the busy fraction is its non-background CPU load; for any other machine type it is 0.8 plus one fifth of that load.
  -- The free fraction is divided by (load + 1), where load is the average background load, or the number of current requests when that average is below 0.1.
  busyFraction: REAL = IF status.machineType = dorado
    THEN status.nonBackgroundCPULoad
    ELSE 0.8 + (status.nonBackgroundCPULoad/5.0);
  competingLoad: REAL = IF status.aveBackgroundLoad < 0.1
    THEN Real.Float[status.numberOfCurrentRequests]
    ELSE status.aveBackgroundLoad;
  availableShare: REAL = (1.0 - busyFraction) / (competingLoad + 1);
  RETURN [1.0 - availableShare];
};
CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Records in BrokenCommandTable that the given version of commandName is broken on serverMachineName.
  -- An existing record for the server gains the version if it does not already list it; otherwise a new record is put at the front of the command's list.
  -- Raises RPC.CallFailed[unbound] when the controller interface is not exported.
  ENABLE UNWIND => NULL;
  known: BOOL;
  tableVal: REF ANY;
  existing: LIST OF BrokenCommand ← NIL;
  fresh: BrokenCommand;
  updated: LIST OF BrokenCommand;
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  [found: known, val: tableVal] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF known THEN {
    existing ← NARROW[tableVal];
    FOR each: LIST OF BrokenCommand ← existing, each.rest
    UNTIL each = NIL
    DO
      record: BrokenCommand = each.first;
      IF ~Rope.Equal[record.serverMachineName, serverMachineName] THEN LOOP;
      -- this server already has a record: add the version unless it is already listed
      FOR v: LIST OF ROPE ← record.versions, v.rest
      UNTIL v = NIL
      DO
        IF Rope.Equal[v.first, version] THEN RETURN;
      ENDLOOP;
      record.versions ← CONS[version, record.versions];
      RETURN;
    ENDLOOP;
  };
  -- no record for this server yet
  fresh ← NEW[BrokenCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
  updated ← IF known THEN CONS[fresh, existing] ELSE LIST[fresh];
  [] ← SymTab.Store[x: BrokenCommandTable, key: commandName, val: updated];
};
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  -- Records in ExtraCommandTable that serverMachineName offers the given version of the non-standard command commandName.
  -- Any BrokenCommandTable entry for the command is deleted first.
  -- An existing record for the server gains the version if it does not already list it; otherwise a new record is put at the front of the command's list.
  -- Raises RPC.CallFailed[unbound] when the controller interface is not exported.
  ENABLE UNWIND => NULL;
  found: BOOL;
  oldList: LIST OF ExtraCommand;
  newExtra: ExtraCommand;
  newList: LIST OF ExtraCommand;
  machineList: LIST OF ExtraCommand;
  val: REF ANY;
  IF ~InterfaceExported
  THEN
    ERROR
    RPC.CallFailed[unbound];
  -- local import still works after we "UnexportInterface"
  -- See if this "Extra" reverses a previous broken
  -- NOTE(review): this deletes the broken records of every server for this command, not only those of serverMachineName; confirm that this is intended.
  [found: found, val: val] ← SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found
  THEN {
    [] ← SymTab.Delete[x: BrokenCommandTable, key: commandName];
  };
  [found: found, val: val] ← SymTab.Fetch[x: ExtraCommandTable, key: commandName];
  IF found
  THEN {
    oldList ← NARROW[val];
    FOR machineList ← oldList, machineList.rest
    UNTIL machineList = NIL
    DO
      extra: ExtraCommand = machineList.first;
      IF Rope.Equal[extra.serverMachineName, serverMachineName]
      THEN {
        versionList: LIST OF ROPE;
        FOR versionList ← extra.versions, versionList.rest
        UNTIL versionList = NIL
        DO
          vers: ROPE ← NARROW[versionList.first];
          IF Rope.Equal[vers, version] THEN RETURN;
        ENDLOOP;
        extra.versions ← CONS[version, extra.versions];
        RETURN;
      };
    ENDLOOP;
    newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList ← CONS[newExtra, NARROW[val]];
    [] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
  }
  ELSE {
    newExtra ← NEW[ExtraCommandObject ← [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList ← LIST[newExtra];
    [] ← SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
  };
};
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL ← FALSE, tryDifferentController: BOOL ← FALSE, msg: ROPE ← NIL] = {
  -- Registers a new or updated package df file in the shared package list and flags every known server with newPackage.
  -- package: name of the package's df file.
  -- error: TRUE when the request failed; msg then says why.
  -- tryDifferentController: TRUE (with msg "controller has moved") when this machine is not, or is no longer, the controller.
  -- The package list is read from ComputeServerInternal.RemoteCommandDir, rewritten into a temporary file in ComputeServerInternal.LocalCommandDir with the package's entry replaced or appended, and copied back.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE ← NIL;
  foundItem: BOOL ← FALSE; -- TRUE once the package's entry has been rewritten in place
  fullFName: ROPE;
  fsErrMsg: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE; -- everything in fullFName before the base name
  shortPackage: ROPE ← NIL; -- base name of the package
  shortPackageAndDF: ROPE ← NIL; -- shortPackage with ".df" appended
  packageDFDate: BasicTime.GMT ← BasicTime.nullGMT;
  lastDir: REF DFUtilities.DirectoryItem ← NIL; -- directory in effect for the items being copied
  wroteDir: BOOL ← FALSE; -- TRUE once lastDir has been written to the output
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];
  writeUpdatedItem: PROC[] = {
    -- Writes the entry for the package, preceded by a directory item when the directory in effect is not dfPrefix.
    file: REF DFUtilities.FileItem;
    file ←
      NEW[DFUtilities.FileItem ← [
        name: shortPackageAndDF, -- a short name
        date: [explicit, packageDFDate],
        verifyRoot: FALSE]];
    IF lastDir = NIL
      OR ~Rope.Equal[lastDir.path1, dfPrefix,
      FALSE]
    THEN {
      dir:
        REF DFUtilities.DirectoryItem ←
        NEW [DFUtilities.DirectoryItem ← [
          path1: dfPrefix,
          path2: NIL,
          path2IsCameFrom: FALSE,
          exported: FALSE,
          readOnly: FALSE
        ]];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      lastDir ← dir;
    };
    DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
  };
  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copies one item of the old package list to the new one, substituting the updated entry for the package being registered.
    WITH item
    SELECT
    FROM
      dir:
        REF DFUtilities.DirectoryItem => {
          -- written lazily, just before the first file item that needs it
          IF lastDir = NIL
            OR ~Rope.Equal[lastDir.path1, dir.path1,
            FALSE]
          THEN {
            wroteDir ← FALSE;
            lastDir ← dir;
          };
        };
      comment:
        REF DFUtilities.CommentItem => {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white:
        REF DFUtilities.WhiteSpaceItem => {
        };
      file:
        REF DFUtilities.FileItem => {
          shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
          IF ~wroteDir
          THEN {
            wroteDir ← TRUE;
            IF lastDir = NIL THEN lastDir ← defaultDirectory;
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
          IF Rope.Equal[shortPackageAndDF, shortName,
            FALSE]
          THEN {
            writeUpdatedItem[];
            foundItem ← TRUE;
          }
          ELSE {
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
    ENDCASE;
  };
  {
    ENABLE
      UNWIND => {
        IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
        IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
      };
    IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    -- hold the request until this machine has been the controller for at least 22 seconds
    WHILE ComputeControllerInternal.IAmTheController
      AND
      BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22
    DO
      WAIT NoticeNewPackageCondition;
    ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ←
      NEW [DFUtilities.DirectoryItem ← [
        path1: dfPrefix,
        path2: NIL,
        path2IsCameFrom: FALSE,
        exported: FALSE,
        readOnly: FALSE
      ]];
    [created: packageDFDate] ← FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => GOTO noDFFile];
    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName
      ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    IF ~foundItem THEN writeUpdatedItem[];
    packageListInStream.Close[! FS.Error => CONTINUE];
    packageListOutStream.Close[! FS.Error => CONTINUE];
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE ! FS.Error => { fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    InternalReportControllerEvent[type: newPackage, reason: NIL, command: shortPackage, time: BasicTime.Now[], remoteMachineName: NIL, userName: NIL];
    FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest
    UNTIL statusList = NIL
    DO
      statusList.first.newPackage ← TRUE;
    ENDLOOP;
    EXITS
      cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
      -- name the file, so that the message does not end in a dangling dash
      noDFFile => RETURN [ TRUE, FALSE, Rope.Concat["controller could not open specified df file - ", package]];
      syntaxErrorInPackageList => {
        -- both streams are still open on this path, and the partial temporary file must not be left behind
        packageListInStream.Close[! FS.Error => CONTINUE];
        packageListOutStream.Close[! FS.Error => CONTINUE];
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
        RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
      };
      badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
      cantCopyToServer => {
        -- the streams are already closed here; only the temporary file remains
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
        RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
      };
  };
};
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE]
  RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  -- Removes the entry "<package base name>.df" from the global package list.
  -- The list at RemoteCommandDir/PackageList is parsed item by item into a local temporary
  -- file with the matching file item left out; the temporary file is then copied back over
  -- the global list and every known server is flagged (newPackage) to notice the change.
  -- Results:
  --   error = TRUE, tryDifferentController = FALSE: the request failed here; msg says why.
  --   error = FALSE, tryDifferentController = TRUE: this machine is not (or no longer) the controller.
  -- On the success path the procedure falls off the end without assigning the results.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE ← NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE ← NIL;
  foundItem: BOOL ← FALSE;
  fullFName: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE ← NIL;
  shortPackageAndDF: ROPE ← NIL;
  fsErrMsg: ROPE ← NIL;  -- explanation from FS.Error when the copy back to the server fails
  wroteDir: BOOL ← FALSE;
  lastDir: REF DFUtilities.DirectoryItem ← NIL;
  defaultDirectory: REF DFUtilities.DirectoryItem ← NIL;
  whiteItem: REF DFUtilities.WhiteSpaceItem ← NEW[DFUtilities.WhiteSpaceItem ← [1]];

  DoOneItem: DFUtilities.ProcessItemProc = {
    -- PROC [item: REF ANY] RETURNS [stop: BOOL ← FALSE]
    -- Copies one parsed item of the package list to the output stream, dropping the file
    -- item whose name (up to any "!" version part) equals shortPackageAndDF, ignoring case.
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        -- a directory item is written only when its path1 differs from the previous one
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
          wroteDir ← FALSE;
          lastDir ← dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
        };
      file: REF DFUtilities.FileItem => {
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE]
          THEN {
            -- this is the package being removed: do not copy it
            foundItem ← TRUE;
            }
          ELSE {
            IF ~wroteDir THEN {
              -- first surviving file since the last directory change: emit the directory
              -- (or the default one built from the package name) framed by white space
              wroteDir ← TRUE;
              IF lastDir = NIL THEN lastDir ← defaultDirectory;
              DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
              DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
              DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
              };
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
            };
        };
      ENDCASE;
    };

  {
    ENABLE
      UNWIND => {
        -- best-effort clean-up of both streams and the temporary file
        IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
        IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
        FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
        };
    IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    -- hold the request until this machine has been the controller for at least 22 seconds
    WHILE ComputeControllerInternal.IAmTheController
      AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
      WAIT NoticeNewPackageCondition;
      ENDLOOP;
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] ← FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
    shortPackage ← fullFName.Substr[cp.base.start, cp.base.length];
    shortPackageAndDF ← Rope.Concat[shortPackage, ".df"];
    dfPrefix ← fullFName.Substr[0, cp.base.start];
    defaultDirectory ← NEW [DFUtilities.DirectoryItem ← [
      path1: dfPrefix,
      path2: NIL,
      path2IsCameFrom: FALSE,
      exported: FALSE,
      readOnly: FALSE
      ]];
    packageListGlobalName ← Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
    packageListInStream ← FS.StreamOpen[packageListGlobalName
      ! FS.Error => GOTO cantOpenPackageList];
    packageListTempName ← Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
    packageListOutStream ← FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
    DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
    packageListInStream.Close[! FS.Error => CONTINUE];
    packageListOutStream.Close[! FS.Error => CONTINUE];
    IF ~foundItem THEN {
      -- nothing was removed, so the global list is left untouched
      FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
      RETURN [ TRUE, FALSE, "package not found"]
      };
    -- report a failed copy to the caller instead of letting FS.Error escape from the monitor
    [] ← FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE
      ! FS.Error => { fsErrMsg ← error.explanation; GOTO cantCopyToServer}];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
    -- flag every known server; NewStats hands the flag back on that server's next report
    FOR statusList ← ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      statusList.first.newPackage ← TRUE;
      ENDLOOP;
    EXITS
      cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
      syntaxErrorInPackageList => RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
      badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
      cantCopyToServer => RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
    };
  };
NewStats: PUBLIC ENTRY PROC [
    serverMachineName: RPC.ShortROPE,
    serverMachinePupAddress: RPC.ShortROPE,
    serverUP: BOOL,
    firstCall: BOOL,
    machineType: PrincOps.MachineType,
    mainMemory: CARDINAL,
    numberCPUs: CARDINAL,
    diskPartionSize: INT,
    freePagesOnDisk: INT,
    freeboard: INT,
    freeGFI: CARDINAL,
    freeMDS: CARDINAL,
    freeVM: CARDINAL,
    oldestLRUFileDate: BasicTime.GMT,
    CPULoad: REAL,
    nonBackgroundCPULoad: REAL,
    reclamationRate: REAL,
    freeProcesses: CARDINAL,
    userName: RPC.ShortROPE,
    currentRequests: LIST OF ComputeServer.Request,
    aveBackgroundLoad: REAL]
  RETURNS [terminateService, newPackage: BOOL ← FALSE, queueingCommands: LIST OF ROPE ← NIL] = {
  -- Records a status report from one server.
  --   * A known server reporting serverUP = FALSE is unlinked from ServerStatusList.
  --   * A known server reporting serverUP = TRUE has its status record refreshed, and its
  --     pending newPackage flag is returned and cleared.
  --   * An unknown server reporting serverUP = TRUE gets a new record, is entered in
  --     MachineToStatusTable under both its name and its pup address, and is reported
  --     as a newServer event.
  -- If the server's record has newQueueing set, queueingCommands returns the services of
  -- all WaitingQueue entries with doQueueing set (in reverse queue order).
  -- On firstCall, every BrokenCommandTable / ExtraCommandTable entry recorded against this
  -- server's pup address is dropped.
  -- terminateService is never assigned here, so it keeps its default FALSE.
  -- Raises RPC.CallFailed[unbound] when the interface is not exported.
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  status: ServerStatus ← NIL;
  requests: INT ← 0;
  lastStatusList: LIST OF ServerStatus ← NIL;
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"

  -- count the requests the server says it is currently running
  FOR cr: LIST OF ComputeServer.Request ← currentRequests, cr.rest UNTIL cr = NIL DO
    requests ← requests + 1;
    ENDLOOP;

  -- look for an existing record for this server (case-sensitive name match)
  FOR statusList ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    IF Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN {
      status ← statusList.first;
      IF ~serverUP
        THEN {
          -- server is going away: unlink its record from the list
          IF lastStatusList = NIL THEN ServerStatusList ← statusList.rest
          ELSE lastStatusList.rest ← statusList.rest;
          }
        ELSE {
          newPackage ← status.newPackage;
          status.newPackage ← FALSE;
          status.freePagesOnDisk ← freePagesOnDisk;
          status.freeboard ← freeboard;
          status.freeGFI ← freeGFI;
          status.freeMDS ← freeMDS;
          status.freeVM ← freeVM;
          status.oldestLRUFileDate ← oldestLRUFileDate;
          status.CPULoad ← CPULoad;
          status.nonBackgroundCPULoad ← nonBackgroundCPULoad;
          status.reclamationRate ← reclamationRate;
          status.freeProcesses ← freeProcesses;
          status.userName ← userName;
          status.currentRequests ← currentRequests;
          status.aveBackgroundLoad ← aveBackgroundLoad;
          status.numberOfCurrentRequests ← requests;
          status.FOM ← figureOfMerit[status: status];
          status.timeOfStatus ← BasicTime.Now[];
          -- re-enter the record in the lookup table if it is missing there
          IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN {
            [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
            [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
            };
          };
      EXIT;
      };
    lastStatusList ← statusList;
    REPEAT
      FINISHED => status ← NIL;
    ENDLOOP;

  -- first report from a server that is up: create and publish its record
  IF status = NIL AND serverUP THEN {
    status ← NEW[ServerStatusRecord ← [
      serverMachineName: serverMachineName,
      serverMachinePupAddress: serverMachinePupAddress,
      timeOfStatus: BasicTime.Now[],
      machineType: machineType,
      mainMemory: mainMemory,
      numberCPUs: numberCPUs,
      diskPartionSize: diskPartionSize,
      freePagesOnDisk: freePagesOnDisk,
      freeboard: freeboard,
      freeGFI: freeGFI,
      freeMDS: freeMDS,
      freeVM: freeVM,
      oldestLRUFileDate: oldestLRUFileDate,
      CPULoad: CPULoad,
      nonBackgroundCPULoad: nonBackgroundCPULoad,
      reclamationRate: reclamationRate,
      numberOfCurrentRequests: requests,
      freeProcesses: freeProcesses,
      userName: userName,
      currentRequests: currentRequests,
      aveBackgroundLoad: aveBackgroundLoad
      ]];
    status.FOM ← figureOfMerit[status: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status];
    [] ← SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status];
    ServerStatusList ← CONS[status, ServerStatusList];
    InternalReportControllerEvent[type: newServer, reason: NIL, command: NIL, time: BasicTime.Now[], remoteMachineName: serverMachineName, userName: userName];
    };

  -- tell the server which commands are being queued, once per newQueueing flag
  IF status # NIL AND status.newQueueing THEN {
    status.newQueueing ← FALSE;
    FOR l: LIST OF QueueEntry ← WaitingQueue, l.rest UNTIL l = NIL DO
      IF l.first.doQueueing THEN queueingCommands ← CONS[l.first.service, queueingCommands];
      ENDLOOP;
    };

  IF firstCall THEN {
    CleanBroken: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Rebuilds the list stored under key without the entries for this server's pup
      -- address; the table is updated only when something was dropped.
      ENABLE UNWIND => NULL;
      brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand ← NIL;
      changed: BOOL ← FALSE;
      brokenList ← NARROW[val];
      FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE]
          THEN changed ← TRUE
          ELSE newBrokenList ← CONS[brokenListLoop.first, newBrokenList];
        ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList];
      RETURN[FALSE];
      };
    CleanExtra: SymTab.EachPairAction = {
      -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
      -- Same as CleanBroken, applied to ExtraCommandTable.
      ENABLE UNWIND => NULL;
      extraList, newExtraList, extraListLoop: LIST OF ExtraCommand ← NIL;
      changed: BOOL ← FALSE;
      extraList ← NARROW[val];
      FOR extraListLoop ← extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO
        IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE]
          THEN changed ← TRUE
          ELSE newExtraList ← CONS[extraListLoop.first, newExtraList];
        ENDLOOP;
      IF changed THEN [] ← SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList];
      RETURN[FALSE];
      };
    [] ← SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken];
    [] ← SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra];
    };
  };
getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE]
  RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord ← NIL] = {
  -- Returns the RPC interface for serverInstance.
  -- A cache slot whose serverInstance matches is returned directly and its lastUsed time
  -- refreshed.  Otherwise a new interface is imported and, if the import succeeds, stored
  -- in the slot with the earliest lastUsed time seen during the scan.
  -- Returns NIL when the import fails (RPC.ImportFailed is absorbed).
  ENABLE UNWIND => NULL;
  stamp: BasicTime.GMT ← BasicTime.Now[];
  victim: INT ← 0;  -- slot to overwrite on a miss
  victimTime: BasicTime.GMT ← BasicTime.latestGMT;
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF Rope.Equal[serverInstance, serverInterfaceCache[slot].serverInstance] THEN {
      -- hit
      serverInterfaceCache[slot].lastUsed ← stamp;
      RETURN [serverInterfaceCache[slot].interface];
      };
    -- remember the slot used longest ago
    IF BasicTime.Period[victimTime, serverInterfaceCache[slot].lastUsed] < 0 THEN {
      victimTime ← serverInterfaceCache[slot].lastUsed;
      victim ← slot;
      };
    ENDLOOP;
  -- miss: import a fresh interface; serverInterface stays NIL if the import fails
  serverInterface ← ComputeServerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]]
    ! RPC.ImportFailed => CONTINUE];
  IF serverInterface # NIL THEN {
    serverInterfaceCache[victim].lastUsed ← stamp;
    serverInterfaceCache[victim].serverInstance ← serverInstance;
    serverInterfaceCache[victim].interface ← serverInterface;
    };
  RETURN [serverInterface];
  };
deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord]
  RETURNS [gotIt: BOOL ← FALSE] = {
  -- Clears the first cache slot holding serverInterface and returns TRUE;
  -- returns FALSE when no slot holds it.
  ENABLE UNWIND => NULL;
  FOR slot: INT IN [0..serverInterfaceCacheSize) DO
    IF serverInterfaceCache[slot].interface # serverInterface THEN LOOP;
    -- reset the slot; earliestGMT makes it the oldest entry for later replacement scans
    serverInterfaceCache[slot].serverInstance ← NIL;
    serverInterfaceCache[slot].interface ← NIL;
    serverInterfaceCache[slot].lastUsed ← BasicTime.earliestGMT;
    RETURN [TRUE];
    ENDLOOP;
  };
findOrCreateQueueEntry: ENTRY PROC [service: ROPE]
  RETURNS [entry: QueueEntry ← NIL, item: WaitingRequest] = {
  -- Allocates a new waiting request (item) and appends it to the waiters of the queue
  -- entry for service (names compared ignoring case).  When no such entry exists, one is
  -- created at the head of WaitingQueue with item as its only waiter, and its `changed`
  -- condition is initialized with a one-second timeout.
  ENABLE UNWIND => NULL;
  item ← NEW[WaitingRequestRec ];
  FOR q: LIST OF QueueEntry ← WaitingQueue, q.rest UNTIL q = NIL DO
    IF ~Rope.Equal[q.first.service, service, FALSE] THEN LOOP;
    entry ← q.first;
    IF entry.waiters = NIL
      THEN entry.waiters ← CONS[item, NIL]
      ELSE {
        -- walk to the last cell and append
        tail: LIST OF WaitingRequest ← entry.waiters;
        UNTIL tail.rest = NIL DO tail ← tail.rest; ENDLOOP;
        tail.rest ← CONS[item, NIL];
        };
    RETURN; -- existing queue
    ENDLOOP;
  -- no queue for this service yet
  entry ← NEW[QueueEntryRec ← [service: service, waiters: LIST[item]]];
  WaitingQueue ← CONS[entry, WaitingQueue];
  TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]];};
  };
findQueuedServers: ENTRY PROC [service: ROPE]
  RETURNS [queuedCommand: BOOL, servers: LIST OF ROPE] = {
  -- Looks up service in WaitingQueue (names compared ignoring case).
  -- Found: returns TRUE and a fresh copy of that entry's server list, in reverse order.
  -- Not found: returns FALSE and NIL.
  ENABLE UNWIND => NULL;  -- release the monitor lock on unwind, as the other ENTRY procedures here do
  FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
    IF Rope.Equal[queue.first.service, service, FALSE] THEN {
      -- copy, so the caller never sees the list cells owned by the monitor
      FOR lor: LIST OF ROPE ← queue.first.servers, lor.rest UNTIL lor = NIL DO
        servers ← CONS[lor.first, servers];
        ENDLOOP;
      RETURN [TRUE, servers]; -- existing queue
      };
    ENDLOOP;
  RETURN [FALSE, NIL];
  };
setNewQueueing: ENTRY PROC [] = {
  -- Sets newQueueing on every record in ServerStatusList and stamps
  -- LastReBuildQueueTime with the current time.
  ENABLE UNWIND => NULL;
  remaining: LIST OF ServerStatus ← ServerStatusList;
  WHILE remaining # NIL DO
    remaining.first.newQueueing ← TRUE;
    remaining ← remaining.rest;
    ENDLOOP;
  LastReBuildQueueTime ← BasicTime.Now[];
  };
queueItemReady:
ENTRY
PROC [entry: QueueEntry, item: WaitingRequest]= {
-- Blocks the request item on the queue entry until one of three things happens:
--   * item.timeout has passed: item.waiting becomes timedOut;
--   * item is the first waiter and a server parked in entry.servers has an acceptable
--     status: that server is unlinked from entry.servers and stored in item.instance;
--   * item.waiting has been set to done (MightAcceptQueuedCommand does this after
--     storing the server in item.instance).
-- In the last two cases item.serverAssigned is set and item.waiting becomes validating.
-- The item is not removed from entry.waiters here.
ENABLE UNWIND => NULL;
firstTime: BOOL ← TRUE;
item.waiting ← true;
DO
-- give up once the current time is past item.timeout
IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting ← timedOut; RETURN};
SELECT item.waiting
FROM
true => {
-- only the request at the head of the waiters may take a parked server
IF entry.servers #
NIL
AND entry.waiters.first = item
THEN {
bestStatus: ServerStatus ← NIL;
-- prev trails l through the list; prevBest is the cell before best (NIL when best is the head)
prev, prevBest: LIST OF ROPE ← NIL;
best: LIST OF ROPE ← NIL;
serv: ROPE;
FOR l:
LIST
OF
ROPE ← entry.servers, l.rest
UNTIL l =
NIL
DO
statusFound: BOOL;
val: REF ANY;
status: ServerStatus;
serv ← l.first;
-- servers with no entry in MachineToStatusTable are never chosen
[found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: serv];
IF statusFound
THEN {
status ← NARROW[val];
IF IsStatusBetter[status, bestStatus]
THEN {
bestStatus ← status;
best ← l;
prevBest ← prev;
};
};
prev ← l;
ENDLOOP;
IF bestStatus #
NIL
THEN {
-- unlink the chosen server from entry.servers and hand it to this request
IF prevBest =
NIL
THEN {
item.instance ← entry.servers.first;
entry.servers ← entry.servers.rest;
}
ELSE {
item.instance ← best.first;
prevBest.rest ← best.rest;
};
item.serverAssigned ← TRUE;
item.waiting ← validating;
RETURN;
};
};
};
done => {
-- a server was assigned directly to this request while it was waiting
item.serverAssigned ← TRUE;
item.waiting ← validating;
RETURN;
};
ENDCASE;
-- nothing available yet: raise forceReBuildQueuing once, then wait for a change
IF firstTime THEN {forceReBuildQueuing ← TRUE};
firstTime ← FALSE;
-- findOrCreateQueueEntry initializes entry.changed with a one-second timeout, so the
-- timeout test at the top of the loop is re-run even when nobody broadcasts
WAIT entry.changed;
ENDLOOP;
};
removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = {
  -- Wakes everyone waiting on entry.changed, then unlinks the first occurrence of item
  -- from entry.waiters.  Does nothing more when item is not in the list.
  ENABLE UNWIND => NULL;
  before: LIST OF WaitingRequest ← NIL;  -- cell preceding the one being examined
  BROADCAST entry.changed;
  -- an empty waiters list falls straight through; actually, that case is a bug
  FOR cell: LIST OF WaitingRequest ← entry.waiters, cell.rest UNTIL cell = NIL DO
    IF cell.first = item THEN {
      IF before = NIL THEN entry.waiters ← cell.rest ELSE before.rest ← cell.rest;
      RETURN;
      };
    before ← cell;
    ENDLOOP;
  };
MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = {
  -- A server announces that it might accept the queued command commandName.
  -- For the first WaitingQueue entry whose service matches (ignoring case):
  --   * if its first waiter is in state `true`, the server is handed straight to that
  --     waiter (waiting ← done, instance ← serverMachineAddress);
  --   * otherwise the server is appended to entry.servers unless already listed.
  -- Either way the entry's `changed` condition is broadcast.  Nothing happens when no
  -- entry matches.  Raises RPC.CallFailed[unbound] when the interface is not exported.
  ENABLE UNWIND => NULL;
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  FOR queue: LIST OF QueueEntry ← WaitingQueue, queue.rest UNTIL queue = NIL DO
    entry: QueueEntry = queue.first;
    IF Rope.Equal[commandName, entry.service, FALSE] THEN {
      IF entry.waiters # NIL AND entry.waiters.first.waiting = true
        THEN {
          -- top waiter is ready: give it this server directly
          entry.waiters.first.waiting ← done;
          entry.waiters.first.instance ← serverMachineAddress;
          }
        ELSE {
          -- nobody waiting now or top waiter is busy: park the server on the entry
          IF entry.servers = NIL THEN entry.servers ← CONS[serverMachineAddress, NIL]
          ELSE
            FOR l: LIST OF ROPE ← entry.servers, l.rest UNTIL l = NIL DO
              IF Rope.Equal[l.first, serverMachineAddress, FALSE] THEN EXIT;  -- already listed
              IF l.rest = NIL THEN {l.rest ← CONS[serverMachineAddress, NIL]; EXIT};
              ENDLOOP;
          };
      BROADCAST entry.changed;
      EXIT;
      };
    ENDLOOP;
  };
GenericToController: PUBLIC PROC [requestCode: ATOM, requestString: Rope.ROPE]
  RETURNS [resultCode: ATOM ← NIL, resultString: Rope.ROPE ← NIL] = {
  -- generic call to allow for expansion without RPC interface recompilation
  -- Request codes handled:
  --   $CommandInfo        requestString empty: list all normal and extra command names;
  --                       otherwise describe where the named command is queued, available
  --                       or unavailable.  Returns $success.
  --   $ServerLoads        one "name(aveBackgroundLoad, nonBackgroundCPULoad, CPULoad, FOM,
  --                       numberOfCurrentRequests)" item per server, comma separated.
  --                       Returns $success.
  --   $CurrentController  returns $CurrentController or $NotCurrentController, no string.
  --   anything else       returns $notImplemented.
  -- Raises RPC.CallFailed[unbound] when the interface is not exported.
  -- NOTE(review): this is not an ENTRY procedure, so apart from the call to
  -- findQueuedServers it reads ServerStatusList and the tables without the monitor lock
  -- - confirm that this is intended.
  IF ~InterfaceExported THEN ERROR RPC.CallFailed[unbound]; -- local import still works after we "UnexportInterface"
  SELECT requestCode FROM
    $CommandInfo => {
      getBrokenList: PROC RETURNS [brokenNames: ROPE ← NIL, goodNames: ROPE ← NIL] = {
        -- Splits the servers in ServerStatusList into those recorded in BrokenCommandTable
        -- for requestString (by name or pup address) and the rest; both results are
        -- ", "-separated lists of server names, NIL when empty.
        brokenVal: REF ANY;
        brokenList: LIST OF BrokenCommand;
        [val: brokenVal] ← SymTab.Fetch[x: BrokenCommandTable, key: requestString];
        brokenList ← NARROW[brokenVal];
        FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
          skipIt: BOOL ← FALSE;
          serverMachineName: ROPE ← statusList.first.serverMachineName;
          serverMachinePupAddress: ROPE ← statusList.first.serverMachinePupAddress;
          brokenListLoop: LIST OF BrokenCommand;
          FOR brokenListLoop ← brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
            IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE]
              OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
              brokenNames ← IF brokenNames = NIL THEN serverMachineName ELSE Rope.Cat[brokenNames, ", ", serverMachineName];
              skipIt ← TRUE;
              EXIT;
              };
            ENDLOOP;
          IF skipIt THEN LOOP;
          goodNames ← IF goodNames = NIL THEN serverMachineName ELSE Rope.Cat[goodNames, ", ", serverMachineName];
          ENDLOOP;
        };
      result: LIST OF ROPE ← NIL;
      foundCommand: BOOL;
      IF Rope.IsEmpty[requestString]
        THEN {
          -- list all commands
          normalCommands: ROPE ← NIL;
          extraCommands: ROPE ← NIL;
          normals: SymTab.EachPairAction = {
            -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
            ENABLE UNWIND => NULL;
            normalCommands ← IF normalCommands = NIL THEN key ELSE Rope.Cat[normalCommands, ", ", key];
            RETURN[FALSE];
            };
          extras: SymTab.EachPairAction = {
            -- PROC [key: Key, val: Val] RETURNS [quit: BOOL];
            ENABLE UNWIND => NULL;
            extraCommands ← IF extraCommands = NIL THEN key ELSE Rope.Cat[extraCommands, ", ", key];
            RETURN[FALSE];
            };
          [] ← SymTab.Pairs[x: ComputeServerInternal.CommandTable, action: normals];
          [] ← SymTab.Pairs[x: ExtraCommandTable, action: extras];
          resultString ← Rope.Concat["Normal commands are ", normalCommands];
          IF extraCommands # NIL THEN resultString ← Rope.Cat[resultString, "; and extra commands are ", extraCommands];
          RETURN [$success, resultString];
          }
        ELSE {
          -- list a particular command
          [queuedCommand: foundCommand, servers: result] ← findQueuedServers[requestString];
          IF foundCommand
            THEN {
              -- command is queued
              brokenNames: ROPE ← NIL;
              brokenNames ← getBrokenList[].brokenNames;
              IF result = NIL
                THEN {
                  resultString ← IF brokenNames = NIL THEN "Command is queued, but no servers are waiting" ELSE Rope.Concat["Command is queued, no servers are waiting, and the command is unavailable on servers ", brokenNames];
                  }
                ELSE {
                  -- translate each waiting server's address into a printable name
                  FOR lor: LIST OF ROPE ← result, lor.rest UNTIL lor = NIL DO
                    printName: ROPE;
                    printName ← PupName.HisName[PupName.NameLookup[lor.first, Pup.nullSocket]];
                    resultString ← IF resultString = NIL THEN printName ELSE Rope.Cat[resultString, ", ", printName];
                    ENDLOOP;
                  resultString ← Rope.Concat["Command is queued on servers ", resultString];
                  IF brokenNames # NIL THEN resultString ← Rope.Cat[resultString, "; and the command is unavailable on servers ", brokenNames];
                  };
              -- falls out of the SELECT and returns through the named results
              resultCode ← $success;
              }
            ELSE {
              -- command is normal
              [found: foundCommand] ← SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: requestString];
              IF foundCommand
                THEN {
                  -- command should be out on (almost) every server
                  goodNames: ROPE ← NIL;
                  brokenNames: ROPE ← NIL;
                  [brokenNames, goodNames] ← getBrokenList[];
                  SELECT TRUE FROM
                    goodNames = NIL AND brokenNames = NIL => {
                      resultString ← "No servers are up";
                      };
                    goodNames # NIL AND brokenNames = NIL => {
                      resultString ← "All servers should accept the command";
                      };
                    goodNames = NIL AND brokenNames # NIL => {
                      resultString ← "All servers refuse the command (it is broken everywhere)";
                      };
                    goodNames # NIL AND brokenNames # NIL => {
                      resultString ← Rope.Cat["Command is available on servers ", goodNames, ", but is unavailable on servers ", brokenNames];
                      };
                    ENDCASE;
                  }
                ELSE {
                  -- command should be out on (almost) no servers
                  foundExtra: BOOL;
                  extraVal: REF ANY;
                  extraNames: ROPE ← NIL;
                  extraListLoop: LIST OF ExtraCommand;
                  [found: foundExtra, val: extraVal] ← SymTab.Fetch[x: ExtraCommandTable, key: requestString];
                  -- only servers that have an entry in MachineToStatusTable are listed
                  FOR extraListLoop ← NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
                    printName: ROPE;
                    statusFound: BOOL;
                    val: REF ANY;
                    [found: statusFound, val: val] ← SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
                    printName ← PupName.HisName[PupName.NameLookup[extraListLoop.first.serverMachineName, Pup.nullSocket]];
                    IF statusFound THEN extraNames ← IF extraNames = NIL THEN printName ELSE Rope.Cat[extraNames, ", ", printName];
                    ENDLOOP;
                  IF extraNames = NIL THEN resultString ← "Command not known" ELSE resultString ← Rope.Concat["Command is available on servers ", extraNames];
                  };
              RETURN [$success, resultString];
              };
          };
      };
    $ServerLoads => {
      FOR statusList: LIST OF ServerStatus ← ServerStatusList, statusList.rest UNTIL statusList = NIL DO
        thisString: Rope.ROPE ← NIL;
        status: ServerStatus = statusList.first;
        IF resultString # NIL THEN resultString ← Rope.Cat[resultString, ", "];
        thisString ← IO.PutFR["(%5.2f, %5.2f, %5.2f, %5.2f, %g)", IO.real[status.aveBackgroundLoad], IO.real[status.nonBackgroundCPULoad], IO.real[status.CPULoad], IO.real[status.FOM], IO.int[status.numberOfCurrentRequests] ];
        resultString ← Rope.Cat[resultString, status.serverMachineName, thisString];
        ENDLOOP;
      RETURN [$success, resultString];
      };
    $CurrentController => {
      IF ~IAmTheController OR ControllerMovingAway THEN RETURN [$NotCurrentController, NIL] ELSE RETURN [$CurrentController, NIL];
      };
    ENDCASE => RETURN [$notImplemented, NIL];
  };
IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus]
  RETURNS [ itsBetter: BOOL ← FALSE] = {
  -- Decides whether status should replace bestStatus as the current best candidate.
  -- With no candidate yet (bestStatus = NIL), status qualifies when its reclamationRate
  -- is below 20 and its nonBackgroundCPULoad is below 0.99; otherwise it qualifies when
  -- its FOM is strictly smaller than the candidate's.
  RETURN [
    IF bestStatus = NIL
      THEN (status.reclamationRate < 20) AND (status.nonBackgroundCPULoad < 0.99)
      ELSE (status.FOM < bestStatus.FOM)
    ];
  };
-- TRUE from the moment Lock returns until UnLock is called.
locked: BOOL ← FALSE;
-- Lock waits on this condition while locked is TRUE; UnLock notifies it.
lockCondition: CONDITION;
Lock: ENTRY PROC = {
  -- Blocks until `locked` is FALSE, then sets it.  Pair each call with UnLock.
  ENABLE UNWIND => NULL;  -- release the monitor lock if the WAIT is unwound, as the other ENTRY procedures here do
  WHILE locked DO
    WAIT lockCondition;
    ENDLOOP;
  locked ← TRUE;
  };
UnLock: ENTRY PROC = {
  -- Clears `locked` and wakes one process waiting on lockCondition.
  locked ← FALSE;
  NOTIFY lockCondition;
  };