-- Transport Mechanism Mail Server - steering list queues --
-- [Indigo]<Grapevine>MS>SLQueue.mesa --
-- Andrew Birrell September 13, 1982 3:41 pm --
DIRECTORY
Ascii USING[ CR],
EnquiryDefs USING[ ],
GlassDefs USING[ Handle ],
HeapDefs USING[ HeapEndRead, HeapEndWrite, HeapReadData,
HeapStartRead, ReaderHandle, WriterHandle ],
LogDefs USING[ WriteChar ],
ObjectDirDefs USING[ FreeObject, ObjectNumber, UseObject ],
ProtocolDefs USING[ AppendTimestamp ],
SLDefs USING[ Item, ItemAddress, ItemIndex,
SLHeader, SLQueue, SLReadHandle ],
VMDefs USING[ MarkStartWait, Page, PageAddress, PageNumber,
ReadPage, Release, UsePage ];
SLQueueImpl: MONITOR
IMPORTS HeapDefs, LogDefs, ObjectDirDefs, ProtocolDefs, VMDefs
EXPORTS EnquiryDefs, SLDefs =
BEGIN
-- Per queue state, indexed by queue. No initialisation of this array appears in --
-- this part of the file; presumably it is set up elsewhere (TODO confirm). --
data: ARRAY SLDefs.SLQueue OF RECORD[
-- position from which SLStartRead scans for the next full item --
rPos: SLDefs.ItemAddress,
-- position from which SubWrite scans for a free item --
wPos: SLDefs.ItemAddress,
-- page count of the queue: NextItem wraps to the first page when page+1 reaches it, --
-- and SubWrite uses it as the page number of a newly added page --
size: VMDefs.PageNumber,
-- signalled by SubWrite after an item is stored; awaited by InnerWaitNonEmpty --
itemWritten: CONDITION ];
-- per queue counter: incremented by SubWrite, decremented by SLStartRead --
count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0];
-- incremented by SLWrite for each write to the input or express queue --
received: CARDINAL ← 0;
-- incremented by SubTransfer for each transfer whose target is the input queue --
reDone: CARDINAL ← 0;
-- a page viewed as an array of items, one per ItemIndex --
SLQPage: TYPE = POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item;
-- address of the page currently held in "page" (meaningful only when page # NIL) --
dataPos: VMDefs.PageAddress;
-- the single page currently held by GetItem; NIL until the first GetItem call --
page: SLQPage ← NIL;
-- Advance pos↑ by one item slot within the given queue. --
-- Moving past the last slot of a page goes to the first slot of the following page, --
-- and moving past the last page of the queue wraps round to the first page. --
NextItem: PROCEDURE[ queue: SLDefs.SLQueue,
    pos: POINTER TO SLDefs.ItemAddress ] =
  BEGIN
  -- common case: another slot remains on the current page --
  IF pos.item # LAST[SLDefs.ItemIndex]
    THEN { pos.item ← pos.item + 1; RETURN };
  -- end of page: restart at the first slot of the next page, wrapping if needed --
  pos.item ← FIRST[SLDefs.ItemIndex];
  IF pos.page.page+1 # data[queue].size
    THEN pos.page.page ← pos.page.page+1
    ELSE pos.page.page ← FIRST[VMDefs.PageNumber];
  END;
-- Return a pointer to the item slot at address pos. --
-- One page is held at a time in "page" (its address in "dataPos"); when pos lies on --
-- a different page the held page is released and the wanted one is read in. --
-- The returned pointer refers into the held page. --
GetItem: PROCEDURE[ pos: SLDefs.ItemAddress ]
    RETURNS[ POINTER TO SLDefs.Item ] =
  BEGIN
  -- hit: the wanted page is the one already held --
  IF page # NIL AND pos.page = dataPos
    THEN RETURN[ @(page[pos.item]) ];
  -- miss: give back any held page, then read the wanted one (no look ahead) --
  IF page # NIL THEN VMDefs.Release[LOOPHOLE[page,VMDefs.Page]];
  dataPos ← pos.page;
  page ← LOOPHOLE[VMDefs.ReadPage[ pos.page, 0 ], SLQPage];
  RETURN[ @(page[pos.item]) ]
  END;
-- Whether the caller modified the currently held page. --
DataState: TYPE = {clean,dirty};
-- Finish with the currently held page. For "dirty" the page is passed to --
-- VMDefs.MarkStartWait; for "clean" nothing is done. The page stays held either way. --
ReleaseData: PROCEDURE[ state: DataState ] =
  BEGIN
  SELECT state FROM
    dirty => VMDefs.MarkStartWait[LOOPHOLE[page,VMDefs.Page]];
    ENDCASE => NULL;
  END;
-- Finish writing the steering list SL and append (body, SL) to the given queue. --
-- HeapEndWrite is handed a callback which receives the object number of the --
-- finished steering list and performs the queue insertion under the monitor lock. --
-- Writes to the input and express queues are tallied in "received". --
SLWrite: PUBLIC ENTRY PROCEDURE[
    body: ObjectDirDefs.ObjectNumber,
    SL: HeapDefs.WriterHandle,
    queue: SLDefs.SLQueue ] =
  BEGIN
  Enqueue: INTERNAL PROCEDURE[ obj: ObjectDirDefs.ObjectNumber ] =
    { SubWrite[body, obj, queue] };
  HeapDefs.HeapEndWrite[SL, Enqueue];
  SELECT queue FROM
    input, express => received ← received + 1;
    ENDCASE => NULL;
  END;
-- Append the pair (body, SL) to the given queue. The caller must hold the monitor lock. --
-- Scans forward from the queue's write position for a slot in state "free". If the --
-- scan returns to where it started without finding one, a new page of free slots is --
-- added at the end of the queue and the scan continues on that page. --
-- Waiters on itemWritten are woken with BROADCAST, not NOTIFY: WaitForNonEmpty waits --
-- on the same condition without consuming an item, so waking a single waiter could --
-- leave an SLStartRead caller asleep although the queue is non-empty. Every waiter --
-- re-tests count in a loop, so the extra wake-ups are harmless. --
SubWrite: INTERNAL PROCEDURE[
    body: ObjectDirDefs.ObjectNumber,
    SL: ObjectDirDefs.ObjectNumber,
    queue: SLDefs.SLQueue ] =
  BEGIN
  wPos: POINTER TO SLDefs.ItemAddress = @(data[queue].wPos);
  started: SLDefs.ItemAddress = wPos↑;
  ptr: POINTER TO SLDefs.Item;
  DO
    -- examine the slot at wPos and step wPos past it --
    ptr ← GetItem[wPos↑]; NextItem[queue, wPos];
    IF ptr.state = free THEN EXIT;
    -- skip items still being read --
    ReleaseData[clean];
    IF wPos↑ = started
      THEN --extend: every existing slot was examined and none was free --
        BEGIN
        newPage: SLQPage;
        -- the new page takes the next page number, i.e. the current size --
        wPos↑.item ← FIRST[SLDefs.ItemIndex];
        wPos↑.page.page ← data[queue].size;
        newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page],SLQPage];
        FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex
          DO newPage[index].state ← free ENDLOOP;
        VMDefs.MarkStartWait[LOOPHOLE[newPage,VMDefs.Page]];
        VMDefs.Release[LOOPHOLE[newPage,VMDefs.Page]];
        data[queue].size ← data[queue].size + 1;
        END;
    ENDLOOP;
  -- fill in the free slot --
  ptr.body ← body;
  ptr.SL ← SL;
  ptr.state ← full;
  -- record the queue's use of both objects (presumably a reference count; --
  -- confirm in ObjectDirDefs) --
  ObjectDirDefs.UseObject[body];
  ObjectDirDefs.UseObject[SL];
  ReleaseData[dirty];
  count[queue] ← count[queue] + 1;
  BROADCAST data[queue].itemWritten;
  END;
-- Block the caller until count for the given queue is non-zero. Nothing is removed. --
WaitForNonEmpty: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ] =
  BEGIN
  InnerWaitNonEmpty[queue];
  END;
-- Wait on the queue's itemWritten condition until its count is non-zero. --
-- The caller must hold the monitor lock; the test is repeated after every wake-up. --
InnerWaitNonEmpty: INTERNAL PROC[ queue: SLDefs.SLQueue ] =
  BEGIN
  UNTIL count[queue] # 0 DO
    WAIT data[queue].itemWritten
    ENDLOOP;
  END;
-- Return the current value of count for the given queue, read under the monitor lock. --
GetCount: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ]
    RETURNS[CARDINAL] =
  BEGIN
  RETURN[ count[queue] ]
  END;
-- Wait until the queue's count is non-zero, then claim the next full item for reading. --
-- Returns: handle, the address of the claimed slot; body, the object number stored in --
-- the slot; SL, a heap reader opened on the slot's steering list object. --
-- The slot is marked "reading", count is decremented, and one character identifying --
-- the queue is written to the log. --
SLStartRead: PUBLIC ENTRY PROCEDURE[ queue: SLDefs.SLQueue ]
    RETURNS[ handle: SLDefs.SLReadHandle,
      body: ObjectDirDefs.ObjectNumber,
      SL: HeapDefs.ReaderHandle ] =
  BEGIN
  readPos: POINTER TO SLDefs.ItemAddress = @(data[queue].rPos);
  slot: POINTER TO SLDefs.Item;
  InnerWaitNonEmpty[queue];
  slot ← GetItem[readPos↑];
  UNTIL slot.state = full DO
    -- step over slots which are free or are still being read --
    ReleaseData[clean];
    NextItem[queue, readPos];
    slot ← GetItem[readPos↑];
    ENDLOOP;
  -- the handle names the claimed slot; the read position moves on past it --
  handle ← readPos↑;
  NextItem[queue, readPos];
  body ← slot.body;
  SL ← HeapDefs.HeapStartRead[slot.SL];
  slot.state ← reading;
  ReleaseData[dirty];
  count[queue] ← count[queue] - 1;
  SELECT queue FROM
    express => LogDefs.WriteChar['e];
    input => LogDefs.WriteChar['i];
    pending => LogDefs.WriteChar['p];
    forward => LogDefs.WriteChar['f];
    foreign => LogDefs.WriteChar['x];
    mailbox => LogDefs.WriteChar['m];
    ENDCASE => ERROR;
  END;
-- Copy the (body, SL) pair held in the slot named by handle onto the given queue. --
-- The source slot itself is left untouched. Transfers onto the input queue are --
-- tallied in "reDone". --
SubTransfer: ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle,
    queue: SLDefs.SLQueue ] =
  BEGIN
  from: POINTER TO SLDefs.Item = GetItem[handle];
  bodyObj: ObjectDirDefs.ObjectNumber = from.body;
  slObj: ObjectDirDefs.ObjectNumber = from.SL;
  ReleaseData[clean];
  SubWrite[bodyObj, slObj, queue];
  SELECT queue FROM
    input => reDone ← reDone + 1;
    ENDCASE => NULL;
  END;
-- Move the item named by handle onto the given queue: first copy it there, then end --
-- the read on the original slot. The two steps are separate monitor entries. --
SLTransfer: PUBLIC PROCEDURE[ handle: SLDefs.SLReadHandle,
    queue: SLDefs.SLQueue ] =
  { SubTransfer[handle, queue]; SLEndRead[handle] };
-- Finish with the item named by handle: mark its slot free, then free the body and --
-- steering list objects that the slot referred to. --
SLEndRead: PUBLIC ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle ] =
  BEGIN
  slot: POINTER TO SLDefs.Item = GetItem[handle];
  -- copy both object numbers out before the slot is marked free --
  bodyObj: ObjectDirDefs.ObjectNumber = slot.body;
  slObj: ObjectDirDefs.ObjectNumber = slot.SL;
  -- beware of the order of events: the slot is marked free and the page is passed to --
  -- ReleaseData as dirty before either object is freed --
  slot.state ← free;
  ReleaseData[dirty];
  ObjectDirDefs.FreeObject[bodyObj];
  ObjectDirDefs.FreeObject[slObj];
  END;
-- Write a report on every queue to the stream str: for each queue its name and the --
-- text " queue: length=", followed by whatever DisplayQueue writes for that queue. --
SLQueueCount: PUBLIC PROC[str: GlassDefs.Handle] =
  BEGIN
  str.WriteChar[Ascii.CR];
  FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue DO
    str.WriteString[SELECT queue FROM
      express => "Express"L,
      input => "Input"L,
      pending => "Pending"L,
      forward => "Forward"L,
      foreign => "Foreign"L,
      mailbox => "Mailbox"L,
      ENDCASE => "Unknown"L];
    str.WriteString[" queue: length="L];
    DisplayQueue[str, queue];
    ENDLOOP;
  END;
-- Write the length of the given queue to str, then one line for every slot that is --
-- not free, giving the created and received timestamps and the server field from --
-- the header of the slot's steering list. --
-- The walk starts at the queue's read position and visits every slot once, stopping --
-- early if DelTyped reports true. --
-- This procedure does not hold the monitor lock itself; all access to monitor data --
-- goes through ENTRY procedures (GetCount, GetRPos, GetThisItem), so the queue may --
-- change between steps of the walk. --
DisplayQueue: PROC[str: GlassDefs.Handle, queue: SLDefs.SLQueue] =
  BEGIN
  OPEN str;
  -- read the queue's read position under the monitor lock --
  GetRPos: ENTRY PROC[queue: SLDefs.SLQueue] RETURNS[SLDefs.ItemAddress] =
    { RETURN[data[queue].rPos] };
  -- copy the slot at pos↑ and advance pos↑, under the monitor lock. For a slot that --
  -- is not free, UseObject is applied to both of its objects before the lock is --
  -- given up; the caller applies FreeObject once its heap readers are open. --
  GetThisItem: ENTRY PROC[queue: SLDefs.SLQueue,
      pos: POINTER TO SLDefs.ItemAddress]
      RETURNS[item:SLDefs.Item] =
    BEGIN
    item ← GetItem[pos↑]↑; ReleaseData[clean]; NextItem[queue, pos];
    IF item.state # free
      THEN BEGIN
        ObjectDirDefs.UseObject[item.SL];
        ObjectDirDefs.UseObject[item.body];
        END;
    END;
  firstPos: SLDefs.ItemAddress = GetRPos[queue];
  pos: SLDefs.ItemAddress ← firstPos;
  -- count is monitor data, so it is read through the ENTRY accessor --
  WriteDecimal[GetCount[queue]];
  WriteChar[Ascii.CR];
  DO thisItem: SLDefs.Item = GetThisItem[queue, @pos];
    IF thisItem.state # free
      THEN BEGIN
        slReader: HeapDefs.ReaderHandle =
          HeapDefs.HeapStartRead[thisItem.SL];
        bodyReader: HeapDefs.ReaderHandle =
          HeapDefs.HeapStartRead[thisItem.body];
        -- the readers are open, so undo the UseObject calls made by GetThisItem --
        ObjectDirDefs.FreeObject[thisItem.SL];
        ObjectDirDefs.FreeObject[thisItem.body];
        BEGIN
          -- close both readers if anything below unwinds through this block --
          ENABLE UNWIND =>
            BEGIN
            HeapDefs.HeapEndRead[slReader];
            HeapDefs.HeapEndRead[bodyReader];
            END;
          s: STRING = [18] --377#377@6553665536--;
          slHeader: SLDefs.SLHeader;
          [,] ← HeapDefs.HeapReadData[slReader,
            [@slHeader,SIZE[SLDefs.SLHeader]]];
          WriteString["Created: "L];
          ProtocolDefs.AppendTimestamp[s, slHeader.created];
          WriteString[s];
          WriteString[", received: "L];
          -- empty the buffer before the second timestamp is appended --
          s.length←0;
          ProtocolDefs.AppendTimestamp[s, slHeader.received];
          WriteString[s];
          WriteString[", server: "L];
          WriteDecimal[LOOPHOLE[slHeader.server]];
          WriteChar[Ascii.CR];
          END;
        HeapDefs.HeapEndRead[slReader];
        HeapDefs.HeapEndRead[bodyReader];
        END;
    -- stop after a full circuit of the queue, or when DelTyped reports true --
    IF pos = firstPos OR DelTyped[] THEN EXIT;
    ENDLOOP;
  END;
END.