-- WalnutLogExpungeImpl
-- Implementation of the WalnutLogExpunge interface: reads entries from the
-- current log stream and copies them to the expunge log stream. Both streams
-- are obtained from WalnutRoot.GetStreamsForExpunge and kept in module state.

DIRECTORY
  AlpineFS USING [OpenFile, ErrorFromStream, OpenFileFromStream],
  BasicTime USING [Now],
  FS USING [Error, ErrorDesc, OpenFile, ErrorFromStream, GetInfo, SetByteCountAndCreatedTime, SetPageCount],
  IO,
  RefText USING [TrustTextAsRope],
  Rope,
  WalnutDefs USING [Error],
  WalnutKernelDefs USING [LogEntry],
  WalnutRoot USING [CommitAndContinue, GetStreamsForExpunge, ReturnExpungeStreams, StatsReport],
  WalnutStream USING [logFileInfo, Aborted, CopyBytes, FindNextEntry, FlushStream, PeekEntry, ReadEntry, SetHighWaterMark, WriteEntry],
  WalnutLogExpunge;

WalnutLogExpungeImpl: CEDAR PROGRAM
  IMPORTS AlpineFS, BasicTime, FS, IO, RefText, Rope, WalnutDefs, WalnutRoot, WalnutStream
  EXPORTS WalnutLogExpunge =
BEGIN

  -- Types

  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;
  LogEntry: TYPE = WalnutKernelDefs.LogEntry;

  -- Module state

  currentStream: STREAM;        -- the log being read; NIL when no expunge is in progress
  currentEntryLengthHint: INT;  -- length of the entry at currentEntryPos as last peeked, or -1 if unknown
  currentEntryPos: INT;         -- position in currentStream of the entry currently being considered
  expungeStream: STREAM;        -- the expunge log being written; NIL when no expunge is in progress
  keyIs: ROPE;                  -- key returned by WalnutRoot.GetStreamsForExpunge
  expungeInternalFileID: INT;   -- internal file ID returned by WalnutRoot.GetStreamsForExpunge
  logSeqNo: INT;                -- log sequence number returned by WalnutRoot.GetStreamsForExpunge

  -- Starting, restarting and finishing an expunge

  StartExpunge: PUBLIC PROC[pagesNeeded: INT] RETURNS[expungeFileID: INT] = {
    -- Obtains the streams for an expunge, grows the expunge file to at least
    -- pagesNeeded pages, empties it, and writes a LogFileInfo entry as its first entry.
    -- Returns the expunge log's internal file ID.
    -- Raises WalnutDefs.Error[$expungeLog, $quotaExceeded, ...] if growing the file
    -- exceeds quota, and WalnutDefs.Error[$expungeLog, code, ...] for any other
    -- FS.Error or IO.Error.
    -- NOTE(review): the quota and err exits neither call WalnutRoot.ReturnExpungeStreams
    -- nor clear the module state, unlike RestartExpunge and CarefullyApply -- confirm
    -- that callers clean up after a failed StartExpunge.
    reason: FS.ErrorDesc;
    actualPages: INT;
    of: FS.OpenFile;
    BEGIN
      ENABLE BEGIN
        FS.Error => { reason _ error; GOTO err };
        IO.Error => { reason _ AlpineFS.ErrorFromStream[stream]; GOTO err };
        UNWIND => { currentStream _ expungeStream _ NIL };
      END;

      [currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] _
        WalnutRoot.GetStreamsForExpunge[];

      -- make sure the expunge file is large enough before writing anything
      actualPages _ FS.GetInfo[of _ AlpineFS.OpenFileFromStream[expungeStream]].pages;
      IF actualPages < pagesNeeded THEN
        FS.SetPageCount[of, pagesNeeded !
          FS.Error => IF error.code = $quotaExceeded THEN GOTO quota ELSE REJECT];

      -- discard whatever the expunge log already holds
      IF expungeStream.GetLength[] # 0 THEN {
        WalnutStream.SetHighWaterMark[expungeStream, 0, -1];
        expungeStream.SetIndex[0];
        expungeStream.SetLength[0];
      };
      FS.SetByteCountAndCreatedTime[of, -1, BasicTime.Now[]];

      -- the first entry identifies the log; RestartExpunge checks these three fields
      WalnutStream.logFileInfo.key _ keyIs.ToRefText[];
      WalnutStream.logFileInfo.internalFileID _ expungeInternalFileID;
      WalnutStream.logFileInfo.logSeqNo _ logSeqNo;
      [] _ WalnutStream.WriteEntry[expungeStream, WalnutStream.logFileInfo];
      WalnutStream.FlushStream[expungeStream, TRUE];
      RETURN[expungeInternalFileID];

    EXITS
      quota =>
        ERROR WalnutDefs.Error[$expungeLog, $quotaExceeded,
          IO.PutFR[" An ExpungeLog of %g pages exceeds your quota; file has %g pages",
            IO.int[pagesNeeded], IO.int[actualPages]]];
      err => {
        WalnutRoot.StatsReport["\n *** During start expunge "];
        WalnutRoot.StatsReport[reason.explanation];
        ERROR WalnutDefs.Error[$expungeLog, reason.code, "During Start Expunge"];
      };
    END;
  };

  RestartExpunge: PUBLIC PROC[currentLogPos, expungeLogLength: INT] RETURNS[ok: BOOL] = {
    -- Re-obtains the streams and checks that the expunge log is at least
    -- expungeLogLength bytes long and begins with a LogFileInfo entry whose key,
    -- internal file ID and log sequence number match the values returned by
    -- WalnutRoot.GetStreamsForExpunge. On success the expunge log is positioned at
    -- expungeLogLength (or just past the LogFileInfo entry when expungeLogLength = 0),
    -- the current log is positioned at currentLogPos, and ok = TRUE.
    -- On any mismatch, FS.Error or IO.Error, the streams are returned to WalnutRoot,
    -- the module state is cleared and ok = FALSE; no error is raised for those cases.
    DoRE: PROC = {
      expungePosToUse: INT _ expungeLogLength;
      le: LogEntry;
      keyFromExpLog: REF TEXT;
      internalFileIDFromExpLog, logSeqNoFromExpLog: INT;
      reason: FS.ErrorDesc;
      BEGIN
        ENABLE BEGIN
          FS.Error => { reason _ error; GOTO error };
          IO.Error => { reason _ AlpineFS.ErrorFromStream[stream]; GOTO error };
          UNWIND => { currentStream _ expungeStream _ NIL };
        END;

        ok _ FALSE;
        [currentStream, expungeStream, keyIs, expungeInternalFileID, logSeqNo] _
          WalnutRoot.GetStreamsForExpunge[];
        IF expungeStream = NIL THEN RETURN;
        IF expungeStream.GetLength[] < expungeLogLength THEN RETURN;  -- too short

        -- the first entry of the expunge log must be a LogFileInfo entry
        expungeStream.SetIndex[0];
        le _ WalnutStream.ReadEntry[expungeStream].le;
        IF le = NIL THEN RETURN;
        TRUSTED {
          WITH le: le SELECT FROM
            LogFileInfo => {
              keyFromExpLog _ le.key;
              internalFileIDFromExpLog _ le.internalFileID;
              logSeqNoFromExpLog _ le.logSeqNo;
            };
            ENDCASE => RETURN;
        };

        -- nothing was copied yet: resume right after the LogFileInfo entry
        IF expungeLogLength = 0 THEN expungePosToUse _ expungeStream.GetIndex[];

        IF ~keyIs.Equal[RefText.TrustTextAsRope[keyFromExpLog]] THEN RETURN;
        IF internalFileIDFromExpLog # expungeInternalFileID THEN RETURN;
        IF logSeqNoFromExpLog # logSeqNo THEN RETURN;

        WalnutStream.SetHighWaterMark[expungeStream, expungePosToUse, -1];
        expungeStream.SetIndex[expungePosToUse];
        currentStream.SetIndex[currentLogPos];
        currentEntryLengthHint _ -1;
        currentEntryPos _ currentLogPos;
        ok _ TRUE;

      EXITS
        error => {
          -- fixed: this message used to say "Start Expunge", misattributing the failure
          WalnutRoot.StatsReport["\n *** During Restart Expunge "];
          WalnutRoot.StatsReport[reason.explanation];
          ok _ FALSE;
        };
      END;
    };

    DoRE[];
    IF ~ok THEN {
      WalnutRoot.ReturnExpungeStreams[];
      currentStream _ expungeStream _ NIL;
    };
  };

  CopyBytesToExpungeLog: PUBLIC PROC[bytesToCopy: INT] = {
    -- Appends bytesToCopy bytes, read from the current position of the current log,
    -- to the end of the expunge log. The entry position is advanced to the new index
    -- of the current log and the length hint is invalidated.
    Cbel: PROC = {
      expungeStream.SetIndex[expungeStream.GetLength[]];  -- to be sure
      WalnutStream.CopyBytes[
        from: currentStream, to: expungeStream, num: bytesToCopy];
      currentEntryPos _ currentStream.GetIndex[];
      currentEntryLengthHint _ -1;  -- unknown
    };
    CarefullyApply[Cbel, "CopyBytesToExpungeLog"];
  };

  GetExpungeProgress: PUBLIC PROC RETURNS[currentLogPos, expungeLogLength: INT] = {
    -- Flushes the expunge log, calls WalnutRoot.CommitAndContinue, and then reports
    -- the current entry position in the current log and the length of the expunge log.
    Gep: PROC = {
      WalnutStream.FlushStream[expungeStream, TRUE];
      WalnutRoot.CommitAndContinue[];
      currentLogPos _ currentEntryPos;
      expungeLogLength _ expungeStream.GetLength[];
    };
    CarefullyApply[Gep, "GetExpungeProgress"];
  };

  -- EndExpunge and Shutdown only forget the streams; they do not call
  -- WalnutRoot.ReturnExpungeStreams.

  EndExpunge: PUBLIC PROC = { expungeStream _ currentStream _ NIL };

  Shutdown: PUBLIC PROC = { expungeStream _ currentStream _ NIL };

  -- Positioning within, and reading from, the current log

  SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = {
    -- Moves the current log to startPos and searches for the next entry with
    -- WalnutStream.FindNextEntry. Returns -1 if FindNextEntry returns -1; otherwise
    -- returns the distance from startPos to the entry found and makes that entry current.
    next: INT;
    SetPs: PROC = {
      currentStream.SetIndex[startPos];
      next _ WalnutStream.FindNextEntry[currentStream];
      currentEntryLengthHint _ -1;
      IF next # -1 THEN currentEntryPos _ next;
    };
    CarefullyApply[SetPs, "SetPosition"];
    RETURN[IF next = -1 THEN next ELSE next - startPos];
  };

  SetIndex: PUBLIC PROC[pos: INT] = {
    -- Moves the current log to pos and makes pos the current entry position.
    Si: PROC = {
      currentStream.SetIndex[pos];
      currentEntryLengthHint _ -1;
      currentEntryPos _ pos;
    };
    CarefullyApply[Si, "SetIndex"];
  };

  PeekEntry: PUBLIC PROC RETURNS[ident: ATOM, msgID: REF TEXT, at: INT] = {
    -- Peeks at the entry at the current position and remembers its length as the hint.
    -- at is the current entry position. When WalnutStream.PeekEntry reports length -1,
    -- at is the current entry position only if that position equals the log length,
    -- and -1 otherwise.
    Pne: PROC = {
      length: INT;
      [ident, msgID, length] _ WalnutStream.PeekEntry[currentStream];
      currentEntryLengthHint _ length;
      IF length = -1 THEN {
        IF currentEntryPos = currentStream.GetLength[] THEN at _ currentEntryPos
        ELSE at _ -1;
        RETURN;
      };
      at _ currentEntryPos;
    };
    CarefullyApply[Pne, "PeekEntry"];
  };

  SkipEntry: PUBLIC PROC RETURNS[ok: BOOL] = {
    -- Advances the current log past the current entry, using the length hint if one is
    -- known and peeking otherwise. Returns FALSE, without moving, if the length is -1.
    Sne: PROC = {
      length: INT;
      ok _ FALSE;
      IF currentEntryLengthHint = -1 THEN
        length _ WalnutStream.PeekEntry[currentStream].length
      ELSE {
        length _ currentEntryLengthHint;
        currentEntryLengthHint _ -1;  -- the hint is consumed by this skip
      };
      IF length = -1 THEN RETURN;
      currentStream.SetIndex[currentEntryPos _ currentStream.GetIndex[] + length];
      ok _ TRUE;
    };
    CarefullyApply[Sne, "SkipEntry"];
  };

  CopyEntry: PUBLIC PROC RETURNS[newPosition, bytesCopied: INT] = {
    -- Copies the current entry from the current log to the expunge log at the expunge
    -- log's current index. newPosition is the expunge-log index at which the copy
    -- started and bytesCopied is the entry length. If the entry length is -1 nothing
    -- is copied: newPosition = -1 and bytesCopied = 0.
    Ce: PROC = {
      bytesCopied _ 0;
      newPosition _ expungeStream.GetIndex[];
      IF currentEntryLengthHint = -1 THEN
        currentEntryLengthHint _ WalnutStream.PeekEntry[currentStream].length;
      IF currentEntryLengthHint = -1 THEN { newPosition _ -1; RETURN };
      WalnutStream.CopyBytes[
        from: currentStream, to: expungeStream, num: currentEntryLengthHint];
      currentEntryPos _ currentStream.GetIndex[];
      bytesCopied _ currentEntryLengthHint;
      currentEntryLengthHint _ -1;  -- unknown
    };
    CarefullyApply[Ce, "CopyEntry"];
  };

  EndCopyEntry: PUBLIC PROC RETURNS[startCopyPos: INT] = {
    -- Reads the entry at the current position, which must be an EndCopyNewMailInfo or
    -- EndCopyReadArchiveInfo entry, and returns its startCopyPos. The current log is
    -- repositioned at currentEntryPos after the read.
    -- Raises WalnutDefs.Error[$log, $NoEntryFound, ...] if no entry can be read and
    -- WalnutDefs.Error[$log, $WrongEntryFound, ...] for any other kind of entry.
    Ne: PROC[] = {
      wle: LogEntry _ WalnutStream.ReadEntry[currentStream].le;
      currentStream.SetIndex[currentEntryPos];
      IF wle = NIL THEN
        ERROR WalnutDefs.Error[$log, $NoEntryFound, "During Expunge"];
      TRUSTED {
        WITH le: wle SELECT FROM
          EndCopyNewMailInfo => startCopyPos _ le.startCopyPos;
          EndCopyReadArchiveInfo => startCopyPos _ le.startCopyPos;
          ENDCASE =>
            ERROR WalnutDefs.Error[$log, $WrongEntryFound, "During Expunge"];
      };
    };
    startCopyPos _ 0;
    -- fixed: the tag used to be "EndCopyNewMailEntry", which matches neither this
    -- procedure's name nor the two entry kinds it accepts
    CarefullyApply[Ne, "EndCopyEntry"];
  };

  LogLength: PUBLIC PROC RETURNS[length: INT] = {
    -- Length of the current log, or -1 when there is no current log stream.
    IF currentStream = NIL THEN RETURN[-1];
    RETURN[currentStream.GetLength[]];
  };

  GetIndex: PUBLIC PROC RETURNS[length: INT] = {
    -- Index of the current log, or -1 when there is no current log stream.
    -- (The result is named length in the signature, but it is the stream index.)
    IF currentStream = NIL THEN RETURN[-1];
    RETURN[currentStream.GetIndex[]];
  };

  CopyBytes: PUBLIC PROC[strm: STREAM, num: INT] = {
    -- Appends num bytes, read from the current position of the current log, to the
    -- end of strm. The entry position is advanced to the new index of the current log.
    Cb: PROC = {
      strm.SetIndex[strm.GetLength[]];
      WalnutStream.CopyBytes[from: currentStream, to: strm, num: num];
      currentEntryPos _ currentStream.GetIndex[];
      currentEntryLengthHint _ -1;  -- unknown
    };
    CarefullyApply[Cb, "CopyBytes"];
  };

  ExamineThisEntry: PUBLIC PROC RETURNS[status: WalnutLogExpunge.EntryStatus] = {
    -- Checks the entry at the current position and tries to step over it.
    -- Returns noValidEntry (without moving) if the entry length is -1, validEntry if
    -- the log was advanced past the entry, and EndOfStream if advancing raised
    -- IO.EndOfStream, in which case the log is put back at the start of the entry.
    Ete: PROC = {
      length: INT;
      curPos: INT;
      status _ noValidEntry;
      IF currentEntryLengthHint = -1 THEN
        length _ WalnutStream.PeekEntry[currentStream].length
      ELSE {
        length _ currentEntryLengthHint;
        currentEntryLengthHint _ -1;
      };
      IF length = -1 THEN { status _ noValidEntry; RETURN };
      status _ validEntry;
      currentStream.SetIndex[currentEntryPos _ (curPos _ currentStream.GetIndex[]) + length !
        IO.EndOfStream => { status _ EndOfStream; CONTINUE }];
      -- currentEntryPos was already advanced before SetIndex failed; undo that
      IF status = EndOfStream THEN currentStream.SetIndex[currentEntryPos _ curPos];
    };
    CarefullyApply[Ete, "ExamineThisEntry"];
  };

  -- Error handling shared by the procedures above

  CarefullyApply: PROC[proc: PROC[], who: ROPE ] = {
    -- Runs proc, translating stream and file-system failures:
    --   a transaction abort (WalnutStream.Aborted, or FS.Error with $transAborted)
    --     becomes WalnutDefs.Error[$log, $TransactionAbort, ...];
    --   $quotaExceeded becomes WalnutDefs.Error[$log, $QuotaExceeded, ...];
    --   any other IO.Error or FS.Error is passed on (REJECT).
    -- In every one of these cases, and on UNWIND, the streams are first returned to
    -- WalnutRoot and the module state is cleared. who names the calling operation in
    -- the stats report for the abort and quota cases.
    ENABLE BEGIN
      IO.Error =>
        IF WalnutStream.Aborted[stream] THEN GOTO aborted
        ELSE {
          ed: FS.ErrorDesc;
          -- ErrorFromStream may itself fail; in that case ed keeps its initial value
          ed _ FS.ErrorFromStream[stream ! FS.Error, IO.Error => CONTINUE];
          IF ed.code = $quotaExceeded THEN GOTO quota;
          WalnutRoot.ReturnExpungeStreams[];
          currentStream _ expungeStream _ NIL;
          REJECT
        };
      FS.Error => {
        IF error.code = $transAborted THEN GOTO aborted;
        IF error.code = $quotaExceeded THEN GOTO quota;
        WalnutRoot.ReturnExpungeStreams[];
        currentStream _ expungeStream _ NIL;
        REJECT
      };
      UNWIND => {
        WalnutRoot.ReturnExpungeStreams[];
        currentStream _ expungeStream _ NIL;
      };
    END;

    proc[];

    EXITS
      aborted => {
        WalnutRoot.StatsReport["\n **** Trans abort during expunge doing "];
        WalnutRoot.StatsReport[who];
        WalnutRoot.ReturnExpungeStreams[];
        currentStream _ expungeStream _ NIL;
        ERROR WalnutDefs.Error[$log, $TransactionAbort, "During expunge"];
      };
      quota => {
        WalnutRoot.StatsReport["\n **** Quota exceeded during expunge doing "];
        WalnutRoot.StatsReport[who];
        WalnutRoot.ReturnExpungeStreams[];
        currentStream _ expungeStream _ NIL;
        ERROR WalnutDefs.Error[$log, $QuotaExceeded, "During expunge"];
      };
  };

END.