Stream Operations
This should be changed to Susie Armstrong's "SPP Convergence" protocol
at the earliest opportunity ...
LayeredHdr: TYPE ~ REF LayeredHdrObject;
LayeredHdrObject:
TYPE ~
MACHINE
DEPENDENT
RECORD [
length: HWORD,
sst: HWORD,
eom: HWORD,
attn: HWORD
];
PrivateStreamData: TYPE ~ REF PrivateStreamDataObject;
PrivateStreamDataObject:
TYPE ~
MONITORED
RECORD [
underStream: STREAM ¬ NIL,
sst: NS.SubStreamType ¬ 0,
hdrBytesProcessed: INT ¬ 0,
inHdr: LayeredHdrObject ¬ [ [0,0], [0,0], [0,0], [0,0] ],
inBytesLeft: INT ¬ 0,
outBuf: REF TEXT ¬ NIL
];
myInStreamProcs:
REF
IO.StreamProcs ¬
IO.CreateStreamProcs[
variety~input,
class~$networkStream,
getChar~MyGetChar,
unsafeGetBlock~MyUnsafeGetBlock,
endOf~MyEndOf,
charsAvail~MyCharsAvail,
reset~MyInReset,
close~MyClose,
getIndex~MyGetIndex
];
myOutStreamProcs:
REF
IO.StreamProcs ¬
IO.CreateStreamProcs[
variety~output,
class~$networkStream,
putChar~MyPutChar,
unsafePutBlock~MyUnsafePutBlock,
flush~MyFlush,
close~MyClose,
getIndex~MyGetIndex
];
PrivateStreamDataFromStream:
PROC [s:
STREAM]
RETURNS [PrivateStreamData] ~
INLINE {
RETURN [NARROW[NARROW[s.streamData, NS.NetworkStreamData].data]] };
StreamPairFromUnderStreamPair:
PROC [registration:
NS.Registration, uIn:
STREAM, uOut:
STREAM]
RETURNS [in:
STREAM, out:
STREAM] ~ {
nsdIn, nsdOut: NS.NetworkStreamData;
outBuf: REF TEXT;
[in, out] ¬ NS.CreateUninitializedStreams[registration];
nsdIn ¬ NARROW[in.streamData];
[local~nsdIn.local, remote~nsdIn.remote] ¬ NS.GetStreamInfo[uIn];
nsdIn.data ¬ NEW[PrivateStreamDataObject ¬ [underStream~uIn]];
nsdOut ¬ NARROW[out.streamData];
[local~nsdOut.local, remote~nsdOut.remote] ¬ NS.GetStreamInfo[uOut];
outBuf ¬ RefText.New[outBufBytes];
nsdOut.data ¬ NEW[PrivateStreamDataObject ¬ [underStream~uOut, outBuf~outBuf]];
};
MyRegisteredCreateStreamsProc:
NS.RegisteredCreateStreamsProc
-- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ {
tIn, tOut: STREAM;
nsdIn, nsdOut: NS.NetworkStreamData;
[tIn, tOut] ¬ NS.CreateStreams[registration.protocolFamily, remote, transportClassUnderMe, timeout, transportParameters]; -- this might fail, raise Error unwinding me ???
[in, out] ¬ StreamPairFromUnderStreamPair[registration, tIn, tOut];
};
Push:
INTERNAL
PROC [data: PrivateStreamData, self:
STREAM, sendIfNoData:
BOOL, sendEOM:
BOOL] ~ {
b: REF TEXT ¬ data.outBuf;
IF (b.length # 0)
OR sendIfNoData
OR sendEOM
THEN {
SendHeader[data, self, b.length, sendEOM];
IF b.length > 0 THEN IO.PutBlock[data.underStream, b, 0, b.length];
b.length ¬ 0;
};
};
SendHeader:
INTERNAL
PROC [data: PrivateStreamData, self:
STREAM, nBytes:
CARDINAL, sendEOM:
BOOL] ~ {
outHdr: LayeredHdrObject;
outHdr.length ¬ Basics.HFromCard16[nBytes];
outHdr.sst ¬ [hi~0, lo~data.sst];
outHdr.eom ¬ [hi~0, lo~IF sendEOM THEN 1 ELSE 0];
outHdr.attn ¬ [0, 0];
TRUSTED { IO.UnsafePutBlock[ data.underStream, [base~LOOPHOLE[@outHdr], startIndex~0, count~BYTES[LayeredHdrObject]] ] };
data.hdrBytesProcessed ¬ data.hdrBytesProcessed + BYTES[LayeredHdrObject];
};
ReadHeader:
INTERNAL
PROC [data: PrivateStreamData, self:
STREAM] ~ {
n: INT;
TRUSTED {
n ¬ IO.UnsafeGetBlock[ data.underStream, [base~LOOPHOLE[@data.inHdr], startIndex~0, count~BYTES[LayeredHdrObject]] ];
};
IF n # BYTES[LayeredHdrObject] THEN NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr end of stream"];
data.hdrBytesProcessed ¬ data.hdrBytesProcessed + BYTES[LayeredHdrObject];
Consistency testing ...
SELECT Basics.Card16FromH[data.inHdr.eom]
FROM
0, 1 => NULL;
ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr eom err"];
SELECT Basics.Card16FromH[data.inHdr.attn]
FROM
0, 1 => NULL;
ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr attn err"];
SELECT Basics.Card16FromH[data.inHdr.sst]
FROM
< 256 => NULL;
ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr sst err"];
data.inBytesLeft ¬ Basics.Card16FromH[data.inHdr.length];
};
MyEntryGetChar:
ENTRY
PROC [data: PrivateStreamData, self:
STREAM]
RETURNS [c:
CHAR] ~ {
ENABLE UNWIND => NULL;
DO
IF data.inHdr.sst.lo # data.sst THEN ERROR IO.EndOfStream[self];
IF data.inBytesLeft > 0 THEN EXIT;
IF data.inHdr.eom # [0, 0] THEN ERROR IO.EndOfStream[self];
ReadHeader[data, self]; -- sets data.inBytesLeft
ENDLOOP;
c ¬ IO.GetChar[data.underStream];
data.inBytesLeft ¬ data.inBytesLeft.PRED;
};
MyGetChar:
PROC [self:
STREAM]
RETURNS [c:
CHAR] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
c ¬ MyEntryGetChar[data, self];
};
MyEntryEndOf:
ENTRY
PROC [data: PrivateStreamData, self:
STREAM]
RETURNS [
BOOL] ~ {
ENABLE UNWIND => NULL;
IF data.inHdr.sst.lo # data.sst THEN RETURN[TRUE];
IF data.inBytesLeft > 0 THEN RETURN[FALSE];
IF data.inHdr.eom # [0, 0] THEN RETURN[TRUE];
RETURN[ IO.EndOf[data.underStream] ];
};
MyEndOf:
PROC [self:
STREAM]
RETURNS [
BOOL] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
RETURN[ MyEntryEndOf[data, self] ];
};
MyEntryCharsAvail:
ENTRY
PROC [data: PrivateStreamData, self:
STREAM, wait:
BOOL]
RETURNS [charsAvail:
INT] ~ {
ENABLE UNWIND => NULL;
DO
IF data.inHdr.sst.lo # data.sst THEN RETURN[INT.LAST];
IF (data.inBytesLeft = 0) AND (data.inHdr.eom # [0, 0]) THEN RETURN[INT.LAST];
charsAvail ¬ IO.CharsAvail[data.underStream, wait];
IF IO.EndOf[data.underStream] THEN RETURN[INT.LAST];
IF data.inBytesLeft > 0 THEN RETURN[ MIN[charsAvail, data.inBytesLeft] ];
IF (charsAvail < BYTES[LayeredHdrObject]) AND (NOT wait) THEN RETURN[0];
ReadHeader[data, self];
ENDLOOP;
};
MyCharsAvail:
PROC [self:
STREAM, wait:
BOOL]
RETURNS [charsAvail:
INT] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
RETURN[ MyEntryCharsAvail[data, self, wait] ];
};
MyEntryUnsafeGetBlock:
ENTRY
UNSAFE
PROC [data: PrivateStreamData, self:
STREAM, block:
IO.UnsafeBlock]
RETURNS [nBytesRead:
INT ¬ 0] ~ {
ENABLE UNWIND => NULL;
nGot: INT;
count: INT ¬ block.count;
WHILE count > 0
DO
Test for EndOf ...
IF data.inHdr.sst.lo # data.sst THEN EXIT; -- substreamType change
IF data.inBytesLeft = 0
THEN {
IF data.inHdr.eom # [0, 0] THEN EXIT; -- endOfMessage
IF IO.EndOf[data.underStream] THEN EXIT; -- endOfStream
ReadHeader[data, self];
LOOP;
};
At least data.inBytesLeft bytes of data is available, so read it ...
block.count ¬ MIN[count, data.inBytesLeft];
TRUSTED { nGot ¬ IO.UnsafeGetBlock[data.underStream, block]; };
IF nGot = 0 THEN EXIT; -- race?
data.inBytesLeft ¬ data.inBytesLeft - nGot;
Update my loop counters ...
nBytesRead ¬ nBytesRead + nGot;
block.startIndex ¬ block.startIndex + nGot;
count ¬ count - nGot;
ENDLOOP;
};
MyUnsafeGetBlock:
UNSAFE
PROC [self:
STREAM, block:
IO.UnsafeBlock]
RETURNS [nBytesRead:
INT] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
TRUSTED { nBytesRead ¬ MyEntryUnsafeGetBlock[data, self, block] };
RETURN;
};
MyEntryPutChar:
ENTRY
PROC [data: PrivateStreamData, self:
STREAM, char:
CHAR] ~ {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ data.outBuf;
IF b.length >= maxData THEN Push[data, self, FALSE, FALSE];
b[b.length] ¬ char;
b.length ¬ b.length.SUCC;
};
MyPutChar:
PROC [self:
STREAM, char:
CHAR] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
MyEntryPutChar[data, self, char];
};
MyEntryUnsafePutBlock:
ENTRY PROC [data: PrivateStreamData, self:
STREAM, block:
IO.UnsafeBlock] ~ {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ data.outBuf;
PushB:
PROC ~ {
nToMove: INT ¬ MIN[block.count, (maxData - b.length)];
IF nToMove = 0 THEN RETURN;
TRUSTED {
startIndex: INT ¬ BYTES[TEXT[0]] + b.length;
[] ¬ Basics.ByteBlt[
to~[
blockPointer~LOOPHOLE[b],
startIndex~startIndex,
stopIndexPlusOne~(startIndex + nToMove)],
from~[
blockPointer~block.base,
startIndex~block.startIndex,
stopIndexPlusOne~(block.startIndex + nToMove)]
];
};
b.length ¬ b.length + nToMove;
block.startIndex ¬ block.startIndex + nToMove;
block.count ¬ block.count - nToMove;
};
PushB[];
IF block.count = 0 THEN RETURN;
Push[data, self, FALSE, FALSE];
WHILE block.count > maxData
DO
SendHeader[data, self, maxData, FALSE];
TRUSTED {
IO.UnsafePutBlock[data.underStream, [block.base, block.startIndex, maxData]];
};
block.startIndex ¬ block.startIndex + maxData;
block.count ¬ block.count - maxData;
ENDLOOP;
PushB[];
};
MyUnsafePutBlock:
PROC [self:
STREAM, block:
IO.UnsafeBlock] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
MyEntryUnsafePutBlock[data, self, block];
};
MyEntryFlush:
ENTRY PROC [data: PrivateStreamData, self:
STREAM] ~ {
ENABLE UNWIND => NULL;
Push[data, self, FALSE, FALSE];
};
MyFlush:
PROC [self:
STREAM] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
MyEntryFlush[data, self];
IO.Flush[data.underStream];
};
MyEntryInReset:
ENTRY
PROC [data: PrivateStreamData, self:
STREAM] ~ {
ENABLE UNWIND => NULL;
resetBufBytes: INT ~ 1024;
resetBuf: REF TEXT ¬ RefText.ObtainScratch[resetBufBytes];
nGot: INT;
{
ENABLE
UNWIND => RefText.ReleaseScratch[resetBuf];
DO
IF data.inHdr.sst.lo # data.sst THEN EXIT;
IF data.inBytesLeft = 0
THEN {
IF data.inHdr.eom # [0, 0] THEN EXIT;
IF IO.EndOf[data.underStream] THEN EXIT;
ReadHeader[data, self];
LOOP;
};
nGot ¬ IO.GetBlock[data.underStream, resetBuf, 0, data.inBytesLeft];
data.inBytesLeft ¬ data.inBytesLeft - nGot;
ENDLOOP;
};
RefText.ReleaseScratch[resetBuf];
};
MyInReset:
PROC [self:
STREAM] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
MyEntryInReset[data, self];
};
MyClose:
PROC [self:
STREAM, abort:
BOOL] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
IO.Close[data.underStream, abort];
};
MyEntryGetIndex:
ENTRY PROC [data: PrivateStreamData, self:
STREAM]
RETURNS [
INT] ~ {
ENABLE UNWIND => NULL;
RETURN[ IO.GetIndex[data.underStream] - data.hdrBytesProcessed ];
};
MyGetIndex:
PROC [self:
STREAM]
RETURNS [
INT] ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[self];
RETURN[ MyEntryGetIndex[data, self] ];
};
myInNetworkStreamProcs:
NS.NetworkStreamProcs ¬
NEW[
NS.NetworkStreamProcsRecord ¬ [
setTimeout~MySetTimeout,
sendSoon~MySendSoon,
getIndexDetails~MyInGetIndexDetails,
getStreamState~MyGetStreamState
waitAttention~MyWaitAttention,
waitAck~MyWaitAck,
]
];
myOutNetworkStreamProcs:
NS.NetworkStreamProcs ¬
NEW[
NS.NetworkStreamProcsRecord ¬ [
setTimeout~MySetTimeout,
sendSoon~MySendSoon,
getIndexDetails~MyOutGetIndexDetails,
setSubStreamType~MySetSubStreamType,
sendEndOfMessage~MySendEndOfMessage
sendAttention~MySendAttention,
]
];
MyGetTimeout:
NS.GetTimeoutProc
-- [stream] RETURNS [timeout, signalTimeout] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream];
[timeout, signalTimeout] ¬ NS.GetTimeout[data.underStream];
};
MySetTimeout:
NS.SetTimeoutProc
-- [stream, timeout, signalTimeout] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream];
NS.SetTimeout[data.underStream, timeout, signalTimeout];
};
MyEntrySendSoon:
ENTRY
PROC [data: PrivateStreamData, out:
STREAM] ~ {
ENABLE UNWIND => NULL;
Push[data, out, FALSE, FALSE];
};
MySendSoon:
NS.SendSoonProc
-- [out, when] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[out];
MyEntrySendSoon[data, out];
NS.SendSoon[data.underStream, when];
};
MyEntryGetIndexDetails:
ENTRY
PROC [data: PrivateStreamData, stream:
STREAM, output:
BOOL]
RETURNS [index:
INT, bufferIndex:
INT, ackIndex:
INT] ~ {
ENABLE UNWIND => NULL;
[index, bufferIndex, ackIndex] ¬ NS.GetIndexDetails[data.underStream];
index ¬ index - data.hdrBytesProcessed;
IF output THEN index ¬ index + data.outBuf.length;
The following values are too high by the size of headers buffered in underStream ...
bufferIndex ¬ bufferIndex - data.hdrBytesProcessed;
IF ackIndex # (-1) THEN ackIndex ¬ ackIndex - data.hdrBytesProcessed;
};
MyInGetIndexDetails:
NS.GetIndexDetailsProc
-- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream];
[index, bufferIndex, ackIndex] ¬ MyEntryGetIndexDetails[data, stream, FALSE];
};
MyOutGetIndexDetails:
NS.GetIndexDetailsProc
-- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream];
[index, bufferIndex, ackIndex] ¬ MyEntryGetIndexDetails[data, stream, TRUE];
};
MyEntryGetStreamState:
ENTRY
PROC [data: PrivateStreamData, in:
STREAM, reset:
BOOL]
RETURNS [streamState:
NS.StreamState, subStreamType:
NS.SubStreamType, attentionType:
NS.AttentionType] ~ {
ENABLE UNWIND => NULL;
subStreamType ¬ data.sst;
attentionType ¬ 0;
DO
IF data.inHdr.sst.lo # data.sst
THEN {
IF reset THEN data.sst ¬ data.inHdr.sst.lo;
streamState ¬ subStreamTypeChange;
subStreamType ¬ data.inHdr.sst.lo;
RETURN;
};
IF data.inBytesLeft > 0
THEN {
streamState ¬ open;
RETURN;
};
IF data.inHdr.eom # [0, 0]
THEN {
IF reset THEN data.inHdr.eom ¬ [0, 0];
streamState ¬ endOfMessage;
RETURN;
};
IF
IO.EndOf[data.underStream]
THEN {
streamState ¬ remoteClosed;
RETURN;
};
IF (
IO.CharsAvail[data.underStream,
FALSE] <
BYTES[LayeredHdrObject])
THEN {
streamState ¬ open;
RETURN;
};
ReadHeader[data, in];
ENDLOOP;
};
MyGetStreamState:
NS.GetStreamStateProc
-- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[in];
[streamState, subStreamType, attentionType] ¬ MyEntryGetStreamState[data, in, reset];
};
MyEntrySetSubStreamType:
ENTRY
PROC [data: PrivateStreamData, out:
STREAM, subStreamType:
NS.SubStreamType]
~ {
ENABLE UNWIND => NULL;
IF subStreamType # data.sst
THEN {
Push[data~data, self~out, sendIfNoData~TRUE, sendEOM~FALSE];
data.sst ¬ subStreamType;
NS.SendSoon[data.underStream, 0];
};
};
MySetSubStreamType:
NS.SetSubStreamTypeProc
-- [out, subStreamType] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[out];
IF subStreamType # data.sst
THEN
MyEntrySetSubStreamType[data, out, subStreamType];
};
MyEntrySendEndOfMessage:
ENTRY PROC [data: PrivateStreamData, out:
STREAM] ~ {
ENABLE UNWIND => NULL;
Push[data~data, self~out, sendIfNoData~TRUE, sendEOM~TRUE];
NS.SendSoon[data.underStream, 0];
};
MySendEndOfMessage:
NS.SendEndOfMessageProc
-- [out] -- ~ {
data: PrivateStreamData ¬ PrivateStreamDataFromStream[out];
MyEntrySendEndOfMessage[data, out];
};
Listeners
myListenerProcs:
NS.ListenerProcs ¬
NEW[
NS.ListenerProcsRecord ¬ [
getListenerInfo~NIL,
destroyListener~MyDestroyListener
]];
MyDestroyListener:
NS.DestroyListenerProc
-- [listener] -- ~ {
underListener: NS.Listener ¬ NARROW[listener.data];
NS.DestroyListener[underListener];
};
UnderListenerWorkerClientData: TYPE ~ REF UnderListenerWorkerClientDataObject;
UnderListenerWorkerClientDataObject:
TYPE ~
RECORD [
overListenerHandle: Finalize.Handle
];
MyUnderListenerWorkerProc:
NS.ListenerWorkerProc
-- [listener, in, out] -- ~ {
underListenerWorkerClientData: UnderListenerWorkerClientData;
overListener: NS.Listener;
newIn, newOut: STREAM;
underListenerWorkerClientData ¬ NARROW[listener.listenerWorkerClientData];
overListener ¬ NARROW[ Finalize.HandleToObject[underListenerWorkerClientData.overListenerHandle] ];
WHILE overListener.data = NIL DO Process.PauseMsec[50] ENDLOOP; -- race with CreateListener
IF overListener.listenerWorkerProc = NIL THEN RETURN;
[newIn, newOut] ¬ StreamPairFromUnderStreamPair[overListener.registration, in, out];
overListener.listenerWorkerProc[overListener, newIn, newOut];
};
dummyListenerFQ: Finalize.FinalizationQueue ¬ Finalize.NewFQ[];
MyRegisteredCreateListenerProc:
NS.RegisteredCreateListenerProc
-- [registration, local, transportParameters, listenerWorkerProc, listenerWorkerClientData] RETURNS [listener] -- ~ {
underListener: NS.Listener ¬ NIL;
disguisedListener: UnderListenerWorkerClientData;
listener ¬ NS.CreateUninitializedListener[registration];
disguisedListener ¬ NEW[UnderListenerWorkerClientDataObject ¬ [NIL] ];
disguisedListener.overListenerHandle ¬ Finalize.EnableFinalization[listener, dummyListenerFQ];
[] ¬ Finalize.DisableFinalization[disguisedListener.overListenerHandle];
underListener ¬ NS.CreateListener[registration.protocolFamily, local, transportClassUnderMe, NIL, MyUnderListenerWorkerProc, disguisedListener];
listener.local ¬ NS.GetListenerInfo[underListener].local;
listener.listenerWorkerProc ¬ listenerWorkerProc;
listener.listenerWorkerClientData ¬ listenerWorkerClientData;
listener.data ¬ underListener;
};
}.