-- WalnutLogExpungeImpl.Mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Willie-Sue, October 14, 1985 12:06:22 pm PDT
-- Contents: types and procedures which take the current and expunge log streams as implicit parameters
DIRECTORY
AlpineFS USING [OpenFile, ErrorFromStream, OpenFileFromStream],
BasicTime USING [Now],
FS USING [Error, ErrorDesc, OpenFile,
 ErrorFromStream, GetInfo, SetByteCountAndCreatedTime, SetPageCount],
IO,
RefText USING [TrustTextAsRope],
Rope,
WalnutDefs USING [Error],
WalnutKernelDefs USING [LogEntry],
WalnutRoot USING [CommitAndContinue, GetStreamsForExpunge, ReturnExpungeStreams, StatsReport],
WalnutStream USING [logFileInfo, Aborted, CopyBytes, FindNextEntry, FlushStream, PeekEntry, ReadEntry, SetHighWaterMark, WriteEntry],
WalnutLogExpunge;
WalnutLogExpungeImpl: CEDAR PROGRAM
IMPORTS
AlpineFS, BasicTime, FS, IO, RefText, Rope,
WalnutDefs, WalnutRoot, WalnutStream
EXPORTS
WalnutLogExpunge =
BEGIN
-- Types
-- Short local names for types used throughout this module.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
-- Variables
-- Module state shared by every procedure below; the streams are the implicit parameters of the interface.
-- Stream being read during the expunge; set to NIL when no expunge is active or after a failure.
currentStream: STREAM;
-- Length of the entry at the current read position as last reported by WalnutStream.PeekEntry; -1 means unknown.
currentEntryLengthHint: INT;
-- Position in currentStream of the entry the next operation applies to.
currentEntryPos: INT;
-- Stream being written during the expunge; set to NIL when no expunge is active or after a failure.
expungeStream: STREAM;
-- The next three values are returned by WalnutRoot.GetStreamsForExpunge and identify the expunge log.
keyIs: ROPE;
expungeInternalFileID: INT;
logSeqNo: INT;
-- Managing the ExpungeLog
StartExpunge: PUBLIC PROC[pagesNeeded: INT] RETURNS[expungeFileID: INT] = {
  -- Starts a fresh expunge. Obtains the current and expunge log streams from WalnutRoot,
  -- grows the expunge file to at least pagesNeeded pages, truncates any old contents and
  -- writes the first log record (WalnutStream.logFileInfo) on the expunge log.
  -- Returns the internalFileID of the expunge log being written on.
  -- Raises WalnutDefs.Error[$expungeLog, $quotaExceeded, ...] if the file cannot be grown
  -- within quota, and WalnutDefs.Error[$expungeLog, code, "During Start Expunge"] for any
  -- other FS.Error or IO.Error seen here.
  reason: FS.ErrorDesc;
  actualPages: INT;
  of: FS.OpenFile;
  BEGIN ENABLE BEGIN
    FS.Error => { reason ← error; GOTO err };
    IO.Error => { reason ← AlpineFS.ErrorFromStream[stream]; GOTO err };
    UNWIND => { currentStream ← expungeStream ← NIL };
    END;

  [currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] ←
    WalnutRoot.GetStreamsForExpunge[];

  -- The streams are new, so a length hint left over from an earlier expunge describes
  -- nothing in them. Mark it unknown, as RestartExpunge does, so that SkipEntry, CopyEntry
  -- and ExamineThisEntry peek at the log instead of trusting a stale length.
  -- NOTE(review): currentEntryPos is not reset here; callers are presumably expected to
  -- call SetPosition or SetIndex before parsing. Confirm against the callers.
  currentEntryLengthHint ← -1;

  -- Make sure the expunge file is big enough before writing anything.
  actualPages ← FS.GetInfo[of ← AlpineFS.OpenFileFromStream[expungeStream]].pages;
  IF actualPages < pagesNeeded THEN
    FS.SetPageCount[of, pagesNeeded ! FS.Error =>
      IF error.code = $quotaExceeded THEN GOTO quota ELSE REJECT];

  -- Discard whatever an earlier expunge left in the file.
  IF expungeStream.GetLength[] # 0 THEN {
    WalnutStream.SetHighWaterMark[expungeStream, 0, -1];
    expungeStream.SetIndex[0];
    expungeStream.SetLength[0];
    };
  FS.SetByteCountAndCreatedTime[of, -1, BasicTime.Now[]];

  -- Stamp the log with the identity supplied by the root and force it out.
  WalnutStream.logFileInfo.key ← keyIs.ToRefText[];
  WalnutStream.logFileInfo.internalFileID ← expungeInternalFileID;
  WalnutStream.logFileInfo.logSeqNo ← logSeqNo;
  [] ← WalnutStream.WriteEntry[expungeStream, WalnutStream.logFileInfo];
  WalnutStream.FlushStream[expungeStream, TRUE];
  RETURN[expungeInternalFileID];

  EXITS
    quota =>
      ERROR WalnutDefs.Error[$expungeLog, $quotaExceeded,
        IO.PutFR[" An ExpungeLog of %g pages exceeds your quota; file has %g pages", IO.int[pagesNeeded], IO.int[actualPages]]];
    err => {
      WalnutRoot.StatsReport["\n *** During start expunge "];
      WalnutRoot.StatsReport[reason.explanation];
      ERROR WalnutDefs.Error[$expungeLog, reason.code, "During Start Expunge"];
      };
  END;
  };
RestartExpunge: PUBLIC PROC[currentLogPos, expungeLogLength: INT]
  RETURNS[ok: BOOL] = {
  -- Resumes an interrupted expunge: sets the length of the expunge log to expungeLogLength
  -- and the read position of the current log to currentLogPos.
  -- Returns NOT ok when the expunge log cannot be reused (no stream, too short, first
  -- entry missing or not a LogFileInfo, or its key, internalFileID or logSeqNo differ from
  -- what the root reports) or when an FS.Error or IO.Error occurs. In that case the streams
  -- are returned to WalnutRoot and the caller must StartExpunge, which is always
  -- guaranteed to succeed (if space exists!).
  DoRE: PROC = {
    expungePosToUse: INT ← expungeLogLength;
    le: LogEntry;
    keyFromExpLog: REF TEXT;
    internalFileIDFromExpLog, logSeqNoFromExpLog: INT;
    reason: FS.ErrorDesc;
    BEGIN ENABLE BEGIN
      FS.Error => { reason ← error; GOTO error };
      IO.Error => { reason ← AlpineFS.ErrorFromStream[stream]; GOTO error };
      UNWIND => { currentStream ← expungeStream ← NIL };
      END;
    ok ← FALSE;
    [currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] ←
      WalnutRoot.GetStreamsForExpunge[];
    IF expungeStream = NIL THEN RETURN;
    IF expungeStream.GetLength[] < expungeLogLength THEN RETURN;  -- too short

    -- The first entry of a usable expunge log must be its LogFileInfo record.
    expungeStream.SetIndex[0];
    le ← WalnutStream.ReadEntry[expungeStream].le;
    IF le = NIL THEN RETURN;
    TRUSTED {
      WITH le: le SELECT FROM
        LogFileInfo => {
          keyFromExpLog ← le.key;
          internalFileIDFromExpLog ← le.internalFileID;
          logSeqNoFromExpLog ← le.logSeqNo;
          };
        ENDCASE => RETURN;
      };
    -- A requested length of 0 means keep just the LogFileInfo entry that was read above.
    IF expungeLogLength = 0 THEN expungePosToUse ← expungeStream.GetIndex[];

    -- The log on disk must be the one the root says is being expunged.
    IF ~keyIs.Equal[RefText.TrustTextAsRope[keyFromExpLog]] THEN RETURN;
    IF internalFileIDFromExpLog # expungeInternalFileID THEN RETURN;
    IF logSeqNoFromExpLog # logSeqNo THEN RETURN;

    WalnutStream.SetHighWaterMark[expungeStream, expungePosToUse, -1];
    expungeStream.SetIndex[expungePosToUse];
    currentStream.SetIndex[currentLogPos];
    currentEntryLengthHint ← -1;
    currentEntryPos ← currentLogPos;
    ok ← TRUE;
    EXITS error => {
      -- Report under this procedure's own name; the message used to say Start Expunge.
      WalnutRoot.StatsReport["\n *** During Restart Expunge "];
      WalnutRoot.StatsReport[reason.explanation];
      ok ← FALSE;
      };
    END;
    };

  DoRE[];
  IF ~ok THEN {
    -- Give the streams back so that a following StartExpunge begins cleanly.
    WalnutRoot.ReturnExpungeStreams[];
    currentStream ← expungeStream ← NIL;
    };
  };
CopyBytesToExpungeLog: PUBLIC PROC[bytesToCopy: INT] = {
  -- Appends bytesToCopy bytes, read from the current log at its current position, to the
  -- end of the expunge log. The expunge log is not flushed here; use GetExpungeProgress
  -- for that. Errors are handled by CarefullyApply.
  AppendFromCurrentLog: PROC = {
    endOfExpungeLog: INT = expungeStream.GetLength[];
    expungeStream.SetIndex[endOfExpungeLog];  -- to be sure we write at the very end
    WalnutStream.CopyBytes[from: currentStream, to: expungeStream, num: bytesToCopy];
    -- The read position moved by a raw byte count, so the next entry length is unknown.
    currentEntryPos ← currentStream.GetIndex[];
    currentEntryLengthHint ← -1;
    };
  CarefullyApply[AppendFromCurrentLog, "CopyBytesToExpungeLog"];
  };
GetExpungeProgress: PUBLIC PROC RETURNS[currentLogPos, expungeLogLength: INT] = {
  -- Flushes the expunge log, commits via WalnutRoot.CommitAndContinue, and then reports
  -- the position of the current entry in the current log together with the length of the
  -- expunge log. These two values can later be handed to RestartExpunge to resume an
  -- expunge that was in progress.
  -- If the transaction aborts, CarefullyApply raises WalnutDefs.Error[$log, $TransactionAbort, ...].
  FlushAndReport: PROC = {
    WalnutStream.FlushStream[expungeStream, TRUE];
    WalnutRoot.CommitAndContinue[];
    currentLogPos ← currentEntryPos;
    expungeLogLength ← expungeStream.GetLength[];
    };
  CarefullyApply[FlushAndReport, "GetExpungeProgress"];
  };
EndExpunge: PUBLIC PROC = {
  -- Forgets both log streams; nothing is flushed or returned to WalnutRoot here.
  currentStream ← NIL;
  expungeStream ← NIL;
  };
Shutdown: PUBLIC PROC = {
  -- Drops the module's references to both log streams.
  currentStream ← NIL;
  expungeStream ← NIL;
  };
-- Parsing a log for Expunge
SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = {
  -- Moves the current log to startPos and asks WalnutStream.FindNextEntry for the next
  -- entry from there. Returns the number of characters between startPos and that entry,
  -- or -1 when FindNextEntry reports -1. The current entry position is only updated when
  -- an entry was found; the length hint is always reset to unknown.
  entryStart: INT;
  SeekToEntry: PROC = {
    currentStream.SetIndex[startPos];
    entryStart ← WalnutStream.FindNextEntry[currentStream];
    currentEntryLengthHint ← -1;
    IF entryStart # -1 THEN currentEntryPos ← entryStart;
    };
  CarefullyApply[SeekToEntry, "SetPosition"];
  charsSkipped ← IF entryStart = -1 THEN -1 ELSE entryStart - startPos;
  };
SetIndex: PUBLIC PROC[pos: INT] = {
  -- Positions the current log at pos and records pos as the current entry position.
  -- No search for an entry boundary is made; the entry length becomes unknown.
  Reposition: PROC = {
    currentStream.SetIndex[pos];
    currentEntryPos ← pos;
    currentEntryLengthHint ← -1;
    };
  CarefullyApply[Reposition, "SetIndex"];
  };
PeekEntry: PUBLIC PROC RETURNS[ident: ATOM, msgID: REF TEXT, at: INT] = {
  -- Looks at the entry at the current position without consuming it, so a following
  -- CopyEntry or SkipEntry acts on this same entry. Successive PeekEntry calls do NOT
  -- advance the log; use SkipEntry for that.
  -- at is the position of the entry. When there is no valid entry, at is -1, except that
  -- at the exact end of the log the current position is returned instead.
  -- Does not check if ident is for a legal log entry type.
  -- The length obtained is remembered as a hint for SkipEntry, CopyEntry and ExamineThisEntry.
  Peek: PROC = {
    entryLength: INT;
    [ident, msgID, entryLength] ← WalnutStream.PeekEntry[currentStream];
    currentEntryLengthHint ← entryLength;
    at ← IF entryLength = -1 AND currentEntryPos # currentStream.GetLength[]
      THEN -1 ELSE currentEntryPos;
    };
  CarefullyApply[Peek, "PeekEntry"];
  };
SkipEntry: PUBLIC PROC RETURNS[ok: BOOL] = {
  -- Consumes the entry at the current position by moving the log past it. Returns NOT ok,
  -- leaving the log position alone, when there is no valid entry there.
  -- Uses the length remembered by an earlier peek when one is available.
  Skip: PROC = {
    entryLength: INT ← currentEntryLengthHint;
    ok ← FALSE;
    IF entryLength = -1
      THEN entryLength ← WalnutStream.PeekEntry[currentStream].length
      ELSE currentEntryLengthHint ← -1;  -- the hint is used up
    IF entryLength = -1 THEN RETURN;
    currentEntryPos ← currentStream.GetIndex[] + entryLength;
    currentStream.SetIndex[currentEntryPos];
    ok ← TRUE;
    };
  CarefullyApply[Skip, "SkipEntry"];
  };
CopyEntry: PUBLIC PROC RETURNS[newPosition, bytesCopied: INT] = {
  -- Copies the next entry of the current log onto the expunge log at the expunge log's
  -- current index. Returns that index as newPosition and the entry length as bytesCopied;
  -- when there is no valid entry, returns newPosition = -1 and bytesCopied = 0.
  -- Does NOT flush (commit) the expunge log.
  -- If the transaction aborts, CarefullyApply raises WalnutDefs.Error[$log, $TransactionAbort, ...].
  CopyOne: PROC = {
    bytesCopied ← 0;
    newPosition ← expungeStream.GetIndex[];
    IF currentEntryLengthHint = -1 THEN {
      -- No remembered length: peek now, and give up if nothing valid is there.
      currentEntryLengthHint ← WalnutStream.PeekEntry[currentStream].length;
      IF currentEntryLengthHint = -1 THEN { newPosition ← -1; RETURN };
      };
    WalnutStream.CopyBytes[
      from: currentStream, to: expungeStream, num: currentEntryLengthHint];
    currentEntryPos ← currentStream.GetIndex[];
    bytesCopied ← currentEntryLengthHint;
    currentEntryLengthHint ← -1;  -- length of the following entry is unknown
    };
  CarefullyApply[CopyOne, "CopyEntry"];
  };
EndCopyEntry: PUBLIC PROC RETURNS[startCopyPos: INT] = {
  -- Reads the entry at the current position, which must be an EndCopyNewMailInfo or an
  -- EndCopyReadArchiveInfo entry, and returns its startCopyPos. The entry is not consumed:
  -- the current log is put back at the current entry position after the read.
  -- Raises WalnutDefs.Error[$log, $NoEntryFound, ...] if no entry can be read and
  -- WalnutDefs.Error[$log, $WrongEntryFound, ...] if the entry is of any other kind.
  ReadEndCopy: PROC[] = {
    wle: LogEntry ← WalnutStream.ReadEntry[currentStream].le;
    currentStream.SetIndex[currentEntryPos];  -- undo the read so the entry stays current
    IF wle = NIL THEN
      ERROR WalnutDefs.Error[$log, $NoEntryFound, "During Expunge"];
    TRUSTED { WITH le: wle SELECT FROM
      EndCopyNewMailInfo => startCopyPos ← le.startCopyPos;
      EndCopyReadArchiveInfo => startCopyPos ← le.startCopyPos;
      ENDCASE =>
        ERROR WalnutDefs.Error[$log, $WrongEntryFound, "During Expunge"];
      };
    };
  startCopyPos ← 0;
  -- Label the operation with this procedure's own name, as every other caller of
  -- CarefullyApply does; the label used to be the stale name EndCopyNewMailEntry.
  CarefullyApply[ReadEndCopy, "EndCopyEntry"];
  };
LogLength: PUBLIC PROC RETURNS[length: INT] = {
  -- Length of the current log, or -1 when there is no current log stream.
  length ← IF currentStream # NIL THEN currentStream.GetLength[] ELSE -1;
  };
GetIndex: PUBLIC PROC RETURNS[length: INT] = {
  -- Read position of the current log, or -1 when there is no current log stream.
  -- The result is named length in the interface but holds a stream index.
  length ← IF currentStream # NIL THEN currentStream.GetIndex[] ELSE -1;
  };
CopyBytes: PUBLIC PROC[strm: STREAM, num: INT] = {
  -- Appends num bytes, read from the current log at its current position, to the end of
  -- strm. Afterwards the current entry position is wherever the read stopped and the
  -- entry length is unknown. Errors are handled by CarefullyApply.
  AppendToStream: PROC = {
    endOfTarget: INT = strm.GetLength[];
    strm.SetIndex[endOfTarget];
    WalnutStream.CopyBytes[from: currentStream, to: strm, num: num];
    currentEntryPos ← currentStream.GetIndex[];
    currentEntryLengthHint ← -1;  -- unknown
    };
  CarefullyApply[AppendToStream, "CopyBytes"];
  };
ExamineThisEntry: PUBLIC PROC RETURNS[status: WalnutLogExpunge.EntryStatus] = {
-- returns noValidEntry if there is not a valid entry at the current position, and does not advance the log position. If there is a valid entry, it is "consumed", and the log is advanced.
-- IF IO.EndOfStream is raised, EndOfStream is returned and log is not advanced.
Ete: PROC = {
length: INT;
curPos: INT;
status ← noValidEntry;
-- Use the length remembered by an earlier peek if there is one; otherwise peek now.
IF currentEntryLengthHint = -1 THEN
length ← WalnutStream.PeekEntry[currentStream].length
ELSE {
length ← currentEntryLengthHint;
currentEntryLengthHint ← -1;
};
IF length = -1 THEN { status ← noValidEntry; RETURN};
status ← validEntry;
-- Remember where we were in curPos, then try to move past the entry.
-- currentEntryPos is assigned the new position as part of evaluating the argument.
currentStream.SetIndex[currentEntryPos ← (curPos ← currentStream.GetIndex[]) + length !
  IO.EndOfStream => { status ← EndOfStream; CONTINUE }];
-- On EndOfStream put both the stream and currentEntryPos back where they were.
IF status = EndOfStream THEN currentStream.SetIndex[currentEntryPos ← curPos];
};
CarefullyApply[Ete, "ExamineThisEntry"];
};
-- Local utilities
CarefullyApply: PROC[proc: PROC[], who: ROPE ] = {
-- Runs proc with the error handling shared by the log operations of this module.
-- who names the operation and is only used in the statistics report.
-- A transaction abort or an exceeded quota is reported, the streams are returned to
-- WalnutRoot, and WalnutDefs.Error[$log, $TransactionAbort or $QuotaExceeded, ...] is raised.
-- Any other IO.Error or FS.Error returns the streams and is then passed on to the caller.
ENABLE BEGIN
-- Stream error: an aborted stream is treated as a transaction abort.
IO.Error => IF WalnutStream.Aborted[stream] THEN GOTO aborted ELSE {
ed: FS.ErrorDesc;
-- Ask FS what went wrong; errors raised while asking are ignored.
ed ← FS.ErrorFromStream[stream ! FS.Error, IO.Error => CONTINUE];
IF ed.code = $quotaExceeded THEN GOTO quota;
-- Neither abort nor quota: give up the streams and let the IO.Error propagate.
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
REJECT
};
-- File system error: same classification, made directly from the error code.
FS.Error => {
IF error.code = $transAborted THEN GOTO aborted;
IF error.code = $quotaExceeded THEN GOTO quota;
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
REJECT
};
-- Any unwind through this frame also gives the streams back.
UNWIND => {
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
};
END;
proc[];
EXITS
aborted => {
WalnutRoot.StatsReport["\n **** Trans abort during expunge doing "];
WalnutRoot.StatsReport[who];
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
ERROR WalnutDefs.Error[$log, $TransactionAbort, "During expunge"];
};
quota => {
WalnutRoot.StatsReport["\n **** Quota exceeded during expunge doing "];
WalnutRoot.StatsReport[who];
WalnutRoot.ReturnExpungeStreams[];
currentStream ← expungeStream ← NIL;
ERROR WalnutDefs.Error[$log, $QuotaExceeded, "During expunge"];
};
};
END.