-- ProcStreamImpl.mesa
-- Copyright © 1989, 1991 by Xerox Corporation. All rights reserved.
-- Demers, November 21, 1989 8:51:57 am PST
-- Willie-s, December 9, 1991 12:40 pm PST
-- Christian Jacobi, July 24, 1992 2:34 pm PDT
DIRECTORY
Basics,
FinalizeOps,
IO,
ProcStream,
RefText,
Rope,
RuntimeError
;
ProcStreamImpl: CEDAR MONITOR
LOCKS d USING d: PrivateStreamData
IMPORTS Basics, FinalizeOps, IO, RefText, RuntimeError
EXPORTS ProcStream
~ {
-- Parameters
-- Sizes handed to RefText.New when a stream's input / output buffer is first allocated.
defaultInBufferLength: CARDINAL ¬ 2048;
defaultOutBufferLength: CARDINAL ¬ 2048;
-- Class atom given to IO.CreateStreamProcs; also compared against the class of an old stream to decide whether that stream can be reused.
myClass: ATOM ~ $PROC;
-- Types
-- Local abbreviations for types defined in the Rope, IO and ProcStream interfaces.
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
-- Client callback types; values of these types are stored in the per-stream record and called with the client data as first argument.
GetProc: TYPE ~ ProcStream.GetProc;
PutProc: TYPE ~ ProcStream.PutProc;
CloseProc: TYPE ~ ProcStream.CloseProc;
GetErrorProc: TYPE ~ ProcStream.GetErrorProc;
-- Stream Implementation
-- Per-stream state. A PrivateStreamData is what this module stores as the streamData of every stream it creates, and it is the object the module's monitor locks (LOCKS d USING d: PrivateStreamData).
PrivateStreamData: TYPE ~ REF PrivateStreamDataRecord;
PrivateStreamDataRecord: TYPE ~ MONITORED RECORD [
-- client state and callbacks; data is passed as the first argument to each callback and is returned by GetClientData
data: REF ¬ NIL,
get: GetProc ¬ NIL,
put: PutProc ¬ NIL,
close: CloseProc ¬ NIL,
getError: GetErrorProc ¬ NIL,
-- set by MyEntryClose; most operations raise StreamClosed once it is TRUE
closed: BOOL ¬ FALSE,
-- second result of the most recent get call (also set by MyEntryReset); while TRUE, get is not called again
atEnd: BOOL ¬ FALSE,
-- when FALSE, 0 is passed to get and put as the stream index
canSetIndex: BOOL ¬ TRUE,
-- when TRUE, input operations reconcile inBuf with pending output (ValidateSharedInputBuffer, ValidateSharedRead)
shared: BOOL ¬ TRUE,
-- input buffer and the position in it of the next character to deliver
inBuf: REF TEXT ¬ NIL,
inBufIndex: CARDINAL ¬ 0,
inIndex: CARD ¬ 0, -- actual stream index
-- output buffer; outIndex counts characters accepted so far, including those still sitting in outBuf
outBuf: REF TEXT ¬ NIL,
outIndex: CARD ¬ 0, -- actual stream index
-- details of the most recent error, recorded by RaiseError / RaiseEndOf and returned by GetErrorDetails
ec: IO.ErrorCode ¬ Null,
errCode: INT ¬ 0,
errMsg: ROPE ¬ NIL
];
PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE {
  -- Fetches the stream's streamData and narrows it to this module's private record type.
  -- The NARROW fails if s was not created by this module.
  private: PrivateStreamData ~ NARROW[s.streamData];
  RETURN [private];
};
GetPutIndex: PROC[d: PrivateStreamData] RETURNS [CARD] ~ INLINE {
  -- Stream index to hand to the client's put proc: the current output index, or 0 when the stream cannot set its index.
  IF NOT d.canSetIndex THEN RETURN [0];
  RETURN [d.outIndex];
};
GetGetIndex: PROC[d: PrivateStreamData] RETURNS [CARD] ~ INLINE {
  -- Stream index to hand to the client's get proc: the current input index, or 0 when the stream cannot set its index.
  IF NOT d.canSetIndex THEN RETURN [0];
  RETURN [d.inIndex];
};
RaiseError: PROC [s: STREAM, ec: IO.ErrorCode, code: INT, msg: ROPE] ~ {
  -- Records error details in the stream's private data and raises IO.Error. Never returns.
  --   ec: IO error code to raise; Null means take the code from the client's getError proc, or Failure if that is Null too.
  --   code: client error code; its sign is ignored (it is stored negated, and shown positive in the message). 0 means no client code.
  --   msg: base message; when code is nonzero the client's own message is appended.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[s];
  IF code > 0 THEN code ¬ -code;
  IF code # 0 THEN {
    clientEC: IO.ErrorCode;
    clientMsg: ROPE;
    [clientEC, clientMsg] ¬ d.getError[d.data, code];
    IF ec = Null THEN ec ¬ clientEC;
    msg ¬ IO.PutFR["%g (%g: %g)", IO.rope[msg], IO.int[-code], IO.rope[clientMsg]];
  };
  IF ec = Null THEN ec ¬ Failure;
  -- keep the details where GetErrorDetails can find them
  d.ec ¬ ec;
  d.errCode ¬ code;
  d.errMsg ¬ msg;
  ERROR IO.Error[ec, s];
};
RaiseClosed: PROC [s: STREAM] ~ {
  -- Raises IO.Error[StreamClosed] on s, with no client error code. Never returns.
  RaiseError[s~s, ec~StreamClosed, code~0, msg~"stream closed"];
};
RaiseEndOf: PROC [s: STREAM] ~ {
  -- Records an end-of-stream condition in the stream's private data and raises IO.EndOfStream. Never returns.
  private: PrivateStreamData ~ PrivateStreamDataFromStream[s];
  private.errMsg ¬ "end of stream";
  private.errCode ¬ 0;
  private.ec ¬ Failure;
  ERROR IO.EndOfStream[s];
};
-- Copies into inBuf those bytes of outBuf whose stream positions coincide with unread bytes held in inBuf.
-- Must only be called for shared streams (raises an unnamed ERROR otherwise). The parameter s is not used.
-- Presumably this lets reads observe writes that have not been flushed yet; confirm against the ProcStream interface.
ValidateSharedInputBuffer: INTERNAL PROC [s: STREAM, d: PrivateStreamData] ~ {
sharedLo, sharedHi, sharedLen, ixIn, ixOut: CARD;
IF NOT d.shared THEN ERROR;
-- Unread input covers stream positions [inIndex, inIndex + inBuf.length - inBufIndex);
-- buffered output covers [outIndex - outBuf.length, outIndex). The overlap is [sharedLo, sharedHi).
sharedLo ¬ MAX[ d.inIndex, d.outIndex-d.outBuf.length ];
sharedHi ¬ MIN[ d.inIndex+d.inBuf.length-d.inBufIndex, d.outIndex ];
IF sharedLo >= sharedHi THEN RETURN;
sharedLen ¬ sharedHi - sharedLo;
-- positions of sharedLo within each buffer
ixIn ¬ d.inBufIndex + (sharedLo - d.inIndex);
ixOut ¬ d.outBuf.length - (d.outIndex - sharedLo);
TRUSTED {
-- BYTES[TEXT[0]] is added to every index; presumably it is the byte offset of the character data within a TEXT object (TODO confirm)
[] ¬ Basics.ByteBlt[
from~[
blockPointer~LOOPHOLE[d.outBuf],
startIndex~ixOut+BYTES[TEXT[0]],
stopIndexPlusOne~ixOut+BYTES[TEXT[0]]+sharedLen
],
to~[
blockPointer~LOOPHOLE[d.inBuf],
startIndex~ixIn+BYTES[TEXT[0]],
stopIndexPlusOne~ixIn+BYTES[TEXT[0]]+sharedLen
]
];
};
};
ValidateSharedRead: INTERNAL PROC [s: STREAM, d: PrivateStreamData, nBytes: CARD] ~ {
  -- Called before asking the client for nBytes of input on a shared stream: if that range of stream
  -- positions overlaps output still held in outBuf, the output buffer is flushed first.
  -- Raises an unnamed ERROR if the stream is not shared or if inBuf still holds unread data.
  overlapLo, overlapHi: CARD;
  IF NOT d.shared THEN ERROR;
  IF d.inBufIndex # d.inBuf.length THEN ERROR;
  overlapLo ¬ MAX[ d.inIndex, d.outIndex-d.outBuf.length ];
  overlapHi ¬ MIN[ d.inIndex+nBytes, d.outIndex ];
  IF overlapLo < overlapHi THEN FlushBuf[s, d];
};
FillBuf: INTERNAL PROC [s: STREAM, d: PrivateStreamData] ~ {
  -- Refills inBuf from the client's get proc. Does nothing if unread data remains or the stream is at end.
  -- A negative result from get is reported through RaiseError as a read error.
  inBuf: REF TEXT ~ d.inBuf;
  capacity: INT ~ inBuf.maxLength;
  got: INT;
  IF d.inBufIndex < inBuf.length THEN RETURN;
  IF d.atEnd THEN RETURN;
  IF d.shared THEN ValidateSharedRead[s, d, capacity];
  TRUSTED {
    [got, d.atEnd] ¬ d.get[d.data, GetGetIndex[d], [LOOPHOLE[inBuf], BYTES[TEXT[0]], capacity]];
  };
  IF got < 0 THEN RaiseError[s, Null, got, "read error"];
  d.inBufIndex ¬ 0;
  inBuf.length ¬ got;
};
FlushBuf: INTERNAL PROC [s: STREAM, d: PrivateStreamData] ~ {
  -- Hands the whole contents of outBuf to the client's put proc, calling it repeatedly until every
  -- byte has been accepted, then empties the buffer.
  -- A negative result from put is reported through RaiseError; a result of 0 raises an unnamed ERROR.
  outBuf: REF TEXT ~ d.outBuf;
  done: INT ¬ 0;
  WHILE done < INT[outBuf.length] DO
    remaining: INT ~ INT[outBuf.length] - done;
    -- outIndex already counts the buffered bytes, so the first unwritten byte sits remaining positions before it
    start: INT ~ IF d.canSetIndex THEN d.outIndex - remaining ELSE 0;
    written: INT;
    TRUSTED {
      written ¬ d.put[d.data, start, [LOOPHOLE[outBuf], done+BYTES[TEXT[0]], remaining]];
    };
    IF written < 0 THEN RaiseError[s, Null, written, "write error"];
    IF written = 0 THEN ERROR; -- "can't happen"
    done ¬ done + written;
  ENDLOOP;
  outBuf.length ¬ 0;
};
MyEntryGetChar: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [c: CHAR] ~ {
  -- Monitored worker for getChar: returns the next input character, refilling inBuf as needed.
  -- Raises StreamClosed if the stream is closed and EndOfStream when no more input is available.
  ENABLE UNWIND => NULL;
  inBuf: REF TEXT ~ d.inBuf;
  DO
    pos: CARDINAL;
    IF d.closed THEN RaiseClosed[s];
    IF d.shared THEN ValidateSharedInputBuffer[s, d];
    pos ¬ d.inBufIndex;
    IF pos < inBuf.length THEN {
      c ¬ inBuf[pos];
      d.inBufIndex ¬ pos + 1;
      d.inIndex ¬ d.inIndex + 1;
      RETURN;
    };
    IF d.atEnd THEN RaiseEndOf[s];
    FillBuf[s, d];
  ENDLOOP;
};
MyGetChar: PROC [self: STREAM] RETURNS [CHAR] ~ {
  -- getChar stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryGetChar[self, d]];
};
-- Monitored worker for unsafeGetBlock: delivers up to block.count bytes into block, starting at block.startIndex,
-- and returns the number of bytes delivered.
-- Returns early with a short count at end of input, or if the stream is found closed after some bytes were delivered;
-- raises StreamClosed if it is closed before any byte was delivered. Read failures are reported through RaiseError.
MyEntryUnsafeGetBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.inBuf;
ix: CARD;
len: INT;
nBytesMoreWanted: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
-- Each pass either moves len bytes to the caller or refills the buffer (len = 0); the counters are advanced at the bottom of the loop.
DO
IF d.closed THEN {
IF nBytesRead = 0 THEN RaiseClosed[s] ELSE RETURN;
};
IF nBytesMoreWanted <= 0 THEN RETURN;
IF d.shared THEN ValidateSharedInputBuffer[s, d];
SELECT TRUE FROM
-- unread data in inBuf: copy as much of it as the caller still wants
(b.length > d.inBufIndex) => {
ix ¬ d.inBufIndex;
len ¬ MIN[ INT[b.length] - INT[ix], nBytesMoreWanted ];
TRUSTED {
[] ¬ Basics.ByteBlt[
to~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
],
from~[
blockPointer~LOOPHOLE[b],
startIndex~ix+BYTES[TEXT[0]],
stopIndexPlusOne~ix+BYTES[TEXT[0]]+len
]
];
};
d.inBufIndex ¬ ix + len;
};
-- inBuf is empty and the request exceeds half the buffer capacity: have get read straight into the caller's block
(nBytesMoreWanted > b.maxLength/2) => {
IF d.atEnd THEN RETURN;
IF d.shared THEN ValidateSharedRead[s, d, nBytesMoreWanted];
TRUSTED {
[len, d.atEnd] ¬ d.get[d.data, GetGetIndex[d], [block.base, blockIndex, nBytesMoreWanted]];
};
IF len < 0 THEN RaiseError[s, Null, len, "read error"];
};
-- inBuf is empty and the request is small: refill inBuf and pick the data up on the next pass
ENDCASE => {
IF d.atEnd THEN RETURN;
FillBuf[s, d];
len ¬ 0;
};
d.inIndex ¬ d.inIndex + len;
nBytesRead ¬ nBytesRead + len;
blockIndex ¬ blockIndex + len;
nBytesMoreWanted ¬ nBytesMoreWanted - len;
ENDLOOP;
};
MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ {
  -- unsafeGetBlock stream proc: rejects a negative start index or count with RuntimeError.BoundsFault,
  -- then runs the monitored worker.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  nBytesRead ¬ MyEntryUnsafeGetBlock[self, d, block];
};
MyEntryEndOf: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [BOOL] ~ {
  -- Monitored worker for endOf: FALSE if unread input is buffered, TRUE if the buffer is empty and the
  -- stream is at end; otherwise refills the buffer and checks again. Raises StreamClosed on a closed stream.
  ENABLE UNWIND => NULL;
  inBuf: REF TEXT ~ d.inBuf;
  DO
    IF d.closed THEN RaiseClosed[s];
    SELECT TRUE FROM
      (d.inBufIndex < inBuf.length) => RETURN [FALSE];
      d.atEnd => RETURN [TRUE];
      ENDCASE => FillBuf[s, d];
  ENDLOOP;
};
MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- endOf stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryEndOf[self, d]];
};
MyEntryCharsAvail: ENTRY PROC [s: STREAM, d: PrivateStreamData, wait: BOOL] RETURNS [INT] ~ {
  -- Monitored worker for charsAvail. Returns the number of unread characters in inBuf if there are any,
  -- INT.LAST if the buffer is empty and the stream is at end, and 0 if the buffer is empty and wait is FALSE.
  -- With wait TRUE it refills the buffer and tries again. Raises StreamClosed on a closed stream.
  ENABLE UNWIND => NULL;
  inBuf: REF TEXT ~ d.inBuf;
  DO
    IF d.closed THEN RaiseClosed[s];
    SELECT TRUE FROM
      (inBuf.length > d.inBufIndex) => RETURN [INT[inBuf.length] - INT[d.inBufIndex]];
      d.atEnd => RETURN [INT.LAST];
      (NOT wait) => RETURN [0];
      ENDCASE => FillBuf[s, d];
  ENDLOOP;
};
MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] ~ {
  -- charsAvail stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryCharsAvail[self, d, wait]];
};
MyEntryPutChar: ENTRY PROC [s: STREAM, d: PrivateStreamData, char: CHAR] ~ {
  -- Monitored worker for putChar: appends char to outBuf, flushing the buffer first when it is full.
  -- Raises StreamClosed on a closed stream.
  ENABLE UNWIND => NULL;
  outBuf: REF TEXT ~ d.outBuf;
  slot: CARDINAL;
  DO
    IF d.closed THEN RaiseClosed[s];
    IF outBuf.length < outBuf.maxLength THEN EXIT;
    FlushBuf[s, d];
  ENDLOOP;
  slot ¬ outBuf.length;
  -- extend the text before storing into the new position
  outBuf.length ¬ slot + 1;
  outBuf[slot] ¬ char;
  d.outIndex ¬ d.outIndex + 1;
};
MyPutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- putChar stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryPutChar[self, d, char];
};
-- Monitored worker for unsafePutBlock: accepts block.count bytes from block, starting at block.startIndex.
-- Bytes are either copied into outBuf or, for large requests when outBuf is empty, handed directly to the client's put proc.
-- Raises StreamClosed on a closed stream; write failures are reported through RaiseError.
MyEntryUnsafePutBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] ~ CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.outBuf;
ix: CARD;
len: INT;
nBytesMoreToWrite: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
-- Each pass consumes len bytes of the caller's block (len = 0 when the pass only flushed); the counters are advanced at the bottom of the loop.
DO
IF d.closed THEN RaiseClosed[s];
IF nBytesMoreToWrite <= 0 THEN RETURN;
SELECT TRUE FROM
-- buffer full: make room, consume nothing this pass
(b.length = b.maxLength) => {
FlushBuf[s, d]; -- updates b.length
len ¬ 0;
};
-- buffer partly filled, or the request is at most half the buffer capacity: copy into outBuf
(b.length > 0) OR (nBytesMoreToWrite <= (b.maxLength/2)) => {
ix ¬ b.length;
len ¬ MIN[ (INT[b.maxLength] - INT[ix]), nBytesMoreToWrite ];
TRUSTED {
-- NOTE(review): here the header offset is added to the pointer using SIZE[TEXT[0]], whereas the other ByteBlt calls in this file add BYTES[TEXT[0]] to the indices; confirm that both address the same bytes.
[] ¬ Basics.ByteBlt[
to~[
blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
startIndex~ix,
stopIndexPlusOne~ix+len
],
from~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
]
];
};
b.length ¬ ix + len;
};
-- buffer empty and the request is large: pass the caller's block directly to put
ENDCASE => {
TRUSTED {
len ¬ d.put[d.data, GetPutIndex[d], [block.base, blockIndex, nBytesMoreToWrite]];
};
-- NOTE(review): a result of 0 is not treated as an error here (FlushBuf raises ERROR for it), so the loop calls put again with the same arguments; confirm that put never returns 0.
SELECT len FROM
< 0 => {
RaiseError[s, Null, len, "write error"];
};
ENDCASE;
};
d.outIndex ¬ d.outIndex + len;
blockIndex ¬ blockIndex + len;
nBytesMoreToWrite ¬ nBytesMoreToWrite - len;
ENDLOOP;
};
MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  -- unsafePutBlock stream proc: rejects a negative start index or count with RuntimeError.BoundsFault,
  -- then runs the monitored worker.
  private: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  MyEntryUnsafePutBlock[self, private, block];
};
-- Monitored worker for flush: writes out any buffered output. Raises StreamClosed on a closed stream.
MyEntryFlush: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
ENABLE UNWIND => NULL;
IF d.closed THEN RaiseClosed[s];
IF d.outBuf.length > 0 THEN FlushBuf[s, d];
};
MyFlush: PROC [self: STREAM] ~ {
  -- flush stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryFlush[self, d];
};
MyEntryReset: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  -- Monitored worker for reset: discards any unread characters in inBuf, advancing the input index
  -- past them, and marks the stream as being at end. Does nothing on a closed stream.
  -- This is an ENTRY procedure like the other MyEntry workers: it modifies inBufIndex, inIndex and atEnd,
  -- which the other workers read and write while holding the monitor lock on d.
  ENABLE UNWIND => NULL;
  unread: CARDINAL;
  IF d.closed THEN RETURN;
  unread ¬ d.inBuf.length - d.inBufIndex;
  IF unread > 0 THEN {
    d.inBufIndex ¬ d.inBufIndex + unread;
    d.inIndex ¬ d.inIndex + unread;
  };
  -- NOTE(review): with atEnd set, get is never called again, so the stream reports end of input
  -- until it is re-initialized by stream creation; confirm that this is the intended reset behavior.
  d.atEnd ¬ TRUE;
};
MyReset: PROC [self: STREAM] ~ {
  -- reset stream proc: looks up the private data, then runs the reset worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryReset[self, d];
};
MyEntryClose: ENTRY PROC [s: STREAM, d: PrivateStreamData, abort: BOOL] ~ {
  -- Monitored worker for close. Closing an already closed stream is a no-op.
  -- Unless abort is TRUE, pending output of a non-input stream is flushed first. The stream is marked
  -- closed before the client's close proc runs; a negative result from that proc is reported through RaiseError.
  ENABLE UNWIND => NULL;
  status: INT;
  IF d.closed THEN RETURN;
  IF IO.GetInfo[s].variety # input THEN {
    IF (NOT abort) AND (d.outBuf.length > 0) THEN FlushBuf[s, d];
  };
  d.closed ¬ TRUE;
  status ¬ d.close[d.data, abort];
  IF status < 0 THEN RaiseError[s, Null, status, "i/o error in close"];
};
MyClose: PROC [self: STREAM, abort: BOOL] ~ {
  -- close stream proc: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryClose[self, d, abort];
};
-- Monitored worker for getIndex on the input side: returns the current input index. Raises StreamClosed on a closed stream.
MyEntryInGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
ENABLE UNWIND => NULL;
IF d.closed THEN RaiseClosed[s];
RETURN[d.inIndex];
};
MyInGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- getIndex stream proc (input index): looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryInGetIndex[self, d]];
};
-- Monitored worker for setIndex on the input side: discards whatever is buffered and records index as the
-- position of the next read. Raises StreamClosed on a closed stream. atEnd is left unchanged.
MyEntryInSetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData, index: INT] ~ {
ENABLE UNWIND => NULL;
IF d.closed THEN RaiseClosed[s];
-- mark inBuf as fully consumed so the next read goes back to the client's get proc
d.inBufIndex ¬ d.inBuf.length;
d.inIndex ¬ index;
};
MyInSetIndex: PROC [self: STREAM, index: INT] ~ {
  -- setIndex stream proc for input streams: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryInSetIndex[self, d, index];
};
-- Monitored worker for getIndex on the output side: returns the current output index, which includes
-- characters still held in outBuf. Raises StreamClosed on a closed stream.
MyEntryOutGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
ENABLE UNWIND => NULL;
IF d.closed THEN RaiseClosed[s];
RETURN[d.outIndex];
};
MyOutGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- getIndex stream proc (output index): looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryOutGetIndex[self, d]];
};
MyEntryOutSetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData, index: INT] ~ {
  -- Monitored worker for setIndex on the output side: writes out pending output at its old position,
  -- then records index as the position of the next write. Raises StreamClosed on a closed stream.
  ENABLE UNWIND => NULL;
  IF d.closed THEN RaiseClosed[s];
  -- Flush before moving: FlushBuf computes where the buffered bytes belong from the current outIndex.
  IF d.outBuf.length > 0 THEN FlushBuf[s, d];
  -- Store the new position, as MyEntryInOutSetIndex does; without this the call had no effect on later writes.
  d.outIndex ¬ index;
};
MyOutSetIndex: PROC [self: STREAM, index: INT] ~ {
  -- setIndex stream proc for output streams: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryOutSetIndex[self, d, index];
};
MyEntryInOutSetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData, index: INT] ~ {
  -- Monitored worker for setIndex on an input/output stream: flushes pending output, discards buffered
  -- input, and moves both the output and the input index to index. Raises StreamClosed on a closed stream.
  ENABLE UNWIND => NULL;
  IF d.closed THEN RaiseClosed[s];
  IF d.outBuf.length > 0 THEN FlushBuf[s, d];
  -- mark inBuf as fully consumed
  d.inBufIndex ¬ d.inBuf.length;
  d.outIndex ¬ index;
  d.inIndex ¬ d.outIndex;
};
MyInOutSetIndex: PROC [self: STREAM, index: INT] ~ {
  -- setIndex stream proc for input/output streams: looks up the private data, then runs the monitored worker on it.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryInOutSetIndex[self, d, index];
};
-- Procedure table used for streams of variety input.
myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety~input,
class~myClass,
getChar~MyGetChar,
unsafeGetBlock~MyUnsafeGetBlock,
endOf~MyEndOf,
charsAvail~MyCharsAvail,
reset~MyReset,
getIndex~MyInGetIndex,
setIndex~MyInSetIndex,
close~MyClose
];
-- Procedure table used for streams of variety output.
myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety~output,
class~myClass,
putChar~MyPutChar,
unsafePutBlock~MyUnsafePutBlock,
flush~MyFlush,
getIndex~MyOutGetIndex,
setIndex~MyOutSetIndex,
close~MyClose
];
-- Procedure table used for streams of variety inputOutput.
-- getIndex reports the input index only, while setIndex moves both the input and the output index.
myInOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety~inputOutput,
class~myClass,
getChar~MyGetChar,
unsafeGetBlock~MyUnsafeGetBlock,
endOf~MyEndOf,
charsAvail~MyCharsAvail,
reset~MyReset,
putChar~MyPutChar,
unsafePutBlock~MyUnsafePutBlock,
flush~MyFlush,
getIndex~MyInGetIndex, -- MyOutGetIndex?
setIndex~MyInOutSetIndex,
close~MyClose
];
-- Finalization
-- Call queue on which every newly created stream is registered for finalization; ProcStreamFinalizer is its handler.
finalizationQueue: FinalizeOps.CallQueue ¬ FinalizeOps.CreateCallQueue[ProcStreamFinalizer];
ProcStreamFinalizer: FinalizeOps.FinalizeProc ~ {
  -- Finalization handler: closes the stream with abort TRUE, ignoring any IO.Error raised by the close.
  doomed: STREAM ~ NARROW[object];
  IO.Close[doomed, TRUE ! IO.Error => CONTINUE];
};
-- Stream Creation
-- Close proc substituted when the client supplies none: does nothing and returns 0, which MyEntryClose treats as success.
DefaultCloseProc: CloseProc -- [clientData, abort] RETURNS[ok] -- ~ { RETURN [0] };
DefaultGetErrorProc: GetErrorProc -- [clientData, code] RETURNS[ec, msg] -- ~ {
  -- Error-description proc substituted when the client supplies none.
  -- Code 0 yields [Null, "success"]; any other code yields Failure with a message naming the code's magnitude.
  magnitude: INT ~ IF code < 0 THEN -code ELSE code;
  IF code = 0 THEN RETURN [Null, "success"];
  RETURN [Failure, IO.PutFR1["failure %g", [integer[magnitude]]] ];
};
CreateProcStreamInner: PROC [variety: IO.StreamVariety, clientData: REF, get: GetProc, put: PutProc, close: CloseProc, getError: GetErrorProc, oldStream: STREAM, canSetIndex: BOOL, shared: BOOL] RETURNS [stream: STREAM ¬ NIL] ~ {
  -- Common worker behind PIS, POS and PIOS: returns a stream of the given variety whose operations are
  -- carried out by the client's procs.
  --   get / put: required for input-capable / output-capable varieties respectively; a missing one raises IO.Error[NotImplementedForThisStream].
  --   close / getError: NIL selects DefaultCloseProc / DefaultGetErrorProc.
  --   oldStream: if non-NIL and of the same variety and of this module's class, it is reinitialized and returned instead of creating a new stream; its buffers are kept.
  --   canSetIndex, shared: stored in the stream's private data.
  -- A newly created stream is registered for finalization.
  d: PrivateStreamData;
  MissingProc: PROC [msg: ROPE] ~ {
    -- RaiseError records its details in the private data of the stream it is given, so it needs a stream;
    -- with no old stream the error is raised directly instead of dereferencing NIL.
    -- NOTE(review): an oldStream that was not created by this module still fails the NARROW inside RaiseError.
    IF oldStream = NIL THEN ERROR IO.Error[NotImplementedForThisStream, NIL];
    RaiseError[oldStream, NotImplementedForThisStream, 0, msg];
  };
  IF get = NIL THEN {
    IF variety # output THEN MissingProc["missing get proc for input stream"];
  };
  IF put = NIL THEN {
    IF variety # input THEN MissingProc["missing put proc for output stream"];
  };
  IF close = NIL THEN close ¬ DefaultCloseProc;
  IF getError = NIL THEN getError ¬ DefaultGetErrorProc;
  IF oldStream # NIL THEN {
    -- an IO.Error while inspecting the old stream just means it cannot be reused
    ENABLE IO.Error => CONTINUE;
    oldVariety: IO.StreamVariety;
    oldClass: ATOM;
    [oldVariety, oldClass] ¬ IO.GetInfo[oldStream];
    IF (oldVariety = variety) AND (oldClass = myClass) THEN stream ¬ oldStream;
  };
  IF stream # NIL
    THEN {
      -- reuse: pick up the old stream's private record so the assignments below reinitialize it
      d ¬ NARROW[oldStream.streamData];
    }
    ELSE {
      theStreamProcs: REF IO.StreamProcs;
      d ¬ NEW[PrivateStreamDataRecord];
      theStreamProcs ¬ ( SELECT variety FROM
        input => myInStreamProcs,
        output => myOutStreamProcs
        ENDCASE => myInOutStreamProcs );
      stream ¬ IO.CreateStream[streamProcs~theStreamProcs, streamData~d];
      [] ¬ FinalizeOps.EnableFinalization[stream, finalizationQueue];
    };
  d.data ¬ clientData;
  d.get ¬ get;
  d.put ¬ put;
  d.close ¬ close;
  d.getError ¬ getError;
  d.closed ¬ FALSE;
  d.atEnd ¬ FALSE;
  d.canSetIndex ¬ canSetIndex;
  d.shared ¬ shared;
  -- buffers are allocated only for the directions the variety supports, and only if not already present
  IF variety # output THEN {
    IF d.inBuf = NIL THEN d.inBuf ¬ RefText.New[defaultInBufferLength];
    d.inBuf.length ¬ 0;
  };
  d.inBufIndex ¬ 0;
  d.inIndex ¬ 0;
  IF variety # input THEN {
    IF d.outBuf = NIL THEN d.outBuf ¬ RefText.New[defaultOutBufferLength];
    d.outBuf.length ¬ 0;
  };
  d.outIndex ¬ 0;
  d.ec ¬ Null;
  d.errCode ¬ 0;
  d.errMsg ¬ NIL;
};
PIS: PUBLIC PROC [clientData: REF, get: GetProc, close: CloseProc, getError: GetErrorProc, oldStream: STREAM, canSetIndex: BOOL] RETURNS [in: STREAM] ~ {
  -- Creates (or reinitializes oldStream as) an input stream served by the client's get proc.
  -- No put proc is supplied and the stream is not shared.
  RETURN [CreateProcStreamInner[
    variety~input, clientData~clientData, get~get, put~NIL, close~close, getError~getError,
    oldStream~oldStream, canSetIndex~canSetIndex, shared~FALSE]];
};
POS: PUBLIC PROC [clientData: REF, put: PutProc, close: CloseProc, getError: GetErrorProc, oldStream: STREAM, canSetIndex: BOOL] RETURNS [out: STREAM] ~ {
  -- Creates (or reinitializes oldStream as) an output stream served by the client's put proc.
  -- No get proc is supplied and the stream is not shared.
  RETURN [CreateProcStreamInner[
    variety~output, clientData~clientData, get~NIL, put~put, close~close, getError~getError,
    oldStream~oldStream, canSetIndex~canSetIndex, shared~FALSE]];
};
PIOS: PUBLIC PROC [clientData: REF, get: GetProc, put: PutProc, close: CloseProc, getError: GetErrorProc, oldStream: STREAM, canSetIndex: BOOL, shared: BOOL] RETURNS [inOut: STREAM] ~ {
  -- Creates (or reinitializes oldStream as) an input/output stream served by the client's get and put procs.
  RETURN [CreateProcStreamInner[
    variety~inputOutput, clientData~clientData, get~get, put~put, close~close, getError~getError,
    oldStream~oldStream, canSetIndex~canSetIndex, shared~shared]];
};
GetClientData: PUBLIC PROC [self: STREAM] RETURNS [clientData: REF] ~ {
  -- Returns the client data the stream was created with.
  clientData ¬ PrivateStreamDataFromStream[self].data;
};
GetErrorDetails: PUBLIC PROC [self: STREAM] RETURNS [ec: IO.ErrorCode, code: INT, msg: ROPE] ~ {
  -- Returns the details recorded for the stream's most recent error: IO error code, client error code and message.
  private: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  ec ¬ private.ec;
  code ¬ private.errCode;
  msg ¬ private.errMsg;
};
}.