-- Copyright (C) 1984, 1985  by Xerox Corporation. All rights reserved. 
-- SLQueueImpl.mesa, Transport Mechanism Mail Server - steering list queues --

-- HGM, 15-Sep-85 12:46:08
-- Andrew Birrell  September 13, 1982 3:41 pm --
-- Hankins	23-Oct-84  9:56:21

DIRECTORY
  Ascii USING [CR],
  EnquiryDefs USING [],
  GlassDefs USING [Handle],
  HeapDefs USING [
    HeapEndRead, HeapEndWrite, HeapReadData, HeapStartRead, ReaderHandle,
    WriterHandle],
  LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry],
  ObjectDirDefs USING [FreeObject, ObjectNumber, RestartObject, UseObject],
  Process USING [InitializeCondition, DisableTimeout],
  ProtocolDefs USING [AppendTimestamp],
  SLDefs USING [Item, ItemAddress, ItemIndex, SLHeader, SLQueue, SLReadHandle],
  VMDefs USING [
    FileHandle, GetFileLength, MarkStartWait, OpenFile, Page, PageAddress,
    PageNumber, ReadPage, Release, UsePage];

SLQueueImpl: MONITOR
  IMPORTS HeapDefs, LogDefs, ObjectDirDefs, Process, ProtocolDefs, VMDefs
  EXPORTS EnquiryDefs, SLDefs =
  BEGIN

  -- Per-queue state.
  --   rPos, wPos: read and write cursors, advanced with NextItem.
  --   size: queue length in pages; NextItem wraps a cursor to the first page when
  --     it steps past page size-1.  Set from the file length in RestartQueues and
  --     incremented when SubWrite extends the queue.
  --   itemWritten: notified by SubWrite after each item is stored; waited on by
  --     InnerWaitNonEmpty.
  data: ARRAY SLDefs.SLQueue OF RECORD [
    rPos: SLDefs.ItemAddress,
    wPos: SLDefs.ItemAddress,
    size: VMDefs.PageNumber,
    itemWritten: CONDITION];

  -- Items written and not yet handed to a reader: incremented in SubWrite,
  -- decremented in SLStartRead, recomputed from the file in RestartQueues.
  count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0];
  -- Statistics: received counts SLWrite calls on the input or express queue,
  -- forwarded counts SLWrite calls on the forward queue, and reDone counts
  -- SubTransfer calls whose destination is the input queue.
  received, reDone, forwarded: LONG CARDINAL ← 0;

  -- A queue page viewed as an array of items.
  SLQPage: TYPE = LONG POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item;
  -- One page cache shared by all queues: page is the page most recently obtained
  -- by GetItem or RestartQueues (NIL if none yet) and dataPos is its address.
  dataPos: VMDefs.PageAddress;
  page: SLQPage ← NIL;

  -- Steps the item address pos↑ forward by one item within the given queue.
  -- Moving past the last item of a page continues at the first item of the
  -- following page; moving past page size-1 wraps round to the first page.
  NextItem: PROCEDURE [
    queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress] =
    BEGIN
    -- common case: stay on the same page
    IF pos.item # LAST[SLDefs.ItemIndex] THEN {pos.item ← pos.item + 1; RETURN};
    -- page boundary: restart at the first item of the next (or first) page
    pos.item ← FIRST[SLDefs.ItemIndex];
    pos.page.page ←
      IF pos.page.page + 1 = data[queue].size THEN FIRST[VMDefs.PageNumber]
      ELSE pos.page.page + 1;
    END;

  -- Returns a pointer to the item at pos inside the cached page.  If the cached
  -- page is not the one addressed by pos, the old page (if any) is released and
  -- the required page is read, becoming the new cached page.
  GetItem: PROCEDURE [pos: SLDefs.ItemAddress] RETURNS [LONG POINTER TO SLDefs.Item] =
    BEGIN
    -- cache hit: the wanted page is already in hand
    IF page # NIL AND pos.page = dataPos THEN RETURN[@(page[pos.item])];
    -- cache miss: give back the old page before fetching the new one
    IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]];
    dataPos ← pos.page;
    page ← LOOPHOLE[VMDefs.ReadPage[pos.page, 0 --lookAhead-- ], SLQPage];
    RETURN[@(page[pos.item])]
    END;

  -- Whether the caller modified the cached page since GetItem returned it.
  DataState: TYPE = {clean, dirty};

  -- Called when a caller has finished with an item obtained from GetItem.
  -- Despite the name the cached page is not released here (GetItem does that
  -- when it switches pages); a dirty page is passed to VMDefs.MarkStartWait
  -- and a clean page needs no action.
  ReleaseData: PROCEDURE [state: DataState] =
    BEGIN
    SELECT state FROM
      dirty => VMDefs.MarkStartWait[LOOPHOLE[page, VMDefs.Page]];
      clean => NULL;
      ENDCASE;
    END;

  -- Finishes writing the steering list SL and places it, together with body,
  -- on the given queue.  The enqueueing itself is done by a callback handed to
  -- HeapDefs.HeapEndWrite.  Afterwards the statistics counters are updated:
  -- received for the input and express queues, forwarded for the forward queue.
  SLWrite: PUBLIC ENTRY PROCEDURE [
    body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.WriterHandle,
    queue: SLDefs.SLQueue] =
    BEGIN
    Enqueue: INTERNAL PROCEDURE [slObject: ObjectDirDefs.ObjectNumber] =
      BEGIN
      SubWrite[body, slObject, queue];
      END;
    HeapDefs.HeapEndWrite[SL, Enqueue];
    SELECT queue FROM
      input, express => received ← received + 1;
      forward => forwarded ← forwarded + 1;
      ENDCASE => NULL;
    END;

  -- Stores the pair [body, SL] in the first free item found at or after the
  -- queue's write position, adding one page to the queue if a complete circuit
  -- finds no free item.  On return the write position is one item past the
  -- item used, the item has been written out, count[queue] has been incremented
  -- and itemWritten has been notified.
  -- INTERNAL: the caller must already hold the monitor lock.
  SubWrite: INTERNAL PROCEDURE [
    body: ObjectDirDefs.ObjectNumber, SL: ObjectDirDefs.ObjectNumber,
    queue: SLDefs.SLQueue] =
    BEGIN
    wPos: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].wPos);
    -- where this search began; arriving back here means no item was free
    started: SLDefs.ItemAddress = wPos↑;
    ptr: LONG POINTER TO SLDefs.Item;
    DO
      ptr ← GetItem[wPos↑];
      -- advance before testing, so wPos is left just past the item chosen
      NextItem[queue, wPos];
      IF ptr.state = free THEN EXIT;
      -- skip items still being read --
      ReleaseData[clean];
      IF wPos↑ = started THEN  --extend--
        BEGIN
        -- the new page is numbered size, i.e. one past the current last page
        newPage: SLQPage;
        wPos↑.item ← FIRST[SLDefs.ItemIndex];
        wPos↑.page.page ← data[queue].size;
        newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page], SLQPage];
        FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO
          newPage[index].state ← free ENDLOOP;
        -- newPage is handled separately from the shared page cache, so it is
        -- written and released here; the next GetItem re-reads it via the cache
        VMDefs.MarkStartWait[LOOPHOLE[newPage, VMDefs.Page]];
        VMDefs.Release[LOOPHOLE[newPage, VMDefs.Page]];
        data[queue].size ← data[queue].size + 1;
        END;
      ENDLOOP;
    ptr.body ← body;
    ptr.SL ← SL;
    ptr.state ← full;
    -- these UseObject calls are paired with the FreeObject calls in SLEndRead
    ObjectDirDefs.UseObject[body];
    ObjectDirDefs.UseObject[SL];
    ReleaseData[dirty];
    NOTIFY data[queue].itemWritten;
    count[queue] ← count[queue] + 1;
    END;

  -- Public entry point: blocks the caller until count[queue] is non-zero.
  WaitForNonEmpty: PUBLIC ENTRY PROC [queue: SLDefs.SLQueue] =
    BEGIN
    InnerWaitNonEmpty[queue];
    END;

  -- Waits on the queue's itemWritten condition until count[queue] is non-zero.
  -- INTERNAL: the caller must already hold the monitor lock.
  InnerWaitNonEmpty: INTERNAL PROC [queue: SLDefs.SLQueue] =
    BEGIN
    UNTIL count[queue] # 0 DO
      WAIT data[queue].itemWritten;
      ENDLOOP;
    END;

  -- Returns the current value of count[queue].  This is not an ENTRY procedure,
  -- so the value is read without taking the monitor lock.
  GetCount: PUBLIC PROC [queue: SLDefs.SLQueue] RETURNS [CARDINAL] =
    BEGIN
    RETURN[count[queue]];
    END;

  -- Waits until the queue is non-empty, then claims the next full item.
  -- Returns a handle addressing the item, the item's body object and a heap
  -- reader opened on its steering list.  The item is marked as being read and
  -- written out, the read position is left one item past it, and one character
  -- identifying the queue is sent to LogDefs.WriteChar.
  SLStartRead: PUBLIC ENTRY PROCEDURE [queue: SLDefs.SLQueue]
    RETURNS [
      handle: SLDefs.SLReadHandle, body: ObjectDirDefs.ObjectNumber,
      SL: HeapDefs.ReaderHandle] =
    BEGIN
    cursor: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].rPos);
    slot: LONG POINTER TO SLDefs.Item;
    InnerWaitNonEmpty[queue];
    count[queue] ← count[queue] - 1;
    -- step over items which are free or are still being read
    slot ← GetItem[cursor↑];
    UNTIL slot.state = full DO
      ReleaseData[clean];
      NextItem[queue, cursor];
      slot ← GetItem[cursor↑];
      ENDLOOP;
    handle ← cursor↑;
    NextItem[queue, cursor];
    body ← slot.body;
    SL ← HeapDefs.HeapStartRead[slot.SL];
    slot.state ← reading;
    ReleaseData[dirty];
    LogDefs.WriteChar[
      SELECT queue FROM
        input => 'i,
        express => 'e,
        forward => 'f,
        pending => 'p,
        mailbox => 'm,
        ENDCASE => ERROR];
    END;

  -- Writes a second queue entry, on the given queue, for the body and steering
  -- list held by the item that handle addresses.  The source item is left
  -- untouched.  Transfers onto the input queue are counted in reDone.
  SubTransfer: ENTRY PROCEDURE [
    handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] =
    BEGIN
    source: LONG POINTER TO SLDefs.Item = GetItem[handle];
    -- copy both fields now: SubWrite may switch the cached page under source
    msgBody: ObjectDirDefs.ObjectNumber = source.body;
    steering: ObjectDirDefs.ObjectNumber = source.SL;
    ReleaseData[clean];
    SubWrite[msgBody, steering, queue];
    SELECT queue FROM
      input => reDone ← reDone + 1;
      ENDCASE => NULL;
    END;

  -- Moves the item addressed by handle onto another queue: first a copy is
  -- written to the destination (SubTransfer), then the original is ended
  -- (SLEndRead).  The two steps are separate monitor entries.
  SLTransfer: PUBLIC PROC [handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] = {
    SubTransfer[handle, queue];
    SLEndRead[handle];
    };

  -- Ends the read begun by SLStartRead for the item addressed by handle.
  -- The item is marked free and written out, and only then are the body and
  -- steering list objects passed to FreeObject.
  SLEndRead: PUBLIC ENTRY PROCEDURE [handle: SLDefs.SLReadHandle] =
    BEGIN
    slot: LONG POINTER TO SLDefs.Item = GetItem[handle];
    heldSL: ObjectDirDefs.ObjectNumber = slot.SL;
    heldBody: ObjectDirDefs.ObjectNumber = slot.body;
    -- the order below is deliberate: free the item on disk before the objects
    slot.state ← free;
    ReleaseData[dirty];
    ObjectDirDefs.FreeObject[heldBody];
    ObjectDirDefs.FreeObject[heldSL];
    END;

  -- Writes a report on every queue to str: a blank line, then for each queue
  -- its name and a heading followed by whatever DisplayQueue prints for it.
  SLQueueCount: PUBLIC PROC [str: GlassDefs.Handle] =
    BEGIN
    str.WriteChar[Ascii.CR];
    FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue DO
      str.WriteString[
        SELECT queue FROM
          input => "Input"L,
          express => "Express"L,
          forward => "Forward"L,
          pending => "Pending"L,
          mailbox => "Mailbox"L,
          ENDCASE => "Unknown"L];
      str.WriteString[" queue:  length="L];
      DisplayQueue[str, queue];
      ENDLOOP;
    END;

  -- Writes to str the value of count[queue] and then one line for each item of
  -- the queue that is not free: its creation timestamp, its received timestamp
  -- when that differs from the creation one, and its server name when present.
  -- The scan starts at the queue's read position and ends after one complete
  -- circuit or as soon as DelTyped returns TRUE.
  DisplayQueue: PROC [str: GlassDefs.Handle, queue: SLDefs.SLQueue] =
    BEGIN OPEN str;
    -- Reads the queue's read position under the monitor lock.
    GetRPos: ENTRY PROC [queue: SLDefs.SLQueue] RETURNS [SLDefs.ItemAddress] = {
      RETURN[data[queue].rPos]};
    -- Under the monitor lock: copies the item at pos↑, advances pos↑, and for an
    -- item that is not free calls UseObject on its SL and body.  Those calls
    -- are paired with the FreeObject calls made by the loop below once heap
    -- readers have been opened.
    GetThisItem: ENTRY PROC [
      queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress]
      RETURNS [item: SLDefs.Item] =
      BEGIN
      item ← GetItem[pos↑]↑;
      ReleaseData[clean];
      NextItem[queue, pos];
      IF item.state # free THEN {
        ObjectDirDefs.UseObject[item.SL]; ObjectDirDefs.UseObject[item.body]};
      END;  -- proc. GetThisItem
    firstPos: SLDefs.ItemAddress = GetRPos[queue];
    pos: SLDefs.ItemAddress ← firstPos;
    -- count is read here without the monitor lock
    WriteDecimal[count[queue]];
    WriteChar[Ascii.CR];
    DO
      thisItem: SLDefs.Item = GetThisItem[queue, @pos];
      IF thisItem.state # free THEN
        BEGIN
        slReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.SL];
        bodyReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.body];
        -- undo the UseObject calls made in GetThisItem now that readers are open
        ObjectDirDefs.FreeObject[thisItem.SL];
        ObjectDirDefs.FreeObject[thisItem.body];
        BEGIN
        -- close both readers if anything below unwinds through this block
        ENABLE
          UNWIND => {
            HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]};
        s: STRING = [18] --377#377@6553665536-- ;
        slHeader: SLDefs.SLHeader;
        -- only the header at the front of the steering list is read
        [, ] ← HeapDefs.HeapReadData[
          slReader, [@slHeader, SIZE[SLDefs.SLHeader]]];
        WriteString["Created: "L];
        ProtocolDefs.AppendTimestamp[s, slHeader.created];
        WriteString[s];
        -- print the received stamp only if it has a net or host or a different time
	IF slHeader.received.net # 0
	OR slHeader.received.host # 0
	OR slHeader.received.time # slHeader.created.time THEN {
          WriteString[", received: "L];
          -- s is reused, so empty it before appending the second timestamp
          s.length ← 0;
          ProtocolDefs.AppendTimestamp[s, slHeader.received];
          WriteString[s]; };
	BEGIN
	server: LONG STRING ← NIL;
        IF slHeader.server # NIL THEN
          server ← WITH this: slHeader.server.name SELECT FROM
              rName => this.value,
              connect => this.value,
              netAddr => "[net address]"L,
              ENDCASE => "[bad name]"L;
        IF server # NIL THEN {
	  WriteString[", server: "L];
	  WriteString[server]; };
        END;
        WriteChar[Ascii.CR];
        END;
        HeapDefs.HeapEndRead[slReader];
        HeapDefs.HeapEndRead[bodyReader];
        END;
      -- stop after a full circuit of the queue or when DelTyped returns TRUE
      IF pos = firstPos OR DelTyped[] THEN EXIT;
      ENDLOOP;
    END;

  -- Start-up initialisation for all queues.  For each queue: opens (or creates)
  -- its file, rewrites it as an empty queue when initHeap is TRUE or the file is
  -- new, initialises the itemWritten condition, scans every item to recompute
  -- count and call RestartObject for queued objects, and finally works out the
  -- read and write positions.  The per-queue counts and the three statistics
  -- counters are passed to LogDefs.DisplayNumber.
  -- This is not an ENTRY procedure; it uses the shared page cache without
  -- taking the monitor lock.
  RestartQueues: PUBLIC PROCEDURE [initHeap: BOOLEAN] =
    BEGIN
    -- file name of each queue
    SLTitle: ARRAY SLDefs.SLQueue OF STRING = [
      express: "SLQueue.Express"L, input: "SLQueue.Input"L,
      forward: "SLQueue.Forward"L, pending: "SLQueue.Pending"L,
      mailbox: "SLQueue.Mailbox"L];
    -- label under which each queue's count is displayed
    SLName: ARRAY SLDefs.SLQueue OF STRING = [
      express: "Express queue"L, input: "Input queue"L, forward: "Forward queue"L,
      pending: "Pending queue"L,
      mailbox: "Mailbox queue"L];
    FOR index: SLDefs.SLQueue IN SLDefs.SLQueue DO
      handle: VMDefs.FileHandle = VMDefs.OpenFile[
        options: oldOrNew, name: SLTitle[index], cacheFraction: 2];
      -- first item of the first page; every circular scan below starts here
      firstPos: SLDefs.ItemAddress = [
        page: [file: handle, page: 0], item: FIRST[SLDefs.ItemIndex]];
      newFile: BOOLEAN;
      -- a file length of zero pages is treated as a newly created file
      data[index].size ← VMDefs.GetFileLength[handle].page;
      newFile ← data[index].size = 0;
      IF newFile THEN {
        data[index].size ← 1 --cause extension to 1 pages-- ;
        LogDefs.WriteLogEntry["New SL-queue file created"L]};
      IF initHeap OR newFile THEN  -- initialise with empty queue --
        BEGIN
        p: VMDefs.PageNumber;
        FOR p IN [FIRST[VMDefs.PageNumber]..data[index].size) DO
          -- goes through the shared page cache, so keep dataPos in step with page
          IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]];
          page ← LOOPHOLE[VMDefs.UsePage[dataPos ← [handle, p]], SLQPage];
          -- NOTE: this inner index (an ItemIndex) shadows the outer queue index
          FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO
            page[index].state ← free ENDLOOP;
          ReleaseData[dirty];
          ENDLOOP;
        END;  -- then
      -- the timeout is disabled straight after initialisation, presumably so
      -- that waits on itemWritten end only on NOTIFY
      Process.InitializeCondition[@(data[index].itemWritten), 1];
      Process.DisableTimeout[@(data[index].itemWritten)];
      BEGIN  -- Adjust object reference counts --
      pos: SLDefs.ItemAddress ← firstPos;
      count[index] ← 0;  -- count of SL's in this queue --
      DO
        ptr: LONG POINTER TO SLDefs.Item = GetItem[pos];
        state: DataState ← clean;
        -- an item left in the reading state is turned back to full, so it is
        -- counted below and will be read again
        IF ptr.state = reading THEN {ptr.state ← full; state ← dirty};
        IF ptr.state = full THEN
          BEGIN
          ObjectDirDefs.RestartObject[ptr.body];
          ObjectDirDefs.RestartObject[ptr.SL];
          count[index] ← count[index] + 1;
          END;
        ReleaseData[state];
        NextItem[index, @pos];
        IF pos = firstPos THEN EXIT;
        ENDLOOP;
      END;
      -- find reader position: skip optional full, then all free items --
      data[index].rPos ← firstPos;
      -- first pass: move forward from firstPos to the first free item
      DO
        ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos];
        IF ptr.state = free THEN {ReleaseData[clean]; EXIT};
        ReleaseData[clean];
        NextItem[index, @(data[index].rPos)];
        IF data[index].rPos = firstPos THEN EXIT --all items full-- ;
        ENDLOOP;
      -- second pass: move forward over free items to the first item that is not free
      BEGIN
      started: SLDefs.ItemAddress = data[index].rPos;
      DO
        ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos];
        IF ptr.state # free THEN {ReleaseData[clean]; EXIT};
        ReleaseData[clean];
        NextItem[index, @data[index].rPos];
        IF data[index].rPos = started THEN EXIT --all items free-- ;
        ENDLOOP;
      END;
      -- find writer position: after all full items --
      data[index].wPos ← data[index].rPos;
      DO
        ptr: LONG POINTER TO SLDefs.Item ← GetItem[data[index].wPos];
        IF ptr.state = free THEN {ReleaseData[clean]; EXIT};
        ReleaseData[clean];
        NextItem[index, @data[index].wPos];
        -- back at rPos with no free item seen: wPos is left equal to rPos
        IF data[index].wPos = data[index].rPos THEN EXIT;
        ENDLOOP;
      LogDefs.DisplayNumber[SLName[index], [short[@count[index]]]];
      ENDLOOP;
    LogDefs.DisplayNumber["Messages received"L, [long[@received]]];
    LogDefs.DisplayNumber["Messages forwarded"L, [long[@forwarded]]];
    LogDefs.DisplayNumber["Messages re-processed"L, [long[@reDone]]];
    END;



  END.

LOG:  22-Oct-84 15:38:48 - blh:  changed DisplayQueue to print out the server name rather than the handle.