-- WalnutOpsInternalImpl
-- Implementation of the WalnutOpsInternal interface: statistics reporting, wrappers that
-- re-run an operation after a transaction abort, the multi-phase log expunge, recovery of
-- an operation that was in progress, and replaying (parsing) the log into the database.
-- NOTE(review): the original header comments of this file were not recoverable; the
-- comments in this file describe only what the code below visibly does.

DIRECTORY
  BasicTime USING [GMT, nullGMT, Now],
  FS USING [BytesForPages, PagesForBytes, StreamOpen],
  IO,
  RefText USING [Length],
  Rope,
  UserCredentials USING [Get],
  ViewerClasses USING [Viewer],
  ViewerIO USING [CreateViewerStreams, GetViewerFromStream],
  ViewerOps USING [FindViewer],
  WalnutDB --using lots-- ,
  WalnutDefs USING [Error, Segment, VersionMismatch],
  WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog],
  WalnutLog --using lots-- ,
  WalnutLogExpunge --using lots-- ,
  WalnutOps,
  WalnutOpsInternal,
  WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, RegisterStatsProc, StartTransaction, SwapLogs];

-- NOTE(review): BasicTime.Now is called below but BasicTime is not in the IMPORTS list;
-- confirm whether the binder needs it (left unchanged here).
WalnutOpsInternalImpl: CEDAR PROGRAM
  IMPORTS
    FS, IO, RefText, Rope, UserCredentials, ViewerIO, ViewerOps,
    WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutOpsInternal, WalnutRoot
  EXPORTS WalnutOpsInternal
  = BEGIN OPEN WalnutOps, WalnutOpsInternal;

-- Types

  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;
  GMT: TYPE = BasicTime.GMT;

-- Module variables

  -- statsStream is the statistics log file; statsProgressStream/statsProgressTS are only
  -- created when heavyhandedDebugging is TRUE (see StartStatsReporting).
  statsStream, statsProgressStream: PUBLIC STREAM _ NIL;
  heavyhandedDebugging: BOOL _ FALSE;
  statsProgressTS: PUBLIC ViewerClasses.Viewer _ NIL;

  mailStream: PUBLIC STREAM;

  -- Commit frequencies. DoLogExpunge resets commitFrequency to expungeCommitFrequency;
  -- ParseLog uses its own local commitFrequency initialised from parseCommitFrequency.
  expungeCommitFrequency: INT = 20;
  commitFrequency: INT _ expungeCommitFrequency;
  parseCommitFrequency: INT = 60;

  dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
  dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;

-- Statistics reporting

<<-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *>>

  -- (Re)opens the statistics log file under the current user's directory, optionally
  -- creates a "Walnut Stats" viewer stream, and registers StatsReport with WalnutRoot.
  StartStatsReporting: PUBLIC PROC = {
    statsFile: ROPE _ Rope.Cat["///Users/", UserCredentials.Get[].name, "/WalnutStats.log"];
    -- close any previously opened stats stream; a failure to close is ignored
    IF statsStream # NIL THEN statsStream.Close[ ! IO.Error => CONTINUE];
    statsStream _ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4];
    IF heavyhandedDebugging THEN {
      name: ROPE = "Walnut Stats";
      statsProgressTS _ ViewerOps.FindViewer[name];
      statsProgressStream _ ViewerIO.CreateViewerStreams[name, statsProgressTS].out;
      -- no existing viewer was found, so pick up the one belonging to the new stream
      IF statsProgressTS = NIL THEN
        statsProgressTS _ ViewerIO.GetViewerFromStream[statsProgressStream];
      statsProgressTS.inhibitDestroy _ TRUE;
      };
    WalnutRoot.RegisterStatsProc[StatsReport];
    };

  -- Writes msg followed by the current time to statsStream, and to statsProgressStream
  -- when that exists. Does nothing if stats reporting has not been started.
  StatsReport: PUBLIC PROC[msg: ROPE] = {
    r: ROPE;
    IF statsStream = NIL THEN RETURN;
    statsStream.PutRope[r _ IO.PutFR["\n %g @ %g", IO.rope[msg], IO.time[]]];
    IF statsProgressStream # NIL THEN statsProgressStream.PutRope[r];
    };

-- Applying operations with retry after a transaction abort

  -- Calls proc, committing afterwards when didUpdate is TRUE.
  -- On WalnutDefs.Error with code $TransactionAbort the transaction is aborted and
  -- restarted and proc is called again; reTryCount starts at 2 and is decremented before
  -- the test, so a second abort sets errorInProgress and lets the error propagate.
  -- Any other WalnutDefs.Error sets errorInProgress, is reported, and propagates.
  -- Raises WalnutDefs.Error[$root, $NotStarted, ...] if started is FALSE.
  CarefullyApply: PUBLIC PROC[proc: PROC[], didUpdate: BOOL] = {
    reTryCount: INT _ 2;
    schemaInvalid: BOOL _ TRUE;
    recentActivity _ TRUE;
    IF ~started THEN
      ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
    DO
      BEGIN ENABLE WalnutDefs.Error => {
        IF code = $TransactionAbort THEN {
          StatsReport[" **** TransactionAbort"];
          IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry;
          errorInProgress _ TRUE;
          StatsReport[" *** Too many TransactionAbort's during WalnutOps call"];
          REJECT
          };
        errorInProgress _ TRUE;
        StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
        REJECT
        };
      proc[];
      IF didUpdate THEN WalnutRoot.CommitAndContinue[];
      RETURN;
      EXITS retry => NULL;
      END;
      -- only reached via GOTO retry: drop the log streams, restart the transaction,
      -- re-initialise the schema if StartTransaction says so, then loop to call proc again
      WalnutLog.ForgetLogStreams[];
      WalnutRoot.AbortTransaction[];
      IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN
        WalnutDB.InitSchema[walnutSegment];
      WalnutLog.OpenLogStreams[];
    ENDLOOP;
    };

  -- Like CarefullyApply, but for long-running operations: takes the log write lock around
  -- the call and passes proc a flag (alreadyCalled) which, after a retry, is TRUE when
  -- WalnutDB.GetOpInProgressPos[] is greater than 0.
  -- Also clears isShutdown. Raises WalnutDefs.Error[$root, $NotStarted, ...] if not started.
  LongRunningApply: PUBLIC PROC[proc: PROC[inProgress: BOOL]] = {
    reTryCount: INT _ 2;
    alreadyCalled: BOOL _ FALSE;
    schemaInvalid: BOOL _ TRUE;
    recentActivity _ TRUE;
    isShutdown _ FALSE;
    IF ~started THEN
      ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
    DO
      BEGIN ENABLE WalnutDefs.Error => {
        IF code = $TransactionAbort THEN {
          StatsReport[" **** TransactionAbort"];
          IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry;
          errorInProgress _ TRUE;
          StatsReport[" *** Too many TransactionAbort's during LongRunning op"];
          REJECT
          };
        errorInProgress _ TRUE;
        StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
        REJECT
        };
      WalnutLog.AcquireWriteLock[];
      -- a version mismatch still releases the lock (and commits) before propagating
      proc[alreadyCalled ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[]; REJECT} ];
      ReleaseWriteLock[];  -- also does commit
      RETURN;
      EXITS retry => NULL;
      END;
      -- only reached via GOTO retry
      WalnutLog.ForgetLogStreams[];
      WalnutRoot.AbortTransaction[];
      IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN
        WalnutDB.InitSchema[walnutSegment];
      WalnutLog.OpenLogStreams[];
      alreadyCalled _ WalnutDB.GetOpInProgressPos[] > 0;
    ENDLOOP;
    };

  -- Re-opens the transaction and the log streams, and clears isShutdown.
  Restart: PUBLIC PROC = {
    -- need to re-open the transaction
    schemaInvalid: BOOL = WalnutRoot.StartTransaction[];
    IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
    WalnutLog.OpenLogStreams[];
    isShutdown _ FALSE;
    };

  -- Commits the current transaction, then releases the log write lock.
  ReleaseWriteLock: PROC = {
    WalnutRoot.CommitAndContinue[];
    WalnutLog.ReleaseWriteLock[];
    };

<<* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *>>

-- Log expunge

  -- Drives the log expunge as a state machine over the phase stored in the database
  -- (WalnutDB.GetLogExpungePhase): idle, initializingExpungeLog, writingExpungeLog,
  -- swappingLogs. Each phase stores the next phase and commits, so the loop can be
  -- re-entered at the stored phase. Returns after the swappingLogs phase completes.
  -- NOTE(review): the expungeID parameter is not used in this body.
  DoLogExpunge: PUBLIC PROC[expungeID: INT] = {
    DO
      expPhase: WalnutKernelDefs.LogExpungePhase_ WalnutDB.GetLogExpungePhase[];
      SELECT expPhase FROM
        idle => {
          WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"];
          WalnutDB.SetLogExpungePhase[initializingExpungeLog];
          WalnutRoot.CommitAndContinue[];
          commitFrequency_ expungeCommitFrequency;
          };

        initializingExpungeLog => {
          -- size the expunge log as the current log length minus the bytes in destroyed msgs
          bytesInDestroyedMsgs: INT_ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs;
          logLen: INT_ WalnutLog.LogLength[];
          pagesNeeded: INT_ FS.PagesForBytes[logLen-bytesInDestroyedMsgs];
          expungeFileID: INT;
          WalnutLog.ReturnCurrentLogStreams[];
          expungeFileID _ WalnutLogExpunge.StartExpunge[pagesNeeded];
          WalnutDB.SetExpungeFileID[expungeFileID];
          WalnutDB.SetLogExpungePhase[writingExpungeLog];
          WalnutRoot.CommitAndContinue[];
          };

        writingExpungeLog => {
          WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
          -- NOTE(review): the fragment below looks like the remains of a disabled
          -- progress report; it is kept verbatim as a comment.
          <<"\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n",>>
          -- NOTE(review): WriteExpungeLog is declared without results; confirm "[] _" is intended.
          [] _ WriteExpungeLog[];
          WalnutLogExpunge.EndExpunge[];
          WalnutDB.SetLogExpungePhase[swappingLogs];
          WalnutRoot.CommitAndContinue[];
          };

        swappingLogs => {
          expungeFileID: INT _ WalnutDB.GetExpungeFileID[];
          date: BasicTime.GMT;
          logLen: INT;
          WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
          [] _ WalnutRoot.GetStreamsForExpunge[];
          WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"];
          -- reset all expunge bookkeeping and go back to idle, all within the
          -- transaction that is committed after SwapLogs below
          WalnutDB.SetExpungeProgressInfo[0,0];
          WalnutDB.SetExpungeInfo[0, 0];
          WalnutDB.SetOpInProgressPos[-1];
          WalnutDB.SetLogExpungePhase[idle];  -- whew
          WalnutDB.SetTimeOfLastExpunge[BasicTime.Now[]];
          WalnutDB.SetRootFileVersion[date _ BasicTime.Now[]];
          logLen _ WalnutRoot.SwapLogs[expungeFileID, date];
          WalnutRoot.CommitAndContinue[];
          WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"];
          WalnutLog.OpenLogStreams[];
          RETURN;
          };
      ENDCASE => ERROR;
    ENDLOOP;
    };

  -- Copies the current log into the expunge log, resuming from the progress positions
  -- stored in the database. The part of the log before firstDestroyedMsgPos (the "head")
  -- is copied in chunks; the remainder (the "tail") is processed entry by entry, copying
  -- or skipping each entry. Progress is stored and committed periodically.
  -- On WalnutDefs.Error with code $TransactionAbort the transaction is restarted and the
  -- whole body is re-run from the last committed progress. If the committed expunge log
  -- position has not advanced between attempts, retryCount is decremented, and
  -- WalnutDefs.Error[$log, $TooManyLogAborts, ...] is raised when it reaches 0.
  WriteExpungeLog: PROC = {
    numMsgs: INT _ 0;
    currentLogPos, expungeLogPos: INT;
    previousAt, at: INT;
    startRetryCount: INT = 2;
    retryCount: INT _ startRetryCount;
    bytesToCopyBeforeFlush: INT _ 200000;
    lastExpungeCommitLength: INT _ -1;

    DO
      BEGIN ENABLE WalnutDefs.Error =>
        IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
      bytesCopiedSinceLastCommit: INT _ 0;
      lastAcceptNewMailPos: INT;
      firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;

      [currentLogPos, expungeLogPos] _ WalnutDB.GetExpungeProgressInfo[];

      -- no progress since the previous attempt counts against the retry budget;
      -- any progress resets it
      IF lastExpungeCommitLength = expungeLogPos THEN {
        IF (retryCount _ retryCount - 1) <= 0 THEN
          ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
            IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g",
              IO.int[expungeLogPos]]];
        }
      ELSE {
        lastExpungeCommitLength _ expungeLogPos;
        retryCount _ startRetryCount;
        };

      IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN {
        -- could not resume at the stored positions: start the expunge over from 0
        pagesNeeded: INT _ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[] -
          WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs];
        WalnutOpsInternal.CheckReport[IO.PutFR[
          "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog",
          IO.int[expungeLogPos], IO.int[currentLogPos]],
          "\n restarting Log Expunge"];
        expungeLogPos _ currentLogPos _ 0;
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];  -- commits
        [] _ WalnutLogExpunge.StartExpunge[pagesNeeded];  -- try again
        WalnutOpsInternal.StatsReport["Restarting expunge from beginning"];
        -- raised with an explicit ERROR, consistent with every other raise in this module
        IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN
          ERROR WalnutDefs.Error[$log, $CantStartExpunge];
        lastExpungeCommitLength _ expungeLogPos;
        };

      IF currentLogPos < firstDestroyedMsgPos THEN {  -- copyingHead phase
        bytesLeftToCopyInHead: INT _ firstDestroyedMsgPos - currentLogPos;
        bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
        IF currentLogPos = 0 THEN {
          WalnutLogExpunge.ExpSetIndex[0];
          [] _ WalnutLogExpunge.SkipEntry[];  -- need to skip the LogFileInfo entry
          bytesLeftToCopyInHead _ bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[];
          };
        WalnutOpsInternal.CheckReport[
          IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n",
            IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]];
        -- copy one chunk, store progress, commit; repeat until the head is exhausted
        DO
          bytesToCopyThisTime: INT _ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
          WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime];
          [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
          WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
          WalnutRoot.CommitAndContinue[];
          WalnutOpsInternal.CheckReport["#"];
          bytesLeftToCopyInHead _ bytesLeftToCopyInHead - bytesToCopyThisTime;
          IF bytesLeftToCopyInHead = 0 THEN EXIT;
        ENDLOOP;
        retryCount _ startRetryCount;  -- in case has abort during copy
        };

      -- tail phase: examine each remaining entry and either copy or skip it
      lastAcceptNewMailPos _ WalnutDB.GetAcceptNewMailPos[];
      WalnutLogExpunge.ExpSetIndex[at _ currentLogPos];  -- before doing PeekEntry

      WalnutOpsInternal.CheckReport[
        IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]];

      DO
        ident: ATOM;
        msgID: REF TEXT;
        bytesThisCopy: INT _ 0;

        previousAt _ at;
        [ident, msgID, at] _ WalnutLogExpunge.PeekEntry[];

        IF ident = NIL THEN
          IF at # -1 THEN EXIT  -- end of log
          ELSE {
            -- no entry where one was expected: scan forward from previousAt+1 for the next one
            charSkipped: INT _ WalnutLogExpunge.ExpSetPosition[previousAt+1];
            IF charSkipped = -1 THEN
              ERROR WalnutDefs.Error[$log, $DuringExpunge,
                IO.PutFR["No entry found after %g", IO.int[previousAt]]];
            at _ previousAt + charSkipped;
            LOOP;
            };

        SELECT ident FROM
          $ExpungeMsgs => [] _ WalnutLogExpunge.SkipEntry[];
          $WriteExpungeLog => EXIT;  -- must be last entry on PREVIOUS LOG
          $LogFileInfo => [] _ WalnutLogExpunge.SkipEntry[];

          -- new-mail bookkeeping entries before the last AcceptNewMail position are dropped
          $RecordNewMailInfo, $StartCopyNewMail =>
            IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;

          $EndCopyNewMailInfo => {
            IF at < lastAcceptNewMailPos THEN {  -- maybe skip
              startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[];
              IF firstDestroyedMsgPos < startCopyPos THEN
                [] _ WalnutLogExpunge.SkipEntry[]
              ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
              }
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
            };

          $AcceptNewMail =>
            IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[]
            ELSE {
              -- the entry moves to a new position in the expunge log; record it
              newLogPos: INT;
              [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[];
              WalnutDB.SetAcceptNewMailPos[newLogPos];
              };

          $StartCopyReadArchive => [] _ WalnutLogExpunge.SkipEntry[];

          $EndCopyReadArchiveInfo => {
            startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[];
            IF firstDestroyedMsgPos < startCopyPos THEN
              [] _ WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
            };

        ENDCASE =>
          -- any other entry is copied when it names no message, or names one that
          -- still exists in the database; otherwise it is skipped
          IF RefText.Length[msgID] = 0 OR
              WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN {
            newLogPos: INT ;
            [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[];
            IF ident = $CreateMsg THEN {
              -- the message entry moved: update its stored position, and report progress
              -- with "." every 10 messages and "(n)" every 100
              IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos];
              IF (numMsgs _ numMsgs + 1) MOD 10 = 0 THEN
                IF numMsgs MOD 100 = 0 THEN
                  WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]]
                ELSE WalnutOpsInternal.CheckReport["."];
              };
            }
          ELSE [] _ WalnutLogExpunge.SkipEntry[];

        -- store progress and commit once enough bytes have been copied
        IF (bytesCopiedSinceLastCommit _ bytesCopiedSinceLastCommit + bytesThisCopy) >=
            bytesToCopyBeforeFlush THEN {
          expLogPos: INT;
          [currentLogPos, expLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
          WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos];
          WalnutRoot.CommitAndContinue[];
          lastExpungeCommitLength _ expLogPos;
          bytesCopiedSinceLastCommit _ 0;
          };
      ENDLOOP;

      -- final commit for whatever was copied since the last periodic commit
      IF bytesCopiedSinceLastCommit > 0 THEN {
        [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[];
        lastExpungeCommitLength _ expungeLogPos;
        };
      RETURN;

      EXITS
        retry => {
          -- transaction abort: shut down the expunge streams and restart the transaction
          schemaInvalid: BOOL _ TRUE;
          WalnutLogExpunge.ExpShutdown[];
          WalnutRoot.AbortTransaction[];
          schemaInvalid _ WalnutRoot.StartTransaction[];
          IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
          WalnutLog.OpenLogStreams[];  -- awkward
          WalnutLog.ReturnCurrentLogStreams[];
          };
      END;
    ENDLOOP;
    };

-- Recovering an operation that was in progress

  -- Checks the database for an interrupted parse or long-running operation and finishes it.
  -- Raises WalnutDefs.Error[$db, $InternalError, ...] if errorInProgress is set.
  -- Calls Restart first if isShutdown is set.
  CheckInProgress: PUBLIC PROC = {
    inProgressPos: INT;
    parseInProgress: BOOL;
    needsParse: BOOL _ FALSE;

    -- Reads the in-progress state; takes the write lock when there is work to finish.
    Cip: PROC = {
      IF (parseInProgress _ WalnutDB.GetParseLogInProgress[]) THEN {
        WalnutRoot.AcquireWriteLock[];
        RETURN
        };
      inProgressPos _ WalnutDB.GetOpInProgressPos[];
      IF inProgressPos <= 0 THEN RETURN;
      WalnutRoot.AcquireWriteLock[];
      };

    -- Reads the log entry at inProgressPos and completes the operation it records,
    -- then clears the in-progress position and commits.
    Fip: PROC[inProgress: BOOL] = {  -- FinishInProgress
      at: INT;
      wle: WalnutKernelDefs.LogEntry;
      WalnutLog.SetIndex[inProgressPos];
      [wle, at] _ WalnutLog.NextEntry[];
      IF at = -1 THEN
        ERROR WalnutDefs.Error[$log, $BadLog,
          IO.PutFR["No entry at %g", IO.int[inProgressPos]]];

      TRUSTED {
        WITH le: wle SELECT FROM
          ExpungeMsgs =>
            WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
          WriteExpungeLog =>
            []_ WalnutOpsInternal.DoLogExpunge[le.internalFileID];
          EmptyMsgSet =>
            []_ WalnutDB.EmptyMsgSet[
              [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
              WalnutOpsInternal.CheckReport ];
          DestroyMsgSet =>
            []_ WalnutDB.DestroyMsgSet[
              msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
              msDomainVersion: dontCareDomainVersion,
              report: WalnutOpsInternal.CheckReport ];
          AcceptNewMail => {
            WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
            WalnutOpsInternal.newMailSomewhere _ FALSE;
            };
          StartCopyNewMail => {
            FinishCopyTempLog[newMail, at];
            WalnutDB.SetCopyMailLogPos[0];
            WalnutRoot.CommitAndContinue[];
            WalnutLog.FinishTempLogCopy[newMail];
            needsParse _ TRUE;
            };
          StartReadArchiveFile => {
            WalnutLog.ResetLog[at];
            WalnutDB.SetReadArchivePos[0];
            };
          StartCopyReadArchive => {
            FinishCopyTempLog[readArchive, at];
            WalnutDB.SetCopyReadArchivePos[0];
            WalnutRoot.CommitAndContinue[];
            WalnutLog.FinishTempLogCopy[readArchive];
            needsParse _ TRUE;
            };
        ENDCASE =>
          ERROR WalnutDefs.Error[$db, $ErroneousInProgress,
            IO.PutFR["Entry at %g", IO.int[inProgressPos]]];
        };  -- end trusted
      WalnutDB.SetOpInProgressPos[-1];
      WalnutRoot.CommitAndContinue[];
      };

    IF WalnutOpsInternal.errorInProgress THEN
      ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
    IF WalnutOpsInternal.isShutdown THEN WalnutOpsInternal.Restart[];

    WalnutOpsInternal.CarefullyApply[Cip, FALSE];
    -- an interrupted parse takes priority: finish it and stop
    IF parseInProgress THEN {
      WalnutOpsInternal.StatsReport["\n Continuing parseInProgress"];
      [] _ ParseLog[TRUE];
      RETURN
      };
    IF inProgressPos <= 0 THEN RETURN;
    WalnutOpsInternal.StatsReport["\n Continuing longRunningOperation"];
    WalnutOpsInternal.LongRunningApply[Fip];
    -- finishing a temp-log copy leaves new entries in the log that still have to be parsed
    IF needsParse THEN {
      numNew: INT _ 0;
      WalnutOpsInternal.CheckReport["Adding messages to the database\n"];
      numNew _ ParseLog[TRUE];
      IF numNew = 0 THEN WalnutOpsInternal.CheckReport["No messages were new\n"]
      ELSE WalnutOpsInternal.CheckReport[
        IO.PutFR[" %g new messages\n", IO.int[numNew] ]];
      };
    };

  -- Resumes an interrupted copy of a temp log (which) whose copy entry is at log
  -- position at, then marks a parse as in progress starting from the in-progress
  -- position and clears the in-progress position.
  -- Raises WalnutDefs.Error[$log, $BadLog, ...] if there is no entry exactly at at, and
  -- WalnutDefs.Error[$log, $DuringOpInProgress, ...] if the temp log cannot be prepared.
  FinishCopyTempLog: PROC[which: WalnutKernelDefs.WhichTempLog, at: INT] = {
    logLen: INT _ WalnutLog.LogLength[];
    startedCopyAt, startCopyPos, fromPos: INT;
    pagesAlreadyCopied: INT;
    IF WalnutLog.SetPosition[at] # 0 THEN
      ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR["no entry at %g", IO.int[at]]];
    startCopyPos _ WalnutLog.NextEntry[].at;  -- skip the copy entry
    startedCopyAt _ WalnutLog.NextAt[];
    -- bytes already present in the log after the copy entry
    fromPos _ logLen - startedCopyAt;
    pagesAlreadyCopied _ FS.PagesForBytes[fromPos];

    IF ~WalnutLog.PrepareToCopyTempLog[which: which, pagesAlreadyCopied: pagesAlreadyCopied,
        reportProc: WalnutOpsInternal.CheckReport] THEN
      ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];

    WalnutOpsInternal.CheckReport[
      IO.PutFR["\nContinuing copy of tempLog, starting at bytePos %g\n", IO.int[fromPos]]];
    WalnutLog.CopyTempLog[which, startCopyPos, fromPos, WalnutOpsInternal.CheckReport];
    WalnutOpsInternal.CheckReport["\n"];

    WalnutDB.SetParseLogInProgress[TRUE];
    WalnutDB.SetParseLogPos[WalnutDB.GetOpInProgressPos[]];
    WalnutDB.SetOpInProgressPos[-1];
    };

-- Parsing the log

  -- Replays log entries into the database, starting at the stored parse position
  -- (WalnutDB.GetParseLogPos) and continuing to the end of the log. The parse position
  -- is stored and committed every commitFrequency entries. Returns numNew, which is
  -- incremented for each CreateMsg entry for which WalnutDB.AddNewMsg returns FALSE.
  -- When verbose, reports "~" every 10 new messages and "(n)" every 100.
  -- On WalnutDefs.Error with code $TransactionAbort the counts are rolled back to the
  -- values saved at the last periodic commit and the parse is re-run; after too many
  -- aborts, or on any other WalnutDefs.Error, errorInProgress is set and the error propagates.
  -- WalnutDefs.VersionMismatch raised while applying a single entry is ignored.
  -- NOTE(review): unlike the other retry loops in this module, the tryAgain exit does not
  -- abort and restart the transaction; confirm that this is intended.
  ParseLog: PUBLIC PROC[verbose: BOOL_ FALSE] RETURNS[numNew: INT] = {
    parseLogPos: INT;
    at: INT;  -- position in log of entry just processed
    lastNumNew, savedNumNew, savedLastNumNew: INT_ 0;
    wle: WalnutLog.LogEntry;
    charsSkipped: INT;
    errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
    retries: INT_ 2;
    accepted: BOOL;
    commitFrequency: INT _ parseCommitFrequency;
    updatesSinceLastCommit: INT _ 0;

    numNew _ 0;
    DO
      currentLogLength: INT_ WalnutLog.LogLength[];  -- in case at end of Expunge
      BEGIN ENABLE WalnutDefs.Error =>
        IF code = $TransactionAbort THEN {
          WalnutOpsInternal.StatsReport[" *** TransactionAborts during parseLog"];
          IF ( retries _ retries - 1) > 0 THEN GOTO tryAgain;
          WalnutOpsInternal.errorInProgress _ TRUE;
          WalnutOpsInternal.StatsReport[" *** Too many TransactionAborts during parseLog"];
          REJECT
          }
        ELSE {
          WalnutOpsInternal.errorInProgress _ TRUE;
          WalnutOpsInternal.StatsReport[
            IO.PutFR["InternalError: %g during parseLog", IO.atom[code]]];
          REJECT
          };

      parseLogPos _ WalnutDB.GetParseLogPos[];
      IF parseLogPos = currentLogLength THEN RETURN[numNew];

      charsSkipped _ WalnutLog.SetPosition[parseLogPos];
      IF charsSkipped = -1 THEN
        ERROR WalnutDefs.Error[$log, $BadLog,
          IO.PutFR[errorRope, IO.int[parseLogPos], IO.int[currentLogLength]]];

      at _ parseLogPos + charsSkipped;

      -- characters had to be skipped to reach an entry: store the corrected position
      IF at # parseLogPos THEN {
        WalnutDB.SetParseLogPos[at];
        WalnutRoot.CommitAndContinue[];
        };

      accepted _ NOT WalnutDB.GetAddingServerMsgs[];

      DO
        previousAt: INT _ at;
        [wle, at]_ WalnutLog.NextEntry[];
        IF at = -1 THEN {
          -- no entry read: scan forward from previousAt+1 and try once more
          charsSkipped_ WalnutLog.SetPosition[previousAt+1];
          IF charsSkipped = -1 THEN
            ERROR WalnutDefs.Error[$log, $BadLog,
              IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]];
          [wle, at]_ WalnutLog.NextEntry[]
          };

        IF wle = NIL THEN
          IF at = -1 THEN
            ERROR WalnutDefs.Error[$log, $BadLog,
              IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]]
          ELSE {  -- at#-1 AND wle=NIL => at end
            WalnutDB.SetParseLogPos[-1];
            WalnutDB.SetParseLogInProgress[FALSE];
            WalnutRoot.CommitAndContinue[];
            RETURN
            };

        IF (updatesSinceLastCommit_ updatesSinceLastCommit + 1) >= commitFrequency THEN {
          WalnutDB.SetParseLogPos[at];  -- at is beginning of current (unseen) entry
          WalnutRoot.CommitAndContinue[];
          updatesSinceLastCommit _ 0;
          -- remember the counts as of this commit, for roll-back on tryAgain
          savedNumNew _ numNew;
          savedLastNumNew _ lastNumNew;
          };

        BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
        TRUSTED {
          WITH le: wle SELECT FROM
            LogFileInfo => NULL;
            ExpungeMsgs =>
              WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
            WriteExpungeLog => {
              -- the parse is finished; hand over to the expunge as the in-progress operation
              WalnutDB.SetParseLogPos[-1];
              WalnutDB.SetParseLogInProgress[FALSE];
              WalnutDB.SetOpInProgressPos[at];  -- at is beginning of current entry
              WalnutRoot.CommitAndContinue[];
              [] _ WalnutOpsInternal.DoLogExpunge[le.internalFileID];
              EXIT;  -- WriteExpungeLog had to be the last entry
              };

            CreateMsgSet =>
              [] _ WalnutDB.CreateMsgSet[
                Rope.FromRefText[le.msgSet], dontCareDomainVersion];
            EmptyMsgSet =>
              [] _ WalnutDB.EmptyMsgSet[
                [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
                WalnutOpsInternal.CheckReport ];
            DestroyMsgSet =>
              [] _ WalnutDB.DestroyMsgSet[
                msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
                msDomainVersion: dontCareDomainVersion,
                report: WalnutOpsInternal.CheckReport ];

            CreateMsg => {
              me: WalnutKernelDefs.MsgLogEntry _ NARROW[wle];
              me.show _ accepted;
              IF ~WalnutDB.AddNewMsg[me] THEN numNew _ numNew + 1;
              };
            AddMsg =>
              []_ WalnutDB.AddMsg[
                msg: Rope.FromRefText[le.msg],
                from: [NIL, dontCareMsgSetVersion],
                to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
            RemoveMsg =>
              []_ WalnutDB.RemoveMsg[
                msg: Rope.FromRefText[le.msg],
                from: [Rope.FromRefText[le.from], dontCareMsgSetVersion],
                deletedVersion: dontCareMsgSetVersion ];
            MoveMsg =>
              []_ WalnutDB.MoveMsg[
                msg: Rope.FromRefText[le.msg],
                from: [ Rope.FromRefText[le.from], dontCareMsgSetVersion],
                to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
            HasBeenRead =>
              WalnutDB.SetHasBeenRead[msg: Rope.FromRefText[le.msg]];

            RecordNewMailInfo => {
              WalnutDB.SetNewMailInfo[
                le.logLen, le.when, Rope.FromRefText[le.server], le.num];
              WalnutOpsInternal.newMailSomewhere _ TRUE;
              };
            StartCopyNewMail => {
              WalnutDB.SetCopyMailLogPos[at];
              WalnutDB.SetAddingServerMsgs[TRUE];
              accepted _ FALSE;
              };
            EndCopyNewMailInfo => {
              WalnutDB.SetCopyMailLogPos[0];
              WalnutDB.SetNewMailInfo[0, BasicTime.nullGMT, NIL, 0];
              WalnutDB.SetAddingServerMsgs[FALSE];
              accepted _ TRUE;
              };
            AcceptNewMail => {
              WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
              WalnutOpsInternal.newMailSomewhere _ FALSE;
              };

            StartReadArchiveFile => WalnutDB.SetReadArchivePos[at];
            EndReadArchiveFile => WalnutDB.SetReadArchivePos[0];
            StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[at];
            EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[0];
          ENDCASE => ERROR;
          };
        EXITS continue => NULL;
        END;

        IF verbose THEN
          IF numNew # lastNumNew THEN
            IF numNew MOD 10 = 0 THEN
              IF numNew MOD 100 = 0 THEN
                WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numNew]]]
              ELSE WalnutOpsInternal.CheckReport["~"];
        lastNumNew_ numNew;
      ENDLOOP;

      EXITS
        tryAgain => {
          -- roll the counts back to what they were at the last periodic commit
          numNew _ savedNumNew;
          lastNumNew _ savedLastNumNew;
          };
      END;
    ENDLOOP;
    };

END.