RoseValueSequencePipesImpl.Mesa
Last Edited by: Spreitzer, May 17, 1985 4:39:49 pm PDT
DIRECTORY Atom, IO, Process, RoseSets, RoseTypes, RoseValueSequences;
RoseValueSequencePipesImpl: CEDAR MONITOR
LOCKS pd USING pd: PipeData
IMPORTS IO, Process
EXPORTS RoseValueSequences
= {
OPEN RoseTypes, RoseSets, RoseValueSequences;
PipeData: TYPE = REF PipeDataRep;
PipeDataRep: TYPE = MONITORED RECORD [
header: Header ← NIL,
curAvail: INT ← -1,
curConsumed: INT ← -1,
firstGet: BOOLTRUE,
knownEnd: BOOLTRUE,
Certainly at end if TRUE; might also be at end if FALSE.
change: CONDITION
];
NumChars: INT = CHAR.LAST.ORD.SUCC - CHAR.FIRST.ORD;
outPipeProcs: REF IO.StreamProcs ← IO.CreateStreamProcs[
variety: output,
class: $RoseValueSequencePipeOut,
putChar: PipePut,
flush: PipeFlush,
reset: OutPipeReset,
close: OutPipeClose,
getIndex: OutPipeGetIndex];
inPipeProcs: REF IO.StreamProcs ← IO.CreateStreamProcs[
variety: input,
class: $RoseValueSequencePipeIn,
getChar: PipeGet,
endOf: PipeEndOf,
charsAvail: PipeCharsAvail,
backup: PipeBackup,
reset: InPipeReset,
close: InPipeClose,
getIndex: InPipeGetIndex];
PipeGet: PROC [self: STREAM] RETURNS [c: CHAR] = {
EntryGet: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.knownEnd THEN ERROR IO.EndOfStream[self];
IF pd.firstGet THEN pd.firstGet ← FALSE ELSE pd.curConsumed ← pd.curConsumed + 1;
BROADCAST pd.change;
DO
SELECT pd.curAvail - pd.curConsumed FROM
0 => IF pd.knownEnd THEN ERROR IO.EndOfStream[self] ELSE WAIT pd.change;
1 => EXIT;
ENDCASE => ERROR IO.Error[Failure, self];
ENDLOOP;
c ← 0C + (pd.curAvail MOD NumChars);
};
pd: PipeData ← NARROW[self.streamData];
EntryGet[pd];
};
PipePut: PROC [self: STREAM, char: CHAR] = {
EntryPut: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
pd.curAvail ← pd.curAvail + 1;
BROADCAST pd.change;
DO
SELECT pd.curAvail - pd.curConsumed FROM
0 => EXIT;
1 => WAIT pd.change;
ENDCASE => ERROR IO.Error[Failure, self];
ENDLOOP;
};
pd: PipeData ← NARROW[self.streamData];
EntryPut[pd];
};
PipeFlush: PROC [self: STREAM] = {
EntryFlush: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
DO
SELECT pd.curAvail - pd.curConsumed FROM
0 => EXIT;
1 => WAIT pd.change;
ENDCASE => ERROR IO.Error[Failure, self];
ENDLOOP;
};
pd: PipeData ← NARROW[self.streamData];
EntryFlush[pd];
};
PipeEndOf: PROC [self: STREAM] RETURNS [atEnd: BOOL] = {
EntryEndOf: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
UNTIL pd.curAvail > pd.curConsumed OR pd.knownEnd DO WAIT pd.change ENDLOOP;
atEnd ← NOT pd.curAvail > pd.curConsumed;
};
pd: PipeData ← NARROW[self.streamData];
EntryEndOf[pd];
};
PipeCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [n: INT] = {
EntryCharsAvail: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF wait THEN UNTIL pd.curAvail > pd.curConsumed DO WAIT pd.change ENDLOOP;
n ← pd.curAvail - pd.curConsumed;
};
pd: PipeData ← NARROW[self.streamData];
EntryCharsAvail[pd];
};
PipeBackup: PROC [self: STREAM, char: CHAR] =
{ERROR IO.Error[NotImplementedForThisStream, self]};
OutPipeReset: PROC [self: STREAM] = {
EntryReset: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
pd.curAvail ← -1;
BROADCAST pd.change;
};
pd: PipeData ← NARROW[self.streamData];
EntryReset[pd];
};
InPipeReset: PROC [self: STREAM] = {
EntryReset: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self];
pd.curConsumed ← -1;
BROADCAST pd.change;
};
pd: PipeData ← NARROW[self.streamData];
EntryReset[pd];
};
OutPipeClose: PROC [self: STREAM, abort: BOOLFALSE] = {
EntryClose: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
pd.knownEnd ← TRUE;
BROADCAST pd.change;
};
pd: PipeData ← NARROW[self.streamData];
EntryClose[pd];
};
InPipeClose: PROC [self: STREAM, abort: BOOLFALSE] = {
EntryClose: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
pd.curConsumed ← pd.curConsumed + 1;
BROADCAST pd.change;
};
pd: PipeData ← NARROW[self.streamData];
EntryClose[pd];
};
OutPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = {
EntryGetIndex: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
index ← pd.curAvail;
};
pd: PipeData ← NARROW[self.streamData];
EntryGetIndex[pd];
};
InPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = {
EntryGetIndex: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
index ← pd.curConsumed;
};
pd: PipeData ← NARROW[self.streamData];
EntryGetIndex[pd];
};
CreatePipe: PUBLIC PROC RETURNS [p: Producer, c: Consumer] = {
pd: PipeData ← NEW [PipeDataRep ← []];
ps: STREAMIO.CreateStream[inPipeProcs, pd];
cs: STREAMIO.CreateStream[outPipeProcs, pd];
TRUSTED {
Process.InitializeCondition[@pd.change, Process.SecondsToTicks[10]];
Process.EnableAborts[@pd.change];
};
p ← NEW [ProducerRep ← [pd, PipeGiveHeader, ps]];
c ← NEW [ConsumerRep ← [pd, PipeTakeHeader, cs]];
};
PipeGiveHeader: PROC [data: REF ANY] RETURNS [header: Header] = {
EntryGiveHeader: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
WHILE pd.header = NIL DO WAIT pd.change ENDLOOP;
header ← pd.header;
};
pd: PipeData ← NARROW[data];
EntryGiveHeader[pd];
};
PipeTakeHeader: PROC [data: REF ANY, header: Header] = {
EntryTakeHeader: ENTRY PROC [pd: PipeData] = {
ENABLE UNWIND => {};
IF pd.header # NIL THEN ERROR;
pd.header ← header;
BROADCAST pd.change;
};
pd: PipeData ← NARROW[data];
IF header = NIL THEN ERROR;
EntryTakeHeader[pd];
};
Copy: PUBLIC PROC [from: Producer, to: Consumer] = {
to.takeHeader[to.data, from.giveHeader[from.data]];
DO
to.synch.PutChar[from.synch.GetChar[!IO.EndOfStream => EXIT]];
ENDLOOP;
};
}.