<<>> <> <> <> <> <> <> <> <> <<>> <<>> DIRECTORY Atom USING [GetPName, MakeAtom, GetProp, PutProp], BasicTime USING [GMT, Period], BTree USING [Error, UpdateInProgress, GetState], BTreeSimple USING [CacheSize, DeleteKey, EnumerateRecords, --GetState,-- InternalKey, New, NewPathStk, Open, PageSize, PathStk, ReadRecord, --ReadValue,-- Relation, SetState, SetUpdateInProgress, Tree, UpdateRecord, Value, ValueObject], Convert USING [IntFromRope, RopeFromInt], DFUtilities USING [--DirectoryItem,-- FileItem, CommentItem, ParseFromStream, ProcessItemProc, RemoveVersionNumber, SyntaxError], IO USING [Backup, BreakProc, Close, EndOf, EndOfStream, Error, Flush, GetChar, --GetID,-- GetIndex, GetInt, GetLength, GetLineRope, GetRopeLiteral, GetToken, GetTokenRope, IDProc, int, PeekChar, PutChar, PutF1, PutFR1, PutRope, RIS, SetIndex, SetLength, STREAM], LoganBerryClass USING [Class, ClassObject, Register], PFS USING [AbsoluteName, AccessOptions, Error, FileInfo, GetWDir, PATH, PathFromRope, RopeFromPath, StreamBufferParms, StreamOpen, UniqueID], PFSNames USING [IsAbsolute], RefID USING [Release, Reseal, Seal, Unseal], RefTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref], RefText USING [Equal, line, ObtainScratch, ReleaseScratch, TrustTextAsRope], Rope USING [Cat, Compare, Concat, Equal, Find, Index, Length, Map, ROPE, SkipTo, Substr], SymTab USING [Create, EachPairAction, Fetch, Insert, Pairs, Ref], LoganBerryBackdoor, LoganBerry, LoganBerryStructure; LoganBerryImpl: CEDAR MONITOR LOCKS dbinfo USING dbinfo: OpenDBInfo IMPORTS Atom, BasicTime, BTree, BTreeSimple, Convert, DFUtilities, IO, LoganBerry, LoganBerryClass, PFS, PFSNames, RefText, Rope, RefTab, RefID, SymTab EXPORTS LoganBerryBackdoor ~ BEGIN OPEN LoganBerry, LoganBerryStructure; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; bytesPerBuffer: INT ¬ 4096; -- PFS-dependent value! maxAttributeLen: INT ¬ 200; -- Guess, which has to be conservative! 
-- Tuning constants for the underlying B-tree and log streams.
btreeCacheSize: BTreeSimple.CacheSize ¬ 300; -- default value is 25
btreePageSize: BTreeSimple.PageSize ¬ 2048; -- number of words per physical page on a Sun

-- PFS.StreamBufferParms is [vmPagesPerBuffer: INT [1 .. 128], nBuffers: INT [1 .. 64]]
logBufferSize: PFS.StreamBufferParms ¬ [bytesPerBuffer: 4*1024, nBuffers: 16]; -- default is [4*1024, 2]
scanBufferSize: PFS.StreamBufferParms ¬ [bytesPerBuffer: 8*1024, nBuffers: 2]; -- default is [4*1024, 2]

-- Exported operations.
-- Each public proc unseals the client's OpenDB handle and forwards to the
-- corresponding monitored *I procedure below.  The conv parameter is accepted
-- for interface compatibility and ignored here.

Open: --PUBLIC-- PROC [conv: Conv ¬ NIL, dbName: ROPE] RETURNS [db: OpenDB] ~ {
  -- Opens the named database and returns a sealed handle on its shared state.
  dbinfo: OpenDBInfo ¬ OpenI[dbName];
  IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
  db ¬ RefID.Seal[dbinfo];
  };

ReadEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
  -- Looks up the entry whose key attribute matches value; others reports
  -- whether further entries share the same (secondary) key value.
  [entry, others] ¬ ReadEntryI[GetInfo[db], key, value];
  };

EnumerateEntries: --PUBLIC-- PROC [db: OpenDB, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL, proc: EntryProc] RETURNS [] ~ {
  -- Applies proc to each entry of the key index in the range [start..end].
  EnumerateEntriesI[GetInfo[db], key, start, end, proc];
  };

GenerateEntries: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL] RETURNS [cursor: Cursor] ~ {
  -- Creates a cursor for stepping through the key index over [start..end].
  newCursor: CursorInfo ¬ GenerateEntriesI[GetInfo[db], key, start, end];
  cursor ¬ RefID.Seal[newCursor];
  };

NextEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, cursor: Cursor, dir: CursorDirection ¬ increasing] RETURNS [entry: Entry] ~ {
  -- Returns the next entry relative to the cursor, or NIL at either end.
  c: CursorInfo;
  WITH RefID.Unseal[cursor] SELECT FROM
    ci: CursorInfo => c ¬ ci;
    ENDCASE => c ¬ NIL;
  IF c = NIL THEN ERROR Error[$BadCursor, "Invalid cursor passed to NextEntry."];
  IF NOT c.dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, c.dbinfo.statusMsg];
  entry ¬ NextEntryI[c.dbinfo, c, dir];
  };

EndGenerate: --PUBLIC-- PROC [conv: Conv ¬ NIL, cursor: Cursor] RETURNS [] ~ {
  -- Releases the sealed cursor handle.
  [] ¬ RefID.Release[cursor];
  };

WriteEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, entry: Entry, log: LogID ¬ activityLog, replace: BOOLEAN ¬ FALSE] RETURNS [] ~ {
  -- Appends entry to the given log; replace permits overwriting an entry
  -- that already has the same primary key.
  WriteEntryI[GetInfo[db], entry, log, replace];
  };

DeleteEntry: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB, key: AttributeType, value: AttributeValue] RETURNS [deleted: Entry] ~ {
  -- Deletes the entry matching key/value and returns it.
  deleted ¬ DeleteEntryI[GetInfo[db], key, value];
  };

Close: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
  -- Closes the database's logs and indices.
  CloseI[GetInfo[db]];
  };

BuildIndices: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
  -- Rebuilds all indices from the logs.
  BuildIndicesI[GetInfo[db]];
  };

CompactLogs: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [] ~ {
  -- Rewrites the writable logs, dropping superseded entries.
  CompactLogsI[GetInfo[db]];
  };

Describe: --PUBLIC-- PROC [conv: Conv ¬ NIL, db: OpenDB] RETURNS [info: SchemaInfo] ~ {
  -- Returns the database's schema: its name, logs, and indices.
  info ¬ DescribeI[GetInfo[db]];
  };

IsLocal: --PUBLIC-- PROC[db: LoganBerry.OpenDB] RETURNS[BOOL ¬ TRUE] = {
  []¬GetInfo[db]; -- Error if the db isn't valid
  };

StartTransaction: --PUBLIC-- PROC [db: LoganBerry.OpenDB, wantAtomic: BOOLEAN ¬ FALSE] = {
  -- Begins a transaction; see the WARNING on the *I procedures below.
  StartTransactionI[GetInfo[db], wantAtomic];
  };

EndTransaction: --PUBLIC-- PROC [db: LoganBerry.OpenDB, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN] = {
  -- Ends the current transaction; committed reports whether one was open.
  committed ¬ EndTransactionI[GetInfo[db], commit];
  };

FlushDBCache: --PUBLIC-- PROC [db: LoganBerry.OpenDB ¬ LoganBerry.nullDB] = {
  -- Flushes the cache entry for db, or the whole open-database table when
  -- db is nullDB.
  FlushOpenTable[IF db # nullDB THEN GetInfo[db] ELSE NIL];
  };

GetInfo: PROC [db: OpenDB] RETURNS [OpenDBInfo] ~ INLINE {
  -- Unseals an OpenDB handle, checking both its validity and that remote
  -- access to the database has not been disabled.
  ref: REF = RefID.Unseal[db];
  IF ref = NIL THEN ERROR Error[$BadDBHandle, "NIL OpenDB handle."];
  WITH ref SELECT FROM
    dbinfo: OpenDBInfo => {
      IF NOT dbinfo.remoteAccess THEN ERROR Error[$DBNotAvailable, dbinfo.statusMsg];
      RETURN[dbinfo];
      };
    ENDCASE => ERROR Error[$BadDBHandle, "Invalid OpenDB handle."];
  };

-- Internal monitored operations.

OpenI: PUBLIC PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
  -- Fetches (or creates) the shared state for dbName and opens it if needed.
  -- Not an ENTRY proc itself; the open is done under the monitor.
  dbinfo ¬ GetSharedDB[dbName];
  IF NOT dbinfo.isOpen THEN MonitoredOpen[dbinfo];
  RETURN[dbinfo];
  };

MonitoredOpen: ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  IF dbinfo.isOpen THEN RETURN; -- lost race
  DoOpen[dbinfo];
  };

DoOpen: INTERNAL PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
  -- Resolves the schema file name, (re)reads the schema if the file version
  -- changed, opens logs read-only, recovers after crashes, and opens indices.
  pathname: PFS.PATH;
  badIndexMsg: ROPE ¬ NIL;
  oldDBVersion: ROPE ¬ dbinfo.dbName;
  pathname ¬ PFS.PathFromRope[DFUtilities.RemoveVersionNumber[dbinfo.dbName]];
  dbinfo.dbName ¬ PFS.RopeFromPath[PFS.FileInfo[pathname ! PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName];
  IF dbinfo.primaryIndex = NIL OR NOT Rope.Equal[dbinfo.dbName, oldDBVersion, FALSE] THEN ReadSchema[dbinfo];
  dbinfo.openForUpdate ¬ FALSE;
  OpenLogs[dbinfo.logs, $read];
  IF Recover[dbinfo] THEN badIndexMsg ¬ "Logs recovered after apparent crash, please rebuild Indices.";
  IF OpenIndices[dbinfo.indices, $read] THEN badIndexMsg ¬ "Indices cannot be opened, please rebuild.";
  dbinfo.isOpen ¬ TRUE;
  IF badIndexMsg # NIL THEN ERROR Error[$BadIndex, badIndexMsg]; -- instead of BuildIndicesWorker[dbinfo];
  };

ReadEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [entry: Entry, others: BOOLEAN] ~ {
  ENABLE UNWIND => NULL;
  -- Looks up value in the key index and reads the entry from its log.
  -- For a secondary index, others is set by probing the next unique key.
  indexEntry: IndexEntry;
  index: OpenIndexInfo;
  aValue: AttributeValue;
  pathSkt: BTreeSimple.PathStk ¬ BTreeSimple.NewPathStk[];
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
  IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
  [aValue, indexEntry] ¬ GetIndexEntry[index: index, value: value, pathStk: pathSkt];
  IF aValue = NIL THEN RETURN[entry: NIL, others: FALSE];
  entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
  IF index.type = primary THEN others ¬ FALSE
  ELSE {
    aValue ¬ GetIndexEntry[index: index, value: aValue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
    others ¬ Rope.Equal[s1: value, s2: UniqueKeyToValue[aValue], case: FALSE];
    };
  RETURN[entry, others];
  };

SimpleEntryProc: EntryProc = { -- for debugging purposes
  -- [entry: LoganBerry.Entry] RETURNS [continue: BOOL]
  cont: BOOLEAN ¬ TRUE;
  RETURN[cont];
  };

EnumerateEntriesI: PUBLIC PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL, proc: EntryProc] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  -- Drives proc over the range by creating a cursor and stepping it until
  -- proc returns FALSE or the range is exhausted.
  entry: Entry;
  c: CursorInfo;
  c ¬ GenerateEntriesI[dbinfo, key, start, end];
  entry ¬ NextEntryI[c.dbinfo, c];
  WHILE entry # NIL DO
    IF NOT proc[entry] THEN EXIT;
    entry ¬ NextEntryI[c.dbinfo, c];
    ENDLOOP;
  EndGenerateI[c.dbinfo, c];
  };

GenerateEntriesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, start: AttributeValue ¬ NIL, end: AttributeValue ¬ NIL] RETURNS [cinfo: CursorInfo] ~ {
  ENABLE UNWIND => NULL;
  -- Builds a cursor over the key index.  For a secondary index the range
  -- bounds are widened to the small/large unique-key forms.
  newCursor: CursorInfo;
  index: OpenIndexInfo;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
  IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
  newCursor ¬ NEW[CursorRecord];
  newCursor.index ¬ index;
  newCursor.dbinfo ¬ dbinfo;
  newCursor.key ¬ key;
  newCursor.start ¬ start;
  newCursor.end ¬ end;
  IF newCursor.index.type = secondary THEN {
    IF start # NIL THEN newCursor.start ¬ ValueToSmallKey[start];
    IF end # NIL THEN newCursor.end ¬ ValueToLargeKey[end];
    };
  newCursor.pathStk ¬ BTreeSimple.NewPathStk[];
  -- Position just before start so the first NextEntryI yields start itself.
  newCursor.current ¬ IF start # NIL
    THEN GetIndexEntry[index: newCursor.index, value: newCursor.start, valueIsKey: TRUE, relation: less, pathStk: newCursor.pathStk].actual
    ELSE NIL;
  RETURN[newCursor];
  };

NextEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo, dir: CursorDirection ¬ increasing] RETURNS [entry: Entry] ~ {
  ENABLE UNWIND => NULL;
  -- Steps the cursor one position in dir and reads the entry, or returns
  -- NIL once the cursor passes the range boundary.
  actualValue: AttributeValue;
  indexEntry: IndexEntry;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  [actualValue, indexEntry] ¬ GetIndexEntry[index: cinfo.index, value: cinfo.current, valueIsKey: TRUE, relation: IF dir = increasing THEN greater ELSE less, pathStk: cinfo.pathStk];
  IF (actualValue = NIL)
    OR (dir = increasing AND cinfo.end # NIL AND Rope.Compare[actualValue, cinfo.end, FALSE] = greater)
    OR (dir = decreasing AND Rope.Compare[actualValue, cinfo.start, FALSE] = less)
    THEN RETURN[NIL];
  entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
  cinfo.current ¬ actualValue;
  RETURN[entry];
  };

EndGenerateI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, cinfo: CursorInfo] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  NULL; -- this routine is unnecessary since cursors get garbage collected
  };

WriteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, entry: Entry, log: LogID ¬ activityLog, replace: BOOLEAN ¬ FALSE] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  -- Appends entry to the log and updates the indices.  When replacing, a
  -- $REPLACE marker naming the old primary key is logged first and the old
  -- entry's index references are removed.
  value: AttributeValue;
  indexData: IndexEntry ¬ [log: log, firstByte: 0];
  doReplace: BOOLEAN ¬ FALSE;
  replacedIndexEntry: IndexEntry;
  replacedEntry: Entry ¬ NIL;
  inTrans: BOOL¬FALSE;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  inTrans ¬ dbinfo.transStarted;
  value ¬ GetAttributeValue[entry, dbinfo.primaryIndex.key];
  IF value = NIL THEN ERROR Error[$NoPrimaryKey, "Entry does not contain a primary key."];
  IF dbinfo.logs[log].access = readOnly THEN ERROR Error[$LogReadOnly, "Can't write to a read only log."];
  [value, replacedIndexEntry] ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: value];
  IF value # NIL THEN
    IF replace THEN doReplace ¬ TRUE
    ELSE ERROR Error[$ValueNotUnique, "An existing entry already contains the primary key."];
  MakeDatabaseWriteable[dbinfo];
  IF doReplace THEN {
    IF replacedIndexEntry.log # log THEN ERROR Error[$InvalidReplace, "Cross-log replacements are not allowed."];
    [] ¬ WriteLogEntry[dbinfo.logs[log].writeStream, LIST[[$REPLACE, value]], inTrans];
    };
  indexData.firstByte ¬ WriteLogEntry[dbinfo.logs[log].writeStream, entry, doReplace OR inTrans];
  IF doReplace THEN {
    replacedEntry ¬ GetCachedEntry[dbinfo, value];
    IF replacedEntry=NIL THEN replacedEntry ¬ ReadLogEntryT[dbinfo, log, replacedIndexEntry.firstByte];
    RemoveEntryFromIndices[dbinfo.indices, replacedEntry, replacedIndexEntry];
    };
  AddEntryToIndices[dbinfo.indices, entry, indexData];
  CompleteLogEntry[dbinfo, log];
  };
DeleteEntryI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, key: AttributeType, value: AttributeValue] RETURNS [deleted: Entry] ~ {
  ENABLE UNWIND => NULL;
  -- Deletes the single entry matching key/value: logs a $DELETE record
  -- naming its primary key and removes it from all indices.  Errors if the
  -- match is ambiguous or the entry's log is read-only.
  indexEntry: IndexEntry;
  index: OpenIndexInfo;
  pathSkt: BTreeSimple.PathStk ¬ BTreeSimple.NewPathStk[];
  avalue: AttributeValue;
  delete: Entry;
  entry: Entry;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  index ¬ NARROW[RefTab.Fetch[dbinfo.indices, key].val];
  IF index = NIL THEN ERROR Error[$NoIndex, Rope.Cat["Index for ",Atom.GetPName[key]," does not exist."]];
  [avalue, indexEntry] ¬ GetIndexEntry[index: index, value: value, pathStk: pathSkt];
  IF avalue = NIL THEN RETURN;
  -- A further entry with the same value makes the delete ambiguous.
  avalue ¬ GetIndexEntry[index: index, value: avalue, valueIsKey: TRUE, relation: greater, pathStk: pathSkt].actual;
  IF Rope.Equal[s1: value, s2: UniqueKeyToValue[avalue], case: FALSE] THEN ERROR Error[$ValueNotUnique, "Entry to be deleted insufficiently specified."];
  IF dbinfo.logs[indexEntry.log].access = readOnly THEN ERROR Error[$LogReadOnly, "Can't delete entries in a read only log."];
  entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
  MakeDatabaseWriteable[dbinfo];
  delete ¬ LIST[[$DELETE, GetAttributeValue[entry, dbinfo.primaryIndex.key]]];
  [] ¬ WriteLogEntry[dbinfo.logs[indexEntry.log].writeStream, delete, dbinfo.transStarted];
  RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
  CompleteLogEntry[dbinfo, indexEntry.log];
  RETURN[entry];
  };

CloseI: PUBLIC PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
  DoClose[dbinfo];
  };

DoClose: ENTRY PROC [dbinfo: OpenDBInfo, flush: BOOL¬FALSE] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  -- Commits any open transaction, then closes logs and indices.  flush also
  -- drops the cached schema so a reopen rereads it.
  IF NOT dbinfo.isOpen THEN RETURN;
  IF dbinfo.transStarted THEN []¬EndTransactionInt[dbinfo, TRUE];
  CloseLogs[dbinfo.logs];
  CloseIndices[dbinfo.indices];
  dbinfo.isOpen ¬ FALSE;
  IF flush THEN dbinfo.primaryIndex ¬ NIL;
  };

DescribeI: PUBLIC PROC [dbinfo: OpenDBInfo] RETURNS [info: SchemaInfo] ~ {
  -- Assembles a SchemaInfo: database name, log list (in id order), and
  -- index list with the primary index first.
  ReverseLogList: PROC [list: LIST OF LogInfo] RETURNS [LIST OF LogInfo] ~ {
    new, temp: LIST OF LogInfo ¬ NIL;
    UNTIL list = NIL DO
      temp ¬ list;
      list ¬ list.rest;
      temp.rest ¬ new;
      new ¬ temp;
      ENDLOOP;
    RETURN[new];
    };
  ReverseIndexList: PROC [list: LIST OF IndexInfo] RETURNS [LIST OF IndexInfo] ~ {
    new, temp: LIST OF IndexInfo ¬ NIL;
    UNTIL list = NIL DO
      temp ¬ list;
      list ¬ list.rest;
      temp.rest ¬ new;
      new ¬ temp;
      ENDLOOP;
    RETURN[new];
    };
  AddIndexInfo: RefTab.EachPairAction = {
    -- [key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOLEAN]
    index: OpenIndexInfo ¬ NARROW[val];
    IF index # dbinfo.primaryIndex THEN {
      sIndex: IndexInfo ¬ NEW[IndexInfoRec ¬ [key: index.key, file: index.filename, order: index.order]];
      info.indices ¬ CONS[sIndex, info.indices];
      };
    RETURN[quit: FALSE];
    };
  info ¬ NEW[SchemaInfoRec];
  info.dbName ¬ dbinfo.dbName;
  info.logs ¬ NIL;
  FOR i: LogID IN [0..dbinfo.logs.size) DO
    IF dbinfo.logs[i] # NIL THEN {
      log: LogInfo ¬ NEW[LogInfoRec ¬ [id: dbinfo.logs[i].id, file: dbinfo.logs[i].filename]];
      info.logs ¬ CONS[log, info.logs];
      };
    ENDLOOP;
  info.logs ¬ ReverseLogList[info.logs];
  info.indices ¬ NIL;
  [] ¬ RefTab.Pairs[dbinfo.indices, AddIndexInfo];
  info.indices ¬ ReverseIndexList[info.indices];
  info.indices ¬ CONS[NEW[IndexInfoRec ¬ [key: dbinfo.primaryIndex.key, file: dbinfo.primaryIndex.filename, order: dbinfo.primaryIndex.order]], info.indices];
  };

BuildIndicesI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  BuildIndicesWorker[dbinfo, TRUE];
  };

BuildIndicesWorker: PROC [dbinfo: OpenDBInfo, checkDuplicates: BOOLEAN ¬ FALSE] RETURNS [] ~ {
  ENABLE UNWIND => MarkUpdateOnIndices[dbinfo.indices, FALSE];
  -- Erases all indices and replays every log from the start, applying
  -- delete/replace records (old $DELETED/$REPLACED and new $DELETE/$REPLACE
  -- schemes) and inserting ordinary entries.  checkDuplicates additionally
  -- verifies each entry has a unique primary key.
  indexData: IndexEntry;
  logstream: STREAM;
  entry: Entry;
  indexEntry: IndexEntry;
  saveStreamPosition: INT;
  EraseIndices[dbinfo.indices];
  MarkUpdateOnIndices[dbinfo.indices, TRUE];
  FOR log: LogID IN [0..dbinfo.logs.size) DO
    IF dbinfo.logs[log] # NIL THEN {
      indexData ¬ [log, 0];
      logstream ¬ dbinfo.logs[log].readStream;
      FOR entry ¬ ReadLogEntryT[dbinfo, log, 0], ReadLogEntryT[dbinfo, log, -1] UNTIL entry = NIL DO
        SELECT entry.first.type FROM
          $DELETED, $REPLACED => { -- DeleteEntry (old logging scheme)
            saveStreamPosition ¬ IO.GetIndex[logstream]; -- before the following read changes the index
            indexEntry ¬ [log: log, firstByte: Convert.IntFromRope[entry.first.value]];
            entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
            RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
            IO.SetIndex[logstream, saveStreamPosition];
            };
          $DELETE, $REPLACE => { -- DeleteEntry (new logging scheme)
            saveStreamPosition ¬ IO.GetIndex[logstream]; -- before the following read changes the index
            indexEntry ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: entry.first.value].data;
            entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
            RemoveEntryFromIndices[dbinfo.indices, entry, indexEntry];
            IO.SetIndex[logstream, saveStreamPosition];
            };
          ENDCASE => { -- WriteEntry
            IF checkDuplicates THEN {
              value: AttributeValue ¬ GetAttributeValue[entry, dbinfo.primaryIndex.key];
              IF value = NIL THEN {
                ERROR Error[$NoPrimaryKey, IO.PutFR1["Entry at position %g in log does not contain a primary key.", IO.int[indexData.firstByte]]];
                };
              value ¬ GetIndexEntry[index: dbinfo.primaryIndex, value: value].actual;
              IF value # NIL THEN {
                ERROR Error[$ValueNotUnique, IO.PutFR1["Entry at position %g in log contains a duplicate primary key.", IO.int[indexData.firstByte]]];
                };
              };
            AddEntryToIndices[dbinfo.indices, entry, indexData];
            };
        IF IO.EndOf[logstream] THEN {
          ERROR Error[$BadLogEntry, "At end of log. Missing terminator on log entry."];
          };
        IF NOT IsEndOfEntry[IO.GetChar[logstream]] THEN {
          ERROR Error[$BadLogEntry, IO.PutFR1["Missing terminator on log entry at position %g in log.", IO.int[IO.GetIndex[logstream]]]];
          };
        indexData.firstByte ¬ IO.GetIndex[logstream];
        ENDLOOP;
      };
    ENDLOOP;
  MarkUpdateOnIndices[dbinfo.indices, FALSE];
  };

CompactLogsI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
  ENABLE {
    UNWIND => NULL;
    PFS.Error => ERROR Error[$InternalError, error.explanation];
    };
  -- Copies the live entries (those still reachable from the primary index)
  -- of each writable log into a fresh log file, then reopens the logs and
  -- rebuilds the indices against the compacted files.
  firstWritten: BOOL¬TRUE;
  KeepEntry: PROC [key: BTreeSimple.InternalKey, value: BTreeSimple.Value] RETURNS [continue: BOOLEAN] = {
    indexEntry: IndexEntry;
    entry: Entry;
    TRUSTED { indexEntry ¬ LOOPHOLE[@value[0], IndexPtr]­; };
    IF dbinfo.logs[indexEntry.log].access # readOnly THEN {
      entry ¬ ReadLogEntryT[dbinfo, indexEntry.log, indexEntry.firstByte];
      [] ¬ WriteLogEntry[newlogs[indexEntry.log].writeStream, entry, firstWritten];
      };
    RETURN[continue: TRUE];
    };
  newlogs: LogSet;
  pathName: PFS.PATH;
  IF NOT dbinfo.isOpen THEN DoOpen[dbinfo];
  newlogs ¬ NEW[LogSetRecord[dbinfo.logs.size]];
  FOR i: LogID IN [0..dbinfo.logs.size) DO
    IF dbinfo.logs[i] # NIL AND dbinfo.logs[i].access # readOnly THEN {
      newlogs[i] ¬ NEW[OpenLogInfoRecord];
      pathName ¬ PFS.PathFromRope[dbinfo.logs[i].filename];
      newlogs[i].writeStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $create, streamBufferParms: logBufferSize ! PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
      };
    ENDLOOP;
  [] ¬ BTreeSimple.EnumerateRecords[tree: dbinfo.primaryIndex.btree, key: NIL, Proc: KeepEntry ! BTree.Error => ERROR Error[$BadIndex, "Problem enumerating primary index."]];
  FOR i: LogID IN [0..dbinfo.logs.size) DO
    IF newlogs[i] # NIL THEN {
      IF dbinfo.logs[i].access # readOnly THEN MarkUpdateComplete[newlogs[i].writeStream];
      IO.Close[newlogs[i].writeStream];
      IO.Close[dbinfo.logs[i].readStream];
      pathName ¬ PFS.PathFromRope[Rope.Concat[dbinfo.logs[i].filename, "!H"]];
      dbinfo.logs[i].writeStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $write, streamBufferParms: logBufferSize ! PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
      dbinfo.logs[i].readStream ¬ PFS.StreamOpen[fileName: pathName, accessOptions: $read, streamBufferParms: logBufferSize ! PFS.Error => ERROR Error[$CantOpenLog, error.explanation]];
      };
    ENDLOOP;
  BuildIndicesWorker[dbinfo];
  };

SetRemoteAccessI: PUBLIC ENTRY PROC [dbinfo: OpenDBInfo, accessible: BOOLEAN ¬ TRUE, why: ROPE ¬ NIL] RETURNS [] ~ {
  ENABLE UNWIND => NULL;
  -- Enables or disables remote access to the database; why is reported to
  -- clients refused while access is disabled.
  dbinfo.remoteAccess ¬ accessible;
  dbinfo.statusMsg ¬ why;
  };

-- Transactions.
<< WARNING: These operations should not be used on databases that are shared by multiple clients/processes since the transaction includes all operations not just those performed by a particular client. 
This is particularly true since file flushing is inhibited while a transaction is open, thus widening the window of exposure.>> StartTransactionI: ENTRY PROC [dbinfo: OpenDBInfo, wantAtomic: BOOLEAN ¬ FALSE] = { <> ENABLE UNWIND => NULL; logStream: IO.STREAM; logs: LogSet; streamIndex: LogAddress; IF wantAtomic THEN ERROR Error[$NotImplemented, "Atomic operations are not currently provided."]; <> IF dbinfo.transStarted THEN RETURN; MakeDatabaseWriteable[dbinfo]; dbinfo.transStarted ¬ TRUE; logs ¬ dbinfo.logs; FOR i: LogID IN [0..logs.size) DO <> IF logs[i] = NIL OR logs[i].writeStream=NIL OR logs[i].access # readWrite THEN LOOP; streamIndex ¬ IO.GetIndex[logs[i].writeStream]; IF streamIndex > 0 THEN IO.SetIndex[logs[i].writeStream, streamIndex-1]; ENDLOOP; MarkUpdateOnIndices[dbinfo.indices, TRUE]; }; EndTransactionI: ENTRY PROC [dbinfo: OpenDBInfo, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN] = { ENABLE UNWIND => NULL; RETURN[EndTransactionInt[dbinfo, commit]]; }; EndTransactionInt: INTERNAL PROC [dbinfo: OpenDBInfo, commit: BOOLEAN ¬ TRUE] RETURNS [committed: BOOLEAN¬FALSE] = { IF NOT dbinfo.transStarted THEN RETURN; MarkUpdateOnIndices[dbinfo.indices, FALSE]; dbinfo.transStarted ¬ FALSE; FOR i: LogID IN [0..dbinfo.logs.size) DO <> CompleteLogEntry[dbinfo, i]; ENDLOOP; RETURN[TRUE]; }; <> <" are deemed to contain schema information for the file named in the subsequent line of the DF file or at the end of the same line. 
The two types of schema entries are as follows:>> <<--> log >> <<--> index >> <<>> SchemaChars: PUBLIC REF READONLY TEXT = "-->"; ParseSchemaLine: PROC [s: STREAM, buffer: REF TEXT, wdir: ROPE] RETURNS [item: REF ANY, needFilename: BOOLEAN ¬ FALSE] ~ { <> ENABLE { IO.Error => ERROR Error[$BadSchema, "Error parsing schema statement."]; IO.EndOfStream => ERROR Error[$BadSchema, "Unexpected end of stream."] }; token: REF TEXT; log: OpenLogInfo; index: OpenIndexInfo; needFilename ¬ FALSE; token ¬ IO.GetToken[s, IO.IDProc, buffer].token; IF NOT RefText.Equal[token, SchemaChars] THEN RETURN[NIL, FALSE]; token ¬ IO.GetToken[s, IO.IDProc, buffer].token; SELECT TRUE FROM RefText.Equal[token, "log", FALSE] => { log ¬ NEW[OpenLogInfoRecord]; log.id ¬ IO.GetInt[s]; token ¬ IO.GetToken[s, IO.IDProc, buffer].token; log.access ¬ IF RefText.Equal[token, "ReadOnly", FALSE] THEN readOnly ELSE readWrite; log.filename ¬ IO.GetTokenRope[s, IO.IDProc ! IO.EndOfStream => {needFilename ¬ TRUE; CONTINUE}].token; IF NOT needFilename THEN { pathName, wdirPathName: PFS.PATH; pathName ¬ PFS.PathFromRope[log.filename]; wdirPathName ¬ PFS.PathFromRope[wdir]; log.filename ¬ PFS.RopeFromPath[PFS.AbsoluteName[pathName, wdirPathName]]; <<<>>> }; item ¬ log; }; RefText.Equal[token, "index", FALSE] => { index ¬ NEW[OpenIndexInfoRecord]; item ¬ index; index.key ¬ Atom.MakeAtom[IO.GetRopeLiteral[s]]; token ¬ IO.GetToken[s, IO.IDProc, buffer].token; index.type ¬ IF RefText.Equal[token, "primary", FALSE] THEN primary ELSE secondary; index.filename ¬ IO.GetTokenRope[s, IO.IDProc ! 
IO.EndOfStream => {needFilename ¬ TRUE; CONTINUE}].token; IF NOT needFilename THEN { pathName, wdirPathName: PFS.PATH; pathName ¬ PFS.PathFromRope[index.filename]; wdirPathName ¬ PFS.PathFromRope[wdir]; index.filename ¬ PFS.RopeFromPath[PFS.AbsoluteName[pathName, wdirPathName]]; <<<>>> }; item ¬ index; }; ENDCASE => ERROR Error[$BadSchema, "Invalid keyword."]; }; ReadSchema: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ { <> ENABLE DFUtilities.SyntaxError => ERROR Error[$BadSchema, reason]; NewDBInfo: PROC [item: REF ANY, filename: ROPE ¬ NIL] RETURNS [] ~ { WITH item SELECT FROM log: OpenLogInfo => { IF filename # NIL THEN log.filename ¬ filename; dbinfo.logs[log.id] ¬ log; }; index: OpenIndexInfo => { IF filename # NIL THEN index.filename ¬ filename; [] ¬ RefTab.Insert[dbinfo.indices, index.key, index]; IF index.type = primary THEN IF dbinfo.primaryIndex = NIL THEN dbinfo.primaryIndex ¬ index ELSE ERROR Error[$BadSchema, "Multiple primary indices specified."]; }; ENDCASE => ERROR Error[$InternalError]; }; SchemaItemProc: DFUtilities.ProcessItemProc = { <<[item: REF ANY] RETURNS [stop: BOOL _ FALSE]>> WITH item SELECT FROM comment: REF DFUtilities.CommentItem => { IF needFilename THEN ERROR Error[$BadSchema, "No file name specified for some log or index."]; [lastSchemaItem, needFilename] ¬ ParseSchemaLine[IO.RIS[comment.text], buffer, wdir]; IF lastSchemaItem # NIL AND NOT needFilename THEN { NewDBInfo[lastSchemaItem]; } }; file: REF DFUtilities.FileItem => { IF needFilename THEN { fullPathName, wdirPathName: PFS.PATH; pathname: PFS.PATH ¬ PFS.PathFromRope[DFUtilities.RemoveVersionNumber[file.name]]; wdirPathName ¬ PFS.PathFromRope[wdir]; fullPathName ¬ PFS.AbsoluteName[pathname, wdirPathName]; NewDBInfo[lastSchemaItem, PFS.RopeFromPath[fullPathName]]; <<<>>> needFilename ¬ FALSE; }; }; ENDCASE => { IF needFilename THEN ERROR Error[$BadSchema, "No file name specified for some log or index."]; }; }; needFilename: BOOLEAN ¬ FALSE; lastSchemaItem: REF ANY; name: PFS.PATH; 
schemaStream: STREAM; buffer: REF TEXT = RefText.ObtainScratch[RefText.line]; wdir: ROPE; shortName: ROPE; <<<>>> <<<>>> name ¬ PFS.PathFromRope[dbinfo.dbName]; IF PFSNames.IsAbsolute[name] THEN { prev, loc, length: INT; ropeName: ROPE; prev ¬ 0; loc ¬ 0; ropeName ¬ PFS.RopeFromPath[name]; -- convert Path to rope and manipulate ropes length ¬ Rope.Length[ropeName]; WHILE loc < length DO prev ¬ loc; loc ¬ Rope.SkipTo[s: ropeName, pos: prev+1, skip: "/>]"]; ENDLOOP; shortName ¬ Rope.Substr[base: ropeName, start: prev+1, len: length]; wdir ¬ Rope.Substr[base: ropeName, start: 0, len: prev+1]; } ELSE wdir ¬ PFS.RopeFromPath[PFS.GetWDir[]]; schemaStream ¬ PFS.StreamOpen[PFS.AbsoluteName[name] ! PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]]; dbinfo.indices ¬ RefTab.Create[]; dbinfo.primaryIndex ¬ NIL; dbinfo.logs ¬ NEW[LogSetRecord[LAST[LogID]]]; DFUtilities.ParseFromStream[in: schemaStream, proc: SchemaItemProc, filter: [comments: TRUE]]; RefText.ReleaseScratch[buffer]; IF dbinfo.primaryIndex = NIL THEN ERROR Error[$BadSchema, "No primary index specified."]; }; <> GetAttributeValue: PROC [entry: Entry, type: AttributeType] RETURNS [AttributeValue] ~ { FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO IF e.first.type = type THEN RETURN[e.first.value]; ENDLOOP; RETURN[NIL]; }; ReverseEntry: PROC [entry: Entry] RETURNS[Entry] = { <> l1, l2, l3: Entry ¬ NIL; IF entry = NIL THEN RETURN[NIL]; l3 ¬ entry; UNTIL (l1 ¬ l3) = NIL DO l3 ¬ l3.rest; l1.rest ¬ l2; l2 ¬ l1; ENDLOOP; RETURN[l2]; }; <> <> OpenLogs: PROC [logs: LogSet, access: PFS.AccessOptions ¬ $write] RETURNS [] ~ { FOR i: LogID IN [0..logs.size) DO IF (logs[i] # NIL) AND (logs[i].writeStream = NIL) AND (logs[i].readStream = NIL) THEN { name: ROPE; name ¬logs[i].filename; IF (logs[i].access = readOnly) OR (access = read) THEN { logs[i].readStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read, streamBufferParms: logBufferSize ! 
PFS.Error => ERROR Error[$CantOpenLog, error.explanation]]; } ELSE { name ¬ Rope.Concat[logs[i].filename, "!H"]; logs[i].writeStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write, streamBufferParms: logBufferSize ! PFS.Error => ERROR Error[$CantOpenLog, error.explanation]]; IO.SetIndex[logs[i].writeStream, IO.GetLength[logs[i].writeStream]]; -- $$PerfA$$ <> logs[i].readStream ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read, streamBufferParms: logBufferSize ! PFS.Error => ERROR Error[$CantOpenLog, error.explanation]]; }; }; ENDLOOP; }; IsEndOfAttribute: PROC [ch: CHAR] RETURNS [BOOLEAN] ~ { <> RETURN[(ch = '\r) OR (ch = '\l)]; }; IsEndOfEntry: PROC [ch: CHAR] RETURNS [BOOLEAN] ~ { <> RETURN[(ch = '\r) OR (ch = '\l)]; }; ReadLogEntryT: PROC [dbinfo: OpenDBInfo, log: LogID, byte: LogAddress] RETURNS [Entry] ~ { logStream: STREAM ¬ dbinfo.logs[log].readStream; IF logStream = NIL THEN {ERROR Error[$CantOpenLog]}; RETURN[ReadEntryDo[dbinfo.logs[log].readStream, byte, IF dbinfo.transStarted THEN dbinfo.logs[log].writeStream ELSE NIL]]; }; ReadLogEntry: PUBLIC PROC [logStream: STREAM, byte: LogAddress] RETURNS [Entry] ~ { RETURN[ReadEntryDo[logStream, byte]]; }; ReadEntryDo: PROC [logStream: STREAM, byte: LogAddress, writeStream: STREAM¬NIL] RETURNS [Entry] ~ { <> flushFirst: BOOL ¬ FALSE; { ENABLE { IO.EndOfStream => IF writeStream=NIL THEN ERROR Error[$BadLogEntry] ELSE { flushFirst ¬ TRUE; RETRY; }; -- See discussion below IO.Error => IF writeStream=NIL THEN ERROR Error[$BadIndex] ELSE { flushFirst ¬ TRUE; RETRY; }; }; AttributeBreakProc: IO.BreakProc = { -- [char: CHAR] RETURNS [IO.CharClass] RETURN[SELECT char FROM ': => sepr, ENDCASE => other] }; ReadAttribute: PROC [s: STREAM] RETURNS [a: Attribute] ~ { char: CHAR ¬ ' ; a.type ¬ Atom.MakeAtom[IO.GetTokenRope[s, AttributeBreakProc].token]; [] ¬ IO.GetChar[s]; -- attribute separation char WHILE char=' OR char='\t DO char ¬ IO.GetChar[s]; ENDLOOP; -- skip whitespace 
IO.Backup[s, char]; IF char = '" THEN { a.value ¬ IO.GetRopeLiteral[s]; [] ¬ IO.GetLineRope[s]; } ELSE a.value ¬ IO.GetLineRope[s]; }; entry: Entry; attribute: Attribute; IF logStream = NIL THEN {ERROR Error[$CantOpenLog]}; IF flushFirst THEN { IO.Flush[writeStream]; writeStream ¬ NIL; }; <> IF byte >= 0 THEN IO.SetIndex[logStream, byte] ELSE byte ¬ IO.GetIndex[logStream]; IF IO.PeekChar[logStream] = UpdateComplete THEN RETURN[NIL]; -- at end of log entry ¬ NIL; <> UNTIL IsEndOfEntry[IO.PeekChar[logStream]] DO attribute ¬ ReadAttribute[logStream]; entry ¬ CONS[attribute, entry]; ENDLOOP; <> RETURN[ReverseEntry[entry]]; }; }; WriteLogEntry: PUBLIC PROC [logStream: STREAM, entry: Entry, continuation: BOOLEAN ¬ FALSE] RETURNS [LogAddress] ~ { NeedsQuoting: PROC [rope: ROPE] RETURNS [BOOLEAN] ~ { RETURN [Rope.SkipTo[s: rope, skip: "\l\r\\\""] < Rope.Length[rope]]; }; PutRopeLiteral: PROC [stream: IO.STREAM, rope: ROPE] ~ { Action: PROC [c: CHAR] RETURNS [quit: BOOL ¬ FALSE] ~ { IF c = '" OR c='\\ THEN IO.PutChar[stream, '\\]; IF c IN [' .. 
'~] THEN IO.PutChar[stream, c] ELSE IO.PutF1[stream, "\\%03b", [cardinal[ORD[c]]]]; }; IO.PutChar[stream, '"]; [] ¬ Rope.Map[base: rope, action: Action]; IO.PutChar[stream, '"]; }; WriteAttribute: PROC [s: STREAM, a: Attribute] RETURNS [] ~ { IO.PutRope[s, Atom.GetPName[a.type]]; IO.PutRope[s, ": "]; IF NeedsQuoting[a.value] THEN PutRopeLiteral[s, a.value] ELSE IO.PutRope[s, a.value]; IO.PutRope[s, "\n"]; }; <<>> byteOffset: LogAddress; IF logStream = NIL THEN {ERROR Error[$CantOpenLog]; }; byteOffset ¬ IO.GetIndex[logStream]; -- DCS index and length should be the same <> IF byteOffset > 0 AND NOT continuation THEN { byteOffset ¬ byteOffset - 1; --remove update marker from end of log IO.SetIndex[logStream, byteOffset]; }; <> FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO WriteAttribute[logStream, e.first]; ENDLOOP; IO.PutChar[logStream, EndOfEntry]; <> RETURN[byteOffset]; }; MarkUpdateComplete: PUBLIC PROC [writeStream: STREAM] RETURNS [] ~ { <> <> <> IO.PutChar[writeStream, UpdateComplete]; IO.Flush[writeStream]; }; CompleteLogEntry: PROC [dbinfo: OpenDBInfo, index: LogID, forceFlush: BOOL¬FALSE] ~ { <> writeStream: STREAM; IF dbinfo.logs[index]=NIL THEN RETURN; writeStream ¬ dbinfo.logs[index].writeStream; IF writeStream=NIL THEN RETURN; IF NOT dbinfo.transStarted THEN IO.PutChar[writeStream, UpdateComplete]; IF dbinfo.transStarted AND NOT forceFlush THEN RETURN; IO.Flush[writeStream]; IF dbinfo.logs[index].readStream#NIL THEN IO.Flush[dbinfo.logs[index].readStream]; -- so readers see most recent changes }; CloseLogs: PROC [logs: LogSet] RETURNS [] ~ { FOR i: LogID IN [0..logs.size) DO IF (logs[i] # NIL) THEN { IF (logs[i].readStream # NIL) THEN { IO.Flush[logs[i].readStream]; IO.Close[logs[i].readStream]; logs[i].readStream ¬ NIL }; IF (logs[i].writeStream # NIL) THEN { IO.Flush[logs[i].writeStream]; IO.Close[logs[i].writeStream]; logs[i].writeStream ¬ NIL; }; }; ENDLOOP; }; <> <> OpenIndices: PROC [indices: IndexSet, access: PFS.AccessOptions ¬ $write] RETURNS 
[needToRebuild: BOOLEAN] ~ {
-- Opens the btree backing file of every index; creates a fresh (empty) btree
-- when the backing file is missing or an update was left in progress, in
-- which case needToRebuild is returned TRUE so the caller can re-derive the
-- index contents from the logs.
ENABLE { PFS.Error => ERROR Error[$CantOpenIndex, "LoganBerry doesn't work in VUX directories!"]};
OpenIndex: RefTab.EachPairAction = {
  <<[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]>>
  SetBackingStream: PROC [] ~ {
    -- opens the backing stream(s); the "!H" suffix selects the highest version
    name: ROPE ¬ index.filename;
    IF access = $read
      THEN {
        index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
        }
      ELSE {
        name ¬ Rope.Concat[name, "!H"];
        index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
        index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
        };
    };
  init: BOOLEAN ¬ FALSE;
  index: OpenIndexInfo ¬ NARROW[val];
  IF index.btree = NIL THEN {
    index.btree ¬ BTreeSimple.New[];
    };
  IF BTree.GetState[index.btree].state # open THEN {
    SetBackingStream[ ! PFS.Error => { -- problem opening btree so create new one
      IF error.group = $user AND error.code = $unknownFile
        THEN {
          name: ROPE;
          init ¬ TRUE;
          needToRebuild ¬ TRUE;
          name ¬ Rope.Concat[index.filename, "!H"];
          index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
          index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
          CONTINUE;
          }
        ELSE ERROR Error[$CantOpenIndex, error.explanation];
      } ];
    BTreeSimple.Open[tree: index.btree, readFile: index.readBacking, writeFile: index.writeBacking, pageSize: btreePageSize, cacheSize: btreeCacheSize, initialize: init !
      BTree.Error => ERROR Error[$BadIndex, Rope.Cat["Bad index for ", Atom.GetPName[NARROW[key]], "."]];
      BTree.UpdateInProgress => { -- scrap btree and try again
        name: ROPE;
        init ¬ TRUE;
        needToRebuild ¬ TRUE;
        name ¬ Rope.Concat[index.filename, "!H"];
        index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
        index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
        RETRY;} ];
    };
  RETURN[quit: FALSE];
  };
needToRebuild ¬ FALSE;
[] ¬ RefTab.Pairs[indices, OpenIndex];
RETURN[needToRebuild];
};

GetIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, valueIsKey: BOOLEAN ¬ FALSE, relation: BTreeSimple.Relation ¬ equal, pathStk: BTreeSimple.PathStk ¬ NIL] RETURNS [actual: AttributeValue, data: IndexEntry] ~ {
<>
-- Looks up value in the index btree, returning the actual key found and the
-- index data stored under it.  Secondary-index keys carry a uniquifying
-- suffix (see ValueToUniqueKey), so a plain value is first mapped to the
-- smallest or largest key with that prefix, as the relation requires.
btreeValue: BTreeSimple.Value;
useKey: AttributeValue ¬ value;
useRelation: BTreeSimple.Relation ¬ relation;
IF NOT valueIsKey AND index.type = secondary THEN
  SELECT relation FROM
    less => useKey ¬ ValueToSmallKey[value];
    lessEqual => useKey ¬ ValueToLargeKey[value];
    equal => { --start small and look upwards
      useKey ¬ ValueToSmallKey[value];
      useRelation ¬ greaterEqual;
      };
    greaterEqual => useKey ¬ ValueToSmallKey[value];
    greater => useKey ¬ ValueToLargeKey[value];
    ENDCASE;
[actual, btreeValue] ¬ BTreeSimple.ReadRecord[tree: index.btree, key: useKey, relation: useRelation, pathStk: pathStk, useExistingPath: pathStk # NIL !
BTree.Error => ERROR Error[$BadIndex]];
IF NOT valueIsKey AND index.type = secondary AND relation = equal AND NOT Rope.Equal[value, UniqueKeyToValue[actual], FALSE] THEN -- found entry larger than desired
  actual ¬ NIL;
IF btreeValue # NIL THEN
  TRUSTED {
    -- the index data is stored as raw words inside the btree record
    data ¬ LOOPHOLE[@btreeValue[0], IndexPtr]­;
    };
RETURN[actual, data];
};

CreateIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
-- Adds (or overwrites) a btree record mapping value to the log position in
-- data; secondary-index keys are uniquified with the log position so equal
-- values do not collide.
btreeValue: BTreeSimple.Value;
<>
btreeValue ¬ NEW[BTreeSimple.ValueObject[WORDS[IndexEntry]]];
<>
TRUSTED {
  LOOPHOLE[@btreeValue[0], IndexPtr]­ ¬ data;
  };
BTreeSimple.UpdateRecord[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data], value: btreeValue !
  BTree.Error => ERROR Error[$BadIndex]];
};

DeleteIndexEntry: PROC [index: OpenIndexInfo, value: AttributeValue, data: IndexEntry] RETURNS [] ~ {
-- Removes the btree record for value (uniquified for secondary indices).
[] ¬ BTreeSimple.DeleteKey[tree: index.btree, key: IF index.type = primary THEN value ELSE ValueToUniqueKey[value, data] !
  BTree.Error => ERROR Error[$BadIndex]];
};

AddEntryToIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
-- For each attribute of entry whose type is an indexed key, records the
-- entry's log position in that index.
index: OpenIndexInfo;
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
  index ¬ NARROW[RefTab.Fetch[indices, e.first.type].val];
  IF index # NIL THEN { -- index key in entry so add to index
    CreateIndexEntry[index, e.first.value, data];
    };
  ENDLOOP;
};

RemoveEntryFromIndices: PROC [indices: IndexSet, entry: Entry, data: IndexEntry] RETURNS [] ~ {
-- Inverse of AddEntryToIndices: deletes the entry's log position from every
-- index keyed by one of its attribute types.
index: OpenIndexInfo;
FOR e: Entry ¬ entry, e.rest WHILE e # NIL DO
  index ¬ NARROW[RefTab.Fetch[indices, e.first.type].val];
  IF index # NIL THEN { -- index key in entry so remove from index
    DeleteIndexEntry[index, e.first.value, data];
    };
  ENDLOOP;
};

EraseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
-- Discards the contents of every index by closing its btree and reopening it
-- on a brand-new backing file with initialize: TRUE.
EraseIndex: RefTab.EachPairAction = {
  <<[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]>>
  name: ROPE;
  index: OpenIndexInfo ¬ NARROW[val];
  IF index.btree = NIL THEN RETURN[quit: FALSE];
  <>
  IF
BTree.GetState[index.btree].state # closed THEN {
    -- close the live btree and drop its backing streams before recreating
    BTreeSimple.SetState[index.btree, closed];
    IO.Close[index.readBacking];
    index.readBacking ¬ NIL;
    IF index.writeBacking ~= NIL THEN {
      IO.Flush[index.writeBacking];
      IO.Close[index.writeBacking];
      index.writeBacking ¬ NIL;
      };
    };
  name ¬ Rope.Concat[index.filename, "!H"];
  index.writeBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $write];
  index.readBacking ¬ PFS.StreamOpen[fileName: PFS.PathFromRope[name], accessOptions: $read];
  <>
  BTreeSimple.Open[tree: index.btree, readFile: index.readBacking, writeFile: index.writeBacking, pageSize: btreePageSize, cacheSize: btreeCacheSize, initialize: TRUE];
  RETURN[quit: FALSE];
  };
[] ¬ RefTab.Pairs[indices, EraseIndex];
};

MarkUpdateOnIndices: PROC [indices: IndexSet, updateInProgress: BOOLEAN] RETURNS [] ~ {
-- Sets or clears the update-in-progress flag on every open btree, so a crash
-- mid-update can be detected when the index is next opened.
MarkIndex: RefTab.EachPairAction = {
  <<[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]>>
  index: OpenIndexInfo ¬ NARROW[val];
  IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
    BTreeSimple.SetUpdateInProgress[index.btree, updateInProgress];
    };
  RETURN[quit: FALSE];
  };
[] ¬ RefTab.Pairs[indices, MarkIndex];
};

CloseIndices: PROC [indices: IndexSet] RETURNS [] ~ {
-- Closes every open btree and its backing streams.
CloseIndex: RefTab.EachPairAction = {
  <<[key: RefTab.Key, val: RefTab.Val] RETURNS [quit: BOOL]>>
  index: OpenIndexInfo ¬ NARROW[val];
  IF index.btree # NIL AND BTree.GetState[index.btree].state # closed THEN {
    BTreeSimple.SetState[index.btree, closed];
    IO.Flush[self: index.readBacking];
    IO.Close[self: index.readBacking];
    index.readBacking ¬ NIL;
    IF index.writeBacking ~= NIL THEN {
      IO.Flush[self: index.writeBacking];
      IO.Close[self: index.writeBacking];
      index.writeBacking ¬ NIL};
    };
  RETURN[quit: FALSE];
  };
[] ¬ RefTab.Pairs[indices, CloseIndex];
};
<>
-- Secondary-index keys are made unique by appending \000 plus the entry's
-- log position; the helpers below build and take apart such keys.
IsUniqueKey: PROC [key: ROPE] RETURNS [BOOLEAN] ~ INLINE {
  RETURN[Rope.Find[key, "\000"] # -1];
  };

ValueToUniqueKey: PROC [value: AttributeValue, data: IndexEntry] RETURNS [ROPE] ~ INLINE {
  RETURN[Rope.Cat[value, "\000", Convert.RopeFromInt[data.log], " ", Convert.RopeFromInt[data.firstByte]]];
  };

ValueToSmallKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
  -- smallest possible uniquified key for value
  RETURN[Rope.Concat[value, "\000\000"]];
  };

ValueToLargeKey: PROC [value: AttributeValue] RETURNS [ROPE] ~ INLINE {
  -- largest possible uniquified key for value
  RETURN[Rope.Concat[value, "\000\377"]];
  };

UniqueKeyToValue: PROC [key: ROPE] RETURNS [AttributeValue] ~ INLINE {
  -- strips the uniquifying suffix, returning the original attribute value
  RETURN[Rope.Substr[key, 0, Rope.Index[key, 0, "\000"]]];
  };
<>
Recover: PROC [dbinfo: OpenDBInfo] RETURNS [needToRebuild: BOOLEAN] ~ {
-- Checks every writeable log for a cleanly completed last update and repairs
-- any that were interrupted; TRUE means the indices must be rebuilt.
needToRebuild ¬ FALSE;
FOR i: LogID IN [0..dbinfo.logs.size) DO
  IF (dbinfo.logs[i] # NIL) AND (dbinfo.logs[i].access # readOnly) THEN {
    IF NOT InGoodState[dbinfo.logs[i].readStream] THEN {
      MakeDatabaseWriteable[dbinfo];
      IF RestoreLog[dbinfo.logs[i].readStream, dbinfo.logs[i].writeStream] THEN needToRebuild ¬ TRUE;
      };
    };
  ENDLOOP;
RETURN[needToRebuild];
};

InGoodState: PROC [logStream: STREAM] RETURNS [BOOLEAN] ~ {
<>
<>
-- A log is in good state when it ends with end-of-attribute, end-of-entry,
-- and update-complete bytes (or is the 1-byte empty log).
length: INT;
thirdToLastByte, secondToLastByte, lastByte: CHAR;
length ¬ IO.GetLength[logStream];
SELECT length FROM
  =1 => { -- empty log?
    IO.SetIndex[logStream, 0];
    lastByte ¬ IO.GetChar[logStream];
    RETURN[lastByte = UpdateComplete];
    };
  < 3 => { -- log just created (or too short to be valid)
    RETURN[FALSE];
    };
  >= 3 => { -- look for valid end of entry
    IO.SetIndex[logStream, length-3];
    thirdToLastByte ¬ IO.GetChar[logStream];
    secondToLastByte ¬ IO.GetChar[logStream];
    lastByte ¬ IO.GetChar[logStream];
    RETURN[IsEndOfAttribute[thirdToLastByte]AND IsEndOfEntry[secondToLastByte] AND (lastByte = UpdateComplete)];
    };
  ENDCASE => ERROR Error[$InternalError, "Shouldn't get here."];
};

RestoreLog: PROC [readStream, writeStream: STREAM] RETURNS [needToRebuild: BOOLEAN] ~ {
<>
-- Truncates an interrupted log back to its last complete entry and re-marks
-- it update-complete; TRUE means index entries may reference discarded data.
logAddress: INT;
prevByte, byte: CHAR;
lastEntry: Entry;
logAddress ¬ IO.GetLength[readStream]-2;
IF logAddress <= 0 THEN { -- log just created (or too short to be valid)
  IO.SetLength[writeStream, 0];
  IO.SetIndex[writeStream, 0]; -- $$PerfA$$ total paranoia!
MarkUpdateComplete[writeStream];
  RETURN[TRUE];
  };
IO.SetIndex[readStream, logAddress];
prevByte ¬ IO.GetChar[readStream];
byte ¬ IO.GetChar[readStream];
IF IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte]
  THEN {
    <>
    -- log ends on an entry boundary but without the update-complete marker
    needToRebuild ¬ TRUE;
    }
  ELSE {
    <>
    -- scan backwards to the last complete entry and truncate after it
    UNTIL (IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte]) OR (logAddress = 0) DO
      byte ¬ prevByte;
      logAddress ¬ logAddress - 1;
      IO.SetIndex[readStream, logAddress];
      prevByte ¬ IO.GetChar[readStream];
      ENDLOOP;
    IO.SetLength[writeStream, IF logAddress = 0 THEN 0 ELSE logAddress + 2];
    IO.Flush[readStream]; -- to notice new file length
    needToRebuild ¬ FALSE;
    };
<>
IF logAddress # 0 THEN {
  <>
  -- back up over one more entry: a trailing $REPLACED or $REPLACE record
  -- belongs to an unfinished replacement, so it is discarded as well
  byte ¬ prevByte;
  logAddress ¬ logAddress - 1;
  IO.SetIndex[readStream, logAddress];
  prevByte ¬ IO.GetChar[readStream];
  UNTIL (IsEndOfEntry[byte] AND IsEndOfAttribute[prevByte]) OR (logAddress = 0) DO
    byte ¬ prevByte;
    logAddress ¬ logAddress - 1;
    IO.SetIndex[readStream, logAddress];
    prevByte ¬ IO.GetChar[readStream];
    ENDLOOP;
  IF logAddress # 0 THEN logAddress ¬ logAddress + 2;
  lastEntry ¬ ReadLogEntry[readStream, logAddress];
  IF lastEntry.first.type = $REPLACED OR lastEntry.first.type = $REPLACE THEN {
    IO.SetLength[writeStream, logAddress];
    IO.Flush[readStream]; -- to notice new file length
    needToRebuild ¬ FALSE;
    };
  };
<>
IO.SetIndex[writeStream, IO.GetLength[writeStream]]; -- $$PerfA$$ total paranoia!
MarkUpdateComplete[writeStream];
RETURN[needToRebuild];
};

IndicesOutOfDate: PROC [dbinfo: OpenDBInfo] RETURNS [BOOLEAN] ~ {
<>
-- TRUE if any log file was written after the primary index's backing file,
-- i.e. the btrees may be missing recent updates.
fileDate: BasicTime.GMT;
indexDate: BasicTime.GMT;
uniqueID: PFS.UniqueID;
uniqueID ¬ PFS.FileInfo[name: PFS.PathFromRope[dbinfo.primaryIndex.filename]].uniqueID;
indexDate ¬ uniqueID.egmt.gmt;
<<<>>>
FOR i: LogID IN [0..dbinfo.logs.size) DO
  IF dbinfo.logs[i] # NIL THEN {
    uniqueID ¬ PFS.FileInfo[name: PFS.PathFromRope[dbinfo.logs[i].filename]].uniqueID;
    fileDate ¬ uniqueID.egmt.gmt;
    <<<>>>
    IF BasicTime.Period[from: indexDate, to: fileDate] > 0 THEN RETURN[TRUE];
    };
  ENDLOOP;
RETURN[FALSE];
};
<>
MakeDatabaseWriteable: PROC [dbinfo: OpenDBInfo] RETURNS [] ~ {
<>
-- Reopens logs and indices with $write access (no-op if already writeable).
IF dbinfo.openForUpdate THEN RETURN;
CloseLogs[dbinfo.logs];
CloseIndices[dbinfo.indices];
OpenLogs[dbinfo.logs, $write];
[] ¬ OpenIndices[dbinfo.indices, $write];
dbinfo.openForUpdate ¬ TRUE;
};
<>
<>
OpenDBTable: SymTab.Ref; -- db name to OpenDBInfo map; kept on the $LoganBerry atom's property list (see module initialization)
OpenDBTableSize: NAT = 2039;
<<2039 is a good candidate because it is prime and is large enough that collisions should be rare.>>

GetSharedDB: PROC [dbName: ROPE] RETURNS [dbinfo: OpenDBInfo] ~ {
<>
<<>>
-- Returns the (possibly new) OpenDBInfo registered under dbName's full path;
-- a new entry is created unopened and marked available for service.
fname, key: ROPE;
fullName: PFS.PATH;
fullName ¬ PFS.FileInfo[PFS.PathFromRope[dbName] !
PFS.Error => ERROR Error[$CantOpenSchema, error.explanation]].fullFName;
fname ¬ PFS.RopeFromPath[fullName];
<<< ERROR Error[$CantOpenSchema, error.explanation]].fullFName;>> >>
key ¬ DFUtilities.RemoveVersionNumber[fname]; -- all versions of the schema file share one table entry
dbinfo ¬ NARROW[SymTab.Fetch[OpenDBTable, key].val];
IF dbinfo = NIL THEN {
  dbinfo ¬ NEW[OpenDBRecord];
  dbinfo.dbName ¬ fname;
  dbinfo.isOpen ¬ FALSE;
  dbinfo.remoteAccess ¬ TRUE;
  dbinfo.openForUpdate ¬ FALSE;
  dbinfo.transStarted ¬ FALSE;
  dbinfo.statusMsg ¬ "Open for service.";
  dbinfo.primaryIndex ¬ NIL;
  IF NOT SymTab.Insert[OpenDBTable, key, dbinfo] THEN -- lost race
    dbinfo ¬ NARROW[SymTab.Fetch[OpenDBTable, key].val];
  };
};

FlushOpenTable: PROC [dbinfo: OpenDBInfo ¬ NIL] = {
<>
<>
<>
-- Closes the given database, or every database in the shared table when
-- dbinfo is NIL.
FlushDB: SymTab.EachPairAction = {
  -- NOTE(review): quit is never assigned here; verify it defaults to FALSE
  DoClose[NARROW[val], TRUE];
  };
IF dbinfo # NIL
  THEN DoClose[dbinfo, TRUE]
  ELSE [] ¬ SymTab.Pairs[OpenDBTable, FlushDB];
};
<>
<>
<<>>
cachedReadEnabled: BOOL¬TRUE; -- guards use of the optional remote read cache
RRC: TYPE ~ RECORD[
  -- record type under which a read-cache callback is registered on the
  -- $LoganBerry atom's property list
  remoteReadCache: PROC[LoganBerry.OpenDB, AttributeValue] RETURNS [Entry]
  ];

GetCachedEntry: PROC[dbinfo: OpenDBInfo, value: AttributeValue] RETURNS [entry: Entry¬NIL] ~ TRUSTED INLINE {
  -- consults the externally registered read cache (if any); returns NIL when
  -- caching is disabled, no cache is registered, or the cache has no entry
  temp: REF;
  rrc: REF RRC;
  IF ~cachedReadEnabled THEN RETURN;
  temp ¬ Atom.GetProp[$LoganBerry, $RemoteReadCache];
  IF temp=NIL THEN RETURN;
  rrc ¬ LOOPHOLE[temp];
  entry ¬ rrc.remoteReadCache[RefID.Reseal[dbinfo], value];
  };
<>
class: LoganBerryClass.Class;
<>
-- Module initialization: recover the shared open-database table from the
-- $LoganBerry atom (creating it on first load) and register this
-- implementation as the $LoganBerry class.
OpenDBTable ¬ NARROW[Atom.GetProp[$LoganBerry, $OpenDBTable]];
<<>>
IF OpenDBTable = NIL THEN {
  OpenDBTable ¬ SymTab.Create[mod: OpenDBTableSize, case: FALSE];
  Atom.PutProp[$LoganBerry, $OpenDBTable, OpenDBTable];
  };
<>
class ¬ NEW[LoganBerryClass.ClassObject ¬ [
  name: $LoganBerry,
  open: Open,
  describe: Describe,
  readEntry: ReadEntry,
  enumerateEntries: EnumerateEntries,
  generateEntries: GenerateEntries,
  nextEntry: NextEntry,
  endGenerate: EndGenerate,
  writeEntry: WriteEntry,
  deleteEntry: DeleteEntry,
  close: Close,
  buildIndices: BuildIndices,
  compactLogs: CompactLogs,
  isLocal: IsLocal,
  startTransaction: StartTransaction,
  endTransaction: EndTransaction,
  flushDBCache: FlushDBCache,
  classData: NIL
  ]];
LoganBerryClass.Register[name: class.name, class: class, tryLast: TRUE];

END.

<> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <> <>>> <> <> <> <> <> <> <> <> <> <> <> <> <> <<>> <> <<>> <> <<>> <> <<>> <dbinfo mapping, but to close the DB and force a reread of the DB schema the next time the DB is opened. A full restart will be required whenever the interface changes, or whenever some part of the file name access path is visibly changed. A small price to pay.>> <<>> <> <<>>