-- Transport Mechanism Filestore - writer for heap objects --

-- [Juniper]<Grapevine>MS>Writer.mesa

-- Randy Gobbel		19-May-81 12:53:09 --
-- Andrew Birrell	12-Jun-81 11:32:29 --
-- M. D. Schroeder       7-Feb-83 15:43:13 --

DIRECTORY
BodyDefs	USING[ ItemHeader ],
HeapDefs	USING[ Buffer, ObjectOffset, objectStart ],
HeapFileDefs	USING[ ClaimSinglePage, CommitedSinglePage, CommitObject,
		       NextPage, NewWriterPage, NextWriterPage,
		       ObjectAbandoned ],
HeapXDefs	USING[ ObjectHeader, PageHeader, WriterData ],
Inline		USING[ COPY, LowHalf ],
LogDefs		USING[ WriteChar ],
ObjectDirDefs	USING[ FreeObject, ObjectType ],
ObjectDirXDefs	USING[ gapObjectNumber, MoveObject, NewObject,
		       ObjectNumber, ReleaseObject ],
Process		USING[ InitializeMonitor ],
ProtocolDefs	USING[ Failed, ReceiveCount ],
PupStream	USING[ StreamClosing ],
Storage		USING[ Node ],
Stream		USING[ CompletionCode, GetBlock, Handle, SubSequenceType,
		       TimeOut ],
VMDefs		USING[ AllocatePage, PageAddress, PageIndex, pageSize,
                           Page, ReadPage,
                           RemapPage, MarkStartWait, UsePage, Release ];

Writer: MONITOR
   IMPORTS HeapFileDefs, Inline, LogDefs, ObjectDirDefs,
           ObjectDirXDefs,
           Process, ProtocolDefs, PupStream, Stream, Storage, VMDefs
   EXPORTS HeapDefs =

BEGIN

-- Concrete form of the writer state; made PUBLIC so that it is exported --
-- (this module EXPORTS HeapDefs) --
WriterData: PUBLIC TYPE = HeapXDefs.WriterData;
-- Every procedure below refers to a writer through a pointer to its state --
Handle: TYPE = POINTER TO WriterData;


-- Allocator for new writer state records: simply Storage.Node under a --
-- local name. HeapStartWrite calls it when 'freeChain' is empty --
Allocate: PROCEDURE[CARDINAL] RETURNS[POINTER] = Storage.Node;

-- subroutines --

SetHeader: PROCEDURE[ handle: Handle ] =
   BEGIN
   -- Make 'handle' ready to accept data at its current write position. --
   -- At the first word of a page the page is obtained and its page --
   -- header is written (new object) or read back (re-writing). In all --
   -- cases a sub-object header is then stepped over, and 'objectHead' --
   -- and 'base' are left describing it. --
   IF handle.wPos.word = FIRST[VMDefs.PageIndex]
   THEN BEGIN
        pageHead: POINTER TO HeapXDefs.PageHeader;
        -- ReadPage when re-writing, UsePage for a page being written anew --
        IF handle.reWriting
        THEN handle.page ← VMDefs.ReadPage[handle.wPos.page, 0]
        ELSE handle.page ← VMDefs.UsePage[handle.wPos.page];
        pageHead ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
        -- the page header carries the object offset at which the page starts --
        IF handle.reWriting
        THEN handle.offset ← pageHead.offset
        ELSE pageHead.offset ← handle.offset;
        handle.wPos.word ← handle.wPos.word + SIZE[HeapXDefs.PageHeader];
        END;
   -- Sub-object header: remember where it is, then move past it --
   handle.objectHead ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
   handle.wPos.word ← handle.wPos.word + SIZE[HeapXDefs.ObjectHeader];
   handle.base ← handle.wPos.word;
   -- only a new object gets its header filled in; re-writing keeps the old one --
   IF NOT handle.reWriting
   THEN BEGIN
        handle.objectHead.number ← handle.object;
        handle.objectHead.size ← 0;
        END;
   END;


-- Raised by CheckPage when the offset read back from the next page's --
-- header differs from the offset the writer had reached --
BadOffsetFound: ERROR = CODE;

CheckPage: PROCEDURE[ handle: Handle, min: CARDINAL ] =
   BEGIN
   -- Ensure there is room for 'min' more words in the current page. --
   -- If not: close the current sub-object, write the page out and --
   -- release it, advance to the following page and set up its headers. --
   expected: HeapDefs.ObjectOffset;
   IF handle.wPos.word + min <= LAST[VMDefs.PageIndex] THEN RETURN;
   -- record how much of this page the sub-object occupied --
   handle.objectHead.size ← handle.wPos.word - handle.base;
   VMDefs.MarkStartWait[handle.page];
   VMDefs.Release[handle.page];
   -- a re-write follows the pages already present; a new write takes a --
   -- further writer page --
   IF handle.reWriting
   THEN handle.wPos ← HeapFileDefs.NextPage[handle.wPos]
   ELSE handle.wPos ← HeapFileDefs.NextWriterPage[handle.wPos];
   -- SetHeader replaces handle.offset when re-writing, so verify it --
   expected ← handle.offset;
   SetHeader[handle];
   IF handle.offset # expected THEN ERROR BadOffsetFound[];
   END;

TerminateObject: PROCEDURE[ handle: Handle ] =
   BEGIN
   -- Finish the object being written: fix the size of its last --
   -- sub-object and fill the rest of the page with a "gap" object. --
   -- Does nothing while re-writing. --
   -- note that no non-gap object may end on a page boundary --
   gap: POINTER TO HeapXDefs.ObjectHeader;
   IF handle.reWriting THEN RETURN;
   handle.objectHead.size ← handle.wPos.word - handle.base;
   -- may move to a fresh page if no gap header fits in this one --
   CheckPage[handle, SIZE[HeapXDefs.ObjectHeader] ];
   gap ← LOOPHOLE[handle.page,POINTER] + handle.wPos.word;
   gap.number ← ObjectDirXDefs.gapObjectNumber;
   -- the gap body runs from just after its header up to LAST[PageIndex] --
   gap.size ← LAST[VMDefs.PageIndex] -
      (handle.wPos.word+SIZE[HeapXDefs.ObjectHeader]);
   END;


-- writer allocation --

-- Null handle; marks the end of both chains declared below --
noWriter: Handle = NIL;

-- 'writerChain' links, through 'next', the writers currently in use. --
-- 'freeChain' links the state records of finished writers, which --
-- HeapStartWrite re-uses in preference to allocating new ones --
writerChain, freeChain: Handle ← noWriter;

HeapStartWrite: PUBLIC ENTRY PROCEDURE[ type: ObjectDirDefs.ObjectType ]
                               RETURNS[ res: Handle ] =
   BEGIN
   -- Begin writing a new object of the given type and return its writer. --
   -- on exit, 'objectHead' is valid --
   LogDefs.WriteChar['<];
   -- obtain a state record: recycle one if possible, else allocate --
   IF freeChain # noWriter
   THEN BEGIN
        res ← freeChain;
        freeChain ← res.next;
        END
   ELSE BEGIN
        res ← Allocate[SIZE[HeapXDefs.WriterData]];
        Process.InitializeMonitor[@(res.LOCK)];
        END;
   -- push it on the chain of writers in use --
   res.next ← writerChain;
   writerChain ← res;
   -- the object starts on a writer page of its own --
   res.wPos ← HeapFileDefs.NewWriterPage[];
   res.start ← res.wPos.page;
   res.reWriting ← FALSE;
   res.object ← ObjectDirXDefs.NewObject[res.wPos, type];
   res.offset ← HeapDefs.objectStart;
   -- write the page header and the first sub-object header --
   SetHeader[res];
   END;

HeapEndWrite: PUBLIC PROCEDURE[
                            handle: Handle,
                            action: PROCEDURE[ObjectDirXDefs.ObjectNumber] ] =
   BEGIN
   -- Complete the object, hand its number to 'action', then give up the --
   -- writer's own reference to it (also if 'action' is unwound). --
   written: ObjectDirXDefs.ObjectNumber = handle.object;
   -- must not use "handle" after this call: its record is recycled --
   SubEndWrite[handle];
   -- now, object is safe on disk --
   -- the following call must be outside the monitor --
   BEGIN
      ENABLE UNWIND => ObjectDirDefs.FreeObject[written];
      action[written];
   END;
   ObjectDirDefs.FreeObject[written];
   END;

SubEndWrite: ENTRY PROCEDURE[ handle: Handle ] =
   BEGIN
   -- Monitor-protected part of HeapEndWrite: pad the object, force its --
   -- last page to disk, commit it, and recycle the writer record. --
   -- needn't call CheckPage, as last call was WriteData or StartWrite --
   lastPage: VMDefs.PageAddress;
   isTemp: BOOLEAN;

   TerminateObject[handle];

   -- both values must be taken after TerminateObject, which can move wPos --
   isTemp ← (handle.object.type = temp);
   IF handle.reWriting
   THEN lastPage ← handle.max.page
   ELSE lastPage ← handle.wPos.page;

   IF isTemp OR lastPage # handle.start
   THEN BEGIN
        -- general case: write the final page where it is --
        VMDefs.MarkStartWait[handle.page];
        VMDefs.Release[handle.page];
        -- Commit: a temp object is instead left unchained, to be freed --
        -- explicitly when its ref count reaches 0 --
        IF NOT isTemp
        THEN HeapFileDefs.CommitObject[handle.start, lastPage];
        END
   ELSE BEGIN
        -- short writer optimisation: a non-temp object lying entirely in --
        -- its first page is moved to a claimed single page --
        onePage: VMDefs.PageAddress = HeapFileDefs.ClaimSinglePage[];
        VMDefs.RemapPage[handle.page, onePage];
        ObjectDirXDefs.MoveObject[handle.object, [page:onePage,word:0] ];
        -- Commit: the write of the remapped page --
        VMDefs.MarkStartWait[handle.page];
        VMDefs.Release[handle.page];
        HeapFileDefs.CommitedSinglePage[];
        -- the page originally used for writing is no longer needed --
        HeapFileDefs.ObjectAbandoned[handle.start];
        END;

   RemoveFromChain[handle];
   END;

HeapAbandonWrite: PUBLIC ENTRY PROCEDURE[ handle: Handle ] =
   BEGIN
   -- Give up an object that is still being written and recycle its --
   -- writer record. 'handle' must not be used after this call. --
   --
   -- The pages of a non-temp object are handed back to the heap file --
   -- here. The pages of a temp object are NOT: as the comment below and --
   -- the matching code in SubEndWrite indicate, ObjectDir releases them --
   -- itself once the reference count reaches zero, which the FreeObject --
   -- call below brings about. Abandoning them here as well would --
   -- release the same pages twice. --
   -- NOTE(review): this relies on ObjectDir behaving as that comment --
   -- describes; ObjectDir is not visible from this module. Confirm. --
   object: ObjectDirXDefs.ObjectNumber = handle.object;
   permanent: BOOLEAN = object.type # temp;
   VMDefs.Release[handle.page];
   IF permanent
   THEN HeapFileDefs.ObjectAbandoned[handle.start]
-- ELSE ObjectDir will free it when the ref count goes to zero --;
   RemoveFromChain[handle];
   -- drop the writer's own reference to the object --
   ObjectDirDefs.FreeObject[object];
   -- a non-temp object was never committed, so it is released as well --
   IF permanent
   THEN ObjectDirXDefs.ReleaseObject[object];
   END;

RemoveFromChain: INTERNAL PROCEDURE[ handle: Handle ] =
   BEGIN
   -- Unlink 'handle' from 'writerChain' and push it onto 'freeChain'. --
   -- 'handle' must be on 'writerChain': the search has no end test. --
   link: POINTER TO Handle ← @writerChain;
   -- 'link' addresses the chain cell that refers to the current writer --
   UNTIL link↑ = handle DO link ← @(link↑.next) ENDLOOP;
   link↑ ← handle.next;
   LogDefs.WriteChar['>];
   -- keep the record for re-use by HeapStartWrite --
   handle.next ← freeChain;
   freeChain ← handle;
   END;


-- writer --

-- Words taken at the front of every page before the first data word: --
-- the page header followed by one sub-object header (see SetHeader) --
pageOverhead: CARDINAL = SIZE[HeapXDefs.PageHeader] + SIZE[HeapXDefs.ObjectHeader];
-- Most data words a single page can contribute to an object: --
-- HeapWriteData fills a page only as far as word LAST[PageIndex]. --
-- SetWriterOffset uses this bound to skip whole pages. --
pageCapacity: CARDINAL = LAST[VMDefs.PageIndex] - pageOverhead;
-- the capacity could be greater, but wPos.word could exceed 256 --
-- NOTE(review): the 256 above presumably is VMDefs.pageSize; confirm --

HeapWriteData: PUBLIC ENTRY PROCEDURE[ handle: Handle,
                                       from: HeapDefs.Buffer ] =
   BEGIN
   -- Append the 'from.length' words at 'from.where' to the object, --
   -- continuing onto further pages whenever the current one fills up. --
   done: CARDINAL ← 0;

   UNTIL done >= from.length
   DO
      -- Copy as much as fits in what is left of the current page --
      BEGIN
         room: CARDINAL = LAST[VMDefs.PageIndex] - handle.wPos.word;
         -- 'room' is never < 0 --
         left: CARDINAL = from.length - done;
         chunk: CARDINAL = IF left <= room THEN left ELSE room;
         Inline.COPY[from.where+done, chunk,
                         handle.page+handle.wPos.word];
         handle.offset ← handle.offset + chunk;
         handle.wPos.word ← handle.wPos.word + chunk;
         done ← done + chunk;
      END;

      -- A re-write that has reached the old end of the object turns --
      -- into an ordinary write from here on --
      IF handle.reWriting
      THEN BEGIN
           IF handle.wPos.page = handle.max.page
           AND handle.wPos.word >= handle.max.word
           THEN handle.reWriting ← FALSE;
           END;

      -- Move to new page, if necessary --
      CheckPage[handle, 1];

   ENDLOOP;
   END --HeapWriteData--;

HeapWriteString: PUBLIC PROCEDURE[handle: Handle, s: STRING] =
   BEGIN
   -- Write the whole body of 's': a StringBody sized for its current --
   -- length, not just its characters --
   words: CARDINAL = SIZE[StringBody[s.length]];
   HeapWriteData[handle, [s,words] ];
   END;

WriteItemHeader: PUBLIC PROCEDURE[handle: Handle,
                                  header: BodyDefs.ItemHeader] =
   BEGIN
   -- Write the words of the item header (taken from the local copy --
   -- held in the parameter) to the object --
   words: CARDINAL = SIZE[BodyDefs.ItemHeader];
   HeapWriteData[handle, [@header,words] ];
   END;

ReceiveComponent: PUBLIC PROCEDURE[handle: Handle,
                                   str: Stream.Handle ] =
   BEGIN
   -- Copy one counted component from stream 'str' into the object: --
   -- first its word count, then that many words, 64 at a time. --
   -- Stream failures are reported as ProtocolDefs.Failed. --
   chunkWords: CARDINAL = 64;
   remaining: CARDINAL ← ProtocolDefs.ReceiveCount[str];
   chunk: ARRAY [0..chunkWords) OF WORD;
   -- the count itself is stored in front of the data --
   HeapWriteData[handle, [@remaining,SIZE[CARDINAL]] ];
   UNTIL remaining = 0
   DO status: Stream.CompletionCode;
      take:   CARDINAL = MIN[remaining, chunkWords];
      -- note: the component is an integral number of words; --
      -- GetBlock is given a byte count, hence the doubling --
      [,status,] ←
         Stream.GetBlock[str, [@chunk,0,take*2] !
                PupStream.StreamClosing =>
                   ERROR ProtocolDefs.Failed[communicationError];
                Stream.TimeOut =>
                   ERROR ProtocolDefs.Failed[noData]  ];
      IF status # normal THEN ERROR ProtocolDefs.Failed[protocolError];
      HeapWriteData[handle, [@chunk, take] ];
      remaining ← remaining - take;
   ENDLOOP;
   END;

ReceiveObj: PUBLIC PROCEDURE[handle: Handle,
                             str: Stream.Handle ] =
   BEGIN
   -- Copy data from stream 'str' into the object, a page-sized block at --
   -- a time, until a block completes with 'sstChange'. The scratch --
   -- page is released on normal exit and on UNWIND. --
   scratch: VMDefs.Page = VMDefs.AllocatePage[];
   finished: BOOLEAN ← FALSE;
   UNTIL finished
   DO BEGIN
      ENABLE UNWIND => VMDefs.Release[scratch];
      bytes:  CARDINAL;
      status: Stream.CompletionCode;
      subSeq: Stream.SubSequenceType;
      [bytes,status,subSeq] ←
         Stream.GetBlock[str, [scratch,0,VMDefs.pageSize*2] !
            PupStream.StreamClosing =>
               ERROR ProtocolDefs.Failed[communicationError];
            Stream.TimeOut =>
               ERROR ProtocolDefs.Failed[noData]  ];
      -- GetBlock counts bytes, HeapWriteData counts words --
      HeapWriteData[handle, [scratch,bytes/2] ];
      finished ← (status = sstChange);
      END;
   ENDLOOP;
   VMDefs.Release[scratch];
   END;

GetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle ]
                                RETURNS[ HeapDefs.ObjectOffset ] =
   BEGIN
   -- Report the object offset at which the next word will be written --
   current: HeapDefs.ObjectOffset = handle.offset;
   RETURN[current];
   END;

-- Errors raised by SetWriterOffset (below): --
-- OffsetTooBig: the requested offset lies beyond the end of the object --
-- MissedOffset: the page reached starts after the requested offset --
OffsetTooBig: ERROR = CODE;
MissedOffset: ERROR = CODE;

SetWriterOffset: PUBLIC ENTRY PROCEDURE[ handle: Handle,
                                         offset: HeapDefs.ObjectOffset ] =
   BEGIN
   -- Reposition the writer at 'offset' within the object and put it in --
   -- re-writing mode, so that following writes overwrite existing data --
   -- until they pass the old end of the object ('handle.max'). --
   -- NOTE(review): the ERRORs below are raised from inside an ENTRY --
   -- procedure, some with a page still held; presumably they are --
   -- treated as fatal. Confirm with the callers. --

   -- Close off what has been written so far (no effect if re-writing) --
   TerminateObject[handle];
   -- The first repositioning records where the object currently ends --
   IF NOT handle.reWriting THEN handle.max ← handle.wPos;
   handle.reWriting ← TRUE;
   -- Write out and let go of the page that was being filled --
   VMDefs.MarkStartWait[handle.page];
   VMDefs.Release[handle.page];

   -- Restart the search from the first page of the object --
   handle.wPos ← [page:handle.start, word:FIRST[VMDefs.PageIndex] ];
   handle.offset ← HeapDefs.objectStart;

   -- skip to approximate page --
   -- no page is read here: a page holds at most 'pageCapacity' words, so --
   -- the target cannot be in a page while it is still that far ahead --
   WHILE offset >= handle.offset + pageCapacity
   DO handle.offset ← handle.offset + pageCapacity;
      IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
      handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
   ENDLOOP;

   SetHeader[handle]; -- picks up real handle.offset --
   -- the estimate above must never carry us past the target --
   IF handle.offset > offset THEN ERROR MissedOffset;

   -- skip to correct page --
   -- first test: the target is at or beyond the end of this sub-object; --
   -- second test: the sub-object leaves no room in its page for a gap --
   -- header, so the object must continue on the next page --
   WHILE offset >= handle.offset + handle.objectHead.size
   AND pageOverhead + handle.objectHead.size + SIZE[HeapXDefs.ObjectHeader] 
          > LAST[VMDefs.PageIndex]
   -- i.e. while not in this page and not end of object --
   DO VMDefs.Release[handle.page];
      IF handle.wPos.page = handle.max.page THEN ERROR OffsetTooBig[];
      handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
      SetHeader[handle];
   ENDLOOP;
   
   -- Step forward within the page to the requested word --
   handle.wPos.word ← handle.wPos.word
                         + Inline.LowHalf[offset - handle.offset];
   handle.offset ← offset;
   -- Landing exactly on the old end is allowed; going beyond it is not --
   IF handle.wPos.page = handle.max.page
   AND handle.wPos.word > handle.max.word
   THEN ERROR OffsetTooBig[]; 
   END;

END.