WalnutLogImpl.mesa
Copyright © 1984 by Xerox Corporation. All rights reserved.
Willie-Sue, November 7, 1985 12:25:38 pm PST
Donahue, January 22, 1986 8:04:31 am PST
Walnut Log Operations Implementation
Last Edited by: Willie-Sue, January 10, 1985 12:27:10 pm PST
Last Edited by: Wert, August 31, 1984 9:49:21 pm PDT
Last Edited by: Donahue, December 11, 1984 10:37:45 am PST
DIRECTORY
AlpFile USING [AccessFailed],
AlpineFS USING [ErrorFromStream],
BasicTime USING [GMT],
FS USING [Error, BytesForPages],
IO,
RefText USING [line, AppendRope],
Rope,
RuntimeError USING [BoundsFault],
ViewerTools USING [TiogaContentsRec, TiogaContents],
WalnutDefs USING [Error],
WalnutKernelDefs USING [LogEntry, WhichTempLog],
WalnutLog USING [],
WalnutMiscLog USING [CloseReadArchiveLog, CreateReadArchiveLog],
WalnutRoot USING [AbortTempCopy, AcquireWriteLock, CommitAndContinue, CloseTransaction, FinishCopy, GetCurrentLogStreams, GetExpungeID, GetStreamsForCopy, PrepareToCopyTempLog, ReleaseWriteLock, StatsReport],
WalnutStream;
WalnutLogImpl: CEDAR PROGRAM
IMPORTS
AlpFile, AlpineFS, FS, IO, RefText, Rope, RuntimeError,
WalnutDefs, WalnutMiscLog, WalnutRoot, WalnutStream
EXPORTS
WalnutLog
= BEGIN
Plan
The log streams provide an abstraction for WalnutOps use in handling the log. WalnutOps procedures deal only with the notion of writing an entry on the current log and reading the entry from a specified position. The log position for a write is always at the end of the current log.
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
LogEntry: TYPE = WalnutKernelDefs.LogEntry;
TiogaContents: TYPE = ViewerTools.TiogaContents;
Signals
WalnutDefs.Error: ERROR[who, code: ATOM, explanation: ROPENIL] = CODE;
Raised for a variety of reasons: the current set of codes used by WalnutLogImpl include
$NotScanning
$InvalidOperation
$UnknownLogFileType
$AccessFailed
$LogInaccessible
$TransactionAbort
Variables
-- the read and write (and temp if open) streams are all under the same transaction; WalnutLogImpl must get them from WalnutRoot
writeStream: STREAMNIL;
tempStream: STREAMNIL;   -- when copying a tempFile
readStream: STREAMNIL;   -- only one log for now
scanning: BOOLFALSE;
currentEntryLengthHint: INT ← -1;
currentEntryPos: INT ← -1;
field1: REF TEXT = NEW[TEXT[RefText.line]];
field2: REF TEXT = NEW[TEXT[RefText.line]];
field3: REF TEXT = NEW[TEXT[RefText.line]];
Procedures
Logging of operations that perform actions on the Walnut database
ExpungeMsgs: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.expungeMsgs] };
WriteExpungeLog: PUBLIC PROC RETURNS[at, next: INT] = {
WalnutStream.writeExpungeLog.internalFileID ← WalnutRoot.GetExpungeID[];
[at, next] ← WriteEntry[WalnutStream.writeExpungeLog];
};
CreateMsgSet: PUBLIC PROC[name: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.createMsgSet.msgSet ← RefText.AppendRope[field1, name];
[at, next] ← WriteEntry[WalnutStream.createMsgSet]
};
EmptyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.emptyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.emptyMsgSet]
};
DestroyMsgSet: PUBLIC PROC[msgSet: ROPE]
RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.destroyMsgSet.msgSet ← RefText.AppendRope[field1, msgSet];
[at, next] ← WriteEntry[WalnutStream.destroyMsgSet]
};
AddMsg: PUBLIC PROC[msg: ROPE, to: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.addMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.addMsg.to ← RefText.AppendRope[field2, to];
[at, next] ← WriteEntry[WalnutStream.addMsg]
};
RemoveMsg: PUBLIC PROC[msg: ROPE, from: ROPE]
RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.removeMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.removeMsg.from ← RefText.AppendRope[field2, from];
[at, next] ← WriteEntry[WalnutStream.removeMsg]
};
MoveMsg: PUBLIC PROC[msg: ROPE, from, to: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.moveMsg.msg ← RefText.AppendRope[field1, msg];
field2.length ← 0;
WalnutStream.moveMsg.from ← RefText.AppendRope[field2, from];
field3.length ← 0;
WalnutStream.moveMsg.to ← RefText.AppendRope[field3, to];
[at, next] ← WriteEntry[WalnutStream.moveMsg]
};
HasBeenRead: PUBLIC PROC[msg: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.hasbeenRead.msg ← RefText.AppendRope[field1, msg];
[at, next] ← WriteEntry[WalnutStream.hasbeenRead]
};
RecordNewMailInfo: PUBLIC PROC[logLen: INT, when: GMT, server: ROPE, num: INT]
RETURNS[at, next: INT] = {
WalnutStream.recordNewMailInfo.logLen ← logLen;
WalnutStream.recordNewMailInfo.when ← when;
field1.length ← 0;
WalnutStream.recordNewMailInfo.server ← RefText.AppendRope[field1, server];
WalnutStream.recordNewMailInfo.num ← num;
[at, next] ← WriteEntry[WalnutStream.recordNewMailInfo]
};
StartCopyNewMail: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyNewMail] };
AcceptNewMail: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.acceptNewMail] };
StartReadArchiveFile: PUBLIC PROC[file: ROPE, msgSet: ROPE] RETURNS[at, next: INT] = {
field1.length ← 0;
WalnutStream.startReadArchiveFile.file ← RefText.AppendRope[field1, file];
field2.length ← 0;
WalnutStream.startReadArchiveFile.msgSet ← RefText.AppendRope[field2, msgSet];
[at, next] ← WriteEntry[WalnutStream.startReadArchiveFile]
};
EndReadArchiveFile: PUBLIC PROC RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.endReadArchiveFile] };
StartCopyReadArchive: PUBLIC PROC[] RETURNS[at, next: INT] =
{ [at, next] ← WriteEntry[WalnutStream.startCopyReadArchive] };
Managing the current log
AcquireWriteLock: PUBLIC PROC = {
Awl: PROC =
{ WalnutRoot.AcquireWriteLock[] };
CarefullyApply[Awl, "AcquireWriteLock"];
};
ForgetLogStreams: PUBLIC PROC =
{ readStream ← writeStream ← NIL };
CloseLogStreams: PUBLIC PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
};
LogLength: PUBLIC PROC RETURNS[length: INT] = {
Llen: PROC =
{ lengthwriteStream.GetLength[] };
CarefullyApply[Llen, "LogLength"];
};
OpenLogStreams: PUBLIC PROC = {
IF writeStream = NIL THEN {
[readStream, writeStream] ← WalnutRoot.GetCurrentLogStreams[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
};
ReleaseWriteLock: PUBLIC PROC = {
IF writeStream = NIL THEN RETURN;
[readStream, writeStream] ← WalnutRoot.ReleaseWriteLock[];
IF scanning THEN readStream.SetIndex[currentEntryPos];
};
ResetLog: PUBLIC PROC[newLength: INT] = {
Rl: PROC = {
writeStream.SetIndex[newLength];
WalnutStream.SetHighWaterMark[writeStream, newLength, -1];
writeStream.SetIndex[newLength];
writeStream.Flush[];
};
CarefullyApply[Rl, "ResetLog"];
};
ReturnCurrentLogStreams: PUBLIC PROC = {
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
ShutdownLog: PUBLIC PROC = {
WalnutRoot.CloseTransaction[];
readStream ← writeStream ← NIL;
scanning ← FALSE;
};
Write a message on the current log
WriteMessage: PUBLIC PROC[msg: ROPE, body: TiogaContents] RETURNS[at: INT]= {
WriteMsg: PROC[] = {
writeStream.SetIndex[writeStream.GetLength[]];
field1.length ← 0;
WalnutStream.createMsg.msg ← RefText.AppendRope[field1, msg];
IF body.formatting.Length[] # 0 THEN {
last: INT ← body.contents.Length[] - 1;
IF body.contents.Fetch[last] = '\000 THEN  -- NULL for padding
{ body.contents ← Rope.Substr[body.contents, 1, last-1];
body.formatting ← Rope.Concat["\000", body.formatting]
}
ELSE body.contents ← Rope.Substr[body.contents, 1];
};
WalnutStream.createMsg.textLen ← body.contents.Length[];
WalnutStream.createMsg.formatLen ← body.formatting.Length[];
at ← WalnutStream.WriteEntry[writeStream, WalnutStream.createMsg];
WalnutStream.WriteMsgBody[writeStream, body];
WalnutStream.FlushStream[strm: writeStream, setCreateDate: TRUE];
};
CarefullyApply[WriteMsg, "WriteMessage"];
};
Parsing a log (used by Archive and Replay/Scavenge
SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = {
next: INT;
SetPs: PROC[] = {
readStream.SetIndex[startPos];
next ← WalnutStream.FindNextEntry[readStream];
currentEntryLengthHint ← -1;
IF next # -1 THEN currentEntryPos ← next;
scanning ← TRUE;
};
CarefullyApply[SetPs, "SetPosition"];
RETURN[IF next = -1 THEN next ELSE next - startPos];
};
SetIndex: PUBLIC PROC[pos: INT] = {
Si: PROC[] = {
readStream.SetIndex[pos];
currentEntryLengthHint ← -1;
currentEntryPos ← pos;
scanning ← TRUE;
};
CarefullyApply[Si, "SetIndex"];
};
NextAt: PUBLIC PROC RETURNS[at: INT] =
{ at ← currentEntryPos };
NextEntry: PUBLIC PROC RETURNS[le: LogEntry, at: INT] =
{ [le, at] ← NextEntryInternal[FALSE, "NextEntry"] };
NextEntryInternal: PROC[quick: BOOL, who: ROPE] RETURNS[le: LogEntry, at: INT] = {
Ne: PROC[] = {
length: INT;
[le, length] ← WalnutStream.ReadEntry[readStream, quick];
at ← currentEntryPos;
currentEntryLengthHint ← -1;
IF length = -1 THEN {
IF at = readStream.GetLength[] THEN RETURN;
at ← -1; RETURN
};
currentEntryPos ← at + length;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ne, who];
};
QuickScan: PUBLIC PROC RETURNS[le: LogEntry, at: INT] =
{ [le, at] ← NextEntryInternal[quick: TRUE, who: "QuickScan"] };
ArchiveEntry: PUBLIC PROC[to: IO.STREAM] RETURNS[ok: BOOL]= {
Copies the next entry of the current log to an archive stream.
Ane: PROC[] = {
ok ← FALSE;
IF currentEntryLengthHint = -1 THEN
currentEntryLengthHint ←
WalnutStream.PeekEntry[readStream].length;
IF currentEntryLengthHint = -1 THEN RETURN;
WalnutStream.CopyBytes[
from: readStream, to: to, num: currentEntryLengthHint];
currentEntryPos ← readStream.GetIndex[];
currentEntryLengthHint ← -1;  -- unknown
ok ← TRUE;
};
IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning];
CarefullyApply[Ane, "ArchiveEntry"];
};
CopyBytesToArchive: PUBLIC PROC[to: IO.STREAM, startPos, num: INT] = {
Cba: PROC = {
readStream.SetIndex[startPos];
WalnutStream.CopyBytes[from: readStream, to: to, num: num];
scanning ← FALSE;
};
CarefullyApply[Cba, "CopyBytesToArchive"];
};
Copying NewMail/ReadArchive logs
PrepareToCopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPENIL]] RETURNS[BOOL] = {
makes sure the writeStream is long enought to copy the which log onto
[writeStream, tempStream] ← WalnutRoot.PrepareToCopyTempLog[which, pagesAlreadyCopied, reportProc];
RETURN[tempStream # NIL];
};
CopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos: INT, reportProc: PROC[msg1, msg2, msg3: ROPENIL] ] = {
copies the which templog, starting at fromPos, to the end of the writeStream; writes an EndCopy log entry, and sets the length of which to 0; if the operation fails, WalnutDefs.Error is raised; NOT done inside carefullyApply, so that it can catch transaction aborts
exp: ROPE;
endLE: LogEntry;
abort: BOOLTRUE;
ok: BOOLFALSE;
bytesToCopy: INT;
bytesPerCopy: INT = FS.BytesForPages[200];
BEGIN ENABLE BEGIN
IO.Error => {
why: ATOM = AlpineFS.ErrorFromStream[stream].code;
IF why = $transAborted THEN GOTO exit;
IF why = $remoteCallFailed THEN {
exp ← AlpineFS.ErrorFromStream[stream].explanation;
GOTO exit;
};
REJECT;
};
FS.Error => {
IF error.code = $transAborted THEN GOTO exit;
IF error.code = $remoteCallFailed THEN {
exp ← error.explanation;
GOTO exit;
};
REJECT;
};
AlpFile.AccessFailed => { abort ← FALSE; GOTO exit };
END;
SELECT which FROM
newMail => {
WalnutStream.endCopyNewMailInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyNewMailInfo;
};
readArchive => {
WalnutStream.endCopyReadArchiveInfo.startCopyPos ← startCopyPos;
endLE ← WalnutStream.endCopyReadArchiveInfo
};
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF tempStream = NIL THEN
[writeStream, tempStream] ← WalnutRoot.GetStreamsForCopy[which];
bytesToCopy ← tempStream.GetLength[] - fromPos;
IF bytesToCopy > 0 THEN {
first: BOOLTRUE;
tempStream.SetIndex[fromPos];
writeStream.SetIndex[writeStream.GetLength[]];
DO
thisCopy: INTMIN[bytesPerCopy, bytesToCopy];
WalnutStream.CopyBytes[to: writeStream, from: tempStream, num: thisCopy];
WalnutStream.FlushStream[writeStream, TRUE];
WalnutRoot.CommitAndContinue[];
IF first THEN first ← FALSE ELSE reportProc["#"];
bytesToCopy ← bytesToCopy - thisCopy;
IF bytesToCopy = 0 THEN EXIT;
ENDLOOP;
};
[] ← WalnutStream.WriteEntry[writeStream, endLE];
WalnutStream.FlushStream[writeStream, TRUE];
EXITS
exit => {
who: ROPEIF which = newMail THEN "NewMail" ELSE "ReadArchive";
IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who];
WalnutRoot.AbortTempCopy[which, tempStream];
WalnutRoot.CloseTransaction[];
writeStream ← readStream ← tempStream ← NIL;
ERROR WalnutDefs.Error[
$log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed,
IO.PutFR["Copying the %g log", IO.rope[who]] ];
};
END;
tempStream ← NIL;
WalnutRoot.FinishCopy[which];
};
FinishTempLogCopy: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog] =
{ WalnutRoot.FinishCopy[which] };
CreateArchiveLog: PUBLIC PROC[
fileToRead: STREAM, msgSet: ROPE, reportProc: PROC[msg1, msg2, msg3: ROPENIL]]
RETURNS[ok: BOOL] = {
reason: ROPE;
[ok, reason] ← WalnutMiscLog.CreateReadArchiveLog[fileToRead, msgSet, reportProc];
IF ok THEN [] ← WriteEntry[WalnutStream.endReadArchiveFile];
WalnutMiscLog.CloseReadArchiveLog[];
};
Utilities
GetTiogaContents: PUBLIC PROC[textStart, textLen, formatLen: INT]
RETURNS[contents: TiogaContents] = {
TContents: PROC[] = {
readStream.SetIndex[textStart];
scanning ← FALSE;
contents ← NEW[ViewerTools.TiogaContentsRec];
contents.contents ← WalnutStream.ReadRope[readStream, textLen];
contents.formatting ← WalnutStream.ReadRope[readStream, formatLen];
};
IF formatLen # 0 THEN  -- tioga formatting nonsense
{ textLen ← textLen + 1; textStart ← textStart - 1};
CarefullyApply[TContents, "GetTiogaContents"];
};
GetRefTextFromLog: PUBLIC PROC[startPos, length: INT, text: REF TEXT] = {
natLen: NAT;
Grf: PROC = {
bytesRead: NAT;
readStream.SetIndex[startPos];
scanning ← FALSE;
bytesRead ← readStream.GetBlock[block: text, startIndex: 0, count: natLen];
IF bytesRead < natLen THEN text.length ← 0;
};
CheckSize: PROC[len: INT] RETURNS[nat: NAT] = { nat ← len };
natLen ← CheckSize[length ! RuntimeError.BoundsFault => GOTO noGood];
CarefullyApply[Grf, "GetRefTextFromLog"];
EXITS
noGood =>
ERROR WalnutDefs.Error[$log, $BadLength,
IO.PutFR[" Can't convert %g into a nat", IO.int[length] ]];
};
Local Utilities
WriteEntry: PROC[le: LogEntry] RETURNS[at, next: INT] = {
We: PROC = {
at ← WalnutStream.WriteEntry[writeStream, le];
WalnutStream.FlushStream[writeStream, TRUE];
next ← writeStream.GetIndex[];
};
CarefullyApply[We, "WriteEntry"];
};
curLog: ROPE = "On currentLog, during ";
CarefullyApply: PROC[ proc: PROC[], who: ROPE ] = {
exp: ROPE;
BEGIN ENABLE BEGIN
IO.Error => {
why: ATOM = AlpineFS.ErrorFromStream[stream].code;
IF why = $transAborted THEN GOTO aborted;
IF why = $remoteCallFailed THEN {
exp ← AlpineFS.ErrorFromStream[stream].explanation;
GOTO failed;
};
REJECT;
};
FS.Error => {
IF error.code = $transAborted THEN GOTO aborted;
IF error.code = $remoteCallFailed THEN {
exp ← error.explanation;
GOTO failed;
};
REJECT;
};
AlpFile.AccessFailed =>
IF missingAccess = spaceQuota THEN GOTO outOfSpace ELSE GOTO exit;
END;
IF readStream = NIL THEN  -- test read, not write stream, in case readOnly
ERROR WalnutDefs.Error[$log, $LogNotOpen, curLog.Concat[who]];
proc[];
EXITS
aborted => {
WalnutRoot.StatsReport[Rope.Concat["\n**** trans abort on currentLog doing", who]];
ERROR WalnutDefs.Error[$log, $TransactionAbort, curLog.Concat[who]];
};
failed => {
WalnutRoot.StatsReport[Rope.Concat["\n**** remote call failed on currentLog doing", who]];
ERROR WalnutDefs.Error[$log, $RemoteCallFailed, exp];
};
outOfSpace =>
ERROR WalnutDefs.Error[$log, $OutOfSpace, curLog.Concat[who]];
exit =>
ERROR WalnutDefs.Error[$log, $OtherAlpineMissingAccess, curLog.Concat[who]];
END;
};
END.