WalnutOpsParseLogImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Willie-Sue, July 11, 1985 1:37:42 pm PDT
Donahue, August 5, 1985 2:46:06 pm PDT
Implementation of CheckInProgress and ParseLog (for newMail, readArchive and scavenge)
DIRECTORY
BasicTime USING [GMT, nullGMT],
FS USING[PagesForBytes],
IO,
Rope,
WalnutDB --using lots-- ,
WalnutDefs USING [Error, VersionMismatch],
WalnutKernelDefs USING [LogEntry, MsgLogEntry, WhichTempLog],
WalnutLog  --using lots-- ,
WalnutOps USING [dontCareDomainVersion, dontCareMsgSetVersion],
WalnutOpsInternal,
WalnutOpsMonitorImpl,
WalnutRoot USING [AcquireWriteLock, CommitAndContinue];
WalnutOpsParseLogImpl: CEDAR MONITOR LOCKS walnutOpsMonitorImpl
IMPORTS
FS, IO, Rope,
walnutOpsMonitorImpl: WalnutOpsMonitorImpl,
WalnutDB, WalnutDefs, WalnutLog,
WalnutOpsInternal, WalnutRoot
EXPORTS WalnutOpsInternal
SHARES WalnutOpsMonitorImpl
= BEGIN
Types
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
GMT: TYPE = BasicTime.GMT;
Variables
parseCommitFrequency: INT = 60;
dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;
Procedures
Is some long running operation in progress
CheckInProgress: PUBLIC INTERNAL PROC = {
inProgressPos: INT;
parseInProgress: BOOL;
needsParse: BOOLFALSE;
Cip: INTERNAL PROC = {
IF (parseInProgress ← WalnutDB.GetParseLogInProgress[]) THEN {
WalnutRoot.AcquireWriteLock[]; RETURN };
inProgressPos ← WalnutDB.GetOpInProgressPos[];
IF inProgressPos <= 0 THEN RETURN;
WalnutRoot.AcquireWriteLock[];
};
Fip: INTERNAL PROC[inProgress: BOOL] = { -- FinishInProgress
at: INT;
wle: WalnutKernelDefs.LogEntry;
WalnutLog.SetIndex[inProgressPos];
[wle, at] ← WalnutLog.NextEntry[];
IF at = -1 THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR["No entry at %g", IO.int[inProgressPos]]];
TRUSTED { WITH le: wle SELECT FROM
ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion];
WriteExpungeLog =>
[]← WalnutOpsInternal.DoLogExpunge[le.internalFileID];
EmptyMsgSet => []← WalnutDB.EmptyMsgSet[
[Rope.FromRefText[le.msgSet], dontCareMsgSetVersion] ];
DestroyMsgSet =>
[]← WalnutDB.DestroyMsgSet[
msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
msDomainVersion: dontCareDomainVersion
];
AcceptNewMail => {
WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
WalnutOpsInternal.newMailSomewhere ← FALSE;
};
StartCopyNewMail => {
FinishCopyTempLog[newMail, at];
WalnutDB.SetCopyMailLogPos[0];
WalnutRoot.CommitAndContinue[];
WalnutLog.FinishTempLogCopy[newMail];
needsParse ← TRUE;
};
StartReadArchiveFile => {
WalnutLog.ResetLog[at];
WalnutDB.SetReadArchivePos[0];
};
StartCopyReadArchive => {
FinishCopyTempLog[readArchive, at];
WalnutDB.SetCopyReadArchivePos[0];
WalnutRoot.CommitAndContinue[];
WalnutLog.FinishTempLogCopy[readArchive];
needsParse ← TRUE;
};
ENDCASE =>
ERROR WalnutDefs.Error[$db, $ErroneousInProgress,
IO.PutFR["Entry at %g", IO.int[inProgressPos]]];
};  -- end trusted
WalnutDB.SetOpInProgressPos[-1];
WalnutRoot.CommitAndContinue[];
};
IF WalnutOpsInternal.errorInProgress THEN
ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
IF WalnutOpsInternal.isShutdown THEN WalnutOpsInternal.Restart[];
WalnutOpsInternal.CarefullyApply[Cip, FALSE];
IF parseInProgress THEN {
WalnutOpsInternal.StatsReport["\n Continuing parseInProgress"];
[] ← ParseLog[TRUE];
RETURN
};
IF inProgressPos <= 0 THEN RETURN;
WalnutOpsInternal.StatsReport["\n Continuing longRunningOperation"];
WalnutOpsInternal.LongRunningApply[Fip];
IF needsParse THEN {
numNew: INT ← 0;
WalnutOpsInternal.CheckReport["Adding messages to the database\n"];
numNew ← ParseLog[TRUE];
IF numNew = 0 THEN
WalnutOpsInternal.CheckReport["No messages were new\n"]
ELSE WalnutOpsInternal.CheckReport[
IO.PutFR[" %g new messages\n", IO.int[numNew] ]];
};
};
FinishCopyTempLog: INTERNAL PROC[which: WalnutKernelDefs.WhichTempLog, at: INT] = {
logLen: INT ← WalnutLog.LogLength[];
startedCopyAt, startCopyPos, fromPos: INT;
pagesAlreadyCopied: INT;
IF WalnutLog.SetPosition[at] # 0 THEN
ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR["no entry at %g", IO.int[at]]];
startCopyPos ← WalnutLog.NextEntry[].at;  -- skip the copy entry
startedCopyAt ← WalnutLog.NextAt[];
fromPos ← logLen - startedCopyAt;
pagesAlreadyCopied ← FS.PagesForBytes[fromPos];
IF ~WalnutLog.PrepareToCopyTempLog[which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport]
THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];
WalnutOpsInternal.CheckReport[
IO.PutFR["\nContinuing copy of tempLog, starting at bytePos %g\n", IO.int[fromPos]]];
WalnutLog.CopyTempLog[which, startCopyPos, fromPos, WalnutOpsInternal.CheckReport];
WalnutOpsInternal.CheckReport["\n"];
WalnutDB.SetParseLogInProgress[TRUE];
WalnutDB.SetParseLogPos[WalnutDB.GetOpInProgressPos[]];
WalnutDB.SetOpInProgressPos[-1];
};
Parsing the Log - assumes the log is write-locked
ParseLog: PUBLIC INTERNAL PROC[verbose: BOOLFALSE] RETURNS[numNew: INT] = {
parseLogPos: INT;
at: INT;  -- position in log of entry just processed
lastNumNew, savedNumNew, savedLastNumNew: INT← 0;
wle: WalnutLog.LogEntry;
charsSkipped: INT;
errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
retries: INT← 2;
accepted: BOOL;
commitFrequency: INT ← parseCommitFrequency;
updatesSinceLastCommit: INT ← 0;
numNew ← 0;
DO
currentLogLength: INT← WalnutLog.LogLength[]; -- in case at end of Expunge
BEGIN ENABLE WalnutDefs.Error =>
IF code = $TransactionAbort THEN {
WalnutOpsInternal.StatsReport[" *** TransactionAborts during parseLog"];
IF ( retries ← retries - 1) > 0 THEN GOTO tryAgain;
WalnutOpsInternal.errorInProgress ← TRUE;
WalnutOpsInternal.StatsReport[" *** Too many TransactionAborts during parseLog"];
REJECT
}
ELSE {
WalnutOpsInternal.errorInProgress ← TRUE;
WalnutOpsInternal.StatsReport[
IO.PutFR["InternalError: %g during parseLog", IO.atom[code]]];
REJECT
};
parseLogPos ← WalnutDB.GetParseLogPos[];
IF parseLogPos = currentLogLength THEN RETURN[numNew];
charsSkipped ← WalnutLog.SetPosition[parseLogPos];
IF charsSkipped = -1 THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, IO.int[parseLogPos], IO.int[currentLogLength]]];
at ← parseLogPos + charsSkipped;
make sure things are in good shape before starting
IF at # parseLogPos THEN {
WalnutDB.SetParseLogPos[at];
WalnutRoot.CommitAndContinue[];
};
accepted ← NOT WalnutDB.GetAddingServerMsgs[];
DO
previousAt: INT ← at;
[wle, at]← WalnutLog.NextEntry[];
IF at = -1 THEN {
charsSkipped← WalnutLog.SetPosition[previousAt+1];
IF charsSkipped = -1 THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]];
[wle, at]← WalnutLog.NextEntry[]
};
IF wle = NIL
THEN IF at = -1 THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, IO.int[previousAt], IO.int[currentLogLength]]]
ELSE {    -- at#-1 AND wle=NIL => at end
WalnutDB.SetParseLogPos[-1];
WalnutDB.SetParseLogInProgress[FALSE];
WalnutRoot.CommitAndContinue[];
RETURN
};
IF (updatesSinceLastCommit← updatesSinceLastCommit + 1) >= commitFrequency THEN {
WalnutDB.SetParseLogPos[at];  -- at is beginning of current (unseen) entry
WalnutRoot.CommitAndContinue[];
updatesSinceLastCommit ← 0;
savedNumNew ← numNew;
savedLastNumNew ← lastNumNew;
};
BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
TRUSTED { WITH le: wle SELECT FROM
LogFileInfo => NULL;
ExpungeMsgs => WalnutDB.ExpungeMsgs[dontCareMsgSetVersion];
WriteExpungeLog => {
WalnutDB.SetParseLogPos[-1];
WalnutDB.SetParseLogInProgress[FALSE];
WalnutDB.SetOpInProgressPos[at];  -- at is beginning of current entry
WalnutRoot.CommitAndContinue[];
[] ← WalnutOpsInternal.DoLogExpunge[le.internalFileID];
EXIT;   -- WriteExpungeLog had to be the last entry
};
CreateMsgSet =>
[] ← WalnutDB.CreateMsgSet[
Rope.FromRefText[le.msgSet], dontCareDomainVersion];
EmptyMsgSet =>
[] ← WalnutDB.EmptyMsgSet[
[Rope.FromRefText[le.msgSet], dontCareMsgSetVersion] ];
DestroyMsgSet =>
[] ← WalnutDB.DestroyMsgSet[
msgSet: [Rope.FromRefText[le.msgSet], dontCareMsgSetVersion],
msDomainVersion: dontCareDomainVersion];
CreateMsg => {
me: WalnutKernelDefs.MsgLogEntry ← NARROW[wle];
me.show ← accepted;
IF ~WalnutDB.AddNewMsg[me] THEN numNew ← numNew + 1;
};
AddMsg =>
[]← WalnutDB.AddMsg[
msg: Rope.FromRefText[le.msg],
from: [NIL, dontCareMsgSetVersion],
to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
RemoveMsg =>
[]← WalnutDB.RemoveMsg[
msg: Rope.FromRefText[le.msg],
from: [Rope.FromRefText[le.from], dontCareMsgSetVersion],
deletedVersion: dontCareMsgSetVersion ];
MoveMsg =>
[]← WalnutDB.MoveMsg[
msg: Rope.FromRefText[le.msg],
from: [ Rope.FromRefText[le.from], dontCareMsgSetVersion],
to: [Rope.FromRefText[le.to], dontCareMsgSetVersion] ];
HasBeenRead => WalnutDB.SetHasBeenRead[msg: Rope.FromRefText[le.msg]];
RecordNewMailInfo => {
WalnutDB.SetNewMailInfo[
le.logLen, le.when, Rope.FromRefText[le.server], le.num];
WalnutOpsInternal.newMailSomewhere ← TRUE;
};
StartCopyNewMail => {
WalnutDB.SetCopyMailLogPos[at];
WalnutDB.SetAddingServerMsgs[TRUE];
accepted ← FALSE;
};
EndCopyNewMailInfo => {
WalnutDB.SetCopyMailLogPos[0];
WalnutDB.SetNewMailInfo[0, BasicTime.nullGMT, NIL, 0];
WalnutDB.SetAddingServerMsgs[FALSE];
accepted ← TRUE;
};
AcceptNewMail => {
WalnutDB.AcceptNewMail[at, dontCareMsgSetVersion];
WalnutOpsInternal.newMailSomewhere ← FALSE;
};
StartReadArchiveFile => WalnutDB.SetReadArchivePos[at];
EndReadArchiveFile => WalnutDB.SetReadArchivePos[0];
StartCopyReadArchive =>
WalnutDB.SetCopyReadArchivePos[at];
EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[0];
ENDCASE => ERROR;
};
EXITS continue => NULL;
END;
IF verbose THEN
IF numNew # lastNumNew THEN
IF numNew MOD 10 = 0 THEN
IF numNew MOD 100 = 0 THEN
WalnutOpsInternal.CheckReport[IO.PutFR["(%g)", IO.int[numNew]]]
ELSE WalnutOpsInternal.CheckReport["~"];
lastNumNew← numNew;
ENDLOOP;
EXITS
tryAgain => {
numNew ← savedNumNew;
lastNumNew ← savedLastNumNew;
};
END;
ENDLOOP;
};
END.