IMPORTS
BasicTime, FS, IO, MailUtils, RefText, Rope, RuntimeError, SendMailOps,
WalnutDefs, WalnutRoot, WalnutStream
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
MsgLogEntry: TYPE = WalnutKernelDefs.MsgLogEntry;
TiogaContents: TYPE = ViewerTools.TiogaContents;
LogInfo: TYPE = WalnutDefs.LogInfo;
WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle;
LogHandle: TYPE = WalnutLog.LogHandle;
LogHandleRec: PUBLIC TYPE = WalnutLog.LogHandleRec;
RootHandle: TYPE = WalnutRoot.RootHandle;
RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec;
InternalLogInfo: TYPE = WalnutRoot.InternalLogInfo;
the all streams are under the same transaction; WalnutLogImpl must get them WalnutRoot to open the files
Logging of operations that perform actions on the Walnut database
ExpungeMsgs:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] =
{ [at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.expungeMsgs] };
WriteExpungeLog:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] = {
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.writeExpungeLog];
};
CreateMsgSet:
PUBLIC PROC[opsH: WalnutOpsHandle, name:
ROPE]
RETURNS[at, next:
INT] = {
WalnutStream.logInfoRef.createMsgSet.msgSet Ź name;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.createMsgSet]
};
EmptyMsgSet:
PUBLIC PROC[opsH: WalnutOpsHandle, msgSet:
ROPE]
RETURNS[at, next: INT] = {
WalnutStream.logInfoRef.emptyMsgSet.msgSet Ź msgSet;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.emptyMsgSet]
};
DestroyMsgSet:
PUBLIC PROC[opsH: WalnutOpsHandle, msgSet:
ROPE]
RETURNS[at, next:
INT] = {
WalnutStream.logInfoRef.destroyMsgSet.msgSet Ź msgSet;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.destroyMsgSet]
};
AddMsg:
PUBLIC PROC[opsH: WalnutOpsHandle, msg, to:
ROPE]
RETURNS[at, next:
INT] = {
WalnutStream.logInfoRef.addMsg.msg Ź msg;
WalnutStream.logInfoRef.addMsg.to Ź to;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.addMsg]
};
RemoveMsg:
PUBLIC PROC[opsH: WalnutOpsHandle, msg, from:
ROPE]
RETURNS[at, next: INT] = {
WalnutStream.logInfoRef.removeMsg.msg Ź msg;
WalnutStream.logInfoRef.removeMsg.from Ź from;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.removeMsg]
};
MoveMsg:
PUBLIC PROC[opsH: WalnutOpsHandle, msg, from, to:
ROPE]
RETURNS[at, next: INT] = {
WalnutStream.logInfoRef.moveMsg.msg Ź msg;
WalnutStream.logInfoRef.moveMsg.from Ź from;
WalnutStream.logInfoRef.moveMsg.to Ź to;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.moveMsg]
};
HasBeenRead:
PUBLIC PROC[opsH: WalnutOpsHandle, msg:
ROPE]
RETURNS[at, next:
INT] = {
WalnutStream.logInfoRef.hasbeenRead.msg Ź msg;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.hasbeenRead]
};
RecordNewMailInfo:
PUBLIC PROC[opsH: WalnutOpsHandle, logLen:
INT, when:
GMT, server:
ROPE, num:
INT]
RETURNS[at, next: INT] = {
WalnutStream.logInfoRef.recordNewMailInfo.logLen Ź logLen;
WalnutStream.logInfoRef.recordNewMailInfo.when Ź when;
WalnutStream.logInfoRef.recordNewMailInfo.server Ź server;
WalnutStream.logInfoRef.recordNewMailInfo.num Ź num;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.recordNewMailInfo]
};
StartCopyNewMail:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] =
{ [at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.startCopyNewMail] };
AcceptNewMail:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] =
{ [at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.acceptNewMail] };
StartReadArchiveFile:
PUBLIC PROC[opsH: WalnutOpsHandle, file:
ROPE, msgSet:
ROPE]
RETURNS[at, next:
INT] = {
WalnutStream.logInfoRef.startReadArchiveFile.file Ź file;
WalnutStream.logInfoRef.startReadArchiveFile.msgSet Ź msgSet;
[at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.startReadArchiveFile]
};
EndReadArchiveFile:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] =
{ [at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.endReadArchiveFile] };
StartCopyReadArchive:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at, next:
INT] =
{ [at, next] Ź WriteEntry[opsH, WalnutStream.logInfoRef.startCopyReadArchive] };
Managing the current log
InitLogStream:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[
BOOL] = {
opsH.logHandle Ź NEW[LogHandleRec Ź [] ];
OpenLogStreams[opsH];
RETURN[TRUE];
};
LogLength:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[length:
INT] = {
li: InternalLogInfo = opsH.rootHandle.currentLog;
RETURN[ IF li.writeStream # NIL THEN li.writeStream.GetLength[] ELSE li.readStream.GetLength[] ];
};
OpenLogStreams:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
IF opsH.rootHandle.currentLog =
NIL OR opsH.rootHandle.currentLog.readStream =
NIL THEN {
WalnutRoot.OpenLogStreams[opsH];
IF opsH.logHandle = NIL THEN opsH.logHandle Ź NEW[LogHandleRec Ź [] ];
IF opsH.logHandle.scanning THEN SetLogIndex[opsH, opsH.logHandle.entryPos, TRUE];
};
};
ReleaseWriteLock:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
logH: LogHandle = opsH.logHandle;
IF opsH.rootHandle.currentLog.writeStream = NIL THEN RETURN;
WalnutRoot.ReleaseWriteLock[opsH];
IF logH.scanning THEN SetLogIndex[opsH, logH.entryPos, TRUE];
};
ResetLog:
PUBLIC PROC[opsH: WalnutOpsHandle, newLength:
INT] = {
strm: STREAM;
IF opsH.logHandle = NIL THEN RETURN;
SetLogIndex[opsH, newLength, TRUE];
WalnutStream.SetHighWaterMark[strm Ź opsH.rootHandle.currentLog.writeStream,
newLength, -1];
strm.SetLength[newLength];
strm.Flush[];
April 26, 1991, DCS, assure invariant that log streams are positioned at end, except temporarily when explicitly repositioned.
strm.SetIndex[newLength];
opsH.logHandle.scanning Ź FALSE;
};
ForgetLogStreams:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
logH: LogHandle = opsH.logHandle;
IF logH = NIL THEN RETURN;
logH.scanning Ź FALSE;
logH.strm Ź NIL;
logH.scanningLog Ź NIL;
};
ShutdownLog:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
logH: LogHandle = opsH.logHandle;
IF logH = NIL THEN RETURN;
logH.scanning Ź FALSE;
logH.strm Ź NIL;
logH.scanningLog Ź NIL;
WalnutRoot.CloseTransaction[opsH];
};
Write a message on the current log
WriteMessage:
PUBLIC PROC[opsH: WalnutOpsHandle, msg:
ROPE, body: TiogaContents]
RETURNS[at: INT]= {
wStrm: STREAM = opsH.rootHandle.currentLog.writeStream;
wStrm.SetIndex[wStrm.GetLength[]];
WalnutStream.logInfoRef.createMsg.msg Ź msg;
IF ( body.formatting.Length[] # 0 )
AND ( body.contents.Fetch[0] = '\r
OR body.contents.Fetch[0] = '\l )
THEN {
last: INT Ź body.contents.Length[] - 1;
IF body.contents.Fetch[last] = '\000
THEN -- NUL for padding
{ body.contents Ź Rope.Substr[body.contents, 1, last-1];
body.formatting Ź Rope.Concat["\000", body.formatting]
}
ELSE
IF body.contents.Fetch[0] = '\r
OR body.contents.Fetch[0] = '\l
THEN
body.contents Ź Rope.Substr[body.contents, 1]
};
WalnutStream.logInfoRef.createMsg.textLen Ź body.contents.Length[];
WalnutStream.logInfoRef.createMsg.formatLen Ź body.formatting.Length[];
at Ź WalnutStream.WriteEntry[wStrm, WalnutStream.logInfoRef.createMsg];
WalnutStream.WriteMsgBody[wStrm, body];
WalnutStream.FlushStream[strm: wStrm, setCreateDate: TRUE];
};
Parsing a log (used by Archive and Replay/Scavenge
SetPosition:
PUBLIC PROC[opsH: WalnutOpsHandle, startPos:
INT]
RETURNS[charsSkipped: INT] = {
next: INT;
logH: LogHandle = opsH.logHandle;
SetLogIndex[opsH, startPos, TRUE];
next Ź WalnutStream.FindNextEntry[logH.strm];
logH.entryLengthHint Ź -1;
IF next # -1 THEN logH.entryPos Ź next;
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
SetIndex:
PUBLIC PROC[opsH: WalnutOpsHandle, pos:
INT] = {
logH: LogHandle = opsH.logHandle;
SetLogIndex[opsH, pos, TRUE];
logH.entryLengthHint Ź -1;
logH.entryPos Ź pos;
};
NextAt:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[at:
INT] =
{ RETURN[opsH.logHandle.entryPos] };
NextEntry:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[le: LogEntry, at:
INT] =
{ [le, at] Ź NextEntryInternal[opsH, FALSE] };
NextEntryInternal:
PROC[opsH: WalnutOpsHandle, quick:
BOOL]
RETURNS[le: LogEntry, at: INT] = {
logH: LogHandle = opsH.logHandle;
length, startPos: INT;
IF ~logH.scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
[le, length] Ź WalnutStream.ReadEntry[logH.strm, quick];
startPos Ź logH.entryPos;
logH.entryLengthHint Ź -1;
IF length # -1 THEN logH.entryPos Ź startPos + length; -- normal case
RETURN[le, startPos];
};
QuickScan:
PUBLIC PROC[opsH: WalnutOpsHandle]
RETURNS[le: LogEntry, at:
INT] =
{ [le, at] Ź NextEntryInternal[opsH: opsH, quick: TRUE] };
ArchiveEntry:
PUBLIC PROC[opsH: WalnutOpsHandle, to:
STREAM]
RETURNS[ok:
BOOL]= {
Copies the next entry of the current log to an archive stream.
logH: LogHandle = opsH.logHandle;
IF ~logH.scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
ok Ź FALSE;
IF logH.entryLengthHint = -1
THEN
logH.entryLengthHint Ź
WalnutStream.PeekEntry[logH.strm, TRUE].length;
IF logH.entryLengthHint = -1 THEN RETURN;
WalnutStream.CopyBytes[
from: logH.strm, to: to, num: logH.entryLengthHint ];
logH.entryPos Ź logH.strm.GetIndex[];
logH.entryLengthHint Ź -1; -- unknown
ok Ź TRUE;
};
CopyBytesToArchive:
PUBLIC PROC[
opsH: WalnutOpsHandle, to: STREAM, startPos, num: INT] = {
SetLogIndex[opsH, startPos, FALSE];
WalnutStream.CopyBytes[from: opsH.logHandle.strm, to: to, num: num];
};
Copying NewMail/ReadArchive logs
PrepareToCopyTempLog:
PUBLIC PROC[opsH: WalnutOpsHandle, which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied:
INT, reportProc: WalnutDefs.CheckReportProc]
RETURNS[
BOOL] = {
makes sure the writeStream is long enought to copy the which log onto
RETURN[WalnutRoot.PrepareToCopyTempLog[opsH, which, pagesAlreadyCopied, reportProc] ];
};
CopyTempLog:
PUBLIC PROC[opsH: WalnutOpsHandle, which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos:
INT, reportProc: WalnutDefs.CheckReportProc ] = {
copies the which templog, starting at fromPos, to the end of the writeStream; writes an EndCopy log entry, and sets the length of which to 0; if the operation fails, WalnutDefs.Error is raised; NOT done inside carefullyApply, so that it can catch transaction aborts
exp: ROPE;
endLE: LogEntry;
abort: BOOL Ź TRUE;
ok: BOOL Ź FALSE;
copyStrm, wStrm: STREAM;
bytesToCopy: INT;
bytesPerCopy: INT = FS.BytesForPages[200];
opsH.logHandle.scanning Ź FALSE;
BEGIN ENABLE BEGIN
IO.Error => {
ed: FS.ErrorDesc;
<<ed Ź FS.ErrorFromStream[stream ! IO.Error => CONTINUE];>>
IF ed.code = $transAborted
OR ed.code = $remoteCallFailed
THEN
{ exp Ź ed.explanation; GOTO exit };
REJECT;
};
FS.Error =>
{
IF error.code = $transAborted THEN GOTO exit;
IF error.code = $remoteCallFailed
THEN {
exp Ź error.explanation;
GOTO exit;
};
REJECT;
};
END;
SELECT which
FROM
newMail => {
WalnutStream.logInfoRef.endCopyNewMailInfo.startCopyPos Ź startCopyPos;
endLE Ź WalnutStream.logInfoRef.endCopyNewMailInfo;
};
readArchive => {
WalnutStream.logInfoRef.endCopyReadArchiveInfo.startCopyPos Ź startCopyPos;
endLE Ź WalnutStream.logInfoRef.endCopyReadArchiveInfo
};
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
copyStrm Ź WalnutRoot.GetStreamsForCopy[opsH, which];
wStrm Ź opsH.rootHandle.currentLog.writeStream;
Play it safe! DCS April 26, 1991
wStrm.SetIndex[wStrm.GetLength[]];
bytesToCopy Ź copyStrm.GetLength[] - fromPos;
IF bytesToCopy > 0
THEN {
first: BOOL Ź TRUE;
copyStrm.SetIndex[fromPos];
<<wStrm.SetIndex[wStrm.GetLength[]];>> -- Moved outside conditional.
DO
thisCopy: INT Ź MIN[bytesPerCopy, bytesToCopy];
WalnutStream.CopyBytes[to: wStrm, from: copyStrm, num: thisCopy];
WalnutStream.FlushStream[wStrm, TRUE];
WalnutRoot.CommitAndContinue[opsH];
IF first THEN first Ź FALSE ELSE reportProc[opsH, "#"];
bytesToCopy Ź bytesToCopy - thisCopy;
IF bytesToCopy = 0 THEN EXIT;
ENDLOOP;
};
[] Ź WalnutStream.WriteEntry[wStrm, endLE];
WalnutStream.FlushStream[wStrm, TRUE];
WalnutRoot.CommitAndContinue[opsH];
EXITS
exit => {
who: ROPE Ź IF which = newMail THEN "NewMail" ELSE "ReadArchive";
IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who];
WalnutRoot.AbortTempCopy[opsH, which];
WalnutRoot.CloseTransaction[opsH];
ERROR WalnutDefs.Error[
$log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed,
IO.PutFR1["Copying the %g log", [rope[who]]] ];
};
END;
WalnutRoot.FinishCopy[opsH, which];
};
FinishTempLogCopy:
PUBLIC PROC[opsH: WalnutOpsHandle, which: WalnutKernelDefs.WhichTempLog] =
{ WalnutRoot.FinishCopy[opsH, which] };
CreateArchiveLog:
PUBLIC PROC[
opsH: WalnutOpsHandle, fileToRead:
STREAM, msgSet:
ROPE, reportProc: WalnutDefs.CheckReportProc]
RETURNS[ok:
BOOL] = {
reason: ROPE;
[ok, reason] Ź CreateReadArchiveLog[opsH, fileToRead, msgSet, reportProc];
IF ok THEN [] Ź WriteEntry[opsH, WalnutStream.logInfoRef.endReadArchiveFile];
WalnutRoot.ReturnReadArchiveStream[opsH];
};
Utilities
GetTiogaContents:
PUBLIC PROC[opsH: WalnutOpsHandle, textStart, textLen, formatLen:
INT]
RETURNS[contents: TiogaContents] = {
logH: LogHandle = opsH.logHandle;
IF formatLen # 0
THEN -- tioga formatting nonsense
{ textLen Ź textLen + 1; textStart Ź textStart - 1};
SetLogIndex[opsH, textStart, FALSE];
contents Ź NEW[ViewerTools.TiogaContentsRec];
contents.contents Ź WalnutStream.ReadRope[logH.strm, textLen];
contents.formatting Ź WalnutStream.ReadRope[logH.strm, formatLen];
};
GetRefTextFromLog:
PUBLIC PROC[opsH: WalnutOpsHandle, startPos, length:
INT, text:
REF TEXT] = {
natLen: NAT;
bytesRead: NAT;
logH: LogHandle = opsH.logHandle;
CheckSize: PROC[len: INT] RETURNS[nat: NAT] = { nat Ź len };
natLen Ź CheckSize[length ! RuntimeError.BoundsFault => GOTO noGood];
SetLogIndex[opsH, startPos, FALSE];
bytesRead Ź logH.strm.GetBlock[block: text, startIndex: 0, count: natLen];
IF bytesRead < natLen THEN text.length Ź 0;
EXITS
noGood =>
ERROR WalnutDefs.Error[$log, $BadLength,
IO.PutFR1[" Can't convert %g into a nat", [integer[length]] ]];
};
Local Utilities
SetLogIndex:
PROC[opsH: WalnutOpsHandle, pos:
INT, setScan:
BOOL] = {
logH: LogHandle = opsH.logHandle;
logH.strm Ź opsH.rootHandle.currentLog.readStream;
IF ( logH.scanning Ź setScan ) THEN logH.scanningLog Ź opsH.rootHandle.currentLog;
logH.strm.SetIndex[pos ! IO.Error => GOTO err];
EXITS err =>
ERROR WalnutDefs.Error[$log, $BadLogIndex, IO.PutFR1["Pos beyond end of log %g", [integer[pos]]] ];
};
WriteEntry:
PROC[opsH: WalnutOpsHandle, le: LogEntry]
RETURNS[at, next:
INT] = {
wStrm: STREAM = opsH.rootHandle.currentLog.writeStream;
at Ź WalnutStream.WriteEntry[wStrm, le];
WalnutStream.FlushStream[wStrm, TRUE];
next Ź wStrm.GetIndex[];
};
Writing and Reading NewMail/ReadArchive files
GetNewMailLog:
PUBLIC PROC[opsH: WalnutOpsHandle, lengthRequired, pagesWanted:
INT]
RETURNS[STREAM] = {
opens the NewMailLog (aborting any current stream that might be open); truncates it to the required length; returns NIL if the log is shorter than required, or if the file could not be opened
BEGIN ENABLE IO.Error,
FS.Error => {
AbortMailLog[opsH];
GOTO exit;
};
AbortMailLog[opsH];
IF NOT WalnutRoot.GetNewMailStream[opsH, lengthRequired, pagesWanted].ok THEN RETURN[NIL];
EXITS
exit => AbortMailLog[opsH];
END;
RETURN[opsH.rootHandle.newMailLog.stream];
};
CloseNewMailLog:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
mStrm: STREAM = opsH.rootHandle.newMailLog.stream;
IF mStrm = NIL THEN RETURN;
WalnutRoot.ReturnNewMailStream[opsH];
};
AbortMailLog:
PROC[opsH: WalnutOpsHandle] = {
mStrm: STREAM = opsH.rootHandle.newMailLog.stream;
IF mStrm = NIL THEN RETURN;
WalnutStream.AbortStream[mStrm ! IO.Error, FS.Error => CONTINUE];
WalnutRoot.ReturnNewMailStream[opsH];
};
CreateReadArchiveLog:
PUBLIC PROC[opsH: WalnutOpsHandle, fileToRead:
STREAM, msgSet:
ROPE, reportProc: WalnutDefs.CheckReportProc]
RETURNS[ok: BOOL, reason: ROPE] = {
reads file and writes the ReadArchiveLogFile; returns ok if all went well
readArchiveStrm: STREAM;
lastCommitPosInFileToRead: INT;
fileToReadPages: INT Ź FS.PagesForBytes[fileToRead.GetLength[]];
possibleArchiveLength: INT Ź MAX[20, fileToReadPages + (fileToReadPages/5)]; -- 120%
ok Ź FALSE;
reason Ź NIL;
BEGIN ENABLE BEGIN
IO.Error => {
<<reason Ź FS.ErrorFromStream[fileToRead].explanation;>>
<<IF reason = NIL THEN reason Ź FS.ErrorFromStream[readArchiveStrm].explanation;>>
IF reason = NIL THEN reason Ź "IO error during createReadArchiveLog";
GOTO exit;
};
FS.Error => { reason Ź error.explanation; GOTO exit };
UNWIND => GOTO exit;
END;
IF (readArchiveStrm Ź WalnutRoot.GetReadArchiveStream[opsH, possibleArchiveLength]) =
NIL THEN
RETURN[FALSE, "Can't get readArchiveStream"];
readArchiveStrm.SetIndex[0];
WalnutStream.SetHighWaterMark[strm: readArchiveStrm, hwmBytes: 0, numPages: -1];
WalnutRoot.CommitAndContinue[opsH];
[ok, lastCommitPosInFileToRead] Ź ArchiveReader[
opsH: opsH, archiveStream: readArchiveStrm,
fileToRead: fileToRead,
msgSet: msgSet, reportProc: reportProc, posToStartInFileToRead: 0];
IF ~ok THEN WalnutRoot.ReturnReadArchiveStream[opsH];
EXITS
exit =>
IF readArchiveStrm #
NIL THEN {
WalnutRoot.ReturnReadArchiveStream[opsH];
};
END;
};
CloseReadArchiveLog:
PUBLIC PROC[opsH: WalnutOpsHandle] = {
WalnutRoot.ReturnReadArchiveStream[opsH];
};