<> <> <> <>
-- NOTE(review): the four "<>" markers above look like the remains of header comments whose text was lost; restore them from the original file if it is available.
DIRECTORY
  AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerController, ComputeServerControllerRpcControl, ComputeServer, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVBasics, GVNames, IO, PrincOps, PrincOpsUtils, Process, PupDefs, PupTypes, Real, Rope, RPC, SummonerControllerControl, SymTab, UserCredentials, UserProfile, WorldVM;

ComputeControllerImpl: CEDAR MONITOR
  IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl, ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVNames, IO, Process, Real, Rope, RPC, SymTab, UserCredentials, UserProfile, WorldVM
  EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
  = BEGIN

ROPE: TYPE = Rope.ROPE;

-- Controller state.
NoticeNewPackageCondition: CONDITION;	-- waited on by NoticeNewPackage and RemoveOldPackage; no NOTIFY appears in this part of the file
InterfaceExported: BOOL _ FALSE;	-- set when the controller RPC interface has been exported from this machine
ControllerEnabled: BOOL _ FALSE;	-- set by StartUpController, cleared by ShutDownController
LatentControllerActive: BOOL _ TRUE;	-- the LatentSummonerController loop exits when this is FALSE
IAmTheController: PUBLIC BOOL _ FALSE;	-- TRUE while this machine is acting as the controller
ControllerMovingAway: PUBLIC BOOL _ FALSE;	-- handed to clients by GetSomeInfo as tryDifferentController
GrabController: PUBLIC BOOL _ FALSE;	-- when TRUE, LatentSummonerController takes over the controller role on its next pass
TimeIBecameTheController: PUBLIC BasicTime.GMT _ BasicTime.nullGMT;
ControllerUp: BOOL _ TRUE;	-- cleared by NoticeControllerIsDown, set by NoticeControllerIsUp
ControllerGVName: ROPE _ NIL;	-- Grapevine name used as the RPC instance of the controller interface

-- Record of the most recent failure to reach the controller.
ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError _ none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
ControllerFailedTime: BasicTime.GMT;	-- written only on an up-to-down transition (see NoticeControllerIsDown)
LastReBuildQueueTime: BasicTime.GMT _ BasicTime.nullGMT;

-- Server status bookkeeping.
ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
ServerStatusList: PUBLIC LIST OF ServerStatus _ NIL;
BrokenCommandTable: SymTab.Ref; -- knowing a command, find where it is broken
ExtraCommandTable: SymTab.Ref; -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref; -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"
BrokenCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE _ NIL
  ];
BrokenCommand: TYPE = REF BrokenCommandObject;
ExtraCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE _ NIL
  ];
ExtraCommand: TYPE = REF ExtraCommandObject;

-- Request queueing.
forceReBuildQueuing: BOOL _ FALSE;	-- makes ReBuildQueuing run regardless of how recently it last ran
WaitingQueue: LIST OF QueueEntry _ NIL;
QueueEntry: TYPE = REF QueueEntryRec;
QueueEntryRec: TYPE = RECORD [
  service: ROPE _ NIL,
  doQueueing: BOOL _ FALSE,
  waiters: LIST OF WaitingRequest _ NIL,
  servers: LIST OF ROPE _ NIL,
  changed: CONDITION
  ];
WaitingRequest: TYPE = REF WaitingRequestRec;
WaitingRequestRec: TYPE = RECORD [
  process: PROCESS,
  timeout: BasicTime.GMT,
  waiting: waitType _ init,
  version: ROPE,
  clientMachineName: RPC.ShortROPE,
  streamPupAddress: PupDefs.PupAddress,
  needListener: BOOL,
  serverAssigned: BOOL _ FALSE,
  response: ComputeServer.AskResponce,
  instance: RPC.ShortROPE,
  serverPupAddress: PupDefs.PupAddress
  ];
waitType: TYPE = {init, true, validating, done, timedOut};

-- Cache of imported server interfaces (the procedures that use it are outside this part of the file).
serverInterfaceItem: TYPE = RECORD [
  serverInstance: ROPE _ NIL,
  interface: ComputeServerRpcControl.InterfaceRecord _ NIL,
  lastUsed: BasicTime.GMT _ BasicTime.earliestGMT
  ];
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
serverInterfaceCache: REF serverInterfaceArray;

-- Mailbox shared between CheckItem and the CheckItemProcess it forks.
CheckState: TYPE = {checking, aborted, done};
CheckItemProcessObj: TYPE = RECORD[
  check: CheckState _ checking,
  communicationsOK: BOOL _ TRUE,
  askFound: ComputeServer.AskResponce,
  serverPupAddress: PupDefs.PupAddress,
  errMsg: ROPE
  ];

NoOfLatentSummonerControllerProcesses: INT _ 0 ;	-- used by LatentSummonerController to keep a single instance running

-- Enables the controller machinery on this machine.
-- controllerName: Grapevine name of the controller; when empty the user profile entry "Summoner.ControllerName" is used, and when that is empty too, "PaloAlto.summoner".
-- forceSelfAsPrimaryController: export the controller interface from this machine even if Grapevine and the profile do not name it.
-- Always finishes by forking LatentSummonerController as a detached process.
-- Lock and UnLock are defined outside this part of the file.
StartUpController: PUBLIC PROC [controllerName: ROPE _ NIL, forceSelfAsPrimaryController: BOOL _ FALSE] RETURNS [] = {
  locked: BOOL _ FALSE;
  {
  ENABLE UNWIND => IF locked THEN UnLock[];
  user, password: ROPE;
  ControllerGVName _ IF Rope.IsEmpty[controllerName] THEN UserProfile.Token["Summoner.ControllerName"] ELSE controllerName;
  IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName _ "PaloAlto.summoner" ;
  ControllerEnabled _ TRUE;
  LatentControllerActive _ TRUE;
  ControllerUp _ TRUE;
  Lock[];
  locked _ TRUE;
  IF ~InterfaceExported THEN {
    info: GVNames.ConnectInfo;
    connect: GVBasics.Connect;
    [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
    IF info = individual OR info = group THEN {
      -- become the controller when forced, when Grapevine's connect site is this machine, or when the profile names this machine as primary
      IF forceSelfAsPrimaryController OR Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope] THEN {
        IAmTheController _ TRUE;
        TimeIBecameTheController _ BasicTime.Now[];
        [user, password] _ UserCredentials.Get[];
        InterfaceExported _ TRUE;
        BrokenCommandTable _ SymTab.Create[mod: 59, case: FALSE];
        ExtraCommandTable _ SymTab.Create[mod: 59, case: FALSE];
        MachineToStatusTable _ SymTab.Create[mod: 59, case: FALSE];
        ComputeServerControllerRpcControl.ExportInterface[
          interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
          user: user,
          password: RPC.MakeKey[password] !
          -- a failed export undoes the two flags set above; the freshly created tables are left in place
          RPC.ExportFailed => { InterfaceExported _ FALSE; IAmTheController _ FALSE; CONTINUE; };
          ];
        };
      };
    };
  UnLock[];
  locked _ FALSE;
  TRUSTED {Process.Detach[FORK LatentSummonerController[]] ;};
  };
  };

-- Records that importing the controller interface failed, and why, then marks the controller down.
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
  LastControllerError _ import;
  LastImportFailure _ reason;
  NoticeControllerIsDown[];
  };

-- Records that a call to the controller failed, and why, then marks the controller down.
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
  LastControllerError _ callFailed;
  LastCallFailure _ reason;
  NoticeControllerIsDown[];
  };

-- Marks the controller down; the failure time is stamped only on the first report after it was up.
NoticeControllerIsDown: PROC = {
  IF ControllerUp THEN ControllerFailedTime _ BasicTime.Now[];
  ControllerUp _ FALSE;
  };

-- Marks the controller up again.
NoticeControllerIsUp: PUBLIC PROC = {
  ControllerUp _ TRUE;
  };

-- Tries to import the controller interface registered under ControllerGVName.
-- importedOK is FALSE when the import raises RPC.ImportFailed.
TryThisController: PROC RETURNS [importedOK: BOOL _ TRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
  controllerInterface _ ComputeServerControllerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]] !
RPC.ImportFailed => { importedOK _ FALSE; CONTINUE; };
    ];
  };

-- Background loop, forked by StartUpController, that keeps the controller role healthy.
-- Every 127 seconds it does one of two things:
--   * the controller is down, or GrabController is set: decide whether this machine should take the role over, and if so export the controller interface from here;
--   * the controller is up and this machine believes it is the controller: compare with Grapevine and either hand the role over, re-assert it, or just age out old status and rebuild queueing.
-- Only one instance runs at a time; extra instances return immediately.
-- The instance count is released when the loop stops (and on UNWIND), so a StartUpController issued after ShutDownController gets a working loop again.
-- Lock and UnLock are defined outside this part of the file.
LatentSummonerController: PROC [] = {
  -- Claims the single running slot; exit is TRUE when another instance already holds it.
  MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOL _ FALSE] = {
    IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses _ 1 ELSE RETURN[TRUE];
    };
  -- Decides whether to stop, and releases the slot in the same monitor entry so a process forked by a concurrent StartUpController either sees the slot free or sees this loop keep running.
  TimeToQuit: ENTRY PROC RETURNS [quit: BOOL] = {
    quit _ ~LatentControllerActive;
    IF quit THEN NoOfLatentSummonerControllerProcesses _ 0;
    };
  -- Releases the slot unconditionally; used when the loop is being unwound.
  GiveUpSlot: ENTRY PROC = {
    NoOfLatentSummonerControllerProcesses _ 0;
    };
  locked: BOOL _ FALSE;
  IF MakeSureThereIsOnlyOne[] THEN RETURN;
  DO
    ENABLE UNWIND => {IF locked THEN UnLock[]; GiveUpSlot[]};
    IF TimeToQuit[] THEN EXIT;
    IF ~ControllerUp OR GrabController THEN { -- controller is down or I should grab it - see if I should acquire the controller function
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      ControllerMovingAway _ FALSE;
      IF GrabController OR ~TryThisController[].importedOK THEN { -- down but import will work soon
        downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
        [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
        -- take over when told to, after 83 seconds of down time, after 13 seconds if Grapevine names this machine, or at once if the profile names this machine as primary
        IF GrabController OR downTime > 83 OR ((downTime > 13 AND (info = individual OR info = group) AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]) OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope]) THEN {
          user, password: ROPE;
          Lock[];
          locked _ TRUE;
          GrabController _ FALSE;
          IAmTheController _ TRUE;
          TimeIBecameTheController _ BasicTime.Now[];
          [user, password] _ UserCredentials.Get[];
          IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
          InterfaceExported _ TRUE;
          BrokenCommandTable _ SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable _ SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable _ SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user,
            password: RPC.MakeKey[password] !
            RPC.ExportFailed => { InterfaceExported _ FALSE; IAmTheController _ FALSE; CONTINUE; };
            ];
          UnLock[];
          locked _ FALSE;
          };
        };
      }
    ELSE { -- Controller is up. If I think that I am the controller, but grapevine does not think so, then fix the situation
      IF IAmTheController THEN {
        info: GVNames.ConnectInfo;
        connect: GVBasics.Connect;
        [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
        IF (info = individual OR info = group) THEN {
          IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] THEN { -- Grapevine says I am not the Controller
            triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
            tryDifferentController: BOOL _ FALSE;
            importedOK: BOOL;
            [importedOK: importedOK, controllerInterface: triedControllerInterface] _ TryThisController[];
            -- ask whoever answers under ControllerGVName whether it is willing to act as the controller
            IF importedOK THEN [tryDifferentController: tryDifferentController] _ triedControllerInterface.GetSomeInfo[ !
              RPC.CallFailed => { importedOK _ FALSE; CONTINUE; };
              ];
            IF importedOK AND ~tryDifferentController THEN { -- new controller answers
              IF ServerStatusList = NIL THEN {
                -- no server reports through this machine any more, so the role can be dropped
                Lock[];
                locked _ TRUE;
                IAmTheController _ FALSE;
                ControllerMovingAway _ FALSE;
                ComputeServerControllerRpcControl.UnexportInterface[];
                InterfaceExported _ FALSE;
                ServerStatusList _ NIL;
                BrokenCommandTable _ NIL;
                ExtraCommandTable _ NIL;
                UnLock[];
                locked _ FALSE;
                }
              ELSE {
                -- servers still report here; GetSomeInfo passes this flag on as tryDifferentController
                ControllerMovingAway _ TRUE;
                };
              }
            ELSE { -- new controller does not answer or thinks it is moving too
              ControllerMovingAway _ FALSE;
              IF ~importedOK THEN { -- new controller does not answer; re-assert my controllership
                user, password: ROPE;
                [user, password] _ UserCredentials.Get[];
                [] _ GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
                };
              };
            }
          ELSE { -- Grapevine says I am the Controller
            ControllerMovingAway _ FALSE;
            };
          }
        ELSE { -- NOT info = individual OR info = group (grapevine troubles)
          ControllerMovingAway _ FALSE;
          };
        TossOldStatus[];
        ReBuildQueuing[];
        };
      };
    Process.Pause[Process.SecondsToTicks[127]];
    ENDLOOP;
  };

ShutDownController: PUBLIC PROC [] RETURNS [] = {
  IF InterfaceExported THEN
ComputeServerControllerRpcControl.UnexportInterface[];
  InterfaceExported _ FALSE;
  ControllerEnabled _ FALSE;
  ServerStatusList _ NIL;
  BrokenCommandTable _ NIL;
  ExtraCommandTable _ NIL;
  MachineToStatusTable _ NIL;
  LatentControllerActive _ FALSE;
  ControllerUp _ FALSE;
  IAmTheController _ FALSE;
  };

-- Drops every server whose last status report is more than 30 seconds old.
-- A dropped server is removed from ServerStatusList and from MachineToStatusTable under both of its keys (machine name and Pup address).
TossOldStatus: ENTRY PROC [] RETURNS [] = {
  ENABLE UNWIND => NULL;
  staleAfterSeconds: INT = 30;
  statusList: LIST OF ServerStatus;
  lastKept: LIST OF ServerStatus _ NIL;	-- last cell still linked into ServerStatusList; NIL while none has been kept
  now: BasicTime.GMT _ BasicTime.Now[];
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    IF BasicTime.Period[status.timeOfStatus, now] > staleAfterSeconds THEN {
      [] _ SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
      [] _ SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
      IF lastKept = NIL THEN ServerStatusList _ statusList.rest ELSE lastKept.rest _ statusList.rest;
      }
    -- advance the trailing pointer only over cells that stay in the list; moving it onto a removed cell would make the next removal splice from a dead cell and leave that stale entry in ServerStatusList
    ELSE lastKept _ statusList;
    ENDLOOP;
  };

-- Recomputes queueing by calling setNewQueueing (defined outside this part of the file) when forced, or when more than 300 seconds have passed since the last rebuild.
-- The force flag is cleared on every call.
ReBuildQueuing: PROC [] RETURNS [] = {
  now: BasicTime.GMT _ BasicTime.Now[];
  IF forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, now] > 300 THEN {
    setNewQueueing[];
    LastReBuildQueueTime _ now;
    };
  forceReBuildQueuing _ FALSE;
  };

GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOL _ FALSE, tryDifferentController: BOOL _ FALSE, msg: ROPE _ NIL, serverList: LIST OF ROPE _ NIL, bestFOM: REAL _ 0.0] = {
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus _ NIL;
  statusList: LIST OF ServerStatus;
  IF ~IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved", NIL, 0.0];
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    perCentBusy: INT _ Real.Round[status.CPULoad*100.0];
    IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN {
      serverList _ CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
      }
    ELSE serverList _ CONS[IO.PutFR["%g (%g%%)",
IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
    <>
    <>
    <<}>>
    << ELSE serverList _ CONS[status.serverMachineName, serverList];>>
    -- NOTE(review): the two "<>" markers above look like comments whose text was lost.
    IF IsStatusBetter[status, bestStatus] THEN bestStatus _ status;
    ENDLOOP;
  -- tryDifferentController is reported as the current value of ControllerMovingAway
  IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
  RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
  };

-- Returns the machine name and figure of merit of the server that IsStatusBetter (defined outside this part of the file) prefers over all others.
-- Returns [NIL, 1.0] when ServerStatusList is empty.
BestServerStats: PUBLIC ENTRY PROC RETURNS[instance: RPC.ShortROPE, FOM: REAL] = {
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus _ NIL;
  statusList: LIST OF ServerStatus;
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    IF IsStatusBetter[status, bestStatus] THEN bestStatus _ status;
    ENDLOOP;
  IF bestStatus = NIL THEN RETURN [NIL, 1.0];
  RETURN [bestStatus.serverMachineName, bestStatus.FOM];
  };

-- Makes the named server look busier: CPULoad goes up by 0.5 (capped at 1.0), reclamationRate goes up by 30, and FOM is recomputed.
-- Does nothing when instance is not a key of MachineToStatusTable.
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  statusFound: BOOL;
  val: REF ANY;
  status: ServerStatus;
  [found: statusFound, val: val] _ SymTab.Fetch[x: MachineToStatusTable, key: instance];
  IF statusFound THEN {
    status _ NARROW[val];
    status.CPULoad _ MIN[ 1.0, status.CPULoad + 0.5];
    status.reclamationRate _ status.reclamationRate + 30 ;
    status.FOM _ figureOfMerit[status];
    };
  };

-- Public, unmonitored wrapper around the ENTRY procedure InnerFindService.
FindService: PUBLIC PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  [found, instance] _ InnerFindService[service];
  };

-- Finds a server for service on behalf of a client.
-- When queueing is already on for the service, the request waits its turn through tryForServer.
-- Otherwise servers chosen by InnerFindService are asked one after another through CheckItem, and the request falls back to queueing when InnerFindService picks a server that has already been tried.
-- timeToWait is added to the current time (BasicTime.Update) to form the request's timeout.
-- findOrCreateQueueEntry, queueItemReady, removeQueueItem and setNewQueueing are defined outside this part of the file.
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: PupDefs.PupAddress, needListener: BOOL] RETURNS [found: ComputeServer.AskResponce, instance: RPC.ShortROPE, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  entry: QueueEntry;
  item: WaitingRequest _ NIL;
  -- Loops until item has timed out or a server has answered foundOK; the outcome is left in item.
  tryForServer: PROC [] = {
    DO
      commOK: BOOL;
      foundBroken: BOOL;
      brokenVal: REF ANY;
      brokenList, brokenListLoop: LIST OF BrokenCommand;
      askFound: ComputeServer.AskResponce;
      queueItemReady[entry, item];
      IF item.waiting = timedOut THEN EXIT;
      -- refuse a server that has reported this command as broken
      [found: foundBroken, val: brokenVal] _ SymTab.Fetch[x: BrokenCommandTable, key: service];
      brokenList _ NARROW[brokenVal];
      FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance, FALSE] THEN {
          item.serverAssigned _ FALSE;
          item.waiting _ true;
          EXIT;
          };
        ENDLOOP;
      IF ~item.serverAssigned THEN LOOP;
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] _ CheckItem [entry, item];
      IF ~commOK OR (askFound # foundOK) THEN {
        item.serverAssigned _ FALSE;
        item.waiting _ true;
        LOOP;
        };
      item.response _ askFound;
      item.serverPupAddress _ serverPupAddress;
      -- NOTE(review): the test above loops unless askFound = foundOK, so the notFound and foundButTooBusy arms below cannot be reached; confirm which of the two was intended.
      SELECT askFound FROM
        notFound => {
          removeQueueItem[entry, item];
          errMsg _ NIL;
          RETURN;
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN;
          };
        ENDCASE;
      ENDLOOP;
    -- reached only through the timedOut EXIT
    removeQueueItem[entry, item];
    };
  [entry, item] _ findOrCreateQueueEntry[service];
  item.version _ version;
  item.clientMachineName _ clientMachineName;
  item.streamPupAddress _ streamPupAddress;
  item.needListener _ needListener;
  IF entry.doQueueing THEN { -- queueing already being done
    item.process _ LOOPHOLE[Process.GetCurrent[]];
    item.timeout _ BasicTime.Update[BasicTime.Now[], timeToWait];
    item.waiting _ true;
    tryForServer[];
    bumpBestServerStats[item.instance];
    RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
    }
  ELSE { -- queueing not started
    serversTried: LIST OF RPC.ShortROPE _ NIL;
    DO
      commOK: BOOL;
      serviceFound: BOOL;
      instance: RPC.ShortROPE;
      askFound: ComputeServer.AskResponce;
      -- NOTE(review): queueing is switched on for this entry on every pass, before any server has been asked; confirm this is intended.
      entry.doQueueing _ TRUE;
      setNewQueueing[];
      [serviceFound, instance] _ InnerFindService[service];
      item.instance _ instance;
      IF ~serviceFound THEN {
        found _ notFound;
        removeQueueItem[entry, item];
        RETURN;
        };
      -- a server that has already been tried means the candidates are exhausted
      FOR oldServers: LIST OF RPC.ShortROPE _ serversTried, oldServers.rest UNTIL oldServers = NIL DO
        IF Rope.Equal[oldServers.first, instance, FALSE] THEN {
          <>
serversTried _ NIL; -- help garbage collector
          entry.doQueueing _ TRUE;
          setNewQueueing[];
          item.process _ LOOPHOLE[Process.GetCurrent[]];
          item.timeout _ BasicTime.Update[BasicTime.Now[], timeToWait];
          item.waiting _ true;
          tryForServer[];
          RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
          };
        ENDLOOP;
      serversTried _ CONS[instance, serversTried];
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] _ CheckItem[entry, item];
      IF ~commOK THEN LOOP;
      SELECT askFound FROM
        notFound => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        foundButTooBusy => {
          LOOP;
          };
        foundOK => {
          removeQueueItem[entry, item];
          RETURN [askFound, instance, serverPupAddress, NIL];
          };
        ENDCASE;
      ENDLOOP;
    };
  };

-- Asks the server named by item.instance whether it will run entry.service, without letting a hung call block the caller.
-- The call runs in a forked CheckItemProcess; this procedure polls the shared checkObject every 2 ticks.
-- Returns the worker's results once it reports done.
-- Returns [FALSE, notFound, PupTypes.fillInPupAddress, ""] when the worker reports that it was aborted, or when more than 9 seconds pass, in which case the worker is detached and aborted.
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest] RETURNS [communicationsOK: BOOL _ TRUE, askFound: ComputeServer.AskResponce, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  checkProcess: PROCESS;
  checkObject: REF CheckItemProcessObj _ NEW[CheckItemProcessObj];
  startTime: BasicTime.GMT _ BasicTime.Now[];
  checkProcess _ FORK CheckItemProcess[entry, item, checkObject];
  DO
    -- completion is tested before the timeout so that a worker which has already finished is joined rather than aborted
    IF checkObject.check = done THEN TRUSTED {
      [] _ JOIN checkProcess;
      RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
      };
    -- the worker sets aborted itself when it catches ABORTED; it is on its way out, so join it instead of waiting for the timeout
    IF checkObject.check = aborted THEN TRUSTED {
      [] _ JOIN checkProcess;
      RETURN[FALSE, notFound, PupTypes.fillInPupAddress, ""];
      };
    IF BasicTime.Period[startTime, BasicTime.Now[]] > 9 THEN TRUSTED {
      checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
      Process.Detach[checkProcess];
      AMProcess.Freeze[LIST[checkProcessTV]];
      AMProcess.Abort[checkProcessTV];
      AMProcess.Thaw[LIST[checkProcessTV]];
      checkObject.check _ aborted;
      RETURN[FALSE, notFound, PupTypes.fillInPupAddress, ""];
      };
    Process.Pause[2];
    ENDLOOP;
  };

-- Worker forked by CheckItem: makes the AskForService call and leaves the outcome in checkObject.
-- A call that raises RPC.CallFailed drops the cached interface and is retried once; a second failure, or having no interface at all, is reported as communicationsOK = FALSE.
-- If the process is aborted, checkObject.check is set to aborted instead.
-- getServerInterfaceFromCache and deleteServerInterfaceFromCache are defined outside this part of the file.
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj] = {
  ENABLE ABORTED => GOTO aborted;
  communicationsOK: BOOL _ TRUE;
  -- the two results below are given defined values in case no call succeeds
  askFound: ComputeServer.AskResponce _ notFound;
  serverPupAddress: PupDefs.PupAddress _ PupTypes.fillInPupAddress;
  errMsg: ROPE;
  firstTime: BOOL _ TRUE;
  serverInstance: ROPE _ item.instance;
  DO
    serverInterface: ComputeServerRpcControl.InterfaceRecord;
    serverInterface _ getServerInterfaceFromCache[serverInstance];
    -- EXIT rather than RETURN: the failure must still be published below, otherwise CheckItem keeps polling until its timeout and then aborts a process that has already finished
    IF serverInterface = NIL THEN {communicationsOK _ FALSE; EXIT};
    [found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] _ serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, streamPupAddress: item.streamPupAddress, needListener: item.needListener !
      RPC.CallFailed => {
        [] _ deleteServerInterfaceFromCache[serverInterface];
        IF firstTime THEN {firstTime _ FALSE; LOOP;};
        communicationsOK _ FALSE;
        EXIT;
        };
      ];
    EXIT;
    ENDLOOP;
  -- write the results first and the done flag last: CheckItem reads the results as soon as it sees done
  checkObject.communicationsOK _ communicationsOK;
  checkObject.askFound _ askFound;
  checkObject.serverPupAddress _ serverPupAddress;
  checkObject.errMsg _ errMsg;
  checkObject.check _ done;
  EXITS
    aborted => {
      checkObject.check _ aborted;
      RETURN;
      };
  };

InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  bestStatus: ServerStatus _ NIL;
  foundCommand: BOOL;
  [found: foundCommand] _ SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
  IF foundCommand THEN {
    <>
    foundBroken: BOOL;
    brokenVal: REF ANY;
    brokenList: LIST OF BrokenCommand;
    [found: foundBroken, val: brokenVal] _ SymTab.Fetch[x: BrokenCommandTable, key: service];
    brokenList _ NARROW[brokenVal];
    FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      skipIt: BOOL _ FALSE;
      serverMachineName: ROPE _ statusList.first.serverMachineName;
      serverMachinePupAddress: ROPE _ statusList.first.serverMachinePupAddress;
      brokenListLoop: LIST OF BrokenCommand;
      FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName,
serverMachineName, FALSE] OR Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN {
          skipIt _ TRUE;
          EXIT;
          };
        ENDLOOP;
      IF skipIt THEN LOOP;
      IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus _ statusList.first;
      ENDLOOP;
    }
  ELSE {
    <>
    foundExtra: BOOL;
    extraVal: REF ANY;
    extraListLoop: LIST OF ExtraCommand;
    [found: foundExtra, val: extraVal] _ SymTab.Fetch[x: ExtraCommandTable, key: service];
    FOR extraListLoop _ NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
      statusFound: BOOL;
      val: REF ANY;
      status: ServerStatus;
      [found: statusFound, val: val] _ SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
      IF statusFound THEN {
        status _ NARROW[val];
        IF IsStatusBetter[status, bestStatus] THEN bestStatus _ status;
        };
      ENDLOOP;
    };
  IF bestStatus = NIL THEN {
    RETURN[FALSE, ""]
    }
  ELSE {
    bestStatus.CPULoad _ MIN[ 1.0, bestStatus.CPULoad + 0.5];
    bestStatus.reclamationRate _ bestStatus.reclamationRate + 30 ;
    bestStatus.FOM _ figureOfMerit[bestStatus];
    RETURN[TRUE, bestStatus.serverMachinePupAddress];
    };
  };

-- Figure of merit for one server, computed from its CPU load and reclamation rate.
-- The CPU term is the raw load on a dorado and 0.8 plus a fifth of the load on any other machine type.
-- The paging term is reclamationRate/90, limited to 1.0.
-- The result is the square root of the sum of the squares of the two terms.
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
  cpuTerm: REAL _ status.CPULoad;
  pagingTerm: REAL = MIN[status.reclamationRate/90.0, 1.0];
  IF status.machineType # dorado THEN cpuTerm _ 0.8 + (cpuTerm/5.0);
  RETURN [Real.SqRt[pagingTerm*pagingTerm + cpuTerm*cpuTerm]];
  };

CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  found: BOOL;
  oldList: LIST OF BrokenCommand;
  newBroken: BrokenCommand;
  newList: LIST OF BrokenCommand;
  machineList: LIST OF BrokenCommand;
  val: REF ANY;
  [found: found, val: val] _ SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found THEN {
    oldList _ NARROW[val];
    FOR machineList _ oldList, machineList.rest UNTIL machineList = NIL DO
      broken: BrokenCommand = machineList.first;
      IF
Rope.Equal[broken.serverMachineName, serverMachineName] THEN {
        -- this server already has an entry for the command: add the version unless it is already listed
        versionList: LIST OF ROPE;
        FOR versionList _ broken.versions, versionList.rest UNTIL versionList = NIL DO
          vers: ROPE _ NARROW[versionList.first];
          IF Rope.Equal[vers, version] THEN RETURN;
          ENDLOOP;
        broken.versions _ CONS[version, broken.versions];
        RETURN;
        };
      ENDLOOP;
    -- the command is known to be broken elsewhere but not yet on this server: put a new entry at the front of the list
    newBroken _ NEW[BrokenCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList _ CONS[newBroken, NARROW[val]];
    [] _ SymTab.Store[x: BrokenCommandTable, key: commandName, val: newList];
    }
  ELSE {
    -- first report for this command
    newBroken _ NEW[BrokenCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList _ LIST[newBroken];
    [] _ SymTab.Store[x: BrokenCommandTable, key: commandName, val: newList];
    };
  };

-- Records that serverMachineName can run version of commandName, in ExtraCommandTable.
-- A (server, version) pair that is already recorded is left alone.
-- Any BrokenCommandTable entry for commandName is deleted first.
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  found: BOOL;
  oldList: LIST OF ExtraCommand;
  newExtra: ExtraCommand;
  newList: LIST OF ExtraCommand;
  machineList: LIST OF ExtraCommand;
  val: REF ANY;
  <>
  -- NOTE(review): the Delete below removes the broken entry for every server, not only for serverMachineName; the comment that may have explained this was lost ("<>" above), so confirm the intent.
  [found: found, val: val] _ SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found THEN {
    [] _ SymTab.Delete[x: BrokenCommandTable, key: commandName];
    };
  [found: found, val: val] _ SymTab.Fetch[x: ExtraCommandTable, key: commandName];
  IF found THEN {
    oldList _ NARROW[val];
    FOR machineList _ oldList, machineList.rest UNTIL machineList = NIL DO
      extra: ExtraCommand = machineList.first;
      IF Rope.Equal[extra.serverMachineName, serverMachineName] THEN {
        versionList: LIST OF ROPE;
        FOR versionList _ extra.versions, versionList.rest UNTIL versionList = NIL DO
          vers: ROPE _ NARROW[versionList.first];
          IF Rope.Equal[vers, version] THEN RETURN;
          ENDLOOP;
        extra.versions _ CONS[version, extra.versions];
        RETURN;
        };
      ENDLOOP;
    newExtra _ NEW[ExtraCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList _ CONS[newExtra, NARROW[val]];
    [] _ SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
    }
  ELSE {
    newExtra _ NEW[ExtraCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList _ LIST[newExtra];
    [] _ SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
    };
  };

-- Adds package to the shared package list, or refreshes its date if it is already listed, and flags every known server with newPackage.
-- The list is read from RemoteCommandDir, rewritten into a temporary file in LocalCommandDir, and then copied back.
-- Returns error = TRUE with an explanation in msg when something goes wrong, and tryDifferentController = TRUE when this machine is not the controller.
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL _ FALSE, tryDifferentController: BOOL _ FALSE, msg: ROPE _ NIL]= {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE _ NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE _ NIL;
  foundItem: BOOL _ FALSE;
  fullFName: ROPE;
  fsErrMsg: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE _ NIL;
  shortPackageAndDF: ROPE _ NIL;
  packageDFDate: BasicTime.GMT _ BasicTime.nullGMT;
  sawDir: BOOL _ FALSE;
  blankAfterDir: BOOL _ FALSE;
  lastDir: REF DFUtilities.DirectoryItem _ NIL;
  wroteDir: BOOL _ FALSE;
  defaultDirectory: REF DFUtilities.DirectoryItem _ NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem _ NEW[DFUtilities.WhiteSpaceItem _ [1]];
  -- Writes the file item for the package, dated packageDFDate, preceded by a directory item for dfPrefix when that is not the directory currently in effect.
  writeUpdatedItem: PROC[] = {
    file: REF DFUtilities.FileItem;
    file _ NEW[DFUtilities.FileItem _ [
      name: shortPackageAndDF, -- a short name
      date: [explicit, packageDFDate],
      verifyRoot: FALSE]];
    IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dfPrefix, FALSE] THEN {
      dir: REF DFUtilities.DirectoryItem _ NEW [DFUtilities.DirectoryItem _ [
        path1: dfPrefix,
        path2: NIL,
        path2IsCameFrom: FALSE,
        exported: FALSE,
        readOnly: FALSE
        ]];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      lastDir _ dir;
      };
    DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
    };
  -- Called by DFUtilities.ParseFromStream for each item of the existing package list.
  DoOneItem: DFUtilities.ProcessItemProc = {
    <>
    errors, warnings, filesActedUpon: INT _ 0;
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1,
FALSE] THEN {
          -- a new directory: remember it, and write it only when a file item under it is copied
          wroteDir _ FALSE;
          lastDir _ dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        -- whitespace items are not copied; whiteItem is written around directory items instead
        };
      file: REF DFUtilities.FileItem => {
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];	-- the file name up to its "!"
        -- NOTE(review): neither of the next two locals is used in this arm, and this packageDFDate hides the procedure-level one of the same name; writeUpdatedItem is declared outside this arm and refers to the procedure-level variable.
        packageDFDate: BasicTime.GMT _ BasicTime.nullGMT;
        currentDFDate: BasicTime.GMT _ BasicTime.nullGMT;
        IF ~wroteDir THEN {
          wroteDir _ TRUE;
          IF lastDir = NIL THEN lastDir _ defaultDirectory;
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          -- the package is already listed: write a fresh entry in place of the old one
          writeUpdatedItem[];
          foundItem _ TRUE;
          }
        ELSE {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };
  {
  ENABLE UNWIND => {
    -- close whatever is open and remove the temporary file
    IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
    IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };
  IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- hold the request until this machine has been the controller for at least 22 seconds
  -- NOTE(review): no NOTIFY of NoticeNewPackageCondition appears in this part of the file; presumably the condition has a timeout set elsewhere, to be confirmed.
  WHILE ComputeControllerInternal.IAmTheController AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
    WAIT NoticeNewPackageCondition;
    ENDLOOP;
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] _ FS.ExpandName[name: package, wDir: NIL !
    FS.Error => GOTO badPackageName ];
  -- split the expanded name into the directory prefix and the base name, then append ".df" to the base
  shortPackage _ fullFName.Substr[cp.base.start, cp.base.length];
  shortPackageAndDF _ Rope.Concat[shortPackage, ".df"];
  dfPrefix _ fullFName.Substr[0, cp.base.start];
  defaultDirectory _ NEW [DFUtilities.DirectoryItem _ [
    path1: dfPrefix,
    path2: NIL,
    path2IsCameFrom: FALSE,
    exported: FALSE,
    readOnly: FALSE
    ]];
  [created: packageDFDate] _ FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => GOTO noDFFile];
  packageListGlobalName _ Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
  packageListInStream _ FS.StreamOpen[packageListGlobalName ! FS.Error => GOTO cantOpenPackageList];
  packageListTempName _ Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
  packageListOutStream _ FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
  -- NOTE(review): the syntax-error exit leaves through GOTO; no code that closes the two streams on that path is visible here, so confirm they are not left open.
  DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
  IF ~foundItem THEN writeUpdatedItem[];
  packageListInStream.Close[! FS.Error => CONTINUE];
  packageListOutStream.Close[! FS.Error => CONTINUE];
  [] _ FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE !
    FS.Error => { fsErrMsg _ error.explanation; GOTO cantCopyToServer}];
  FS.Delete[name: packageListTempName !
FS.Error => CONTINUE];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  FOR statusList _ ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    statusList.first.newPackage _ TRUE;
    ENDLOOP;
  EXITS
    cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
    noDFFile => RETURN [ TRUE, FALSE, "controller could not open specified df file - "];
    syntaxErrorInPackageList => RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
    badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
    cantCopyToServer => RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
  };
  };

-- Removes package from the shared package list and flags every known server with newPackage.
-- The list is read from RemoteCommandDir, rewritten without the package's entry into a temporary file in LocalCommandDir, and then copied back.
-- Returns:
--   error: TRUE when the request could not be carried out; msg then says why.
--   tryDifferentController: TRUE when this machine is not (or is no longer) the controller.
--   msg: explanation for the two cases above, NIL on success.
-- Every path, including success, returns explicit values, because the result list has no defaults.
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE _ NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE _ NIL;
  foundItem: BOOL _ FALSE;
  fullFName: ROPE;
  fsErrMsg: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE _ NIL;
  shortPackageAndDF: ROPE _ NIL;
  wroteDir: BOOL _ FALSE;
  lastDir: REF DFUtilities.DirectoryItem _ NIL;
  defaultDirectory: REF DFUtilities.DirectoryItem _ NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem _ NEW[DFUtilities.WhiteSpaceItem _ [1]];
  -- Closes whichever stream is still open and deletes the temporary file.
  -- The stream variables are set to NIL once closed, so calling this more than once is harmless.
  DiscardWork: PROC = {
    IF packageListInStream # NIL THEN {
      packageListInStream.Close[! FS.Error => CONTINUE];
      packageListInStream _ NIL;
      };
    IF packageListOutStream # NIL THEN {
      packageListOutStream.Close[! FS.Error => CONTINUE];
      packageListOutStream _ NIL;
      };
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };
  -- Called by DFUtilities.ParseFromStream for each item of the existing list; copies everything except the entry being removed.
  DoOneItem: DFUtilities.ProcessItemProc = {
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          -- a new directory: remember it, and write it only when a file item under it is copied
          wroteDir _ FALSE;
          lastDir _ dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
        };
      file: REF DFUtilities.FileItem => {
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];	-- the file name up to its "!"
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          -- this is the entry to remove: note that it was seen and do not copy it
          foundItem _ TRUE;
          }
        ELSE {
          IF ~wroteDir THEN {
            wroteDir _ TRUE;
            IF lastDir = NIL THEN lastDir _ defaultDirectory;
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            };
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };
  {
  ENABLE UNWIND => DiscardWork[];
  IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- hold the request until this machine has been the controller for at least 22 seconds
  WHILE ComputeControllerInternal.IAmTheController AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
    WAIT NoticeNewPackageCondition;
    ENDLOOP;
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] _ FS.ExpandName[name: package, wDir: NIL !
    FS.Error => GOTO badPackageName ];
  shortPackage _ fullFName.Substr[cp.base.start, cp.base.length];
  shortPackageAndDF _ Rope.Concat[shortPackage, ".df"];
  dfPrefix _ fullFName.Substr[0, cp.base.start];
  defaultDirectory _ NEW [DFUtilities.DirectoryItem _ [
    path1: dfPrefix,
    path2: NIL,
    path2IsCameFrom: FALSE,
    exported: FALSE,
    readOnly: FALSE
    ]];
  packageListGlobalName _ Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
  packageListInStream _ FS.StreamOpen[packageListGlobalName !
    FS.Error => GOTO cantOpenPackageList];
  packageListTempName _ Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
  packageListOutStream _ FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
  DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
  packageListInStream.Close[! FS.Error => CONTINUE];
  packageListInStream _ NIL;
  packageListOutStream.Close[! FS.Error => CONTINUE];
  packageListOutStream _ NIL;
  IF ~foundItem THEN {
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    RETURN [ TRUE, FALSE, "package not found"]
    };
  -- a failed copy is reported to the caller, the same way NoticeNewPackage reports it, instead of escaping as an uncaught FS.Error
  [] _ FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE !
    FS.Error => { fsErrMsg _ error.explanation; GOTO cantCopyToServer}];
  FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  FOR statusList _ ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    statusList.first.newPackage _ TRUE;
    ENDLOOP;
  RETURN [FALSE, FALSE, NIL];
  -- NOTE(review): the arms below are reached by GOTO, which as far as I can tell does not run this block's own UNWIND catch phrase (to be confirmed); the arms that can leave work behind therefore clean up explicitly.
  EXITS
    cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
    syntaxErrorInPackageList => {
      DiscardWork[];
      RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
      };
    badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
    cantCopyToServer => {
      DiscardWork[];
      RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]];
      };
  };
  };

NewStats: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL] RETURNS [terminateService, newPackage: BOOL _ FALSE, queueingCommands: LIST OF ROPE _ NIL] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  status: ServerStatus _ NIL ;
  lastStatusList: LIST OF ServerStatus _ NIL ;
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    IF
Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN { status _ statusList.first; IF ~serverUP THEN { IF lastStatusList = NIL THEN ServerStatusList _ statusList.rest ELSE lastStatusList.rest _ statusList.rest; } ELSE { newPackage _ status.newPackage; status.newPackage _ FALSE; status.freePagesOnDisk _ freePagesOnDisk; status.freeboard _ freeboard; status.freeGFI _ freeGFI; status.freeMDS _ freeMDS; status.freeVM _ freeVM; status.oldestLRUFileDate _ oldestLRUFileDate; status.CPULoad _ CPULoad; status.reclamationRate _ reclamationRate; status.freeProcesses _ freeProcesses; status.FOM _ figureOfMerit[status: status]; status.timeOfStatus _ BasicTime.Now[]; IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN { [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status]; }; }; EXIT; }; lastStatusList _ statusList; REPEAT FINISHED => status _ NIL; ENDLOOP; IF status = NIL AND serverUP THEN { status _ NEW[ServerStatusRecord _ [ serverMachineName: serverMachineName, serverMachinePupAddress: serverMachinePupAddress, timeOfStatus: BasicTime.Now[], machineType: machineType, mainMemory: mainMemory, numberCPUs: numberCPUs, diskPartionSize:diskPartionSize, freePagesOnDisk: freePagesOnDisk, freeboard: freeboard, freeGFI: freeGFI, freeMDS: freeMDS, freeVM: freeVM, oldestLRUFileDate: oldestLRUFileDate, CPULoad: CPULoad, reclamationRate: reclamationRate, freeProcesses: freeProcesses ]]; status.FOM _ figureOfMerit[status: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status]; ServerStatusList _ CONS[status, ServerStatusList]; }; IF status # NIL AND status.newQueueing THEN { status.newQueueing _ FALSE; FOR l: LIST OF QueueEntry _ WaitingQueue, l.rest UNTIL l = NIL DO IF l.first.doQueueing THEN queueingCommands _ 
CONS[l.first.service, queueingCommands]; ENDLOOP; }; IF firstCall THEN { CleanBroken: SymTab.EachPairAction = { <> ENABLE UNWIND => NULL; brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand _ NIL; changed: BOOL _ FALSE; brokenList _ NARROW[val]; FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN { changed _ TRUE; LOOP; } ELSE { newBrokenList _ CONS[brokenListLoop.first, newBrokenList]; }; ENDLOOP; IF changed THEN [] _ SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList]; RETURN[FALSE]; }; CleanExtra: SymTab.EachPairAction = { <> ENABLE UNWIND => NULL; extraList, newExtraList, extraListLoop: LIST OF ExtraCommand _ NIL; changed: BOOL _ FALSE; extraList _ NARROW[val]; FOR extraListLoop _ extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN { changed _ TRUE; LOOP; } ELSE { newExtraList _ CONS[extraListLoop.first, newExtraList]; }; ENDLOOP; IF changed THEN [] _ SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList]; RETURN[FALSE]; }; [] _ SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken]; [] _ SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra]; }; }; getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE] RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord _ NIL] = { ENABLE UNWIND => NULL; newServerInterface: ComputeServerRpcControl.InterfaceRecord _ NIL; bestIndex: INT _ 0; bestTime: BasicTime.GMT _ BasicTime.latestGMT; now: BasicTime.GMT _ BasicTime.Now[]; FOR index: INT IN [0..serverInterfaceCacheSize) DO IF Rope.Equal[serverInstance, serverInterfaceCache[index].serverInstance] THEN { newServerInterface _ serverInterfaceCache[index].interface; serverInterfaceCache[index].lastUsed _ now; RETURN[serverInterfaceCache[index].interface]; }; IF BasicTime.Period[bestTime, 
serverInterfaceCache[index].lastUsed] < 0 THEN { bestTime _ serverInterfaceCache[index].lastUsed; bestIndex _ index; }; ENDLOOP; newServerInterface _ ComputeServerRpcControl.ImportNewInterface[ interfaceName: [ type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]] ! RPC.ImportFailed => { CONTINUE; }; ]; IF newServerInterface # NIL THEN { serverInterfaceCache[bestIndex].lastUsed _ now; serverInterfaceCache[bestIndex].serverInstance _ serverInstance; serverInterfaceCache[bestIndex].interface _ newServerInterface; }; RETURN[newServerInterface]; }; deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord] RETURNS [gotIt: BOOL _ FALSE] = { ENABLE UNWIND => NULL; FOR index: INT IN [0..serverInterfaceCacheSize) DO IF serverInterfaceCache[index].interface = serverInterface THEN { serverInterfaceCache[index].serverInstance _ NIL; serverInterfaceCache[index].interface _ NIL; serverInterfaceCache[index].lastUsed _ BasicTime.earliestGMT; gotIt _ TRUE; EXIT; }; ENDLOOP; }; findOrCreateQueueEntry: ENTRY PROC [service: ROPE] RETURNS [entry: QueueEntry _ NIL, item: WaitingRequest] = { ENABLE UNWIND => NULL; item _ NEW[WaitingRequestRec ]; FOR queue: LIST OF QueueEntry _ WaitingQueue, queue.rest UNTIL queue = NIL DO IF Rope.Equal[queue.first.service, service, FALSE] THEN { entry _ queue.first; IF entry.waiters = NIL THEN entry.waiters _ CONS[item, NIL] ELSE FOR l: LIST OF WaitingRequest _ entry.waiters, l.rest UNTIL l = NIL DO IF l.rest = NIL THEN {l.rest _ CONS[item, NIL]; EXIT}; ENDLOOP; RETURN; -- existing queue }; ENDLOOP; WaitingQueue _ CONS[ entry _ NEW[QueueEntryRec _ [service: service, waiters: LIST[item]]], WaitingQueue ]; TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]];}; }; setNewQueueing: ENTRY PROC [] = { ENABLE UNWIND => NULL; FOR l: LIST OF ServerStatus _ ServerStatusList, l.rest UNTIL l = NIL DO l.first.newQueueing _ TRUE; ENDLOOP; LastReBuildQueueTime _ BasicTime.Now[]; 
}; queueItemReady: ENTRY PROC [entry: QueueEntry, item: WaitingRequest]= { ENABLE UNWIND => NULL; firstTime: BOOL _ TRUE; item.waiting _ true; DO IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting _ timedOut; RETURN}; SELECT item.waiting FROM true => { IF entry.servers # NIL AND entry.waiters.first = item THEN { bestStatus: ServerStatus _ NIL; prev, prevBest: LIST OF ROPE _ NIL; best: LIST OF ROPE _ NIL; serv: ROPE; FOR l: LIST OF ROPE _ entry.servers, l.rest UNTIL l = NIL DO statusFound: BOOL; val: REF ANY; status: ServerStatus; serv _ l.first; [found: statusFound, val: val] _ SymTab.Fetch[x: MachineToStatusTable, key: serv]; IF statusFound THEN { status _ NARROW[val]; IF IsStatusBetter[status, bestStatus] THEN { bestStatus _ status; best _ l; prevBest _ prev; }; }; prev _ l; ENDLOOP; IF prevBest = NIL THEN { item.instance _ entry.servers.first; entry.servers _ entry.servers.rest; } ELSE { item.instance _ best.first; prevBest.rest _ best.rest; }; <> <> item.serverAssigned _ TRUE; item.waiting _ validating; RETURN; }; }; done => { item.serverAssigned _ TRUE; item.waiting _ validating; RETURN; }; timedOut => { RETURN; }; ENDCASE; IF firstTime THEN {forceReBuildQueuing _ TRUE}; firstTime _ FALSE; WAIT entry.changed; ENDLOOP; }; removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = { ENABLE UNWIND => NULL; BROADCAST entry.changed; IF entry.waiters = NIL THEN RETURN; -- actually, this is a bug IF entry.waiters.first = item THEN {entry.waiters _ entry.waiters.rest; RETURN}; FOR l: LIST OF WaitingRequest _ entry.waiters, l.rest UNTIL l.rest = NIL DO IF l.rest.first = item THEN {l.rest _ l.rest.rest; RETURN}; ENDLOOP; }; MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = { ENABLE UNWIND => NULL; item: WaitingRequest _ NIL; FOR queue: LIST OF QueueEntry _ WaitingQueue, queue.rest UNTIL queue = NIL DO entry: QueueEntry = queue.first; IF Rope.Equal[commandName, 
entry.service, FALSE] THEN { IF entry.waiters = NIL OR entry.waiters.first.waiting # true THEN { -- nobody waiting now or top waiter is busy IF entry.servers = NIL THEN entry.servers _ CONS[serverMachineAddress, NIL] ELSE FOR l: LIST OF ROPE _ entry.servers, l.rest UNTIL l = NIL DO IF Rope.Equal[l.first , serverMachineAddress, FALSE] THEN EXIT; IF l.rest = NIL THEN {l.rest _ CONS[serverMachineAddress, NIL]; EXIT}; ENDLOOP; BROADCAST entry.changed; EXIT; } ELSE { entry.waiters.first.waiting _ done; entry.waiters.first.instance _ serverMachineAddress; BROADCAST entry.changed; EXIT; }; }; ENDLOOP; }; IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus] RETURNS [ itsBetter: BOOL _ FALSE] = { SELECT TRUE FROM bestStatus = NIL => itsBetter _ TRUE; ((3*bestStatus.reclamationRate) > status.reclamationRate) AND (bestStatus.reclamationRate > 5) AND (status.FOM < 2*bestStatus.FOM )=> itsBetter _ TRUE; ENDCASE => { IF status.FOM < bestStatus.FOM THEN itsBetter _ TRUE; }; }; locked: BOOL _ FALSE; lockCondition: CONDITION; Lock: ENTRY PROC = { WHILE locked DO WAIT lockCondition; ENDLOOP; locked _ TRUE; }; UnLock: ENTRY PROC = { locked _ FALSE; NOTIFY lockCondition }; <> SetGrabController: Commander.CommandProc = { ComputeControllerInternal.GrabController _ TRUE; }; <> JustAProcessThatYouCanFreeze: PROC = { DO Process.Pause[200]; ENDLOOP; }; <> TRUSTED {Process.Detach[FORK JustAProcessThatYouCanFreeze[]];}; Commander.Register[key: "///Commands/SummonerBecomeController", proc: SetGrabController, doc: "Try to force the Compute Server Controller to this machine"]; TRUSTED {Process.InitializeCondition[@NoticeNewPackageCondition, Process.SecondsToTicks[1]];}; serverInterfaceCache _ NEW[serverInterfaceArray]; END. <<>> <> <> <> <> <> <<>>