WalnutOpsExpungeImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Willie-Sue, March 27, 1986 10:05:53 am PST
Implementation of Expunging
Last Edited by: Willie-sue, January 10, 1985 3:55:08 pm PST
Last Edited by: Donahue, December 11, 1984 9:01:10 pm PST
(Changed Expunge code substantially)
DIRECTORY
BasicTime USING [GMT, Now],
FS USING [BytesForPages, PagesForBytes],
IO,
RefText USING [Length],
Rope,
WalnutDB --using lots-- ,
WalnutDefs USING [Error],
WalnutKernelDefs USING [LogExpungePhase],
WalnutLog USING [ExpungeMsgs, LogLength, OpenLogStreams, ReturnCurrentLogStreams, WriteExpungeLog],
WalnutLogExpunge  --using lots-- ,
WalnutRoot USING [AbortTransaction, CommitAndContinue, GetStreamsForExpunge, StartTransaction, SwapLogs],
WalnutOps USING [DeletedMsgSetName],
WalnutOpsInternal USING [CarefullyApply, CheckInProgress, CheckReport, LongRunningApply, StatsReport, walnutSegment],
WalnutOpsMonitorImpl;
WalnutOpsExpungeImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl
IMPORTS
BasicTime, FS, IO, RefText, Rope,
walnutOpsMonitorImpl: WalnutOpsMonitorImpl,
WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge,
WalnutOps, WalnutOpsInternal, WalnutRoot
EXPORTS WalnutOps, WalnutOpsInternal
SHARES WalnutOpsMonitorImpl
= BEGIN OPEN WalnutOpsInternal;
Types
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Variables
expungeCommitFrequency: INT = 20;
commitFrequency: INT ← expungeCommitFrequency;
Procedures
Space conservation (removing unreferenced messages)
GetExpungeInfo: PUBLIC ENTRY PROC
  RETURNS[firstDestroyedMsgPos, bytesInDestroyedMsgs: INT] = {
  -- Returns the two expunge statistics that WalnutDB.GetExpungeInfo reports.
  -- The database read is not done directly: after CheckInProgress, it is handed to CarefullyApply as a nested procedure that fills in the named results.
  ENABLE UNWIND => NULL;

  FetchExpungeInfo: PROC = {
    [firstDestroyedMsgPos, bytesInDestroyedMsgs] ← WalnutDB.GetExpungeInfo[];
    };

  WalnutOpsInternal.CheckInProgress[];
  WalnutOpsInternal.CarefullyApply[FetchExpungeInfo, FALSE];
  };
ExpungeMsgs: PUBLIC ENTRY PROC[deletedVersion: INT]
  RETURNS[bytesInDestroyedMsgs: INT] = {
  -- Expunges the messages of the Deleted message set (at version deletedVersion) from the database and returns the byte count that WalnutDB.GetExpungeInfo reports afterwards.
  -- The work is a nested procedure run under LongRunningApply, which tells it whether the operation was already in progress.
  ENABLE UNWIND => NULL;

  ExpungeDeleted: INTERNAL PROC[inProgress: BOOL] = {
    IF NOT inProgress THEN {
      -- fresh start: verify the msg set version, write the log entry, and record where that entry is
      [] ← WalnutDB.VerifyMsgSet[[WalnutOps.DeletedMsgSetName, deletedVersion]];
      WalnutDB.SetOpInProgressPos[WalnutLog.ExpungeMsgs[].at];
      };
    -- done on a fresh start and on a resumed operation alike
    WalnutDB.ExpungeMsgs[deletedVersion];
    bytesInDestroyedMsgs ← WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs;
    };

  WalnutOpsInternal.CheckInProgress[];
  CheckReport["\nDeleting msgs"];
  WalnutOpsInternal.LongRunningApply[ExpungeDeleted];
  };
CopyToExpungeLog: PUBLIC ENTRY PROC = {
  -- Entry point for rewriting the log: records the intent in the log (unless the operation is already in progress) and then runs DoLogExpunge, all under LongRunningApply.
  ENABLE UNWIND => NULL;

  StartLogExpunge: INTERNAL PROC[inProgress: BOOL] = {
    IF NOT inProgress THEN {
      -- write log entry of intent and remember its position as the operation in progress
      intentPos: INT ← WalnutLog.WriteExpungeLog[].at;
      WalnutDB.SetOpInProgressPos[intentPos];
      };
    DoLogExpunge[0];
    };

  WalnutOpsInternal.CheckInProgress[];
  WalnutOpsInternal.LongRunningApply[StartLogExpunge];
  };
GetTimeOfLastExpunge: PUBLIC ENTRY PROC
  RETURNS[when: BasicTime.GMT] = {
  -- Returns the time that WalnutDB.GetTimeOfLastExpunge reports.
  -- As with the other read-only entries here, the database read is a nested procedure run through CarefullyApply after CheckInProgress.
  ENABLE UNWIND => NULL;

  FetchTime: PROC = {
    when ← WalnutDB.GetTimeOfLastExpunge[];
    };

  WalnutOpsInternal.CheckInProgress[];
  WalnutOpsInternal.CarefullyApply[FetchTime, FALSE];
  };
DoLogExpunge: PUBLIC INTERNAL PROC[expungeID: INT] = {
  -- The log is re-written, starting from the beginning. Called from within a LongRunningApply.
  -- Each pass of the loop reads the current expunge phase from WalnutDB and performs that phase. Every phase stores the next phase in the database and then commits, and the loop goes around again; only the swappingLogs phase returns.
  -- Phase order: idle, initializingExpungeLog, writingExpungeLog, swappingLogs, and back to idle.
  -- For now, the expungeID parameter is not used; it may be necessary later.
  DO
    expPhase: WalnutKernelDefs.LogExpungePhase ← WalnutDB.GetLogExpungePhase[];
    SELECT expPhase FROM

      idle => {
        WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"];
        WalnutDB.SetLogExpungePhase[initializingExpungeLog];
        WalnutRoot.CommitAndContinue[];
        commitFrequency ← expungeCommitFrequency;
        };

      initializingExpungeLog => {
        -- size the expunge log as the current log length less the bytes in destroyed msgs
        bytesInDestroyedMsgs: INT ← WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs;
        logLen: INT ← WalnutLog.LogLength[];
        pagesNeeded: INT ← FS.PagesForBytes[logLen - bytesInDestroyedMsgs];
        expungeFileID: INT;
        WalnutLog.ReturnCurrentLogStreams[];
        expungeFileID ← WalnutLogExpunge.StartExpunge[pagesNeeded];
        WalnutDB.SetExpungeFileID[expungeFileID];
        WalnutDB.SetLogExpungePhase[writingExpungeLog];
        WalnutRoot.CommitAndContinue[];
        };

      writingExpungeLog => {
        firstDestroyedMsgPos: INT ← WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;
        almostOldLen: INT ← WalnutDB.GetOpInProgressPos[];
        WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
        WalnutOpsInternal.CheckReport[IO.PutFR[
          "\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n",
          IO.int[firstDestroyedMsgPos], IO.int[almostOldLen-firstDestroyedMsgPos] ]];
        WriteExpungeLog[];  -- the local procedure of this module (it returns no results)
        WalnutLogExpunge.EndExpunge[];
        WalnutDB.SetLogExpungePhase[swappingLogs];
        WalnutRoot.CommitAndContinue[];
        };

      swappingLogs => {
        expungeFileID: INT ← WalnutDB.GetExpungeFileID[];
        date: BasicTime.GMT;
        WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
        [] ← WalnutRoot.GetStreamsForExpunge[];
        WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"];
        -- reset all expunge bookkeeping in the database before the swap
        WalnutDB.SetExpungeProgressInfo[0,0];
        WalnutDB.SetExpungeInfo[0, 0];
        WalnutDB.SetOpInProgressPos[-1];
        WalnutDB.SetLogExpungePhase[idle];  -- whew
        -- read the clock once so that the time of last expunge and the root file version carry the same timestamp
        date ← BasicTime.Now[];
        WalnutDB.SetTimeOfLastExpunge[date];
        WalnutDB.SetRootFileVersion[date];
        [] ← WalnutRoot.SwapLogs[expungeFileID, date];  -- the returned log length is not needed here
        WalnutRoot.CommitAndContinue[];
        WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"];
        WalnutLog.OpenLogStreams[];
        RETURN;
        };

      ENDCASE => ERROR;
    ENDLOOP;
  };
WriteExpungeLog: PROC = {
  -- Called from within catch phrases, but must do its own error catching, since the expunge log cannot be flushed after every write (that makes too much traffic on the alpine server).
  -- Reads the old log, starting at currentLogPos, and copies entries to the expungeLog, changing msg entryStart pointers as it goes, doing occasional commits of both the expungeLog and the database.
  -- A WalnutDefs.Error with code $TransactionAbort restarts the whole body from the progress last recorded in the database; any other error is passed on. If two successive retries find the same committed expungeLog position, $TooManyLogAborts is raised.
  numMsgs: INT ← 0;
  currentLogPos, expungeLogPos: INT;
  previousAt, at: INT;
  startRetryCount: INT = 2;
  retryCount: INT ← startRetryCount;
  bytesToCopyBeforeFlush: INT ← 200000;
  lastExpungeCommitLength: INT ← -1;
  DO
    BEGIN ENABLE WalnutDefs.Error =>
      IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
    bytesCopiedSinceLastCommit: INT ← 0;
    lastAcceptNewMailPos: INT;
    firstDestroyedMsgPos: INT ← WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;

    -- pick up where the last commit left off
    [currentLogPos, expungeLogPos] ← WalnutDB.GetExpungeProgressInfo[];
    IF lastExpungeCommitLength = expungeLogPos THEN {
      -- no progress was committed since the previous attempt
      IF (retryCount ← retryCount - 1) <= 0 THEN
        ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
          IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g",
            IO.int[expungeLogPos]]];
      }
    ELSE {
      lastExpungeCommitLength ← expungeLogPos;
      retryCount ← startRetryCount;
      };

    IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN {
      -- cannot resume at the recorded positions: start the expunge log over from position 0
      pagesNeeded: INT ← FS.PagesForBytes[WalnutLogExpunge.LogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs];
      WalnutOpsInternal.CheckReport[IO.PutFR[
        "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog",
        IO.int[expungeLogPos], IO.int[currentLogPos]], "\n restarting Log Expunge"];
      expungeLogPos ← currentLogPos ← 0;
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; -- commits
      [] ← WalnutLogExpunge.StartExpunge[pagesNeeded];   -- try again
      WalnutOpsInternal.StatsReport["Restarting expunge from beginning"];
      IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN
        ERROR WalnutDefs.Error[$log, $CantStartExpunge];
      lastExpungeCommitLength ← expungeLogPos;
      };

    IF currentLogPos < firstDestroyedMsgPos THEN {  -- copyingHead phase
      -- everything before the first destroyed msg is copied in bulk, committing after each chunk
      bytesLeftToCopyInHead: INT ← firstDestroyedMsgPos - currentLogPos;
      bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
      IF currentLogPos = 0 THEN {
        WalnutLogExpunge.SetIndex[0];
        [] ← WalnutLogExpunge.SkipEntry[];  -- need to skip the LogFileInfo entry
        bytesLeftToCopyInHead ←
          bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[];
        };
      WalnutOpsInternal.CheckReport[
        IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n",
          IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]];
      DO
        bytesToCopyThisTime: INT ← MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
        WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime];
        [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[];
        WalnutOpsInternal.CheckReport["#"];
        bytesLeftToCopyInHead ← bytesLeftToCopyInHead - bytesToCopyThisTime;
        IF bytesLeftToCopyInHead = 0 THEN EXIT;
        ENDLOOP;
      retryCount ← startRetryCount;  -- in case has abort during copy
      };

    -- lastAcceptNewMailPos can't be before firstDestroyedMsgPos
    lastAcceptNewMailPos ← WalnutDB.GetAcceptNewMailPos[];
    WalnutLogExpunge.SetIndex[at ← currentLogPos];  -- before doing PeekEntry
    WalnutOpsInternal.CheckReport[
      IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]];

    -- the tail is processed one entry at a time; each entry is either copied or skipped
    DO
      ident: ATOM;
      msgID: REF TEXT;
      bytesThisCopy: INT ← 0;
      previousAt ← at;
      [ident, msgID, at] ← WalnutLogExpunge.PeekEntry[];
      IF ident = NIL THEN
        IF at # -1 THEN EXIT  -- end of log
        ELSE {
          -- no entry could be read here: look for the next entry after previousAt
          charSkipped: INT ← WalnutLogExpunge.SetPosition[previousAt+1];
          IF charSkipped = -1 THEN
            ERROR WalnutDefs.Error[$log, $DuringExpunge,
              IO.PutFR["No entry found after %g", IO.int[previousAt]]];
          at ← previousAt + charSkipped;
          LOOP;
          };

      SELECT ident FROM
        $ExpungeMsgs => [] ← WalnutLogExpunge.SkipEntry[];
        $WriteExpungeLog => EXIT;  -- must be last entry on PREVIOUS LOG
        $LogFileInfo => [] ← WalnutLogExpunge.SkipEntry[];

        $RecordNewMailInfo, $StartCopyNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;

        $EndCopyNewMailInfo => {
          IF at < lastAcceptNewMailPos THEN {  -- maybe skip
            startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
            IF firstDestroyedMsgPos < startCopyPos THEN [] ← WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
            }
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
          };

        $AcceptNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE {
            -- the entry moves, so the database must point at its new position
            newLogPos: INT;
            [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
            WalnutDB.SetAcceptNewMailPos[newLogPos];
            };

        $StartCopyReadArchive => [] ← WalnutLogExpunge.SkipEntry[];

        $EndCopyReadArchiveInfo => {
          startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
          IF firstDestroyedMsgPos < startCopyPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
          };

        ENDCASE =>
          IF RefText.Length[msgID] = 0 OR
            WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN {
            -- the next entry either has nothing to do with a message or it is an operation on a message that still exists
            -- copy the entry to the new log
            newLogPos: INT ;
            [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
            IF ident = $CreateMsg THEN {
              -- it is the message body itself; record the change in position in the database
              IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos];
              -- progress report: a dot every 10 msgs, the running count every 100
              IF (numMsgs ← numMsgs + 1) MOD 10 = 0 THEN
                IF numMsgs MOD 100 = 0 THEN
                  WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]]
                ELSE WalnutOpsInternal.CheckReport["."];
              };
            }
          -- if you haven't consumed the entry by copying it, then just advance the log pointer
          ELSE [] ← WalnutLogExpunge.SkipEntry[];

      -- commit once enough bytes have been copied since the last commit
      IF (bytesCopiedSinceLastCommit ← bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN {
        expLogPos: INT;
        [currentLogPos, expLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos];
        WalnutRoot.CommitAndContinue[];
        lastExpungeCommitLength ← expLogPos;
        bytesCopiedSinceLastCommit ← 0;
        };
      ENDLOOP;

    -- Commit the last batch of updates
    IF bytesCopiedSinceLastCommit > 0 THEN {
      [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
      WalnutRoot.CommitAndContinue[];
      lastExpungeCommitLength ← expungeLogPos;
      };
    RETURN;

    EXITS
      retry => {
        -- a transaction abort: shut down the expunge streams, abort, start a new transaction, and go around the outer loop again
        schemaInvalid: BOOL ← TRUE;
        WalnutLogExpunge.Shutdown[];
        WalnutRoot.AbortTransaction[];
        schemaInvalid ← WalnutRoot.StartTransaction[];
        IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
        WalnutLog.OpenLogStreams[];  -- awkward
        WalnutLog.ReturnCurrentLogStreams[];
        };
    END;
    ENDLOOP;
  };
END.