-- Stream Implementation
-- What the most recent input buffer fill ran into: nothing special, an attention, or end of stream.
BoundaryCondition: TYPE ~ { none, attention, endOfStream };

PrivateStreamData: TYPE ~ REF PrivateStreamDataRecord;

-- State kept for one direction of a connection. NewPrivateStreamDataPair creates two of these
-- (input and output) that share one stream descriptor and refer to each other.
PrivateStreamDataRecord: TYPE ~ MONITORED RECORD [
  sd: NSS.StreamDescriptor, -- descriptor passed to the NSS calls
  sibling: PrivateStreamData, -- the other record of the pair; cleared by MustDoClose
  streamIndex: INT ¬ 0, -- advanced by every byte read, skipped or written through this record
  buffer: REF TEXT ¬ NIL, -- staging buffer for this direction
  bufferIndex: CARDINAL ¬ 0, -- position in buffer of the next byte to consume or send
  localClose: BOOL ¬ FALSE, -- set by MustDoClose when this side is closed
  boundaryCondition: BoundaryCondition ¬ none -- set by FillBuf
  ];
-- Creates the input and output private records for one connection. Both records carry sd,
-- each receives its own buffer from RefText.New, and each names the other as its sibling.
NewPrivateStreamDataPair: PROC [sd: NSS.StreamDescriptor, inBufLen: CARDINAL, outBufLen: CARDINAL]
  RETURNS [dIn, dOut: PrivateStreamData] ~ {
  -- input side
  dIn ¬ NEW[PrivateStreamDataRecord ¬ [sd~sd]];
  dIn.buffer ¬ RefText.New[inBufLen];
  -- output side
  dOut ¬ NEW[PrivateStreamDataRecord ¬ [sd~sd]];
  dOut.buffer ¬ RefText.New[outBufLen];
  -- link the two records to each other
  dOut.sibling ¬ dIn;
  dIn.sibling ¬ dOut;
  };
-- Narrows the streamData field of s to NS.NetworkStreamData.
NetworkStreamDataFromStream: PROC [s: STREAM] RETURNS [NS.NetworkStreamData] ~ INLINE {
  RETURN [NARROW[s.streamData]]
  };
-- Follows s.streamData (an NS.NetworkStreamData) to its data field and narrows that
-- to the private record of this implementation.
PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE {
  nsd: NS.NetworkStreamData ~ NARROW[s.streamData];
  RETURN [NARROW[nsd.data]]
  };
-- Narrows the data field of d to the private record of this implementation.
PrivateStreamDataFromNetworkStreamData: PROC [d: NS.NetworkStreamData]
  RETURNS [PrivateStreamData] ~ INLINE {
  RETURN [NARROW[d.data]]
  };
-- Reports use of a locally closed stream: StreamClosed with code $streamClosed.
RaiseClosed: PROC [s: STREAM] ~ {
  NS.RaiseIOError[StreamClosed, s, LIST[$streamClosed], "stream closed"];
  };
-- Reports an attempt to read past a boundary: Failure with code $endOfStream.
RaiseEndOf: PROC [s: STREAM] ~ {
  NS.RaiseIOError[Failure, s, LIST[$endOfStream], "end of stream"];
  };
-- Turns an NSS completion code into a Failure report through NS.RaiseIOError.
-- The atom for cc is put in front of the caller supplied codes. When no stream is given
-- and the atom is not $timeout, $networkStreamError is put in front of that.
RaiseErrorFromCC: PROC [s: STREAM, cc: NSS.CompletionCode, codes: LIST OF ATOM, msg: ROPE] ~ {
  ccAtom: ATOM ~ NSS.AtomFromCC[cc];
  codes ¬ CONS[ccAtom, codes];
  IF s = NIL THEN {
    IF ccAtom # $timeout THEN codes ¬ CONS[$networkStreamError, codes];
    };
  NS.RaiseIOError[Failure, s, codes, msg];
  };
-- Opens a connection to remote and returns the input/output stream pair for it.
-- If anything unwinds after the descriptor has been obtained, the descriptor is destroyed
-- (with TRUE as the second argument) so that it is not left behind.
MyRegisteredCreateStreamsProc: NS.RegisteredCreateStreamsProc
-- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ {
  sd: NSS.StreamDescriptor ¬ NIL;
  {
    ENABLE UNWIND => {
      IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
      };
    status: NSS.CompletionCode;
    remoteAddr, localAddr: ROPE;
    inData, outData: NS.NetworkStreamData;

    -- Step 1: obtain the descriptor. No stream exists yet, so failures are reported with NIL.
    [status, sd] ¬ NSS.CreateStreamDescriptor[remote, timeout];
    IF NSS.FailureCC[status] THEN RaiseErrorFromCC[NIL, status, LIST[$connect], "can't connect"];

    -- Step 2: look up the two endpoint addresses.
    [status, localAddr, remoteAddr] ¬ NSS.AddressesFromStreamDescriptor[sd];
    IF NSS.FailureCC[status] THEN RaiseErrorFromCC[NIL, status, LIST[$getStreamInfo], "can't get local/remote address"];

    -- Step 3: create the stream objects and fill in their network stream data.
    [in, out] ¬ NS.CreateUninitializedStreams[registration];
    inData ¬ NARROW[in.streamData];
    outData ¬ NARROW[out.streamData];
    outData.local ¬ localAddr;
    inData.local ¬ localAddr;
    outData.remote ¬ remoteAddr;
    inData.remote ¬ remoteAddr;

    -- Step 4: attach the private per-direction records.
    [dIn~inData.data, dOut~outData.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
    };
  };
-- Refills the input buffer of d with one NSS.Receive call and records in
-- d.boundaryCondition whether the data stopped at an attention or at end of stream.
-- s is used only as the stream handed to RaiseErrorFromCC when NSS reports a failure.
-- Raises ERROR if unread bytes remain in the buffer.
-- Not an ENTRY procedure itself; in this section it is called from ENTRY procedures.
FillBuf:
PROC [s:
STREAM, d: PrivateStreamData] ~ {
b: REF TEXT ¬ d.buffer;
cc: NSS.CompletionCode;
nBytes: INT;
atAttn: BOOL;
-- Refuse to overwrite bytes that have not been consumed yet.
IF d.bufferIndex < b.length THEN ERROR;
TRUSTED {
[cc, nBytes] ¬
NSS.Receive[
d.sd,
[
-- NOTE(review): base is the address of b advanced by SIZE[TEXT[0]], presumably the first character of the text; confirm against the TEXT layout.
base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
startIndex~0,
count~b.maxLength
]
];
};
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Read], "read error"];
-- Only a short read is probed for a boundary; a completely filled buffer is treated as plain data.
IF nBytes < b.maxLength
THEN {
[cc, atAttn] ¬ NSS.AtIBAttn[d.sd];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$AtIBAttn], "read error (AtIBAttn)"];
-- The first TRUE arm wins, so attention takes precedence over an empty read.
d.boundaryCondition ¬
SELECT
TRUE
FROM
atAttn => attention,
nBytes = 0 => endOfStream,
ENDCASE => none;
}
ELSE {
d.boundaryCondition ¬ none;
};
-- Publish the new contents: reading restarts at 0 and length is the byte count received.
d.bufferIndex ¬ 0;
b.length ¬ nBytes;
};
-- Sends the buffered bytes from d.bufferIndex up to b.length with one NSS.Send call,
-- then marks the buffer empty. A failure code from Send is reported through
-- RaiseErrorFromCC before the buffer is reset.
-- Not an ENTRY procedure itself; in this section it is called from ENTRY procedures.
FlushBuf:
PROC [s:
STREAM, d: PrivateStreamData] ~ {
b: REF TEXT ¬ d.buffer;
cc: NSS.CompletionCode;
TRUSTED {
cc ¬
NSS.Send[
d.sd,
[
-- NOTE(review): base is the address of b advanced by SIZE[TEXT[0]], presumably the first character of the text; confirm against the TEXT layout.
base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
startIndex~d.bufferIndex,
count~b.length-d.bufferIndex
]
];
};
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Write], "write error"];
-- Everything was handed to Send: empty the buffer and rewind the send position.
d.bufferIndex ¬ b.length ¬ 0;
};
-- Returns the next input character, refilling the buffer as often as needed.
-- Reports a closed stream through RaiseClosed. Reports end of stream through RaiseEndOf
-- when the buffer is empty and the last fill recorded a boundary (attention or end of stream).
-- The UNWIND catch phrase lets errors propagate out of this ENTRY procedure with the lock released.
MyEntryGetChar: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [c: CHAR] ~ {
  ENABLE UNWIND => NULL;
  buf: REF TEXT ~ d.buffer;
  pos: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    pos ¬ d.bufferIndex;
    IF pos < buf.length THEN EXIT; -- a character is available
    IF d.boundaryCondition # none THEN RaiseEndOf[s];
    FillBuf[s, d];
    ENDLOOP;
  c ¬ buf[pos];
  d.bufferIndex ¬ pos + 1;
  d.streamIndex ¬ d.streamIndex + 1;
  };
-- getChar stream procedure: finds the private record of self and reads under its monitor.
MyGetChar: PROC [self: STREAM] RETURNS [CHAR] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryGetChar[self, d]];
  };
-- Copies up to block.count bytes of input into block, starting at block.startIndex,
-- and returns the number of bytes delivered. The buffer is refilled as needed.
-- Returns early with a short count when the buffer is empty and the last fill recorded
-- a boundary (attention or end of stream). Reports a closed stream through RaiseClosed.
-- The caller (MyUnsafeGetBlock) has already rejected negative startIndex and count.
MyEntryUnsafeGetBlock:
ENTRY
PROC [s:
STREAM, d: PrivateStreamData, block:
IO.UnsafeBlock]
RETURNS [nBytesRead:
INT ¬ 0] ~
CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.buffer;
i: CARD;
len: INT;
nBytesMoreWanted: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
DO
IF d.localClose THEN RaiseClosed[s];
IF nBytesMoreWanted <= 0 THEN RETURN;
i ¬ d.bufferIndex;
-- Number of unread bytes currently in the buffer.
len ¬ INT[b.length] - INT[i];
IF len <= 0
THEN {
-- Buffer exhausted: stop at a boundary, otherwise fetch more and retry.
IF d.boundaryCondition # none THEN RETURN;
FillBuf[s, d];
LOOP;
};
-- Never copy more than the caller still wants.
IF len > nBytesMoreWanted THEN len ¬ nBytesMoreWanted;
TRUSTED {
-- len is replaced by the count that ByteBlt returns, and that count drives the bookkeeping below.
len ¬ Basics.ByteBlt[
to~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
],
from~[
-- NOTE(review): address of b advanced by SIZE[TEXT[0]], presumably the first character of the text; confirm against the TEXT layout.
blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
startIndex~i,
stopIndexPlusOne~i+len
]
];
};
-- Advance the buffer position, the stream index, the result and the position in the caller's block.
d.bufferIndex ¬ i + len;
d.streamIndex ¬ d.streamIndex + len;
nBytesRead ¬ nBytesRead + len;
blockIndex ¬ blockIndex + len;
nBytesMoreWanted ¬ nBytesMoreWanted - len;
ENDLOOP;
};
-- unsafeGetBlock stream procedure. Rejects a negative startIndex or count with
-- RuntimeError.BoundsFault, then does the transfer under the monitor of the private record.
MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  RETURN [MyEntryUnsafeGetBlock[self, d, block]];
  };
-- TRUE when no buffered input is left and the last fill recorded a boundary
-- (attention or end of stream). Reports a closed stream through RaiseClosed.
MyEntryEndOf: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [BOOL] ~ {
  ENABLE UNWIND => NULL;
  buf: REF TEXT ~ d.buffer;
  IF d.localClose THEN RaiseClosed[s];
  RETURN [(d.bufferIndex >= buf.length) AND (d.boundaryCondition # none)];
  };
-- endOf stream procedure: finds the private record of self and asks under its monitor.
MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryEndOf[self, d]];
  };
-- Returns the number of input bytes that can be read without waiting.
--   * bytes already buffered: that count;
--   * buffer empty and the last fill recorded a boundary: INT.LAST;
--   * nothing ready and wait is FALSE: 0;
--   * otherwise the buffer is refilled and the checks are repeated.
-- Reports a closed stream through RaiseClosed. A failure of NSS.InputReady is reported
-- through RaiseErrorFromCC against stream s, like the other per-stream operations of this
-- section. The NIL stream form is used for failures that occur before a stream exists.
MyEntryCharsAvail: ENTRY PROC [s: STREAM, d: PrivateStreamData, wait: BOOL] RETURNS [INT] ~ {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  nLeftInBuffer: INT;
  cc: NSS.CompletionCode;
  inputReady: BOOL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    nLeftInBuffer ¬ INT[b.length] - INT[d.bufferIndex];
    IF nLeftInBuffer > 0 THEN RETURN [nLeftInBuffer];
    IF d.boundaryCondition # none THEN RETURN [INT.LAST];
    [cc, inputReady] ¬ NSS.InputReady[d.sd, FALSE];
    -- Attribute the failure to this stream rather than to NIL.
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$CharsAvail], "can't test input ready"];
    IF (NOT inputReady) AND (NOT wait) THEN RETURN[0];
    FillBuf[s, d];
    ENDLOOP;
  };
-- charsAvail stream procedure: finds the private record of self and asks under its monitor.
MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryCharsAvail[self, d, wait]];
  };
-- Appends char to the output buffer, flushing first whenever the buffer is full.
-- Reports a closed stream through RaiseClosed.
MyEntryPutChar: ENTRY PROC [s: STREAM, d: PrivateStreamData, char: CHAR] ~ {
  ENABLE UNWIND => NULL;
  buf: REF TEXT ~ d.buffer;
  slot: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    slot ¬ buf.length;
    IF slot < buf.maxLength THEN EXIT; -- room for one more character
    FlushBuf[s, d];
    ENDLOOP;
  d.streamIndex ¬ d.streamIndex + 1;
  buf.length ¬ slot + 1;
  buf[slot] ¬ char;
  };
-- putChar stream procedure: finds the private record of self and writes under its monitor.
MyPutChar: PROC [self: STREAM, char: CHAR] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryPutChar[self, d, char];
  };
-- Copies block.count bytes from block, starting at block.startIndex, into the output
-- buffer, flushing whenever the buffer is full. Returns when every byte has been buffered.
-- Reports a closed stream through RaiseClosed.
-- The caller (MyUnsafePutBlock) has already rejected negative startIndex and count.
MyEntryUnsafePutBlock:
ENTRY
PROC [s:
STREAM, d: PrivateStreamData, block:
IO.UnsafeBlock] ~
CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.buffer;
i: CARD;
len: INT;
nBytesMoreToWrite: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
DO
IF d.localClose THEN RaiseClosed[s];
IF nBytesMoreToWrite <= 0 THEN RETURN;
i ¬ b.length;
-- Free space remaining in the buffer.
len ¬ INT[b.maxLength] - INT[i];
IF len <= 0
THEN {
-- Buffer full: send it and retry.
FlushBuf[s, d];
LOOP;
};
-- Never copy more than the caller still has to write.
IF len > nBytesMoreToWrite THEN len ¬ nBytesMoreToWrite;
TRUSTED {
-- len is replaced by the count that ByteBlt returns, and that count drives the bookkeeping below.
len ¬ Basics.ByteBlt[
to~[
-- NOTE(review): address of b advanced by SIZE[TEXT[0]], presumably the first character of the text; confirm against the TEXT layout.
blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
startIndex~i,
stopIndexPlusOne~i+len
],
from~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
]
];
};
-- Grow the buffered length and advance the stream index and the position in the caller's block.
b.length ¬ i + len;
d.streamIndex ¬ d.streamIndex + len;
blockIndex ¬ blockIndex + len;
nBytesMoreToWrite ¬ nBytesMoreToWrite - len;
ENDLOOP;
};
-- unsafePutBlock stream procedure. Rejects a negative startIndex or count with
-- RuntimeError.BoundsFault, then does the transfer under the monitor of the private record.
MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  MyEntryUnsafePutBlock[self, d, block];
  };
-- Sends whatever is in the output buffer; does nothing when the buffer is empty.
-- Reports a closed stream through RaiseClosed.
MyEntryFlush: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  IF d.buffer.length > 0 THEN {
    FlushBuf[s, d];
    };
  };
-- flush stream procedure: finds the private record of self and flushes under its monitor.
MyFlush: PROC [self: STREAM] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryFlush[self, d];
  };
-- Discards input until a boundary is reached: buffered bytes are skipped (and counted in
-- streamIndex) and the buffer is refilled repeatedly until the last fill recorded an
-- attention or end of stream. Returns silently if the stream has been closed locally.
MyEntryInputReset: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  ENABLE UNWIND => NULL;
  unread: CARDINAL;
  DO
    IF d.localClose THEN RETURN;
    unread ¬ d.buffer.length - d.bufferIndex;
    IF unread > 0 THEN {
      -- skip everything still buffered
      d.bufferIndex ¬ d.bufferIndex + unread;
      d.streamIndex ¬ d.streamIndex + unread;
      };
    IF d.boundaryCondition # none THEN RETURN;
    FillBuf[s, d];
    ENDLOOP;
  };
-- reset stream procedure for input: finds the private record of self and resets under its monitor.
MyInputReset: PROC [self: STREAM] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryInputReset[self, d];
  };
-- A record that exists only to supply the default value of parameter d of MustDoClose.
specialLockForMustDoClose: PrivateStreamData ¬ NEW[ PrivateStreamDataRecord ¬ []];

-- Marks real as locally closed and tells the caller whether it must now destroy the
-- shared stream descriptor.
--   * real already closed: returns FALSE (nothing to do).
--   * otherwise real is marked closed and detached from its sibling, and the result is
--     the sibling's localClose flag, i.e. TRUE only when the other side was closed before.
-- Raises ERROR if real is not yet closed but has no sibling.
-- Like the other ENTRY procedures of this section it carries ENABLE UNWIND => NULL, so
-- that an ERROR unwinding out of it releases the monitor lock. Without the catch phrase
-- the lock would stay held after such an unwind, and parameter d defaults to the one
-- shared record specialLockForMustDoClose.
MustDoClose: ENTRY PROC [
    real <<to be closed>>: PrivateStreamData,
    d <<special lock>>: PrivateStreamData ¬ specialLockForMustDoClose]
  RETURNS [doClose: BOOL] ~ {
  ENABLE UNWIND => NULL;
  mySibling: PrivateStreamData;
  IF real.localClose THEN RETURN[FALSE];
  real.localClose ¬ TRUE;
  IF (mySibling ¬ real.sibling) = NIL THEN ERROR;
  real.sibling ¬ NIL;
  RETURN[ mySibling.localClose ];
  };
-- close stream procedure for input, also the final step of MyOutputClose.
-- Marks this side closed; the shared descriptor is destroyed only when MustDoClose says
-- so. A failure of the destroy is reported through RaiseErrorFromCC unless abort is TRUE.
MyIOClose: PROC [self: STREAM, abort: BOOL] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  cc: NSS.CompletionCode;
  IF MustDoClose[d] THEN {
    [cc] ¬ NSS.DestroyStreamDescriptor[d.sd, abort];
    IF NSS.FailureCC[cc] AND (NOT abort) THEN RaiseErrorFromCC[self, cc, LIST[$Close], "I/O error in close"];
    };
  };
-- close stream procedure for output. Does nothing if this side is already closed.
-- Unless abort is TRUE the stream is flushed first; errors from the flush are not caught
-- here (the catch phrase is commented out), so in that case MyIOClose is not reached.
MyOutputClose: PROC [self: STREAM, abort: BOOL] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF NOT d.localClose THEN {
    IF NOT abort THEN IO.Flush[self
      -- ! IO.Error, NS.Timeout => CONTINUE -- ];
    MyIOClose[self, abort];
    };
  };
-- Returns the current streamIndex of d. Reports a closed stream through RaiseClosed.
MyEntryGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  RETURN [d.streamIndex];
  };
-- getIndex stream procedure, shared by the input and the output stream.
MyGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  RETURN [MyEntryGetIndex[self, PrivateStreamDataFromStream[self]]];
  };
-- IO stream procedures of the input stream.
myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
  variety~input,
  class~$networkStream,
  getChar~MyGetChar,
  unsafeGetBlock~MyUnsafeGetBlock,
  endOf~MyEndOf,
  charsAvail~MyCharsAvail,
  reset~MyInputReset,
  close~MyIOClose,
  getIndex~MyGetIndex
  ];
-- IO stream procedures of the output stream.
myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
  variety~output,
  class~$networkStream,
  putChar~MyPutChar,
  unsafePutBlock~MyUnsafePutBlock,
  flush~MyFlush,
  close~MyOutputClose,
  getIndex~MyGetIndex
  ];
-- Records timeout and signalTimeout in nsd and applies the timeout to the descriptor:
-- the put timeout when output is TRUE, the get timeout otherwise.
-- Reports a closed stream through RaiseClosed and an NSS failure through RaiseErrorFromCC.
-- The two nsd fields are written before the NSS call is made.
MyEntrySetTimeout: ENTRY PROC [s: STREAM, nsd: NS.NetworkStreamData, d: PrivateStreamData,
    timeout: NS.Milliseconds, signalTimeout: BOOL, output: BOOL] ~ {
  ENABLE UNWIND => NULL;
  cc: NSS.CompletionCode;
  IF d.localClose THEN RaiseClosed[s];
  nsd.timeout ¬ timeout;
  nsd.signalTimeout ¬ signalTimeout;
  cc ¬ IF output
    THEN NSS.SetPutTimeout[d.sd, timeout]
    ELSE NSS.SetGetTimeout[d.sd, timeout];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$SetTimeout], "can't set timeout"];
  };
-- setTimeout procedure of the input stream: sets the get timeout.
MyInputSetTimeout: NS.SetTimeoutProc
-- [stream, timeout, signalTimeout] -- ~ {
  nsd: NS.NetworkStreamData ~ NARROW[stream.streamData];
  d: PrivateStreamData ~ NARROW[nsd.data];
  MyEntrySetTimeout[s~stream, nsd~nsd, d~d, timeout~timeout, signalTimeout~signalTimeout, output~FALSE];
  };
-- setTimeout procedure of the output stream: sets the put timeout.
MyOutputSetTimeout: NS.SetTimeoutProc
-- [stream, timeout, signalTimeout] -- ~ {
  nsd: NS.NetworkStreamData ~ NARROW[stream.streamData];
  d: PrivateStreamData ~ NARROW[nsd.data];
  MyEntrySetTimeout[s~stream, nsd~nsd, d~d, timeout~timeout, signalTimeout~signalTimeout, output~TRUE];
  };
-- Returns, as one consistent snapshot, the stream index of d and the number of bytes
-- between bufferIndex and the end of the buffered data.
-- Reports a closed stream through RaiseClosed.
MyEntryGetIndexDetails: ENTRY PROC [s: STREAM, d: PrivateStreamData]
  RETURNS [index, bytesInBuffer: INT] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  index ¬ d.streamIndex;
  bytesInBuffer ¬ INT[d.buffer.length] - INT[d.bufferIndex];
  };
-- getIndexDetails procedure of the input stream. The second result is the stream index
-- plus the bytes still waiting in the buffer; the third result is always -1.
MyInputGetIndexDetails: NS.GetIndexDetailsProc
-- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
  buffered: INT;
  [index~index, bytesInBuffer~buffered] ¬ MyEntryGetIndexDetails[stream, d];
  RETURN [index, index + buffered, -1];
  };
-- getIndexDetails procedure of the output stream. The second result is the stream index
-- minus the bytes still held in the buffer; the third result is always -1.
MyOutputGetIndexDetails: NS.GetIndexDetailsProc
-- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
  buffered: INT;
  [index~index, bytesInBuffer~buffered] ¬ MyEntryGetIndexDetails[stream, d];
  RETURN [index, index - buffered, -1];
  };
-- Reports the state of the input side.
--   * unread bytes in the buffer: open, whatever boundary was recorded;
--   * otherwise the recorded boundary decides: none gives open, attention gives attention,
--     endOfStream gives remoteClosed.
-- When the state is attention and reset is TRUE the recorded boundary is cleared.
-- Reports a closed stream through RaiseClosed.
MyEntryGetStreamState: ENTRY PROC [s: STREAM, d: PrivateStreamData, reset: BOOL]
  RETURNS [ss: NS.StreamState] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  IF d.buffer.length > d.bufferIndex THEN {
    ss ¬ open;
    RETURN;
    };
  SELECT d.boundaryCondition FROM
    endOfStream => ss ¬ remoteClosed;
    attention => {
      IF reset THEN d.boundaryCondition ¬ none;
      ss ¬ attention;
      };
    none => ss ¬ open;
    ENDCASE => ERROR;
  };
-- getStreamState procedure of the input stream. Only streamState carries information;
-- subStreamType and attentionType are always reported as 0.
MyGetStreamState: NS.GetStreamStateProc
-- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
  subStreamType ¬ 0;
  attentionType ¬ 0;
  streamState ¬ MyEntryGetStreamState[in, d, reset];
  };
-- waitAttention procedure of the input stream: waits in NSS.WaitOOBAttn for at most timeout.
-- Reports a closed stream through RaiseClosed and an NSS failure through RaiseErrorFromCC.
MyWaitAttention: NS.WaitAttentionProc
-- [in, timeout] RETURNS [attentionType] -- ~ {
  -- This doesn't need to (and better not) be an ENTRY proc!
  d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
  status: NSS.CompletionCode;
  received: NS.AttentionType;
  IF d.localClose THEN RaiseClosed[in];
  [status, received] ¬ NSS.WaitOOBAttn[d.sd, timeout];
  IF NSS.FailureCC[status] THEN RaiseErrorFromCC[in, status, LIST[$WaitAttn], "error in wait attention proc"];
  -- NOTE(review): the attention type obtained from WaitOOBAttn is discarded and 0 is
  -- returned. MyGetStreamState also reports 0, so this may be deliberate; confirm.
  RETURN [0];
  };
-- sendAttention procedure of the output stream: passes attentionType to NSS.SendAttn.
-- Reports a closed stream through RaiseClosed and an NSS failure through RaiseErrorFromCC.
MySendAttention: NS.SendAttentionProc
-- [out, attentionType] -- ~ {
  -- This doesn't need to be an ENTRY proc!
  d: PrivateStreamData ~ PrivateStreamDataFromStream[out];
  status: NSS.CompletionCode;
  IF d.localClose THEN RaiseClosed[out];
  [status] ¬ NSS.SendAttn[d.sd, attentionType];
  IF NSS.FailureCC[status] THEN RaiseErrorFromCC[out, status, LIST[$SendAttn], "error in send attention proc"];
  };
-- Network stream procedures of the input stream.
myInNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[NS.NetworkStreamProcsRecord ¬ [
  setTimeout~MyInputSetTimeout,
  getIndexDetails~MyInputGetIndexDetails,
  getStreamState~MyGetStreamState,
  waitAttention~MyWaitAttention
  ]];
-- Network stream procedures of the output stream.
myOutNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[NS.NetworkStreamProcsRecord ¬ [
  setTimeout~MyOutputSetTimeout,
  getIndexDetails~MyOutputGetIndexDetails,
  sendAttention~MySendAttention
  ]];
}.