-- WalnutOpsInternalImpl.mesa
-- Copyright © 1984 by Xerox Corporation. All rights reserved.
-- Willie-Sue, September 17, 1986 11:20:07 am PDT
-- Donahue, July 16, 1985 2:25:35 pm PDT
-- Implementation of procedures of WalnutOpsInternal
-- Last Edited by: Willie-sue, January 10, 1985 3:55:08 pm PST
-- Last Edited by: Donahue, December 11, 1984 9:01:10 pm PST
DIRECTORY
BasicTime USING [GMT, nullGMT, Now],
FS USING [BytesForPages, PagesForBytes, StreamOpen],
IO,
RefText USING [Length],
Rope,
UserCredentials USING [Get],
ViewerClasses USING [Viewer],
ViewerIO USING [CreateViewerStreams, GetViewerFromStream],
ViewerOps USING [FindViewer],
WalnutDB --using lots-- ,
WalnutDefs USING [Error, Segment, VersionMismatch],
WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog],
WalnutLog  --using lots-- ,
WalnutLogExpunge  --using lots-- ,
WalnutOps,
WalnutOpsInternal,
WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue,
 GetStreamsForExpunge, RegisterStatsProc, StartTransaction, SwapLogs];
WalnutOpsInternalImpl: CEDAR PROGRAM
IMPORTS
FS, IO, RefText, Rope, UserCredentials, ViewerIO, ViewerOps,
WalnutDB, WalnutDefs,
WalnutLog, WalnutLogExpunge,
WalnutOpsInternal, WalnutRoot
EXPORTS WalnutOpsInternal
= BEGIN OPEN WalnutOps, WalnutOpsInternal;
-- Types
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
GMT: TYPE = BasicTime.GMT;

-- Signals
-- explanation is optional and defaults to NIL.
Error: PUBLIC SIGNAL [who, code: ATOM, explanation: ROPE ← NIL];

-- Variables
-- statsStream is the stats log file opened by StartStatsReporting; statsProgressStream and
-- statsProgressTS are only set up there when heavyhandedDebugging is TRUE.
statsStream, statsProgressStream: PUBLIC STREAM ← NIL;
heavyhandedDebugging: BOOL ← FALSE;
statsProgressTS: PUBLIC ViewerClasses.Viewer ← NIL;
mailStream: PUBLIC STREAM;
-- commitFrequency is reset to expungeCommitFrequency when a log expunge starts (see DoLogExpunge);
-- ParseLog uses its own local commitFrequency initialised from parseCommitFrequency.
expungeCommitFrequency: INT = 20;
commitFrequency: INT ← expungeCommitFrequency;
parseCommitFrequency: INT = 60;
-- Local shorthands for the "don't care" version numbers defined in WalnutOps.
dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;
-- Procedures
-- These procedures are the primitive atomic actions out of which Walnut is built. More complex operations will be found in WalnutClientOps (someday).
-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
StartStatsReporting: PUBLIC PROC = {
  -- Starts a new WalnutStats.log in the current user's directory (closing any stats stream
  -- that is already open), optionally attaches a "Walnut Stats" viewer stream when
  -- heavyhandedDebugging is on, and registers StatsReport with WalnutRoot.
  userName: ROPE = UserCredentials.Get[].name;
  logName: ROPE = Rope.Cat["///Users/", userName, "/WalnutStats.log"];

  -- errors from closing the previous stream are deliberately ignored
  IF statsStream # NIL THEN statsStream.Close[ ! IO.Error => CONTINUE];
  statsStream ← FS.StreamOpen[fileName: logName, accessOptions: $create, keep: 4];

  IF heavyhandedDebugging THEN {
    viewerName: ROPE = "Walnut Stats";
    existing: ViewerClasses.Viewer = ViewerOps.FindViewer[viewerName];
    statsProgressTS ← existing;
    statsProgressStream ← ViewerIO.CreateViewerStreams[viewerName, existing].out;
    -- no viewer of that name existed, so pick up the one behind the new stream
    IF existing = NIL THEN
      statsProgressTS ← ViewerIO.GetViewerFromStream[statsProgressStream];
    statsProgressTS.inhibitDestroy ← TRUE;
  };

  WalnutRoot.RegisterStatsProc[StatsReport];
};
StatsReport: PUBLIC PROC[msg: ROPE] = {
  -- Appends msg, followed by the current time, to the stats log; a no-op when stats
  -- reporting has not been started. The same line is echoed to statsProgressStream if present.
  IF statsStream # NIL THEN {
    line: ROPE = IO.PutFR["\n %g @ %g", IO.rope[msg], IO.time[]];
    statsStream.PutRope[line];
    IF statsProgressStream # NIL THEN statsProgressStream.PutRope[line];
  };
};
-- Utilities
CarefullyApply: PUBLIC PROC[proc: PROC[], didUpdate: BOOL] = {
  -- Calls proc and, when didUpdate is TRUE, commits afterwards.
  -- Raises WalnutDefs.Error[$root, $NotStarted] if Walnut has not been started.
  -- If proc (or the commit) raises WalnutDefs.Error with code $TransactionAbort, the
  -- transaction is aborted and restarted and proc is tried again; proc is attempted at most
  -- twice. When the retries are used up, or for any other WalnutDefs.Error, errorInProgress
  -- is set, the problem is written to the stats log, and the error is passed on to the caller.
  reTryCount: INT ← 2;
  recentActivity ← TRUE;
  IF ~started THEN
    ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  DO
    BEGIN ENABLE
      WalnutDefs.Error => {
        IF code = $TransactionAbort THEN {
          StatsReport[" **** TransactionAbort"];
          IF ( reTryCount ← reTryCount - 1) > 0 THEN GOTO retry;
          errorInProgress ← TRUE;
          StatsReport[" *** Too many TransactionAbort's during WalnutOps call"];
          REJECT
        };
        errorInProgress ← TRUE;
        StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
        REJECT
      };
    proc[];
    IF didUpdate THEN WalnutRoot.CommitAndContinue[];
    RETURN;
    EXITS
      retry => NULL;
    END;
    -- Only reached through the retry exit: drop the log streams, abort, and open a new
    -- transaction (re-initialising the schema when StartTransaction says it is invalid).
    -- These calls are outside the ENABLE above, so errors from them are not caught here.
    WalnutLog.ForgetLogStreams[];
    WalnutRoot.AbortTransaction[];
    IF WalnutRoot.StartTransaction[] THEN
      WalnutDB.InitSchema[walnutSegment];
    WalnutLog.OpenLogStreams[];
  ENDLOOP;
};
LongRunningApply: PUBLIC PROC[proc: PROC[inProgress: BOOL]] = {
  -- Runs proc with the log write lock held; ReleaseWriteLock commits and releases the lock
  -- when proc returns. proc is passed FALSE on the first attempt; after a $TransactionAbort
  -- the transaction is restarted and proc is passed TRUE iff the database records an
  -- operation in progress (GetOpInProgressPos > 0). proc is attempted at most twice.
  -- Raises WalnutDefs.Error[$root, $NotStarted] if Walnut has not been started.
  -- A WalnutDefs.VersionMismatch from proc releases the write lock and is passed on.
  -- Any WalnutDefs.Error that is not retried sets errorInProgress, is logged, and is passed on.
  reTryCount: INT ← 2;
  alreadyCalled: BOOL ← FALSE;
  recentActivity ← TRUE;
  isShutdown ← FALSE;
  IF ~started THEN
    ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  DO
    BEGIN ENABLE
      WalnutDefs.Error => {
        IF code = $TransactionAbort THEN {
          StatsReport[" **** TransactionAbort"];
          IF ( reTryCount ← reTryCount - 1) > 0 THEN GOTO retry;
          errorInProgress ← TRUE;
          StatsReport[" *** Too many TransactionAbort's during LongRunning op"];
          REJECT
        };
        errorInProgress ← TRUE;
        StatsReport[IO.PutFR[" *** WalnutDefs.Error: %g", IO.atom[code]]];
        REJECT
      };
    WalnutLog.AcquireWriteLock[];
    proc[alreadyCalled ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[]; REJECT} ];
    ReleaseWriteLock[];  -- also does commit
    RETURN;
    EXITS
      retry => NULL;
    END;
    -- Only reached through the retry exit: abort and reopen the transaction, then find out
    -- whether the interrupted operation had already been recorded as in progress.
    WalnutLog.ForgetLogStreams[];
    WalnutRoot.AbortTransaction[];
    IF WalnutRoot.StartTransaction[] THEN
      WalnutDB.InitSchema[walnutSegment];
    WalnutLog.OpenLogStreams[];
    alreadyCalled ← WalnutDB.GetOpInProgressPos[] > 0;
  ENDLOOP;
};
Restart: PUBLIC PROC = {
  -- Re-opens the transaction after a shutdown: starts a transaction, re-initialises the
  -- schema when StartTransaction reports it invalid, reopens the log streams, and clears
  -- isShutdown.
  IF WalnutRoot.StartTransaction[] THEN
    WalnutDB.InitSchema[walnutSegment];
  WalnutLog.OpenLogStreams[];
  isShutdown ← FALSE;
};
ReleaseWriteLock: PROC = {
  -- Commits the current transaction first, then gives up the log write lock.
  WalnutRoot.CommitAndContinue[];
  WalnutLog.ReleaseWriteLock[];
};
-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
-- Taken from WalnutOpsExpungeImpl
DoLogExpunge: PUBLIC PROC[expungeID: INT] = {
  -- The log is re-written, starting from the beginning. Called from within a LongRunningApply.
  -- For now, the expungeID parameter is not used; it may be necessary later.
  -- The work is a small state machine driven by the expunge phase stored in the database:
  --   idle, initializingExpungeLog, writingExpungeLog, swappingLogs, and back to idle.
  -- The phase is re-read on every loop iteration and each step commits after advancing it,
  -- so a restarted call resumes at the recorded phase.
  DO
    expPhase: WalnutKernelDefs.LogExpungePhase ← WalnutDB.GetLogExpungePhase[];
    SELECT expPhase FROM
      idle => {
        WalnutOpsInternal.StatsReport["\n ~~~ Starting log expunge"];
        WalnutDB.SetLogExpungePhase[initializingExpungeLog];
        WalnutRoot.CommitAndContinue[];
        commitFrequency ← expungeCommitFrequency;
      };
      initializingExpungeLog => {
        -- size the expunge log as the current log minus the bytes of destroyed messages
        bytesInDestroyedMsgs: INT ← WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs;
        logLen: INT ← WalnutLog.LogLength[];
        pagesNeeded: INT ← FS.PagesForBytes[logLen-bytesInDestroyedMsgs];
        expungeFileID: INT;
        WalnutLog.ReturnCurrentLogStreams[];
        expungeFileID ← WalnutLogExpunge.StartExpunge[pagesNeeded];
        WalnutDB.SetExpungeFileID[expungeFileID];
        WalnutDB.SetLogExpungePhase[writingExpungeLog];
        WalnutRoot.CommitAndContinue[];
      };
      writingExpungeLog => {
        firstDestroyedMsgPos: INT ← WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;
        almostOldLen: INT ← WalnutDB.GetOpInProgressPos[];
        WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
        WalnutOpsInternal.CheckReport[IO.PutFR[
          "\n Copying the head (%g bytes) of the log, then dumping the tail (%g bytes)\n",
          IO.int[firstDestroyedMsgPos], IO.int[almostOldLen-firstDestroyedMsgPos] ]];
        [] ← WriteExpungeLog[];
        WalnutLogExpunge.EndExpunge[];
        WalnutDB.SetLogExpungePhase[swappingLogs];
        WalnutRoot.CommitAndContinue[];
      };
      swappingLogs => {
        expungeFileID: INT ← WalnutDB.GetExpungeFileID[];
        date: BasicTime.GMT;
        WalnutLog.ReturnCurrentLogStreams[];  -- in case of restart
        [] ← WalnutRoot.GetStreamsForExpunge[];
        WalnutOpsInternal.CheckReport["\n Swapping Log Files\n"];
        -- reset all expunge bookkeeping, then swap and commit
        WalnutDB.SetExpungeProgressInfo[0,0];
        WalnutDB.SetExpungeInfo[0, 0];
        WalnutDB.SetOpInProgressPos[-1];
        WalnutDB.SetLogExpungePhase[idle];  -- whew
        WalnutDB.SetTimeOfLastExpunge[BasicTime.Now[]];
        WalnutDB.SetRootFileVersion[date ← BasicTime.Now[]];
        [] ← WalnutRoot.SwapLogs[expungeFileID, date];  -- result is not needed here
        WalnutRoot.CommitAndContinue[];
        WalnutOpsInternal.StatsReport["\n ~~~ Finished expunge"];
        WalnutLog.OpenLogStreams[];
        RETURN;
      };
    ENDCASE => ERROR;
  ENDLOOP;
};
WriteExpungeLog: PROC = {
  -- Called from within catch phrases, but must do its own error catching, since the expunge
  -- log cannot be flushed after every write; that makes too much traffic on the alpine server.
  -- Reads the old log, starting at currentLogPos, copies entries to the expungeLog, changing
  -- msg entryStart pointers as it goes, doing occasional commits of both the expungeLog and
  -- the database.
  -- A WalnutDefs.Error with code $TransactionAbort restarts the transaction and resumes from
  -- the progress recorded in the database. Raises WalnutDefs.Error[$log, $TooManyLogAborts]
  -- when repeated restarts find the committed expunge log position unchanged,
  -- [$log, $CantStartExpunge] when the expunge cannot be restarted even from the beginning,
  -- and [$log, $DuringExpunge] when no further entry can be found in the old log.
  numMsgs: INT ← 0;
  currentLogPos, expungeLogPos: INT;
  previousAt, at: INT;
  startRetryCount: INT = 2;
  retryCount: INT ← startRetryCount;
  bytesToCopyBeforeFlush: INT ← 200000;
  lastExpungeCommitLength: INT ← -1;
  DO
    BEGIN ENABLE WalnutDefs.Error =>
      IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
    bytesCopiedSinceLastCommit: INT ← 0;
    lastAcceptNewMailPos: INT;
    firstDestroyedMsgPos: INT ← WalnutDB.GetExpungeInfo[].firstDestroyedMsgPos;
    [currentLogPos, expungeLogPos] ← WalnutDB.GetExpungeProgressInfo[];

    -- give up when restarts keep finding the same committed position in the expunge log
    IF lastExpungeCommitLength = expungeLogPos THEN {
      IF (retryCount ← retryCount - 1) <= 0 THEN
        ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
          IO.PutFR["\n During Expunge - last commit pos in expungeLog was %g",
            IO.int[expungeLogPos]]];
    }
    ELSE {
      lastExpungeCommitLength ← expungeLogPos;
      retryCount ← startRetryCount;
    };

    IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN {
      -- cannot resume at the recorded positions, so start the whole expunge over
      pagesNeeded: INT ← FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[] - WalnutDB.GetExpungeInfo[].bytesInDestroyedMsgs];
      WalnutOpsInternal.CheckReport[IO.PutFR[
        "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog",
        IO.int[expungeLogPos], IO.int[currentLogPos]], "\n restarting Log Expunge"];
      expungeLogPos ← currentLogPos ← 0;
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos]; -- commits
      [] ← WalnutLogExpunge.StartExpunge[pagesNeeded];   -- try again
      WalnutOpsInternal.StatsReport["Restarting expunge from beginning"];
      IF ~WalnutLogExpunge.RestartExpunge[currentLogPos, expungeLogPos] THEN
        ERROR WalnutDefs.Error[$log, $CantStartExpunge];
      lastExpungeCommitLength ← expungeLogPos;
    };

    IF currentLogPos < firstDestroyedMsgPos THEN {  -- copyingHead phase
      bytesLeftToCopyInHead: INT ← firstDestroyedMsgPos - currentLogPos;
      bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
      IF currentLogPos = 0 THEN {
        WalnutLogExpunge.ExpSetIndex[0];
        [] ← WalnutLogExpunge.SkipEntry[];  -- need to skip the LogFileInfo entry
        bytesLeftToCopyInHead ←
          bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[];
      };
      WalnutOpsInternal.CheckReport[
        IO.PutFR["\n Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n",
          IO.int[currentLogPos], IO.int[firstDestroyedMsgPos] ]];
      DO
        -- copy in bounded chunks, recording progress and committing after each one
        bytesToCopyThisTime: INT ← MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
        WalnutLogExpunge.CopyBytesToExpungeLog[bytesToCopyThisTime];
        [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[];
        WalnutOpsInternal.CheckReport["#"];
        bytesLeftToCopyInHead ← bytesLeftToCopyInHead - bytesToCopyThisTime;
        IF bytesLeftToCopyInHead = 0 THEN EXIT;
      ENDLOOP;
      retryCount ← startRetryCount;  -- in case has abort during copy
    };

    -- lastAcceptNewMailPos can't be before firstDestroyedMsgPos
    lastAcceptNewMailPos ← WalnutDB.GetAcceptNewMailPos[];
    WalnutLogExpunge.ExpSetIndex[at ← currentLogPos];  -- before doing PeekEntry
    WalnutOpsInternal.CheckReport[
      IO.PutFR["\nProcessing the tail of the log, starting at bytePos %g\n", IO.int[at] ]];
    DO
      ident: ATOM;
      msgID: REF TEXT;
      bytesThisCopy: INT ← 0;
      previousAt ← at;
      [ident, msgID, at] ← WalnutLogExpunge.PeekEntry[];
      IF ident = NIL THEN
        IF at # -1 THEN EXIT  -- end of log
        ELSE {
          -- no entry where one was expected: scan forward from the byte after the previous one
          charSkipped: INT ← WalnutLogExpunge.ExpSetPosition[previousAt+1];
          IF charSkipped = -1 THEN
            ERROR WalnutDefs.Error[$log, $DuringExpunge,
              IO.PutFR["No entry found after %g", IO.int[previousAt]]];
          at ← previousAt + charSkipped;
          LOOP;
        };
      SELECT ident FROM
        $ExpungeMsgs => [] ← WalnutLogExpunge.SkipEntry[];
        $WriteExpungeLog => EXIT;  -- must be last entry on PREVIOUS LOG
        $LogFileInfo => [] ← WalnutLogExpunge.SkipEntry[];
        $RecordNewMailInfo, $StartCopyNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
        $EndCopyNewMailInfo => {
          IF at < lastAcceptNewMailPos THEN {  -- maybe skip
            startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
            IF firstDestroyedMsgPos < startCopyPos THEN [] ← WalnutLogExpunge.SkipEntry[]
            ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
          }
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
        };
        $AcceptNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE {
            newLogPos: INT;
            [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
            WalnutDB.SetAcceptNewMailPos[newLogPos];
          };
        $StartCopyReadArchive => [] ← WalnutLogExpunge.SkipEntry[];
        $EndCopyReadArchiveInfo => {
          startCopyPos: INT ← WalnutLogExpunge.EndCopyEntry[];
          IF firstDestroyedMsgPos < startCopyPos THEN [] ← WalnutLogExpunge.SkipEntry[]
          ELSE bytesThisCopy ← WalnutLogExpunge.CopyEntry[].bytesCopied;
        };
      ENDCASE =>
        IF RefText.Length[msgID] = 0 OR
          WalnutDB.MsgExists[Rope.FromRefText[msgID]] THEN {
          -- the next entry either has nothing to do with a message or it is an operation
          -- on a message that still exists; copy the entry to the new log
          newLogPos: INT ;
          [newLogPos, bytesThisCopy] ← WalnutLogExpunge.CopyEntry[];
          IF ident = $CreateMsg THEN {
            -- it is the message body itself; record the change in position in the database
            IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[to: newLogPos];
            IF (numMsgs ← numMsgs + 1) MOD 10 = 0 THEN
              IF numMsgs MOD 100 = 0 THEN
                WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numMsgs]]]
              ELSE WalnutOpsInternal.CheckReport["."];
          };
        }
        -- if you haven't consumed the entry by copying it, then just advance the log pointer
        ELSE [] ← WalnutLogExpunge.SkipEntry[];

      -- record progress and commit once enough bytes have been copied
      IF (bytesCopiedSinceLastCommit ← bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN {
        expLogPos: INT;
        [currentLogPos, expLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
        WalnutDB.SetExpungeProgressInfo[currentLogPos, expLogPos];
        WalnutRoot.CommitAndContinue[];
        lastExpungeCommitLength ← expLogPos;
        bytesCopiedSinceLastCommit ← 0;
      };
    ENDLOOP;

    -- Commit the last batch of updates
    IF bytesCopiedSinceLastCommit > 0 THEN {
      [currentLogPos, expungeLogPos] ← WalnutLogExpunge.GetExpungeProgress[];
      WalnutDB.SetExpungeProgressInfo[currentLogPos, expungeLogPos];
      WalnutRoot.CommitAndContinue[];
      lastExpungeCommitLength ← expungeLogPos;
    };
    RETURN;
    EXITS
      retry => {
        -- shut the expunge streams, restart the transaction, and go round the outer loop
        WalnutLogExpunge.ExpShutdown[];
        WalnutRoot.AbortTransaction[];
        IF WalnutRoot.StartTransaction[] THEN WalnutDB.InitSchema[walnutSegment];
        WalnutLog.OpenLogStreams[];  -- awkward
        WalnutLog.ReturnCurrentLogStreams[];
      };
    END;
  ENDLOOP;
};
-- Taken from WalnutOpsParseLogImpl
-- Is some long-running operation in progress?
CheckInProgress: PUBLIC PROC = {
  -- Finishes whatever interrupted work the database records: a log parse that was in
  -- progress, or the long-running operation whose log entry is at GetOpInProgressPos.
  -- Raises WalnutDefs.Error[$db, $InternalError] if errorInProgress is set, restarts the
  -- transaction first if Walnut is shut down, and returns without doing anything when
  -- neither a parse nor an operation is in progress.
  -- Raises WalnutDefs.Error[$log, $BadLog] if there is no log entry at the recorded position
  -- and [$db, $ErroneousInProgress] if the entry there is not a resumable operation.
  inProgressPos: INT ← -1;  -- assigned by Cip unless a parse is in progress
  parseInProgress: BOOL ← FALSE;  -- assigned by Cip
  needsParse: BOOL ← FALSE;  -- set by Fip when a temp log copy was completed

  Cip: PROC = {
    -- reads the in-progress state; takes the write lock only if there is work to do
    IF (parseInProgress ← WalnutDB.GetParseLogInProgress[]) THEN {
      WalnutRoot.AcquireWriteLock[]; RETURN };
    inProgressPos ← WalnutDB.GetOpInProgressPos[];
    IF inProgressPos <= 0 THEN RETURN;
    WalnutRoot.AcquireWriteLock[];
  };

  Fip: PROC[inProgress: BOOL] = { -- FinishInProgress
    -- re-reads the log entry of the interrupted operation and completes it
    at: INT;
    wle: WalnutKernelDefs.LogEntry;
    WalnutLog.SetIndex[inProgressPos];
    [wle, at] ← WalnutLog.NextEntry[];
    IF at = -1 THEN
      ERROR WalnutDefs.Error[$log, $BadLog,
        IO.PutFR["No entry at %g", IO.int[inProgressPos]]];
    TRUSTED { WITH le: wle SELECT FROM
      ExpungeMsgs =>
        WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
      WriteExpungeLog =>
        []← WalnutOpsInternal.DoLogExpunge[le.internalFileID];
      EmptyMsgSet => []← WalnutDB.EmptyMsgSet[
        [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
        WalnutOpsInternal.CheckReport ];
      DestroyMsgSet =>
        []← WalnutDB.DestroyMsgSet[
          msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
          msDomainVersion: dontCareDomainVersion,
          report: WalnutOpsInternal.CheckReport
          ];
      AcceptNewMail => {
        WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
        WalnutOpsInternal.newMailSomewhere ← FALSE;
        };
      StartCopyNewMail => {
        FinishCopyTempLog[newMail, at];
        WalnutDB.SetCopyMailLogPos[0];
        WalnutRoot.CommitAndContinue[];
        WalnutLog.FinishTempLogCopy[newMail];
        needsParse ← TRUE;
        };
      StartReadArchiveFile => {
        WalnutLog.ResetLog[at];
        WalnutDB.SetReadArchivePos[0];
        };
      StartCopyReadArchive => {
        FinishCopyTempLog[readArchive, at];
        WalnutDB.SetCopyReadArchivePos[0];
        WalnutRoot.CommitAndContinue[];
        WalnutLog.FinishTempLogCopy[readArchive];
        needsParse ← TRUE;
        };
      ENDCASE =>
        ERROR WalnutDefs.Error[$db, $ErroneousInProgress,
          IO.PutFR["Entry at %g", IO.int[inProgressPos]]];
    };  -- end trusted
    WalnutDB.SetOpInProgressPos[-1];
    WalnutRoot.CommitAndContinue[];
  };

  IF WalnutOpsInternal.errorInProgress THEN
    ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
  IF WalnutOpsInternal.isShutdown THEN WalnutOpsInternal.Restart[];
  WalnutOpsInternal.CarefullyApply[Cip, FALSE];

  -- an interrupted parse takes priority over any other operation
  IF parseInProgress THEN {
    WalnutOpsInternal.StatsReport["\n Continuing parseInProgress"];
    [] ← ParseLog[TRUE];
    RETURN
  };
  IF inProgressPos <= 0 THEN RETURN;

  WalnutOpsInternal.StatsReport["\n Continuing longRunningOperation"];
  WalnutOpsInternal.LongRunningApply[Fip];
  IF needsParse THEN {
    -- a temp log copy was finished, so the newly copied entries still have to be parsed
    numNew: INT ← 0;
    WalnutOpsInternal.CheckReport["Adding messages to the database\n"];
    numNew ← ParseLog[TRUE];
    IF numNew = 0 THEN
      WalnutOpsInternal.CheckReport["No messages were new\n"]
    ELSE WalnutOpsInternal.CheckReport[
      IO.PutFR[" %g new messages\n", IO.int[numNew] ]];
  };
};
FinishCopyTempLog: PROC[which: WalnutKernelDefs.WhichTempLog, at: INT] = {
  -- Resumes an interrupted copy of the temp log `which` into the main log. `at` is the log
  -- position of the start-copy entry. The resume offset is the number of bytes that already
  -- follow that entry in the log. Afterwards the database is marked as needing a parse
  -- starting at the operation's position, and the in-progress marker is cleared.
  -- Raises WalnutDefs.Error[$log, $BadLog] if there is no entry exactly at `at`, and
  -- [$log, $DuringOpInProgress] if the temp log cannot be prepared for copying.
  totalLen: INT = WalnutLog.LogLength[];
  copyEntryPos, firstCopiedPos, resumeOffset: INT;

  IF WalnutLog.SetPosition[at] # 0 THEN
    ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR["no entry at %g", IO.int[at]]];
  copyEntryPos ← WalnutLog.NextEntry[].at;  -- skip the copy entry
  firstCopiedPos ← WalnutLog.NextAt[];
  resumeOffset ← totalLen - firstCopiedPos;

  IF ~WalnutLog.PrepareToCopyTempLog[
      which: which,
      pagesAlreadyCopied: FS.PagesForBytes[resumeOffset],
      reportProc: WalnutOpsInternal.CheckReport]
    THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];

  WalnutOpsInternal.CheckReport[
    IO.PutFR["\nContinuing copy of tempLog, starting at bytePos %g\n", IO.int[resumeOffset]]];
  WalnutLog.CopyTempLog[which, copyEntryPos, resumeOffset, WalnutOpsInternal.CheckReport];
  WalnutOpsInternal.CheckReport["\n"];

  -- hand the rest of the work over to ParseLog
  WalnutDB.SetParseLogInProgress[TRUE];
  WalnutDB.SetParseLogPos[WalnutDB.GetOpInProgressPos[]];
  WalnutDB.SetOpInProgressPos[-1];
};
-- Parsing the Log - assumes the log is write-locked
ParseLog: PUBLIC PROC[verbose: BOOL ← FALSE] RETURNS[numNew: INT] = {
  -- Replays log entries into the database, starting at the parse position recorded in the
  -- database, and committing the new parse position every commitFrequency entries.
  -- numNew counts the CreateMsg entries for which WalnutDB.AddNewMsg returned FALSE
  -- (presumably meaning the message was not already in the database; confirm against WalnutDB).
  -- When verbose is TRUE, progress marks are reported through WalnutOpsInternal.CheckReport.
  -- A WalnutDefs.VersionMismatch raised while applying a single entry is ignored.
  -- A $TransactionAbort rolls the counters back to their values at the last commit and
  -- re-enters the outer loop; when the retries are used up, or for any other
  -- WalnutDefs.Error, errorInProgress is set, the error is logged, and it is passed on.
  -- Raises WalnutDefs.Error[$log, $BadLog] when no log entry can be found where one is expected.
  parseLogPos: INT;
  at: INT;  -- position in log of entry just processed
  lastNumNew, savedNumNew, savedLastNumNew: INT ← 0;
  wle: WalnutLog.LogEntry;
  charsSkipped: INT;
  errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
  retries: INT ← 2;
  accepted: BOOL;
  commitFrequency: INT ← parseCommitFrequency;  -- local; hides the module-level variable
  updatesSinceLastCommit: INT ← 0;
  numNew ← 0;
  DO
    currentLogLength: INT ← WalnutLog.LogLength[]; -- in case at end of Expunge
    BEGIN ENABLE WalnutDefs.Error =>
      IF code = $TransactionAbort THEN {
        WalnutOpsInternal.StatsReport[" *** TransactionAborts during parseLog"];
        IF ( retries ← retries - 1) > 0 THEN GOTO tryAgain;
        WalnutOpsInternal.errorInProgress ← TRUE;
        WalnutOpsInternal.StatsReport[" *** Too many TransactionAborts during parseLog"];
        REJECT
      }
      ELSE {
        WalnutOpsInternal.errorInProgress ← TRUE;
        WalnutOpsInternal.StatsReport[
          IO.PutFR["InternalError: %g during parseLog", IO.atom[code]]];
        REJECT
      };
    parseLogPos ← WalnutDB.GetParseLogPos[];
    IF parseLogPos = currentLogLength THEN RETURN[numNew];
    charsSkipped ← WalnutLog.SetPosition[parseLogPos];
    IF charsSkipped = -1 THEN
      ERROR WalnutDefs.Error[$log, $BadLog,
        IO.PutFR[errorRope, IO.int[parseLogPos], IO.int[currentLogLength]]];
    at ← parseLogPos + charsSkipped;
    -- make sure things are in good shape before starting
    IF at # parseLogPos THEN {
      WalnutDB.SetParseLogPos[at];
      WalnutRoot.CommitAndContinue[];
    };
    accepted ← NOT WalnutDB.GetAddingServerMsgs[];
    DO
      previousAt: INT ← at;
      [wle, at] ← WalnutLog.NextEntry[];
      IF at = -1 THEN {
        -- no entry where one was expected: scan forward from the next byte and try once more
        charsSkipped ← WalnutLog.SetPosition[previousAt+1];
        IF charsSkipped = -1 THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]];
        [wle, at] ← WalnutLog.NextEntry[]
      };
      IF wle = NIL
        THEN IF at = -1 THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]]
        ELSE {    -- at#-1 AND wle=NIL => at end
          WalnutDB.SetParseLogPos[-1];
          WalnutDB.SetParseLogInProgress[FALSE];
          WalnutRoot.CommitAndContinue[];
          RETURN
        };
      IF (updatesSinceLastCommit ← updatesSinceLastCommit + 1) >= commitFrequency THEN {
        WalnutDB.SetParseLogPos[at];  -- at is beginning of current (unseen) entry
        WalnutRoot.CommitAndContinue[];
        updatesSinceLastCommit ← 0;
        -- remember the counters as of this commit, for rollback on tryAgain
        savedNumNew ← numNew;
        savedLastNumNew ← lastNumNew;
      };
      BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
      TRUSTED { WITH le: wle SELECT FROM
        LogFileInfo => NULL;
        ExpungeMsgs =>
          WalnutDB.ExpungeMsgs[dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
        WriteExpungeLog => {
          WalnutDB.SetParseLogPos[-1];
          WalnutDB.SetParseLogInProgress[FALSE];
          WalnutDB.SetOpInProgressPos[at];  -- at is beginning of current entry
          WalnutRoot.CommitAndContinue[];
          [] ← WalnutOpsInternal.DoLogExpunge[le.internalFileID];
          -- NOTE(review): EXIT leaves only the inner loop; the outer loop then runs again
          -- with the parse position just set to -1. Confirm that this is intended.
          EXIT;   -- WriteExpungeLog had to be the last entry
          };
        CreateMsgSet =>
          [] ← WalnutDB.CreateMsgSet[
            Rope.FromRefText[le.msgSet], dontCareDomainVersion];
        EmptyMsgSet =>
          [] ← WalnutDB.EmptyMsgSet[
            [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
            WalnutOpsInternal.CheckReport
            ];
        DestroyMsgSet =>
          [] ← WalnutDB.DestroyMsgSet[
            msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
            msDomainVersion: dontCareDomainVersion,
            report: WalnutOpsInternal.CheckReport
            ];
        CreateMsg => {
          me: WalnutKernelDefs.MsgLogEntry ← NARROW[wle];
          me.show ← accepted;
          IF ~WalnutDB.AddNewMsg[me] THEN numNew ← numNew + 1;
          };
        AddMsg =>
          []← WalnutDB.AddMsg[
            msg: Rope.FromRefText[le.msg],
            from: [NIL, dontCareMsgSetVersion],
            to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
        RemoveMsg =>
          []← WalnutDB.RemoveMsg[
            msg: Rope.FromRefText[le.msg],
            from: [Rope.FromRefText[le.from], dontCareMsgSetVersion],
            deletedVersion: dontCareMsgSetVersion ];
        MoveMsg =>
          []← WalnutDB.MoveMsg[
            msg: Rope.FromRefText[le.msg],
            from: [ Rope.FromRefText[le.from], dontCareMsgSetVersion],
            to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
        HasBeenRead => WalnutDB.SetHasBeenRead[msg: Rope.FromRefText[le.msg]];
        RecordNewMailInfo => {
          WalnutDB.SetNewMailInfo[
            le.logLen, le.when, Rope.FromRefText[le.server], le.num];
          WalnutOpsInternal.newMailSomewhere ← TRUE;
          };
        StartCopyNewMail => {
          WalnutDB.SetCopyMailLogPos[at];
          WalnutDB.SetAddingServerMsgs[TRUE];
          accepted ← FALSE;
          };
        EndCopyNewMailInfo => {
          WalnutDB.SetCopyMailLogPos[0];
          WalnutDB.SetNewMailInfo[0, BasicTime.nullGMT, NIL, 0];
          WalnutDB.SetAddingServerMsgs[FALSE];
          accepted ← TRUE;
          };
        AcceptNewMail => {
          WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
          WalnutOpsInternal.newMailSomewhere ← FALSE;
          };
        StartReadArchiveFile => WalnutDB.SetReadArchivePos[at];
        EndReadArchiveFile => WalnutDB.SetReadArchivePos[0];
        StartCopyReadArchive =>
          WalnutDB.SetCopyReadArchivePos[at];
        EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[0];
        ENDCASE => ERROR;
      };
      EXITS continue => NULL;
      END;
      -- progress marks: "~" for every 10th new message, "(n)" for every 100th
      IF verbose THEN
        IF numNew # lastNumNew THEN
          IF numNew MOD 10 = 0 THEN
            IF numNew MOD 100 = 0 THEN
              WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numNew]]]
            ELSE WalnutOpsInternal.CheckReport["~"];
      lastNumNew ← numNew;
    ENDLOOP;
    EXITS
      tryAgain => {
        -- NOTE(review): the transaction is not aborted or restarted here, unlike the retry
        -- paths of CarefullyApply and WriteExpungeLog. Confirm that this is handled elsewhere.
        numNew ← savedNumNew;
        lastNumNew ← savedLastNumNew;
      };
    END;
  ENDLOOP;
};
END.