RoseValueSequencePipesImpl:
CEDAR
MONITOR
LOCKS pd USING pd: PipeData
IMPORTS IO, Process
EXPORTS RoseValueSequences
= {
OPEN RoseTypes, RoseSets, RoseValueSequences;
PipeData: TYPE = REF PipeDataRep;
-- State shared by the two stream ends of one pipe and by the header procs.
-- All access goes through ENTRY procedures that lock this record.
PipeDataRep: TYPE = MONITORED RECORD [
  header: Header ← NIL, -- stored once by PipeTakeHeader; PipeGiveHeader waits until it is non-NIL
  curAvail: INT ← -1, -- advanced by one on every PipePut
  curConsumed: INT ← -1, -- advanced by every PipeGet except the first, and by InPipeClose
  firstGet: BOOL ← TRUE, -- the first PipeGet has no earlier character to acknowledge
  knownEnd: BOOL ← FALSE,
    -- Certainly at end if TRUE; might also be at end if FALSE.
    -- Must start FALSE: only OutPipeClose sets it, and nothing ever clears it.
    -- A TRUE default made every new pipe refuse its first put and get.
  change: CONDITION -- broadcast by the entry procedures after they update the fields above
  ];
-- Number of distinct CHAR values; PipeGet reduces the put index modulo this.
NumChars: INT = (CHAR.LAST.ORD - CHAR.FIRST.ORD) + 1;
-- Procedure table for the output (writing) end of a pipe.
outPipeProcs: REF IO.StreamProcs ← IO.CreateStreamProcs[
  variety: output,
  class: $RoseValueSequencePipeOut,
  putChar: PipePut,
  flush: PipeFlush,
  getIndex: OutPipeGetIndex,
  reset: OutPipeReset,
  close: OutPipeClose
  ];
-- Procedure table for the input (reading) end of a pipe.
inPipeProcs: REF IO.StreamProcs ← IO.CreateStreamProcs[
  variety: input,
  class: $RoseValueSequencePipeIn,
  getChar: PipeGet,
  charsAvail: PipeCharsAvail,
  endOf: PipeEndOf,
  backup: PipeBackup,
  getIndex: InPipeGetIndex,
  reset: InPipeReset,
  close: InPipeClose
  ];
-- getChar for the input end.
-- Every call after the first advances curConsumed, acknowledging the previous
-- character; PipePut and PipeFlush wait for that acknowledgement.
-- The call then blocks until exactly one unacknowledged put is pending.
-- The result carries no payload: it is the put index reduced modulo NumChars.
-- Raises IO.EndOfStream[self] if knownEnd is set on entry or while nothing is pending.
-- Raises IO.Error[Failure, self] if the put and get counters differ by anything but 0 or 1.
PipeGet: PROC [self: STREAM] RETURNS [c: CHAR] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedGet: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.knownEnd THEN ERROR IO.EndOfStream[self];
    IF NOT pd.firstGet
      THEN pd.curConsumed ← pd.curConsumed + 1
      ELSE pd.firstGet ← FALSE;
    BROADCAST pd.change;
    UNTIL pd.curAvail - pd.curConsumed = 1 DO
      -- anything other than "nothing pending" here means the counters are out of step
      IF pd.curAvail - pd.curConsumed # 0 THEN ERROR IO.Error[Failure, self];
      IF pd.knownEnd THEN ERROR IO.EndOfStream[self];
      WAIT pd.change;
      ENDLOOP;
    c ← 0C + (pd.curAvail MOD NumChars);
    };
  LockedGet[pipe];
  };
-- putChar for the output end.
-- Advances curAvail by one, then blocks until curConsumed has caught up with it.
-- The char argument is not stored anywhere; only the count of puts is recorded.
-- Raises IO.Error[StreamClosed, self] if knownEnd is already set.
-- Raises IO.Error[Failure, self] if the put and get counters differ by anything but 0 or 1.
PipePut: PROC [self: STREAM, char: CHAR] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedPut: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
    pd.curAvail ← pd.curAvail + 1;
    BROADCAST pd.change;
    UNTIL pd.curAvail - pd.curConsumed = 0 DO
      IF pd.curAvail - pd.curConsumed # 1 THEN ERROR IO.Error[Failure, self];
      WAIT pd.change;
      ENDLOOP;
    };
  LockedPut[pipe];
  };
-- flush for the output end: blocks until curConsumed equals curAvail.
-- Raises IO.Error[StreamClosed, self] if knownEnd is already set.
-- Raises IO.Error[Failure, self] if the put and get counters differ by anything but 0 or 1.
PipeFlush: PROC [self: STREAM] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedFlush: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
    UNTIL pd.curAvail - pd.curConsumed = 0 DO
      IF pd.curAvail - pd.curConsumed # 1 THEN ERROR IO.Error[Failure, self];
      WAIT pd.change;
      ENDLOOP;
    };
  LockedFlush[pipe];
  };
-- endOf for the input end.
-- Blocks until either curAvail exceeds curConsumed or knownEnd is set.
-- Returns TRUE exactly when curAvail does not exceed curConsumed at that point.
PipeEndOf: PROC [self: STREAM] RETURNS [atEnd: BOOL] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedEndOf: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    WHILE pd.curAvail <= pd.curConsumed AND NOT pd.knownEnd DO
      WAIT pd.change;
      ENDLOOP;
    atEnd ← pd.curAvail <= pd.curConsumed;
    };
  LockedEndOf[pipe];
  };
-- charsAvail for the input end.
-- Returns curAvail - curConsumed.
-- When wait is TRUE, first blocks until curAvail exceeds curConsumed or knownEnd is set.
-- The knownEnd test mirrors PipeEndOf. Without it a waiting caller looped forever
-- once the output end had been closed with nothing pending.
PipeCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [n: INT] = {
  EntryCharsAvail: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF wait THEN
      UNTIL pd.curAvail > pd.curConsumed OR pd.knownEnd DO WAIT pd.change ENDLOOP;
    n ← pd.curAvail - pd.curConsumed;
    };
  pd: PipeData ← NARROW[self.streamData];
  EntryCharsAvail[pd];
  };
-- backup for the input end: not supported; always raises
-- IO.Error[NotImplementedForThisStream, self].
PipeBackup: PROC [self: STREAM, char: CHAR] = {
  ERROR IO.Error[NotImplementedForThisStream, self];
  };
-- reset for the output end: puts curAvail back to its initial value of -1
-- and wakes every waiter. curConsumed is left alone.
-- Raises IO.Error[StreamClosed, self] if knownEnd is set.
OutPipeReset: PROC [self: STREAM] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedReset: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
    pd.curAvail ← -1;
    BROADCAST pd.change;
    };
  LockedReset[pipe];
  };
-- reset for the input end: puts curConsumed back to its initial value of -1
-- and wakes every waiter. curAvail and firstGet are left alone.
-- Raises IO.Error[StreamClosed, self] if knownEnd is set.
InPipeReset: PROC [self: STREAM] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedReset: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
    pd.curConsumed ← -1;
    BROADCAST pd.change;
    };
  LockedReset[pipe];
  };
-- close for the output end: sets knownEnd and wakes every waiter,
-- so a reader blocked with nothing pending sees the end of the stream.
-- The abort argument is ignored.
OutPipeClose: PROC [self: STREAM, abort: BOOL ← FALSE] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedClose: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    pd.knownEnd ← TRUE;
    BROADCAST pd.change;
    };
  LockedClose[pipe];
  };
-- close for the input end: advances curConsumed by one and wakes every waiter,
-- which releases a writer waiting in PipePut or PipeFlush for one pending character.
-- The abort argument is ignored.
InPipeClose: PROC [self: STREAM, abort: BOOL ← FALSE] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedClose: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    pd.curConsumed ← pd.curConsumed + 1;
    BROADCAST pd.change;
    };
  LockedClose[pipe];
  };
-- getIndex for the output end: returns curAvail, read under the monitor lock.
OutPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedIndex: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    index ← pd.curAvail;
    };
  LockedIndex[pipe];
  };
-- getIndex for the input end: returns curConsumed, read under the monitor lock.
InPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = {
  pipe: PipeData ← NARROW[self.streamData];
  LockedIndex: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    index ← pd.curConsumed;
    };
  LockedIndex[pipe];
  };
-- Builds a fresh pipe and returns its two ends.
-- The Producer wraps an input stream and the Consumer wraps an output stream.
-- Both streams share one PipeData, so what is put to c is what is got from p.
-- The condition variable gets a 10 second timeout and has aborts enabled.
CreatePipe: PUBLIC PROC RETURNS [p: Producer, c: Consumer] = {
  shared: PipeData ← NEW [PipeDataRep ← []];
  readEnd: STREAM ← IO.CreateStream[inPipeProcs, shared];
  writeEnd: STREAM ← IO.CreateStream[outPipeProcs, shared];
  TRUSTED {
    Process.InitializeCondition[@shared.change, Process.SecondsToTicks[10]];
    Process.EnableAborts[@shared.change];
    };
  p ← NEW [ProducerRep ← [shared, PipeGiveHeader, readEnd]];
  c ← NEW [ConsumerRep ← [shared, PipeTakeHeader, writeEnd]];
  };
-- giveHeader for the Producer end: blocks until PipeTakeHeader has stored
-- a header in the pipe, then returns it. data must be the pipe's PipeData.
PipeGiveHeader: PROC [data: REF ANY] RETURNS [header: Header] = {
  pipe: PipeData ← NARROW[data];
  LockedGive: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    UNTIL pd.header # NIL DO WAIT pd.change ENDLOOP;
    header ← pd.header;
    };
  LockedGive[pipe];
  };
-- takeHeader for the Consumer end: stores header in the pipe and wakes
-- anyone waiting in PipeGiveHeader. data must be the pipe's PipeData.
-- Raises an anonymous ERROR if header is NIL or if a header was already stored.
PipeTakeHeader: PROC [data: REF ANY, header: Header] = {
  pipe: PipeData ← NARROW[data];
  LockedTake: ENTRY PROC [pd: PipeData] = {
    ENABLE UNWIND => {};
    IF pd.header # NIL THEN ERROR;
    pd.header ← header;
    BROADCAST pd.change;
    };
  -- the NIL check runs outside the monitor; a NIL header never reaches the pipe
  IF header = NIL THEN ERROR;
  LockedTake[pipe];
  };
-- Hands the producer's header to the consumer, then forwards characters from
-- from.synch to to.synch one at a time until GetChar raises IO.EndOfStream.
-- Neither stream is closed here.
Copy: PUBLIC PROC [from: Producer, to: Consumer] = {
  ch: CHAR;
  to.takeHeader[to.data, from.giveHeader[from.data]];
  DO
    ch ← from.synch.GetChar[!IO.EndOfStream => EXIT];
    to.synch.PutChar[ch];
    ENDLOOP;
  };
}.