DIRECTORY BasicTime USING [GMT, nullGMT, Now], FS USING [BytesForPages, PagesForBytes, StreamOpen], IO, RefText USING [Length], Rope, UserCredentials USING [Get], ViewerClasses USING [Viewer], ViewerIO USING [CreateViewerStreams, GetViewerFromStream], ViewerOps USING [FindViewer], WalnutDB --using lots-- , WalnutDefs USING [Error, Segment, VersionMismatch], WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog], WalnutLog --using lots-- , WalnutLogExpunge --using lots-- , WalnutOps, WalnutOpsInternal, WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, RegisterStatsProc, StartTransaction, SwapLogs]; WalnutOpsInternalImpl: CEDAR PROGRAM IMPORTS FS, IO, RefText, Rope, UserCredentials, ViewerIO, ViewerOps, WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutOpsInternal, WalnutRoot EXPORTS WalnutOpsInternal = BEGIN OPEN WalnutOps, WalnutOpsInternal; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; GMT: TYPE = BasicTime.GMT; statsStream, statsProgressStream: PUBLIC STREAM _ NIL; heavyhandedDebugging: BOOL _ FALSE; statsProgressTS: PUBLIC ViewerClasses.Viewer _ NIL; mailStream: PUBLIC STREAM; expungeCommitFrequency: INT = 20; commitFrequency: INT _ expungeCommitFrequency; parseCommitFrequency: INT = 60; dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion; dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion; StartStatsReporting: PUBLIC PROC = { statsFile: ROPE _ Rope.Cat["///Users/", UserCredentials.Get[].name, "/WalnutStats.log"]; IF statsStream # NIL THEN statsStream.Close[ ! IO.Error => CONTINUE]; statsStream _ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4]; IF heavyhandedDebugging THEN { name: ROPE = "Walnut Stats"; statsProgressTS _ ViewerOps.FindViewer[name]; statsProgressStream _ ViewerIO.CreateViewerStreams[name, statsProgressTS].out; IF statsProgressTS = NIL THEN statsProgressTS _ ViewerIO.GetViewerFromStream[statsProgressStream]; statsProgressTS.inhibitDestroy _ TRUE; }; WalnutRoot.RegisterStatsProc[StatsReport]; }; StatsReport: PUBLIC PROC[msg: ROPE] = { r: ROPE; IF statsStream = NIL THEN RETURN; statsStream.PutRope[r _ IO.PutFR["\n %g @ %g", IO.rope[msg], IO.time[]]]; IF statsProgressStream # NIL THEN statsProgressStream.PutRope[r]; }; CarefullyApply: PUBLIC PROC[proc: PROC[], didUpdate: BOOL] = { reTryCount: INT _ 2; schemaInvalid: BOOL _ TRUE; recentActivity _ TRUE; IF ~started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"]; DO BEGIN ENABLE WalnutDefs.Error => { IF code = $TransactionAbort THEN { StatsReport[" **** TransactionAbort"]; IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry; errorInProgress _ TRUE; StatsReport[" *** Too many TransactionAbort's during WalnutOps call"]; REJECT }; errorInProgress _ TRUE; StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]]; REJECT }; proc[]; IF didUpdate THEN WalnutRoot.CommitAndContinue[]; RETURN; EXITS retry => NULL; END; WalnutLog.ForgetLogStreams[]; WalnutRoot.AbortTransaction[]; IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN WalnutDB.InitSchema[walnutSegment]; WalnutLog.OpenLogStreams[]; ENDLOOP; }; LongRunningApply: PUBLIC PROC[proc: PROC[inProgress: BOOL]] = { reTryCount: INT _ 2; alreadyCalled: BOOL _ FALSE; schemaInvalid: BOOL _ TRUE; recentActivity _ TRUE; isShutdown _ FALSE; IF ~started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"]; DO BEGIN ENABLE WalnutDefs.Error => { IF code = $TransactionAbort THEN { StatsReport[" **** TransactionAbort"]; IF ( reTryCount _ reTryCount - 1) > 0 THEN GOTO retry; errorInProgress _ TRUE; StatsReport[" *** Too many TransactionAbort's during LongRunning op"]; REJECT }; errorInProgress _ TRUE; StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]]; REJECT }; WalnutLog.AcquireWriteLock[]; proc[alreadyCalled ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[]; REJECT} ]; ReleaseWriteLock[]; -- also does commit RETURN; EXITS retry => NULL; END; WalnutLog.ForgetLogStreams[]; WalnutRoot.AbortTransaction[]; IF (schemaInvalid _ WalnutRoot.StartTransaction[]) THEN WalnutDB.InitSchema[walnutSegment]; WalnutLog.OpenLogStreams[]; alreadyCalled _ WalnutDB.GetOpInProgressPos[] > 0; ENDLOOP; }; Restart: PUBLIC PROC = { -- need to re-open the transaction schemaInvalid: BOOL = WalnutRoot.StartTransaction[]; IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment]; WalnutLog.OpenLogStreams[]; isShutdown _ FALSE; }; ReleaseWriteLock: PROC = { WalnutRoot.CommitAndContinue[]; WalnutLog.ReleaseWriteLock[]; }; DoLogExpunge: PUBLIC PROC[expungeID: INT] = { DO expPhase: WalnutKernelDefs.LogExpungePhase_ WalnutDB.GetLogExpungePhase[]; SELECT expPhase FROM idle => { WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"]; WalnutDB.SetLogExpungePhase[initializingExpungeLog]; WalnutRoot.CommitAndContinue[]; commitFrequency_ expungeCommitFrequency; }; initializingExpungeLog => { bytesInDestroyedMsgs: INT_ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs; logLen: INT_ WalnutLog.LogLength[]; pagesNeeded: INT_ FS.PagesForBytes[logLen-bytesInDestroyedMsgs]; expungeFileID: INT; WalnutLog.ReturnCurrentLogStreams[]; expungeFileID _ WalnutLogExpunge.StartExpunge[pagesNeeded]; WalnutDB.SetExpungeFileID[expungeFileID]; WalnutDB.SetLogExpungePhase[writingExpungeLog]; WalnutRoot.CommitAndContinue[]; }; writingExpungeLog => { WalnutLog.ReturnCurrentLogStreams[]; -- in case of restart [] _ WriteExpungeLog[]; WalnutLogExpunge.EndExpunge[]; WalnutDB.SetLogExpungePhase[swappingLogs]; WalnutRoot.CommitAndContinue[]; }; swappingLogs => { expungeFileID: INT _ WalnutDB.GetExpungeFileID[]; date: BasicTime.GMT; logLen: INT; WalnutLog.ReturnCurrentLogStreams[]; -- in case of restart [] _ WalnutRoot.GetStreamsForExpunge[]; WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"]; WalnutDB.SetExpungeProgressInfo[0,0]; WalnutDB.SetExpungeInfo[0, 0]; WalnutDB.SetOpInProgressPos[-1]; WalnutDB.SetLogExpungePhase[idle]; -- whew WalnutDB.SetTimeOfLastExpunge[BasicTime.Now[]]; WalnutDB.SetRootFileVersion[date _ BasicTime.Now[]]; logLen _ WalnutRoot.SwapLogs[expungeFileID, date]; WalnutRoot.CommitAndContinue[]; WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"]; WalnutLog.OpenLogStreams[]; RETURN; }; ENDCASE => ERROR; ENDLOOP; }; WriteExpungeLog: PROC = { numMsgs: INT _ 0; currentLogPos, expungeLogPos: INT; previousAt, at: INT; startRetryCount: INT = 2; retryCount: INT _ startRetryCount; bytesToCopyBeforeFlush: INT _ 200000; lastExpungeCommitLength: INT _ -1; DO BEGIN ENABLE WalnutDefs.Error => IF code = $TransactionAbort THEN GOTO retry ELSE REJECT; bytesCopiedSinceLastCommit: INT _ 0; lastAcceptNewMailPos: INT; firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos; [currentLogPos, expungeLogPos] _ WalnutDB.GetExpungeProgressInfo[]; IF lastExpungeCommitLength = expungeLogPos THEN { IF (retryCount _ retryCount - 1) <= 0 THEN ERROR WalnutDefs.Error[$log, $TooManyLogAborts, IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g", IO.int[expungeLogPos]]]; } ELSE { lastExpungeCommitLength _ expungeLogPos; retryCount _ startRetryCount; }; IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN { pagesNeeded: INT _ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs]; WalnutOpsInternal.CheckReport[IO.PutFR[ "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog", IO.int[expungeLogPos], IO.int[currentLogPos]], "\n restarting Log Expunge"]; expungeLogPos _ currentLogPos _ 0; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; -- commits [] _ WalnutLogExpunge.StartExpunge[pagesNeeded]; -- try again WalnutOpsInternal.StatsReport["Restarting expunge from beginning"]; IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN WalnutDefs.Error[$log, $CantStartExpunge]; lastExpungeCommitLength _ expungeLogPos; }; IF currentLogPos < firstDestroyedMsgPos THEN { -- copyingHead phase bytesLeftToCopyInHead: INT _ firstDestroyedMsgPos - currentLogPos; bytesToCopyAtOneTime: INT = FS.BytesForPages[400]; IF currentLogPos = 0 THEN { WalnutLogExpunge.ExpSetIndex[0]; [] _ WalnutLogExpunge.SkipEntry[]; -- need to skip the LogFileInfo entry bytesLeftToCopyInHead _ bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[]; }; WalnutOpsInternal.CheckReport[ IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n", IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]]; DO bytesToCopyThisTime: INT _ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime]; WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime]; [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[]; WalnutOpsInternal.CheckReport["#"]; bytesLeftToCopyInHead _ bytesLeftToCopyInHead - bytesToCopyThisTime; IF bytesLeftToCopyInHead = 0 THEN EXIT; ENDLOOP; retryCount _ startRetryCount; -- in case has abort during copy }; lastAcceptNewMailPos _ WalnutDB.GetAcceptNewMailPos[]; WalnutLogExpunge.ExpSetIndex[at _ currentLogPos]; -- before doing PeekEntry WalnutOpsInternal.CheckReport[ IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]]; DO ident: ATOM; msgID: REF TEXT; bytesThisCopy: INT _ 0; previousAt _ at; [ident, msgID, at] _ WalnutLogExpunge.PeekEntry[]; IF ident = NIL THEN IF at # -1 THEN EXIT -- end of log ELSE { charSkipped: INT _ WalnutLogExpunge.ExpSetPosition[previousAt+1]; IF charSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $DuringExpunge, IO.PutFR["No entry found after %g", IO.int[previousAt]]]; at _ previousAt + charSkipped; LOOP; }; SELECT ident FROM $ExpungeMsgs => [] _ WalnutLogExpunge.SkipEntry[]; $WriteExpungeLog => EXIT; -- must be last entry on PREVIOUS LOG $LogFileInfo => [] _ WalnutLogExpunge.SkipEntry[]; $RecordNewMailInfo, $StartCopyNewMail => IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; $EndCopyNewMailInfo => { IF at < lastAcceptNewMailPos THEN { -- maybe skip startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[]; IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; } ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; }; $AcceptNewMail => IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE { newLogPos: INT; [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[]; WalnutDB.SetAcceptNewMailPos[newLogPos]; }; $StartCopyReadArchive => [] _ WalnutLogExpunge.SkipEntry[]; $EndCopyReadArchiveInfo => { startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[]; IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; }; ENDCASE => IF RefText.Length[msgID] = 0 OR WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN { newLogPos: INT ; [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[]; IF ident = $CreateMsg THEN { IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos]; IF (numMsgs _ numMsgs + 1) MOD 10 = 0 THEN IF numMsgs MOD 100 = 0 THEN WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]] ELSE WalnutOpsInternal.CheckReport["."]; }; } ELSE [] _ WalnutLogExpunge.SkipEntry[]; IF (bytesCopiedSinceLastCommit _ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN { expLogPos: INT; [currentLogPos, expLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos]; WalnutRoot.CommitAndContinue[]; lastExpungeCommitLength _ expLogPos; bytesCopiedSinceLastCommit _ 0; }; ENDLOOP; IF bytesCopiedSinceLastCommit > 0 THEN { [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[]; lastExpungeCommitLength _ expungeLogPos; }; RETURN; EXITS retry => { schemaInvalid: BOOL _ TRUE; WalnutLogExpunge.ExpShutdown[]; WalnutRoot.AbortTransaction[]; schemaInvalid _ WalnutRoot.StartTransaction[]; IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment]; WalnutLog.OpenLogStreams[]; -- awkward WalnutLog.ReturnCurrentLogStreams[]; }; END; ENDLOOP; }; CheckInProgress: PUBLIC PROC = { inProgressPos: INT; parseInProgress: BOOL; needsParse: BOOL _ FALSE; Cip: PROC = { IF (parseInProgress _ WalnutDB.GetParseLogInProgress[]) THEN { WalnutRoot.AcquireWriteLock[]; RETURN }; inProgressPos _ WalnutDB.GetOpInProgressPos[]; IF inProgressPos <= 0 THEN RETURN; WalnutRoot.AcquireWriteLock[]; }; Fip: PROC[inProgress: BOOL] = { -- FinishInProgress at: INT; wle: WalnutKernelDefs.LogEntry; WalnutLog.SetIndex[inProgressPos]; [wle, at] _ WalnutLog.NextEntry[]; IF at = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR["No entry at %g", IO.int[inProgressPos]]]; TRUSTED { WITH le: wle SELECT FROM ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport]; WriteExpungeLog => []_ WalnutOpsInternal.DoLogExpunge[le.internalFileID]; EmptyMsgSet => []_ WalnutDB.EmptyMsgSet[ [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion], WalnutOpsInternal.CheckReport ]; DestroyMsgSet => []_ WalnutDB.DestroyMsgSet[ msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ]; AcceptNewMail => { WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion]; WalnutOpsInternal.newMailSomewhere _ FALSE; }; StartCopyNewMail => { FinishCopyTempLog[newMail, at]; WalnutDB.SetCopyMailLogPos[0]; WalnutRoot.CommitAndContinue[]; WalnutLog.FinishTempLogCopy[newMail]; needsParse _ TRUE; }; StartReadArchiveFile => { WalnutLog.ResetLog[at]; WalnutDB.SetReadArchivePos[0]; }; StartCopyReadArchive => { FinishCopyTempLog[readArchive, at]; WalnutDB.SetCopyReadArchivePos[0]; WalnutRoot.CommitAndContinue[]; WalnutLog.FinishTempLogCopy[readArchive]; needsParse _ TRUE; }; ENDCASE => ERROR WalnutDefs.Error[$db, $ErroneousInProgress, IO.PutFR["Entry at %g", IO.int[inProgressPos]]]; }; -- end trusted WalnutDB.SetOpInProgressPos[-1]; WalnutRoot.CommitAndContinue[]; }; IF WalnutOpsInternal.errorInProgress THEN ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"]; IF WalnutOpsInternal.isShutdown THEN WalnutOpsInternal.Restart[]; WalnutOpsInternal.CarefullyApply[Cip, FALSE]; IF parseInProgress THEN { WalnutOpsInternal.StatsReport["\n Continuing parseInProgress"]; [] _ ParseLog[TRUE]; RETURN }; IF inProgressPos <= 0 THEN RETURN; WalnutOpsInternal.StatsReport["\n Continuing longRunningOperation"]; WalnutOpsInternal.LongRunningApply[Fip]; IF needsParse THEN { numNew: INT _ 0; WalnutOpsInternal.CheckReport["Adding messages to the database\n"]; numNew _ ParseLog[TRUE]; IF numNew = 0 THEN WalnutOpsInternal.CheckReport["No messages were new\n"] ELSE WalnutOpsInternal.CheckReport[ IO.PutFR[" %g new messages\n", IO.int[numNew] ]]; }; }; FinishCopyTempLog: PROC[which: WalnutKernelDefs.WhichTempLog, at: INT] = { logLen: INT _ WalnutLog.LogLength[]; startedCopyAt, startCopyPos, fromPos: INT; pagesAlreadyCopied: INT; IF WalnutLog.SetPosition[at] # 0 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR["no entry at %g", IO.int[at]]]; startCopyPos _ WalnutLog.NextEntry[].at; -- skip the copy entry startedCopyAt _ WalnutLog.NextAt[]; fromPos _ logLen - startedCopyAt; pagesAlreadyCopied _ FS.PagesForBytes[fromPos]; IF ~WalnutLog.PrepareToCopyTempLog[which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport] THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"]; WalnutOpsInternal.CheckReport[ IO.PutFR["\nContinuing copy of tempLog, starting at bytePos %g\n", IO.int[fromPos]]]; WalnutLog.CopyTempLog[which, startCopyPos, fromPos, WalnutOpsInternal.CheckReport]; WalnutOpsInternal.CheckReport["\n"]; WalnutDB.SetParseLogInProgress[TRUE]; WalnutDB.SetParseLogPos[WalnutDB.GetOpInProgressPos[]]; WalnutDB.SetOpInProgressPos[-1]; }; ParseLog: PUBLIC PROC[verbose: BOOL_ FALSE] RETURNS[numNew: INT] = { parseLogPos: INT; at: INT; -- position in log of entry just processed lastNumNew, savedNumNew, savedLastNumNew: INT_ 0; wle: WalnutLog.LogEntry; charsSkipped: INT; errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)"; retries: INT_ 2; accepted: BOOL; commitFrequency: INT _ parseCommitFrequency; updatesSinceLastCommit: INT _ 0; numNew _ 0; DO currentLogLength: INT_ WalnutLog.LogLength[]; -- in case at end of Expunge BEGIN ENABLE WalnutDefs.Error => IF code = $TransactionAbort THEN { WalnutOpsInternal.StatsReport[" *** TransactionAborts during parseLog"]; IF ( retries _ retries - 1) > 0 THEN GOTO tryAgain; WalnutOpsInternal.errorInProgress _ TRUE; WalnutOpsInternal.StatsReport[" *** Too many TransactionAborts during parseLog"]; REJECT } ELSE { WalnutOpsInternal.errorInProgress _ TRUE; WalnutOpsInternal.StatsReport[ IO.PutFR["InternalError: %g during parseLog", IO.atom[code]]]; REJECT }; parseLogPos _ WalnutDB.GetParseLogPos[]; IF parseLogPos = currentLogLength THEN RETURN[numNew]; charsSkipped _ WalnutLog.SetPosition[parseLogPos]; IF charsSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, IO.int[parseLogPos], IO.int[currentLogLength]]]; at _ parseLogPos + charsSkipped; IF at # parseLogPos THEN { WalnutDB.SetParseLogPos[at]; WalnutRoot.CommitAndContinue[]; }; accepted _ NOT WalnutDB.GetAddingServerMsgs[]; DO previousAt: INT _ at; [wle, at]_ WalnutLog.NextEntry[]; IF at = -1 THEN { charsSkipped_ WalnutLog.SetPosition[previousAt+1]; IF charsSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]]; [wle, at]_ WalnutLog.NextEntry[] }; IF wle = NIL THEN IF at = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]] ELSE { -- at#-1 AND wle=NIL => at end WalnutDB.SetParseLogPos[-1]; WalnutDB.SetParseLogInProgress[FALSE]; WalnutRoot.CommitAndContinue[]; RETURN }; IF (updatesSinceLastCommit_ updatesSinceLastCommit + 1) >= commitFrequency THEN { WalnutDB.SetParseLogPos[at]; -- at is beginning of current (unseen) entry WalnutRoot.CommitAndContinue[]; updatesSinceLastCommit _ 0; savedNumNew _ numNew; savedLastNumNew _ lastNumNew; }; BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue; TRUSTED { WITH le: wle SELECT FROM LogFileInfo => NULL; ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport]; WriteExpungeLog => { WalnutDB.SetParseLogPos[-1]; WalnutDB.SetParseLogInProgress[FALSE]; WalnutDB.SetOpInProgressPos[at]; -- at is beginning of current entry WalnutRoot.CommitAndContinue[]; [] _ WalnutOpsInternal.DoLogExpunge[le.internalFileID]; EXIT; -- WriteExpungeLog had to be the last entry }; CreateMsgSet => [] _ WalnutDB.CreateMsgSet[ Rope.FromRefText[le.msgSet], dontCareDomainVersion]; EmptyMsgSet => [] _ WalnutDB.EmptyMsgSet[ [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion], WalnutOpsInternal.CheckReport ]; DestroyMsgSet => [] _ WalnutDB.DestroyMsgSet[ msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ]; CreateMsg => { me: WalnutKernelDefs.MsgLogEntry _ NARROW[wle]; me.show _ accepted; IF ~WalnutDB.AddNewMsg[me] THEN numNew _ numNew + 1; }; AddMsg => []_ WalnutDB.AddMsg[ msg: Rope.FromRefText[le.msg], from: [NIL, dontCareMsgSetVersion], to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ]; RemoveMsg => []_ WalnutDB.RemoveMsg[ msg: Rope.FromRefText[le.msg], from: [Rope.FromRefText[le.from], dontCareMsgSetVersion], deletedVersion: dontCareMsgSetVersion ]; MoveMsg => []_ WalnutDB.MoveMsg[ msg: Rope.FromRefText[le.msg], from: [ Rope.FromRefText[le.from], dontCareMsgSetVersion], to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ]; HasBeenRead => WalnutDB.SetHasBeenRead[msg: Rope.FromRefText[le.msg]]; RecordNewMailInfo => { WalnutDB.SetNewMailInfo[ le.logLen, le.when, Rope.FromRefText[le.server], le.num]; WalnutOpsInternal.newMailSomewhere _ TRUE; }; StartCopyNewMail => { WalnutDB.SetCopyMailLogPos[at]; WalnutDB.SetAddingServerMsgs[TRUE]; accepted _ FALSE; }; EndCopyNewMailInfo => { WalnutDB.SetCopyMailLogPos[0]; WalnutDB.SetNewMailInfo[0, BasicTime.nullGMT, NIL, 0]; WalnutDB.SetAddingServerMsgs[FALSE]; accepted _ TRUE; }; AcceptNewMail => { WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion]; WalnutOpsInternal.newMailSomewhere _ FALSE; }; StartReadArchiveFile => WalnutDB.SetReadArchivePos[at]; EndReadArchiveFile => WalnutDB.SetReadArchivePos[0]; StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[at]; EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[0]; ENDCASE => ERROR; }; EXITS continue => NULL; END; IF verbose THEN IF numNew # lastNumNew THEN IF numNew MOD 10 = 0 THEN IF numNew MOD 100 = 0 THEN WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numNew]]] ELSE WalnutOpsInternal.CheckReport["~"]; lastNumNew_ numNew; ENDLOOP; EXITS tryAgain => { numNew _ savedNumNew; lastNumNew _ savedLastNumNew; }; END; ENDLOOP; }; END. ΔWalnutOpsInternalImpl.mesa Copyright c 1984 by Xerox Corporation. All rights reserved. Willie-Sue, September 17, 1986 11:20:07 am PDT Donahue, July 16, 1985 2:25:35 pm PDT Implementation of procedures of WalnutOpsInternal Last Edited by: Willie-sue, January 10, 1985 3:55:08 pm PST Last Edited by: Donahue, December 11, 1984 9:01:10 pm PST Types Signals Error: PUBLIC SIGNAL [who, code: ATOM, explanation: ROPE _ NIL]; Variables Procedures These procedures are the primitive atomic actions out of which Walnut is built. More complex operations will be found in WalnutClientOps (someday). -- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Utilities * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Taken from WalnutOpsExpungeImpl The log is re-written, starting from the beginning. Called from within a LongRunningApply; calling SetLogExpungePhase does a commit For now, expungeID parameter is not used - may be necessary later firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos; almostOldLen: INT _ WalnutDB.GetOpInProgressPos[]; WalnutOpsInternal.CheckReport[IO.PutFR[ "\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n", IO.int[firstDestroyedMsgPos], IO.int[almostOldLen-firstDestroyedMsgPos] ]]; called from within catch phrases, but must do its own error catching, since the expunge Log cannot be flushed after every write - that makes too much traffic on the alpine server; reads the old log, starting at currentLogPos, copies entries to the expungeLog, changing msg entryStart pointers as it goes, doing ocasional commits of both the expungeLog and the database lastAcceptNewMailPos can't be before firstDestroyedMsgPos the next entry either has nothing to do with a message or it is an operation on a message that still exists copy the entry to the new log it is the message body itself; record the change in position in the database if you haven't consumed the entry by copying it, then just advance the log pointer Commit the last batch of updates Taken from WalnutOpsParseLogImpl Is some long running operation in progress Parsing the Log - assumes the log is write-locked make sure things are in good shape before starting ΚE– "cedar" style˜šΟb™Jšœ Οmœ1™Jšœ Ÿœ˜JšœŸœŸœ˜JšœŸœ˜J˜šŸœ Ÿ˜JšŸœC˜H—J˜JšŸ˜šŸœŸ˜ šœ˜šŸœŸœ˜"Jšœ'˜'JšŸœ$ŸœŸœ˜6JšœŸœ˜JšœF˜FJšŸ˜J˜—JšœŸœ˜Jšœ Ÿœ$Ÿœ˜BJšŸ˜Jšœ˜——˜Jšœ˜J˜JšŸœ Ÿœ ˜1JšŸœ˜J˜šŸ˜Jšœ Ÿœ˜—JšŸœ˜J˜J˜J˜J˜šŸœ1Ÿ˜7Jšœ%˜%—Jšœ˜—šŸœ˜J˜—J˜——˜š ‘œŸœŸœŸœ Ÿœ˜?Jšœ Ÿœ˜JšœŸœŸœ˜JšœŸœŸœ˜JšœŸœ˜Jšœ Ÿœ˜J˜šŸœ Ÿ˜JšŸœC˜H—J˜JšŸ˜šŸœŸ˜ šœ˜šŸœŸœ˜"Jšœ'˜'JšŸœ$ŸœŸœ˜6JšœŸœ˜JšœF˜FJšŸ˜J˜—JšœŸœ˜Jšœ Ÿœ$Ÿœ˜BJšŸ˜Jšœ˜——˜J˜JšœIŸœ˜SJ˜Jšœ ˜(JšŸœ˜šŸ˜Jšœ Ÿœ˜—JšŸœ˜J˜J˜Jšœ˜J˜šŸœ1Ÿ˜7Jšœ#˜#—Jšœ˜Jšœ2˜2—JšŸœ˜J˜——˜š‘œŸ œ "˜˜>J˜Jšœ#˜#JšœD˜DJšŸœŸœŸœ˜'JšŸœ˜—Jšœ  ˜?J˜—J˜Jšœ9™9Jšœ6˜6Jšœ3 ˜Lšœ˜JšŸœEŸœ ˜U—šŸ˜JšœŸœ˜ JšœŸœŸœ˜JšœŸœ˜Jšœ˜J˜Jšœ3˜3šŸœ ŸœŸ˜JšŸœ ŸœŸœ  ˜#šŸœ˜Jšœ Ÿœ1˜AšŸœŸ˜šŸœ'˜,JšŸœ"Ÿœ˜9——Jšœ˜JšŸœ˜J˜——J˜šŸœŸ˜J˜Jšœ2˜2J˜JšœŸœ %˜@J˜Jšœ2˜2J˜šœ(˜(šŸœŸœ"˜CJšŸœ:˜>——J˜šœ˜šŸœŸœ  ˜2JšœŸœ#˜4šŸœ%Ÿœ!˜MJšŸœ:˜>—J˜JšŸœ:˜>—J˜—J˜˜šŸœŸœ"˜CšŸœ˜Jšœ Ÿœ˜Jšœ:˜:Jšœ(˜(J˜———J˜Jšœ;˜;J˜šœ˜JšœŸœ#˜4šŸœ%Ÿœ!˜MJšŸœ;˜?—J˜—J˜šŸœ˜ šŸœŸ˜Jšœ,Ÿœ˜2Jšœk™kJšœ™Jšœ Ÿœ˜Jšœ:˜:šŸœŸœ˜JšœL™LJšŸœŸœ-˜CšŸœŸœŸ˜*šŸœ Ÿœ Ÿ˜šœŸœŸœ˜@JšŸœ$˜(———Jšœ˜—Jšœ˜—JšœR™RJšŸœ#˜'——J˜šŸœeŸœ˜mJšœ Ÿœ˜JšœC˜CJšœ:˜:J˜Jšœ$˜$Jšœ˜J˜—J˜JšŸœ˜—J˜šœ ™ šŸœ Ÿœ˜(JšœG˜GJšœ>˜>J˜Jšœ(˜(Jšœ˜—JšŸœ˜—˜šŸ˜šœ ˜ JšœŸœŸœ˜J˜J˜Jšœ.˜.JšŸœŸœ$˜9Jšœ  ˜'Jšœ$˜$Jšœ˜J˜——JšŸœ˜—JšŸœ˜J˜J˜—J˜—J™ ™*š‘œŸ œ˜ JšœŸœ˜JšœŸœ˜Jšœ ŸœŸœ˜š‘œŸœ˜ šŸœ6Ÿœ˜>JšœŸœ˜(—J˜.JšŸœŸœŸœ˜"J˜J˜—š‘œŸœ Ÿœ ˜3JšœŸœ˜J˜J˜Jšœ"˜"J˜"šŸœ Ÿ˜šŸœ ˜%JšŸœŸœ˜3——šŸœŸœ ŸœŸ˜"J˜šœ˜JšœK˜K—J˜šœ˜Jšœ6˜6—J˜šœ(˜(Jšœ5˜5Jšœ ˜ —J˜šœ˜šœ˜Jšœ=˜=Jšœ'˜'Jšœ%˜%Jšœ˜——J˜˜Jšœ2˜2Jšœ%Ÿœ˜+J˜—J˜˜J˜J˜J˜Jšœ%˜%Jšœ Ÿœ˜J˜—J˜šœ˜Jšœ˜Jšœ˜J˜—J˜šœ˜Jšœ#˜#J˜"Jšœ˜Jšœ)˜)Jšœ Ÿœ˜J˜—J˜šŸœ˜ šŸœ,˜1JšŸœŸœ˜0——Jšœ ˜—J˜ Jšœ˜J˜—šŸœ#Ÿ˜)JšŸœE˜J—JšŸœŸœ˜AJšœ&Ÿœ˜-šŸœŸœ˜Jšœ?˜?JšœŸœ˜JšŸ˜Jšœ˜—JšŸœŸœŸœ˜"JšœD˜DJšœ(˜(šŸœ Ÿœ˜JšœŸœ˜JšœC˜CJšœŸœ˜šŸœ Ÿ˜Jšœ7˜7šŸœ˜#JšŸœŸœ˜1——J˜—J˜—J˜š‘œŸœ+Ÿœ˜KJšœŸœ˜$Jšœ&Ÿœ˜*J˜šŸœŸ˜%JšŸœ!ŸœŸœ ˜N—Jšœ* ˜@Jšœ#˜#Jšœ!˜!Jšœ/˜/JšŸœ˜ƒJšŸ œK˜Ušœ˜JšŸœAŸœ˜U—JšœS˜SJšœ$˜$JšœŸœ˜%Jšœ7˜7J˜ J˜——J˜L™1š‘œŸœŸœ ŸœŸœŸœ Ÿœ˜DJ˜Jšœ Ÿœ˜JšœŸœ *˜4Jšœ*Ÿœ˜1J˜J˜JšœŸœ˜Jšœ Ÿœ<˜KJšœ Ÿœ˜Jšœ Ÿœ˜JšœŸœ˜,JšœŸœ˜ J˜Jšœ ˜ J˜šŸ˜JšœŸœ ˜JšŸœŸœ˜ šŸœŸœ˜"JšœH˜HJšŸœŸœŸœ ˜3Jšœ$Ÿœ˜)JšœQ˜QJšŸ˜J˜—šŸœ˜Jšœ$Ÿœ˜)šœ˜JšŸœ,Ÿœ˜>—JšŸ˜J˜J˜——Jšœ(˜(J˜JšŸœ ŸœŸœ ˜6Jšœ2˜2šŸœŸ˜šŸœ ˜%JšŸœŸœŸœ˜D——J˜Jšœ ˜ J™Jšœ2™2šŸœŸœ˜Jšœ˜J˜J˜—J˜Jšœ Ÿœ ˜.J˜šŸ˜Jšœ Ÿœ˜J˜!šŸœ Ÿœ˜Jšœ2˜2šŸœŸ˜šŸœ ˜%JšŸœŸœŸœ˜C——J˜ J˜—šŸœŸœ˜ šŸœŸœ Ÿ˜šŸœ ˜%JšŸœŸœŸœ˜B——šŸœ ˜(Jšœ˜JšœŸœ˜&Jšœ˜JšŸ˜J˜——J˜šŸœIŸœ˜QJšœ ,˜JJšœ˜Jšœ˜Jšœ˜Jšœ˜J˜—J˜JšŸ œŸœ Ÿ˜9J˜šŸœŸœ ŸœŸ˜"J˜JšœŸœ˜J˜šœ˜JšœK˜K—J˜šœ˜Jšœ˜JšœŸœ˜&Jšœ" #˜EJ˜Jšœ7˜7JšŸœ +˜3J˜—J˜šœ˜šœ˜Jšœ4˜4——J˜šœ˜šœ˜Jšœ5˜5Jšœ˜Jšœ˜——J˜šœ˜šœ˜Jšœ=˜=Jšœ'˜'Jšœ%˜%Jšœ˜——J˜šœ˜Jšœ#Ÿœ˜/J˜JšŸœŸœ˜4J˜—J˜šœ ˜ šœ˜Jšœ˜JšœŸœ˜#Jšœ7˜7——J˜šœ ˜ šœ˜Jšœ˜Jšœ9˜9Jšœ(˜(——J˜˜ ˜Jšœ˜Jšœ:˜:Jšœ7˜7——J˜JšœF˜FJ˜šœ˜šœ˜Jšœ9˜9—Jšœ%Ÿœ˜*Jšœ˜—J˜šœ˜Jšœ˜JšœŸœ˜#Jšœ Ÿœ˜J˜—J˜šœ˜Jšœ˜Jšœ.Ÿœ˜6JšœŸœ˜$Jšœ Ÿœ˜J˜—J˜˜Jšœ2˜2Jšœ%Ÿœ˜+J˜—J˜Jšœ7˜7J˜Jšœ4˜4J˜šœ˜Jšœ#˜#—J˜Jšœ<˜