-- File: StreamCoupleImpl.mesa - last edit:
-- AOF 8-Dec-87 9:35:05
-- Copyright (C) 1983, 1984, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
ByteBlt USING [ByteBlt],
Heap USING [systemZone],
Process USING [EnableAborts],
Stream USING [
Attention, Block, Byte, CompletionCode, defaultObject, EndOfStream, Handle,
InputOptions, LongBlock, Object, ShortBlock, SSTChange, SubSequenceType],
StreamCouple USING [];
StreamCoupleImpl: MONITOR
IMPORTS ByteBlt, Heap, Process, Stream EXPORTS StreamCouple =
BEGIN
--types
HalfStreamPair: TYPE = LONG POINTER TO HSPObject;
HSPObject: TYPE = MACHINE DEPENDENT RECORD [
stream: Stream.Object, data: DataRecord];
DataRecord: TYPE = RECORD [
otherStream: Stream.Handle ← NIL, --the other half of the stream pair
sstReceived: CONDITION ← [timeout: 100], --TRUE if sst change rcvd by other
sstChange: BOOLEAN ← FALSE,
currentSST: Stream.SubSequenceType ← 1,
attn: CONDITION ← [timeout: 100], -- attention flag
attnQueue: AttnQueue ← [NIL, NIL], -- queue for attn bytes
attnStatus: AttnStatus ← none,
attnAnnounced: BOOLEAN ← FALSE,
waitForPut: CONDITION ← [timeout: 100],
endRecord: BOOLEAN ← FALSE,
blockCondition: CONDITION ← [timeout: 100], -- change in block status
blockStatus: {available, inUse, emptied} ← available, -- status of block
block: Stream.Block]; -- block for put operation goes here
AttnQueue: TYPE = RECORD [
first: AttnList, -- head of the queue
last: AttnList]; -- tail of the queue
AttnList: TYPE = LONG POINTER TO AttnRec;
AttnRec: TYPE = RECORD [
next: AttnList, -- next attn byte in list
byte: Stream.Byte]; -- attn byte
AttnStatus: TYPE = {none, awaited, arrived, deliveredOutOfBand, delivered};
-- constants
blockNil: Stream.Block = [NIL, 0, 0];
-- public procedures
Create: PUBLIC ENTRY PROCEDURE RETURNS [sH1, sH2: Stream.Handle] =
BEGIN
ENABLE UNWIND => NULL;
hsp1: HalfStreamPair ← Heap.systemZone.NEW[HSPObject];
hsp2: HalfStreamPair ← Heap.systemZone.NEW[HSPObject];
Process.EnableAborts[@hsp1.data.attn];
Process.EnableAborts[@hsp2.data.attn];
Process.EnableAborts[@hsp1.data.blockCondition];
Process.EnableAborts[@hsp2.data.blockCondition];
hsp1.stream ← Stream.defaultObject;
hsp1.stream.get ← GetBlock;
hsp1.stream.put ← PutBlock;
hsp1.stream.setSST ← SetSST;
hsp1.stream.sendAttention ← SendAttention;
hsp1.stream.waitAttention ← WaitAttention;
hsp1.stream.delete ← Delete;
hsp2.stream ← hsp1.stream;
hsp1.data.otherStream ← @hsp2.stream;
hsp2.data.otherStream ← @hsp1.stream;
RETURN[hsp2.data.otherStream, hsp1.data.otherStream];
END; -- Create
-- required stream procedures
Delete: ENTRY PROCEDURE [sH: Stream.Handle] =
BEGIN
ENABLE UNWIND => NULL;
hsp1, hsp2: HalfStreamPair;
attn, nextAttn: AttnList;
[hsp1, hsp2] ← ConvertHandle[sH];
IF hsp1 # NIL THEN {
FOR attn ← hsp1.data.attnQueue.first, nextAttn WHILE attn # NIL DO
nextAttn ← attn.next; Heap.systemZone.FREE[@attn] ENDLOOP;
Heap.systemZone.FREE[@hsp1]};
IF hsp2 # NIL THEN hsp2.data.otherStream ← NIL;
END; -- Delete
GetBlock: ENTRY PROCEDURE [
sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
RETURNS [
bytesTransferred: CARDINAL, why: Stream.CompletionCode,
sst: Stream.SubSequenceType] =
BEGIN
ENABLE UNWIND => NULL;
blockBytes, inputBytes, bytesMoved: CARDINAL ← 0;
done: BOOLEAN ← FALSE;
hsp1, hsp2: HalfStreamPair;
[hsp1, hsp2] ← ConvertHandle[sH];
bytesTransferred ← 0;
why ← normal;
sst ← hsp2.data.currentSST;
WHILE NOT done DO
UNTIL hsp2.data.blockStatus = inUse OR hsp2.data.sstChange
OR hsp2.data.endRecord OR hsp2.data.attnStatus > awaited DO
WAIT hsp1.data.waitForPut ENDLOOP;
sst ← hsp2.data.currentSST;
IF hsp2.data.sstChange THEN
BEGIN
hsp2.data.sstChange ← FALSE;
BROADCAST hsp1.data.sstReceived;
IF options.signalSSTChange THEN
SIGNAL Stream.SSTChange[sst, block.startIndex];
why ← sstChange;
done ← TRUE;
END;
IF hsp2.data.attnStatus > awaited THEN
BEGIN
oldStatus: AttnStatus ← hsp2.data.attnStatus;
hsp2.data.attnStatus ← delivered;
IF oldStatus = arrived THEN [] ← WaitForAttention[sH];
hsp2.data.blockStatus ← emptied;
BROADCAST hsp2.data.blockCondition;
why ← attention;
done ← TRUE
END;
IF NOT done THEN
BEGIN
blockBytes ← block.stopIndexPlusOne - block.startIndex;
inputBytes ←
hsp2.data.block.stopIndexPlusOne - hsp2.data.block.startIndex;
IF inputBytes > 0 THEN
BEGIN
bytesMoved ← ByteBlt.ByteBlt[to: block, from: hsp2.data.block];
block.startIndex ← block.startIndex + bytesMoved;
hsp2.data.block.startIndex ← hsp2.data.block.startIndex + bytesMoved;
bytesTransferred ← bytesTransferred + bytesMoved
END;
IF bytesMoved = blockBytes THEN {why ← normal; done ← TRUE};
IF hsp2.data.endRecord THEN BEGIN why ← endOfStream; done ← TRUE; END;
IF bytesMoved = inputBytes OR hsp2.data.endRecord THEN
BEGIN
hsp2.data.blockStatus ← emptied;
BROADCAST hsp2.data.blockCondition;
END;
IF bytesMoved < blockBytes AND options.signalLongBlock THEN
SIGNAL Stream.LongBlock[block.startIndex];
IF bytesMoved < inputBytes AND options.signalShortBlock THEN
ERROR Stream.ShortBlock;
END;
ENDLOOP;
IF why = endOfStream AND options.signalEndOfStream THEN
SIGNAL Stream.EndOfStream[bytesTransferred]
END; -- GetBLock
PutBlock: ENTRY PROCEDURE [
sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN ← FALSE] =
BEGIN
ENABLE UNWIND => NULL;
hsp1, hsp2: HalfStreamPair;
-- IF block=blockNil THEN RETURN; -- -- stop gap solution to nullify SendNow function
[hsp1, hsp2] ← ConvertHandle[sH];
InternalPutBlock[
hsp1, hsp2, block, endRecord ! Stream.Attention => GOTO ReturnAttention]
EXITS ReturnAttention => RETURN WITH ERROR Stream.Attention[0]
END; -- PutBlock
InternalPutBlock: INTERNAL PROCEDURE [
hsp1, hsp2: HalfStreamPair, block: Stream.Block,
endRecord: BOOLEAN ← FALSE] =
BEGIN
attentionReceived: BOOLEAN ← FALSE;
UNTIL hsp1.data.blockStatus = available DO
WAIT hsp1.data.blockCondition ENDLOOP;
hsp1.data.block ← block;
hsp1.data.blockStatus ← inUse;
IF endRecord THEN hsp1.data.endRecord ← TRUE;
BROADCAST hsp2.data.waitForPut;
UNTIL hsp1.data.blockStatus = emptied OR hsp2.data.attnStatus > awaited DO
WAIT hsp1.data.blockCondition ENDLOOP;
IF hsp2.data.attnStatus > awaited AND NOT hsp2.data.attnAnnounced THEN
BEGIN hsp2.data.attnAnnounced ← TRUE; attentionReceived ← TRUE; END;
hsp1.data.blockStatus ← available;
BROADCAST hsp2.data.blockCondition;
IF attentionReceived THEN SIGNAL Stream.Attention[0];
END; -- InternalPutBlock
SetSST: ENTRY PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] =
BEGIN
ENABLE UNWIND => NULL;
hsp1, hsp2: HalfStreamPair;
[hsp1, hsp2] ← ConvertHandle[sH];
WHILE hsp1.data.sstChange DO WAIT hsp2.data.sstReceived ENDLOOP;
IF sst # hsp1.data.currentSST THEN
BEGIN
hsp1.data.currentSST ← sst;
hsp1.data.sstChange ← TRUE;
BROADCAST hsp2.data.waitForPut;
END;
END; -- SetSST
SendAttention: ENTRY PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] =
BEGIN
ENABLE UNWIND => NULL;
attnList: AttnList ← Heap.systemZone.NEW[AttnRec];
bytes: PACKED ARRAY [0..2) OF Stream.Byte ← [byte, 0];
hsp1, hsp2: HalfStreamPair;
[hsp1, hsp2] ← ConvertHandle[sH];
attnList.next ← NIL;
attnList.byte ← byte;
IF hsp1.data.attnQueue.first = NIL THEN
hsp1.data.attnQueue.first ← hsp1.data.attnQueue.last ← attnList
ELSE {
hsp1.data.attnQueue.last.next ← attnList;
hsp1.data.attnQueue.last ← attnList};
hsp1.data.attnStatus ← arrived;
BROADCAST hsp2.data.attn;
InternalPutBlock[hsp1, hsp2, [@bytes, 0, 1] ! Stream.Attention => CONTINUE];
END; -- SendAttention
WaitAttention: ENTRY PROCEDURE [sH: Stream.Handle]
RETURNS [byte: Stream.Byte] =
BEGIN
ENABLE UNWIND => NULL;
hsp1, hsp2: HalfStreamPair;
[hsp1, hsp2] ← ConvertHandle[sH];
SELECT hsp2.data.attnStatus FROM
none => hsp2.data.attnStatus ← awaited;
arrived => NULL;
ENDCASE => RETURN[0];
byte ← WaitForAttention[sH]
END; -- WaitAttention
WaitForAttention: INTERNAL PROCEDURE [sH: Stream.Handle]
RETURNS [byte: Stream.Byte] =
BEGIN
attnList: AttnList;
hsp1, hsp2: HalfStreamPair;
[hsp1, hsp2] ← ConvertHandle[sH];
WHILE hsp2.data.attnQueue.first = NIL DO WAIT hsp1.data.attn ENDLOOP;
byte ← hsp2.data.attnQueue.first.byte;
IF hsp2.data.attnQueue.first.next = NIL THEN {
Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
hsp2.data.attnQueue ← [NIL, NIL]}
ELSE
BEGIN
attnList ← hsp2.data.attnQueue.first.next;
Heap.systemZone.FREE[@hsp2.data.attnQueue.first];
hsp2.data.attnQueue.first ← attnList;
END;
IF hsp2.data.attnStatus <= arrived THEN
hsp2.data.attnStatus ← deliveredOutOfBand;
BROADCAST hsp1.data.blockCondition -- notify a Put that is in progress.
END; -- WaitForAttention
-- private procedures
ConvertHandle: PROCEDURE [sH: Stream.Handle]
RETURNS [hsp1, hsp2: HalfStreamPair] =
BEGIN
hsp1 ← LOOPHOLE[sH, HalfStreamPair];
IF hsp1 = NIL THEN ERROR;
IF hsp1.data.otherStream = NIL THEN hsp2 ← NIL
ELSE hsp2 ← LOOPHOLE[hsp1.data.otherStream, HalfStreamPair];
END; -- ConvertHandle
END...
LOG
20-Dec-84 16:23:19 AOF Post Klamath
8-Dec-87 9:34:52 AOF No changes - just changing the time stamp