IMPORTS
AlpFile, AlpineFile, AlpineFS, AlpInstance, AlpTransaction,
BasicTime, Convert, DB, DBEnvironment, FS, IO, Rope, RPC,
WalnutDefs, WalnutStream
Types
GMT: TYPE = BasicTime.GMT;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog;
TransInfo:
TYPE =
RECORD [
fileStore: AlpInstance.FileStore,
handle: AlpInstance.Handle];
nullTransInfo: TransInfo ← [NIL, NIL];
InternalLogInfo: TYPE = REF InternalLogInfoObject;
InternalLogInfoObject:
TYPE =
RECORD [
readStream: STREAM, -- not used for expungeLog
writeStream: STREAM,
name: ROPE ← NIL, -- rope name of this log
internalFileID: INT,
logSeqNo: INT ← -1,
logSeqNoPos: INT ← -1 -- position in rootLog of logSeqNo
];
MiscLogInfo: TYPE = REF MiscLogInfoObject;
MiscLogInfoObject:
TYPE =
RECORD [
stream: STREAM,
name: ROPE ← NIL,
trans: AlpTransaction.Handle ← NIL,
rootOpenFile: FS.OpenFile ← FS.nullOpenFile -- may have its own transaction
];
RootLog: TYPE = REF RootLogObject;
RootLogObject:
TYPE =
RECORD [
name: ROPE,
createDate: GMT ← BasicTime.nullGMT,
readOnly: BOOL ← FALSE,
keyFromRootFile: ROPE ← NIL,
mailForFromRootFile: ROPE ← NIL,
dbNameFromRootFile: ROPE ← NIL,
segmentID: WalnutKernelDefs.SegmentID ← defaultSegmentID,
transInfo: TransInfo ← nullTransInfo,
alpTH: AlpTransaction.Handle ← NIL, -- current transaction
rootOpenFileForLogs: FS.OpenFile ← FS.nullOpenFile,
currentLog: InternalLogInfo ← NIL, -- only one log for now
expungeLog: InternalLogInfo ← NIL,
newMailLog: MiscLogInfo ← NIL,
readArchiveLog: MiscLogInfo ← NIL
];
TransID:
TYPE =
MACHINE
DEPENDENT
RECORD [
-- taken from ConcreteTransID
randomBits: INT,
idOnFileStore: INT,
fileStore: AlpineEnvironment.VolumeID
];
defaultSegmentID: WalnutKernelDefs.SegmentID ← [$Walnut, 0];
Opening/closing the rootFile
for opening the rootFile
fileOptions: AlpineFS.FileOptions = [
updateCreateTime: FALSE, -- is done explicitly
referencePattern: sequential,
recoveryOption: $log,
finishTransOnClose: FALSE];
for the stream for the rootFile - it is a small file
rootBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 1];
alpineStreamOptions: AlpineFS.StreamOptions ← [
tiogaRead: FALSE,
commitAndReopenTransOnFlush: FALSE, -- just flush
truncatePagesOnClose: FALSE,
finishTransOnClose: FALSE,
closeFSOpenFileOnClose: TRUE -- Note: this option is changed for opening temp logs when doing copies, so that we can Unlock the file before closing it
];
Open:
PUBLIC
PROC[ rootName:
ROPE, readOnly:
BOOL ←
FALSE, newSegmentOk:
BOOL ←
FALSE]
RETURNS[ key, mailFor:
ROPE, rootFileCreateDate:
GMT, segment: WalnutDefs.Segment, isReadOnly:
BOOL] = {
newMailLogName, readArchiveLogName: ROPE;
logInfoList: LIST OF WalnutKernelDefs.LogInfoFromRoot;
fileName, dbName: ROPE;
isReadOnly ← FALSE;
IF rootLog #
NIL
THEN
ERROR WalnutDefs.Error[$root, $RootAlreadyOpen, "Must do a Close"];
IF seriousDebugging
THEN {
trans: AlpTransaction.Handle ←
NARROW[DB.GetSegmentInfo[$Walnut].trans];
IF trans # NIL THEN SIGNAL BogusDBTrans;
DB.AbortCache[trans];
};
BEGIN
logInfoCount: INT ← 0;
reason: ATOM;
openFile: FS.OpenFile;
rStream: STREAM;
rootLog ← NEW[RootLogObject ← [name: rootName, readOnly: readOnly]];
rootLog.transInfo ← [fileStore: FileStoreForRoot[rootName]];
rootLog.alpTH ← CreateTrans[];
currentVolumeGroupID ← AlpTransaction.GetNextVolumeGroup[rootLog.alpTH];
rootLog.segmentID ← [$Walnut, 0]; -- eventually from rootFile
BEGIN ENABLE FS.Error => { reason ← error.code; GOTO error };
openFile ← AlpineFS.Open[
name: rootName, access: $write, lock: [$write, $wait],
options: fileOptions, transHandle: rootLog.alpTH];
FS.Close[openFile];
EXITS error =>
IF reason = $unknownFile
THEN
ERROR WalnutDefs.Error[$root, $RootNotFound, rootName]
ELSE
IF reason = $accessDenied THEN isReadOnly ← TRUE
ELSE ERROR WalnutDefs.Error[$root, reason, rootName];
END;
rootLog.readOnly ← isReadOnly;
BEGIN
ENABLE
FS.Error => { reason ← error.code;
GOTO error };
openFile ← AlpineFS.Open[
name: rootName, access: $read, lock: [$read, $wait],
options: fileOptions, transHandle: rootLog.alpTH];
rStream ← AlpineFS.StreamFromOpenFile[
openFile: openFile,
streamOptions: alpineStreamOptions,
streamBufferParms: streamBufferOption];
rootFileCreateDate ← rootLog.createDate ← FS.GetInfo[openFile].created;
EXITS error =>
ERROR WalnutDefs.Error[$root, reason, rootName];
END;
parse the root file to get information about the database, the newMail, readArchive, current and expunge logs
BEGIN ENABLE IO.EndOfStream, IO.Error => GOTO cantParse;
finished: BOOL ← FALSE;
rStream.SetIndex[0];
DO
curPos: INT ← rStream.GetIndex[];
rootEntry: WalnutKernelDefs.RootEntry ← NextRootEntry[rStream];
DO
IF rootEntry.ident.Equal["Key", FALSE] THEN {
key ← rootEntry.value; EXIT
};
IF rootEntry.ident.Equal["MailFor",
FALSE]
THEN {
mailFor ← rootEntry.value; EXIT
};
IF rootEntry.ident.Equal["Database",
FALSE]
THEN {
dbName ← rootEntry.value; EXIT
};
IF rootEntry.ident.Equal["NewMailLog",
FALSE]
THEN {
newMailLogName ← rootEntry.value; EXIT
};
IF rootEntry.ident.Equal["ReadArchiveLog",
FALSE]
THEN {
readArchiveLogName ← rootEntry.value; EXIT
};
IF rootEntry.ident.Equal["LogInfo",
FALSE]
THEN {
logInfoList ← CONS[rootEntry.info, logInfoList];
logInfoCount ← logInfoCount + 1;
EXIT
};
IF rootEntry.ident.Equal["End",
FALSE]
THEN {
finished ← TRUE;
EXIT
};
rStream.Close[ ! IO.Error, FS.Error => CONTINUE];
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
rootLog ← NIL;
ERROR WalnutDefs.Error[$log, $BadRootFile,
IO.PutFR["Unknown entry %g in RootFile at pos %g",
IO.rope[rootEntry.ident], IO.int[curPos]]];
ENDLOOP;
IF finished THEN EXIT;
ENDLOOP;
EXITS
cantParse => {
rStream.Close[ ! IO.Error, FS.Error => CONTINUE];
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
rootLog ← NIL;
ERROR WalnutDefs.Error[$log, $BadRootFile, "Couldn't parse root file"];
};
END;
rStream.Close[];
rStream← NIL; -- not needed
IF key.Length[] = 0
OR dbName.Length[] = 0
OR mailFor.Length[] = 0
OR
newMailLogName.Length[] = 0 OR readArchiveLogName.Length[] = 0 OR
logInfoCount # 2
THEN
ERROR WalnutDefs.Error[
$root, $BadRootFile, "Incomplete RootFile - missing entries"];
check that all the alpine-server names are the same
BEGIN
cp: FS.ComponentPositions;
fn: ROPE;
rServer, oServer: ROPE;
CheckServer:
PROC[name:
ROPE] = {
[fn, cp, ] ← FS.ExpandName[name];
oServer ← Rope.Substr[fn, cp.server.start, cp.server.length];
IF ~Rope.Equal[rServer, oServer,
FALSE]
THEN {
rName: ROPE ← rootLog.name;
FS.Close[openFile ! FS.Error => CONTINUE];
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
rootLog ← NIL;
ERROR WalnutDefs.Error[$root, $BadRootFile,
IO.PutFR["server for file %g does not agree with that for %g",
IO.rope[name], IO.rope[rName]] ];
};
};
[fn, cp, ] ← FS.ExpandName[rootLog.name];
rServer ← Rope.Substr[fn, cp.server.start, cp.server.length];
CheckServer[dbName];
CheckServer[newMailLogName];
CheckServer[readArchiveLogName];
CheckServer[logInfoList.first.fileName];
CheckServer[logInfoList.rest.first.fileName];
END;
rootLog.keyFromRootFile ← key;
rootLog.mailForFromRootFile ← mailFor;
rootLog.dbNameFromRootFile ← dbName;
FS.Close[openFile ! FS.Error => CONTINUE];
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
rootLog.alpTH ← NIL;
END;
set up the internal log objects for all the logs
FOR lil:
LIST
OF WalnutKernelDefs.LogInfoFromRoot ← logInfoList, lil.rest
UNTIL lil =
NIL
DO
seqNo: ROPE ← lil.first.logSeqNo;
fileName ← lil.first.fileName;
IF seqNo.Fetch[0] # 'E
THEN {
-- this is the current log
rootLog.currentLog ←
NEW[InternalLogInfoObject ← [
name: fileName,
internalFileID: lil.first.internalFileID,
logSeqNo: Convert.IntFromRope[seqNo],
logSeqNoPos: lil.first.seqNoPos
]];
}
ELSE
-- this is the expunge log
rootLog.expungeLog ←
NEW[InternalLogInfoObject ← [
name: fileName,
internalFileID: lil.first.internalFileID,
logSeqNo: -1,
logSeqNoPos: lil.first.seqNoPos
]];
ENDLOOP;
IF ~isReadOnly
THEN {
rootLog.newMailLog ← NEW[MiscLogInfoObject ← [name: newMailLogName]];
rootLog.readArchiveLog ← NEW[MiscLogInfoObject ← [name: readArchiveLogName]];
}
ELSE rootLog.expungeLog ← NIL; -- is readOnly, can't have expungeLog
segment ← rootLog.segmentID.segment;
DeclareDBSegment[rootLog, newSegmentOk];
};
StartTransaction:
PUBLIC
PROC = {
IF rootLog.alpTH #
NIL
THEN {
BEGIN
IF seriousDebugging THEN CheckDBTrans["StartTransaction"];
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err];
EXITS
err =>
DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
END;
FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
FinishTrans[alpTH: rootLog.alpTH, abort:
FALSE, continue:
FALSE !
WalnutDefs.Error => CONTINUE];
};
[rootLog.rootOpenFileForLogs, rootLog.alpTH] ← GetRootReadLock[];
OpenDBTrans[rootLog];
};
Shutdown:
PUBLIC
PROC = {
IF rootLog = NIL THEN RETURN;
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
CloseMiscLog[mli: rootLog.readArchiveLog];
CloseMiscLog[mli: rootLog.newMailLog, abortIfExternal: TRUE];
IF seriousDebugging THEN CheckDBTrans["Shutdown"];
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
don't care about errors from FinishTrans
FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: FALSE ! WalnutDefs.Error => CONTINUE];
rootLog← NIL;
};
CloseTransaction:
PUBLIC
PROC = {
IF rootLog = NIL THEN RETURN;
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
CloseMiscLog[rootLog.readArchiveLog];
IF seriousDebugging THEN CheckDBTrans["CloseTransaction"];
BEGIN
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err];
EXITS
err => DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
END;
FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE];
rootLog.rootOpenFileForLogs ← FS.nullOpenFile;
FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE];
rootLog.alpTH ← NIL;
};
CommitAndContinue:
PUBLIC
PROC[flushDB:
BOOL ←
TRUE] = {
eCode: ATOM;
BEGIN
ENABLE
BEGIN
DB.Aborted => {eCode ← $TransactionAbort; GOTO error };
DB.Error => {eCode ← $DBError; GOTO error };
DB.Failure => {eCode ← $DatabaseInaccessible; GOTO error };
END;
IF seriousDebugging THEN CheckDBTrans["CommitAndContinue"];
IF flushDB THEN DB.FlushCache[rootLog.alpTH];
EXITS
error => ERROR WalnutDefs.Error[$db, eCode, "Can't flush"];
END;
FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: TRUE];
AlpTransaction.UnlockOwnerDB[rootLog.alpTH, currentVolumeGroupID]
};
AbortTransaction:
PUBLIC
PROC = {
IF rootLog = NIL THEN RETURN;
IF rootLog.alpTH =
NIL
THEN {
IF seriousDebugging THEN StatsReport["\nAbort of NIL trans"]; RETURN };
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
CloseMiscLog[rootLog.readArchiveLog];
IF seriousDebugging THEN CheckDBTrans["AbortTransaction"];
DB.AbortCache[rootLog.alpTH];
FinishTrans[alpTH: rootLog.alpTH, abort: TRUE, continue: FALSE];
rootLog.alpTH ← NIL;
};
EraseDB:
PUBLIC
PROC = {
code: ATOM;
IF rootLog =
NIL
THEN
ERROR WalnutDefs.Error[$root, $RootNotOpen, "Trying to do an EraseDB"];
IF seriousDebugging THEN CheckDBTrans["EraseDB"];
DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE];
DB.DeclareSegment[
filePath: rootLog.dbNameFromRootFile,
segment: rootLog.segmentID.segment,
number: rootLog.segmentID.index,
readonly: FALSE, nPagesInitial: 256, nPagesPerExtent: 256];
BEGIN
ENABLE
BEGIN
DB.Error, DB.Failure => { code ← $DBError; GOTO error};
DB.Failure => { code ← $DatabaseInaccessible; GOTO error};
DB.Aborted => { code ← $TransactionAbort; GOTO error};
END;
DB.EraseSegment[rootLog.segmentID.segment, rootLog.alpTH]; -- leaves trans open
EXITS
error => {
DB.AbortCache[rootLog.alpTH];
ERROR WalnutDefs.Error[$db, code, "Can't erase"];
};
END;
};
FlushAndContinue:
PUBLIC
PROC[strm:
STREAM] = {
mli: MiscLogInfo;
IF strm # (mli ← rootLog.newMailLog).stream
THEN
ERROR WalnutDefs.Error[$root, $UnknownStream];
WalnutStream.FlushStream[strm: strm, setCreateDate: TRUE];
FinishTrans[alpTH: mli.trans, abort: FALSE, continue: TRUE]
};
GetRootFileStamp:
PUBLIC PROC
RETURNS[rootFileStamp:
GMT] = {
reason: ATOM;
BEGIN
ENABLE
FS.Error => { reason ← error.code;
GOTO error };
of:
FS.OpenFile ←
AlpineFS.Open[
name: rootLog.name,
options: fileOptions,
lock: [$read, fail],
transHandle: rootLog.alpTH];
rootFileStamp ← FS.GetInfo[of].created;
FS.Close[of];
EXITS
error =>
ERROR WalnutDefs.Error[$root, reason, rootLog.name];
END;
};
GetExpungeID:
PUBLIC
PROC
RETURNS[expungeFileID:
INT] =
{ RETURN[rootLog.expungeLog.internalFileID] };
Managing the CurrentLog
AcquireWriteLock:
PUBLIC
PROC = {
of: AlpineFS.OpenFile ← AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream];
AlpineFS.SetLockOption[of, [$write, $wait] ]
};
GetCurrentLogStreams:
PUBLIC
PROC
RETURNS[readStream, writeStream:
STREAM] = {
IF rootLog =
NIL
OR rootLog.alpTH =
NIL
THEN
ERROR WalnutDefs.Error[$log, $TransNotOpen, "Must do Shutdown and Startup"];
IF rootLog.readOnly
THEN rootLog.currentLog.readStream ←
StreamOpen[name: rootLog.currentLog.name, readOnly: TRUE, trans: rootLog.alpTH]
ELSE {
rootLog.currentLog.writeStream ←
StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH];
rootLog.currentLog.readStream ←
AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream];
};
RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream]
};
ReleaseWriteLock:
PUBLIC
PROC
RETURNS[readStream, writeStream:
STREAM] = {
-- uses same trans as before
CloseInternalLog[rootLog.currentLog];
rootLog.currentLog.writeStream ←
StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH];
rootLog.currentLog.readStream ←
AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream];
RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream];
};
ReturnCurrentLogStreams:
PUBLIC
PROC = {
IF rootLog = NIL THEN RETURN;
CloseInternalLog[rootLog.currentLog];
CloseInternalLog[rootLog.expungeLog];
FinishTrans[rootLog.alpTH, FALSE, TRUE];
};
Managing the TempLogs
PrepareToCopyTempLog:
PUBLIC
PROC[ which: WhichTempLog, pagesAlreadyCopied:
INT, reportProc:
PROC[msg1, msg2, msg3:
ROPE ←
NIL]]
RETURNS[currentStream, tempStream:
STREAM] = {
makes sure there is enough space to copy the WhichTempLog onto the currentLog; returns NIL for the streams if there is not enough space; currentStream & tempStream are opened under the same transaction
mli: MiscLogInfo ← OpenTempStream[which];
tempLogLen, pagesNeeded, actualPages, currentInUsePages: INT;
of: FS.OpenFile;
currentStream ← rootLog.currentLog.writeStream;
IF mli.stream = NIL THEN RETURN[currentStream, NIL];
currentInUsePages ← FS.PagesForBytes[rootLog.currentLog.writeStream.GetLength[]];
tempLogLen ← mli.stream.GetLength[];
extra bytes are for start/endcopy entries
pagesNeeded ← FS.PagesForBytes[tempLogLen+200] + currentInUsePages - pagesAlreadyCopied;
actualPages ←
FS.GetInfo[of← AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]].pages;
BEGIN
IF actualPages < pagesNeeded
THEN {
FS.SetPageCount[of, pagesNeeded !
FS.Error =>
IF error.code = $quotaExceeded
THEN {
reportProc["\n"]; reportProc[error.explanation]; reportProc["\n"]; GOTO quota }
ELSE REJECT];
Setting the size of the file sets a lock on the owner database. We remove this read lock after committing the transaction (so it will be downgraded)
BEGIN
streamTrans: AlpTransaction.Handle = mli.trans;
FinishTrans[alpTH: streamTrans, abort: FALSE, continue: TRUE];
AlpTransaction.UnlockOwnerDB[streamTrans, currentVolumeGroupID];
END }
EXITS
quota => {
mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE];
mli.stream ← NIL;
RETURN[currentStream, NIL]
};
END;
RETURN[currentStream, mli.stream];
};
GetStreamsForCopy:
PUBLIC
PROC[which: WhichTempLog]
RETURNS[currentStream, tempStream:
STREAM] = {
tempStream ← OpenTempStream[which].stream;
RETURN[rootLog.currentLog.writeStream, tempStream];
};
FinishCopy:
PUBLIC
PROC[which: WhichTempLog] = {
sets the length of the tempLog to something reasonable and closes the tempStream
Also releases the write lock held on the temp log
mli: MiscLogInfo;
pages: INT;
fileHandle: FS.OpenFile;
SELECT which
FROM
newMail => { mli ← rootLog.newMailLog; pages ← 200 };
readArchive => { mli ← rootLog.readArchiveLog; pages ← 1 };
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF mli.stream = NIL THEN mli ← OpenTempStream[which];
fileHandle ← AlpineFS.OpenFileFromStream[mli.stream];
WalnutStream.SetHighWaterMark[strm: mli.stream, hwmBytes: 0, numPages: pages, setCreateDate: TRUE];
mli.stream.Close[ ! FS.Error, IO.Error => CONTINUE];
CommitAndContinue[];
AlpFile.UnlockFile[AlpineFS.GetAlpFileHandle[fileHandle]];
-- be careful to throw away the file lock
FS.Close[fileHandle];
mli.stream ← NIL;
};
AbortTempCopy:
PUBLIC
PROC[which: WhichTempLog, strm:
STREAM] = {
mli: MiscLogInfo;
fileHandle: AlpFile.Handle;
closeError: BOOL ← FALSE;
SELECT which
FROM
newMail => mli ← rootLog.newMailLog;
readArchive => mli ← rootLog.readArchiveLog;
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF mli.stream = NIL THEN RETURN;
IF mli.stream # strm THEN ERROR WalnutDefs.Error[$root, $UnknownStreamReturned];
fileHandle ← AlpineFS.GetAlpFileHandle[file: AlpineFS.OpenFileFromStream[strm]];
strm.Close[ ! FS.Error, IO.Error => { closeError ← TRUE; CONTINUE} ];
IF ~closeError THEN AlpFile.Close[fileHandle ! AlpineFile.Unknown => CONTINUE];
mli.stream ← NIL;
};
Access to streams for the various logs, other values parsed from the rootFile
GetNewMailStream:
PUBLIC
PROC[lengthRequired:
INT, pagesWanted:
INT]
RETURNS[strm: STREAM, actualLen: INT] = {
mStream: STREAM;
CheckRootStamp[];
IF rootLog.newMailLog.stream # NIL THEN RETURN[NIL, -1];
IF seriousDebugging THEN StatsReport["\n### Trans for newMailLog"];
[rootLog.newMailLog.rootOpenFile, rootLog.newMailLog.trans] ← GetRootReadLock[];
mStream ← rootLog.newMailLog.stream ← StreamOpen[
name: rootLog.newMailLog.name, trans: rootLog.newMailLog.trans, exclusive: TRUE];
IF (actualLen ← mStream.GetLength[]) < lengthRequired
THEN {
CloseMiscLog[rootLog.newMailLog];
RETURN[NIL, actualLen];
};
IF actualLen > lengthRequired
THEN {
WalnutStream.SetHighWaterMark[mStream, lengthRequired, pagesWanted, TRUE];
mStream.Flush[];
FinishTrans[alpTH: rootLog.newMailLog.trans, abort: FALSE, continue: TRUE];
};
RETURN[mStream, actualLen]
};
GetReadArchiveStream:
PUBLIC
PROC[pagesWanted:
INT ← -1]
RETURNS[
STREAM] = {
CheckRootStamp[];
IF rootLog.readArchiveLog.stream # NIL THEN RETURN[NIL];
rootLog.readArchiveLog.stream ← StreamOpen[
name: rootLog.readArchiveLog.name,
trans: rootLog.alpTH,
pages: pagesWanted,
exclusive: TRUE];
RETURN[rootLog.readArchiveLog.stream]
};
KeyIs:
PUBLIC PROC
RETURNS[
ROPE] = {
CheckRootStamp[];
RETURN[rootLog.keyFromRootFile]
};
Mailfor:
PUBLIC PROC
RETURNS[
ROPE] = {
CheckRootStamp[];
RETURN[rootLog.mailForFromRootFile]
};
DBName:
PUBLIC PROC
RETURNS[
ROPE] = {
CheckRootStamp[];
RETURN[rootLog.dbNameFromRootFile]
};
ReturnNewMailStream:
PUBLIC
PROC[newMailStream:
STREAM] = {
IF newMailStream # rootLog.newMailLog.stream THEN RETURN;
rootLog.newMailLog.stream ← NIL;
newMailStream.Close[ ! FS.Error, IO.Error => CONTINUE];
IF rootLog.newMailLog.rootOpenFile #
FS.nullOpenFile
THEN {
trans: AlpTransaction.Handle ← rootLog.newMailLog.trans;
FS.Close[rootLog.newMailLog.rootOpenFile ! FS.Error => CONTINUE];
rootLog.newMailLog.rootOpenFile ← FS.nullOpenFile;
rootLog.newMailLog.trans ← NIL;
FinishTrans[alpTH: trans, abort: FALSE, continue: FALSE];
};
};
ReturnReadArchiveStream:
PUBLIC PROC[readArchiveStream:
STREAM] = {
IF readArchiveStream # rootLog.readArchiveLog.stream THEN RETURN;
rootLog.readArchiveLog.stream ← NIL;
readArchiveStream.Close[ ! FS.Error, IO.Error => CONTINUE]; -- assume flushed
};
StreamToWhich:
PUBLIC
PROC[stream:
STREAM]
RETURNS[
ROPE] = {
IF stream = rootLog.currentLog.readStream
OR stream = rootLog.currentLog.writeStream
OR
stream = rootLog.expungeLog.readStream THEN RETURN["currentLog"];
IF stream = rootLog.expungeLog.writeStream THEN RETURN["expungeLog"];
IF stream = rootLog.newMailLog.stream THEN RETURN["newMailLog"];
IF stream = rootLog.readArchiveLog.stream THEN RETURN["readArchiveLog"];
RETURN["unknownLog"];
};
Local Utilities
CreateTrans:
PROC
RETURNS[alpTH: AlpTransaction.Handle] = {
needRetry: BOOL ← FALSE;
haveRetried: BOOL ← FALSE;
DO
IF rootLog.transInfo.handle =
NIL
THEN
rootLog.transInfo.handle ← AlpInstance.Create[rootLog.transInfo.fileStore ! AlpInstance.Failed =>
IF why = authenticateFailed
THEN
ERROR WalnutDefs.Error[$root, $BadUserPassword, "Can't create AlpInstance.Handle"]
ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]
];
alpTH ← AlpTransaction.Create[rootLog.transInfo.handle !
AlpTransaction.OperationFailed =>
IF why = busy
THEN
ERROR WalnutDefs.Error[$root, $serverBusy, rootLog.transInfo.fileStore];
RPC.CallFailed =>
TRUSTED {
IF why = unbound
THEN {needRetry ←
TRUE;
CONTINUE}
a moderately likely failure, due to the instance cache
ELSE
IF why
IN [timeout .. busy]
THEN
ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]}
];
IF NOT needRetry THEN EXIT;
IF haveRetried
THEN
ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore];
needRetry ← FALSE; haveRetried ← TRUE;
ENDLOOP;
IF seriousDebugging
THEN
StatsReport[
IO.PutFR["\n&&& CreateTransID: %g",
IO.rope[TransIDToRope[alpTH]] ] ];
};
FinishTrans:
PROC[alpTH: AlpTransaction.Handle, abort:
BOOL, continue:
BOOL] = {
outcome: AlpTransaction.Outcome;
IF alpTH = NIL THEN RETURN;
IF seriousDebugging
THEN
StatsReport[
IO.PutFR["\n***[old: %g, abort: %g, continue: %g]",
IO.rope[TransIDToRope[alpTH]], IO.bool[abort], IO.bool[continue]] ];
outcome ← alpTH.Finish[
requestedOutcome: IF abort THEN abort ELSE commit,
continue: continue !
RPC.CallFailed =>
IF why
IN [timeout .. busy]
THEN {
IF abort
THEN {outcome ← abort;
CONTINUE}
-- so can escape when no communication!
ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]
}
ELSE ERROR WalnutDefs.Error[$root, $ProtocolError, rootLog.transInfo.fileStore]
];
IF seriousDebugging
AND continue
THEN
StatsReport[IO.PutFR[" NewTransID: %g", IO.rope[TransIDToRope[alpTH]]] ];
IF
NOT abort
AND outcome # commit
THEN {
StatsReport[IO.PutFR["\n TransactionAbort on %g", IO.rope[rootLog.transInfo.fileStore]]];
ERROR WalnutDefs.Error[$root, $TransactionAbort, rootLog.transInfo.fileStore];
};
};
CheckDBTrans:
PROC[who:
ROPE] = {
dbTrans: AlpTransaction.Handle ←
NARROW[DB.GetSegmentInfo[rootLog.segmentID.segment].trans];
IF dbTrans = rootLog.alpTH THEN RETURN;
SIGNAL BogusDBTrans;
StatsReport[
IO.PutFR["\n-> -> During %g: DBTrans is: %g, rootTrans is: %g",
IO.rope[who],
IO.rope[TransIDToRope[rootLog.alpTH]], IO.rope[TransIDToRope[dbTrans]] ] ];
};
TransIDToRope:
PROC[alpTH: AlpTransaction.Handle]
RETURNS[
ROPE] = {
trueTransID: TransID;
IF alpTH = NIL THEN RETURN["NIL"];
trueTransID ← LOOPHOLE[alpTH.transID];
RETURN[Convert.RopeFromInt[trueTransID.idOnFileStore]];
};
GetRootReadLock:
PROC
RETURNS[of:
FS.OpenFile, alpTH: AlpTransaction.Handle] = {
reason: ATOM;
rootFileStamp: GMT;
alpTH ← CreateTrans[];
BEGIN
ENABLE
FS.Error => { reason ← error.code;
GOTO error};
of ← AlpineFS.Open[
name: rootLog.name, lock: [$read, $fail], transHandle: alpTH,
options: rootOpenOptions ];
rootFileStamp ← FS.GetInfo[of].created;
EXITS
error => {
FS.Close[of ! FS.Error => CONTINUE];
FinishTrans[alpTH, FALSE, FALSE];
ERROR WalnutDefs.Error[$root, reason, rootLog.name];
};
END;
CheckStamp[rootFileStamp];
};
rootOpenOptions: AlpineFS.FileOptions ← [
updateCreateTime: FALSE,
referencePattern: sequential,
recoveryOption: $log,
finishTransOnClose: FALSE];
CloseInternalLog:
PROC[ili: InternalLogInfo] = {
IF ili = NIL THEN RETURN;
IF ili.writeStream #
NIL
THEN {
ili.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE];
ili.writeStream ← NIL;
};
IF ili.readStream #
NIL
THEN {
ili.readStream.Close[ ! FS.Error, IO.Error => CONTINUE];
ili.readStream ← NIL;
};
};
CloseMiscLog:
PROC[mli: MiscLogInfo, abortIfExternal:
BOOL ←
FALSE] = {
if abortIfExternal and rootOpenFile#nullOpenFile then abort
strm: STREAM;
IF mli = NIL THEN RETURN;
strm ← mli.stream;
mli.stream ← NIL;
IF mli.rootOpenFile #
NIL
THEN {
-- is external
of: FS.OpenFile ← mli.rootOpenFile;
mli.rootOpenFile ← FS.nullOpenFile;
IF strm # NIL THEN strm.Close[! IO.Error, FS.Error => CONTINUE];
IF mli.trans #
NIL
THEN
FinishTrans[alpTH: mli.trans, abort: abortIfExternal, continue: FALSE !
FS.Error => CONTINUE];
FS.Close[of ! FS.Error => CONTINUE];
}
ELSE
IF strm # NIL THEN strm.Close[ ! FS.Error, IO.Error => CONTINUE];
};
OpenTempStream:
PROC[which: WhichTempLog]
RETURNS[mli: MiscLogInfo] = {
SELECT which
FROM
newMail => mli ← rootLog.newMailLog;
readArchive => mli ← rootLog.readArchiveLog;
ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType];
IF mli.stream #
NIL
THEN {
mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE];
mli.stream ← NIL;
};
mli.stream ← StreamOpen[name: mli.name, trans: rootLog.alpTH, exclusive: FALSE, closeFSOpenFile: FALSE];
mli.trans ← rootLog.alpTH
};
CheckRootStamp:
PROC[] =
{ CheckStamp[GetRootFileStamp[]] };
CheckStamp:
PROC[created:
GMT] = {
IF created # rootLog.createDate
THEN
ERROR WalnutDefs.Error[$root, $WrongCreateDate,
IO.PutFR[" Create date is : %g, Create date expected is: %g",
IO.time[created], IO.time[rootLog.createDate]]];
};
NextRootEntry:
PROC[strm:
STREAM]
RETURNS[rte: WalnutKernelDefs.RootEntry] = {
[]← strm.SkipWhitespace[flushComments: TRUE];
rte.ident ← strm.GetTokenRope[].token;
IF rte.ident.Equal["End", FALSE] THEN RETURN;
[]← strm.SkipWhitespace[flushComments: TRUE];
IF rte.ident.Equal["LogInfo",
FALSE]
THEN {
lif: WalnutKernelDefs.LogInfoFromRoot;
lif.fileName ← strm.GetTokenRope[IO.IDProc].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
lif.internalFileID ← strm.GetInt[];
[]← strm.SkipWhitespace[flushComments: TRUE];
lif.seqNoPos ← strm.GetIndex[];
lif.logSeqNo ← strm.GetTokenRope[].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
rte.info ← lif;
}
ELSE
IF rte.ident.Equal["Key", FALSE] THEN rte.value ← strm.GetLineRope[]
ELSE {
rte.value ← strm.GetTokenRope[IO.IDProc].token;
[]← strm.SkipWhitespace[flushComments: TRUE];
};
RETURN[rte];
};
FileStoreForRoot:
PROC[rootName:
ROPE]
RETURNS[fileStore:
ROPE] = {
cp: FS.ComponentPositions ← FS.ExpandName[rootName].cp;
IF cp.server.length = 0
THEN
WalnutDefs.Error[$root, $InvalidName, rootName];
RETURN[rootName.Substr[cp.server.start, cp.server.length]];
};
has special options (see below)
StreamOpen:
PROC[name:
ROPE, readOnly:
BOOL←
FALSE, pages:
INT ← 200,
trans: AlpTransaction.Handle←
NIL, exclusive:
BOOL ←
FALSE, closeFSOpenFile:
BOOL ←
TRUE]
RETURNS [strm:
STREAM] = {
openFile: AlpineFS.OpenFile ← [];
IF readOnly
THEN {
openFile ← AlpineFS.Open[
name: name, options: alpineFileOptions, lock: [$read, $fail], transHandle: trans];
strm ← AlpineFS.StreamFromOpenFile[openFile: openFile, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption];
}
ELSE {
actualPages: INT;
openFile ← AlpineFS.Open[
name: name,
access: $write,
lock: IF exclusive THEN [$write, $fail] ELSE [$read, $wait],
options: alpineFileOptions,
transHandle: trans];
actualPages ← FS.GetInfo[openFile].pages;
IF pages > actualPages THEN FS.SetPageCount[openFile, pages];
If the file is going to be unlocked, then this option must be false
alpineStreamOptions.closeFSOpenFileOnClose ← closeFSOpenFile;
strm ← AlpineFS.StreamFromOpenFile[openFile: openFile, accessRights: $write, initialPosition: $start, streamOptions: alpineStreamOptions,
streamBufferParms: streamBufferOption];
};
};
alpineFileOptions: AlpineFS.FileOptions ← [
updateCreateTime: FALSE,
referencePattern: sequential,
recoveryOption: $log,
finishTransOnClose: FALSE
];
streamBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 4];
nFreeTuples: NAT ← 256;
DeclareDBSegment:
PROC[rootLog: RootLog, newSegmentOk:
BOOL] = {
protectionViolation: BOOL ← FALSE;
DB.DeclareSegment[
filePath: rootLog.dbNameFromRootFile,
segment: rootLog.segmentID.segment,
number: rootLog.segmentID.index,
readonly: rootLog.readOnly,
createIfNotFound: newSegmentOk, nPagesInitial: 256, nPagesPerExtent: 256 ];
};
OpenDBTrans:
PROC[rootLog: RootLog] = {
inaccessible: BOOL ← FALSE;
reason: ROPE ← "unknown";
DB.OpenTransaction[segment: rootLog.segmentID.segment, useTrans: rootLog.alpTH !
DB.Aborted => GOTO abort;
DBEnvironment.Error => {
IF code = TransactionAlreadyOpen THEN CONTINUE;
inaccessible ← TRUE;
SELECT code
FROM
ProtectionViolation => reason ← "database is read only";
DirectoryNotFound => reason ← "directory not found";
FileNotFound => reason ← "file not found";
ServerNotFound => reason ← "server not found";
QuotaExceeded => reason ← "quota exceeded";
ENDCASE => REJECT;
CONTINUE;
};
DBEnvironment.Failure => {
inaccessible ← TRUE;
SELECT what
FROM
$communication => reason ← "cannot connect to server";
ENDCASE => REJECT;
CONTINUE;
};
];
IF inaccessible THEN ERROR WalnutDefs.Error[$db, $DatabaseInaccessible, reason];
EXITS
abort => ERROR WalnutDefs.Error[$db, $TransactionAbort, "During open database"];
};
DB.Initialize[nFreeTuples: nFreeTuples];