<> <> <> <<>> <> <<>> DIRECTORY LoganBerry USING [Entry], LoganQuery USING [ComplexCursor, DestroyProc, QueryClass, QueryClassRec, RetrieveProc], TapMsgQueue; TapMsgQueueImpl: CEDAR MONITOR EXPORTS TapMsgQueue ~ BEGIN Msg: TYPE ~ TapMsgQueue.Msg; MsgQueue: TYPE ~ TapMsgQueue.MsgQueue; <> Get: PUBLIC PROC [queue: MsgQueue, wait: BOOLEAN _ FALSE] RETURNS [msg: Msg] ~ { <> IF queue.class.get = NIL THEN RETURN[NIL]; msg _ queue.class.get[queue]; WHILE msg=NIL AND wait AND NOT queue.closed DO Wait[queue]; msg _ queue.class.get[queue]; ENDLOOP; }; <<>> Put: PUBLIC PROC [msg: Msg, queue: MsgQueue] RETURNS [] ~ { <> IF queue.class.put = NIL THEN RETURN[]; -- should raise an error? queue.class.put[queue, msg]; Notify[queue]; }; <<>> Close: PUBLIC PROC [queue: MsgQueue] RETURNS [] ~ { <> IF queue.class.close = NIL THEN RETURN[]; queue.class.close[queue]; queue.closed _ TRUE; Notify[queue]; }; <<>> <> Wait: ENTRY PROC [queue: MsgQueue] ~ { WAIT queue.nonempty; }; Notify: ENTRY PROC [queue: MsgQueue] ~ { NOTIFY queue.nonempty; }; <> <> <<>> basicClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Basic, BasicGet, BasicPut, BasicClose]]; MsgList: TYPE ~ REF MsgListRec; MsgListRec: TYPE ~ RECORD [ contents: LIST OF Msg _ NIL, -- full contents of queue last: LIST OF Msg _ NIL -- last element in queue ]; Create: PUBLIC PROC [] RETURNS [queue: MsgQueue] ~ { <> queue _ NEW[TapMsgQueue.MsgQueueRec]; queue.class _ basicClass; queue.data _ NEW[MsgListRec]; }; <<>> BasicGet: TapMsgQueue.GetProc ~ { <> q: MsgList _ NARROW[queue.data]; msg _ GetMsgFromList[q]; }; <<>> BasicPut: TapMsgQueue.PutProc ~ { <> q: MsgList _ NARROW[queue.data]; AddMsgToList[q, msg]; }; <<>> BasicClose: TapMsgQueue.CloseProc ~ { <> NULL; }; <<>> <> GetMsgFromList: ENTRY PROC [list: MsgList] RETURNS [msg: Msg] ~ { IF list.contents = NIL THEN RETURN[NIL]; msg _ list.contents.first; list.contents _ list.contents.rest; IF list.contents = NIL THEN list.last _ NIL; }; AddMsgToList: ENTRY PROC [list: MsgList, msg: Msg] RETURNS [] ~ { IF list.contents = NIL THEN { list.contents _ LIST[msg]; list.last _ list.contents; } ELSE { list.last.rest _ LIST[msg]; list.last _ list.last.rest; }; }; <> <> <<>> duplicateClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Duplicate, DuplicateGet, DuplicatePut, DuplicateClose]]; DuplicateData: TYPE ~ REF DuplicateDataRec; DuplicateDataRec: TYPE ~ RECORD [ qnum: [0..1], -- which output queue am I? input: MsgQueue, -- input queue shared: SharedDuplicateData -- stuff common to both output queues ]; SharedDuplicateData: TYPE ~ REF SharedDuplicateDataRec; SharedDuplicateDataRec: TYPE ~ RECORD [ closed: ARRAY [0..1] OF BOOLEAN _ ALL[FALSE], -- queue closed? prefetched: ARRAY [0..1] OF MsgList -- msgs ready for output queues ]; Duplicate: PUBLIC PROC [queue: MsgQueue] RETURNS [q1, q2: MsgQueue] ~ { <> dd: DuplicateData; sd: SharedDuplicateData _ NEW[SharedDuplicateDataRec]; q1 _ NEW[TapMsgQueue.MsgQueueRec]; dd _ NEW[DuplicateDataRec]; q1.class _ duplicateClass; q1.data _ dd; dd.qnum _ 0; dd.input _ queue; dd.shared _ sd; sd.prefetched[dd.qnum] _ NEW[MsgListRec]; q2 _ NEW[TapMsgQueue.MsgQueueRec]; dd _ NEW[DuplicateDataRec]; q2.class _ duplicateClass; q2.data _ dd; dd.qnum _ 1; dd.input _ queue; dd.shared _ sd; sd.prefetched[dd.qnum] _ NEW[MsgListRec]; }; <<>> DuplicateGet: TapMsgQueue.GetProc ~ { <> q: DuplicateData _ NARROW[queue.data]; <> msg _ GetMsgFromList[q.shared.prefetched[q.qnum]]; IF msg = NIL THEN { <> msg _ Get[q.input]; IF msg # NIL THEN { <> AddMsgToList[q.shared.prefetched[IF q.qnum=0 THEN 1 ELSE 0], msg]; }; }; }; <<>> DuplicatePut: TapMsgQueue.PutProc ~ { <> NULL; -- doesn't make sense to write to a duplicated queue }; <<>> DuplicateClose: TapMsgQueue.CloseProc ~ { <> q: DuplicateData _ NARROW[queue.data]; q.shared.closed[q.qnum] _ TRUE; IF q.shared.closed[0] AND q.shared.closed[1] THEN Close[q.input]; }; <<>> <> <> <<>> mergeClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec _ [$Merge, MergeGet, MergePut, MergeClose]]; MergeData: TYPE ~ REF MergeDataRec; MergeDataRec: TYPE ~ RECORD [ input: ARRAY [0..1] OF MsgQueue, -- two input queues next: [0..1] _ 0 -- next queue to read from ]; Merge: PUBLIC PROC [q1, q2: MsgQueue] RETURNS [queue: MsgQueue] ~ { <> md: MergeData _ NEW[MergeDataRec]; queue _ NEW[TapMsgQueue.MsgQueueRec]; queue.class _ mergeClass; queue.data _ md; md.input[0] _ q1; md.input[1] _ q2; }; <<>> MergeGet: TapMsgQueue.GetProc ~ { <> q: MergeData _ NARROW[queue.data]; msg _ Get[q.input[q.next]]; q.next _ IF q.next=0 THEN 1 ELSE 0; -- swap inputs for next time IF msg = NIL THEN -- try other input queue msg _ Get[q.input[q.next]]; }; <<>> MergePut: TapMsgQueue.PutProc ~ { <> NULL; -- doesn't make sense to write to a merged queue }; <<>> MergeClose: TapMsgQueue.CloseProc ~ { <> q: MergeData _ NARROW[queue.data]; Close[q.input[0]]; Close[q.input[1]]; }; <<>> <> <<$MsgQueue query class>> MsgClass: LoganQuery.QueryClass = NEW[LoganQuery.QueryClassRec _ [$MsgQueue, MsgRetrieve, MsgDestroy]]; CursorFromMsgQueue: PUBLIC PROC [mq: TapMsgQueue.MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] = { cursor.class _ MsgClass; cursor.data _ mq; }; EntryFromMsg: PUBLIC PROC [msg: TapMsgQueue.Msg] RETURNS [entry: LoganBerry.Entry] ~ TRUSTED { entry _ LOOPHOLE[msg]; }; MsgFromEntry: PUBLIC PROC [entry: LoganBerry.Entry] RETURNS [msg: TapMsgQueue.Msg] ~ TRUSTED { msg _ LOOPHOLE[entry]; }; MsgRetrieve: LoganQuery.RetrieveProc = { <<[cursor: ComplexCursor, dir: CursorDirection _ increasing] RETURNS [entry: LoganBerry.Entry]>> WITH cursor.data SELECT FROM mq: TapMsgQueue.MsgQueue => RETURN[EntryFromMsg[Get[queue: mq]]]; ENDCASE => ERROR; }; MsgDestroy: LoganQuery.DestroyProc = { <<[cursor: ComplexCursor] RETURNS []>> WITH cursor.data SELECT FROM mq: TapMsgQueue.MsgQueue => Close[queue: mq]; ENDCASE => ERROR; }; END.