LoganBerryImpl.mesa (Cedar10 version)
Copyright © 1985, 1992 by Xerox Corporation. All rights reserved.
Doug Terry, June 17, 1992 3:04 pm PDT
Swinehart, May 17, 1991 8:38 am PDT
Polle Zellweger (PTZ) February 23, 1988 4:11:39 pm PST
Brian Oki, June 25, 1990 5:12 pm PDT
Willie-s, April 23, 1992 5:22 pm PDT
LoganBerry is a simple facility that allows various types of data to be stored persistently. Data is stored in one or more log files and indexed using stable btrees. A database survives processor crashes, but the data management facility does not provide atomic transactions. Only very simple queries are supported: retrieval by key or key subrange. Databases may be shared by multiple clients and accessed remotely via RPC.
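A minimal client sketch, assuming the LoganBerry interface mirrors the wrapper signatures in this module; the database name and the $Name and $Project keys are hypothetical, and error handling is omitted:
<<
db: LoganBerry.OpenDB ¬ LoganBerry.Open[dbName: "Sample.df"];
entry: LoganBerry.Entry ¬ LIST[[$Name, "Terry"], [$Project, "LoganBerry"]];
others: BOOL;
LoganBerry.WriteEntry[db: db, entry: entry];
[entry, others] ¬ LoganBerry.ReadEntry[db: db, key: $Name, value: "Terry"];
LoganBerry.Close[db: db];
>>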
DIRECTORY
Atom USING [GetPName, MakeAtom, GetProp, PutProp],
BasicTime USING [GMT, Period],
BTree USING [Error, UpdateInProgress, GetState],
BTreeSimple USING [CacheSize, DeleteKey, EnumerateRecords, --GetState,--
InternalKey, New, NewPathStk, Open, PageSize, PathStk, ReadRecord, --ReadValue,-- Relation, SetState, SetUpdateInProgress, Tree, UpdateRecord, Value, ValueObject],
Convert USING [IntFromRope, RopeFromInt],
DFUtilities USING [--DirectoryItem,-- FileItem, CommentItem, ParseFromStream, ProcessItemProc, RemoveVersionNumber, SyntaxError],
IO USING [Backup, BreakProc, Close, EndOf, EndOfStream, Error, Flush, GetChar, --GetID,-- GetIndex, GetInt, GetLength, GetLineRope, GetRopeLiteral, GetToken, GetTokenRope, IDProc, int, PeekChar, PutChar, PutF1, PutFR1, PutRope, RIS, SetIndex, SetLength, STREAM],
LoganBerryClass USING [Class, ClassObject, Register],
PFS USING [AbsoluteName, AccessOptions, Error, FileInfo, GetWDir, PATH, PathFromRope, RopeFromPath, StreamBufferParms, StreamOpen, UniqueID],
PFSNames USING [IsAbsolute],
RefID USING [Release, Reseal, Seal, Unseal],
RefTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref],
RefText USING [Equal, line, ObtainScratch, ReleaseScratch, TrustTextAsRope],
Rope USING [Cat, Compare, Concat, Equal, Find, Index, Length, Map, ROPE, SkipTo, Substr],
SymTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref],
LoganBerryBackdoor,
LoganBerry,
LoganBerryStructure;
LoganBerryImpl: CEDAR MONITOR LOCKS dbinfo USING dbinfo: OpenDBInfo
IMPORTS Atom, BasicTime, BTree, BTreeSimple, Convert, DFUtilities, IO, LoganBerry, LoganBerryClass, PFS, PFSNames, RefText, Rope, RefTab, RefID, SymTab
EXPORTS LoganBerryBackdoor
~ BEGIN
OPEN LoganBerry, LoganBerryStructure;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
bytesPerBuffer: INT ¬ 4096; -- PFS-dependent value!
maxAttributeLen: INT ¬ 200; -- Guess, which has to be conservative!
Performance tuning
From BTreeDoc.tioga:
The size of the cache relative to the size of the BTree can have a substantial effect on performance, as the results below show. Additionally, it should be noted that for a large BTree it is desirable to have a large page size; given a reasonably large amount of space for a cache, it is better to have a modest number of relatively large pages than a larger number of small pages. This is because the main cost of performing an operation on a large BTree stored on a disk is disk access time (as opposed to transfer time); and larger pages mean more entries per page, a shallower tree, and fewer accesses per operation.
The following variables affect the performance of index operations:
btreeCacheSize: BTreeSimple.CacheSize ¬ 300; -- default value is 25
the number of logical BTree pages kept in BTreeVM's cache [8..4000].
btreePageSize: BTreeSimple.PageSize ¬ 2048; -- number of words per physical page on a Sun
The following variables affect the performance of log operations:
[bytesPerBuffer: INT, nBuffers: INT [1 .. 64]]
logBufferSize: PFS.StreamBufferParms ¬ [bytesPerBuffer: 4*1024, nBuffers: 16]; -- default is [4*1024, 2]
used for normal log operations that randomly read entries from the log;
scanBufferSize: PFS.StreamBufferParms ¬ [bytesPerBuffer: 8*1024, nBuffers: 2]; -- default is [4*1024, 2]
should be used when a log is being read from start to end (e.g. when building indices).
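For a rough sense of scale (assuming 32-bit words): the settings above cache 300 pages * 2048 words/page * 4 bytes/word, roughly 2.4 MB of btree per open index, while each open log stream buffers 16 * 4 KB = 64 KB.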
Exported RPC operations
These routines export to the LoganBerry interface. All of the operations (except EnumerateEntries) can be invoked via RPC. They take an RPC.Conversation as the first argument so that secure remote procedure calls can be made to a LoganBerry database server.
Open: --PUBLIC-- PROC [conv: Conv ¬ NIL, dbName: ROPE] RETURNS [db: OpenDB] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed.
dbinfo: OpenDBInfo ¬ OpenI[dbName];
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
db ¬ RefID.Seal[dbinfo];
};
ReadEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
[entry, others] ¬ ReadEntryI[GetInfo[db], key, value];
};
EnumerateEntries: --PUBLIC-- PROC [db: OpenDB, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL, proc: EntryProc] RETURNS [] ~ {
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
EnumerateEntriesI[GetInfo[db], key, start, end, proc];
};
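For illustration, a client callback that collects every matching entry might look like the following sketch (hypothetical client code; `results' is assumed to live in an enclosing scope):
<<
results: LIST OF LoganBerry.Entry ¬ NIL;
CollectEntry: LoganBerry.EntryProc = {
-- [entry: LoganBerry.Entry] RETURNS [continue: BOOL]
results ¬ CONS[entry, results];
RETURN[continue: TRUE];
};
LoganBerry.EnumerateEntries[db: db, key: $Name, start: "A", end: "F", proc: CollectEntry];
>>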
GenerateEntries: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL] RETURNS [cursor: Cursor] ~ {
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo ¬ GenerateEntriesI[GetInfo[db], key, start, end];
cursor ¬ RefID.Seal[newCursor];
};
NextEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, cursor: Cursor, dir: CursorDirection ¬ increasing] RETURNS [entry: Entry] ~ {
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
c: CursorInfo;
WITH RefID.Unseal[cursor] SELECT FROM
ci: CursorInfo => c ¬ ci;
ENDCASE => c ¬ NIL;
IF c = NIL THEN ERROR Error[$BadCursor, "Invalid cursor passed to NextEntry."];
IF NOT c.dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, c.dbinfo.statusMsg];
entry ¬ NextEntryI[c.dbinfo, c, dir];
};
EndGenerate: --PUBLIC-- PROC [conv: Conv ¬ NIL, cursor: Cursor] RETURNS [] ~ {
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
[] ¬ RefID.Release[cursor];
};
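Taken together, a typical generate/next/end sequence looks like this sketch (hypothetical client code):
<<
cursor: LoganBerry.Cursor ¬ LoganBerry.GenerateEntries[db: db, key: $Name];
FOR entry: LoganBerry.Entry ¬ LoganBerry.NextEntry[cursor: cursor], LoganBerry.NextEntry[cursor: cursor] UNTIL entry = NIL DO
-- process entry here
ENDLOOP;
LoganBerry.EndGenerate[cursor: cursor];
>>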
WriteEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, entry: Entry, log: LogID ¬ activityLog, replace: BOOLEAN ¬ FALSE] RETURNS [] ~ {
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key. The primary attribute value must be unique throughout the database unless replace=TRUE; in this case, an existing entry with the same primary attribute value is atomically replaced with the new entry.
WriteEntryI[GetInfo[db], entry, log, replace];
};
DeleteEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [deleted: Entry] ~ {
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
deleted ¬ DeleteEntryI[GetInfo[db], key, value];
};
Close: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
CloseI[GetInfo[db]];
};
BuildIndices: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
BuildIndicesI[GetInfo[db]];
};
CompactLogs: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
CompactLogsI[GetInfo[db]];
};
Describe: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database.
info ¬ DescribeI[GetInfo[db]];
};
IsLocal: --PUBLIC-- PROC[db: LoganBerry.OpenDB] RETURNS[BOOL ¬ TRUE] = {
[] ¬ GetInfo[db]; -- Error if the db isn't valid
};
StartTransaction: --PUBLIC-- PROC [db: LoganBerry.OpenDB, wantAtomic: BOOLEAN ¬ FALSE] = {
Tells LoganBerry that a sequence of update operations, i.e. a transaction, is being performed. If wantAtomic=TRUE then all operations until the next EndTransaction operation should be executed as an atomic transaction. If wantAtomic=FALSE then the outcome of subsequent operations is undefined if a crash occurs before the EndTransaction operation; this can significantly improve the performance for a sequence of update operations. This should only be called if the database is being accessed by a single client.
StartTransactionI[GetInfo[db], wantAtomic];
};
EndTransaction: --PUBLIC-- PROC [db: LoganBerry.OpenDB, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN] = {
Completes a sequence of update operations. If commit=TRUE then all operations since the last StartTransaction operation are completed; otherwise some or all may be aborted. Returns TRUE if all operations are successfully committed.
committed ¬ EndTransactionI[GetInfo[db], commit];
};
FlushDBCache: --PUBLIC-- PROC [db: LoganBerry.OpenDB ¬ LoganBerry.nullDB] = {
Causes LoganBerry to flush all cached information about the specified database, or about all databases if db=nullDB. This may be necessary if the schema has changed, for instance.
FlushOpenTable[IF db # nullDB THEN GetInfo[db] ELSE NIL];
};
GetInfo: PROC [db: OpenDB] RETURNS [OpenDBInfo] ~ INLINE {
Unseals the database handle, ensures that it's valid, and checks if remote access is allowed.
ref: REF = RefID.Unseal[db];
IF ref = NIL THEN
ERROR Error[$BadDBHandle, "NIL OpenDB handle."];
WITH ref SELECT FROM
dbinfo: OpenDBInfo => {
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
RETURN[dbinfo];
};
ENDCASE => ERROR Error[$BadDBHandle, "Invalid OpenDB handle."];
};
Internal monitored operations
These routines export to the internal LB interface. The operations are monitored so that several processes can share a database. All of the operations (except OpenI) take a monitored OpenDBInfo handle as their first parameter.
OpenI: PUBLIC PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed.
WARNING: SINCE THIS ROUTINE IS NOT MONITORED, RACE CONDITIONS MUST BE CAREFULLY AVOIDED. That care is exercised here.
dbinfo ¬ GetSharedDB[dbName];
IF NOT dbinfo.isOpen THEN
MonitoredOpen[dbinfo];
RETURN[dbinfo];
};
MonitoredOpen: ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Want at most one client to attempt to open a database. This simply grabs the monitor lock and then calls an internal procedure to do the real work.
IF dbinfo.isOpen THEN RETURN; -- lost race
DoOpen[dbinfo];
};
DoOpen: INTERNAL PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
Really does the Open.
pathname: PFS.PATH;
badIndexMsg: ROPE ¬ NIL;
oldDBVersion: ROPE ¬ dbinfo.dbName;
pathname ¬ PFS.PathFromRope[DFUtilities.RemoveVersionNumber[dbinfo.dbName]];
dbinfo.dbName ¬ PFS.RopeFromPath[PFS.FileInfo[pathname ! PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName];
IF dbinfo.primaryIndex = NIL OR NOT Rope.Equal[dbinfo.dbName, oldDBVersion, FALSE] THEN
ReadSchema[dbinfo];
dbinfo.openForUpdate ¬ FALSE;
OpenLogs[dbinfo.logs, $read];
IF Recover[dbinfo] THEN
badIndexMsg ¬ "Logs recovered after apparent crash, please rebuild Indices.";
IF OpenIndices[dbinfo.indices, $read] THEN
badIndexMsg ¬ "Indices cannot be opened, please rebuild.";
dbinfo.isOpen ¬ TRUE;
IF IndicesOutOfDate[dbinfo] THEN
badIndexMsg ← "Indices are out of date, please rebuild.";
IF badIndexMsg # NIL THEN
ERROR Error[$BadIndex, badIndexMsg]; -- instead of BuildIndicesWorker[dbinfo];
};
ReadEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
ENABLE UNWIND => NULL;
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
indexEntry: IndexEntry;
index: OpenIndexInfo;
aValue: AttributeValue;
pathSkt: BTreeSimple.PathStk ¬ BTreeSimple.NewPathStk[];
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
Find appropriate index
index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
[aValue, indexEntry] ¬ GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF aValue = NIL THEN RETURN[entry: NIL, others: FALSE];
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
IF index.type = primary THEN
others ¬ FALSE
ELSE {
aValue ¬ GetIndexEntry[index: index, value: aValue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
others ¬ Rope.Equal[s1: value, s2: UniqueKeyToValue[aValue], case: FALSE];
};
RETURN[entry, others];
};
SimpleEntryProc: EntryProc = { -- for debugging purposes
[entry: LoganBerry.Entry] RETURNS [continue: BOOL]
cont: BOOLEAN ¬ TRUE;
set breakpoint here to look at entry
RETURN[cont];
};
EnumerateEntriesI: PUBLIC PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL, proc: EntryProc] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
Uses GenerateEntries so that the database is not locked during callbacks. This procedure is not monitored!
entry: Entry;
c: CursorInfo;
c ¬ GenerateEntriesI[dbinfo, key, start, end];
entry ¬ NextEntryI[c.dbinfo, c];
WHILE entry # NIL DO
IF NOT proc[entry] THEN EXIT;
entry ¬ NextEntryI[c.dbinfo, c];
ENDLOOP;
EndGenerateI[c.dbinfo, c];
};
GenerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL] RETURNS [cinfo: CursorInfo] ~ {
ENABLE UNWIND => NULL;
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo;
index: OpenIndexInfo;
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
newCursor ¬ NEW[CursorRecord];
newCursor.index ¬ index;
newCursor.dbinfo ¬ dbinfo;
newCursor.key ¬ key;
newCursor.start ¬ start;
newCursor.end ¬ end;
IF newCursor.index.type = secondary THEN {
IF start # NIL THEN
newCursor.start ¬ ValueToSmallKey[start];
IF end # NIL THEN
newCursor.end ¬ ValueToLargeKey[end];
};
newCursor.pathStk ¬ BTreeSimple.NewPathStk[];
back up one so we're in a position to start (assuming dir=increasing)
newCursor.current ¬ IF start # NIL THEN GetIndexEntry[index: newCursor.index, value: newCursor.start, valueIsKey: TRUE, relation: less, pathStk: newCursor.pathStk].actual ELSE NIL;
RETURN[newCursor];
};
NextEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo, dir: CursorDirection ¬ increasing] RETURNS [entry: Entry] ~ {
ENABLE UNWIND => NULL;
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
Note: The end conditions may not be what is desired, since changing directions at either end causes the first element at that end to be skipped.
actualValue: AttributeValue;
indexEntry: IndexEntry;
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
[actualValue, indexEntry] ¬ GetIndexEntry[index: cinfo.index, value: cinfo.current, valueIsKey: TRUE, relation: IF dir = increasing THEN greater ELSE less, pathStk: cinfo.pathStk];
IF (actualValue = NIL) OR (dir = increasing AND cinfo.end # NIL AND Rope.Compare[actualValue, cinfo.end, FALSE] = greater) OR (dir = decreasing AND Rope.Compare[actualValue, cinfo.start, FALSE] = less) THEN
RETURN[NIL];
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
cinfo.current ¬ actualValue;
RETURN[entry];
};
EndGenerateI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
NULL; -- this routine is unnecessary since cursors get garbage collected
};
WriteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, entry: Entry, log: LogID ¬ activityLog, replace: BOOLEAN ¬ FALSE] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key, and the primary attribute value must be unique throughout the database unless replace=TRUE; in that case, an existing entry with the same primary attribute value is atomically replaced with the new entry.
value: AttributeValue;
indexData: IndexEntry ¬ [log: log, firstByte: 0];
doReplace: BOOLEAN ¬ FALSE;
replacedIndexEntry: IndexEntry;
replacedEntry: Entry ¬ NIL;
inTrans: BOOL¬FALSE;
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
inTrans ¬ dbinfo.transStarted;
value ¬ GetAttributeValue[entry, dbinfo.primaryIndex.key];
IF value = NIL THEN
ERROR Error[$NoPrimaryKey, "Entry does not contain a primary key."];
IF dbinfo.logs[log].access = readOnly THEN
ERROR Error[$LogReadOnly, "Can't write to a read only log."];
[value, replacedIndexEntry] ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: value];
IF value # NIL THEN
IF replace THEN
doReplace ¬ TRUE
ELSE
ERROR Error[$ValueNotUnique, "An existing entry already contains the primary key."];
For replacements, the entry being replaced and its replacement must reside in the same log. A replace operation is logged in a similar manner to delete operations. The entry written after the replace log entry is the replacement. Both log entries are written before any of the indices are updated. In the event of a crash, both entries must be recovered atomically.
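For example, replacing the entry whose primary attribute is Name: Terry appends a one-attribute entry of the form REPLACE: Terry immediately followed by the full replacement entry; BuildIndicesWorker and RestoreLog both depend on this adjacency.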
MakeDatabaseWriteable[dbinfo];
IF doReplace THEN {
IF replacedIndexEntry.log # log THEN
ERROR Error[$InvalidReplace, "Cross-log replacements are not allowed."];
[] ¬ WriteLogEntry[dbinfo.logs[log].writeStream, LIST[[$REPLACE, value]], inTrans];
};
indexData.firstByte ¬ WriteLogEntry[dbinfo.logs[log].writeStream, entry, doReplace OR inTrans];
IF doReplace THEN {
replacedEntry ¬ GetCachedEntry[dbinfo, value];
IF replacedEntry=NIL THEN
replacedEntry ¬ ReadLogEntryT[dbinfo, log, replacedIndexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, replacedEntry, replacedIndexEntry];
};
AddEntryToIndices[dbinfo.indices, entry, indexData];
CompleteLogEntry[dbinfo, log];
};
DeleteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [deleted: Entry] ~ {
ENABLE UNWIND => NULL;
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
indexEntry: IndexEntry;
index: OpenIndexInfo;
pathSkt: BTreeSimple.PathStk ¬ BTreeSimple.NewPathStk[];
avalue: AttributeValue;
delete: Entry;
entry: Entry;
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
Find appropriate index
index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
Locate log entry
[avalue, indexEntry] ¬ GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF avalue = NIL THEN RETURN; -- no matching entry; nothing to delete
avalue ¬ GetIndexEntry[index: index, value: avalue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
IF Rope.Equal[s1: value, s2: UniqueKeyToValue[avalue], case: FALSE] THEN
ERROR Error[$ValueNotUnique, "Entry to be deleted insufficiently specified."];
IF dbinfo.logs[indexEntry.log].access = readOnly THEN
ERROR Error[$LogReadOnly, "Can't delete entries in a read only log."];
Log delete operation
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
MakeDatabaseWriteable[dbinfo];
delete ¬ LIST[[$DELETE, GetAttributeValue[entry, dbinfo.primaryIndex.key]]];
[] ¬ WriteLogEntry[dbinfo.logs[indexEntry.log].writeStream, delete, dbinfo.transStarted];
Update indices
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
CompleteLogEntry[dbinfo, indexEntry.log];
RETURN[entry];
};
CloseI: PUBLIC PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
DoClose[dbinfo];
};
DoClose: ENTRY PROC [dbinfo: OpenDBInfo, flush: BOOL¬FALSE] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
A DB is considered flushed when its schema must be reread on the next open.
IF NOT dbinfo.isOpen THEN RETURN;
IF dbinfo.transStarted THEN [] ¬ EndTransactionInt[dbinfo, TRUE];
DCS May 17, 1991 8:28:56 am PDT I guess TRUE is right; it's really an error to close the DB without explicitly ending the transaction!
CloseLogs[dbinfo.logs];
CloseIndices[dbinfo.indices];
dbinfo.isOpen ¬ FALSE;
IF flush THEN dbinfo.primaryIndex ¬ NIL;
};
DescribeI: PUBLIC PROC [dbinfo: OpenDBInfo] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database. This routine is not monitored since it can safely be called while other operations are in progress.
ReverseLogList: PROC [list: LIST OF LogInfo] RETURNS [LIST OF LogInfo] ~ {
new, temp: LIST OF LogInfo ¬ NIL;
UNTIL list = NIL DO
temp ¬ list; list ¬ list.rest; temp.rest ¬ new; new ¬ temp;
ENDLOOP;
RETURN[new];
};
ReverseIndexList: PROC [list: LIST OF IndexInfo] RETURNS [LIST OF IndexInfo] ~ {
new, temp: LIST OF IndexInfo ¬ NIL;
UNTIL list = NIL DO
temp ¬ list; list ¬ list.rest; temp.rest ¬ new; new ¬ temp;
ENDLOOP;
RETURN[new];
};
AddIndexInfo: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOLEAN]
index: OpenIndexInfo ¬ NARROW[val];
IF index # dbinfo.primaryIndex THEN {
sIndex: IndexInfo ¬ NEW[IndexInfoRec ¬ [key: index.key, file: index.filename, order: index.order]];
info.indices ¬ CONS[sIndex, info.indices];
};
RETURN[quit: FALSE];
};
info ¬ NEW[SchemaInfoRec];
info.dbName ¬ dbinfo.dbName;
info.logs ¬ NIL;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
log: LogInfo ¬ NEW[LogInfoRec ¬ [id: dbinfo.logs[i].id, file: dbinfo.logs[i].filename]];
info.logs ¬ CONS[log, info.logs];
};
ENDLOOP;
info.logs ¬ ReverseLogList[info.logs];
info.indices ¬ NIL;
[] ¬ RefTab.Pairs[dbinfo.indices, AddIndexInfo];
info.indices ¬ ReverseIndexList[info.indices];
info.indices ¬ CONS[NEW[IndexInfoRec ¬ [key: dbinfo.primaryIndex.key, file: dbinfo.primaryIndex.filename, order: dbinfo.primaryIndex.order]], info.indices];
};
Maintenance operations
BuildIndicesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
This routine grabs the monitor lock and then calls the worker procedure to do the real work.
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
BuildIndicesWorker[dbinfo, TRUE];
};
BuildIndicesWorker: PROC [dbinfo: OpenDBInfo, checkDuplicates: BOOLEAN ¬ FALSE] RETURNS [] ~ {
ENABLE UNWIND => MarkUpdateOnIndices[dbinfo.indices, FALSE];
indexData: IndexEntry;
logstream: STREAM;
entry: Entry;
indexEntry: IndexEntry;
saveStreamPosition: INT;
EraseIndices[dbinfo.indices];
MarkUpdateOnIndices[dbinfo.indices, TRUE];
FOR log: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[log] # NIL THEN {
indexData ¬ [log, 0];
logstream ¬ dbinfo.logs[log].readStream;
FOR entry ¬ ReadLogEntryT[dbinfo, log, 0], ReadLogEntryT[dbinfo, log, -1]
UNTIL entry = NIL DO
SELECT entry.first.type FROM
$DELETED, $REPLACED => { -- DeleteEntry (old logging scheme)
saveStreamPosition ¬ IO.GetIndex[logstream]; -- before the following read changes the index
indexEntry ¬ [log: log, firstByte: Convert.IntFromRope[entry.first.value]];
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
IO.SetIndex[logstream, saveStreamPosition];
};
$DELETE, $REPLACE => { -- DeleteEntry (new logging scheme)
saveStreamPosition ¬ IO.GetIndex[logstream]; -- before the following read changes the index
indexEntry ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: entry.first.value].data;
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
IO.SetIndex[logstream, saveStreamPosition];
};
ENDCASE => { -- WriteEntry
IF checkDuplicates THEN {
value: AttributeValue ¬ GetAttributeValue[entry, dbinfo.primaryIndex.key];
IF value = NIL THEN {
ERROR Error[$NoPrimaryKey, IO.PutFR1["Entry at position %g in log does not contain a primary key.", IO.int[indexData.firstByte]]];
};
value ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: value].actual;
IF value # NIL THEN {
ERROR Error[$ValueNotUnique, IO.PutFR1["Entry at position %g in log contains a duplicate primary key.", IO.int[indexData.firstByte]]];
};
};
AddEntryToIndices[dbinfo.indices, entry, indexData];
};
IF IO.EndOf[logstream] THEN {
ERROR Error[$BadLogEntry, "At end of log. Missing terminator on log entry."];
};
IF NOT IsEndOfEntry[IO.GetChar[logstream]] THEN {
ERROR Error[$BadLogEntry, IO.PutFR1["Missing terminator on log entry at position %g in log.", IO.int[IO.GetIndex[logstream]]]];
};
indexData.firstByte ¬ IO.GetIndex[logstream];
ENDLOOP;
};
ENDLOOP;
MarkUpdateOnIndices[dbinfo.indices, FALSE];
};
CompactLogsI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE { UNWIND => NULL; PFS.Error => ERROR Error[$InternalError, error.explanation]; };
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
firstWritten: BOOL¬TRUE;
KeepEntry: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
indexEntry: IndexEntry;
entry: Entry;
TRUSTED {
indexEntry ¬ LOOPHOLE[@value[0], IndexPtr]↑;
};
Given the index pointer to the old log, read the entry at that point, write the entry to the new log, and mark the update as complete
IF dbinfo.logs[indexEntry.log].access # readOnly THEN {
entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
[] ¬ WriteLogEntry[newlogs[indexEntry.log].writeStream, entry, firstWritten];
firstWritten ¬ FALSE; -- subsequent writes must back up over the UpdateComplete marker written below
MarkUpdateComplete[newlogs[indexEntry.log].writeStream];
};
RETURN[continue: TRUE];
};
newlogs: LogSet;
pathName: PFS.PATH;
IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
newlogs ¬ NEW[LogSetRecord[dbinfo.logs.size]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL AND dbinfo.logs[i].access # readOnly THEN {
newlogs[i] ¬ NEW[OpenLogInfoRecord];
pathName ¬ PFS.PathFromRope[dbinfo.logs[i].filename];
newlogs[i].writeStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $create, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
[] ¬ BTreeSimple.EnumerateRecords[tree: dbinfo.primaryIndex.btree, key: NIL, Proc: KeepEntry !
BTree.Error => ERROR Error[$BadIndex, "Problem enumerating primary index."]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF newlogs[i] # NIL THEN {
IF dbinfo.logs[i].access # readOnly THEN MarkUpdateComplete[newlogs[i].writeStream];
IO.Close[newlogs[i].writeStream];
IO.Close[dbinfo.logs[i].readStream];
Added !H because PFS expects it
pathName ¬ PFS.PathFromRope[Rope.Concat[dbinfo.logs[i].filename, "!H"]];
dbinfo.logs[i].writeStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $write, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
dbinfo.logs[i].readStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $read, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
BuildIndicesWorker[dbinfo];
};
SetRemoteAccessI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, accessible: BOOLEAN ¬ TRUE, why: ROPE ¬ NIL] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Exports to LB.
Controls access by remote clients to the database. If accessible=FALSE then the database cannot be accessed through the normal LoganBerry interface, only via this interface; setting accessible=TRUE makes the database commonly available (the normal case). This operation is intended for use by administrators who wish to temporarily remove a database from service.
dbinfo.remoteAccess ¬ accessible;
dbinfo.statusMsg ¬ why;
};
Transaction operations
This LoganBerry implementation does not support atomic transactions! The following two procedures are hooks so that transactional support can be added later. Currently, StartTransaction indicates that a sequence of operations is being performed and it is okay to leave the database in an inconsistent state during the sequence. All subsequent operations on the database up until an EndTransaction are considered part of the "transaction". Using transactions in this way can have significant performance benefits for clients that are performing a sequence of updates since each update does not have to be committed to disk immediately. However, until real atomic transactions are implemented, clients must be willing to recover the database in case of inopportune crashes.

WARNING: These operations should not be used on databases that are shared by multiple clients/processes since the transaction includes all operations not just those performed by a particular client. This is particularly true since file flushing is inhibited while a transaction is open, thus widening the window of exposure.
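A typical bulk-update bracket, sketched as hypothetical client code (newEntries is assumed to be a list of entries built by the caller):
<<
StartTransaction[db: db]; -- wantAtomic defaults to FALSE
FOR e: LIST OF LoganBerry.Entry ¬ newEntries, e.rest WHILE e # NIL DO
WriteEntry[db: db, entry: e.first];
ENDLOOP;
[] ¬ EndTransaction[db: db, commit: TRUE];
>>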
StartTransactionI: ENTRY PROC [dbinfo: OpenDBInfo, wantAtomic: BOOLEAN ¬ FALSE] = {
Currently, this simply tries to avoid flushing the indices on every update. It could also avoid flushing log writes, but we won't for now.
ENABLE UNWIND => NULL;
logs: LogSet;
streamIndex: LogAddress;
IF wantAtomic THEN
ERROR Error[$NotImplemented, "Atomic operations are not currently provided."];
DCS, May 17, 1991; added to permit flush suppression during open transactions.
IF dbinfo.transStarted THEN RETURN;
MakeDatabaseWriteable[dbinfo];
dbinfo.transStarted ¬ TRUE;
logs ¬ dbinfo.logs;
FOR i: LogID IN [0..logs.size) DO
Back up each write stream over the endofupdate character once, in preparation for entry writes. While the transaction is open, ordinary updates will neither write the endofupdate character nor try to back up over it.
IF logs[i] = NIL OR logs[i].writeStream=NIL OR logs[i].access # readWrite THEN LOOP;
streamIndex ¬ IO.GetIndex[logs[i].writeStream];
IF streamIndex > 0 THEN IO.SetIndex[logs[i].writeStream, streamIndex-1];
ENDLOOP;
MarkUpdateOnIndices[dbinfo.indices, TRUE];
};
EndTransactionI: ENTRY PROC [dbinfo: OpenDBInfo, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN] = {
ENABLE UNWIND => NULL;
RETURN[EndTransactionInt[dbinfo, commit]];
};
EndTransactionInt: INTERNAL PROC [dbinfo: OpenDBInfo, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN¬FALSE] = {
IF NOT dbinfo.transStarted THEN RETURN;
MarkUpdateOnIndices[dbinfo.indices, FALSE];
dbinfo.transStarted ¬ FALSE;
FOR i: LogID IN [0..dbinfo.logs.size) DO
Flush and record end markers for each write stream; neither has been maintained while the transaction was open.
CompleteLogEntry[dbinfo, i];
ENDLOOP;
RETURN[TRUE];
};
Database schema
A schema is created by clients of this package and stored in a DF file that can be used to backup the database. Lines of the file starting with "-->" are deemed to contain schema information for the file named in the subsequent line of the DF file or at the end of the same line. The two types of schema entries are as follows:
--> log <logID> <readOnly or readWrite> <optional CR> <filename>
--> index <key> <primary or secondary> <optional CR> <filename>
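For example, a database with one log, a primary index on $Name, and a secondary index on $Site might be described by DF lines like these (file names are hypothetical):
--> log 0 readWrite Sample.log
--> index "Name" primary Sample-name.index
--> index "Site" secondary Sample-site.index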
SchemaChars: PUBLIC REF READONLY TEXT = "-->";
ParseSchemaLine: PROC [s: STREAM, buffer: REF TEXT, wdir: ROPE] RETURNS [item: REF ANY, needFilename: BOOLEAN ¬ FALSE] ~ {
Takes an input stream and parses one schema entry from it. The item returned is of type OpenLogInfo or OpenIndexInfo; needFilename=TRUE indicates that no file name existed in the input stream (it must be on the next line of the schema file).
ENABLE {
IO.Error => ERROR Error[$BadSchema, "Error parsing schema statement."];
IO.EndOfStream => ERROR Error[$BadSchema, "Unexpected end of stream."] };
token: REF TEXT;
log: OpenLogInfo;
index: OpenIndexInfo;
needFilename ¬ FALSE;
token ¬ IO.GetToken[s, IO.IDProc, buffer].token;
IF NOT RefText.Equal[token, SchemaChars] THEN RETURN[NIL, FALSE];
token ¬ IO.GetToken[s, IO.IDProc, buffer].token;
SELECT TRUE FROM
RefText.Equal[token, "log", FALSE] => {
log ¬ NEW[OpenLogInfoRecord];
log.id ¬ IO.GetInt[s];
token ¬ IO.GetToken[s, IO.IDProc, buffer].token;
log.access ¬ IF RefText.Equal[token, "ReadOnly", FALSE] THEN readOnly ELSE readWrite;
log.filename ¬ IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ¬ TRUE; CONTINUE}].token;
IF NOT needFilename THEN {
pathName, wdirPathName: PFS.PATH;
pathName ¬ PFS.PathFromRope[log.filename];
wdirPathName ¬ PFS.PathFromRope[wdir];
log.filename ¬ PFS.RopeFromPath[PFS.AbsoluteName[pathName, wdirPathName]];
<<log.filename ← FS.ExpandName[log.filename, wdir].fullFName; note different field>>
};
item ¬ log;
};
RefText.Equal[token, "index", FALSE] => {
index ¬ NEW[OpenIndexInfoRecord];
index.key ¬ Atom.MakeAtom[IO.GetRopeLiteral[s]];
token ¬ IO.GetToken[s, IO.IDProc, buffer].token;
index.type ¬ IF RefText.Equal[token, "primary", FALSE] THEN primary ELSE secondary;
index.filename ¬ IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ¬ TRUE; CONTINUE}].token;
IF NOT needFilename THEN {
pathName, wdirPathName: PFS.PATH;
pathName ¬ PFS.PathFromRope[index.filename];
wdirPathName ¬ PFS.PathFromRope[wdir];
index.filename ¬ PFS.RopeFromPath[PFS.AbsoluteName[pathName, wdirPathName]];
<<index.filename ← FS.ExpandName[index.filename, wdir].fullFName; again note different field selector name>>
};
item ¬ index;
};
ENDCASE => ERROR Error[$BadSchema, "Invalid keyword."];
};
ReadSchema: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
Read the database schema from dbinfo.dbName.
ENABLE DFUtilities.SyntaxError => ERROR Error[$BadSchema, reason];
NewDBInfo: PROC [item: REF ANY, filename: ROPE ¬ NIL] RETURNS [] ~ {
WITH item SELECT FROM
log: OpenLogInfo => {
IF filename # NIL THEN
log.filename ¬ filename;
dbinfo.logs[log.id] ¬ log;
};
index: OpenIndexInfo => {
IF filename # NIL THEN
index.filename ¬ filename;
[] ¬ RefTab.Insert[dbinfo.indices, index.key, index];
IF index.type = primary THEN
IF dbinfo.primaryIndex = NIL THEN
dbinfo.primaryIndex ¬ index
ELSE
ERROR Error[$BadSchema, "Multiple primary indices specified."];
};
ENDCASE => ERROR Error[$InternalError];
};
SchemaItemProc: DFUtilities.ProcessItemProc = {
[item: REF ANY] RETURNS [stop: BOOL ¬ FALSE]
WITH item SELECT FROM
comment: REF DFUtilities.CommentItem => {
IF needFilename THEN
ERROR Error[$BadSchema, "No file name specified for some log or index."];
[lastSchemaItem, needFilename] ¬ ParseSchemaLine[IO.RIS[comment.text], buffer, wdir];
IF lastSchemaItem # NIL AND NOT needFilename THEN {
NewDBInfo[lastSchemaItem];
}
};
file: REF DFUtilities.FileItem => {
IF needFilename THEN {
fullPathName, wdirPathName: PFS.PATH;
pathname: PFS.PATH ¬ PFS.PathFromRope[DFUtilities.RemoveVersionNumber[file.name]];
wdirPathName ¬ PFS.PathFromRope[wdir];
fullPathName ¬ PFS.AbsoluteName[pathname, wdirPathName];
NewDBInfo[lastSchemaItem, PFS.RopeFromPath[fullPathName]];
<<NewDBInfo[lastSchemaItem, FS.ExpandName[DFUtilities.RemoveVersionNumber[file.name], wdir].fullFName]; >>
needFilename ¬ FALSE;
};
};
ENDCASE => {
IF needFilename THEN
ERROR Error[$BadSchema, "No file name specified for some log or index."];
};
};
needFilename: BOOLEAN ¬ FALSE;
lastSchemaItem: REF ANY;
name: PFS.PATH;
schemaStream: STREAM;
buffer: REF TEXT = RefText.ObtainScratch[RefText.line];
wdir: ROPE;
shortName: ROPE;
<<wdir: ROPE = UFS.ExpandName[dbinfo.dbName].cr.dir;>>
<<wdir: ROPE = Rope.Substr[dbinfo.dbName, 0, FS.ExpandName[dbinfo.dbName].cr.base.start];>>
name ¬ PFS.PathFromRope[dbinfo.dbName];
IF PFSNames.IsAbsolute[name] THEN {
prev, loc, length: INT;
ropeName: ROPE;
prev ¬ 0;
loc ¬ 0;
ropeName ¬ PFS.RopeFromPath[name]; -- convert Path to rope and manipulate ropes
length ¬ Rope.Length[ropeName];
WHILE loc < length DO
prev ¬ loc;
loc ¬ Rope.SkipTo[s: ropeName, pos: prev+1, skip: "/>]"];
ENDLOOP;
shortName ¬ Rope.Substr[base: ropeName, start: prev+1, len: length];
wdir ¬ Rope.Substr[base: ropeName, start: 0, len: prev+1];
}
ELSE wdir ¬ PFS.RopeFromPath[PFS.GetWDir[]];
schemaStream ¬ PFS.StreamOpen[PFS.AbsoluteName[name] !
PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]];
dbinfo.indices ¬ RefTab.Create[];
dbinfo.primaryIndex ¬ NIL;
dbinfo.logs ¬ NEW[LogSetRecord[LAST[LogID]]];
DFUtilities.ParseFromStream[in: schemaStream, proc: SchemaItemProc, filter: [comments: TRUE]];
RefText.ReleaseScratch[buffer];
IF dbinfo.primaryIndex = NIL THEN
ERROR Error[$BadSchema, "No primary index specified."];
};
Entries
GetAttributeValue: PROC [entry: Entry, type: AttributeType]
RETURNS [AttributeValue] ~ {
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
IF e.first.type = type THEN
RETURN[e.first.value];
ENDLOOP;
RETURN[NIL];
};
ReverseEntry: PROC [entry: Entry] RETURNS[Entry] = {
Destructively reverses the order of the entry's attributes. Taken from LispImpl.DReverse.
l1, l2, l3: Entry ¬ NIL;
IF entry = NIL THEN RETURN[NIL];
l3 ¬ entry;
UNTIL (l1 ¬ l3) = NIL DO
l3 ¬ l3.rest;
l1.rest ¬ l2;
l2 ¬ l1;
ENDLOOP;
RETURN[l2];
};
Log management
Some day these routines may be placed in a separate module.
OpenLogs: PROC [logs: LogSet, access: PFS.AccessOptions ¬ $write] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) AND (logs[i].writeStream = NIL) AND (logs[i].readStream = NIL) THEN {
name: ROPE;
name ¬ logs[i].filename;
IF (logs[i].access = readOnly) OR (access = $read)
THEN {
logs[i].readStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
}
ELSE {
name ¬ Rope.Concat[logs[i].filename, "!H"];
logs[i].writeStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
IO.SetIndex[logs[i].writeStream, IO.GetLength[logs[i].writeStream]]; -- $$PerfA$$
Make sure writeStream begins life poised at EOF -- many operations that previously did this now no longer do.
logs[i].readStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read, streamBufferParms: logBufferSize !
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
};
ENDLOOP;
};
IsEndOfAttribute: PROC [ch: CHAR] RETURNS [BOOLEAN] ~ {
On different machines, \n may be interpreted differently. Thus, rather than checking whether an input character is \n, we should check for \r or \l.
RETURN[(ch = '\r) OR (ch = '\l)];
};
IsEndOfEntry: PROC [ch: CHAR] RETURNS [BOOLEAN] ~ {
On different machines, \n may be interpreted differently. Thus, rather than checking whether an input character is \n, we should check for \r or \l.
RETURN[(ch = '\r) OR (ch = '\l)];
};
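Under these conventions, a quiescent log holding a single two-attribute entry consists of the following bytes (sketched; \n stands for the CR or LF terminator and <UC> for the UpdateComplete marker):
Name: Terry\n
Site: PARC\n
\n<UC>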
ReadLogEntryT: PROC [dbinfo: OpenDBInfo, log: LogID, byte: LogAddress]
RETURNS [Entry] ~ {
logStream: STREAM ¬ dbinfo.logs[log].readStream;
IF logStream = NIL THEN ERROR Error[$CantOpenLog];
RETURN[ReadEntryDo[logStream, byte,
IF dbinfo.transStarted THEN dbinfo.logs[log].writeStream ELSE NIL]];
};
ReadLogEntry: PUBLIC PROC [logStream: STREAM, byte: LogAddress] RETURNS [Entry] ~ {
RETURN[ReadEntryDo[logStream, byte]];
};
ReadEntryDo: PROC [logStream: STREAM, byte: LogAddress, writeStream: STREAM¬NIL]
RETURNS [Entry] ~ {
writeStream is the write stream corresponding to logStream; it is used for the buffer management described below.
flushFirst: BOOL ¬ FALSE;
{
ENABLE {
IO.EndOfStream =>
IF writeStream=NIL THEN ERROR Error[$BadLogEntry]
ELSE { flushFirst ¬ TRUE; RETRY; }; -- See discussion below
IO.Error =>
IF writeStream=NIL THEN ERROR Error[$BadIndex]
ELSE { flushFirst ¬ TRUE; RETRY; };
};
AttributeBreakProc: IO.BreakProc = { -- [char: CHAR] RETURNS [IO.CharClass]
RETURN[SELECT char FROM ': => sepr, ENDCASE => other]
};
ReadAttribute: PROC [s: STREAM] RETURNS [a: Attribute] ~ {
char: CHAR ¬ ' ;
a.type ¬ Atom.MakeAtom[IO.GetTokenRope[s, AttributeBreakProc].token];
[] ¬ IO.GetChar[s]; -- attribute separation char
WHILE char=' OR char='\t DO char ¬ IO.GetChar[s]; ENDLOOP; -- skip whitespace
IO.Backup[s, char];
IF char = '" THEN { a.value ¬ IO.GetRopeLiteral[s]; [] ¬ IO.GetLineRope[s]; }
ELSE a.value ¬ IO.GetLineRope[s];
};
entry: Entry;
attribute: Attribute;
IF logStream = NIL THEN {ERROR Error[$CantOpenLog]};
IF flushFirst THEN { IO.Flush[writeStream]; writeStream ¬ NIL; };
DCS, May 17, 1991; A read tried to read a part of the file that was still stored in a PFS write buffer. Now stable storage common to both read and write should fully reflect all write actions.
IF byte >= 0 THEN IO.SetIndex[logStream, byte] ELSE byte ¬ IO.GetIndex[logStream];
IF IO.PeekChar[logStream] = UpdateComplete THEN RETURN[NIL]; -- at end of log
entry ¬ NIL;
read a list of attributes until endofentry
UNTIL IsEndOfEntry[IO.PeekChar[logStream]] DO
attribute ¬ ReadAttribute[logStream];
entry ¬ CONS[attribute, entry];
ENDLOOP;
the list is constructed backwards, so it is reversed before being returned
RETURN[ReverseEntry[entry]];
};
};
WriteLogEntry: PUBLIC PROC [logStream: STREAM, entry: Entry, continuation: BOOLEAN ¬ FALSE] RETURNS [LogAddress] ~ {
NeedsQuoting: PROC [rope: ROPE] RETURNS [BOOLEAN] ~ {
RETURN [Rope.SkipTo[s: rope, skip: "\l\r\\\""] < Rope.Length[rope]];
};
PutRopeLiteral: PROC [stream: IO.STREAM, rope: ROPE] ~ {
Action: PROC [c: CHAR] RETURNS [quit: BOOL ¬ FALSE] ~ {
IF c = '" OR c='\\ THEN
IO.PutChar[stream, '\\];
IF c IN [' .. '~] THEN
IO.PutChar[stream, c]
ELSE
IO.PutF1[stream, "\\%03b", [cardinal[ORD[c]]]];
};
IO.PutChar[stream, '"];
[] ¬ Rope.Map[base: rope, action: Action];
IO.PutChar[stream, '"];
};
WriteAttribute: PROC [s: STREAM, a: Attribute] RETURNS [] ~ {
IO.PutRope[s, Atom.GetPName[a.type]];
IO.PutRope[s, ": "];
IF NeedsQuoting[a.value] THEN PutRopeLiteral[s, a.value] ELSE IO.PutRope[s, a.value];
IO.PutRope[s, "\n"];
};
byteOffset: LogAddress;
IF logStream = NIL THEN ERROR Error[$CantOpenLog];
byteOffset ¬ IO.GetIndex[logStream]; -- DCS index and length should be the same
<<byteOffset ¬ IO.GetLength[logStream];>>
IF byteOffset > 0 AND NOT continuation THEN {
byteOffset ¬ byteOffset - 1; --remove update marker from end of log
IO.SetIndex[logStream, byteOffset];
};
write a list of attributes
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
WriteAttribute[logStream, e.first];
ENDLOOP;
IO.PutChar[logStream, EndOfEntry];
IO.Flush[logStream]; -- $$PerfC$$
RETURN[byteOffset];
};
MarkUpdateComplete: PUBLIC PROC [writeStream: STREAM] RETURNS [] ~ {
DCS It's an invariant that writeStream is always positioned at EOF.
byteOffset: LogAddress ¬ IO.GetLength[writeStream];
IO.SetIndex[writeStream, byteOffset];
IO.PutChar[writeStream, UpdateComplete];
IO.Flush[writeStream];
};
CompleteLogEntry: PROC [dbinfo: OpenDBInfo, index: LogID, forceFlush: BOOL¬FALSE] ~ {
DCS; MarkUpdateComplete is OK for clients that are creating logs for later use by LoganBerry, but this one takes read/write stream interaction and transaction status into account.
writeStream: STREAM;
IF dbinfo.logs[index]=NIL THEN RETURN;
writeStream ¬ dbinfo.logs[index].writeStream;
IF writeStream=NIL THEN RETURN;
IF NOT dbinfo.transStarted THEN IO.PutChar[writeStream, UpdateComplete];
IF dbinfo.transStarted AND NOT forceFlush THEN RETURN;
IO.Flush[writeStream];
IF dbinfo.logs[index].readStream#NIL THEN
IO.Flush[dbinfo.logs[index].readStream]; -- so readers see most recent changes
};
CloseLogs: PROC [logs: LogSet] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) THEN {
IF (logs[i].readStream # NIL) THEN {
IO.Flush[logs[i].readStream];
IO.Close[logs[i].readStream];
logs[i].readStream ¬ NIL
};
IF (logs[i].writeStream # NIL) THEN {
IO.Flush[logs[i].writeStream];
IO.Close[logs[i].writeStream];
logs[i].writeStream ¬ NIL;
};
};
ENDLOOP;
};
Index management
Some day these routines may be placed in a separate module.
OpenIndices: PROC [indices: IndexSet, access: PFS.AccessOptions ¬ $write] RETURNS [needToRebuild: BOOLEAN] ~ {
ENABLE { PFS.Error => ERROR Error[$CantOpenIndex, "LoganBerry doesn't work in VUX directories!"]};
OpenIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
SetBackingStream: PROC [] ~ {
name: ROPE ¬ index.filename;
IF access = $read
THEN {
index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
}
ELSE {
name ¬ Rope.Concat[name, "!H"];
index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
};
};
init: BOOLEAN ¬ FALSE;
index: OpenIndexInfo ¬ NARROW[val];
IF index.btree = NIL THEN {
index.btree ¬ BTreeSimple.New[];
};
IF BTree.GetState[index.btree].state # open THEN {
SetBackingStream[
! PFS.Error => { -- problem opening btree so create new one
IF error.group = $user AND error.code = $unknownFile THEN {
name: ROPE;
init ¬ TRUE;
needToRebuild ¬ TRUE;
name ¬ Rope.Concat[index.filename, "!H"];
index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
CONTINUE;
}
ELSE
ERROR Error[$CantOpenIndex, error.explanation];
}
];
BTreeSimple.Open[tree: index.btree, readFile: index.readBacking, writeFile: index.writeBacking, pageSize: btreePageSize, cacheSize: btreeCacheSize, initialize: init !
BTree.Error => ERROR Error[$BadIndex, Rope.Cat["Bad index for ", Atom.GetPName[NARROW[key]], "."]];
BTree.UpdateInProgress => { -- scrap btree and try again
name: ROPE;
init ¬ TRUE;
needToRebuild ¬ TRUE;
name ¬ Rope.Concat[index.filename, "!H"];
index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
RETRY;}
];
};
RETURN[quit: FALSE];
};
needToRebuild ¬ FALSE;
[] ¬ RefTab.Pairs[indices, OpenIndex];
RETURN[needToRebuild];
};
GetIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, valueIsKey: BOOLEAN ¬ FALSE, relation: BTreeSimple.Relation ¬ equal, pathStk: BTreeSimple.PathStk ¬ NIL] RETURNS [actual: AttributeValue, data: IndexEntry] ~ {
Note: the AttributeValues passed in and out may or may not be unique keys.
btreeValue: BTreeSimple.Value;
useKey: AttributeValue ¬ value;
useRelation: BTreeSimple.Relation ¬ relation;
IF NOT valueIsKey AND index.type = secondary THEN
SELECT relation FROM
less =>
useKey ¬ ValueToSmallKey[value];
lessEqual =>
useKey ¬ ValueToLargeKey[value];
equal => { --start small and look upwards
useKey ¬ ValueToSmallKey[value];
useRelation ¬ greaterEqual;
};
greaterEqual =>
useKey ¬ ValueToSmallKey[value];
greater =>
useKey ¬ ValueToLargeKey[value];
ENDCASE;
[actual, btreeValue] ¬ BTreeSimple.ReadRecord[tree: index.btree, key: useKey, relation: useRelation, pathStk: pathStk, useExistingPath: pathStk # NIL !
BTree.Error => ERROR Error[$BadIndex]];
IF NOT valueIsKey AND index.type = secondary AND relation = equal AND NOT Rope.Equal[value, UniqueKeyToValue[actual], FALSE] THEN -- found entry larger than desired
actual ¬ NIL;
IF btreeValue # NIL THEN TRUSTED {
data ¬ LOOPHOLE[@btreeValue[0], IndexPtr]↑;
};
RETURN[actual, data];
};
CreateIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
btreeValue: BTreeSimple.Value;
<<btreeValue ¬ NEW[BTreeSimple.ValueObject[SIZE[IndexEntry]]];>>
Old LoganBerry statement; SIZE was changed to WORDS because SIZE[T] and WORDS[T] are different on the Sun4, but the same on the Dorado.
btreeValue ¬ NEW[BTreeSimple.ValueObject[WORDS[IndexEntry]]];
Stuff the index entry denoted by data into btreeValue
TRUSTED {
LOOPHOLE[@btreeValue[0], IndexPtr]­ ¬ data;
};
BTreeSimple.UpdateRecord[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data], value: btreeValue !
BTree.Error => ERROR Error[$BadIndex]];
};
DeleteIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
[] ¬ BTreeSimple.DeleteKey[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data] !
BTree.Error => ERROR Error[$BadIndex]];
};
AddEntryToIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: OpenIndexInfo;
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
index ¬ NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so add to index
CreateIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
RemoveEntryFromIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: OpenIndexInfo;
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
index ¬ NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so remove from index
DeleteIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
EraseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
EraseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
name: ROPE;
index: OpenIndexInfo ¬ NARROW[val];
IF index.btree = NIL THEN RETURN[quit: FALSE];
the following two statements may not really be needed
IF BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
IO.Close[index.readBacking];
index.readBacking ¬ NIL;
IF index.writeBacking ~= NIL THEN {
IO.Flush[index.writeBacking];
IO.Close[index.writeBacking];
index.writeBacking ¬ NIL;
};
};
name ¬ Rope.Concat[index.filename, "!H"];
index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
<<index.openfile ¬ GeneralFS.Create[index.filename];>>
BTreeSimple.Open[tree: index.btree, readFile: index.readBacking, writeFile: index.writeBacking, pageSize: btreePageSize, cacheSize: btreeCacheSize, initialize: TRUE];
RETURN[quit: FALSE];
};
[] ¬ RefTab.Pairs[indices, EraseIndex];
};
MarkUpdateOnIndices: PROC [indices: IndexSet, updateInProgress: BOOLEAN] RETURNS [] ~ {
MarkIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: OpenIndexInfo ¬ NARROW[val];
IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetUpdateInProgress[index.btree, updateInProgress];
};
RETURN[quit: FALSE];
};
[] ¬ RefTab.Pairs[indices, MarkIndex];
};
CloseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
CloseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: OpenIndexInfo ¬ NARROW[val];
IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
IO.Flush[self: index.readBacking];
IO.Close[self: index.readBacking];
index.readBacking ¬ NIL;
IF index.writeBacking ~= NIL THEN {
IO.Flush[self: index.writeBacking];
IO.Close[self: index.writeBacking];
index.writeBacking ¬ NIL};
};
RETURN[quit: FALSE];
};
[] ¬ RefTab.Pairs[indices, CloseIndex];
};
Secondary indices need not be unique, so special btree keys must be generated. Unique keys are obtained by appending a logID/logAddress pair to an AttributeValue.
IsUniqueKey: PROC [key: ROPE] RETURNS [BOOLEAN] ~ INLINE {
RETURN[Rope.Find[key, "\000"] # -1];
};
ValueToUniqueKey: PROC [value: AttributeValue, data: IndexEntry] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Cat[value, "\000", Convert.RopeFromInt[data.log], " ", Convert.RopeFromInt[data.firstByte]]];
};
ValueToSmallKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\000"]];
};
ValueToLargeKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\377"]];
};
UniqueKeyToValue: PROC [key: ROPE] RETURNS [AttributeValue] ~ INLINE {
RETURN[Rope.Substr[key, 0, Rope.Index[key, 0, "\000"]]];
};
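For example, with data = [log: 2, firstByte: 1536], ValueToUniqueKey["Terry", data] yields "Terry\0002 1536", which collates after ValueToSmallKey["Terry"] = "Terry\000\000" and before ValueToLargeKey["Terry"] = "Terry\000\377"; UniqueKeyToValue maps it back to "Terry".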
Crash recovery
Recover: PROC [dbinfo: OpenDBInfo] RETURNS [needToRebuild: BOOLEAN] ~ {
needToRebuild ¬ FALSE;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF (dbinfo.logs[i] # NIL) AND (dbinfo.logs[i].access # readOnly) THEN {
IF NOT InGoodState[dbinfo.logs[i].readStream] THEN {
MakeDatabaseWriteable[dbinfo];
IF RestoreLog[dbinfo.logs[i].readStream, dbinfo.logs[i].writeStream] THEN
needToRebuild ¬ TRUE;
};
};
ENDLOOP;
RETURN[needToRebuild];
};
InGoodState: PROC [logStream: STREAM] RETURNS [BOOLEAN] ~ {
Checks that the log is in a consistent state. The last byte of the log should be the UpdateComplete marker.
PROBLEM: the byte count returned by GetLength may be wrong after a crash?
length: INT;
thirdToLastByte, secondToLastByte, lastByte: CHAR;
length ¬ IO.GetLength[logStream];
SELECT length FROM
=1 => { -- empty log?
IO.SetIndex[logStream, 0];
lastByte ¬ IO.GetChar[logStream];
RETURN[lastByte = UpdateComplete];
};
< 3 => { -- log just created (or too short to be valid)
RETURN[FALSE];
};
>= 3 => { -- look for valid end of entry
IO.SetIndex[logStream, length-3];
thirdToLastByte ¬ IO.GetChar[logStream];
secondToLastByte ¬ IO.GetChar[logStream];
lastByte ¬ IO.GetChar[logStream];
RETURN[IsEndOfAttribute[thirdToLastByte] AND IsEndOfEntry[secondToLastByte] AND (lastByte = UpdateComplete)];
};
ENDCASE => ERROR Error[$InternalError, "Shouldn't get here."];
};
RestoreLog: PROC [readStream, writeStream: STREAM] RETURNS [needToRebuild: BOOLEAN] ~ {
Restores the log to a consistent state. There are two cases to consider: (1) the system was interrupted while writing an entry to the log; (2) a complete entry exists on the log but the btrees have been only partially updated.
logAddress: INT;
prevByte, byte: CHAR;
lastEntry: Entry;
logAddress ¬ IO.GetLength[readStream]-2;
IF logAddress <= 0 THEN { -- log just created (or too short to be valid)
IO.SetLength[writeStream, 0];
IO.SetIndex[writeStream, 0]; -- $$PerfA$$ total paranoia!
MarkUpdateComplete[writeStream];
RETURN[TRUE];
};
IO.SetIndex[readStream, logAddress];
prevByte ¬ IO.GetChar[readStream];
byte ¬ IO.GetChar[readStream];
IF IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte] THEN {
Entry is complete, so roll forward; but the state of the indices is unknown, so they must be rebuilt.
needToRebuild ¬ TRUE;
}
ELSE {
Entry was only partially written, so roll backward: scan the log from the end looking for EndOfEntry, then reset the log length.
UNTIL (IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte]) OR (logAddress = 0) DO
byte ¬ prevByte;
logAddress ¬ logAddress - 1;
IO.SetIndex[readStream, logAddress];
prevByte ¬ IO.GetChar[readStream];
ENDLOOP;
IO.SetLength[writeStream, IF logAddress = 0 THEN 0 ELSE logAddress + 2];
IO.Flush[readStream]; -- to notice new file length
needToRebuild ¬ FALSE;
};
A replaced entry should never be the last entry in a log. If this is the case, then we must have crashed before writing the replacement entry. To recover, simply remove the replaced entry.
IF logAddress # 0 THEN {
Back up one more character, then continue the backward search to find the last entry.
byte ¬ prevByte;
logAddress ¬ logAddress - 1;
IO.SetIndex[readStream, logAddress];
prevByte ¬ IO.GetChar[readStream];
UNTIL (IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte]) OR (logAddress = 0) DO
byte ¬ prevByte;
logAddress ¬ logAddress - 1;
IO.SetIndex[readStream, logAddress];
prevByte ¬ IO.GetChar[readStream];
ENDLOOP;
IF logAddress # 0 THEN
logAddress ¬ logAddress + 2;
lastEntry ¬ ReadLogEntry[readStream, logAddress];
IF lastEntry.first.type = $REPLACED OR lastEntry.first.type = $REPLACE THEN {
IO.SetLength[writeStream, logAddress];
IO.Flush[readStream]; -- to notice new file length
needToRebuild ¬ FALSE;
};
};
Log is now in good shape.
IO.SetIndex[writeStream, IO.GetLength[writeStream]]; -- $$PerfA$$ total paranoia!
MarkUpdateComplete[writeStream];
RETURN[needToRebuild];
};
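A schematic trace of the roll-backward case may help; EOA, EOE, and UC stand for the EndOfAttribute, EndOfEntry, and UpdateComplete markers, and the byte layout is illustrative only:
<<log after a crash: ... r y EOA EOE x y -- partial entry "xy" written, no trailing UC
backward scan stops at the EOA/EOE pair; SetLength truncates to logAddress+2, discarding "xy"
MarkUpdateComplete re-appends UC, leaving: ... r y EOA EOE UC -- the shape InGoodState accepts>>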
IndicesOutOfDate: PROC [dbinfo: OpenDBInfo] RETURNS [BOOLEAN] ~ {
Determines whether the logs have been edited manually, i.e. whether the indices are currently out-of-date, by checking the create date of the primary index against those of the logs; a positive BasicTime.Period[from: indexDate, to: fileDate] means some log was written after the index was built.
fileDate: BasicTime.GMT;
indexDate: BasicTime.GMT;
uniqueID: PFS.UniqueID;
uniqueID ¬ PFS.FileInfo[name: PFS.PathFromRope[dbinfo.primaryIndex.filename]].uniqueID;
indexDate ¬ uniqueID.egmt.gmt;
<<indexDate: BasicTime.GMT ← FS.FileInfo[name: dbinfo.primaryIndex.filename].created;>>
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
uniqueID ¬ PFS.FileInfo[name: PFS.PathFromRope[dbinfo.logs[i].filename]].uniqueID;
fileDate ¬ uniqueID.egmt.gmt;
<<fileDate ← FS.FileInfo[name: dbinfo.logs[i].filename].created;>>
IF BasicTime.Period[from: indexDate, to: fileDate] > 0 THEN RETURN[TRUE];
};
ENDLOOP;
RETURN[FALSE];
};
Overcoming file stream deficiencies
MakeDatabaseWriteable: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
Opening a file stream with $append access on a log changes its update date, even if no writes are made to the log. To overcome this problem, all log and index files are initially opened with $read access. This routine should be called to make them writeable when necessary.
IF dbinfo.openForUpdate THEN RETURN;
CloseLogs[dbinfo.logs];
CloseIndices[dbinfo.indices];
OpenLogs[dbinfo.logs, $write];
[] ¬ OpenIndices[dbinfo.indices, $write];
dbinfo.openForUpdate ¬ TRUE;
};
Shared databases
A given database cannot actually be opened more than once because of the concurrency control provided by FS. Thus, we keep a table of databases that are already open.
OpenDBTable: SymTab.Ref;
OpenDBTableSize: NAT = 2039;
2039 is a good candidate because it is prime and is large enough that collisions should be rare.
GetSharedDB: PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Remember: this routine is not called under a monitor lock. If several clients try to create an OpenDBRecord concurrently, the first one to register with the OpenDBTable wins.
fname, key: ROPE;
fullName: PFS.PATH;
fullName ¬ PFS.FileInfo[PFS.PathFromRope[dbName] ! PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName;
fname ¬ PFS.RopeFromPath[fullName];
<<fname: ROPE ← FS.FileInfo[dbName ! FS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName;>>
key ¬ DFUtilities.RemoveVersionNumber[fname];
dbinfo ¬ NARROW[SymTab.Fetch[OpenDBTable, key].val];
IF dbinfo = NIL THEN {
dbinfo ¬ NEW[OpenDBRecord];
dbinfo.dbName ¬ fname;
dbinfo.isOpen ¬ FALSE;
dbinfo.remoteAccess ¬ TRUE;
dbinfo.openForUpdate ¬ FALSE;
dbinfo.transStarted ¬ FALSE;
dbinfo.statusMsg ¬ "Open for service.";
dbinfo.primaryIndex ¬ NIL;
IF NOT SymTab.Insert[OpenDBTable, key, dbinfo] THEN -- lost race
dbinfo ¬ NARROW[SymTab.Fetch[OpenDBTable, key].val];
};
};
FlushOpenTable: PROC [dbinfo: OpenDBInfo ¬ NIL] = {
Flushes one or all databases in the OpenDBTable.
The mapping from name to OpenDBTable entry never changes. A flushed DB is merely a closed DB for which it is necessary to reparse the schema on next access.
DCS, May 10, 1991
FlushDB: SymTab.EachPairAction = { DoClose[NARROW[val], TRUE]; };
IF dbinfo # NIL THEN DoClose[dbinfo, TRUE]
ELSE [] ¬ SymTab.Pairs[OpenDBTable, FlushDB];
};
Cache Hackery
DCS March 20, 1991; if caching is available, use it during WriteEntry
cachedReadEnabled: BOOL ¬ TRUE;
RRC: TYPE ~ RECORD[
remoteReadCache: PROC[LoganBerry.OpenDB, AttributeValue] RETURNS [Entry]
];
GetCachedEntry: PROC[dbinfo: OpenDBInfo, value: AttributeValue]
RETURNS [entry: Entry ¬ NIL] ~ TRUSTED INLINE {
temp: REF;
rrc: REF RRC;
IF ~cachedReadEnabled THEN RETURN;
temp ¬ Atom.GetProp[$LoganBerry, $RemoteReadCache];
IF temp=NIL THEN RETURN;
rrc ¬ LOOPHOLE[temp];
entry ¬ rrc.remoteReadCache[RefID.Reseal[dbinfo], value];
};
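For reference, a sketch of how a caching class (e.g., LoganBerryCacheImpl) might install this back door; MyCachedRead and its trivial body are hypothetical:
<<MyCachedRead: PROC [db: LoganBerry.OpenDB, value: AttributeValue] RETURNS [Entry] ~ {
RETURN[NIL]; -- a real implementation would look value up in its cache, returning NIL on a miss
};
Atom.PutProp[$LoganBerry, $RemoteReadCache, NEW[RRC ¬ [remoteReadCache: MyCachedRead]]];>>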
Initialization and class registration
class: LoganBerryClass.Class;
Want the table of open databases to be created only once, even if this package is re-run.
OpenDBTable ¬ NARROW[Atom.GetProp[$LoganBerry, $OpenDBTable]];
IF OpenDBTable = NIL THEN {
OpenDBTable ¬ SymTab.Create[mod: OpenDBTableSize, case: FALSE];
Atom.PutProp[$LoganBerry, $OpenDBTable, OpenDBTable];
};
Register as the main LoganBerry class.
class ¬ NEW[LoganBerryClass.ClassObject ¬ [
name: $LoganBerry,
open: Open,
describe: Describe,
readEntry: ReadEntry,
enumerateEntries: EnumerateEntries,
generateEntries: GenerateEntries,
nextEntry: NextEntry,
endGenerate: EndGenerate,
writeEntry: WriteEntry,
deleteEntry: DeleteEntry,
close: Close,
buildIndices: BuildIndices,
compactLogs: CompactLogs,
isLocal: IsLocal,
startTransaction: StartTransaction,
endTransaction: EndTransaction,
flushDBCache: FlushDBCache,
classData: NIL
]];
LoganBerryClass.Register[name: class.name, class: class, tryLast: TRUE];
END.
Doug Terry August 12, 1985 1:05:48 pm PDT
created
Doug Terry, January 23, 1986 8:28:43 pm PST
changes to: DIRECTORY, Open, Close, GetInfo, dbinfo (local of GetInfo), OpenI, MonitoredOpen, EnumerateEntriesI, CloseI, CompactLogsI, SetRemoteAccessI, ReadLogEntry, WriteLogEntry, CloseLogs, GetIndexEntry, CreateIndexEntry, DeleteIndexEntry, CloseIndex (local of CloseIndices), OpenDBTable, OpenDBTableSize, GetSharedDB, OpenDBTable, IF, IMPORTS
Doug Terry, January 24, 1986 9:39:30 am PST
changes to: MonitoredOpen, BuildIndicesI, BuildIndicesWorker
Swinehart, January 27, 1986 5:10:45 pm PST
Fullname expansion does not produce a canonical name (case may differ); GetSharedDB now uses a case-insensitive symbol table.
changes to: GetSharedDB
Doug Terry, January 31, 1986 4:25:40 pm PST
changes to: DIRECTORY, OpenIndex (local of OpenIndices), CloseIndex (local of CloseIndices), EraseIndex (local of EraseIndices), CompactLogsI
Doug Terry, March 5, 1986 5:21:27 pm PST
Implemented Describe so that clients can obtain schema information even if they don't have access to the schema file.
changes to: Describe, DescribeI, AddIndexInfo (local of DescribeI)
Doug Terry, June 2, 1987 2:09:48 pm PDT
BuildIndicesWorker now sets the "update in progress" flag while it is working; left call to BTree.GetState since BTreeSimple.GetState raises an address fault; added variables for btree performance tuning.
changes to: DIRECTORY, OpenIndex (local of OpenIndices), EraseIndex (local of EraseIndices), MarkIndex (local of MarkUpdateOnIndices), CloseIndex (local of CloseIndices)
Doug Terry, June 4, 1987 12:45:13 pm PDT
Database is now opened with read access and upgraded to writeable when necessary; on Open, indices are checked to see if they are out-of-date.
changes to: DIRECTORY, scanBufferSize, MonitoredOpen, WriteEntryI, DeleteEntryI, BuildIndicesWorker, OpenLogs, CloseLogs, OpenIndices, OpenIndex (local of OpenIndices), MarkUpdateOnIndices, Recover, InGoodState, RestoreLog, IndicesOutOfDate, MakeDatabaseWriteable, GetSharedDB
Doug Terry, July 28, 1987 6:05:42 pm PDT
Registers PrepareForCheckpoint procedure to close databases in a checkpoint.
changes to: GetSharedDB, PrepareForCheckpoint, CloseDB (local of PrepareForCheckpoint), IF, Booting, DIRECTORY, IMPORTS
Doug Terry, August 24, 1987 2:19:44 pm PDT
Open always reads the highest version database schema.
changes to: MonitoredOpen, ReadSchema, GetSharedDB
Polle Zellweger (PTZ) February 23, 1988 4:08:49 pm PST
Write entry containing CRs as a rope literal, including changing " into \".
changes to: WriteAttribute (local of WriteLogEntry)
Doug Terry, March 29, 1988 11:24:46 am PST
Indices are no longer automatically rebuilt if discovered to be inconsistent or non-existent, NIL attribute values are now properly handled by ReadAttribute.
changes to: DIRECTORY, Open, OpenI, MonitoredOpen, ReadAttribute (local of ReadLogEntry)
Doug Terry, December 20, 1988 10:40:05 am PST
BuildIndicesWorker checks for duplicate or non-existent primary keys in entries.
changes to: DIRECTORY, BuildIndicesI, BuildIndicesWorker
Brian Oki, June 12, 1989 8:59:48 am PDT
Made numerous minor changes to many routines, mostly replacing instances of FS and GeneralFS with UFS. Old statements are commented out and bracketed in << >>
Major changes: Added GMTToHostTime and SetCreateTime routines to make Unix system calls. SetCreateTime invoked in CloseLogs. Changed CompactLogsI to remove the dependency on versions; a problem is that there's now a window of vulnerability during which the old log file can be completely clobbered while the bits from the compacted log are being copied. Interface change to BTreeSimple (see code). Changed name of LoganBerryStructure to LBStructure because of some runtime problems I didn't understand. Changed all instances of $append mode to $write mode.
Doug Terry, August 20, 1989 10:16:56 pm PDT
Registers as a LoganBerry class rather than exporting LoganBerry directly.
changes to: DIRECTORY, IMPORTS, EXPORTS, Open, ReadEntry, EnumerateEntries, GenerateEntries, NextEntry, EndGenerate, WriteEntry, DeleteEntry, Close, BuildIndices, CompactLogs, Describe, RegisterWriteProc, UnregisterWriteProc, IsLocal, class
Doug Terry, August 21, 1989 12:25:34 pm PDT
Closed databases are now automatically reopened on access.
changes to: MonitoredOpen, DoOpen, ReadEntryI, GenerateEntriesI, NextEntryI, WriteEntryI, DeleteEntryI, BuildIndicesI, CompactLogsI
Doug Terry, November 13, 1989 6:27:07 pm PST
Write notification is now handled by LoganBerryClassImpl.
changes to: DeleteEntry, IsLocal, GetInfo, WriteEntryI, DeleteEntryI, CloseLogs, class
Dan Swinehart, May 17, 1991 8:18:49 am PDT
A number of performance and robustness improvements have been made over the period from March 6 to now. Here is a summary:
March 6, 1991; first step is merely to eliminate most calls on IO.GetLength[], which is expensive because it has to consult the file system; especially expensive if it's a network file system!
March 20, 1991; second step is to incorporate cached LoganBerry entries, so that many reads can be avoided altogether. This is done primarily by running the new cached LoganBerry class (implemented by LoganBerryCacheImpl) and changing the name of the LoganBerry DB in the root file. However, WriteEntry[replace:TRUE] must first read the previous entry in order to remove it from the indices; if the caching code is present and enabled, a back door is provided permitting WriteEntry to read the previous entry from the cache. This saves a lot of time.
March 29, 1991; third step is to change the semantics of the state between StartTransaction and EndTransaction. While a transaction is in progress, the contents of stable storage are not guaranteed correct between calls to WriteEntry, since stream flush operations are suppressed. Special care must be (and is) taken when doing a ReadEntry (expected to be infrequent because of the cache) to ensure that everything it's trying to read is available to the read stream, since writes and reads use separate streams. The hard part of this is that the transaction is opened on the entire DB, but the actions are potentially applied to multiple logs.
April 6, 1991; Reduce the size of the B-Tree cache assigned to each open database. Above a certain size, thrashing is increased without substantial improvement in performance.
Robustness improvements, May 10, 1991: The definition of a DB flush has been to lose the mapping between a database name and its OpenDBInfo entry. The DB was not closed, nor were other held handles in any way invalidated. Since applications could still hold the original DB handle, a subsequent open could yield two separately monitored access paths to the same information -- a bad thing. Flushing is hereby changed to retain the name=>dbinfo mapping, but to close the DB and force a reread of the DB schema the next time the DB is opened. A full restart will be required whenever the interface changes, or whenever some part of the file name access path is visibly changed. A small price to pay.
January 15, 1992; (DBT) Changed firstWritten in CompactLogsI to TRUE to fix bug in Compact.