-- Transport Mechanism Mail Server - steering list queues -- -- [Indigo]<Grapevine>MS>SLQueue.mesa -- -- Andrew Birrell September 13, 1982 3:41 pm -- DIRECTORY Ascii USING[ CR], EnquiryDefs USING[ ], GlassDefs USING[ Handle ], HeapDefs USING[ HeapEndRead, HeapEndWrite, HeapReadData, HeapStartRead, ReaderHandle, WriterHandle ], LogDefs USING[ WriteChar ], ObjectDirDefs USING[ FreeObject, ObjectNumber, UseObject ], ProtocolDefs USING[ AppendTimestamp ], SLDefs USING[ Item, ItemAddress, ItemIndex, SLHeader, SLQueue, SLReadHandle ], VMDefs USING[ MarkStartWait, Page, PageAddress, PageNumber, ReadPage, Release, UsePage ]; SLQueueImpl: MONITOR IMPORTS HeapDefs, LogDefs, ObjectDirDefs, ProtocolDefs, VMDefs EXPORTS EnquiryDefs, SLDefs = BEGIN data: ARRAY SLDefs.SLQueue OF RECORD[ rPos: SLDefs.ItemAddress, wPos: SLDefs.ItemAddress, size: VMDefs.PageNumber, itemWritten: CONDITION ]; count: ARRAY SLDefs.SLQueue OF CARDINAL ← ALL[0]; received: CARDINAL ← 0; reDone: CARDINAL ← 0; SLQPage: TYPE = POINTER TO ARRAY SLDefs.ItemIndex OF SLDefs.Item; dataPos: VMDefs.PageAddress; page: SLQPage ← NIL; NextItem: PROCEDURE[ queue: SLDefs.SLQueue, pos: POINTER TO SLDefs.ItemAddress ] = BEGIN IF pos.item = LAST[SLDefs.ItemIndex] THEN BEGIN pos.item ← FIRST[SLDefs.ItemIndex]; IF pos.page.page+1 = data[queue].size THEN pos.page.page ← FIRST[VMDefs.PageNumber] ELSE pos.page.page ← pos.page.page+1; END ELSE pos.item ← pos.item + 1; END; GetItem: PROCEDURE[ pos: SLDefs.ItemAddress ] RETURNS[ POINTER TO SLDefs.Item ] = BEGIN IF page = NIL OR pos.page # dataPos THEN BEGIN IF page # NIL THEN VMDefs.Release[LOOPHOLE[page,VMDefs.Page]]; dataPos ← pos.page; page ← LOOPHOLE[VMDefs.ReadPage[ pos.page, 0--lookAhead-- ], SLQPage]; END; RETURN[ @(page[pos.item]) ] END; DataState: TYPE = {clean,dirty}; ReleaseData: PROCEDURE[ state: DataState ] = BEGIN IF state = dirty THEN VMDefs.MarkStartWait[LOOPHOLE[page,VMDefs.Page]]; END; SLWrite: PUBLIC ENTRY PROCEDURE[ body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.WriterHandle, queue: SLDefs.SLQueue ] = BEGIN Action: INTERNAL PROCEDURE[ obj: ObjectDirDefs.ObjectNumber ] = BEGIN SubWrite[body, obj, queue]; END; HeapDefs.HeapEndWrite[SL, Action ]; IF queue = input OR queue = express THEN received ← received + 1; END; SubWrite: INTERNAL PROCEDURE[ body: ObjectDirDefs.ObjectNumber, SL: ObjectDirDefs.ObjectNumber, queue: SLDefs.SLQueue ] = BEGIN wPos: POINTER TO SLDefs.ItemAddress = @(data[queue].wPos); started: SLDefs.ItemAddress = wPos↑; ptr: POINTER TO SLDefs.Item; DO ptr ← GetItem[wPos↑]; NextItem[queue, wPos]; IF ptr.state = free THEN EXIT; -- skip items still being read -- ReleaseData[clean]; IF wPos↑ = started THEN --extend-- BEGIN newPage: SLQPage; wPos↑.item ← FIRST[SLDefs.ItemIndex]; wPos↑.page.page ← data[queue].size; newPage ← LOOPHOLE[VMDefs.UsePage[wPos↑.page],SLQPage]; FOR index: SLDefs.ItemIndex IN SLDefs.ItemIndex DO newPage[index].state ← free ENDLOOP; VMDefs.MarkStartWait[LOOPHOLE[newPage,VMDefs.Page]]; VMDefs.Release[LOOPHOLE[newPage,VMDefs.Page]]; data[queue].size ← data[queue].size + 1; END; ENDLOOP; ptr.body ← body; ptr.SL ← SL; ptr.state ← full; ObjectDirDefs.UseObject[body]; ObjectDirDefs.UseObject[SL]; ReleaseData[dirty]; NOTIFY data[queue].itemWritten; count[queue] ← count[queue] + 1; END; WaitForNonEmpty: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ] = { InnerWaitNonEmpty[queue] }; InnerWaitNonEmpty: INTERNAL PROC[ queue: SLDefs.SLQueue ] = BEGIN WHILE count[queue] = 0 DO WAIT data[queue].itemWritten ENDLOOP; END; GetCount: PUBLIC ENTRY PROC[ queue: SLDefs.SLQueue ] RETURNS[CARDINAL] = { RETURN[count[queue]] }; SLStartRead: PUBLIC ENTRY PROCEDURE[ queue: SLDefs.SLQueue ] RETURNS[ handle: SLDefs.SLReadHandle, body: ObjectDirDefs.ObjectNumber, SL: HeapDefs.ReaderHandle ] = BEGIN rPos: POINTER TO SLDefs.ItemAddress = @(data[queue].rPos); ptr: POINTER TO SLDefs.Item; InnerWaitNonEmpty[queue]; DO ptr ← GetItem[rPos↑]; IF ptr.state = full THEN EXIT ELSE { ReleaseData[clean]; NextItem[queue, rPos] }; --skip items which are free or are still being read -- ENDLOOP; handle ← rPos↑; NextItem[queue, rPos]; body ← ptr.body; SL ← HeapDefs.HeapStartRead[ptr.SL]; ptr.state ← reading; ReleaseData[dirty]; count[queue] ← count[queue] - 1; LogDefs.WriteChar[SELECT queue FROM express => 'e, input => 'i, pending => 'p, forward => 'f, foreign => 'x, mailbox => 'm, ENDCASE => ERROR]; END; SubTransfer: ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue ] = BEGIN ptr: POINTER TO SLDefs.Item = GetItem[handle]; body: ObjectDirDefs.ObjectNumber = ptr.body; SL: ObjectDirDefs.ObjectNumber = ptr.SL; ReleaseData[clean]; SubWrite[body, SL, queue]; IF queue = input THEN reDone ← reDone + 1; END; SLTransfer: PUBLIC PROCEDURE[ handle: SLDefs.SLReadHandle, queue: SLDefs.SLQueue ] = BEGIN SubTransfer[handle, queue]; SLEndRead[handle]; END; SLEndRead: PUBLIC ENTRY PROCEDURE[ handle: SLDefs.SLReadHandle ] = BEGIN ptr: POINTER TO SLDefs.Item = GetItem[handle]; body: ObjectDirDefs.ObjectNumber = ptr.body; SL: ObjectDirDefs.ObjectNumber = ptr.SL; -- beware of the order of events! -- ptr.state ← free; ReleaseData[dirty]; ObjectDirDefs.FreeObject[body]; ObjectDirDefs.FreeObject[SL]; END; SLQueueCount: PUBLIC PROC[str: GlassDefs.Handle] = BEGIN OPEN str; WriteChar[Ascii.CR]; FOR queue: SLDefs.SLQueue IN SLDefs.SLQueue DO WriteString[SELECT queue FROM express => "Express"L, input => "Input"L, pending => "Pending"L, forward => "Forward"L, foreign => "Foreign"L, mailbox => "Mailbox"L, ENDCASE => "Unknown"L]; WriteString[" queue: length="L]; DisplayQueue[str, queue]; ENDLOOP; END; DisplayQueue: PROC[str: GlassDefs.Handle, queue: SLDefs.SLQueue] = BEGIN OPEN str; GetRPos: ENTRY PROC[queue: SLDefs.SLQueue] RETURNS[SLDefs.ItemAddress] = { RETURN[data[queue].rPos] }; GetThisItem: ENTRY PROC[queue: SLDefs.SLQueue, pos: POINTER TO SLDefs.ItemAddress] RETURNS[item:SLDefs.Item] = BEGIN item ← GetItem[pos↑]↑; ReleaseData[clean]; NextItem[queue, pos]; IF item.state # free THEN BEGIN ObjectDirDefs.UseObject[item.SL]; ObjectDirDefs.UseObject[item.body]; END; END; firstPos: SLDefs.ItemAddress = GetRPos[queue]; pos: SLDefs.ItemAddress ← firstPos; WriteDecimal[count[queue]]; WriteChar[Ascii.CR]; DO thisItem: SLDefs.Item = GetThisItem[queue, @pos]; IF thisItem.state # free THEN BEGIN slReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.SL]; bodyReader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[thisItem.body]; ObjectDirDefs.FreeObject[thisItem.SL]; ObjectDirDefs.FreeObject[thisItem.body]; BEGIN ENABLE UNWIND => BEGIN HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]; END; s: STRING = [18] --377#377@6553665536--; slHeader: SLDefs.SLHeader; [,] ← HeapDefs.HeapReadData[slReader, [@slHeader,SIZE[SLDefs.SLHeader]]]; WriteString["Created: "L]; ProtocolDefs.AppendTimestamp[s, slHeader.created]; WriteString[s]; WriteString[", received: "L]; s.length←0; ProtocolDefs.AppendTimestamp[s, slHeader.received]; WriteString[s]; WriteString[", server: "L]; WriteDecimal[LOOPHOLE[slHeader.server]]; WriteChar[Ascii.CR]; END; HeapDefs.HeapEndRead[slReader]; HeapDefs.HeapEndRead[bodyReader]; END; IF pos = firstPos OR DelTyped[] THEN EXIT; ENDLOOP; END; END.