-- WalnutOpsBasicImpl
-- Basic WalnutOps operations: Startup, Shutdown and Scavenge of a Walnut database,
-- statistics/progress reporting, and the "apply" wrappers that retry an operation
-- after a transaction abort. All ENTRY procedures lock walnutOpsMonitorImpl.

DIRECTORY
  BasicTime USING [GMT, nullGMT, Now],
  FS USING [StreamOpen],
  GVBasics USING [RName],
  IO,
  Process USING [Detach, SecondsToTicks],
  Rope,
  UserCredentials USING [Get],
  ViewerClasses USING [Viewer],
  ViewerIO USING [CreateViewerStreams, GetViewerFromStream],
  ViewerOps USING [FindViewer],
  WalnutDB --using lots-- ,
  WalnutDefs USING [Error, SchemaMismatch, Segment, VersionMismatch],
  WalnutLog --using lots-- ,
  WalnutLogExpunge USING [Shutdown],
  WalnutMiscLog USING [Shutdown],
  WalnutOps,
  WalnutOpsInternal,
  WalnutOpsMonitorImpl,
  WalnutRegistryPrivate USING [NotifyForEvent],
  WalnutRoot USING [AbortTransaction, CloseTransaction, CommitAndContinue, EraseDB, Open, RegisterStatsProc, Shutdown, StartTransaction, UnregisterStatsProc];

WalnutOpsBasicImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl
  IMPORTS FS, IO, Process, Rope, UserCredentials, ViewerIO, ViewerOps, walnutOpsMonitorImpl: WalnutOpsMonitorImpl, WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutMiscLog, WalnutOpsInternal, WalnutRegistryPrivate, WalnutRoot
  EXPORTS WalnutOps, WalnutOpsInternal
  SHARES WalnutOpsMonitorImpl =

BEGIN OPEN WalnutOps, WalnutOpsInternal;

-- Types

  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;
  GMT: TYPE = BasicTime.GMT;

-- Exported and private variables

  Active: PUBLIC ROPE _ "Active";
  Deleted: PUBLIC ROPE _ "Deleted";

  -- streams registered through RegisterReporter; CheckReport writes to each of them
  reporterList: LIST OF IO.STREAM_ NIL;

  -- CheckForRecentActivity waits on this condition; the timeout is 5*60 seconds
  checkActivityCondition: CONDITION _ [timeout: Process.SecondsToTicks[5*60]];

  started: BOOL _ FALSE;  -- set by Startup/Scavenge, cleared by Shutdown
  replayInProgress: BOOL _ FALSE;
  interference: BOOL _ FALSE;
  walnutRootFile: ROPE;
  recentActivity: BOOL _ FALSE;  -- set by the apply wrappers, cleared by the activity checker

  isShutdown: PUBLIC BOOL _ FALSE;  -- TRUE after the inactivity checker has closed the transaction
  errorInProgress: PUBLIC BOOL _ FALSE;
  walnutSegment: PUBLIC WalnutDefs.Segment;
  systemIsReadOnly: PUBLIC BOOL _ FALSE;
  rootFileName: PUBLIC ROPE;
  newMailSomewhere: PUBLIC BOOL _ FALSE;

  statsStream, statsProgressStream: STREAM _ NIL;
  heavyhandedDebugging: BOOL _ FALSE;  -- when TRUE, stats are also echoed to a "Walnut Stats" viewer
  statsProgressTS: ViewerClasses.Viewer _ NIL;

  mailStream: PUBLIC STREAM;

-- Startup, Shutdown, Scavenge

  -- Opens rootFile through WalnutRoot.Open, starts a transaction, opens the log streams and
  -- declares the database; then runs InitialCheck and CheckInProgress and announces the
  -- `started` event. Raises WalnutDefs.Error[$db, $AlreadyStarted] if already started, and
  -- WalnutDefs.Error[$db, $SchemaMismatch, ...] if the segment or schema does not match.
  -- Results: isReadOnly is the read-only state reported by WalnutRoot.Open; newMailExists is
  -- the value of newMailSomewhere computed by InitialCheck; mailFor and key come from
  -- WalnutRoot.Open.
  Startup: PUBLIC ENTRY PROC [rootFile: ROPE, wantReadOnly: BOOL _ FALSE]
    RETURNS[isReadOnly, newMailExists: BOOL, mailFor: GVBasics.RName, key: ROPE] = {
    ENABLE {
      WalnutDefs.Error => { errorInProgress _ TRUE; REJECT};
      UNWIND => NULL;
    };

    rootFileCreateDate: GMT;
    schemaInvalid: BOOL _ TRUE;

    -- Checks that the root information stored in the database agrees with the root file
    -- (or records it if the database has none yet), then computes newMailSomewhere.
    InitialCheck: INTERNAL PROC = {
      rootCreateDate: GMT;
      rootFileKey, mailForFromDB: ROPE;

      IF key.Length[] # 0 THEN {
        [rootCreateDate, rootFileKey, mailForFromDB] _ WalnutDB.GetRootInfo[];
        IF rootCreateDate = BasicTime.nullGMT
          OR (rootCreateDate = rootFileCreateDate
            AND rootFileKey.Length[] = 0
            AND mailForFromDB.Length[] = 0) THEN {
          -- the database has no (complete) root info yet: record it and commit
          WalnutDB.SetRootInfo[rootFileCreateDate, key, mailFor];
          WalnutRoot.CommitAndContinue[];
        }
        ELSE {
          IF ~rootFileKey.Equal[key, FALSE] THEN
            WalnutDefs.Error[$db, $WrongRootFile,
              IO.PutFR["RootFile has key %g, database says %g",
                IO.rope[key], IO.rope[rootFileKey]]];
          IF ~mailForFromDB.Equal[mailFor, FALSE] THEN
            WalnutDefs.Error[$db, $WrongRootFile,
              IO.PutFR["RootFile says mailFor %g, database says %g",
                IO.rope[mailFor], IO.rope[mailForFromDB]]];
          IF rootCreateDate # rootFileCreateDate THEN
            WalnutDefs.Error[$db, $WrongRootFile,
              IO.PutFR["RootFile has date %g, database says %g",
                IO.time[rootFileCreateDate], IO.time[rootCreateDate]]];
        };
      };

      -- new mail exists if the new-mail log is non-empty ...
      IF (WalnutOpsInternal.newMailSomewhere _ WalnutDB.GetNewMailLogLength[] # 0) THEN RETURN;

      -- ... or if any server entry has a non-zero num
      BEGIN
        serverList: LIST OF ServerInfo _ WalnutDB.EnumerateServers[];
        FOR sL: LIST OF ServerInfo _ serverList, sL.rest UNTIL sL = NIL DO
          IF (WalnutOpsInternal.newMailSomewhere _ sL.first.num#0) THEN RETURN;
        ENDLOOP;
      END;
    };

    IF started THEN RETURN WITH ERROR WalnutDefs.Error[$db, $AlreadyStarted];

    StartStatsReporting[];
    [key, mailFor, rootFileCreateDate, walnutSegment, systemIsReadOnly] _
      WalnutRoot.Open[rootName: rootFile, readOnly: wantReadOnly];
    rootFileName _ rootFile;

    -- Start the transaction exactly once. A $MismatchedSegment error is reported to the
    -- caller as $SchemaMismatch; any other WalnutDefs.Error is passed on unchanged.
    -- (A second, unguarded StartTransaction call used to follow this block; it bypassed the
    -- error translation and overwrote schemaInvalid, so it has been removed.)
    BEGIN
      exp: ROPE;
      BEGIN
        schemaInvalid _ WalnutRoot.StartTransaction[ !
          WalnutDefs.Error => {
            IF code # $MismatchedSegment THEN REJECT;
            exp _ explanation;
            GOTO mismatched;
          }];
        EXITS
          mismatched => RETURN WITH ERROR WalnutDefs.Error[$db, $SchemaMismatch, exp];
      END;
    END;

    WalnutLog.OpenLogStreams[];
    walnutRootFile _ rootFile;
    isReadOnly _ systemIsReadOnly;

    BEGIN
      exp: ROPE;
      BEGIN
        WalnutDB.DeclareDB[walnutSegment, schemaInvalid !
          WalnutDefs.SchemaMismatch => { exp _ explanation; GOTO mismatch}];
        StatsReport[IO.PutFR["\n\n ***** Startup called with rootFile: %g", IO.rope[rootFile]]];
        EXITS
          mismatch => RETURN WITH ERROR WalnutDefs.Error[$db, $SchemaMismatch, exp];
      END;
    END;

    started _ TRUE;
    errorInProgress _ FALSE;

    CarefullyApply[InitialCheck, FALSE];
    CheckInProgress[];
    newMailExists _ WalnutOpsInternal.newMailSomewhere;
    WalnutRegistryPrivate.NotifyForEvent[started];
  };

  -- Shuts down the expunge log, misc log, log and root (database) in that order, closes
  -- the statistics streams, resets the module state and announces the `stopped` event.
  Shutdown: PUBLIC ENTRY PROC = {
    ENABLE {
      WalnutDefs.Error => { errorInProgress _ TRUE; REJECT};
      UNWIND => NULL;
    };
    WalnutRoot.UnregisterStatsProc[StatsReport];
    WalnutLogExpunge.Shutdown[];  -- clear its variables
    WalnutMiscLog.Shutdown[];  -- clear its variables
    WalnutLog.ShutdownLog[];
    WalnutRoot.Shutdown[];  -- takes care of database
    StatsReport["\n *** Shutdown"];

    IF statsProgressTS # NIL THEN { statsProgressTS.inhibitDestroy _ FALSE };
    IF statsStream # NIL THEN { statsStream.Close[]; statsStream _ NIL };
    IF statsProgressStream # NIL THEN {
      statsProgressStream.Close[];
      statsProgressStream _ NIL
    };

    started _ FALSE;
    recentActivity_ FALSE;
    isShutdown _ FALSE;
    errorInProgress _ FALSE;
    mailStream _ NIL;
    WalnutRegistryPrivate.NotifyForEvent[stopped];
  };

  -- Erases the database for rootFile, re-initializes its schema and root info, and
  -- rebuilds it by calling WalnutOpsInternal.ParseLog from log position 0.
  -- Raises WalnutDefs.Error[$DB, $IsReadOnly, ...] if the database opens read-only.
  -- Results: newMailExists is newMailSomewhere after the parse; mailFor and key come from
  -- WalnutRoot.Open.
  Scavenge: PUBLIC ENTRY PROC[rootFile: ROPE]
    RETURNS[newMailExists: BOOL, mailFor: GVBasics.RName, key: ROPE] = {
    ENABLE {
      WalnutDefs.Error => { errorInProgress _ TRUE; REJECT};
      UNWIND => NULL;
    };
    rootFileCreateDate: GMT;

    WalnutLog.ShutdownLog[];
    WalnutRoot.Shutdown[];
    StartStatsReporting[];
    StatsReport["\n *** Scavenge"];

    [key, mailFor, rootFileCreateDate, walnutSegment, systemIsReadOnly] _
      WalnutRoot.Open[rootName: rootFile, readOnly: FALSE, newSegmentOk: TRUE];
    -- NOTE(review): this uses $DB where the rest of the file uses $db; left unchanged
    -- because clients may test the atom. Confirm which is intended.
    IF systemIsReadOnly THEN
      ERROR WalnutDefs.Error[$DB, $IsReadOnly, "Can't erase a readonly database"];
    rootFileName _ rootFile;

    [] _ WalnutRoot.StartTransaction[openDB: FALSE];  -- don't care if schema is invalid
    WalnutLog.OpenLogStreams[];
    -- NOTE(review): no matching release of this write lock is visible in this procedure;
    -- confirm that it is released elsewhere (e.g. by ParseLog or a later commit).
    WalnutLog.AcquireWriteLock[];
    started _ TRUE;
    errorInProgress _ FALSE;

    WalnutRoot.EraseDB[];
    WalnutDB.InitSchema[walnutSegment];
    WalnutDB.SetRootInfo[rootFileCreateDate, key, mailFor];
    WalnutDB.SetParseLogInProgress[TRUE];
    WalnutDB.SetParseLogPos[0];
    WalnutRoot.CommitAndContinue[];

    WalnutOpsInternal.newMailSomewhere _ FALSE;
    [] _ WalnutOpsInternal.ParseLog[TRUE];
    WalnutDB.SetTimeOfLastScavenge[BasicTime.Now[]];
    newMailExists _ WalnutOpsInternal.newMailSomewhere;
  };

  -- Drops the current log streams and opens them again.
  CleanupAfterCopy: PUBLIC PROC = {
    WalnutLog.ForgetLogStreams[];
    WalnutLog.OpenLogStreams[];
  };

-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
-- Statistics and progress reporting

  -- (Re)opens the per-user statistics log file, optionally creates the "Walnut Stats"
  -- viewer stream, and registers StatsReport with WalnutRoot.
  StartStatsReporting: PROC = {
    statsFile: ROPE _ Rope.Cat["///Users/", UserCredentials.Get[].name, "/WalnutStats.log"];
    -- close a stream left over from an earlier start; an IO.Error while closing is ignored
    IF statsStream # NIL THEN statsStream.Close[ ! IO.Error => CONTINUE];
    statsStream _ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4];
    IF heavyhandedDebugging THEN {
      name: ROPE = "Walnut Stats";
      statsProgressTS _ ViewerOps.FindViewer[name];
      statsProgressStream _ ViewerIO.CreateViewerStreams[name, statsProgressTS].out;
      IF statsProgressTS = NIL THEN
        statsProgressTS _ ViewerIO.GetViewerFromStream[statsProgressStream];
      statsProgressTS.inhibitDestroy _ TRUE;
    };
    WalnutRoot.RegisterStatsProc[StatsReport];
  };

  -- Writes msg followed by IO.time[] to the statistics stream and, if present, to the
  -- progress stream. Does nothing when no statistics stream is open.
  StatsReport: PUBLIC PROC[msg: ROPE] = {
    r: ROPE;
    IF statsStream = NIL THEN RETURN;
    statsStream.PutRope[r _ IO.PutFR["\n %g @ %g", IO.rope[msg], IO.time[]]];
    IF statsProgressStream # NIL THEN statsProgressStream.PutRope[r];
  };

  -- Adds reportStream to reporterList unless it is already on the list.
  RegisterReporter: PUBLIC ENTRY PROC[reportStream: IO.STREAM] = {
    ENABLE UNWIND => NULL;
    FOR rL: LIST OF IO.STREAM_ reporterList, rL.rest UNTIL rL= NIL DO
      IF rL.first = reportStream THEN RETURN;
    ENDLOOP;
    reporterList_ CONS[reportStream, reporterList];
  };

  -- Removes the first occurrence of reportStream from reporterList, if any.
  UnregisterReporter: PUBLIC ENTRY PROC[reportStream: IO.STREAM] = {
    ENABLE UNWIND => NULL;
    prev: LIST OF IO.STREAM;
    IF reporterList = NIL OR reportStream = NIL THEN RETURN;
    -- the head of the list is unlinked by moving reporterList itself
    IF reporterList.first = reportStream THEN {reporterList_ reporterList.rest; RETURN};
    prev_ reporterList;
    FOR rL: LIST OF IO.STREAM_ reporterList, rL.rest UNTIL rL= NIL DO
      IF rL.first = reportStream THEN {
        prev.rest_ rL.rest;
        RETURN};
      prev_ rL;
    ENDLOOP;
  };

  -- Writes each non-NIL message, in order, to every registered reporter stream.
  CheckReport: PUBLIC PROC[msg1, msg2, msg3: ROPE_ NIL] = {
    IF reporterList = NIL THEN RETURN;
    FOR rL: LIST OF IO.STREAM_ reporterList, rL.rest UNTIL rL= NIL DO
      IF msg1 # NIL THEN rL.first.PutRope[msg1];
      IF msg2 # NIL THEN rL.first.PutRope[msg2];
      IF msg3 # NIL THEN rL.first.PutRope[msg3];
    ENDLOOP;
  };

-- Apply wrappers

  -- Calls proc, committing afterwards when didUpdate is TRUE. If proc raises
  -- WalnutDefs.Error with code $TransactionAbort, the transaction is aborted and restarted,
  -- the log streams are reopened, and proc is called once more; a second abort, or any
  -- other WalnutDefs.Error, sets errorInProgress and is passed on to the caller.
  -- Raises WalnutDefs.Error[$root, $NotStarted, ...] if Startup has not been done.
  CarefullyApply: PUBLIC INTERNAL PROC[proc: PROC[], didUpdate: BOOL] = {
    reTryCount: INT _ 2;  -- total number of attempts allowed
    schemaInvalid: BOOL _ TRUE;
    recentActivity _ TRUE;
    IF ~started THEN
      ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
    DO
      BEGIN
        ENABLE WalnutDefs.Error => {
          IF code = $TransactionAbort THEN {
            StatsReport[" **** TransactionAbort"];
            IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry;
            errorInProgress _ TRUE;
            StatsReport[" *** Too many TransactionAbort's during WalnutOps call"];
            REJECT
          };
          errorInProgress _ TRUE;
          StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
          REJECT
        };
        proc[];
        IF didUpdate THEN WalnutRoot.CommitAndContinue[];
        RETURN;
        EXITS retry => NULL;
      END;
      -- reached only via retry: reset the transaction and the log streams, then loop
      WalnutLog.ForgetLogStreams[];
      WalnutRoot.AbortTransaction[];
      IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN
        WalnutDB.InitSchema[walnutSegment];
      WalnutLog.OpenLogStreams[];
    ENDLOOP;
  };

  -- Like CarefullyApply, but runs proc while holding the log write lock and commits via
  -- ReleaseWriteLock. On a retry, proc is passed TRUE if WalnutDB.GetOpInProgressPos[] > 0
  -- after the transaction has been restarted. The write lock is also released (after a
  -- commit) when proc raises WalnutDefs.VersionMismatch, which is then passed on.
  LongRunningApply: PUBLIC INTERNAL PROC[proc: PROC[inProgress: BOOL]] = {
    reTryCount: INT _ 2;  -- total number of attempts allowed
    alreadyCalled: BOOL _ FALSE;
    schemaInvalid: BOOL _ TRUE;
    recentActivity _ TRUE;
    isShutdown _ FALSE;
    IF ~started THEN
      ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
    DO
      BEGIN
        ENABLE WalnutDefs.Error => {
          IF code = $TransactionAbort THEN {
            StatsReport[" **** TransactionAbort"];
            IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry;
            errorInProgress _ TRUE;
            StatsReport[" *** Too many TransactionAbort's during LongRunning op"];
            REJECT
          };
          errorInProgress _ TRUE;
          StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
          REJECT
        };
        WalnutLog.AcquireWriteLock[];
        proc[alreadyCalled ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[]; REJECT} ];
        ReleaseWriteLock[];  -- also does commit
        RETURN;
        EXITS retry => NULL;
      END;
      -- reached only via retry: reset the transaction and the log streams, then loop
      WalnutLog.ForgetLogStreams[];
      WalnutRoot.AbortTransaction[];
      IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN
        WalnutDB.InitSchema[walnutSegment];
      WalnutLog.OpenLogStreams[];
      alreadyCalled _ WalnutDB.GetOpInProgressPos[] > 0;
    ENDLOOP;
  };

  -- Re-opens the transaction and the log streams and clears isShutdown.
  Restart: PUBLIC INTERNAL PROC = {
    -- need to re-open the transaction
    schemaInvalid: BOOL = WalnutRoot.StartTransaction[];
    IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
    WalnutLog.OpenLogStreams[];
    isShutdown _ FALSE;
  };

  -- Background loop (forked at module start). After each wait on checkActivityCondition,
  -- if Walnut is started: a set recentActivity flag is cleared; otherwise, unless already
  -- shut down, the log streams are forgotten and the transaction is closed.
  CheckForRecentActivity: ENTRY PROC = {
    ENABLE UNWIND => recentActivity _ FALSE;
    DO
      WAIT checkActivityCondition;
      IF started THEN {
        IF recentActivity THEN recentActivity _ FALSE
        ELSE IF ~isShutdown THEN {
          -- isShutdown is set both before and after the close calls, as in the original
          isShutdown _ TRUE;
          WalnutLog.ForgetLogStreams[];
          WalnutRoot.CloseTransaction[];
          isShutdown _ TRUE;
          StatsReport[" $$ Close transactions due to inactivity"];
        };
      };
    ENDLOOP;
  };

  -- Commits the current transaction, then releases the log write lock.
  ReleaseWriteLock: INTERNAL PROC = {
    WalnutRoot.CommitAndContinue[];
    WalnutLog.ReleaseWriteLock[];
  };

-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
-- Module start code: fork the inactivity checker

  recentActivity_ FALSE;
  TRUSTED {Process.Detach[FORK CheckForRecentActivity] };

END.