-- Transport Mechanism Filestore - writer for heap objects --

-- [Juniper]<Grapevine>MS>Writer.mesa

-- Randy Gobbel		19-May-81 12:53:09 --
-- Andrew Birrell	12-Jun-81 11:32:29 --
-- Mark Johnson 	11-Nov-81 11:09:31 --
-- Brenda Hankins	22-Oct-84 14:35:11

DIRECTORY
  BodyDefs USING [ItemHeader],
  HeapDefs USING [Buffer, ObjectOffset, objectStart],
  HeapFileDefs USING [
    ClaimSinglePage, CommitedSinglePage, CommitObject, NextPage, NewWriterPage,
    NextWriterPage, ObjectAbandoned, UseNormalPath],
  HeapXDefs USING [ObjectHeader, PageHeader, WriterData],
  Inline USING [COPY, LongDivMod],
  LogDefs USING [WriteChar],
  ObjectDirDefs USING [FreeObject, ObjectType],
  ObjectDirXDefs USING [
    gapObjectNumber, MoveObject, NewObject, ObjectNumber, ReleaseObject],
  Process USING [InitializeMonitor],
  ProtocolDefs USING [Failed, ReceiveCount],
  PupStream USING [StreamClosing],
  Storage USING [Node],
  Stream USING [CompletionCode, GetBlock, Handle, SubSequenceType, TimeOut],
  VMDefs USING [
    AllocatePage, FullAddress, PageAddress, PageIndex, pageSize, Page, ReadPage,
    RemapPage, MarkStartWait, UsePage, Release];

Writer: MONITOR
  IMPORTS
    HeapFileDefs, Inline, LogDefs, ObjectDirDefs, ObjectDirXDefs, Process,
    ProtocolDefs, PupStream, Stream, Storage, VMDefs
  EXPORTS HeapDefs =

  BEGIN

  -- Concrete writer state record, declared PUBLIC here as HeapXDefs.WriterData
  -- (this module EXPORTS HeapDefs).
  WriterData: PUBLIC TYPE = HeapXDefs.WriterData;
  -- Writers are identified by a short pointer to their state record.
  Handle: TYPE = POINTER TO WriterData;

  -- Words at the front of every page occupied by the page header plus the
  -- object (or sub-object) header that InitPage places directly after it.
  pageOverhead: CARDINAL =
    SIZE[HeapXDefs.PageHeader] + SIZE[HeapXDefs.ObjectHeader];
  -- Divisor used to convert an object offset into (page count, word in page);
  -- see SetWriterOffset.
  pageCapacity: CARDINAL = LAST[VMDefs.PageIndex] - pageOverhead;
  -- The capacity could be greater, but wPos.word could exceed 256.

  -- Allocator for new WriterData records; simply an alias for Storage.Node.
  Allocate: PROCEDURE [CARDINAL] RETURNS [POINTER] = Storage.Node;

  -- subroutines --

  -- Raised by InitPage when a page being rewritten does not carry the object
  -- offset the writer expects to find in its page header.
  BadOffsetFound: ERROR = CODE;

  -- InitPage (used to be SetHeader): map the page named by handle.wPos.page
  -- and position the writer just after the page and object headers.
  --   rewriting:     the page is read back and its header offset is checked
  --                  against handle.offset (ERROR BadOffsetFound on mismatch);
  --                  the existing object header is left as it is.
  --   not rewriting: the page is taken fresh, the header offset is stamped,
  --                  and a new (sub-)object header of size 0 is started.
  -- On return handle.page, handle.objectHead and handle.wPos.word are valid.
  InitPage: PROCEDURE [handle: Handle] =
    BEGIN
    pageHead: POINTER TO HeapXDefs.PageHeader;
    IF handle.reWriting THEN
      BEGIN
      handle.page ← VMDefs.ReadPage[handle.wPos.page, 0];
      pageHead ← LOOPHOLE[handle.page, POINTER];
      IF handle.offset # pageHead.offset THEN ERROR BadOffsetFound[];
      END
    ELSE
      BEGIN
      handle.page ← VMDefs.UsePage[handle.wPos.page];
      pageHead ← LOOPHOLE[handle.page, POINTER];
      pageHead.offset ← handle.offset;
      END;
    -- the object (or sub-object) header sits directly after the page header
    handle.objectHead ←
      LOOPHOLE[handle.page, POINTER] + SIZE[HeapXDefs.PageHeader];
    handle.wPos.word ← pageOverhead;
    IF ~handle.reWriting THEN
      BEGIN
      handle.objectHead.number ← handle.object;
      handle.objectHead.size ← 0;
      END;
    END;

  -- CheckPage: make sure at least "min" words can still be placed on the
  -- current page. If they cannot, the current page is finished (its
  -- sub-object size is recorded unless rewriting), handed to
  -- VMDefs.MarkStartWait and released, and the writer moves to the next page:
  -- the existing successor when rewriting, a newly obtained writer page
  -- otherwise.
  CheckPage: PROCEDURE [handle: Handle, min: CARDINAL] =
    BEGIN
    -- enough room left: nothing to do
    IF handle.wPos.word + min <= LAST[VMDefs.PageIndex] THEN RETURN;
    IF ~handle.reWriting THEN
      handle.objectHead.size ← handle.wPos.word - pageOverhead;
    VMDefs.MarkStartWait[handle.page];
    VMDefs.Release[handle.page];
    handle.wPos ←
      IF handle.reWriting THEN HeapFileDefs.NextPage[handle.wPos]
      ELSE HeapFileDefs.NextWriterPage[handle.wPos];
    InitPage[handle];
    END;

  -- writer allocation --

  -- Sentinel meaning "no writer"; both chains below end with it.
  noWriter: Handle = NIL;
  -- writerChain links the handles of writers currently in progress;
  -- freeChain links WriterData records that HeapStartWrite may reuse.
  -- Both are linked through the "next" field of WriterData.
  writerChain, freeChain: Handle ← noWriter;
  -- Incremented by HeapStartWrite; decremented by HeapEndWrite and
  -- HeapAbandonWrite.
  writerCount: LONG CARDINAL ← 0;

  -- HeapStartWrite: begin writing a new heap object of the given type.
  --   type: object type passed to ObjectDirXDefs.NewObject.
  --   res:  handle for the new writer. It is taken from freeChain when one is
  --         available, otherwise freshly allocated (and its LOCK initialized).
  -- The handle is linked onto writerChain, given a new writer page, and
  -- positioned at HeapDefs.objectStart; on exit "objectHead" is valid.
  -- Logs the character '< and increments writerCount.
  HeapStartWrite: PUBLIC ENTRY PROCEDURE [type: ObjectDirDefs.ObjectType]
    RETURNS [res: Handle] =
    BEGIN
    LogDefs.WriteChar['<];
    IF freeChain # noWriter THEN
      BEGIN  -- recycle a previously used writer record
      res ← freeChain;
      freeChain ← res.next;
      END
    ELSE
      BEGIN  -- none spare: make a new one
      res ← Allocate[SIZE[HeapXDefs.WriterData]];
      Process.InitializeMonitor[@(res.LOCK)];
      END;
    -- push onto the chain of active writers
    res.next ← writerChain;
    writerChain ← res;
    -- first page of the object
    res.wPos ← HeapFileDefs.NewWriterPage[];
    res.start ← res.wPos.page;
    res.reWriting ← FALSE;
    res.object ← ObjectDirXDefs.NewObject[res.wPos, type];
    res.maxOffset ← HeapDefs.objectStart;
    res.offset ← HeapDefs.objectStart;
    InitPage[res];
    writerCount ← writerCount + 1;
    END;

  -- HeapEndWrite: complete the object written through "handle" and then pass
  -- its object number to "action".
  --   handle: the writer; SubEndWrite puts it back on the free chain, so it
  --           must not be touched after that call.
  --   action: client procedure given the finished object's number. It is
  --           called outside the monitor.
  -- Whether "action" returns normally or is unwound, the object is passed to
  -- ObjectDirDefs.FreeObject and writerCount is decremented.
  HeapEndWrite: PUBLIC PROCEDURE [
    handle: Handle, action: PROCEDURE [ObjectDirXDefs.ObjectNumber]] =
    BEGIN
    -- copy the object number first: the handle is recycled by SubEndWrite
    written: ObjectDirXDefs.ObjectNumber = handle.object;
    WriterFinished: ENTRY PROC = INLINE
      BEGIN writerCount ← writerCount - 1; END;
    Cleanup: PROCEDURE =
      BEGIN
      ObjectDirDefs.FreeObject[written];
      WriterFinished[];
      END;
    SubEndWrite[handle];
    -- now, object is safe on disk; this call must be outside the monitor
    action[written ! UNWIND => Cleanup[]];
    Cleanup[];
    END;

  -- SubEndWrite: finish the object being written through "handle".
  -- Steps, in order:
  --   1. if the writer is positioned inside already written data, move it
  --      back to the end of the object (SetWriterOffset to maxOffset);
  --   2. record the size of the last sub-object and append a gap object
  --      covering the rest of the final page;
  --   3. under the monitor (MonitoredOperations): write and release the page,
  --      commit the object's pages, and return the handle to the free chain.
  -- Called by HeapEndWrite. The handle must not be used after this returns.
  SubEndWrite: PROCEDURE [handle: Handle] =
    BEGIN
    -- set TRUE once the single page path below has committed the object
    committed: BOOLEAN ← FALSE;
    tempObj: BOOLEAN = handle.object.type = temp;
    -- Declared here, invoked as the last statement of SubEndWrite.
    MonitoredOperations: ENTRY PROCEDURE =
      BEGIN  -- don't see why these (except last line) need be monitored, but JUST to be safe...
      -- The object never left its first page and is not a temp object:
      -- try to move it onto a page obtained from ClaimSinglePage.
      IF NOT tempObj AND handle.wPos.page = handle.start THEN
        BEGIN
        -- short writer optimization --
        -- UseNormalPath from ClaimSinglePage abandons this attempt and falls
        -- through to the normal commit below (committed stays FALSE).
        single: VMDefs.PageAddress = HeapFileDefs.ClaimSinglePage[
          ! HeapFileDefs.UseNormalPath => GOTO usePageWeHave];
        VMDefs.RemapPage[handle.page, single];
        ObjectDirXDefs.MoveObject[handle.object, [page: single, word: 0]];
        --Commit-- VMDefs.MarkStartWait[handle.page];
        VMDefs.Release[handle.page];
        HeapFileDefs.CommitedSinglePage[];
        -- the page originally obtained by HeapStartWrite is no longer used
        HeapFileDefs.ObjectAbandoned[handle.start];
        committed ← TRUE;
        EXITS usePageWeHave => NULL;
        END;
      IF ~committed THEN
        BEGIN  -- not a single page or single page failed.
        VMDefs.MarkStartWait[handle.page];
        VMDefs.Release[handle.page];
        --Commit--
        IF NOT tempObj THEN
          HeapFileDefs.CommitObject[handle.start, handle.wPos.page]
        -- ELSE leave it unchained and free explicitly when ref count=0-- ;
        END;
      -- unlink from writerChain and put the record on freeChain
      RemoveFromChain[handle];
      END;  -- proc. MonitoredOperations
    -- needn't call CheckPage, as last call was WriteData or StartWrite (?)--
    -- Pad last page with "gap" object (was done once if rewriting):
    -- Note that no non-gap object may end on a page boundary.
    IF handle.reWriting THEN SetWriterOffset[handle, handle.maxOffset];
    -- note that previous SetWriterOffset wrote a gap object there but guess
    -- that various fields of writer are not set up to take that into account.
    -- could set it up properly ourselves.
    handle.objectHead.size ← handle.wPos.word - pageOverhead;
    -- moves to a further page if an object header no longer fits on this one
    CheckPage[handle, SIZE[HeapXDefs.ObjectHeader]];
    BEGIN
    -- gap object header at the current write position; its size is the
    -- number of words left on the page after that header
    header: POINTER TO HeapXDefs.ObjectHeader =
      LOOPHOLE[handle.page, POINTER] + handle.wPos.word;
    header.number ← ObjectDirXDefs.gapObjectNumber;
    header.size ←
      LAST[VMDefs.PageIndex] - (handle.wPos.word + SIZE[HeapXDefs.ObjectHeader]);
    END;
    MonitoredOperations[];
    END;  -- proc. SubEndWrite

  -- HeapAbandonWrite: give up on the object being written through "handle".
  -- The current page is released without being written, the handle goes back
  -- to the free chain, the object is passed to ObjectDirDefs.FreeObject, and
  -- writerCount is decremented. For objects that are not of type temp the
  -- pages are also reported via HeapFileDefs.ObjectAbandoned and the object is
  -- passed to ObjectDirXDefs.ReleaseObject.
  HeapAbandonWrite: PUBLIC ENTRY PROCEDURE [handle: Handle] =
    BEGIN
    object: ObjectDirXDefs.ObjectNumber = handle.object;
    permanent: BOOLEAN = object.type # temp;
    VMDefs.Release[handle.page];
    -- for a temp object, ObjectDir will free it when the ref count goes to zero
    IF permanent THEN HeapFileDefs.ObjectAbandoned[handle.start];
    RemoveFromChain[handle];
    ObjectDirDefs.FreeObject[object];
    -- (original author's remark: don't see what this latter does.)
    IF permanent THEN ObjectDirXDefs.ReleaseObject[object];
    writerCount ← writerCount - 1;
    END;

  -- RemoveFromChain: unlink "handle" from writerChain and push it onto
  -- freeChain. Logs the character '>. INTERNAL: the caller must already hold
  -- the monitor lock. The handle is expected to be on writerChain; the search
  -- does not test for the end of the chain.
  RemoveFromChain: INTERNAL PROCEDURE [handle: Handle] =
    BEGIN
    -- "link" addresses whichever pointer currently refers to the candidate
    link: POINTER TO Handle ← @writerChain;
    UNTIL link↑ = handle DO link ← @(link↑.next) ENDLOOP;
    link↑ ← handle.next;
    LogDefs.WriteChar['>];
    -- recycle the record
    handle.next ← freeChain;
    freeChain ← handle;
    END;


  -- writer --

  -- HeapWriteData: copy from.length words starting at from.where into the
  -- object at the writer's current position, continuing onto further pages
  -- as each one fills.
  --   handle: the writer; its page position and object offset advance by the
  --           number of words copied.
  --   from:   source buffer (where, length).
  -- While rewriting, reaching or passing maxOffset switches the writer back
  -- to normal writing; when not rewriting, maxOffset follows the offset.
  HeapWriteData: PUBLIC ENTRY PROCEDURE [handle: Handle, from: HeapDefs.Buffer] =
    BEGIN
    done: CARDINAL ← 0;
    UNTIL done >= from.length DO
      -- words that still fit on this page
      room: CARDINAL = LAST[VMDefs.PageIndex] - handle.wPos.word;
      chunk: CARDINAL = MIN[from.length - done, room];
      Inline.COPY[from.where + done, chunk, handle.page + handle.wPos.word];
      done ← done + chunk;
      handle.offset ← handle.offset + chunk;
      handle.wPos.word ← handle.wPos.word + chunk;
      -- start a new page if this one is now full; the offset must already be
      -- advanced because InitPage uses it for the new page's header
      CheckPage[handle, 1];
      IF handle.offset >= handle.maxOffset THEN handle.reWriting ← FALSE;
      IF ~handle.reWriting THEN handle.maxOffset ← handle.offset;
      ENDLOOP;
    END;

  -- HeapWriteString: write the whole StringBody of "s" (SIZE of a StringBody
  -- with s.length characters) to the object via HeapWriteData.
  HeapWriteString: PUBLIC PROCEDURE [handle: Handle, s: STRING] =
    BEGIN
    words: CARDINAL = SIZE[StringBody [s.length]];
    HeapWriteData[handle, [where: s, length: words]];
    END;

  -- WriteItemHeader: write the words of "header" (passed by value) to the
  -- object via HeapWriteData.
  WriteItemHeader: PUBLIC PROCEDURE [
    handle: Handle, header: BodyDefs.ItemHeader] =
    BEGIN
    HeapWriteData[
      handle, [where: @header, length: SIZE[BodyDefs.ItemHeader]]];
    END;

  -- ReceiveComponent: read one counted component from "str" and append it to
  -- the object. The word count obtained from ProtocolDefs.ReceiveCount is
  -- written first, followed by that many words read in pieces of at most 64
  -- words.
  -- Raises ProtocolDefs.Failed:
  --   communicationError  if the stream signals PupStream.StreamClosing,
  --   noData              if the stream signals Stream.TimeOut,
  --   protocolError       if a block completes with a code other than normal.
  ReceiveComponent: PUBLIC PROCEDURE [handle: Handle, str: Stream.Handle] =
    BEGIN
    chunkWords: CARDINAL = 64;
    remaining: CARDINAL ← ProtocolDefs.ReceiveCount[str];
    chunk: ARRAY [0..chunkWords) OF WORD;
    chunkAddr: LONG POINTER ← @chunk;
    HeapWriteData[handle, [@remaining, SIZE[CARDINAL]]];
    UNTIL remaining = 0 DO
      status: Stream.CompletionCode;
      take: CARDINAL = MIN[remaining, chunkWords];
      -- note: the component is an integral number of words, so the byte
      -- count requested is twice the word count
      [, status, ] ← Stream.GetBlock[
        str, [LOOPHOLE[chunkAddr], 0, take * 2] !
        PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError];
        Stream.TimeOut => ERROR ProtocolDefs.Failed[noData]];
      IF status # normal THEN ERROR ProtocolDefs.Failed[protocolError];
      HeapWriteData[handle, [@chunk, take]];
      remaining ← remaining - take;
      ENDLOOP;
    END;

  -- ReceiveObj: copy blocks from "str" into the object, one page-sized buffer
  -- at a time, until a block completes with code sstChange. Each block's byte
  -- count is halved to give the number of words written.
  -- Raises ProtocolDefs.Failed[communicationError] on PupStream.StreamClosing
  -- and ProtocolDefs.Failed[noData] on Stream.TimeOut.
  -- The scratch page is released on normal exit and on unwind.
  ReceiveObj: PUBLIC PROCEDURE [handle: Handle, str: Stream.Handle] =
    BEGIN
    scratch: VMDefs.Page = VMDefs.AllocatePage[];
    scratchAddr: LONG POINTER ← scratch;
    sawMark: BOOLEAN ← FALSE;
    UNTIL sawMark DO
      BEGIN
      ENABLE UNWIND => VMDefs.Release[scratch];
      bytes: CARDINAL;
      status: Stream.CompletionCode;
      sst: Stream.SubSequenceType;
      [bytes, status, sst] ← Stream.GetBlock[
        str, [scratchAddr, 0, VMDefs.pageSize * 2] !
        PupStream.StreamClosing => ERROR ProtocolDefs.Failed[communicationError];
        Stream.TimeOut => ERROR ProtocolDefs.Failed[noData]];
      HeapWriteData[handle, [scratch, bytes / 2]];
      sawMark ← (status = sstChange);
      END;
      ENDLOOP;
    VMDefs.Release[scratch];
    END;

  -- GetWriterOffset: return the writer's current offset within its object.
  GetWriterOffset: PUBLIC ENTRY PROCEDURE [handle: Handle]
    RETURNS [HeapDefs.ObjectOffset] =
    BEGIN
    RETURN[handle.offset];
    END;

  -- Raised by SetWriterOffset when asked to position beyond the highest
  -- offset written so far.
  OffsetTooBig: ERROR = CODE;
  -- Declared but not raised anywhere in this module.
  MissedOffset: ERROR = CODE;

  -- the following is the only one I didn't re check real carefully:

  -- SetWriterOffset: reposition the writer at "offset" within its object.
  --   handle: the writer.
  --   offset: target object offset; must not exceed the highest offset
  --           written so far (handle.maxOffset), else OffsetTooBig.
  -- The offset is split by pageCapacity into a page count and a word within
  -- the page. If the target lies on a different page, the current page is
  -- written and released and the page chain is followed with
  -- HeapFileDefs.NextPage: from the object's first page when moving
  -- backwards, from the current page when moving forwards. InitPage then maps
  -- the target page (and may raise BadOffsetFound while rewriting).
  -- handle.reWriting ends up TRUE when the new position is before maxOffset
  -- and FALSE when it equals maxOffset.
  -- OffsetTooBig is raised with RETURN WITH ERROR so that the monitor lock is
  -- released before the error propagates; a plain ERROR in an ENTRY procedure
  -- without an UNWIND catch phrase would leave the monitor locked if the
  -- error were unwound.
  SetWriterOffset: PUBLIC ENTRY PROCEDURE [
    handle: Handle, offset: HeapDefs.ObjectOffset] =
    BEGIN
    offsetPage, offsetWord: CARDINAL;
    -- when not rewriting, the current position is the end of the object
    IF NOT handle.reWriting THEN handle.maxOffset ← handle.offset;
    IF offset > handle.maxOffset THEN RETURN WITH ERROR OffsetTooBig[];
    [offsetPage, offsetWord] ← Inline.LongDivMod[offset, pageCapacity];
    IF handle.offset / pageCapacity # offsetPage THEN
      BEGIN  -- changing pages, will need to write out current one
      IF offset < handle.offset THEN  -- moving backwards
        BEGIN
        IF ~handle.reWriting THEN  -- record current sub obj header info
          handle.objectHead.size ← handle.wPos.word - pageOverhead;
        IF handle.offset / pageCapacity = handle.maxOffset / pageCapacity THEN
          BEGIN  -- on last page of object (must be full or have gap obj)
          lastPos: VMDefs.PageIndex;
          [, lastPos] ← Inline.LongDivMod[handle.maxOffset, pageCapacity];
          lastPos ← lastPos + pageOverhead;
          IF lastPos + SIZE[HeapXDefs.ObjectHeader] <= LAST[VMDefs.PageIndex] THEN
            BEGIN  -- not full, append a gap obj (even if already has one)
            header: POINTER TO HeapXDefs.ObjectHeader =
              LOOPHOLE[handle.page, POINTER] + lastPos;
            header.number ← ObjectDirXDefs.gapObjectNumber;
            header.size ←
              LAST[VMDefs.PageIndex] - (lastPos + SIZE[HeapXDefs.ObjectHeader]);
            END;  -- gap obj check
          END;  -- full?
        handle.reWriting ← TRUE;
        -- position at beginning of obj to search for desired pos:
        handle.wPos ← [page: handle.start, word: FIRST[VMDefs.PageIndex]];
        handle.offset ← HeapDefs.objectStart;
        END  -- moving backwards
      -- moving forwards: round the offset down to the start of this page so
      -- the search loop below advances in whole pages
      ELSE handle.offset ← (handle.offset / pageCapacity) * pageCapacity;
      -- write current page:
      VMDefs.MarkStartWait[handle.page];
      VMDefs.Release[handle.page];
      -- find page we're looking for:
      WHILE handle.offset / pageCapacity # offsetPage DO
        handle.offset ← handle.offset + pageCapacity;
        handle.wPos ← HeapFileDefs.NextPage[handle.wPos];
        ENDLOOP;
      InitPage[handle];
      END  -- changing pages
    ELSE IF offset < handle.offset THEN handle.reWriting ← TRUE;
    -- word position within the page, past the page and object headers
    handle.wPos.word ← pageOverhead + offsetWord;
    IF (handle.offset ← offset) = handle.maxOffset THEN handle.reWriting ← FALSE;
    END;  -- proc. SetWriterOffset

  END.

log:
15-Aug-84 14:15:59 - blh:  made same as product Writer.mesa
22-Oct-84 14:35:37 - blh:  fixed last 3 lines of HeapWriteData (swapped).