-- WalnutOpsExpungeImpl.mesa
-- Copyright (c) 1985 by Xerox Corporation.  All rights reserved.
-- Willie-Sue, March 27, 1986 10:05:53 am PST
--
-- Implementation of Expunging
--
-- Last Edited by: Willie-sue, January 10, 1985 3:55:08 pm PST
-- Last Edited by: Donahue, December 11, 1984 9:01:10 pm PST
-- (Changed Expunge code substantially)

DIRECTORY
  BasicTime USING [GMT, Now],
  FS USING [BytesForPages, PagesForBytes],
  IO,
  RefText USING [Length],
  Rope,
  WalnutDB --using lots-- ,
  WalnutDefs USING [Error],
  WalnutKernelDefs USING [LogExpungePhase],
  WalnutLog USING [ExpungeMsgs, LogLength, OpenLogStreams, ReturnCurrentLogStreams, WriteExpungeLog],
  WalnutLogExpunge --using lots-- ,
  WalnutRoot USING [AbortTransaction, CommitAndContinue, GetStreamsForExpunge, StartTransaction, SwapLogs],
  WalnutOps USING [DeletedMsgSetName],
  WalnutOpsInternal USING [CarefullyApply, CheckInProgress, CheckReport, LongRunningApply, StatsReport, walnutSegment],
  WalnutOpsMonitorImpl;

WalnutOpsExpungeImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl
  IMPORTS
    BasicTime, FS, IO, RefText, Rope,
    walnutOpsMonitorImpl: WalnutOpsMonitorImpl,
    WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutOps, WalnutOpsInternal, WalnutRoot
  EXPORTS WalnutOps, WalnutOpsInternal
  SHARES WalnutOpsMonitorImpl
  = BEGIN OPEN WalnutOpsInternal;

  -- Types

  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;

  -- Variables

  expungeCommitFrequency: INT = 20;
  commitFrequency: INT _ expungeCommitFrequency;

  -- Procedures

  -- Space conservation (removing unreferenced messages)

  -- Returns the values reported by WalnutDB.GetExpungeInfo, fetched under CarefullyApply
  -- after CheckInProgress has been run.
  GetExpungeInfo: PUBLIC ENTRY PROC
      RETURNS[firstDestroyedMsgPos, bytesInDestroyedMsgs: INT] = {
    ENABLE UNWIND => NULL;
    Gei: PROC = {
      [firstDestroyedMsgPos, bytesInDestroyedMsgs] _ WalnutDB.GetExpungeInfo[] };
    WalnutOpsInternal.CheckInProgress[];
    WalnutOpsInternal.CarefullyApply[Gei, FALSE]
    };

  -- Expunges the messages of the Deleted message set at version deletedVersion and returns
  -- the bytesInDestroyedMsgs value subsequently reported by WalnutDB.GetExpungeInfo.
  -- Runs as a LongRunningApply; the log entry of intent is written only when the operation
  -- is not already in progress.
  ExpungeMsgs: PUBLIC ENTRY PROC[deletedVersion: INT]
      RETURNS[bytesInDestroyedMsgs: INT] = {
    ENABLE UNWIND => NULL;
    Expm: INTERNAL PROC[inProgress: BOOL] = {
      IF ~inProgress THEN {
        at: INT;
        [] _ WalnutDB.VerifyMsgSet[[WalnutOps.DeletedMsgSetName, deletedVersion]];
        at _ WalnutLog.ExpungeMsgs[].at;
        WalnutDB.SetOpInProgressPos[at];
        };
      WalnutDB.ExpungeMsgs[deletedVersion];
      bytesInDestroyedMsgs _ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs
      };
    WalnutOpsInternal.CheckInProgress[];
    CheckReport["\nDeleting msgs"];
    WalnutOpsInternal.LongRunningApply[Expm];
    };

  -- Writes the log entry of intent (unless the operation is already in progress) and then
  -- drives the log expunge state machine via DoLogExpunge.
  CopyToExpungeLog: PUBLIC ENTRY PROC = {
    ENABLE UNWIND => NULL;
    ExpL: INTERNAL PROC[inProgress: BOOL] = {
      IF ~inProgress THEN {
        at: INT _ WalnutLog.WriteExpungeLog[].at;  -- write log entry of intent
        WalnutDB.SetOpInProgressPos[at];
        };
      DoLogExpunge[0]
      };
    WalnutOpsInternal.CheckInProgress[];
    WalnutOpsInternal.LongRunningApply[ExpL];
    };

  -- Returns the value reported by WalnutDB.GetTimeOfLastExpunge.
  GetTimeOfLastExpunge: PUBLIC ENTRY PROC RETURNS[when: BasicTime.GMT] = {
    ENABLE UNWIND => NULL;
    Gdle: PROC = {
      when _ WalnutDB.GetTimeOfLastExpunge[] };
    WalnutOpsInternal.CheckInProgress[];
    WalnutOpsInternal.CarefullyApply[Gdle, FALSE]
    };

  DoLogExpunge: PUBLIC INTERNAL PROC[expungeID: INT] = {
    -- The log is re-written, starting from the beginning.  Called from within a
    -- LongRunningApply; calling SetLogExpungePhase does a commit
    -- For now, expungeID parameter is not used - may be necessary later
    --
    -- Each pass of the loop reads the expunge phase recorded in the database and performs
    -- that phase; the loop is left only from the swappingLogs phase.
    DO
      expPhase: WalnutKernelDefs.LogExpungePhase _ WalnutDB.GetLogExpungePhase[];
      SELECT expPhase FROM
        idle => {
          WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"];
          WalnutDB.SetLogExpungePhase[initializingExpungeLog];
          WalnutRoot.CommitAndContinue[];
          commitFrequency _ expungeCommitFrequency;
          };

        initializingExpungeLog => {
          bytesInDestroyedMsgs: INT _ WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs;
          logLen: INT _ WalnutLog.LogLength[];
          -- size request for the new log: the current log less the destroyed messages
          pagesNeeded: INT _ FS.PagesForBytes[logLen-bytesInDestroyedMsgs];
          expungeFileID: INT;
          WalnutLog.ReturnCurrentLogStreams[];
          expungeFileID _ WalnutLogExpunge.StartExpunge[pagesNeeded];
          WalnutDB.SetExpungeFileID[expungeFileID];
          WalnutDB.SetLogExpungePhase[writingExpungeLog];
          WalnutRoot.CommitAndContinue[];
          };

        writingExpungeLog => {
          -- firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;
          -- almostOldLen: INT _ WalnutDB.GetOpInProgressPos[];
          WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
          -- WalnutOpsInternal.CheckReport[IO.PutFR[
          --   "\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n",
          --   IO.int[firstDestroyedMsgPos], IO.int[almostOldLen-firstDestroyedMsgPos] ]];
          WriteExpungeLog[];  -- declares no results, so there is nothing to discard
          WalnutLogExpunge.EndExpunge[];
          WalnutDB.SetLogExpungePhase[swappingLogs];
          WalnutRoot.CommitAndContinue[];
          };

        swappingLogs => {
          expungeFileID: INT _ WalnutDB.GetExpungeFileID[];
          date: BasicTime.GMT;
          logLen: INT;  -- receives the result of SwapLogs; not otherwise used here
          WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
          [] _ WalnutRoot.GetStreamsForExpunge[];
          WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"];
          -- reset all of the expunge bookkeeping before the swap is committed
          WalnutDB.SetExpungeProgressInfo[0,0];
          WalnutDB.SetExpungeInfo[0, 0];
          WalnutDB.SetOpInProgressPos[-1];
          WalnutDB.SetLogExpungePhase[idle];  -- whew
          WalnutDB.SetTimeOfLastExpunge[BasicTime.Now[]];
          WalnutDB.SetRootFileVersion[date _ BasicTime.Now[]];
          logLen _ WalnutRoot.SwapLogs[expungeFileID, date];
          WalnutRoot.CommitAndContinue[];
          WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"];
          WalnutLog.OpenLogStreams[];
          RETURN;
          };
        ENDCASE => ERROR;
      ENDLOOP;
    };

  WriteExpungeLog: PROC = {
    -- called from within catch phrases, but must do its own error catching, since the
    -- expunge Log cannot be flushed after every write - that makes too much traffic on the
    -- alpine server; reads the old log, starting at currentLogPos, copies entries to the
    -- expungeLog, changing msg entryStart pointers as it goes, doing occasional commits of
    -- both the expungeLog and the database
    numMsgs: INT _ 0;
    currentLogPos, expungeLogPos: INT;
    previousAt, at: INT;
    startRetryCount: INT = 2;
    retryCount: INT _ startRetryCount;
    bytesToCopyBeforeFlush: INT _ 200000;
    lastExpungeCommitLength: INT _ -1;

    -- Each pass of this loop is one attempt; a $TransactionAbort sends control to the
    -- retry exit, which restarts the transaction and loops for another attempt.
    DO
      BEGIN ENABLE WalnutDefs.Error =>
        IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;

      bytesCopiedSinceLastCommit: INT _ 0;
      lastAcceptNewMailPos: INT;
      firstDestroyedMsgPos: INT _ WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;

      [currentLogPos, expungeLogPos] _ WalnutDB.GetExpungeProgressInfo[];

      -- Give up if successive attempts start from the same committed expungeLog position.
      IF lastExpungeCommitLength = expungeLogPos THEN {
        IF (retryCount _ retryCount - 1) <= 0 THEN
          ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
            IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g",
              IO.int[expungeLogPos]]];
        }
      ELSE {
        lastExpungeCommitLength _ expungeLogPos;
        retryCount _ startRetryCount;
        };

      IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN {
        pagesNeeded: INT _ FS.PagesForBytes[
          WalnutLogExpunge.LogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs];

        WalnutOpsInternal.CheckReport[IO.PutFR[
          "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog",
            IO.int[expungeLogPos], IO.int[currentLogPos]],
          "\n restarting Log Expunge"];
        expungeLogPos _ currentLogPos _ 0;
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];  -- commits
        [] _ WalnutLogExpunge.StartExpunge[pagesNeeded];  -- try again
        WalnutOpsInternal.StatsReport["Restarting expunge from beginning"];
        -- raised with ERROR, like the other uses of WalnutDefs.Error in this module
        IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN
          ERROR WalnutDefs.Error[$log, $CantStartExpunge];
        lastExpungeCommitLength _ expungeLogPos;
        };

      IF currentLogPos < firstDestroyedMsgPos THEN {  -- copyingHead phase
        bytesLeftToCopyInHead: INT _ firstDestroyedMsgPos - currentLogPos;
        bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
        IF currentLogPos = 0 THEN {
          WalnutLogExpunge.SetIndex[0];
          [] _ WalnutLogExpunge.SkipEntry[];  -- need to skip the LogFileInfo entry
          bytesLeftToCopyInHead _
            bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[];
          };
        WalnutOpsInternal.CheckReport[
          IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n",
            IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]];
        -- copy the head in chunks, committing the progress after each chunk
        DO
          bytesToCopyThisTime: INT _ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
          WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime];
          [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
          WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
          WalnutRoot.CommitAndContinue[];
          WalnutOpsInternal.CheckReport["#"];
          bytesLeftToCopyInHead _ bytesLeftToCopyInHead - bytesToCopyThisTime;
          IF bytesLeftToCopyInHead = 0 THEN EXIT;
          ENDLOOP;
        retryCount _ startRetryCount;  -- in case has abort during copy
        };

      -- lastAcceptNewMailPos can't be before firstDestroyedMsgPos
      lastAcceptNewMailPos _ WalnutDB.GetAcceptNewMailPos[];
      WalnutLogExpunge.SetIndex[at _ currentLogPos];  -- before doing PeekEntry
      WalnutOpsInternal.CheckReport[
        IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]];

      -- Walk the tail of the log one entry at a time, copying or skipping each entry.
      DO
        ident: ATOM;
        msgID: REF TEXT;
        bytesThisCopy: INT _ 0;
        previousAt _ at;

        [ident, msgID, at] _ WalnutLogExpunge.PeekEntry[];
        IF ident = NIL THEN
          IF at # -1 THEN EXIT  -- end of log
          ELSE {
            -- no entry was recognized here: look for one starting beyond previousAt
            charSkipped: INT _ WalnutLogExpunge.SetPosition[previousAt+1];
            IF charSkipped = -1 THEN
              ERROR WalnutDefs.Error[$log, $DuringExpunge,
                IO.PutFR["No entry found after %g", IO.int[previousAt]]];
            at _ previousAt + charSkipped;
            LOOP;
            };

        SELECT ident FROM

          $ExpungeMsgs => [] _ WalnutLogExpunge.SkipEntry[];

          $WriteExpungeLog => EXIT;  -- must be last entry on PREVIOUS LOG

          $LogFileInfo => [] _ WalnutLogExpunge.SkipEntry[];

          $RecordNewMailInfo, $StartCopyNewMail =>
            IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;

          $EndCopyNewMailInfo => {
            IF at < lastAcceptNewMailPos THEN {  -- maybe skip
              startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[];
              IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[]
              ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
              }
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
            };

          $AcceptNewMail =>
            IF at < lastAcceptNewMailPos THEN [] _ WalnutLogExpunge.SkipEntry[]
            ELSE {
              newLogPos: INT;
              [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[];
              WalnutDB.SetAcceptNewMailPos[newLogPos];
              };

          $StartCopyReadArchive => [] _ WalnutLogExpunge.SkipEntry[];

          $EndCopyReadArchiveInfo => {
            startCopyPos: INT _ WalnutLogExpunge.EndCopyEntry[];
            IF firstDestroyedMsgPos < startCopyPos THEN [] _ WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy _ WalnutLogExpunge.CopyEntry[].bytesCopied;
            };

          ENDCASE =>
            IF RefText.Length[msgID] = 0 OR
               WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN {
              -- the next entry either has nothing to do with a message or it is an
              -- operation on a message that still exists
              -- copy the entry to the new log
              newLogPos: INT ;
              [newLogPos, bytesThisCopy] _ WalnutLogExpunge.CopyEntry[];
              IF ident = $CreateMsg THEN {
                -- it is the message body itself; record the change in position in the
                -- database
                IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos];
                -- progress report: "." every 10 messages, the count every 100
                IF (numMsgs _ numMsgs + 1) MOD 10 = 0 THEN
                  IF numMsgs MOD 100 = 0 THEN
                    WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]]
                  ELSE WalnutOpsInternal.CheckReport["."];
                };
              }
            -- if you haven't consumed the entry by copying it, then just advance the log
            -- pointer
            ELSE [] _ WalnutLogExpunge.SkipEntry[];

        -- commit the progress once enough bytes have been copied since the last commit
        IF (bytesCopiedSinceLastCommit _
              bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN {
          expLogPos: INT;
          [currentLogPos, expLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
          WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos];
          WalnutRoot.CommitAndContinue[];
          lastExpungeCommitLength _ expLogPos;
          bytesCopiedSinceLastCommit _ 0;
          };

        ENDLOOP;

      -- Commit the last batch of updates
      IF bytesCopiedSinceLastCommit > 0 THEN {
        [currentLogPos, expungeLogPos] _ WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[];
        lastExpungeCommitLength _ expungeLogPos;
        };
      RETURN;

      EXITS
        retry => {
          -- the transaction aborted: shut down the expunge streams, start a new
          -- transaction, and go around the outer loop for another attempt
          schemaInvalid: BOOL _ TRUE;
          WalnutLogExpunge.Shutdown[];
          WalnutRoot.AbortTransaction[];
          schemaInvalid _ WalnutRoot.StartTransaction[];
          IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
          WalnutLog.OpenLogStreams[];  -- awkward
          WalnutLog.ReturnCurrentLogStreams[];
          };
      END;
      ENDLOOP;
    };

  END.