-- File: StreamCoupleImpl.mesa - last edit:
-- AOF                 8-Dec-87  9:35:05
-- Copyright (C) 1983, 1984, 1987 by Xerox Corporation. All rights reserved.

-- Implements a pair of coupled Stream.Objects.  A block handed to put on one
-- half is parked in that half's DataRecord; get on the other half copies
-- bytes out of it with ByteBlt.  SST changes and attention bytes travel the
-- same way.  All state of both halves is protected by this single monitor.
--
-- Naming convention used throughout: inside a procedure, hsp1 is the half the
-- caller's handle designates and hsp2 is the peer half.  State that a half
-- SENDS (block, sstChange, attnQueue, attnStatus, endRecord) lives in the
-- sender's DataRecord; the receiver reads it through hsp2.

DIRECTORY
  ByteBlt USING [ByteBlt],
  Heap USING [systemZone],
  Process USING [EnableAborts],
  Stream USING [
    Attention, Block, Byte, CompletionCode, defaultObject, EndOfStream, Handle,
    InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType],
  StreamCouple USING [];

StreamCoupleImpl: MONITOR
  IMPORTS ByteBlt, Heap, Process, Stream EXPORTS StreamCouple =
  BEGIN

  --types

  HalfStreamPair: TYPE = LONG POINTER TO HSPObject;

  -- The Stream.Object must stay the first field: ConvertHandle LOOPHOLEs a
  -- Stream.Handle directly into a HalfStreamPair.
  HSPObject: TYPE = MACHINE DEPENDENT RECORD [
    stream: Stream.Object, data: DataRecord];

  DataRecord: TYPE = RECORD [
    -- the other half of the stream pair; set to NIL when that half is deleted
    otherStream: Stream.Handle ¬ NIL,
    -- waited on by the peer's SetSST; broadcast by this half's GetBlock once
    -- it has consumed the peer's pending SST change
    sstReceived: CONDITION ¬ [timeout: 100],
    -- TRUE while an SST set on this half has not yet been seen by the peer
    sstChange: BOOLEAN ¬ FALSE,
    -- the SST most recently set on this half
    currentSST: Stream.SubSequenceType ¬ 1,
    -- waited on by this half's WaitForAttention; broadcast by the peer's
    -- SendAttention
    attn: CONDITION ¬ [timeout: 100],
    -- attention bytes sent FROM this half, oldest first
    attnQueue: AttnQueue ¬ [NIL, NIL],
    -- progress of the attention sent from this half
    attnStatus: AttnStatus ¬ none,
    -- TRUE once a put on the peer has raised Stream.Attention for it
    attnAnnounced: BOOLEAN ¬ FALSE,
    -- waited on by this half's GetBlock; broadcast by the peer's put/SetSST
    waitForPut: CONDITION ¬ [timeout: 100],
    -- set when a put on this half specified endRecord; never cleared here
    endRecord: BOOLEAN ¬ FALSE,
    -- change in block status; waited on by puts on this half
    blockCondition: CONDITION ¬ [timeout: 100],
    -- status of block
    blockStatus: {available, inUse, emptied} ¬ available,
    -- block for put operation goes here (the putter's own buffer, not a copy)
    block: Stream.Block];

  AttnQueue: TYPE = RECORD [
    first: AttnList,  -- head of the queue
    last: AttnList];  -- tail of the queue

  AttnList: TYPE = LONG POINTER TO AttnRec;

  AttnRec: TYPE = RECORD [
    next: AttnList,  -- next attn byte in list
    byte: Stream.Byte];  -- attn byte

  -- The declaration order matters: the code tests "> awaited" and "<= arrived".
  AttnStatus: TYPE = {none, awaited, arrived, deliveredOutOfBand, delivered};

  -- constants

  blockNil: Stream.Block = [NIL, 0, 0];

  -- public procedures

  -- Allocates both halves from Heap.systemZone, installs this module's
  -- procedures in their Stream.Objects, links them to each other and returns
  -- the two handles (sH1 designates the first half, sH2 the second).
  Create: PUBLIC ENTRY PROCEDURE RETURNS [sH1, sH2: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1: HalfStreamPair ¬ Heap.systemZone.NEW[HSPObject];
    hsp2: HalfStreamPair ¬ Heap.systemZone.NEW[HSPObject];
    -- NOTE(review): aborts are enabled only on attn and blockCondition, so a
    -- WAIT on waitForPut (GetBlock) or sstReceived (SetSST) is not abortable.
    -- Confirm that this is intended.
    Process.EnableAborts[@hsp1.data.attn];
    Process.EnableAborts[@hsp2.data.attn];
    Process.EnableAborts[@hsp1.data.blockCondition];
    Process.EnableAborts[@hsp2.data.blockCondition];
    hsp1.stream ¬ Stream.defaultObject;
    hsp1.stream.get ¬ GetBlock;
    hsp1.stream.put ¬ PutBlock;
    hsp1.stream.setSST ¬ SetSST;
    hsp1.stream.sendAttention ¬ SendAttention;
    hsp1.stream.waitAttention ¬ WaitAttention;
    hsp1.stream.delete ¬ Delete;
    hsp2.stream ¬ hsp1.stream;  -- both halves use the same procedures
    hsp1.data.otherStream ¬ @hsp2.stream;
    hsp2.data.otherStream ¬ @hsp1.stream;
    RETURN[hsp2.data.otherStream, hsp1.data.otherStream];
    END;  -- Create

  -- required stream procedures

  -- Frees the half designated by sH together with any attention bytes still
  -- queued on it, then unlinks it from the peer (if the peer still exists).
  -- NOTE(review): nothing here wakes or excludes processes that are still
  -- waiting on the freed half's conditions; callers presumably must quiesce
  -- the stream first.
  Delete: ENTRY PROCEDURE [sH: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    attn, nextAttn: AttnList;
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    IF hsp1 # NIL THEN {
      FOR attn ¬ hsp1.data.attnQueue.first, nextAttn WHILE attn # NIL DO
        nextAttn ¬ attn.next;  -- fetch the link before the node is freed
        Heap.systemZone.FREE[@attn]
        ENDLOOP;
      Heap.systemZone.FREE[@hsp1]};
    IF hsp2 # NIL THEN hsp2.data.otherStream ¬ NIL;
    END;  -- Delete

  -- Copies bytes put on the peer half into block.
  --   bytesTransferred: total bytes stored into block by this call.
  --   why: normal (block filled), sstChange, attention, or endOfStream (the
  --     peer's endRecord flag is set).
  --   sst: the peer's current SST at the time of return.
  -- Raises, while still inside the monitor, Stream.SSTChange, Stream.LongBlock,
  -- Stream.ShortBlock and Stream.EndOfStream as selected by options.
  -- NOTE(review): hsp2 is dereferenced without a NIL check, so a get after
  -- the peer has been deleted faults.
  GetBlock: ENTRY PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    blockBytes, inputBytes, bytesMoved: CARDINAL ¬ 0;
    done: BOOLEAN ¬ FALSE;
    hsp1, hsp2: HalfStreamPair;
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    bytesTransferred ¬ 0;
    why ¬ normal;
    sst ¬ hsp2.data.currentSST;
    WHILE NOT done DO
      -- wait until the peer has something for us: data, an SST change, an
      -- end-of-record mark or an attention
      UNTIL hsp2.data.blockStatus = inUse OR hsp2.data.sstChange
        OR hsp2.data.endRecord OR hsp2.data.attnStatus > awaited DO
        WAIT hsp1.data.waitForPut ENDLOOP;
      sst ¬ hsp2.data.currentSST;
      IF hsp2.data.sstChange THEN
        BEGIN
        hsp2.data.sstChange ¬ FALSE;
        BROADCAST hsp1.data.sstReceived;  -- release a SetSST waiting on the peer
        IF options.signalSSTChange THEN
          SIGNAL Stream.SSTChange[sst, block.startIndex];
        why ¬ sstChange;
        done ¬ TRUE;
        END;
      IF hsp2.data.attnStatus > awaited THEN
        BEGIN
        oldStatus: AttnStatus ¬ hsp2.data.attnStatus;
        hsp2.data.attnStatus ¬ delivered;
        -- not yet taken out of band: dequeue (and discard) the byte here
        IF oldStatus = arrived THEN [] ¬ WaitForAttention[sH];
        -- release the put in progress on the peer; its block is not read
        hsp2.data.blockStatus ¬ emptied;
        BROADCAST hsp2.data.blockCondition;
        why ¬ attention;
        done ¬ TRUE
        -- NOTE(review): attnStatus is never set back to none in this module,
        -- so every later get also completes with attention.  Confirm intent.
        END;
      IF NOT done THEN
        BEGIN
        blockBytes ¬ block.stopIndexPlusOne - block.startIndex;
        inputBytes ¬
          hsp2.data.block.stopIndexPlusOne - hsp2.data.block.startIndex;
        -- Reset on every pass.  Without this a zero-length put seen on a later
        -- pass would be judged with the count left over from the previous
        -- pass: the get could complete early as normal and the putter would
        -- not be released.
        bytesMoved ¬ 0;
        IF inputBytes > 0 THEN
          BEGIN
          bytesMoved ¬ ByteBlt.ByteBlt[to: block, from: hsp2.data.block];
          block.startIndex ¬ block.startIndex + bytesMoved;
          hsp2.data.block.startIndex ¬ hsp2.data.block.startIndex + bytesMoved;
          bytesTransferred ¬ bytesTransferred + bytesMoved
          END;
        IF bytesMoved = blockBytes THEN {why ¬ normal; done ¬ TRUE};
        IF hsp2.data.endRecord THEN BEGIN why ¬ endOfStream; done ¬ TRUE; END;
        -- the peer's block is used up (or abandoned at end of record)
        IF bytesMoved = inputBytes OR hsp2.data.endRecord THEN
          BEGIN
          hsp2.data.blockStatus ¬ emptied;
          BROADCAST hsp2.data.blockCondition;
          END;
        IF bytesMoved < blockBytes AND options.signalLongBlock THEN
          SIGNAL Stream.LongBlock[block.startIndex];
        IF bytesMoved < inputBytes AND options.signalShortBlock THEN
          ERROR Stream.ShortBlock;
        END;
      ENDLOOP;
    IF why = endOfStream AND options.signalEndOfStream THEN
      SIGNAL Stream.EndOfStream[bytesTransferred]
    END;  -- GetBlock

  -- Offers block to the peer and returns once the peer has emptied it or an
  -- attention from the peer intervenes.  In the latter case the monitor is
  -- released and ERROR Stream.Attention[0] is raised in the caller.
  PutBlock: ENTRY PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN ¬ FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    -- IF block=blockNil THEN RETURN;
    -- (disabled) stop gap solution to nullify SendNow function
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    InternalPutBlock[
      hsp1, hsp2, block, endRecord ! Stream.Attention => GOTO ReturnAttention]
    EXITS ReturnAttention => RETURN WITH ERROR Stream.Attention[0]
    END;  -- PutBlock

  -- Body of a put; the caller holds the monitor.  hsp1 is the putting half,
  -- hsp2 its peer.  Raises SIGNAL Stream.Attention[0] the first time it
  -- notices an attention sent from the peer.
  InternalPutBlock: INTERNAL PROCEDURE [
    hsp1, hsp2: HalfStreamPair, block: Stream.Block,
    endRecord: BOOLEAN ¬ FALSE] =
    BEGIN
    attentionReceived: BOOLEAN ¬ FALSE;
    -- one put at a time per half
    UNTIL hsp1.data.blockStatus = available DO
      WAIT hsp1.data.blockCondition ENDLOOP;
    hsp1.data.block ¬ block;
    hsp1.data.blockStatus ¬ inUse;
    IF endRecord THEN hsp1.data.endRecord ¬ TRUE;
    BROADCAST hsp2.data.waitForPut;  -- wake a get waiting on the peer
    UNTIL hsp1.data.blockStatus = emptied OR hsp2.data.attnStatus > awaited DO
      WAIT hsp1.data.blockCondition ENDLOOP;
    IF hsp2.data.attnStatus > awaited AND NOT hsp2.data.attnAnnounced THEN
      BEGIN hsp2.data.attnAnnounced ¬ TRUE; attentionReceived ¬ TRUE; END;
    hsp1.data.blockStatus ¬ available;
    -- Puts queued behind this one wait on hsp1's condition for "available";
    -- broadcasting only the peer's condition left them to the timeout.
    BROADCAST hsp1.data.blockCondition;
    BROADCAST hsp2.data.blockCondition;  -- retained from the original
    IF attentionReceived THEN SIGNAL Stream.Attention[0];
    END;  -- InternalPutBlock

  -- Sets the SST of the designated half.  First waits until any earlier
  -- change has been picked up by the peer's get.  A value equal to the
  -- current SST is ignored.
  SetSST: ENTRY PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    WHILE hsp1.data.sstChange DO WAIT hsp2.data.sstReceived ENDLOOP;
    IF sst # hsp1.data.currentSST THEN
      BEGIN
      hsp1.data.currentSST ¬ sst;
      hsp1.data.sstChange ¬ TRUE;
      BROADCAST hsp2.data.waitForPut;  -- wake a get waiting on the peer
      END;
    END;  -- SetSST

  -- Queues byte as an attention on the designated half, wakes a waiter on the
  -- peer, then puts the byte in band as a one-byte block.  A Stream.Attention
  -- raised by that put is swallowed.
  SendAttention: ENTRY PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    attnList: AttnList ¬ NIL;
    bytes: PACKED ARRAY [0..2) OF Stream.Byte ¬ [byte, 0];
    hsp1, hsp2: HalfStreamPair;
    -- validate the handle before allocating, so that the ERROR raised for a
    -- NIL handle cannot leak the record
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    attnList ¬ Heap.systemZone.NEW[AttnRec];
    attnList.next ¬ NIL;
    attnList.byte ¬ byte;
    IF hsp1.data.attnQueue.first = NIL THEN
      hsp1.data.attnQueue.first ¬ hsp1.data.attnQueue.last ¬ attnList
    ELSE {
      hsp1.data.attnQueue.last.next ¬ attnList;
      hsp1.data.attnQueue.last ¬ attnList};
    hsp1.data.attnStatus ¬ arrived;
    BROADCAST hsp2.data.attn;
    InternalPutBlock[hsp1, hsp2, [@bytes, 0, 1] ! Stream.Attention => CONTINUE];
    END;  -- SendAttention

  -- Returns the next attention byte sent from the peer, waiting for one if
  -- none has been sent.  Returns 0 at once if the peer's attention status is
  -- anything other than none or arrived (already awaited or delivered).
  WaitAttention: ENTRY PROCEDURE [sH: Stream.Handle]
    RETURNS [byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    hsp1, hsp2: HalfStreamPair;
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    SELECT hsp2.data.attnStatus FROM
      none => hsp2.data.attnStatus ¬ awaited;
      arrived => NULL;
      ENDCASE => RETURN[0];
    byte ¬ WaitForAttention[sH]
    END;  -- WaitAttention

  -- Waits (on this half's attn condition) until the peer's attention queue is
  -- non-empty, removes and frees its head and returns the byte.  Marks the
  -- attention deliveredOutOfBand unless GetBlock has already marked it
  -- delivered.  The caller holds the monitor.
  WaitForAttention: INTERNAL PROCEDURE [sH: Stream.Handle]
    RETURNS [byte: Stream.Byte] =
    BEGIN
    attnList: AttnList;
    hsp1, hsp2: HalfStreamPair;
    [hsp1, hsp2] ¬ ConvertHandle[sH];
    WHILE hsp2.data.attnQueue.first = NIL DO WAIT hsp1.data.attn ENDLOOP;
    byte ¬ hsp2.data.attnQueue.first.byte;
    IF hsp2.data.attnQueue.first.next = NIL THEN {
      Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
      hsp2.data.attnQueue ¬ [NIL, NIL]}
    ELSE
      BEGIN
      attnList ¬ hsp2.data.attnQueue.first.next;
      Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
      hsp2.data.attnQueue.first ¬ attnList;
      END;
    IF hsp2.data.attnStatus <= arrived THEN
      hsp2.data.attnStatus ¬ deliveredOutOfBand;
    BROADCAST hsp1.data.blockCondition  -- notify a Put that is in progress.
    END;  -- WaitForAttention

  -- private procedures

  -- Maps a stream handle to its half (hsp1) and the peer half (hsp2).  hsp2
  -- is NIL when the peer has been deleted.  Raises ERROR for a NIL handle.
  -- Reads monitor data without being ENTRY or INTERNAL; every caller in this
  -- module already holds the monitor.
  ConvertHandle: PROCEDURE [sH: Stream.Handle]
    RETURNS [hsp1, hsp2: HalfStreamPair] =
    BEGIN
    hsp1 ¬ LOOPHOLE[sH, HalfStreamPair];
    IF hsp1 = NIL THEN ERROR;
    IF hsp1.data.otherStream = NIL THEN hsp2 ¬ NIL
    ELSE hsp2 ¬ LOOPHOLE[hsp1.data.otherStream, HalfStreamPair];
    END;  -- ConvertHandle

  END...

LOG
20-Dec-84 16:23:19  AOF  Post Klamath
 8-Dec-87  9:34:52  AOF  No changes - just changing the time stamp