-- Transport Mechanism Mail Server - steering list queues --

-- [Indigo]<Grapevine>MS>SLQueue.mesa --

-- Andrew Birrell  September 13, 1982 3:41 pm --

DIRECTORY
Ascii		USING[ CR],
EnquiryDefs	USING[ ],
GlassDefs	USING[ Handle ],
HeapDefs	USING[ HeapEndRead, HeapEndWrite, HeapReadData,
		       HeapStartRead, ReaderHandle, WriterHandle ],
LogDefs		USING[ WriteChar ],
ObjectDirDefs	USING[ FreeObject, ObjectNumber, UseObject ],
ProtocolDefs	USING[ AppendTimestamp ],
SLDefs		USING[ Item, ItemAddress, ItemIndex,
		       SLHeader, SLQueue, SLReadHandle ],
VMDefs		USING[ MarkStartWait, Page, PageAddress, PageNumber, 
		       ReadPage, Release, UsePage ];

SLQueueImpl: MONITOR
   IMPORTS HeapDefs, LogDefs, ObjectDirDefs, ProtocolDefs, VMDefs
   EXPORTS EnquiryDefs, SLDefs =
BEGIN

data: ARRAY SLDefs.SLQueue OF RECORD[
         rPos: SLDefs.ItemAddress,
         wPos: SLDefs.ItemAddress,
         size: VMDefs.PageNumber,
         itemWritten: CONDITION ];

count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0];
received: CARDINAL ← 0;
reDone:   CARDINAL ← 0;

SLQPage: TYPE = POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item;
dataPos: VMDefs.PageAddress;
page:    SLQPage ← NIL;

NextItem: PROCEDURE[ queue: SLDefs.SLQueue,
                     pos: POINTER TO SLDefs.ItemAddress ] =
   BEGIN
   IF pos.item = LAST[SLDefs.ItemIndex]
   THEN BEGIN
        pos.item ← FIRST[SLDefs.ItemIndex];
        IF pos.page.page+1 = data[queue].size
        THEN pos.page.page ← FIRST[VMDefs.PageNumber]
        ELSE pos.page.page ← pos.page.page+1;
        END
   ELSE pos.item ← pos.item + 1;
   END;


GetItem: PROCEDURE[ pos: SLDefs.ItemAddress ]
            RETURNS[ POINTER TO SLDefs.Item ] =
   BEGIN
   IF page = NIL OR pos.page # dataPos
   THEN BEGIN
        IF page # NIL THEN VMDefs.Release[LOOPHOLE[page,VMDefs.Page]];
        dataPos ← pos.page;
        page ← LOOPHOLE[VMDefs.ReadPage[ pos.page, 0--lookAhead-- ],
                        SLQPage];
        END;
   RETURN[ @(page[pos.item]) ]
   END;

DataState: TYPE = {clean,dirty};

ReleaseData: PROCEDURE[ state: DataState ] =
   BEGIN
   IF state = dirty THEN VMDefs.MarkStartWait[LOOPHOLE[page,VMDefs.Page]];
   END;

SLWrite: PUBLIC ENTRY PROCEDURE[
                        body:  ObjectDirDefs.ObjectNumber,
                        SL:    HeapDefs.WriterHandle,
                        queue: SLDefs.SLQueue ] =
   BEGIN
   Action: INTERNAL PROCEDURE[ obj: ObjectDirDefs.ObjectNumber ] =
      BEGIN
      SubWrite[body, obj, queue];
      END;
   HeapDefs.HeapEndWrite[SL, Action ];
   IF queue = input OR queue = express THEN received ← received + 1;
   END;

SubWrite: INTERNAL PROCEDURE[
                        body:  ObjectDirDefs.ObjectNumber,
                        SL:    ObjectDirDefs.ObjectNumber,
                        queue: SLDefs.SLQueue ] =
   BEGIN
   wPos: POINTER TO SLDefs.ItemAddress = @(data[queue].wPos);
   started: SLDefs.ItemAddress = wPos↑;
   ptr: POINTER TO SLDefs.Item;
   DO ptr ← GetItem[wPos↑]; NextItem[queue, wPos];
      IF ptr.state = free THEN EXIT;
      -- skip items still being read --
      ReleaseData[clean];
      IF wPos↑ = started
      THEN --extend--
           BEGIN
           newPage: SLQPage;
           wPos↑.item ← FIRST[SLDefs.ItemIndex];
           wPos↑.page.page ← data[queue].size;
           newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page],SLQPage];
           FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex
           DO newPage[index].state ← free ENDLOOP;
           VMDefs.MarkStartWait[LOOPHOLE[newPage,VMDefs.Page]];
           VMDefs.Release[LOOPHOLE[newPage,VMDefs.Page]];
           data[queue].size ← data[queue].size + 1;
           END;
   ENDLOOP;
   ptr.body ← body;
   ptr.SL ← SL;
   ptr.state ← full;
   ObjectDirDefs.UseObject[body];
   ObjectDirDefs.UseObject[SL];
   ReleaseData[dirty];
   NOTIFY data[queue].itemWritten;
   count[queue] ← count[queue] + 1;
   END;

  
WaitForNonEmpty: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ] =
   { InnerWaitNonEmpty[queue] };

InnerWaitNonEmpty: INTERNAL PROC[ queue: SLDefs.SLQueue ] =
   BEGIN
   WHILE count[queue] = 0 DO WAIT data[queue].itemWritten ENDLOOP;
   END;

GetCount: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ]
                    RETURNS[CARDINAL] =
   { RETURN[count[queue]] };

SLStartRead: PUBLIC ENTRY PROCEDURE[ queue: SLDefs.SLQueue ]
               RETURNS[ handle: SLDefs.SLReadHandle,
                        body: ObjectDirDefs.ObjectNumber,
                        SL: HeapDefs.ReaderHandle ] =
   BEGIN
   rPos: POINTER TO SLDefs.ItemAddress = @(data[queue].rPos);
   ptr: POINTER TO SLDefs.Item;
   InnerWaitNonEmpty[queue];
   DO ptr ← GetItem[rPos↑];
      IF ptr.state = full
      THEN EXIT
      ELSE { ReleaseData[clean]; NextItem[queue, rPos] };
      --skip items which are free or are still being read --
   ENDLOOP;
   handle ← rPos↑;  NextItem[queue, rPos];
   body ← ptr.body;
   SL ← HeapDefs.HeapStartRead[ptr.SL];
   ptr.state ← reading; ReleaseData[dirty];
   count[queue] ← count[queue] - 1;
   LogDefs.WriteChar[SELECT queue FROM
       express => 'e,
       input => 'i,
       pending => 'p,
       forward => 'f,
       foreign => 'x,
       mailbox => 'm,
     ENDCASE => ERROR];
   END;

SubTransfer: ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle,
                              queue: SLDefs.SLQueue ] =
   BEGIN
   ptr: POINTER TO SLDefs.Item = GetItem[handle];
   body: ObjectDirDefs.ObjectNumber = ptr.body;
   SL:   ObjectDirDefs.ObjectNumber = ptr.SL;
   ReleaseData[clean];
   SubWrite[body, SL, queue];
   IF queue = input THEN reDone ← reDone + 1;
   END;

SLTransfer: PUBLIC PROCEDURE[ handle: SLDefs.SLReadHandle,
                              queue: SLDefs.SLQueue ] =
   BEGIN
   SubTransfer[handle, queue];
   SLEndRead[handle];
   END;

SLEndRead: PUBLIC ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle ] =
   BEGIN
   ptr: POINTER TO SLDefs.Item = GetItem[handle];
   body: ObjectDirDefs.ObjectNumber = ptr.body;
   SL:   ObjectDirDefs.ObjectNumber = ptr.SL;
   -- beware of the order of events! --
   ptr.state ← free;
   ReleaseData[dirty];
   ObjectDirDefs.FreeObject[body];
   ObjectDirDefs.FreeObject[SL];
   END;

SLQueueCount: PUBLIC PROC[str: GlassDefs.Handle] =
   BEGIN
   OPEN str;
   WriteChar[Ascii.CR];
   FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue
   DO WriteString[SELECT queue FROM
         express => "Express"L,
         input => "Input"L,
         pending => "Pending"L,
         forward => "Forward"L,
         foreign => "Foreign"L,
         mailbox => "Mailbox"L,
        ENDCASE => "Unknown"L];
      WriteString[" queue:  length="L];
      DisplayQueue[str, queue];
   ENDLOOP;
   END;

DisplayQueue: PROC[str: GlassDefs.Handle, queue: SLDefs.SLQueue] =
   BEGIN
   OPEN str;
   GetRPos: ENTRY PROC[queue: SLDefs.SLQueue] RETURNS[SLDefs.ItemAddress] =
      { RETURN[data[queue].rPos] };
   GetThisItem: ENTRY PROC[queue: SLDefs.SLQueue,
                           pos: POINTER TO SLDefs.ItemAddress]
                   RETURNS[item:SLDefs.Item] =
        BEGIN
        item ← GetItem[pos↑]↑; ReleaseData[clean]; NextItem[queue, pos];
        IF item.state # free
        THEN BEGIN
             ObjectDirDefs.UseObject[item.SL];
             ObjectDirDefs.UseObject[item.body];
             END;
        END;
   firstPos: SLDefs.ItemAddress = GetRPos[queue];
   pos: SLDefs.ItemAddress ← firstPos;
   WriteDecimal[count[queue]];
   WriteChar[Ascii.CR];
   DO thisItem: SLDefs.Item = GetThisItem[queue, @pos];
      IF thisItem.state # free
      THEN BEGIN
           slReader: HeapDefs.ReaderHandle =
                             HeapDefs.HeapStartRead[thisItem.SL];
           bodyReader: HeapDefs.ReaderHandle =
                           HeapDefs.HeapStartRead[thisItem.body];
           ObjectDirDefs.FreeObject[thisItem.SL];
           ObjectDirDefs.FreeObject[thisItem.body];
           BEGIN
              ENABLE UNWIND =>
                 BEGIN
                 HeapDefs.HeapEndRead[slReader];
                 HeapDefs.HeapEndRead[bodyReader];
                 END;
              s: STRING = [18] --377#377@6553665536--;
              slHeader: SLDefs.SLHeader;
              [,] ← HeapDefs.HeapReadData[slReader,
                                       [@slHeader,SIZE[SLDefs.SLHeader]]];
              WriteString["Created: "L];
              ProtocolDefs.AppendTimestamp[s, slHeader.created];
              WriteString[s];
              WriteString[", received: "L];
              s.length←0;
              ProtocolDefs.AppendTimestamp[s, slHeader.received];
              WriteString[s];
              WriteString[", server: "L];
              WriteDecimal[LOOPHOLE[slHeader.server]];
              WriteChar[Ascii.CR];
           END;
           HeapDefs.HeapEndRead[slReader];
           HeapDefs.HeapEndRead[bodyReader];
           END;
      IF pos = firstPos OR DelTyped[] THEN EXIT;
   ENDLOOP;
   END;

END.