-- Transport Mechanism Filestore - heap compactor --

-- [Juniper]<Grapevine>MS>Compactor.mesa

-- Randy Gobbel		19-May-81 13:24:41 --
-- Andrew Birrell	 3-Jun-81 13:00:58 --

DIRECTORY
HeapDefs	USING[ Buffer, HeapReadData, objectStart, ReaderHandle ],
HeapFileDefs	USING[ FirstSegment, InsertFirstSegment, NextPage,
		       NoMorePages, FreeSegment ],
HeapXDefs	USING[ PageHeader, ObjectHeader, ReaderData,
		       StopAllReaders, RestartAllReaders],
ObjectDirXDefs	USING[ DOPC, ObjectNumber, gapObjectNumber, GetObjectState,
		       MoveObject, ReportDofpc ],
PolicyDefs	USING[ CompactorPause, CompactorStart ],
Process		USING[ DisableTimeout, InitializeCondition,
		       InitializeMonitor ],
Storage		USING[ Node ],
VMDefs		USING[ Deactivate, Page, PageIndex, ReadPage, 
		       MarkStartWait, UsePage, PageNumber, PageAddress,
		       FullAddress ];

Compactor: PROGRAM
   IMPORTS HeapDefs, HeapFileDefs, HeapXDefs, ObjectDirXDefs,
           PolicyDefs, Process, Storage, VMDefs
   EXPORTS HeapDefs --Compactor-- =
BEGIN

OPEN HeapXDefs, ObjectDirXDefs;

-- Storage allocator used for ObjRec nodes: simply Storage.Node under a --
-- local name, so GetObjRec can call Allocate[SIZE[ObjRec]]. --
Allocate: PROCEDURE[CARDINAL]RETURNS[POINTER] = Storage.Node;

-- Current state --

-- wPos: address at which SetHeader and the copy loop write next --
wPos:            VMDefs.FullAddress;
-- wPage: buffer for wPos.page; NIL after Start until SetHeader calls UsePage --
wPage:           VMDefs.Page;
-- flushedBefore: next page that Flush will overwrite with an empty page --
flushedBefore:   VMDefs.FullAddress;
-- reader: the compactor's own reader record (position, page, object, offset) --
reader:          HeapXDefs.ReaderData;
-- readerHandle: the same record viewed as a handle, for HeapReadData --
readerHandle:    HeapDefs.ReaderHandle = LOOPHOLE[@reader];
-- object: the sub-object header most recently written by SetHeader --
object:          POINTER TO ObjectHeader;
-- dorpc: incremented after each ReadPage and when a copy crosses pages --
dorpc:           ObjectDirXDefs.DOPC ← 0;--see comments in ObjectDirXDefs

-- record of objects moved by compactor --
-- Each ObjRec notes one object number and the address it is being copied --
-- to.  FindObject pushes records on moveChain; TerminatePage pops them, --
-- reports the move to the object directory and recycles them. --
ObjRec: TYPE = RECORD[ obj: ObjectNumber,
                       where: VMDefs.FullAddress,
                       next: ObjPtr ];
ObjPtr: TYPE = POINTER TO ObjRec;
-- endObj marks the end of either chain --
endObj: ObjPtr = NIL;
-- freeChain: recycled records awaiting reuse by GetObjRec --
freeChain: ObjPtr ← endObj;
-- moveChain: objects copied into wPage but not yet reported as moved --
moveChain: ObjPtr ← endObj;
-- freeCount: number of records currently on freeChain --
freeCount: CARDINAL ← 0;

GetObjRec: PROCEDURE[ chain: POINTER TO ObjPtr ]
          RETURNS[ ptr: ObjPtr ] = INLINE
   BEGIN
   -- Obtain an ObjRec, recycling one from freeChain when possible and --
   -- otherwise allocating fresh storage, then push it on the front of --
   -- the list whose head is chain↑.  The obj and where fields are not --
   -- set here; the caller fills them in. --
   IF freeChain # endObj
   THEN BEGIN -- reuse the most recently freed record --
        ptr ← freeChain;
        freeChain ← ptr.next;
        freeCount ← freeCount-1;
        END
   ELSE ptr ← Allocate[SIZE[ObjRec]];
   -- link at the head of the caller's chain --
   ptr.next ← chain↑;
   chain↑ ← ptr;
   END;

FreeObjRec: PROC[ptr: ObjPtr] = INLINE
   BEGIN
   -- Put ptr on the front of freeChain so GetObjRec can reuse it, and --
   -- keep freeCount in step. --
   freeCount ← freeCount+1;
   ptr.next ← freeChain;
   freeChain ← ptr;
   END;

Start: PROCEDURE =
   BEGIN
   -- Called before each compaction: resets the reader and the writer --
   -- to the beginning of the heap. --

   -- Reader: first page of the first segment, with no current object. --
   reader.where ← HeapFileDefs.FirstSegment[];
   reader.page ← VMDefs.ReadPage[reader.where.page, 0];
   -- dorpc is bumped after every ReadPage in this module; presumably a --
   -- count of pages read (see ObjectDirXDefs) - TODO confirm --
   dorpc ← dorpc+1;
   reader.object ← ObjectDirXDefs.gapObjectNumber;
   reader.offset ← HeapDefs.objectStart;

   -- Writer: wPage stays NIL until SetHeader maps the first page. --
   wPage ← NIL;
   flushedBefore ← wPos ← HeapFileDefs.InsertFirstSegment[];
   Flush[];
   END;


FindObject: PROCEDURE =
   BEGIN -- skips until non-ignorable object --
   -- Scans forward from reader.where, one sub-object header at a time, --
   -- and leaves the loop as soon as GetObjectState reports an object --
   -- that is inUse.  On that exit reader.object holds the object number, --
   -- reader.where still addresses the object's header, and a record of --
   -- the object and its destination (wPos) has been pushed on moveChain. --
   -- If no more objects exist, HeapFileDefs.NextPage generates a signal. --
   -- This should be caught by the caller of FindObject --
   OPEN VMDefs;
   reader.object ← gapObjectNumber --indicates "no current object"--;
   reader.end ← FALSE;
   DO -- Consider any page header --
      -- At the start of a page, step over the PageHeader and take the --
      -- offset of the first sub-object from it; elsewhere a sub-object --
      -- is taken to begin at objectStart. --
      IF reader.where.word = FIRST[PageIndex]
      THEN BEGIN
           pageHead: POINTER TO PageHeader = LOOPHOLE[reader.page,POINTER]
                                                + reader.where.word;
           reader.where.word ← reader.where.word + SIZE[ PageHeader ];
           reader.offset ← pageHead.offset;
           END
      ELSE reader.offset ← HeapDefs.objectStart;

      BEGIN -- read sub-object header --
      object: POINTER TO ObjectHeader =
                         LOOPHOLE[reader.page,POINTER] + reader.where.word;
      -- NOTE(review): as written, the first disjunct implies the second --
      -- (number = reader.object # gap gives number # gap), so the test --
      -- reduces to "number # gap AND offset = objectStart".  Confirm that --
      -- this matches the intent of the ELSE list below ("imbedded object"). --
      IF ( reader.object # gapObjectNumber
           -- Inside an object, looking for continuation sub-object --
           -- If a duplicate start is found, it may be non-ignorable --
           AND object.number = reader.object AND reader.offset = HeapDefs.objectStart )
      OR ( object.number # gapObjectNumber AND reader.offset = HeapDefs.objectStart )
      THEN BEGIN -- start of a new object --
           SELECT GetObjectState[object.number, reader.where, dorpc] FROM
           inUse => BEGIN
                    -- remember where this object is being copied to; --
                    -- TerminatePage reports the move later --
                    ptr: ObjPtr = GetObjRec[@moveChain];
                    ptr.where ← wPos;
                    ptr.obj ← reader.object ← object.number;
                    EXIT
                    END;
           unused => -- ignorable object, marked as deleted by ObjectDir --
                     NULL;
           duplicate => NULL; -- ignorable object --
           ENDCASE => ERROR;
           reader.object ← object.number --now the current object--;
           END
   -- ELSE we have one of:
   --         continuation of ignorable object,
   --         imbedded object which should be ignored,
   --         unexpected partial object,
   --         gap object --;
      -- skip this sub-object: its header, then object.size words of data --
      reader.where.word ← reader.where.word + SIZE[ObjectHeader];
      reader.where.word ← reader.where.word + object.size;
      END;

      -- check for end of page --
      -- (a page is finished when no further ObjectHeader can fit in it) --
      IF reader.where.word + SIZE[ObjectHeader] > LAST[PageIndex]
      THEN BEGIN
           Deactivate[reader.page]; -- not "Release", for better cache --
           PolicyDefs.CompactorPause[];
           reader.where ← HeapFileDefs.NextPage[reader.where];
           -- That may have generated a signal, terminating FindObject --
           -- Note that there is no current page here.  --
           reader.page ← ReadPage[reader.where.page, 0]; dorpc ← dorpc+1;
           -- reader.object is kept, so a continuation can be recognised --
           END
      ELSE -- end of any current object --
           reader.object ← gapObjectNumber;
   ENDLOOP;
   --the interlock with the readers occurs in TerminatePage --
   END;


Flush: PROCEDURE =
   BEGIN
   -- Overwrite with empty pages every page from flushedBefore up to, --
   -- but not including, the page the reader is currently on; then --
   -- report dorpc-1 to the object directory. --
   -- The stopping test compares page numbers only. --
   UNTIL flushedBefore.page.page = reader.where.page.page
   DO
      EmptyPage[flushedBefore.page];
      flushedBefore ← HeapFileDefs.NextPage[flushedBefore];
   ENDLOOP;
   ObjectDirXDefs.ReportDofpc[dorpc-1];
   END;

EmptyPage: PROCEDURE[ where: VMDefs.PageAddress ] =
   BEGIN
   -- Rewrite the page at 'where' so that it holds only a page header --
   -- followed by a single gap object spanning the rest of the page, --
   -- then MarkStartWait and Deactivate it (the same pair TerminatePage --
   -- uses to write a page to disk). --
   blank: VMDefs.Page = VMDefs.UsePage[where];
   pageHead: POINTER TO PageHeader = LOOPHOLE[blank];
   gap: POINTER TO ObjectHeader = LOOPHOLE[blank, POINTER] +
                                                          SIZE[PageHeader];
   -- words taken by the two headers at the front of the page --
   overhead: CARDINAL = SIZE[PageHeader]+SIZE[ObjectHeader];

   pageHead.offset ← HeapDefs.objectStart;
   gap.number ← ObjectDirXDefs.gapObjectNumber;
   -- 1+LAST[PageIndex] is the page size in words --
   gap.size ← 1 + LAST[VMDefs.PageIndex] - overhead;
   VMDefs.MarkStartWait[blank];
   VMDefs.Deactivate[blank];
   END;


SetHeader: PROCEDURE[ number: ObjectDirXDefs.ObjectNumber ] =
   BEGIN
   -- Begin a new object or sub-object for 'number' at wPos.  When wPos --
   -- is at the very start of a page, the page is first mapped into --
   -- wPage and given a page header carrying the reader's current offset. --
   -- On return 'object' addresses the new header (size 0 so far) and --
   -- wPos addresses the first data word after it. --
   IF wPos.word = FIRST[VMDefs.PageIndex]
   THEN BEGIN
        pageHead: POINTER TO PageHeader;
        wPage ← VMDefs.UsePage[wPos.page];
        pageHead ← LOOPHOLE[wPage,POINTER] + wPos.word;
        pageHead.offset ← reader.offset;
        wPos.word ← wPos.word + SIZE[ PageHeader ];
        END;
   -- lay down the (sub-)object header and step past it --
   object ← LOOPHOLE[wPage,POINTER] + wPos.word;
   object.size ← 0;
   object.number ← number;
   wPos.word ← wPos.word + SIZE[ObjectHeader];
   END;


TerminatePage: PROCEDURE[ current: ObjectDirXDefs.ObjectNumber ] =
   BEGIN
   -- Finish with the current output page: write wPage to disk, tell --
   -- the object directory about every object recorded on moveChain, --
   -- flush the pages the reader has left behind, and advance wPos to --
   -- the next page.  Readers of 'current' are left stopped on return; --
   -- stoppedObj records which object that is. --
   VMDefs.MarkStartWait[wPage];
   VMDefs.Deactivate[wPage];
   -- The directory may be altered from here on, even though old copies --
   -- of the objects in wPage still exist on other pages. --
   UNTIL moveChain = endObj
   DO BEGIN
      moved: ObjPtr = moveChain;
      moveChain ← moved.next;
      ObjectDirXDefs.MoveObject[moved.obj, moved.where];
      -- make sure the readers are not confused by the move --
      HeapXDefs.RestartAllReaders[moved.obj];
      FreeObjRec[moved];
      END;
   ENDLOOP;
   PolicyDefs.CompactorPause[];
   stoppedObj ← current;
   HeapXDefs.StopAllReaders[current];
   Flush[];
   wPos ← HeapFileDefs.NextPage[wPos];
   HeapFileDefs.FreeSegment[wPos, reader.where];
   END;

-- Object whose readers TerminatePage last stopped; gapObjectNumber --
-- means that no readers are waiting to be restarted. --
stoppedObj: ObjectDirXDefs.ObjectNumber ← ObjectDirXDefs.gapObjectNumber;

RestartStoppedObj: PROCEDURE =
   BEGIN
   -- Restart the readers of stoppedObj, if there is one, and forget it. --
   IF stoppedObj = ObjectDirXDefs.gapObjectNumber THEN RETURN;
   HeapXDefs.RestartAllReaders[stoppedObj];
   stoppedObj ← ObjectDirXDefs.gapObjectNumber;
   END;

-- Raised by RunCompactor when HeapReadData returns without ending the --
-- object although room for another sub-object remains in wPage. --
BufferNotFilled: ERROR = CODE;
-- Not raised anywhere in this file as shown. --
ObjectNotDestroyed: ERROR = CODE;

RunCompactor: PROCEDURE =
   BEGIN
   -- Never returns --
   -- Each iteration of the outer loop is one compaction: every inUse --
   -- object found by FindObject is copied, via HeapReadData, to the --
   -- write position wPos, and the pages left behind are emptied. --
   -- (CompactorStart presumably blocks until a compaction is wanted; --
   -- confirm against PolicyDefs.) --
   DO PolicyDefs.CompactorStart[];
      Start[];

      -- one iteration per object to be moved; ends when the heap runs out --
      DO FindObject[ ! HeapFileDefs.NoMorePages => EXIT];

         BEGIN
         objectEnded: BOOLEAN ← FALSE;
         -- page number on which the object starts, to detect page crossing --
         objectStartPage: VMDefs.PageNumber = reader.where.page.page;

         SetHeader[reader.object];

         UNTIL objectEnded
         DO -- copy data into wPage --
            BEGIN
               -- the buffer is all that remains of wPage from wPos onward --
               buffer: HeapDefs.Buffer;
               buffer ← [ wPage+wPos.word,
                          1+LAST[VMDefs.PageIndex]-wPos.word];
               -- the amount read becomes the size of the sub-object whose --
               -- header SetHeader wrote last --
               [objectEnded, object.size] ←
                   HeapDefs.HeapReadData[readerHandle, buffer];
               -- always 'objectEnded' and/or object.size=buffer.length --
               wPos.word ← wPos.word + object.size;
            END;
            -- move to new page, if needed, even if objectEnded --
            -- (needed when no further ObjectHeader fits in wPage) --
            IF wPos.word + SIZE[ObjectHeader] >
                 LAST[VMDefs.PageIndex]
            THEN BEGIN
                 -- release readers stopped by the previous TerminatePage --
                 -- before TerminatePage stops readers of this object --
                 RestartStoppedObj[];
                 TerminatePage[reader.object];
                 -- continuation sub-object header on the new page --
                 SetHeader[reader.object];
                 END
            ELSE IF NOT objectEnded THEN ERROR BufferNotFilled[];
         ENDLOOP --for each object--;

         RestartStoppedObj[];

         IF reader.where.page.page # objectStartPage
         THEN -- HeapReadData moved us beyond last deleted object page --
              dorpc ← dorpc+1;

         END;
      ENDLOOP --end of last object--;

      --pad last page with gap, and write to disk--
      SetHeader[ObjectDirXDefs.gapObjectNumber];
      object.size ← 1+LAST[VMDefs.PageIndex]-wPos.word;
      TerminatePage[ObjectDirXDefs.gapObjectNumber];
      -- TerminatePage has just set stoppedObj to the gap number, so this --
      -- call finds nothing to restart --
      RestartStoppedObj[];
      -- Flush stops short of the reader's page; empty that one too --
      EmptyPage[reader.where.page];
      ObjectDirXDefs.ReportDofpc[dorpc]; -- everything has been flushed --
   ENDLOOP -- eternal loop of compactions --;
   END --RunCompactor--;


-- Main Program --

-- Initialise the monitor lock and the canStart condition (with its --
-- timeout disabled) inside the compactor's own reader record, mark --
-- the reader as not stopped, then enter RunCompactor. --
Process.InitializeMonitor[ @reader.LOCK ];
Process.InitializeCondition[ @reader.canStart, 0 ];
Process.DisableTimeout[ @reader.canStart ];
reader.stopped ← FALSE;

-- RunCompactor loops forever, so control never comes back here. --
RunCompactor[];


END.