WalnutLogExpungeImpl.mesa
Copyright Ó 1984, 1988, 1992 by Xerox Corporation. All rights reserved.
Willie-Sue, August 3, 1988 12:50:39 pm PDT
Doug Terry, November 20, 1990 7:56 pm PST
Willie-s, April 27, 1992 1:47 pm PDT
Walnut Log Expunge Operations Implementation
DIRECTORY
BasicTime USING [GMT],
FS USING [Error, ErrorDesc, OpenFile, GetInfo, OpenFileFromStream],
IO,
Rope,
WalnutDefs USING [CheckReportProc, Error, WalnutOpsHandle],
WalnutKernelDefs USING [LogEntry],
WalnutLog USING [ForgetLogStreams],
WalnutLogExpunge,
WalnutRoot -- using lots -- ,
WalnutStream -- using lots -- ;
WalnutLogExpungeImpl:
CEDAR
PROGRAM
IMPORTS
FS, IO, Rope,
WalnutDefs, WalnutLog, WalnutRoot, WalnutStream
EXPORTS
WalnutDefs, WalnutLogExpunge
= BEGIN
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
InternalLogInfo: TYPE = WalnutRoot.InternalLogInfo;
WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle;
RootHandle: TYPE = WalnutRoot.RootHandle;
RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec;
ExpungeHandle: TYPE = WalnutLogExpunge.ExpungeHandle;
ExpungeHandleRec: PUBLIC TYPE = WalnutLogExpunge.ExpungeHandleRec;
Managing the ExpungeLog
StartExpunge:
PUBLIC
PROC[opsH: WalnutOpsHandle, pagesNeeded:
INT]
RETURNS[expungeFileID:
INT] = {
Writes the first log record on the expunge log, from information from the root
reason: FS.ErrorDesc;
actualPages: INT;
of: FS.OpenFile;
eStrm: STREAM;
eLog: InternalLogInfo;
WalnutLog.ForgetLogStreams[opsH]; -- forget current scanning stuff
BEGIN ENABLE
BEGIN
FS.Error => { reason ¬ error; GOTO err };
IO.Error => { <<reason ¬ FS.ErrorFromStream[stream];>> GOTO err };
UNWIND => opsH.expungeHandle ¬ NIL;
END;
WalnutRoot.GetStreamsForExpunge[opsH: opsH, starting: TRUE, pagesWanted: pagesNeeded];
eLog ¬ opsH.rootHandle.expungeLog;
eStrm ¬ eLog.writeStream;
actualPages ¬ FS.GetInfo[of ¬ FS.OpenFileFromStream[eStrm]].pages;
<<IF actualPages < pagesNeeded
THEN
FS.SetPageCount[of, pagesNeeded !
FS.Error =>
IF error.code = $quotaExceeded THEN GOTO quota ELSE REJECT];>>
IF eStrm.GetLength[] # 0
THEN {
WalnutStream.SetHighWaterMark[eStrm, 0, -1];
eStrm.SetIndex[0];
eStrm.SetLength[0];
};
<<FS.SetByteCountAndCreatedTime[of, -1, BasicTime.Now[]];>>
WalnutStream.logInfoRef.logFileInfo.key ¬ opsH.key;
WalnutStream.logInfoRef.logFileInfo.internalFileID ¬ eLog.internalFileID;
WalnutStream.logInfoRef.logFileInfo.logSeqNo ¬ opsH.rootHandle.currentLog.logSeqNo;
note where the above logSeqNo comes from
[] ¬ WalnutStream.WriteEntry[eStrm, WalnutStream.logInfoRef.logFileInfo];
WalnutStream.FlushStream[eStrm, TRUE];
opsH.expungeHandle ¬ NEW[ExpungeHandleRec ¬ [thisLog: eLog]];
opsH.expungeHandle.eStrm ¬ eStrm;
opsH.expungeHandle.rStrm ¬ opsH.rootHandle.currentLog.readStream;
RETURN[eLog.internalFileID];
EXITS
quota =>
ERROR WalnutDefs.Error[$expungeLog, $quotaExceeded,
IO.PutFR[" An ExpungeLog of %g pages exceeds your quota; file has %g pages", [integer[pagesNeeded]], [integer[actualPages]]] ];
err => {
WalnutRoot.StatsReport[opsH, "\n *** During start expunge "];
WalnutRoot.StatsReport[opsH, reason.explanation];
ERROR WalnutDefs.Error[$expungeLog, reason.code, "During Start Expunge"];
};
END;
};
RestartExpunge:
PUBLIC
PROC[opsH: WalnutOpsHandle, currentLogPos, expungeLogLength:
INT]
RETURNS[ok: BOOL ¬ FALSE] = {
Set the length of the expunge log and the read position of the current log to be the given values. If it returns NIL, then you must StartExpunge -- StartExpunge is always guaranteed to succeed (if space exists!)
DO
expungePosToUse: INT ¬ expungeLogLength;
le: LogEntry;
reason: FS.ErrorDesc;
rStrm, eStrm, erStrm: STREAM;
keyFromExpLog: ROPE;
internalFileIDFromExpLog, logSeqNoFromExpLog: INT;
BEGIN
ENABLE
BEGIN
FS.Error => { reason ¬ error; GOTO error };
IO.Error => { <<reason ¬ FS.ErrorFromStream[stream];>> GOTO error };
UNWIND => opsH.expungeHandle ¬ NIL;
END;
WalnutRoot.GetStreamsForExpunge[opsH, FALSE, 0]; -- pages ignored on restart
IF opsH.rootHandle.expungeLog.writeStream = NIL THEN EXIT;
IF opsH.rootHandle.expungeLog.readStream = NIL THEN EXIT;
eStrm ¬ opsH.rootHandle.expungeLog.writeStream;
erStrm ¬ opsH.rootHandle.expungeLog.readStream;
rStrm ¬ opsH.rootHandle.currentLog.readStream;
IF eStrm.GetLength[] < expungeLogLength THEN EXIT; -- too short
erStrm.SetIndex[0];
le ¬ WalnutStream.ReadEntry[erStrm].le;
IF le = NIL THEN RETURN;
TRUSTED {
WITH le: le
SELECT
FROM
LogFileInfo => {
keyFromExpLog ¬ le.key;
internalFileIDFromExpLog ¬ le.internalFileID;
logSeqNoFromExpLog ¬ le.logSeqNo;
};
ENDCASE => RETURN;
};
IF ~opsH.key.Equal[keyFromExpLog] THEN RETURN;
IF internalFileIDFromExpLog # opsH.rootHandle.expungeLog.internalFileID
THEN
RETURN;
note this next one carefully
IF logSeqNoFromExpLog # opsH.rootHandle.currentLog.logSeqNo THEN RETURN;
IF expungeLogLength = 0 THEN expungePosToUse ¬ erStrm.GetIndex[];
WalnutStream.SetHighWaterMark[eStrm, expungePosToUse, -1];
eStrm.SetIndex[expungePosToUse];
rStrm.SetIndex[currentLogPos];
opsH.expungeHandle ¬ NEW[ExpungeHandleRec ¬ [thisLog: opsH.rootHandle.expungeLog]];
opsH.expungeHandle.entryLengthHint ¬ -1;
opsH.expungeHandle.entryPos ¬ currentLogPos;
opsH.expungeHandle.eStrm ¬ eStrm;
opsH.expungeHandle.rStrm ¬ rStrm;
EXIT
EXITS error =>
WalnutRoot.StatsReport[opsH, "\n *** During Start Expunge %g", [rope[reason.explanation]] ];
END;
ENDLOOP;
IF opsH.expungeHandle = NIL THEN WalnutRoot.StopExpunge[opsH];
RETURN[opsH.expungeHandle # NIL]
};
CopyBytesToExpungeLog:
PUBLIC
PROC[opsH: WalnutOpsHandle, bytesToCopy:
INT] = {
Copies bytesToCopy from the currentLog at its current position onto the end of the expungeLog; does not flush the expungeLog (must use GetExpungeProgress)
eH: ExpungeHandle = opsH.expungeHandle;
eH.eStrm.SetIndex[eH.eStrm.GetLength[]]; -- to be sure
WalnutStream.CopyBytes[from: eH.rStrm, to: eH.eStrm, num: bytesToCopy];
eH.entryPos ¬ eH.rStrm.GetIndex[];
eH.entryLengthHint ¬ -1; -- unknown
};
GetExpungeProgress:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[currentLogPos, expungeLogLength: INT] = {
Get the read position of the current log and the length of the expunge log (this information can later be used to restart an expunge that was in progress); forces a flush (commit) of the expunge log - raises WalnutDefs.Error with code = $LogTransAbort if the flush fails
eH: ExpungeHandle = opsH.expungeHandle;
WalnutStream.FlushStream[eH.eStrm, TRUE];
WalnutRoot.CommitAndContinue[opsH];
currentLogPos ¬ eH.entryPos;
expungeLogLength ¬ eH.eStrm.GetLength[];
};
EndExpunge:
PUBLIC
PROC[opsH: WalnutOpsHandle] = {
WalnutRoot.StopExpunge[opsH];
opsH.expungeHandle ¬ NIL;
};
Parsing a log for Expunge
SetPosition:
PUBLIC
PROC[opsH: WalnutOpsHandle, startPos:
INT]
RETURNS[charsSkipped:
INT] = {
next: INT;
eH: ExpungeHandle = opsH.expungeHandle;
eH.rStrm.SetIndex[startPos];
next ¬ WalnutStream.FindNextEntry[eH.rStrm];
eH.entryLengthHint ¬ -1;
IF next # -1 THEN eH.entryPos ¬ next;
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
SetIndex:
PUBLIC PROC[opsH: WalnutOpsHandle, pos:
INT] = {
eH: ExpungeHandle = opsH.expungeHandle;
eH.rStrm.SetIndex[pos];
eH.entryLengthHint ¬ -1;
eH.entryPos ¬ pos;
};
PeekEntry:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[ident:
ATOM, msgID:
ROPE, at:
INT] = {
returns at = -1 if there is not a valid entry at the current position. The entry is not "consumed" so that performing a NextEntry or a CopyEntry after reading the ident & msg ID information will copy or retrieve the entry returned. Performing successive PeekEntry's will NOT advance the log position; use SkipEntry to advance the log
Does not check if ident is for a legal log entry type.
length: INT;
eH: ExpungeHandle = opsH.expungeHandle;
eStrm: STREAM = eH.rStrm;
[ident, msgID, length] ¬ WalnutStream.PeekEntry[eStrm, FALSE];
at ¬ eStrm.GetIndex[];
eH.entryLengthHint ¬ length;
IF length = -1
THEN {
IF eH.entryPos = eStrm.GetLength[] THEN at ¬ eH.entryPos ELSE at ¬ -1;
};
};
SkipEntry:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[ok:
BOOL] = {
returns NOT ok if there is not a valid entry at the current position, and does not advance the log position. If there is a valid entry, it is "consumed", and the log is advanced.
eH: ExpungeHandle = opsH.expungeHandle;
length: INT;
ok ¬ FALSE;
IF eH.entryLengthHint = -1
THEN
length ¬ WalnutStream.PeekEntry[eH.rStrm, TRUE].length
ELSE {
length ¬ eH.entryLengthHint;
eH.entryLengthHint ¬ -1;
};
IF length = -1 THEN RETURN;
eH.rStrm.SetIndex[eH.entryPos ¬ eH.rStrm.GetIndex[] + length];
ok ¬ TRUE;
};
CopyEntry:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[newPosition, bytesCopied:
INT] = {
Copies the next entry from the current log to the expunge log; it returns the new position that the entry now has in the expunge log; does NOT flush (commit) the expungeLog, raises WalnutDefs.Error with code = $ExpungeTransAbort if the expunge transaction aborts
eH: ExpungeHandle = opsH.expungeHandle;
bytesCopied ¬ 0;
newPosition ¬ eH.eStrm.GetIndex[];
IF eH.entryLengthHint = -1
THEN
eH.entryLengthHint ¬ WalnutStream.PeekEntry[eH.rStrm, TRUE].length;
IF eH.entryLengthHint = -1 THEN { newPosition ¬ -1; RETURN };
WalnutStream.CopyBytes[from: eH.rStrm, to: eH.eStrm, num: eH.entryLengthHint];
eH.entryPos ¬ eH.rStrm.GetIndex[];
bytesCopied ¬ eH.entryLengthHint;
eH.entryLengthHint ¬ -1; -- unknown
};
EndCopyEntry:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[startCopyPos:
INT] = {
eH: ExpungeHandle = opsH.expungeHandle;
wle: LogEntry = WalnutStream.ReadEntry[eH.rStrm, FALSE].le;
eH.rStrm.SetIndex[eH.entryPos];
IF wle =
NIL
THEN
ERROR WalnutDefs.Error[$log, $NoEntryFound, "During Expunge"];
TRUSTED {
WITH le: wle
SELECT
FROM
EndCopyNewMailInfo => startCopyPos ¬ le.startCopyPos;
EndCopyReadArchiveInfo => startCopyPos ¬ le.startCopyPos;
ENDCASE =>
ERROR WalnutDefs.Error[$log, $WrongEntryFound, "During Expunge"];
};
};
ExpLogLength:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[length:
INT] = {
IF opsH.expungeHandle = NIL THEN RETURN[-1];
RETURN[opsH.expungeHandle.rStrm.GetLength[]];
};
GetIndex:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[length:
INT] = {
IF opsH.expungeHandle = NIL THEN RETURN[-1];
RETURN[opsH.expungeHandle.rStrm.GetIndex[]];
};
CopyBytes:
PUBLIC
PROC[opsH: WalnutOpsHandle, strm:
STREAM, num:
INT] = {
eH: ExpungeHandle = opsH.expungeHandle;
strm.SetIndex[strm.GetLength[]];
WalnutStream.CopyBytes[from: eH.rStrm, to: strm, num: num];
eH.entryPos ¬ eH.rStrm.GetIndex[];
eH.entryLengthHint ¬ -1; -- unknown
};
ExamineThisEntry:
PUBLIC
PROC[opsH: WalnutOpsHandle]
RETURNS[status: WalnutLogExpunge.EntryStatus] = {
returns noValidEntry if there is not a valid entry at the current position, and does not advance the log position. If there is a valid entry, it is "consumed", and the log is advanced.
IF IO.EndOfStream is raised, EndOfStream is returned and log is not advanced.
eH: ExpungeHandle = opsH.expungeHandle;
length: INT;
curPos: INT;
status ¬ noValidEntry;
IF eH.entryLengthHint = -1
THEN
length ¬ WalnutStream.PeekEntry[eH.rStrm, TRUE].length
ELSE {
length ¬ eH.entryLengthHint;
eH.entryLengthHint ¬ -1;
};
IF length = -1 THEN { status ¬ noValidEntry; RETURN};
status ¬ validEntry;
eH.rStrm.SetIndex[
eH.entryPos ¬ (curPos ¬ eH.rStrm.GetIndex[]) + length !
IO.EndOfStream => { status ¬ EndOfStream; CONTINUE }];
IF status = EndOfStream
THEN
eH.rStrm.SetIndex[eH.entryPos ¬ curPos];
};