-- Copyright (C) 1984, 1985 by Xerox Corporation. All rights reserved.
-- SLQueueImpl.mesa, Transport Mechanism Mail Server - steering list queues --
-- HGM, 15-Sep-85 12:46:08
-- Andrew Birrell September 13, 1982 3:41 pm --
-- Hankins 23-Oct-84 9:56:21
DIRECTORY
Ascii USING [CR],
EnquiryDefs USING [],
GlassDefs USING [Handle],
HeapDefs USING [
HeapEndRead, HeapEndWrite, HeapReadData, HeapStartRead, ReaderHandle,
WriterHandle],
LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry],
ObjectDirDefs USING [FreeObject, ObjectNumber, RestartObject, UseObject],
Process USING [InitializeCondition, DisableTimeout],
ProtocolDefs USING [AppendTimestamp],
SLDefs USING [Item, ItemAddress, ItemIndex, SLHeader, SLQueue, SLReadHandle],
VMDefs USING [
FileHandle, GetFileLength, MarkStartWait, OpenFile, Page, PageAddress,
PageNumber, ReadPage, Release, UsePage];
SLQueueImpl: MONITOR
IMPORTS HeapDefs, LogDefs, ObjectDirDefs, Process, ProtocolDefs, VMDefs
EXPORTS EnquiryDefs, SLDefs =
BEGIN
-- Per-queue monitor state, indexed by queue:
--   rPos         where SLStartRead starts scanning for a full item
--   wPos         where SubWrite starts scanning for a free item
--   size         number of pages in the queue; NextItem wraps at this value
--   itemWritten  condition signalled by SubWrite, awaited by InnerWaitNonEmpty
data: ARRAY SLDefs.SLQueue OF RECORD [
rPos: SLDefs.ItemAddress,
wPos: SLDefs.ItemAddress,
size: VMDefs.PageNumber,
itemWritten: CONDITION];
-- Number of items per queue: incremented by SubWrite, decremented by
-- SLStartRead, recomputed from the queue pages by RestartQueues.
count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0];
-- Running totals bumped by SLWrite (received, forwarded) and SubTransfer
-- (reDone); their addresses are handed to LogDefs.DisplayNumber in RestartQueues.
received, reDone, forwarded: LONG CARDINAL ← 0;
-- A queue page viewed as an array of items, one per ItemIndex.
SLQPage: TYPE = LONG POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item;
-- One-page cache shared by all queues: page is the page currently held (NIL if
-- none) and dataPos is its address.  Maintained by GetItem and RestartQueues.
dataPos: VMDefs.PageAddress;
page: SLQPage ← NIL;
NextItem: PROCEDURE [
  queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress] =
  BEGIN
  -- Step pos↑ forward by one item slot, treating the queue as a ring:
  -- past the last item of a page we move to the first item of the next page,
  -- and past the last page (data[queue].size pages) we wrap to the first page.
  IF pos.item # LAST[SLDefs.ItemIndex] THEN {pos.item ← pos.item + 1; RETURN};
  -- Last slot on this page: restart the item index and move to another page.
  pos.item ← FIRST[SLDefs.ItemIndex];
  IF pos.page.page + 1 # data[queue].size THEN
    pos.page.page ← pos.page.page + 1
  ELSE pos.page.page ← FIRST[VMDefs.PageNumber];
  END;
GetItem: PROCEDURE [pos: SLDefs.ItemAddress] RETURNS [LONG POINTER TO SLDefs.Item] =
  BEGIN
  -- Return a pointer to the item at pos, going through the one-page cache
  -- (page, dataPos).  The pointer refers into the cached page, so it is only
  -- good until the next call that switches the cache to a different page.
  -- Cache hit: the wanted page is already held.
  IF page # NIL AND pos.page = dataPos THEN RETURN[@(page[pos.item])];
  -- Cache miss: give back the page we hold (if any) and read the wanted one.
  IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]];
  dataPos ← pos.page;
  page ← LOOPHOLE[VMDefs.ReadPage[pos.page, 0 -- lookAhead -- ], SLQPage];
  RETURN[@(page[pos.item])]
  END;
-- Whether the cached page was modified since it was obtained.
DataState: TYPE = {clean, dirty};
ReleaseData: PROCEDURE [state: DataState] =
  BEGIN
  -- Finish with the cached page.  A dirty page is passed to
  -- VMDefs.MarkStartWait; a clean page needs no action.  The page itself
  -- stays in the cache either way (GetItem releases it on a miss).
  SELECT state FROM
    dirty => VMDefs.MarkStartWait[LOOPHOLE[page, VMDefs.Page]];
    ENDCASE => NULL;
  END;
SLWrite: PUBLIC ENTRY PROCEDURE [
  body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.WriterHandle,
  queue: SLDefs.SLQueue] =
  BEGIN
  -- Finish writing the steering list SL and append (body, SL) to queue.
  -- Enqueue is handed to HeapDefs.HeapEndWrite, which supplies the object
  -- number that SubWrite stores in the queue (presumably the object just
  -- written; confirm against HeapDefs).
  Enqueue: INTERNAL PROCEDURE [obj: ObjectDirDefs.ObjectNumber] = {
    SubWrite[body, obj, queue]};
  HeapDefs.HeapEndWrite[SL, Enqueue];
  -- Keep the running totals shown by LogDefs.DisplayNumber up to date.
  SELECT queue FROM
    input, express => received ← received + 1;
    forward => forwarded ← forwarded + 1;
    ENDCASE => NULL;
  END;
SubWrite: INTERNAL PROCEDURE [
  body: ObjectDirDefs.ObjectNumber, SL: ObjectDirDefs.ObjectNumber,
  queue: SLDefs.SLQueue] =
  BEGIN
  -- Store (body, SL) in the next free slot of queue, extending the queue by
  -- one page if a complete circuit finds no free slot.  Takes a reference on
  -- both objects, bumps count[queue] and wakes waiters on itemWritten.
  -- Must be called with the monitor lock held.
  wPos: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].wPos);
  started: SLDefs.ItemAddress = wPos↑;
  ptr: LONG POINTER TO SLDefs.Item;
  DO
    ptr ← GetItem[wPos↑];
    NextItem[queue, wPos];
    IF ptr.state = free THEN EXIT;
    -- slot is occupied (full, or still being read): skip it
    ReleaseData[clean];
    IF wPos↑ = started THEN
      -- Went all the way round without finding a free slot: extend.
      BEGIN
      newPage: SLQPage;
      -- The new page gets page number = old size; writing resumes at its
      -- first item on the next trip round the loop.
      wPos↑.item ← FIRST[SLDefs.ItemIndex];
      wPos↑.page.page ← data[queue].size;
      newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page], SLQPage];
      FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO
        newPage[index].state ← free ENDLOOP;
      VMDefs.MarkStartWait[LOOPHOLE[newPage, VMDefs.Page]];
      VMDefs.Release[LOOPHOLE[newPage, VMDefs.Page]];
      data[queue].size ← data[queue].size + 1;
      END;
    ENDLOOP;
  -- ptr now addresses a free slot in the cached page: fill it in.
  ptr.body ← body;
  ptr.SL ← SL;
  ptr.state ← full;
  ObjectDirDefs.UseObject[body];
  ObjectDirDefs.UseObject[SL];
  ReleaseData[dirty];
  -- BROADCAST rather than NOTIFY: itemWritten is awaited both by
  -- WaitForNonEmpty (which does not consume the item) and by SLStartRead
  -- (which does), and timeouts on the condition are disabled.  A single
  -- NOTIFY absorbed by a WaitForNonEmpty caller would leave a SLStartRead
  -- waiter asleep although the queue is non-empty.  Every waiter re-tests
  -- count[queue] in a loop, so waking all of them is safe.
  BROADCAST data[queue].itemWritten;
  count[queue] ← count[queue] + 1;
  END;
WaitForNonEmpty: PUBLIC ENTRY PROC [queue: SLDefs.SLQueue] =
  BEGIN
  -- Public entry point: take the monitor lock and block until queue holds
  -- at least one item.  Nothing is removed from the queue.
  InnerWaitNonEmpty[queue];
  END;
InnerWaitNonEmpty: INTERNAL PROC [queue: SLDefs.SLQueue] =
  BEGIN
  -- Block (monitor lock held by the caller) until count[queue] is non-zero.
  -- The test is repeated after every wakeup.
  UNTIL count[queue] # 0 DO WAIT data[queue].itemWritten ENDLOOP;
  END;
GetCount: PUBLIC PROC [queue: SLDefs.SLQueue] RETURNS [CARDINAL] =
  BEGIN
  -- Report the current number of items in queue.  This is not an ENTRY
  -- procedure, so the value is read without taking the monitor lock.
  RETURN[count[queue]]
  END;
SLStartRead: PUBLIC ENTRY PROCEDURE [queue: SLDefs.SLQueue]
  RETURNS [
    handle: SLDefs.SLReadHandle, body: ObjectDirDefs.ObjectNumber,
    SL: HeapDefs.ReaderHandle] =
  BEGIN
  -- Wait until queue is non-empty, then claim its next full item.
  -- Returns the item's address as handle (for SLEndRead or SLTransfer), the
  -- body object number, and a heap reader opened on the steering list.
  -- The item is left in state reading, and one character identifying the
  -- queue is written to the log.
  readPos: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].rPos);
  item: LONG POINTER TO SLDefs.Item;
  InnerWaitNonEmpty[queue];
  count[queue] ← count[queue] - 1;
  -- Skip slots that are free or still being read by someone else.
  item ← GetItem[readPos↑];
  UNTIL item.state = full DO
    ReleaseData[clean];
    NextItem[queue, readPos];
    item ← GetItem[readPos↑];
    ENDLOOP;
  -- Remember where the item lives, and move the read position past it.
  handle ← readPos↑;
  NextItem[queue, readPos];
  body ← item.body;
  SL ← HeapDefs.HeapStartRead[item.SL];
  item.state ← reading;
  ReleaseData[dirty];
  LogDefs.WriteChar[
    SELECT queue FROM
      express => 'e,
      input => 'i,
      pending => 'p,
      forward => 'f,
      mailbox => 'm,
      ENDCASE => ERROR];
  END;
SubTransfer: ENTRY PROCEDURE [
  handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] =
  BEGIN
  -- Append a copy of the item addressed by handle to queue.  The original
  -- slot is left untouched; SLTransfer frees it afterwards via SLEndRead.
  item: LONG POINTER TO SLDefs.Item = GetItem[handle];
  -- Copy the object numbers out before SubWrite moves the page cache.
  bodyObj: ObjectDirDefs.ObjectNumber = item.body;
  slObj: ObjectDirDefs.ObjectNumber = item.SL;
  ReleaseData[clean];
  SubWrite[bodyObj, slObj, queue];
  -- Items put back on the input queue are counted as re-processed.
  IF queue = input THEN reDone ← reDone + 1;
  END;
SLTransfer: PUBLIC PROC [handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] =
  BEGIN
  -- Move the item addressed by handle onto queue: first write it to the
  -- destination, then free the slot it was read from.  Each step takes the
  -- monitor lock separately.
  SubTransfer[handle, queue];
  SLEndRead[handle];
  END;
SLEndRead: PUBLIC ENTRY PROCEDURE [handle: SLDefs.SLReadHandle] =
  BEGIN
  -- Finish with the item addressed by handle: mark its slot free and drop
  -- the references the queue held on its body and steering list.
  item: LONG POINTER TO SLDefs.Item = GetItem[handle];
  bodyObj: ObjectDirDefs.ObjectNumber = item.body;
  slObj: ObjectDirDefs.ObjectNumber = item.SL;
  -- Order matters: the slot is marked free and the page handed to
  -- MarkStartWait before either object reference is released.
  item.state ← free;
  ReleaseData[dirty];
  ObjectDirDefs.FreeObject[bodyObj];
  ObjectDirDefs.FreeObject[slObj];
  END;
SLQueueCount: PUBLIC PROC [str: GlassDefs.Handle] =
  BEGIN
  -- Write a report on every steering-list queue to str: for each queue its
  -- name and the text " queue: length=", followed by whatever DisplayQueue
  -- prints (the length and one line per item).
  str.WriteChar[Ascii.CR];
  FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue DO
    str.WriteString[
      SELECT queue FROM
        express => "Express"L,
        input => "Input"L,
        pending => "Pending"L,
        forward => "Forward"L,
        mailbox => "Mailbox"L,
        ENDCASE => "Unknown"L];
    str.WriteString[" queue: length="L];
    DisplayQueue[str, queue];
    ENDLOOP;
  END;
-- Write the item count of queue to str, then walk the queue once round from
-- its current read position, printing one line for every slot that is not
-- free: the creation timestamp, the received timestamp when it differs, and
-- the server name when there is one.  The walk ends after a full circuit or
-- as soon as DelTyped[] returns TRUE.
DisplayQueue: PROC [str: GlassDefs.Handle, queue: SLDefs.SLQueue] =
BEGIN OPEN str;
-- Snapshot of the queue's read position, taken under the monitor lock.
GetRPos: ENTRY PROC [queue: SLDefs.SLQueue] RETURNS [SLDefs.ItemAddress] = {
RETURN[data[queue].rPos]};
-- Under the monitor lock: copy the item at pos↑, advance pos↑, and for a
-- non-free item take a reference on its SL and body objects.  The caller
-- drops those references again once it has opened heap readers on them.
GetThisItem: ENTRY PROC [
queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress]
RETURNS [item: SLDefs.Item] =
BEGIN
item ← GetItem[pos↑]↑;
ReleaseData[clean];
NextItem[queue, pos];
IF item.state # free THEN {
ObjectDirDefs.UseObject[item.SL]; ObjectDirDefs.UseObject[item.body]};
END; -- proc. GetThisItem
firstPos: SLDefs.ItemAddress = GetRPos[queue];
pos: SLDefs.ItemAddress ← firstPos;
-- count is read here without the monitor lock.
WriteDecimal[count[queue]];
WriteChar[Ascii.CR];
DO
thisItem: SLDefs.Item = GetThisItem[queue, @pos];
IF thisItem.state # free THEN
BEGIN
slReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.SL];
bodyReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.body];
-- Readers are open; release the references taken by GetThisItem.
ObjectDirDefs.FreeObject[thisItem.SL];
ObjectDirDefs.FreeObject[thisItem.body];
BEGIN
-- Close both readers if anything below unwinds through this block.
ENABLE
UNWIND => {
HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]};
-- Buffer for one formatted timestamp; 18 characters as in the sample below.
s: STRING = [18] --377#377@6553665536-- ;
slHeader: SLDefs.SLHeader;
-- Read the fixed-size header from the start of the steering list.
[, ] ← HeapDefs.HeapReadData[
slReader, [@slHeader, SIZE[SLDefs.SLHeader]]];
WriteString["Created: "L];
ProtocolDefs.AppendTimestamp[s, slHeader.created];
WriteString[s];
-- Show the received stamp only when it is set or its time differs from
-- the creation time.
IF slHeader.received.net # 0
OR slHeader.received.host # 0
OR slHeader.received.time # slHeader.created.time THEN {
WriteString[", received: "L];
-- Empty the buffer before reusing it for the second timestamp.
s.length ← 0;
ProtocolDefs.AppendTimestamp[s, slHeader.received];
WriteString[s]; };
BEGIN
-- Pick a printable form of the server name according to its variant.
server: LONG STRING ← NIL;
IF slHeader.server # NIL THEN
server ← WITH this: slHeader.server.name SELECT FROM
rName => this.value,
connect => this.value,
netAddr => "[net address]"L,
ENDCASE => "[bad name]"L;
IF server # NIL THEN {
WriteString[", server: "L];
WriteString[server]; };
END;
WriteChar[Ascii.CR];
END;
-- Normal path: close the readers opened for this item.
HeapDefs.HeapEndRead[slReader];
HeapDefs.HeapEndRead[bodyReader];
END;
-- Stop after one full circuit of the queue, or when the user types DEL.
IF pos = firstPos OR DelTyped[] THEN EXIT;
ENDLOOP;
END;
-- Bring every steering-list queue into a usable state at startup.
-- For each queue: open its backing file, optionally (initHeap, or file of
-- zero length) overwrite every page with free items, initialise the
-- condition variable, rebuild count and the object reference counts from
-- the items on the pages, and work out the read and write positions.
-- Finally hand the counters to LogDefs.DisplayNumber.
-- This is not an ENTRY procedure: it touches the monitor data without taking
-- the monitor lock (NOTE(review): presumably it runs before any other
-- process uses the queues; confirm against the caller).
RestartQueues: PUBLIC PROCEDURE [initHeap: BOOLEAN] =
BEGIN
-- Backing file name for each queue.
SLTitle: ARRAY SLDefs.SLQueue OF STRING = [
express: "SLQueue.Express"L, input: "SLQueue.Input"L,
forward: "SLQueue.Forward"L, pending: "SLQueue.Pending"L,
mailbox: "SLQueue.Mailbox"L];
-- Label passed to LogDefs.DisplayNumber for each queue's count.
SLName: ARRAY SLDefs.SLQueue OF STRING = [
express: "Express queue"L, input: "Input queue"L, forward: "Forward queue"L,
pending: "Pending queue"L,
mailbox: "Mailbox queue"L];
FOR index: SLDefs.SLQueue IN SLDefs.SLQueue DO
handle: VMDefs.FileHandle = VMDefs.OpenFile[
options: oldOrNew, name: SLTitle[index], cacheFraction: 2];
-- First item of the first page; start and end point of the scans below.
firstPos: SLDefs.ItemAddress = [
page: [file: handle, page: 0], item: FIRST[SLDefs.ItemIndex]];
newFile: BOOLEAN;
data[index].size ← VMDefs.GetFileLength[handle].page;
-- A file of zero pages is treated as newly created.
newFile ← data[index].size = 0;
IF newFile THEN {
data[index].size ← 1 --cause extension to 1 pages-- ;
LogDefs.WriteLogEntry["New SL-queue file created"L]};
IF initHeap OR newFile THEN -- initialise with empty queue --
BEGIN
p: VMDefs.PageNumber;
-- Overwrite every page of the queue with free items, going through the
-- shared page cache (page, dataPos).
FOR p IN [FIRST[VMDefs.PageNumber]..data[index].size) DO
IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]];
page ← LOOPHOLE[VMDefs.UsePage[dataPos ← [handle, p]], SLQPage];
-- This inner index ranges over item slots and hides the queue index.
FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO
page[index].state ← free ENDLOOP;
ReleaseData[dirty];
ENDLOOP;
END; -- then
-- Waiters on itemWritten never time out.
Process.InitializeCondition[@(data[index].itemWritten), 1];
Process.DisableTimeout[@(data[index].itemWritten)];
BEGIN -- Adjust object reference counts --
pos: SLDefs.ItemAddress ← firstPos;
count[index] ← 0; -- count of SL's in this queue --
-- One full circuit over every item of the queue.
DO
ptr: LONG POINTER TO SLDefs.Item = GetItem[pos];
state: DataState ← clean;
-- An item found in state reading is put back to full, so it will be
-- read again; the page must then be written back.
IF ptr.state = reading THEN {ptr.state ← full; state ← dirty};
IF ptr.state = full THEN
BEGIN
ObjectDirDefs.RestartObject[ptr.body];
ObjectDirDefs.RestartObject[ptr.SL];
count[index] ← count[index] + 1;
END;
ReleaseData[state];
NextItem[index, @pos];
IF pos = firstPos THEN EXIT;
ENDLOOP;
END;
-- find reader position: skip optional full, then all free items --
data[index].rPos ← firstPos;
-- Step 1: from the start, skip non-free items up to the first free one.
DO
ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos];
IF ptr.state = free THEN {ReleaseData[clean]; EXIT};
ReleaseData[clean];
NextItem[index, @(data[index].rPos)];
IF data[index].rPos = firstPos THEN EXIT --all items full-- ;
ENDLOOP;
BEGIN
-- Step 2: from there, skip free items up to the first non-free one; that
-- is where reading resumes.
started: SLDefs.ItemAddress = data[index].rPos;
DO
ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos];
IF ptr.state # free THEN {ReleaseData[clean]; EXIT};
ReleaseData[clean];
NextItem[index, @data[index].rPos];
IF data[index].rPos = started THEN EXIT --all items free-- ;
ENDLOOP;
END;
-- find writer position: after all full items --
data[index].wPos ← data[index].rPos;
-- From the read position, skip non-free items up to the first free one, or
-- stop after a full circuit if there is none.
DO
ptr: LONG POINTER TO SLDefs.Item ← GetItem[data[index].wPos];
IF ptr.state = free THEN {ReleaseData[clean]; EXIT};
ReleaseData[clean];
NextItem[index, @data[index].wPos];
IF data[index].wPos = data[index].rPos THEN EXIT;
ENDLOOP;
-- Pass the address of this queue's count to the log display.
LogDefs.DisplayNumber[SLName[index], [short[@count[index]]]];
ENDLOOP;
-- Pass the addresses of the module-wide totals to the log display.
LogDefs.DisplayNumber["Messages received"L, [long[@received]]];
LogDefs.DisplayNumber["Messages forwarded"L, [long[@forwarded]]];
LogDefs.DisplayNumber["Messages re-processed"L, [long[@reDone]]];
END;
END.
LOG: 22-Oct-84 15:38:48 - blh: changed DisplayQueue to print out the server name, not the handle.