-- Types
-- Short local names for types defined in the Rope, Basics and SunRPC interfaces.
ROPE: TYPE ~ Rope.ROPE;
UnsafeBlock: TYPE ~ Basics.UnsafeBlock;
-- Create and Serve below allocate a SunRPC.Object with NEW and use the result as a Handle.
Handle: TYPE ~ SunRPC.Handle;
Object: TYPE ~ SunRPC.Object;
StreamData: TYPE ~ REF StreamDataObject;
StreamDataObject: TYPE ~ RECORD [
	-- Per-handle state for the network stream flavor; stored in the handle's flavorData.
	in, out: IO.STREAM,	-- incoming and outgoing byte streams
	protocolFamily: ATOM ¬ NIL,	-- recorded by Create, reported by GetRemote
	remote: ROPE ¬ NIL,	-- recorded by Create, reported by GetRemote
	sendBuf: REF TEXT,	-- outgoing segment; the first BYTES[Basics.FWORD] bytes are reserved for the segment header that SendSeg fills in
	inLim: CARDINAL ¬ 0,	-- byte length of the current incoming segment (set by Refill)
	index: CARDINAL ¬ 0,	-- bytes of the current incoming segment consumed so far
	endOfRec: BOOL	-- TRUE when the current incoming segment carried the last-segment bit
	];
-- Authentication types taken from the SunRPCAuth interface.
Conversation: TYPE ~ SunRPCAuth.Conversation;
AuthFlavor:
TYPE ~ SunRPCAuth.Flavor;
nullFlavor: AuthFlavor ~ SunRPCAuth.nullFlavor;
-- Credentials and verifiers are carried as REF TEXT. The values this file gets from SunRPC.GetAuth are handed back with RefText.ReleaseScratch.
AuthValue: TYPE ~ REF TEXT;
-- SunRPC's error under a local name. Handlers in this file select on its ATOM code.
Error: ERROR[code: ATOM] ~ SunRPC.Error;
-- Parameters
-- Flavor tag written into every handle and server created here; GetServerAddress checks it before trusting flavorData.
networkStreamFlavor: ATOM ~ $networkStream;
-- Size requested for each send buffer. The buffer also holds the BYTES[Basics.FWORD] segment header, so the payload per segment is smaller than this.
dataBufBytes: CARDINAL ~ 8704;
maxAuthBytes: CARDINAL ~ SunRPCAuth.maxValueBytes;
-- NOTE(review): declared as a variable (assignable), and not referenced in the part of the file visible here; confirm whether it is still needed.
maxRefTextLength: CARDINAL ¬ 8*1024;
maxUnsafeBlockLength: CARDINAL ¬ CARDINAL.LAST - 3; -- ensures stopIndexPlusOne fields won't overflow in calls to ByteBlt
-- Client Handles
Create: PUBLIC PROC [protocolFamily: ATOM, remote: ROPE] RETURNS [h: Handle] ~ {
	-- Builds a client handle of the network stream flavor and opens a stream pair to remote.
	-- Raises Error[$unreachable] if NetworkStream.CreateStreams raises NetworkStream.Error or IO.Error.
	data: StreamData ~ NEW[StreamDataObject];
	-- The send buffer starts out holding only the reserved segment header.
	data.sendBuf ¬ RefText.ObtainScratch[dataBufBytes];
	data.sendBuf.length ¬ BYTES[Basics.FWORD];
	data.endOfRec ¬ TRUE; -- no incoming data
	h ¬ NEW[Object];
	h.flavor ¬ networkStreamFlavor;
	h.flavorData ¬ data;
	h.procs ¬ streamProcs;
	[data.in, data.out] ¬ NetworkStream.CreateStreams[protocolFamily~protocolFamily, remote~remote, transportClass~$basicStream
		! NetworkStream.Error, IO.Error => Error[$unreachable]];
	-- Remember where we connected so GetRemote can report it.
	data.protocolFamily ¬ protocolFamily;
	data.remote ¬ remote;
	[] ¬ FinalizeOps.EnableFinalization[h, fQueue];
	};
GetRemote: PUBLIC PROC [h: Handle] RETURNS [protocolFamily: ATOM, remote: ROPE] ~ {
	-- Reports the protocol family and remote address stored in the handle's stream data.
	data: StreamData ~ NARROW[h.flavorData];
	RETURN [data.protocolFamily, data.remote];
	};
Destroy: PUBLIC PROC [h: Handle] ~ {
	-- Closes both streams of a network stream handle and drops the references to them.
	-- A handle whose flavorData is not a StreamData is left untouched.
	WITH h.flavorData SELECT FROM
		data: StreamData => {
			Shutdown[data];
			data.out ¬ NIL;
			data.in ¬ NIL;
			};
		ENDCASE => NULL;
	};
Shutdown: PROC [d: StreamData] ~ {
	-- Aborts whichever of the two streams is still present; the stream fields themselves are not cleared here.
	IF d.in # NIL THEN IO.Close[d.in, abort~TRUE];
	IF d.out # NIL THEN IO.Close[d.out, abort~TRUE];
	};
-- Class procs
streamProcs: REF SunRPC.ProcsObject ~ NEW[SunRPC.ProcsObject ¬ [
	-- Procedure table installed in h.procs for every handle of the network stream flavor.
	-- call and reply control
	destroy~Destroy,
	sendCallAndReceiveReply~SendCallAndReceiveReply,
	receiveAnotherReply~ReceiveAnotherReply,
	releaseReply~ReleaseReply,
	bytesRemaining~CantGetBytesRemaining,	-- always raises Error[$wrongFlavor]
	-- deserializing
	getByte~GetByte,
	getAlign~GetAlign,
	getH~GetH,
	getF~GetF,
	unsafeGetBlock~UnsafeGetBlock,
	getBlock~GetBlock,
	-- serializing
	putByte~PutByte,
	putAlign~PutAlign,
	putH~PutH,
	putF~PutF,
	unsafePutBlock~UnsafePutBlock,
	putBlock~PutBlock,
	prepareForMessage~PrepareForMessage
	]];
SendCallAndReceiveReply: PROC [h: Handle, timeoutMsec: CARD ¬ CARD.LAST, retries: CARD ¬ 0] ~ {
	-- Sends the buffered call and parses the reply header.
	-- timeoutMsec and retries are not used in this body.
	SendAndReceive[h~h, doSend~TRUE];
	};
ReceiveAnotherReply: PROC [h: Handle, timeoutMsec: CARD ¬ CARD.LAST] ~ {
	-- Parses the header of a further reply without sending anything.
	-- timeoutMsec is not used in this body.
	SendAndReceive[h~h, doSend~FALSE];
	};
-- SendAndReceive: when doSend is TRUE, transmits the buffered call as the final segment of a record; then reads and checks the header of the next incoming record as an RPC reply.
-- Returns normally only when errorCode is still NIL at the end (accepted reply, verifier ok, accept status success). Every other outcome is reported by raising Error[errorCode].
-- NOTE(review): the GOTO Done statements below need a matching EXITS clause, but none is visible in this text; confirm against the original source before editing the block structure.
SendAndReceive:
PROC [h: Handle, doSend:
BOOL] ~ {
errorCode: ATOM ¬ NIL;
replyVerifier: AuthValue;
IF doSend
THEN {
SendSeg[h, TRUE];
};
-- Discard whatever is left of the previously received record. SkipRec leaves endOfRec FALSE so that the next read fetches a fresh segment header.
SkipRec[h];
{
Is it a reply message for this call?
{
-- While reading xid and message type, $unreachable is passed through as is; any other Error code becomes $protocolError.
ENABLE Error => {
SELECT code
FROM
$unreachable => errorCode ¬ $unreachable;
ENDCASE => errorCode ¬ $protocolError;
GOTO Done
};
returnedXid, returnedMsgType: CARD32;
returnedXid ¬ SunRPC.GetCard32[h];
returnedMsgType ¬ SunRPC.GetCard32[h];
IF (returnedXid # h.xid) OR (returnedMsgType # ORD[RPCT.MsgType.reply]) THEN { errorCode ¬ $protocolError; GOTO Done } -- out of synch;
};
At this point, committed to accepting the reply message. Parse it, switching on replyStat ...
{
-- From here on every Error raised while parsing, including $unreachable, is reported as $protocolError.
ENABLE Error => { errorCode ¬ $protocolError;
GOTO Done };
replyStat, acceptStat, rejectStat, authStat: CARD32;
replyFlavor: AuthFlavor;
SELECT (replyStat ¬ SunRPC.GetCard32[h])
FROM
ORD[
RPCT.ReplyStat.msgAccepted] => {
-- Accepted reply: check the verifier first, then translate the accept status.
[replyFlavor, replyVerifier] ¬ SunRPC.GetAuth[h];
errorCode ¬
SELECT SunRPCAuth.CheckReplyVerifier[
NARROW[h.authData], replyFlavor, replyVerifier]
FROM
ok => NIL,
badVerifier => $badReplyVerifier,
wrongVerifier => $wrongReplyVerifier,
-- NOTE(review): this ERROR is raised inside the scope of the ENABLE above, so it presumably surfaces to the caller as $protocolError rather than $bugSendAndReceive; confirm.
ENDCASE => ERROR Error [$bugSendAndReceive];
IF errorCode # NIL THEN GOTO Done;
acceptStat ¬ SunRPC.GetCard32[h];
errorCode ¬
SELECT acceptStat
FROM
ORD[RPCT.AcceptStat.success] => NIL, -- winner!
ORD[RPCT.AcceptStat.progUnavail] => $wrongProgram,
ORD[RPCT.AcceptStat.progMismatch] => $wrongProgramVersion,
ORD[RPCT.AcceptStat.procUnavail] => $wrongProc,
ENDCASE => $protocolError;
GOTO Done;
};
ORD[
RPCT.ReplyStat.msgDenied] => {
-- Denied reply: map the reject status (and, for auth errors, the auth status) to an error atom.
SELECT (rejectStat ¬ SunRPC.GetCard32[h])
FROM
ORD[
RPCT.RejectStat.rpcMismatch] => {
errorCode ¬ $wrongRPCVersion;
};
ORD[
RPCT.RejectStat.authError] => {
authStat ¬ SunRPC.GetCard32[h];
errorCode ¬
SELECT authStat
FROM
ORD[RPCT.AuthStat.authBadcred] => $badCredentials,
ORD[RPCT.AuthStat.authRejectedcred] => $wrongCredentials,
ORD[RPCT.AuthStat.authBadverf] => $badVerifier,
ORD[RPCT.AuthStat.authRejectedverf] => $wrongVerifier,
ORD[RPCT.AuthStat.authTooweak] => $weakCredentials,
ENDCASE => $protocolError;
};
ENDCASE => {
errorCode ¬ $protocolError
};
GOTO Done;
};
ENDCASE => {
-- Neither accepted nor denied: the reply status is not one we understand.
errorCode ¬ $protocolError;
GOTO Done;
};
};
};
-- Common exit: hand the reply verifier back to the scratch pool, then report any failure.
IF replyVerifier #
NIL
THEN { RefText.ReleaseScratch[replyVerifier]; replyVerifier ¬ NIL };
IF errorCode # NIL THEN { ERROR Error[errorCode] };
};
ReleaseReply: PROC [h: Handle] ~ {
	-- Drops the handle's reference to its authentication data once the reply has been consumed.
	h.authData ¬ NIL; -- help finalization
	};
CantGetBytesRemaining: PROC [h: Handle] RETURNS [bytes: CARDINAL] ~ {
	-- Installed as the bytesRemaining entry of streamProcs. It never returns a count; it always raises Error[$wrongFlavor].
	Error[$wrongFlavor];
	};
-- Server Registration
-- Server and ServerObject are SunRPC's types; CreateServer allocates a ServerObject with NEW and returns it as a Server.
Server: TYPE ~ SunRPC.Server;
ServerObject: TYPE ~ SunRPC.ServerObject;
-- Flavor-specific server state, kept in the server's flavorData.
StreamServer: TYPE ~ REF StreamServerObject;
StreamServerObject: TYPE ~ MONITORED RECORD [
	-- Set by CreateServer; DestroyServer destroys it and resets it to NIL.
	listener: NetworkStream.Listener
	];
-- Server management procedure table for this flavor; its only entry here is destroyServer.
myServerMgtProcs: SunRPC.ServerMgtProcs ~ NEW[SunRPC.ServerMgtProcsObject ¬ [
	destroyServer~DestroyServer
	]];
CreateServer: PUBLIC PROC [pgm, version: CARD, serverProc: SunRPCOnNetworkStream.ServerProc, protocolFamily: ATOM, local: ROPE ¬ NIL, clientData: REF] RETURNS [s: Server] ~ {
	-- Registers an RPC program: fills in a server object, then creates a listener whose worker proc is Serve.
	-- The listener is created last so the server object it hands to Serve is already complete.
	listenerData: StreamServer ~ NEW[StreamServerObject];
	s ¬ NEW[ServerObject];
	s.flavor ¬ networkStreamFlavor;
	s.flavorData ¬ listenerData;
	s.pgm ¬ pgm;
	s.version ¬ version;
	s.serverProc ¬ serverProc;
	s.clientData ¬ clientData;
	createdServers ¬ createdServers.SUCC;
	listenerData.listener ¬ NetworkStream.CreateListener[
		protocolFamily~protocolFamily,
		local~local,
		transportClass~$basicStream,
		listenerWorkerProc~Serve,
		listenerWorkerClientData~s];
	[] ¬ FinalizeOps.EnableFinalization[s, fQueue];
	};
GetServerAddress: PUBLIC PROC [s: Server] RETURNS [protocolFamily: ATOM, local: ROPE] ~ {
	-- Reports the protocol family and local address of the server's listener.
	-- Raises Error[$wrongFlavor] if s was not created with the network stream flavor.
	listenerData: StreamServer;
	IF s.flavor # networkStreamFlavor THEN Error[$wrongFlavor];
	listenerData ¬ NARROW[s.flavorData];
	[protocolFamily~protocolFamily, local~local] ¬ NetworkStream.GetListenerInfo[listenerData.listener];
	};
DestroyServer: PROC [s: Server] ~ {
	-- Marks the server dead and destroys its listener, under the monitor of the server's StreamServerObject.
	-- Safe to call more than once: the listener is destroyed only while it is still present, and destroyedServers is counted only on the first call.
	sd: StreamServer ~ NARROW[s.flavorData];
	LockedDestroyServer: ENTRY PROC [sd: StreamServer] ~ {
		IF NOT s.dead THEN destroyedServers ¬ destroyedServers.SUCC;
		-- A previous call has already reset the listener to NIL; never hand NIL to DestroyListener.
		IF sd.listener # NIL THEN {
			NetworkStream.DestroyListener[sd.listener];
			sd.listener ¬ NIL;
			};
		s.dead ¬ TRUE;
		};
	LockedDestroyServer[sd];
	};
-- Server Finalization
-- Statistics
-- Incremented once per CreateServer call.
createdServers: CARD ¬ 0;
-- Incremented by FinalizeServer when a server reaches finalization without having been destroyed.
droppedServers: CARD ¬ 0;
-- Incremented by DestroyServer the first time a given server is destroyed.
destroyedServers: CARD ¬ 0;
-- Incremented by FinalizeServer when the finalized server was already dead.
finishedServers: CARD ¬ 0;
FinalizeServer: PROC [s: Server] ~ {
	-- finalization is probably useless for these servers
	IF s.dead
		THEN {
			-- Normal end of life
			finishedServers ¬ finishedServers.SUCC;
			}
		ELSE {
			-- Can't happen unless the daemons have failed for some reason ...
			-- Re-arm finalization first, then destroy; the next finalization takes the branch above.
			droppedServers ¬ droppedServers.SUCC;
			[] ¬ FinalizeOps.EnableFinalization[s, fQueue];
			DestroyServer[s];
			};
	};
-- Servers
-- Serve: listener worker proc (installed by CreateServer). Wraps the in/out streams of one session in a fresh Handle, then reads call records and passes each to CallServerProcAndSendReply until a stream error occurs or the server is marked dead. Closes both streams before returning.
Serve: NetworkStream.ListenerWorkerProc ~ {
PROC [listener: Listener, in: STREAM, out: STREAM];
-- The Server was registered as the listener's client data by CreateServer.
s: Server ¬ NARROW[NetworkStream.GetListenerInfo[listener].clientData];
sd: StreamServer ~ NARROW[s.flavorData];
h: Handle;
d: StreamData;
stillServing: BOOL ¬ TRUE;
nb: it's not clear from the NetworkStream interface whether this is called in a forked process or not; ajd says it is.
nb: this architecture binds the rpc program to the listener and hence to the session existing here. Alternatively, we could allows calls to different programs to be intermixed on a single pair of streams. Ajd says the latter appeals to his biases, but the former is closer to what Sun's rpc on tcp servers do.
Aquire handle for the session ...
h ¬ NEW[Object];
h.flavor ¬ networkStreamFlavor;
h.flavorData ¬ d ¬ NEW[StreamDataObject];
h.procs ¬ streamProcs;
-- Same send buffer setup as Create: the buffer initially holds only the reserved segment header.
-- NOTE(review): unlike Create, endOfRec is not set here; presumably the default (FALSE) is intended so that the first read fetches a segment header. Confirm.
-- NOTE(review): this proc never hands the scratch buffer back with RefText.ReleaseScratch; confirm whether that is intentional.
d.sendBuf ¬ RefText.ObtainScratch[dataBufBytes];
d.sendBuf.length ¬ BYTES[Basics.FWORD];
d.in ¬ in;
d.out ¬ out;
WHILE stillServing
AND
NOT s.dead
DO
ENABLE {
-- A stream failure ends the session.
IO.Error,
IO.EndOfStream => {
stillServing ¬ FALSE;
LOOP;
};
-- $unreachable also ends the session; for any other Error the rest of the current record is skipped and the loop goes on to the next call.
Error => {
SELECT code
FROM
$unreachable => stillServing ¬ FALSE;
ENDCASE => SkipRec[h];
LOOP;
};
};
msgType: CARD32;
h.xid ¬ SunRPC.GetCard32[h];
-- The server may have been destroyed while the read above was in progress.
IF s.dead THEN LOOP;
msgType ¬ SunRPC.GetCard32[h];
SELECT
TRUE
FROM
(msgType #
ORD[
RPCT.MsgType.call]) => {
-- Not a call message; the Error handler above skips the record.
Error[$protocolError];
};
ENDCASE => {
CallServerProcAndSendReply[s, h];
};
ENDLOOP;
Shutdown[d];
};
-- CallServerProcAndSendReply: parses the rest of a call header (xid and message type were already read by Serve), authenticates it, checks program and version, calls the registered server proc, and sends a reply unless that has been suppressed.
-- errorCode carries the outcome of the first phase into the second phase, which emits the matching reject or accept header.
-- NOTE(review): GOTO Reply and GOTO Out need matching EXITS clauses, but none is visible in this text; confirm against the original source before editing the block structure.
CallServerProcAndSendReply:
PROC [s: Server, h: Handle] ~ {
errorCode: ATOM;
credentials, verifier: AuthValue;
sendReply: BOOL ¬ TRUE;
BEGIN
{
-- An Error not caught more locally during this phase suppresses the reply altogether.
ENABLE Error => { sendReply ¬
FALSE;
GOTO Out };
rpcvers, prog, vers, proc: CARD32;
cFlavor, vFlavor: AuthFlavor;
authResult: SunRPCAuth.AuthenticateResult;
conversation: Conversation;
Check RPC version (else we can't parse the message)
rpcvers ¬ SunRPC.GetCard32[h];
IF (rpcvers #
RPCT.rpcVersion)
THEN { errorCode ¬ $wrongRPCVersion; GOTO Reply };
Get <prog, vers, proc>. There's nothing we can do with them yet, until we've examined the credentials and verifier, but that's the way they defined the protocol.
prog ¬ SunRPC.GetCard32[h];
vers ¬ SunRPC.GetCard32[h];
proc ¬ SunRPC.GetCard32[h];
Get credentials ...
[cFlavor, credentials] ¬ SunRPC.GetAuth[h
! Error => { errorCode ¬ $badCredentials; GOTO Reply }];
Get verifier ...
[vFlavor, verifier] ¬ SunRPC.GetAuth[h
! Error => { errorCode ¬ $badVerifier; GOTO Reply }];
Authenticate ...
[authResult, h.authFlavor, h.authData, conversation] ¬ SunRPCAuth.Authenticate[cFlavor, credentials, vFlavor, verifier];
IF authResult # ok
THEN {
errorCode ¬
SELECT authResult
FROM
badCredentials => $badCredentials,
wrongCredentials => $wrongCredentials,
badVerifier => $badVerifier,
wrongVerifier => $wrongVerifier,
ENDCASE => ERROR Error [$bugCallServerProcAndSendReply1];
GOTO Reply;
};
Check program, version ...
IF prog # s.pgm
THEN { errorCode ¬ $wrongProgram; GOTO Reply };
IF vers # s.version
THEN { errorCode ¬ $wrongProgramVersion; GOTO Reply };
Call the server proc!
-- The server proc decides whether a reply goes out (doReply). An Error it raises is recorded in errorCode and turned into a reply header below.
[doReply~sendReply] ¬ s.serverProc[h, conversation, proc, s.clientData
! Error => { errorCode ¬ code; CONTINUE }];
};
{
-- Second phase: write the reply header that matches errorCode. NIL means the server proc completed without raising Error, so nothing is added here.
ENABLE Error =>
ERROR Error [$bugCallServerProcAndSendReply2];
-- bug
SELECT errorCode
FROM
NIL => NULL;
$wrongRPCVersion => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.rpcMismatch]];
-- Two values follow the mismatch status; both are RPCT.rpcVersion.
SunRPC.PutCard32[h, RPCT.rpcVersion];
SunRPC.PutCard32[h, RPCT.rpcVersion];
};
$badCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authBadcred]];
};
$wrongCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authRejectedcred]];
};
$badVerifier => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authBadverf]];
};
$wrongVerifier => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authRejectedverf]];
};
$weakCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authTooweak]];
};
$wrongProgram => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.progUnavail]];
};
$wrongProgramVersion => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.progMismatch]];
-- Two values follow the mismatch status; both are s.version.
SunRPC.PutCard32[h, s.version];
SunRPC.PutCard32[h, s.version];
};
$wrongProc => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.procUnavail]];
};
$abortWithoutReturn, $unreachable => {
sendReply ¬ FALSE;
};
ENDCASE => {
-- Every other error code is reported to the caller as garbageArgs.
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.garbageArgs]];
};
};
END;
IF sendReply
THEN
-- send the reply -- {
SendSeg[h, TRUE];
};
SkipRec[h]; -- consume any remaining bytes of this call
h.authData ¬ NIL; -- help finalization
-- Hand the credential and verifier buffers back to the scratch pool.
IF credentials # NIL THEN { RefText.ReleaseScratch[credentials]; credentials ¬ NIL };
IF verifier # NIL THEN { RefText.ReleaseScratch[verifier]; verifier ¬ NIL };
};
-- Serializing / Deserializing
Refill: PROC [h: Handle] ~ {
	-- When this returns there must be at least 1 byte in the current segment for GetByte to work.
	-- Must only be called when the current segment is exhausted. Reads segment headers until a non-empty segment is found.
	-- Raises Error[$outOfData] if the record has already ended, and Error[$unreachable] (after shutting the streams down) on IO.Error or IO.EndOfStream.
	ENABLE IO.Error, IO.EndOfStream => { Shutdown[NARROW[h.flavorData]]; Error[$unreachable]; };
	data: StreamData ~ NARROW[h.flavorData];
	IF data.index # data.inLim THEN ERROR Error [$bugRefill]; -- bug
	UNTIL data.index # data.inLim DO
		word: CARD;
		IF data.endOfRec THEN Error[$outOfData];
		word ¬ Basics.Card32FromF[data.in.GetFWord[]];
		-- The top bit of the header marks the last segment of a record; the remaining bits are the segment length.
		data.endOfRec ¬ (word >= 80000000H);
		word ¬ Basics.BITAND[word, 7FFFFFFFH];
		IF word > 64*1024 THEN ERROR Error [$bugRefill]; -- bug
		data.inLim ¬ word;
		data.index ¬ 0;
		ENDLOOP;
	};
SkipRec: PROC [h: Handle] ~ {
	-- Discards the unread remainder of the current incoming record, segment by segment, and then clears endOfRec so the next read starts a new record.
	data: StreamData ~ NARROW[h.flavorData];
	UNTIL data.endOfRec DO
		SkipSeg[h];
		Refill[h];
		ENDLOOP;
	SkipSeg[h];	-- the last segment of the record
	data.endOfRec ¬ FALSE;
	};
-- Size of the throwaway buffer SkipSeg reads into.
maxSkipBuf: CARD ~ 1024;
SkipSeg: PROC [h: Handle] ~ TRUSTED {
	-- Reads and discards the unread bytes of the current incoming segment, at most maxSkipBuf at a time.
	-- If the stream delivers no bytes at all (for example because it has ended) the segment can never be completed. In that case the streams are shut down and Error[$unreachable] is raised, as Refill does, instead of looping forever.
	d: StreamData ~ NARROW[h.flavorData];
	skipBuf: ARRAY [0..maxSkipBuf) OF CHAR;
	WHILE d.inLim - d.index # 0 DO
		skipSize: INTEGER ¬ MIN[d.inLim - d.index, maxSkipBuf];
		skipped: INT ¬ d.in.UnsafeGetBlock[[base~LOOPHOLE[@skipBuf], startIndex~0, count~skipSize]];
		-- No progress means no further data will arrive for this segment.
		IF skipped <= 0 THEN { Shutdown[d]; Error[$unreachable] };
		d.index ¬ d.index+skipped;
		ENDLOOP;
	};
GetByte: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
	-- Reads one byte of the incoming record, fetching the next segment first when the current one is used up.
	data: StreamData ~ NARROW[h.flavorData];
	IF data.inLim < data.index + BYTES[BYTE] THEN Refill[h];
	byte ¬ data.in.GetByte[];
	data.index ¬ data.index + BYTES[BYTE];
	};
GetH: PROC [h: Handle] RETURNS [hword: Basics.HWORD] ~ {
	-- Reads a halfword. When it lies wholly inside the current segment it is read in one stream call; otherwise it is assembled from single bytes (high byte first), since GetByte can cross into the next segment.
	data: StreamData ~ NARROW[h.flavorData];
	IF data.index + BYTES[Basics.HWORD] <= data.inLim
		THEN {
			hword ¬ data.in.GetHWord[];
			data.index ¬ data.index + BYTES[Basics.HWORD];
			}
		ELSE {
			hword.hi ¬ GetByte[h];
			hword.lo ¬ GetByte[h];
			};
	};
GetF: PROC [h: Handle] RETURNS [fword: Basics.FWORD] ~ {
	-- Reads a fullword. When it lies wholly inside the current segment it is read in one stream call; otherwise it is assembled from single bytes (most significant first), since GetByte can cross into the next segment.
	data: StreamData ~ NARROW[h.flavorData];
	IF data.index + BYTES[Basics.FWORD] <= data.inLim
		THEN {
			fword ¬ data.in.GetFWord[];
			data.index ¬ data.index + BYTES[Basics.FWORD];
			}
		ELSE {
			fword.hi.hi ¬ GetByte[h];
			fword.hi.lo ¬ GetByte[h];
			fword.lo.hi ¬ GetByte[h];
			fword.lo.lo ¬ GetByte[h];
			};
	};
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: UnsafeBlock] ~ UNCHECKED {
	-- Fills block from the incoming record. The block descriptor is first normalized so that startIndex is smaller than one word, by moving whole words of offset into base.
	-- Raises Error[$outOfData] when block.count exceeds maxUnsafeBlockLength.
	wordOffset: INT ~ block.startIndex / BYTES[WORD];	-- in WORDs
	IF CARD[block.count] > maxUnsafeBlockLength THEN ERROR Error[$outOfData];
	block.base ¬ block.base + wordOffset*UNITS[WORD];	-- base is in UNITs
	block.startIndex ¬ block.startIndex - wordOffset*BYTES[WORD];	-- index is in BYTEs
	UnsafeGetBlockInner[h, block];
	};
UnsafeGetBlockInner: UNSAFE PROC [h: Handle, block: UnsafeBlock] ~ UNCHECKED {
	-- Copies block.count bytes of the incoming record into block, one segment's worth at a time.
	-- Raises Error[$outOfData] when the stream delivers fewer bytes than were asked for.
	data: StreamData ~ NARROW[h.flavorData];
	done: INT ¬ 0;
	UNTIL done >= block.count DO
		IF data.inLim - data.index = 0 THEN Refill[h];
		{
			-- Never ask for more than is left in the current segment.
			want: CARD ~ MIN[block.count-done, data.inLim-data.index];
			have: CARD ~ data.in.UnsafeGetBlock[[base~block.base, startIndex~block.startIndex+done, count~want]];
			IF have # want THEN Error[$outOfData];
			done ¬ done + have;
			data.index ¬ data.index + have;
			};
		ENDLOOP;
	};
GetBlock: PROC [h: Handle, block: REF TEXT, startIndex, count: CARDINAL] ~ {
	-- Reads up to count bytes of the incoming record into block, starting at startIndex, and sets block.length to the end of the data read.
	-- The request is clipped to the room left in block. Raises Error[$badStart] when startIndex lies beyond block.length.
	wanted: CARDINAL;
	IF startIndex > block.length THEN ERROR Error [$badStart];
	wanted ¬ MIN[count, block.maxLength-startIndex];
	TRUSTED {
		UnsafeGetBlockInner[h, [base~TextPtrFromRefText[block], startIndex~startIndex, count~wanted]];
		};
	block.length ¬ startIndex + wanted;
	};
GetAlign: PROC [h: Handle] ~ {
	-- Discards incoming bytes until the position within the current segment is a multiple of 4.
	data: StreamData ~ NARROW[h.flavorData];
	UNTIL (data.index MOD 4) = 0 DO
		[] ¬ GetByte[h];
		ENDLOOP;
	};
PutByte: PROC [h: Handle, byte: BYTE] ~ {
	-- Appends one byte to the send buffer; MakeRoom flushes the buffer first when it is full.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	at: CARDINAL ~ MakeRoom[h, BYTES[BYTE]];
	buf.length ¬ at + BYTES[BYTE];
	buf[at] ¬ VAL[byte];
	};
PutH: PROC [h: Handle, hword: Basics.HWORD] ~ {
	-- Appends a halfword to the send buffer, high byte first; MakeRoom flushes the buffer first when there is not enough room.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	at: CARDINAL ~ MakeRoom[h, BYTES[Basics.HWORD]];
	buf.length ¬ at + BYTES[Basics.HWORD];
	buf[at] ¬ VAL[hword.hi];
	buf[at+1] ¬ VAL[hword.lo];
	};
PutF: PROC [h: Handle, fword: Basics.FWORD] ~ {
	-- Appends a fullword to the send buffer, most significant byte first; MakeRoom flushes the buffer first when there is not enough room.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	at: CARDINAL ~ MakeRoom[h, BYTES[Basics.FWORD]];
	buf.length ¬ at + BYTES[Basics.FWORD];
	buf[at] ¬ VAL[fword.hi.hi];
	buf[at+1] ¬ VAL[fword.hi.lo];
	buf[at+2] ¬ VAL[fword.lo.hi];
	buf[at+3] ¬ VAL[fword.lo.lo];
	};
MakeRoom: PROC [h: Handle, size: CARDINAL] RETURNS [index: CARDINAL] ~ {
	-- Guarantees size free bytes in the send buffer, sending the current contents as a non-final segment if need be.
	-- Returns the buffer position at which the caller should store its bytes.
	data: StreamData ~ NARROW[h.flavorData];
	IF data.sendBuf.maxLength < data.sendBuf.length + size THEN SendSeg[h, FALSE];
	RETURN [data.sendBuf.length];
	};
SendSeg: PROC [h: Handle, endOfRec: BOOL] ~ {
	-- Writes the send buffer to the output stream as one segment and resets the buffer to hold just the reserved header.
	-- The header is the payload length; for the final segment of a record (endOfRec) the top bit is set as well.
	-- On IO.Error the streams are shut down and Error[$unreachable] is raised.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	header: Basics.FWORD ¬ Basics.FFromCard32[buf.length - BYTES[Basics.FWORD]];
	IF endOfRec THEN header.hi.hi ¬ header.hi.hi+80H;
	buf[0] ¬ VAL[header.hi.hi];
	buf[1] ¬ VAL[header.hi.lo];
	buf[2] ¬ VAL[header.lo.hi];
	buf[3] ¬ VAL[header.lo.lo];
	{
		ENABLE IO.Error => { Shutdown[data]; Error[$unreachable] };
		data.out.PutBlock[buf];
		buf.length ¬ BYTES[Basics.FWORD];
		-- A complete record should go out promptly rather than wait in the stream.
		IF endOfRec THEN NetworkStream.SendSoon[data.out];
		};
	};
UnsafePutBlock: UNSAFE PROC [h: Handle, block: UnsafeBlock] ~ {
	-- Appends block.count bytes from block to the outgoing record, sending a non-final segment each time the send buffer fills up.
	-- Raises Error[$outOfBufferSpace] when block.count exceeds maxUnsafeBlockLength.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	sent: INT ¬ 0;
	TRUSTED {
		-- Normalize the descriptor so that startIndex is smaller than one word.
		wordOffset: INT ~ block.startIndex / BYTES[WORD];	-- in WORDs
		block.base ¬ block.base + wordOffset*UNITS[WORD];	-- base in UNITs
		block.startIndex ¬ block.startIndex - wordOffset*BYTES[WORD];	-- index in BYTEs
		};
	IF CARD[block.count] = 0 THEN RETURN;
	IF CARD[block.count] > maxUnsafeBlockLength THEN ERROR Error[$outOfBufferSpace];
	UNTIL sent >= block.count DO
		TRUSTED {
			-- ByteBlt moves as much as fits between the buffer's current length and its maxLength.
			moved: INT ~ Basics.ByteBlt[
				to~[TextPtrFromRefText[buf], buf.length, buf.maxLength],
				from~[block.base, block.startIndex+sent, block.startIndex+block.count]];
			sent ¬ sent + moved;
			buf.length ¬ buf.length + moved;
			IF buf.length = buf.maxLength THEN SendSeg[h, FALSE];
			};
		ENDLOOP;
	};
PutBlock: PROC [h: Handle, block: REF READONLY TEXT, startIndex: CARDINAL ¬ 0, count: CARDINAL] ~ TRUSTED {
	-- Appends up to count characters of block, starting at startIndex, to the outgoing record.
	-- The request is clipped to the text actually present. Raises Error[$badStart] when startIndex lies beyond block.length.
	IF startIndex > block.length THEN ERROR Error[$badStart];
	UnsafePutBlock[h, [
		base~TextPtrFromRefText[block],
		startIndex~startIndex,
		count~MIN[count, block.length-startIndex]]];
	};
PutAlign: PROC [h: Handle, padValue: BYTE] ~ {
	-- Pads the outgoing data with padValue until the send buffer length is a multiple of 4.
	data: StreamData ~ NARROW[h.flavorData];
	buf: REF TEXT ~ data.sendBuf;
	UNTIL (buf.length MOD 4) = 0 DO
		PutByte[h, padValue];
		ENDLOOP;
	};
PrepareForMessage: PROC [h: Handle] ~ {
	-- Nothing to do for this flavor; present only to fill the prepareForMessage entry of streamProcs.
	};
}...