TapMsgQueueImpl.mesa
Copyright Ó 1989 by Xerox Corporation. All rights reserved.
Doug Terry, February 19, 1990 2:57:30 pm PST
Implements the message queue abstraction along with the $Basic, $Duplicate, and $Merge message queue classes.
DIRECTORY
LoganBerry USING [Entry],
LoganQuery USING [ComplexCursor, DestroyProc, QueryClass, QueryClassRec, RetrieveProc],
TapMsgQueue;
TapMsgQueueImpl: CEDAR MONITOR
EXPORTS TapMsgQueue
~ BEGIN
Msg: TYPE ~ TapMsgQueue.Msg;
MsgQueue: TYPE ~ TapMsgQueue.MsgQueue;
Generic operations
Get: PUBLIC PROC [queue: MsgQueue, wait: BOOLEANFALSE] RETURNS [msg: Msg] ~ {
If wait=TRUE then the call should block until a message is available or the queue is closed.
IF queue.class.get = NIL THEN RETURN[NIL];
msg ← queue.class.get[queue];
WHILE msg=NIL AND wait AND NOT queue.closed DO
Wait[queue];
msg ← queue.class.get[queue];
ENDLOOP;
};
Put: PUBLIC PROC [msg: Msg, queue: MsgQueue] RETURNS [] ~ {
Simply call class-specific operation. May only work on certain classes of message queues.
IF queue.class.put = NIL THEN RETURN[]; -- should raise an error?
queue.class.put[queue, msg];
Notify[queue];
};
Close: PUBLIC PROC [queue: MsgQueue] RETURNS [] ~ {
Simply call class-specific operation.
IF queue.class.close = NIL THEN RETURN[];
queue.class.close[queue];
queue.closed ← TRUE;
Notify[queue];
};
The following two procedures exist since WAITs and NOTIFYs have to be done inside entry procedures. Get, Put, and Close cannot be entry procedures since they may be called recursively.
Wait: ENTRY PROC [queue: MsgQueue] ~ {
WAIT queue.nonempty;
};
Notify: ENTRY PROC [queue: MsgQueue] ~ {
NOTIFY queue.nonempty;
};
The $Basic message queue class
Basic queues are simply lists of messages.
basicClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Basic, BasicGet, BasicPut, BasicClose]];
MsgList: TYPE ~ REF MsgListRec;
MsgListRec: TYPE ~ RECORD [
contents: LIST OF Msg ← NIL, -- full contents of queue
last: LIST OF Msg ← NIL  -- last element in queue
];
Create: PUBLIC PROC [] RETURNS [queue: MsgQueue] ~ {
Creates a message queue of type $Basic. A $Basic queue simply maintains a list of its current elements.
queue ← NEW[TapMsgQueue.MsgQueueRec];
queue.class ← basicClass;
queue.data ← NEW[MsgListRec];
};
BasicGet: TapMsgQueue.GetProc ~ {
PROC [queue: MsgQueue] RETURNS [msg: Msg]
q: MsgList ← NARROW[queue.data];
msg ← GetMsgFromList[q];
};
BasicPut: TapMsgQueue.PutProc ~ {
PROC [queue: MsgQueue, msg: Msg] RETURNS []
q: MsgList ← NARROW[queue.data];
AddMsgToList[q, msg];
};
BasicClose: TapMsgQueue.CloseProc ~ {
PROC [queue: MsgQueue] RETURNS []
NULL;
};
Access to message lists is monitored.
GetMsgFromList: ENTRY PROC [list: MsgList] RETURNS [msg: Msg] ~ {
IF list.contents = NIL THEN RETURN[NIL];
msg ← list.contents.first;
list.contents ← list.contents.rest;
IF list.contents = NIL THEN
list.last ← NIL;
};
AddMsgToList: ENTRY PROC [list: MsgList, msg: Msg] RETURNS [] ~ {
IF list.contents = NIL
THEN {
list.contents ← LIST[msg];
list.last ← list.contents;
}
ELSE {
list.last.rest ← LIST[msg];
list.last ← list.last.rest;
};
};
The $Duplicate message queue class
A duplicated queue is actually two queues where each produces the same elements as the original queue.
duplicateClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Duplicate, DuplicateGet, DuplicatePut, DuplicateClose]];
DuplicateData: TYPE ~ REF DuplicateDataRec;
DuplicateDataRec: TYPE ~ RECORD [
qnum: [0..1],   -- which output queue am I?
input: MsgQueue,   -- input queue
shared: SharedDuplicateData -- stuff common to both output queues
];
SharedDuplicateData: TYPE ~ REF SharedDuplicateDataRec;
SharedDuplicateDataRec: TYPE ~ RECORD [
closed: ARRAY [0..1] OF BOOLEANALL[FALSE], -- queue closed?
prefetched: ARRAY [0..1] OF MsgList -- msgs ready for output queues
];
Duplicate: PUBLIC PROC [queue: MsgQueue] RETURNS [q1, q2: MsgQueue] ~ {
Creates two queues where each produces the same elements as the original queue. Needs storage for saving msgs that have been retrieved from the input queue and returned to one output queue but not the other. The two new queues have class $Duplicate.
dd: DuplicateData;
sd: SharedDuplicateData ← NEW[SharedDuplicateDataRec];
q1 ← NEW[TapMsgQueue.MsgQueueRec];
dd ← NEW[DuplicateDataRec];
q1.class ← duplicateClass;
q1.data ← dd;
dd.qnum ← 0;
dd.input ← queue;
dd.shared ← sd;
sd.prefetched[dd.qnum] ← NEW[MsgListRec];
q2 ← NEW[TapMsgQueue.MsgQueueRec];
dd ← NEW[DuplicateDataRec];
q2.class ← duplicateClass;
q2.data ← dd;
dd.qnum ← 1;
dd.input ← queue;
dd.shared ← sd;
sd.prefetched[dd.qnum] ← NEW[MsgListRec];
};
DuplicateGet: TapMsgQueue.GetProc ~ {
PROC [queue: MsgQueue] RETURNS [msg: Msg]
q: DuplicateData ← NARROW[queue.data];
Get message off prefetched list if possible
msg ← GetMsgFromList[q.shared.prefetched[q.qnum]];
IF msg = NIL THEN {
Get message off input queue
msg ← Get[q.input];
IF msg # NIL THEN {
Add message to other output queue's prefetched list
AddMsgToList[q.shared.prefetched[IF q.qnum=0 THEN 1 ELSE 0], msg];
};
};
};
DuplicatePut: TapMsgQueue.PutProc ~ {
PROC [queue: MsgQueue, msg: Msg] RETURNS []
NULL; -- doesn't make sense to write to a duplicated queue
};
DuplicateClose: TapMsgQueue.CloseProc ~ {
PROC [queue: MsgQueue] RETURNS []
q: DuplicateData ← NARROW[queue.data];
q.shared.closed[q.qnum] ← TRUE;
IF q.shared.closed[0] AND q.shared.closed[1] THEN
Close[q.input];
};
The $Merge message queue class
A merged queue contains elements from both of its input queues in unspecified order. For fairness, we alternate reading between the two queues.
mergeClass: TapMsgQueue.QueueClass ~ NEW[TapMsgQueue.QueueClassRec ← [$Merge, MergeGet, MergePut, MergeClose]];
MergeData: TYPE ~ REF MergeDataRec;
MergeDataRec: TYPE ~ RECORD [
input: ARRAY [0..1] OF MsgQueue,  -- two input queues
next: [0..1] ← 0  -- next queue to read from
];
Merge: PUBLIC PROC [q1, q2: MsgQueue] RETURNS [queue: MsgQueue] ~ {
Creates a new queue that contains elements from both of the input queues in unspecified order. The new queue has class $Merge.
md: MergeData ← NEW[MergeDataRec];
queue ← NEW[TapMsgQueue.MsgQueueRec];
queue.class ← mergeClass;
queue.data ← md;
md.input[0] ← q1;
md.input[1] ← q2;
};
MergeGet: TapMsgQueue.GetProc ~ {
PROC [queue: MsgQueue] RETURNS [msg: Msg]
q: MergeData ← NARROW[queue.data];
msg ← Get[q.input[q.next]];
q.next ← IF q.next=0 THEN 1 ELSE 0; -- swap inputs for next time
IF msg = NIL THEN -- try other input queue
msg ← Get[q.input[q.next]];
};
MergePut: TapMsgQueue.PutProc ~ {
PROC [queue: MsgQueue, msg: Msg] RETURNS []
NULL; -- doesn't make sense to write to a merged queue
};
MergeClose: TapMsgQueue.CloseProc ~ {
PROC [queue: MsgQueue] RETURNS []
q: MergeData ← NARROW[queue.data];
Close[q.input[0]];
Close[q.input[1]];
};
Connecting MsgQueues to ComplexCursors
$MsgQueue query class
MsgClass: LoganQuery.QueryClass = NEW[LoganQuery.QueryClassRec ← [$MsgQueue, MsgRetrieve, MsgDestroy]];
CursorFromMsgQueue: PUBLIC PROC [mq: TapMsgQueue.MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] = {
cursor.class ← MsgClass;
cursor.data ← mq;
};
EntryFromMsg: PUBLIC PROC [msg: TapMsgQueue.Msg] RETURNS [entry: LoganBerry.Entry] ~ TRUSTED {
entry ← LOOPHOLE[msg];
};
MsgFromEntry: PUBLIC PROC [entry: LoganBerry.Entry] RETURNS [msg: TapMsgQueue.Msg] ~ TRUSTED {
msg ← LOOPHOLE[entry];
};
MsgRetrieve: LoganQuery.RetrieveProc = {
[cursor: ComplexCursor, dir: CursorDirection ← increasing] RETURNS [entry: LoganBerry.Entry]
WITH cursor.data SELECT FROM
mq: TapMsgQueue.MsgQueue => RETURN[EntryFromMsg[Get[queue: mq]]];
ENDCASE => ERROR;
};
MsgDestroy: LoganQuery.DestroyProc = {
[cursor: ComplexCursor] RETURNS []
WITH cursor.data SELECT FROM
mq: TapMsgQueue.MsgQueue => Close[queue: mq];
ENDCASE => ERROR;
};
END.