-- TapMsgQueueImpl.mesa
-- Copyright © 1989 by Xerox Corporation.  All rights reserved.
-- Doug Terry, February 19, 1990 2:57:30 pm PST

-- Implements the message queue abstraction along with the $Basic, $Duplicate,
-- and $Merge message queue classes.

DIRECTORY
  LoganBerry USING [Entry],
  LoganQuery USING [ComplexCursor, DestroyProc, QueryClass, QueryClassRec, RetrieveProc],
  TapMsgQueue;

TapMsgQueueImpl: CEDAR MONITOR
  EXPORTS TapMsgQueue
~ BEGIN

Msg: TYPE ~ TapMsgQueue.Msg;
MsgQueue: TYPE ~ TapMsgQueue.MsgQueue;


-- Generic operations

Get: PUBLIC PROC [queue: MsgQueue, wait: BOOLEAN _ FALSE] RETURNS [msg: Msg] ~ {
  -- Returns the next message produced by the queue's class-specific get procedure,
  -- or NIL if the class has no get procedure or no message is available.
  -- If wait=TRUE then the call should block until a message is available or the queue is closed.
  IF queue.class.get = NIL THEN RETURN[NIL];
  msg _ queue.class.get[queue];
  -- NOTE(review): the emptiness test above and the WAIT below are not one atomic step,
  -- so a Put that lands between them is not observed until the next notification
  -- (or a timeout on queue.nonempty, if it has one; confirm in TapMsgQueue).
  WHILE msg=NIL AND wait AND NOT queue.closed DO
    Wait[queue];
    msg _ queue.class.get[queue]; -- re-poll after every wakeup
    ENDLOOP;
  };

Put: PUBLIC PROC [msg: Msg, queue: MsgQueue] RETURNS [] ~ {
  -- Simply call class-specific operation, then wake one waiting reader.
  -- May only work on certain classes of message queues.
  IF queue.class.put = NIL THEN RETURN[]; -- should raise an error?
  queue.class.put[queue, msg];
  Notify[queue];
  };

Close: PUBLIC PROC [queue: MsgQueue] RETURNS [] ~ {
  -- Simply call class-specific operation, then mark the queue closed and release
  -- every process blocked in Get[wait: TRUE] on this queue.
  -- NOTE(review): when the class has no close procedure the queue is left unmarked
  -- and no waiter is woken; this preserves the original behavior.
  IF queue.class.close = NIL THEN RETURN[];
  queue.class.close[queue];
  queue.closed _ TRUE;
  NotifyAll[queue]; -- all waiters must see the close, not just one
  };

-- The following procedures exist since WAITs, NOTIFYs and BROADCASTs have to be done
-- inside entry procedures.  Get, Put, and Close cannot be entry procedures since
-- they may be called recursively.

Wait: ENTRY PROC [queue: MsgQueue] ~ {
  -- Blocks the caller on the queue's nonempty condition.
  WAIT queue.nonempty;
  };

Notify: ENTRY PROC [queue: MsgQueue] ~ {
  -- Wakes one process waiting on the queue's nonempty condition.
  NOTIFY queue.nonempty;
  };

NotifyAll: ENTRY PROC [queue: MsgQueue] ~ {
  -- Wakes every process waiting on the queue's nonempty condition.
  BROADCAST queue.nonempty;
  };


-- The $Basic message queue class

-- Basic queues are simply lists of messages.

basicClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Basic, BasicGet, BasicPut, BasicClose]];

MsgList: TYPE ~ REF MsgListRec;
MsgListRec: TYPE ~ RECORD [
  contents: LIST OF Msg _ NIL, -- full contents of queue
  last: LIST OF Msg _ NIL -- last element in queue
  ];

Create: PUBLIC PROC [] RETURNS [queue: MsgQueue] ~ {
  -- Creates a message queue of type $Basic.
  -- A $Basic queue simply maintains a list of its current elements.
  queue _ NEW[TapMsgQueue.MsgQueueRec];
  queue.class _ basicClass;
  queue.data _ NEW[MsgListRec];
  };

BasicGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  -- Removes and returns the head of the queue's message list, or NIL if it is empty.
  q: MsgList _ NARROW[queue.data];
  msg _ GetMsgFromList[q];
  };

BasicPut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  -- Appends msg to the tail of the queue's message list.
  q: MsgList _ NARROW[queue.data];
  AddMsgToList[q, msg];
  };

BasicClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- Nothing class-specific to do; Close itself marks the queue closed.
  NULL;
  };

-- Access to message lists is monitored.

GetMsgFromList: ENTRY PROC [list: MsgList] RETURNS [msg: Msg] ~ {
  -- Removes and returns the first message of list, or NIL if list is empty.
  IF list.contents = NIL THEN RETURN[NIL];
  msg _ list.contents.first;
  list.contents _ list.contents.rest;
  IF list.contents = NIL THEN list.last _ NIL; -- list drained, so forget the stale tail
  };

AddMsgToList: ENTRY PROC [list: MsgList, msg: Msg] RETURNS [] ~ {
  -- Appends msg to list, using the last pointer so no traversal is needed.
  IF list.contents = NIL
    THEN {
      list.contents _ LIST[msg];
      list.last _ list.contents;
      }
    ELSE {
      list.last.rest _ LIST[msg];
      list.last _ list.last.rest;
      };
  };


-- The $Duplicate message queue class

-- A duplicated queue is actually two queues where each produces the same elements
-- as the original queue.

duplicateClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Duplicate, DuplicateGet, DuplicatePut, DuplicateClose]];

DuplicateData: TYPE ~ REF DuplicateDataRec;
DuplicateDataRec: TYPE ~ RECORD [
  qnum: [0..1], -- which output queue am I?
  input: MsgQueue, -- input queue
  shared: SharedDuplicateData -- stuff common to both output queues
  ];
SharedDuplicateData: TYPE ~ REF SharedDuplicateDataRec;
SharedDuplicateDataRec: TYPE ~ RECORD [
  closed: ARRAY [0..1] OF BOOLEAN _ ALL[FALSE], -- queue closed?
  prefetched: ARRAY [0..1] OF MsgList -- msgs ready for output queues
  ];

Duplicate: PUBLIC PROC [queue: MsgQueue] RETURNS [q1, q2: MsgQueue] ~ {
  -- Creates two queues where each produces the same elements as the original queue.
  -- Needs storage for saving msgs that have been retrieved from the input queue and
  -- returned to one output queue but not the other.
  -- The two new queues have class $Duplicate.
  dd: DuplicateData;
  sd: SharedDuplicateData _ NEW[SharedDuplicateDataRec];
  -- first output queue
  q1 _ NEW[TapMsgQueue.MsgQueueRec];
  dd _ NEW[DuplicateDataRec];
  q1.class _ duplicateClass;
  q1.data _ dd;
  dd.qnum _ 0;
  dd.input _ queue;
  dd.shared _ sd;
  sd.prefetched[dd.qnum] _ NEW[MsgListRec];
  -- second output queue, sharing sd with the first
  q2 _ NEW[TapMsgQueue.MsgQueueRec];
  dd _ NEW[DuplicateDataRec];
  q2.class _ duplicateClass;
  q2.data _ dd;
  dd.qnum _ 1;
  dd.input _ queue;
  dd.shared _ sd;
  sd.prefetched[dd.qnum] _ NEW[MsgListRec];
  };

DuplicateGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  q: DuplicateData _ NARROW[queue.data];
  -- Get message off prefetched list if possible
  msg _ GetMsgFromList[q.shared.prefetched[q.qnum]];
  IF msg = NIL THEN {
    -- Get message off input queue (without waiting)
    msg _ Get[q.input];
    IF msg # NIL THEN {
      -- Add message to other output queue's prefetched list
      AddMsgToList[q.shared.prefetched[IF q.qnum=0 THEN 1 ELSE 0], msg];
      };
    };
  };

DuplicatePut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  NULL; -- doesn't make sense to write to a duplicated queue
  };

DuplicateClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- Records that this output queue is closed; the shared input queue is closed
  -- only once both output queues have been closed.
  q: DuplicateData _ NARROW[queue.data];
  q.shared.closed[q.qnum] _ TRUE;
  IF q.shared.closed[0] AND q.shared.closed[1] THEN Close[q.input];
  };


-- The $Merge message queue class

-- A merged queue contains elements from both of its input queues in unspecified
-- order.  For fairness, we alternate reading between the two queues.

mergeClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Merge, MergeGet, MergePut, MergeClose]];

MergeData: TYPE ~ REF MergeDataRec;
MergeDataRec: TYPE ~ RECORD [
  input: ARRAY [0..1] OF MsgQueue, -- two input queues
  next: [0..1] _ 0 -- next queue to read from
  ];

Merge: PUBLIC PROC [q1, q2: MsgQueue] RETURNS [queue: MsgQueue] ~ {
  -- Creates a new queue that contains elements from both of the input queues in
  -- unspecified order.  The new queue has class $Merge.
  md: MergeData _ NEW[MergeDataRec];
  queue _ NEW[TapMsgQueue.MsgQueueRec];
  queue.class _ mergeClass;
  queue.data _ md;
  md.input[0] _ q1;
  md.input[1] _ q2;
  };

MergeGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  -- Reads (without waiting) from the input whose turn it is, falling back to the
  -- other input when the first yields nothing.
  q: MergeData _ NARROW[queue.data];
  msg _ Get[q.input[q.next]];
  q.next _ IF q.next=0 THEN 1 ELSE 0; -- swap inputs for next time
  IF msg = NIL THEN -- try other input queue
    msg _ Get[q.input[q.next]];
  };

MergePut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  NULL; -- doesn't make sense to write to a merged queue
  };

MergeClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- Closes both input queues.
  q: MergeData _ NARROW[queue.data];
  Close[q.input[0]];
  Close[q.input[1]];
  };


-- Connecting MsgQueues to ComplexCursors

-- $MsgQueue query class
MsgClass: LoganQuery.QueryClass = NEW[LoganQuery.QueryClassRec _ [$MsgQueue, MsgRetrieve, MsgDestroy]];

CursorFromMsgQueue: PUBLIC PROC [mq: TapMsgQueue.MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] = {
  -- Wraps mq in a cursor of class $MsgQueue whose data field is the queue itself.
  cursor.class _ MsgClass;
  cursor.data _ mq;
  };

EntryFromMsg: PUBLIC PROC [msg: TapMsgQueue.Msg] RETURNS [entry: LoganBerry.Entry] ~ TRUSTED {
  -- Reinterprets a message as a LoganBerry entry without copying or checking.
  -- NOTE(review): relies on Msg and Entry having the same representation; confirm in the interfaces.
  entry _ LOOPHOLE[msg];
  };

MsgFromEntry: PUBLIC PROC [entry: LoganBerry.Entry] RETURNS [msg: TapMsgQueue.Msg] ~ TRUSTED {
  -- Reinterprets a LoganBerry entry as a message; inverse of EntryFromMsg.
  msg _ LOOPHOLE[entry];
  };

MsgRetrieve: LoganQuery.RetrieveProc = {
  -- [cursor: ComplexCursor, dir: CursorDirection _ increasing] RETURNS [entry: LoganBerry.Entry]
  -- Returns the next message of the underlying queue as an entry, without waiting.
  -- Raises ERROR if the cursor's data is not a message queue.
  WITH cursor.data SELECT FROM
    mq: TapMsgQueue.MsgQueue => RETURN[EntryFromMsg[Get[queue: mq]]];
    ENDCASE => ERROR;
  };

MsgDestroy: LoganQuery.DestroyProc = {
  -- [cursor: ComplexCursor] RETURNS []
  -- Closes the underlying queue.  Raises ERROR if the cursor's data is not a message queue.
  WITH cursor.data SELECT FROM
    mq: TapMsgQueue.MsgQueue => Close[queue: mq];
    ENDCASE => ERROR;
  };

END.
-- The following two procedures exist since WAITs and NOTIFYs have to be done inside entry procedures.  Get, Put, and Close cannot be entry procedures since they may be called recursively.
--
-- The $Basic message queue class
-- Basic queues are simply lists of messages.
-- Creates a message queue of type $Basic.  A $Basic queue simply maintains a list of its current elements.
-- PROC [queue: MsgQueue] RETURNS [msg: Msg]
-- PROC [queue: MsgQueue, msg: Msg] RETURNS []
-- PROC [queue: MsgQueue] RETURNS []
-- Access to message lists is monitored.
--
-- The $Duplicate message queue class
-- A duplicated queue is actually two queues where each produces the same elements as the original queue.
-- Creates two queues where each produces the same elements as the original queue.  Needs storage for saving msgs that have been retrieved from the input queue and returned to one output queue but not the other.  The two new queues have class $Duplicate.
-- PROC [queue: MsgQueue] RETURNS [msg: Msg]
-- Get message off prefetched list if possible
-- Get message off input queue
-- Add message to other output queue's prefetched list
-- PROC [queue: MsgQueue, msg: Msg] RETURNS []
-- PROC [queue: MsgQueue] RETURNS []
--
-- The $Merge message queue class
-- A merged queue contains elements from both of its input queues in unspecified order.  For fairness, we alternate reading between the two queues.
-- Creates a new queue that contains elements from both of the input queues in unspecified order.  The new queue has class $Merge.
-- PROC [queue: MsgQueue] RETURNS [msg: Msg]
-- PROC [queue: MsgQueue, msg: Msg] RETURNS []
-- PROC [queue: MsgQueue] RETURNS []
--
-- Connecting MsgQueues to ComplexCursors
-- $MsgQueue query class
-- [cursor: ComplexCursor, dir: CursorDirection _ increasing] RETURNS [entry: LoganBerry.Entry]
-- [cursor: ComplexCursor] RETURNS []