-- Copyright (C) 1984, 1985 by Xerox Corporation. All rights reserved. -- SLQueueImpl.mesa, Transport Mechanism Mail Server - steering list queues -- -- HGM, 15-Sep-85 12:46:08 -- Andrew Birrell September 13, 1982 3:41 pm -- -- Hankins 23-Oct-84 9:56:21 DIRECTORY Ascii USING [CR], EnquiryDefs USING [], GlassDefs USING [Handle], HeapDefs USING [ HeapEndRead, HeapEndWrite, HeapReadData, HeapStartRead, ReaderHandle, WriterHandle], LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry], ObjectDirDefs USING [FreeObject, ObjectNumber, RestartObject, UseObject], Process USING [InitializeCondition, DisableTimeout], ProtocolDefs USING [AppendTimestamp], SLDefs USING [Item, ItemAddress, ItemIndex, SLHeader, SLQueue, SLReadHandle], VMDefs USING [ FileHandle, GetFileLength, MarkStartWait, OpenFile, Page, PageAddress, PageNumber, ReadPage, Release, UsePage]; SLQueueImpl: MONITOR IMPORTS HeapDefs, LogDefs, ObjectDirDefs, Process, ProtocolDefs, VMDefs EXPORTS EnquiryDefs, SLDefs = BEGIN data: ARRAY SLDefs.SLQueue OF RECORD [ rPos: SLDefs.ItemAddress, wPos: SLDefs.ItemAddress, size: VMDefs.PageNumber, itemWritten: CONDITION]; count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0]; received, reDone, forwarded: LONG CARDINAL ← 0; SLQPage: TYPE = LONG POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item; dataPos: VMDefs.PageAddress; page: SLQPage ← NIL; NextItem: PROCEDURE [ queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress] = BEGIN IF pos.item = LAST[SLDefs.ItemIndex] THEN BEGIN pos.item ← FIRST[SLDefs.ItemIndex]; IF pos.page.page + 1 = data[queue].size THEN pos.page.page ← FIRST[VMDefs.PageNumber] ELSE pos.page.page ← pos.page.page + 1; END ELSE pos.item ← pos.item + 1; END; GetItem: PROCEDURE [pos: SLDefs.ItemAddress] RETURNS [LONG POINTER TO SLDefs.Item] = BEGIN IF page = NIL OR pos.page # dataPos THEN BEGIN IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]]; dataPos ← pos.page; page ← LOOPHOLE[VMDefs.ReadPage[pos.page, 0 --lookAhead-- ], SLQPage]; END; RETURN[@(page[pos.item])] END; DataState: TYPE = {clean, dirty}; ReleaseData: PROCEDURE [state: DataState] = { IF state = dirty THEN VMDefs.MarkStartWait[LOOPHOLE[page, VMDefs.Page]]}; SLWrite: PUBLIC ENTRY PROCEDURE [ body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.WriterHandle, queue: SLDefs.SLQueue] = BEGIN Action: INTERNAL PROCEDURE [obj: ObjectDirDefs.ObjectNumber] = { SubWrite[body, obj, queue]}; HeapDefs.HeapEndWrite[SL, Action]; IF queue = input OR queue = express THEN received ← received + 1; IF queue = forward THEN forwarded ← forwarded + 1; END; SubWrite: INTERNAL PROCEDURE [ body: ObjectDirDefs.ObjectNumber, SL: ObjectDirDefs.ObjectNumber, queue: SLDefs.SLQueue] = BEGIN wPos: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].wPos); started: SLDefs.ItemAddress = wPos↑; ptr: LONG POINTER TO SLDefs.Item; DO ptr ← GetItem[wPos↑]; NextItem[queue, wPos]; IF ptr.state = free THEN EXIT; -- skip items still being read -- ReleaseData[clean]; IF wPos↑ = started THEN --extend-- BEGIN newPage: SLQPage; wPos↑.item ← FIRST[SLDefs.ItemIndex]; wPos↑.page.page ← data[queue].size; newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page], SLQPage]; FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO newPage[index].state ← free ENDLOOP; VMDefs.MarkStartWait[LOOPHOLE[newPage, VMDefs.Page]]; VMDefs.Release[LOOPHOLE[newPage, VMDefs.Page]]; data[queue].size ← data[queue].size + 1; END; ENDLOOP; ptr.body ← body; ptr.SL ← SL; ptr.state ← full; ObjectDirDefs.UseObject[body]; ObjectDirDefs.UseObject[SL]; ReleaseData[dirty]; NOTIFY data[queue].itemWritten; count[queue] ← count[queue] + 1; END; WaitForNonEmpty: PUBLIC ENTRY PROC [queue: SLDefs.SLQueue] = { InnerWaitNonEmpty[queue]}; InnerWaitNonEmpty: INTERNAL PROC [queue: SLDefs.SLQueue] = { WHILE count[queue] = 0 DO WAIT data[queue].itemWritten ENDLOOP}; GetCount: PUBLIC PROC [queue: SLDefs.SLQueue] RETURNS [CARDINAL] = { RETURN[count[queue]]}; SLStartRead: PUBLIC ENTRY PROCEDURE [queue: SLDefs.SLQueue] RETURNS [ handle: SLDefs.SLReadHandle, body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.ReaderHandle] = BEGIN rPos: LONG POINTER TO SLDefs.ItemAddress = @(data[queue].rPos); ptr: LONG POINTER TO SLDefs.Item; InnerWaitNonEmpty[queue]; count[queue] ← count[queue] - 1; DO ptr ← GetItem[rPos↑]; IF ptr.state = full THEN EXIT ELSE {ReleaseData[clean]; NextItem[queue, rPos]}; --skip items which are free or are still being read -- ENDLOOP; handle ← rPos↑; NextItem[queue, rPos]; body ← ptr.body; SL ← HeapDefs.HeapStartRead[ptr.SL]; ptr.state ← reading; ReleaseData[dirty]; LogDefs.WriteChar[ SELECT queue FROM express => 'e, input => 'i, pending => 'p, forward => 'f, mailbox => 'm, ENDCASE => ERROR]; END; SubTransfer: ENTRY PROCEDURE [ handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] = BEGIN ptr: LONG POINTER TO SLDefs.Item = GetItem[handle]; body: ObjectDirDefs.ObjectNumber = ptr.body; SL: ObjectDirDefs.ObjectNumber = ptr.SL; ReleaseData[clean]; SubWrite[body, SL, queue]; IF queue = input THEN reDone ← reDone + 1; END; SLTransfer: PUBLIC PROC [handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue] = BEGIN SubTransfer[handle, queue]; SLEndRead[handle]; END; SLEndRead: PUBLIC ENTRY PROCEDURE [handle: SLDefs.SLReadHandle] = BEGIN ptr: LONG POINTER TO SLDefs.Item = GetItem[handle]; body: ObjectDirDefs.ObjectNumber = ptr.body; SL: ObjectDirDefs.ObjectNumber = ptr.SL; -- beware of the order of events! -- ptr.state ← free; ReleaseData[dirty]; ObjectDirDefs.FreeObject[body]; ObjectDirDefs.FreeObject[SL]; END; SLQueueCount: PUBLIC PROC [str: GlassDefs.Handle] = BEGIN OPEN str; WriteChar[Ascii.CR]; FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue DO WriteString[ SELECT queue FROM express => "Express"L, input => "Input"L, pending => "Pending"L, forward => "Forward"L, mailbox => "Mailbox"L, ENDCASE => "Unknown"L]; WriteString[" queue: length="L]; DisplayQueue[str, queue]; ENDLOOP; END; DisplayQueue: PROC [str: GlassDefs.Handle, queue: SLDefs.SLQueue] = BEGIN OPEN str; GetRPos: ENTRY PROC [queue: SLDefs.SLQueue] RETURNS [SLDefs.ItemAddress] = { RETURN[data[queue].rPos]}; GetThisItem: ENTRY PROC [ queue: SLDefs.SLQueue, pos: LONG POINTER TO SLDefs.ItemAddress] RETURNS [item: SLDefs.Item] = BEGIN item ← GetItem[pos↑]↑; ReleaseData[clean]; NextItem[queue, pos]; IF item.state # free THEN { ObjectDirDefs.UseObject[item.SL]; ObjectDirDefs.UseObject[item.body]}; END; -- proc. GetThisItem firstPos: SLDefs.ItemAddress = GetRPos[queue]; pos: SLDefs.ItemAddress ← firstPos; WriteDecimal[count[queue]]; WriteChar[Ascii.CR]; DO thisItem: SLDefs.Item = GetThisItem[queue, @pos]; IF thisItem.state # free THEN BEGIN slReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.SL]; bodyReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.body]; ObjectDirDefs.FreeObject[thisItem.SL]; ObjectDirDefs.FreeObject[thisItem.body]; BEGIN ENABLE UNWIND => { HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]}; s: STRING = [18] --377#377@6553665536-- ; slHeader: SLDefs.SLHeader; [, ] ← HeapDefs.HeapReadData[ slReader, [@slHeader, SIZE[SLDefs.SLHeader]]]; WriteString["Created: "L]; ProtocolDefs.AppendTimestamp[s, slHeader.created]; WriteString[s]; IF slHeader.received.net # 0 OR slHeader.received.host # 0 OR slHeader.received.time # slHeader.created.time THEN { WriteString[", received: "L]; s.length ← 0; ProtocolDefs.AppendTimestamp[s, slHeader.received]; WriteString[s]; }; BEGIN server: LONG STRING ← NIL; IF slHeader.server # NIL THEN server ← WITH this: slHeader.server.name SELECT FROM rName => this.value, connect => this.value, netAddr => "[net address]"L, ENDCASE => "[bad name]"L; IF server # NIL THEN { WriteString[", server: "L]; WriteString[server]; }; END; WriteChar[Ascii.CR]; END; HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]; END; IF pos = firstPos OR DelTyped[] THEN EXIT; ENDLOOP; END; RestartQueues: PUBLIC PROCEDURE [initHeap: BOOLEAN] = BEGIN SLTitle: ARRAY SLDefs.SLQueue OF STRING = [ express: "SLQueue.Express"L, input: "SLQueue.Input"L, forward: "SLQueue.Forward"L, pending: "SLQueue.Pending"L, mailbox: "SLQueue.Mailbox"L]; SLName: ARRAY SLDefs.SLQueue OF STRING = [ express: "Express queue"L, input: "Input queue"L, forward: "Forward queue"L, pending: "Pending queue"L, mailbox: "Mailbox queue"L]; FOR index: SLDefs.SLQueue IN SLDefs.SLQueue DO handle: VMDefs.FileHandle = VMDefs.OpenFile[ options: oldOrNew, name: SLTitle[index], cacheFraction: 2]; firstPos: SLDefs.ItemAddress = [ page: [file: handle, page: 0], item: FIRST[SLDefs.ItemIndex]]; newFile: BOOLEAN; data[index].size ← VMDefs.GetFileLength[handle].page; newFile ← data[index].size = 0; IF newFile THEN { data[index].size ← 1 --cause extension to 1 pages-- ; LogDefs.WriteLogEntry["New SL-queue file created"L]}; IF initHeap OR newFile THEN -- initialise with empty queue -- BEGIN p: VMDefs.PageNumber; FOR p IN [FIRST[VMDefs.PageNumber]..data[index].size) DO IF page # NIL THEN VMDefs.Release[LOOPHOLE[page, VMDefs.Page]]; page ← LOOPHOLE[VMDefs.UsePage[dataPos ← [handle, p]], SLQPage]; FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO page[index].state ← free ENDLOOP; ReleaseData[dirty]; ENDLOOP; END; -- then Process.InitializeCondition[@(data[index].itemWritten), 1]; Process.DisableTimeout[@(data[index].itemWritten)]; BEGIN -- Adjust object reference counts -- pos: SLDefs.ItemAddress ← firstPos; count[index] ← 0; -- count of SL's in this queue -- DO ptr: LONG POINTER TO SLDefs.Item = GetItem[pos]; state: DataState ← clean; IF ptr.state = reading THEN {ptr.state ← full; state ← dirty}; IF ptr.state = full THEN BEGIN ObjectDirDefs.RestartObject[ptr.body]; ObjectDirDefs.RestartObject[ptr.SL]; count[index] ← count[index] + 1; END; ReleaseData[state]; NextItem[index, @pos]; IF pos = firstPos THEN EXIT; ENDLOOP; END; -- find reader position: skip optional full, then all free items -- data[index].rPos ← firstPos; DO ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos]; IF ptr.state = free THEN {ReleaseData[clean]; EXIT}; ReleaseData[clean]; NextItem[index, @(data[index].rPos)]; IF data[index].rPos = firstPos THEN EXIT --all items full-- ; ENDLOOP; BEGIN started: SLDefs.ItemAddress = data[index].rPos; DO ptr: LONG POINTER TO SLDefs.Item = GetItem[data[index].rPos]; IF ptr.state # free THEN {ReleaseData[clean]; EXIT}; ReleaseData[clean]; NextItem[index, @data[index].rPos]; IF data[index].rPos = started THEN EXIT --all items free-- ; ENDLOOP; END; -- find writer position: after all full items -- data[index].wPos ← data[index].rPos; DO ptr: LONG POINTER TO SLDefs.Item ← GetItem[data[index].wPos]; IF ptr.state = free THEN {ReleaseData[clean]; EXIT}; ReleaseData[clean]; NextItem[index, @data[index].wPos]; IF data[index].wPos = data[index].rPos THEN EXIT; ENDLOOP; LogDefs.DisplayNumber[SLName[index], [short[@count[index]]]]; ENDLOOP; LogDefs.DisplayNumber["Messages received"L, [long[@received]]]; LogDefs.DisplayNumber["Messages forwarded"L, [long[@forwarded]]]; LogDefs.DisplayNumber["Messages re-processed"L, [long[@reDone]]]; END; END. LOG: 22-Oct-84 15:38:48 - blh: changed DisplayQueues to print out the server name not handle.