-- Transport Mechanism Filestore - writer for heap objects --
-- [Juniper]<Grapevine>MS>Writer.mesa
-- Randy Gobbel 19-May-81 12:53:09 --
-- Andrew Birrell 12-Jun-81 11:32:29 --
-- M. D. Schroeder 7-Feb-83 15:43:13 --
DIRECTORY
BodyDefs USING[ ItemHeader ],
HeapDefs USING[ Buffer, ObjectOffset, objectStart ],
HeapFileDefs USING[ ClaimSinglePage, CommitedSinglePage, CommitObject,
NextPage, NewWriterPage, NextWriterPage,
ObjectAbandoned ],
HeapXDefs USING[ ObjectHeader, PageHeader, WriterData ],
Inline USING[ COPY, LowHalf ],
LogDefs USING[ WriteChar ],
ObjectDirDefs USING[ FreeObject, ObjectType ],
ObjectDirXDefs USING[ gapObjectNumber, MoveObject, NewObject,
ObjectNumber, ReleaseObject ],
Process USING[ InitializeMonitor ],
ProtocolDefs USING[ Failed, ReceiveCount ],
PupStream USING[ StreamClosing ],
Storage USING[ Node ],
Stream USING[ CompletionCode, GetBlock, Handle, SubSequenceType,
TimeOut ],
VMDefs USING[ AllocatePage, PageAddress, PageIndex, pageSize,
Page, ReadPage,
RemapPage, MarkStartWait, UsePage, Release ];
Writer: MONITOR
IMPORTS HeapFileDefs, Inline, LogDefs, ObjectDirDefs,
ObjectDirXDefs,
Process, ProtocolDefs, PupStream, Stream, Storage, VMDefs
EXPORTS HeapDefs =
BEGIN
-- Concrete representation of the PUBLIC writer type; the record itself is declared in HeapXDefs --
WriterData: PUBLIC TYPE = HeapXDefs.WriterData;
-- Within this module a writer handle is a pointer to its WriterData record --
Handle: TYPE = POINTER TO WriterData;
-- Allocator for fresh WriterData records (see HeapStartWrite); bound to Storage.Node --
Allocate: PROCEDURE[CARDINAL] RETURNS[POINTER] = Storage.Node;
-- subroutines --
SetHeader: PROCEDURE[ handle: Handle ] =
  BEGIN
  -- Prepare the current write position to receive object data. --
  -- At the first word of a page the page is mapped in and its page header --
  -- is written (normal mode) or read back (rewriting mode). In every case --
  -- an object header is then laid out at the write position and the --
  -- position is advanced past it. --
  IF handle.wPos.word = FIRST[VMDefs.PageIndex] THEN
    BEGIN
    pageHead: POINTER TO HeapXDefs.PageHeader;
    -- rewriting needs the existing contents; a fresh page does not --
    IF handle.reWriting
      THEN handle.page ← VMDefs.ReadPage[handle.wPos.page, 0]
      ELSE handle.page ← VMDefs.UsePage[handle.wPos.page];
    pageHead ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
    -- the page header carries the object offset at which this page starts --
    IF handle.reWriting
      THEN handle.offset ← pageHead.offset
      ELSE pageHead.offset ← handle.offset;
    handle.wPos.word ← handle.wPos.word + SIZE[HeapXDefs.PageHeader];
    END;
  -- object (or sub-object) header sits at the current position --
  handle.objectHead ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
  handle.wPos.word ← handle.wPos.word + SIZE[HeapXDefs.ObjectHeader];
  -- 'base' remembers where this sub-object's data begins --
  handle.base ← handle.wPos.word;
  -- when rewriting, the header already on the page is left untouched --
  IF handle.reWriting THEN RETURN;
  handle.objectHead.number ← handle.object;
  handle.objectHead.size ← 0;
  END;
BadOffsetFound: ERROR = CODE;
CheckPage: PROCEDURE[ handle: Handle, min: CARDINAL ] =
  BEGIN
  -- Make sure 'min' more words can be placed on the current page. --
  -- If they cannot, the current sub-object is closed, the page is written --
  -- and released, and writing resumes on the next page with new headers. --
  -- Raises BadOffsetFound if SetHeader changes handle.offset, which can --
  -- only happen when the offset read back from a page header while --
  -- rewriting disagrees with the running offset. --
  expected: HeapDefs.ObjectOffset;
  IF handle.wPos.word + min <= LAST[VMDefs.PageIndex] THEN RETURN;
  -- close the sub-object that ends on this page --
  handle.objectHead.size ← handle.wPos.word - handle.base;
  VMDefs.MarkStartWait[handle.page];
  VMDefs.Release[handle.page];
  -- fresh pages come from NextWriterPage; rewriting follows existing pages --
  IF NOT handle.reWriting
    THEN handle.wPos ← HeapFileDefs.NextWriterPage[handle.wPos]
    ELSE handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
  expected ← handle.offset;
  SetHeader[handle];
  IF expected # handle.offset THEN ERROR BadOffsetFound[];
  END;
TerminateObject: PROCEDURE[ handle: Handle ] =
  BEGIN
  -- Close the current sub-object and fill the rest of the page with a --
  -- "gap" object. Does nothing while rewriting. --
  -- Note that no non-gap object may end on a page boundary. --
  gap: POINTER TO HeapXDefs.ObjectHeader;
  IF handle.reWriting THEN RETURN;
  handle.objectHead.size ← handle.wPos.word - handle.base;
  -- may move on to a new page if a gap header will not fit here --
  CheckPage[handle, SIZE[HeapXDefs.ObjectHeader] ];
  gap ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
  gap.number ← ObjectDirXDefs.gapObjectNumber;
  -- the gap covers everything after its own header up to the page limit --
  gap.size ← LAST[VMDefs.PageIndex] -
    (handle.wPos.word+SIZE[HeapXDefs.ObjectHeader]);
  END;
-- writer allocation --
-- noWriter is the NIL handle used as the end marker of both chains below --
noWriter: Handle = NIL;
-- writerChain links the writers handed out by HeapStartWrite and not yet finished; --
-- freeChain links WriterData records put back by RemoveFromChain for reuse --
writerChain, freeChain: Handle ← noWriter;
HeapStartWrite: PUBLIC ENTRY PROCEDURE[ type: ObjectDirDefs.ObjectType ]
  RETURNS[ res: Handle ] =
  BEGIN
  -- Begin writing a new heap object of the given type and return its writer. --
  -- On exit, 'objectHead' is valid. --
  LogDefs.WriteChar['<];
  -- reuse a spare WriterData record if there is one, else make a new one --
  IF freeChain # noWriter
    THEN BEGIN
      res ← freeChain;
      freeChain ← res.next;
      END
    ELSE BEGIN
      res ← Allocate[SIZE[HeapXDefs.WriterData]];
      Process.InitializeMonitor[@(res.LOCK)];
      END;
  -- put the writer at the head of the active chain --
  res.next ← writerChain;
  writerChain ← res;
  res.reWriting ← FALSE;
  res.offset ← HeapDefs.objectStart;
  -- obtain the first page, then an object number referring to it --
  res.wPos ← HeapFileDefs.NewWriterPage[];
  res.start ← res.wPos.page;
  res.object ← ObjectDirXDefs.NewObject[res.wPos, type];
  SetHeader[res];
  END;
HeapEndWrite: PUBLIC PROCEDURE[
    handle: Handle,
    action: PROCEDURE[ObjectDirXDefs.ObjectNumber] ] =
  BEGIN
  -- Finish the object being written, pass its object number to 'action', --
  -- then call FreeObject on it. FreeObject is also called if 'action' is unwound. --
  -- The object number is copied first because "handle" must not be used --
  -- once SubEndWrite has been called. --
  written: ObjectDirXDefs.ObjectNumber = handle.object;
  SubEndWrite[handle];
  -- now, the object is safe on disk --
  -- the client call below must be made outside the monitor --
  action[written ! UNWIND => ObjectDirDefs.FreeObject[written] ];
  ObjectDirDefs.FreeObject[written];
  END;
SubEndWrite: ENTRY PROCEDURE[ handle: Handle ] =
BEGIN
-- Monitor-protected part of HeapEndWrite: closes the object, writes out the --
-- last page, commits the pages of a non-temp object, and finally puts the --
-- WriterData record back on the free chain. --
-- needn't call CheckPage, as last call was WriteData or StartWrite --
TerminateObject[handle];
BEGIN
-- 'limit' is the last page of the object: the furthest page reached before --
-- rewriting began, or else the page currently being written --
limit: VMDefs.PageAddress = IF handle.reWriting
THEN handle.max.page ELSE handle.wPos.page;
tempObj: BOOLEAN = handle.object.type = temp;
-- limit = handle.start means the whole object lies on its first page --
IF NOT tempObj AND limit = handle.start
THEN BEGIN
-- short writer optimisation --
-- The buffer is remapped onto a page claimed from ClaimSinglePage, the --
-- object is redirected to word 0 of that page, the page is written, and the --
-- original start page is handed to ObjectAbandoned. --
-- NOTE(review): the exact on-disk guarantees of these HeapFileDefs calls --
-- are not visible here; keep the call order unchanged. --
single: VMDefs.PageAddress = HeapFileDefs.ClaimSinglePage[];
VMDefs.RemapPage[handle.page, single];
ObjectDirXDefs.MoveObject[handle.object, [page:single,word:0] ];
--Commit-- VMDefs.MarkStartWait[handle.page];
VMDefs.Release[handle.page];
HeapFileDefs.CommitedSinglePage[];
HeapFileDefs.ObjectAbandoned[handle.start];
END
ELSE BEGIN
-- general case: write the last page, then commit the page range start..limit --
VMDefs.MarkStartWait[handle.page];
VMDefs.Release[handle.page];
--Commit-- IF NOT tempObj
THEN HeapFileDefs.CommitObject[handle.start, limit]
-- ELSE leave it unchained and free explicitly when ref count=0--;
END;
END;
-- the WriterData record goes back on the free chain --
RemoveFromChain[handle];
END;
HeapAbandonWrite: PUBLIC ENTRY PROCEDURE[ handle: Handle ] =
  BEGIN
  -- Give up on the object being written: release the current page buffer --
  -- without writing it, report the object's start page as abandoned, --
  -- recycle the writer, and drop the object number. --
  abandoned: ObjectDirXDefs.ObjectNumber = handle.object;
  VMDefs.Release[handle.page];
  -- NOTE(review): the original text carried a dangling remark after this --
  -- call, "ELSE ObjectDir will free it when the ref count goes to zero", --
  -- which suggests the call may once have been conditional on the object --
  -- type. It is unconditional here, exactly as found. Confirm intent. --
  HeapFileDefs.ObjectAbandoned[handle.start];
  RemoveFromChain[handle];
  ObjectDirDefs.FreeObject[abandoned];
  -- non-temp objects additionally have their object number released --
  IF abandoned.type # temp
    THEN ObjectDirXDefs.ReleaseObject[abandoned];
  END;
RemoveFromChain: INTERNAL PROCEDURE[ handle: Handle ] =
  BEGIN
  -- Unlink 'handle' from 'writerChain' and push it onto 'freeChain'. --
  -- The handle is assumed to be on 'writerChain'; the search does not --
  -- test for the end of the chain. --
  link: POINTER TO Handle ← @writerChain;
  -- 'link' walks the cells that point at each writer in turn --
  UNTIL link↑ = handle DO link ← @(link↑.next) ENDLOOP;
  link↑ ← handle.next;
  LogDefs.WriteChar['>];
  handle.next ← freeChain;
  freeChain ← handle;
  END;
-- writer --
-- Words taken at the start of a page by the page header plus the first object header, as laid out by SetHeader --
pageOverhead: CARDINAL = SIZE[HeapXDefs.PageHeader] + SIZE[HeapXDefs.ObjectHeader];
-- Number of data words credited to one page when SetWriterOffset skips to the approximate page --
pageCapacity: CARDINAL = LAST[VMDefs.PageIndex] - pageOverhead;
-- the capacity could be greater, but wPos.word could exceed 256 --
HeapWriteData: PUBLIC ENTRY PROCEDURE[ handle: Handle,
    from: HeapDefs.Buffer ] =
  BEGIN
  -- Append the 'from.length' words at 'from.where' to the object, moving --
  -- on to further pages as each one fills. An empty buffer writes nothing. --
  done: CARDINAL ← 0;
  UNTIL done >= from.length
    DO
    -- words still available on this page; never negative --
    room: CARDINAL = LAST[VMDefs.PageIndex] - handle.wPos.word;
    -- write as much of the remainder as the page will take --
    chunk: CARDINAL = MIN[from.length - done, room];
    Inline.COPY[from.where+done, chunk,
                handle.page+handle.wPos.word];
    handle.wPos.word ← handle.wPos.word + chunk;
    handle.offset ← handle.offset + chunk;
    done ← done + chunk;
    -- rewriting stops once the furthest point previously written is reached --
    IF handle.reWriting AND handle.wPos.page = handle.max.page
       AND handle.wPos.word >= handle.max.word
      THEN handle.reWriting ← FALSE;
    -- move to a new page if this one has no room for another word --
    CheckPage[handle, 1];
    ENDLOOP;
  END --HeapWriteData--;
HeapWriteString: PUBLIC PROCEDURE[handle: Handle, s: STRING] =
  BEGIN
  -- Write the whole string body of 's', sized for its current length --
  words: CARDINAL = SIZE[StringBody[s.length]];
  HeapWriteData[handle, [s,words] ];
  END;
WriteItemHeader: PUBLIC PROCEDURE[handle: Handle,
    header: BodyDefs.ItemHeader] =
  BEGIN
  -- Write the words of the item header record to the object --
  HeapWriteData[handle, [@header,SIZE[BodyDefs.ItemHeader]] ];
  END;
ReceiveComponent: PUBLIC PROCEDURE[handle: Handle,
    str: Stream.Handle ] =
  BEGIN
  -- Read a counted component from 'str' and append it to the object: --
  -- first the word count, then that many words of data. --
  -- Raises ProtocolDefs.Failed with communicationError if the stream closes, --
  -- noData on a timeout, and protocolError if a block does not complete normally. --
  chunkWords: CARDINAL = 64;
  chunk: ARRAY [0..chunkWords) OF WORD;
  remaining: CARDINAL ← ProtocolDefs.ReceiveCount[str];
  -- the count is stored in the object ahead of the data --
  HeapWriteData[handle, [@remaining,SIZE[CARDINAL]] ];
  UNTIL remaining = 0
    DO
    status: Stream.CompletionCode;
    words: CARDINAL = MIN[remaining, chunkWords];
    -- note: the component is an integral number of words; the stream counts bytes --
    [,status,] ←
      Stream.GetBlock[str, [@chunk,0,words*2] !
        PupStream.StreamClosing =>
          ERROR ProtocolDefs.Failed[communicationError];
        Stream.TimeOut =>
          ERROR ProtocolDefs.Failed[noData] ];
    IF status # normal THEN ERROR ProtocolDefs.Failed[protocolError];
    HeapWriteData[handle, [@chunk, words] ];
    remaining ← remaining - words;
    ENDLOOP;
  END;
ReceiveObj: PUBLIC PROCEDURE[handle: Handle,
    str: Stream.Handle ] =
  BEGIN
  -- Copy blocks from 'str' into the object, a page buffer at a time, until --
  -- a block completes with sstChange. The buffer is released on normal exit --
  -- and also if the transfer is unwound. --
  -- Raises ProtocolDefs.Failed with communicationError if the stream closes, --
  -- and noData on a timeout. --
  page: VMDefs.Page = VMDefs.AllocatePage[];
  finished: BOOLEAN ← FALSE;
  UNTIL finished
    DO
    BEGIN
    ENABLE UNWIND => VMDefs.Release[page];
    bytes: CARDINAL;
    status: Stream.CompletionCode;
    sst: Stream.SubSequenceType;
    [bytes,status,sst] ←
      Stream.GetBlock[str, [page,0,VMDefs.pageSize*2] !
        PupStream.StreamClosing =>
          ERROR ProtocolDefs.Failed[communicationError];
        Stream.TimeOut =>
          ERROR ProtocolDefs.Failed[noData] ];
    -- the stream counts bytes, the heap counts words --
    HeapWriteData[handle, [page,bytes/2] ];
    finished ← (status = sstChange);
    END;
    ENDLOOP;
  VMDefs.Release[page];
  END;
GetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle ]
  RETURNS[ HeapDefs.ObjectOffset ] =
  -- Report the writer's current offset within the object --
  { RETURN[handle.offset] };
-- OffsetTooBig: the requested offset lies beyond what has been written so far --
OffsetTooBig: ERROR = CODE;
-- MissedOffset: the approximate skip landed on a page that starts after the requested offset --
MissedOffset: ERROR = CODE;
SetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle,
offset: HeapDefs.ObjectOffset ] =
BEGIN
-- Reposition the writer at 'offset' within the part of the object already --
-- written, and put it in rewriting mode. The search restarts from the --
-- object's first page each time. --
-- Raises OffsetTooBig or MissedOffset as described at their declarations. --
-- close off the object as written so far; a no-op if already rewriting --
TerminateObject[handle];
-- on first entry to rewriting mode, remember the furthest position reached --
IF NOT handle.reWriting THEN handle.max ← handle.wPos;
handle.reWriting ← TRUE;
-- write out and let go of the page currently held --
VMDefs.MarkStartWait[handle.page];
VMDefs.Release[handle.page];
-- restart from the first word of the first page --
handle.wPos ← [page:handle.start, word:FIRST[VMDefs.PageIndex] ];
handle.offset ← HeapDefs.objectStart;
-- skip to approximate page --
-- each page skipped is credited with pageCapacity words; no page is read here --
WHILE offset >= handle.offset + pageCapacity
DO handle.offset ← handle.offset + pageCapacity;
IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
ENDLOOP;
SetHeader[handle]; -- picks up real handle.offset --
IF handle.offset > offset THEN ERROR MissedOffset;
-- skip to correct page --
-- the second condition holds when this page's sub-object leaves no room --
-- for a further object header, which is read as: the object continues on the next page --
WHILE offset >= handle.offset + handle.objectHead.size
AND pageOverhead + handle.objectHead.size + SIZE[HeapXDefs.ObjectHeader]
> LAST[VMDefs.PageIndex]
-- i.e. while not in this page and not end of object --
DO VMDefs.Release[handle.page];
IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
SetHeader[handle];
ENDLOOP;
-- step forward within the page by the distance still to go --
handle.wPos.word ← handle.wPos.word
+ Inline.LowHalf[offset - handle.offset];
handle.offset ← offset;
-- the new position must not lie beyond the furthest point written --
IF handle.wPos.page = handle.max.page
AND handle.wPos.word > handle.max.word
THEN ERROR OffsetTooBig[];
END;
END.