-- File: DBModelRelationImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Contents: Implementation of DBModel, DBModelPrivate operations on relations.
-- Last edited by:
--   Donahue, August 20, 1986 8:32:35 am PDT
DIRECTORY
BasicTime USING[GMT, earliestGMT, nullGMT, latestGMT],
CardTable,
DBCommon,
DBStorage,
DBStorageField USING[NWordsInField],
DBDefs,
DB,
DBModel,
DBModelPrivate,
Rope,
SymTab;
DBModelRelationImpl: CEDAR PROGRAM
IMPORTS BasicTime, CardTable, DB, DBDefs, DBStorage, DBStorageField, DBModel, DBModelPrivate, Rope, SymTab
EXPORTS DBModel, DBModelPrivate =
BEGIN OPEN DBCommon, DBDefs;
-- 0. Constants used to encode surrogate/property information
-- Classification of a relation. The ordinal (ORD) of one of these values is what DefineFields
-- writes into the relation tuple's propertyHandle field, and CacheAttributes turns the stored
-- number back into a RelationType with VAL; the order of the elements therefore determines
-- how stored values decode.
RelationType: TYPE = {normal, property, surrogate};
-- 1. Relationship creation/destruction/field access
CreateRelship: PUBLIC PROC[r: Relation, init: ValueSequence] RETURNS[t: Relship] = {
  -- Creates a new relationship in relation r whose fields are initialized from init.
  -- Raises DB.Error[WrongNumberOfFields] if init does not hold one value per attribute of r,
  -- DB.Error[MismatchedAttributeValueType] if some value fails CheckAttribute,
  -- DB.Error[NILArgument] if r is a property or surrogate and init[0] is not an entity, and
  -- DB.Error[NonUniqueKeyValue] if the new relationship would duplicate an existing key.
  KeyRecord: TYPE ~ RECORD[key: Rope.ROPE, handle: IndexHandle];
  keyList: LIST OF KeyRecord;
  CheckRelationKey[r];
  IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  -- Check to see that all of the types match
  IF init.count # r.attributes.count THEN ERROR DB.Error[WrongNumberOfFields];
  FOR i: CARDINAL IN [0..init.count) DO
    IF NOT CheckAttribute[attr: r.attributes[i], val: init[i]] THEN ERROR DB.Error[MismatchedAttributeValueType];
    ENDLOOP;
  -- CheckAttribute accepts the null value for every attribute, but the code below LOOPHOLEs
  -- init[0] to the entity variant for properties and surrogates; reject anything else first.
  IF (r.isProperty OR r.isSurrogate) AND init[0].type # entity THEN ERROR DB.Error[NILArgument];
  -- All the types match, now check the keys
  -- If this is a property, check to see that the tuple doesn't already exist in the relation
  IF r.isProperty THEN TRUSTED {
    entity: entity Value = LOOPHOLE[init[0]];
    IF PropForEntity[r: r, entity: entity.value, doCheck: FALSE] # NIL THEN
      ERROR DB.Error[NonUniqueKeyValue] };
  -- Build the encoded key for every key index of the relation
  FOR k: LIST OF DBDefs.IndexHandle ← r.keys, k.rest UNTIL k = NIL DO
    thisIndex: DBDefs.FieldSequence = k.first.index;
    thisKey: Rope.ROPE;
    FOR i: CARDINAL IN [0..thisIndex.count) DO
      thisKey ← Rope.Concat[thisKey, DBModelPrivate.NCode[init[thisIndex[i]]]]
      ENDLOOP;
    keyList ← CONS[[key: thisKey, handle: k.first], keyList];
    ENDLOOP;
  -- Every key is checked before anything is written, so a duplicate leaves the database untouched
  FOR l: LIST OF KeyRecord ← keyList, l.rest UNTIL l = NIL DO
    IF DBModelPrivate.CheckKeyInIndex[l.first.handle, l.first.key] THEN ERROR DB.Error[NonUniqueKeyValue];
    ENDLOOP;
  -- Create the tuple and set the fields
  TRUSTED BEGIN
    -- A surrogate relationship lives in the entity's own tuple, so no new tuple is created
    theTuple: TupleHandle = IF r.isSurrogate
      THEN LOOPHOLE[init[0], entity Value].value.tuple
      ELSE DBStorage.CreateTuple[r.tupleSet];
    t ← NEW[RelshipObject ← [relation: r, handle: theTuple]];
    -- Field 0 of a surrogate is the entity itself and is not stored separately
    FOR i: CARDINAL IN [IF r.isSurrogate THEN 1 ELSE 0 .. r.attributes.count) DO
      SetField[theTuple, r.attributes[i].fh, init[i]]
      ENDLOOP;
    -- If it's a surrogate, then it needs to be added to the surrogate group for the relation
    IF r.isSurrogate THEN
      DBModelPrivate.SetTupleField[theTuple, r.surrogateGroupHandle, r.tupleSet]
    END;
  -- Add all of the key fields for the tuple
  FOR l: LIST OF KeyRecord ← keyList, l.rest UNTIL l = NIL DO
    DBStorage.InsertIntoIndex[l.first.handle.indexHandle, l.first.key, t.handle]
    ENDLOOP;
  -- Finally, add any other index entries that should be created for the tuple
  FOR l: LIST OF IndexHandle ← r.indices, l.rest UNTIL l = NIL DO
    DBModelPrivate.AddTupleToIndex[t.handle, r, l.first]
    ENDLOOP };
CheckTypes: PROC [r: Relation, fields: FieldSequence, val: ValueSequence] RETURNS [legal: BOOL] ~ {
  -- TRUE iff val supplies exactly one value per entry of fields and every value is
  -- type-consistent (per CheckAttribute) with the attribute of r that the entry selects.
  IF fields.count # val.count THEN RETURN[FALSE];
  FOR pos: CARDINAL IN [0..fields.count) DO
    -- Nothing after the first mismatch is examined.
    IF NOT CheckAttribute[r.attributes[fields[pos]], val[pos]] THEN RETURN[FALSE]
    ENDLOOP;
  RETURN[TRUE]
  };
CheckAttribute: PROC [attr: REF AttributeObject, val: Value] RETURNS [legal: BOOL] ~ {
  -- Decides whether val may be stored in an attribute described by attr.
  WITH val SELECT FROM
    e: entity Value =>
      -- An entity fits an any-domain attribute outright; otherwise its domain must be compatible.
      legal ← (attr.type = anyDomainType) OR DBModelPrivate.CompatibleTypes[e.value.domain, attr.type];
    n: null Value =>
      -- The null value is accepted for every attribute.
      legal ← TRUE;
    ENDCASE =>
      -- Remaining variants: the ordinal of the value's tag must equal the attribute's type code.
      legal ← ORD[val.type] = attr.type.code
  };
IsLegalIndex: PUBLIC PROC [r: Relation, index: Index, checkKeysOnly: BOOL ← FALSE] RETURNS [legal: BOOL] ~ {
  -- TRUE iff index carries the same key stamp as relation r and, when checkKeysOnly is
  -- TRUE, the index is additionally marked as a key index.
  RETURN[r.key = index.key AND (NOT checkKeysOnly OR index.isKey)] };
EnsureUniqueKey: PROC [i: IndexHandle, tuple: TupleHandle, r: Relation, field: CARDINAL, v: Value] RETURNS [yes: BOOL] ~ TRUSTED {
  -- Returns TRUE iff index i currently has no entry for the key that tuple would have
  -- once attribute number field of relation r is given the value v.
  indexRope: Rope.ROPE;
  FOR j: CARDINAL IN [0..i.index.count) DO
    -- The attribute being changed contributes the proposed value v; every other attribute
    -- of the index contributes its own current value, fetched by its own position
    -- i.index[j] (not by the position of the attribute being changed).
    value: Value = IF i.index[j] = field THEN v ELSE PrimitiveGetF[tuple, r, i.index[j]];
    indexRope ← Rope.Concat[indexRope, DBModelPrivate.NCode[value]]
    ENDLOOP;
  -- Now, we have the key; check it in the index
  yes ← NOT DBModelPrivate.CheckKeyInIndex[i, indexRope];
  };
LookupRelship: PUBLIC PROC[r: Relation, key: Index, val: ValueSequence] RETURNS[t: Relship] = {
  -- Finds the relationship of r whose key index (key) holds the values in val.
  -- Returns NIL when no such tuple is found.
  -- Raises DB.Error[IllegalIndex] if key is not a key index of r, and
  -- DB.Error[MismatchedAttributeValueType] if val does not match the fields of key.
  encodedKey: Rope.ROPE;
  found: DBStorage.TupleHandle;
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
  IF NOT IsLegalIndex[r: r, index: key, checkKeysOnly: TRUE] THEN ERROR DB.Error[IllegalIndex];
  -- The value sequence has to agree in number and type with the fields of the key
  IF NOT CheckTypes[r, key.fields, val] THEN ERROR DB.Error[MismatchedAttributeValueType];
  -- Encode the values into a single rope; only key indices are searched, so at most one
  -- tuple can be found
  FOR pos: CARDINAL IN [0..val.count) DO
    encodedKey ← Rope.Concat[encodedKey, DBModelPrivate.NCode[val[pos]]]
    ENDLOOP;
  found ← DBModelPrivate.GetNamedTuple[key.index, encodedKey];
  IF found = NIL THEN RETURN[NIL];
  RETURN[NEW[RelshipObject ← [relation: r, handle: found]]]
  };
PropForEntity: PROC[r: Relation, entity: Entity, doCheck: BOOL ← TRUE] RETURNS[handle: TupleHandle] = {
  -- Finds the tuple of relation r that belongs to entity.
  -- When doCheck is TRUE, raises DB.Error[IllegalProperty] if r is not a property and
  -- DB.Error[MismatchedAttributeValueType] if the entity's domain is not compatible with
  -- the type of r's first attribute.
  -- For a surrogate relation the result is the entity's own tuple if its surrogate group
  -- field is non-NIL, and NIL otherwise; for any other relation it is whatever
  -- DBStorage.TupleForField yields for the first attribute's field.
  IF doCheck THEN {
    IF NOT r.isProperty THEN ERROR DB.Error[IllegalProperty];
    IF NOT DBModelPrivate.CompatibleTypes[entity.domain, r.attributes[0].type]
      THEN ERROR DB.Error[MismatchedAttributeValueType] };
  IF r.isSurrogate THEN {
    IF DBModelPrivate.GetTupleField[entity.tuple, r.surrogateGroupHandle] # NIL
      THEN RETURN[entity.tuple] ELSE RETURN[NIL] };
  handle ← DBStorage.TupleForField[entity.tuple, r.attributes[0].fh] };
LookupProperty: PUBLIC PROC[r: Relation, e: Entity] RETURNS[relship: Relship] = {
  -- Returns the relationship of property r that belongs to entity e, or NIL if
  -- PropForEntity finds no tuple. PropForEntity is called with its checks enabled.
  found: TupleHandle;
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  found ← PropForEntity[r, e];
  IF found # NIL THEN relship ← NEW[RelshipObject ← [relation: r, handle: found]]
  ELSE relship ← NIL
  };
DestroyRelship: PUBLIC PROC[t: Relship] = {
  -- Validates the relationship t and then hands its tuple to DestroyRelshipTuple.
  owner: Relation = t.relation;
  doomed: TupleHandle = t.handle;
  CheckRelationKey[owner];
  DBModelPrivate.CheckForNull[doomed];
  -- DestroyRelshipTuple expects the attribute information to be cached already
  IF NOT owner.attributesKnown THEN CacheAttributes[owner];
  DestroyRelshipTuple[t: doomed, r: owner]
  };
DestroyRelshipTuple: PUBLIC PROC[t: TupleHandle, r: Relation] = {
  -- Removes tuple t from relation r: takes it out of every index, clears its rope fields
  -- and its entity-valued fields, and then either destroys the tuple or, for a surrogate,
  -- only clears its surrogate group field. The attributes of r are assumed to be cached.
  IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
  -- Take the tuple out of the key indices, then out of the remaining indices
  FOR keys: LIST OF IndexHandle ← r.keys, keys.rest UNTIL keys = NIL DO
    DBModelPrivate.DeleteTupleFromIndex[t, r, keys.first]
    ENDLOOP;
  FOR others: LIST OF IndexHandle ← r.indices, others.rest UNTIL others = NIL DO
    DBModelPrivate.DeleteTupleFromIndex[t, r, others.first]
    ENDLOOP;
  -- Clear the fields; field 0 of a surrogate is skipped
  FOR pos: CARDINAL IN [IF r.isSurrogate THEN 1 ELSE 0 .. r.attributes.count) DO
    attr: REF AttributeObject = r.attributes[pos];
    IF attr.type = DBDefs.ropeType THEN DBModelPrivate.SetRopeField[t, attr.fh, NIL]
    ELSE IF NOT IsPrimitive[attr.type] THEN DBModelPrivate.NullifyTupleField[t, attr.fh]
    ENDLOOP;
  IF r.isSurrogate THEN DBModelPrivate.NullifyTupleField[t, r.surrogateGroupHandle]
  ELSE DBStorage.DestroyTuple[t]
  };
CopyRelship: PUBLIC PROC[t: Relship] RETURNS[Relship] = {
  -- Returns a new Relship object for the same relation, holding a copy of t's tuple handle
  -- made by DBModelPrivate.CopyTupleHandle. NIL is passed through unchanged.
  IF t = NIL THEN RETURN[NIL]
  ELSE RETURN[NEW[RelshipObject ← [
    relation: t.relation,
    handle: DBModelPrivate.CopyTupleHandle[t.handle]]]]
  };
SetF: PUBLIC PROC[t: Relship, field: CARDINAL, v: Value] = TRUSTED {
  -- Stores v into attribute number field of relationship t.
  -- Raises DB.Error[NILArgument] for a NIL relationship or NIL handle,
  -- DB.Error[IllegalAttribute] if field is not an attribute position of the relation, and
  -- DB.Error[MismatchedAttributeValueType] if v fails CheckAttribute.
  rel: Relation;
  IF t = NIL OR t.handle = NIL THEN ERROR DB.Error[NILArgument];
  rel ← t.relation;
  CheckRelationKey[rel];
  DBModelPrivate.CheckForNull[t.handle];
  IF NOT rel.attributesKnown THEN CacheAttributes[rel];
  IF field >= rel.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  IF NOT CheckAttribute[rel.attributes[field], v] THEN ERROR DB.Error[MismatchedAttributeValueType];
  -- Key checking and index maintenance are done by PrimitiveSetF
  PrimitiveSetF[t.handle, rel, field, v]
  };
PrimitiveSetF: PROC[tuple: TupleHandle, r: Relation, field: CARDINAL, val: Value] = {
  -- This procedure does all of the key checking and index maintenance necessary to ensure the
  -- validity of the database. It assumes that the value and field arguments have already been
  -- tested for reasonableness.
  -- Raises DB.Error[NonUniqueKeyValue] if the change would duplicate a key, and
  -- DB.Error[NILArgument] if the first field of a property is given a non-entity value.
  attribute: REF AttributeObject = r.attributes[field];
  keyIndex: IndexHandle;
  IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
  -- CheckAttribute lets the null value through for any attribute, but the LOOPHOLE below is
  -- only meaningful for the entity variant; reject anything else before it is reinterpreted.
  IF r.isProperty AND field = 0 AND val.type # entity THEN ERROR DB.Error[NILArgument];
  -- Determine if a key must be maintained
  -- If this is a property and you're changing the first field, check to see that the tuple
  -- doesn't already exist in the relation
  IF r.isProperty AND field = 0 THEN TRUSTED {
    entity: entity Value = LOOPHOLE[val];
    IF PropForEntity[r: r, entity: entity.value, doCheck: FALSE] # NIL THEN
      ERROR DB.Error[NonUniqueKeyValue] }
  ELSE {
    -- Check the key indices for the relation (the first key index found on this attribute)
    FOR i: LIST OF IndexHandle ← attribute.indices, i.rest UNTIL i = NIL DO
      thisIndex: IndexHandle = i.first;
      IF thisIndex.isKey THEN { keyIndex ← thisIndex; EXIT }
      ENDLOOP;
    IF keyIndex # NIL AND NOT EnsureUniqueKey[keyIndex, tuple, r, field, val] THEN ERROR DB.Error[NonUniqueKeyValue] };
  -- Remove the tuple from all of the indices that are affected
  FOR i: LIST OF IndexHandle ← attribute.indices, i.rest UNTIL i = NIL DO
    DBModelPrivate.DeleteTupleFromIndex[tuple, r, i.first]
    ENDLOOP;
  -- Have to do case split on whether this is first field of surrogate
  IF NOT r.isSurrogate OR field # 0 THEN SetField[tuple, attribute.fh, val]
  ELSE TRUSTED {
    -- First have to split the tuple, copying all of the fields from the current relationship
    -- (need to check to see whether tuple already exists in the relation)
    -- The tuple that is added to the indices for the relation is the new one that was just created
    WITH val: val SELECT FROM
      entity => {
        DBModelPrivate.SetTupleField[val.value.tuple, r.surrogateGroupHandle, r.tupleSet]; -- add the new tuple in
        DBModelPrivate.NullifyTupleField[tuple, r.surrogateGroupHandle]; -- take the old one out
        -- Copy all of the other fields
        FOR i: CARDINAL IN [1..r.attributes.count) DO
          SetField[val.value.tuple, r.attributes[i].fh, PrimitiveGetF[tuple, r, i]]
          ENDLOOP;
        tuple ← val.value.tuple };
      ENDCASE => ERROR DB.Error[NILArgument];
    };
  -- Finally add all the index entries for the modified tuple
  FOR i: LIST OF IndexHandle ← attribute.indices, i.rest UNTIL i = NIL DO
    thisIndex: IndexHandle = i.first;
    DBModelPrivate.AddTupleToIndex[tuple, r, thisIndex]
    ENDLOOP };
SetField: PROC [t: TupleHandle, fh: DBStorage.FieldHandle, val: Value] ~ TRUSTED {
  -- Stores val into field fh of tuple t, dispatching on the variant of val to the matching
  -- DBModelPrivate field setter. The null value clears the field with NullifyTupleField.
  -- Raises DB.Error[NILArgument] for any variant not listed below.
  WITH val: val SELECT FROM
    boolean => DBModelPrivate.SetBoolField[t, fh, val.value];
    integer => DBModelPrivate.SetIntField[t, fh, val.value];
    rope => DBModelPrivate.SetRopeField[t, fh, val.value];
    time => DBModelPrivate.SetTimeField[t, fh, val.value];
    entity => DBModelPrivate.SetTupleField[t, fh, val.value.tuple];
    null => DBModelPrivate.NullifyTupleField[t, fh];
    ENDCASE => ERROR DB.Error[NILArgument];
  };
PrimitiveGetF: PROC [t: TupleHandle, r: Relation, field: CARDINAL] RETURNS [v: Value ← [null[]]] ~ TRUSTED {
  -- Reads attribute number field of tuple t, which belongs to relation r, and returns it
  -- as a Value. No argument checking is done here.
  IF r.isSurrogate AND field = 0 THEN {
    -- The first field of a surrogate is the entity whose tuple holds the relationship
    owner: Domain = r.attributes[0].domain;
    self: Entity = NEW[EntityObject ← [domain: owner, tuple: t, name: DBModelPrivate.GetRopeField[t, owner.entityNameHandle]]];
    RETURN[Value[entity[value: self]]] };
  BEGIN
    attr: REF AttributeObject = r.attributes[field];
    IF NOT DBDefs.IsPrimitive[attr.type] THEN {
      -- Entity-valued field: a null tuple reads back as the null value
      target: DBCommon.TupleHandle = DBModelPrivate.GetTupleField[t, attr.fh];
      IF DBStorage.NullTuple[target] THEN v ← [null[]]
      ELSE {
        -- For an any-domain attribute the domain is derived from the referenced tuple itself
        targetDomain: Domain = IF attr.type = anyDomainType
          THEN DBModelPrivate.TupleToDomain[DBStorage.ReadTupleset[target], r.segment]
          ELSE attr.domain;
        targetName: Rope.ROPE = DBModelPrivate.GetRopeField[target, targetDomain.entityNameHandle];
        v ← [entity[value: NEW[DBDefs.EntityObject ← [targetDomain, targetName, target]]]] } }
    ELSE {
      SELECT attr.type FROM
        DBDefs.boolType => v ← [boolean[value: DBModelPrivate.GetBoolField[t, attr.fh]]];
        DBDefs.intType => v ← [integer[value: LOOPHOLE[DBModelPrivate.GetIntField[t, attr.fh]]]];
        DBDefs.ropeType => v ← [rope[value: DBStorage.ReadVarByte[t, attr.fh]]];
        DBDefs.timeType => v ← [time[value: DBModelPrivate.GetTimeField[t, attr.fh]]];
        ENDCASE => NULL }  -- any other primitive type leaves v at its null default
    END
  };
GetF: PUBLIC PROC[t: Relship, field: CARDINAL] RETURNS [v: Value ← [null[]]] = TRUSTED {
  -- Returns the value of attribute number field of relationship t.
  -- Raises DB.Error[NILArgument] for a NIL relationship or NIL handle and
  -- DB.Error[IllegalAttribute] if field is not an attribute position of the relation.
  rel: Relation;
  IF t = NIL OR t.handle = NIL THEN ERROR DB.Error[NILArgument];
  rel ← t.relation;
  CheckRelationKey[rel];
  DBModelPrivate.CheckForNull[t.handle];
  IF NOT rel.attributesKnown THEN CacheAttributes[rel];
  IF field >= rel.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  v ← PrimitiveGetF[t.handle, rel, field]
  };
SetP: PUBLIC PROC[e: Entity, r: Relation, field: CARDINAL, v: Value] = {
  -- Stores v into attribute number field of the relationship of property r that belongs
  -- to entity e.
  -- Raises DB.Error[IllegalAttribute] both for an out-of-range field and for a value that
  -- fails CheckAttribute, and DB.Error[NotFound] if the entity has no tuple in r.
  target: DBStorage.TupleHandle;
  CheckRelationKey[r];
  DBModelPrivate.CheckForNull[e.tuple];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  IF field >= r.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  IF NOT CheckAttribute[r.attributes[field], v] THEN ERROR DB.Error[IllegalAttribute];
  target ← PropForEntity[r, e];
  IF target = NIL THEN ERROR DB.Error[NotFound];
  PrimitiveSetF[target, r, field, v]
  };
GetP: PUBLIC PROC[e: Entity, r: Relation, field: CARDINAL] RETURNS[v: Value ← [null[]]] = {
  -- Returns attribute number field of the relationship of property r that belongs to entity e.
  -- Raises DB.Error[IllegalAttribute] for an out-of-range field and DB.Error[NotFound] if
  -- the entity has no tuple in r.
  source: DBStorage.TupleHandle;
  CheckRelationKey[r];
  DBModelPrivate.CheckForNull[e.tuple];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  IF field >= r.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  source ← PropForEntity[r, e];
  IF source = NIL THEN ERROR DB.Error[NotFound];
  TRUSTED { v ← PrimitiveGetF[source, r, field] }
  };
-- 3. Relation creation/destruction/manipulation
NameToField: PUBLIC PROC[r: Relation, name: ROPE] RETURNS[exists: BOOL, pos: CARDINAL] = {
  -- Searches the attributes of r for one called name (compared with Rope.Equal).
  -- On success exists is TRUE and pos is the position of the first match; otherwise
  -- exists is FALSE and pos carries no meaning.
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  exists ← FALSE;
  -- The result variable pos doubles as the loop index, so it holds the match on exit
  FOR pos IN [0..r.attributes.count) DO
    IF Rope.Equal[name, r.attributes[pos].name] THEN { exists ← TRUE; EXIT }
    ENDLOOP
  };
FieldDescription: PUBLIC PROC[r: Relation, pos: CARDINAL] RETURNS[field: DB.Field] = {
  -- Returns the description (as built by GetDescription) of the attribute at position pos of r.
  -- Raises DB.Error[IllegalAttribute] if pos is not a valid attribute position.
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  -- Valid positions are [0..count); pos = count must be rejected as well
  IF pos >= r.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  field ← GetDescription[r, pos] };
GetDescription: PROC[r: Relation, pos: CARDINAL] RETURNS[field: DB.Field] = TRUSTED {
  -- Builds the DB.Field record for the attribute at position pos of r from the cached
  -- attribute information. No range check is made on pos.
  attr: REF AttributeObject = r.attributes[pos];
  field.name ← attr.name;
  field.type ← DB.TypeSpec[direct[attr.type]];
  -- Only rope attributes get a length hint; it is computed from the field's word count
  IF attr.type = ropeType THEN
    field.lengthHint ← (DBStorageField.NWordsInField[attr.fh] * 2) - 2
  };
Fields: PUBLIC PROC[r: Relation] RETURNS[fields: DB.FieldSpec] = {
  -- Returns a newly allocated field specification with one description per attribute of r,
  -- in attribute order.
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  BEGIN
    nFields: CARDINAL = r.attributes.count;
    fields ← NEW[DB.FieldSpecObject[nFields]];
    FOR pos: CARDINAL IN [0..nFields) DO
      fields[pos] ← GetDescription[r, pos]
      ENDLOOP
    END
  };
FieldCount: PUBLIC PROC[r: Relation] RETURNS[count: CARDINAL] = {
  -- Returns the number of attributes of relation r, caching them first if necessary.
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  RETURN[r.attributes.count]
  };
RelationOf: PUBLIC PROC [t: Relship] RETURNS [Relation] = {
  -- Returns the relation that relationship t belongs to, after CheckRelationKey has
  -- accepted that relation.
  owner: Relation = t.relation;
  CheckRelationKey[owner];
  RETURN[owner]
  };
-- A reference to a counted sequence of Index values.
IndexSequence: TYPE = REF IndexSequenceObject;
-- The sequence itself: count elements of type Index.
IndexSequenceObject: TYPE = RECORD[ SEQUENCE count: CARDINAL OF Index];
DeclareRelation: PUBLIC PROC [name: ROPE, segment: Segment, fields: DB.FieldSpec, isProperty: BOOL] RETURNS[r: Relation] = {
  -- Creates a relation of this name or fetches it if it already exists.
  -- If the relation exists, its attributes must match fields exactly (same count, names and
  -- types in the same order) and its property status must match isProperty; otherwise
  -- DB.Error[MismatchedExistingRelation] is raised.
  -- A new property must have a first field of a specific domain type (not primitive, not
  -- anyDomainType); otherwise DB.Error[IllegalProperty] is raised.
  sh: SegmentHandle = DBModelPrivate.SegmentToHandle[segment];
  relationIndex: DBStorage.IndexHandle = sh.indices[DBCommon.relationIndex];

  EvalCode: PROC[ type: DB.TypeSpec ] RETURNS[ code: TypeCode ] = TRUSTED {
    -- Yields the type code of a type specification; an indirect specification is a
    -- procedure that is called to obtain the code
    WITH type: type SELECT FROM
      direct => code ← type.code;
      indirect => code ← type.code[]
      ENDCASE };

  AddToCache: PROC[tuple: TupleHandle] RETURNS[r: Relation] = {
    -- Makes a fresh RelationObject for the relation tuple and records it in the segment's
    -- relation table under name
    relationEntry: REF RelationObject = NEW[RelationObject ← [key: sh.key, segment: segment, name: name, tupleSet: DBModelPrivate.CopyTupleHandle[tuple]]];
    r ← relationEntry;
    [] ← SymTab.Store[x: sh.relationTable, key: name, val: relationEntry];
    RETURN[r] };

  DefineFields: PROC[t: TupleHandle] = {
    -- Turn the attribute descriptions into attributes for the relation tuple t
    -- The relation will be a surrogate if the first attribute is a key and is domain-valued and
    -- if the domain is currently empty (this final condition is necessary because of
    -- inadequacies in the Storage level currently). The test actually made below is: the
    -- relation is declared a property, the domain's tuple set is empty, and the domain has
    -- no subtypes.
    isFirst: BOOL ← TRUE; -- This is TRUE only the first time SetFieldHandle is called
    tupleSet: DBStorage.TuplesetHandle ← NIL; -- This is set at the first call to SetFieldHandle to the tuple set actually storing the tuples of the relation (which may be a domain if a surrogate is used)
    domain: Domain;
    -- If the field that is being set up is entity-valued, then this will be set by SetFieldHandle
    attributeGroup: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[t, relationHandle, Last];
    -- This scan group is used to link together the attributes in a group for the relation

    SetFieldHandle: PROC[attr: TupleHandle, field: DB.Field] RETURNS[fh: DBStorage.FieldHandle] = {
      type: TypeCode = EvalCode[field.type];
      length: CARDINAL = field.lengthHint;
      descriptor: DBStorage.FieldDescriptor = SELECT type FROM
        boolType => DBModelPrivate.OneWordDescriptor,
        intType, timeType => DBStorage.FieldDescriptor[TwoWord[]],
        ropeType => DBStorage.FieldDescriptor[VarByte[length]],
        ENDCASE => DBStorage.FieldDescriptor[Group[groupID: attr]];
      -- If the descriptor is of some domain type (including AnyDomainType), then the groupID
      -- used is the attribute tuple itself; this allows us to do the mapping from groupIDs to
      -- field handles needed to do the Unlink operation when entities are destroyed
      IF NOT IsPrimitive[type] THEN
        domain ← DBModel.TypeToDomain[type, segment];
      IF NOT isFirst THEN {
        fh ← DBStorage.AddField[tupleSet, descriptor];
        SetFieldField[attr, fieldHandleHandle, fh] }
      ELSE {
        -- This is the first attribute. It will be stored as a surrogate only if the attribute is
        -- a key AND the type is of some domain type AND the domain is currently empty
        isFirst ← FALSE;
        IF NOT isProperty OR NOT DBModelPrivate.EmptyTupleSet[domain.tupleSet] OR NOT DBModelPrivate.NoSubtypes[domain]
          THEN {
            fh ← DBStorage.AddField[t, descriptor];
            tupleSet ← t;
            SetCardField[t, propertyHandle, IF isProperty THEN ORD[RelationType[property]] ELSE ORD[RelationType[normal]]];
            SetFieldField[attr, fieldHandleHandle, fh] }
          ELSE {
            -- Here we can use the surrogate optimization. We record the fact that the surrogate
            -- optimization is used and add a field to the tupleset (the domain) that will contain
            -- the group handle for the relation (which will allow us to enumerate all of the
            -- relationships in the relation)
            SetCardField[t, propertyHandle, ORD[RelationType[surrogate]]];
            r.isSurrogate ← TRUE;
            tupleSet ← domain.tupleSet;
            DBModelPrivate.SetTupleField[t, surrogateDomainHandle, domain.tupleSet];
            r.surrogateGroupHandle ← DBStorage.AddField[tupleSet, DBStorage.FieldDescriptor[Group[groupID: t]]];
            SetFieldField[t: t, f: surrogateHandleHandle, val: r.surrogateGroupHandle];
            r.surrogateDomain ← domain };
        } };

    SetCardField[t, countHandle, fields.count];
    r.attributes ← NEW[AttributeSequence[fields.count]];
    FOR i: CARDINAL IN [0..fields.count) DO
      field: DB.Field = fields[i];
      type: DBDefs.TypeCode = EvalCode[field.type];
      attrObj: REF AttributeObject = NEW[AttributeObject ← [name: field.name, type: type]];
      attr: DBStorage.TupleHandle = DBStorage.CreateSystemPageTuple[x: AttributeTupleSet, y: t, s: segment];
      DBModelPrivate.SetCardField[attr, posHandle, i];
      DBModelPrivate.SetTypeField[attr, typeHandle, type];
      DBModelPrivate.SetRopeField[attr, aNameHandle, field.name];
      -- Add it to the group for the relation
      DBStorage.WriteTID[attr, attributeGroup];
      attrObj.fh ← SetFieldHandle[attr, field];
      attrObj.tuple ← attr.tid;
      -- For a specific domain type, domain was just set by SetFieldHandle
      IF NOT DBDefs.IsPrimitive[type] AND NOT type = anyDomainType THEN {
        DBModelPrivate.SetTupleField[attr, domainHandle, domain.tupleSet];
        attrObj.domain ← domain };
      r.attributes[i] ← attrObj;
      ENDLOOP;
    r.attributesKnown ← TRUE;
    DBStorage.CloseScanGroup[attributeGroup] };

  r ← LookupRelation[name, segment];
  IF r # NIL THEN {
    IF NOT r.attributesKnown THEN CacheAttributes[r];
    -- Check that all the attributes match (they must be given in precisely the same order)
    IF fields.count # r.attributes.count THEN ERROR DB.Error[MismatchedExistingRelation];
    FOR i: CARDINAL IN [0..fields.count) DO
      IF NOT Rope.Equal[fields[i].name, r.attributes[i].name] OR EvalCode[fields[i].type] # r.attributes[i].type THEN ERROR DB.Error[MismatchedExistingRelation]
      ENDLOOP;
    IF isProperty # r.isProperty THEN ERROR DB.Error[MismatchedExistingRelation];
    RETURN[r] };
  -- If the relation is not found, first ensure that the relation can legally be a property if
  -- it has been declared as one. A property without any field is rejected here as well,
  -- before fields[0] is examined.
  IF isProperty AND (fields.count = 0 OR IsPrimitive[EvalCode[fields[0].type]] OR EvalCode[fields[0].type] = anyDomainType) THEN ERROR DB.Error[IllegalProperty];
  -- Create a new relation.
  BEGIN
    t: TupleHandle = DBStorage.CreateSystemPageTuple[x: DBModelPrivate.RelationTupleSet, y: NIL, s: segment];
    DBStorage.CreateTupleset[t];
    DBStorage.WriteVarByte[t, rNameHandle, name];
    DBStorage.InsertIntoIndex[relationIndex, name, t];
    r ← AddToCache[t];
    DefineFields[t];
    r.isProperty ← isProperty;
    sh.schemaUpdated ← TRUE;
    RETURN[r]
    END };
LookupRelation: PUBLIC PROC[name: Rope.ROPE, segment: Segment] RETURNS[r: Relation] = {
  -- Returns the relation called name in segment, or NIL if there is none.
  -- The segment's relation table is consulted first; on a miss the segment's relation index
  -- is searched and, if the relation is found there, a new entry is stored in the table.
  -- None of the details of a newly cached entry are computed here.
  sh: SegmentHandle = DBModelPrivate.SegmentToHandle[segment];
  relationIndex: DBStorage.IndexHandle = sh.indices[DBCommon.relationIndex];
  r ← NARROW[SymTab.Fetch[x: sh.relationTable, key: name].val];
  IF r = NIL THEN {
    stored: TupleHandle = DBModelPrivate.GetNamedTuple[relationIndex, name];
    IF stored # NIL THEN {
      -- In the database but not yet in the cache
      fresh: REF RelationObject = NEW[RelationObject ← [key: sh.key, segment: segment, name: name, tupleSet: DBModelPrivate.CopyTupleHandle[stored]]];
      [] ← SymTab.Store[x: sh.relationTable, key: name, val: fresh];
      r ← fresh } }
  };
RelationsByName: PUBLIC PROC [segment: Segment, lowName, highName: ROPE, start: DBCommon.FirstLast] RETURNS[rs: RelationSet] = {
  -- Opens a scan over the segment's relation index for names from lowName to highName,
  -- both bounds included. A NIL highName is replaced by lowName; a bound that is still NIL
  -- after that is passed to the scan as infinite.
  relIndex: DBStorage.IndexHandle = DBModelPrivate.SegmentToHandle[segment].indices[DBCommon.relationIndex];
  upper: ROPE = IF highName = NIL THEN lowName ELSE highName;
  rs ← NEW[RelationSetObject ← [
    segment: segment,
    variant: index[scanHandle: DBStorage.OpenScanIndex[
      relIndex,
      [lowerBound: lowName, upperBound: upper, includeLowerBound: TRUE, includeUpperBound: TRUE, lowerBoundInfinity: lowName = NIL, upperBoundInfinity: upper = NIL],
      start]]]]
  };
NextRelation: PUBLIC PROC [rs: RelationSet] RETURNS [Relation] = TRUSTED {
  -- Advances the relation set and returns the relation found, or NIL (via TupleToRelation)
  -- when the scan yields no tuple.
  relName: Rope.ROPE;
  next: DBCommon.TupleHandle;
  WITH rs: rs SELECT FROM
    index => {
      -- Index scan: the name comes from the scan key, the tuple from the scan position
      relName ← DBModelPrivate.NameFromIndexKey[DBStorage.NextScanKey[rs.scanHandle]];
      IF relName # NIL THEN next ← DBStorage.ThisScanTuple[rs.scanHandle] ELSE next ← NIL };
    relationsForDomain => {
      -- Group scan: keep advancing while the tuple is non-NIL and equal to rs.lastTuple.
      -- NOTE(review): whether # compares handles or tuple identity cannot be told from here.
      next ← DBStorage.NextInGroup[rs.scanHandle];
      UNTIL next = NIL OR next # rs.lastTuple DO
        next ← DBStorage.NextInGroup[rs.scanHandle]
        ENDLOOP;
      rs.lastTuple ← next };
    ENDCASE;
  RETURN[TupleToRelation[next, rs.segment, relName]]
  };
PrevRelation: PUBLIC PROC [rs: RelationSet] RETURNS [Relation] = TRUSTED {
  -- Steps the relation set backwards and returns the relation found, or NIL (via
  -- TupleToRelation) when the scan yields no tuple.
  relName: Rope.ROPE;
  prev: DBCommon.TupleHandle;
  WITH rs: rs SELECT FROM
    index => {
      -- Index scan: the name comes from the scan key, the tuple from the scan position
      relName ← DBModelPrivate.NameFromIndexKey[DBStorage.PrevScanKey[rs.scanHandle]];
      IF relName # NIL THEN prev ← DBStorage.ThisScanTuple[rs.scanHandle] ELSE prev ← NIL };
    relationsForDomain => {
      -- Group scan: keep stepping back while the tuple is non-NIL and equal to rs.lastTuple.
      -- NOTE(review): whether # compares handles or tuple identity cannot be told from here.
      prev ← DBStorage.PrevInGroup[rs.scanHandle];
      UNTIL prev = NIL OR prev # rs.lastTuple DO
        prev ← DBStorage.PrevInGroup[rs.scanHandle]
        ENDLOOP;
      rs.lastTuple ← prev };
    ENDCASE;
  RETURN[TupleToRelation[prev, rs.segment, relName]]
  };
ReleaseRelationSet: PUBLIC PROC [rs: RelationSet] = TRUSTED {
  -- Closes the scan underlying rs and clears its scan handle. A NIL set is ignored.
  IF rs # NIL THEN
    WITH rs: rs SELECT FROM
      index => {
        DBStorage.CloseScanIndex[rs.scanHandle];
        rs.scanHandle ← NIL };
      relationsForDomain => {
        DBStorage.CloseScanGroup[rs.scanHandle];
        rs.scanHandle ← NIL };
      ENDCASE
  };
TupleToRelation: PUBLIC PROC[t: TupleHandle, segment: Segment, name: Rope.ROPE ← NIL] RETURNS[r: Relation] = {
  -- Returns the Relation for relation tuple t in segment; NIL is returned for a NIL tuple.
  -- If name is not supplied it is read from the tuple. The segment's relation table is
  -- consulted first; if it has no entry under name, a new one is created and stored.
  IF t = NIL THEN RETURN[NIL];
  IF name = NIL THEN name ← DBStorage.ReadVarByte[t, rNameHandle];
  BEGIN
    sh: SegmentHandle = DBModelPrivate.SegmentToHandle[segment];
    val: REF ANY = SymTab.Fetch[x: sh.relationTable, key: name].val;
    r ← NARROW[val];
    IF r # NIL THEN RETURN[r]
    ELSE {
      -- Record it in the cache
      relationEntry: REF RelationObject = NEW[RelationObject ← [key: sh.key, segment: segment, name: name, tupleSet: DBModelPrivate.CopyTupleHandle[t]]];
      r ← relationEntry;
      [] ← SymTab.Store[x: sh.relationTable, key: name, val: relationEntry] }
    END };
FetchAttribute: PUBLIC PROC [r: Relation, name: ROPE] RETURNS[a: Attribute] = {
  -- Returns a new attribute handle for the first attribute of r whose name equals name
  -- (per Rope.Equal), or NIL if r has no such attribute.
  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  FOR slot: CARDINAL IN [0..r.attributes.count) DO
    IF Rope.Equal[r.attributes[slot].name, name] THEN
      RETURN[NEW[AttributeHandle ← [pos: slot, relation: r]]]
    ENDLOOP;
  RETURN[NIL]
  };
CacheAttributes: PUBLIC PROC[r: Relation] = {
  -- Reads the attribute descriptions of relation r from the database into r.attributes and
  -- sets r.isProperty and r.isSurrogate (plus the surrogate data, when r is a surrogate).
  sh: SegmentHandle = DBModelPrivate.SegmentToHandle[r.segment];
  kind: RelationType = VAL[GetCardField[r.tupleSet, propertyHandle]];
  -- The flag is set before the attributes are actually read
  r.attributesKnown ← TRUE;
  BEGIN
    scan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[r.tupleSet, relationHandle, First];
    expected: CARDINAL = GetCardField[r.tupleSet, countHandle];
    cached: AttrSeqHandle = NEW[ AttributeSequence[expected] ];
    slot: CARDINAL ← 0;
    attrTuple: TupleHandle ← DBStorage.NextInGroup[scan];
    -- Walk every attribute tuple in the relation's group, filling one slot per tuple
    UNTIL attrTuple = NIL DO
      info: REF AttributeObject ← NEW[AttributeObject];
      cached[slot] ← info;
      info.tuple ← attrTuple.tid;
      info.name ← DBModelPrivate.GetRopeField[attrTuple, aNameHandle];
      info.type ← DBModelPrivate.GetTypeField[attrTuple, typeHandle];
      {
        -- A non-NIL domain field means the attribute refers to a specific domain
        domainRef: TupleHandle = DBModelPrivate.GetTupleField[attrTuple, domainHandle];
        IF domainRef # NIL THEN {
          info.domain ← DBModelPrivate.TupleToDomain[domainRef, r.segment];
          IF NOT info.domain.detailsKnown THEN DBModelPrivate.ReadDomain[info.domain] }
        };
      info.fh ← DBModelPrivate.GetFieldField[attrTuple, fieldHandleHandle];
      slot ← slot + 1;
      attrTuple ← DBStorage.NextInGroup[scan]
      ENDLOOP;
    r.attributes ← cached;
    DBStorage.CloseScanGroup[scan]
    END;
  -- Decode the relation kind; a surrogate also counts as a property
  r.isSurrogate ← kind = surrogate;
  r.isProperty ← kind # normal;
  IF r.isSurrogate THEN {
    -- Cache the surrogate data: the group handle and the domain of the first attribute,
    -- the latter located through a system tuple object whose tid is the type code
    domainTuple: DBStorage.TupleHandle ← DBStorage.ConsSystemTupleObject[];
    r.surrogateGroupHandle ← GetFieldField[r.tupleSet, surrogateHandleHandle];
    domainTuple.tid ← r.attributes[0].type.code;
    domainTuple.cacheHint ← NIL;
    r.surrogateDomain ← DBModelPrivate.TupleToDomain[domainTuple, r.segment] }
  };
CheckRelationKey: PUBLIC PROC[r: Relation] = {
  -- Raises DB.Error[IllegalRelation] unless the key recorded in r equals the current key
  -- of the handle for r's segment.
  current: SegmentHandle = DBModelPrivate.SegmentToHandle[r.segment];
  IF r.key = current.key THEN RETURN;
  ERROR DB.Error[IllegalRelation]
  };
AttributeInfo: PUBLIC PROC [a: Attribute] RETURNS [name: ROPE, r: Relation, pos: NAT, type: TypeCode] = {
  -- Returns the name, owning relation, position and type code of attribute a.
  -- Raises DB.Error[IllegalRelation] (from CheckRelationKey) if the relation is stale.
  CheckRelationKey[a.relation];
  -- The relation result has to be filled in explicitly, like the other results
  r ← a.relation;
  -- Like the other accessors in this module, make sure the attributes are cached before use
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  pos ← a.pos;
  BEGIN
    attr: REF AttributeObject = r.attributes[pos];
    name ← attr.name;
    type ← attr.type;
    END };
RelationEq: PUBLIC PROC [r1, r2: Relation] RETURNS[BOOL] = {
  -- Equality test on relation handles.
  -- Two NIL handles are equal; a NIL handle never equals a non-NIL one.
  -- For two non-NIL handles, both are validated with CheckRelationKey (r1 first)
  -- and the referenced relation objects are then compared by value.
  IF r1 = NIL OR r2 = NIL THEN RETURN[r1 = r2];
  CheckRelationKey[r1];
  CheckRelationKey[r2];
  RETURN[r1^ = r2^] };
AttributeEq: PUBLIC PROC [a1, a2: Attribute] RETURNS[BOOL] = {
  -- Equality test on attribute handles.
  -- Two NIL handles are equal; otherwise the attributes are equal when they have the
  -- same position and their relations are equal according to RelationEq.
  IF a1 = NIL OR a2 = NIL THEN RETURN[a1 = a2];
  -- Positions are compared first so that RelationEq is consulted only when they agree.
  IF a1.pos # a2.pos THEN RETURN[FALSE];
  RETURN[RelationEq[a1.relation, a2.relation]] };
NullRelation: PUBLIC PROC [r: Relation] RETURNS[BOOLEAN] = {
  -- TRUE when r is NIL or when r.key differs from the key of its segment handle.
  IF r # NIL THEN {
    segmentHandle: SegmentHandle = DBModelPrivate.SegmentToHandle[r.segment];
    RETURN[r.key # segmentHandle.key] };
  RETURN[TRUE] };
NullAttribute: PUBLIC PROC [a: Attribute] RETURNS[BOOLEAN] = {
  -- TRUE when a is NIL or when its relation is null according to NullRelation.
  IF a = NIL THEN RETURN[TRUE];
  RETURN[NullRelation[a.relation]] };
NullRelship: PUBLIC PROC[r: Relship] RETURNS[yes: BOOL] = {
  -- TRUE when r is NIL, when DBStorage reports its tuple handle as null, or when the
  -- key of its relation differs from the key of that relation's segment handle.
  IF r = NIL OR DBStorage.NullTuple[r.handle] THEN RETURN[TRUE];
  BEGIN
  owner: Relation = r.relation;
  segmentHandle: SegmentHandle = DBModelPrivate.SegmentToHandle[owner.segment];
  RETURN[owner.key # segmentHandle.key]
  END };
RelshipEq: PUBLIC PROC[r1: Relship, r2: Relship] RETURNS[yes: BOOL] = {
  -- Equality test on relationships.
  -- If either argument is null (per NullRelship) the result is whether the other is null too.
  -- Otherwise the relationships are equal when their relations are equal (RelationEq)
  -- and their tuple handles carry the same tid.
  SELECT TRUE FROM
    NullRelship[r1] => RETURN[NullRelship[r2]];
    NullRelship[r2] => RETURN[NullRelship[r1]];
    ENDCASE => RETURN[RelationEq[r1.relation, r2.relation] AND r1.handle.tid = r2.handle.tid]
  };
RelationSubset: PUBLIC PROC [r: Relation, index: Index, constraint: Constraint, start: FirstLast] RETURNS [rs: RelshipSet] = {
  -- Opens an enumeration over relation r.
  --   index = NIL       : scan the whole relation (group scan for surrogates, tupleset scan otherwise).
  --   constraint = NIL  : scan the whole of the given index.
  --   one constraint    : a single bounded index scan.
  --   many constraints  : a bounded index scan over the concatenated keys; when the leading
  --                       constraints are not dense the result filters each key (complexScan).
  -- Raises DB.Error[IllegalRelation] for a stale relation and DB.Error[IllegalConstraint]
  -- for an index that is not legal for r or a constraint that does not match the index.

  CheckConstraint: PROC[] = {
    -- Verifies that there is one constraint per index field and that each constraint
    -- is type-compatible with the attribute it applies to.
    IF index.fields.count # constraint.count THEN ERROR DB.Error[IllegalConstraint];
    FOR i: CARDINAL IN [0..constraint.count) DO
      thisAttributeType: TypeCode = r.attributes[index.fields[i]].type;
      thisConstraint: ValueConstraint = constraint[i];
      IF constraint[i].type = null THEN ERROR DB.Error[IllegalConstraint];
      -- The primitive test is bracketed explicitly: without the braces the ELSE below
      -- attaches to the inner IF and entity-valued attributes are never checked.
      IF IsPrimitive[thisAttributeType] THEN {
        IF ORD[constraint[i].type] # thisAttributeType.code THEN ERROR DB.Error[IllegalConstraint] }
      ELSE IF thisAttributeType # anyDomainType THEN {
        WITH thisConstraint SELECT FROM
          v: entity ValueConstraint =>
            IF NOT DBModelPrivate.CompatibleTypes[v.value.domain, thisAttributeType] THEN ERROR DB.Error[IllegalConstraint];
          ENDCASE }
      ENDLOOP };

  NCodeConstraint: PROC[c: ValueConstraint, checkForNils: BOOL] RETURNS[low, high: Rope.ROPE, noDecodeNeeded: BOOL] = TRUSTED {
    -- Encodes the bounds of one constraint as index keys.
    -- noDecodeNeeded is TRUE iff either the constraint was single-valued (thus the search of
    -- the index will generate only tuples that have this value) or the constraint covers the
    -- entire range of values (in which case any tuple in the enumeration matching the key
    -- will have a value in range for this field).
    -- When checkForNils is TRUE a NIL rope bound is returned as NIL so that the caller can
    -- treat it as an infinite bound.
    WITH c: c SELECT FROM
      boolean => {
        low ← high ← DBModelPrivate.NCodeBool[c.value];
        noDecodeNeeded ← TRUE };
      integer => {
        low ← DBModelPrivate.NCodeInt[c.low];
        high ← DBModelPrivate.NCodeInt[c.high];
        noDecodeNeeded ← (c.low = c.high) OR (c.low = FIRST[INT] AND c.high = LAST[INT]) };
      rope => {
        noDecodeNeeded ← (c.low = NIL AND c.high = NIL) OR (Rope.Equal[c.low, c.high]);
        low ← IF checkForNils AND c.low = NIL THEN NIL ELSE DBModelPrivate.NCodeRope[c.low];
        high ← IF checkForNils AND c.high = NIL THEN NIL ELSE DBModelPrivate.NCodeRope[IF c.high = NIL THEN "\177" ELSE c.high]
        };
      time => {
        -- A null time stands for the corresponding end of the time range.
        IF c.low = BasicTime.nullGMT THEN c.low ← BasicTime.earliestGMT;
        IF c.high = BasicTime.nullGMT THEN c.high ← BasicTime.latestGMT;
        noDecodeNeeded ← (c.low = c.high) OR (c.low = BasicTime.earliestGMT AND c.high = BasicTime.latestGMT);
        low ← DBModelPrivate.NCodeTime[c.low];
        high ← DBModelPrivate.NCodeTime[c.high] };
      entity => {
        domain: Domain = c.value.domain;
        DBModelPrivate.CheckDomainKey[domain];
        low ← high ← DBModelPrivate.NCodeTuple[c.value.tuple];
        noDecodeNeeded ← TRUE }
      ENDCASE => NULL };

  CheckRelationKey[r];
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
  IF index = NIL THEN { -- scan the entire relation
    IF r.isSurrogate THEN {
      scanHandle: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[r.tupleSet, r.surrogateGroupHandle, start];
      RETURN[NEW[RelshipSetObject ← [relation: r, variant: group[scanHandle]]]] }
    ELSE {
      scanHandle: DBStorage.TuplesetScanHandle = DBStorage.OpenScanTupleset[r.tupleSet, start];
      RETURN[NEW[RelshipSetObject ← [relation: r, variant: tupleSet[scanHandle]]]] } };
  IF NOT IsLegalIndex[r, index] THEN ERROR DB.Error[IllegalConstraint];
  IF constraint = NIL THEN {
    -- Scan the entire index
    scanHandle: DBStorage.IndexScanHandle = DBStorage.OpenScanIndex[index.index, DBStorage.Selection[lowerBound: NIL, upperBound: NIL, includeLowerBound: TRUE, includeUpperBound: TRUE, lowerBoundInfinity: TRUE, upperBoundInfinity: TRUE], start];
    RETURN[NEW[RelshipSetObject ← [relation: r, variant: simpleScan[scanHandle]]]]
    };
  CheckConstraint[];
  IF constraint.count = 1 THEN { -- do a simple scan
    low, high: Rope.ROPE;
    scanHandle: DBStorage.IndexScanHandle;
    [low ~ low, high ~ high] ← NCodeConstraint[c: constraint[0], checkForNils: TRUE];
    scanHandle ← DBStorage.OpenScanIndex[index.index, DBStorage.Selection[lowerBound: low, upperBound: high, includeLowerBound: TRUE, includeUpperBound: TRUE, lowerBoundInfinity: low = NIL, upperBoundInfinity: high = NIL], start];
    RETURN[NEW[RelshipSetObject ← [relation: r, variant: simpleScan[scanHandle]]]] }
  ELSE { -- this is the complicated case
    low: RopeSequence ← NEW[RopeSequenceObject[constraint.count]];
    high: RopeSequence ← NEW[RopeSequenceObject[constraint.count]];
    lowCompleteKey, highCompleteKey: Rope.ROPE;
    scanHandle: DBStorage.IndexScanHandle;
    dense: BOOL ← TRUE;
    -- If each of the constraints (except the last) is either single-valued or covers the
    -- entire range of values, then the enumeration of the index will produce only values
    -- satisfying the constraints; in this case no testing of the keys in the index to
    -- determine whether the tuple satisfies the constraints is necessary
    FOR i: CARDINAL IN [0..constraint.count) DO
      noDecodeNeeded: BOOL;
      [low ~ low[i], high ~ high[i], noDecodeNeeded ~ noDecodeNeeded] ← NCodeConstraint[c: constraint[i], checkForNils: FALSE];
      lowCompleteKey ← Rope.Concat[lowCompleteKey, low[i]];
      highCompleteKey ← Rope.Concat[highCompleteKey, high[i]];
      IF i # constraint.count-1 THEN dense ← dense AND noDecodeNeeded
      ENDLOOP;
    scanHandle ← DBStorage.OpenScanIndex[index.index, DBStorage.Selection[lowerBound: lowCompleteKey, upperBound: highCompleteKey, includeLowerBound: TRUE, includeUpperBound: TRUE], start];
    IF dense THEN
      RETURN[NEW[RelshipSetObject ← [relation: r, variant: simpleScan[scanHandle]]]]
    ELSE
      RETURN[NEW[RelshipSetObject ← [relation: r, variant: complexScan[handle: NEW[ComplexScanObject ← [scanHandle: scanHandle, low: low, high: high, thisOne: NEW[RopeSequenceObject[constraint.count]]]]]]]]
    }
  };
RelshipsForEntity: PUBLIC PROC [e: Entity] RETURNS [rs: RelshipSet] = {
  -- Opens an enumeration of the relationships that reference entity e.
  -- DBStorage.GetGroupIDs yields one attribute tuple per group; the scan starts with the
  -- first of these and NextRelship moves on to the remaining ones.
  -- Raises whatever CheckDomainKey and CheckForNull raise for a stale domain or null tuple.
  DBModelPrivate.CheckDomainKey[e.domain];
  DBModelPrivate.CheckForNull[e.tuple];
  BEGIN
  attributeList: LIST OF TupleHandle = DBStorage.GetGroupIDs[e.tuple];
  -- With no groups at all there is nothing to scan. Return a set that is already exhausted
  -- (no scan handle, no remaining fields) instead of dereferencing the empty list;
  -- NextRelship returns NIL as soon as it sees the NIL scan handle.
  IF attributeList = NIL THEN
    RETURN[NEW[RelshipSetObject ← [relation: NIL, variant: scanForEntity[handle: NEW[EntityScanObject ← [entityTuple: e.tuple, fields: NIL, relationsAlreadyDone: NIL, thisRelation: NIL, scanHandle: NIL]]]]]];
  BEGIN
  relationTuple: TupleHandle = DBModelPrivate.GetTupleField[attributeList.first, relationHandle];
  thisRelation: Relation = TupleToRelation[relationTuple, e.domain.segment];
  fieldHandle: DBStorage.FieldHandle = DBModelPrivate.GetFieldField[attributeList.first, fieldHandleHandle];
  thisGroupScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[e.tuple, fieldHandle, First];
  -- fields holds the attribute tuples that have not been scanned yet
  rs ← NEW[RelshipSetObject ← [relation: NIL, variant: scanForEntity[handle: NEW[EntityScanObject ← [entityTuple: e.tuple, fields: attributeList.rest, relationsAlreadyDone: LIST[relationTuple], thisRelation: thisRelation, scanHandle: thisGroupScan]]]]]
  END
  END };
FirstRelship: PUBLIC PROC[r: Relation] RETURNS[relship: Relship] = {
  -- Returns a relationship wrapping the tuple that DBStorage.FirstTuple reports for
  -- r.tupleSet, or NIL when there is none.
  -- Raises DB.Error[IllegalRelation] (via CheckRelationKey) for a stale relation.
  CheckRelationKey[r];
  BEGIN
  first: DBStorage.TupleHandle = DBStorage.FirstTuple[r.tupleSet];
  IF first = NIL THEN relship ← NIL
  ELSE relship ← NEW[RelshipObject ← [relation: r, handle: first]]
  END };
NextRelship: PUBLIC PROC [rs: RelshipSet] RETURNS [rship: Relship] = TRUSTED {
  -- Advances the enumeration rs and returns the next relationship, or NIL when the
  -- enumeration is exhausted. The work done depends on the variant of the set.
  WITH rs: rs SELECT FROM
    scanForEntity => {
      -- Walks one group scan per attribute tuple in rs.handle.fields. The list always holds
      -- the attribute tuples that have not been scanned yet; relations already enumerated
      -- are remembered in relationsAlreadyDone and their remaining attributes are skipped.
      tuple: TupleHandle ← NIL;
      relation: Relation ← NIL;
      DO
        IF rs.handle.scanHandle = NIL THEN EXIT;
        tuple ← DBStorage.NextInGroup[rs.handle.scanHandle];
        IF tuple # NIL THEN { relation ← rs.handle.thisRelation; EXIT };
        -- The current group is exhausted: close it and look for the next attribute whose
        -- relation has not been enumerated yet.
        DBStorage.CloseScanGroup[rs.handle.scanHandle];
        BEGIN
        relationTuple: TupleHandle;
        alreadyDone: BOOL;
        DO
          IF rs.handle.fields = NIL THEN EXIT; -- Nothing more to do
          relationTuple ← DBModelPrivate.GetTupleField[rs.handle.fields.first, relationHandle];
          alreadyDone ← FALSE;
          FOR rL: LIST OF TupleHandle ← rs.handle.relationsAlreadyDone, rL.rest UNTIL rL = NIL DO
            IF rL.first^ = relationTuple^ THEN {alreadyDone ← TRUE; EXIT}
            ENDLOOP;
          IF NOT alreadyDone THEN EXIT;
          -- Skip this attribute; advancing here is what lets the search terminate.
          rs.handle.fields ← rs.handle.fields.rest
          ENDLOOP;
        IF rs.handle.fields = NIL THEN { rs.handle.scanHandle ← NIL; EXIT }
        ELSE {
          thisRelation: Relation = TupleToRelation[relationTuple, rs.handle.thisRelation.segment];
          fieldHandle: DBStorage.FieldHandle = DBModelPrivate.GetFieldField[rs.handle.fields.first, fieldHandleHandle];
          thisGroupScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[rs.handle.entityTuple, fieldHandle, First];
          rs.handle.scanHandle ← thisGroupScan;
          rs.handle.fields ← rs.handle.fields.rest;
          rs.handle.relationsAlreadyDone ← CONS[relationTuple, rs.handle.relationsAlreadyDone];
          rs.handle.thisRelation ← thisRelation;
          LOOP }
        END
        ENDLOOP;
      -- Hand the tuple that was found (if any) back to the caller.
      IF tuple = NIL THEN rship ← NIL
      ELSE rship ← NEW[RelshipObject ← [relation: relation, handle: tuple]] };
    onlyOne => {
      -- A singleton set yields its tuple once and is then empty.
      IF rs.tuple = NIL THEN rship ← NIL
      ELSE {
        rship ← NEW[RelshipObject ← [relation: rs.relation, handle: rs.tuple]];
        rs.tuple ← NIL } };
    simpleScan => {
      tuple: DBStorage.TupleHandle = DBStorage.NextScanIndex[rs.scanHandle];
      IF tuple = NIL THEN rship ← NIL
      ELSE rship ← NEW[RelshipObject ← [relation: rs.relation, handle: tuple]] };
    tupleSet => {
      tuple: DBStorage.TupleHandle = DBStorage.NextScanTupleset[rs.scanHandle];
      IF tuple = NIL THEN rship ← NIL
      ELSE rship ← NEW[RelshipObject ← [relation: rs.relation, handle: tuple]] };
    group => {
      tuple: DBStorage.TupleHandle = DBStorage.NextInGroup[rs.scanHandle];
      IF tuple = NIL THEN rship ← NIL
      ELSE rship ← NEW[RelshipObject ← [relation: rs.relation, handle: tuple]] };
    complexScan => {
      -- Each index key is decoded into rs.handle.thisOne and kept only if every component
      -- lies within the recorded low and high bounds.
      tuple: DBStorage.TupleHandle ← NIL;
      DO
        thisKey: DBCommon.IndexKey = DBStorage.NextScanKey[rs.handle.scanHandle];
        IF thisKey = NIL THEN EXIT;
        DBModelPrivate.DCode[thisKey, rs.handle.thisOne];
        IF CheckRanges[rs.handle.low, rs.handle.thisOne, rs.handle.high]
          THEN { tuple ← DBStorage.ThisScanTuple[rs.handle.scanHandle]; EXIT }
          ELSE DBStorage.IgnoreEntry[rs.handle.scanHandle]
        ENDLOOP;
      IF tuple = NIL THEN rship ← NIL
      ELSE rship ← NEW[RelshipObject ← [relation: rs.relation, handle: tuple]] }
    ENDCASE };
CheckRanges: PROC [low, this, high: RopeSequence] RETURNS [inRange: BOOL] ~ {
  -- TRUE iff every component satisfies low[i] <= this[i] <= high[i] under Rope.Compare.
  -- The number of components examined is taken from low.
  FOR i: CARDINAL IN [0..low.length) DO
    -- below the lower bound
    IF Rope.Compare[low[i], this[i]] = greater THEN RETURN[FALSE];
    -- above the upper bound
    IF Rope.Compare[high[i], this[i]] = less THEN RETURN[FALSE];
    ENDLOOP;
  RETURN[TRUE]
  };
RelshipsWithEntityField: PUBLIC PROC[r: Relation, field: CARDINAL, val: Entity] RETURNS[rs: RelshipSet] = {
  -- Opens a group scan over the tuples of r that reference entity val through attribute
  -- number field.
  -- Raises DB.Error[IllegalAttribute] when field is out of range or when CheckAttribute
  -- rejects val for that attribute.
  IF NOT r.attributesKnown THEN CacheAttributes[r];
  IF field >= r.attributes.count THEN ERROR DB.Error[IllegalAttribute];
  BEGIN
  attr: REF AttributeObject = r.attributes[field];
  IF NOT CheckAttribute[attr, Value[entity[val]]] THEN ERROR DB.Error[IllegalAttribute];
  DBModelPrivate.CheckForNull[val.tuple];
  BEGIN
  groupScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[val.tuple, attr.fh, First];
  RETURN[NEW[RelshipSetObject ← [relation: r, variant: group[groupScan]]]]
  END
  END };
PrevRelship: PUBLIC PROC [rs: RelshipSet] RETURNS [rship: Relship] = TRUSTED {
  -- Steps the enumeration rs backwards and returns the relationship found, or NIL when
  -- there is none. Entity scans raise DB.Error[NotImplemented]; singleton sets always
  -- yield NIL.
  found: DBStorage.TupleHandle ← NIL;
  WITH rs: rs SELECT FROM
    scanForEntity => ERROR DB.Error[NotImplemented];
    onlyOne => NULL;
    simpleScan => found ← DBStorage.PrevScanIndex[rs.scanHandle];
    tupleSet => found ← DBStorage.PrevScanTupleset[rs.scanHandle];
    group => found ← DBStorage.PrevInGroup[rs.scanHandle];
    complexScan => {
      -- Decode each key and keep only those whose components all fall within bounds.
      DO
        thisKey: DBCommon.IndexKey = DBStorage.PrevScanKey[rs.handle.scanHandle];
        IF thisKey = NIL THEN EXIT;
        DBModelPrivate.DCode[thisKey, rs.handle.thisOne];
        IF CheckRanges[rs.handle.low, rs.handle.thisOne, rs.handle.high]
          THEN { found ← DBStorage.ThisScanTuple[rs.handle.scanHandle]; EXIT }
          ELSE DBStorage.IgnoreEntry[rs.handle.scanHandle]
        ENDLOOP }
    ENDCASE;
  IF found = NIL THEN RETURN[NIL];
  RETURN[NEW[RelshipObject ← [relation: rs.relation, handle: found]]] };
ReleaseRelshipSet: PUBLIC PROC [rs: RelshipSet] = TRUSTED {
  -- Closes the storage-level scan that backs the enumeration rs.
  WITH rs: rs SELECT FROM
    scanForEntity =>
      -- NextRelship closes each exhausted group scan itself and leaves scanHandle NIL once
      -- every attribute has been visited, so only a scan that is still open is closed here.
      -- The handle is cleared afterwards so that a later call sees the set as exhausted.
      IF rs.handle.scanHandle # NIL THEN {
        DBStorage.CloseScanGroup[rs.handle.scanHandle];
        rs.handle.scanHandle ← NIL };
    onlyOne => NULL;
    simpleScan => { DBStorage.CloseScanIndex[rs.scanHandle] };
    tupleSet => { DBStorage.CloseScanTupleset[rs.scanHandle] };
    group => { DBStorage.CloseScanGroup[rs.scanHandle] };
    complexScan => { DBStorage.CloseScanIndex[rs.handle.scanHandle] }
    ENDCASE
  };
RelationsOf: PUBLIC PROC [d: Domain] RETURNS[rs: RelationSet] = {
  -- Opens an enumeration of relations for domain d, backed by a group scan of d.tupleSet
  -- through the attribute domain field (domainHandle).
  -- CheckDomainKey validates d first.
  DBModelPrivate.CheckDomainKey[d];
  BEGIN
  domainScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[d.tupleSet, domainHandle, First];
  RETURN[NEW[RelationSetObject ← [segment: d.segment, variant: relationsForDomain[scanHandle: domainScan, lastTuple: NIL]]]]
  END
  };
DestroyRelation: PUBLIC PROC [r: Relation] = {
-- Removes relation r from the database: its index tuples, its relationship tuples,
-- its attribute tuples, the relation tuple itself, and finally its cache entries.
-- The steps below are order-dependent; the relation tuple is destroyed last because
-- the earlier scans use it as a group head.
sh: SegmentHandle = DBModelPrivate.SegmentToHandle[r.segment];
relationIndex: DBStorage.IndexHandle = sh.indices[DBCommon.relationIndex];
Destroy all indices on the relation
IF NOT r.indicesKnown THEN DBModelPrivate.CacheIndices[r];
IF NOT r.attributesKnown THEN CacheAttributes[r];
FOR iL: LIST OF IndexHandle ← r.indices, iL.rest UNTIL iL = NIL DO
DBModelPrivate.DestroyIndexTuple[iL.first.indexHandle]
ENDLOOP;
r.indices ← NIL;
FOR iL: LIST OF IndexHandle ← r.keys, iL.rest UNTIL iL = NIL DO
DBModelPrivate.DestroyIndexTuple[iL.first.indexHandle]
ENDLOOP;
r.keys ← NIL;
Enumerate each tuple and destroy the tuple (DestroyRelshipTuple)
-- NOTE(review): RelationSubset scans a surrogate relation with r.tupleSet as the group
-- head, whereas the scan below uses r.surrogateDomain.tupleSet; confirm which head is intended.
IF r.isSurrogate THEN {
relationSet: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[r.surrogateDomain.tupleSet, r.surrogateGroupHandle, First];
FOR nextTuple: DBStorage.TupleHandle ← DBStorage.NextInGroup[relationSet], DBStorage.NextInGroup[relationSet] UNTIL nextTuple = NIL DO
DestroyRelshipTuple[nextTuple, r];
ENDLOOP;
DBStorage.CloseScanGroup[relationSet] }
ELSE {
relationSet: DBStorage.TuplesetScanHandle = DBStorage.OpenScanTupleset[r.tupleSet, First];
FOR nextTuple: DBStorage.TupleHandle ← DBStorage.NextScanTupleset[relationSet], DBStorage.NextScanTupleset[relationSet] UNTIL nextTuple = NIL DO
DestroyRelshipTuple[nextTuple, r];
ENDLOOP;
DBStorage.CloseScanTupleset[relationSet]
};
Destroy all of the schema information
BEGIN
attributeSet: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[r.tupleSet, relationHandle, First];
attrList: LIST OF DBStorage.TupleHandle;
-- The attribute tuples are collected first and destroyed only after the scan is closed.
FOR nextTuple: DBStorage.TupleHandle ← DBStorage.NextInGroup[attributeSet], DBStorage.NextInGroup[attributeSet] UNTIL nextTuple = NIL DO
attrList ← CONS[nextTuple, attrList];
ENDLOOP;
DBStorage.CloseScanGroup[attributeSet];
FOR tl: LIST OF DBStorage.TupleHandle ← attrList, tl.rest UNTIL tl = NIL DO
-- Clear the group links and the name of each attribute tuple before destroying it.
DBStorage.WriteTIDNil[tl.first, domainHandle];
DBStorage.WriteTIDNil[tl.first, relationHandle];
DBModelPrivate.SetRopeField[tl.first, aNameHandle, NIL];
DBStorage.DestroyTuple[tl.first]
ENDLOOP;
DBStorage.WriteTIDNil[r.tupleSet, surrogateDomainHandle];
END;
Destroy the relation tuple itself
DBStorage.DeleteFromIndex[relationIndex, r.name, r.tupleSet];
DBStorage.DestroyTuple[r.tupleSet];
RemoveRelationFromCache[r]
};
RemoveRelationFromCache: PROC[r: Relation] = {
  -- Drops every cached trace of relation r from its segment handle and invalidates r
  -- by setting r.key to 0.
  segmentHandle: SegmentHandle = DBModelPrivate.SegmentToHandle[r.segment];
  IF r.attributesKnown THEN {
    FOR i: CARDINAL IN [0..r.attributes.count) DO
      attr: REF AttributeObject = r.attributes[i];
      -- Throw away the attribute table entry for this attribute, if there is one
      [] ← CardTable.Delete[segmentHandle.attributeTable, attr.tuple];
      -- Break the circular references held by the attribute
      attr.domain ← NIL;
      attr.indices ← NIL
      ENDLOOP
    };
  -- A surrogate relation also invalidates the surrogate list cached on its domain
  IF r.surrogateDomain # NIL THEN {
    r.surrogateDomain.surrogatesKnown ← FALSE;
    r.surrogateDomain.surrogates ← NIL };
  r.surrogateDomain ← NIL;
  r.key ← 0;
  [] ← SymTab.Delete[x: segmentHandle.relationTable, key: r.name]
  };
DestroyStoredRelations: PUBLIC PROC[d: Domain] = {
  -- Destroys every non-surrogate relation that has an attribute referring to domain d.
  -- Two steps in this process:
  -- 1. Enumerate the attribute tuples grouped under the domain and collect, without
  --    duplicates, the relations they belong to (surrogate relations are skipped here).
  -- 2. Destroy each collected relation; DestroyRelation also discards its cache entries.
  relationList: LIST OF Relation;
  attributeScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[d.tupleSet, domainHandle, First];
  FOR attr: TupleHandle ← DBStorage.NextInGroup[attributeScan], DBStorage.NextInGroup[attributeScan] UNTIL attr = NIL DO
    relationTuple: TupleHandle = DBModelPrivate.GetTupleField[attr, relationHandle];
    IF GetCardField[relationTuple, propertyHandle] # ORD[RelationType[surrogate]] THEN {
      relation: Relation = TupleToRelation[relationTuple, d.segment];
      seen: BOOL ← FALSE;
      -- Record the relation only if it is not in the list already
      FOR rL: LIST OF Relation ← relationList, rL.rest UNTIL rL = NIL DO
        IF rL.first = relation THEN { seen ← TRUE; EXIT }
        ENDLOOP;
      IF NOT seen THEN relationList ← CONS[relation, relationList] }
    ENDLOOP;
  DBStorage.CloseScanGroup[attributeScan];
  FOR rL: LIST OF Relation ← relationList, rL.rest UNTIL rL = NIL DO
    DestroyRelation[rL.first];
    ENDLOOP;
  };
DestroySurrogates: PUBLIC PROC[d: Domain] = {
  -- Destroys every surrogate relation of domain d.
  pending: RelationList ← NIL;
  -- Make sure the surrogate list of the domain has been enumerated
  IF NOT d.surrogatesKnown THEN EnumerateSurrogates[d];
  -- Walk the list, destroying each relation; the successor is read after the call
  pending ← d.surrogates;
  WHILE pending # NIL DO
    DestroyRelation[pending.relation];
    pending ← pending.next
    ENDLOOP
  };
EnumerateSurrogates: PROC [d: Domain] ~ {
  -- Fills d.surrogates by scanning the relation tuples grouped under d.tupleSet through
  -- surrogateDomainHandle, then marks the list as known. Each relation found is pushed
  -- onto the front of the existing list.
  scan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[d.tupleSet, surrogateDomainHandle, First];
  DO
    relationTuple: TupleHandle = DBStorage.NextInGroup[scan];
    IF relationTuple = NIL THEN EXIT;
    d.surrogates ← NEW[RelationListObject ← [relation: TupleToRelation[relationTuple, d.segment], next: d.surrogates]]
    ENDLOOP;
  DBStorage.CloseScanGroup[scan];
  d.surrogatesKnown ← TRUE };
Unlink: PUBLIC PROC[t: TupleHandle, d: Domain] = {
-- Destroys every relationship tuple that refers to tuple t of domain d, and then the
-- surrogate relationships of d for t.
-- The first part visits each group ID of t; the second part visits d.surrogates.
sh: SegmentHandle = DBModelPrivate.SegmentToHandle[d.segment];
IF NOT d.surrogatesKnown THEN EnumerateSurrogates[d];
BEGIN
groups: LIST OF DBStorage.TupleHandle = DBStorage.GetGroupIDs[t];
Each of these tuple ids is an attribute in the Attribute domain; we first insert each into the attributeTable if not already there
FOR aL: LIST OF DBStorage.TupleHandle ← groups, aL.rest UNTIL aL = NIL DO
attrTuple: DBStorage.TupleHandle = aL.first;
attr: Attribute;
attr ← NARROW[CardTable.Fetch[sh.attributeTable, attrTuple.tid].val];
IF attr = NIL THEN {
-- Not cached yet: build the attribute handle from the stored relation link and position.
relationTuple: DBStorage.TupleHandle = DBModelPrivate.GetTupleField[attrTuple, relationHandle];
pos: CARDINAL = DBModelPrivate.GetCardField[attrTuple, posHandle];
relation: Relation = TupleToRelation[relationTuple, d.segment];
attr ← NEW[AttributeHandle ← [pos: pos, relation: relation]];
[] ← CardTable.Insert[x: sh.attributeTable, key: attrTuple.tid, val: attr] };
IF NOT attr.relation.attributesKnown THEN CacheAttributes[attr.relation];
Open the scan group and destroy all of the tuples that refer to this one
BEGIN
fh: DBStorage.FieldHandle = attr.relation.attributes[attr.pos].fh;
tupleScan: DBStorage.GroupScanHandle = DBStorage.OpenScanGroup[t, fh, First];
FOR nextTuple: DBStorage.TupleHandle ← DBStorage.NextInGroup[tupleScan], DBStorage.NextInGroup[tupleScan] UNTIL nextTuple = NIL DO
DestroyRelshipTuple[nextTuple, attr.relation]
ENDLOOP;
DBStorage.CloseScanGroup[tupleScan];
END
ENDLOOP;
END;
All that is left to do is to destroy all of the surrogates that may be stored with the tuple
FOR rL: RelationList ← d.surrogates, rL.next UNTIL rL = NIL DO
relation: Relation = rL.relation;
IF NOT relation.attributesKnown THEN CacheAttributes[relation];
-- For a surrogate the relationship is destroyed via t itself; the surrogate group link of t is then cleared.
DestroyRelshipTuple[t, relation];
DBStorage.WriteTIDNil[t, relation.surrogateGroupHandle]
ENDLOOP
};
4. Definition of the structure of Relations in the Database
The structure of the storage of the relations, attributes and index factors component of a Cypress database is reasonably complicated and takes advantage of the ability to define many groups at the storage level.
A relation tuple contains the name of the relation, the number of attributes in the relation, whether the relation is a surrogate relation, and (if it is a surrogate) a field handle to be used for enumerating all of the tuples in the relation (which will be a subset of all of the tuples in the domain defined by the first attribute). A relation tuple is also the head of groups that enumerate all of the attributes of the relation and all of the indices of the relation.
An attribute tuple contains the name of the attribute, the type code for the attribute, a link to the domain defining the type (if the attribute is not of one of the basic types), a link to the relation to which the attribute belongs, the cardinal position of the attribute, and the field handle to be used when retrieving or setting this field of a relation. Note that the attribute information does not depend on whether the attribute is part of a surrogate relation or not.
-- System tupleset and domain tuple for attributes; both are assigned by InitializeRelationSchema.
AttributeTupleSet: DBStorage.SystemTuplesetHandle;
AttributeDomain: DBCommon.TupleHandle;
-- Group field descriptors; they start out empty and are given their group IDs by InitializeRelationSchema.
relationGroup: DBStorage.FieldDescriptor ← [Group[]];
attributeGroup: DBStorage.FieldDescriptor ← [Group[]];
InitializeRelationSchema: PUBLIC PROC[] = TRUSTED {
-- Sets up the system schema used to store relations and attributes: obtains the attribute
-- system table, builds the group descriptors, and registers the system fields of the
-- relation and attribute tuplesets. The field handles returned by AddSystemField are kept
-- in module-level variables; the fields are registered in the order listed below.
[AttributeTupleSet, AttributeDomain] ← DBModelPrivate.SetSystemTable[DBModelPrivate.AttributeTSID];
-- NOTE(review): relationGroup is assigned here but is not referenced by any AddSystemField
-- call in the visible part of this file; domainHandle and relationHandle both use
-- attributeGroup. Confirm that this is intended.
relationGroup ← DBStorage.FieldDescriptor[Group[groupID: DBModelPrivate.RelationDomain]];
attributeGroup ← DBStorage.FieldDescriptor[Group[groupID: AttributeDomain]];
The RelationDomain tuples are tuplesets with the following additional fields:
1. the name of the relation
2. the number of fields in the relation
3. the type of the relation (whether normal, property, or surrogate),
4. if it is a surrogate, the field handle to be used when enumerating the tuples in the relation,
5. if it is a surrogate, a link to the domain of which it is a surrogate
-- The first field registered is the tupleset descriptor itself; its handle is discarded.
[] ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, DBModelPrivate.TupleSetDescriptor];
rNameHandle ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, DBModelPrivate.NameDescriptor];
countHandle ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, DBModelPrivate.OneWordDescriptor];
propertyHandle ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, DBModelPrivate.OneWordDescriptor];
surrogateHandleHandle ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, DBModelPrivate.HandleDescriptor];
surrogateDomainHandle ← DBStorage.AddSystemField[DBModelPrivate.RelationTupleSet, [Group[DBModelPrivate.DomainDomain]]];
The AttributeDomain tuples have the following fields:
1. the name of the attribute,
2. the typecode of the attribute,
3. the domain corresponding to the typecode (if a domain type),
4. the relation to which the attribute belongs,
5. the relative position of the attribute in the tuple, and
6. the field handle for the attribute
aNameHandle ← DBStorage.AddSystemField[AttributeTupleSet, DBModelPrivate.NameDescriptor];
typeHandle ← DBStorage.AddSystemField[AttributeTupleSet, DBModelPrivate.TypeDescriptor];
domainHandle ← DBStorage.AddSystemField[AttributeTupleSet, attributeGroup];
relationHandle ← DBStorage.AddSystemField[AttributeTupleSet, attributeGroup];
posHandle ← DBStorage.AddSystemField[AttributeTupleSet, DBModelPrivate.OneWordDescriptor];
fieldHandleHandle← DBStorage.AddSystemField[AttributeTupleSet, DBModelPrivate.HandleDescriptor] };
-- The field handles below are assigned by InitializeRelationSchema from the results of
-- DBStorage.AddSystemField.
Handles for the relation domain
rNameHandle, countHandle, propertyHandle, surrogateDomainHandle, surrogateHandleHandle: DBStorage.FieldHandle;
Handles for the attribute domain
aNameHandle, typeHandle, domainHandle, relationHandle, posHandle, fieldHandleHandle: DBStorage.FieldHandle;
Each of the procedures below assumes that the field handle is a OneWord handle
SetCardField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: CARDINAL] = {
  -- Stores val in field f of tuple t as a single word.
  DBStorage.Write1Word[t, f, val]
  };
GetCardField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: CARDINAL] = {
  -- Reads the single word stored in field f of tuple t.
  RETURN[DBStorage.Read1Word[t, f]] };
SetBoolField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: BOOL] = {
  -- Stores a boolean in field f of tuple t as a single word.
  -- The encoding matches GetBoolField: TRUE is written as 0 and FALSE as 1.
  word: CARDINAL = IF val THEN 0 ELSE 1;
  DBStorage.Write1Word[t, f, word] };
GetBoolField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: BOOL] = {
  -- Reads a boolean from field f of tuple t; a stored word of 0 means TRUE
  -- (the encoding written by SetBoolField).
  RETURN[DBStorage.Read1Word[t, f] = 0] };
Type codes are stored as two words
SetTypeField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: TypeCode] = {
  -- Stores the code of type code val in field f of tuple t as two words.
  DBStorage.Write2Word[t, f, val.code]
  };
GetTypeField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: TypeCode] = {
  -- Reads the two words stored in field f of tuple t and wraps them as a TypeCode.
  val ← TypeCode[DBStorage.Read2Word[t, f]] };
Integers and times are also stored as two words
SetTimeField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: BasicTime.GMT] = {
  -- Stores time val in field f of tuple t as two words; the value is reinterpreted
  -- with LOOPHOLE rather than converted.
  DBStorage.Write2Word[t, f, LOOPHOLE[val]]
  };
GetTimeField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: BasicTime.GMT] = {
  -- Reads the two words stored in field f of tuple t and reinterprets them as a time.
  RETURN[LOOPHOLE[DBStorage.Read2Word[t, f]]] };
SetIntField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: INT] = {
  -- Stores integer val in field f of tuple t as two words; the value is reinterpreted
  -- with LOOPHOLE rather than converted.
  DBStorage.Write2Word[t, f, LOOPHOLE[val]]
  };
GetIntField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: INT] = {
  -- Reads the two words stored in field f of tuple t and reinterprets them as an integer.
  RETURN[LOOPHOLE[DBStorage.Read2Word[t, f]]] };
These procedures manipulate field handles stored in the database
SetFieldField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle, val: DBStorage.FieldHandle] = {
  -- Stores field handle val in field f of tuple t using the N-word write.
  DBStorage.WriteNWord[t, f, val]
  };
GetFieldField: PUBLIC PROC[t: TupleHandle, f: DBStorage.FieldHandle] RETURNS[val: DBStorage.FieldHandle] = {
  -- Creates a fresh field handle and fills it from field f of tuple t using the N-word read.
  fresh: DBStorage.FieldHandle = DBStorage.CreateFieldHandle[];
  DBStorage.ReadNWord[t, f, fresh];
  RETURN[fresh] };
END.