WalnutLogImpl.mesa
Copyright © 1984 by Xerox Corporation. All rights reserved.
Willie-Sue, June 18, 1986 5:10:01 pm PDT
Donahue, January 22, 1986 8:04:31 am PST
Walnut Log Operations Implementation
Last Edited by: Willie-Sue, January 10, 1985 12:27:10 pm PST
Last Edited by: Wert, August 31, 1984 9:49:21 pm PDT
Last Edited by: Donahue, December 11, 1984 10:37:45 am PST
DIRECTORY
AlpFile USING [AccessFailed],
AlpineFS USING [ErrorFromStream, OpenFile, OpenFileFromStream],
BasicTime USING [GMT, nullGMT, Now, ToPupTime],
FS USING [Error, ErrorDesc, OpenFile,
 BytesForPages, ErrorFromStream, GetInfo, PagesForBytes,
 SetByteCountAndCreatedTime, SetPageCount],
GVBasics USING [Timestamp],
IO,
RefText USING [line, AppendRope, Equal, Length, ObtainScratch, ReleaseScratch,
  TrustTextAsRope],
Rope,
RuntimeError USING [BoundsFault],
ViewerTools USING [TiogaContentsRec, TiogaContents],
WalnutDefs USING [Error],
WalnutKernelDefs USING [LogEntry, MsgLogEntry, WhichTempLog],
WalnutLog USING [],
WalnutLogExpunge USING [EntryStatus],
WalnutMiscLog USING [TiogaControlItemType],
WalnutRoot -- using lots -- ,
WalnutSendOps USING [simpleUserName, userRName, TiogaTextFromStrm, RopeFromStream],
WalnutStream -- using lots -- ;
WalnutLogImpl: CEDAR PROGRAM
IMPORTS
AlpFile, AlpineFS, BasicTime, FS, IO, RefText, Rope, RuntimeError,
WalnutDefs, WalnutRoot, WalnutSendOps, WalnutStream
EXPORTS
WalnutLog, WalnutLogExpunge, WalnutMiscLog
= BEGIN
Plan
The log streams provide an abstraction that WalnutOps uses to handle the log. WalnutOps procedures deal only with writing an entry on the current log and reading an entry from a specified position. The log position for a write is always the end of the current log.
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
MsgLogEntry: TYPE = WalnutKernelDefs.MsgLogEntry;
TiogaContents: TYPE = ViewerTools.TiogaContents;
Signals
WalnutDefs.Error: ERROR[who, code: ATOM, explanation: ROPE ← NIL] = CODE;
Raised for a variety of reasons; the codes currently used by WalnutLogImpl include:
$NotScanning
$InvalidOperation
$UnknownLogFileType
$AccessFailed
$LogInaccessible
$TransactionAbort
Variables for WalnutLog
-- the read and write (and temp if open) streams are all under the same transaction; WalnutLogImpl must get them from WalnutRoot
writeStream: STREAM ← NIL;
tempStream: STREAM ← NIL;   -- when copying a tempFile
readStream: STREAM ← NIL;   -- only one log for now
scanning: BOOL ← FALSE;
currentEntryLengthHint: INT ← -1;
currentEntryPos: INT ← -1;
field1: REF TEXT = NEW[TEXT[RefText.line]];
field2: REF TEXT = NEW[TEXT[RefText.line]];
field3: REF TEXT = NEW[TEXT[RefText.line]];
Variables for WalnutMiscLog
newMailStream: STREAM ← NIL;
readArchiveStream: STREAM ← NIL;
msgIDRope: ROPE = "gvMsgID";
categoriesRope: ROPE = "Categories";
activeMsgSet: REF TEXT = "Active";
newMsgSetList: LIST OF REF TEXT;
generalBuffer: REF TEXT;
msgIDBuffer: REF TEXT;
Variables for WalnutLogExpunge
currentStream: STREAM;
expCurrentEntryLengthHint: INT;
expCurrentEntryPos: INT;
expungeStream: STREAM;
keyIs: ROPE;
expungeInternalFileID: INT;
logSeqNo: INT;
Procedures
Logging of operations that perform actions on the Walnut database
ExpungeMsgs: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.expungeMsgs] };
WriteExpungeLog: PUBLIC PROC RETURNS[at, next: INT] = {
WalnutStream.writeExpungeLog.internalFileID ← WalnutRoot.GetExpungeID[];
[at, next] ← WriteEntry[WalnutStream.writeExpungeLog];
};
CreateMsgSet: PUBLIC PROC[name: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.createMsgSet.msgSet ← RefText.AppendRope[field1, name];
[at, next] ← WriteEntry[WalnutStream.createMsgSet]
};
EmptyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.emptyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.emptyMsgSet]
};
DestroyMsgSet: PUBLIC PROC[msgSet: ROPE]
RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.destroyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.destroyMsgSet]
};
AddMsg: PUBLIC PROC[msg: ROPE, to: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.addMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.addMsg.to ← RefText.AppendRope[field2, to];
[at, next] ← WriteEntry[WalnutStream.addMsg]
};
RemoveMsg: PUBLIC PROC[msg: ROPE, from: ROPE]
RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.removeMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.removeMsg.from ← RefText.AppendRope[field2, from];
[at, next] ← WriteEntry[WalnutStream.removeMsg]
};
MoveMsg: PUBLIC PROC[msg: ROPE, from, to: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.moveMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.moveMsg.from ← RefText.AppendRope[field2, from];
field3.length ← 0;
WalnutStream.moveMsg.to ← RefText.AppendRope[field3, to];
[at, next] ← WriteEntry[WalnutStream.moveMsg]
};
HasBeenRead: PUBLIC PROC[msg: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.hasbeenRead.msg ← RefText.AppendRope[field1, msg];
[at, next] ← WriteEntry[WalnutStream.hasbeenRead]
};
RecordNewMailInfo: PUBLIC PROC[logLen: INT, when: GMT, server: ROPE, num: INT]
RETURNS[at, next: INT] = {
WalnutStream.recordNewMailInfo.logLen ← logLen;
WalnutStream.recordNewMailInfo.when ← when;
field1.length ← 0;
WalnutStream.recordNewMailInfo.server ← RefText.AppendRope[field1, server];
WalnutStream.recordNewMailInfo.num ← num;
[at, next] ← WriteEntry[WalnutStream.recordNewMailInfo]
};
StartCopyNewMail: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyNewMail] };
AcceptNewMail: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.acceptNewMail] };
StartReadArchiveFile: PUBLIC PROC[file: ROPE, msgSet: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.startReadArchiveFile.file ← RefText.AppendRope[field1, file];
field2.length ← 0;
WalnutStream.startReadArchiveFile.msgSet ← RefText.AppendRope[field2, msgSet];
[at, next] ← WriteEntry[WalnutStream.startReadArchiveFile]
};
EndReadArchiveFile: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.endReadArchiveFile] };
StartCopyReadArchive: PUBLIC PROC[] RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyReadArchive] };
Managing the current log
AcquireWriteLock: PUBLIC PROC = {
Awl: PROC =
{ WalnutRoot.AcquireWriteLock[] };
CarefullyApply[Awl, "AcquireWriteLock"];
};
ForgetLogStreams: PUBLIC PROC =
{ readStream ← writeStream ← NIL };
CloseLogStreams: PUBLIC PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
};
LogLength: PUBLIC PROC RETURNS[length: INT] = {
Llen: PROC =
{ length ← writeStream.GetLength[] };
CarefullyApply[Llen, "LogLength"];
};
OpenLogStreams: PUBLIC PROC = {
IF writeStream = NIL THEN {
[readStream, writeStream] ← WalnutRoot.GetCurrentLogStreams[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
};
ReleaseWriteLock: PUBLIC PROC = {
IF writeStream = NIL THEN RETURN;
[readStream, writeStream] ← WalnutRoot.ReleaseWriteLock[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
ResetLog: PUBLIC PROC[newLength: INT] = {
Rl: PROC = {
writeStream.SetIndex[newLength];
WalnutStream.SetHighWaterMark[writeStream, newLength, -1];
writeStream.SetIndex[newLength];
writeStream.Flush[];
};
CarefullyApply[Rl, "ResetLog"];
};
ReturnCurrentLogStreams: PUBLIC PROC = {
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
ShutdownLog: PUBLIC PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
Write a message on the current log
WriteMessage: PUBLIC PROC[msg: ROPE, body: TiogaContents] RETURNS[at: INT]= {
WriteMsg: PROC[] = {
writeStream.SetIndex[writeStream.GetLength[]];
field1.length ← 0;
WalnutStream.createMsg.msg ← RefText.AppendRope[field1, msg];
IF body.formatting.Length[] # 0 THEN {
last: INT ← body.contents.Length[] - 1;
IF body.contents.Fetch[last] = '\000 THEN  -- NULL for padding
{ body.contents ← Rope.Substr[body.contents, 1, last-1];
body.formatting ← Rope.Concat["\000", body.formatting]
}
ELSE body.contents ← Rope.Substr[body.contents, 1];
};
WalnutStream.createMsg.textLen ← body.contents.Length[];
WalnutStream.createMsg.formatLen ← body.formatting.Length[];
at ← WalnutStream.WriteEntry[writeStream, WalnutStream.createMsg];
WalnutStream.WriteMsgBody[writeStream, body];
WalnutStream.FlushStream[strm: writeStream, setCreateDate: TRUE];
};
CarefullyApply[WriteMsg, "WriteMessage"];
};
Parsing a log (used by Archive and Replay/Scavenge)
SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = {
next: INT;
SetPs: PROC[] = {
readStream.SetIndex[startPos];
next ← WalnutStream.FindNextEntry[readStream];
currentEntryLengthHint ← -1;
IF next # -1 THEN currentEntryPos ← next;
scanning ← TRUE;
};
CarefullyApply[SetPs, "SetPosition"];
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
SetIndex: PUBLIC PROC[pos: INT] = {
Si: PROC[] = {
readStream.SetIndex[pos];
currentEntryLengthHint ← -1;
currentEntryPos ← pos;
scanning ← TRUE;
};
CarefullyApply[Si, "SetIndex"];
};
NextAt: PUBLIC PROC RETURNS[at: INT] =
{ at ← currentEntryPos };
NextEntry: PUBLIC PROC RETURNS[le: LogEntry, at: INT] =
{ [le, at] ← NextEntryInternal[FALSE, "NextEntry"] };
NextEntryInternal: PROC[quick: BOOL, who: ROPE] RETURNS[le: LogEntry, at: INT] = {
Ne: PROC[] = {
length: INT;
[le, length] ← WalnutStream.ReadEntry[readStream, quick];
at ← currentEntryPos;
currentEntryLengthHint ← -1;
IF length = -1 THEN {
IF at = readStream.GetLength[] THEN RETURN;
at ← -1; RETURN
};
currentEntryPos ← at + length;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ne, who];
};
QuickScan: PUBLIC PROC RETURNS[le: LogEntry, at: INT] =
{ [le, at] ← NextEntryInternal[quick: TRUE, who: "QuickScan"] };
ArchiveEntry: PUBLIC PROC[to: IO.STREAM] RETURNS[ok: BOOL]= {
Copies the next entry of the current log to an archive stream.
Ane: PROC[] = {
ok ← FALSE;
IF currentEntryLengthHint = -1 THEN
currentEntryLengthHint ←
WalnutStream.PeekEntry[readStream].length;
IF currentEntryLengthHint = -1 THEN RETURN;
WalnutStream.CopyBytes[
from: readStream, to: to, num: currentEntryLengthHint];
currentEntryPos ← readStream.GetIndex[];
currentEntryLengthHint ← -1;  -- unknown
ok ← TRUE;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ane, "ArchiveEntry"];
};
CopyBytesToArchive: PUBLIC PROC[to: IO.STREAM, startPos, num: INT] = {
Cba: PROC = {
readStream.SetIndex[startPos];
WalnutStream.CopyBytes[from: readStream, to: to, num: num];
scanning ← FALSE;
};
CarefullyApply[Cba, "CopyBytesToArchive"];
};
Copying NewMail/ReadArchive logs
PrepareToCopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE ← NIL]] RETURNS[BOOL] = {
makes sure the writeStream is long enough to copy the temp log named by which onto it
[writeStream, tempStream] ← WalnutRoot.PrepareToCopyTempLog[which, pagesAlreadyCopied, reportProc];
RETURN[tempStream # NIL];
};
CopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos: INT, reportProc: PROC[msg1, msg2, msg3: ROPE ← NIL] ] = {
copies the temp log named by which, starting at fromPos, to the end of the writeStream; writes an EndCopy log entry, and sets the length of that temp log to 0; if the operation fails, WalnutDefs.Error is raised; NOT done inside CarefullyApply, so that it can catch transaction aborts itself
exp: ROPE;
endLE: LogEntry;
abort: BOOL ← TRUE;
ok: BOOL ← FALSE;
bytesToCopy: INT;
bytesPerCopy: INT = FS.BytesForPages[200];
BEGIN ENABLE BEGIN
IO.Error => {
why: ATOM = AlpineFS.ErrorFromStream[stream].code;
IF why = $transAborted THEN GOTO exit;
IF why = $remoteCallFailed THEN {
exp ← AlpineFS.ErrorFromStream[stream].explanation;
GOTO exit;
};
REJECT;
};
FS.Error => {
IF error.code = $transAborted THEN GOTO exit;
IF error.code = $remoteCallFailed THEN {
exp ← error.explanation;
GOTO exit;
};
REJECT;
};
AlpFile.AccessFailed => { abort ← FALSE; GOTO exit };
END;
SELECT which FROM
newMail => {
WalnutStream.endCopyNewMailInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyNewMailInfo;
};
readArchive => {
WalnutStream.endCopyReadArchiveInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyReadArchiveInfo
};
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF tempStream = NIL THEN
[writeStream, tempStream] ← WalnutRoot.GetStreamsForCopy[which];
bytesToCopy ← tempStream.GetLength[] - fromPos;
IF bytesToCopy > 0 THEN {
first: BOOL ← TRUE;
tempStream.SetIndex[fromPos];
writeStream.SetIndex[writeStream.GetLength[]];
DO
thisCopy: INT ← MIN[bytesPerCopy, bytesToCopy];
WalnutStream.CopyBytes[to: writeStream, from: tempStream, num: thisCopy];
WalnutStream.FlushStream[writeStream, TRUE];
WalnutRoot.CommitAndContinue[];
IF first THEN first ← FALSE ELSE reportProc["#"];
bytesToCopy ← bytesToCopy - thisCopy;
IF bytesToCopy = 0 THEN EXIT;
ENDLOOP;
};
[] ← WalnutStream.WriteEntry[writeStream, endLE];
WalnutStream.FlushStream[writeStream, TRUE];
EXITS
exit => {
who: ROPE ← IF which = newMail THEN "NewMail" ELSE "ReadArchive";
IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who];
WalnutRoot.AbortTempCopy[which, tempStream];
WalnutRoot.CloseTransaction[];
writeStream ← readStream ← tempStream ← NIL;
ERROR WalnutDefs.Error[
$log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed,
IO.PutFR["Copying the %g log", IO.rope[who]] ];
};
END;
tempStream ← NIL;
WalnutRoot.FinishCopy[which];
};
FinishTempLogCopy: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog] =
{ WalnutRoot.FinishCopy[which] };
CreateArchiveLog: PUBLIC PROC[
fileToRead: STREAM, msgSet: ROPE, reportProc: PROC[msg1, msg2, msg3: ROPE ← NIL]]
RETURNS[ok: BOOL] = {
reason: ROPE;
[ok, reason] ← CreateReadArchiveLog[fileToRead, msgSet, reportProc];
IF ok THEN [] ← WriteEntry[WalnutStream.endReadArchiveFile];
CloseReadArchiveLog[];
};
Utilities
GetTiogaContents: PUBLIC PROC[textStart, textLen, formatLen: INT]
RETURNS[contents: TiogaContents] = {
TContents: PROC[] = {
readStream.SetIndex[textStart];
scanning ← FALSE;
contents ← NEW[ViewerTools.TiogaContentsRec];
contents.contents ← WalnutStream.ReadRope[readStream, textLen];
contents.formatting ← WalnutStream.ReadRope[readStream, formatLen];
};
IF formatLen # 0 THEN  -- tioga formatting nonsense
{ textLen ← textLen + 1; textStart ← textStart - 1};
CarefullyApply[TContents, "GetTiogaContents"];
};
GetRefTextFromLog: PUBLIC PROC[startPos, length: INT, text: REF TEXT] = {
natLen: NAT;
Grf: PROC = {
bytesRead: NAT;
readStream.SetIndex[startPos];
scanning ← FALSE;
bytesRead ← readStream.GetBlock[block: text, startIndex: 0, count: natLen];
IF bytesRead < natLen THEN text.length ← 0;
};
CheckSize: PROC[len: INT] RETURNS[nat: NAT] = { nat ← len };
natLen ← CheckSize[length ! RuntimeError.BoundsFault => GOTO noGood];
CarefullyApply[Grf, "GetRefTextFromLog"];
EXITS
noGood =>
ERROR WalnutDefs.Error[$log, $BadLength,
IO.PutFR[" Can't convert %g into a nat", IO.int[length] ]];
};
Local Utilities
WriteEntry: PROC[le: LogEntry] RETURNS[at, next: INT] = {
We: PROC = {
at ← WalnutStream.WriteEntry[writeStream, le];
WalnutStream.FlushStream[writeStream, TRUE];
next ← writeStream.GetIndex[];
};
CarefullyApply[We, "WriteEntry"];
};
curLog: ROPE = "On currentLog, during ";
CarefullyApply: PROC[ proc: PROC[], who: ROPE ] = {
exp: ROPE;
BEGIN ENABLE BEGIN
IO.Error => {
why: ATOM = AlpineFS.ErrorFromStream[stream].code;
IF why = $transAborted THEN GOTO aborted;
IF why = $remoteCallFailed THEN {
exp ← AlpineFS.ErrorFromStream[stream].explanation;
GOTO failed;
};
REJECT;
};
FS.Error => {
IF error.code = $transAborted THEN GOTO aborted;
IF error.code = $remoteCallFailed THEN {
exp ← error.explanation;
GOTO failed;
};
REJECT;
};
AlpFile.AccessFailed =>
IF missingAccess = spaceQuota THEN GOTO outOfSpace ELSE GOTO exit;
END;
IF readStream = NIL THEN  -- test read, not write stream, in case readOnly
ERROR WalnutDefs.Error[$log, $LogNotOpen, curLog.Concat[who]];
proc[];
EXITS
aborted => {
WalnutRoot.StatsReport[Rope.Concat["\n**** trans abort on currentLog doing ", who]];
ERROR WalnutDefs.Error[$log, $TransactionAbort, curLog.Concat[who]];
};
failed => {
WalnutRoot.StatsReport[Rope.Concat["\n**** remote call failed on currentLog doing ", who]];
ERROR WalnutDefs.Error[$log, $RemoteCallFailed, exp];
};
outOfSpace =>
ERROR WalnutDefs.Error[$log, $OutOfSpace, curLog.Concat[who]];
exit =>
ERROR WalnutDefs.Error[$log, $OtherAlpineMissingAccess, curLog.Concat[who]];
END;
};
Writing and Reading NewMail/ReadArchive files
GetNewMailLog: PUBLIC PROC[lengthRequired: INT, pagesWanted: INT]
RETURNS[STREAM] = {
opens the NewMailLog (aborting any current stream that might be open); truncates it to the required length; returns NIL if the log is shorter than required, or if the file could not be opened
BEGIN ENABLE IO.Error, FS.Error, AlpFile.AccessFailed =>
IF newMailStream # NIL THEN {
AbortMailLog[];
GOTO exit;
};
IF newMailStream # NIL THEN AbortMailLog[];
IF (newMailStream ←
WalnutRoot.GetNewMailStream[lengthRequired, pagesWanted].strm) = NIL THEN RETURN[NIL];
EXITS
exit => IF newMailStream # NIL THEN AbortMailLog[];
END;
RETURN[newMailStream];
};
CloseNewMailLog: PUBLIC PROC = {
strm: STREAM ← newMailStream;
IF strm = NIL THEN RETURN;
newMailStream ← NIL;
WalnutRoot.ReturnNewMailStream[strm ! IO.Error, FS.Error => CONTINUE];
};
AbortMailLog: PROC = {
IF newMailStream = NIL THEN RETURN;
WalnutStream.AbortStream[newMailStream ! IO.Error, FS.Error => CONTINUE];
CloseNewMailLog[];
};
CreateReadArchiveLog: PUBLIC PROC[
fileToRead: STREAM, msgSet: ROPE, reportProc: PROC[msg1, msg2, msg3: ROPE ← NIL]]
RETURNS[ok: BOOL, reason: ROPE] = {
reads file and writes the ReadArchiveLogFile; returns ok if all went well
lastCommitPosInFileToRead: INT;
fileToReadPages: INT ← FS.PagesForBytes[fileToRead.GetLength[]];
possibleArchiveLength: INT ← MAX[20, fileToReadPages + (fileToReadPages/5)]; -- 120%
ok ← FALSE;
reason ← NIL;
BEGIN ENABLE BEGIN
IO.Error => {
reason ← FS.ErrorFromStream[fileToRead].explanation;
IF reason = NIL THEN reason ← FS.ErrorFromStream[readArchiveStream].explanation;
IF reason = NIL THEN reason ← "IO error during createReadArchiveLog";
GOTO exit;
};
FS.Error => { reason ← error.explanation; GOTO exit };
AlpFile.AccessFailed => { reason ← "AlpFile.AccessFailed"; GOTO exit};
UNWIND => GOTO exit;
END;
IF (readArchiveStream ← WalnutRoot.GetReadArchiveStream[possibleArchiveLength]) = NIL THEN
RETURN[FALSE, "Can't get readArchiveStream"];
readArchiveStream.SetIndex[0];
WalnutStream.SetHighWaterMark[strm: readArchiveStream, hwmBytes: 0, numPages: -1];
WalnutRoot.CommitAndContinue[];
[ok, lastCommitPosInFileToRead] ← ArchiveReader[
archiveStream: readArchiveStream, fileToRead: fileToRead,
msgSet: msgSet, reportProc: reportProc, posToStartInFileToRead: 0];
IF ~ok THEN CloseReadArchiveLog[];
EXITS
exit =>
IF readArchiveStream # NIL THEN {
WalnutRoot.ReturnReadArchiveStream[readArchiveStream];
readArchiveStream ← NIL;
};
END;
};
CloseReadArchiveLog: PUBLIC PROC = {
WalnutRoot.ReturnReadArchiveStream[readArchiveStream];
readArchiveStream ← NIL;
};
MiscShutdown: PUBLIC PROC =
{ newMailStream ← readArchiveStream ← NIL };
********************************************************
ArchiveFileType: TYPE = {unknown, old, new};
ArchiveReader: PUBLIC PROC[archiveStream, fileToRead: STREAM, msgSet: ROPE,
reportProc: PROC[msg1, msg2, msg3: Rope.ROPE ← NIL],
posToStartInFileToRead: INT]
RETURNS [ok: BOOL, lastCommitPosInFileToRead: INT] = {
num: INT ← 0;
BadFormat: PROC[s: ROPE] = {
IF reportProc = NIL THEN RETURN;
reportProc[
IO.PutFR["Bad log file format at %g: %g\n", IO.int[fileToRead.GetIndex[]], IO.rope[s]]];
};
BEGIN ENABLE BEGIN
ABORTED => { ok ← FALSE; GOTO GiveUp };
FS.Error => {
IF reportProc # NIL THEN reportProc[error.explanation];
ok ← FALSE; GOTO GiveUp
};
IO.Error => {
IF reportProc # NIL THEN {
reason: ROPE ← FS.ErrorFromStream[fileToRead].explanation;
IF reason # NIL THEN
reason ← IO.PutFR["IOError on file being read, reported as %g, at pos %g",
IO.rope[reason], IO.int[fileToRead.GetIndex[]] ]
ELSE reason ← FS.ErrorFromStream[archiveStream].explanation;
IF reason # NIL THEN
reason ←
IO.PutFR["IOError on tempLog being written, reported as %g, at pos %g",
IO.rope[reason], IO.int[archiveStream.GetIndex[]] ]
ELSE reason ←
IO.PutFR["IOError at writePos %g",IO.int[archiveStream.GetIndex[]] ];
reportProc["\n", reason, "\n\n"];
};
ok ← FALSE; GOTO GiveUp
};
IO.EndOfStream => {BadFormat["Unexpected EOF"]; ok ← FALSE; GOTO GiveUp};
END;
archiveFileType: ArchiveFileType ← unknown;
entryLength, prefixLength, beginMsgPos: INT;
entryChar: CHAR;
msgSetRT: REF TEXT ← msgSet.ToRefText[];
bytesWritten: INT ← 0;
bytesToWriteBeforeCommit: INT = 100000;  -- try this
lengthOfFileToRead: INT ← fileToRead.GetLength[];
startPosOnArchive: INT;
newMsgSetList ← NIL;  -- global for each file read
ok ← TRUE;
generalBuffer ← RefText.ObtainScratch[RefText.line];
msgIDBuffer ← RefText.ObtainScratch[RefText.line];
lastCommitPosInFileToRead ← posToStartInFileToRead;
fileToRead.SetIndex[posToStartInFileToRead];
DO
Read *start* from log
[beginMsgPos, prefixLength, entryLength, entryChar] ← FindStartOfEntry[fileToRead];
IF entryLength = -1 OR (entryLength=0) OR (entryLength=prefixLength) AND NOT (entryChar = '←) THEN {
RefText.ReleaseScratch[generalBuffer];
RefText.ReleaseScratch[msgIDBuffer];
reportProc[IO.PutFR["\n %g messages read from archive file", IO.int[num]] ];
IF bytesWritten = 0 THEN RETURN[TRUE, lengthOfFileToRead];
IF Flush[archiveStream] THEN RETURN[TRUE, lengthOfFileToRead];
RETURN[FALSE, lastCommitPosInFileToRead];
};
SELECT entryChar FROM
'-, '+, '← => {
BadFormat[IO.PutFR["Non-message entry (entrychar = %g)", IO.char[entryChar]]];
fileToRead.SetIndex[beginMsgPos+entryLength]
};
ENDCASE => {   -- regular message entry
DO   -- fake DO so can use exit
msgID: REF TEXT;
categories: ROPE;
catList: LIST OF REF TEXT;
outOfSynch, notInActive: BOOL;
headersPos: INT ← beginMsgPos+prefixLength;
body: ViewerTools.TiogaContents;
IF archiveFileType = unknown THEN
archiveFileType ← DetermineFileType[fileToRead];
IF archiveFileType = old THEN {
[msgID, categories, outOfSynch] ← ReadPrefixInfo[fileToRead, headersPos];
IF outOfSynch THEN {
BadFormat[IO.PutFR["\nPrefix Info not found at %g", IO.int[headersPos]]];
EXIT};
body ← WalnutSendOps.TiogaTextFromStrm[  -- careful here
fileToRead, headersPos, entryLength-prefixLength];
IF body.formatting.Length[] <= 2 THEN body.formatting ← NIL; --glitch
IF RefText.Length[msgID] = 0 THEN { -- need to construct an ID
back up and parse from the strm
ts: GVBasics.Timestamp;
id, from: ROPE;
mle: WalnutKernelDefs.MsgLogEntry = WalnutStream.createMsg;
mle.textLen ← entryLength - prefixLength;
fileToRead.SetIndex[headersPos];
WalnutStream.MsgEntryInfoFromStream[fileToRead, mle];
from ← IF mle.sender.Equal[WalnutSendOps.userRName, FALSE] OR
mle.sender.Equal[WalnutSendOps.simpleUserName, FALSE]
THEN Rope.Concat["To: ", mle.to]
ELSE mle.sender;
IF from.Length[] = 0 THEN from ← "NoKnownGVid";
IF mle.date = BasicTime.nullGMT THEN mle.date ← BasicTime.Now[];
ts ← [net: 3, host: 14, time: BasicTime.ToPupTime[mle.date]];
id ← WalnutStream.ConstructMsgID[ts, from];
msgID ← id.ToRefText[];
fileToRead.SetIndex[beginMsgPos+entryLength];
};
IF body.formatting.Length[] = 0 THEN {
pos: INT ← 0;
DO  -- replace \240's with spaces (used by laurel)
pos ← body.contents.Find["\240", pos];
IF pos = -1 THEN EXIT;
body.contents ← body.contents.Replace[start: pos, len: 1, with: " "];
ENDLOOP;
};
}
ELSE {
this: BOOL;
[msgID, categories, body, this] ←
ReadNewStyleArchiveFile[
fileToRead, headersPos, entryLength - (headersPos-beginMsgPos) - 1];
IF ~this THEN {
BadFormat[
IO.PutFR["\nCouldn't read entry at %g", IO.int[beginMsgPos]] ];
EXIT;
};
};
BEGIN ENABLE BEGIN
FS.Error =>
IF error.code = $transAborted THEN GOTO aborted ELSE REJECT;
IO.Error =>
IF WalnutStream.Aborted[archiveStream] THEN GOTO aborted ELSE REJECT;
END;
WalnutStream.createMsg.msg ← msgID;
WalnutStream.createMsg.textLen ← body.contents.Length[];
WalnutStream.createMsg.formatLen ← body.formatting.Length[];
startPosOnArchive ←
WalnutStream.WriteEntry[archiveStream, WalnutStream.createMsg];
WalnutStream.WriteMsgBody[archiveStream, body];
IF RefText.Length[msgSetRT] # 0 THEN categories ← NIL;
[catList, notInActive] ← CheckCategories[archiveStream, msgSetRT, categories];
IF entryChar # '? THEN {
WalnutStream.hasbeenRead.msg ← msgID;
[] ← WalnutStream.WriteEntry[archiveStream, WalnutStream.hasbeenRead];
};
-- ProcessCategories
BEGIN
first: BOOL ← TRUE;
FOR mL: LIST OF REF TEXT ← catList, mL.rest UNTIL mL=NIL DO
IF first AND notInActive THEN {
WalnutStream.moveMsg.msg ← msgID;
WalnutStream.moveMsg.from ← activeMsgSet;
WalnutStream.moveMsg.to ← mL.first;
[] ← WalnutStream.WriteEntry[archiveStream, WalnutStream.moveMsg]
}
ELSE {
WalnutStream.addMsg.msg ← msgID;
WalnutStream.addMsg.to ← mL.first;
[] ← WalnutStream.WriteEntry[archiveStream, WalnutStream.addMsg];
};
first ← FALSE;
ENDLOOP;
END;
IF (bytesWritten ← bytesWritten + archiveStream.GetLength[]-startPosOnArchive) >= bytesToWriteBeforeCommit THEN {
WalnutStream.FlushStream[archiveStream];
WalnutRoot.CommitAndContinue[];
lastCommitPosInFileToRead ← beginMsgPos+entryLength;
bytesWritten ← 0;
};
EXITS
aborted => {
RefText.ReleaseScratch[generalBuffer];
RefText.ReleaseScratch[msgIDBuffer];
RETURN[FALSE, lastCommitPosInFileToRead];
};
END;  -- of Enable FS.Error, IO.Error above - writing on archiveStream
IF (num ← num + 1) MOD 10 = 0 THEN {
IF num MOD 100 = 0 THEN reportProc[IO.PutFR["(%g)", IO.int[num]]]
ELSE reportProc["~"];
};
EXIT;
ENDLOOP;
fileToRead.SetIndex[beginMsgPos+entryLength];
};
ENDLOOP;
EXITS GiveUp => NULL;
END;
RefText.ReleaseScratch[generalBuffer];
RefText.ReleaseScratch[msgIDBuffer];
reportProc[IO.PutFR["\n %g messages read from archive file", IO.int[num]] ];
RETURN[ok, lastCommitPosInFileToRead];
};
Flush: PROC[strm: STREAM] RETURNS[ok: BOOL] = {
ENABLE FS.Error => IF error.code = $transAborted THEN GOTO aborted ELSE REJECT;
WalnutStream.FlushStream[strm];
WalnutRoot.CommitAndContinue[];
RETURN[TRUE];
EXITS
aborted => RETURN[FALSE]
};
CheckCategories: PROC[archiveStream: STREAM, msgSet: REF TEXT, categories: ROPE]
RETURNS[catList: LIST OF REF TEXT, notInActive: BOOL] = {
h: STREAM;
CheckMsgSet: PROC[ms: REF TEXT] RETURNS[notActive: BOOL] = {
IF RefText.Equal[ms, activeMsgSet, FALSE]
THEN RETURN[FALSE];
FOR mL: LIST OF REF TEXT ← newMsgSetList, mL.rest UNTIL mL = NIL DO
IF RefText.Equal[mL.first, ms, FALSE] THEN RETURN[TRUE];
ENDLOOP;
WalnutStream.createMsgSet.msgSet ← ms;
[] ← WalnutStream.WriteEntry[archiveStream, WalnutStream.createMsgSet];
newMsgSetList ← CONS[ms, newMsgSetList];  -- to be able to check next time
RETURN[TRUE];
};
catList ← NIL;
notInActive ← TRUE;
IF categories = NIL THEN
IF RefText.Length[msgSet] = 0 THEN RETURN[NIL, FALSE]
ELSE {
notInActive ← CheckMsgSet[msgSet];
IF notInActive THEN RETURN[LIST[msgSet], notInActive]
ELSE RETURN[NIL, FALSE];
};
IF RefText.Length[msgSet] # 0 THEN {
notInActive ← CheckMsgSet[msgSet];
IF notInActive THEN catList ← LIST[msgSet];
};
h ← IO.RIS[categories];
UNTIL h.EndOf[] DO
token: ROPE ← h.GetTokenRope[breakProc: IO.IDProc].token;
IF token.Length[] # 0 THEN {
tokenRT: REF TEXT ← token.ToRefText[];
notActive: BOOL ← CheckMsgSet[tokenRT];
notInActive ← notInActive AND notActive;
IF notActive THEN catList ← CONS[tokenRT, catList];
};
ENDLOOP;
h.Close[];
};
DetermineFileType: PROC[strm: STREAM] RETURNS[aft: ArchiveFileType] = {
curPos: INT ← strm.GetIndex[];
aft ← old;
IF strm.GetChar[] = '@ THEN {  -- might be new or Hardy file
itemType: INT;
[] ← strm.GetInt[];
IF (itemType ← strm.GetInt[]) = WalnutMiscLog.TiogaControlItemType THEN
aft ← new;
};
strm.SetIndex[curPos];
};
ReadPrefixInfo: PROC[fileToRead: STREAM, headersPos: INT]
RETURNS[msgID: REF TEXT, categories: ROPE, outOfSynch: BOOL] = {
curPos: INT;
outOfSynch ← FALSE;
UNTIL (curPos ← fileToRead.GetIndex[]) = headersPos DO
tag: REF TEXT;
IF curPos > headersPos THEN {outOfSynch ← TRUE; RETURN};
tag ← fileToRead.GetToken[buffer: generalBuffer].token;
IF Rope.Equal[RefText.TrustTextAsRope[tag], msgIDRope, FALSE] THEN {
[] ← fileToRead.GetChar[]; -- the :
[] ← fileToRead.SkipWhitespace[];
msgID ← fileToRead.GetLine[msgIDBuffer];
LOOP;
};
IF Rope.Equal[RefText.TrustTextAsRope[tag], categoriesRope, FALSE] THEN {
[] ← fileToRead.GetChar[]; -- the :
[] ← fileToRead.SkipWhitespace[];
categories ← fileToRead.GetLineRope[];
LOOP;
};
ENDLOOP;
};
ReadNewStyleArchiveFile: PROC[
fileToRead: STREAM, headersPos, bodyLen: INT]
RETURNS[msgID: REF TEXT, categories: ROPE, body: TiogaContents, ok: BOOL] = {
len, type, pos, formatLen: INT;
ok ← FALSE;
body ← NEW[ViewerTools.TiogaContentsRec];
IF fileToRead.GetChar[] # '@ THEN RETURN;
len ← fileToRead.GetInt[];
type ← fileToRead.GetInt[];
formatLen ← fileToRead.GetInt[];
[] ← fileToRead.GetChar[];  -- the \n
IF type = WalnutMiscLog.TiogaControlItemType THEN {
msgID ← fileToRead.GetLine[msgIDBuffer];
categories ← fileToRead.GetLineRope[];
IF formatLen # 0 THEN {
body.formatting ←
WalnutSendOps.RopeFromStream[fileToRead, fileToRead.GetIndex[], formatLen];
[] ← fileToRead.GetChar[]; -- cr
};
};
[] ← fileToRead.GetChar[];  -- the other @
pos ← fileToRead.GetIndex[];
body.contents ← WalnutSendOps.RopeFromStream[fileToRead, pos, bodyLen];
[] ← fileToRead.GetChar[ ! IO.EndOfStream => CONTINUE];  -- the trailing \n
ok ← TRUE;
};
FindStartOfEntry: PROC[fileToRead: STREAM]
RETURNS[startPos, prefixLength, entryLength: INT, entryChar: CHAR] = {
ENABLE IO.EndOfStream => GOTO eoS;
line: REF TEXT;
relPos: INT ← 0;
startFound: BOOL;
prefixLength ← entryLength ← 0;
entryChar ← '\000;  -- not a valid entryType char
IF fileToRead.EndOf[] THEN RETURN;
startPos ← fileToRead.GetIndex[];
line ← fileToRead.GetLine[generalBuffer];
[startFound, relPos] ← IsStart[line];
IF NOT startFound THEN
UNTIL startFound DO
IF fileToRead.EndOf[] THEN RETURN;
startPos ← fileToRead.GetIndex[];
line ← fileToRead.GetLine[generalBuffer];
[startFound, relPos] ← IsStart[line];
ENDLOOP;
-- Read entry info line from log, e.g.: 00101 00029 US+
startPos ← startPos + relPos;
entryLength ← fileToRead.GetInt[];
prefixLength ← fileToRead.GetInt[];
line ← fileToRead.GetLine[generalBuffer];
entryChar ← RefText.Fetch[line, RefText.Length[line]-1];
[] ← fileToRead.GetChar[];  -- space
[] ← fileToRead.GetChar[];  -- flag
[] ← fileToRead.GetChar[];  -- flag
entryChar ← fileToRead.GetChar[];  -- Laurel's march char
IF fileToRead.GetChar[] # '\n THEN entryLength ← -1;
EXITS
eoS => {entryLength ← -1; RETURN};
};
starStartStar: ROPE = "*start*";
IsStart: PROC[line: REF TEXT] RETURNS[startFound: BOOL, relPos: INT] = {
relPos ← 0;
IF RefText.Length[line] = 0 THEN RETURN[FALSE, relPos];
IF Rope.Equal[starStartStar, RefText.TrustTextAsRope[line]] THEN
RETURN[TRUE, relPos];
IF (relPos ← RefText.Length[line] - starStartStar.Length[]) <=0 THEN RETURN[FALSE, relPos];
IF Rope.Find[s1: RefText.TrustTextAsRope[line], s2: starStartStar, pos1: relPos] = relPos THEN
RETURN[TRUE, relPos];
RETURN[FALSE, relPos];
};
Managing the ExpungeLog
StartExpunge: PUBLIC PROC[pagesNeeded: INT] RETURNS[expungeFileID: INT] = {
Writes the first log record on the expunge log, from information from the root; returns the internalFileID of the expunge log being written on
reason: FS.ErrorDesc;
actualPages: INT;
of: FS.OpenFile;
BEGIN ENABLE BEGIN
FS.Error => { reason ← error; GOTO err };
IO.Error => { reason ← AlpineFS.ErrorFromStream[stream]; GOTO err };
UNWIND => { currentStream ← expungeStream ← NIL };
END;
[currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] ←
WalnutRoot.GetStreamsForExpunge[];
actualPages ← FS.GetInfo[of ← AlpineFS.OpenFileFromStream[expungeStream]].pages;
IF actualPages < pagesNeeded THEN
FS.SetPageCount[of, pagesNeeded ! FS.Error =>
IF error.code = $quotaExceeded THEN GOTO quota ELSE REJECT];
IF expungeStream.GetLength[] # 0 THEN {
WalnutStream.SetHighWaterMark[expungeStream, 0, -1];
expungeStream.SetIndex[0];
expungeStream.SetLength[0];
};
FS.SetByteCountAndCreatedTime[of, -1, BasicTime.Now[]];
WalnutStream.logFileInfo.key ← keyIs.ToRefText[];
WalnutStream.logFileInfo.internalFileID ← expungeInternalFileID;
WalnutStream.logFileInfo.logSeqNo ← logSeqNo;
[] ← WalnutStream.WriteEntry[expungeStream, WalnutStream.logFileInfo];
WalnutStream.FlushStream[expungeStream, TRUE];
RETURN[expungeInternalFileID];
EXITS
quota =>
ERROR WalnutDefs.Error[$expungeLog, $quotaExceeded,
IO.PutFR[" An ExpungeLog of %g pages exceeds your quota; file has %g pages", IO.int[pagesNeeded], IO.int[actualPages]]];
err => {
WalnutRoot.StatsReport["\n *** During start expunge "];
WalnutRoot.StatsReport[reason.explanation];
ERROR WalnutDefs.Error[$expungeLog, reason.code, "During Start Expunge"];
};
END;
};
RestartExpunge: PUBLIC PROC[currentLogPos, expungeLogLength: INT]
RETURNS[ok: BOOL] = {
Sets the length of the expunge log and the read position of the current log to the given values. If it returns NOT ok, the caller must call StartExpunge, which is always guaranteed to succeed (if space exists!).
DoRE: PROC = {
expungePosToUse: INT ← expungeLogLength;
le: LogEntry;
keyFromExpLog: REF TEXT;
internalFileIDFromExpLog, logSeqNoFromExpLog: INT;
reason: FS.ErrorDesc;
BEGIN ENABLE BEGIN
FS.Error => { reason ← error; GOTO error };
IO.Error => { reason ← AlpineFS.ErrorFromStream[stream]; GOTO error };
UNWIND => { currentStream ← expungeStream ← NIL };
END;
ok ← FALSE;
[currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] ←
WalnutRoot.GetStreamsForExpunge[];
IF expungeStream = NIL THEN RETURN;
IF expungeStream.GetLength[] < expungeLogLength THEN RETURN;  -- too short
expungeStream.SetIndex[0];
le ← WalnutStream.ReadEntry[expungeStream].le;
IF le = NIL THEN RETURN;
TRUSTED {
WITH le: le SELECT FROM
LogFileInfo => {
keyFromExpLog ← le.key;
internalFileIDFromExpLog ← le.internalFileID;
logSeqNoFromExpLog ← le.logSeqNo;
};
ENDCASE => RETURN;
};
IF expungeLogLength = 0 THEN expungePosToUse ← expungeStream.GetIndex[];
IF ~keyIs.Equal[RefText.TrustTextAsRope[keyFromExpLog]] THEN RETURN;
IF internalFileIDFromExpLog # expungeInternalFileID THEN RETURN;
IF logSeqNoFromExpLog # logSeqNo THEN RETURN;
WalnutStream.SetHighWaterMark[expungeStream, expungePosToUse, -1];
expungeStream.SetIndex[expungePosToUse];
currentStream.SetIndex[currentLogPos];
expCurrentEntryLengthHint ← -1;
currentEntryPos ← currentLogPos;
ok ← TRUE;
EXITS error => {
WalnutRoot.StatsReport["\n *** During Restart Expunge "];
WalnutRoot.StatsReport[reason.explanation];
ok ← FALSE;
};
END;
};
DoRE[];
IF ~ok THEN {
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
};
};
CopyBytesToExpungeLog: PUBLIC PROC[bytesToCopy: INT] = {
Copies bytesToCopy bytes from the current log, starting at its current position, onto the end of the expunge log; does not flush the expunge log (call GetExpungeProgress to flush).
Cbel: PROC = {
expungeStream.SetIndex[expungeStream.GetLength[]]; -- to be sure
WalnutStream.CopyBytes[
from: currentStream, to: expungeStream, num: bytesToCopy];
currentEntryPos ← currentStream.GetIndex[];
expCurrentEntryLengthHint ← -1;  -- unknown
};
ExpCarefullyApply[Cbel, "CopyBytesToExpungeLog"];
};
GetExpungeProgress: PUBLIC PROC RETURNS[currentLogPos, expungeLogLength: INT] = {
Gets the read position of the current log and the length of the expunge log (this information can later be used to restart an expunge that was in progress); forces a flush (commit) of the expunge log. If the transaction aborts during the flush, ExpCarefullyApply raises WalnutDefs.Error with code = $TransactionAbort.
Gep: PROC = {
WalnutStream.FlushStream[expungeStream, TRUE];
WalnutRoot.CommitAndContinue[];
currentLogPos ← currentEntryPos;
expungeLogLength ← expungeStream.GetLength[];
};
ExpCarefullyApply[Gep, "GetExpungeProgress"];
};
EndExpunge: PUBLIC PROC =
{ expungeStream ← currentStream ← NIL };
ExpShutdown: PUBLIC PROC =
{ expungeStream ← currentStream ← NIL };
Parsing a log for Expunge
ExpSetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = {
next: INT;
SetPs: PROC = {
currentStream.SetIndex[startPos];
next ← WalnutStream.FindNextEntry[currentStream];
expCurrentEntryLengthHint ← -1;
IF next # -1 THEN expCurrentEntryPos ← next;
};
ExpCarefullyApply[SetPs, "SetPosition"];
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
ExpSetIndex: PUBLIC PROC[pos: INT] = {
Si: PROC = {
currentStream.SetIndex[pos];
expCurrentEntryLengthHint ← -1;
expCurrentEntryPos ← pos;
};
ExpCarefullyApply[Si, "SetIndex"];
};
PeekEntry: PUBLIC PROC RETURNS[ident: ATOM, msgID: REF TEXT, at: INT] = {
Returns at = -1 if there is no valid entry at the current position, unless that position is the end of the log, in which case at is the end position. The entry is not "consumed", so performing a NextEntry or a CopyEntry after reading the ident & msgID information will copy or retrieve the entry returned. Successive calls to PeekEntry will NOT advance the log position; use SkipEntry to advance the log.
Does not check if ident is for a legal log entry type.
Pne: PROC = {
length: INT;
[ident, msgID, length] ← WalnutStream.PeekEntry[currentStream];
expCurrentEntryLengthHint ← length;
IF length = -1 THEN {
IF expCurrentEntryPos = currentStream.GetLength[]
THEN at ← expCurrentEntryPos ELSE at ← -1;
RETURN;
};
at ← expCurrentEntryPos;
};
ExpCarefullyApply[Pne, "PeekEntry"];
};
SkipEntry: PUBLIC PROC RETURNS[ok: BOOL] = {
returns NOT ok if there is not a valid entry at the current position, and does not advance the log position. If there is a valid entry, it is "consumed", and the log is advanced.
Sne: PROC = {
length: INT;
ok ← FALSE;
IF expCurrentEntryLengthHint = -1 THEN
length ← WalnutStream.PeekEntry[currentStream].length
ELSE {
length ← expCurrentEntryLengthHint;
expCurrentEntryLengthHint ← -1;
};
IF length = -1 THEN RETURN;
currentStream.SetIndex[expCurrentEntryPos ← currentStream.GetIndex[] + length];
ok ← TRUE;
};
ExpCarefullyApply[Sne, "SkipEntry"];
};
CopyEntry: PUBLIC PROC RETURNS[newPosition, bytesCopied: INT] = {
Copies the next entry from the current log to the expunge log and returns the position the entry now has in the expunge log (newPosition = -1 if there is no valid entry to copy); does NOT flush (commit) the expunge log. If the expunge transaction aborts, ExpCarefullyApply raises WalnutDefs.Error with code = $TransactionAbort.
Ce: PROC = {
bytesCopied ← 0;
newPosition ← expungeStream.GetIndex[];
IF expCurrentEntryLengthHint = -1 THEN
expCurrentEntryLengthHint ← WalnutStream.PeekEntry[currentStream].length;
IF expCurrentEntryLengthHint = -1 THEN {newPosition ← -1; RETURN};
WalnutStream.CopyBytes[
from: currentStream, to: expungeStream, num: expCurrentEntryLengthHint];
expCurrentEntryPos ← currentStream.GetIndex[];
bytesCopied ← expCurrentEntryLengthHint;
expCurrentEntryLengthHint ← -1;  -- unknown
};
ExpCarefullyApply[Ce, "CopyEntry"];
};
EndCopyEntry: PUBLIC PROC RETURNS[startCopyPos: INT] = {
Ne: PROC[] = {
wle: LogEntry ← WalnutStream.ReadEntry[currentStream].le;
currentStream.SetIndex[expCurrentEntryPos];
IF wle = NIL THEN
ERROR WalnutDefs.Error[$log, $NoEntryFound, "During Expunge"];
TRUSTED { WITH le: wle SELECT FROM
EndCopyNewMailInfo => startCopyPos ← le.startCopyPos;
EndCopyReadArchiveInfo => startCopyPos ← le.startCopyPos;
ENDCASE =>
ERROR WalnutDefs.Error[$log, $WrongEntryFound, "During Expunge"];
};
};
startCopyPos ← 0;
ExpCarefullyApply[Ne, "EndCopyEntry"];
};
ExpLogLength: PUBLIC PROC RETURNS[length: INT] = {
IF currentStream = NIL THEN RETURN[-1];
RETURN[currentStream.GetLength[]];
};
GetIndex: PUBLIC PROC RETURNS[length: INT] = {
IF currentStream = NIL THEN RETURN[-1];
RETURN[currentStream.GetIndex[]];
};
CopyBytes: PUBLIC PROC[strm: STREAM, num: INT] = {
Cb: PROC = {
strm.SetIndex[strm.GetLength[]];
WalnutStream.CopyBytes[from: currentStream, to: strm, num: num];
expCurrentEntryPos ← currentStream.GetIndex[];
expCurrentEntryLengthHint ← -1;  -- unknown
};
ExpCarefullyApply[Cb, "CopyBytes"];
};
ExamineThisEntry: PUBLIC PROC RETURNS[status: WalnutLogExpunge.EntryStatus] = {
returns noValidEntry if there is not a valid entry at the current position, and does not advance the log position. If there is a valid entry, it is "consumed", and the log is advanced.
If IO.EndOfStream is raised, EndOfStream is returned and the log is not advanced.
Ete: PROC = {
length: INT;
curPos: INT;
status ← noValidEntry;
IF expCurrentEntryLengthHint = -1 THEN
length ← WalnutStream.PeekEntry[currentStream].length
ELSE {
length ← expCurrentEntryLengthHint;
expCurrentEntryLengthHint ← -1;
};
IF length = -1 THEN { status ← noValidEntry; RETURN};
status ← validEntry;
currentStream.SetIndex[
expCurrentEntryPos ← (curPos ← currentStream.GetIndex[]) + length !
  IO.EndOfStream => { status ← EndOfStream; CONTINUE }];
IF status = EndOfStream THEN currentStream.SetIndex[expCurrentEntryPos ← curPos];
};
ExpCarefullyApply[Ete, "ExamineThisEntry"];
};
Local utilities
ExpCarefullyApply: PROC[proc: PROC[], who: ROPE ] = {
ENABLE BEGIN
IO.Error => IF WalnutStream.Aborted[stream] THEN GOTO aborted ELSE {
ed: FS.ErrorDesc;
ed ← FS.ErrorFromStream[stream ! FS.Error, IO.Error => CONTINUE];
IF ed.code = $quotaExceeded THEN GOTO quota;
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
REJECT
};
FS.Error => {
IF error.code = $transAborted THEN GOTO aborted;
IF error.code = $quotaExceeded THEN GOTO quota;
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
REJECT
};
UNWIND => {
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
};
END;
proc[];
EXITS
aborted => {
WalnutRoot.StatsReport["\n **** Trans abort during expunge doing "];
WalnutRoot.StatsReport[who];
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
ERROR WalnutDefs.Error[$log, $TransactionAbort, "During expunge"];
};
quota => {
WalnutRoot.StatsReport["\n **** Quota exceeded during expunge doing "];
WalnutRoot.StatsReport[who];
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
ERROR WalnutDefs.Error[$log, $QuotaExceeded, "During expunge"];
};
};
END.