TapMsgQueueImpl.mesa
Copyright © 1989 by Xerox Corporation. All rights reserved.
Doug Terry, February 19, 1990 2:57:30 pm PST
DIRECTORY
LoganBerry USING [Entry],
LoganQuery USING [ComplexCursor, DestroyProc, QueryClass, QueryClassRec, RetrieveProc],
TapMsgQueue;
Generic operations
Get: PUBLIC PROC [queue: MsgQueue, wait: BOOLEAN ← FALSE] RETURNS [msg: Msg] ~ {
  -- Fetches the next message from queue by way of the class-specific get operation.
  -- Returns NIL if the class supplies no get operation, or if nothing is available and
  -- either wait=FALSE or the queue has been closed.
  -- When wait=TRUE the call blocks until a message is available or the queue is closed.
  -- NOTE(review): the emptiness test is made outside the monitor, so a Put that lands
  -- between the test and the call to Wait notifies nobody; confirm whether queue.nonempty
  -- carries a timeout that bounds the resulting delay.
  IF queue.class.get = NIL THEN RETURN[NIL];
  DO
    msg ← queue.class.get[queue];
    -- Stop as soon as we have a message, the caller does not want to block, or the queue is closed.
    IF msg # NIL OR NOT wait OR queue.closed THEN EXIT;
    Wait[queue];
  ENDLOOP;
};
Put: PUBLIC PROC [msg: Msg, queue: MsgQueue] RETURNS [] ~ {
  -- Hands msg to the class-specific put operation and then signals the queue's
  -- nonempty condition so that a blocked Get can retry.
  -- Only some classes of message queue implement put; when the class has no put
  -- operation the message is silently dropped (should this raise an error?).
  IF queue.class.put # NIL THEN {
    queue.class.put[queue, msg];
    Notify[queue];
  };
};
Close: PUBLIC PROC [queue: MsgQueue] RETURNS [] ~ {
  -- Closes queue: runs the class-specific close operation (if the class has one),
  -- marks the queue closed, and wakes any Get blocked on it.
  -- The closed flag and the wakeup are generic queue state, so they are applied even
  -- when the class supplies no close operation; otherwise a Get[wait: TRUE] on such a
  -- queue would never observe the close and would block forever.
  IF queue.class.close # NIL THEN queue.class.close[queue];
  queue.closed ← TRUE;
  -- Get re-tests queue.closed after waking, so this lets blocked callers return NIL.
  Notify[queue];
};
The following two procedures exist since WAITs and NOTIFYs have to be done inside entry procedures. Get, Put, and Close cannot be entry procedures since they may be called recursively.
Wait: ENTRY PROC [queue: MsgQueue] ~ {
  -- Blocks the caller on queue.nonempty. Exists only because a WAIT must be executed
  -- inside an entry procedure; callers (see Get) re-test their condition after waking.
  -- Release the monitor lock if an error unwinds through this procedure (for example
  -- ABORTED out of the WAIT, if aborts are enabled on the condition -- TODO confirm);
  -- without this the lock would stay held and every other entry procedure would hang.
  ENABLE UNWIND => NULL;
  WAIT queue.nonempty;
};
Notify: ENTRY PROC [queue: MsgQueue] ~ {
  -- Wakes the processes waiting on queue.nonempty. Exists only because the wakeup
  -- must be issued from inside an entry procedure.
  -- BROADCAST rather than NOTIFY: Close uses this to announce that the queue is closed,
  -- and every blocked Get must see that, not just one of them. After a Put the extra
  -- wakeups are harmless because Get re-tests for a message in a loop and waits again.
  BROADCAST queue.nonempty;
};
The $Basic message queue class
Basic queues are simply lists of messages.
-- Class record installed by Create in every $Basic queue: the class name atom followed by
-- the get, put and close operations (presumably in QueueClassRec field order -- confirm against TapMsgQueue).
basicClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Basic, BasicGet, BasicPut, BasicClose]];
-- FIFO list of messages with a pointer to its tail cell so that appends need not walk the list.
-- Read and updated only through GetMsgFromList and AddMsgToList.
-- When contents is NIL the list is empty and GetMsgFromList resets last to NIL as well.
MsgList: TYPE ~ REF MsgListRec;
MsgListRec:
TYPE ~
RECORD [
contents: LIST OF Msg ← NIL, -- full contents of queue
last: LIST OF Msg ← NIL -- last element in queue
];
Create: PUBLIC PROC [] RETURNS [queue: MsgQueue] ~ {
  -- Builds a new, empty message queue of class $Basic.
  -- A $Basic queue keeps its pending messages in a MsgListRec stored in queue.data.
  store: MsgList ← NEW[MsgListRec];
  queue ← NEW[TapMsgQueue.MsgQueueRec];
  queue.data ← store;
  queue.class ← basicClass;
};
BasicGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  -- $Basic get: removes and returns the oldest message on the queue's list, or NIL if the list is empty.
  RETURN[GetMsgFromList[NARROW[queue.data, MsgList]]];
};
BasicPut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  -- $Basic put: appends msg to the tail of the queue's list.
  AddMsgToList[NARROW[queue.data, MsgList], msg];
};
BasicClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- $Basic close: nothing class-specific to do; the generic Close marks the queue closed.
  NULL;
};
Access to message lists is monitored.
GetMsgFromList: ENTRY PROC [list: MsgList] RETURNS [msg: Msg] ~ {
  -- Removes and returns the first message on list, or returns NIL when list is empty.
  -- Runs under the monitor lock so that it cannot interleave with AddMsgToList.
  head: LIST OF Msg ← list.contents;
  msg ← NIL;
  IF head # NIL THEN {
    msg ← head.first;
    list.contents ← head.rest;
    -- The list just became empty, so the tail pointer must not keep referring to the removed cell.
    IF list.contents = NIL THEN list.last ← NIL;
  };
};
AddMsgToList: ENTRY PROC [list: MsgList, msg: Msg] RETURNS [] ~ {
  -- Appends msg to the tail of list.
  -- Runs under the monitor lock so that it cannot interleave with GetMsgFromList.
  cell: LIST OF Msg ← LIST[msg];
  IF list.contents = NIL
    THEN list.contents ← cell -- first element: it is both head and tail
    ELSE list.last.rest ← cell; -- link the new cell after the current tail
  list.last ← cell;
};
The $Duplicate message queue class
A duplicated queue is actually two queues where each produces the same elements as the original queue.
-- Class record installed by Duplicate in both of the output queues it creates: the class name
-- atom followed by the get, put and close operations (presumably in QueueClassRec field order -- confirm).
duplicateClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Duplicate, DuplicateGet, DuplicatePut, DuplicateClose]];
-- Per-output-queue state for a $Duplicate queue; stored in the queue's data field by Duplicate.
DuplicateData: TYPE ~ REF DuplicateDataRec;
DuplicateDataRec:
TYPE ~
RECORD [
qnum: [0..1], -- which output queue am I?
input: MsgQueue, -- input queue
shared: SharedDuplicateData -- stuff common to both output queues
];
-- State shared by the two output queues of one Duplicate call, indexed by qnum.
-- DuplicateGet adds each message it reads from the input to the OTHER queue's prefetched list;
-- DuplicateClose closes the input once both closed flags are TRUE.
SharedDuplicateData: TYPE ~ REF SharedDuplicateDataRec;
SharedDuplicateDataRec:
TYPE ~
RECORD [
closed: ARRAY [0..1] OF BOOLEAN ← ALL[FALSE], -- queue closed?
prefetched: ARRAY [0..1] OF MsgList -- msgs ready for output queues
];
Duplicate: PUBLIC PROC [queue: MsgQueue] RETURNS [q1, q2: MsgQueue] ~ {
  -- Splits queue into two output queues of class $Duplicate, each of which produces
  -- the same messages as the original. A message read from the input on behalf of one
  -- output is saved on a prefetched list until the other output asks for it.
  common: SharedDuplicateData ← NEW[SharedDuplicateDataRec];
  MakeOutput: PROC [which: [0..1]] RETURNS [out: MsgQueue] ~ {
    -- Builds output queue number `which`, wired to the shared state and the input queue.
    dd: DuplicateData;
    out ← NEW[TapMsgQueue.MsgQueueRec];
    dd ← NEW[DuplicateDataRec];
    out.class ← duplicateClass;
    out.data ← dd;
    dd.qnum ← which;
    dd.input ← queue;
    dd.shared ← common;
    common.prefetched[which] ← NEW[MsgListRec];
  };
  q1 ← MakeOutput[0];
  q2 ← MakeOutput[1];
};
DuplicateGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  -- $Duplicate get: returns a message already fetched on behalf of the sibling queue if
  -- there is one; otherwise reads the input queue (without blocking) and saves a copy of
  -- the reference for the sibling.
  dd: DuplicateData ← NARROW[queue.data];
  -- Prefer a message the sibling queue has already pulled from the input.
  msg ← GetMsgFromList[dd.shared.prefetched[dd.qnum]];
  IF msg # NIL THEN RETURN;
  -- Nothing prefetched: go to the input queue itself.
  msg ← Get[dd.input];
  IF msg = NIL THEN RETURN;
  -- Remember the message for the other output queue.
  AddMsgToList[dd.shared.prefetched[IF dd.qnum=0 THEN 1 ELSE 0], msg];
};
DuplicatePut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  -- $Duplicate put: deliberately a no-op, since writing to a duplicated queue doesn't make sense.
  NULL;
};
DuplicateClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- $Duplicate close: records that this output queue is closed, and closes the shared
  -- input queue only once both output queues have been closed.
  dd: DuplicateData ← NARROW[queue.data];
  dd.shared.closed[dd.qnum] ← TRUE;
  -- The sibling may still want messages from the input; leave it open until both are done.
  IF NOT dd.shared.closed[0] OR NOT dd.shared.closed[1] THEN RETURN;
  Close[dd.input];
};
The $Merge message queue class
A merged queue contains elements from both of its input queues in unspecified order. For fairness, we alternate reading between the two queues.
-- Class record installed by Merge in the queue it creates: the class name atom followed by
-- the get, put and close operations (presumably in QueueClassRec field order -- confirm).
mergeClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Merge, MergeGet, MergePut, MergeClose]];
-- State of a $Merge queue; stored in the queue's data field by Merge.
-- MergeGet reads input[next] first and flips next on every call so the two inputs alternate.
MergeData: TYPE ~ REF MergeDataRec;
MergeDataRec:
TYPE ~
RECORD [
input: ARRAY [0..1] OF MsgQueue, -- two input queues
next: [0..1] ← 0 -- next queue to read from
];
Merge: PUBLIC PROC [q1, q2: MsgQueue] RETURNS [queue: MsgQueue] ~ {
  -- Builds a new queue of class $Merge whose messages are drawn from both q1 and q2,
  -- in unspecified order.
  md: MergeData ← NEW[MergeDataRec];
  md.input ← [q1, q2];
  queue ← NEW[TapMsgQueue.MsgQueueRec];
  queue.data ← md;
  queue.class ← mergeClass;
};
MergeGet: TapMsgQueue.GetProc ~ {
  -- PROC [queue: MsgQueue] RETURNS [msg: Msg]
  -- $Merge get: tries the input whose turn it is, then falls back to the other input if
  -- the first had nothing. The turn flips on every call so the inputs alternate.
  md: MergeData ← NARROW[queue.data];
  primary: [0..1] ← md.next;
  fallback: [0..1] ← IF primary=0 THEN 1 ELSE 0;
  msg ← Get[md.input[primary]];
  md.next ← fallback; -- swap inputs for next time
  -- Nothing on the preferred input: try the other one before reporting empty.
  IF msg = NIL THEN msg ← Get[md.input[fallback]];
};
MergePut: TapMsgQueue.PutProc ~ {
  -- PROC [queue: MsgQueue, msg: Msg] RETURNS []
  -- $Merge put: deliberately a no-op, since writing to a merged queue doesn't make sense.
  NULL;
};
MergeClose: TapMsgQueue.CloseProc ~ {
  -- PROC [queue: MsgQueue] RETURNS []
  -- $Merge close: closes both input queues, input 0 first.
  md: MergeData ← NARROW[queue.data];
  FOR i: [0..1] IN [0..1] DO
    Close[md.input[i]];
  ENDLOOP;
};
Connecting MsgQueues to ComplexCursors
$MsgQueue query class
-- Query class that lets a message queue be read through a LoganQuery cursor:
-- MsgRetrieve is its retrieve operation and MsgDestroy its destroy operation.
MsgClass: LoganQuery.QueryClass = NEW[LoganQuery.QueryClassRec ← [$MsgQueue, MsgRetrieve, MsgDestroy]];
CursorFromMsgQueue: PUBLIC PROC [mq: TapMsgQueue.MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] = {
  -- Wraps mq in a cursor of class $MsgQueue, so that retrieving from the cursor reads
  -- the queue and destroying the cursor closes it.
  -- NOTE(review): the fields are assigned without allocating anything, which assumes
  -- ComplexCursor is a record value rather than a REF -- confirm against LoganQuery.
  cursor.data ← mq;
  cursor.class ← MsgClass;
};
EntryFromMsg: PUBLIC PROC [msg: TapMsgQueue.Msg] RETURNS [entry: LoganBerry.Entry] ~ TRUSTED {
  -- Reinterprets a message as a LoganBerry entry without copying or checking.
  -- NOTE(review): relies on Msg and Entry having the same representation -- confirm.
  RETURN[LOOPHOLE[msg, LoganBerry.Entry]];
};
MsgFromEntry: PUBLIC PROC [entry: LoganBerry.Entry] RETURNS [msg: TapMsgQueue.Msg] ~ TRUSTED {
  -- Reinterprets a LoganBerry entry as a message without copying or checking.
  -- NOTE(review): relies on Entry and Msg having the same representation -- confirm.
  RETURN[LOOPHOLE[entry, TapMsgQueue.Msg]];
};
MsgRetrieve: LoganQuery.RetrieveProc = {
  -- [cursor: ComplexCursor, dir: CursorDirection ← increasing] RETURNS [entry: LoganBerry.Entry]
  -- Retrieve operation of the $MsgQueue query class: takes the next message from the
  -- queue held in cursor.data (without blocking) and returns it as an entry.
  -- The dir argument is not consulted. Raises ERROR if cursor.data is not a message queue.
  WITH cursor.data SELECT FROM
    mq: TapMsgQueue.MsgQueue => {
      next: TapMsgQueue.Msg ← Get[queue: mq];
      RETURN[EntryFromMsg[next]];
    };
    ENDCASE => ERROR;
};
MsgDestroy: LoganQuery.DestroyProc = {
  -- [cursor: ComplexCursor] RETURNS []
  -- Destroy operation of the $MsgQueue query class: closes the queue held in cursor.data.
  -- Raises ERROR if cursor.data is not a message queue.
  WITH cursor.data SELECT FROM
    mq: TapMsgQueue.MsgQueue => Close[mq];
    ENDCASE => ERROR;
};