-- Transport Mechanism Filestore - writer for heap objects -- -- [Juniper]<Grapevine>MS>Writer.mesa -- Randy Gobbel 19-May-81 12:53:09 -- -- Andrew Birrell 12-Jun-81 11:32:29 -- -- Mark Johnson 11-Nov-81 11:09:31 -- -- Brenda Hankins 22-Oct-84 14:35:11 DIRECTORY BodyDefs USING [ItemHeader], HeapDefs USING [Buffer, ObjectOffset, objectStart], HeapFileDefs USING [ ClaimSinglePage, CommitedSinglePage, CommitObject, NextPage, NewWriterPage, NextWriterPage, ObjectAbandoned, UseNormalPath], HeapXDefs USING [ObjectHeader, PageHeader, WriterData], Inline USING [COPY, LongDivMod], LogDefs USING [WriteChar], ObjectDirDefs USING [FreeObject, ObjectType], ObjectDirXDefs USING [ gapObjectNumber, MoveObject, NewObject, ObjectNumber, ReleaseObject], Process USING [InitializeMonitor], ProtocolDefs USING [Failed, ReceiveCount], PupStream USING [StreamClosing], Storage USING [Node], Stream USING [CompletionCode, GetBlock, Handle, SubSequenceType, TimeOut], VMDefs USING [ AllocatePage, FullAddress, PageAddress, PageIndex, pageSize, Page, ReadPage, RemapPage, MarkStartWait, UsePage, Release]; Writer: MONITOR IMPORTS HeapFileDefs, Inline, LogDefs, ObjectDirDefs, ObjectDirXDefs, Process, ProtocolDefs, PupStream, Stream, Storage, VMDefs EXPORTS HeapDefs = BEGIN WriterData: PUBLIC TYPE = HeapXDefs.WriterData; Handle: TYPE = POINTER TO WriterData; pageOverhead: CARDINAL = SIZE[HeapXDefs.PageHeader] + SIZE[HeapXDefs.ObjectHeader]; pageCapacity: CARDINAL = LAST[VMDefs.PageIndex] - pageOverhead; -- The capacity could be greater, but wPos.word could exceed 256. Allocate: PROCEDURE [CARDINAL] RETURNS [POINTER] = Storage.Node; -- subroutines -- BadOffsetFound: ERROR = CODE; InitPage: PROCEDURE [handle: Handle] = -- used to be SetHeader BEGIN -- Write page header, if needed -- handle.page ← IF handle.reWriting THEN VMDefs.ReadPage[handle.wPos.page, 0] ELSE VMDefs.UsePage[handle.wPos.page]; BEGIN header: POINTER TO HeapXDefs.PageHeader = LOOPHOLE[handle.page, POINTER]; IF NOT handle.reWriting THEN header.offset ← handle.offset ELSE IF handle.offset # header.offset THEN ERROR BadOffsetFound[]; END; -- Write object or sub-object header -- handle.objectHead ← LOOPHOLE[handle.page, POINTER] + SIZE[HeapXDefs.PageHeader]; handle.wPos.word ← pageOverhead; IF NOT handle.reWriting THEN { handle.objectHead.number ← handle.object; handle.objectHead.size ← 0}; END; CheckPage: PROCEDURE [handle: Handle, min: CARDINAL] = BEGIN -- If necessary, write to disk and start new page -- IF handle.wPos.word + min > LAST[VMDefs.PageIndex] THEN BEGIN IF NOT handle.reWriting THEN handle.objectHead.size ← handle.wPos.word - pageOverhead; VMDefs.MarkStartWait[handle.page]; VMDefs.Release[handle.page]; IF handle.reWriting THEN handle.wPos ← HeapFileDefs.NextPage[handle.wPos] ELSE handle.wPos ← HeapFileDefs.NextWriterPage[handle.wPos]; InitPage[handle]; END; END; -- writer allocation -- noWriter: Handle = NIL; writerChain, freeChain: Handle ← noWriter; writerCount: LONG CARDINAL ← 0; HeapStartWrite: PUBLIC ENTRY PROCEDURE [type: ObjectDirDefs.ObjectType] RETURNS [res: Handle] = BEGIN -- on exit, 'objectHead' is valid -- LogDefs.WriteChar['<]; IF freeChain = noWriter THEN BEGIN res ← Allocate[SIZE[HeapXDefs.WriterData]]; Process.InitializeMonitor[@(res.LOCK)]; END ELSE {res ← freeChain; freeChain ← freeChain.next}; res.next ← writerChain; writerChain ← res; res.wPos ← HeapFileDefs.NewWriterPage[]; res.start ← res.wPos.page; res.reWriting ← FALSE; res.object ← ObjectDirXDefs.NewObject[res.wPos, type]; res.offset ← res.maxOffset ← HeapDefs.objectStart; InitPage[res]; writerCount ← writerCount + 1; END; HeapEndWrite: PUBLIC PROCEDURE [ handle: Handle, action: PROCEDURE [ObjectDirXDefs.ObjectNumber]] = BEGIN object: ObjectDirXDefs.ObjectNumber = handle.object; DecrementWriters: ENTRY PROC = INLINE {writerCount ← writerCount - 1}; SubEndWrite[handle]; --must not use "handle" after here -- -- now, object is safe on disk -- action[ object ! -- this call must be outside the monitor -- UNWIND => {ObjectDirDefs.FreeObject[object]; DecrementWriters[]}]; ObjectDirDefs.FreeObject[object]; DecrementWriters[]; END; SubEndWrite: PROCEDURE [handle: Handle] = BEGIN committed: BOOLEAN ← FALSE; tempObj: BOOLEAN = handle.object.type = temp; MonitoredOperations: ENTRY PROCEDURE = BEGIN -- don't see why these (except last line) need be monitored, but JUST to be safe... IF NOT tempObj AND handle.wPos.page = handle.start THEN BEGIN -- short writer optimization -- single: VMDefs.PageAddress = HeapFileDefs.ClaimSinglePage[ ! HeapFileDefs.UseNormalPath => GOTO usePageWeHave]; VMDefs.RemapPage[handle.page, single]; ObjectDirXDefs.MoveObject[handle.object, [page: single, word: 0]]; --Commit-- VMDefs.MarkStartWait[handle.page]; VMDefs.Release[handle.page]; HeapFileDefs.CommitedSinglePage[]; HeapFileDefs.ObjectAbandoned[handle.start]; committed ← TRUE; EXITS usePageWeHave => NULL; END; IF ~committed THEN BEGIN -- not a single page or single page failed. VMDefs.MarkStartWait[handle.page]; VMDefs.Release[handle.page]; --Commit-- IF NOT tempObj THEN HeapFileDefs.CommitObject[handle.start, handle.wPos.page] -- ELSE leave it unchained and free explicitly when ref count=0-- ; END; RemoveFromChain[handle]; END; -- proc. MonitoredOperations -- needn't call CheckPage, as last call was WriteData or StartWrite (?)-- -- Pad last page with "gap" object (was done once if rewriting): -- Note that no non-gap object may end on a page boundary. IF handle.reWriting THEN SetWriterOffset[handle, handle.maxOffset]; -- note that previous SetWriterOffset wrote a gap object there but guess -- that various fields of writer are not set up to take that into account. -- could set it up properly ourselves. handle.objectHead.size ← handle.wPos.word - pageOverhead; CheckPage[handle, SIZE[HeapXDefs.ObjectHeader]]; BEGIN header: POINTER TO HeapXDefs.ObjectHeader = LOOPHOLE[handle.page, POINTER] + handle.wPos.word; header.number ← ObjectDirXDefs.gapObjectNumber; header.size ← LAST[VMDefs.PageIndex] - (handle.wPos.word + SIZE[HeapXDefs.ObjectHeader]); END; MonitoredOperations[]; END; -- proc. SubEndWrite HeapAbandonWrite: PUBLIC ENTRY PROCEDURE [handle: Handle] = BEGIN VMDefs.Release[handle.page]; IF handle.object.type # temp THEN HeapFileDefs.ObjectAbandoned[handle.start] -- ELSE ObjectDir will free it when the ref count goes to zero -- ; RemoveFromChain[handle]; ObjectDirDefs.FreeObject[handle.object]; IF handle.object.type # temp THEN ObjectDirXDefs.ReleaseObject[handle.object]; -- don't see what this latter does. writerCount ← writerCount - 1; END; RemoveFromChain: INTERNAL PROCEDURE [handle: Handle] = BEGIN -- remove from 'writerChain' -- prev: POINTER TO Handle ← @writerChain; WHILE prev↑ # handle DO prev ← @(prev↑.next) ENDLOOP; prev↑ ← handle.next; LogDefs.WriteChar['>]; handle.next ← freeChain; freeChain ← handle; END; -- writer -- HeapWriteData: PUBLIC ENTRY PROCEDURE [handle: Handle, from: HeapDefs.Buffer] = BEGIN used: CARDINAL ← 0; WHILE used < from.length DO -- Write, or continue writing, sub-object body -- spare: CARDINAL = LAST[VMDefs.PageIndex] - handle.wPos.word; -- 'spare' is never < 0 -- amount: CARDINAL = IF from.length - used > spare THEN spare ELSE from.length - used; Inline.COPY[from.where + used, amount, handle.page + handle.wPos.word]; handle.wPos.word ← handle.wPos.word + amount; used ← used + amount; handle.offset ← handle.offset + amount; CheckPage[handle, 1]; -- Move to new page, if necessary -- IF handle.offset >= handle.maxOffset THEN handle.reWriting ← FALSE; IF NOT handle.reWriting THEN handle.maxOffset ← handle.offset; ENDLOOP; END --HeapWriteData-- ; HeapWriteString: PUBLIC PROCEDURE [handle: Handle, s: STRING] = { HeapWriteData[handle, [s, SIZE[StringBody [s.length]]]]}; WriteItemHeader: PUBLIC PROCEDURE [ handle: Handle, header: BodyDefs.ItemHeader] = { HeapWriteData[handle, [@header, SIZE[BodyDefs.ItemHeader]]]}; ReceiveComponent: PUBLIC PROCEDURE [handle: Handle, str: Stream.Handle] = BEGIN length: CARDINAL ← ProtocolDefs.ReceiveCount[str]; bLength: CARDINAL = 64; buffer: ARRAY [0..bLength) OF WORD; bufferAddr: LONG POINTER ← @buffer; HeapWriteData[handle, [@length, SIZE[CARDINAL]]]; WHILE length > 0 DO why: Stream.CompletionCode; wanted: CARDINAL = MIN[length, bLength]; [, why, ] ← --note: the component is an integral number of words-- Stream.GetBlock[ str, [LOOPHOLE[bufferAddr], 0, wanted * 2] ! PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError]; Stream.TimeOut => ERROR ProtocolDefs.Failed[noData]]; IF why # normal THEN ERROR ProtocolDefs.Failed[protocolError]; HeapWriteData[handle, [@buffer, wanted]]; length ← length - wanted; ENDLOOP; END; ReceiveObj: PUBLIC PROCEDURE [handle: Handle, str: Stream.Handle] = BEGIN buffer: VMDefs.Page = VMDefs.AllocatePage[]; bufferAddr: LONG POINTER ← buffer; DO BEGIN ENABLE UNWIND => VMDefs.Release[buffer]; used: CARDINAL; why: Stream.CompletionCode; sst: Stream.SubSequenceType; [used, why, sst] ← Stream.GetBlock[ str, [bufferAddr, 0, VMDefs.pageSize * 2] ! PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError]; Stream.TimeOut => ERROR ProtocolDefs.Failed[noData]]; HeapWriteData[handle, [buffer, used / 2]]; IF why = sstChange THEN EXIT; END; ENDLOOP; VMDefs.Release[buffer]; END; GetWriterOffset: PUBLIC ENTRY PROCEDURE [handle: Handle] RETURNS [HeapDefs.ObjectOffset] = {RETURN[handle.offset]}; OffsetTooBig: ERROR = CODE; MissedOffset: ERROR = CODE; -- the following is the only one I didn't re check real carefully: SetWriterOffset: PUBLIC ENTRY PROCEDURE [ handle: Handle, offset: HeapDefs.ObjectOffset] = BEGIN offsetPage, offsetWord: CARDINAL; IF NOT handle.reWriting THEN handle.maxOffset ← handle.offset; IF offset > handle.maxOffset THEN ERROR OffsetTooBig[]; [offsetPage, offsetWord] ← Inline.LongDivMod[offset, pageCapacity]; IF handle.offset / pageCapacity # offsetPage THEN BEGIN -- changing pages, will need to write out current one IF offset < handle.offset THEN -- moving backwards BEGIN IF ~handle.reWriting THEN -- record current sub obj header info handle.objectHead.size ← handle.wPos.word - pageOverhead; IF handle.offset / pageCapacity = handle.maxOffset / pageCapacity THEN BEGIN -- on last page of object (must be full or have gap obj) lastPos: VMDefs.PageIndex; [, lastPos] ← Inline.LongDivMod[handle.maxOffset, pageCapacity]; lastPos ← lastPos + pageOverhead; IF lastPos + SIZE[HeapXDefs.ObjectHeader] <= LAST[VMDefs.PageIndex] THEN BEGIN -- not full, append a gap obj (even if already has one) header: POINTER TO HeapXDefs.ObjectHeader = LOOPHOLE[handle.page, POINTER] + lastPos; header.number ← ObjectDirXDefs.gapObjectNumber; header.size ← LAST[VMDefs.PageIndex] - (lastPos + SIZE[HeapXDefs.ObjectHeader]); END; -- gap obj check END; -- full? handle.reWriting ← TRUE; -- position at beginning of obj to search for desired pos: handle.wPos ← [page: handle.start, word: FIRST[VMDefs.PageIndex]]; handle.offset ← HeapDefs.objectStart; END -- moving backwards ELSE handle.offset ← (handle.offset / pageCapacity) * pageCapacity; -- write current page: VMDefs.MarkStartWait[handle.page]; VMDefs.Release[handle.page]; -- find page we're looking for: WHILE handle.offset / pageCapacity # offsetPage DO handle.offset ← handle.offset + pageCapacity; handle.wPos ← HeapFileDefs.NextPage[handle.wPos]; ENDLOOP; InitPage[handle]; END -- changing pages ELSE IF offset < handle.offset THEN handle.reWriting ← TRUE; handle.wPos.word ← pageOverhead + offsetWord; IF (handle.offset ← offset) = handle.maxOffset THEN handle.reWriting ← FALSE; END; -- proc. SetWriterOffset END. log: 15-Aug-84 14:15:59 - blh: made same as product Writer.mesa 22-Oct-84 14:35:37 - blh: fixed last 3 lines of HeapWriteData (swapped).