-- Transport Mechanism Filestore - writer for heap objects --
-- [Juniper]MS>Writer.mesa
-- Randy Gobbel 19-May-81 12:53:09 --
-- Andrew Birrell 12-Jun-81 11:32:29 --
-- M. D. Schroeder 7-Feb-83 15:43:13 --

-- Overview: this monitor exports the writer half of HeapDefs. --
-- A writer (a WriterData record, reached through a Handle) accumulates the --
-- body of one heap object page by page.  Each page starts with a PageHeader --
-- and then holds one or more (sub-)objects, each preceded by an ObjectHeader. --
-- Writers in use are linked on 'writerChain'; finished ones are recycled --
-- through 'freeChain'. --

DIRECTORY
BodyDefs USING[ ItemHeader ],
HeapDefs USING[ Buffer, ObjectOffset, objectStart ],
HeapFileDefs USING[ ClaimSinglePage, CommitedSinglePage, CommitObject, NextPage, NewWriterPage, NextWriterPage, ObjectAbandoned ],
HeapXDefs USING[ ObjectHeader, PageHeader, WriterData ],
Inline USING[ COPY, LowHalf ],
LogDefs USING[ WriteChar ],
ObjectDirDefs USING[ FreeObject, ObjectType ],
ObjectDirXDefs USING[ gapObjectNumber, MoveObject, NewObject, ObjectNumber, ReleaseObject ],
Process USING[ InitializeMonitor ],
ProtocolDefs USING[ Failed, ReceiveCount ],
PupStream USING[ StreamClosing ],
Storage USING[ Node ],
Stream USING[ CompletionCode, GetBlock, Handle, SubSequenceType, TimeOut ],
VMDefs USING[ AllocatePage, PageAddress, PageIndex, pageSize, Page, ReadPage, RemapPage, MarkStartWait, UsePage, Release ];

Writer: MONITOR
IMPORTS HeapFileDefs, Inline, LogDefs, ObjectDirDefs, ObjectDirXDefs, Process, ProtocolDefs, PupStream, Stream, Storage, VMDefs
EXPORTS HeapDefs =

BEGIN

-- Concrete representation of the writer state exported through HeapDefs. --
WriterData: PUBLIC TYPE = HeapXDefs.WriterData;
Handle: TYPE = POINTER TO WriterData;

-- Storage allocator used for new WriterData records (bound to Storage.Node). --
Allocate: PROCEDURE[CARDINAL] RETURNS[POINTER] = Storage.Node;


-- subroutines --

-- SetHeader: position the writer just after the next object header. --
--   handle: writer whose wPos identifies the page and word to set up. --
-- If wPos.word is at the first word of a page, the page is obtained --
-- (ReadPage when reWriting, UsePage otherwise) and its PageHeader is handled: --
-- a fresh page records handle.offset in the header, while a page being --
-- re-written loads handle.offset from the header instead. --
-- In all cases objectHead is pointed at the ObjectHeader at wPos.word, and --
-- both wPos.word and base are advanced past it.  A fresh (not reWriting) --
-- header is stamped with handle.object and a size of zero. --
SetHeader: PROCEDURE[ handle: Handle ] =
  BEGIN
  -- Write page header, if needed --
  IF handle.wPos.word = FIRST[VMDefs.PageIndex]
  THEN BEGIN
       handle.page _ IF handle.reWriting
                     THEN VMDefs.ReadPage[handle.wPos.page, 0]
                     ELSE VMDefs.UsePage[handle.wPos.page];
       BEGIN
       header: POINTER TO HeapXDefs.PageHeader = LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
       IF NOT handle.reWriting
       THEN header.offset _ handle.offset
       ELSE handle.offset _ header.offset;
       END;
       handle.wPos.word _ handle.wPos.word + SIZE[HeapXDefs.PageHeader];
       END;
  -- Write object or sub-object header --
  handle.objectHead _ LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
  handle.base _ handle.wPos.word _ handle.wPos.word + SIZE[HeapXDefs.ObjectHeader];
  IF NOT handle.reWriting
  THEN BEGIN
       handle.objectHead.number _ handle.object;
       handle.objectHead.size _ 0;
       END;
  END;

-- Raised by CheckPage when the offset found on the next page differs from --
-- the writer's running offset. --
BadOffsetFound: ERROR = CODE;

-- CheckPage: make sure at least 'min' more words fit in the current page. --
--   handle: the writer; min: number of words the caller is about to need. --
-- When wPos.word + min would pass LAST[PageIndex], the current sub-object's --
-- size is recorded, the page is handed to MarkStartWait and released, wPos is --
-- advanced (NextPage when reWriting, NextWriterPage otherwise) and SetHeader --
-- prepares the new page. --
-- Raises BadOffsetFound if SetHeader changed handle.offset; SetHeader only --
-- assigns handle.offset when reWriting. --
-- NOTE(review): this ERROR can propagate out of ENTRY procedures (for example --
-- HeapWriteData) with no UNWIND handler visible here; confirm whether leaving --
-- the monitor locked is the intended fatal behaviour. --
CheckPage: PROCEDURE[ handle: Handle, min: CARDINAL ] =
  BEGIN
  -- If necessary, write to disk and start new page --
  IF handle.wPos.word + min > LAST[VMDefs.PageIndex]
  THEN BEGIN
       handle.objectHead.size _ handle.wPos.word - handle.base;
       VMDefs.MarkStartWait[handle.page];
       VMDefs.Release[handle.page];
       IF handle.reWriting
       THEN handle.wPos _ HeapFileDefs.NextPage[handle.wPos]
       ELSE handle.wPos _ HeapFileDefs.NextWriterPage[handle.wPos];
       BEGIN
       -- remember the running offset so it can be compared after SetHeader --
       wanted: HeapDefs.ObjectOffset = handle.offset;
       SetHeader[handle];
       IF handle.offset # wanted THEN ERROR BadOffsetFound[];
       END;
       END;
  END;

-- TerminateObject: close the current sub-object and fill the rest of the page. --
--   handle: the writer.  Does nothing when handle.reWriting is TRUE. --
-- Otherwise the current sub-object's size is recorded, CheckPage guarantees --
-- room for one more ObjectHeader, and a header numbered gapObjectNumber is --
-- written whose size covers the words remaining up to LAST[PageIndex]. --
-- wPos is not advanced past the gap header. --
TerminateObject: PROCEDURE[ handle: Handle ] =
  BEGIN
  -- pad page with "gap" object --
  -- note that no non-gap object may end on a page boundary --
  IF NOT handle.reWriting
  THEN BEGIN
       handle.objectHead.size _ handle.wPos.word - handle.base;
       CheckPage[handle, SIZE[HeapXDefs.ObjectHeader] ];
       BEGIN
       header: POINTER TO HeapXDefs.ObjectHeader = LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
       header.number _ ObjectDirXDefs.gapObjectNumber;
       header.size _ LAST[VMDefs.PageIndex] - (handle.wPos.word+SIZE[HeapXDefs.ObjectHeader]);
       END;
       END;
  END;


-- writer allocation --

-- Sentinel for an empty chain. --
noWriter: Handle = NIL;
-- writerChain links writers currently in use; freeChain links recycled ones. --
writerChain, freeChain: Handle _ noWriter;

-- HeapStartWrite: begin writing a new heap object. --
--   type: object type handed to ObjectDirXDefs.NewObject. --
--   returns res: the writer to pass to the other procedures of this module. --
-- Logs the character '<', takes a WriterData from freeChain (or allocates one --
-- and initialises its LOCK field), links it onto writerChain, obtains a first --
-- page from NewWriterPage, creates the object-directory entry, and writes the --
-- first page and object headers via SetHeader. --
HeapStartWrite: PUBLIC ENTRY PROCEDURE[ type: ObjectDirDefs.ObjectType ] RETURNS[ res: Handle ] =
  BEGIN
  -- on exit, 'objectHead' is valid --
  LogDefs.WriteChar['<];
  IF freeChain = noWriter
  THEN BEGIN
       res _ Allocate[SIZE[HeapXDefs.WriterData]];
       Process.InitializeMonitor[@(res.LOCK)];
       END
  ELSE BEGIN
       res _ freeChain;
       freeChain _ freeChain.next;
       END;
  res.next _ writerChain;
  writerChain _ res;
  res.wPos _ HeapFileDefs.NewWriterPage[];
  res.start _ res.wPos.page;
  res.reWriting _ FALSE;
  res.object _ ObjectDirXDefs.NewObject[res.wPos, type];
  res.offset _ HeapDefs.objectStart;
  SetHeader[res];
  END;

-- HeapEndWrite: finish the object and hand its number to the client. --
--   handle: the writer; it is recycled by SubEndWrite and must not be used --
--           afterwards, which is why the object number is copied first. --
--   action: client procedure called with the object number, outside the --
--           monitor (this procedure is not an ENTRY procedure). --
-- ObjectDirDefs.FreeObject is called on the object after 'action' returns, --
-- and also if 'action' is unwound. --
HeapEndWrite: PUBLIC PROCEDURE[ handle: Handle, action: PROCEDURE[ObjectDirXDefs.ObjectNumber] ] =
  BEGIN
  object: ObjectDirXDefs.ObjectNumber = handle.object;
  SubEndWrite[handle] --must not use "handle" after here--;
  -- now, object is safe on disk --
  -- the following call must be outside the monitor--
  action[object ! UNWIND => ObjectDirDefs.FreeObject[object] ];
  ObjectDirDefs.FreeObject[object];
  END;

-- SubEndWrite: monitor-protected part of HeapEndWrite. --
--   handle: the writer to finish and recycle. --
-- Terminates the object, then commits it.  'limit' is the last page of the --
-- object: max.page when reWriting, otherwise the current page. --
--   * Non-temp object that fits on its starting page: the page is remapped --
--     onto a page from ClaimSinglePage, the directory entry is moved to word 0 --
--     of that page, the page is written and released, the single page is --
--     committed, and the original starting page is reported as abandoned. --
--   * Otherwise: the last page is written and released, and non-temp objects --
--     are committed over the page range start..limit. --
-- Finally the writer is moved from writerChain to freeChain. --
SubEndWrite: ENTRY PROCEDURE[ handle: Handle ] =
  BEGIN
  -- needn't call CheckPage, as last call was WriteData or StartWrite --
  TerminateObject[handle];
  BEGIN
  limit: VMDefs.PageAddress = IF handle.reWriting
                              THEN handle.max.page
                              ELSE handle.wPos.page;
  tempObj: BOOLEAN = handle.object.type = temp;
  IF NOT tempObj AND limit = handle.start
  THEN BEGIN -- short writer optimisation --
       single: VMDefs.PageAddress = HeapFileDefs.ClaimSinglePage[];
       VMDefs.RemapPage[handle.page, single];
       ObjectDirXDefs.MoveObject[handle.object, [page:single,word:0] ];
       --Commit--
       VMDefs.MarkStartWait[handle.page];
       VMDefs.Release[handle.page];
       HeapFileDefs.CommitedSinglePage[];
       HeapFileDefs.ObjectAbandoned[handle.start];
       END
  ELSE BEGIN
       VMDefs.MarkStartWait[handle.page];
       VMDefs.Release[handle.page];
       --Commit--
       IF NOT tempObj
       THEN HeapFileDefs.CommitObject[handle.start, limit]
       -- ELSE leave it unchained and free explicitly when ref count=0--;
       END;
  END;
  RemoveFromChain[handle];
  END;

-- HeapAbandonWrite: discard a partly written object. --
--   handle: the writer to abandon; it is recycled onto freeChain. --
-- Releases the current page without writing it, reports the starting page as --
-- abandoned, recycles the writer, frees the object and, for non-temp objects, --
-- also calls ReleaseObject.  handle.object is still read after the writer has --
-- been put on freeChain; this happens while the monitor lock is held. --
-- NOTE(review): the "ELSE" comment below has no matching IF in the visible --
-- code; ObjectAbandoned is called unconditionally.  Confirm this is intended. --
HeapAbandonWrite: PUBLIC ENTRY PROCEDURE[ handle: Handle ] =
  BEGIN
  VMDefs.Release[handle.page];
  HeapFileDefs.ObjectAbandoned[handle.start]
  -- ELSE ObjectDir will free it when the ref count goes to zero --;
  RemoveFromChain[handle];
  ObjectDirDefs.FreeObject[handle.object];
  IF handle.object.type # temp
  THEN ObjectDirXDefs.ReleaseObject[handle.object];
  END;

-- RemoveFromChain: unlink 'handle' from writerChain and push it on freeChain. --
--   handle: a writer expected to be on writerChain; the search loop has no --
--           end-of-chain test, so it relies on the handle being present. --
-- Logs the character '>' (HeapStartWrite logs the matching '<'). --
RemoveFromChain: INTERNAL PROCEDURE[ handle: Handle ] =
  BEGIN
  BEGIN
  -- remove from 'writerChain' --
  -- prev walks the 'next' fields so the head needs no special case --
  prev: POINTER TO Handle _ @writerChain;
  WHILE prev^ # handle DO prev _ @(prev^.next) ENDLOOP;
  prev^ _ handle.next;
  END;
  LogDefs.WriteChar['>];
  handle.next _ freeChain;
  freeChain _ handle;
  END;


-- writer --

-- Words per page taken by the page header plus one object header. --
pageOverhead: CARDINAL = SIZE[HeapXDefs.PageHeader] + SIZE[HeapXDefs.ObjectHeader];
-- Data words per page, as assumed by the page skipping in SetWriterOffset. --
pageCapacity: CARDINAL = LAST[VMDefs.PageIndex] - pageOverhead;
-- the capacity could be greater, but wPos.word could exceed 256 --

-- HeapWriteData: append (or, when reWriting, overwrite) words of the object. --
--   handle: the writer. --
--   from: buffer descriptor; from.length words starting at from.where are --
--         copied.  A zero length writes nothing. --
-- Each iteration copies as much as fits before LAST[PageIndex], advances --
-- wPos.word and handle.offset, and lets CheckPage move to the next page. --
-- reWriting is switched off once the position reaches handle.max. --
HeapWriteData: PUBLIC ENTRY PROCEDURE[ handle: Handle, from: HeapDefs.Buffer ] =
  BEGIN
  used: CARDINAL _ 0;  -- words of 'from' consumed so far --
  WHILE used < from.length
  DO -- Write, or continue writing, sub-object body --
     BEGIN
     spare: CARDINAL= LAST[VMDefs.PageIndex] - handle.wPos.word;
     -- 'spare' is never < 0 --
     amount: CARDINAL = IF from.length-used > spare
                        THEN spare
                        ELSE from.length-used;
     Inline.COPY[from.where+used, amount, handle.page+handle.wPos.word];
     handle.wPos.word _ handle.wPos.word + amount;
     used _ used + amount;
     handle.offset _ handle.offset + amount;
     END;
     -- leave re-writing mode once the previous end of the object is reached --
     IF handle.reWriting
     AND handle.wPos.page = handle.max.page
     AND handle.wPos.word >= handle.max.word
     THEN handle.reWriting _ FALSE;
     -- Move to new page, if necessary --
     CheckPage[handle, 1];
  ENDLOOP;
  END --HeapWriteData--;

-- HeapWriteString: write the string 's' as SIZE[StringBody[s.length]] words --
-- starting at the address 's'. --
HeapWriteString: PUBLIC PROCEDURE[handle: Handle, s: STRING] =
  { HeapWriteData[handle, [s,SIZE[StringBody[s.length]]] ] };

-- WriteItemHeader: write the SIZE[BodyDefs.ItemHeader] words of 'header'. --
WriteItemHeader: PUBLIC PROCEDURE[handle: Handle, header: BodyDefs.ItemHeader] =
  { HeapWriteData[handle, [@header,SIZE[BodyDefs.ItemHeader]] ] };

-- ReceiveComponent: copy one length-prefixed component from a stream. --
--   handle: the writer; str: the stream to read from. --
-- The count from ProtocolDefs.ReceiveCount is written first as one CARDINAL, --
-- then that many words are read in pieces of at most bLength words and written. --
-- Raises ProtocolDefs.Failed[communicationError] on PupStream.StreamClosing, --
-- Failed[noData] on Stream.TimeOut, and Failed[protocolError] when GetBlock --
-- completes with any code other than 'normal'. --
ReceiveComponent: PUBLIC PROCEDURE[handle: Handle, str: Stream.Handle ] =
  BEGIN
  length: CARDINAL _ ProtocolDefs.ReceiveCount[str];  -- words still to read --
  bLength: CARDINAL = 64;
  buffer: ARRAY [0..bLength) OF WORD;
  HeapWriteData[handle, [@length,SIZE[CARDINAL]] ];
  WHILE length > 0
  DO why: Stream.CompletionCode;
     wanted: CARDINAL = MIN[length, bLength];
     -- the block limit is wanted*2; presumably a byte count, two per word --
     [,why,] _ --note: the component is an integral number of words--
       Stream.GetBlock[str, [@buffer,0,wanted*2] !
         PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError];
         Stream.TimeOut => ERROR ProtocolDefs.Failed[noData] ];
     IF why # normal THEN ERROR ProtocolDefs.Failed[protocolError];
     HeapWriteData[handle, [@buffer, wanted] ];
     length _ length - wanted;
  ENDLOOP;
  END;

-- ReceiveObj: copy stream data into the object until the stream reports an --
-- sstChange completion code. --
--   handle: the writer; str: the stream to read from. --
-- A page obtained from VMDefs.AllocatePage serves as the buffer; it is released --
-- on normal exit and also if the loop body is unwound.  Each block read --
-- contributes used/2 words. --
-- Raises ProtocolDefs.Failed[communicationError] on PupStream.StreamClosing --
-- and Failed[noData] on Stream.TimeOut. --
-- NOTE(review): 'sst' is assigned but never read, and the loop exits only on --
-- sstChange; confirm no other completion code needs handling here. --
ReceiveObj: PUBLIC PROCEDURE[handle: Handle, str: Stream.Handle ] =
  BEGIN
  buffer: VMDefs.Page = VMDefs.AllocatePage[];
  DO BEGIN
     ENABLE UNWIND => VMDefs.Release[buffer];
     used: CARDINAL;
     why: Stream.CompletionCode;
     sst: Stream.SubSequenceType;
     [used,why,sst] _ Stream.GetBlock[str, [buffer,0,VMDefs.pageSize*2] !
       PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError];
       Stream.TimeOut => ERROR ProtocolDefs.Failed[noData] ];
     HeapWriteData[handle, [buffer,used/2] ];
     IF why = sstChange THEN EXIT;
     END;
  ENDLOOP;
  VMDefs.Release[buffer];
  END;

-- GetWriterOffset: return the writer's current offset within the object. --
GetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle ] RETURNS[ HeapDefs.ObjectOffset ] =
  BEGIN
  RETURN[handle.offset];
  END;

-- Raised by SetWriterOffset when the requested offset lies beyond handle.max. --
OffsetTooBig: ERROR = CODE;
-- Raised by SetWriterOffset when the approximate skip lands on a page whose --
-- recorded offset is already greater than the requested one. --
MissedOffset: ERROR = CODE;

-- SetWriterOffset: reposition the writer at 'offset' so that following writes --
-- overwrite existing data. --
--   handle: the writer; offset: target position within the object. --
-- Steps: terminate the object (a no-op if already reWriting); on first entry --
-- to re-writing mode remember the current end in handle.max; write and --
-- release the current page; restart from the first word of handle.start. --
-- Pages are then skipped assuming pageCapacity data words each, the true --
-- offset is read back from the page header by SetHeader, and further pages are --
-- skipped using the recorded sub-object sizes.  Finally wPos.word is advanced --
-- by the distance remaining within the page. --
-- Raises OffsetTooBig or MissedOffset as described at their declarations. --
-- NOTE(review): these ERRORs are raised inside an ENTRY procedure with no --
-- UNWIND handler visible here; confirm whether leaving the monitor locked is --
-- the intended fatal behaviour. --
SetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle, offset: HeapDefs.ObjectOffset ] =
  BEGIN
  TerminateObject[handle];
  IF NOT handle.reWriting THEN handle.max _ handle.wPos;
  handle.reWriting _ TRUE;
  VMDefs.MarkStartWait[handle.page];
  VMDefs.Release[handle.page];
  handle.wPos _ [page:handle.start, word:FIRST[VMDefs.PageIndex] ];
  handle.offset _ HeapDefs.objectStart;
  -- skip to approximate page --
  WHILE offset >= handle.offset + pageCapacity
  DO handle.offset _ handle.offset + pageCapacity;
     IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
     handle.wPos _ HeapFileDefs.NextPage[handle.wPos];
  ENDLOOP;
  SetHeader[handle]; -- picks up real handle.offset --
  IF handle.offset > offset THEN ERROR MissedOffset;
  -- skip to correct page --
  WHILE offset >= handle.offset + handle.objectHead.size
    AND pageOverhead + handle.objectHead.size + SIZE[HeapXDefs.ObjectHeader] > LAST[VMDefs.PageIndex]
    -- i.e. while not in this page and not end of object --
  DO VMDefs.Release[handle.page];
     IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
     handle.wPos _ HeapFileDefs.NextPage[handle.wPos];
     SetHeader[handle];
  ENDLOOP;
  -- step forward within the page by the remaining distance to 'offset' --
  handle.wPos.word _ handle.wPos.word + Inline.LowHalf[offset - handle.offset];
  handle.offset _ offset;
  IF handle.wPos.page = handle.max.page AND handle.wPos.word > handle.max.word
  THEN ERROR OffsetTooBig[];
  END;


END.