LoganBerryImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Doug Terry, April 14, 1986 5:33:43 pm PST
Swinehart, January 27, 1986 5:16:09 pm PST
LoganBerry is a simple facility for managing databases. Data is stored in one or more log files and indexed using stable btrees. This "poor man's" database facility is intended to allow various types of data to be stored persistently. A database survives processor crashes, but the data management facility does not provide atomic transactions. Only very simple queries are supported: retrieval by key or key subrange. Databases may be shared by multiple clients and accessed remotely via RPC.
DIRECTORY
Atom USING [GetPName, MakeAtom, GetProp, PutProp],
BasicTime USING [GMT, Period],
BTree USING [Error, GetState],
BTreeSimple USING [DeleteKey, EnumerateRecords, --GetState,-- InternalKey, New, NewPathStk, Open, PathStk, ReadRecord, --ReadValue,-- Relation, SetState, Tree, UpdateRecord, Value, ValueObject],
Convert USING [IntFromRope, RopeFromInt],
DFUtilities USING [--DirectoryItem,-- FileItem, CommentItem, ParseFromStream, ProcessItemProc, RemoveVersionNumber, SyntaxError],
FS USING [Close, Error, ExpandName, FileInfo, OpenFile],
GeneralFS USING [Open, Create, StreamOpen],
IO USING [BreakProc, Close, EndOf, EndOfStream, Error, Flush, GetChar, --GetID,-- GetIndex, GetInt, GetLength, GetLineRope, GetRopeLiteral, GetToken, GetTokenRope, IDProc, PeekChar, PutChar, PutF, RIS, rope, SetIndex, SetLength, SkipWhitespace, STREAM],
RefID USING [Seal, Unseal, Release],
RefTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref],
RefText USING [Equal, line, ObtainScratch, ReleaseScratch],
Rope USING [Cat, Compare, Concat, Equal, Find, Index, ROPE, Substr],
SymTab USING [Create, Fetch, Insert, Ref, SeqIndex],
LB,
LoganBerry,
LoganBerryStructure;
LoganBerryImpl: CEDAR MONITOR LOCKS dbinfo USING dbinfo: OpenDBInfo
IMPORTS Atom, BasicTime, BTree, BTreeSimple, Convert, DFUtilities, FS, GeneralFS, IO, RefText, Rope, RefTab, RefID, SymTab
EXPORTS LB, LoganBerry
~
BEGIN
OPEN LoganBerry, LoganBerryStructure;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Exported RPC operations
These routines are exported to the LoganBerry interface. All of the operations (except EnumerateEntries) can be invoked via RPC. They take an RPC.Conversation as the first argument so that secure remote procedure calls can be made to a LoganBerry database server.
Error: PUBLIC ERROR [ec: ErrorCode, explanation: ROPE ← NIL] = CODE;
Open: PUBLIC PROC [conv: Conv ← NIL, dbName: ROPE] RETURNS [db: OpenDB] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed. Indices are automatically rebuilt if any are missing or if a partially-completed update left them out-of-date.
dbinfo: OpenDBInfo ← OpenI[dbName];
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
db ← RefID.Seal[dbinfo];
};
ReadEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
[entry, others] ← ReadEntryI[GetInfo[db], key, value];
};
EnumerateEntries: PUBLIC PROC [db: OpenDB, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL, proc: EntryProc] RETURNS [] ~ {
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
EnumerateEntriesI[GetInfo[db], key, start, end, proc];
};
GenerateEntries: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL] RETURNS [cursor: Cursor] ~ {
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo ← GenerateEntriesI[GetInfo[db], key, start, end];
cursor ← RefID.Seal[newCursor];
};
NextEntry: PUBLIC PROC [conv: Conv ← NIL, cursor: Cursor, dir: CursorDirection ← increasing] RETURNS [entry: Entry] ~ {
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
c: CursorInfo;
WITH RefID.Unseal[cursor] SELECT FROM
ci: CursorInfo => c ← ci;
ENDCASE => c ← NIL;
IF c = NIL THEN ERROR Error[$BadCursor, "Invalid cursor passed to NextEntry."];
entry ← NextEntryI[c.dbinfo, c, dir];
};
EndGenerate: PUBLIC PROC [conv: Conv ← NIL, cursor: Cursor] RETURNS [] ~ {
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
[] ← RefID.Release[cursor];
};
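The GenerateEntries / NextEntry / EndGenerate triple is a conventional open-cursor protocol. The following is a minimal sketch in Python (not Cedar) of the same calling convention, modeled over an in-memory sorted index; all names here are illustrative stand-ins, not part of this module.

```python
# Sketch of the cursor protocol: create a cursor over a key subrange,
# then pull entries one at a time until None (the analogue of NIL).
class Cursor:
    def __init__(self, index, start, end):
        # keep only keys inside [start, end]; None means unbounded
        self.keys = [k for k in sorted(index)
                     if (start is None or k >= start) and (end is None or k <= end)]
        self.index = index
        self.pos = -1            # positioned just before the first key

    def next_entry(self, direction="increasing"):
        self.pos += 1 if direction == "increasing" else -1
        if 0 <= self.pos < len(self.keys):
            return self.index[self.keys[self.pos]]
        return None              # at either end of the sequence

def generate_entries(index, start=None, end=None):
    return Cursor(index, start, end)
```

As in the Cedar interface, the cursor remembers its position between calls, so repeated next_entry calls enumerate the subrange; releasing the cursor is implicit here (garbage collection), much as EndGenerateI notes below.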
WriteEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, entry: Entry, log: LogID ← activityLog, replace: BOOLEAN ← FALSE] RETURNS [] ~ {
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key. The primary attribute value must be unique throughout the database unless replace=TRUE; in this case, an existing entry with the same primary attribute value is atomically replaced with the new entry.
WriteEntryI[GetInfo[db], entry, log, replace];
};
DeleteEntry: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [] ~ {
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
DeleteEntryI[GetInfo[db], key, value];
};
Close: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
CloseI[GetInfo[db ! Error => IF ec=$DBClosed THEN CONTINUE]];
};
BuildIndices: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
BuildIndicesI[GetInfo[db]];
};
CompactLogs: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [] ~ {
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
CompactLogsI[GetInfo[db]];
};
Describe: PUBLIC PROC [conv: Conv ← NIL, db: OpenDB] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database.
info ← DescribeI[GetInfo[db]];
};
GetInfo: PROC [db: OpenDB] RETURNS [OpenDBInfo] ~ INLINE {
Unseals the database handle, ensures that it's valid, and checks if remote access is allowed.
ref: REF = RefID.Unseal[db];
IF ref = NIL THEN ERROR Error[$BadDBHandle, "NIL OpenDB handle."];
WITH ref SELECT FROM
dbinfo: OpenDBInfo => {
IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
IF NOT dbinfo.isOpen THEN ERROR Error[$DBClosed, dbinfo.statusMsg];
RETURN[dbinfo];
};
ENDCASE => ERROR Error[$BadDBHandle, "Invalid OpenDB handle."];
};
Internal monitored operations
These routines export to the internal LB interface. The operations are monitored so that several processes can share a database. All of the operations (except OpenI) take a monitored OpenDBInfo handle as their first parameter.
OpenI: PUBLIC PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Initiates database activity and checks for consistency. This can be called any number of times to get a new OpenDB handle or reopen a database that has been closed. Indices are automatically rebuilt if any are missing or if a partially-completed update left them out-of-date.
WARNING: SINCE THIS ROUTINE IS NOT MONITORED, RACE CONDITIONS MUST BE CAREFULLY AVOIDED.
dbinfo ← GetSharedDB[dbName];
IF NOT dbinfo.isOpen THEN MonitoredOpen[dbinfo];
RETURN[dbinfo];
};
MonitoredOpen: ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Want at most one client to attempt to open a database.
needToRebuild: BOOLEAN ← FALSE;
IF dbinfo.isOpen THEN RETURN; -- lost race
IF dbinfo.primaryIndex = NIL THEN ReadSchema[dbinfo];
OpenLogs[dbinfo.logs];
IF Recover[dbinfo] THEN needToRebuild ← TRUE;
IF OpenIndices[dbinfo.indices] THEN needToRebuild ← TRUE;
IF needToRebuild THEN BuildIndicesWorker[dbinfo];
dbinfo.isOpen ← TRUE;
};
ReadEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
ENABLE UNWIND => NULL;
Returns an entry that contains the given attribute value for the given key. If the key refers to the primary index then the unique entry is returned and `others' is FALSE. If the key is secondary and several entries exist with the same value for the key, then an arbitrary entry is returned and `others' is set to TRUE; use EnumerateEntries to get all of the matching entries.
indexEntry: IndexEntry;
index: IndexInfo;
aValue: AttributeValue;
pathSkt: BTreeSimple.PathStk ← BTreeSimple.NewPathStk[];
Find appropriate index
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
[aValue, indexEntry] ← GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF aValue = NIL THEN RETURN[entry: NIL, others: FALSE];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
IF index.type = primary
THEN others ← FALSE
ELSE {
aValue ← GetIndexEntry[index: index, value: aValue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
others ← Rope.Equal[s1: value, s2: UniqueKeyToValue[aValue], case: FALSE];
};
RETURN[entry, others];
};
SimpleEntryProc: EntryProc = { -- for debugging purposes
[entry: LoganBerry.Entry] RETURNS [continue: BOOL]
cont: BOOLEAN ← TRUE;
set breakpoint here to look at entry
RETURN[cont];
};
EnumerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL, proc: EntryProc] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Calls `proc' for each entry in the specified range of key values. The enumeration is halted when either the range of entries is exhausted or `proc' returns FALSE. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
EntriesInSubrange: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
indexEntry: IndexEntry;
entry: Entry;
IF end # NIL AND Rope.Compare[s1: key, s2: end, case: FALSE] = greater THEN RETURN[continue: FALSE];
TRUSTED {
indexEntry ← LOOPHOLE[@value[0], IndexPtr]^;
};
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RETURN[continue: proc[entry]];
};
index: IndexInfo;
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
IF index.type = secondary THEN {
IF start # NIL THEN start ← ValueToSmallKey[start];
IF end # NIL THEN end ← ValueToLargeKey[end];
};
[] ← BTreeSimple.EnumerateRecords[tree: index.btree, key: start, Proc: EntriesInSubrange !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex, Rope.Cat["Problem enumerating index for ",Atom.GetPName[key],"."]]];
};
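The widening of `start' and `end' above exists because secondary attribute values are not unique: the btree stores each value extended into a unique key, and ValueToSmallKey/ValueToLargeKey bracket all keys sharing one value. A sketch in Python (not Cedar), with a hypothetical uniquifier scheme standing in for the module's actual key encoding:

```python
# Secondary-index keys are value + uniquifier; to enumerate all entries whose
# value lies in [start, end], widen each bound to cover every uniquifier.
def value_to_small_key(value):
    return value + "\x00"        # sorts before any real uniquifier for `value`

def value_to_large_key(value):
    return value + "\x7f"        # sorts after any real uniquifier for `value`

def entries_in_subrange(btree_keys, start, end):
    lo = value_to_small_key(start) if start else ""
    hi = value_to_large_key(end) if end else "\x7f" * 16
    return [k for k in sorted(btree_keys) if lo <= k <= hi]
```

With this widening, start=end="bob" picks up every "bob"+uniquifier key while excluding neighbors, which is exactly the behavior EnumerateEntriesI needs for a secondary index.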
GenerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ← NIL, end: AttributeValue ← NIL] RETURNS [cinfo: CursorInfo] ~ {
ENABLE UNWIND => NULL;
Similar to EnumerateEntries, but creates a cursor so that entries in the specified range of key values can be retrieved using NextEntry (defined below). Initially, the cursor points to the start of the sequence. A NIL value for `start' represents the least attribute value, while a NIL value for `end' represents the largest attribute value. Thus, the complete database can be enumerated by specifying start=NIL and end=NIL with `key' equal to the primary key.
newCursor: CursorInfo;
index: IndexInfo;
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
newCursor ← NEW[CursorRecord];
newCursor.index ← index;
newCursor.dbinfo ← dbinfo;
newCursor.key ← key;
newCursor.start ← start;
newCursor.end ← end;
IF newCursor.index.type = secondary THEN {
IF start # NIL THEN newCursor.start ← ValueToSmallKey[start];
IF end # NIL THEN newCursor.end ← ValueToLargeKey[end];
};
newCursor.pathStk ← BTreeSimple.NewPathStk[];
back up one so we're in a position to start (assuming dir=increasing)
newCursor.current ← IF start # NIL THEN GetIndexEntry[index: newCursor.index, value: newCursor.start, valueIsKey: TRUE, relation: less, pathStk: newCursor.pathStk].actual ELSE NIL;
RETURN[newCursor];
};
NextEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo, dir: CursorDirection ← increasing] RETURNS [entry: Entry] ~ {
ENABLE UNWIND => NULL;
Retrieves the next entry relative to the given cursor. The cursor, and thus the sequence of desired entries, must have been previously created by a call to GenerateEntries. The cursor is automatically updated so that NextEntry may be repeatedly called to enumerate entries. NIL is returned if the cursor is at the end of the sequence and dir=increasing or at the start of the sequence and dir=decreasing.
Note: The end conditions may not be what is desired, since changing directions at either end causes the first element at that end to be skipped.
actualValue: AttributeValue;
indexEntry: IndexEntry;
[actualValue, indexEntry] ← GetIndexEntry[index: cinfo.index, value: cinfo.current, valueIsKey: TRUE, relation: IF dir = increasing THEN greater ELSE less, pathStk: cinfo.pathStk];
IF (actualValue = NIL)
OR (dir = increasing AND cinfo.end # NIL AND Rope.Compare[actualValue, cinfo.end, FALSE] = greater)
OR (dir = decreasing AND Rope.Compare[actualValue, cinfo.start, FALSE] = less)
THEN RETURN[NIL];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
cinfo.current ← actualValue;
RETURN[entry];
};
EndGenerateI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Releases the cursor; no further operations may be performed using the given cursor. This must be called once for every call to GenerateEntries.
NULL; -- this routine is unnecessary since cursors get garbage collected
};
WriteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, entry: Entry, log: LogID ← activityLog, replace: BOOLEAN ← FALSE] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Adds a new entry to the database. The entry is added to the activity log unless another log is explicitly requested. The entry must have an attribute for the primary key. The primary attribute value must be unique throughout the database unless replace=TRUE; in this case, an existing entry with the same primary attribute value is atomically replaced with the new entry.
value: AttributeValue;
indexData: IndexEntry ← [log: log, firstByte: 0];
doReplace: BOOLEAN ← FALSE;
replacedIndexEntry: IndexEntry;
replacedEntry: Entry;
value ← GetAttributeValue[entry, dbinfo.primaryIndex.key];
IF value = NIL THEN ERROR Error[$NoPrimaryKey, "Entry does not contain a primary key."];
IF dbinfo.logs[log].access = readOnly THEN ERROR Error[$LogReadOnly, "Can't write to a read only log."];
[value, replacedIndexEntry] ← GetIndexEntry[index: dbinfo.primaryIndex, value: value];
IF value # NIL THEN
IF replace
THEN doReplace ← TRUE
ELSE ERROR Error[$ValueNotUnique, "An existing entry already contains the primary key."];
For replacements, the entry being replaced and its replacement must reside in the same log. A replace operation is logged in a similar manner to delete operations. The entry written after the replace log entry is the replacement. Both log entries are written before any of the indices are updated. In the event of a crash, both entries must be recovered atomically.
IF doReplace THEN {
IF replacedIndexEntry.log # log THEN ERROR Error[$InvalidReplace, "Cross-log replacements are not allowed."];
[] ← WriteLogEntry[dbinfo.logs[log].stream, LIST[[$REPLACED, Convert.RopeFromInt[replacedIndexEntry.firstByte]]]];
};
indexData.firstByte ← WriteLogEntry[dbinfo.logs[log].stream, entry];
IF doReplace THEN {
replacedEntry ← ReadLogEntry[dbinfo.logs[log].stream, replacedIndexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, replacedEntry, replacedIndexEntry];
};
AddEntryToIndices[dbinfo.indices, entry, indexData];
MarkUpdateComplete[dbinfo.logs[log].stream];
};
DeleteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Deletes the entry that contains the given attribute value for the given key. If the key is for a secondary index and the value is not unique then an Error[$ValueNotUnique] is raised. Deletes are actually performed by logging the delete request (so that the log can be replayed at a later time) and then updating all of the indices.
indexEntry: IndexEntry;
index: IndexInfo;
pathSkt: BTreeSimple.PathStk ← BTreeSimple.NewPathStk[];
avalue: AttributeValue;
delete: Entry;
entry: Entry;
Find appropriate index
index ← NARROW[RefTab.Fetch[dbinfo.indices, key].val];
IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
Locate log entry
[avalue, indexEntry] ← GetIndexEntry[index: index, value: value, pathStk: pathSkt];
IF avalue = NIL THEN RETURN;
avalue ← GetIndexEntry[index: index, value: avalue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
IF Rope.Equal[s1: value, s2: UniqueKeyToValue[avalue], case: FALSE] THEN ERROR Error[$ValueNotUnique, "Entry to be deleted insufficiently specified."];
IF dbinfo.logs[indexEntry.log].access = readOnly THEN ERROR Error[$LogReadOnly, "Can't delete entries in a read only log."];
Log delete operation
delete ← LIST[[$DELETED, Convert.RopeFromInt[indexEntry.firstByte]]];
[] ← WriteLogEntry[dbinfo.logs[indexEntry.log].stream, delete];
Update indices
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
MarkUpdateComplete[dbinfo.logs[indexEntry.log].stream];
};
CloseI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Terminates database activity and closes all log and index files. Databases are always maintained consistently on disk so it doesn't hurt to leave a database open. The main reason for calling Close is to release the FS locks on the log files so they can be manually edited.
CloseLogs[dbinfo.logs];
CloseIndices[dbinfo.indices];
dbinfo.isOpen ← FALSE;
};
Maintenance operations
BuildIndicesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Rebuilds the indices by scanning the logs and performing WriteEntry or DeleteEntry operations.
This routine grabs the monitor lock and then calls the internal procedure to do the real work.
BuildIndicesWorker[dbinfo];
};
BuildIndicesWorker: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
indexData: IndexEntry;
logstream: STREAM;
entry: Entry;
indexEntry: IndexEntry;
saveStreamPosition: INT;
EraseIndices[dbinfo.indices];
FOR log: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[log] # NIL THEN {
indexData ← [log, 0];
logstream ← dbinfo.logs[log].stream;
FOR entry ← ReadLogEntry[logstream, 0], ReadLogEntry[logstream, -1] UNTIL entry = NIL DO
IF (entry.first.type # $DELETED) AND (entry.first.type # $REPLACED) THEN {
-- WriteEntry
AddEntryToIndices[dbinfo.indices, entry, indexData];
}
ELSE {
-- DeleteEntry
saveStreamPosition ← IO.GetIndex[logstream]; -- before the following read changes the index
indexEntry ← [log: log, firstByte: Convert.IntFromRope[entry.first.value]];
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
IO.SetIndex[logstream, saveStreamPosition];
};
IF IO.EndOf[logstream] OR IO.GetChar[logstream] # EndOfEntry THEN ERROR Error[$BadLogEntry, "Missing terminator on log entry."];
indexData.firstByte ← IO.GetIndex[logstream];
ENDLOOP;
};
ENDLOOP;
};
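The replay loop above can be summarized in a small Python sketch (not Cedar): each log record either carries data, in which case it is added to the index, or is a $DELETED/$REPLACED marker carrying the byte offset of its victim, in which case the victim is looked up and removed. All names and the log representation here are illustrative only.

```python
# Rebuild a primary index by replaying a log, in the spirit of
# BuildIndicesWorker.  The log is modeled as (offset, entry) pairs, where an
# entry is a list of (attribute-type, value) pairs; DELETED/REPLACED records
# store the victim's offset as their value.
def rebuild_index(log, primary_key):
    index = {}                         # primary-key value -> log offset
    by_offset = dict(log)              # random access, like IO.SetIndex
    for offset, entry in log:
        attrs = dict(entry)
        if "DELETED" in attrs or "REPLACED" in attrs:
            victim = int(attrs.get("DELETED", attrs.get("REPLACED")))
            victim_value = dict(by_offset[victim])[primary_key]
            index.pop(victim_value, None)      # RemoveEntryFromIndices
        else:
            index[attrs[primary_key]] = offset # AddEntryToIndices
    return index
```

A $REPLACED marker is always followed in the log by the replacement entry, so removal and re-insertion happen in sequence, leaving the index pointing at the replacement.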
CompactLogsI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
ENABLE { UNWIND => NULL; FS.Error => ERROR Error[$InternalError, error.explanation]; };
Removes deleted entries from the logs by enumerating the primary index and writing new logs.
KeepEntry: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
indexEntry: IndexEntry;
entry: Entry;
TRUSTED {
indexEntry ← LOOPHOLE[@value[0], IndexPtr]^;
};
IF dbinfo.logs[indexEntry.log].access # readOnly THEN {
entry ← ReadLogEntry[dbinfo.logs[indexEntry.log].stream, indexEntry.firstByte];
[] ← WriteLogEntry[newlogs[indexEntry.log].stream, entry];
MarkUpdateComplete[newlogs[indexEntry.log].stream];
};
RETURN[continue: TRUE];
};
newlogs: LogSet ← NEW[LogSetRecord[dbinfo.logs.size]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL AND dbinfo.logs[i].access # readOnly THEN {
newlogs[i] ← NEW[LogInfoRecord];
newlogs[i].stream ← GeneralFS.StreamOpen[dbinfo.logs[i].filename, $create !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
[] ← BTreeSimple.EnumerateRecords[tree: dbinfo.primaryIndex.btree, key: NIL, Proc: KeepEntry !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex, "Problem enumerating primary index."]];
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF newlogs[i] # NIL THEN {
IO.Close[newlogs[i].stream];
IO.Close[dbinfo.logs[i].stream];
dbinfo.logs[i].stream ← GeneralFS.StreamOpen[dbinfo.logs[i].filename, $append !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
BuildIndicesWorker[dbinfo];
};
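The compaction strategy above is: enumerate the primary index (which references only live entries), copy each referenced entry into a fresh log, then rebuild all indices against the new byte offsets. A minimal Python sketch of that idea (not Cedar; the offset arithmetic and data shapes are stand-ins):

```python
# Compact a log by copying only the entries the primary index still
# references; deleted and replaced records are simply never copied.
def compact(old_entries, index):
    """old_entries: offset -> entry; index: primary value -> offset (live)."""
    new_log = {}                         # new offset -> entry
    new_index = {}
    next_offset = 0
    for value in sorted(index):          # enumerate the primary btree in order
        entry = old_entries[index[value]]
        new_log[next_offset] = entry
        new_index[value] = next_offset
        next_offset += len(entry)        # stand-in for the entry's byte size
    return new_log, new_index
```

As in CompactLogsI, the index must be rebuilt (or re-pointed, as here) after the copy, since every surviving entry has moved to a new log address.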
SetRemoteAccessI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, accessible: BOOLEAN ← TRUE, why: ROPE ← NIL] RETURNS [] ~ {
ENABLE UNWIND => NULL;
Controls access by remote clients to the database. If accessible=FALSE then the database cannot be accessed through the normal LoganBerry interface, only via this interface; setting accessible=TRUE makes the database commonly available (the normal case). This operation is intended for use by administrators who wish to temporarily remove a database from service.
dbinfo.remoteAccess ← accessible;
dbinfo.statusMsg ← why;
};
Database schema
A schema is created by clients of this package and stored in a DF file that can be used to backup the database. Lines of the file starting with "-->" are deemed to contain schema information for the file named in the subsequent line of the DF file or at the end of the same line. The two types of schema entries are as follows:
--> log <logID> <readOnly or readWrite> <optional CR> <filename>
--> index <key> <primary or secondary> <optional CR> <filename>
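The two statement forms above can be illustrated with a small Python sketch (not Cedar) of the tokenizing done by ParseSchemaLine; the dictionary fields are illustrative stand-ins for the LogInfo and IndexInfo records, and a missing trailing filename corresponds to needFilename=TRUE.

```python
# Parse one "-->" schema statement into a log or index description.
def parse_schema_line(line):
    tokens = line.split()
    if not tokens or tokens[0] != "-->":
        return None                      # not a schema statement
    kind = tokens[1].lower()
    if kind == "log":
        item = {"kind": "log", "id": int(tokens[2]),
                "access": "readOnly" if tokens[3].lower() == "readonly"
                          else "readWrite"}
    elif kind == "index":
        item = {"kind": "index", "key": tokens[2].strip('"'),
                "type": "primary" if tokens[3].lower() == "primary"
                        else "secondary"}
    else:
        raise ValueError("Invalid keyword.")
    rest = tokens[4:]
    # the filename is optional here; it may instead be the next DF file item
    item["filename"] = rest[0] if rest else None
    return item
```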
SchemaChars: PUBLIC REF READONLY TEXT = "-->";
ParseSchemaLine: PROC [s: STREAM, buffer: REF TEXT, wdir: ROPE] RETURNS [item: REF ANY, needFilename: BOOLEAN ← FALSE] ~ {
Takes an input stream and parses one schema entry from it. The item returned is of type LogInfo or IndexInfo; needFilename=TRUE indicates that no file name existed in the input stream (it must be on the next line of the schema file).
ENABLE {
IO.Error => ERROR Error[$BadSchema, "Error parsing schema statement."];
IO.EndOfStream => ERROR Error[$BadSchema, "Unexpected end of stream."] };
token: REF TEXT;
log: LogInfo;
index: IndexInfo;
needFilename ← FALSE;
token ← IO.GetToken[s, IO.IDProc, buffer].token;
IF NOT RefText.Equal[token, SchemaChars] THEN RETURN[NIL, FALSE];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
SELECT TRUE FROM
RefText.Equal[token, "log", FALSE] => {
log ← NEW[LogInfoRecord];
log.id ← IO.GetInt[s];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
log.access ← IF RefText.Equal[token, "ReadOnly", FALSE] THEN readOnly ELSE readWrite;
log.filename ← IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ← TRUE; CONTINUE}].token;
IF NOT needFilename THEN log.filename ← FS.ExpandName[log.filename, wdir].fullFName;
item ← log;
};
RefText.Equal[token, "index", FALSE] => {
index ← NEW[IndexInfoRecord];
index.key ← Atom.MakeAtom[IO.GetRopeLiteral[s]];
token ← IO.GetToken[s, IO.IDProc, buffer].token;
index.type ← IF RefText.Equal[token, "primary", FALSE] THEN primary ELSE secondary;
index.filename ← IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ← TRUE; CONTINUE}].token;
IF NOT needFilename THEN index.filename ← FS.ExpandName[index.filename, wdir].fullFName;
item ← index;
};
ENDCASE => ERROR Error[$BadSchema, "Invalid keyword."];
};
ReadSchema: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
Read the database schema from dbinfo.dbName.
ENABLE DFUtilities.SyntaxError => ERROR Error[$BadSchema, reason];
NewDBInfo: PROC [item: REF ANY, filename: ROPE ← NIL] RETURNS [] ~ {
WITH item SELECT FROM
log: LogInfo => {
IF filename # NIL THEN log.filename ← filename;
dbinfo.logs[log.id] ← log;
};
index: IndexInfo => {
IF filename # NIL THEN index.filename ← filename;
[] ← RefTab.Insert[dbinfo.indices, index.key, index];
IF index.type = primary THEN
IF dbinfo.primaryIndex = NIL
THEN dbinfo.primaryIndex ← index
ELSE ERROR Error[$BadSchema, "Multiple primary indices specified."];
};
ENDCASE => ERROR Error[$InternalError];
};
SchemaItemProc: DFUtilities.ProcessItemProc = {
[item: REF ANY] RETURNS [stop: BOOL ← FALSE]
WITH item SELECT FROM
comment: REF DFUtilities.CommentItem => {
IF needFilename THEN ERROR Error[$BadSchema, "No file name specified for some log or index."];
[lastSchemaItem, needFilename] ← ParseSchemaLine[IO.RIS[comment.text], buffer, wdir];
IF lastSchemaItem # NIL AND NOT needFilename THEN NewDBInfo[lastSchemaItem];
};
file: REF DFUtilities.FileItem => {
IF needFilename THEN {
NewDBInfo[lastSchemaItem, FS.ExpandName[DFUtilities.RemoveVersionNumber[file.name], wdir].fullFName];
needFilename ← FALSE;
};
};
ENDCASE => {
IF needFilename THEN ERROR Error[$BadSchema, "No file name specified for some log or index."];
};
};
needFilename: BOOLEAN ← FALSE;
lastSchemaItem: REF ANY;
buffer: REF TEXT = RefText.ObtainScratch[RefText.line];
wdir: ROPE = Rope.Substr[dbinfo.dbName, 0, FS.ExpandName[dbinfo.dbName].cp.base.start];
schemaStream: STREAM ← GeneralFS.StreamOpen[dbinfo.dbName !
FS.Error => ERROR Error[$CantOpenSchema, error.explanation]];
DFUtilities.ParseFromStream[in: schemaStream, proc: SchemaItemProc, filter: [comments: TRUE]];
RefText.ReleaseScratch[buffer];
};
DescribeI: PROC [dbinfo: OpenDBInfo] RETURNS [info: SchemaInfo] ~ {
Returns schema information about the database. This routine is not monitored since it can safely be called while other operations are in progress.
AddIndexInfo: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOLEAN]
index: IndexInfo ← NARROW[val];
IF index # dbinfo.primaryIndex THEN {
info.keys ← CONS[index.key, info.keys];
info.indexNames ← CONS[index.filename, info.indexNames];
};
RETURN[quit: FALSE];
};
info.dbName ← dbinfo.dbName;
info.logs ← NIL;
info.logNames ← NIL;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
info.logs ← CONS[i, info.logs];
info.logNames ← CONS[dbinfo.logs[i].filename, info.logNames];
};
ENDLOOP;
info.keys ← NIL;
info.indexNames ← NIL;
[] ← RefTab.Pairs[dbinfo.indices, AddIndexInfo];
info.keys ← CONS[dbinfo.primaryIndex.key, info.keys];
info.indexNames ← CONS[dbinfo.primaryIndex.filename, info.indexNames];
};
Entries
GetAttributeValue: PROC [entry: Entry, type: AttributeType] RETURNS [AttributeValue] ~ {
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
IF e.first.type = type THEN RETURN[e.first.value];
ENDLOOP;
RETURN[NIL];
};
ReverseEntry: PROC [entry: Entry] RETURNS [Entry] = {
Destructively reverses the order of the entry's attributes. Taken from LispImpl.DReverse.
l1, l2, l3: Entry ← NIL;
IF entry = NIL THEN RETURN[NIL];
l3 ← entry;
UNTIL (l1 ← l3) = NIL DO
l3 ← l3.rest;
l1.rest ← l2;
l2 ← l1;
ENDLOOP;
RETURN[l2];
};
Log management
Some day these routines may be placed in a separate module.
OpenLogs: PROC [logs: LogSet] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) AND (logs[i].stream = NIL) THEN {
logs[i].stream ← GeneralFS.StreamOpen[logs[i].filename, IF logs[i].access = readWrite THEN $append ELSE $read !
FS.Error => ERROR Error[$CantOpenLog, error.explanation]];
};
ENDLOOP;
};
ReadLogEntry: PROC [logStream: STREAM, byte: LogAddress] RETURNS [Entry] ~ {
ENABLE { IO.EndOfStream, IO.Error => ERROR Error[$BadLogEntry] };
AttributeBreakProc: IO.BreakProc = {
[char: CHAR] RETURNS [IO.CharClass]
RETURN[SELECT char FROM ': => sepr, ENDCASE => other]
};
ReadAttribute: PROC [s: STREAM] RETURNS [a: Attribute] ~ {
a.type ← Atom.MakeAtom[IO.GetTokenRope[s, AttributeBreakProc].token];
[] ← IO.GetChar[s]; -- attribute separation char
[] ← IO.SkipWhitespace[stream: s, flushComments: FALSE];
IF IO.PeekChar[s] = '" THEN {
a.value ← IO.GetRopeLiteral[s];
[] ← IO.GetLineRope[s];
}
ELSE a.value ← IO.GetLineRope[s];
};
entry: Entry;
attribute: Attribute;
IF logStream = NIL THEN ERROR Error[$DBClosed];
IF byte >= 0 THEN IO.SetIndex[logStream, byte ! IO.Error => ERROR Error[$BadIndex]];
IF IO.PeekChar[logStream] = UpdateComplete THEN RETURN[NIL]; -- at end of log
read a list of attributes until EndOfEntry
UNTIL IO.PeekChar[logStream] = EndOfEntry DO
attribute ← ReadAttribute[logStream];
entry ← CONS[attribute, entry];
ENDLOOP;
the list is constructed backwards, so it is reversed before being returned
RETURN[ReverseEntry[entry]];
};
WriteLogEntry: PROC [logStream: STREAM, entry: Entry] RETURNS [LogAddress] ~ {
WriteAttribute: PROC [s: STREAM, a: Attribute] RETURNS [] ~ {
value: ROPE ← a.value;
IF Rope.Find[a.value, "\n"] # -1 THEN -- write out value as rope literal
value ← Rope.Cat["\"", a.value, "\""];
IO.PutF[s, "%g: %g\n", IO.rope[Atom.GetPName[a.type]], IO.rope[value]]
};
byteOffset: LogAddress;
IF logStream = NIL THEN ERROR Error[$DBClosed];
byteOffset ← IO.GetLength[logStream];
IF byteOffset > 0 THEN byteOffset ← byteOffset - 1; --remove update marker from end of log
IO.SetIndex[logStream, byteOffset];
write a list of attributes
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
WriteAttribute[logStream, e.first];
ENDLOOP;
IO.PutChar[logStream, EndOfEntry];
IO.Flush[logStream];
RETURN[byteOffset];
};
MarkUpdateComplete: PROC [logStream: STREAM] RETURNS [] ~ {
byteOffset: LogAddress ← IO.GetLength[logStream];
IO.SetIndex[logStream, byteOffset];
IO.PutChar[logStream, UpdateComplete];
IO.Flush[logStream];
};
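ReadLogEntry, WriteLogEntry, and MarkUpdateComplete together implement a simple crash-consistency protocol: each entry is a run of "type: value" lines closed by an EndOfEntry character, and a well-formed log ends with an UpdateComplete marker, which a writer strips before appending and restores only after the indices are updated. A sketch of that protocol in Python (not Cedar); the marker characters and string-based log are illustrative stand-ins for the module's actual representation.

```python
# Stand-ins for the module's EndOfEntry and UpdateComplete characters.
END_OF_ENTRY = "\x1f"
UPDATE_COMPLETE = "\x1e"

def write_entry(log, entry):
    """Append an entry; returns (new log, byte offset of the entry)."""
    if log.endswith(UPDATE_COMPLETE):
        log = log[:-1]                     # remove update marker from log end
    offset = len(log)
    for typ, val in entry:
        if "\n" in val:                    # embedded newline: write as literal
            val = '"' + val + '"'
        log += f"{typ}: {val}\n"
    return log + END_OF_ENTRY, offset

def mark_update_complete(log):
    return log + UPDATE_COMPLETE           # indices are now consistent

def read_entry(log, offset):
    entry, i = [], offset
    while log[i] not in (END_OF_ENTRY, UPDATE_COMPLETE):
        line_end = log.index("\n", i)
        typ, val = log[i:line_end].split(": ", 1)
        entry.append((typ, val.strip('"')))
        i = line_end + 1
    return entry
```

A recovery pass can thus detect a partially completed update by the absence of the trailing marker, which is what triggers the index rebuild in MonitoredOpen.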
CloseLogs: PROC [logs: LogSet] RETURNS [] ~ {
FOR i: LogID IN [0..logs.size) DO
IF (logs[i] # NIL) AND (logs[i].stream # NIL) THEN {
IO.Close[logs[i].stream];
logs[i].stream ← NIL;
};
ENDLOOP;
};
Index management
Some day these routines may be placed in a separate module.
OpenIndices: PROC [indices: IndexSet] RETURNS [needToRebuild: BOOLEAN] ~ {
OpenIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
init: BOOLEAN ← FALSE;
index: IndexInfo ← NARROW[val];
IF index.btree = NIL THEN {
index.btree ← BTreeSimple.New[];
};
IF BTree.GetState[index.btree].state # open THEN {
index.openfile ← GeneralFS.Open[name: index.filename, lock: $write !
FS.Error => { -- problem opening btree so create new one
IF error.group = $user AND error.code = $unknownFile THEN {
init ← TRUE;
needToRebuild ← TRUE;
index.openfile ← GeneralFS.Create[name: index.filename];
CONTINUE;
}
ELSE ERROR Error[$CantOpenIndex, error.explanation];
}
];
BTreeSimple.Open[tree: index.btree, file: index.openfile, initialize: init !
BTree.Error => ERROR Error[$BadIndex, Rope.Cat["Bad index for ",Atom.GetPName[NARROW[key]],"."]]];
};
RETURN[quit: FALSE];
};
needToRebuild ← FALSE;
[] ← RefTab.Pairs[indices, OpenIndex];
RETURN[needToRebuild];
};
GetIndexEntry: PROC [index: IndexInfo, value: AttributeValue, valueIsKey: BOOLEAN ← FALSE, relation: BTreeSimple.Relation ← equal, pathStk: BTreeSimple.PathStk ← NIL] RETURNS [actual: AttributeValue, data: IndexEntry] ~ {
Note: the AttributeValue's passed in and out may be unique keys or not.
btreeValue: BTreeSimple.Value;
useKey: AttributeValue ← value;
useRelation: BTreeSimple.Relation ← relation;
IF NOT valueIsKey AND index.type = secondary THEN
SELECT relation FROM
less => useKey ← ValueToSmallKey[value];
lessEqual => useKey ← ValueToLargeKey[value];
equal => { --start small and look upwards
useKey ← ValueToSmallKey[value];
useRelation ← greaterEqual;
};
greaterEqual => useKey ← ValueToSmallKey[value];
greater => useKey ← ValueToLargeKey[value];
ENDCASE;
[actual, btreeValue] ← BTreeSimple.ReadRecord[tree: index.btree, key: useKey, relation: useRelation, pathStk: pathStk, useExistingPath: pathStk # NIL !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
IF NOT valueIsKey AND index.type = secondary AND relation = equal AND NOT Rope.Equal[value, UniqueKeyToValue[actual], FALSE] THEN
actual ← NIL; -- found entry larger than desired
IF btreeValue # NIL THEN TRUSTED {
data ← LOOPHOLE[@btreeValue[0], IndexPtr]^;
};
RETURN[actual, data];
};
CreateIndexEntry: PROC [index: IndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
btreeValue: BTreeSimple.Value;
btreeValue ← NEW[BTreeSimple.ValueObject[SIZE[IndexEntry]]];
TRUSTED {
LOOPHOLE[@btreeValue[0], IndexPtr]^ ← data;
};
BTreeSimple.UpdateRecord[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data], value: btreeValue !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
};
DeleteIndexEntry: PROC [index: IndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
[] ← BTreeSimple.DeleteKey[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data] !
BTree.Error => ERROR Error[IF reason = closed THEN $DBClosed ELSE $BadIndex]];
};
AddEntryToIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: IndexInfo;
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
index ← NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so add to index
CreateIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
RemoveEntryFromIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
index: IndexInfo;
FOR e: Entry ← entry, e.rest WHILE e # NIL DO
index ← NARROW[RefTab.Fetch[indices, e.first.type].val];
IF index # NIL THEN { -- index key in entry so remove from index
DeleteIndexEntry[index, e.first.value, data];
};
ENDLOOP;
};
EraseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
EraseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: IndexInfo ← NARROW[val];
IF index.btree = NIL THEN RETURN[quit: FALSE];
the following two statements may not really be needed
IF BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
FS.Close[index.openfile];
};
index.openfile ← GeneralFS.Create[index.filename];
BTreeSimple.Open[tree: index.btree, file: index.openfile, initialize: TRUE];
RETURN[quit: FALSE];
};
[] ← RefTab.Pairs[indices, EraseIndex];
};
CloseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
CloseIndex: RefTab.EachPairAction = {
[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]
index: IndexInfo ← NARROW[val];
IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
BTreeSimple.SetState[index.btree, closed];
FS.Close[index.openfile];
};
RETURN[quit: FALSE];
};
[] ← RefTab.Pairs[indices, CloseIndex];
};
Secondary indices: attribute values need not be unique, so special btree keys must be generated. Unique keys are obtained by appending a logID/logAddress pair to the AttributeValue.
IsUniqueKey: PROC [key: ROPE] RETURNS [BOOLEAN] ~ INLINE {
RETURN[Rope.Find[key, "\000"] # -1];
};
ValueToUniqueKey: PROC [value: AttributeValue, data: IndexEntry] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Cat[value, "\000", Convert.RopeFromInt[data.log], " ", Convert.RopeFromInt[data.firstByte]]];
};
ValueToSmallKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\000"]];
};
ValueToLargeKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
RETURN[Rope.Concat[value, "\000\377"]];
};
UniqueKeyToValue: PROC [key: ROPE] RETURNS [AttributeValue] ~ INLINE {
RETURN[Rope.Substr[key, 0, Rope.Index[key, 0, "\000"]]];
};
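The ordering argument behind these four procedures can be modeled outside Cedar. The following Python sketch (not the Cedar source; it assumes keys compare lexicographically as byte strings, which is how the btree compares keys here) shows why ValueToSmallKey and ValueToLargeKey bracket every unique key built from the same value: "\000\000" sorts before "\000" followed by any digit, and "\000\377" sorts after it.

```python
# Sketch of the secondary-index key encoding (hypothetical Python names).
def value_to_unique_key(value: bytes, log: int, first_byte: int) -> bytes:
    # Append "\000" plus the logID/logAddress pair to make equal
    # attribute values distinct btree keys.
    return value + b"\x00" + str(log).encode() + b" " + str(first_byte).encode()

def value_to_small_key(value: bytes) -> bytes:
    # "\x00\x00" sorts before "\x00<digits>", i.e. before every unique key.
    return value + b"\x00\x00"

def value_to_large_key(value: bytes) -> bytes:
    # "\x00\xff" sorts after "\x00<digits>", i.e. after every unique key.
    return value + b"\x00\xff"

def unique_key_to_value(key: bytes) -> bytes:
    # Everything before the first "\x00" separator is the original value.
    return key[:key.index(b"\x00")]
```

Scanning the subrange [ValueToSmallKey[v], ValueToLargeKey[v]] therefore enumerates exactly the entries whose attribute equals v, which is what the `equal` arm of GetIndexEntry relies on.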
Crash recovery
Recover: PROC [dbinfo: OpenDBInfo] RETURNS [needToRebuild: BOOLEAN] ~ {
needToRebuild ← FALSE;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF (dbinfo.logs[i] # NIL) AND (dbinfo.logs[i].access # readOnly) THEN {
IF NOT InGoodState[dbinfo.logs[i].stream] THEN
IF RestoreLog[dbinfo.logs[i].stream] THEN needToRebuild ← TRUE;
};
ENDLOOP;
RETURN[needToRebuild];
};
InGoodState: PROC [logStream: STREAM] RETURNS [BOOLEAN] ~ {
Checks that the log is in a consistent state. The last byte of the log should be the UpdateComplete marker.
PROBLEM: the byte count returned by GetLength may be wrong after a crash?
length: INT;
thirdToLastByte, secondToLastByte, lastByte: CHAR;
length ← IO.GetLength[logStream];
IF length < 3 THEN { -- log just created (or too short to be valid)
IO.SetLength[logStream, 0];
MarkUpdateComplete[logStream];
RETURN[TRUE];
};
IO.SetIndex[logStream, length-3];
thirdToLastByte ← IO.GetChar[logStream];
secondToLastByte ← IO.GetChar[logStream];
lastByte ← IO.GetChar[logStream];
RETURN[(thirdToLastByte = '\n) AND (secondToLastByte = EndOfEntry) AND (lastByte = UpdateComplete)];
};
RestoreLog: PROC [logStream: STREAM] RETURNS [needToRebuild: BOOLEAN] ~ {
Restores the log to a consistent state. There are two cases to consider: (1) the system was interrupted while writing an entry to the log; (2) a complete entry exists on the log, but the btrees have only been partially updated.
logAddress: INT;
prevByte, byte: CHAR;
lastEntry: Entry;
logAddress ← IO.GetLength[logStream]-2;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
byte ← IO.GetChar[logStream];
IF (byte = EndOfEntry) AND (prevByte = '\n) THEN {
entry complete so roll forward, but don't know state of indices so must rebuild them
needToRebuild ← TRUE;
}
ELSE {
entry only partially written so roll backward: scan log from end looking for EndOfEntry, then reset log length
UNTIL ((byte = EndOfEntry) AND (prevByte = '\n)) OR (logAddress = 0) DO
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
ENDLOOP;
IO.SetLength[logStream, IF logAddress = 0 THEN 0 ELSE logAddress + 2];
needToRebuild ← FALSE;
};
A replaced entry should never be the last entry in a log. If this is the case, then we must have crashed before writing the replacement entry. To recover, simply remove the replaced entry.
IF logAddress # 0
THEN {
Back up one more character, then continue backwards search to find last entry
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
UNTIL ((byte = EndOfEntry) AND (prevByte = '\n)) OR (logAddress = 0) DO
byte ← prevByte;
logAddress ← logAddress - 1;
IO.SetIndex[logStream, logAddress];
prevByte ← IO.GetChar[logStream];
ENDLOOP;
IF logAddress # 0 THEN logAddress ← logAddress + 2;
lastEntry ← ReadLogEntry[logStream, logAddress];
IF lastEntry.first.type = $REPLACED THEN {
IO.SetLength[logStream, logAddress];
needToRebuild ← FALSE;
};
};
Log is now in good shape.
MarkUpdateComplete[logStream];
RETURN[needToRebuild];
};
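The roll-forward/roll-backward rule above can be modeled as a small Python sketch (not the Cedar source; the marker byte values are hypothetical stand-ins for EndOfEntry and UpdateComplete, and the $REPLACED-last-entry check is omitted). A complete final entry means the log is durable but the btrees may be stale, so the indices must be rebuilt; a torn final entry is simply truncated away.

```python
# Model of RestoreLog's recovery decision (hypothetical marker values).
END_OF_ENTRY = ord("$")
UPDATE_COMPLETE = ord("&")
NEWLINE = ord("\n")

def restore_log(log: bytearray) -> bool:
    """Make the log consistent in place; return True if indices need rebuilding."""
    if len(log) >= 2 and log[-1] == END_OF_ENTRY and log[-2] == NEWLINE:
        # Case 1: last entry written completely; roll forward. Its btree
        # updates may be partial, so the indices must be rebuilt.
        need_rebuild = True
    else:
        # Case 2: last entry is torn; scan backwards for the previous
        # "\n" + EndOfEntry terminator and drop the partial tail.
        i = len(log) - 1
        while i > 0 and not (log[i] == END_OF_ENTRY and log[i - 1] == NEWLINE):
            i -= 1
        if i > 0:
            del log[i + 1:]   # keep everything through the terminator
        else:
            del log[:]        # no complete entry survives
        need_rebuild = False
    log.append(UPDATE_COMPLETE)  # log is now in good shape
    return need_rebuild
```

The asymmetry is the point: truncation never loses a committed entry (the torn tail was never acknowledged), while roll-forward trades a full index rebuild for never losing one.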
IndicesOutOfDate: PROC [dbinfo: OpenDBInfo] RETURNS [BOOLEAN] ~ {
Determines if logs have been edited manually, i.e. if the indices are currently out-of-date, by checking the create date of the primary index against those of the logs.
This doesn't actually work since simply opening an activity log changes its date.
fileDate: BasicTime.GMT;
indexDate: BasicTime.GMT ← FS.FileInfo[name: dbinfo.primaryIndex.filename].created;
FOR i: LogID IN [0..dbinfo.logs.size) DO
IF dbinfo.logs[i] # NIL THEN {
fileDate ← FS.FileInfo[name: dbinfo.logs[i].filename].created;
IF BasicTime.Period[from: indexDate, to: fileDate] > 0 THEN RETURN[TRUE];
};
ENDLOOP;
RETURN[FALSE];
};
Shared databases
A given database cannot actually be opened more than once because of the concurrency control provided by FS. Thus, we keep a table of databases that are already open.
OpenDBTable: SymTab.Ref;
OpenDBTableSize: SymTab.SeqIndex = 2039;
2039 is a good candidate because it is prime and is large enough that collisions should be rare.
GetSharedDB: PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
Remember: this routine is not called under a monitor lock. If several clients try to create an OpenDBRecord concurrently, the first one to register with the OpenDBTable wins.
fname: ROPE ← FS.FileInfo[dbName ! FS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName;
dbinfo ← NARROW[SymTab.Fetch[OpenDBTable, fname].val];
IF dbinfo = NIL THEN {
dbinfo ← NEW[OpenDBRecord];
dbinfo.dbName ← fname;
dbinfo.isOpen ← FALSE;
dbinfo.remoteAccess ← TRUE;
dbinfo.statusMsg ← "Open for service.";
dbinfo.indices ← RefTab.Create[];
dbinfo.primaryIndex ← NIL;
dbinfo.logs ← NEW[LogSetRecord[LAST[LogID]]];
IF NOT SymTab.Insert[OpenDBTable, fname, dbinfo] THEN -- lost race
dbinfo ← NARROW[SymTab.Fetch[OpenDBTable, fname].val];
};
};
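The create-optimistically-and-let-the-table-arbitrate idiom used here can be sketched in Python (hypothetical names, not the Cedar source; the canonical-name expansion done via FS.FileInfo and the case-insensitive table are omitted, and a lock models the atomicity that SymTab.Insert provides):

```python
import threading

# Sketch of the GetSharedDB race-resolution idiom.
_open_dbs: dict = {}
_table_lock = threading.Lock()  # models the atomic SymTab.Insert

class OpenDB:
    """Hypothetical stand-in for OpenDBRecord; starts closed."""
    def __init__(self, name):
        self.name = name
        self.is_open = False

def get_shared_db(name: str) -> OpenDB:
    db = _open_dbs.get(name)          # optimistic lookup, no lock held
    if db is None:
        db = OpenDB(name)             # speculatively create a fresh record
        with _table_lock:             # only the insert itself is atomic
            if name not in _open_dbs:
                _open_dbs[name] = db  # we registered first
            else:
                db = _open_dbs[name]  # lost the race: adopt the winner's record
    return db
```

Because losers discard their speculative record and adopt the registered one, all clients of a database end up sharing a single record, which is what lets the monitor lock in LoganBerryImpl serialize access per database.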
Want the table of open databases to be created only once, even if this package is re-run.
OpenDBTable ← NARROW[Atom.GetProp[$LoganBerry, $OpenDBTable]];
IF OpenDBTable = NIL THEN {
OpenDBTable ← SymTab.Create[mod: OpenDBTableSize, case: FALSE];
Atom.PutProp[$LoganBerry, $OpenDBTable, OpenDBTable];
};
END.
Doug Terry August 12, 1985 1:05:48 pm PDT
created
Doug Terry, January 23, 1986 8:28:43 pm PST
changes to: DIRECTORY, Open, Close, GetInfo, dbinfo (local of GetInfo), OpenI, MonitoredOpen, EnumerateEntriesI, CloseI, CompactLogsI, SetRemoteAccessI, ReadLogEntry, WriteLogEntry, CloseLogs, GetIndexEntry, CreateIndexEntry, DeleteIndexEntry, CloseIndex (local of CloseIndices), OpenDBTable, OpenDBTableSize, GetSharedDB, OpenDBTable, IF, IMPORTS
Doug Terry, January 24, 1986 9:39:30 am PST
changes to: MonitoredOpen, BuildIndicesI, BuildIndicesWorker
Swinehart, January 27, 1986 5:10:45 pm PST
Fullname expansion does not produce a canonical name (case may differ); GetSharedDB now uses a case-insensitive symbol table.
changes to: GetSharedDB
Doug Terry, January 31, 1986 4:25:40 pm PST
changes to: DIRECTORY, OpenIndex (local of OpenIndices), CloseIndex (local of CloseIndices), EraseIndex (local of EraseIndices), CompactLogsI
Doug Terry, March 5, 1986 5:21:27 pm PST
Implemented Describe so that clients can obtain schema information even if they don't have access to the schema file.
changes to: Describe, DescribeI, AddIndexInfo (local of DescribeI)