-- FTPQueue.mesa, Edit: HGM September 30, 1980  5:54 PM  

-- Copyright  Xerox Corporation 1979, 1980

DIRECTORY
  FTPDefs,
  FTPPrivateDefs,
  Process USING [InitializeCondition, InitializeMonitor],
  Storage USING [Node, Free];

FTPQueue: MONITOR 
  -- Note:  The queue primitives constitute the monitor.
  -- lock identification
    LOCKS queue USING queue: Queue
  -- import list
    IMPORTS Process, Storage, FTPPrivateDefs
  -- export list
    EXPORTS FTPPrivateDefs
  -- share list
    SHARES FTPDefs
  = BEGIN OPEN FTPDefs, FTPPrivateDefs; 


InitializeQueue: PUBLIC PROCEDURE [queue: Queue, identifiersCoincide: IdentifiersCoincide] =
  BEGIN
  -- Note:  Prepares a queue object for use; not an ENTRY procedure, since
  --        the monitor lock itself is initialized here (last).
  -- Arguments:  queue = queue object to set up,
  --             identifiersCoincide = client predicate stored in the queue
  --             and consulted by LockQueue when checking for conflicts.
  -- start with an empty list and a zero element count
    queue.tail ← NIL;
    queue.head ← NIL;
    queue.elementCount ← 0;
  -- remember the client's identifier comparison procedure
    queue.identifiersCoincide ← identifiersCoincide;
  -- set up both condition variables with the largest CARDINAL timeout
    Process.InitializeCondition[@queue.elementDequeued, LAST[CARDINAL]];
    Process.InitializeCondition[@queue.elementUnlocked, LAST[CARDINAL]];
  -- finally set up the queue's monitor lock
    Process.InitializeMonitor[@queue.LOCK];
  END;

FinalizeQueue: PUBLIC ENTRY PROCEDURE [queue: Queue] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Does not return until the queue's element count is zero.
  --        Releases nothing itself; it only blocks the caller.
  -- sleep on elementDequeued (broadcast by DeQueue) while elements remain,
  -- rechecking the count after every wakeup
    WHILE queue.elementCount # 0 DO
      WAIT queue.elementDequeued;
      ENDLOOP;
  END;

EnQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element, identifier: UNSPECIFIED] RETURNS [enqueuedElement: Element] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Appends an element to the tail of the queue.  If the caller
  --        passes element = NIL, a fresh ElementObject is obtained from
  --        Storage and flagged as allocated so DeQueue will free it.
  -- Returns:  the element actually placed on the queue.
  -- local constants
    mustAllocate: BOOLEAN = (element = NIL);
  -- obtain storage for the element when the caller supplied none
    IF mustAllocate THEN element ← Storage.Node[SIZE[ElementObject]];
  -- fill in the element: linked after the current tail, unlocked;
  -- the exclusive field is deliberately left unspecified until LockQueue
    element↑ ← ElementObject[
      previous: queue.tail, next: NIL,
      identifier: identifier,
      allocated: mustAllocate, locked: FALSE, exclusive: ];
  -- link the element in: after the old tail, or as head of an empty queue
    IF queue.head # NIL THEN queue.tail.next ← element
    ELSE queue.head ← element;
    queue.tail ← element;
  -- account for the new element
    queue.elementCount ← queue.elementCount + 1;
  -- hand the enqueued element back to the caller
    RETURN[element];
  END;

DeQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Unlinks element from the queue, frees it if EnQueue allocated
  --        it, and wakes everyone waiting on elementDequeued.
  --        The caller must have unlocked the element beforehand.
  -- account for the departing element
    queue.elementCount ← queue.elementCount - 1;
  -- detach from the predecessor side (or advance the head)
    IF element # queue.head THEN element.previous.next ← element.next
    ELSE queue.head ← element.next;
  -- detach from the successor side (or retreat the tail)
    IF element # queue.tail THEN element.next.previous ← element.previous
    ELSE queue.tail ← element.previous;
  -- sanity check: an empty queue must have neither head nor tail,
  -- and a non-empty queue must have both
    IF queue.elementCount = 0 THEN
      BEGIN
      IF queue.head # NIL OR queue.tail # NIL THEN Abort[queueInconsistent];
      END
    ELSE
      BEGIN
      IF queue.head = NIL OR queue.tail = NIL THEN Abort[queueInconsistent];
      END;
  -- give back the element's storage if EnQueue obtained it
    IF element.allocated THEN Storage.Free[element];
  -- tell all waiters (e.g. FinalizeQueue) that an element has left
    BROADCAST queue.elementDequeued;
  END;

LockQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element, exclusive: BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Assumes element unlocked and waits indefinitely.
  -- Locks element, first waiting until no other locked element conflicts
  -- with it.  Another element conflicts when it is locked, at least one of
  -- the two locks is exclusive, and queue.identifiersCoincide reports that
  -- the two identifiers coincide.
  -- Arguments:  queue = queue containing element,
  --             element = element to lock,
  --             exclusive = TRUE for an exclusive lock, FALSE for shared.
  -- local variables
    nextElement: Element;
    conflict: BOOLEAN;
  -- wait until no conflict
    DO
      -- start every pass with a clean slate; without this reset a conflict
      -- found on an earlier pass would make the scan below terminate
      -- before examining any element, and the wait would never end
        conflict ← FALSE;
      -- check for conflicts, stopping at the first one found
        FOR nextElement ← queue.head, nextElement.next
            UNTIL (conflict OR nextElement = NIL) DO
          conflict ← nextElement.locked AND (exclusive OR nextElement.exclusive)
            AND queue.identifiersCoincide[element.identifier, nextElement.identifier];
          ENDLOOP;
      -- exit if no conflict
        IF ~conflict THEN EXIT;
      -- wait for an element to be unlocked (broadcast by UnlockQueue),
      -- then rescan the whole queue
        WAIT queue.elementUnlocked;
      ENDLOOP;
  -- lock queue element
    element.locked ← TRUE;
    element.exclusive ← exclusive;
  END;

UnlockQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Releases the lock on element and wakes every process blocked
  --        in LockQueue so that each can recheck for conflicts.
  -- mark the element as no longer locked
    element.locked ← FALSE;
  -- wake all waiters on elementUnlocked
    BROADCAST queue.elementUnlocked;
  END;

EnumerateQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, processElement: PROCEDURE [Element] RETURNS [BOOLEAN]] RETURNS [element: Element] =
  BEGIN ENABLE UNWIND => NULL;
  -- Note:  Presents each element, head to tail, to processElement and
  --        stops as soon as it returns TRUE.
  -- Returns:  the element for which processElement returned TRUE,
  --           or NIL if the whole queue was traversed.
  -- Note:  processElement must not dequeue the element's successor.
  -- Note:  processElement runs with the queue's monitor lock held (this is
  --        an ENTRY procedure).  NOTE(review): DeQueue is an ENTRY procedure
  --        on the same lock -- confirm how clients remove the current
  --        element from inside processElement.
  -- local variables
    successor: Element;
  -- walk the queue from the head
    element ← queue.head;
    WHILE element # NIL DO
      -- capture the successor first; the current element may be gone
      -- by the time the client's procedure returns
        successor ← element.next;
      -- stop here, returning this element, if the client says so
        IF processElement[element] THEN EXIT;
      -- otherwise move on
        element ← successor;
      ENDLOOP;
  END;

END..