FanoutStreamImpl.mesa
Copyright Ó 1991 by Xerox Corporation. All rights reserved.
Christian Jacobi, July 8, 1991 11:49 am PDT
Last tweaked by Mike Spreitzer March 20, 1992 2:15 pm PST
DIRECTORY
IO USING [Close, CreateStream, CreateStreamProcs, Error, EraseChar, Flush, PutBlock, PutChar, Reset, STREAM, StreamProcs, UnsafeBlock, UnsafePutBlock],
FanoutStream USING [],
IOUtils USING [closedStreamProcs];
Fanout Stream
FanoutStreamData: TYPE = REF FanoutStreamRecord;
FanoutStreamRecord:
TYPE =
RECORD [
reportErrors, forwardClose: BOOL ¬ FALSE,
slaves: LIST OF STREAM ¬ NIL
];
fanoutStreamProcs:
REF
IO.StreamProcs ¬
IO.CreateStreamProcs[
variety: $output, class: $DribbleOutput,
putChar: FanoutStreamPutChar,
putBlock: FanoutStreamPutBlock,
unsafePutBlock: FanoutStreamUnsafePutBlock,
flush: FanoutStreamFlush,
eraseChar: ProtectedFanoutStreamEraseChar,
reset: ProtectedFanoutStreamReset,
close: ProtectedFanoutStreamClose
];
protectedFanoutStreamProcs:
REF
IO.StreamProcs ¬
IO.CreateStreamProcs[
variety: $output, class: $DribbleOutput2,
putChar: ProtectedFanoutStreamPutChar,
putBlock: ProtectedFanoutStreamPutBlock,
unsafePutBlock: ProtectedFanoutStreamUnsafePutBlock,
flush: ProtectedFanoutStreamFlush,
eraseChar: ProtectedFanoutStreamEraseChar,
reset: ProtectedFanoutStreamReset,
close: ProtectedFanoutStreamClose
];
Create:
PUBLIC
PROC [reportErrors:
BOOL ¬
FALSE, forwardClose:
BOOL ¬
FALSE]
RETURNS [
STREAM] = {
RETURN[
IO.CreateStream[
streamProcs: IF reportErrors THEN fanoutStreamProcs ELSE protectedFanoutStreamProcs,
streamData: NEW[FanoutStreamRecord ¬ [reportErrors: reportErrors, forwardClose: forwardClose]]
]];
};
EntryAdd:
ENTRY
PROC [data: FanoutStreamData, slave:
IO.
STREAM] = {
IF data#
NIL
THEN {
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IF l.first = slave THEN RETURN;
ENDLOOP;
data.slaves ¬ CONS[slave, data.slaves];
};
EntryRemove:
ENTRY
PROC [data: FanoutStreamData, slave:
IO.
STREAM] = {
IF data#
NIL
THEN {
WHILE data.slaves#
NIL
AND data.slaves.first=slave
DO
data.slaves ¬ data.slaves.rest
ENDLOOP;
BEGIN
lag: LIST OF STREAM ¬ data.slaves;
WHILE lag#
NIL
AND lag.rest#
NIL
DO
IF lag.rest.first=slave
THEN lag.rest ¬ lag.rest.rest
ELSE lag ¬ lag.rest
ENDLOOP
END;
}
AddSlave:
PUBLIC
PROC [master:
IO.
STREAM, slave:
IO.
STREAM] = {
data: FanoutStreamData = NARROW[master.streamData];
EntryAdd[data, slave];
};
RemoveSlave:
PUBLIC
PROC [master:
IO.
STREAM, slave:
IO.
STREAM] = {
IF master#
NIL
AND slave#
NIL
THEN {
data: FanoutStreamData = NARROW[master.streamData];
EntryRemove[data, slave];
};
};
FanoutStreamPutChar:
PROC [self:
STREAM, char:
CHAR] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.PutChar[l.first, char];
ENDLOOP
};
ProtectedFanoutStreamPutChar:
PROC [self:
STREAM, char:
CHAR] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.PutChar[l.first, char !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
FanoutStreamPutBlock:
PROC [self:
STREAM, block:
REF
READONLY
TEXT, startIndex:
NAT, count:
NAT] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.PutBlock[l.first, block, startIndex, count];
ENDLOOP
};
ProtectedFanoutStreamPutBlock:
PROC [self:
STREAM, block:
REF
READONLY
TEXT, startIndex:
NAT, count:
NAT] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.PutBlock[l.first, block, startIndex, count !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
FanoutStreamUnsafePutBlock:
PROC [self:
STREAM, block: IO.UnsafeBlock] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.UnsafePutBlock[l.first, block];
ENDLOOP
};
ProtectedFanoutStreamUnsafePutBlock:
PROC [self:
STREAM, block: IO.UnsafeBlock] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.UnsafePutBlock[l.first, block !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
FanoutStreamFlush:
PROC [self:
STREAM] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.Flush[l.first];
ENDLOOP
};
ProtectedFanoutStreamFlush:
PROC [self:
STREAM] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.Flush[l.first !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
ProtectedFanoutStreamEraseChar:
PROC [self:
STREAM, char:
CHAR] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.EraseChar[l.first, char !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
ProtectedFanoutStreamReset:
PROC [self:
STREAM] = {
data: FanoutStreamData = NARROW[self.streamData];
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.Reset[l.first !
IO.Error => IF ~data.reportErrors THEN {EntryRemove[data, l.first]; CONTINUE}
];
ENDLOOP
};
ProtectedFanoutStreamClose:
PROC [self:
STREAM, abort:
BOOL] = {
data: FanoutStreamData = NARROW[self.streamData];
IF data.forwardClose
THEN
FOR l:
LIST
OF
STREAM ¬ data.slaves, l.rest
WHILE l#
NIL
DO
IO.Close[l.first !
IO.Error => IF ~data.reportErrors THEN CONTINUE
];
ENDLOOP;
data.slaves ¬ NIL;
self.streamProcs ¬ IOUtils.closedStreamProcs;
};