WriteExpungeLog:
PROC = {
  -- Called from within catch phrases, but must do its own error catching, since the
  -- expunge log cannot be flushed after every write - that makes too much traffic on
  -- the alpine server.
  -- Reads the old log, starting at currentLogPos, and copies entries to the expungeLog,
  -- changing msg entryStart pointers as it goes, doing occasional commits of both the
  -- expungeLog and the database.
  --
  -- Structure: the outer DO loop is a retry loop. Each pass re-reads the committed
  -- progress from WalnutDB and resumes from there. A WalnutDefs.Error whose code is
  -- $TransactionAbort transfers to the retry exit, which shuts down the expunge log,
  -- restarts the transaction and loops; any other WalnutDefs.Error is REJECTed and so
  -- propagates to the caller. The procedure RETURNs only after the tail of the log has
  -- been processed and the last batch committed.
  --
  -- Raises WalnutDefs.Error[$log, $TooManyLogAborts, ...] when successive passes start
  -- from the same expungeLogPos (no progress) until retryCount is exhausted,
  -- WalnutDefs.Error[$log, $CantStartExpunge] when the expunge cannot be restarted even
  -- from position 0, and WalnutDefs.Error[$log, $DuringExpunge, ...] when no further
  -- entry can be located after an unreadable one.

  numMsgs: INT ← 0;  -- count of $CreateMsg entries copied; drives the progress reports
  currentLogPos, expungeLogPos: INT;
  previousAt, at: INT;
  startRetryCount: INT = 2;
  retryCount: INT ← startRetryCount;
  bytesToCopyBeforeFlush: INT ← 200000;  -- tail phase commits once this many bytes have been copied
  lastExpungeCommitLength: INT ← -1;  -- expungeLogPos seen at the last commit/pass; -1 = none yet

  DO
    BEGIN
    ENABLE WalnutDefs.Error =>
      IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;

    bytesCopiedSinceLastCommit: INT ← 0;
    lastAcceptNewMailPos: INT;
    firstDestroyedMsgPos: INT ← WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;

    [currentLogPos, expungeLogPos] ← WalnutDB.GetExpungeProgressInfo[];

    -- Progress check: if this pass starts at the same expungeLog position as the
    -- previous one, nothing was committed in between, so use up one retry; otherwise
    -- remember the new position and restore the full retry allowance.
    IF lastExpungeCommitLength = expungeLogPos
    THEN {
      IF (retryCount ← retryCount - 1) <= 0
      THEN
        ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
          IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g",
            IO.int[expungeLogPos]]];
      }
    ELSE {
      lastExpungeCommitLength ← expungeLogPos;
      retryCount ← startRetryCount;
      };

    -- Try to resume at the recorded positions; if that fails, start the expunge over
    -- from the beginning of both logs.
    IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos]
    THEN {
      pagesNeeded: INT ← FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs];
      WalnutOpsInternal.CheckReport[
        IO.PutFR[
          "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog",
          IO.int[expungeLogPos], IO.int[currentLogPos]], "\n restarting Log Expunge"];
      expungeLogPos ← currentLogPos ← 0;
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];  -- commits
      [] ← WalnutLogExpunge.StartExpunge[pagesNeeded];  -- try again
      WalnutOpsInternal.StatsReport["Restarting expunge from beginning"];
      IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos]
      THEN
        -- explicit ERROR, as at the other raise sites: control must not continue from here
        ERROR WalnutDefs.Error[$log, $CantStartExpunge];
      lastExpungeCommitLength ← expungeLogPos;
      };

    IF currentLogPos < firstDestroyedMsgPos
    THEN {
      -- copyingHead phase: everything before firstDestroyedMsgPos is copied as raw
      -- bytes, in chunks, with the progress recorded and committed after every chunk
      bytesLeftToCopyInHead: INT ← firstDestroyedMsgPos - currentLogPos;
      bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
      IF currentLogPos = 0
      THEN {
        WalnutLogExpunge.ExpSetIndex[0];
        [] ← WalnutLogExpunge.SkipEntry[];  -- need to skip the LogFileInfo entry
        -- the skipped entry is not copied, so take its length off the amount left
        bytesLeftToCopyInHead ←
          bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[];
        };
      WalnutOpsInternal.CheckReport[
        IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n",
          IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]];
      DO
        bytesToCopyThisTime: INT ← MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
        WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime];
        [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[];
        WalnutOpsInternal.CheckReport["#"];
        bytesLeftToCopyInHead ← bytesLeftToCopyInHead - bytesToCopyThisTime;
        IF bytesLeftToCopyInHead = 0 THEN EXIT;
      ENDLOOP;
      retryCount ← startRetryCount;  -- in case has abort during copy
      };

    -- Tail phase: examine the rest of the log one entry at a time.
    -- lastAcceptNewMailPos can't be before firstDestroyedMsgPos
    lastAcceptNewMailPos ← WalnutDB.GetAcceptNewMailPos[];
    WalnutLogExpunge.ExpSetIndex[at ← currentLogPos];  -- before doing PeekEntry
    WalnutOpsInternal.CheckReport[
      IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]];

    DO
      ident: ATOM;
      msgID: REF TEXT;
      bytesThisCopy: INT ← 0;  -- stays 0 when the entry is skipped rather than copied

      previousAt ← at;
      [ident, msgID, at] ← WalnutLogExpunge.PeekEntry[];

      IF ident = NIL
      THEN
        IF at # -1 THEN EXIT  -- end of log
        ELSE {
          -- at = -1: no entry could be read here; search forward from the byte after
          -- the previous entry start for the next one
          charSkipped: INT ← WalnutLogExpunge.ExpSetPosition[previousAt+1];
          IF charSkipped = -1
          THEN
            ERROR WalnutDefs.Error[$log, $DuringExpunge,
              IO.PutFR["No entry found after %g", IO.int[previousAt]]];
          at ← previousAt + charSkipped;
          LOOP;
          };

      SELECT ident
      FROM
        $ExpungeMsgs => [] ← WalnutLogExpunge.SkipEntry[];
        $WriteExpungeLog => EXIT;  -- must be last entry on PREVIOUS LOG
        $LogFileInfo => [] ← WalnutLogExpunge.SkipEntry[];

        -- new-mail bookkeeping entries positioned before lastAcceptNewMailPos are
        -- dropped; later ones are carried over
        $RecordNewMailInfo, $StartCopyNewMail =>
          IF at < lastAcceptNewMailPos
          THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;

        $EndCopyNewMailInfo => {
          IF at < lastAcceptNewMailPos
          THEN {
            -- maybe skip: decided by comparing firstDestroyedMsgPos with the position
            -- returned by EndCopyEntry
            startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
            IF firstDestroyedMsgPos < startCopyPos
            THEN [] ← WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
            }
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
          };

        $AcceptNewMail =>
          IF at < lastAcceptNewMailPos
          THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE {
            -- the entry has moved, so record its position in the new log
            newLogPos: INT;
            [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
            WalnutDB.SetAcceptNewMailPos[newLogPos];
            };

        $StartCopyReadArchive => [] ← WalnutLogExpunge.SkipEntry[];

        $EndCopyReadArchiveInfo => {
          startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
          IF firstDestroyedMsgPos < startCopyPos
          THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
          };

        ENDCASE =>
          IF RefText.Length[msgID] = 0
            OR
            WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN {
            -- the next entry either has nothing to do with a message or it is an
            -- operation on a message that still exists
            -- copy the entry to the new log
            newLogPos: INT ;
            [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
            IF ident = $CreateMsg
            THEN {
              -- it is the message body itself; record the change in position in the database
              IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos];
              -- progress report: "." for every 10th message, "(n)" for every 100th
              IF (numMsgs ← numMsgs + 1)
                MOD 10 = 0
              THEN
                IF numMsgs
                  MOD 100 = 0
                THEN
                  WalnutOpsInternal.CheckReport[
                    IO.PutFR["(%g)",
                      IO.int[numMsgs]]]
                ELSE WalnutOpsInternal.CheckReport["."];
              };
            }
          -- if you haven't consumed the entry by copying it, then just advance the log pointer
          ELSE [] ← WalnutLogExpunge.SkipEntry[];

      -- record and commit the progress once enough bytes have been copied since the last commit
      IF (bytesCopiedSinceLastCommit ← bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush
      THEN {
        expLogPos: INT;
        [currentLogPos, expLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos];
        WalnutRoot.CommitAndContinue[];
        lastExpungeCommitLength ← expLogPos;
        bytesCopiedSinceLastCommit ← 0;
        };
    ENDLOOP;

    -- Commit the last batch of updates
    IF bytesCopiedSinceLastCommit > 0
    THEN {
      [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
      WalnutRoot.CommitAndContinue[];
      lastExpungeCommitLength ← expungeLogPos;
      };
    RETURN;

    EXITS
      retry => {
        -- reached on $TransactionAbort: shut down the expunge log, abort and restart
        -- the transaction, re-initialize the schema if StartTransaction reports it
        -- invalid, then fall through to ENDLOOP for another pass
        schemaInvalid: BOOL ← TRUE;
        WalnutLogExpunge.ExpShutdown[];
        WalnutRoot.AbortTransaction[];
        schemaInvalid ← WalnutRoot.StartTransaction[];
        IF schemaInvalid THEN WalnutDB.InitSchema[walnutSegment];
        WalnutLog.OpenLogStreams[];  -- awkward
        WalnutLog.ReturnCurrentLogStreams[];
        };
    END;
  ENDLOOP;
  };