-- WalnutOpsInternalImpl.mesa
-- Copyright © 1984, 1988, 1989, 1990, 1991, 1992 by Xerox Corporation. All rights reserved.
-- Willie-sue, April 30, 1992 11:21 am PDT
-- Donahue, July 16, 1985 2:25:35 pm PDT
-- Doug Terry, July 20, 1992 5:28 pm PDT
-- Swinehar, February 20, 1991 4:32 pm PST
-- Implementation of the procedures of WalnutOpsInternal
DIRECTORY
BasicTime USING [GMT, nullGMT, Now],
FS USING [BytesForPages, Error, <<ErrorDesc,>> <<ErrorFromStream,>> PagesForBytes, StreamOpen],
IO,
LoganBerry USING [Error],
LoganBerryEntry USING [I2V],
Process USING [PauseMsec],
RefText USING [MaxLen, New, TrustTextAsRope],
Rope,
-- UserCredentials USING [Get],
TapFilter USING [Agent, CreateAgent, Error, IsAgentIdle, MonitorAgent, MonitorProc, ParseMsgIntoFields, WakeupAgent, msgID, seqNum],
TapMsgQueue USING [Create, Msg, Put],
UserProfile USING [Boolean, Token],
ViewerClasses USING [Viewer],
ViewerIO USING [CreateViewerStreams, GetViewerFromStream],
ViewerOps USING [FindViewer],
WalnutDB --using lots-- ,
WalnutDefs USING [CheckReportProc, dontCareMsgSetVersion, Error, LogInfo, MsgSet, VersionMismatch, WalnutOpsHandle],
WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog],
WalnutLog  --using lots-- ,
WalnutLogExpunge  --using lots-- ,
WalnutMiscLog USING [CreateReadArchiveLog, GetNewMailLog, walnutItemFixedLength],
WalnutOps,
WalnutOpsInternal,
WalnutRegistryPrivate USING [NotifyForEvent],
WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, Open, RegisterStatsProc, RootHandle, RootHandleRec, Shutdown, StartTransaction, StatsReport, SwapLogs, UnregisterStatsProc],
WalnutStream USING [Open];
WalnutOpsInternalImpl: CEDAR PROGRAM
IMPORTS
BasicTime, FS, IO, LoganBerryEntry, Process, RefText, Rope, <<UserCredentials,>> UserProfile, ViewerIO, ViewerOps,
LoganBerry, TapFilter, TapMsgQueue,
WalnutDB, WalnutDefs,
WalnutLog, WalnutLogExpunge, WalnutMiscLog,
WalnutOps, WalnutOpsInternal, WalnutRegistryPrivate, WalnutRoot, WalnutStream
EXPORTS WalnutDefs, WalnutOpsInternal
= BEGIN OPEN WalnutOps, WalnutOpsInternal;
-- Types
-- Short local names for types declared in the interfaces this module uses.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
GMT: TYPE = BasicTime.GMT;
CheckReportProc: TYPE = WalnutDefs.CheckReportProc;
LogInfo: TYPE = WalnutDefs.LogInfo;
MsgSet: TYPE = WalnutDefs.MsgSet;
WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle;
RootHandle: TYPE = WalnutRoot.RootHandle;
-- PUBLIC type binding; the module header says this program EXPORTS WalnutDefs and WalnutOpsInternal.
RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec;
KernelHandle: TYPE = WalnutOpsInternal.KernelHandle;
-- PUBLIC type binding, as for RootHandleRec above.
KernelHandleRec: PUBLIC TYPE = WalnutOpsInternal.KernelHandleRec;
WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog;
-- NOTE(review): OutCome is not referenced in the portion of the file reviewed; confirm it is still needed.
OutCome: TYPE = { done, couldntRestart };
-- Variables
-- When TRUE, StartStatsReporting also sets up a viewer stream for statistics output.
heavyhandedDebugging: BOOL ¬ FALSE;
-- When TRUE, the error handlers in this module raise WDErrorSeen or DBErrorSeen before handling the error.
debugging: BOOL ¬ FALSE;
-- Raised (only when debugging is TRUE) on catching LoganBerry.Error.
DBErrorSeen: SIGNAL = CODE;
-- Raised (only when debugging is TRUE) on catching WalnutDefs.Error.
WDErrorSeen: SIGNAL = CODE;
defaultLogLength: INT ¬ FS.BytesForPages[10000]; -- five megabytes
-- Value that DoLogExpunge assigns to commitFrequency when an expunge starts.
expungeCommitFrequency: INT ¬ 200;
-- Module-level commit frequency; note that ParseLog declares its own local variable of the same name.
commitFrequency: INT ¬ expungeCommitFrequency;
-- Number of messages WriteExpungeLog and ParseLog process between calls of ShortShutdown.
shutdownFrequency: INT ¬ 1200;
-- Initial value of the local commitFrequency in ParseLog (log entries processed between commits).
parseCommitFrequency: INT ¬ 200;
-- Local copies of the "any version" constants from WalnutOps.
dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;
-- NOTE(review): feederSeqNum is not referenced in the portion of the file reviewed.
feederSeqNum: INT ¬ 0;
-- Procedures
-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
StartStatsReporting: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  -- Opens the statistics log file (closing any statistics stream already open),
  -- optionally sets up a viewer stream for the same output when heavyhandedDebugging
  -- is TRUE, and registers StatsReport with WalnutRoot.
  -- Superseded declaration kept for reference (Atom is not in the DIRECTORY):
  --   which: ROPE = Atom.GetPName[opsH.segmentID.segment];
  which: ROPE = "WalnutLB";
  kH: KernelHandle = opsH.kernelHandle;
  -- Superseded per-user file name kept for reference (UserCredentials is no longer imported):
  --   statsFile: ROPE = IO.PutFR["///Users/%g/%g.log", [rope[UserCredentials.Get[].name]], [rope[which]] ];
  statsFile: ROPE ¬ "/tmp/WalnutLB.log";
  -- A failure while closing the previous stream is deliberately ignored.
  IF kH.statsStream # NIL THEN kH.statsStream.Close[ ! IO.Error => CONTINUE];
  kH.statsStream ¬ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4];
  IF heavyhandedDebugging THEN {
    name: ROPE = Rope.Concat[which, " Stats"];
    -- Pass along an existing viewer of that name if there is one (may be NIL).
    kH.statsProgressTS ¬ ViewerOps.FindViewer[name];
    kH.statsProgressStream ¬ ViewerIO.CreateViewerStreams[name, kH.statsProgressTS].out;
    -- No viewer was found above, so ask the new stream for its viewer.
    IF kH.statsProgressTS = NIL THEN
      kH.statsProgressTS ¬ ViewerIO.GetViewerFromStream[kH.statsProgressStream];
    kH.statsProgressTS.inhibitDestroy ¬ TRUE;
  };
  WalnutRoot.RegisterStatsProc[opsH, StatsReport];
};
StatsReport: PUBLIC WalnutDefs.CheckReportProc = {
  -- Writes the formatted report, followed by a time stamp, to the statistics stream
  -- and then to the statistics viewer stream if there is one. Does nothing at all
  -- when the statistics stream is NIL.
  stamp: GMT = BasicTime.Now[];
  stampFormat: ROPE = "@ %g\n";
  Emit: PROC[out: STREAM] = {
    out.PutRope["\n "];
    out.PutF[format, v1, v2, v3];
    out.PutF1[stampFormat, [time[stamp]] ];
  };
  fileStream: STREAM = opsH.kernelHandle.statsStream;
  IF fileStream = NIL THEN RETURN;
  Emit[fileStream];
  {
    -- The viewer stream is read only after the file stream has been written.
    viewerStream: STREAM = opsH.kernelHandle.statsProgressStream;
    IF viewerStream # NIL THEN Emit[viewerStream];
  };
};
-- Utilities
CarefullyApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[], didUpdate: BOOL] = {
  -- Calls proc with the log open; when didUpdate is TRUE, commits afterwards.
  -- Raises WalnutDefs.Error[$root, $NotStarted] if Startup has not been done and
  -- WalnutDefs.Error[$log, $LogNotOpen] if the current log has no read stream.
  -- A LoganBerry.Error from proc is reported via WalnutRoot.StatsReport and re-raised as
  -- WalnutDefs.Error[$db, ...]; errorInProgress is set in either error case.
  -- The transaction-abort retry path below dates from the DB-based implementation; it
  -- refers to DB.Aborted and DBAbortedSeen, neither of which is declared or imported
  -- here, so it is kept only as comments. Its locals are disabled with it:
  --   reTryCount: INT ← 2;
  --   schemaInvalid: BOOL ← TRUE;
  eCode: ATOM;
  exp: ROPE;
  opsH.kernelHandle.recentActivity ¬ TRUE;
  IF NOT opsH.kernelHandle.started THEN
    ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  DO
    BEGIN ENABLE {
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        -- falling out of the catch phrase lets the error continue to propagate
      };
      LoganBerry.Error => {
        IF debugging THEN SIGNAL DBErrorSeen;
        eCode ¬ ec;
        exp ¬ explanation;
        GOTO dbError
      };
      -- DB.Aborted => {
      --   IF debugging THEN SIGNAL DBAbortedSeen;
      --   GOTO aborted;
      -- };
    };
    IF opsH.rootHandle.currentLog.readStream = NIL THEN
      ERROR WalnutDefs.Error[$log, $LogNotOpen];
    proc[];
    IF didUpdate THEN WalnutRoot.CommitAndContinue[opsH];
    RETURN;
    EXITS
      dbError => {
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ",
          [atom[eCode]], [rope[exp]] ];
        ERROR WalnutDefs.Error[$db, eCode, exp];
      };
      -- aborted => {
      --   WalnutRoot.StatsReport[opsH, "\n**** trans abort" ];
      --   StatsReport[opsH, " **** TransactionAbort"];
      --   IF ( reTryCount ← reTryCount - 1) < 0 THEN {
      --     opsH.kernelHandle.errorInProgress ← TRUE;
      --     StatsReport[opsH, " *** Too many TransactionAbort's during WalnutOps call"];
      --     WalnutRoot.AbortTransaction[opsH];
      --     ERROR WalnutDefs.Error[$log, $TransactionAbort];
      --   };
      -- };
    END;
    -- Restart code for the disabled retry path (only reachable through the aborted exit):
    -- WalnutRoot.AbortTransaction[opsH];
    -- IF (schemaInvalid ← WalnutRoot.StartTransaction[opsH]) THEN
    --   [] ← WalnutDB.InitSchema[opsH, schemaInvalid];
    -- WalnutLog.OpenLogStreams[opsH];
  ENDLOOP;
};
LongRunningApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[inProgress: BOOL]] = {
  -- Acquires the write lock, calls proc, clears the operation-in-progress position and
  -- releases the write lock (which also commits).
  -- Raises WalnutDefs.Error[$root, $NotStarted] if Startup has not been done and
  -- WalnutDefs.Error[$log, $LogNotOpen] if the current log has no read stream.
  -- A LoganBerry.Error is reported via WalnutRoot.StatsReport and re-raised as
  -- WalnutDefs.Error[$db, ...]; errorInProgress is set in either error case.
  -- The transaction-abort retry path below dates from the DB-based implementation; it
  -- refers to DB.Aborted, which is not imported here, so it is kept only as comments.
  -- With that path disabled, proc is always called with inProgress = FALSE.
  -- Locals used only by the disabled path:
  --   reTryCount: INT ← 2;
  --   schemaInvalid: BOOL ← TRUE;
  alreadyCalled: BOOL ¬ FALSE;
  eCode: ATOM;
  exp: ROPE;
  opsH.kernelHandle.recentActivity ¬ TRUE;
  opsH.kernelHandle.isShutdown ¬ FALSE;
  IF NOT opsH.kernelHandle.started THEN
    ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  DO
    BEGIN ENABLE {
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        -- falling out of the catch phrase lets the error continue to propagate
      };
      LoganBerry.Error => {
        IF debugging THEN SIGNAL DBErrorSeen;
        eCode ¬ ec;
        exp ¬ explanation;
        GOTO dbError
      };
      -- DB.Aborted => GOTO aborted;
    };
    IF opsH.rootHandle.currentLog.readStream = NIL THEN
      ERROR WalnutDefs.Error[$log, $LogNotOpen];
    WalnutRoot.AcquireWriteLock[opsH];
    -- On a version mismatch, give the lock back before letting the signal propagate.
    proc[alreadyCalled
      ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[opsH]; REJECT} ];
    WalnutDB.SetOpInProgressPos[opsH, -1];
    ReleaseWriteLock[opsH];  -- also does commit
    RETURN;
    EXITS
      dbError => {
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ",
          [atom[eCode]], [rope[exp]] ];
        ERROR WalnutDefs.Error[$db, eCode, exp];
      };
      -- aborted => {
      --   WalnutRoot.StatsReport[opsH, "\n**** trans abort" ];
      --   StatsReport[opsH, " **** TransactionAbort"];
      --   IF ( reTryCount ← reTryCount - 1) < 0 THEN {
      --     opsH.kernelHandle.errorInProgress ← TRUE;
      --     StatsReport[opsH, " *** Too many TransactionAbort's during WalnutOps call"];
      --     WalnutRoot.AbortTransaction[opsH];
      --     ERROR WalnutDefs.Error[$log, $TransactionAbort];
      --   };
      -- };
    END;
    -- Restart code for the disabled retry path (only reachable through the aborted exit):
    -- WalnutRoot.AbortTransaction[opsH];
    -- IF (schemaInvalid ← WalnutRoot.StartTransaction[opsH]) THEN
    --   [] ← WalnutDB.InitSchema[opsH, schemaInvalid];
    -- WalnutLog.OpenLogStreams[opsH];
    -- alreadyCalled ← WalnutDB.GetOpInProgressPos[opsH] > 0;
  ENDLOOP;
};
Restart: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  -- Re-opens the transaction: starts it, re-initializes the schema if StartTransaction
  -- says it is invalid, re-opens the log streams and clears the isShutdown flag.
  needsSchemaInit: BOOL = WalnutRoot.StartTransaction[opsH];
  IF needsSchemaInit THEN {
    -- the flag is known to be TRUE on this path
    [] ¬ WalnutDB.InitSchema[opsH, TRUE];
  };
  WalnutLog.OpenLogStreams[opsH];
  opsH.kernelHandle.isShutdown ¬ FALSE;
};
ReleaseWriteLock: PROC[opsH: WalnutOpsHandle] =
  BEGIN
  -- Commits first, then gives the log write lock back.
  WalnutRoot.CommitAndContinue[opsH];
  WalnutLog.ReleaseWriteLock[opsH];
  END;
CheckReport: PUBLIC CheckReportProc = {
  -- Sends the formatted message to every stream on the kernel handle's reporterList,
  -- in list order; does nothing when the list is empty.
  remaining: LIST OF STREAM ¬ opsH.kernelHandle.reporterList;
  WHILE remaining # NIL DO
    remaining.first.PutF[format, v1, v2, v3];
    remaining ¬ remaining.rest;
  ENDLOOP;
};
InternalShutdown: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  -- Shuts the kernel down: unregisters the stats proc, aborts the transaction if an error
  -- is in progress, shuts down the expunge state, the log and the root, writes a final
  -- statistics line, closes the statistics streams, resets the kernel flags and notifies
  -- registered clients that Walnut has stopped.
  kH: KernelHandle = opsH.kernelHandle;
  WalnutRoot.UnregisterStatsProc[opsH, StatsReport];
  IF kH.errorInProgress THEN WalnutRoot.AbortTransaction[opsH];
  WalnutLogExpunge.EndExpunge[opsH]; -- clear its variables
  WalnutLog.ShutdownLog[opsH];
  WalnutRoot.Shutdown[opsH];  -- takes care of database
  -- Must precede the Close calls below, since StatsReport writes to these streams.
  StatsReport[opsH, "\n *** Shutdown"];
  IF kH.statsProgressTS # NIL THEN { kH.statsProgressTS.inhibitDestroy ¬ FALSE };
  -- A failure while closing a statistics stream is ignored (as StartStatsReporting does),
  -- so that the flags below are always reset and the stopped notification is always sent.
  IF kH.statsStream # NIL THEN {
    kH.statsStream.Close[ ! IO.Error => CONTINUE];
    kH.statsStream ¬ NIL;
  };
  IF kH.statsProgressStream # NIL THEN {
    kH.statsProgressStream.Close[ ! IO.Error => CONTINUE];
    kH.statsProgressStream ¬ NIL;
  };
  kH.started ¬ FALSE;
  kH.recentActivity ¬ FALSE;
  kH.isShutdown ¬ TRUE;
  kH.errorInProgress ¬ FALSE;
  kH.mailStream ¬ NIL;
  WalnutRegistryPrivate.NotifyForEvent[stopped];
};
-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
ComputeMaxExpungeLogPages: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[pages: INT] = {
  -- Returns the number of pages needed to hold the current log minus the bytes that
  -- the database records as belonging to destroyed messages.
  destroyedBytes: INT = WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs;
  currentLength: INT = WalnutLog.LogLength[opsH];
  survivingBytes: INT = currentLength - destroyedBytes;
  pages ¬ FS.PagesForBytes[survivingBytes];
};
DoLogExpunge: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  -- The log is re-written, starting from the first file with deleted messages. Called from within a LongRunningApply; calling SetLogExpungePhase does a commit
  -- The expunge phase is kept in the database, and each pass of the loop does the work
  -- of the recorded phase and then records the next one:
  --   idle, initializingExpungeLog, writingExpungeLog, swappingLogs, and back to idle.
  -- The procedure returns only from the swappingLogs arm.
  DO
    expPhase: WalnutKernelDefs.LogExpungePhase ¬ WalnutDB.GetLogExpungePhase[opsH];
    SELECT expPhase FROM
      idle => {
        StatsReport[opsH, "\n ~~~ Starting log expunge"];
        WalnutDB.SetLogExpungePhase[opsH, initializingExpungeLog];
        WalnutRoot.CommitAndContinue[opsH];
        commitFrequency ¬ expungeCommitFrequency;
      };
      initializingExpungeLog => {
        expungeFileID: INT;
        pagesNeeded: INT = ComputeMaxExpungeLogPages[opsH];
        expungeFileID ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded];
        WalnutDB.SetExpungeFileID[opsH, expungeFileID];
        WalnutDB.SetLogExpungePhase[opsH, writingExpungeLog];
        WalnutRoot.CommitAndContinue[opsH];
      };
      writingExpungeLog => {
        [] ¬ WriteExpungeLog[opsH];
        [] ¬ WalnutLogExpunge.EndExpunge[opsH];
        WalnutDB.SetLogExpungePhase[opsH, swappingLogs];
        WalnutRoot.CommitAndContinue[opsH];
      };
      swappingLogs => {
        expungeFileID: INT ¬ WalnutDB.GetExpungeFileID[opsH];
        date: BasicTime.GMT;
        logLen: INT;  -- NOTE(review): assigned below but never read
        [] ¬ WalnutRoot.GetStreamsForExpunge[opsH: opsH, starting: FALSE, pagesWanted: -1];
        StatsReport[opsH, "\n Swapping Log Files\n"];
        -- Reset all expunge bookkeeping before swapping; everything is committed together below.
        WalnutDB.SetExpungeProgressInfo[opsH, 0,0];
        WalnutDB.SetExpungeInfo[opsH, 0, 0];
        WalnutDB.SetOpInProgressPos[opsH, -1];
        WalnutDB.SetLogExpungePhase[opsH, idle];  -- whew
        date ¬ BasicTime.Now[];
        WalnutDB.SetTimeOfLastExpunge[opsH, date];
        logLen ¬ WalnutRoot.SwapLogs[opsH, expungeFileID, date];
        WalnutDB.SetRootFileVersion[opsH, opsH.rootHandle.createDate];
        WalnutRoot.CommitAndContinue[opsH];
        StatsReport[opsH, "\n ~~~ Finished expunge"];
        WalnutLog.OpenLogStreams[opsH];
        RETURN;
      };
      ENDCASE => ERROR;
  ENDLOOP;
};
ExpungeLogResult: TYPE = { unknown, couldntRestart, finished };

WriteExpungeLog: PROC[opsH: WalnutOpsHandle] RETURNS[result: ExpungeLogResult] = {
  -- called from within catch phrases, but must do its own error catching, since the expunge Log cannot be flushed after every write - that makes too much traffic on the alpine server;
  -- reads the old log, starting at currentLogPos, copies entries to the expungeLog, changing msg entryStart pointers as it goes, doing occasional commits of both the expungeLog and the database
  -- On WalnutDefs.Error with code $TransactionAbort the transaction is restarted and the
  -- work resumes from the progress last recorded in the database; it gives up with
  -- $TooManyLogAborts when repeated attempts make no progress in the expunge log.
  -- NOTE(review): result is assigned only when a $WriteExpungeLog entry is seen; callers
  -- in this file discard it.
  numMsgs: INT ¬ 0;
  currentLogPos, expungeLogPos: INT;
  previousAt, at: INT;
  startRetryCount: INT = 2;
  retryCount: INT ¬ startRetryCount;
  bytesToCopyBeforeFlush: INT ¬ FS.BytesForPages[500];
  lastExpungeCommitLength: INT ¬ -1;
  oldLength: INT = WalnutLog.LogLength[opsH];
  DO
    BEGIN ENABLE WalnutDefs.Error => {
      IF debugging THEN SIGNAL WDErrorSeen;
      IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
    };
    bytesCopiedSinceLastCommit, numMsgsSinceShutdown: INT ¬ 0;
    lastAcceptNewMailPos: INT;
    firstDestroyedMsgPos: INT ¬ WalnutDB.GetExpungeInfo[opsH].firstDestroyedMsgPos;
    [currentLogPos, expungeLogPos] ¬ WalnutDB.GetExpungeProgressInfo[opsH];
    -- Count attempts that start from the same committed expunge-log position.
    IF lastExpungeCommitLength = expungeLogPos THEN {
      IF (retryCount ¬ retryCount - 1) <= 0 THEN
        ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
          IO.PutFR1["\n During Expunge - last commit pos in expungeLog was %g",
            [integer[expungeLogPos]]] ];
    }
    ELSE {
      lastExpungeCommitLength ¬ expungeLogPos;
      retryCount ¬ startRetryCount;
    };
    IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN {
      pagesNeeded: INT ¬ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[opsH] - WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs];
      CheckReport[opsH,
        "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog - restarting Log Expunge\n",
        [integer[expungeLogPos]], [integer[currentLogPos]] ];
      expungeLogPos ¬ currentLogPos ¬ 0;
      WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; -- commits
      [] ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded];   -- try again
      StatsReport[opsH, "Restarting expunge from beginning"];
      -- Raised with ERROR, like every other WalnutDefs.Error in this module.
      IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN
        ERROR WalnutDefs.Error[$log, $CantStartExpunge];
      lastExpungeCommitLength ¬ expungeLogPos;
    };
    IF currentLogPos < firstDestroyedMsgPos THEN {  -- copyingHead phase
      bytesLeftToCopyInHead: INT ¬ firstDestroyedMsgPos - currentLogPos;
      bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
      IF currentLogPos = 0 THEN {
        WalnutLogExpunge.SetIndex[opsH, 0];
        [] ¬ WalnutLogExpunge.SkipEntry[opsH]; -- need to skip the LogFileInfo entry
        bytesLeftToCopyInHead ¬
          bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[opsH];
      };
      CheckReport[opsH,
        "Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n\t",
        [integer[currentLogPos]], [integer[firstDestroyedMsgPos]] ];
      DO
        bytesToCopyThisTime: INT ¬ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
        WalnutLogExpunge.CopyBytesToExpungeLog[opsH, bytesToCopyThisTime];
        [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
        WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[opsH];
        CheckReport[opsH, "#"];
        bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - bytesToCopyThisTime;
        IF bytesLeftToCopyInHead = 0 THEN EXIT;
      ENDLOOP;
      retryCount ¬ startRetryCount;  -- in case has abort during copy
    };
    -- lastAcceptNewMailPos can't be before firstDestroyedMsgPos
    lastAcceptNewMailPos ¬ WalnutDB.GetAcceptNewMailPos[opsH];
    WalnutLogExpunge.SetIndex[opsH, at ¬ currentLogPos]; -- before doing PeekEntry
    CheckReport[opsH,
      "\nProcessing the tail of the log, starting at bytePos %g (old length %g bytes, %g pages)\n",
      [integer[at]], [integer[oldLength]], [integer[FS.PagesForBytes[oldLength]]] ];
    DO
      ident: ATOM;
      msgID: ROPE;
      bytesThisCopy: INT ¬ 0;
      previousAt ¬ at;
      [ident, msgID, at] ¬ WalnutLogExpunge.PeekEntry[opsH];
      IF ident = NIL THEN
        IF at # -1 THEN EXIT  -- end of log
        ELSE {
          -- No recognizable entry here: scan forward for the next one.
          charSkipped: INT ¬ WalnutLogExpunge.SetPosition[opsH, previousAt+1];
          IF charSkipped = -1 THEN
            ERROR WalnutDefs.Error[$log, $DuringExpunge,
              IO.PutFR1["No entry found after %g", [integer[previousAt]] ]];
          at ¬ previousAt + charSkipped;
          LOOP;
        };
      SELECT ident FROM
        $ExpungeMsgs => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
        $WriteExpungeLog => { result ¬ finished; EXIT }; -- last entry on the Log
        $LogFileInfo => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
        $RecordNewMailInfo, $StartCopyNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
          ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
        $EndCopyNewMailInfo => {
          IF at < lastAcceptNewMailPos THEN {  -- maybe skip
            startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
            IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
            ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
          }
          ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
        };
        $AcceptNewMail =>
          IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
          ELSE {
            newLogPos: INT;
            [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
            WalnutDB.SetAcceptNewMailPos[opsH, newLogPos];
          };
        $StartCopyReadArchive => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
        $EndCopyReadArchiveInfo => {
          startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
          IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
          ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
        };
        ENDCASE =>
          IF msgID.Length[] = 0 OR WalnutDB.MsgExists[opsH, msgID] THEN {
            -- the next entry either has nothing to do with a message or it is an operation on a message that still exists
            -- copy the entry to the new log
            newLogPos: INT ;
            [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
            IF ident = $CreateMsg THEN {
              -- it is the message body itself; record the change in position in the database
              IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[opsH, newLogPos];
              numMsgsSinceShutdown ¬ numMsgsSinceShutdown + 1;
              IF (numMsgs ¬ numMsgs + 1) MOD 10 = 0 THEN
                IF numMsgs MOD 100 = 0 THEN
                  CheckReport[opsH, "(%g) ", [integer[numMsgs]] ]
                ELSE CheckReport[opsH, "."];
            };
          }
          -- if you haven't consumed the entry by copying it, then just advance the log pointer
          ELSE [] ¬ WalnutLogExpunge.SkipEntry[opsH];
      -- Every shutdownFrequency messages, record progress and pause via ShortShutdown.
      IF numMsgsSinceShutdown >= shutdownFrequency THEN {
        expLogPos: INT;
        [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
        WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
        WalnutLogExpunge.EndExpunge[opsH];
        ShortShutdown[opsH, 20, -1];
        [] ¬ WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expLogPos];
        lastExpungeCommitLength ¬ expLogPos;
        bytesCopiedSinceLastCommit ¬ bytesThisCopy ¬ 0;
        numMsgsSinceShutdown ¬ 0;
      };
      -- Commit once enough bytes have been copied since the last commit.
      IF (bytesCopiedSinceLastCommit ¬ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN {
        expLogPos: INT;
        [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
        WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
        WalnutRoot.CommitAndContinue[opsH];
        lastExpungeCommitLength ¬ expLogPos;
        bytesCopiedSinceLastCommit ¬ 0;
      };
    ENDLOOP;
    -- Commit the last batch of updates
    IF bytesCopiedSinceLastCommit > 0 THEN {
      [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
      WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
      WalnutRoot.CommitAndContinue[opsH];
      lastExpungeCommitLength ¬ expungeLogPos;
    };
    RETURN;
    EXITS
      retry => {
        -- Transaction aborted: drop the expunge state, restart the transaction and go round again.
        schemaInvalid: BOOL ¬ TRUE;
        WalnutLogExpunge.EndExpunge[opsH];
        WalnutRoot.AbortTransaction[opsH];
        schemaInvalid ¬ WalnutRoot.StartTransaction[opsH];
        IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
        WalnutLog.OpenLogStreams[opsH];
      };
    END;
  ENDLOOP;
};
-- Is some long-running operation in progress?
-- Finishes any long-running operation that the database records as still in progress.
-- Raises WalnutDefs.Error[$db, $InternalError] if an error is already in progress.
-- If the kernel is shut down, it is restarted first (opening the root if necessary).
-- If a log parse was in progress, the parse is continued and nothing else is done.
-- Otherwise, if an operation-in-progress position is recorded, the log entry at that
-- position is read and the corresponding operation is completed, followed by a parse
-- of the log when the operation was the copy of a temporary log.
CheckInProgress: PUBLIC PROC[opsH: WalnutOpsHandle] = {
inProgressPos: INT ¬ 0;
parseInProgress: BOOL ¬ FALSE;
needsParse: BOOL ¬ FALSE;
-- NOTE(review): doingLogExpunge is assigned in Cip but not read anywhere in this procedure.
doingLogExpunge: BOOL ¬ FALSE;
-- Reads the in-progress state from the database; takes the write lock when there is
-- work to finish (a parse in progress, or a positive in-progress position).
Cip: PROC = {
IF (parseInProgress ¬ WalnutDB.GetParseLogInProgress[opsH]) THEN {
WalnutRoot.AcquireWriteLock[opsH]; RETURN };
inProgressPos ¬ WalnutDB.GetOpInProgressPos[opsH];
doingLogExpunge ¬ ( WalnutDB.GetLogExpungePhase[opsH] # idle );
IF inProgressPos <= 0 THEN RETURN;
WalnutRoot.AcquireWriteLock[opsH];
};
-- Reads the log entry at inProgressPos and completes the operation it describes;
-- the inProgress argument is not used.
Fip: PROC[inProgress: BOOL] = { -- FinishInProgress
at: INT;
wle: WalnutKernelDefs.LogEntry;
WalnutLog.SetIndex[opsH, inProgressPos];
[wle, at] ¬ WalnutLog.NextEntry[opsH];
IF at = -1 THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR1["No entry at %g", [integer[inProgressPos]]] ];
TRUSTED { WITH le: wle SELECT FROM
ExpungeMsgs =>
WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, CheckReport];
WriteExpungeLog => DoLogExpunge[opsH];
EmptyMsgSet => []¬ WalnutDB.EmptyMsgSet[
opsH,
[ le.msgSet, dontCareMsgSetVersion ],
WalnutOpsInternal.CheckReport ];
DestroyMsgSet =>
[]¬ WalnutDB.DestroyMsgSet[
opsH: opsH,
msgSet: [ le.msgSet, dontCareMsgSetVersion ],
msDomainVersion: dontCareDomainVersion,
report: WalnutOpsInternal.CheckReport
];
AcceptNewMail => {
WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
opsH.kernelHandle.newMailSomewhere ¬ FALSE;
};
StartCopyNewMail => {
FinishCopyTempLog[opsH, newMail, at];
WalnutDB.SetCopyMailLogPos[opsH, 0];
WalnutRoot.CommitAndContinue[opsH];
WalnutLog.FinishTempLogCopy[opsH, newMail];
needsParse ¬ TRUE;
};
StartReadArchiveFile => {
WalnutLog.ResetLog[opsH, at];
WalnutDB.SetReadArchivePos[opsH, 0];
};
StartCopyReadArchive => {
FinishCopyTempLog[opsH, readArchive, at];
WalnutDB.SetCopyReadArchivePos[opsH, 0];
WalnutRoot.CommitAndContinue[opsH];
WalnutLog.FinishTempLogCopy[opsH, readArchive];
needsParse ¬ TRUE;
};
-- Any other kind of entry is not a valid in-progress operation.
ENDCASE =>
ERROR WalnutDefs.Error[$db, $ErroneousInProgress,
IO.PutFR1["Entry at %g", [integer[inProgressPos]]] ];
};  -- end trusted
WalnutDB.SetOpInProgressPos[opsH, -1];
WalnutRoot.CommitAndContinue[opsH];
};
IF opsH.kernelHandle.errorInProgress THEN
ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
IF opsH.kernelHandle.isShutdown THEN {
IF opsH.rootHandle = NIL THEN [] ¬ WalnutRoot.Open[opsH];
Restart[opsH];
};
WalnutOpsInternal.CarefullyApply[opsH, Cip, FALSE];
-- An interrupted parse takes priority: continue it and return.
IF parseInProgress THEN {
StatsReport[opsH, "\n Continuing parseInProgress"];
[] ¬ ParseLog[opsH, TRUE];
RETURN
};
IF inProgressPos <= 0 THEN RETURN;
StatsReport[opsH, "\n Continuing longRunningOperation"];
WalnutOpsInternal.LongRunningApply[opsH, Fip];
-- Fip sets needsParse after finishing the copy of a temporary log.
IF needsParse THEN {
numNew: INT ¬ 0;
CheckReport[opsH, "Adding messages to the database\n"];
numNew ¬ ParseLog[opsH, FALSE];
IF numNew = 0 THEN
CheckReport[opsH, "No messages were new\n"]
ELSE CheckReport[opsH,
" %g new messages\n", [integer[numNew]] ];
};
};
FinishCopyTempLog: PROC[opsH: WalnutOpsHandle, which: WhichTempLog, at: INT] = {
  -- Resumes an interrupted copy of a temporary log into the main log. at is the position
  -- of the start-copy entry; the number of bytes already copied is taken to be the log
  -- length minus the position just after that entry. When the copy is done, the database
  -- is marked as having a parse in progress starting at the operation-in-progress
  -- position, and the operation-in-progress position is cleared.
  totalLength: INT ¬ WalnutLog.LogLength[opsH];
  copyEntryPos, firstCopiedPos, bytesAlreadyCopied: INT;
  IF WalnutLog.SetPosition[opsH, at] # 0 THEN
    ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[at]]] ];
  copyEntryPos ¬ WalnutLog.NextEntry[opsH].at;  -- reading the copy entry also skips it
  firstCopiedPos ¬ WalnutLog.NextAt[opsH];
  bytesAlreadyCopied ¬ totalLength - firstCopiedPos;
  IF NOT WalnutLog.PrepareToCopyTempLog[
    opsH: opsH,
    which: which,
    pagesAlreadyCopied: FS.PagesForBytes[bytesAlreadyCopied],
    reportProc: WalnutOpsInternal.CheckReport]
  THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];
  CheckReport[opsH,
    "\nContinuing copy of tempLog, starting at bytePos %g\n", [integer[bytesAlreadyCopied]] ];
  WalnutLog.CopyTempLog[opsH, which, copyEntryPos, bytesAlreadyCopied, CheckReport];
  WalnutDB.SetParseLogInProgress[opsH, TRUE];
  WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
  WalnutDB.SetOpInProgressPos[opsH, -1];
};
-- Parsing the Log - assumes the log is write-locked
ParseLog: PUBLIC PROC[opsH: WalnutOpsHandle, doingScavenge: BOOL ¬ FALSE]
  RETURNS[numNew: INT] = {
  -- Replays log entries into the database, starting at the parse position recorded in
  -- the database, committing every commitFrequency entries and pausing (ShortShutdown)
  -- every shutdownFrequency messages. Returns the count of messages for which
  -- WalnutDB.AddNewMsg returned FALSE; when doingScavenge is TRUE the count starts from
  -- the current size of the database.
  -- On WalnutDefs.Error with code $TransactionAbort the transaction is restarted and the
  -- parse resumes from the last committed position, a limited number of times; any other
  -- WalnutDefs.Error sets errorInProgress and propagates.
  parseLogPos: INT;
  at: INT;  -- position in log of entry just processed
  lastNumNew, savedNumNew, savedLastNumNew: INT ¬ 0;
  wle: WalnutLog.LogEntry;
  charsSkipped: INT;
  errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
  retries: INT ¬ 2;
  accepted: BOOL;
  giveStartInfo: BOOL ¬ TRUE;
  commitFrequency: INT ¬ parseCommitFrequency;  -- local; hides the module-level variable
  updatesSinceLastCommit, numSinceShutdown: INT ¬ 0;
  numNew ¬ 0;
  DO
    currentLogLength: INT ¬ WalnutLog.LogLength[opsH]; -- in case at end of Expunge
    BEGIN ENABLE BEGIN
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        IF code = $TransactionAbort THEN {
          StatsReport[opsH, " *** TransactionAborts during parseLog"];
          IF ( retries ¬ retries - 1) > 0 THEN GOTO tryAgain;
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          StatsReport[opsH, " *** Too many TransactionAborts during parseLog"];
          REJECT
        }
        ELSE {
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          StatsReport[opsH, "InternalError: %g during parseLog", [atom[code]] ];
          REJECT
        };
      };
    END;
    parseLogPos ¬ WalnutDB.GetParseLogPos[opsH];
    IF giveStartInfo THEN {
      IF doingScavenge THEN numNew ¬ WalnutDB.SizeOfDatabase[opsH].messages
      ELSE numNew ¬ 0;
      IF doingScavenge AND numNew # 0 THEN
        CheckReport[opsH,
          " Parsing the log - database has %g messages so far\n", [integer[numNew]] ];
      giveStartInfo ¬ FALSE;
    };
    -- Nothing to parse: no position is recorded, or the position is the end of the log.
    IF ( parseLogPos < 0 ) OR ( parseLogPos = currentLogLength ) THEN RETURN[numNew];
    charsSkipped ¬ WalnutLog.SetPosition[opsH, parseLogPos];
    IF charsSkipped = -1 THEN
      ERROR WalnutDefs.Error[$log, $BadLog,
        IO.PutFR[errorRope, [integer[parseLogPos]], [integer[currentLogLength]]] ];
    at ¬ parseLogPos + charsSkipped;
    -- make sure things are in good shape before starting
    IF at # parseLogPos THEN {
      WalnutDB.SetParseLogPos[opsH, at];
      WalnutRoot.CommitAndContinue[opsH];
    };
    accepted ¬ NOT WalnutDB.GetAddingServerMsgs[opsH];
    DO
      previousAt: INT ¬ at;
      [wle, at] ¬ WalnutLog.NextEntry[opsH];
      IF wle = NIL THEN {
        -- Unreadable entry: scan forward for the next one and report the damage.
        maybeBadAt: INT = at;  -- NOTE(review): not used
        charsSkipped ¬ WalnutLog.SetPosition[opsH, previousAt+1];
        IF charsSkipped = -1 THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
        [wle, at] ¬ WalnutLog.NextEntry[opsH];
        IF wle = NIL THEN
          ERROR WalnutDefs.Error[$log, $BadLog,
            IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
        CheckReport[opsH,
          "\n*** Bad log entry at or near %g or %g; do a ScanWalnutLog sometime soon\n",
          [integer[previousAt]], [integer[at]] ];
      };
      IF numSinceShutdown >= shutdownFrequency THEN {
        WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
        ShortShutdown[opsH, 20, at];
        updatesSinceLastCommit ¬ 0;
        savedNumNew ¬ numNew;
        savedLastNumNew ¬ lastNumNew;
        numSinceShutdown ¬ 0;
      };
      IF (updatesSinceLastCommit ¬ updatesSinceLastCommit+1) >= commitFrequency THEN {
        WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
        WalnutRoot.CommitAndContinue[opsH];
        updatesSinceLastCommit ¬ 0;
        -- Remember the counts as of this commit; tryAgain restores them.
        savedNumNew ¬ numNew;
        savedLastNumNew ¬ lastNumNew;
      };
      -- A version mismatch while replaying an entry just moves on to the next entry.
      BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
      TRUSTED { WITH le: wle SELECT FROM
        LogFileInfo => NULL;
        ExpungeMsgs => {
          -- if we don't SetParseLogPos & Commit here, we might delete some messages, commit and later get an error, parse the log from an earlier point, and add more to bytesInDestroyedMsgs
          WalnutDB.SetParseLogPos[opsH, at];
          WalnutRoot.CommitAndContinue[opsH];
          CheckReport[opsH, "\nStarting ExpungeMsgs "];
          WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
          WalnutRoot.CommitAndContinue[opsH];
          CheckReport[opsH, " ... finished ExpungeMsgs\n"];
        };
        WriteExpungeLog => {
          CheckReport[opsH, "\nStarting WriteExpungeLog "];
          WalnutDB.SetParseLogPos[opsH, -1];
          WalnutDB.SetParseLogInProgress[opsH, FALSE];
          WalnutDB.SetOpInProgressPos[opsH, at]; -- at is beginning of current entry
          WalnutRoot.CommitAndContinue[opsH];
          [] ¬ DoLogExpunge[opsH];
          CheckReport[opsH, " ... finished WriteExpungeLog\n"];
          EXIT;  -- WriteExpungeLog had to be the last entry
        };
        EndOfLog => {
          WalnutDB.SetParseLogPos[opsH, -1];
          WalnutDB.SetParseLogInProgress[opsH, FALSE];
          WalnutRoot.CommitAndContinue[opsH];
          RETURN
        };
        CreateMsgSet =>
          [] ¬ WalnutDB.CreateMsgSet[opsH, le.msgSet, dontCareDomainVersion];
        EmptyMsgSet => {
          [] ¬ WalnutDB.EmptyMsgSet[
            opsH,
            [le.msgSet, dontCareMsgSetVersion],
            WalnutOpsInternal.CheckReport
          ];
        };
        DestroyMsgSet => {
          [] ¬ WalnutDB.DestroyMsgSet[
            opsH: opsH,
            msgSet: [le.msgSet, dontCareMsgSetVersion],
            msDomainVersion: dontCareDomainVersion,
            report: WalnutOpsInternal.CheckReport
          ];
        };
        CreateMsg => {
          me: WalnutKernelDefs.MsgLogEntry = NARROW[wle];
          me.show ¬ accepted;
          IF me.msg # NIL THEN {
            IF ~WalnutDB.AddNewMsg[opsH, me] THEN numNew ¬ numNew + 1;
            numSinceShutdown ¬ numSinceShutdown + 1;
          };
        };
        AddMsg =>
          IF le.msg # NIL THEN []¬ WalnutDB.AddMsg[
            opsH: opsH,
            msg: le.msg,
            from: [NIL, dontCareMsgSetVersion],
            to: [le.to, dontCareMsgSetVersion] ];
        RemoveMsg =>
          IF le.msg # NIL THEN []¬ WalnutDB.RemoveMsg[
            opsH: opsH,
            msg: le.msg,
            from: [le.from, dontCareMsgSetVersion],
            deletedVersion: dontCareMsgSetVersion ];
        MoveMsg =>
          IF le.msg # NIL THEN []¬ WalnutDB.MoveMsg[
            opsH: opsH,
            msg: le.msg,
            from: [ le.from, dontCareMsgSetVersion],
            to: [le.to, dontCareMsgSetVersion] ];
        HasBeenRead => IF le.msg # NIL THEN WalnutDB.SetHasBeenRead[opsH, le.msg];
        RecordNewMailInfo => {
          WalnutDB.SetNewMailInfo[
            opsH, le.logLen, le.when, le.server, le.num];
          opsH.kernelHandle.newMailSomewhere ¬ TRUE;
        };
        StartCopyNewMail => {
          WalnutDB.SetCopyMailLogPos[opsH, at];
          WalnutDB.SetAddingServerMsgs[opsH, TRUE];
          accepted ¬ FALSE;
        };
        EndCopyNewMailInfo => {
          WalnutDB.SetCopyMailLogPos[opsH, 0];
          WalnutDB.SetNewMailInfo[opsH, 0, BasicTime.nullGMT, NIL, 0];
          WalnutDB.SetAddingServerMsgs[opsH, FALSE];
          accepted ¬ TRUE;
        };
        AcceptNewMail => {
          WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
          opsH.kernelHandle.newMailSomewhere ¬ FALSE;
        };
        StartReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, at];
        EndReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, 0];
        StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[opsH, at];
        EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[opsH, 0];
        ENDCASE => ERROR;
      };
      EXITS continue => NULL;
      END;
      -- Progress marks: the running count at every 100th new message, "~" at every other 10th.
      IF numNew # lastNumNew THEN
        IF numNew MOD 10 = 0 THEN
          IF numNew MOD 100 = 0 THEN
            CheckReport[opsH, "(%g) ", [integer[numNew]] ]
          ELSE CheckReport[opsH, "~"];
      lastNumNew ¬ numNew;
    ENDLOOP;
    EXITS
      tryAgain => {
        -- Transaction aborted: go back to the counts as of the last commit and restart.
        schemaInvalid: BOOL ¬ TRUE;
        numNew ¬ savedNumNew;
        lastNumNew ¬ savedLastNumNew;
        WalnutRoot.AbortTransaction[opsH];
        schemaInvalid ¬ WalnutRoot.StartTransaction[opsH, TRUE];
        IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
        WalnutLog.OpenLogStreams[opsH];
        giveStartInfo ¬ TRUE;
      };
    END;
  ENDLOOP;
};
ShortShutdown: PROC[opsH: WalnutOpsHandle, sec: INT, logPos: INT] =
  BEGIN
  -- Shuts the log down, pauses for sec seconds, then starts a new transaction and
  -- re-opens the log streams. When logPos is not negative the log is repositioned there.
  WalnutLog.ShutdownLog[opsH]; -- this does a close transaction as well
  Process.PauseMsec[sec*1000];
  [] ¬ WalnutRoot.StartTransaction[opsH, TRUE];
  WalnutLog.OpenLogStreams[opsH];
  IF logPos >= 0 THEN {
    [] ¬ WalnutLog.SetPosition[opsH, logPos];
  };
  END;
DoStartNewMail: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[STREAM] = {
  -- Finishes any operation in progress, then returns a stream on the new-mail log and
  -- records it in the kernel handle. Returns NIL when a mail stream is already recorded,
  -- when the database reports -1 for the new-mail log length, or when a log expunge is
  -- under way.
  recordedLength: INT ¬ -1;
  expunging: BOOL ¬ FALSE;
  ReadMailState: PROC = {
    recordedLength ¬ WalnutDB.GetNewMailLogLength[opsH];
    expunging ¬ WalnutDB.GetLogExpungePhase[opsH] # idle;
  };
  CheckInProgress[opsH];
  IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL];
  CarefullyApply[opsH, ReadMailState, FALSE];
  IF expunging OR recordedLength = -1 THEN RETURN[NIL];
  RETURN[opsH.kernelHandle.mailStream ¬ WalnutMiscLog.GetNewMailLog[opsH, recordedLength, 200]];
};
DoAcceptNewMail: PUBLIC PROC[opsH: WalnutOpsHandle, activeVersion: INT] = {
  -- Finishes any operation in progress, then accepts the new mail as a long-running
  -- operation and clears the newMailSomewhere flag.
  Accept: PROC[inProgress: BOOL] = {
    entryPos: INT;
    IF inProgress THEN
      -- resuming: use the position already recorded in the database
      entryPos ¬ WalnutDB.GetAcceptNewMailPos[opsH]
    ELSE {
      -- first attempt: verify the Active message set version, write the log entry and
      -- record its position as the operation in progress
      [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion]];
      entryPos ¬ WalnutLog.AcceptNewMail[opsH].at;
      WalnutDB.SetOpInProgressPos[opsH, entryPos];
    };
    WalnutDB.AcceptNewMail[opsH, entryPos, activeVersion];
  };
  CheckInProgress[opsH];
  LongRunningApply[opsH, Accept];
  opsH.kernelHandle.newMailSomewhere ¬ FALSE;
};
DoNewMail: PUBLIC PROC[
  opsH: WalnutOpsHandle, activeVersion: INT,
  proc: PROC[msg, tocEntry: ROPE, startOfSubject: INT]]
  RETURNS[responses: LIST OF WalnutOps.ServerInfo, complete: BOOL] = {
  -- Copies the new-mail log (if it has any entries) into the main log, parses the log to
  -- add the messages to the database and runs the message filter. Then, if proc is not
  -- NIL, enumerates the unaccepted messages to it. responses is the result of
  -- WalnutDB.EnumerateServers.
  -- Returns [NIL, FALSE] when a mail stream is currently recorded in the kernel handle.
  -- complete is FALSE when the temporary log could not be prepared for copying.
  someEntries: BOOL ¬ FALSE;
  -- Verifies the Active message set version (unless it is "don't care") and notes
  -- whether the new-mail log has any entries.
  Cml: PROC = {
    IF activeVersion # WalnutDefs.dontCareMsgSetVersion THEN
      [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion] ];
    someEntries ¬ WalnutDB.GetNewMailLogLength[opsH] # 0;
  };
  -- Copies the new-mail log into the main log, resuming a partial copy when inProgress.
  Cml2: PROC[inProgress: BOOL] = {
    fromPos: INT ¬ 0;
    complete ¬ WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: newMail, pagesAlreadyCopied: 0, reportProc: CheckReport];
    IF ~complete THEN RETURN;
    IF ~inProgress THEN {
      at: INT ¬ WalnutLog.StartCopyNewMail[opsH].at;
      WalnutDB.SetOpInProgressPos[opsH, at];
      WalnutDB.SetCopyMailLogPos[opsH, at];
    }
    ELSE {
      at: INT = WalnutDB.GetCopyMailLogPos[opsH];
      CheckReport[opsH, "\n Continue copying the newMailLog\n"];
      [] ¬ WalnutLog.SetPosition[opsH, at];
      [] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry
      fromPos ¬ WalnutLog.LogLength[opsH] - WalnutLog.NextAt[opsH];
    };
    WalnutDB.SetAddingServerMsgs[opsH, TRUE];
    WalnutLog.CopyTempLog[opsH, newMail, WalnutDB.GetCopyMailLogPos[opsH], fromPos, CheckReport];
    -- raises error if problem
    WalnutDB.SetParseLogInProgress[opsH, TRUE];
    WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
    WalnutDB.SetOpInProgressPos[opsH, -1];
  };
  -- Hands the unaccepted messages to the client proc and collects the server list.
  Gnm: PROC = {
    IF proc # NIL THEN WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, proc];
    responses ¬ WalnutDB.EnumerateServers[opsH];
  };
  CheckInProgress[opsH];
  CarefullyApply[opsH, Cml, FALSE];
  IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL, FALSE]; -- file is busy
  complete ¬ TRUE;
  IF someEntries THEN {
    LongRunningApply[opsH, Cml2];
    IF complete THEN {
      WalnutLog.OpenLogStreams[opsH];
      CheckReport[opsH,
        "Adding new mail to the database @ %g\n", [time[BasicTime.Now[]]] ];
      [] ¬ ParseLog[opsH, FALSE]; -- "see" messages
      [] ¬ FilterMessages[opsH, activeVersion];
    };
  };
  CarefullyApply[opsH, Gnm, FALSE];
};
-- Client data handed to TapFilter.MonitorAgent by FilterMessages and read back in
-- ReportProgress: the ops handle used for progress reports, and a counter that
-- ReportProgress increments on every callback.
TapRef: TYPE = REF TapFilterRep;
TapFilterRep: TYPE = RECORD[opsH: WalnutOpsHandle, annotations: INT ¬ 0];
ReportProgress: TapFilter.MonitorProc = {
  -- [msgID: ROPE, msg: TapMsgQueue.Msg, filterID: ROPE, annot: TapFilter.Annotation, clientData: REF ANY] RETURNS [doIt: BOOLEAN ← TRUE]
  -- Monitor callback installed by FilterMessages: prints one "@" per call and bumps the
  -- counter in the TapFilterRep passed as clientData. doIt keeps its default value.
  progress: TapRef = NARROW[clientData];
  CheckReport[progress.opsH, "@"];
  progress.annotations ¬ 1 + progress.annotations;
  };
FilterMessages: PROC[opsH: WalnutOpsHandle, activeVersion: INT]
RETURNS [annotations: INT ¬ 0] ~ {
  -- Feeds every unaccepted message to the user's filtering agent and waits for the agent
  -- to go idle. Does nothing unless the profile asks for annotation and names both a
  -- filter database and an annotation database. Returns the number of monitor callbacks
  -- counted by ReportProgress, or 0 if no filtering was started.
  ENABLE TapFilter.Error => {
    -- a filtering problem is reported but is not propagated to the caller
    CheckReport[opsH, "Problem with filtering agent: %g - %g.\n", IO.atom[ec], IO.rope[explanation]];
    CONTINUE;
    };
  progress: TapRef;  -- stays NIL unless filtering gets as far as installing the monitor

  GetMsgText: PROC [opsHandle: WalnutOpsHandle, msg: ROPE] RETURNS [ROPE] = {
    -- Adapted from WalnutOpsImpl.GetMsgText.
    -- Reads the message text out of the log; NIL when the database gives a text start of 0.
    start, len: INT;
    buffer: REF TEXT;
    [start, len, ] ¬ WalnutDB.GetMsgText[opsHandle, msg];
    IF start = 0 THEN RETURN[NIL];
    -- clip to what one REF TEXT can hold (less 8); longer text is silently truncated
    IF len > RefText.MaxLen-8 THEN len ¬ RefText.MaxLen-8;
    buffer ¬ RefText.New[len];
    WalnutLog.GetRefTextFromLog[opsHandle, start, len, buffer];
    RETURN[RefText.TrustTextAsRope[buffer]];
    };

  PassToFilterAgent: PROC[msg, tocEntry: ROPE, startOfSubject: INT] = {
    -- Parses one message into fields, prefixes a sequence number and the message id,
    -- and queues the result on the agent's feeder queue.
    fields: TapMsgQueue.Msg ¬ TapFilter.ParseMsgIntoFields[GetMsgText[opsH, msg]];
    feederSeqNum ¬ feederSeqNum + 1;
    fields ¬ CONS[[TapFilter.seqNum, LoganBerryEntry.I2V[feederSeqNum]], fields];
    fields ¬ CONS[[TapFilter.msgID, msg], fields];
    TapMsgQueue.Put[fields, opsH.filterFeeder];
    };

  Annotate: PROC = {
    -- Get user profile info.
    annotateWanted: BOOLEAN ¬ UserProfile.Boolean[key: "WallTapestry.AnnotateNewMail", default: FALSE];
    filterDBRope: ROPE ¬ UserProfile.Token[key: "WallTapestry.FilterDB", default: NIL];
    annotDBRope: ROPE ¬ UserProfile.Token[key: "WallTapestry.AnnotationDB", default: NIL];
    agent: TapFilter.Agent ¬ opsH.filterAgent;
    IF ~(annotateWanted AND filterDBRope # NIL AND annotDBRope # NIL) THEN RETURN;
    -- Create filtering agent if necessary; it is cached in the ops handle.
    IF agent = NIL THEN {
      opsH.filterFeeder ¬ TapMsgQueue.Create[];
      opsH.filterAgent ¬ TapFilter.CreateAgent[feeder: opsH.filterFeeder, filterDB: filterDBRope, user: NIL, annotDB: annotDBRope];
      agent ¬ opsH.filterAgent;
      IF agent = NIL THEN RETURN;
      };
    -- Parse msgs and place on filter queue.
    progress ¬ NEW[TapFilterRep ¬ [opsH, 0]];
    CheckReport[opsH, "\nAnnotating messages: "];
    TapFilter.MonitorAgent[agent: agent, proc: ReportProgress, clientData: progress];
    WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, PassToFilterAgent];
    -- Wakeup agent to filter messages and wait until it is finished.
    TapFilter.WakeupAgent[agent];
    [] ¬ TapFilter.IsAgentIdle[agent: agent, wait: TRUE];
    CheckReport[opsH, " done.\n"];
    -- remove the monitor again
    TapFilter.MonitorAgent[agent: agent, proc: NIL, clientData: NIL];
    };

  CarefullyApply[opsH, Annotate, FALSE];
  RETURN[IF progress = NIL THEN 0 ELSE progress.annotations];
  };
DoReadArchiveFile: PUBLIC PROC[opsH: WalnutOpsHandle, file: ROPE, msgSet: MsgSet]
RETURNS[numNew: INT] = {
  -- Write a "readArchiveFile" log entry; if the archiveFile exists, parse it and write the
  -- appropriate entries (new msgs, and moves if msgSet is not "Active"; a NIL msgSet name
  -- defaults to the categories specified in the file). Then replay the log.
  -- Returns the result of ParseLog, or -1 if the file could not be opened, the
  -- readArchive log could not be created, or the copy into the main log could not be prepared.
  ENABLE UNWIND => NULL;
  ok: BOOL ¬ FALSE;
  fStream: STREAM;
  reason: ROPE ¬ NIL;

  CloseArchive: PROC = {
    -- Closes the archive input stream at most once, ignoring errors from the close itself.
    IF fStream # NIL THEN {
      fStream.Close[ ! IO.Error, FS.Error => CONTINUE];
      fStream ¬ NIL;
      };
    };

  Raf: PROC = {
    -- log the start of the read and remember where that entry is
    at: INT;
    IF msgSet.name # NIL THEN [] ¬ WalnutDB.VerifyMsgSet[opsH, msgSet];
    at ¬ WalnutLog.StartReadArchiveFile[opsH, file, msgSet.name].at;
    WalnutDB.SetReadArchivePos[opsH, at];
    };

  Raf2: PROC = {
    -- log the end of the read and reset the remembered position to 0
    [] ¬ WalnutLog.EndReadArchiveFile[opsH];
    WalnutDB.SetReadArchivePos[opsH, 0];
    };

  Caf: PROC[inProgress: BOOL] = {
    -- long-running copy of the readArchive temp log into the main log;
    -- inProgress = TRUE means an earlier copy is being resumed
    fromPos: INT ¬ 0;
    IF ~inProgress THEN {
      at: INT;
      ok ¬ WalnutLog.PrepareToCopyTempLog[ opsH: opsH, which: readArchive, pagesAlreadyCopied: 0, reportProc: WalnutOpsInternal.CheckReport];
      IF ~ok THEN RETURN;
      at ¬ WalnutLog.StartCopyReadArchive[opsH].at;
      WalnutDB.SetCopyReadArchivePos[opsH, at];
      WalnutDB.SetOpInProgressPos[opsH, at];
      WalnutRoot.CommitAndContinue[opsH];
      }
    ELSE {  -- calculate fromPos
      logLen: INT ¬ WalnutLog.LogLength[opsH];
      startedCopyAt, startCopyPos: INT;
      startCopyPos ¬ WalnutDB.GetCopyReadArchivePos[opsH];
      IF WalnutLog.SetPosition[opsH, startCopyPos] # 0 THEN
        ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g",
          [integer[startCopyPos]]] ];
      [] ¬ WalnutLog.NextEntry[opsH];   -- skip the copy entry
      startedCopyAt ¬ WalnutLog.NextAt[opsH];
      fromPos ¬ logLen - startedCopyAt;  -- bytes already in the log beyond the copy entry
      };
    CheckReport[opsH,
      "Copying the ReadArchiveTempLog, starting at bytePos %g\n", [integer[fromPos]]];
    WalnutLog.CopyTempLog[opsH,
      readArchive, WalnutDB.GetCopyReadArchivePos[opsH], fromPos, CheckReport];
    -- hand over from "copy in progress" to "parse in progress"
    WalnutDB.SetParseLogInProgress[opsH, TRUE];
    WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
    WalnutDB.SetOpInProgressPos[opsH, -1];
    };

  CheckInProgress[opsH];
  fStream ¬ WalnutStream.Open[name: file, readOnly: TRUE ! FS.Error =>
    { CheckReport[opsH, error.explanation]; fStream ¬ NIL; CONTINUE} ].strm;
  IF fStream = NIL THEN RETURN[-1];

  -- From here on the archive stream is open: close it if an error unwinds through this
  -- procedure (e.g. from VerifyMsgSet inside Raf) so that the file is not left open.
  CarefullyApply[opsH, Raf, TRUE ! UNWIND => CloseArchive[]];

  BEGIN ENABLE BEGIN
    FS.Error => { reason ¬ error.explanation; GOTO exit };
    IO.Error => {
      <<reason ¬ FS.ErrorFromStream[stream].explanation;>>
      IF reason = NIL THEN reason ¬ "IO Error creating readArchiveLog";
      GOTO exit
      };
    END;
    -- FS.Error and IO.Error are turned into ok = FALSE above; anything else propagates,
    -- and the call-site UNWIND makes sure the stream is still closed in that case.
    [ok, reason] ¬
      WalnutMiscLog.CreateReadArchiveLog[opsH, fStream, msgSet.name, CheckReport
        ! UNWIND => CloseArchive[]];
  EXITS
    exit => ok ¬ FALSE;
  END;

  CloseArchive[];
  IF ~ok THEN {
    CheckReport[opsH, " Archive Read of %g failed\n", [rope[file]] ];
    IF reason # NIL THEN CheckReport[opsH, " Error reported as: %g\n", [rope[reason]]];
    RETURN[-1];
    }
  ELSE Raf2[];

  LongRunningApply[opsH, Caf];
  IF ~ok THEN {
    -- only PrepareToCopyTempLog (fresh path of Caf) can have cleared ok at this point
    CheckReport[opsH, "Out of space trying to copy readArchiveLog for file %g\n", [rope[file]] ];
    RETURN[-1];
    };
  CheckReport[opsH, "Adding messages to database\n"];
  numNew ¬ ParseLog[opsH, FALSE];
  };
-- Format strings used by DoWriteArchiveFile for the per-message prefix of an archive file.
-- Each comes in two flavours that differ only in the line terminator (\n vs \r); the choice
-- is made from the user profile entry Walnut.useCRForArchiveFile.
-- The walnut item form takes two 5-digit numbers (item length, formatting length).
-- NOTE(review): the format itself expands to 19 characters; the "20 chars" presumably counts
-- the closing '@' that WriteProc writes separately -- confirm against
-- WalnutMiscLog.walnutItemFixedLength.
nativeWalnutItemForm: ROPE = "@%05d 00525 %05d\n"; -- 20 chars, tioga formatting
crWalnutItemForm: ROPE = "@%05d 00525 %05d\r"; -- 20 chars, tioga formatting
-- The start header form takes two 5-digit numbers (total length, prefix length) and expands
-- to 24 characters, matching startHeaderFixedLen in DoWriteArchiveFile.
nativeStartHeaderForm: ROPE = "*start*\n%05d %05d US \n";
crStartHeaderForm: ROPE = "*start*\r%05d %05d US \r";
DoWriteArchiveFile: PUBLIC PROC[
  opsH: WalnutOpsHandle, file: ROPE, msgSetList: LIST OF MsgSet, append: BOOL]
RETURNS[ok: BOOL]= {
  -- Write an archive file that contains the messages from the given message sets. No log
  -- entry is written and no updates are made to the database (we just hold the monitor to
  -- guarantee that no changes to the message sets occur).
  --   append: TRUE positions at the end of an existing file; FALSE truncates it first.
  --   ok: FALSE only when the file could not be opened. Messages whose archive form would
  --     exceed 99999 bytes are reported and skipped without affecting ok.
  -- Raises WalnutDefs.Error[$log, $ErrorDuringWriteArchive, ...] on an IO.Error or FS.Error
  -- while writing, and WalnutDefs.Error[$db, ...] on a LoganBerry.Error; WalnutDefs.Error
  -- and WalnutDefs.VersionMismatch from below are passed on after the file is closed.
  wStream: STREAM;
  someMsgWasTooBig: BOOL ¬ FALSE;  -- NOTE(review): set by WriteProc but never read
  thisMsgSet, exp: ROPE;
  startHeaderFixedLen: INT = 24;  -- length of the expanded start header form
  useCRForArchive: BOOL ¬ UserProfile.Boolean["Walnut.useCRForArchiveFile", FALSE];
  walnutItemForm: ROPE ~ IF useCRForArchive THEN crWalnutItemForm ELSE nativeWalnutItemForm;
  startHeaderForm: ROPE ~ IF useCRForArchive THEN crStartHeaderForm ELSE nativeStartHeaderForm;
  first: BOOL ¬ TRUE;

  CloseQuietly: PROC = {
    -- Best-effort close for the error paths: tolerates an unopened stream and swallows
    -- errors from the close itself, so the original problem is what gets reported.
    IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE];
    };

  WriteProc: PROC[msg, tocEntry: ROPE, hasBeenRead: BOOL, startOfSubject: INT] RETURNS[continue: BOOL] = {
    -- Writes one message: start header, walnut item (msg id and msgSet name, then the
    -- formatting bytes if any), '@', the message text, and a final line terminator.
    textStart, textLen, formatLen, prefixLen: INT;
    length, walnutItemLen: INT ¬ 0;
    walnutItem: ROPE;
    continue ¬ TRUE;
    [textStart, textLen, formatLen, , ] ¬ WalnutDB.GetMsgText[opsH, msg];
    walnutItem ¬ IF useCRForArchive THEN Rope.Cat[msg, "\r", thisMsgSet, "\r"]
      ELSE Rope.Cat[msg, "\n", thisMsgSet, "\n"];
    walnutItemLen ¬
      WalnutMiscLog.walnutItemFixedLength + walnutItem.Length[] + formatLen;
    IF formatLen # 0 THEN walnutItemLen ¬ walnutItemLen + 1; -- for extra CR after formatting
    prefixLen ¬ startHeaderFixedLen + walnutItemLen;
    length ¬ prefixLen + textLen + 1;  -- extra CR after text
    -- the header has only five digits for the length
    IF length > 99999 THEN {
      CheckReport[opsH, "\nLength of msg %g is too big (%g bytes) - skipping",
        [rope[msg]], [integer[length]] ];
      someMsgWasTooBig ¬ TRUE;
      RETURN
      };
    -- the -2 below are because the bytecount within the prefix item does not include the surrounding @'s
    wStream.PutF[ startHeaderForm, [integer[length]], [integer[prefixLen]] ];
    wStream.PutF[walnutItemForm, [integer[walnutItemLen-2]], [integer[formatLen]] ];
    wStream.PutRope[walnutItem];
    IF formatLen # 0 THEN {
      -- the formatting bytes are read from the log immediately after the text
      WalnutLog.CopyBytesToArchive[opsH, wStream, textStart+textLen, formatLen];
      wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n];
      };
    wStream.PutChar['@];
    WalnutLog.CopyBytesToArchive[opsH, wStream, textStart, textLen];
    wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n];
    };

  ok ¬ FALSE;
  CheckInProgress[opsH];
  BEGIN ENABLE BEGIN
    LoganBerry.Error => {
      IF debugging THEN SIGNAL DBErrorSeen;
      CloseQuietly[];
      ERROR WalnutDefs.Error[$db, ec, IO.PutFR1["DB.Error with explanation: %g", [rope[explanation]] ] ];
      };
    WalnutDefs.Error => {
      IF debugging THEN SIGNAL WDErrorSeen;
      CloseQuietly[];
      REJECT;
      };
    WalnutDefs.VersionMismatch => {
      CloseQuietly[];
      REJECT;
      };
    <<IO.Error => { CheckReport[opsH, exp ¬ FS.ErrorFromStream[stream].explanation]; GOTO err };>>
    IO.Error => { CheckReport[opsH, exp ¬ "IO Error"]; GOTO err };
    FS.Error => { CheckReport[opsH, exp ¬ error.explanation]; GOTO err };
    END;

    BEGIN
      wStream ¬ WalnutStream.Open[
        name: file, useOldIfFound: append, exclusive: TRUE ! FS.Error =>
          { CheckReport[opsH, error.explanation]; GOTO none }].strm;
    EXITS
      none => wStream ¬ NIL;
    END;
    IF wStream = NIL THEN {
      CheckReport[opsH, "\nCould not open %g\n", [rope[file]] ];
      RETURN
      };
    IF append THEN wStream.SetIndex[wStream.GetLength[]]
    ELSE {
      wStream.SetIndex[0];
      wStream.SetLength[0];
      };
    CheckReport[opsH, "\n Archiving: "];
    FOR mL: LIST OF WalnutDefs.MsgSet ¬ msgSetList, mL.rest UNTIL mL=NIL DO
      thisMsgSet ¬ mL.first.name;
      IF ~WalnutDB.VerifyMsgSet[opsH, mL.first] THEN {
        CheckReport[opsH, "\n MsgSet %g doesn't exist - continuing", [rope[thisMsgSet]] ];
        LOOP;
        };
      IF first THEN first ¬ FALSE ELSE CheckReport[opsH, ", "];
      CheckReport[opsH, thisMsgSet];
      [] ¬ WalnutDB.EnumerateMsgsInSet[opsH, thisMsgSet, TRUE, WriteProc];
      ENDLOOP;
  EXITS
    err => {
      -- This exit is outside the ENABLE above, and can be reached with wStream = NIL
      -- (an IO.Error raised by WalnutStream.Open). A guarded close keeps a second failure
      -- from replacing the WalnutDefs.Error the caller is meant to see.
      CloseQuietly[];
      ERROR WalnutDefs.Error[$log, $ErrorDuringWriteArchive, exp];
      };
  END;
  wStream.Close[];
  ok ¬ TRUE;
  CheckReport[opsH, "\tFinished writing archive file\n"];
  };
END.