-- WalnutOpsParseLogImpl
-- Implements CheckInProgress and ParseLog, both exported to WalnutOpsInternal.
-- CheckInProgress finishes a parse or a logged operation that the database
-- records as still in progress; ParseLog replays log entries into the database.

DIRECTORY
  BasicTime USING [GMT, nullGMT],
  FS USING[PagesForBytes],
  IO,
  Rope,
  WalnutDB --using lots-- ,
  WalnutDefs USING [Error, VersionMismatch],
  WalnutKernelDefs USING [LogEntry, MsgLogEntry, WhichTempLog],
  WalnutLog --using lots-- ,
  WalnutOps USING [dontCareDomainVersion, dontCareMsgSetVersion],
  WalnutOpsInternal,
  WalnutOpsMonitorImpl,
  WalnutRoot USING [AcquireWriteLock, CommitAndContinue];

WalnutOpsParseLogImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl
  IMPORTS FS, IO, Rope, walnutOpsMonitorImpl: WalnutOpsMonitorImpl, WalnutDB, WalnutDefs, WalnutLog, WalnutOpsInternal, WalnutRoot
  EXPORTS WalnutOpsInternal
  SHARES WalnutOpsMonitorImpl =

BEGIN

-- Types

ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
GMT: TYPE = BasicTime.GMT;

-- Variables

-- ParseLog records its position and commits after this many log entries.
parseCommitFrequency: INT = 60;

-- Local names for the WalnutOps "don't care" version constants; every database
-- call made while replaying the log passes these as its version arguments.
dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;

-- Procedures

-- CheckInProgress: is some long running operation in progress? If so, finish it.
--   * Raises WalnutDefs.Error[$db, $InternalError] when
--     WalnutOpsInternal.errorInProgress is set.
--   * Calls WalnutOpsInternal.Restart when WalnutOpsInternal.isShutdown is set.
--   * If the database says a parse is in progress, continues it with ParseLog.
--   * Otherwise, if an operation-in-progress position > 0 is recorded, the log
--     entry at that position is re-executed (see Fip) and, when that operation
--     was a temp-log copy, the copied entries are then parsed.
CheckInProgress: PUBLIC INTERNAL PROC = {
  inProgressPos: INT;
  parseInProgress: BOOL;
  needsParse: BOOL _ FALSE;

  -- Cip reads the in-progress state from the database and acquires the write
  -- lock only when there is something to finish. Note that inProgressPos is
  -- assigned only when no parse is in progress.
  Cip: INTERNAL PROC = {
    IF (parseInProgress _ WalnutDB.GetParseLogInProgress[]) THEN {
      WalnutRoot.AcquireWriteLock[];
      RETURN
      };
    inProgressPos _ WalnutDB.GetOpInProgressPos[];
    IF inProgressPos <= 0 THEN RETURN;
    WalnutRoot.AcquireWriteLock[];
    };

  -- Fip re-reads the log entry at inProgressPos and re-executes the operation
  -- it describes, then clears the operation-in-progress position and commits.
  -- The inProgress parameter is not referenced in the body.
  -- NOTE(review): presumably the parameter is required by the signature that
  -- LongRunningApply expects; confirm against WalnutOpsInternal.
  Fip: INTERNAL PROC[inProgress: BOOL] = {  -- FinishInProgress
    at: INT;
    wle: WalnutKernelDefs.LogEntry;
    WalnutLog.SetIndex[inProgressPos];
    [wle, at] _ WalnutLog.NextEntry[];
    IF at = -1 THEN ERROR
      WalnutDefs.Error[$log, $BadLog, IO.PutFR["No entry at %g", IO.int[inProgressPos]]];

    TRUSTED { WITH le: wle SELECT FROM

      ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion];

      WriteExpungeLog => []_ WalnutOpsInternal.DoLogExpunge[le.internalFileID];

      EmptyMsgSet => []_ WalnutDB.EmptyMsgSet[ [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion] ];

      DestroyMsgSet => []_ WalnutDB.DestroyMsgSet[
        msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
        msDomainVersion: dontCareDomainVersion ];

      AcceptNewMail => {
        WalnutDB.AcceptNewMail[at,
          dontCareMsgSetVersion];
        WalnutOpsInternal.newMailSomewhere _ FALSE;
        };

      -- Finish the interrupted copy of the newMail temp log, commit, and ask
      -- for the copied entries to be parsed once Fip returns.
      StartCopyNewMail => {
        FinishCopyTempLog[newMail, at];
        WalnutDB.SetCopyMailLogPos[0];
        WalnutRoot.CommitAndContinue[];
        WalnutLog.FinishTempLogCopy[newMail];
        needsParse _ TRUE;
        };

      StartReadArchiveFile => {
        WalnutLog.ResetLog[at];
        WalnutDB.SetReadArchivePos[0];
        };

      -- Same sequence as StartCopyNewMail, for the readArchive temp log.
      StartCopyReadArchive => {
        FinishCopyTempLog[readArchive, at];
        WalnutDB.SetCopyReadArchivePos[0];
        WalnutRoot.CommitAndContinue[];
        WalnutLog.FinishTempLogCopy[readArchive];
        needsParse _ TRUE;
        };

      -- Any other entry type at the in-progress position is an error.
      ENDCASE => ERROR
        WalnutDefs.Error[$db, $ErroneousInProgress, IO.PutFR["Entry at %g", IO.int[inProgressPos]]];
      };  -- end trusted

    WalnutDB.SetOpInProgressPos[-1];
    WalnutRoot.CommitAndContinue[];
    };

  IF WalnutOpsInternal.errorInProgress THEN
    ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
  IF WalnutOpsInternal.isShutdown THEN WalnutOpsInternal.Restart[];
  WalnutOpsInternal.CarefullyApply[Cip, FALSE];

  -- An unfinished parse takes precedence; nothing else is examined afterwards.
  IF parseInProgress THEN {
    WalnutOpsInternal.StatsReport["\n Continuing parseInProgress"];
    [] _ ParseLog[TRUE];
    RETURN
    };
  IF inProgressPos <= 0 THEN RETURN;

  WalnutOpsInternal.StatsReport["\n Continuing longRunningOperation"];
  WalnutOpsInternal.LongRunningApply[Fip];

  -- needsParse is set by Fip only for the two temp-log copy operations.
  IF needsParse THEN {
    numNew: INT _ 0;
    WalnutOpsInternal.CheckReport["Adding messages to the database\n"];
    numNew _ ParseLog[TRUE];
    IF numNew = 0 THEN WalnutOpsInternal.CheckReport["No messages were new\n"]
    ELSE WalnutOpsInternal.CheckReport[ IO.PutFR[" %g new messages\n", IO.int[numNew] ]];
    };
  };

-- FinishCopyTempLog resumes an interrupted copy of the temp log "which".
--   which: the temp log being copied (callers pass newMail or readArchive).
--   at:    log position of the StartCopy... entry that recorded the operation.
-- Raises WalnutDefs.Error[$log, $BadLog] if WalnutLog.SetPosition[at] does not
-- return 0, and WalnutDefs.Error[$log, $DuringOpInProgress] if
-- WalnutLog.PrepareToCopyTempLog returns FALSE.
-- On completion it marks a parse as in progress, sets the parse position to the
-- recorded operation-in-progress position, and clears that position.
FinishCopyTempLog: INTERNAL PROC[which: WalnutKernelDefs.WhichTempLog, at: INT] = {
  logLen: INT _ WalnutLog.LogLength[];
  startedCopyAt, startCopyPos, fromPos: INT;
  pagesAlreadyCopied: INT;
  IF WalnutLog.SetPosition[at] # 0 THEN ERROR
    WalnutDefs.Error[$log, $BadLog, IO.PutFR["no entry at %g", IO.int[at]]];
  startCopyPos _ WalnutLog.NextEntry[].at;  -- skip the copy entry
  startedCopyAt _ WalnutLog.NextAt[];
  -- fromPos is the number of bytes in the log beyond the copy entry.
  -- NOTE(review): presumably these are the bytes already copied from the temp
  -- log before the interruption; confirm against WalnutLog.CopyTempLog.
  fromPos _ logLen - startedCopyAt;
  pagesAlreadyCopied _
    FS.PagesForBytes[fromPos];
  IF ~WalnutLog.PrepareToCopyTempLog[which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport] THEN
    ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];
  WalnutOpsInternal.CheckReport[
    IO.PutFR["\nContinuing copy of tempLog, starting at bytePos %g\n", IO.int[fromPos]]];
  WalnutLog.CopyTempLog[which, startCopyPos, fromPos, WalnutOpsInternal.CheckReport];
  WalnutOpsInternal.CheckReport["\n"];
  WalnutDB.SetParseLogInProgress[TRUE];
  WalnutDB.SetParseLogPos[WalnutDB.GetOpInProgressPos[]];
  WalnutDB.SetOpInProgressPos[-1];
  };

-- ParseLog: parsing the log. Assumes the log is write-locked.
-- Starting at WalnutDB.GetParseLogPos[], reads log entries one at a time and
-- applies each to the database.
--   verbose: when TRUE, progress marks are written with CheckReport as new
--            messages are counted.
--   numNew:  the number of CreateMsg entries for which WalnutDB.AddNewMsg
--            returned FALSE.
-- Commit/retry behaviour visible here:
--   * After every parseCommitFrequency entries the current position is stored
--     with SetParseLogPos and committed; the counters are saved at that point.
--   * A WalnutDefs.Error with code $TransactionAbort restarts the outer loop
--     from the stored parse position with the saved counters. Since retries
--     starts at 2 and is decremented before the test, only the first abort is
--     retried; the second sets errorInProgress and is passed on (REJECT).
--   * Any other WalnutDefs.Error sets errorInProgress and is passed on.
--   * WalnutDefs.VersionMismatch raised while applying one entry is ignored
--     and parsing continues with the next entry.
ParseLog: PUBLIC INTERNAL PROC[verbose: BOOL_ FALSE] RETURNS[numNew: INT] = {
  parseLogPos: INT;
  at: INT;  -- position in log of entry just processed
  lastNumNew, savedNumNew, savedLastNumNew: INT_ 0;
  wle: WalnutLog.LogEntry;
  charsSkipped: INT;
  errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
  retries: INT_ 2;
  accepted: BOOL;  -- stored into the show field of each CreateMsg entry
  commitFrequency: INT _ parseCommitFrequency;
  updatesSinceLastCommit: INT _ 0;

  numNew _ 0;

  -- Outer loop: one pass per attempt. It is re-entered after a tryAgain exit
  -- and after the inner loop EXITs on a WriteExpungeLog entry.
  DO
    currentLogLength: INT_ WalnutLog.LogLength[];  -- in case at end of Expunge
    BEGIN ENABLE WalnutDefs.Error =>
      IF code = $TransactionAbort THEN {
        WalnutOpsInternal.StatsReport[" *** TransactionAborts during parseLog"];
        IF ( retries _ retries - 1) > 0 THEN GOTO tryAgain;
        WalnutOpsInternal.errorInProgress _ TRUE;
        WalnutOpsInternal.StatsReport[" *** Too many TransactionAborts during parseLog"];
        REJECT }
      ELSE {
        WalnutOpsInternal.errorInProgress _ TRUE;
        WalnutOpsInternal.StatsReport[
          IO.PutFR["InternalError: %g during parseLog", IO.atom[code]]];
        REJECT };

    parseLogPos _ WalnutDB.GetParseLogPos[];

    -- Nothing to do when the stored parse position is already the log length.
    IF parseLogPos = currentLogLength THEN RETURN[numNew];
    charsSkipped _ WalnutLog.SetPosition[parseLogPos];
    IF charsSkipped = -1 THEN
      ERROR WalnutDefs.Error[$log, $BadLog,
        IO.PutFR[errorRope, IO.int[parseLogPos], IO.int[currentLogLength]]];

    at _ parseLogPos + charsSkipped;

    -- Make sure things are in good shape before starting: if SetPosition had
    -- to skip characters, record the adjusted position and commit it.
    IF at # parseLogPos THEN {
      WalnutDB.SetParseLogPos[at];
      WalnutRoot.CommitAndContinue[];
      };

    accepted _ NOT WalnutDB.GetAddingServerMsgs[];

    -- Inner loop: one iteration per log entry.
    DO
      previousAt: INT _ at;
      [wle, at]_ WalnutLog.NextEntry[];
      -- No entry found: reposition one byte past the previous entry position
      -- and try once more.
      IF at = -1 THEN {
        charsSkipped_ WalnutLog.SetPosition[previousAt+1];
        IF charsSkipped = -1 THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]];
        [wle, at]_ WalnutLog.NextEntry[]
        };
      IF wle = NIL THEN
        IF at = -1 THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]]
        ELSE {  -- at#-1 AND wle=NIL => at end
          -- Parsing is complete: clear the parse state, commit, and return
          -- the current value of numNew.
          WalnutDB.SetParseLogPos[-1];
          WalnutDB.SetParseLogInProgress[FALSE];
          WalnutRoot.CommitAndContinue[];
          RETURN
          };

      -- Periodic commit; the saved counters are what tryAgain restores.
      IF (updatesSinceLastCommit_ updatesSinceLastCommit + 1) >= commitFrequency THEN {
        WalnutDB.SetParseLogPos[at];  -- at is beginning of current (unseen) entry
        WalnutRoot.CommitAndContinue[];
        updatesSinceLastCommit _ 0;
        savedNumNew _ numNew;
        savedLastNumNew _ lastNumNew;
        };

      -- Apply the entry to the database according to its type.
      BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;

      TRUSTED { WITH le: wle SELECT FROM

        LogFileInfo => NULL;

        ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion];

        -- Parsing stops here: the parse state is cleared, the expunge is
        -- recorded as the operation in progress and committed, and then the
        -- expunge itself is run. EXIT leaves the inner loop only.
        WriteExpungeLog => {
          WalnutDB.SetParseLogPos[-1];
          WalnutDB.SetParseLogInProgress[FALSE];
          WalnutDB.SetOpInProgressPos[at];  -- at is beginning of current entry
          WalnutRoot.CommitAndContinue[];
          [] _ WalnutOpsInternal.DoLogExpunge[le.internalFileID];
          EXIT;  -- WriteExpungeLog had to be the last entry
          };

        CreateMsgSet => [] _ WalnutDB.CreateMsgSet[
          Rope.FromRefText[le.msgSet], dontCareDomainVersion];

        EmptyMsgSet => [] _ WalnutDB.EmptyMsgSet[
          [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion] ];

        DestroyMsgSet => [] _ WalnutDB.DestroyMsgSet[
          msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
          msDomainVersion: dontCareDomainVersion];

        -- The message is counted as new when AddNewMsg returns FALSE.
        CreateMsg => {
          me: WalnutKernelDefs.MsgLogEntry _ NARROW[wle];
          me.show _ accepted;
          IF ~WalnutDB.AddNewMsg[me] THEN numNew _ numNew + 1;
          };

        AddMsg => []_ WalnutDB.AddMsg[
          msg: Rope.FromRefText[le.msg],
          from: [NIL, dontCareMsgSetVersion],
          to:
            [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];

        RemoveMsg => []_ WalnutDB.RemoveMsg[
          msg: Rope.FromRefText[le.msg],
          from: [Rope.FromRefText[le.from], dontCareMsgSetVersion],
          deletedVersion: dontCareMsgSetVersion ];

        MoveMsg => []_ WalnutDB.MoveMsg[
          msg: Rope.FromRefText[le.msg],
          from: [ Rope.FromRefText[le.from], dontCareMsgSetVersion],
          to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];

        HasBeenRead => WalnutDB.SetHasBeenRead[msg: Rope.FromRefText[le.msg]];

        RecordNewMailInfo => {
          WalnutDB.SetNewMailInfo[
            le.logLen, le.when, Rope.FromRefText[le.server], le.num];
          WalnutOpsInternal.newMailSomewhere _ TRUE;
          };

        -- From here until EndCopyNewMailInfo, CreateMsg entries are stored
        -- with show = FALSE.
        StartCopyNewMail => {
          WalnutDB.SetCopyMailLogPos[at];
          WalnutDB.SetAddingServerMsgs[TRUE];
          accepted _ FALSE;
          };

        EndCopyNewMailInfo => {
          WalnutDB.SetCopyMailLogPos[0];
          WalnutDB.SetNewMailInfo[0, BasicTime.nullGMT, NIL, 0];
          WalnutDB.SetAddingServerMsgs[FALSE];
          accepted _ TRUE;
          };

        AcceptNewMail => {
          WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
          WalnutOpsInternal.newMailSomewhere _ FALSE;
          };

        StartReadArchiveFile => WalnutDB.SetReadArchivePos[at];

        EndReadArchiveFile => WalnutDB.SetReadArchivePos[0];

        StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[at];

        EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[0];

        ENDCASE => ERROR;
        };
      EXITS
        continue => NULL;
      END;

      -- Progress marks, written only when numNew changed on this entry:
      -- "(n)" when numNew is a multiple of 100, "~" at other multiples of 10.
      IF verbose THEN
        IF numNew # lastNumNew THEN
          IF numNew MOD 10 = 0 THEN
            IF numNew MOD 100 = 0 THEN
              WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numNew]]]
            ELSE WalnutOpsInternal.CheckReport["~"];
      lastNumNew_ numNew;
      ENDLOOP;

    EXITS
      -- Go back to the counter values saved at the last periodic commit; the
      -- outer loop then re-reads the stored parse position.
      tryAgain => {
        numNew _ savedNumNew;
        lastNumNew _ savedLastNumNew;
        };
    END;
    ENDLOOP;
  };

-- NOTE(review): the text after END. on the last line below looks like the start
-- of the Tioga formatting trailer; it is kept exactly as found.
END. ΄WalnutOpsParseLogImpl.mesa Copyright c 1985 by Xerox Corporation. All rights reserved. 
Willie-Sue, July 11, 1985 1:37:42 pm PDT Donahue, August 5, 1985 2:46:06 pm PDT Implementation of CheckInProgress and ParseLog (for newMail, readArchive and scavenge) Types Variables Procedures Is some long running operation in progress Parsing the Log - assumes the log is write-locked make sure things are in good shape before starting Κ -– "cedar" style˜šΟb™Jšœ Οmœ1™JšœŸœ˜(—J˜.JšŸœŸœŸœ˜"J˜J˜—š‘œŸ œ Ÿœ ˜—JšŸ˜J˜J˜——Jšœ(˜(J˜JšŸœ ŸœŸœ ˜6Jšœ2˜2šŸœŸ˜šŸœ ˜%JšŸœŸœŸœ˜D——J˜Jšœ ˜ J™Jšœ2™2šŸœŸœ˜Jšœ˜J˜J˜—J˜Jšœ Ÿœ ˜.J˜šŸ˜Jšœ Ÿœ˜J˜!šŸœ Ÿœ˜Jšœ2˜2šŸœŸ˜šŸœ ˜%JšŸœŸœŸœ˜C——J˜ J˜—šŸœŸœ˜ šŸœŸœ Ÿ˜šŸœ ˜%JšŸœŸœŸœ˜B——šŸœ ˜(Jšœ˜JšœŸœ˜&Jšœ˜JšŸ˜J˜——J˜šŸœIŸœ˜QJšœ ,˜JJšœ˜Jšœ˜Jšœ˜Jšœ˜J˜—J˜JšŸ œŸœ Ÿ˜9J˜šŸœŸœ ŸœŸ˜"J˜JšœŸœ˜J˜Jšœ;˜;J˜šœ˜Jšœ˜JšœŸœ˜&Jšœ" #˜EJ˜Jšœ7˜7JšŸœ +˜3J˜—J˜šœ˜šœ˜Jšœ4˜4——J˜šœ˜šœ˜Jšœ7˜7——J˜šœ˜šœ˜Jšœ=˜=Jšœ(˜(——J˜šœ˜Jšœ#Ÿœ˜/J˜JšŸœŸœ˜4J˜—J˜šœ ˜ šœ˜Jšœ˜JšœŸœ˜#Jšœ7˜7——J˜šœ ˜ šœ˜Jšœ˜Jšœ9˜9Jšœ(˜(——J˜˜ ˜Jšœ˜Jšœ:˜:Jšœ7˜7——J˜JšœF˜FJ˜šœ˜šœ˜Jšœ9˜9—Jšœ%Ÿœ˜*Jšœ˜—J˜šœ˜Jšœ˜JšœŸœ˜#Jšœ Ÿœ˜J˜—J˜šœ˜Jšœ˜Jšœ.Ÿœ˜6JšœŸœ˜$Jšœ Ÿœ˜J˜—J˜˜Jšœ2˜2Jšœ%Ÿœ˜+J˜—J˜Jšœ7˜7J˜Jšœ4˜4J˜šœ˜Jšœ#˜#—J˜Jšœ<˜