-- File: StreamCoupleImpl.mesa - last edit: -- AOF 8-Dec-87 9:35:05 -- Copyright (C) 1983, 1984, 1987 by Xerox Corporation. All rights reserved. DIRECTORY ByteBlt USING [ByteBlt], Heap USING [systemZone], Process USING [EnableAborts], Stream USING [ Attention, Block, Byte, CompletionCode, defaultObject, EndOfStream, Handle, InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType], StreamCouple USING []; StreamCoupleImpl: MONITOR IMPORTS ByteBlt, Heap, Process, Stream EXPORTS StreamCouple = BEGIN --types HalfStreamPair: TYPE = LONG POINTER TO HSPObject; HSPObject: TYPE = MACHINE DEPENDENT RECORD [ stream: Stream.Object, data: DataRecord]; DataRecord: TYPE = RECORD [ otherStream: Stream.Handle ← NIL, --the other half of the stream pair sstReceived: CONDITION ← [timeout: 100], --TRUE if sst change rcvd by other sstChange: BOOLEAN ← FALSE, currentSST: Stream.SubSequenceType ← 1, attn: CONDITION ← [timeout: 100], -- attention flag attnQueue: AttnQueue ← [NIL, NIL], -- queue for attn bytes attnStatus: AttnStatus ← none, attnAnnounced: BOOLEAN ← FALSE, waitForPut: CONDITION ← [timeout: 100], endRecord: BOOLEAN ← FALSE, blockCondition: CONDITION ← [timeout: 100], -- change in block status blockStatus: {available, inUse, emptied} ← available, -- status of block block: Stream.Block]; -- block for put operation goes here AttnQueue: TYPE = RECORD [ first: AttnList, -- head of the queue last: AttnList]; -- tail of the queue AttnList: TYPE = LONG POINTER TO AttnRec; AttnRec: TYPE = RECORD [ next: AttnList, -- next attn byte in list byte: Stream.Byte]; -- attn byte AttnStatus: TYPE = {none, awaited, arrived, deliveredOutOfBand, delivered}; -- constants blockNil: Stream.Block = [NIL, 0, 0]; -- public procedures Create: PUBLIC ENTRY PROCEDURE RETURNS [sH1, sH2: Stream.Handle] = BEGIN ENABLE UNWIND => NULL; hsp1: HalfStreamPair ← Heap.systemZone.NEW[HSPObject]; hsp2: HalfStreamPair ← Heap.systemZone.NEW[HSPObject]; Process.EnableAborts[@hsp1.data.attn]; Process.EnableAborts[@hsp2.data.attn]; Process.EnableAborts[@hsp1.data.blockCondition]; Process.EnableAborts[@hsp2.data.blockCondition]; hsp1.stream ← Stream.defaultObject; hsp1.stream.get ← GetBlock; hsp1.stream.put ← PutBlock; hsp1.stream.setSST ← SetSST; hsp1.stream.sendAttention ← SendAttention; hsp1.stream.waitAttention ← WaitAttention; hsp1.stream.delete ← Delete; hsp2.stream ← hsp1.stream; hsp1.data.otherStream ← @hsp2.stream; hsp2.data.otherStream ← @hsp1.stream; RETURN[hsp2.data.otherStream, hsp1.data.otherStream]; END; -- Create -- required stream procedures Delete: ENTRY PROCEDURE [sH: Stream.Handle] = BEGIN ENABLE UNWIND => NULL; hsp1, hsp2: HalfStreamPair; attn, nextAttn: AttnList; [hsp1, hsp2] ← ConvertHandle[sH]; IF hsp1 # NIL THEN { FOR attn ← hsp1.data.attnQueue.first, nextAttn WHILE attn # NIL DO nextAttn ← attn.next; Heap.systemZone.FREE[@attn] ENDLOOP; Heap.systemZone.FREE[@hsp1]}; IF hsp2 # NIL THEN hsp2.data.otherStream ← NIL; END; -- Delete GetBlock: ENTRY PROCEDURE [ sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions] RETURNS [ bytesTransferred: CARDINAL, why: Stream.CompletionCode, sst: Stream.SubSequenceType] = BEGIN ENABLE UNWIND => NULL; blockBytes, inputBytes, bytesMoved: CARDINAL ← 0; done: BOOLEAN ← FALSE; hsp1, hsp2: HalfStreamPair; [hsp1, hsp2] ← ConvertHandle[sH]; bytesTransferred ← 0; why ← normal; sst ← hsp2.data.currentSST; WHILE NOT done DO UNTIL hsp2.data.blockStatus = inUse OR hsp2.data.sstChange OR hsp2.data.endRecord OR hsp2.data.attnStatus > awaited DO WAIT hsp1.data.waitForPut ENDLOOP; sst ← hsp2.data.currentSST; IF hsp2.data.sstChange THEN BEGIN hsp2.data.sstChange ← FALSE; BROADCAST hsp1.data.sstReceived; IF options.signalSSTChange THEN SIGNAL Stream.SSTChange[sst, block.startIndex]; why ← sstChange; done ← TRUE; END; IF hsp2.data.attnStatus > awaited THEN BEGIN oldStatus: AttnStatus ← hsp2.data.attnStatus; hsp2.data.attnStatus ← delivered; IF oldStatus = arrived THEN [] ← WaitForAttention[sH]; hsp2.data.blockStatus ← emptied; BROADCAST hsp2.data.blockCondition; why ← attention; done ← TRUE END; IF NOT done THEN BEGIN blockBytes ← block.stopIndexPlusOne - block.startIndex; inputBytes ← hsp2.data.block.stopIndexPlusOne - hsp2.data.block.startIndex; IF inputBytes > 0 THEN BEGIN bytesMoved ← ByteBlt.ByteBlt[to: block, from: hsp2.data.block]; block.startIndex ← block.startIndex + bytesMoved; hsp2.data.block.startIndex ← hsp2.data.block.startIndex + bytesMoved; bytesTransferred ← bytesTransferred + bytesMoved END; IF bytesMoved = blockBytes THEN {why ← normal; done ← TRUE}; IF hsp2.data.endRecord THEN BEGIN why ← endOfStream; done ← TRUE; END; IF bytesMoved = inputBytes OR hsp2.data.endRecord THEN BEGIN hsp2.data.blockStatus ← emptied; BROADCAST hsp2.data.blockCondition; END; IF bytesMoved < blockBytes AND options.signalLongBlock THEN SIGNAL Stream.LongBlock[block.startIndex]; IF bytesMoved < inputBytes AND options.signalShortBlock THEN ERROR Stream.ShortBlock; END; ENDLOOP; IF why = endOfStream AND options.signalEndOfStream THEN SIGNAL Stream.EndOfStream[bytesTransferred] END; -- GetBLock PutBlock: ENTRY PROCEDURE [ sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN ← FALSE] = BEGIN ENABLE UNWIND => NULL; hsp1, hsp2: HalfStreamPair; -- IF block=blockNil THEN RETURN; -- -- stop gap solution to nullify SendNow function [hsp1, hsp2] ← ConvertHandle[sH]; InternalPutBlock[ hsp1, hsp2, block, endRecord ! Stream.Attention => GOTO ReturnAttention] EXITS ReturnAttention => RETURN WITH ERROR Stream.Attention[0] END; -- PutBlock InternalPutBlock: INTERNAL PROCEDURE [ hsp1, hsp2: HalfStreamPair, block: Stream.Block, endRecord: BOOLEAN ← FALSE] = BEGIN attentionReceived: BOOLEAN ← FALSE; UNTIL hsp1.data.blockStatus = available DO WAIT hsp1.data.blockCondition ENDLOOP; hsp1.data.block ← block; hsp1.data.blockStatus ← inUse; IF endRecord THEN hsp1.data.endRecord ← TRUE; BROADCAST hsp2.data.waitForPut; UNTIL hsp1.data.blockStatus = emptied OR hsp2.data.attnStatus > awaited DO WAIT hsp1.data.blockCondition ENDLOOP; IF hsp2.data.attnStatus > awaited AND NOT hsp2.data.attnAnnounced THEN BEGIN hsp2.data.attnAnnounced ← TRUE; attentionReceived ← TRUE; END; hsp1.data.blockStatus ← available; BROADCAST hsp2.data.blockCondition; IF attentionReceived THEN SIGNAL Stream.Attention[0]; END; -- InternalPutBlock SetSST: ENTRY PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] = BEGIN ENABLE UNWIND => NULL; hsp1, hsp2: HalfStreamPair; [hsp1, hsp2] ← ConvertHandle[sH]; WHILE hsp1.data.sstChange DO WAIT hsp2.data.sstReceived ENDLOOP; IF sst # hsp1.data.currentSST THEN BEGIN hsp1.data.currentSST ← sst; hsp1.data.sstChange ← TRUE; BROADCAST hsp2.data.waitForPut; END; END; -- SetSST SendAttention: ENTRY PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] = BEGIN ENABLE UNWIND => NULL; attnList: AttnList ← Heap.systemZone.NEW[AttnRec]; bytes: PACKED ARRAY [0..2) OF Stream.Byte ← [byte, 0]; hsp1, hsp2: HalfStreamPair; [hsp1, hsp2] ← ConvertHandle[sH]; attnList.next ← NIL; attnList.byte ← byte; IF hsp1.data.attnQueue.first = NIL THEN hsp1.data.attnQueue.first ← hsp1.data.attnQueue.last ← attnList ELSE { hsp1.data.attnQueue.last.next ← attnList; hsp1.data.attnQueue.last ← attnList}; hsp1.data.attnStatus ← arrived; BROADCAST hsp2.data.attn; InternalPutBlock[hsp1, hsp2, [@bytes, 0, 1] ! Stream.Attention => CONTINUE]; END; -- SendAttention WaitAttention: ENTRY PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] = BEGIN ENABLE UNWIND => NULL; hsp1, hsp2: HalfStreamPair; [hsp1, hsp2] ← ConvertHandle[sH]; SELECT hsp2.data.attnStatus FROM none => hsp2.data.attnStatus ← awaited; arrived => NULL; ENDCASE => RETURN[0]; byte ← WaitForAttention[sH] END; -- WaitAttention WaitForAttention: INTERNAL PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] = BEGIN attnList: AttnList; hsp1, hsp2: HalfStreamPair; [hsp1, hsp2] ← ConvertHandle[sH]; WHILE hsp2.data.attnQueue.first = NIL DO WAIT hsp1.data.attn ENDLOOP; byte ← hsp2.data.attnQueue.first.byte; IF hsp2.data.attnQueue.first.next = NIL THEN { Heap.systemZone.FREE[@hsp2.data.attnQueue.first]; hsp2.data.attnQueue ← [NIL, NIL]} ELSE BEGIN attnList ← hsp2.data.attnQueue.first.next; Heap.systemZone.FREE[@hsp2.data.attnQueue.first]; hsp2.data.attnQueue.first ← attnList; END; IF hsp2.data.attnStatus <= arrived THEN hsp2.data.attnStatus ← deliveredOutOfBand; BROADCAST hsp1.data.blockCondition -- notify a Put that is in progress. END; -- WaitForAttention -- private procedures ConvertHandle: PROCEDURE [sH: Stream.Handle] RETURNS [hsp1, hsp2: HalfStreamPair] = BEGIN hsp1 ← LOOPHOLE[sH, HalfStreamPair]; IF hsp1 = NIL THEN ERROR; IF hsp1.data.otherStream = NIL THEN hsp2 ← NIL ELSE hsp2 ← LOOPHOLE[hsp1.data.otherStream, HalfStreamPair]; END; -- ConvertHandle END... LOG 20-Dec-84 16:23:19 AOF Post Klamath 8-Dec-87 9:34:52 AOF No changes - just changing the time stamp