DIRECTORY
BasicTime USING [GMT, nullGMT, Now],
FS USING [BytesForPages, Error, <<ErrorDesc,>> <<ErrorFromStream,>> PagesForBytes, StreamOpen],
IO,
LoganBerry USING [Error],
LoganBerryEntry USING [I2V],
Process USING [PauseMsec],
RefText USING [MaxLen, New, TrustTextAsRope],
Rope,
-- UserCredentials USING [Get],
TapFilter USING [Agent, CreateAgent, Error, IsAgentIdle, MonitorAgent, MonitorProc, ParseMsgIntoFields, WakeupAgent, msgID, seqNum],
TapMsgQueue USING [Create, Msg, Put],
UserProfile USING [Boolean, Token],
ViewerClasses USING [Viewer],
ViewerIO USING [CreateViewerStreams, GetViewerFromStream],
ViewerOps USING [FindViewer],
WalnutDB --using lots-- ,
WalnutDefs USING [CheckReportProc, dontCareMsgSetVersion, Error, LogInfo, MsgSet, VersionMismatch, WalnutOpsHandle],
WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog],
WalnutLog --using lots-- ,
WalnutLogExpunge --using lots-- ,
WalnutMiscLog USING [CreateReadArchiveLog, GetNewMailLog, walnutItemFixedLength],
WalnutOps,
WalnutOpsInternal,
WalnutRegistryPrivate USING [NotifyForEvent],
WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, Open, RegisterStatsProc, RootHandle, RootHandleRec, Shutdown, StartTransaction, StatsReport, SwapLogs, UnregisterStatsProc],
WalnutStream USING [Open];
Types
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
GMT: TYPE = BasicTime.GMT;
CheckReportProc: TYPE = WalnutDefs.CheckReportProc;
LogInfo: TYPE = WalnutDefs.LogInfo;
MsgSet: TYPE = WalnutDefs.MsgSet;
WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle;
RootHandle: TYPE = WalnutRoot.RootHandle;
RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec;
KernelHandle: TYPE = WalnutOpsInternal.KernelHandle;
KernelHandleRec: PUBLIC TYPE = WalnutOpsInternal.KernelHandleRec;
WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog;
OutCome: TYPE = { done, couldntRestart };
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
ComputeMaxExpungeLogPages:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[pages:
INT] = {
bytesInDestroyedMsgs: INT = WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs;
logLen: INT = WalnutLog.LogLength[opsH];
RETURN[FS.PagesForBytes[logLen-bytesInDestroyedMsgs]];
};
DoLogExpunge:
PUBLIC
PROC[opsH: WalnutOpsHandle] = {
The log is re-written, starting from the first file with deleted messages. Called from within a LongRunningApply; calling SetLogExpungePhase does a commit
DO
expPhase: WalnutKernelDefs.LogExpungePhase ¬ WalnutDB.GetLogExpungePhase[opsH];
SELECT expPhase
FROM
idle => {
StatsReport[opsH, "\n ~~~ Starting log expunge"];
WalnutDB.SetLogExpungePhase[opsH, initializingExpungeLog];
WalnutRoot.CommitAndContinue[opsH];
commitFrequency ¬ expungeCommitFrequency;
};
initializingExpungeLog => {
expungeFileID: INT;
pagesNeeded: INT = ComputeMaxExpungeLogPages[opsH];
expungeFileID ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded];
WalnutDB.SetExpungeFileID[opsH, expungeFileID];
WalnutDB.SetLogExpungePhase[opsH, writingExpungeLog];
WalnutRoot.CommitAndContinue[opsH];
};
writingExpungeLog => {
[] ¬ WriteExpungeLog[opsH];
[] ¬ WalnutLogExpunge.EndExpunge[opsH];
WalnutDB.SetLogExpungePhase[opsH, swappingLogs];
WalnutRoot.CommitAndContinue[opsH];
};
swappingLogs => {
expungeFileID: INT ¬ WalnutDB.GetExpungeFileID[opsH];
date: BasicTime.GMT;
logLen: INT;
[] ¬ WalnutRoot.GetStreamsForExpunge[opsH: opsH, starting: FALSE, pagesWanted: -1];
StatsReport[opsH, "\n Swapping Log Files\n"];
WalnutDB.SetExpungeProgressInfo[opsH, 0,0];
WalnutDB.SetExpungeInfo[opsH, 0, 0];
WalnutDB.SetOpInProgressPos[opsH, -1];
WalnutDB.SetLogExpungePhase[opsH, idle]; -- whew
date ¬ BasicTime.Now[];
WalnutDB.SetTimeOfLastExpunge[opsH, date];
logLen ¬ WalnutRoot.SwapLogs[opsH, expungeFileID, date];
WalnutDB.SetRootFileVersion[opsH, opsH.rootHandle.createDate];
WalnutRoot.CommitAndContinue[opsH];
StatsReport[opsH, "\n ~~~ Finished expunge"];
WalnutLog.OpenLogStreams[opsH];
RETURN;
};
ENDCASE => ERROR;
ENDLOOP;
};
ExpungeLogResult:
TYPE = { unknown, couldntRestart, finished };
WriteExpungeLog:
PROC[opsH: WalnutOpsHandle]
RETURNS[result: ExpungeLogResult] = {
called from within catch phrases, but must do its own error catching, since the expunge Log cannot be flushed after every write - that makes too much traffic on the alpine server;
reads the old log, starting at currentLogPos, copies entries to the expungeLog, changing msg entryStart pointers as it goes, doing ocasional commits of both the expungeLog and the database
numMsgs: INT ¬ 0;
currentLogPos, expungeLogPos: INT;
previousAt, at: INT;
startRetryCount: INT = 2;
retryCount: INT ¬ startRetryCount;
bytesToCopyBeforeFlush: INT ¬ FS.BytesForPages[500];
lastExpungeCommitLength: INT ¬ -1;
oldLength: INT = WalnutLog.LogLength[opsH];
DO
BEGIN
ENABLE WalnutDefs.Error => {
IF debugging THEN SIGNAL WDErrorSeen;
IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
};
bytesCopiedSinceLastCommit, numMsgsSinceShutdown: INT ¬ 0;
lastAcceptNewMailPos: INT;
firstDestroyedMsgPos: INT ¬ WalnutDB.GetExpungeInfo[opsH].firstDestroyedMsgPos;
[currentLogPos, expungeLogPos] ¬ WalnutDB.GetExpungeProgressInfo[opsH];
IF lastExpungeCommitLength = expungeLogPos
THEN {
IF (retryCount ¬ retryCount - 1) <= 0
THEN
ERROR WalnutDefs.Error[$log, $TooManyLogAborts,
IO.PutFR1["\n During Expunge - last commit pos in expungeLog was %g",
[integer[expungeLogPos]]] ];
}
ELSE {
lastExpungeCommitLength ¬ expungeLogPos;
retryCount ¬ startRetryCount;
};
IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos]
THEN {
pagesNeeded: INT ¬ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[opsH] - WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs];
CheckReport[opsH,
"\n ****Could not restart expunge from %g in expungeLog, %g in currentLog - restarting Log Expunge\n",
[integer[expungeLogPos]], [integer[currentLogPos]] ];
expungeLogPos ¬ currentLogPos ¬ 0;
WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; -- commits
[] ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded]; -- try again
StatsReport[opsH, "Restarting expunge from beginning"];
IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos]
THEN
WalnutDefs.Error[$log, $CantStartExpunge];
lastExpungeCommitLength ¬ expungeLogPos;
};
IF currentLogPos < firstDestroyedMsgPos
THEN {
-- copyingHead phase
bytesLeftToCopyInHead: INT ¬ firstDestroyedMsgPos - currentLogPos;
bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
IF currentLogPos = 0
THEN {
WalnutLogExpunge.SetIndex[opsH, 0];
[] ¬ WalnutLogExpunge.SkipEntry[opsH]; -- need to skip the LogFileInfo entry
bytesLeftToCopyInHead ¬
bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[opsH];
};
CheckReport[opsH,
"Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n\t",
[integer[currentLogPos]], [integer[firstDestroyedMsgPos]] ];
DO
bytesToCopyThisTime: INT ¬ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
WalnutLogExpunge.CopyBytesToExpungeLog[opsH, bytesToCopyThisTime];
[currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
WalnutRoot.CommitAndContinue[opsH];
CheckReport[opsH, "#"];
bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - bytesToCopyThisTime;
IF bytesLeftToCopyInHead = 0 THEN EXIT;
ENDLOOP;
retryCount ¬ startRetryCount; -- in case has abort during copy
};
lastAcceptNewMailPos can't be before firstDestroyedMsgPos
lastAcceptNewMailPos ¬ WalnutDB.GetAcceptNewMailPos[opsH];
WalnutLogExpunge.SetIndex[opsH, at ¬ currentLogPos]; -- before doing PeekEntry
CheckReport[opsH,
"\nProcessing the tail of the log, starting at bytePos %g (old length %g bytes, %g pages)\n",
[integer[at]], [integer[oldLength]], [integer[FS.PagesForBytes[oldLength]]] ];
DO
ident: ATOM;
msgID: ROPE;
bytesThisCopy: INT ¬ 0;
previousAt ¬ at;
[ident, msgID, at] ¬ WalnutLogExpunge.PeekEntry[opsH];
IF ident =
NIL
THEN
IF at # -1 THEN EXIT -- end of log
ELSE {
charSkipped: INT ¬ WalnutLogExpunge.SetPosition[opsH, previousAt+1];
IF charSkipped = -1
THEN
ERROR WalnutDefs.Error[$log, $DuringExpunge,
IO.PutFR1["No entry found after %g", [integer[previousAt]] ]];
at ¬ previousAt + charSkipped;
LOOP;
};
SELECT ident
FROM
$ExpungeMsgs => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
$WriteExpungeLog => { result ¬ finished; EXIT }; -- last entry on the Log
$LogFileInfo => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
$RecordNewMailInfo, $StartCopyNewMail =>
IF at < lastAcceptNewMailPos
THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
$EndCopyNewMailInfo => {
IF at < lastAcceptNewMailPos
THEN {
-- maybe skip
startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
IF firstDestroyedMsgPos < startCopyPos
THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
}
ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
};
$AcceptNewMail =>
IF at < lastAcceptNewMailPos
THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
ELSE {
newLogPos: INT;
[newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
WalnutDB.SetAcceptNewMailPos[opsH, newLogPos];
};
$StartCopyReadArchive => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
$EndCopyReadArchiveInfo => {
startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
IF firstDestroyedMsgPos < startCopyPos
THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
};
ENDCASE =>
IF msgID.Length[] = 0
OR WalnutDB.MsgExists[opsH, msgID]
THEN {
the next entry either has nothing to do with a message or it is an operation on a message that still exists
copy the entry to the new log
newLogPos: INT ;
[newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
IF ident = $CreateMsg
THEN {
it is the message body itself; record the change in position in the database
IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[opsH, newLogPos];
numMsgsSinceShutdown ¬ numMsgsSinceShutdown + 1;
IF (numMsgs ¬ numMsgs + 1)
MOD 10 = 0
THEN
IF numMsgs
MOD 100 = 0
THEN
CheckReport[opsH, "(%g) ", [integer[numMsgs]] ]
ELSE CheckReport[opsH, "."];
};
}
if you haven't consumed the entry by copying it, then just advance the log pointer
ELSE [] ¬ WalnutLogExpunge.SkipEntry[opsH];
IF numMsgsSinceShutdown >= shutdownFrequency
THEN {
expLogPos: INT;
[currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
WalnutLogExpunge.EndExpunge[opsH];
ShortShutdown[opsH, 20, -1];
[] ¬ WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expLogPos];
lastExpungeCommitLength ¬ expLogPos;
bytesCopiedSinceLastCommit ¬ bytesThisCopy ¬ 0;
numMsgsSinceShutdown ¬ 0;
};
IF (bytesCopiedSinceLastCommit ¬ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush
THEN {
expLogPos: INT;
[currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
WalnutRoot.CommitAndContinue[opsH];
lastExpungeCommitLength ¬ expLogPos;
bytesCopiedSinceLastCommit ¬ 0;
};
ENDLOOP;
Commit the last batch of updates
IF bytesCopiedSinceLastCommit > 0
THEN {
[currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
WalnutRoot.CommitAndContinue[opsH];
lastExpungeCommitLength ¬ expungeLogPos;
};
RETURN;
EXITS
retry => {
schemaInvalid: BOOL ¬ TRUE;
WalnutLogExpunge.EndExpunge[opsH];
WalnutRoot.AbortTransaction[opsH];
schemaInvalid ¬ WalnutRoot.StartTransaction[opsH];
IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
WalnutLog.OpenLogStreams[opsH];
};
END;
ENDLOOP;
};
Is some long running operation in progress
CheckInProgress:
PUBLIC
PROC[opsH: WalnutOpsHandle] = {
inProgressPos: INT ¬ 0;
parseInProgress: BOOL ¬ FALSE;
needsParse: BOOL ¬ FALSE;
doingLogExpunge: BOOL ¬ FALSE;
Cip:
PROC = {
IF (parseInProgress ¬ WalnutDB.GetParseLogInProgress[opsH])
THEN {
WalnutRoot.AcquireWriteLock[opsH]; RETURN };
inProgressPos ¬ WalnutDB.GetOpInProgressPos[opsH];
doingLogExpunge ¬ ( WalnutDB.GetLogExpungePhase[opsH] # idle );
IF inProgressPos <= 0 THEN RETURN;
WalnutRoot.AcquireWriteLock[opsH];
};
Fip:
PROC[inProgress:
BOOL] = {
-- FinishInProgress
at: INT;
wle: WalnutKernelDefs.LogEntry;
WalnutLog.SetIndex[opsH, inProgressPos];
[wle, at] ¬ WalnutLog.NextEntry[opsH];
IF at = -1
THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR1["No entry at %g", [integer[inProgressPos]]] ];
TRUSTED {
WITH le: wle
SELECT
FROM
ExpungeMsgs =>
WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, CheckReport];
WriteExpungeLog => DoLogExpunge[opsH];
EmptyMsgSet => []¬ WalnutDB.EmptyMsgSet[
opsH,
[ le.msgSet, dontCareMsgSetVersion ],
WalnutOpsInternal.CheckReport ];
DestroyMsgSet =>
[]¬ WalnutDB.DestroyMsgSet[
opsH: opsH,
msgSet: [ le.msgSet, dontCareMsgSetVersion ],
msDomainVersion: dontCareDomainVersion,
report: WalnutOpsInternal.CheckReport
];
AcceptNewMail => {
WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
opsH.kernelHandle.newMailSomewhere ¬ FALSE;
};
StartCopyNewMail => {
FinishCopyTempLog[opsH, newMail, at];
WalnutDB.SetCopyMailLogPos[opsH, 0];
WalnutRoot.CommitAndContinue[opsH];
WalnutLog.FinishTempLogCopy[opsH, newMail];
needsParse ¬ TRUE;
};
StartReadArchiveFile => {
WalnutLog.ResetLog[opsH, at];
WalnutDB.SetReadArchivePos[opsH, 0];
};
StartCopyReadArchive => {
FinishCopyTempLog[opsH, readArchive, at];
WalnutDB.SetCopyReadArchivePos[opsH, 0];
WalnutRoot.CommitAndContinue[opsH];
WalnutLog.FinishTempLogCopy[opsH, readArchive];
needsParse ¬ TRUE;
};
ENDCASE =>
ERROR WalnutDefs.Error[$db, $ErroneousInProgress,
IO.PutFR1["Entry at %g", [integer[inProgressPos]]] ];
}; -- end trusted
WalnutDB.SetOpInProgressPos[opsH, -1];
WalnutRoot.CommitAndContinue[opsH];
};
IF opsH.kernelHandle.errorInProgress
THEN
ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
IF opsH.kernelHandle.isShutdown
THEN {
IF opsH.rootHandle = NIL THEN [] ¬ WalnutRoot.Open[opsH];
Restart[opsH];
};
WalnutOpsInternal.CarefullyApply[opsH, Cip, FALSE];
IF parseInProgress
THEN {
StatsReport[opsH, "\n Continuing parseInProgress"];
[] ¬ ParseLog[opsH, TRUE];
RETURN
};
IF inProgressPos <= 0 THEN RETURN;
StatsReport[opsH, "\n Continuing longRunningOperation"];
WalnutOpsInternal.LongRunningApply[opsH, Fip];
IF needsParse
THEN {
numNew: INT ¬ 0;
CheckReport[opsH, "Adding messages to the database\n"];
numNew ¬ ParseLog[opsH, FALSE];
IF numNew = 0
THEN
CheckReport[opsH, "No messages were new\n"]
ELSE CheckReport[opsH,
" %g new messages\n", [integer[numNew]] ];
};
};
FinishCopyTempLog:
PROC[opsH: WalnutOpsHandle, which: WhichTempLog, at:
INT] = {
logLen: INT ¬ WalnutLog.LogLength[opsH];
startedCopyAt, startCopyPos, fromPos: INT;
pagesAlreadyCopied: INT;
IF WalnutLog.SetPosition[opsH, at] # 0
THEN
ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[at]]] ];
startCopyPos ¬ WalnutLog.NextEntry[opsH].at; -- skip the copy entry
startedCopyAt ¬ WalnutLog.NextAt[opsH];
fromPos ¬ logLen - startedCopyAt;
pagesAlreadyCopied ¬ FS.PagesForBytes[fromPos];
IF ~WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport]
THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];
CheckReport[opsH,
"\nContinuing copy of tempLog, starting at bytePos %g\n", [integer[fromPos]] ];
WalnutLog.CopyTempLog[opsH, which, startCopyPos, fromPos, CheckReport];
WalnutDB.SetParseLogInProgress[opsH, TRUE];
WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
WalnutDB.SetOpInProgressPos[opsH, -1];
};
Parsing the Log - assumes the log is write-locked
ParseLog:
PUBLIC
PROC[opsH: WalnutOpsHandle, doingScavenge:
BOOL ¬
FALSE]
RETURNS[numNew: INT] = {
parseLogPos: INT;
at: INT; -- position in log of entry just processed
lastNumNew, savedNumNew, savedLastNumNew: INT ¬ 0;
wle: WalnutLog.LogEntry;
charsSkipped: INT;
errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
retries: INT ¬ 2;
accepted: BOOL;
giveStartInfo: BOOL ¬ TRUE;
commitFrequency: INT ¬ parseCommitFrequency;
updatesSinceLastCommit, numSinceShutdown: INT ¬ 0;
numNew ¬ 0;
DO
currentLogLength: INT ¬ WalnutLog.LogLength[opsH]; -- in case at end of Expunge
BEGIN
ENABLE
BEGIN
WalnutDefs.Error => {
IF debugging THEN SIGNAL WDErrorSeen;
IF code = $TransactionAbort
THEN {
StatsReport[opsH, " *** TransactionAborts during parseLog"];
IF ( retries ¬ retries - 1) > 0 THEN GOTO tryAgain;
opsH.kernelHandle.errorInProgress ¬ TRUE;
StatsReport[opsH, " *** Too many TransactionAborts during parseLog"];
REJECT
}
ELSE {
opsH.kernelHandle.errorInProgress ¬ TRUE;
StatsReport[opsH, "InternalError: %g during parseLog", [atom[code]] ];
REJECT
};
};
END;
parseLogPos ¬ WalnutDB.GetParseLogPos[opsH];
IF giveStartInfo
THEN {
IF doingScavenge
THEN numNew ¬ WalnutDB.SizeOfDatabase[opsH].messages
ELSE numNew ¬ 0;
IF doingScavenge
AND numNew # 0
THEN
CheckReport[opsH,
" Parsing the log - database has %g messages so far\n", [integer[numNew]] ];
giveStartInfo ¬ FALSE;
};
IF ( parseLogPos < 0 ) OR ( parseLogPos = currentLogLength ) THEN RETURN[numNew];
charsSkipped ¬ WalnutLog.SetPosition[opsH, parseLogPos];
IF charsSkipped = -1
THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, [integer[parseLogPos]], [integer[currentLogLength]]] ];
at ¬ parseLogPos + charsSkipped;
make sure things are in good shape before starting
IF at # parseLogPos
THEN {
WalnutDB.SetParseLogPos[opsH, at];
WalnutRoot.CommitAndContinue[opsH];
};
accepted ¬ NOT WalnutDB.GetAddingServerMsgs[opsH];
DO
previousAt: INT ¬ at;
[wle, at] ¬ WalnutLog.NextEntry[opsH];
IF wle =
NIL
THEN {
maybeBadAt: INT = at;
charsSkipped ¬ WalnutLog.SetPosition[opsH, previousAt+1];
IF charsSkipped = -1
THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
[wle, at] ¬ WalnutLog.NextEntry[opsH];
IF wle =
NIL
THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
CheckReport[opsH,
"\n*** Bad log entry at or near %g or %g; do a ScanWalnutLog sometime soon\n",
[integer[previousAt]], [integer[at]] ];
};
IF numSinceShutdown >= shutdownFrequency
THEN {
WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
ShortShutdown[opsH, 20, at];
updatesSinceLastCommit ¬ 0;
savedNumNew ¬ numNew;
savedLastNumNew ¬ lastNumNew;
numSinceShutdown ¬ 0;
};
IF (updatesSinceLastCommit ¬ updatesSinceLastCommit+1) >= commitFrequency
THEN {
WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
WalnutRoot.CommitAndContinue[opsH];
updatesSinceLastCommit ¬ 0;
savedNumNew ¬ numNew;
savedLastNumNew ¬ lastNumNew;
};
BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
TRUSTED {
WITH le: wle
SELECT
FROM
LogFileInfo => NULL;
ExpungeMsgs => {
if we don't SetParseLogPos & Commit here, we might delete some messages, commit and later get an error, parse the log from an earlier point, and add more to bytesInDestroyedMsgs
WalnutDB.SetParseLogPos[opsH, at];
WalnutRoot.CommitAndContinue[opsH];
CheckReport[opsH, "\nStarting ExpungeMsgs "];
WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
WalnutRoot.CommitAndContinue[opsH];
CheckReport[opsH, " ... finished ExpungeMsgs\n"];
};
WriteExpungeLog => {
CheckReport[opsH, "\nStarting WriteExpungeLog "];
WalnutDB.SetParseLogPos[opsH, -1];
WalnutDB.SetParseLogInProgress[opsH, FALSE];
WalnutDB.SetOpInProgressPos[opsH, at]; -- at is beginning of current entry
WalnutRoot.CommitAndContinue[opsH];
[] ¬ DoLogExpunge[opsH];
CheckReport[opsH, " ... finished WriteExpungeLog\n"];
EXIT; -- WriteExpungeLog had to be the last entry
};
EndOfLog => {
WalnutDB.SetParseLogPos[opsH, -1];
WalnutDB.SetParseLogInProgress[opsH, FALSE];
WalnutRoot.CommitAndContinue[opsH];
RETURN
};
CreateMsgSet =>
[] ¬ WalnutDB.CreateMsgSet[opsH, le.msgSet, dontCareDomainVersion];
EmptyMsgSet => {
[] ¬ WalnutDB.EmptyMsgSet[
opsH,
[le.msgSet, dontCareMsgSetVersion],
WalnutOpsInternal.CheckReport
];
};
DestroyMsgSet => {
[] ¬ WalnutDB.DestroyMsgSet[
opsH: opsH,
msgSet: [le.msgSet, dontCareMsgSetVersion],
msDomainVersion: dontCareDomainVersion,
report: WalnutOpsInternal.CheckReport
];
};
CreateMsg => {
me: WalnutKernelDefs.MsgLogEntry = NARROW[wle];
me.show ¬ accepted;
IF me.msg #
NIL
THEN {
IF ~WalnutDB.AddNewMsg[opsH, me] THEN numNew ¬ numNew + 1;
numSinceShutdown ¬ numSinceShutdown + 1;
};
};
AddMsg =>
IF le.msg #
NIL
THEN []¬ WalnutDB.AddMsg[
opsH: opsH,
msg: le.msg,
from: [NIL, dontCareMsgSetVersion],
to: [le.to, dontCareMsgSetVersion] ];
RemoveMsg =>
IF le.msg #
NIL
THEN []¬ WalnutDB.RemoveMsg[
opsH: opsH,
msg: le.msg,
from: [le.from, dontCareMsgSetVersion],
deletedVersion: dontCareMsgSetVersion ];
MoveMsg =>
IF le.msg #
NIL
THEN []¬ WalnutDB.MoveMsg[
opsH: opsH,
msg: le.msg,
from: [ le.from, dontCareMsgSetVersion],
to: [le.to, dontCareMsgSetVersion] ];
HasBeenRead => IF le.msg # NIL THEN WalnutDB.SetHasBeenRead[opsH, le.msg];
RecordNewMailInfo => {
WalnutDB.SetNewMailInfo[
opsH, le.logLen, le.when, le.server, le.num];
opsH.kernelHandle.newMailSomewhere ¬ TRUE;
};
StartCopyNewMail => {
WalnutDB.SetCopyMailLogPos[opsH, at];
WalnutDB.SetAddingServerMsgs[opsH, TRUE];
accepted ¬ FALSE;
};
EndCopyNewMailInfo => {
WalnutDB.SetCopyMailLogPos[opsH, 0];
WalnutDB.SetNewMailInfo[opsH, 0, BasicTime.nullGMT, NIL, 0];
WalnutDB.SetAddingServerMsgs[opsH, FALSE];
accepted ¬ TRUE;
};
AcceptNewMail => {
WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
opsH.kernelHandle.newMailSomewhere ¬ FALSE;
};
StartReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, at];
EndReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, 0];
StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[opsH, at];
EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[opsH, 0];
ENDCASE => ERROR;
};
EXITS continue => NULL;
END;
IF numNew # lastNumNew
THEN
IF numNew
MOD 10 = 0
THEN
IF numNew
MOD 100 = 0
THEN
CheckReport[opsH, "(%g) ", [integer[numNew]] ]
ELSE CheckReport[opsH, "~"];
lastNumNew ¬ numNew;
ENDLOOP;
EXITS
tryAgain => {
schemaInvalid: BOOL ¬ TRUE;
numNew ¬ savedNumNew;
lastNumNew ¬ savedLastNumNew;
WalnutRoot.AbortTransaction[opsH];
schemaInvalid ¬ WalnutRoot.StartTransaction[opsH, TRUE];
IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
WalnutLog.OpenLogStreams[opsH];
giveStartInfo ¬ TRUE;
};
END;
ENDLOOP;
};
ShortShutdown:
PROC[opsH: WalnutOpsHandle, sec:
INT, logPos:
INT] = {
WalnutLog.ShutdownLog[opsH]; -- this does a close transaction as well
Process.PauseMsec[sec*1000];
[] ¬ WalnutRoot.StartTransaction[opsH, TRUE];
WalnutLog.OpenLogStreams[opsH];
IF logPos >= 0 THEN [] ¬ WalnutLog.SetPosition[opsH, logPos];
};
DoStartNewMail:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[
STREAM] = {
newMailLogLength: INT ¬ -1;
expungeInProgress: BOOL ¬ FALSE;
Gnml:
PROC = {
newMailLogLength ¬ WalnutDB.GetNewMailLogLength[opsH];
expungeInProgress ¬ WalnutDB.GetLogExpungePhase[opsH] # idle;
};
CheckInProgress[opsH];
IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL];
CarefullyApply[opsH, Gnml, FALSE];
IF newMailLogLength = -1 OR expungeInProgress THEN RETURN[NIL];
RETURN[opsH.kernelHandle.mailStream ¬ WalnutMiscLog.GetNewMailLog[opsH, newMailLogLength, 200]];
};
DoAcceptNewMail:
PUBLIC
PROC[opsH: WalnutOpsHandle, activeVersion:
INT] = {
Anm:
PROC[inProgress:
BOOL] = {
at: INT;
IF ~inProgress
THEN {
[] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion]];
at ¬ WalnutLog.AcceptNewMail[opsH].at;
WalnutDB.SetOpInProgressPos[opsH, at];
}
ELSE at ¬ WalnutDB.GetAcceptNewMailPos[opsH];
WalnutDB.AcceptNewMail[opsH, at, activeVersion];
};
CheckInProgress[opsH];
LongRunningApply[opsH, Anm];
opsH.kernelHandle.newMailSomewhere ¬ FALSE;
};
DoNewMail:
PUBLIC
PROC[
opsH: WalnutOpsHandle, activeVersion:
INT,
proc:
PROC[msg, tocEntry:
ROPE, startOfSubject:
INT]]
RETURNS[responses:
LIST
OF WalnutOps.ServerInfo, complete:
BOOL] = {
someEntries: BOOL ¬ FALSE;
Cml:
PROC = {
IF activeVersion # WalnutDefs.dontCareMsgSetVersion
THEN
[] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion] ];
someEntries ¬ WalnutDB.GetNewMailLogLength[opsH] # 0;
};
Cml2:
PROC[inProgress:
BOOL] = {
fromPos: INT ¬ 0;
complete ¬ WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: newMail, pagesAlreadyCopied: 0, reportProc: CheckReport];
IF ~complete THEN RETURN;
IF ~inProgress
THEN {
at: INT ¬ WalnutLog.StartCopyNewMail[opsH].at;
WalnutDB.SetOpInProgressPos[opsH, at];
WalnutDB.SetCopyMailLogPos[opsH, at];
}
ELSE {
at: INT = WalnutDB.GetCopyMailLogPos[opsH];
CheckReport[opsH, "\n Continue copying the newMailLog\n"];
[] ¬ WalnutLog.SetPosition[opsH, at];
[] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry
fromPos ¬ WalnutLog.LogLength[opsH] - WalnutLog.NextAt[opsH];
};
WalnutDB.SetAddingServerMsgs[opsH, TRUE];
WalnutLog.CopyTempLog[opsH, newMail, WalnutDB.GetCopyMailLogPos[opsH], fromPos, CheckReport];
raises error if problem
WalnutDB.SetParseLogInProgress[opsH, TRUE];
WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
WalnutDB.SetOpInProgressPos[opsH, -1];
};
Gnm:
PROC = {
IF proc # NIL THEN WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, proc];
responses ¬ WalnutDB.EnumerateServers[opsH];
};
CheckInProgress[opsH];
CarefullyApply[opsH, Cml, FALSE];
IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL, FALSE]; -- file is busy
complete ¬ TRUE;
IF someEntries
THEN {
LongRunningApply[opsH, Cml2];
IF complete
THEN {
WalnutLog.OpenLogStreams[opsH];
CheckReport[opsH,
"Adding new mail to the database @ %g\n", [time[BasicTime.Now[]]] ];
[] ¬ ParseLog[opsH, FALSE]; -- "see" messages
[] ¬ FilterMessages[opsH, activeVersion];
};
};
CarefullyApply[opsH, Gnm, FALSE];
};
TapRef:
TYPE =
REF TapFilterRep;
TapFilterRep: TYPE = RECORD[opsH: WalnutOpsHandle, annotations: INT ¬ 0];
ReportProgress: TapFilter.MonitorProc = {
[msgID: ROPE, msg: TapMsgQueue.Msg, filterID: ROPE, annot: TapFilter.Annotation, clientData: REF ANY] RETURNS [doIt: BOOLEAN ← TRUE]
tR: TapRef ¬ NARROW[clientData];
CheckReport[tR.opsH, "@"];
tR.annotations ¬ tR.annotations + 1;
};
FilterMessages:
PROC[opsH: WalnutOpsHandle, activeVersion:
INT]
RETURNS [annotations:
INT ¬ 0] ~ {
ENABLE TapFilter.Error => {
CheckReport[opsH, "Problem with filtering agent: %g - %g.\n", IO.atom[ec], IO.rope[explanation]];
CONTINUE;
};
GetMsgText:
PROC [opsHandle: WalnutOpsHandle, msg:
ROPE]
RETURNS [
ROPE] = {
Adapted from WalnutOpsImpl.GetMsgText.
textStart, textLen: INT;
contents: REF TEXT;
[textStart, textLen, ] ¬ WalnutDB.GetMsgText[opsHandle, msg];
IF textStart = 0 THEN RETURN[NIL];
IF textLen > RefText.MaxLen-8 THEN textLen ¬ RefText.MaxLen-8;
contents ¬ RefText.New[textLen];
WalnutLog.GetRefTextFromLog[opsHandle, textStart, textLen, contents];
RETURN[RefText.TrustTextAsRope[contents]];
};
PassToFilterAgent:
PROC[msg, tocEntry:
ROPE, startOfSubject:
INT] = {
parsedMsg: TapMsgQueue.Msg;
contents: ROPE ¬ GetMsgText[opsH, msg];
parsedMsg ¬ TapFilter.ParseMsgIntoFields[contents];
feederSeqNum ¬ feederSeqNum + 1;
parsedMsg ¬ CONS[[TapFilter.seqNum, LoganBerryEntry.I2V[feederSeqNum]], parsedMsg];
parsedMsg ¬ CONS[[TapFilter.msgID, msg], parsedMsg];
TapMsgQueue.Put[parsedMsg, opsH.filterFeeder];
};
Fm:
PROC = {
Get user profile info.
wantAnnotations: BOOLEAN ¬ UserProfile.Boolean[key: "WallTapestry.AnnotateNewMail", default: FALSE];
filterDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.FilterDB", default: NIL];
annotationDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.AnnotationDB", default: NIL];
filteringAgent: TapFilter.Agent ¬ opsH.filterAgent;
IF NOT wantAnnotations OR filterDBName = NIL OR annotationDBName = NIL THEN RETURN;
Create filtering agent if necessary.
IF filteringAgent =
NIL
THEN {
opsH.filterFeeder ¬ TapMsgQueue.Create[];
filteringAgent ¬ opsH.filterAgent ¬ TapFilter.CreateAgent[feeder: opsH.filterFeeder, filterDB: filterDBName, user: NIL, annotDB: annotationDBName];
IF filteringAgent = NIL THEN RETURN;
};
Parse msgs and place on filter queue.
tR ¬ NEW[TapFilterRep ¬ [opsH, 0]];
CheckReport[opsH, "\nAnnotating messages: "];
TapFilter.MonitorAgent[agent: filteringAgent, proc: ReportProgress, clientData: tR];
WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, PassToFilterAgent];
Wakeup agent to filter messages and wait until it is finished.
TapFilter.WakeupAgent[filteringAgent];
[] ¬ TapFilter.IsAgentIdle[agent: filteringAgent, wait: TRUE];
CheckReport[opsH, " done.\n"];
TapFilter.MonitorAgent[agent: filteringAgent, proc: NIL, clientData: NIL];
};
tR: TapRef;
CarefullyApply[opsH, Fm, FALSE];
RETURN[IF tR # NIL THEN tR.annotations ELSE 0];
};
DoReadArchiveFile:
PUBLIC
PROC[opsH: WalnutOpsHandle, file:
ROPE, msgSet: MsgSet]
RETURNS[numNew:
INT] = {
Write a "readArchiveFile" log entry; if the archiveFile exists, parses it and writes the appropriate entries (new msgs and moves if msgSet is not "Active" (NIL defaults to categories specified in the file). Then replays the log. If the file couldn't be read, numNew = -1;
ENABLE UNWIND => NULL;
ok: BOOL ¬ FALSE;
fStream: STREAM;
reason: ROPE ¬ NIL;
Raf:
PROC = {
at: INT;
IF msgSet.name # NIL THEN [] ¬ WalnutDB.VerifyMsgSet[opsH, msgSet];
at ¬ WalnutLog.StartReadArchiveFile[opsH, file, msgSet.name].at;
WalnutDB.SetReadArchivePos[opsH, at];
};
Raf2:
PROC = {
[] ¬ WalnutLog.EndReadArchiveFile[opsH];
WalnutDB.SetReadArchivePos[opsH, 0];
};
Caf:
PROC[inProgress:
BOOL] = {
fromPos: INT ¬ 0;
IF ~inProgress
THEN {
at: INT;
ok ¬ WalnutLog.PrepareToCopyTempLog[ opsH: opsH, which: readArchive, pagesAlreadyCopied: 0, reportProc: WalnutOpsInternal.CheckReport];
IF ~ok THEN RETURN;
at ¬ WalnutLog.StartCopyReadArchive[opsH].at;
WalnutDB.SetCopyReadArchivePos[opsH, at];
WalnutDB.SetOpInProgressPos[opsH, at];
WalnutRoot.CommitAndContinue[opsH];
}
ELSE {
-- calculate fromPos
logLen: INT ¬ WalnutLog.LogLength[opsH];
startedCopyAt, startCopyPos: INT;
startCopyPos ¬ WalnutDB.GetCopyReadArchivePos[opsH];
IF WalnutLog.SetPosition[opsH, startCopyPos] # 0
THEN
ERROR WalnutDefs.Error[$log, $BadLog,
IO.PutFR1["no entry at %g",
[integer[startCopyPos]]] ];
[] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry
startedCopyAt ¬ WalnutLog.NextAt[opsH];
fromPos ¬ logLen - startedCopyAt;
};
CheckReport[opsH,
"Copying the ReadArchiveTempLog, starting at bytePos %g\n", [integer[fromPos]]];
WalnutLog.CopyTempLog[opsH,
readArchive, WalnutDB.GetCopyReadArchivePos[opsH], fromPos, CheckReport];
WalnutDB.SetParseLogInProgress[opsH, TRUE];
WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
WalnutDB.SetOpInProgressPos[opsH, -1];
};
CheckInProgress[opsH];
fStream ¬ WalnutStream.Open[name: file, readOnly:
TRUE !
FS.Error =>
{ CheckReport[opsH, error.explanation]; fStream ¬ NIL; CONTINUE} ].strm;
IF fStream = NIL THEN RETURN[-1];
CarefullyApply[opsH, Raf, TRUE];
BEGIN
ENABLE
BEGIN
FS.Error => { reason ¬ error.explanation; GOTO exit };
IO.Error => {
<<reason ¬ FS.ErrorFromStream[stream].explanation;>>
IF reason = NIL THEN reason ¬ "IO Error creating readArchiveLog";
GOTO exit
};
END;
[ok, reason] ¬
WalnutMiscLog.CreateReadArchiveLog[opsH, fStream, msgSet.name, CheckReport];
EXITS
exit => ok ¬ FALSE;
END;
fStream.Close[ ! IO.Error, FS.Error => CONTINUE];
IF ~ok
THEN {
CheckReport[opsH, " Archive Read of %g failed\n", [rope[file]] ];
IF reason # NIL THEN CheckReport[opsH, " Error reported as: %g\n", [rope[reason]]];
RETURN[-1];
}
ELSE Raf2[];
LongRunningApply[opsH, Caf];
IF ~ok
THEN {
CheckReport[opsH, "Out of space trying to copy readArchiveLog for file %g\n", [rope[file]] ];
RETURN[-1];
};
CheckReport[opsH, "Adding messages to database\n"];
numNew ¬ ParseLog[opsH, FALSE];
};
nativeWalnutItemForm: ROPE = "@%05d 00525 %05d\n"; -- 20 chars, tioga formatting
crWalnutItemForm: ROPE = "@%05d 00525 %05d\r"; -- 20 chars, tioga formatting
nativeStartHeaderForm: ROPE = "*start*\n%05d %05d US \n";
crStartHeaderForm:
ROPE = "*start*\r%05d %05d US \r";
DoWriteArchiveFile:
PUBLIC
PROC[
opsH: WalnutOpsHandle, file:
ROPE, msgSetList:
LIST
OF MsgSet, append:
BOOL]
RETURNS[ok: BOOL]= {
Write an archive file that contains the messages from the given message sets. No log entry is written and no updates are made to the database (we just hold the monitor to guarantee that no changes to the message sets occur).
wStream: STREAM;
someMsgWasTooBig: BOOL ¬ FALSE;
thisMsgSet, exp: ROPE;
startHeaderFixedLen: INT = 24;
useCRForArchive: BOOL ¬ UserProfile.Boolean["Walnut.useCRForArchiveFile", FALSE];
walnutItemForm: ROPE ~ IF useCRForArchive THEN crWalnutItemForm ELSE nativeWalnutItemForm;
startHeaderForm: ROPE ~ IF useCRForArchive THEN crStartHeaderForm ELSE nativeStartHeaderForm;
first: BOOL ¬ TRUE;
WriteProc:
PROC[msg, tocEntry:
ROPE, hasBeenRead:
BOOL, startOfSubject:
INT]
RETURNS[continue:
BOOL] = {
textStart, textLen, formatLen, prefixLen: INT;
length, walnutItemLen: INT ¬ 0;
walnutItem: ROPE;
continue ¬ TRUE;
[textStart, textLen, formatLen, , ] ¬ WalnutDB.GetMsgText[opsH, msg];
walnutItem ¬ IF useCRForArchive THEN Rope.Cat[msg, "\r", thisMsgSet, "\r"]
ELSE Rope.Cat[msg, "\n", thisMsgSet, "\n"];
walnutItemLen ¬
WalnutMiscLog.walnutItemFixedLength + walnutItem.Length[] + formatLen;
IF formatLen # 0 THEN walnutItemLen ¬ walnutItemLen + 1; -- for extra CR after formatting
prefixLen ¬ startHeaderFixedLen + walnutItemLen;
length ¬ prefixLen + textLen + 1; -- extra CR after text
IF length > 99999
THEN {
CheckReport[opsH, "\nLength of msg %g is too big (%g bytes) - skipping",
[rope[msg]], [integer[length]] ];
someMsgWasTooBig ¬ TRUE;
RETURN
};
-- the -2 below are because the bytecount within the prefix item does not include the surrounding @'s
wStream.PutF[ startHeaderForm, [integer[length]], [integer[prefixLen]] ];
wStream.PutF[walnutItemForm, [integer[walnutItemLen-2]], [integer[formatLen]] ];
wStream.PutRope[walnutItem];
IF formatLen # 0
THEN {
WalnutLog.CopyBytesToArchive[opsH, wStream, textStart+textLen, formatLen];
wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n];
};
wStream.PutChar['@];
WalnutLog.CopyBytesToArchive[opsH, wStream, textStart, textLen];
wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n];
};
ok ¬ FALSE;
CheckInProgress[opsH];
BEGIN
ENABLE
BEGIN
LoganBerry.Error => {
IF debugging THEN SIGNAL DBErrorSeen;
IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE];
ERROR WalnutDefs.Error[$db, ec, IO.PutFR1["DB.Error with explanation: %g", [rope[explanation]] ] ];
};
WalnutDefs.Error => {
IF debugging THEN SIGNAL WDErrorSeen;
IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE];
REJECT;
};
WalnutDefs.VersionMismatch => {
IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE];
REJECT;
};
<<IO.Error => { CheckReport[opsH, exp ¬ FS.ErrorFromStream[stream].explanation]; GOTO err };>>
IO.Error => { CheckReport[opsH, exp ¬ "IO Error"]; GOTO err };
FS.Error => { CheckReport[opsH, exp ¬ error.explanation]; GOTO err };
END;
BEGIN
wStream ¬ WalnutStream.Open[
name: file, useOldIfFound: append, exclusive: TRUE ! FS.Error =>
{ CheckReport[opsH, error.explanation]; GOTO none }].strm;
EXITS
none => wStream ¬ NIL;
END;
IF wStream =
NIL
THEN {
CheckReport[opsH, "\nCould not open %g\n", [rope[file]] ];
RETURN
};
IF append
THEN wStream.SetIndex[wStream.GetLength[]]
ELSE {
wStream.SetIndex[0];
wStream.SetLength[0];
};
CheckReport[opsH, "\n Archiving: "];
FOR mL:
LIST
OF WalnutDefs.MsgSet ¬ msgSetList, mL.rest
UNTIL mL=
NIL
DO
thisMsgSet ¬ mL.first.name;
IF ~WalnutDB.VerifyMsgSet[opsH, mL.first]
THEN {
CheckReport[opsH, "\n MsgSet %g doesn't exist - continuing", [rope[thisMsgSet]] ];
LOOP;
};
IF first THEN first ¬ FALSE ELSE CheckReport[opsH, ", "];
CheckReport[opsH, thisMsgSet];
[] ¬ WalnutDB.EnumerateMsgsInSet[opsH, thisMsgSet, TRUE, WriteProc];
ENDLOOP;
EXITS
err => {
wStream.Close[];
ERROR WalnutDefs.Error[$log, $ErrorDuringWriteArchive, exp];
};
END;
wStream.Close[];
ok ¬ TRUE;
CheckReport[opsH, "\tFinished writing archive file\n"];
};