IMPORTS
AlpFile, AlpineFS, FS, IO, RefText, Rope, RuntimeError,
WalnutDefs, WalnutMiscLog, WalnutRoot, WalnutStream
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
TiogaContents: TYPE = ViewerTools.TiogaContents;
Variables
-- the read and write (and temp if open) streams are all under the same transaction; WalnutLogImpl must get them from WalnutRoot
writeStream: STREAM ← NIL;
tempStream: STREAM ← NIL; -- when copying a tempFile
readStream: STREAM ← NIL; -- only one log for now
scanning: BOOL ← FALSE;
currentEntryLengthHint: INT ← -1;
currentEntryPos: INT ← -1;
field1: REF TEXT = NEW[TEXT[RefText.line]];
field2: REF TEXT = NEW[TEXT[RefText.line]];
field3: REF TEXT = NEW[TEXT[RefText.line]];
Logging of operations that perform actions on the Walnut database
ExpungeMsgs:
PUBLIC
PROC
RETURNS[at, next:
INT] =
{ [at, next] ← WriteEntry[WalnutStream.expungeMsgs] };
WriteExpungeLog:
PUBLIC
PROC
RETURNS[at, next:
INT] = {
WalnutStream.writeExpungeLog.internalFileID ← WalnutRoot.GetExpungeID[];
[at, next] ← WriteEntry[WalnutStream.writeExpungeLog];
};
CreateMsgSet:
PUBLIC
PROC[name:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.createMsgSet.msgSet ← RefText.AppendRope[field1, name];
[at, next] ← WriteEntry[WalnutStream.createMsgSet]
};
EmptyMsgSet:
PUBLIC
PROC[msgSet:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.emptyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.emptyMsgSet]
};
DestroyMsgSet:
PUBLIC
PROC[msgSet:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.destroyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.destroyMsgSet]
};
AddMsg:
PUBLIC
PROC[msg:
ROPE, to:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.addMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.addMsg.to ← RefText.AppendRope[field2, to];
[at, next] ← WriteEntry[WalnutStream.addMsg]
};
RemoveMsg:
PUBLIC
PROC[msg:
ROPE, from:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.removeMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.removeMsg.from ← RefText.AppendRope[field2, from];
[at, next] ← WriteEntry[WalnutStream.removeMsg]
};
MoveMsg:
PUBLIC
PROC[msg:
ROPE, from, to:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.moveMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.moveMsg.from ← RefText.AppendRope[field2, from];
field3.length ← 0;
WalnutStream.moveMsg.to ← RefText.AppendRope[field3, to];
[at, next] ← WriteEntry[WalnutStream.moveMsg]
};
HasBeenRead:
PUBLIC
PROC[msg:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.hasbeenRead.msg ← RefText.AppendRope[field1, msg];
[at, next] ← WriteEntry[WalnutStream.hasbeenRead]
};
RecordNewMailInfo:
PUBLIC
PROC[logLen:
INT, when:
GMT, server:
ROPE, num:
INT]
RETURNS[at, next: INT] = {
WalnutStream.recordNewMailInfo.logLen ← logLen;
WalnutStream.recordNewMailInfo.when ← when;
field1.length ← 0;
WalnutStream.recordNewMailInfo.server ← RefText.AppendRope[field1, server];
WalnutStream.recordNewMailInfo.num ← num;
[at, next] ← WriteEntry[WalnutStream.recordNewMailInfo]
};
StartCopyNewMail:
PUBLIC
PROC
RETURNS[at, next:
INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyNewMail] };
AcceptNewMail:
PUBLIC
PROC
RETURNS[at, next:
INT] =
{ [at, next] ← WriteEntry[WalnutStream.acceptNewMail] };
StartReadArchiveFile:
PUBLIC
PROC[file:
ROPE, msgSet:
ROPE]
RETURNS[at, next:
INT] = {
field1.length ← 0;
WalnutStream.startReadArchiveFile.file ← RefText.AppendRope[field1, file];
field2.length ← 0;
WalnutStream.startReadArchiveFile.msgSet ← RefText.AppendRope[field2, msgSet];
[at, next] ← WriteEntry[WalnutStream.startReadArchiveFile]
};
EndReadArchiveFile:
PUBLIC
PROC RETURNS[at, next:
INT] =
{ [at, next] ← WriteEntry[WalnutStream.endReadArchiveFile] };
StartCopyReadArchive:
PUBLIC
PROC[]
RETURNS[at, next:
INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyReadArchive] };
Managing the current log
AcquireWriteLock:
PUBLIC
PROC = {
Awl:
PROC =
{ WalnutRoot.AcquireWriteLock[] };
CarefullyApply[Awl, "AcquireWriteLock"];
};
ForgetLogStreams:
PUBLIC
PROC =
{ readStream ← writeStream ← NIL };
CloseLogStreams:
PUBLIC
PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
};
LogLength:
PUBLIC
PROC
RETURNS[length:
INT] = {
Llen:
PROC =
{ length ← writeStream.GetLength[] };
CarefullyApply[Llen, "LogLength"];
};
OpenLogStreams:
PUBLIC
PROC = {
IF writeStream =
NIL
THEN {
[readStream, writeStream] ← WalnutRoot.GetCurrentLogStreams[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
};
ReleaseWriteLock:
PUBLIC
PROC = {
IF writeStream = NIL THEN RETURN;
[readStream, writeStream] ← WalnutRoot.ReleaseWriteLock[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
ResetLog:
PUBLIC
PROC[newLength:
INT] = {
Rl:
PROC = {
writeStream.SetIndex[newLength];
WalnutStream.SetHighWaterMark[writeStream, newLength, -1];
writeStream.SetIndex[newLength];
writeStream.Flush[];
};
CarefullyApply[Rl, "ResetLog"];
};
ReturnCurrentLogStreams:
PUBLIC
PROC = {
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
ShutdownLog:
PUBLIC
PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
Parsing a log (used by Archive and Replay/Scavenge
SetPosition:
PUBLIC
PROC[startPos:
INT]
RETURNS[charsSkipped:
INT] = {
next: INT;
SetPs:
PROC[] = {
readStream.SetIndex[startPos];
next ← WalnutStream.FindNextEntry[readStream];
currentEntryLengthHint ← -1;
IF next # -1 THEN currentEntryPos ← next;
scanning ← TRUE;
};
CarefullyApply[SetPs, "SetPosition"];
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
SetIndex:
PUBLIC PROC[pos:
INT] = {
Si:
PROC[] = {
readStream.SetIndex[pos];
currentEntryLengthHint ← -1;
currentEntryPos ← pos;
scanning ← TRUE;
};
CarefullyApply[Si, "SetIndex"];
};
NextAt:
PUBLIC
PROC
RETURNS[at:
INT] =
{ at ← currentEntryPos };
NextEntry:
PUBLIC
PROC
RETURNS[le: LogEntry, at:
INT] =
{ [le, at] ← NextEntryInternal[FALSE, "NextEntry"] };
NextEntryInternal:
PROC[quick:
BOOL, who:
ROPE]
RETURNS[le: LogEntry, at:
INT] = {
Ne:
PROC[] = {
length: INT;
[le, length] ← WalnutStream.ReadEntry[readStream, quick];
at ← currentEntryPos;
currentEntryLengthHint ← -1;
IF length = -1
THEN {
IF at = readStream.GetLength[] THEN RETURN;
at ← -1; RETURN
};
currentEntryPos ← at + length;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ne, who];
};
QuickScan:
PUBLIC
PROC
RETURNS[le: LogEntry, at:
INT] =
{ [le, at] ← NextEntryInternal[quick: TRUE, who: "QuickScan"] };
ArchiveEntry:
PUBLIC
PROC[to:
IO.
STREAM]
RETURNS[ok:
BOOL]= {
Copies the next entry of the current log to an archive stream.
Ane:
PROC[] = {
ok ← FALSE;
IF currentEntryLengthHint = -1
THEN
currentEntryLengthHint ←
WalnutStream.PeekEntry[readStream].length;
IF currentEntryLengthHint = -1 THEN RETURN;
WalnutStream.CopyBytes[
from: readStream, to: to, num: currentEntryLengthHint];
currentEntryPos ← readStream.GetIndex[];
currentEntryLengthHint ← -1; -- unknown
ok ← TRUE;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ane, "ArchiveEntry"];
};
CopyBytesToArchive:
PUBLIC
PROC[to:
IO.
STREAM, startPos, num:
INT] = {
Cba:
PROC = {
readStream.SetIndex[startPos];
WalnutStream.CopyBytes[from: readStream, to: to, num: num];
scanning ← FALSE;
};
CarefullyApply[Cba, "CopyBytesToArchive"];
};
Copying NewMail/ReadArchive logs
PrepareToCopyTempLog:
PUBLIC
PROC[which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied:
INT, reportProc:
PROC[msg1, msg2, msg3:
ROPE ←
NIL]]
RETURNS[
BOOL] = {
makes sure the writeStream is long enought to copy the which log onto
[writeStream, tempStream] ← WalnutRoot.PrepareToCopyTempLog[which, pagesAlreadyCopied, reportProc];
RETURN[tempStream # NIL];
};
CopyTempLog:
PUBLIC
PROC[which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos:
INT, reportProc:
PROC[msg1, msg2, msg3:
ROPE ←
NIL] ] = {
copies the which templog, starting at fromPos, to the end of the writeStream; writes an EndCopy log entry, and sets the length of which to 0; if the operation fails, WalnutDefs.Error is raised; NOT done inside carefullyApply, so that it can catch transaction aborts
exp: ROPE;
endLE: LogEntry;
abort: BOOL ← TRUE;
ok: BOOL ← FALSE;
bytesToCopy: INT;
bytesPerCopy: INT = FS.BytesForPages[200];
BEGIN
ENABLE
BEGIN
IO.Error =>
{
why: ATOM = AlpineFS.ErrorFromStream[stream].code;
IF why = $transAborted THEN GOTO exit;
IF why = $remoteCallFailed
THEN {
exp ← AlpineFS.ErrorFromStream[stream].explanation;
GOTO exit;
};
REJECT;
};
FS.Error =>
{
IF error.code = $transAborted THEN GOTO exit;
IF error.code = $remoteCallFailed
THEN {
exp ← error.explanation;
GOTO exit;
};
REJECT;
};
AlpFile.AccessFailed => { abort ← FALSE; GOTO exit };
END;
SELECT which
FROM
newMail => {
WalnutStream.endCopyNewMailInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyNewMailInfo;
};
readArchive => {
WalnutStream.endCopyReadArchiveInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyReadArchiveInfo
};
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF tempStream =
NIL
THEN
[writeStream, tempStream] ← WalnutRoot.GetStreamsForCopy[which];
bytesToCopy ← tempStream.GetLength[] - fromPos;
IF bytesToCopy > 0
THEN {
first: BOOL ← TRUE;
tempStream.SetIndex[fromPos];
writeStream.SetIndex[writeStream.GetLength[]];
DO
thisCopy: INT ← MIN[bytesPerCopy, bytesToCopy];
WalnutStream.CopyBytes[to: writeStream, from: tempStream, num: thisCopy];
WalnutStream.FlushStream[writeStream, TRUE];
WalnutRoot.CommitAndContinue[];
IF first THEN first ← FALSE ELSE reportProc["#"];
bytesToCopy ← bytesToCopy - thisCopy;
IF bytesToCopy = 0 THEN EXIT;
ENDLOOP;
};
[] ← WalnutStream.WriteEntry[writeStream, endLE];
WalnutStream.FlushStream[writeStream, TRUE];
EXITS
exit => {
who: ROPE ← IF which = newMail THEN "NewMail" ELSE "ReadArchive";
IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who];
WalnutRoot.AbortTempCopy[which, tempStream];
WalnutRoot.CloseTransaction[];
writeStream ← readStream ← tempStream ← NIL;
ERROR WalnutDefs.Error[
$log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed,
IO.PutFR["Copying the %g log", IO.rope[who]] ];
};
END;
tempStream ← NIL;
WalnutRoot.FinishCopy[which];
};
FinishTempLogCopy:
PUBLIC
PROC[which: WalnutKernelDefs.WhichTempLog] =
{ WalnutRoot.FinishCopy[which] };
CreateArchiveLog:
PUBLIC PROC[
fileToRead:
STREAM, msgSet:
ROPE, reportProc:
PROC[msg1, msg2, msg3:
ROPE ←
NIL]]
RETURNS[ok:
BOOL] = {
reason: ROPE;
[ok, reason] ← WalnutMiscLog.CreateReadArchiveLog[fileToRead, msgSet, reportProc];
IF ok THEN [] ← WriteEntry[WalnutStream.endReadArchiveFile];
WalnutMiscLog.CloseReadArchiveLog[];
};