LoganBerryImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Doug Terry, April 14, 1986 5:33:43 pm PST
Swinehart, January 27, 1986 5:16:09 pm PST
LoganBerry is a simple facility for managing databases. Data is stored in one or more log files and indexed using stable btrees. This "poor man's" database facility is intended to allow various types of data to be stored persistently. A database survives processor crashes, but the data management facility does not provide atomic transactions. Only very simple queries are supported: retrieval by key or key subrange. Databases may be shared by multiple clients and accessed remotely via RPC.
DIRECTORY
Atom USING [GetPName, MakeAtom, GetProp, PutProp],
BasicTime USING [GMT, Period],
BTree USING [Error, GetState],
BTreeSimple USING [DeleteKey, EnumerateRecords, --GetState,-- InternalKey, New, NewPathStk, Open, PathStk, ReadRecord, --ReadValue,-- Relation, SetState, Tree, UpdateRecord, Value, ValueObject],
Convert USING [IntFromRope, RopeFromInt],
DFUtilities USING [--DirectoryItem,-- FileItem, CommentItem, ParseFromStream, ProcessItemProc, RemoveVersionNumber, SyntaxError],
FS USING [Close, Error, ExpandName, FileInfo, OpenFile],
GeneralFS USING [Open, Create, StreamOpen],
IO USING [BreakProc, Close, EndOf, EndOfStream, Error, Flush, GetChar, --GetID,-- GetIndex, GetInt, GetLength, GetLineRope, GetRopeLiteral, GetToken, GetTokenRope, IDProc, PeekChar, PutChar, PutF, RIS, rope, SetIndex, SetLength, SkipWhitespace, STREAM],
RefID USING [Seal, Unseal, Release],
RefTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref],
RefText USING [Equal, line, ObtainScratch, ReleaseScratch],
Rope USING [Cat, Compare, Concat, Equal, Find, Index, ROPE, Substr],
SymTab USING [Create, Fetch, Insert, Ref, SeqIndex],
LB,
LoganBerry,
LoganBerryStructure;
LoganBerryImpl: CEDAR MONITOR LOCKS dbinfo USING dbinfo: OpenDBInfo
IMPORTS Atom, BasicTime, BTree, BTreeSimple, Convert, DFUtilities, FS, GeneralFS, IO, RefText, Rope, RefTab, RefID, SymTab
EXPORTS LB, LoganBerry
~ BEGIN
OPEN LoganBerry, LoganBerryStructure;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Exported RPC operations
These routines export to the LoganBerry interface. All of the operations (except EnumerateEntries) can be invoked via RPC. They take a RPC.Conversation as the first argument so that secure remote procedure calls can be made to a LoganBerry database server.
Error: PUBLIC ERROR [ec: ErrorCode, explanation: ROPENIL] = CODE;
Open: PUBLIC PROC [conv: Conv ← NIL, dbName: ROPE] RETURNS [db: OpenDB] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed. Indices are automatically rebuilt if any are missing or if a partially-completed update left them out-of-date.
dbinfo: OpenDBInfo ← OpenI[dbName];
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
db ← RefID.Seal[dbinfo];
};
ReadEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
[entry, others] ← ReadEntryI[GetInfo[db], key, value];
};
EnumerateEntries: PUBLIC PROC [db: OpenDB, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL, proc: EntryProc] RETURNS [] ~ {
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
EnumerateEntriesI[GetInfo[db], key, start, end, proc];
};
GenerateEntries: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL] RETURNS [cursor: Cursor] ~ {
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo ← GenerateEntriesI[GetInfo[db], key, start, end];
cursor ← RefID.Seal[newCursor];
};
NextEntry: PUBLIC PROC [conv: Conv ← NIL, cursor: Cursor, dir: CursorDirection ← increasing] RETURNS [entry: Entry] ~ {
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
c: CursorInfo;
WITH RefID.Unseal[cursor] SELECT FROM
ci: CursorInfo => c ← ci;
ENDCASE => c ← NIL;
IF c = NIL THEN ERROR Error[$BadCursor, "Invalid cursor passed to NextEntry."];
entry ← NextEntryI[c.dbinfo, c, dir];
};
EndGenerate: PUBLIC PROC [conv: Conv ← NIL, cursor: Cursor] RETURNS [] ~ {
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
[] ← RefID.Release[cursor];
};
WriteEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, entry: Entry, log: LogID ← activityLog, replace: BOOLEANFALSE] RETURNS [] ~ {
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key. The primary attribute value must be unique throughout the database unless replace=TRUE; in this case, an existing entry with the same primary attribute value is atomically replaced with the new entry.
WriteEntryI[GetInfo[db], entry, log, replace];
};
DeleteEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [] ~ {
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
DeleteEntryI[GetInfo[db], key, value];
};
Close: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
CloseI[GetInfo[db ! Error => IF ec=$DBClosed THEN CONTINUE]];
};
BuildIndices: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
BuildIndicesI[GetInfo[db]];
};
CompactLogs: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
CompactLogsI[GetInfo[db]];
};
Describe: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database.
infoDescribeI[GetInfo[db]];
};
GetInfo: PROC [db: OpenDB] RETURNS [OpenDBInfo] ~ INLINE {
Unseals the database handle, ensures that it's valid, and checks if remote access is allowed.
ref: REF = RefID.Unseal[db];
IF ref = NIL THEN
ERROR Error[$BadDBHandle, "NIL OpenDB handle."];
WITH ref SELECT FROM
dbinfo: OpenDBInfo => {
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
IF NOT dbinfo.isOpen THEN ERROR Error[$DBClosed, dbinfo.statusMsg];
RETURN[dbinfo];
};
ENDCASE => ERROR Error[$BadDBHandle, "Invalid OpenDB handle."];
};
Internal monitored operations
These routines export to the internal LB interface. The operations are monitored so that several processes can share a database. All of the operations (except OpenI) take a monitored OpenDBInfo handle as their first parameter.
OpenI: PUBLIC PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed. Indices are automatically rebuilt if any are missing or if a partially-completed update left them out-of-date.
WARNING: SINCE THIS ROUTINE IS NOT MONITORED, RACE CONDITIONS MUST BE CAREFULLY AVOIDED.
dbinfo ← GetSharedDB[dbName];
IF NOT dbinfo.isOpen THEN
MonitoredOpen[dbinfo];
RETURN[dbinfo];
};
MonitoredOpen: ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Want at most one client to attempt to open a database.
needToRebuild: BOOLEANFALSE;
IF dbinfo.isOpen THEN RETURN; -- lost race
IF dbinfo.primaryIndex = NIL THEN
ReadSchema[dbinfo];
OpenLogs[dbinfo.logs];
IF Recover[dbinfo] THEN
needToRebuild ← TRUE;
IF OpenIndices[dbinfo.indices] THEN
needToRebuild ← TRUE;
IF needToRebuild THEN
BuildIndicesWorker[dbinfo];
dbinfo.isOpen ← TRUE;
};
ReadEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
ENABLE UNWIND => NULL;
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
indexEntry: IndexEntry;
index: IndexInfo;
aValue: AttributeValue;
pathSkt: BTreeSimple.PathStk ← BTreeSimple.NewPathStk[];
Find appropriate index
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
[aValue, indexEntry] ← GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF aValue = NIL THEN RETURN[entry: NIL, others: FALSE];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
IF index.type = primary THEN
others ← FALSE
ELSE {
aValue ← GetIndexEntry[index: index, value: aValue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
others ← Rope.Equal[s1: value, s2: UniqueKeyToValue[aValue], case: FALSE];
};
RETURN[entry, others];
};
SimpleEntryProc: EntryProc = { -- for debugging purposes
[entry: LoganBerry.Entry] RETURNS [continue: BOOL]
cont: BOOLEANTRUE;
set breakpoint here to look at entry
RETURN[cont];
};
EnumerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL, proc: EntryProc] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
EntriesInSubrange: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
indexEntry: IndexEntry;
entry: Entry;
IF end # NIL AND Rope.Compare[s1: key, s2: end, case: FALSE] = greater THEN RETURN[continue: FALSE];
TRUSTED {
indexEntry ← LOOPHOLE[@value[0], IndexPtr]^;
};
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RETURN[continue: proc[entry]];
};
index: IndexInfo;
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
IF index.type = secondary THEN {
IF start # NIL THEN
start ← ValueToSmallKey[start];
IF end # NIL THEN
end ← ValueToLargeKey[end];
};
[] ← BTreeSimple.EnumerateRecords[tree: index.btree, key: start, Proc: EntriesInSubrange !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex, Rope.Cat["Problem enumerating index for ",Atom.GetPName[key],"."]]];
};
GenerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL] RETURNS [cinfo: CursorInfo] ~ {
ENABLE UNWIND => NULL;
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo;
index: IndexInfo;
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
newCursor ← NEW[CursorRecord];
newCursor.index ← index;
newCursor.dbinfo ← dbinfo;
newCursor.key ← key;
newCursor.start ← start;
newCursor.end ← end;
IF newCursor.index.type = secondary THEN {
IF start # NIL THEN
newCursor.start ← ValueToSmallKey[start];
IF end # NIL THEN
newCursor.end ← ValueToLargeKey[end];
};
newCursor.pathStk ← BTreeSimple.NewPathStk[];
back up one so we're in a position to start (assuming dir=increasing)
newCursor.current ← IF start # NIL THEN GetIndexEntry[index: newCursor.index, value: newCursor.start, valueIsKey: TRUE, relation: less, pathStk: newCursor.pathStk].actual ELSE NIL;
RETURN[newCursor];
};
NextEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo, dir: CursorDirection ← increasing] RETURNS [entry: Entry] ~ {
ENABLE UNWIND => NULL;
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
Note: The end conditions may not be what is desired, since changing directions at either end causes the first element at that end to be skipped.
actualValue: AttributeValue;
indexEntry: IndexEntry;
[actualValue, indexEntry] ← GetIndexEntry[index: cinfo.index, value: cinfo.current, valueIsKey: TRUE, relation: IF dir = increasing THEN greater ELSE less, pathStk: cinfo.pathStk];
IF (actualValue = NIL) OR (dir = increasing AND cinfo.end # NIL AND Rope.Compare[actualValue, cinfo.end, FALSE] = greater) OR (dir = decreasing AND Rope.Compare[actualValue, cinfo.start, FALSE] = less) THEN
RETURN[NIL];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
cinfo.current ← actualValue;
RETURN[entry];
};
EndGenerateI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
NULL; -- this routine is unnecessary since cursors get garbage collected
};
WriteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, entry: Entry, log: LogID ← activityLog, replace: BOOLEANFALSE] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key and the primary attribute value must be unique throughout the database.
value: AttributeValue;
indexData: IndexEntry ← [log: log, firstByte: 0];
doReplace: BOOLEANFALSE;
replacedIndexEntry: IndexEntry;
replacedEntry: Entry;
value ← GetAttributeValue[entry, dbinfo.primaryIndex.key];
IF value = NIL THEN
ERROR Error[$NoPrimaryKey, "Entry does not contain a primary key."];
IF dbinfo.logs[log].access = readOnly THEN
ERROR Error[$LogReadOnly, "Can't write to a read only log."];
[value, replacedIndexEntry] ← GetIndexEntry[index: dbinfo.primaryIndex, value: value];
IF value # NIL THEN
IF replace THEN
doReplace ← TRUE
ELSE
ERROR Error[$ValueNotUnique, "An existing entry already contains the primary key."];
For replacements, the entry being replaced and its replacement must reside in the same log. A replace operation is logged in a similar manner to delete operations. The entry written after the replace log entry is the replacement. Both log entries are written before any of the indices are updated. In the event of a crash, both entries must be recovered atomically.
IF doReplace THEN {
IF replacedIndexEntry.log # log THEN
ERROR Error[$InvalidReplace, "Cross-log replacements are not allowed."];
[] ← WriteLogEntry[dbinfo.logs[log].stream, LIST[[$REPLACED, Convert.RopeFromInt[replacedIndexEntry.firstByte]]]];
};
indexData.firstByte ← WriteLogEntry[dbinfo.logs[log].stream, entry];
IF doReplace THEN {
replacedEntry ← ReadLogEntry[dbinfo.logs[log].stream, replacedIndexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, replacedEntry, replacedIndexEntry];
};
AddEntryToIndices[dbinfo.indices, entry, indexData];
MarkUpdateComplete[dbinfo.logs[log].stream];
};
DeleteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
indexEntry: IndexEntry;
index: IndexInfo;
pathSkt: BTreeSimple.PathStk ← BTreeSimple.NewPathStk[];
avalue: AttributeValue;
delete: Entry;
entry: Entry;
Find appropriate index
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN
ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
Locate log entry
[avalue, indexEntry] ← GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF avalue = NIL THEN RETURN;
avalue ← GetIndexEntry[index: index, value: avalue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
IF Rope.Equal[s1: value, s2: UniqueKeyToValue[avalue], case: FALSE] THEN
ERROR Error[$ValueNotUnique, "Entry to be deleted insufficiently specified."];
IF dbinfo.logs[indexEntry.log].access = readOnly THEN
ERROR Error[$LogReadOnly, "Can't delete entries in a read only log."];
Log delete operation
delete ← LIST[[$DELETED, Convert.RopeFromInt[indexEntry.firstByte]]];
[] ← WriteLogEntry[dbinfo.logs[indexEntry.log].stream, delete];
Update indices
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
MarkUpdateComplete[dbinfo.logs[indexEntry.log].stream];
};
CloseI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
CloseLogs[dbinfo.logs];
CloseIndices[dbinfo.indices];
dbinfo.isOpen ← FALSE;
};
Maintenance operations
BuildIndicesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
This routine grabs the monitor lock then calls on the internal procedure to do the real work.
BuildIndicesWorker[dbinfo];
};
BuildIndicesWorker: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
indexData: IndexEntry;
logstream: STREAM;
entry: Entry;
indexEntry: IndexEntry;
saveStreamPosition: INT;
EraseIndices[dbinfo.indices];
FOR log: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[log] # NIL THEN {
indexData ← [log, 0];
logstream ← dbinfo.logs[log].stream;
FOR entry ← ReadLogEntry[logstream, 0], ReadLogEntry[logstream, -1] UNTIL entry = NIL DO
IF (entry.first.type # $DELETED) AND (entry.first.type # $REPLACED) THEN { -- WriteEntry
AddEntryToIndices[dbinfo.indices, entry, indexData];
}
ELSE { -- DeleteEntry
saveStreamPosition ← IO.GetIndex[logstream]; -- before the following read changes the index
indexEntry ← [log: log, firstByte: Convert.IntFromRope[entry.first.value]];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
IO.SetIndex[logstream, saveStreamPosition];
};
IF IO.EndOf[logstream] OR IO.GetChar[logstream] # EndOfEntry THEN
ERROR Error[$BadLogEntry, "Missing terminator on log entry."];
indexData.firstByte ← IO.GetIndex[logstream];
ENDLOOP;
};
ENDLOOP;
};
CompactLogsI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE { UNWIND => NULL; FS.Error => ERROR Error[$InternalError, error.explanation]; };
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
KeepEntry: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
indexEntry: IndexEntry;
entry: Entry;
TRUSTED {
indexEntry ← LOOPHOLE[@value[0], IndexPtr]^;
};
IF dbinfo.logs[indexEntry.log].access # readOnly THEN {
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
[] ← WriteLogEntry[newlogs[indexEntry.log].stream, entry];
MarkUpdateComplete[newlogs[indexEntry.log].stream];
};
RETURN[continue: TRUE];
};
newlogs: LogSet ← NEW[LogSetRecord[dbinfo.logs.size]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL AND dbinfo.logs[i].access # readOnly THEN {
newlogs[i] ← NEW[LogInfoRecord];
newlogs[i].stream ← GeneralFS.StreamOpen[dbinfo.logs[i].filename, $create !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
[] ← BTreeSimple.EnumerateRecords[tree: dbinfo.primaryIndex.btree, key: NIL, Proc: KeepEntry !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex, "Problem enumerating primary index."]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF newlogs[i] # NIL THEN {
IO.Close[newlogs[i].stream];
IO.Close[dbinfo.logs[i].stream];
dbinfo.logs[i].streamGeneralFS.StreamOpen[dbinfo.logs[i].filename, $append !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
BuildIndicesWorker[dbinfo];
};
SetRemoteAccessI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, accessible: BOOLEANTRUE, why: ROPENIL] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Controls access by remote clients to the database. If accessible=FALSE then the database can not be accessed through the normal LoganBerry interface, only via this interface; setting accessible=TRUE makes the database commonly available (the normal case). This operation is intended for use by administrators that wish to temporarily remove a database from service.
dbinfo.remoteAccess ← accessible;
dbinfo.statusMsg ← why;
};
Database schema
A schema is created by clients of this package and stored in a DF file that can be used to backup the database. Lines of the file starting with "-->" are deemed to contain schema information for the file named in the subsequent line of the DF file or at the end of the same line. The two types of schema entries are as follows:
--> log <logID> <readOnly or readWrite> <optional CR> <filename>
--> index <key> <primary or secondary> <optional CR> <filename>
SchemaChars: PUBLIC REF READONLY TEXT = "-->";
ParseSchemaLine: PROC [s: STREAM, buffer: REF TEXT, wdir: ROPE] RETURNS [item: REF ANY, needFilename: BOOLEAN ← FALSE] ~ {
Takes an input stream and parses one schema entry from it. The item returned is of type LogInfo or IndexInfo; needFilename=TRUE indicates that no file name existed in the input stream (it must be on the next line of the schema file).
ENABLE {
IO.Error => ERROR Error[$BadSchema, "Error parsing schema statement."];
IO.EndOfStream => ERROR Error[$BadSchema, "Unexpected end of stream."] };
token: REF TEXT;
log: LogInfo;
index: IndexInfo;
needFilename ← FALSE;
token ← IO.GetToken[s, IO.IDProc, buffer].token;
IF NOT RefText.Equal[token, SchemaChars] THEN RETURN[NIL, FALSE];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
SELECT TRUE FROM
RefText.Equal[token, "log", FALSE] => {
log ← NEW[LogInfoRecord];
log.id ← IO.GetInt[s];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
log.access ← IF RefText.Equal[token, "ReadOnly", FALSE] THEN readOnly ELSE readWrite;
log.filename ← IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ← TRUE; CONTINUE}].token;
IF NOT needFilename THEN
log.filename ← FS.ExpandName[log.filename, wdir].fullFName;
item ← log;
};
RefText.Equal[token, "index", FALSE] => {
index ← NEW[IndexInfoRecord];
index.key ← Atom.MakeAtom[IO.GetRopeLiteral[s]];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
index.type ← IF RefText.Equal[token, "primary", FALSE] THEN primary ELSE secondary;
index.filename ← IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ← TRUE; CONTINUE}].token;
IF NOT needFilename THEN
index.filename ← FS.ExpandName[index.filename, wdir].fullFName;
item ← index;
};
ENDCASE => ERROR Error[$BadSchema, "Invalid keyword."];
};
ReadSchema: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
Read the database schema from dbinfo.dbName.
ENABLE DFUtilities.SyntaxError => ERROR Error[$BadSchema, reason];
NewDBInfo: PROC [item: REF ANY, filename: ROPENIL] RETURNS [] ~ {
WITH item SELECT FROM
log: LogInfo => {
IF filename # NIL THEN
log.filename ← filename;
dbinfo.logs[log.id] ← log;
};
index: IndexInfo => {
IF filename # NIL THEN
index.filename ← filename;
[] ← RefTab.Insert[dbinfo.indices, index.key, index];
IF index.type = primary THEN
IF dbinfo.primaryIndex = NIL THEN
dbinfo.primaryIndex ← index
ELSE
ERROR Error[$BadSchema, "Multiple primary indices specified."];
};
ENDCASE => ERROR Error[$InternalError];
};
SchemaItemProc: DFUtilities.ProcessItemProc = {
[item: REF ANY] RETURNS [stop: BOOL ← FALSE]
WITH item SELECT FROM
comment: REF DFUtilities.CommentItem => {
IF needFilename THEN
ERROR Error[$BadSchema, "No file name specified for some log or index."];
[lastSchemaItem, needFilename] ← ParseSchemaLine[IO.RIS[comment.text], buffer, wdir];
IF lastSchemaItem # NIL AND NOT needFilename THEN
NewDBInfo[lastSchemaItem];
};
file: REF DFUtilities.FileItem => {
IF needFilename THEN {
NewDBInfo[lastSchemaItem, FS.ExpandName[DFUtilities.RemoveVersionNumber[file.name], wdir].fullFName];
needFilename ← FALSE;
};
};
ENDCASE => {
IF needFilename THEN
ERROR Error[$BadSchema, "No file name specified for some log or index."];
};
};
needFilename: BOOLEANFALSE;
lastSchemaItem: REF ANY;
buffer: REF TEXT = RefText.ObtainScratch[RefText.line];
wdir: ROPE = Rope.Substr[dbinfo.dbName, 0, FS.ExpandName[dbinfo.dbName].cp.base.start];
schemaStream: STREAM ← GeneralFS.StreamOpen[dbinfo.dbName !
FS.Error => ERROR Error[$CantOpenSchema, error.explanation]];
DFUtilities.ParseFromStream[in: schemaStream, proc: SchemaItemProc, filter: [comments: TRUE]];
RefText.ReleaseScratch[buffer];
};
DescribeI: PROC [dbinfo: OpenDBInfo] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database. This routine is not monitored since it can safely be called while other operations are in progress.
AddIndexInfo: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOLEAN]
index: IndexInfo ← NARROW[val];
IF index # dbinfo.primaryIndex THEN {
info.keys ← CONS[index.key, info.keys];
info.indexNames ← CONS[index.filename, info.indexNames];
};
RETURN[quit: FALSE];
};
info.dbName ← dbinfo.dbName;
info.logs ← NIL;
info.logNames ← NIL;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
info.logs ← CONS[i, info.logs];
info.logNames ← CONS[dbinfo.logs[i].filename, info.logNames];
};
ENDLOOP;
info.keys ← NIL;
info.indexNames ← NIL;
[] ← RefTab.Pairs[dbinfo.indices, AddIndexInfo];
info.keys ← CONS[dbinfo.primaryIndex.key, info.keys];
info.indexNames ← CONS[dbinfo.primaryIndex.filename, info.indexNames];
};
Entries
GetAttributeValue: PROC [entry: Entry, type: AttributeType] RETURNS [AttributeValue] ~ {
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
IF e.first.type = type THEN
RETURN[e.first.value];
ENDLOOP;
RETURN[NIL];
};
ReverseEntry: PROC [entry: Entry] RETURNS[Entry] = {
Destructively reverses the order of the entry's attributes. Taken from LispImpl.DReverse.
l1, l2, l3: Entry ← NIL;
IF entry = NIL THEN RETURN[NIL];
l3 ← entry;
UNTIL (l1 ← l3) = NIL DO
l3 ← l3.rest;
l1.rest ← l2;
l2 ← l1;
ENDLOOP;
RETURN[l2];
};
Log management
Some day these routines may be placed in a separate module.
OpenLogs: PROC [logs: LogSet] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) AND (logs[i].stream = NIL) THEN {
logs[i].stream ← GeneralFS.StreamOpen[logs[i].filename, IF logs[i].access = readWrite THEN $append ELSE $read !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
};
ReadLogEntry: PROC [logStream: STREAM, byte: LogAddress] RETURNS [Entry] ~ {
ENABLE { IO.EndOfStream, IO.Error => ERROR Error[$BadLogEntry] };
AttributeBreakProc: IO.BreakProc = {
[char: CHAR] RETURNS [IO.CharClass]
RETURN[SELECT char FROM
': => sepr,
ENDCASE => other]
};
ReadAttribute: PROC [s: STREAM] RETURNS [a: Attribute] ~ {
a.type ← Atom.MakeAtom[IO.GetTokenRope[s, AttributeBreakProc].token];
[] ← IO.GetChar[s]; -- attribute separation char
[] ← IO.SkipWhitespace[stream: s, flushComments: FALSE];
IF IO.PeekChar[s] = '" THEN {
a.value ← IO.GetRopeLiteral[s];
[] ← IO.GetLineRope[s];
}
ELSE
a.value ← IO.GetLineRope[s];
};
entry: Entry;
attribute: Attribute;
IF logStream = NIL THEN ERROR Error[$DBClosed];
IF byte >= 0 THEN
IO.SetIndex[logStream, byte ! IO.Error => ERROR Error[$BadIndex]];
IF IO.PeekChar[logStream] = UpdateComplete THEN RETURN[NIL]; -- at end of log
read a list of attributes until endofentry
UNTIL IO.PeekChar[logStream] = EndOfEntry DO
attribute ← ReadAttribute[logStream];
entry ← CONS[attribute, entry];
ENDLOOP;
the list is constructed backwards, so it is reversed before being returned
RETURN[ReverseEntry[entry]];
};
WriteLogEntry: PROC [logStream: STREAM, entry: Entry] RETURNS [LogAddress] ~ {
WriteAttribute: PROC [s: STREAM, a: Attribute] RETURNS [] ~ {
value: ROPE ← a.value;
IF Rope.Find[a.value, "\n"] # -1 THEN -- write out value as rope literal
value ← Rope.Cat["\"", a.value, "\""];
IO.PutF[s, "%g: %g\n", IO.rope[Atom.GetPName[a.type]], IO.rope[value]]
};
byteOffset: LogAddress;
IF logStream = NIL THEN ERROR Error[$DBClosed];
byteOffset ← IO.GetLength[logStream];
IF byteOffset > 0 THEN
byteOffset ← byteOffset - 1; --remove update marker from end of log
IO.SetIndex[logStream, byteOffset];
write a list of attributes
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
WriteAttribute[logStream, e.first];
ENDLOOP;
IO.PutChar[logStream, EndOfEntry];
IO.Flush[logStream];
RETURN[byteOffset];
};
MarkUpdateComplete: PROC [logStream: STREAM] RETURNS [] ~ {
byteOffset: LogAddress ← IO.GetLength[logStream];
IO.SetIndex[logStream, byteOffset];
IO.PutChar[logStream, UpdateComplete];
IO.Flush[logStream];
};
CloseLogs: PROC [logs: LogSet] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) AND (logs[i].stream # NIL) THEN {
IO.Close[logs[i].stream];
logs[i].stream ← NIL;
};
ENDLOOP;
};
Index management
Some day these routines may be placed in a separate module.
OpenIndices: PROC [indices: IndexSet] RETURNS [needToRebuild: BOOLEAN] ~ {
OpenIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
init: BOOLEANFALSE;
index: IndexInfo ← NARROW[val];
IF index.btree = NIL THEN {
index.btree ← BTreeSimple.New[];
};
IF BTree.GetState[index.btree].state # open THEN {
index.openfile ← GeneralFS.Open[name: index.filename, lock: $write
! FS.Error => { -- problem opening btree so create new one
IF error.group = $user AND error.code = $unknownFile THEN {
init ← TRUE;
needToRebuild ← TRUE;
index.openfile ← GeneralFS.Create[name: index.filename];
CONTINUE;
}
ELSE
ERROR Error[$CantOpenIndex, error.explanation];
}
];
BTreeSimple.Open[tree: index.btree, file: index.openfile, initialize: init !
BTree.Error => ERROR Error[$BadIndex, Rope.Cat["Bad index for ",Atom.GetPName[NARROW[key]],"."]]];
};
RETURN[quit: FALSE];
};
needToRebuild ← FALSE;
[] ← RefTab.Pairs[indices, OpenIndex];
RETURN[needToRebuild];
};
GetIndexEntry: PROC [index: IndexInfo, value: AttributeValue, valueIsKey: BOOLEANFALSE, relation: BTreeSimple.Relation ← equal, pathStk: BTreeSimple.PathStk ← NIL] RETURNS [actual: AttributeValue, data: IndexEntry] ~ {
Note: the AttributeValue's passed in and out may be unique keys or not.
btreeValue: BTreeSimple.Value;
useKey: AttributeValue ← value;
useRelation: BTreeSimple.Relation ← relation;
IF NOT valueIsKey AND index.type = secondary THEN
SELECT relation FROM
less =>
useKey ← ValueToSmallKey[value];
lessEqual =>
useKey ← ValueToLargeKey[value];
equal => { --start small and look upwards
useKey ← ValueToSmallKey[value];
useRelation ← greaterEqual;
};
greaterEqual =>
useKey ← ValueToSmallKey[value];
greater =>
useKey ← ValueToLargeKey[value];
ENDCASE;
[actual, btreeValue] ← BTreeSimple.ReadRecord[tree: index.btree, key: useKey, relation: useRelation, pathStk: pathStk, useExistingPath: pathStk # NIL !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
IF NOT valueIsKey AND index.type = secondary AND relation = equal AND NOT Rope.Equal[value, UniqueKeyToValue[actual], FALSE] THEN -- found entry larger than desired
actual ← NIL;
IF btreeValue # NIL THEN TRUSTED {
data ← LOOPHOLE[@btreeValue[0], IndexPtr]^;
};
RETURN[actual, data];
};
CreateIndexEntry: PROC [index: IndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
btreeValue: BTreeSimple.Value;
btreeValue ← NEW[BTreeSimple.ValueObject[SIZE[IndexEntry]]];
TRUSTED {
LOOPHOLE[@btreeValue[0], IndexPtr]^ ← data;
};
BTreeSimple.UpdateRecord[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data], value: btreeValue !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
};
DeleteIndexEntry: PROC [index: IndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
[] ← BTreeSimple.DeleteKey[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data] !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
};
AddEntryToIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: IndexInfo;
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
index ← NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so add to index
CreateIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
RemoveEntryFromIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: IndexInfo;
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
index ← NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so remove from index
DeleteIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
EraseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
EraseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: IndexInfo ← NARROW[val];
IF index.btree = NIL THEN RETURN[quit: FALSE];
the following two statements may not really be needed
IF BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
FS.Close[index.openfile];
};
index.openfile ← GeneralFS.Create[index.filename];
BTreeSimple.Open[tree: index.btree, file: index.openfile, initialize: TRUE];
RETURN[quit: FALSE];
};
[] ← RefTab.Pairs[indices, EraseIndex];
};
CloseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
CloseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: IndexInfo ← NARROW[val];
IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
FS.Close[index.openfile];
};
RETURN[quit: FALSE];
};
[] ← RefTab.Pairs[indices, CloseIndex];
};
Secondary indices: need not be unique so special btree keys must be generated. Unique keys are obtained by appending a logID/logAddress pair to an AttributeValue.
IsUniqueKey: PROC [key: ROPE] RETURNS [BOOLEAN] ~ INLINE {
RETURN[Rope.Find[key, "\000"] # -1];
};
ValueToUniqueKey: PROC [value: AttributeValue, data: IndexEntry] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Cat[value, "\000", Convert.RopeFromInt[data.log], " ", Convert.RopeFromInt[data.firstByte]]];
};
ValueToSmallKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\000"]];
};
ValueToLargeKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\377"]];
};
UniqueKeyToValue: PROC [key: ROPE] RETURNS [AttributeValue] ~ INLINE {
RETURN[Rope.Substr[key, 0, Rope.Index[key, 0, "\000"]]];
};
Crash recovery
Recover: PROC [dbinfo: OpenDBInfo] RETURNS [needToRebuild: BOOLEAN] ~ {
needToRebuild ← FALSE;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF (dbinfo.logs[i] # NIL) AND (dbinfo.logs[i].access # readOnly) THEN {
IF NOT InGoodState[dbinfo.logs[i].stream] THEN
IF RestoreLog[dbinfo.logs[i].stream] THEN
needToRebuild ← TRUE;
};
ENDLOOP;
RETURN[needToRebuild];
};
InGoodState: PROC [logStream: STREAM] RETURNS [BOOLEAN] ~ {
Checks that the log is in a consistent state. The last byte of the log should be the UpdateComplete marker.
PROBLEM: the byte count returned by GetLength may be wrong after a crash?
length: INT;
thirdToLastByte, secondToLastByte, lastByte: CHAR;
length ← IO.GetLength[logStream];
IF length < 3 THEN { -- log just created (or too short to be valid)
IO.SetLength[logStream, 0];
MarkUpdateComplete[logStream];
RETURN[TRUE];
};
IO.SetIndex[logStream, length-3];
thirdToLastByte ← IO.GetChar[logStream];
secondToLastByte ← IO.GetChar[logStream];
lastByte ← IO.GetChar[logStream];
RETURN[(thirdToLastByte = '\n) AND (secondToLastByte = EndOfEntry) AND (lastByte = UpdateComplete)];
};
RestoreLog: PROC [logStream: STREAM] RETURNS [needToRebuild: BOOLEAN] ~ {
Restores the log to a consistent state. There are two cases to consider: 1) the system was interupted while writing an entry on the log, 2) a complete entry exists on the log but the btrees have only been partially updated.
logAddress: INT;
prevByte, byte: CHAR;
lastEntry: Entry;
logAddress ← IO.GetLength[logStream]-2;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
byte ← IO.GetChar[logStream];
IF (byte = EndOfEntry) AND (prevByte = '\n) THEN {
entry complete so roll forward, but don't know state of indices so must rebuild them
needToRebuild ← TRUE;
}
ELSE {
entry only partially written so roll backward: scan log from end looking for EndOfEntry, then reset log length
UNTIL ((byte = EndOfEntry) AND (prevByte = '\n)) OR (logAddress = 0) DO
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
ENDLOOP;
IO.SetLength[logStream, IF logAddress = 0 THEN 0 ELSE logAddress + 2];
needToRebuild ← FALSE;
};
A replaced entry should never be the last entry in a log. If this is the case, then we must have crashed before writing the replacement entry. To recover, simply remove the replaced entry.
IF logAddress # 0 THEN {
Back up one more character, then continue backwards search to find last entry
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
UNTIL ((byte = EndOfEntry) AND (prevByte = '\n)) OR (logAddress = 0) DO
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
ENDLOOP;
IF logAddress # 0 THEN
logAddress ← logAddress + 2;
lastEntry ← ReadLogEntry[logStream, logAddress];
IF lastEntry.first.type = $REPLACED THEN {
IO.SetLength[logStream, logAddress];
needToRebuild ← FALSE;
};
};
Log is now in good shape.
MarkUpdateComplete[logStream];
RETURN[needToRebuild];
};
IndicesOutOfDate: PROC [dbinfo: OpenDBInfo] RETURNS [BOOLEAN] ~ {
Determines if logs have been edited manually, i.e. if the indices are currently out-of-date, by checking the create date of the primary index against those of the logs.
This doesn't actually work since simply opening an activity log changes its date.
fileDate: BasicTime.GMT;
indexDate: BasicTime.GMT ← FS.FileInfo[name: dbinfo.primaryIndex.filename].created;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
fileDate ← FS.FileInfo[name: dbinfo.logs[i].filename].created;
IF BasicTime.Period[from: indexDate, to: fileDate] > 0 THEN RETURN[TRUE];
};
ENDLOOP;
RETURN[FALSE];
};
Shared databases
A given database can not actually be opened more than once because of the concurrency control provided by FS. Thus, we keep a table of databases that are already open.
OpenDBTable: SymTab.Ref;
OpenDBTableSize: SymTab.SeqIndex = 2039;
2039 is a good candidate because it is prime and is large enough that collisions should be rare.
GetSharedDB: PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Remember: this routine is not called under a monitor lock. If several clients try to create an OpenDBRecord concurrently, the first one to register with the OpenDBTable wins.
fname: ROPEFS.FileInfo[dbName ! FS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName;
dbinfo ← NARROW[SymTab.Fetch[OpenDBTable, fname].val];
IF dbinfo = NIL THEN {
dbinfo ← NEW[OpenDBRecord];
dbinfo.dbName ← fname;
dbinfo.isOpen ← FALSE;
dbinfo.remoteAccess ← TRUE;
dbinfo.statusMsg ← "Open for service.";
dbinfo.indices ← RefTab.Create[];
dbinfo.primaryIndex ← NIL;
dbinfo.logs ← NEW[LogSetRecord[LAST[LogID]]];
IF NOT SymTab.Insert[OpenDBTable, fname, dbinfo] THEN -- lost race
dbinfo ← NARROW[SymTab.Fetch[OpenDBTable, fname].val];
};
};
Want the table of open databases to be created only once, even if this package is re-run.
OpenDBTable ← NARROW[Atom.GetProp[$LoganBerry, $OpenDBTable]];
IF OpenDBTable = NIL THEN {
OpenDBTable ← SymTab.Create[mod: OpenDBTableSize, case: FALSE];
Atom.PutProp[$LoganBerry, $OpenDBTable, OpenDBTable];
};
END.
Doug Terry August 12, 1985 1:05:48 pm PDT
created
Doug Terry, January 23, 1986 8:28:43 pm PST
changes to: DIRECTORY, Open, Close, GetInfo, dbinfo (local of GetInfo), OpenI, MonitoredOpen, EnumerateEntriesI, CloseI, CompactLogsI, SetRemoteAccessI, ReadLogEntry, WriteLogEntry, CloseLogs, GetIndexEntry, CreateIndexEntry, DeleteIndexEntry, CloseIndex (local of CloseIndices), OpenDBTable, OpenDBTableSize, GetSharedDB, OpenDBTable, IF, IMPORTS
Doug Terry, January 24, 1986 9:39:30 am PST
changes to: MonitoredOpen, BuildIndicesI, BuildIndicesWorker
Swinehart, January 27, 1986 5:10:45 pm PST
Fullname expansion does not produce a canonical name (case may differ); GetSharedDB now uses a case-insensitive symbol table.
changes to: GetSharedDB
Doug Terry, January 31, 1986 4:25:40 pm PST
changes to: DIRECTORY, OpenIndex (local of OpenIndices), CloseIndex (local of CloseIndices), EraseIndex (local of EraseIndices), CompactLogsI
Doug Terry, March 5, 1986 5:21:27 pm PST
Implemented Describe so that clients can obtain schema information even if they don't have access to the schema file.
changes to: Describe, DescribeI, AddIndexInfo (local of DescribeI)