-- FTPQueue.mesa, Edit: HGM September 30, 1980  5:54 PM  
-- Copyright  Xerox Corporation 1979, 1980

DIRECTORY
  FTPDefs,
  FTPPrivateDefs,
  Process USING [InitializeCondition, InitializeMonitor],
  MDSStorage USING [Node, Free];

FTPQueue: MONITOR
  -- Note:  The queue primitives constitute the monitor.
  -- lock identification
  LOCKS queue USING queue: Queue
  IMPORTS Process, Storage: MDSStorage, FTPPrivateDefs
  EXPORTS FTPPrivateDefs
  SHARES FTPDefs =
  BEGIN OPEN FTPDefs, FTPPrivateDefs;


  InitializeQueue: PUBLIC PROCEDURE [
    queue: Queue, identifiersCoincide: IdentifiersCoincide] =
    BEGIN
    -- initialize queue object
    queue.head ← queue.tail ← NIL;
    queue.elementCount ← 0;
    queue.identifiersCoincide ← identifiersCoincide;
    -- initialize queue conditions
    Process.InitializeCondition[@queue.elementDequeued, LAST[CARDINAL]];
    Process.InitializeCondition[@queue.elementUnlocked, LAST[CARDINAL]];
    -- initialize monitor lock
    Process.InitializeMonitor[@queue.LOCK];
    END;

  FinalizeQueue: PUBLIC ENTRY PROCEDURE [queue: Queue] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- Note:  Awaits emptying of queue.
    -- wait for queue to empty
    UNTIL queue.elementCount = 0 DO WAIT queue.elementDequeued; ENDLOOP;
    END;

  EnQueue: PUBLIC ENTRY PROCEDURE [
    queue: Queue, element: Element, identifier: UNSPECIFIED]
    RETURNS [enqueuedElement: Element] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- Note:  Allocates element if none supplied.
    -- local constants
    allocated: BOOLEAN = (element = NIL);
    -- allocate element object if necessary
    IF allocated THEN element ← Storage.Node[SIZE[ElementObject]];
    -- initialize element object
    element↑ ← ElementObject[
      previous: queue.tail, next: NIL, identifier: identifier,
      allocated: allocated, locked: FALSE, exclusive:];
    -- append element to queue
    IF queue.head = NIL THEN queue.head ← element ELSE queue.tail.next ← element;
    queue.tail ← element;
    -- increment element count
    queue.elementCount ← queue.elementCount + 1;
    -- return enqueued element
    enqueuedElement ← element;
    END;

  DeQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- Note:  Assumes element unlocked.
    -- decrement element count
    queue.elementCount ← queue.elementCount - 1;
    -- remove element from queue
    IF element = queue.head THEN queue.head ← element.next
    ELSE element.previous.next ← element.next;
    IF element = queue.tail THEN queue.tail ← element.previous
    ELSE element.next.previous ← element.previous;
    -- verify queue consistency
    IF (queue.elementCount = 0 AND (queue.head # NIL OR queue.tail # NIL))
      OR (queue.elementCount # 0 AND (queue.head = NIL OR queue.tail = NIL)) THEN
      Abort[queueInconsistent];
    -- release element object if allocated
    IF element.allocated THEN Storage.Free[element];
    -- broadcast element dequeued
    BROADCAST queue.elementDequeued;
    END;

  LockQueue: PUBLIC ENTRY PROCEDURE [
    queue: Queue, element: Element, exclusive: BOOLEAN] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- Note:  Assumes element unlocked and waits indefinitely.
    -- local variables
    nextElement: Element;
    conflict: BOOLEAN ← FALSE;
    -- wait until no conflict
    DO
      -- check for conflicts
      FOR nextElement ← queue.head, nextElement.next UNTIL
        (conflict OR nextElement = NIL) DO
        conflict ← nextElement.locked AND (exclusive OR nextElement.exclusive)
          AND queue.identifiersCoincide[
            element.identifier, nextElement.identifier];
        ENDLOOP;
      -- exit if no conflict
      IF ~conflict THEN EXIT;
      -- wait for an element to be unlocked
      WAIT queue.elementUnlocked;
      ENDLOOP;
    -- lock queue element
    element.locked ← TRUE;
    element.exclusive ← exclusive;
    END;

  UnlockQueue: PUBLIC ENTRY PROCEDURE [queue: Queue, element: Element] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- unlock element
    element.locked ← FALSE;
    -- broadcast element unlocked
    BROADCAST queue.elementUnlocked;
    END;

  EnumerateQueue: PUBLIC ENTRY PROCEDURE [
    queue: Queue, processElement: PROCEDURE [Element] RETURNS [BOOLEAN]]
    RETURNS [element: Element] =
    BEGIN
    ENABLE UNWIND => NULL;
    -- Note:  processElement must not dequeue the element's successor.
    -- local variables
    nextElement: Element;
    -- enumerate elements of queue
    FOR element ← queue.head, nextElement UNTIL element = NIL DO
      -- save next element in case client dequeues current one
      nextElement ← element.next;
      -- pass current element to caller's procedure
      IF processElement[element] THEN EXIT;
      ENDLOOP;
    END;

  END..