-- WalnutRootImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Willie-Sue, January 6, 1986 4:30:40 pm PST
-- Rick Beach, June 21, 1985 4:36:05 pm PDT
-- Donahue, January 28, 1986 5:12:30 pm PST
-- Walnut Root Operations Implementation
DIRECTORY
AlpineEnvironment USING [VolumeID],
AlpFile USING [LockOption, UnlockFile, Handle, Close],
AlpineFile USING [Unknown],
AlpineFS USING [OpenFile, FileOptions, StreamOptions, GetAlpFileHandle,
 Open, OpenFileFromStream, SetLockOption, StreamFromOpenFile,
 StreamFromOpenStream],
AlpInstance USING [Failed, FileStore, Handle, Create],
AlpTransaction USING [Handle, OperationFailed, Outcome, Create, Finish, GetNextVolumeGroup, UnlockOwnerDB, VolumeGroupID],
BasicTime USING [GMT, nullGMT, Now],
Convert USING [IntFromRope, RopeFromInt],
DB USING [Aborted, Error, Failure,
 AbortCache, DeclareSegment, EndTransaction, EraseSegment, FlushCache,
 GetSegmentInfo,
 Initialize, OpenTransaction],
DBEnvironment USING [Error, Failure],
FS USING [ComponentPositions, Error, nullOpenFile,
 Close, GetInfo, ExpandName, OpenFile, PagesForBytes, SetPageCount,
 StreamBufferParms, SetByteCountAndCreatedTime],
IO,
Rope,
RPC USING [CallFailed],
WalnutDefs USING [Error, Segment],
WalnutKernelDefs USING [LogInfoFromRoot, RootEntry, SegmentID, WhichTempLog],
WalnutRoot USING [],
WalnutStream USING [FlushStream, SetHighWaterMark];
WalnutRootImpl: CEDAR MONITOR
IMPORTS
AlpFile, AlpineFile, AlpineFS, AlpInstance, AlpTransaction,
BasicTime, Convert, DB, DBEnvironment, FS, IO, Rope, RPC,
WalnutDefs, WalnutStream
EXPORTS
WalnutRoot
= BEGIN
-- Plan
-- The root file provides the path names for all the other files used by Walnut, plus other information about the database. The only time the rootFile is written is at the end of an expunge, when the currentLog and expungeLog are swapped.
-- Types
-- Short local names for types defined in other interfaces.
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog;
-- Alpine connection state kept in the RootLog: the file store name derived from the root file name (see FileStoreForRoot) and the AlpInstance handle that CreateTrans creates lazily for it.
TransInfo: TYPE = RECORD [
fileStore: AlpInstance.FileStore,
handle: AlpInstance.Handle];
-- Value used as the default for RootLogObject.transInfo.
nullTransInfo: TransInfo ← [NIL, NIL];
-- State kept for each of the two logs listed as "LogInfo" entries in the root file (the current log and the expunge log).
InternalLogInfo: TYPE = REF InternalLogInfoObject;
InternalLogInfoObject: TYPE = RECORD [
  readStream: STREAM,        -- not used for expungeLog
  writeStream: STREAM,
  name: ROPE ← NIL,          -- rope name of this log
  internalFileID: INT,
  logSeqNo: INT ← -1,        -- Open and SwapLogs set this to -1 for the expunge log
  logSeqNoPos: INT ← -1      -- position in rootLog of logSeqNo
  ];
-- State kept for the newMail and readArchive logs.
MiscLogInfo: TYPE = REF MiscLogInfoObject;
MiscLogInfoObject: TYPE = RECORD [
  stream: STREAM,                               -- NIL when the log is not currently open
  name: ROPE ← NIL,
  trans: AlpTransaction.Handle ← NIL,           -- transaction the stream was opened under
  rootOpenFile: FS.OpenFile ← FS.nullOpenFile   -- may have its own transaction
  ];
-- Everything this module knows about the currently open root file; a single instance is held in the module variable rootLog.
RootLog: TYPE = REF RootLogObject;
RootLogObject: TYPE = RECORD [
  name: ROPE,                                    -- name of the root file, as given to Open
  createDate: GMT ← BasicTime.nullGMT,           -- create date of the root file when last read or written; compared by CheckStamp
  readOnly: BOOL ← FALSE,
  keyFromRootFile: ROPE ← NIL,
  mailForFromRootFile: ROPE ← NIL,
  dbNameFromRootFile: ROPE ← NIL,
  segmentID: WalnutKernelDefs.SegmentID ← defaultSegmentID,
  transInfo: TransInfo ← nullTransInfo,
  alpTH: AlpTransaction.Handle ← NIL,            -- current transaction
  rootOpenFileForLogs: FS.OpenFile ← FS.nullOpenFile,   -- root file held open with a read lock (see GetRootReadLock)
  currentLog: InternalLogInfo ← NIL,             -- only one log for now
  expungeLog: InternalLogInfo ← NIL,             -- set to NIL by Open when the root is read-only
  newMailLog: MiscLogInfo ← NIL,                 -- only created by Open when the root is writable
  readArchiveLog: MiscLogInfo ← NIL              -- only created by Open when the root is writable
  ];
-- Layout used by TransIDToRope to LOOPHOLE an Alpine transaction id so that its idOnFileStore field can be printed in debugging output.
TransID: TYPE = MACHINE DEPENDENT RECORD [  -- taken from ConcreteTransID
randomBits: INT,
idOnFileStore: INT,
fileStore: AlpineEnvironment.VolumeID
];
-- Default for RootLogObject.segmentID; Open currently assigns the same value explicitly.
defaultSegmentID: WalnutKernelDefs.SegmentID ← [$Walnut, 0];
-- Variables
rootLog: RootLog ← NIL;              -- the open root; NIL when no root file is open
statsProc: PROC[ROPE] ← NIL;         -- client callback used by StatsReport, set by RegisterStatsProc
seriousDebugging: BOOL ← FALSE;      -- when TRUE, transaction ids are reported and CheckDBTrans is run
BogusDBTrans: SIGNAL = CODE;         -- raised when the DB segment's transaction is not the expected one
currentVolumeGroupID: AlpTransaction.VolumeGroupID;   -- set by Open; passed to AlpTransaction.UnlockOwnerDB
-- Procedures
-- Opening/closing the rootFile
-- for opening the rootFile
fileOptions: AlpineFS.FileOptions = [
  updateCreateTime: FALSE,  -- is done explicitly
  referencePattern: sequential,
  recoveryOption: $log,
  finishTransOnClose: FALSE];

-- for the stream for the rootFile - it is a small file
-- NOTE(review): nothing in this module refers to rootBufferOption; the root-file streams in Open and SwapLogs are created with streamBufferOption - confirm which was intended.
rootBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 1];

-- Stream options used for every stream this module creates.
alpineStreamOptions: AlpineFS.StreamOptions ← [
  tiogaRead: FALSE,
  commitAndReopenTransOnFlush: FALSE,  -- just flush
  truncatePagesOnClose: FALSE,
  finishTransOnClose: FALSE,
  closeFSOpenFileOnClose: TRUE -- Note: StreamOpen overrides this option when opening temp logs for copies, so that we can Unlock the file before closing it
  ];
Open: PUBLIC PROC[ rootName: ROPE, readOnly: BOOL ← FALSE, newSegmentOk: BOOL ← FALSE] RETURNS[ key, mailFor: ROPE, rootFileCreateDate: GMT, segment: WalnutDefs.Segment, isReadOnly: BOOL] = {
  -- Opens and parses the root file rootName, builds rootLog from its entries, and declares the database segment.
  -- Returns the Key and MailFor entries, the root file's create date, the DB segment, and whether the root file could only be opened for reading.
  -- Raises WalnutDefs.Error: $RootAlreadyOpen if a root is already open; $RootNotFound or the FS error code if the file cannot be opened; $BadRootFile if it cannot be parsed, has an unknown entry, is missing entries, or names files on a different server than the root file.
  -- NOTE(review): the readOnly argument is stored in the new RootLogObject but then overwritten with isReadOnly, which only becomes TRUE when the write-access probe fails with $accessDenied - confirm this is intended.
  -- NOTE(review): the errors raised while the root file is first being opened (before parsing) leave rootLog set and its transaction open; callers presumably must call Shutdown before retrying.
  newMailLogName, readArchiveLogName: ROPE;
  logInfoList: LIST OF WalnutKernelDefs.LogInfoFromRoot;
  fileName, dbName: ROPE;

  isReadOnly ← FALSE;
  IF rootLog # NIL THEN
    ERROR WalnutDefs.Error[$root, $RootAlreadyOpen, "Must do a Close"];

  IF seriousDebugging THEN {
    trans: AlpTransaction.Handle ←
      NARROW[DB.GetSegmentInfo[$Walnut].trans];
    IF trans # NIL THEN SIGNAL BogusDBTrans;
    DB.AbortCache[trans];
    };

  BEGIN
  logInfoCount: INT ← 0;
  reason: ATOM;
  openFile: FS.OpenFile;
  rStream: STREAM;

  rootLog ← NEW[RootLogObject ← [name: rootName, readOnly: readOnly]];
  rootLog.transInfo ← [fileStore: FileStoreForRoot[rootName]];
  rootLog.alpTH ← CreateTrans[];
  currentVolumeGroupID ← AlpTransaction.GetNextVolumeGroup[rootLog.alpTH];
  rootLog.segmentID ← [$Walnut, 0];  -- eventually from rootFile

  -- probe for write access; $accessDenied just means the root is read-only
  BEGIN ENABLE FS.Error => { reason ← error.code; GOTO error };
  openFile ← AlpineFS.Open[
    name: rootName, access: $write, lock: [$write, $wait],
    options: fileOptions, transHandle: rootLog.alpTH];
  FS.Close[openFile];
  EXITS error =>
    IF reason = $unknownFile THEN
      ERROR WalnutDefs.Error[$root, $RootNotFound, rootName]
    ELSE
      IF reason = $accessDenied THEN isReadOnly ← TRUE
      ELSE ERROR WalnutDefs.Error[$root, reason, rootName];
  END;

  rootLog.readOnly ← isReadOnly;

  -- open the root file for reading and remember its create date
  BEGIN ENABLE FS.Error => { reason ← error.code; GOTO error };
  openFile ← AlpineFS.Open[
    name: rootName, access: $read, lock: [$read, $wait],
    options: fileOptions, transHandle: rootLog.alpTH];
  rStream ← AlpineFS.StreamFromOpenFile[
    openFile: openFile,
    streamOptions: alpineStreamOptions,
    streamBufferParms: streamBufferOption];
  rootFileCreateDate ← rootLog.createDate ← FS.GetInfo[openFile].created;
  EXITS error =>
    ERROR WalnutDefs.Error[$root, reason, rootName];
  END;

  -- parse the root file to get information about the database, the newMail, readArchive, current and expunge logs
  BEGIN ENABLE IO.EndOfStream, IO.Error => GOTO cantParse;
  finished: BOOL ← FALSE;
  rStream.SetIndex[0];
  DO
    curPos: INT ← rStream.GetIndex[];
    rootEntry: WalnutKernelDefs.RootEntry ← NextRootEntry[rStream];
    DO  -- executed once; each recognized entry EXITs, an unrecognized one falls through to the error
      IF rootEntry.ident.Equal["Key", FALSE] THEN {
        key ← rootEntry.value; EXIT
        };
      IF rootEntry.ident.Equal["MailFor", FALSE] THEN {
        mailFor ← rootEntry.value; EXIT
        };
      IF rootEntry.ident.Equal["Database", FALSE] THEN {
        dbName ← rootEntry.value; EXIT
        };
      IF rootEntry.ident.Equal["NewMailLog", FALSE] THEN {
        newMailLogName ← rootEntry.value; EXIT
        };
      IF rootEntry.ident.Equal["ReadArchiveLog", FALSE] THEN {
        readArchiveLogName ← rootEntry.value; EXIT
        };
      IF rootEntry.ident.Equal["LogInfo", FALSE] THEN {
        logInfoList ← CONS[rootEntry.info, logInfoList];
        logInfoCount ← logInfoCount + 1;
        EXIT
        };
      IF rootEntry.ident.Equal["End", FALSE] THEN {
        finished ← TRUE;
        EXIT
        };
      rStream.Close[ ! IO.Error, FS.Error => CONTINUE];
      FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
      rootLog ← NIL;
      ERROR WalnutDefs.Error[$log, $BadRootFile,
        IO.PutFR["Unknown entry %g in RootFile at pos %g",
          IO.rope[rootEntry.ident], IO.int[curPos]]];
      ENDLOOP;
    IF finished THEN EXIT;
    ENDLOOP;
  EXITS
    cantParse => {
      rStream.Close[ ! IO.Error, FS.Error => CONTINUE];
      FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
      rootLog ← NIL;
      ERROR WalnutDefs.Error[$log, $BadRootFile, "Couldn't parse root file"];
      };
  END;

  rStream.Close[];
  rStream ← NIL;  -- not needed

  IF key.Length[] = 0 OR dbName.Length[] = 0 OR mailFor.Length[] = 0 OR
    newMailLogName.Length[] = 0 OR readArchiveLogName.Length[] = 0 OR
    logInfoCount # 2 THEN {
    -- clean up exactly as the other $BadRootFile paths do, so that a later Open is not refused with $RootAlreadyOpen
    FS.Close[openFile ! FS.Error => CONTINUE];
    FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
    rootLog ← NIL;
    ERROR WalnutDefs.Error[
      $root, $BadRootFile, "Incomplete RootFile - missing entries"];
    };

  -- check that all the alpine-server names are the same
  BEGIN
  cp: FS.ComponentPositions;
  fn: ROPE;
  rServer, oServer: ROPE;
  CheckServer: PROC[name: ROPE] = {
    [fn, cp, ] ← FS.ExpandName[name];
    oServer ← Rope.Substr[fn, cp.server.start, cp.server.length];
    IF ~Rope.Equal[rServer, oServer, FALSE] THEN {
      rName: ROPE ← rootLog.name;
      FS.Close[openFile ! FS.Error => CONTINUE];
      FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
      rootLog ← NIL;
      ERROR WalnutDefs.Error[$root, $BadRootFile,
        IO.PutFR["server for file %g does not agree with that for %g",
          IO.rope[name], IO.rope[rName]] ];
      };
    };
  [fn, cp, ] ← FS.ExpandName[rootLog.name];
  rServer ← Rope.Substr[fn, cp.server.start, cp.server.length];
  CheckServer[dbName];
  CheckServer[newMailLogName];
  CheckServer[readArchiveLogName];
  CheckServer[logInfoList.first.fileName];   -- exactly two LogInfo entries were verified above
  CheckServer[logInfoList.rest.first.fileName];
  END;

  rootLog.keyFromRootFile ← key;
  rootLog.mailForFromRootFile ← mailFor;
  rootLog.dbNameFromRootFile ← dbName;

  FS.Close[openFile ! FS.Error => CONTINUE];
  FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
  rootLog.alpTH ← NIL;
  END;

  -- set up the internal log objects for all the logs
  FOR lil: LIST OF WalnutKernelDefs.LogInfoFromRoot ← logInfoList, lil.rest UNTIL lil = NIL DO
    seqNo: ROPE ← lil.first.logSeqNo;
    fileName ← lil.first.fileName;
    IF seqNo.Fetch[0] # 'E THEN {  -- this is the current log
      rootLog.currentLog ← NEW[InternalLogInfoObject ← [
        name: fileName,
        internalFileID: lil.first.internalFileID,
        logSeqNo: Convert.IntFromRope[seqNo],
        logSeqNoPos: lil.first.seqNoPos
        ]];
      }
    ELSE   -- this is the expunge log
      rootLog.expungeLog ← NEW[InternalLogInfoObject ← [
        name: fileName,
        internalFileID: lil.first.internalFileID,
        logSeqNo: -1,
        logSeqNoPos: lil.first.seqNoPos
        ]];
    ENDLOOP;

  IF ~isReadOnly THEN {
    rootLog.newMailLog ← NEW[MiscLogInfoObject ← [name: newMailLogName]];
    rootLog.readArchiveLog ← NEW[MiscLogInfoObject ← [name: readArchiveLogName]];
    }
  ELSE rootLog.expungeLog ← NIL;  -- is readOnly, can't have expungeLog

  segment ← rootLog.segmentID.segment;
  DeclareDBSegment[rootLog, newSegmentOk];
  };
-- Starts a fresh transaction for the open root.
-- If a transaction is already recorded in rootLog.alpTH it is ended first: the DB transaction is ended (falling back to DB.AbortCache if that fails), the root file held for the logs is closed, and the Alpine transaction is committed, ignoring WalnutDefs.Error.
-- Then a new transaction holding a read lock on the root file is obtained from GetRootReadLock and the database transaction is opened under it.
-- NOTE(review): rootLog is dereferenced without a NIL check; presumably Open must have succeeded first.
StartTransaction: PUBLIC PROC = {
IF rootLog.alpTH # NIL THEN {
BEGIN
IF seriousDebugging THEN CheckDBTrans["StartTransaction"];
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err];
EXITS
err =>
DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
END;
FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: FALSE !
WalnutDefs.Error => CONTINUE];
};
-- GetRootReadLock also verifies the root file's create date (CheckStamp)
[rootLog.rootOpenFileForLogs, rootLog.alpTH] ← GetRootReadLock[];
OpenDBTrans[rootLog];
};
Shutdown: PUBLIC PROC = {
  -- Closes every log stream, ends the database transaction, commits the Alpine transaction and forgets the open root (rootLog becomes NIL). Does nothing if no root is open.
  -- The newMail log's own transaction, if any, is aborted rather than committed (abortIfExternal: TRUE).
  IF rootLog = NIL THEN RETURN;
  CloseInternalLog[rootLog.currentLog];
  CloseInternalLog[rootLog.expungeLog];
  CloseMiscLog[mli: rootLog.readArchiveLog];
  CloseMiscLog[mli: rootLog.newMailLog, abortIfExternal: TRUE];

  IF seriousDebugging THEN CheckDBTrans["Shutdown"];
  DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];

  -- don't care about errors from FinishTrans
  FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
  FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: FALSE ! WalnutDefs.Error => CONTINUE];
  rootLog ← NIL;
  };
-- Ends the current transaction but keeps the root open (rootLog is retained, unlike Shutdown).
-- Closes the current, expunge and readArchive log streams (the newMail log is left alone), ends the DB transaction (falling back to DB.AbortCache on failure), closes the root file held for the logs, and commits the Alpine transaction, ignoring WalnutDefs.Error. Does nothing if no root is open.
CloseTransaction: PUBLIC PROC = {
IF rootLog = NIL THEN RETURN;
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
CloseMiscLog[rootLog.readArchiveLog];
IF seriousDebugging THEN CheckDBTrans["CloseTransaction"];
BEGIN
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err];
EXITS
err => DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
END;
FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
rootLog.rootOpenFileForLogs ← FS.nullOpenFile;
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
-- leave no transaction recorded, so StartTransaction skips its end-of-transaction step
rootLog.alpTH ← NIL;
};
CommitAndContinue: PUBLIC PROC[flushDB: BOOL ← TRUE] = {
  -- Commits the current transaction and continues with it.
  -- When flushDB is TRUE the database cache is flushed first; a DB failure there is reported as WalnutDefs.Error[$db, ...] with a code naming the kind of failure.
  -- FinishTrans may raise WalnutDefs.Error if the commit fails.
  eCode: ATOM;
  BEGIN ENABLE BEGIN
    DB.Aborted => {eCode ← $TransactionAbort; GOTO error };
    DB.Error => {eCode ← $DBError; GOTO error };
    DB.Failure => {eCode ← $DatabaseInaccessible; GOTO error };
    END;
  IF seriousDebugging THEN CheckDBTrans["CommitAndContinue"];
  IF flushDB THEN DB.FlushCache[rootLog.alpTH];
  EXITS
    error => ERROR WalnutDefs.Error[$db, eCode, "Can't flush"];
  END;

  FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: TRUE];
  -- release the owner-database lock for the volume group recorded by Open
  AlpTransaction.UnlockOwnerDB[rootLog.alpTH, currentVolumeGroupID]
  };
-- Aborts the current transaction: closes the current, expunge and readArchive log streams, aborts the database cache, and finishes the Alpine transaction with abort: TRUE.
-- Does nothing if no root is open or no transaction is recorded (the latter is reported when seriousDebugging).
-- Errors from DB.AbortCache and FinishTrans are not caught here.
-- NOTE(review): unlike CloseTransaction, rootLog.rootOpenFileForLogs is neither closed nor reset here - confirm that aborting the transaction makes that unnecessary.
AbortTransaction: PUBLIC PROC = {
IF rootLog = NIL THEN RETURN;
IF rootLog.alpTH = NIL THEN {
IF seriousDebugging THEN StatsReport["\nAbort of NIL trans"]; RETURN };
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
CloseMiscLog[rootLog.readArchiveLog];
IF seriousDebugging THEN CheckDBTrans["AbortTransaction"];
DB.AbortCache[rootLog.alpTH];
FinishTrans[alpTH: rootLog.alpTH, abort: TRUE, continue: FALSE];
rootLog.alpTH ← NIL;
};
EraseDB: PUBLIC PROC = {
  -- Erases the Walnut database segment, leaving the transaction open.
  -- Ends the current DB transaction (ignoring failures), re-declares the segment as writable, then erases it.
  -- Raises WalnutDefs.Error[$root, $RootNotOpen] if no root is open, and WalnutDefs.Error[$db, code, "Can't erase"] if the erase fails, where code distinguishes $DBError, $DatabaseInaccessible and $TransactionAbort; the DB cache is aborted before that error is raised.
  code: ATOM;
  IF rootLog = NIL THEN
    ERROR WalnutDefs.Error[$root, $RootNotOpen, "Trying to do an EraseDB"];

  IF seriousDebugging THEN CheckDBTrans["EraseDB"];
  DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
  DB.DeclareSegment[
    filePath: rootLog.dbNameFromRootFile,
    segment: rootLog.segmentID.segment,
    number: rootLog.segmentID.index,
    readonly: FALSE, nPagesInitial: 256, nPagesPerExtent: 256];

  BEGIN ENABLE BEGIN
    -- each signal gets its own code, matching CommitAndContinue; DB.Failure used to be listed in the first arm as well, which made the $DatabaseInaccessible arm unreachable
    DB.Error => { code ← $DBError; GOTO error};
    DB.Failure => { code ← $DatabaseInaccessible; GOTO error};
    DB.Aborted => { code ← $TransactionAbort; GOTO error};
    END;
  DB.EraseSegment[rootLog.segmentID.segment, rootLog.alpTH];  -- leaves trans open
  EXITS
    error => {
      DB.AbortCache[rootLog.alpTH];
      ERROR WalnutDefs.Error[$db, code, "Can't erase"];
      };
  END;
  };
FlushAndContinue: PUBLIC PROC[strm: STREAM] = {
  -- Flushes the newMail log stream (setting its create date) and commits, with continue, the transaction recorded for that log.
  -- strm must be the stream currently recorded for the newMail log; otherwise WalnutDefs.Error[$root, $UnknownStream] is raised.
  newMail: MiscLogInfo ← rootLog.newMailLog;
  IF newMail.stream # strm THEN
    ERROR WalnutDefs.Error[$root, $UnknownStream];
  WalnutStream.FlushStream[strm: strm, setCreateDate: TRUE];
  FinishTrans[alpTH: newMail.trans, abort: FALSE, continue: TRUE]
  };
GetRootFileStamp: PUBLIC PROC RETURNS[rootFileStamp: GMT] = {
  -- Returns the create date of the root file, read under the current transaction with a read lock that fails rather than waits on conflict.
  -- Any FS.Error is converted to WalnutDefs.Error[$root, <FS error code>, <root file name>].
  errCode: ATOM;
  BEGIN ENABLE FS.Error => { errCode ← error.code; GOTO failed };
  rootFile: FS.OpenFile ← AlpineFS.Open[
    name: rootLog.name,
    options: fileOptions,
    lock: [$read, fail],
    transHandle: rootLog.alpTH];
  rootFileStamp ← FS.GetInfo[rootFile].created;
  FS.Close[rootFile];
  EXITS
    failed =>
      ERROR WalnutDefs.Error[$root, errCode, rootLog.name];
  END;
  };
GetExpungeID: PUBLIC PROC RETURNS[expungeFileID: INT] = {
  -- Returns the internal file id recorded for the expunge log.
  expungeFileID ← rootLog.expungeLog.internalFileID;
  };
-- Managing the CurrentLog
AcquireWriteLock: PUBLIC PROC = {
  -- Sets the lock option on the file behind the current log's write stream to [$write, $wait].
  AlpineFS.SetLockOption[
    AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream],
    [$write, $wait] ]
  };
GetCurrentLogStreams: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = {
  -- Opens the current log under the current transaction and returns its streams.
  -- For a read-only root only the read stream is opened; otherwise a write stream is opened and the read stream is derived from it.
  -- Raises WalnutDefs.Error[$log, $TransNotOpen] when no root or no transaction is open.
  log: InternalLogInfo;
  IF rootLog = NIL OR rootLog.alpTH = NIL THEN
    ERROR WalnutDefs.Error[$log, $TransNotOpen, "Must do Shutdown and Startup"];
  log ← rootLog.currentLog;
  IF rootLog.readOnly THEN
    log.readStream ← StreamOpen[name: log.name, readOnly: TRUE, trans: rootLog.alpTH]
  ELSE {
    log.writeStream ← StreamOpen[name: log.name, trans: rootLog.alpTH];
    log.readStream ← AlpineFS.StreamFromOpenStream[log.writeStream];
    };
  readStream ← log.readStream;
  writeStream ← log.writeStream;
  };
ReleaseWriteLock: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = {
  -- Closes the current log's streams and reopens them (non-exclusively) under the same transaction as before, returning the new streams.
  log: InternalLogInfo ← rootLog.currentLog;
  CloseInternalLog[log];
  log.writeStream ← StreamOpen[name: log.name, trans: rootLog.alpTH];
  log.readStream ← AlpineFS.StreamFromOpenStream[log.writeStream];
  readStream ← log.readStream;
  writeStream ← log.writeStream;
  };
ReturnCurrentLogStreams: PUBLIC PROC = {
  -- Closes the streams of the current and expunge logs, then commits and continues the current transaction. Does nothing when no root is open.
  IF rootLog # NIL THEN {
    CloseInternalLog[rootLog.currentLog];
    CloseInternalLog[rootLog.expungeLog];
    FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: TRUE];
    };
  };
-- Managing the ExpungeLog
GetStreamsForExpunge: PUBLIC PROC RETURNS[
  currentStream, expungeStream: STREAM, keyIs: ROPE, expungeFileID, logSeqNo: INT] = {
  -- assert: have a write lock on the currentLog, the expungeLog is not open
  -- (Re)opens the expunge log exclusively under the current transaction and returns what an expunge needs: the current log's read stream, the expunge log's write stream, the key from the root file, the expunge log's file id and the current log's sequence number.
  expunge: InternalLogInfo ← rootLog.expungeLog;
  IF expunge.writeStream # NIL THEN
    expunge.writeStream.Close[ ! IO.Error, FS.Error => CONTINUE];
  expunge.writeStream ← NIL;   -- stays NIL if the open below fails
  expunge.writeStream ← StreamOpen[
    name: expunge.name, trans: rootLog.alpTH, exclusive: TRUE];
  currentStream ← rootLog.currentLog.readStream;
  expungeStream ← expunge.writeStream;
  keyIs ← rootLog.keyFromRootFile;
  expungeFileID ← expunge.internalFileID;
  logSeqNo ← rootLog.currentLog.logSeqNo;   -- *****NOTE FIRST AND LAST VALUES
  };
SwapLogs: PUBLIC PROC[expungeFileID: INT, newRootStamp: GMT] RETURNS[newLogLen: INT] = {
  -- Renames the current log and expunge log so that they are interchanged and rewrites the time stamps in the root file for the logs
  -- The interchange is done by rewriting the two sequence-number fields of the root file in place: "Expunge" is written where the current log's sequence number was, and that sequence number (formatted %07d) where the expunge log's marker was. The root file's create date becomes newRootStamp.
  -- Returns the length of the log that is current after the swap. If the current log already has expungeFileID the swap has already happened and only its length is returned.
  -- Raises WalnutDefs.Error[$expungeLog, $WrongFileID] or [$expungeLog, $LogNotOpen] on a precondition failure, and [$root, <FS error code>, "During SwapLogs"] on any FS.Error.
  tempLog: InternalLogInfo;
  reason: ATOM;

  IF rootLog.currentLog.internalFileID = expungeFileID THEN  -- already did swap
    RETURN[rootLog.currentLog.writeStream.GetLength[]];

  IF rootLog.expungeLog.internalFileID # expungeFileID THEN
    ERROR WalnutDefs.Error[$expungeLog, $WrongFileID,
      IO.PutFR["FileID is: %g, FileID expected is: %g",
        IO.int[rootLog.expungeLog.internalFileID], IO.int[expungeFileID]] ];

  IF rootLog.expungeLog.writeStream = NIL THEN
    ERROR WalnutDefs.Error[$expungeLog, $LogNotOpen, "Doing SwapLogs"];

  BEGIN ENABLE FS.Error => { reason ← error.code; GOTO error };
  of: FS.OpenFile ← AlpineFS.Open[
    name: rootLog.name, wantedCreatedTime: rootLog.createDate,
    access: $write, lock: [$write, $wait], transHandle: rootLog.alpTH];
  rStream: STREAM ← AlpineFS.StreamFromOpenFile[
    openFile: of, accessRights: $write,
    streamOptions: alpineStreamOptions,
    streamBufferParms: streamBufferOption];

  rStream.SetIndex[rootLog.currentLog.logSeqNoPos];
  rStream.PutRope["Expunge"];
  rStream.SetIndex[rootLog.expungeLog.logSeqNoPos];
  rStream.PutRope[IO.PutFR["%07d", IO.int[rootLog.currentLog.logSeqNo]]];   -- seven digits: same width as "Expunge"
  FS.SetByteCountAndCreatedTime[of, -1, newRootStamp];

  WalnutStream.SetHighWaterMark[rootLog.currentLog.writeStream, 0, 0];
  newLogLen ← rootLog.expungeLog.writeStream.GetLength[];

  rStream.Close[];  -- commits rootFile and other streams
  rootLog.createDate ← newRootStamp;

  CloseInternalLog[rootLog.currentLog];  -- release the currentLog streams
  CloseInternalLog[rootLog.expungeLog];  -- will want only read Lock

  -- swap the rootLog variables currentLog & expungeLog
  tempLog ← rootLog.currentLog;
  rootLog.currentLog ← rootLog.expungeLog;
  rootLog.expungeLog ← tempLog;
  rootLog.currentLog.logSeqNo ← rootLog.expungeLog.logSeqNo;
  rootLog.expungeLog.logSeqNo ← -1;
  EXITS
    error => ERROR WalnutDefs.Error[$root, reason, "During SwapLogs"];
  END;
  };
ReturnExpungeStreams: PUBLIC PROC = {
  -- Closes the expunge log's write stream if one is open, ignoring FS.Error and IO.Error, and forgets it.
  expunge: InternalLogInfo ← rootLog.expungeLog;
  IF expunge.writeStream = NIL THEN RETURN;
  expunge.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE];
  expunge.writeStream ← NIL;
  };
-- Managing the TempLogs
PrepareToCopyTempLog: PUBLIC PROC[ which: WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE ← NIL]]
  RETURNS[currentStream, tempStream: STREAM] = {
  -- makes sure there is enough space to copy the WhichTempLog onto the currentLog; currentStream & tempStream are opened under the same transaction
  -- currentStream is always the current log's write stream. tempStream is NIL if there is not enough space: growing the current log failed with $quotaExceeded, in which case the explanation is passed to reportProc and the temp stream is closed. Any other FS.Error from growing the file propagates.
  mli: MiscLogInfo ← OpenTempStream[which];
  tempLogLen, pagesNeeded, actualPages, currentInUsePages: INT;
  of: FS.OpenFile;

  currentStream ← rootLog.currentLog.writeStream;
  IF mli.stream = NIL THEN RETURN[currentStream, NIL];

  currentInUsePages ← FS.PagesForBytes[rootLog.currentLog.writeStream.GetLength[]];
  tempLogLen ← mli.stream.GetLength[];
  -- extra bytes are for start/endcopy entries
  pagesNeeded ← FS.PagesForBytes[tempLogLen+200] + currentInUsePages - pagesAlreadyCopied;
  actualPages ←
    FS.GetInfo[of ← AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]].pages;

  BEGIN
  IF actualPages < pagesNeeded THEN {
    FS.SetPageCount[of, pagesNeeded ! FS.Error =>
      IF error.code = $quotaExceeded THEN {
        reportProc["\n"]; reportProc[error.explanation]; reportProc["\n"]; GOTO quota }
      ELSE REJECT];
    -- Setting the size of the file sets a lock on the owner database. We remove this read lock after committing the transaction (so it will be downgraded)
    BEGIN
    streamTrans: AlpTransaction.Handle = mli.trans;
    FinishTrans[alpTH: streamTrans, abort: FALSE, continue: TRUE];
    AlpTransaction.UnlockOwnerDB[streamTrans, currentVolumeGroupID];
    END }
  EXITS
    quota => {
      mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE];
      mli.stream ← NIL;
      RETURN[currentStream, NIL]
      };
  END;
  RETURN[currentStream, mli.stream];
  };
GetStreamsForCopy: PUBLIC PROC[which: WhichTempLog] RETURNS[currentStream, tempStream: STREAM] = {
  -- (Re)opens the requested temp log under the current transaction and returns it together with the current log's write stream.
  temp: MiscLogInfo ← OpenTempStream[which];
  RETURN[rootLog.currentLog.writeStream, temp.stream];
  };
FinishCopy: PUBLIC PROC[which: WhichTempLog] = {
  -- sets the length of the tempLog to something reasonable and closes the tempStream
  -- Also releases the write lock held on the temp log
  -- The temp log's high water mark is set to 0 bytes, with 200 pages for the newMail log and 1 page for the readArchive log. The transaction is committed (CommitAndContinue) before the file is unlocked and closed.
  -- Raises WalnutDefs.Error[$log, $UnknownLogFileType] for any other value of which.
  mli: MiscLogInfo;
  pages: INT;
  fileHandle: FS.OpenFile;

  SELECT which FROM
    newMail => { mli ← rootLog.newMailLog; pages ← 200 };
    readArchive => { mli ← rootLog.readArchiveLog; pages ← 1 };
    ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];

  IF mli.stream = NIL THEN mli ← OpenTempStream[which];
  -- grab the open file before closing the stream; it is still needed for the unlock below
  fileHandle ← AlpineFS.OpenFileFromStream[mli.stream];
  WalnutStream.SetHighWaterMark[strm: mli.stream, hwmBytes: 0, numPages: pages, setCreateDate: TRUE];
  mli.stream.Close[ ! FS.Error, IO.Error => CONTINUE];
  CommitAndContinue[];
  AlpFile.UnlockFile[AlpineFS.GetAlpFileHandle[fileHandle]];
  -- be careful to throw away the file lock
  FS.Close[fileHandle];
  mli.stream ← NIL;
  };
AbortTempCopy: PUBLIC PROC[which: WhichTempLog, strm: STREAM] = {
  -- Gives up a copy from a temp log: closes strm and, if that close succeeded, also closes the underlying Alpine file handle.
  -- Does nothing if the temp log has no stream open. Raises WalnutDefs.Error[$root, $UnknownStreamReturned] if strm is not the stream recorded for that log, and [$log, $UnknownLogFileType] for an unknown which.
  mli: MiscLogInfo;
  fileHandle: AlpFile.Handle;
  closeError: BOOL ← FALSE;

  SELECT which FROM
    newMail => mli ← rootLog.newMailLog;
    readArchive => mli ← rootLog.readArchiveLog;
    ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];

  IF mli.stream = NIL THEN RETURN;
  IF mli.stream # strm THEN ERROR WalnutDefs.Error[$root, $UnknownStreamReturned];

  -- the file handle must be fetched while the stream is still open
  fileHandle ← AlpineFS.GetAlpFileHandle[file: AlpineFS.OpenFileFromStream[strm]];
  strm.Close[ ! FS.Error, IO.Error => { closeError ← TRUE; CONTINUE} ];
  IF ~closeError THEN AlpFile.Close[fileHandle ! AlpineFile.Unknown => CONTINUE];
  mli.stream ← NIL;
  };
-- Access to streams for the various logs, other values parsed from the rootFile
-- Opens the newMail log exclusively under a transaction of its own (obtained, together with a read lock on the root file, from GetRootReadLock) and returns the stream and the log's actual length.
-- Returns [NIL, -1] if the newMail stream is already open. Returns [NIL, actualLen] after closing the log again if it is shorter than lengthRequired.
-- If the log is longer than lengthRequired, WalnutStream.SetHighWaterMark is called with lengthRequired and pagesWanted, the stream is flushed and the transaction is committed with continue.
-- CheckRootStamp raises WalnutDefs.Error if the root file's create date has changed.
GetNewMailStream: PUBLIC PROC[lengthRequired: INT, pagesWanted: INT]
RETURNS[strm: STREAM, actualLen: INT] = {
mStream: STREAM;
CheckRootStamp[];
IF rootLog.newMailLog.stream # NIL THEN RETURN[NIL, -1];
IF seriousDebugging THEN StatsReport["\n### Trans for newMailLog"];
[rootLog.newMailLog.rootOpenFile, rootLog.newMailLog.trans] ← GetRootReadLock[];
mStream ← rootLog.newMailLog.stream ← StreamOpen[
name: rootLog.newMailLog.name, trans: rootLog.newMailLog.trans, exclusive: TRUE];
IF (actualLen ← mStream.GetLength[]) < lengthRequired THEN {
-- CloseMiscLog also finishes the newMail transaction and closes its root file
CloseMiscLog[rootLog.newMailLog];
RETURN[NIL, actualLen];
};
IF actualLen > lengthRequired THEN {
WalnutStream.SetHighWaterMark[mStream, lengthRequired, pagesWanted, TRUE];
mStream.Flush[];
FinishTrans[alpTH: rootLog.newMailLog.trans, abort: FALSE, continue: TRUE];
};
RETURN[mStream, actualLen]
};
GetReadArchiveStream: PUBLIC PROC[pagesWanted: INT ← -1] RETURNS[STREAM] = {
  -- Opens the readArchive log exclusively under the current transaction, asking StreamOpen for at least pagesWanted pages, and returns the stream.
  -- Returns NIL if the readArchive stream is already open. The root file's create date is verified first.
  archive: MiscLogInfo;
  CheckRootStamp[];
  archive ← rootLog.readArchiveLog;
  IF archive.stream # NIL THEN RETURN[NIL];
  archive.stream ← StreamOpen[
    name: archive.name,
    trans: rootLog.alpTH,
    pages: pagesWanted,
    exclusive: TRUE];
  RETURN[archive.stream]
  };
KeyIs: PUBLIC PROC RETURNS[ROPE] = {
  -- Returns the "Key" entry remembered from the root file, after verifying that the root file's create date is still the one recorded.
  CheckStamp[GetRootFileStamp[]];
  RETURN[rootLog.keyFromRootFile]
  };
Mailfor: PUBLIC PROC RETURNS[ROPE] = {
  -- Returns the "MailFor" entry remembered from the root file, after verifying that the root file's create date is still the one recorded.
  CheckStamp[GetRootFileStamp[]];
  RETURN[rootLog.mailForFromRootFile]
  };
DBName: PUBLIC PROC RETURNS[ROPE] = {
  -- Returns the "Database" entry remembered from the root file, after verifying that the root file's create date is still the one recorded.
  CheckStamp[GetRootFileStamp[]];
  RETURN[rootLog.dbNameFromRootFile]
  };
-- Gives back the newMail stream obtained from GetNewMailStream: closes it and, if the newMail log holds the root file open under its own transaction, closes that file and commits the transaction.
-- Silently ignores a stream that is not the one recorded for the newMail log. FinishTrans may raise WalnutDefs.Error if the commit fails.
ReturnNewMailStream: PUBLIC PROC[newMailStream: STREAM] = {
IF newMailStream # rootLog.newMailLog.stream THEN RETURN;
rootLog.newMailLog.stream ← NIL;
newMailStream.Close[ ! FS.Error, IO.Error => CONTINUE];
IF rootLog.newMailLog.rootOpenFile # FS.nullOpenFile THEN {
-- remember the transaction locally so the record can be cleared before the commit is attempted
trans: AlpTransaction.Handle ← rootLog.newMailLog.trans;
FS.Close[rootLog.newMailLog.rootOpenFile ! FS.Error => CONTINUE];
rootLog.newMailLog.rootOpenFile ← FS.nullOpenFile;
rootLog.newMailLog.trans ← NIL;
FinishTrans[alpTH: trans, abort: FALSE, continue: FALSE];
};
};
ReturnReadArchiveStream: PUBLIC PROC[readArchiveStream: STREAM] = {
  -- Gives back the readArchive stream: forgets it and closes it, ignoring FS.Error and IO.Error. A stream that is not the one recorded for the readArchive log is silently ignored.
  IF readArchiveStream = rootLog.readArchiveLog.stream THEN {
    rootLog.readArchiveLog.stream ← NIL;
    readArchiveStream.Close[ ! FS.Error, IO.Error => CONTINUE];  -- assume flushed
    };
  };
-- Returns the name of the log that stream belongs to ("currentLog", "expungeLog", "newMailLog" or "readArchiveLog"), or "unknownLog" if it matches none of the recorded streams.
-- NOTE(review): a match on expungeLog.readStream is reported as "currentLog" - confirm this is intended (the record's field comment says readStream is not used for expungeLog).
-- NOTE(review): expungeLog, newMailLog and readArchiveLog are dereferenced unconditionally; Open leaves them NIL for a read-only root.
StreamToWhich: PUBLIC PROC[stream: STREAM] RETURNS[ROPE] = {
IF stream = rootLog.currentLog.readStream OR stream = rootLog.currentLog.writeStream OR
stream = rootLog.expungeLog.readStream THEN RETURN["currentLog"];
IF stream = rootLog.expungeLog.writeStream THEN RETURN["expungeLog"];
IF stream = rootLog.newMailLog.stream THEN RETURN["newMailLog"];
IF stream = rootLog.readArchiveLog.stream THEN RETURN["readArchiveLog"];
RETURN["unknownLog"];
};
-- Utilities
RegisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = {
  -- Installs proc as the receiver of StatsReport messages, replacing any earlier one.
  statsProc ← proc;
  };

UnregisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = {
  -- Removes proc as the receiver of StatsReport messages; has no effect if a different procedure is registered.
  IF statsProc # proc THEN RETURN;
  statsProc ← NIL;
  };

StatsReport: PUBLIC PROC[r: ROPE] = {
  -- Passes r to the registered stats procedure, if there is one.
  IF statsProc = NIL THEN RETURN;
  statsProc[r];
  };
-- Local Utilities
CreateTrans: PROC RETURNS[alpTH: AlpTransaction.Handle] = {
  -- Creates a new Alpine transaction on the root's file store, creating the AlpInstance handle first if rootLog does not have one.
  -- An RPC "unbound" failure is retried once. The cached instance handle is discarded before the retry so that a fresh one is created; otherwise the retry would present the same handle that just failed.
  -- Raises WalnutDefs.Error[$root, ...] with $BadUserPassword, $communication or $serverBusy; other OperationFailed / CallFailed reasons propagate unchanged.
  needRetry: BOOL ← FALSE;
  haveRetried: BOOL ← FALSE;
  DO
    IF rootLog.transInfo.handle = NIL THEN
      rootLog.transInfo.handle ← AlpInstance.Create[rootLog.transInfo.fileStore ! AlpInstance.Failed =>
        IF why = authenticateFailed THEN
          ERROR WalnutDefs.Error[$root, $BadUserPassword, "Can't create AlpInstance.Handle"]
        ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]
        ];
    alpTH ← AlpTransaction.Create[rootLog.transInfo.handle !
      AlpTransaction.OperationFailed => IF why = busy THEN
        ERROR WalnutDefs.Error[$root, $serverBusy, rootLog.transInfo.fileStore];
      RPC.CallFailed => TRUSTED {
        -- unbound is a moderately likely failure, due to the instance cache
        IF why = unbound THEN {needRetry ← TRUE; CONTINUE}
        ELSE IF why IN [timeout .. busy] THEN
          ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]}
      ];
    IF NOT needRetry THEN EXIT;
    IF haveRetried THEN
      ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore];
    rootLog.transInfo.handle ← NIL;   -- force AlpInstance.Create to run again on the retry
    needRetry ← FALSE; haveRetried ← TRUE;
    ENDLOOP;
  IF seriousDebugging THEN
    StatsReport[IO.PutFR["\n&&& CreateTransID: %g",
      IO.rope[TransIDToRope[alpTH]] ] ];
  };
-- Commits (abort = FALSE) or aborts (abort = TRUE) the transaction alpTH; continue is passed through to alpTH.Finish. A NIL transaction is ignored.
-- Raises WalnutDefs.Error[$root, $communication] if a commit fails with an RPC failure in [timeout .. busy], [$root, $ProtocolError] for any other RPC failure, and [$root, $TransactionAbort] if a requested commit did not commit.
-- NOTE(review): inside this procedure the identifier abort names both the BOOL parameter and the requested/actual outcome value (L682, L685 of the original) - confirm how these uses resolve.
FinishTrans: PROC[alpTH: AlpTransaction.Handle, abort: BOOL, continue: BOOL] = {
outcome: AlpTransaction.Outcome;
IF alpTH = NIL THEN RETURN;
IF seriousDebugging THEN
StatsReport[IO.PutFR["\n***[old: %g, abort: %g, continue: %g]",
IO.rope[TransIDToRope[alpTH]], IO.bool[abort], IO.bool[continue]] ];
outcome ← alpTH.Finish[
requestedOutcome: IF abort THEN abort ELSE commit,
continue: continue ! RPC.CallFailed =>
IF why IN [timeout .. busy] THEN {
IF abort THEN {outcome ← abort; CONTINUE} -- so can escape when no communication!
ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]
}
ELSE ERROR WalnutDefs.Error[$root, $ProtocolError, rootLog.transInfo.fileStore]
];
-- after a continue, the same handle is printed again under the label "NewTransID"
IF seriousDebugging AND continue THEN
StatsReport[IO.PutFR[" NewTransID: %g", IO.rope[TransIDToRope[alpTH]]] ];
IF NOT abort AND outcome # commit THEN {
StatsReport[IO.PutFR["\n TransactionAbort on %g", IO.rope[rootLog.transInfo.fileStore]]];
ERROR WalnutDefs.Error[$root, $TransactionAbort, rootLog.transInfo.fileStore];
};
};
CheckDBTrans: PROC[who: ROPE] = {
  -- Debugging check: verifies that the transaction the DB package holds for the Walnut segment is the root's current transaction.
  -- On a mismatch raises SIGNAL BogusDBTrans; if that signal is resumed, both transaction ids are reported through StatsReport. who names the caller in the report.
  dbTrans: AlpTransaction.Handle ←
    NARROW[DB.GetSegmentInfo[rootLog.segmentID.segment].trans];
  IF dbTrans = rootLog.alpTH THEN RETURN;
  SIGNAL BogusDBTrans;
  -- argument order follows the labels in the format: DBTrans first, then rootTrans
  StatsReport[IO.PutFR["\n-> -> During %g: DBTrans is: %g, rootTrans is: %g",
    IO.rope[who],
    IO.rope[TransIDToRope[dbTrans]], IO.rope[TransIDToRope[rootLog.alpTH]] ] ];
  };
TransIDToRope: PROC[alpTH: AlpTransaction.Handle] RETURNS[ROPE] = {
  -- Returns a printable form of a transaction handle for debugging output: "NIL" for a NIL handle, otherwise the idOnFileStore field of its transaction id as a decimal number.
  concreteID: TransID;
  IF alpTH # NIL THEN {
    concreteID ← LOOPHOLE[alpTH.transID];   -- view the id through the local TransID layout
    RETURN[Convert.RopeFromInt[concreteID.idOnFileStore]];
    };
  RETURN["NIL"];
  };
GetRootReadLock: PROC RETURNS[of: FS.OpenFile, alpTH: AlpTransaction.Handle] = {
  -- Creates a new transaction and, under it, opens the root file with a read lock that fails rather than waits on conflict. Returns the open file and the transaction; the caller owns both.
  -- Raises WalnutDefs.Error[$root, <FS error code>, <root name>] if the file cannot be opened or examined, and WalnutDefs.Error[$root, $WrongCreateDate, ...] (from CheckStamp) if the root file's create date is not the one recorded in rootLog.
  -- On either error the file is closed and the new transaction is finished before the error is raised, since the caller never receives the handles.
  reason: ATOM;
  rootFileStamp: GMT;
  alpTH ← CreateTrans[];
  BEGIN ENABLE FS.Error => { reason ← error.code; GOTO error};
  of ← AlpineFS.Open[
    name: rootLog.name, lock: [$read, $fail], transHandle: alpTH,
    options: rootOpenOptions ];
  rootFileStamp ← FS.GetInfo[of].created;
  EXITS
    error => {
      FS.Close[of ! FS.Error => CONTINUE];
      -- a failure to finish must not hide the FS reason being reported
      FinishTrans[alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
      ERROR WalnutDefs.Error[$root, reason, rootLog.name];
      };
  END;
  IF rootFileStamp # rootLog.createDate THEN {
    -- CheckStamp below is about to raise: release what was acquired here first
    FS.Close[of ! FS.Error => CONTINUE];
    FinishTrans[alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
    };
  CheckStamp[rootFileStamp];
  };
-- File options used by GetRootReadLock when opening the root file; the field values are the same as those of fileOptions.
rootOpenOptions: AlpineFS.FileOptions ← [
updateCreateTime: FALSE,
referencePattern: sequential,
recoveryOption: $log,
finishTransOnClose: FALSE];
CloseInternalLog: PROC[ili: InternalLogInfo] = {
  -- Closes whichever of the log's write and read streams are open (write stream first), ignoring FS.Error and IO.Error, and leaves both fields NIL. A NIL ili is ignored.
  CloseQuietly: PROC[s: STREAM] = {
    IF s # NIL THEN s.Close[ ! FS.Error, IO.Error => CONTINUE];
    };
  IF ili = NIL THEN RETURN;
  CloseQuietly[ili.writeStream];
  ili.writeStream ← NIL;
  CloseQuietly[ili.readStream];
  ili.readStream ← NIL;
  };
CloseMiscLog: PROC[mli: MiscLogInfo, abortIfExternal: BOOL ← FALSE] = {
  -- Closes the stream of a newMail/readArchive log, ignoring IO.Error and FS.Error. A NIL mli is ignored.
  -- If the log holds the root file open under its own transaction ("external"), that transaction is finished and the root file closed as well.
  -- if abortIfExternal and rootOpenFile#nullOpenFile then abort
  strm: STREAM;
  IF mli = NIL THEN RETURN;
  strm ← mli.stream;
  mli.stream ← NIL;
  IF mli.rootOpenFile # FS.nullOpenFile THEN {  -- is external
    of: FS.OpenFile ← mli.rootOpenFile;
    mli.rootOpenFile ← FS.nullOpenFile;
    IF strm # NIL THEN strm.Close[ ! IO.Error, FS.Error => CONTINUE];
    IF mli.trans # NIL THEN
      FinishTrans[alpTH: mli.trans, abort: abortIfExternal, continue: FALSE !
        FS.Error => CONTINUE];
    FS.Close[of ! FS.Error => CONTINUE];
    }
  ELSE
    IF strm # NIL THEN strm.Close[ ! FS.Error, IO.Error => CONTINUE];
  };
OpenTempStream: PROC[which: WhichTempLog] RETURNS[mli: MiscLogInfo] = {
  -- (Re)opens the requested temp log under the root's current transaction and returns its MiscLogInfo with stream and trans filled in.
  -- Any stream already open for that log is closed first. The stream is opened non-exclusively and so that closing it leaves the underlying open file open.
  -- Raises WalnutDefs.Error[$log, $UnknownLogFileType] for an unknown which.
  SELECT which FROM
    newMail => mli ← rootLog.newMailLog;
    readArchive => mli ← rootLog.readArchiveLog;
    ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
  IF mli.stream # NIL THEN
    mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE];
  mli.stream ← NIL;   -- stays NIL if the open below fails
  mli.stream ← StreamOpen[
    name: mli.name,
    trans: rootLog.alpTH,
    exclusive: FALSE,
    closeFSOpenFile: FALSE];
  mli.trans ← rootLog.alpTH
  };
CheckRootStamp: PROC[] = {
  -- Reads the root file's current create date and checks it against the date recorded in rootLog.
  currentStamp: GMT ← GetRootFileStamp[];
  CheckStamp[currentStamp];
  };
CheckStamp: PROC[created: GMT] = {
  -- Raises WalnutDefs.Error[$root, $WrongCreateDate, ...] unless created equals the root file create date recorded in rootLog.
  IF created = rootLog.createDate THEN RETURN;
  ERROR WalnutDefs.Error[$root, $WrongCreateDate,
    IO.PutFR[" Create date is : %g, Create date expected is: %g",
      IO.time[created], IO.time[rootLog.createDate]]];
  };
-- Reads the next entry from the root file stream and returns it.
-- The first token is the entry's identifier. "End" has no value. "LogInfo" is followed by a file name, an integer internal file id and a sequence-number token, returned in rte.info. "Key" takes the rest of the line as its value. Any other identifier takes a single token as its value.
-- Whitespace and comments are skipped between items. Stream errors are not caught here (Open handles IO.EndOfStream and IO.Error).
NextRootEntry: PROC[strm: STREAM] RETURNS[rte: WalnutKernelDefs.RootEntry] = {
[]← strm.SkipWhitespace[flushComments: TRUE];
rte.ident ← strm.GetTokenRope[].token;
IF rte.ident.Equal["End", FALSE] THEN RETURN;
[]← strm.SkipWhitespace[flushComments: TRUE];
IF rte.ident.Equal["LogInfo", FALSE] THEN {
lif: WalnutKernelDefs.LogInfoFromRoot;
lif.fileName ← strm.GetTokenRope[IO.IDProc].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
lif.internalFileID ← strm.GetInt[];
[]← strm.SkipWhitespace[flushComments: TRUE];
-- remember where the sequence number starts in the file; SwapLogs rewrites it in place at this position
lif.seqNoPos ← strm.GetIndex[];
lif.logSeqNo ← strm.GetTokenRope[].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
rte.info ← lif;
}
ELSE
IF rte.ident.Equal["Key", FALSE] THEN rte.value ← strm.GetLineRope[]
ELSE {
rte.value ← strm.GetTokenRope[IO.IDProc].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
};
RETURN[rte];
};
FileStoreForRoot: PROC[rootName: ROPE] RETURNS[fileStore: ROPE] = {
  -- Returns the server component of the root file name, which names the Alpine file store to use.
  -- Raises WalnutDefs.Error[$root, $InvalidName, rootName] if the name has no server component.
  fullName: ROPE;
  cp: FS.ComponentPositions;
  [fullName, cp, ] ← FS.ExpandName[rootName];
  IF cp.server.length = 0 THEN
    ERROR WalnutDefs.Error[$root, $InvalidName, rootName];
  -- the component positions index the expanded name, so take the substring from it (as CheckServer in Open does)
  RETURN[fullName.Substr[cp.server.start, cp.server.length]];
  };
-- has special options (see below)
StreamOpen: PROC[name: ROPE, readOnly: BOOL ← FALSE, pages: INT ← 200,
  trans: AlpTransaction.Handle ← NIL, exclusive: BOOL ← FALSE, closeFSOpenFile: BOOL ← TRUE]
  RETURNS [strm: STREAM] = {
  -- Opens the Alpine file name under transaction trans and returns a stream on it.
  -- readOnly: open with a read lock that fails on conflict and return a read stream.
  -- Otherwise the file is opened for writing, with lock [$write, $fail] if exclusive and [$read, $wait] if not, is grown to at least pages pages, and a write stream positioned at the start is returned.
  -- closeFSOpenFile: whether closing the stream also closes the underlying FS open file.
  -- The option is applied to a private copy of alpineStreamOptions, so it takes effect on both the read and the write path and does not leak into streams opened later from the shared options.
  -- FS.Error from opening or growing the file is not caught here.
  openFile: AlpineFS.OpenFile ← [];
  streamOptions: AlpineFS.StreamOptions ← alpineStreamOptions;
  -- If the file is going to be unlocked, then this option must be false
  streamOptions.closeFSOpenFileOnClose ← closeFSOpenFile;
  IF readOnly THEN {
    openFile ← AlpineFS.Open[
      name: name, options: alpineFileOptions, lock: [$read, $fail], transHandle: trans];
    strm ← AlpineFS.StreamFromOpenFile[openFile: openFile, streamOptions: streamOptions, streamBufferParms: streamBufferOption];
    }
  ELSE {
    actualPages: INT;
    openFile ← AlpineFS.Open[
      name: name,
      access: $write,
      lock: IF exclusive THEN [$write, $fail] ELSE [$read, $wait],
      options: alpineFileOptions,
      transHandle: trans];
    actualPages ← FS.GetInfo[openFile].pages;
    IF pages > actualPages THEN FS.SetPageCount[openFile, pages];
    strm ← AlpineFS.StreamFromOpenFile[openFile: openFile, accessRights: $write, initialPosition: $start, streamOptions: streamOptions,
      streamBufferParms: streamBufferOption];
    };
  };
-- File options StreamOpen uses when opening log files.
alpineFileOptions: AlpineFS.FileOptions ← [
updateCreateTime: FALSE,
referencePattern: sequential,
recoveryOption: $log,
finishTransOnClose: FALSE
];
-- Buffering used for every stream this module creates, including the root file streams in Open and SwapLogs.
streamBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 4];
-- Passed to DB.Initialize in the module's start code.
nFreeTuples: NAT ← 256;
DeclareDBSegment: PROC[rootLog: RootLog, newSegmentOk: BOOL] = {
  -- Declares the Walnut database segment to the DB package, using the database file name, segment id and read-only flag held in rootLog.
  -- newSegmentOk is passed as createIfNotFound.
  DB.DeclareSegment[
    filePath: rootLog.dbNameFromRootFile,
    segment: rootLog.segmentID.segment,
    number: rootLog.segmentID.index,
    readonly: rootLog.readOnly,
    createIfNotFound: newSegmentOk, nPagesInitial: 256, nPagesPerExtent: 256 ];
  };
OpenDBTrans: PROC[rootLog: RootLog] = {
  -- Opens the database transaction for the Walnut segment, using the root's Alpine transaction.
  -- A transaction that is already open is not an error. The recognized DB errors and a $communication failure are reported as WalnutDefs.Error[$db, $DatabaseInaccessible, <reason>]; DB.Aborted is reported as WalnutDefs.Error[$db, $TransactionAbort, ...]; anything else is rejected to the caller.
  inaccessible: BOOL ← FALSE;
  reason: ROPE ← "unknown";
  DB.OpenTransaction[segment: rootLog.segmentID.segment, useTrans: rootLog.alpTH !
    DB.Aborted => GOTO abort;
    DBEnvironment.Error => {
      IF code = TransactionAlreadyOpen THEN CONTINUE;
      inaccessible ← TRUE;
      SELECT code FROM
        ProtectionViolation => reason ← "database is read only";
        DirectoryNotFound => reason ← "directory not found";
        FileNotFound => reason ← "file not found";
        ServerNotFound => reason ← "server not found";
        QuotaExceeded => reason ← "quota exceeded";
        ENDCASE => REJECT;
      CONTINUE;
      };
    DBEnvironment.Failure => {
      inaccessible ← TRUE;
      SELECT what FROM
        $communication => reason ← "cannot connect to server";
        ENDCASE => REJECT;
      CONTINUE;
      };
    ];
  IF inaccessible THEN ERROR WalnutDefs.Error[$db, $DatabaseInaccessible, reason];
  EXITS
    abort => ERROR WalnutDefs.Error[$db, $TransactionAbort, "During open database"];
  };
-- Module start code: initializes the DB package, passing the module's nFreeTuples setting.
DB.Initialize[nFreeTuples: nFreeTuples];
END.