-- ComputeControllerImpl: the controller half of the Summoner compute-server service.
-- While this machine holds the controller role it exports the ComputeServerController RPC
-- interface, keeps the latest status reported by each compute server, and records which
-- commands are broken on, or only offered by, particular servers.
DIRECTORY
  AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl,
  ComputeServerController, ComputeServerControllerRpcControl, ComputeServer,
  ComputeServerInternal, ComputeServerRpcControl, DFUtilities, FS, GVBasics, GVNames, IO,
  PrincOps, PrincOpsUtils, Process, PupDefs, PupTypes, Real, Rope, RPC,
  SummonerControllerControl, SymTab, UserCredentials, UserProfile, WorldVM;

ComputeControllerImpl: CEDAR MONITOR
  IMPORTS AMProcess, BasicTime, Commander, ComputeControllerInternal, ComputeServerControl,
    ComputeServerControllerRpcControl, ComputeServerInternal, ComputeServerRpcControl,
    DFUtilities, FS, GVNames, IO, Process, Real, Rope, RPC, SymTab, UserCredentials,
    UserProfile, WorldVM
  EXPORTS ComputeControllerInternal, ComputeServerController, SummonerControllerControl
  = BEGIN

ROPE: TYPE = Rope.ROPE;

-- NoticeNewPackage and RemoveOldPackage WAIT on this until 22 seconds have passed since this
-- machine became the controller.
NoticeNewPackageCondition: CONDITION;

-- Controller role.
InterfaceExported: BOOL _ FALSE;  -- TRUE while the controller RPC interface is exported from here
ControllerEnabled: BOOL _ FALSE;  -- set by StartUpController, cleared by ShutDownController
LatentControllerActive: BOOL _ TRUE;  -- LatentSummonerController keeps looping while TRUE
IAmTheController: PUBLIC BOOL _ FALSE;
ControllerMovingAway: PUBLIC BOOL _ FALSE;  -- GetSomeInfo returns this as tryDifferentController
GrabController: PUBLIC BOOL _ FALSE;  -- asks LatentSummonerController to take the role at once
TimeIBecameTheController: PUBLIC BasicTime.GMT _ BasicTime.nullGMT;
ControllerUp: BOOL _ TRUE;  -- FALSE once an import of, or a call to, the controller has failed
ControllerGVName: ROPE _ NIL;  -- Grapevine name of the controller; set by StartUpController

-- The most recent failure to reach the controller (see ControllerCannnotBeImported and
-- ControllerCallFailed).
ControllerError: TYPE = {none, import, callFailed};
LastControllerError: ControllerError _ none;
LastImportFailure: RPC.ImportFailure;
LastCallFailure: RPC.CallFailure;
ControllerFailedTime: BasicTime.GMT;  -- set when ControllerUp goes from TRUE to FALSE; no initial value
LastReBuildQueueTime: BasicTime.GMT _ BasicTime.nullGMT;

-- Latest status of each server that has reported in.
ServerStatusRecord: TYPE = ComputeControllerInternal.ServerStatusRecord;
ServerStatus: TYPE = REF ServerStatusRecord;
ServerStatusList: PUBLIC LIST OF ServerStatus _ NIL;

-- These three tables are (re)created with SymTab.Create[mod: 59, case: FALSE] whenever this
-- machine takes the controller role.
BrokenCommandTable: SymTab.Ref; -- knowing a command, find where it is broken
ExtraCommandTable: SymTab.Ref; -- knowing a non-standard command, find who can run it
MachineToStatusTable: SymTab.Ref; -- knowing a machine's name, find its status; name is of the form "Hornet" or "3#35#"

-- Element of the list stored in BrokenCommandTable under a command name: one server on which
-- the command was reported broken, with the versions reported (see CommandUnavailable).
BrokenCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE _ NIL
  ];
BrokenCommand: TYPE = REF BrokenCommandObject;

-- Element of the list stored in ExtraCommandTable under a command name: one server that offers
-- the non-standard command, with the versions it offers (see ExtraCommandAvailable).
ExtraCommandObject: TYPE = RECORD [
  serverMachineName: RPC.ShortROPE,
  versions: LIST OF ROPE _ NIL
  ];
ExtraCommand: TYPE = REF ExtraCommandObject;

-- Request queueing.
forceReBuildQueuing: BOOL _ FALSE;  -- makes the next ReBuildQueuing call rebuild regardless of age
WaitingQueue: LIST OF QueueEntry _ NIL;
QueueEntry: TYPE = REF QueueEntryRec;
-- Queueing state for one service.
QueueEntryRec: TYPE = RECORD [
  service: ROPE _ NIL,
  doQueueing: BOOL _ FALSE,
  waiters: LIST OF WaitingRequest _ NIL,
  servers: LIST OF ROPE _ NIL,
  changed: CONDITION
  ];
WaitingRequest: TYPE = REF WaitingRequestRec;
-- One client request; its fields are filled in by FindServiceWithQueueing.
WaitingRequestRec: TYPE = RECORD [
  process: PROCESS,
  timeout: BasicTime.GMT,
  waiting: waitType _ init,
  version: ROPE,
  clientMachineName: RPC.ShortROPE,
  streamPupAddress: PupDefs.PupAddress,
  needListener: BOOL,
  serverAssigned: BOOL _ FALSE,
  response: ComputeServer.AskResponce,
  instance: RPC.ShortROPE,  -- the server chosen for this request
  serverPupAddress: PupDefs.PupAddress
  ];
waitType: TYPE = {init, true, validating, done, timedOut};

-- Cache of compute-server RPC interfaces. NOTE(review): the cache procedures
-- (getServerInterfaceFromCache and friends) are not in this part of the file.
serverInterfaceItem: TYPE = RECORD [
  serverInstance: ROPE _ NIL,
  interface: ComputeServerRpcControl.InterfaceRecord _ NIL,
  lastUsed: BasicTime.GMT _ BasicTime.earliestGMT
  ];
serverInterfaceCacheSize: INT = 20;
serverInterfaceArray: TYPE = ARRAY [0..serverInterfaceCacheSize) OF serverInterfaceItem;
serverInterfaceCache: REF serverInterfaceArray;

-- Shared by CheckItem and the CheckItemProcess it forks; CheckItem polls `check`.
CheckState: TYPE = {checking, aborted, done};
CheckItemProcessObj: TYPE = RECORD[
  check: CheckState _ checking,
  communicationsOK: BOOL _ TRUE,
  askFound: ComputeServer.AskResponce,
  serverPupAddress: PupDefs.PupAddress,
  errMsg: ROPE
  ];

-- Set to 1 by the LatentSummonerController process that claims the single slot.
NoOfLatentSummonerControllerProcesses: INT _ 0 ;

-- StartUpController: turn on controller activity on this machine.
--   controllerName: Grapevine name of the controller. If empty, the user-profile entry
--     "Summoner.ControllerName" is used, and if that is empty too, "PaloAlto.summoner".
--   forceSelfAsPrimaryController: take the controller role without comparing Grapevine's
--     connect site (Grapevine must still know the name).
-- If the controller interface is not already exported, this machine takes the role when it is
-- forced to, when it is the Grapevine connect site of the name, or when the profile entry
-- "Summoner.PrimaryController" names it. Taking the role creates fresh tables and exports the
-- controller RPC interface; an RPC.ExportFailed gives the role up again. In every case a
-- LatentSummonerController process is then forked.
StartUpController: PUBLIC PROC [controllerName: ROPE _ NIL, forceSelfAsPrimaryController: BOOL _ FALSE] RETURNS [] = {
  locked: BOOL _ FALSE;  -- TRUE while this procedure holds Lock[]
  {
  ENABLE UNWIND => IF locked THEN UnLock[];
  user, password: ROPE;
  ControllerGVName _ IF Rope.IsEmpty[controllerName] THEN UserProfile.Token["Summoner.ControllerName"] ELSE controllerName;
  IF Rope.IsEmpty[ControllerGVName] THEN ControllerGVName _ "PaloAlto.summoner" ;
  ControllerEnabled _ TRUE;
  LatentControllerActive _ TRUE;
  ControllerUp _ TRUE;
  -- NOTE(review): Lock/UnLock are defined outside this view; presumably they serialize
  -- changes of the controller role.
  Lock[];
  locked _ TRUE;
  IF ~InterfaceExported THEN {
    info: GVNames.ConnectInfo;
    connect: GVBasics.Connect;
    [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
    -- Only proceed when Grapevine knows the controller name.
    IF info = individual OR info = group THEN {
      IF forceSelfAsPrimaryController
        OR Rope.Equal[connect, ComputeServerControl.MyNetAddressRope]
        OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope] THEN {
        IAmTheController _ TRUE;
        TimeIBecameTheController _ BasicTime.Now[];
        [user, password] _ UserCredentials.Get[];
        InterfaceExported _ TRUE;
        -- A new controller starts with no knowledge of servers or commands.
        BrokenCommandTable _ SymTab.Create[mod: 59, case: FALSE];
        ExtraCommandTable _ SymTab.Create[mod: 59, case: FALSE];
        MachineToStatusTable _ SymTab.Create[mod: 59, case: FALSE];
        ComputeServerControllerRpcControl.ExportInterface[
          interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
          user: user, password: RPC.MakeKey[password] ! 
RPC.ExportFailed => { InterfaceExported _ FALSE; IAmTheController _ FALSE; CONTINUE; };
          ];
        };
      };
    };
  UnLock[];
  locked _ FALSE;
  -- The latent controller watches the controller and can take over its role.
  TRUSTED {Process.Detach[FORK LatentSummonerController[]] ;};
  };
  };

-- ControllerCannnotBeImported: record that importing the controller interface failed and mark
-- the controller as down.
ControllerCannnotBeImported: PUBLIC PROC [reason: RPC.ImportFailure] = {
  LastControllerError _ import;
  LastImportFailure _ reason;
  NoticeControllerIsDown[];
  };

-- ControllerCallFailed: record that a call to the controller failed and mark it as down.
ControllerCallFailed: PUBLIC PROC [reason: RPC.CallFailure] = {
  LastControllerError _ callFailed;
  LastCallFailure _ reason;
  NoticeControllerIsDown[];
  };

-- NoticeControllerIsDown: mark the controller as down. ControllerFailedTime keeps the time of
-- the first failure since the controller was last considered up.
NoticeControllerIsDown: PROC = {
  IF ControllerUp THEN ControllerFailedTime _ BasicTime.Now[];
  ControllerUp _ FALSE;
  };

-- NoticeControllerIsUp: mark the controller as reachable again.
NoticeControllerIsUp: PUBLIC PROC = {
  ControllerUp _ TRUE;
  };

-- TryThisController: import a new instance of the controller RPC interface exported under
-- ControllerGVName. importedOK is FALSE (and controllerInterface is not assigned) when
-- RPC.ImportFailed is raised.
TryThisController: PROC RETURNS [importedOK: BOOL _ TRUE, controllerInterface: ComputeServerControllerRpcControl.InterfaceRecord] = {
  controllerInterface _ ComputeServerControllerRpcControl.ImportNewInterface[
    interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]]
    ! RPC.ImportFailed => { importedOK _ FALSE; CONTINUE; };
    ];
  };

-- LatentSummonerController: background process forked by StartUpController; at most one copy
-- runs. Until LatentControllerActive becomes FALSE it wakes every 127 seconds and:
--   if GrabController is set, or the controller is down and its interface cannot be imported,
--   takes over the controller role when GrabController is set, when the controller has been
--   down more than 83 seconds, when it has been down more than 13 seconds and Grapevine's
--   connect site for ControllerGVName is this machine, or when the profile entry
--   "Summoner.PrimaryController" names this machine;
--   otherwise (controller considered up), if this machine is the controller but Grapevine names
--   another machine, asks that machine for information. If it answers and is not moving
--   itself, this machine gives up the role once it holds no server status (until then
--   ControllerMovingAway is set). If it cannot be reached, this machine re-registers itself as
--   the Grapevine connect site. While it is the controller it also discards stale server
--   status and calls ReBuildQueuing.
-- When the loop ends (or the process is unwound) the single slot is released. Without this,
-- after ShutDownController every copy forked by a later StartUpController returned at once and
-- no latent controller ran.
LatentSummonerController: PROC [] = {
  -- Claim the single latent-controller slot; returns TRUE if another copy already holds it.
  MakeSureThereIsOnlyOne: ENTRY PROC RETURNS [exit: BOOL _ FALSE] = {
    IF NoOfLatentSummonerControllerProcesses = 0 THEN NoOfLatentSummonerControllerProcesses _ 1 ELSE RETURN[TRUE];
    };
  -- Release the slot so that a later StartUpController can fork a working replacement.
  ReleaseTheOnlyOne: ENTRY PROC = {
    NoOfLatentSummonerControllerProcesses _ 0;
    };
  locked: BOOL _ FALSE;  -- TRUE while this process holds Lock[]
  IF MakeSureThereIsOnlyOne[] THEN RETURN;
  {
  ENABLE UNWIND => ReleaseTheOnlyOne[];
  DO
    ENABLE UNWIND => IF locked THEN UnLock[];
    IF ~LatentControllerActive THEN EXIT;
    IF ~ControllerUp OR GrabController THEN {
      -- controller is down or I should grab it - see if I should acquire the controller function
      info: GVNames.ConnectInfo;
      connect: GVBasics.Connect;
      ControllerMovingAway _ FALSE;
      IF GrabController OR ~TryThisController[].importedOK THEN {
        -- down but import will work soon
        downTime: INT = BasicTime.Period[ControllerFailedTime, BasicTime.Now[]];
        [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
        IF GrabController OR downTime > 83
          OR ((downTime > 13 AND (info = individual OR info = group) AND Rope.Equal[connect, ComputeServerControl.MyNetAddressRope])
            OR Rope.Equal[UserProfile.Token["Summoner.PrimaryController"], ComputeServerControl.MyNetAddressRope]) THEN {
          user, password: ROPE;
          Lock[];
          locked _ TRUE;
          GrabController _ FALSE;
          IAmTheController _ TRUE;
          TimeIBecameTheController _ BasicTime.Now[];
          [user, password] _ UserCredentials.Get[];
          IF InterfaceExported THEN ComputeServerControllerRpcControl.UnexportInterface[];
          InterfaceExported _ TRUE;
          -- A new controller starts with no knowledge of servers or commands.
          BrokenCommandTable _ SymTab.Create[mod: 59, case: FALSE];
          ExtraCommandTable _ SymTab.Create[mod: 59, case: FALSE];
          MachineToStatusTable _ SymTab.Create[mod: 59, case: FALSE];
          ComputeServerControllerRpcControl.ExportInterface[
            interfaceName: [ type: "ComputeServerController.summoner", instance: ControllerGVName, version: [1,1]],
            user: user, password: RPC.MakeKey[password]
            ! RPC.ExportFailed => { InterfaceExported _ FALSE; IAmTheController _ FALSE; CONTINUE; };
            ];
          UnLock[];
          locked _ FALSE;
          };
        };
      }
    ELSE {
      -- Controller is up. If I think that I am the controller, but Grapevine does not think so,
      -- then fix the situation.
      IF IAmTheController THEN {
        info: GVNames.ConnectInfo;
        connect: GVBasics.Connect;
        [info: info, connect: connect ] _ GVNames.GetConnect[ControllerGVName];
        IF (info = individual OR info = group) THEN {
          IF ~Rope.Equal[connect, ComputeServerControl.MyNetAddressRope] THEN {
            -- Grapevine says I am not the Controller
            triedControllerInterface: ComputeServerControllerRpcControl.InterfaceRecord;
            tryDifferentController: BOOL _ FALSE;
            importedOK: BOOL;
            [importedOK: importedOK, controllerInterface: triedControllerInterface] _ TryThisController[];
            IF importedOK THEN
              [tryDifferentController: tryDifferentController] _ triedControllerInterface.GetSomeInfo[
                ! RPC.CallFailed => { importedOK _ FALSE; CONTINUE; }; ];
            IF importedOK AND ~tryDifferentController THEN {
              -- new controller answers
              IF ServerStatusList = NIL THEN {
                -- No server status held here any more: give up the controller role.
                Lock[];
                locked _ TRUE;
                IAmTheController _ FALSE;
                ControllerMovingAway _ FALSE;
                ComputeServerControllerRpcControl.UnexportInterface[];
                InterfaceExported _ FALSE;
                ServerStatusList _ NIL;
                BrokenCommandTable _ NIL;
                ExtraCommandTable _ NIL;
                UnLock[];
                locked _ FALSE;
                }
              ELSE {
                -- GetSomeInfo now reports tryDifferentController = TRUE.
                ControllerMovingAway _ TRUE;
                };
              }
            ELSE {
              -- new controller does not answer or thinks it is moving too
              ControllerMovingAway _ FALSE;
              IF ~importedOK THEN {
                -- new controller does not answer: re-assert my controllership
                user, password: ROPE;
                [user, password] _ UserCredentials.Get[];
                [] _ GVNames.SetConnect[user: user, password: RPC.MakeKey[password], individual: ControllerGVName, connect: ComputeServerControl.MyNetAddressRope];
                };
              };
            }
          ELSE {
            -- Grapevine says I am the Controller
            ControllerMovingAway _ FALSE;
            };
          }
        ELSE {
          -- NOT info = individual OR info = group (grapevine troubles)
          ControllerMovingAway _ FALSE;
          };
        TossOldStatus[];
        ReBuildQueuing[];
        };
      };
    Process.Pause[Process.SecondsToTicks[127]];
    ENDLOOP;
  };
  ReleaseTheOnlyOne[];
  };

-- ShutDownController: stop controller activity on this machine: unexport the controller
-- interface, drop all server and command state, and make LatentSummonerController stop.
ShutDownController: PUBLIC PROC [] RETURNS [] = {
  IF InterfaceExported THEN 
ComputeServerControllerRpcControl.UnexportInterface[];
  InterfaceExported _ FALSE;
  ControllerEnabled _ FALSE;
  -- Forget all server and command state.
  ServerStatusList _ NIL;
  BrokenCommandTable _ NIL;
  ExtraCommandTable _ NIL;
  MachineToStatusTable _ NIL;
  LatentControllerActive _ FALSE;  -- LatentSummonerController leaves its loop at its next wake-up
  ControllerUp _ FALSE;
  IAmTheController _ FALSE;
  };

-- TossOldStatus: drop every server whose last status report is more than 30 seconds old.
-- Each such server is removed from ServerStatusList and from MachineToStatusTable (under both
-- its machine name and its Pup address).
-- lastKept advances only past entries that stay in the list, so consecutive stale entries are
-- all unlinked. Previously a stale entry directly behind another stale entry was "unlinked"
-- from the already-removed node and so stayed in ServerStatusList.
TossOldStatus: ENTRY PROC [] RETURNS [] = {
  ENABLE UNWIND => NULL;
  now: BasicTime.GMT = BasicTime.Now[];
  lastKept: LIST OF ServerStatus _ NIL;  -- last node that remains in ServerStatusList
  FOR statusList: LIST OF ServerStatus _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    IF BasicTime.Period[status.timeOfStatus, now] > 30 THEN {
      [] _ SymTab.Delete[x: MachineToStatusTable, key: status.serverMachineName];
      [] _ SymTab.Delete[x: MachineToStatusTable, key: status.serverMachinePupAddress];
      -- statusList.rest itself is left alone, so the loop still reaches the next node.
      IF lastKept = NIL THEN ServerStatusList _ statusList.rest
      ELSE lastKept.rest _ statusList.rest;
      }
    ELSE lastKept _ statusList;
    ENDLOOP;
  };

-- ReBuildQueuing: call setNewQueueing when forceReBuildQueuing is set or more than 300 seconds
-- have passed since the last rebuild; forceReBuildQueuing is always cleared.
ReBuildQueuing: PROC [] RETURNS [] = {
  now: BasicTime.GMT _ BasicTime.Now[];
  IF forceReBuildQueuing OR BasicTime.Period[LastReBuildQueueTime, now] > 300 THEN {
    setNewQueueing[];
    LastReBuildQueueTime _ now;
    };
  forceReBuildQueuing _ FALSE;
  };

-- GetSomeInfo: describe the servers known to this controller. Returns one line per server
-- with its CPU load as a percentage (marked "Controller" for this host's own server), the
-- figure of merit of the best server (1.0 with "no servers up" when none is known), and
-- ControllerMovingAway as tryDifferentController. When this machine is no longer the
-- controller it answers "controller has moved" with tryDifferentController = TRUE.
-- LatentSummonerController calls it on another controller through RPC.
GetSomeInfo: PUBLIC ENTRY PROC RETURNS [error: BOOL _ FALSE, tryDifferentController: BOOL _ FALSE, msg: ROPE _ NIL, serverList: LIST OF ROPE _ NIL, bestFOM: REAL _ 0.0] = {
  ENABLE UNWIND => NULL;
  bestStatus: ServerStatus _ NIL;
  statusList: LIST OF ServerStatus;
  IF ~IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved", NIL, 0.0];
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    status: ServerStatus = statusList.first;
    perCentBusy: INT _ Real.Round[status.CPULoad*100.0];
    IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN {
      serverList _ CONS[IO.PutFR["%g (Controller, %g%% busy)", IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
      }
    ELSE serverList _ CONS[IO.PutFR["%g (%g%%)", 
IO.rope[status.serverMachineName], IO.int[perCentBusy]], serverList];
    IF IsStatusBetter[status, bestStatus] THEN bestStatus _ status;
    ENDLOOP;
  IF bestStatus = NIL THEN RETURN [FALSE, ControllerMovingAway, "no servers up", serverList, 1.0];
  RETURN [FALSE, ControllerMovingAway, NIL, serverList, bestStatus.FOM];
  };

-- BestServerStats: the best known server according to IsStatusBetter, as its machine name and
-- figure of merit; [NIL, 1.0] when no server status is known.
BestServerStats: PUBLIC ENTRY PROC RETURNS[instance: RPC.ShortROPE, FOM: REAL] = {
  ENABLE UNWIND => NULL;
  best: ServerStatus _ NIL;
  FOR l: LIST OF ServerStatus _ ServerStatusList, l.rest UNTIL l = NIL DO
    IF IsStatusBetter[l.first, best] THEN best _ l.first;
    ENDLOOP;
  IF best # NIL THEN RETURN [best.serverMachineName, best.FOM];
  RETURN [NIL, 1.0];
  };

-- bumpBestServerStats: charge the server registered under `instance` in MachineToStatusTable
-- with extra load (CPU load + 0.5, capped at 1.0; reclamation rate + 30) and recompute its
-- figure of merit. The next status report from that server overwrites these values. Unknown
-- instances are ignored.
bumpBestServerStats: ENTRY PROC [instance: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  present: BOOL;
  ref: REF ANY;
  [found: present, val: ref] _ SymTab.Fetch[x: MachineToStatusTable, key: instance];
  IF ~present THEN RETURN;
  {
  s: ServerStatus = NARROW[ref];
  s.CPULoad _ MIN[ 1.0, s.CPULoad + 0.5];
  s.reclamationRate _ s.reclamationRate + 30 ;
  s.FOM _ figureOfMerit[s];
  };
  };

-- FindService: choose a server for `service` without any queueing (see InnerFindService).
FindService: PUBLIC PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  [found: found, instance: instance] _ InnerFindService[service];
  };

-- FindServiceWithQueueing: find a server for `service` that accepts the client's request
-- (asked through CheckItem). If queueing is already active for the service, the request waits
-- in the queue (item.timeout is set timeToWait ahead of now; presumably queueItemReady enforces
-- it). Otherwise the service is marked as queueing and servers are tried directly, best first;
-- unreachable or too-busy servers are skipped. Once InnerFindService offers a server that was
-- already tried, the request waits in the queue instead.
FindServiceWithQueueing: PUBLIC PROCEDURE [service: ROPE, version: RPC.ShortROPE, timeToWait: INT, clientMachineName: RPC.ShortROPE, streamPupAddress: PupDefs.PupAddress, needListener: BOOL] RETURNS [found: ComputeServer.AskResponce, instance: RPC.ShortROPE, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  entry: QueueEntry;
  item: WaitingRequest _ NIL;
  -- Wait for queueItemReady to assign a server (or to time the request out), skip servers on
  -- which the command is broken, and ask the assigned server; repeat until one accepts.
  tryForServer: PROC [] = {
    DO
      commOK: BOOL;
      foundBroken: BOOL;
      brokenVal: REF ANY;
      brokenList, brokenListLoop: LIST OF BrokenCommand;
      askFound: ComputeServer.AskResponce;
      queueItemReady[entry, item];
      IF item.waiting = timedOut THEN EXIT;
      [found: foundBroken, val: brokenVal] _ SymTab.Fetch[x: 
BrokenCommandTable, key: service];
      brokenList _ NARROW[brokenVal];
      -- Do not use a server on which this command has been reported broken.
      FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, item.instance, FALSE] THEN {
          item.serverAssigned _ FALSE;
          item.waiting _ true;
          EXIT;
          };
        ENDLOOP;
      IF ~item.serverAssigned THEN LOOP;
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] _ CheckItem [entry, item];
      -- Unreachable, or not accepting: go back to waiting for another assignment.
      IF ~commOK OR (askFound # foundOK) THEN {
        item.serverAssigned _ FALSE;
        item.waiting _ true;
        LOOP;
        };
      item.response _ askFound;
      item.serverPupAddress _ serverPupAddress;
      -- NOTE(review): only foundOK can reach this SELECT after the test above.
      SELECT askFound FROM
        notFound => { removeQueueItem[entry, item]; errMsg _ NIL; RETURN; };
        foundButTooBusy => { LOOP; };
        foundOK => { removeQueueItem[entry, item]; RETURN; };
        ENDCASE;
      ENDLOOP;
    -- Timed out.
    removeQueueItem[entry, item];
    };
  [entry, item] _ findOrCreateQueueEntry[service];
  item.version _ version;
  item.clientMachineName _ clientMachineName;
  item.streamPupAddress _ streamPupAddress;
  item.needListener _ needListener;
  IF entry.doQueueing THEN {
    -- queueing already being done
    item.process _ LOOPHOLE[Process.GetCurrent[]];
    item.timeout _ BasicTime.Update[BasicTime.Now[], timeToWait];
    item.waiting _ true;
    tryForServer[];
    bumpBestServerStats[item.instance];
    RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
    }
  ELSE {
    -- queueing not started
    serversTried: LIST OF RPC.ShortROPE _ NIL;
    DO
      commOK: BOOL;
      serviceFound: BOOL;
      instance: RPC.ShortROPE;
      askFound: ComputeServer.AskResponce;
      entry.doQueueing _ TRUE;
      setNewQueueing[];
      [serviceFound, instance] _ InnerFindService[service];
      item.instance _ instance;
      IF ~serviceFound THEN { found _ notFound; removeQueueItem[entry, item]; RETURN; };
      -- The best choice has already been tried: wait in the queue instead.
      FOR oldServers: LIST OF RPC.ShortROPE _ serversTried, oldServers.rest UNTIL oldServers = NIL DO
        IF Rope.Equal[oldServers.first, instance, FALSE] THEN {
          serversTried _ NIL; -- help garbage collector
          entry.doQueueing _ TRUE;
          setNewQueueing[];
          item.process _ LOOPHOLE[Process.GetCurrent[]];
          item.timeout _ BasicTime.Update[BasicTime.Now[], timeToWait];
          item.waiting _ true;
          tryForServer[];
          RETURN[found: item.response, instance: item.instance, serverPupAddress: item.serverPupAddress, errMsg: NIL];
          };
        ENDLOOP;
      serversTried _ CONS[instance, serversTried];
      [communicationsOK: commOK, askFound: askFound, serverPupAddress: serverPupAddress] _ CheckItem[entry, item];
      IF ~commOK THEN LOOP;
      SELECT askFound FROM
        notFound => { removeQueueItem[entry, item]; RETURN [askFound, instance, serverPupAddress, NIL]; };
        foundButTooBusy => { LOOP; };
        foundOK => { removeQueueItem[entry, item]; RETURN [askFound, instance, serverPupAddress, NIL]; };
        ENDCASE;
      ENDLOOP;
    };
  };

-- CheckItem: ask the server named by item.instance, through a forked CheckItemProcess, whether
-- it will run the request. If no answer has arrived after about 9 seconds, the helper process
-- is detached and aborted through AMProcess, and [FALSE, notFound, PupTypes.fillInPupAddress, ""]
-- is returned.
-- A finished answer is now taken before the timeout is tested, so an answer that arrives right
-- at the deadline is used rather than aborting a process that has already finished.
CheckItem: PROC [entry: QueueEntry, item: WaitingRequest] RETURNS [communicationsOK: BOOL _ TRUE, askFound: ComputeServer.AskResponce, serverPupAddress: PupDefs.PupAddress, errMsg: ROPE] = {
  checkProcess: PROCESS;
  checkObject: REF CheckItemProcessObj _ NEW[CheckItemProcessObj];
  startTime: BasicTime.GMT _ BasicTime.Now[];
  checkProcess _ FORK CheckItemProcess[entry, item, checkObject];
  DO
    IF checkObject.check = done THEN TRUSTED {
      [] _ JOIN checkProcess;
      RETURN[checkObject.communicationsOK, checkObject.askFound, checkObject.serverPupAddress, checkObject.errMsg];
      };
    IF BasicTime.Period[startTime, BasicTime.Now[]] > 9 THEN TRUSTED {
      checkProcessTV: AMProcess.Process = AMProcess.PSBIToTV[world: WorldVM.LocalWorld[], psbi: PrincOpsUtils.ProcessToPsbIndex[LOOPHOLE[checkProcess]]];
      Process.Detach[checkProcess];
      AMProcess.Freeze[LIST[checkProcessTV]];
      AMProcess.Abort[checkProcessTV];
      AMProcess.Thaw[LIST[checkProcessTV]];
      checkObject.check _ aborted;
      RETURN[FALSE, notFound, PupTypes.fillInPupAddress, ""];
      };
    Process.Pause[2];
    ENDLOOP;
  };

-- CheckItemProcess: body of the process forked by CheckItem. Calls AskForService on the cached
-- RPC interface of server item.instance, retrying once after an RPC.CallFailed (the cached
-- interface is dropped first), and publishes the outcome in checkObject. When ABORTED it only
-- marks the check as aborted.
-- The outcome is now published on every path. A missing server interface used to RETURN
-- without setting `check`, which left CheckItem polling until its timeout and then aborting a
-- finished process. `check _ done` is written last, after the results.
CheckItemProcess: PROC [entry: QueueEntry, item: WaitingRequest, checkObject: REF CheckItemProcessObj] = {
  ENABLE ABORTED => GOTO aborted;
  communicationsOK: BOOL _ TRUE;
  askFound: ComputeServer.AskResponce;
  serverPupAddress: PupDefs.PupAddress;
  errMsg: ROPE;
  firstTime: BOOL _ TRUE;
  serverInstance: ROPE _ item.instance;
  DO
    serverInterface: ComputeServerRpcControl.InterfaceRecord;
    serverInterface _ getServerInterfaceFromCache[serverInstance];
    IF serverInterface = NIL THEN {communicationsOK _ FALSE; EXIT};
    [found: askFound, serverPupAddress: serverPupAddress, errMsg: errMsg] _ serverInterface.AskForService[service: entry.service, version: item.version, clientMachineName: item.clientMachineName, streamPupAddress: item.streamPupAddress, needListener: item.needListener
      ! RPC.CallFailed => {
        [] _ deleteServerInterfaceFromCache[serverInterface];
        IF firstTime THEN {firstTime _ FALSE; LOOP;};
        communicationsOK _ FALSE;
        EXIT;
        };
      ];
    EXIT;
    ENDLOOP;
  checkObject.communicationsOK _ communicationsOK;
  checkObject.askFound _ askFound;
  checkObject.serverPupAddress _ serverPupAddress;
  checkObject.errMsg _ errMsg;
  checkObject.check _ done;
  EXITS
    aborted => { checkObject.check _ aborted; RETURN; };
  };

-- InnerFindService: choose the best server (by IsStatusBetter) for `service` and charge it with
-- extra load (CPU load + 0.5 capped at 1.0, reclamation rate + 30). For a standard command (one
-- in ComputeServerInternal.CommandTable) every known server qualifies except those listed for it
-- in BrokenCommandTable (matched by name or Pup address). For any other command only the
-- servers listed for it in ExtraCommandTable qualify. Returns [FALSE, ""] when no server
-- qualifies, otherwise [TRUE, the chosen server's Pup address rope].
InnerFindService: ENTRY PROC [service: ROPE] RETURNS [found: BOOL, instance: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  bestStatus: ServerStatus _ NIL;
  foundCommand: BOOL;
  [found: foundCommand] _ SymTab.Fetch[x: ComputeServerInternal.CommandTable, key: service];
  IF foundCommand THEN {
    foundBroken: BOOL;
    brokenVal: REF ANY;
    brokenList: LIST OF BrokenCommand;
    [found: foundBroken, val: brokenVal] _ SymTab.Fetch[x: BrokenCommandTable, key: service];
    brokenList _ NARROW[brokenVal];
    FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
      skipIt: BOOL _ FALSE;
      serverMachineName: ROPE _ statusList.first.serverMachineName;
      serverMachinePupAddress: ROPE _ statusList.first.serverMachinePupAddress;
      brokenListLoop: LIST OF BrokenCommand;
      FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO
        IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachineName, FALSE]
          OR Rope.Equal[brokenListLoop.first.serverMachineName, 
serverMachinePupAddress, FALSE] THEN {
          skipIt _ TRUE;
          EXIT;
          };
        ENDLOOP;
      IF skipIt THEN LOOP;
      IF IsStatusBetter[statusList.first, bestStatus] THEN bestStatus _ statusList.first;
      ENDLOOP;
    }
  ELSE {
    -- Not a standard command: only servers that announced it qualify.
    foundExtra: BOOL;
    extraVal: REF ANY;
    extraListLoop: LIST OF ExtraCommand;
    [found: foundExtra, val: extraVal] _ SymTab.Fetch[x: ExtraCommandTable, key: service];
    FOR extraListLoop _ NARROW[extraVal], extraListLoop.rest UNTIL extraListLoop = NIL DO
      statusFound: BOOL;
      val: REF ANY;
      status: ServerStatus;
      [found: statusFound, val: val] _ SymTab.Fetch[x: MachineToStatusTable, key: extraListLoop.first.serverMachineName];
      IF statusFound THEN {
        status _ NARROW[val];
        IF IsStatusBetter[status, bestStatus] THEN bestStatus _ status;
        };
      ENDLOOP;
    };
  IF bestStatus = NIL THEN { RETURN[FALSE, ""] }
  ELSE {
    -- Charge the chosen server with extra load (presumably so later requests spread out).
    bestStatus.CPULoad _ MIN[ 1.0, bestStatus.CPULoad + 0.5];
    bestStatus.reclamationRate _ bestStatus.reclamationRate + 30 ;
    bestStatus.FOM _ figureOfMerit[bestStatus];
    RETURN[TRUE, bestStatus.serverMachinePupAddress];
    };
  };

-- figureOfMerit: load measure of a server, sqrt[busy^2 + thrash^2]. thrash is reclamationRate/90
-- capped at 1.0. busy is the CPU load on a Dorado and 0.8 + load/5 on any other machine type.
-- The value grows with both CPU load and reclamation rate.
figureOfMerit: PROC [status: ServerStatus] RETURNS [FOM: REAL] = {
  rawThrash: REAL = status.reclamationRate/90.0;
  thrash: REAL = IF rawThrash > 1.0 THEN 1.0 ELSE rawThrash;
  busy: REAL = IF status.machineType = dorado THEN status.CPULoad ELSE 0.8 + (status.CPULoad/5.0);
  FOM _ Real.SqRt[busy*busy + thrash*thrash];
  };

-- CommandUnavailable: record that `commandName` at `version` is broken on server
-- `serverMachineName`. BrokenCommandTable keeps, per command, one BrokenCommand per server with
-- the distinct versions reported; a repeated report is ignored.
CommandUnavailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  present: BOOL;
  ref: REF ANY;
  knownServers: LIST OF BrokenCommand _ NIL;
  [found: present, val: ref] _ SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF present THEN {
    knownServers _ NARROW[ref];
    -- The server is already listed for this command: just add the version if it is new.
    FOR l: LIST OF BrokenCommand _ knownServers, l.rest UNTIL l = NIL DO
      IF Rope.Equal[l.first.serverMachineName, serverMachineName] THEN {
        FOR v: LIST OF ROPE _ l.first.versions, v.rest UNTIL v = NIL DO
          IF Rope.Equal[v.first, version] THEN RETURN;
          ENDLOOP;
        l.first.versions _ CONS[version, l.first.versions];
        RETURN;
        };
      ENDLOOP;
    };
  -- First report of this server for this command (knownServers is NIL if the command is new).
  {
  entry: BrokenCommand = NEW[BrokenCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
  updated: LIST OF BrokenCommand = CONS[entry, knownServers];
  [] _ SymTab.Store[x: BrokenCommandTable, key: commandName, val: updated];
  };
  };

-- ExtraCommandAvailable: record that server `serverMachineName` offers the non-standard command
-- `commandName` at `version`. Any BrokenCommandTable entry for the command is deleted outright
-- (for all servers); a repeated server/version report is ignored.
ExtraCommandAvailable: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, commandName: ROPE, version: RPC.ShortROPE] = {
  ENABLE UNWIND => NULL;
  found: BOOL;
  oldList: LIST OF ExtraCommand;
  newExtra: ExtraCommand;
  newList: LIST OF ExtraCommand;
  machineList: LIST OF ExtraCommand;
  val: REF ANY;
  [found: found, val: val] _ SymTab.Fetch[x: BrokenCommandTable, key: commandName];
  IF found THEN {
    [] _ SymTab.Delete[x: BrokenCommandTable, key: commandName];
    };
  [found: found, val: val] _ SymTab.Fetch[x: ExtraCommandTable, key: commandName];
  IF found THEN {
    oldList _ NARROW[val];
    FOR machineList _ oldList, machineList.rest UNTIL machineList = NIL DO
      extra: ExtraCommand = machineList.first;
      IF Rope.Equal[extra.serverMachineName, serverMachineName] THEN {
        versionList: LIST OF ROPE;
        FOR versionList _ extra.versions, versionList.rest UNTIL versionList = NIL DO
          vers: ROPE _ NARROW[versionList.first];
          IF Rope.Equal[vers, version] THEN RETURN;
          ENDLOOP;
        extra.versions _ CONS[version, extra.versions];
        RETURN;
        };
      ENDLOOP;
    newExtra _ NEW[ExtraCommandObject _ [serverMachineName: serverMachineName, versions: LIST[version]]];
    newList _ CONS[newExtra, NARROW[val]];
    [] _ SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
    }
  ELSE {
    newExtra _ NEW[ExtraCommandObject _ [serverMachineName: serverMachineName, versions: 
LIST[version]]];
    newList _ LIST[newExtra];
    [] _ SymTab.Store[x: ExtraCommandTable, key: commandName, val: newList];
    };
  };

-- NoticeNewPackage: add or update the entry for the DF file `package` in the shared package list.
-- The list (ComputeServerInternal.RemoteCommandDir plus "PackageList") is rewritten into the
-- local file LocalCommandDir plus "PackageList.temp". In the rewritten list the package's entry
-- carries the DF file's current creation date; it is appended at the end if the list has no
-- entry yet. The temporary file is then copied back, and every known server is flagged
-- newPackage.
-- Waits until 22 seconds have passed since this machine became the controller. Returns
-- tryDifferentController = TRUE when this machine is not (or stops being) the controller, and
-- error = TRUE with a message when something fails.
NoticeNewPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL _ FALSE, tryDifferentController: BOOL _ FALSE, msg: ROPE _ NIL]= {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE _ NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE _ NIL;
  foundItem: BOOL _ FALSE;
  fullFName: ROPE;
  fsErrMsg: ROPE;  -- FS error explanation reported by the noDFFile and cantCopyToServer exits
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;  -- directory part of the package's full name
  shortPackage: ROPE _ NIL;
  shortPackageAndDF: ROPE _ NIL;  -- base name plus ".df": the name used in the package list
  packageDFDate: BasicTime.GMT _ BasicTime.nullGMT;  -- creation date of the package's DF file
  lastDir: REF DFUtilities.DirectoryItem _ NIL;  -- directory that files are currently listed under
  wroteDir: BOOL _ FALSE;  -- lastDir has already been written for the current section
  defaultDirectory: REF DFUtilities.DirectoryItem _ NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem _ NEW[DFUtilities.WhiteSpaceItem _ [1]];
  -- Close whichever package-list streams are still open and delete the temporary file.
  -- Streams are set to NIL once closed, so calling this more than once is harmless.
  cleanUp: PROC = {
    IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
    packageListInStream _ NIL;
    IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
    packageListOutStream _ NIL;
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };
  -- Write the package's entry (short name, explicit date), preceded by a directory item for
  -- dfPrefix unless files are already being listed under that directory.
  writeUpdatedItem: PROC[] = {
    file: REF DFUtilities.FileItem;
    file _ NEW[DFUtilities.FileItem _ [
      name: shortPackageAndDF, -- a short name
      date: [explicit, packageDFDate],
      verifyRoot: FALSE]];
    IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dfPrefix, FALSE] THEN {
      dir: REF DFUtilities.DirectoryItem _ NEW [DFUtilities.DirectoryItem _ [ path1: dfPrefix, path2: NIL, path2IsCameFrom: FALSE, exported: FALSE, readOnly: FALSE ]];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir];
      DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
      lastDir _ dir;
      };
    DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
    };
  -- Copy one item of the old package list to the new one. A directory item is written lazily,
  -- just before the first file under it; white-space items are dropped; the package's own
  -- entry is replaced through writeUpdatedItem.
  DoOneItem: DFUtilities.ProcessItemProc = {
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          wroteDir _ FALSE;
          lastDir _ dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => { };
      file: REF DFUtilities.FileItem => {
        shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];  -- name without version
        IF ~wroteDir THEN {
          wroteDir _ TRUE;
          IF lastDir = NIL THEN lastDir _ defaultDirectory;
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
          };
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          sectionDir: REF DFUtilities.DirectoryItem = lastDir;
          writeUpdatedItem[];
          foundItem _ TRUE;
          -- writeUpdatedItem switches to the package's own directory when it differs from this
          -- section's; switch back so the rest of the section is not listed under it.
          IF lastDir # sectionDir THEN { lastDir _ sectionDir; wroteDir _ FALSE; };
          }
        ELSE {
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };
  {
  ENABLE UNWIND => cleanUp[];
  IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- Wait until 22 seconds have passed since this machine became the controller.
  WHILE ComputeControllerInternal.IAmTheController AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
    WAIT NoticeNewPackageCondition;
    ENDLOOP;
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] _ FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
  shortPackage _ fullFName.Substr[cp.base.start, cp.base.length];
  shortPackageAndDF _ Rope.Concat[shortPackage, ".df"];
  dfPrefix _ fullFName.Substr[0, cp.base.start];
  defaultDirectory _ NEW [DFUtilities.DirectoryItem _ [ path1: dfPrefix, path2: NIL, path2IsCameFrom: FALSE, exported: FALSE, readOnly: FALSE ]];
  [created: packageDFDate] _ FS.FileInfo[name: package, remoteCheck: TRUE ! FS.Error => { fsErrMsg _ error.explanation; GOTO noDFFile }];
  packageListGlobalName _ Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
  packageListInStream _ FS.StreamOpen[packageListGlobalName ! FS.Error => GOTO cantOpenPackageList];
  packageListTempName _ Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
  packageListOutStream _ FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
  DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
  IF ~foundItem THEN writeUpdatedItem[];
  packageListInStream.Close[! FS.Error => CONTINUE];
  packageListInStream _ NIL;
  packageListOutStream.Close[! FS.Error => CONTINUE];
  packageListOutStream _ NIL;
  [] _ FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE ! FS.Error => { fsErrMsg _ error.explanation; GOTO cantCopyToServer}];
  FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- Flag every known server; NewStats hands the flag back to the server.
  FOR statusList _ ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    statusList.first.newPackage _ TRUE;
    ENDLOOP;
  EXITS
    -- A GOTO from a catch phrase into these exits does not unwind this frame, so the UNWIND
    -- clause above is not relied on here; cleanUp is safe to repeat in any case.
    cantOpenPackageList => { cleanUp[]; RETURN [ TRUE, FALSE, "controller could not open package list"]; };
    noDFFile => RETURN [ TRUE, FALSE, Rope.Concat["controller could not open specified df file - ", fsErrMsg]];
    syntaxErrorInPackageList => { cleanUp[]; RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"]; };
    badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
    cantCopyToServer => { cleanUp[]; RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", fsErrMsg]]; };
  };
  };

-- RemoveOldPackage: remove the entry for the DF file `package` from the shared package list,
-- using the same wait, temporary file and server notification as NoticeNewPackage. Reports
-- "package not found" when the list has no such entry.
RemoveOldPackage: PUBLIC ENTRY PROC [package: RPC.ShortROPE] RETURNS [error: BOOL, tryDifferentController: BOOL, msg: Rope.ROPE] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  packageListInStream: IO.STREAM;
  packageListGlobalName: ROPE _ NIL;
  packageListOutStream: IO.STREAM;
  packageListTempName: ROPE _ NIL;
  foundItem: BOOL _ FALSE;
  fullFName: ROPE;
  cp: FS.ComponentPositions;
  dirOmitted: BOOLEAN;
  dfPrefix: ROPE;
  shortPackage: ROPE _ NIL;
  shortPackageAndDF: ROPE _ NIL;
  wroteDir: BOOL _ FALSE;
  lastDir: REF DFUtilities.DirectoryItem _ NIL;
  defaultDirectory: REF DFUtilities.DirectoryItem _ NIL ;
  whiteItem: REF DFUtilities.WhiteSpaceItem _ NEW[DFUtilities.WhiteSpaceItem _ [1]];
  -- Copy one item of the old package list to the new one, leaving out the package's own entry.
  DoOneItem: DFUtilities.ProcessItemProc = {
    errors, warnings, filesActedUpon: INT _ 0;
    WITH item SELECT FROM
      dir: REF DFUtilities.DirectoryItem => {
        IF lastDir = NIL OR ~Rope.Equal[lastDir.path1, dir.path1, FALSE] THEN {
          wroteDir _ FALSE;
          lastDir _ dir;
          };
        };
      comment: REF DFUtilities.CommentItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: comment];
        };
      white: REF DFUtilities.WhiteSpaceItem => {
        DFUtilities.WriteItemToStream[out: packageListOutStream, item: white];
        };
      file: REF DFUtilities.FileItem => { 
shortName: ROPE = Rope.Substr[file.name, 0, Rope.Index[s1: file.name, s2: "!"]];  -- name without version
        IF Rope.Equal[shortPackageAndDF, shortName, FALSE] THEN {
          foundItem _ TRUE;  -- leave the package's entry out
          }
        ELSE {
          IF ~wroteDir THEN {
            wroteDir _ TRUE;
            IF lastDir = NIL THEN lastDir _ defaultDirectory;
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: lastDir];
            DFUtilities.WriteItemToStream[out: packageListOutStream, item: whiteItem];
            };
          DFUtilities.WriteItemToStream[out: packageListOutStream, item: file];
          };
        };
      ENDCASE;
    };
  {
  -- Streams are set to NIL once closed, so no stream is closed twice.
  ENABLE UNWIND => {
    IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
    packageListInStream _ NIL;
    IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
    packageListOutStream _ NIL;
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    };
  IF package.IsEmpty[] THEN RETURN [ TRUE, FALSE, "null package name"];
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- Wait until 22 seconds have passed since this machine became the controller.
  WHILE ComputeControllerInternal.IAmTheController AND BasicTime.Period[ComputeControllerInternal.TimeIBecameTheController, BasicTime.Now[]] < 22 DO
    WAIT NoticeNewPackageCondition;
    ENDLOOP;
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  [fullFName: fullFName, cp: cp, dirOmitted: dirOmitted] _ FS.ExpandName[name: package, wDir: NIL ! FS.Error => GOTO badPackageName ];
  shortPackage _ fullFName.Substr[cp.base.start, cp.base.length];
  shortPackageAndDF _ Rope.Concat[shortPackage, ".df"];
  dfPrefix _ fullFName.Substr[0, cp.base.start];
  defaultDirectory _ NEW [DFUtilities.DirectoryItem _ [ path1: dfPrefix, path2: NIL, path2IsCameFrom: FALSE, exported: FALSE, readOnly: FALSE ]];
  packageListGlobalName _ Rope.Concat[ComputeServerInternal.RemoteCommandDir, "PackageList"];
  packageListInStream _ FS.StreamOpen[packageListGlobalName ! FS.Error => GOTO cantOpenPackageList];
  packageListTempName _ Rope.Concat[ComputeServerInternal.LocalCommandDir, "PackageList.temp"];
  packageListOutStream _ FS.StreamOpen[fileName: packageListTempName, accessOptions: $create];
  DFUtilities.ParseFromStream[packageListInStream, DoOneItem ! DFUtilities.SyntaxError => GOTO syntaxErrorInPackageList];
  packageListInStream.Close[! FS.Error => CONTINUE];
  packageListInStream _ NIL;
  packageListOutStream.Close[! FS.Error => CONTINUE];
  packageListOutStream _ NIL;
  IF ~foundItem THEN {
    FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
    RETURN [ TRUE, FALSE, "package not found"]
    };
  {
  -- Report a failed copy the way NoticeNewPackage does, instead of letting FS.Error escape.
  copyFailed: BOOL _ FALSE;
  copyError: ROPE _ NIL;
  [] _ FS.Copy[from: packageListTempName, to: packageListGlobalName, attach: FALSE
    ! FS.Error => { copyFailed _ TRUE; copyError _ error.explanation; CONTINUE }];
  FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
  IF copyFailed THEN RETURN [ TRUE, FALSE, Rope.Concat["could not copy new package list to the file server because ", copyError]];
  };
  IF ~ComputeControllerInternal.IAmTheController THEN RETURN [ FALSE, TRUE, "controller has moved"];
  -- Flag every known server; NewStats hands the flag back to the server.
  FOR statusList _ ComputeControllerInternal.ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    statusList.first.newPackage _ TRUE;
    ENDLOOP;
  -- The results have no default values, so success must be returned explicitly.
  RETURN [ FALSE, FALSE, NIL];
  EXITS
    cantOpenPackageList => RETURN [ TRUE, FALSE, "controller could not open package list"];
    syntaxErrorInPackageList => {
      -- Both streams are still open here: close them and discard the temporary file.
      IF packageListInStream # NIL THEN packageListInStream.Close[! FS.Error => CONTINUE];
      packageListInStream _ NIL;
      IF packageListOutStream # NIL THEN packageListOutStream.Close[! FS.Error => CONTINUE];
      packageListOutStream _ NIL;
      FS.Delete[name: packageListTempName ! FS.Error => CONTINUE];
      RETURN [ TRUE, FALSE, "controller could not parse the package list - call an implementor"];
      };
    badPackageName => RETURN [ TRUE, FALSE, "bad package df file name"];
  };
  };
NewStats: PUBLIC ENTRY PROC [serverMachineName: RPC.ShortROPE, serverMachinePupAddress: RPC.ShortROPE, serverUP: BOOL, firstCall: BOOL, machineType: PrincOps.MachineType, mainMemory: CARDINAL, numberCPUs: CARDINAL, diskPartionSize: INT, freePagesOnDisk: INT, freeboard: INT, freeGFI: CARDINAL, freeMDS: CARDINAL, freeVM: CARDINAL, oldestLRUFileDate: BasicTime.GMT, CPULoad: REAL, reclamationRate: REAL, freeProcesses: CARDINAL] RETURNS [terminateService, newPackage: BOOL _ FALSE, queueingCommands: LIST OF ROPE _ NIL] = {
  ENABLE UNWIND => NULL;
  statusList: LIST OF ServerStatus;
  status: ServerStatus _ NIL ;
  lastStatusList: LIST OF ServerStatus _ NIL ;
  FOR statusList _ ServerStatusList, statusList.rest UNTIL statusList = NIL DO
    IF 
Rope.Equal[statusList.first.serverMachineName, serverMachineName] THEN { status _ statusList.first; IF ~serverUP THEN { IF lastStatusList = NIL THEN ServerStatusList _ statusList.rest ELSE lastStatusList.rest _ statusList.rest; } ELSE { newPackage _ status.newPackage; status.newPackage _ FALSE; status.freePagesOnDisk _ freePagesOnDisk; status.freeboard _ freeboard; status.freeGFI _ freeGFI; status.freeMDS _ freeMDS; status.freeVM _ freeVM; status.oldestLRUFileDate _ oldestLRUFileDate; status.CPULoad _ CPULoad; status.reclamationRate _ reclamationRate; status.freeProcesses _ freeProcesses; status.FOM _ figureOfMerit[status: status]; status.timeOfStatus _ BasicTime.Now[]; IF ~SymTab.Fetch[x: MachineToStatusTable, key: serverMachineName].found THEN { [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status]; }; }; EXIT; }; lastStatusList _ statusList; REPEAT FINISHED => status _ NIL; ENDLOOP; IF status = NIL AND serverUP THEN { status _ NEW[ServerStatusRecord _ [ serverMachineName: serverMachineName, serverMachinePupAddress: serverMachinePupAddress, timeOfStatus: BasicTime.Now[], machineType: machineType, mainMemory: mainMemory, numberCPUs: numberCPUs, diskPartionSize:diskPartionSize, freePagesOnDisk: freePagesOnDisk, freeboard: freeboard, freeGFI: freeGFI, freeMDS: freeMDS, freeVM: freeVM, oldestLRUFileDate: oldestLRUFileDate, CPULoad: CPULoad, reclamationRate: reclamationRate, freeProcesses: freeProcesses ]]; status.FOM _ figureOfMerit[status: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachineName, val: status]; [] _ SymTab.Store[x: MachineToStatusTable, key: serverMachinePupAddress, val: status]; ServerStatusList _ CONS[status, ServerStatusList]; }; IF status # NIL AND status.newQueueing THEN { status.newQueueing _ FALSE; FOR l: LIST OF QueueEntry _ WaitingQueue, l.rest UNTIL l = NIL DO IF l.first.doQueueing THEN queueingCommands _ 
CONS[l.first.service, queueingCommands]; ENDLOOP; }; IF firstCall THEN { CleanBroken: SymTab.EachPairAction = { ENABLE UNWIND => NULL; brokenList, newBrokenList, brokenListLoop: LIST OF BrokenCommand _ NIL; changed: BOOL _ FALSE; brokenList _ NARROW[val]; FOR brokenListLoop _ brokenList, brokenListLoop.rest UNTIL brokenListLoop = NIL DO IF Rope.Equal[brokenListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN { changed _ TRUE; LOOP; } ELSE { newBrokenList _ CONS[brokenListLoop.first, newBrokenList]; }; ENDLOOP; IF changed THEN [] _ SymTab.Store[x: BrokenCommandTable, key: key, val: newBrokenList]; RETURN[FALSE]; }; CleanExtra: SymTab.EachPairAction = { ENABLE UNWIND => NULL; extraList, newExtraList, extraListLoop: LIST OF ExtraCommand _ NIL; changed: BOOL _ FALSE; extraList _ NARROW[val]; FOR extraListLoop _ extraList, extraListLoop.rest UNTIL extraListLoop = NIL DO IF Rope.Equal[extraListLoop.first.serverMachineName, serverMachinePupAddress, FALSE] THEN { changed _ TRUE; LOOP; } ELSE { newExtraList _ CONS[extraListLoop.first, newExtraList]; }; ENDLOOP; IF changed THEN [] _ SymTab.Store[x: ExtraCommandTable, key: key, val: newExtraList]; RETURN[FALSE]; }; [] _ SymTab.Pairs[x: BrokenCommandTable, action: CleanBroken]; [] _ SymTab.Pairs[x: ExtraCommandTable, action: CleanExtra]; }; }; getServerInterfaceFromCache: ENTRY PROC [serverInstance: RPC.ShortROPE] RETURNS [serverInterface: ComputeServerRpcControl.InterfaceRecord _ NIL] = { ENABLE UNWIND => NULL; newServerInterface: ComputeServerRpcControl.InterfaceRecord _ NIL; bestIndex: INT _ 0; bestTime: BasicTime.GMT _ BasicTime.latestGMT; now: BasicTime.GMT _ BasicTime.Now[]; FOR index: INT IN [0..serverInterfaceCacheSize) DO IF Rope.Equal[serverInstance, serverInterfaceCache[index].serverInstance] THEN { newServerInterface _ serverInterfaceCache[index].interface; serverInterfaceCache[index].lastUsed _ now; RETURN[serverInterfaceCache[index].interface]; }; IF BasicTime.Period[bestTime, 
serverInterfaceCache[index].lastUsed] < 0 THEN { bestTime _ serverInterfaceCache[index].lastUsed; bestIndex _ index; }; ENDLOOP; newServerInterface _ ComputeServerRpcControl.ImportNewInterface[ interfaceName: [ type: "ComputeServer.summoner", instance: serverInstance, version: [1,1]] ! RPC.ImportFailed => { CONTINUE; }; ]; IF newServerInterface # NIL THEN { serverInterfaceCache[bestIndex].lastUsed _ now; serverInterfaceCache[bestIndex].serverInstance _ serverInstance; serverInterfaceCache[bestIndex].interface _ newServerInterface; }; RETURN[newServerInterface]; }; deleteServerInterfaceFromCache: ENTRY PROC [serverInterface: ComputeServerRpcControl.InterfaceRecord] RETURNS [gotIt: BOOL _ FALSE] = { ENABLE UNWIND => NULL; FOR index: INT IN [0..serverInterfaceCacheSize) DO IF serverInterfaceCache[index].interface = serverInterface THEN { serverInterfaceCache[index].serverInstance _ NIL; serverInterfaceCache[index].interface _ NIL; serverInterfaceCache[index].lastUsed _ BasicTime.earliestGMT; gotIt _ TRUE; EXIT; }; ENDLOOP; }; findOrCreateQueueEntry: ENTRY PROC [service: ROPE] RETURNS [entry: QueueEntry _ NIL, item: WaitingRequest] = { ENABLE UNWIND => NULL; item _ NEW[WaitingRequestRec ]; FOR queue: LIST OF QueueEntry _ WaitingQueue, queue.rest UNTIL queue = NIL DO IF Rope.Equal[queue.first.service, service, FALSE] THEN { entry _ queue.first; IF entry.waiters = NIL THEN entry.waiters _ CONS[item, NIL] ELSE FOR l: LIST OF WaitingRequest _ entry.waiters, l.rest UNTIL l = NIL DO IF l.rest = NIL THEN {l.rest _ CONS[item, NIL]; EXIT}; ENDLOOP; RETURN; -- existing queue }; ENDLOOP; WaitingQueue _ CONS[ entry _ NEW[QueueEntryRec _ [service: service, waiters: LIST[item]]], WaitingQueue ]; TRUSTED {Process.InitializeCondition[@entry.changed, Process.SecondsToTicks[1]];}; }; setNewQueueing: ENTRY PROC [] = { ENABLE UNWIND => NULL; FOR l: LIST OF ServerStatus _ ServerStatusList, l.rest UNTIL l = NIL DO l.first.newQueueing _ TRUE; ENDLOOP; LastReBuildQueueTime _ BasicTime.Now[]; 
}; queueItemReady: ENTRY PROC [entry: QueueEntry, item: WaitingRequest]= { ENABLE UNWIND => NULL; firstTime: BOOL _ TRUE; item.waiting _ true; DO IF BasicTime.Period[from: item.timeout, to: BasicTime.Now[]] > 0 THEN {item.waiting _ timedOut; RETURN}; SELECT item.waiting FROM true => { IF entry.servers # NIL AND entry.waiters.first = item THEN { bestStatus: ServerStatus _ NIL; prev, prevBest: LIST OF ROPE _ NIL; best: LIST OF ROPE _ NIL; serv: ROPE; FOR l: LIST OF ROPE _ entry.servers, l.rest UNTIL l = NIL DO statusFound: BOOL; val: REF ANY; status: ServerStatus; serv _ l.first; [found: statusFound, val: val] _ SymTab.Fetch[x: MachineToStatusTable, key: serv]; IF statusFound THEN { status _ NARROW[val]; IF IsStatusBetter[status, bestStatus] THEN { bestStatus _ status; best _ l; prevBest _ prev; }; }; prev _ l; ENDLOOP; IF prevBest = NIL THEN { item.instance _ entry.servers.first; entry.servers _ entry.servers.rest; } ELSE { item.instance _ best.first; prevBest.rest _ best.rest; }; item.serverAssigned _ TRUE; item.waiting _ validating; RETURN; }; }; done => { item.serverAssigned _ TRUE; item.waiting _ validating; RETURN; }; timedOut => { RETURN; }; ENDCASE; IF firstTime THEN {forceReBuildQueuing _ TRUE}; firstTime _ FALSE; WAIT entry.changed; ENDLOOP; }; removeQueueItem: ENTRY PROC [entry: QueueEntry, item: WaitingRequest] = { ENABLE UNWIND => NULL; BROADCAST entry.changed; IF entry.waiters = NIL THEN RETURN; -- actually, this is a bug IF entry.waiters.first = item THEN {entry.waiters _ entry.waiters.rest; RETURN}; FOR l: LIST OF WaitingRequest _ entry.waiters, l.rest UNTIL l.rest = NIL DO IF l.rest.first = item THEN {l.rest _ l.rest.rest; RETURN}; ENDLOOP; }; MightAcceptQueuedCommand: PUBLIC ENTRY PROC [serverMachineAddress: RPC.ShortROPE, commandName: Rope.ROPE] = { ENABLE UNWIND => NULL; item: WaitingRequest _ NIL; FOR queue: LIST OF QueueEntry _ WaitingQueue, queue.rest UNTIL queue = NIL DO entry: QueueEntry = queue.first; IF Rope.Equal[commandName, 
entry.service, FALSE] THEN { IF entry.waiters = NIL OR entry.waiters.first.waiting # true THEN { -- nobody waiting now or top waiter is busy IF entry.servers = NIL THEN entry.servers _ CONS[serverMachineAddress, NIL] ELSE FOR l: LIST OF ROPE _ entry.servers, l.rest UNTIL l = NIL DO IF Rope.Equal[l.first , serverMachineAddress, FALSE] THEN EXIT; IF l.rest = NIL THEN {l.rest _ CONS[serverMachineAddress, NIL]; EXIT}; ENDLOOP; BROADCAST entry.changed; EXIT; } ELSE { entry.waiters.first.waiting _ done; entry.waiters.first.instance _ serverMachineAddress; BROADCAST entry.changed; EXIT; }; }; ENDLOOP; }; IsStatusBetter: PROC [status: ServerStatus, bestStatus: ServerStatus] RETURNS [ itsBetter: BOOL _ FALSE] = { SELECT TRUE FROM bestStatus = NIL => itsBetter _ TRUE; ((3*bestStatus.reclamationRate) > status.reclamationRate) AND (bestStatus.reclamationRate > 5) AND (status.FOM < 2*bestStatus.FOM )=> itsBetter _ TRUE; ENDCASE => { IF status.FOM < bestStatus.FOM THEN itsBetter _ TRUE; }; }; locked: BOOL _ FALSE; lockCondition: CONDITION; Lock: ENTRY PROC = { WHILE locked DO WAIT lockCondition; ENDLOOP; locked _ TRUE; }; UnLock: ENTRY PROC = { locked _ FALSE; NOTIFY lockCondition }; SetGrabController: Commander.CommandProc = { ComputeControllerInternal.GrabController _ TRUE; }; JustAProcessThatYouCanFreeze: PROC = { DO Process.Pause[200]; ENDLOOP; }; TRUSTED {Process.Detach[FORK JustAProcessThatYouCanFreeze[]];}; Commander.Register[key: "///Commands/SummonerBecomeController", proc: SetGrabController, doc: "Try to force the Compute Server Controller to this machine"]; TRUSTED {Process.InitializeCondition[@NoticeNewPackageCondition, Process.SecondsToTicks[1]];}; serverInterfaceCache _ NEW[serverInterfaceArray]; END. ΖComputeControllerImpl.mesa Implement the controller. This runs on any machine that is also a compute server. Last Edited by: Bob Hagmann, December 24, 1985 2:20:51 pm PST Copyright c 1984 by Xerox Corporation. All rights reserved. 
See if this Controller thinks everything is fine IF Rope.Equal[status.serverMachineName, ComputeServerInternal.myHostName] THEN { serverList _ CONS[Rope.Concat[status.serverMachineName, " (Controller)"], serverList]; } ELSE serverList _ CONS[status.serverMachineName, serverList]; We are about to retry the same server again. Give up probing, and start queueing. Remote machine is not unresponsive, but will not answer the call Detach so we can return. Freeze, abort and thaw the process. After a long time (minutes if ever), the process will finally wake up and go away. Also, if the remote host stops answering the ping, the call will fail and the process will exit. command should be out on (almost) every server command should be out on (almost) no servers See if this "Extra" reverses a previous broken PROC [item: REF ANY] RETURNS [stop: BOOL _ FALSE] PROC [item: REF ANY] RETURNS [stop: BOOL _ FALSE] DFUtilities.WriteItemToStream[out: packageListOutStream, item: dir]; PROC [key: Key, val: Val] RETURNS [quit: BOOL]; PROC [key: Key, val: Val] RETURNS [quit: BOOL]; item.instance _ entry.servers.first; entry.servers _ entry.servers.rest; Grap Controller Debugging Process Initialize Bob Hagmann December 24, 1985 2:21:09 pm PST Made sure in NewStats that status was not NIL before doing newQueueing processing Bob Hagmann January 15, 1986 5:43:15 pm PST Queuing of commands did not drive up the usage numbers changes to: bumpBestServerStats, FindService, FindServiceWithQueueing Κ4ΐ– "Cedar" style˜headšœ™IbodyšœR™RL™=I copywrightšœ Οmœ1™<code2šΟk ˜ N˜ N˜ N˜ N˜Nšœ˜Nšœ˜Nšœ"˜"Nšœ˜Nšœ˜Nšœ˜N˜ N˜N˜ N˜N˜N˜ N˜N˜N˜N˜ N˜N˜Nšžœ˜Nšœ˜Nšœ˜Nšœ˜Nšœ ˜ Nšœ˜——šœžœž˜$NšžœΩžœ/˜’NšžœN˜UNšœž˜Nšžœžœžœ˜Nšœž œ˜%Icode˜Ošœžœžœ˜ Ošœžœžœ˜ Ošœžœžœ˜$Ošœžœžœžœ˜&Ošœžœžœžœ˜*Ošœžœžœžœ˜$Ošœžœ žœ˜COšœžœžœ˜Ošœžœžœ˜O˜Ošœžœ˜3O˜,Ošœžœ˜%Ošœžœ ˜!Ošœ žœ˜$Ošœ žœ˜8O˜O˜Ošœžœ0˜HOšœžœžœ˜,Ošœž œžœžœ˜4O˜Ošœ!Οc+˜LJšœ Ÿ6˜VJšœ#ŸU˜xJ˜šœžœžœ˜$Jšœžœ ˜!Jšœ žœžœžœž˜J˜—Jšœžœžœ˜.J˜šœžœžœ˜#Jšœžœ ˜!Jšœ 
žœžœžœž˜J˜—Jšœžœžœ˜,J˜Jšœžœžœ˜"Jšœžœžœžœ˜'Jšœ žœžœ˜%šœžœžœ˜Jšœ žœžœ˜Jšœ žœžœ˜Jšœ žœžœžœ˜&Jš œ žœžœžœžœ˜Jšœ ž ˜J˜—J˜Jšœžœžœ˜-šœžœžœ˜"Jšœ žœ˜Jšœžœ˜Jšœ˜Jšœ žœ˜Jšœžœ ˜!Jšœ%˜%Jšœžœ˜Jšœžœžœ˜Jšœ$˜$Jšœ žœ ˜Jšœ$˜$J˜—Jšœ žœ,˜:J˜J˜šœžœžœ˜$Jšœžœžœ˜Jšœ5žœ˜9Jšœžœ˜/J˜J˜—Jšœžœ˜#J˜Jšœžœžœžœ˜XJšœžœ˜/J˜J˜-J˜šœžœžœ˜#Jšœ˜Jšœžœžœ˜Jšœ$˜$Jšœ%˜%Jšœž˜ J˜—J˜Jšœ'žœ˜0J˜šΠbnœžœžœžœžœ žœžœžœ˜vNšœžœžœ˜˜Nšžœžœžœžœ ˜)Nšœžœ˜Nšœžœžœ.žœ˜yNšžœ žœ)˜ONšœžœ˜Nšœžœ˜Nšœžœ˜N˜Nšœ žœ˜šžœžœ˜Nšœ˜Nšœ˜NšœG˜Gšžœžœžœ˜+šžœžœ<žœežœ˜ΛNšœžœ˜Nšœ+˜+Nšœ)˜)Nšœžœ˜Nšœ2žœ˜9Jšœ1žœ˜8Jšœ4žœ˜;šœ2˜2Nšœg˜gNšœ ˜ Nšœ žœ˜šœžœ˜Nšœžœ˜Nšœžœ˜Nšžœ˜ N˜—N˜—N˜—N˜—N˜—N˜ Nšœ žœ˜Nšžœžœ ˜Nšžœ$žœ˜?Nšžœ˜—Nš žœžœžœžœžœ:˜`Nšžœžœžœžœ˜FNšœ˜—š‘œžœžœžœžœ žœ žœžœ˜RNšžœžœžœ˜Nšœžœ˜Nšœ žœžœ˜!šžœ0žœžœž˜LNšœ(˜(Nšžœ$žœ˜?Nšžœ˜—Nš žœžœžœžœžœ˜+Nšžœ+žœ˜6N˜—š’œžœžœ žœ˜=Nšžœžœžœ˜Nšœ žœ˜Nšœžœžœ˜ Nšœ˜NšœV˜Všžœ žœ˜Nšœ žœ˜Nšœžœ˜1Nšœ6˜6Nšœžœ˜#N˜—N˜—š‘ œžœžœ žœžœ žœ žœ˜[Nšœ.˜.N˜—š‘œžœž œ žœ žœžœžœ@žœžœ.žœ:žœ˜ΊNšœ˜Nšœžœ˜š’ œžœ˜šž˜Nšœžœ˜ Nšœ žœ˜Nšœ žœžœ˜Nšœžœžœ˜2Nšœ$˜$Nšœ˜Nšžœžœžœ˜%NšœY˜YNšœ žœ ˜šžœ2žœžœž˜RšžœCžœžœ˜RNšœžœ˜Nšœ˜Nšžœ˜N˜—Nšžœ˜—Nšžœžœžœ˜"Nšœm˜mšžœ žœžœ˜)Nšœžœ˜Nšœ˜Nšžœ˜N˜—Nšœ˜Nšœ)˜)šžœ ž˜šœ ˜ Nšœ˜Nšœ žœ˜ Nšžœ˜N˜—šœ˜Nšžœ˜N˜—šœ ˜ Nšœ˜Nšžœ˜N˜—Nšžœ˜—Nšžœ˜—Nšœ˜N˜—Nšœ0˜0N˜Nšœ+˜+Nšœ)˜)Nšœ!˜!šžœžœŸ˜:Nšœžœ˜.Nšœ=˜=Nšœ˜N˜Nšœ#˜#Nšžœažœ˜lN˜—šœžœŸ˜ Nš œžœžœžœ žœ˜*šž˜Nšœžœ˜ Nšœžœ˜Nšœ žœ ˜Nšœ$˜$Nšœžœ˜N˜Nšœ5˜5Nšœ˜šžœžœ˜Nšœ˜Nšœ˜Nšžœ˜N˜—š žœ žœžœžœ+žœžœž˜_šžœ(žœžœ˜7N™RNšœžœŸ˜.Nšœžœ˜N˜Nšœžœ˜.Nšœ=˜=Nšœ˜N˜Nšžœažœ˜lN˜—Nšžœ˜—Nšœžœ˜,Nšœl˜lNšžœ žœžœ˜šžœ ž˜šœ ˜ Nšœ˜Nšžœ(žœ˜3N˜—šœ˜Nšžœ˜N˜—šœ ˜ Nšœ˜Nšžœ(žœ˜3N˜—Nšžœ˜—Nšžœ˜—N˜—N˜—š   œžœ+žœžœžœUžœ˜ΎNšœžœ˜Nšœ žœžœ˜@Nšœžœ˜+Nšœžœ,˜?šž˜šžœ2žœžœ˜BN™@Nšœzžœ˜“Nšœσ™σNšœ˜Nšœžœ˜'Nšœ ˜ Nšœžœ˜%Nšœ˜Nšžœžœ+˜7N˜—šžœžœžœ˜*Nšœžœ˜Nšžœg˜mNšœ˜—Nšœ˜Nšžœ˜—N˜—š œžœ8žœ˜jNšžœžœžœ ˜Nšœžœžœ˜Nšœ$˜$Nšœ%˜%Nšœžœ˜ Nšœ žœžœ˜Nšœžœ˜%šž˜Nšœ9˜9Nšœ>˜>Nš žœžœžœžœžœ˜ANšœˆ˜ˆšœžœ˜Nšœ5˜5Nšžœ žœžœžœ˜-Nšœžœ˜Nšžœ˜Nšœ˜—Nšœ˜Nšžœ˜Nšžœ˜—Nšœ˜Nšœ0˜0Nšœ ˜ Nšœ0˜0Nšœ˜šž˜šœ ˜ Nšœ˜Nšžœ˜N˜——N˜—š‘œžœžœ žœžœ žœ žœ˜_Nšžœžœžœ˜Nšœ žœžœ˜!Nšœžœ˜Nšœžœ˜N˜NšœZ˜Zšžœžœ˜N™.Nšœ žœ˜Nšœ žœžœ˜Nšœ žœžœ˜"NšœY˜YNšœ žœ 
˜šžœ0žœžœž˜LNšœžœžœ˜Nšœžœ&˜=Nšœžœ,˜INšœžœžœ˜&šžœ2žœžœž˜Rš žœGžœžœMžœžœ˜¬Nšœ žœ˜Nšžœ˜N˜—Nšžœ˜—Nšžœžœžœ˜Nšžœ.žœ˜SNšžœ˜—N˜—šœžœ˜N™,Nšœ žœ˜Nšœ žœžœ˜Nšœžœžœ˜$NšœV˜Vš žœžœžœžœž˜UNšœ žœ˜Nšœžœžœ˜ Nšœ˜Nšœs˜sšžœ žœ˜Nšœ žœ˜Nšžœ$žœ˜?N˜—Nšžœ˜—N˜—šžœžœžœ˜Nšžœžœ˜Nšœ˜—šœžœ˜Nšœžœ!˜9Nšœ>˜>Nšœ žœ˜+Nšžœžœ&˜1N˜—Nšœ˜—š ’ œžœžœžœžœ˜BNš œžœžœžœžœ˜kNšœ žœ˜/Nšžœžœ˜*NšžœG˜JN˜—š ‘œž œžœžœžœ žœ˜wIdefault˜Pšœžœ˜ Pšœ žœžœ˜Pšœ˜Pšœ žœžœ˜Pšœ žœžœ˜#Pšœžœžœ˜ PšœQ˜Qšžœžœ˜Pšœ žœ˜šžœ)žœžœž˜FJšœ*˜*šžœ9žœ˜AJšœ žœžœžœ˜šžœ1žœžœž˜NJšœžœžœ˜'Jšžœžœžœ˜)Jšžœ˜—Jšœžœ˜2Jšžœ˜J˜—Jšžœ˜—Jšœ žœHžœ ˜gJšœ žœ žœ˜'JšœI˜IJšœ˜—šœžœ˜Jšœ žœHžœ ˜gPšœ žœ ˜PšœI˜IP˜—Pšœ˜—P˜š‘œžœžœžœžœžœ žœ˜zPšžœžœžœ˜Pšœžœ˜ Pšœ žœžœ˜Pšœ˜Pšœ žœžœ˜Pšœ žœžœ˜"šœžœžœ˜ P™.—PšœQ˜Qšžœžœ˜Pšœ<˜˜>Nšœ<˜žœ˜BNšœ žœ˜Nšœžœ˜.Nšœžœ˜%šžœžœžœž˜2šžœHžœ˜PNšœ;˜;Nšœ+˜+Nšžœ(˜.N˜—šžœFžœ˜NNšœ0˜0Nšœ˜N˜—Nšžœ˜—šœ@˜@NšœZ˜Zšœžœ˜Nšžœ˜ Nšœ˜—N˜—šžœžœžœ˜"Nšœ0˜0Nšœ@˜@Nšœ?˜?N˜—Nšžœ˜N˜—š ’œžœžœ<žœ žœžœ˜‡Nšžœžœžœ˜šžœžœžœž˜2šžœ9žœ˜ANšœ-žœ˜1Nšœ(žœ˜,Nšœ=˜=Nšœžœ˜ Nšžœ˜N˜—Nšžœ˜—N˜—š ’œžœžœ žœžœžœ˜nNšžœžœžœ˜Nšœžœ˜š žœžœžœ'žœ žœž˜Mšžœ*žœžœ˜9Nšœ˜Nš žœžœžœžœžœ˜<šœžœžœžœžœ(žœžœž˜LNš žœ žœžœ žœžœžœ˜6Nšžœ˜—NšžœŸ˜N˜—Nšžœ˜—šœžœ˜Nšœžœ-žœ ˜ENšœ ˜ —Nšœ˜NšžœK˜RN˜—š’œžœžœ˜!Nšžœžœžœ˜š žœžœžœ)žœžœž˜GNšœžœ˜Nšž˜—Nšœ'˜'Nšœ˜—š’œžœžœ-˜GNšžœžœžœ˜Nšœ˜Nšœ˜šž˜Nšžœ?žœžœ˜išžœž˜šœ ˜ šžœžœžœžœ˜