<> <> <> <<>> <> <> <> <<(Changed Expunge code substantially)>> <<>> DIRECTORY BasicTime USING [GMT, Now], FS USING [BytesForPages, PagesForBytes], IO, RefText USING [Length], Rope, WalnutDB --using lots-- , WalnutDefs USING [Error], WalnutKernelDefs USING [LogExpungePhase], WalnutLog USING [ExpungeMsgs, LogLength, OpenLogStreams, ReturnCurrentLogStreams, WriteExpungeLog], WalnutLogExpunge --using lots-- , WalnutRoot USING [AbortTransaction, CommitAndContinue, GetStreamsForExpunge, StartTransaction, SwapLogs], WalnutOps USING [DeletedMsgSetName], WalnutOpsInternal USING [CarefullyApply, CheckInProgress, CheckReport, LongRunningApply, StatsReport, systemIsReadOnly, walnutSegment], WalnutOpsMonitorImpl; WalnutOpsExpungeImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl IMPORTS BasicTime, FS, IO, RefText, Rope, walnutOpsMonitorImpl: WalnutOpsMonitorImpl, WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutOps, WalnutOpsInternal, WalnutRoot EXPORTS WalnutOps, WalnutOpsInternal SHARES WalnutOpsMonitorImpl = BEGIN OPEN WalnutOpsInternal; <> ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; <> expungeCommitFrequency: INT = 20; commitFrequency: INT _ expungeCommitFrequency; <> <> GetExpungeInfo: PUBLIC ENTRY PROC RETURNS[ firstDestroyedMsgPos, bytesInDestroyedMsgs: INT] = { ENABLE UNWIND => NULL; Gei: PROC = { [ firstDestroyedMsgPos, bytesInDestroyedMsgs] _ WalnutDB.GetExpungeInfo[] }; WalnutOpsInternal.CheckInProgress[]; WalnutOpsInternal.CarefullyApply[Gei, FALSE] }; ExpungeMsgs: PUBLIC ENTRY PROC[deletedVersion: INT] RETURNS[bytesInDestroyedMsgs: INT] = { ENABLE UNWIND => NULL; Expm: INTERNAL PROC[inProgress: BOOL] = { IF ~inProgress THEN { at: INT; [] _ WalnutDB.VerifyMsgSet[[WalnutOps.DeletedMsgSetName, deletedVersion]]; at _ WalnutLog.ExpungeMsgs[].at; WalnutDB.SetOpInProgressPos[at]; }; WalnutDB.ExpungeMsgs[deletedVersion]; bytesInDestroyedMsgs _ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs }; WalnutOpsInternal.CheckInProgress[]; CheckReport["\nDeleting msgs"]; WalnutOpsInternal.LongRunningApply[Expm]; }; CopyToExpungeLog: PUBLIC ENTRY PROC = { ENABLE UNWIND => NULL; ExpL: INTERNAL PROC[inProgress: BOOL] = { IF ~inProgress THEN { at: INT _ WalnutLog.WriteExpungeLog[].at; -- write log entry of intent WalnutDB.SetOpInProgressPos[at]; }; DoLogExpunge[0] }; WalnutOpsInternal.CheckInProgress[]; WalnutOpsInternal.LongRunningApply[ExpL]; }; DoLogExpunge: PUBLIC INTERNAL PROC[expungeID: INT] = { <> <> DO expPhase: WalnutKernelDefs.LogExpungePhase_ WalnutDB.GetLogExpungePhase[]; SELECT expPhase FROM idle => { WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"]; WalnutDB.SetLogExpungePhase[initializingExpungeLog]; WalnutRoot.CommitAndContinue[]; commitFrequency_ expungeCommitFrequency; }; initializingExpungeLog => { bytesInDestroyedMsgs: INT_ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs; logLen: INT_ WalnutLog.LogLength[]; pagesNeeded: INT_ FS.PagesForBytes[logLen-bytesInDestroyedMsgs]; expungeFileID: INT; WalnutLog.ReturnCurrentLogStreams[]; expungeFileID _ WalnutLogExpunge.StartExpunge[pagesNeeded]; WalnutDB.SetExpungeFileID[expungeFileID]; WalnutDB.SetLogExpungePhase[writingExpungeLog]; WalnutRoot.CommitAndContinue[]; }; writingExpungeLog => { <> <> WalnutLog.ReturnCurrentLogStreams[]; -- in case of restart <> <<"\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n",>> <> [] _ WriteExpungeLog[]; WalnutLogExpunge.EndExpunge[]; WalnutDB.SetLogExpungePhase[swappingLogs]; WalnutRoot.CommitAndContinue[]; }; swappingLogs => { expungeFileID: INT _ WalnutDB.GetExpungeFileID[]; date: BasicTime.GMT; logLen: INT; WalnutLog.ReturnCurrentLogStreams[]; -- in case of restart [] _ WalnutRoot.GetStreamsForExpunge[]; WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"]; WalnutDB.SetExpungeProgressInfo[0,0]; WalnutDB.SetExpungeInfo[0, 0]; WalnutDB.SetOpInProgressPos[-1]; WalnutDB.SetLogExpungePhase[idle]; -- whew WalnutDB.SetRootFileVersion[date _ BasicTime.Now[]]; logLen _ WalnutRoot.SwapLogs[expungeFileID, date]; WalnutRoot.CommitAndContinue[]; WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"]; WalnutLog.OpenLogStreams[]; RETURN; }; ENDCASE => ERROR; ENDLOOP; }; <<>> WriteExpungeLog: PROC = { <> numMsgs: INT _ 0; currentLogPos, expungeLogPos: INT; previousAt, at: INT; startRetryCount: INT = 2; retryCount: INT _ startRetryCount; bytesToCopyBeforeFlush: INT _ 200000; lastExpungeCommitLength: INT _ -1; DO BEGIN ENABLE WalnutDefs.Error => IF code = $TransactionAbort THEN GOTO retry ELSE REJECT; bytesCopiedSinceLastCommit: INT _ 0; lastAcceptNewMailPos: INT; firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos; [currentLogPos, expungeLogPos] _ WalnutDB.GetExpungeProgressInfo[]; IF lastExpungeCommitLength = expungeLogPos THEN { IF (retryCount _ retryCount - 1) <= 0 THEN ERROR WalnutDefs.Error[$log, $TooManyLogAborts, IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g", IO.int[expungeLogPos]]]; } ELSE { lastExpungeCommitLength _ expungeLogPos; retryCount _ startRetryCount; }; IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN { pagesNeeded: INT _ FS.PagesForBytes[WalnutLogExpunge.LogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs]; WalnutOpsInternal.CheckReport[IO.PutFR[ "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog", IO.int[expungeLogPos], IO.int[currentLogPos]], "\n restarting Log Expunge"]; expungeLogPos _ currentLogPos _ 0; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; -- commits [] _ WalnutLogExpunge.StartExpunge[pagesNeeded]; -- try again WalnutOpsInternal.StatsReport["Restarting expunge from beginning"]; IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN WalnutDefs.Error[$log, $CantStartExpunge]; lastExpungeCommitLength _ expungeLogPos; }; IF currentLogPos < firstDestroyedMsgPos THEN { -- copyingHead phase bytesLeftToCopyInHead: INT _ firstDestroyedMsgPos - currentLogPos; bytesToCopyAtOneTime: INT = FS.BytesForPages[400]; IF currentLogPos = 0 THEN { WalnutLogExpunge.SetIndex[0]; [] _ WalnutLogExpunge.SkipEntry[]; -- need to skip the LogFileInfo entry bytesLeftToCopyInHead _ bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[]; }; WalnutOpsInternal.CheckReport[ IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n", IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]]; DO bytesToCopyThisTime: INT _ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime]; WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime]; [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[]; WalnutOpsInternal.CheckReport["#"]; bytesLeftToCopyInHead _ bytesLeftToCopyInHead - bytesToCopyThisTime; IF bytesLeftToCopyInHead = 0 THEN EXIT; ENDLOOP; retryCount _ startRetryCount; -- in case has abort during copy }; <> lastAcceptNewMailPos _ WalnutDB.GetAcceptNewMailPos[]; WalnutLogExpunge.SetIndex[at _ currentLogPos]; -- before doing PeekEntry WalnutOpsInternal.CheckReport[ IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]]; DO ident: ATOM; msgID: REF TEXT; bytesThisCopy: INT _ 0; previousAt _ at; [ident, msgID, at] _ WalnutLogExpunge.PeekEntry[]; IF ident = NIL THEN IF at # -1 THEN EXIT -- end of log ELSE { charSkipped: INT _ WalnutLogExpunge.SetPosition[previousAt+1]; IF charSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $DuringExpunge, IO.PutFR["No entry found after %g", IO.int[previousAt]]]; at _ previousAt + charSkipped; LOOP; }; SELECT ident FROM $ExpungeMsgs => [] _ WalnutLogExpunge.SkipEntry[]; $WriteExpungeLog => EXIT; -- must be last entry on PREVIOUS LOG $LogFileInfo => [] _ WalnutLogExpunge.SkipEntry[]; $RecordNewMailInfo, $StartCopyNewMail => IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; $EndCopyNewMailInfo => { IF at < lastAcceptNewMailPos THEN { -- maybe skip startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[]; IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; } ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; }; $AcceptNewMail => IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE { newLogPos: INT; [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[]; WalnutDB.SetAcceptNewMailPos[newLogPos]; }; $StartCopyReadArchive => [] _ WalnutLogExpunge.SkipEntry[]; $EndCopyReadArchiveInfo => { startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[]; IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[] ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied; }; ENDCASE => IF RefText.Length[msgID] = 0 OR WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN { <> <> newLogPos: INT ; [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[]; IF ident = $CreateMsg THEN { <> IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos]; IF (numMsgs _ numMsgs + 1) MOD 10 = 0 THEN IF numMsgs MOD 100 = 0 THEN WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]] ELSE WalnutOpsInternal.CheckReport["."]; }; } <> ELSE [] _ WalnutLogExpunge.SkipEntry[]; IF (bytesCopiedSinceLastCommit _ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN { expLogPos: INT; [currentLogPos, expLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos]; WalnutRoot.CommitAndContinue[]; lastExpungeCommitLength _ expLogPos; bytesCopiedSinceLastCommit _ 0; }; ENDLOOP; <> IF bytesCopiedSinceLastCommit > 0 THEN { [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[]; WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[]; lastExpungeCommitLength _ expungeLogPos; }; RETURN; EXITS retry => { WalnutLogExpunge.Shutdown[]; WalnutRoot.AbortTransaction[]; WalnutRoot.StartTransaction[]; WalnutDB.CheckSchema[walnutSegment, systemIsReadOnly]; WalnutLog.OpenLogStreams[]; -- awkward WalnutLog.ReturnCurrentLogStreams[]; }; END; ENDLOOP; }; END.