SunRPCOnNetworkStreamImpl.mesa
Copyright Ó 1989, 1992 by Xerox Corporation. All rights reserved.
Demers, November 23, 1988 2:45:39 pm PST
Carl Hauser, November 22, 1988 4:01:56 pm PST
Willie-Sue, March 16, 1989 7:03:43 pm PST
Michael Plass, December 30, 1991 11:21 am PST
David Nichols, January 15, 1991 5:44 pm PST
Chauser, January 14, 1992 1:51 pm PST
Russ Atkinson (RRA) July 20, 1993 7:32 pm PDT
Christian Jacobi, July 24, 1992 2:29 pm PDT
Doug Terry, July 30, 1993 10:10 am PDT
DIRECTORY
Basics,
IO,
NetworkStream,
FinalizeOps USING [CallQueue, CreateCallQueue, EnableFinalization, FinalizeProc],
RefText USING [ObtainScratch, ReleaseScratch],
Rope USING [ROPE],
SunRPCAuth USING [Authenticate, AuthenticateResult, CheckReplyVerifier, Conversation, Flavor, maxValueBytes, nullFlavor],
SunRPCNumbers USING [],
SunRPC,
SunRPCOnNetworkStream,
SunRPCType USING [AcceptStat, AuthStat, MsgType, RejectStat, ReplyStat, rpcVersion]
;
SunRPCOnNetworkStreamImpl: CEDAR MONITOR
LOCKS sd USING sd: StreamServer
IMPORTS IO, NetworkStream, Basics, FinalizeOps, RefText, SunRPC, SunRPCAuth
EXPORTS SunRPCOnNetworkStream
~ {
OPEN RPCT: SunRPCType;
Types
ROPE: TYPE ~ Rope.ROPE;
UnsafeBlock: TYPE ~ Basics.UnsafeBlock;
Handle: TYPE ~ SunRPC.Handle;
Object: TYPE ~ SunRPC.Object;
StreamData: TYPE ~ REF StreamDataObject;
StreamDataObject: TYPE ~ RECORD [
in, out: IO.STREAM,
protocolFamily: ATOM ¬ NIL,
remote: Rope.ROPE ¬ NIL,
sendBuf: REF TEXT,
inLim: CARDINAL ¬ 0,
index: CARDINAL ¬ 0,
endOfRec: BOOL
];
Conversation: TYPE ~ SunRPCAuth.Conversation;
AuthFlavor: TYPE ~ SunRPCAuth.Flavor;
nullFlavor: AuthFlavor ~ SunRPCAuth.nullFlavor;
AuthValue: TYPE ~ REF TEXT;
Error: ERROR[code: ATOM] ~ SunRPC.Error;
Parameters
networkStreamFlavor: ATOM ~ $networkStream;
dataBufBytes: CARDINAL ~ 8704;
maxAuthBytes: CARDINAL ~ SunRPCAuth.maxValueBytes;
maxRefTextLength: CARDINAL ¬ 8*1024;
maxUnsafeBlockLength: CARDINAL ¬ CARDINAL.LAST - 3; -- ensures stopIndexPlusOne fields won't overflow in calls to ByteBlt
LOOPHOLEs
TextPtrFromRefText: UNSAFE PROC [block: REF READONLY TEXT] RETURNS [LONG POINTER] ~ TRUSTED INLINE {
RETURN[ LOOPHOLE[block, LONG POINTER] + UNITS[TEXT[0]] ] };
Client Handles
Create: PUBLIC PROC [protocolFamily: ATOM, remote: ROPE] RETURNS [h: Handle]
~ {
d: StreamData;
h ¬ NEW[Object];
h.flavor ¬ networkStreamFlavor;
h.flavorData ¬ d ¬ NEW[StreamDataObject];
h.procs ¬ streamProcs;
d.sendBuf ¬ RefText.ObtainScratch[dataBufBytes];
d.sendBuf.length ¬ BYTES[Basics.FWORD];
d.endOfRec ¬ TRUE; -- no incoming data;
{
ENABLE NetworkStream.Error, IO.Error => Error[$unreachable];
[d.in, d.out] ¬ NetworkStream.CreateStreams[protocolFamily~protocolFamily, remote~remote, transportClass~$basicStream ];
};
d.protocolFamily ¬ protocolFamily;
d.remote ¬ remote;
[] ¬ FinalizeOps.EnableFinalization[h, fQueue];
};
GetRemote: PUBLIC PROC [h: Handle] RETURNS [protocolFamily: ATOM, remote: ROPE]
~ {
d: StreamData ¬ NARROW[h.flavorData];
protocolFamily ¬ d.protocolFamily;
remote ¬ d.remote;
};
Destroy: PUBLIC PROC [h: Handle] ~ {
WITH h.flavorData SELECT FROM
d: StreamData => {
Shutdown[d];
d.in ¬ d.out ¬ NIL;
};
ENDCASE;
};
Shutdown: PROC [d: StreamData] ~ {
IF d.in # NIL THEN { d.in.Close[abort~TRUE] };
IF d.out # NIL THEN { d.out.Close[abort~TRUE] };
};
Class procs
streamProcs: REF SunRPC.ProcsObject ~ NEW[ SunRPC.ProcsObject ¬ [
destroy~Destroy,
sendCallAndReceiveReply~SendCallAndReceiveReply,
receiveAnotherReply~ReceiveAnotherReply,
releaseReply~ReleaseReply,
bytesRemaining~CantGetBytesRemaining,
getByte~GetByte,
getAlign~GetAlign,
getH~GetH,
getF~GetF,
unsafeGetBlock~UnsafeGetBlock,
getBlock~GetBlock,
putByte~PutByte,
putAlign~PutAlign,
putH~PutH,
putF~PutF,
unsafePutBlock~UnsafePutBlock,
putBlock~PutBlock,
prepareForMessage~PrepareForMessage
]];
SendCallAndReceiveReply: PROC [h: Handle, timeoutMsec: CARD¬CARD.LAST, retries: CARD¬0] ~ {
SendAndReceive[h, TRUE];
};
ReceiveAnotherReply: PROC [h: Handle, timeoutMsec: CARD¬CARD.LAST] ~ {
SendAndReceive[h, FALSE];
};
SendAndReceive: PROC [h: Handle, doSend: BOOL] ~ {
errorCode: ATOM ¬ NIL;
replyVerifier: AuthValue;
IF doSend THEN {
SendSeg[h, TRUE];
};
SkipRec[h];
{
Is it a reply message for this call?
{ ENABLE Error => {
SELECT code FROM
$unreachable => errorCode ¬ $unreachable;
ENDCASE => errorCode ¬ $protocolError;
GOTO Done
};
returnedXid, returnedMsgType: CARD32;
returnedXid ¬ SunRPC.GetCard32[h];
returnedMsgType ¬ SunRPC.GetCard32[h];
IF (returnedXid # h.xid) OR (returnedMsgType # ORD[RPCT.MsgType.reply]) THEN { errorCode ¬ $protocolError; GOTO Done } -- out of synch;
};
At this point, committed to accepting the reply message. Parse it, switching on replyStat ...
{ ENABLE Error => { errorCode ¬ $protocolError; GOTO Done };
replyStat, acceptStat, rejectStat, authStat: CARD32;
replyFlavor: AuthFlavor;
SELECT (replyStat ¬ SunRPC.GetCard32[h]) FROM
ORD[RPCT.ReplyStat.msgAccepted] => {
[replyFlavor, replyVerifier] ¬ SunRPC.GetAuth[h];
errorCode ¬ SELECT SunRPCAuth.CheckReplyVerifier[NARROW[h.authData], replyFlavor, replyVerifier] FROM
ok => NIL,
badVerifier => $badReplyVerifier,
wrongVerifier => $wrongReplyVerifier,
ENDCASE => ERROR Error [$bugSendAndReceive];
IF errorCode # NIL THEN GOTO Done;
acceptStat ¬ SunRPC.GetCard32[h];
errorCode ¬ SELECT acceptStat FROM
ORD[RPCT.AcceptStat.success] => NIL, -- winner!
ORD[RPCT.AcceptStat.progUnavail] => $wrongProgram,
ORD[RPCT.AcceptStat.progMismatch] => $wrongProgramVersion,
ORD[RPCT.AcceptStat.procUnavail] => $wrongProc,
ENDCASE => $protocolError;
GOTO Done;
};
ORD[RPCT.ReplyStat.msgDenied] => {
SELECT (rejectStat ¬ SunRPC.GetCard32[h]) FROM
ORD[RPCT.RejectStat.rpcMismatch] => {
errorCode ¬ $wrongRPCVersion;
};
ORD[RPCT.RejectStat.authError] => {
authStat ¬ SunRPC.GetCard32[h];
errorCode ¬ SELECT authStat FROM
ORD[RPCT.AuthStat.authBadcred] => $badCredentials,
ORD[RPCT.AuthStat.authRejectedcred] => $wrongCredentials,
ORD[RPCT.AuthStat.authBadverf] => $badVerifier,
ORD[RPCT.AuthStat.authRejectedverf] => $wrongVerifier,
ORD[RPCT.AuthStat.authTooweak] => $weakCredentials,
ENDCASE => $protocolError;
};
ENDCASE => {
errorCode ¬ $protocolError
};
GOTO Done;
};
ENDCASE => {
errorCode ¬ $protocolError;
GOTO Done;
};
};
EXITS
Done => NULL;
};
IF replyVerifier # NIL
THEN { RefText.ReleaseScratch[replyVerifier]; replyVerifier ¬ NIL };
IF errorCode # NIL THEN { ERROR Error[errorCode] };
};
ReleaseReply: PROC [h: Handle] ~ {
h.authData ¬ NIL; -- help finalization
};
CantGetBytesRemaining: PROC [h: Handle] RETURNS [bytes: CARDINAL] ~ {
Error[$wrongFlavor];
};
Server Registration
Server: TYPE ~ SunRPC.Server;
ServerObject: TYPE ~ SunRPC.ServerObject;
StreamServer: TYPE ~ REF StreamServerObject;
StreamServerObject: TYPE ~ MONITORED RECORD [
listener: NetworkStream.Listener
];
myServerMgtProcs: SunRPC.ServerMgtProcs ~ NEW[SunRPC.ServerMgtProcsObject ¬ [
destroyServer~DestroyServer
]];
CreateServer: PUBLIC PROC [pgm, version: CARD, serverProc: SunRPCOnNetworkStream.ServerProc, protocolFamily: ATOM, local: ROPE ¬ NIL, clientData: REF] RETURNS [s: Server]
~ {
sd: StreamServer ~ NEW[StreamServerObject];
s ¬ NEW[ServerObject];
s.flavor ¬ networkStreamFlavor;
s.flavorData ¬ sd;
s.pgm ¬ pgm;
s.version ¬ version;
s.serverProc ¬ serverProc;
createdServers ¬ createdServers.SUCC;
s.clientData ¬ clientData;
sd.listener ¬ NetworkStream.CreateListener[protocolFamily~protocolFamily, local~local, transportClass~$basicStream, listenerWorkerProc~Serve, listenerWorkerClientData~s];
[] ¬ FinalizeOps.EnableFinalization[s, fQueue];
};
GetServerAddress: PUBLIC PROC [s: Server] RETURNS [protocolFamily: ATOM, local: ROPE ] ~ {
sN: Server ~ IF s.flavor=networkStreamFlavor THEN s ELSE Error[$wrongFlavor];
sd: StreamServer ~ NARROW[s.flavorData];
[protocolFamily~protocolFamily, local~local] ¬ NetworkStream.GetListenerInfo[sd.listener];
};
DestroyServer: PROC [s: Server] ~ {
sd: StreamServer ~ NARROW[s.flavorData];
LockedDestroyServer: ENTRY PROC [sd: StreamServer] ~ {
IF NOT s.dead THEN destroyedServers ¬ destroyedServers.SUCC;
NetworkStream.DestroyListener[sd.listener];
sd.listener ¬ NIL;
s.dead ¬ TRUE;
};
LockedDestroyServer[sd];
};
Server Finalization
Statistics
createdServers: CARD ¬ 0;
droppedServers: CARD ¬ 0;
destroyedServers: CARD ¬ 0;
finishedServers: CARD ¬ 0;
FinalizeServer: PROC [s: Server] = {
finalization is probably useless for these servers
IF NOT s.dead
THEN { -- Can't happen unless the daemons have failed for some reason ...
droppedServers ¬ droppedServers.SUCC;
[] ¬ FinalizeOps.EnableFinalization[s, fQueue];
DestroyServer[s];
}
ELSE { -- Normal end of life
finishedServers ¬ finishedServers.SUCC;
};
};
Servers
Serve: NetworkStream.ListenerWorkerProc ~ {
PROC [listener: Listener, in: STREAM, out: STREAM];
s: Server ¬ NARROW[NetworkStream.GetListenerInfo[listener].clientData];
sd: StreamServer ~ NARROW[s.flavorData];
h: Handle;
d: StreamData;
stillServing: BOOL ¬ TRUE;
nb: it's not clear from the NetworkStream interface whether this is called in a forked process or not; ajd says it is.
nb: this architecture binds the rpc program to the listener and hence to the session existing here. Alternatively, we could allows calls to different programs to be intermixed on a single pair of streams. Ajd says the latter appeals to his biases, but the former is closer to what Sun's rpc on tcp servers do.
Aquire handle for the session ...
h ¬ NEW[Object];
h.flavor ¬ networkStreamFlavor;
h.flavorData ¬ d ¬ NEW[StreamDataObject];
h.procs ¬ streamProcs;
d.sendBuf ¬ RefText.ObtainScratch[dataBufBytes];
d.sendBuf.length ¬ BYTES[Basics.FWORD];
d.in ¬ in;
d.out ¬ out;
WHILE stillServing AND NOT s.dead DO
ENABLE {
IO.Error, IO.EndOfStream => {
stillServing ¬ FALSE;
LOOP;
};
Error => {
SELECT code FROM
$unreachable => stillServing ¬ FALSE;
ENDCASE => SkipRec[h];
LOOP;
};
};
msgType: CARD32;
h.xid ¬ SunRPC.GetCard32[h];
IF s.dead THEN LOOP;
msgType ¬ SunRPC.GetCard32[h];
SELECT TRUE FROM
(msgType # ORD[RPCT.MsgType.call]) => {
Error[$protocolError];
};
ENDCASE => {
CallServerProcAndSendReply[s, h];
};
ENDLOOP;
Shutdown[d];
};
CallServerProcAndSendReply: PROC [s: Server, h: Handle] ~ {
errorCode: ATOM;
credentials, verifier: AuthValue;
sendReply: BOOL ¬ TRUE;
BEGIN
{ ENABLE Error => { sendReply ¬ FALSE; GOTO Out };
rpcvers, prog, vers, proc: CARD32;
cFlavor, vFlavor: AuthFlavor;
authResult: SunRPCAuth.AuthenticateResult;
conversation: Conversation;
Check RPC version (else we can't parse the message)
rpcvers ¬ SunRPC.GetCard32[h];
IF (rpcvers # RPCT.rpcVersion)
THEN { errorCode ¬ $wrongRPCVersion; GOTO Reply };
Get <prog, vers, proc>. There's nothing we can do with them yet, until we've examined the credentials and verifier, but that's the way they defined the protocol.
prog ¬ SunRPC.GetCard32[h];
vers ¬ SunRPC.GetCard32[h];
proc ¬ SunRPC.GetCard32[h];
Get credentials ...
[cFlavor, credentials] ¬ SunRPC.GetAuth[h
! Error => { errorCode ¬ $badCredentials; GOTO Reply }];
Get verifier ...
[vFlavor, verifier] ¬ SunRPC.GetAuth[h
! Error => { errorCode ¬ $badVerifier; GOTO Reply }];
Authenticate ...
[authResult, h.authFlavor, h.authData, conversation] ¬ SunRPCAuth.Authenticate[cFlavor, credentials, vFlavor, verifier];
IF authResult # ok THEN {
errorCode ¬ SELECT authResult FROM
badCredentials => $badCredentials,
wrongCredentials => $wrongCredentials,
badVerifier => $badVerifier,
wrongVerifier => $wrongVerifier,
ENDCASE => ERROR Error [$bugCallServerProcAndSendReply1];
GOTO Reply;
};
Check program, version ...
IF prog # s.pgm
THEN { errorCode ¬ $wrongProgram; GOTO Reply };
IF vers # s.version
THEN { errorCode ¬ $wrongProgramVersion; GOTO Reply };
Call the server proc!
[doReply~sendReply] ¬ s.serverProc[h, conversation, proc, s.clientData
! Error => { errorCode ¬ code; CONTINUE }];
EXITS
Reply => NULL;
};
{ ENABLE Error => ERROR Error [$bugCallServerProcAndSendReply2]; -- bug
SELECT errorCode FROM
NIL => NULL;
$wrongRPCVersion => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.rpcMismatch]];
SunRPC.PutCard32[h, RPCT.rpcVersion];
SunRPC.PutCard32[h, RPCT.rpcVersion];
};
$badCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authBadcred]];
};
$wrongCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authRejectedcred]];
};
$badVerifier => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authBadverf]];
};
$wrongVerifier => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authRejectedverf]];
};
$weakCredentials => {
SunRPC.StartRejectReply[h, ORD[RPCT.RejectStat.authError]];
SunRPC.PutCard32[h, ORD[RPCT.AuthStat.authTooweak]];
};
$wrongProgram => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.progUnavail]];
};
$wrongProgramVersion => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.progMismatch]];
SunRPC.PutCard32[h, s.version];
SunRPC.PutCard32[h, s.version];
};
$wrongProc => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.procUnavail]];
};
$abortWithoutReturn, $unreachable => {
sendReply ¬ FALSE;
};
ENDCASE => {
SunRPC.StartAcceptReply[h, ORD[RPCT.AcceptStat.garbageArgs]];
};
};
EXITS
Out => NULL;
END;
IF sendReply
THEN -- send the reply -- {
SendSeg[h, TRUE];
};
SkipRec[h]; -- consume any remaining bytes of this call
h.authData ¬ NIL; -- help finalization
IF credentials # NIL THEN { RefText.ReleaseScratch[credentials]; credentials ¬ NIL };
IF verifier # NIL THEN { RefText.ReleaseScratch[verifier]; verifier ¬ NIL };
};
Serializing / Deserializing
Refill: PROC [h: Handle] ~ {
When this returns there must be at least 1 byte in the current segment for GetByte to work.
ENABLE IO.Error, IO.EndOfStream => { Shutdown[NARROW[h.flavorData]]; Error[$unreachable]; };
d: StreamData ¬ NARROW[h.flavorData];
segSize: CARD ¬ 0;
IF d.index # d.inLim THEN ERROR Error [$bugRefill]; -- bug
WHILE d.index = d.inLim DO
IF d.endOfRec THEN Error[$outOfData];
segSize ¬ Basics.Card32FromF[d.in.GetFWord[]];
d.endOfRec ¬ segSize >= 80000000H;
segSize ¬ Basics.BITAND[segSize, 7FFFFFFFH];
IF segSize > 64*1024 THEN ERROR Error [$bugRefill]; -- bug
d.inLim ¬ segSize;
d.index ¬ 0;
ENDLOOP;
};
SkipRec: PROC [h: Handle] ~ {
d: StreamData ¬ NARROW[h.flavorData];
WHILE NOT d.endOfRec DO
SkipSeg[h];
Refill[h];
ENDLOOP;
SkipSeg[h];
d.endOfRec ¬ FALSE;
};
maxSkipBuf: CARD ~ 1024;
SkipSeg: PROC [h: Handle] ~ TRUSTED {
d: StreamData ¬ NARROW[h.flavorData];
WHILE d.inLim - d.index # 0 DO
skipSize: INTEGER ¬ MIN[d.inLim - d.index, maxSkipBuf];
skipBuf: ARRAY [0..maxSkipBuf) OF CHAR;
skipped: INT ¬ d.in.UnsafeGetBlock[[base~LOOPHOLE[@skipBuf], startIndex~0, count~skipSize]];
d.index ¬ d.index+skipped;
ENDLOOP;
};
GetByte: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
d: StreamData ¬ NARROW[h.flavorData];
IF (d.index+BYTES[BYTE]) > d.inLim THEN Refill[h];
byte ¬ d.in.GetByte[];
d.index ¬ d.index+BYTES[BYTE];
};
GetH: PROC [h: Handle] RETURNS [hword: Basics.HWORD] ~ {
d: StreamData ¬ NARROW[h.flavorData];
IF (d.index+BYTES[Basics.HWORD]) > d.inLim THEN {
hword.hi ¬ GetByte[h];
hword.lo ¬ GetByte[h];
}
ELSE {
hword ¬ d.in.GetHWord[];
d.index ¬ d.index+BYTES[Basics.HWORD];
};
};
GetF: PROC [h: Handle] RETURNS [fword: Basics.FWORD] ~ {
d: StreamData ¬ NARROW[h.flavorData];
IF (d.index+BYTES[Basics.FWORD]) > d.inLim THEN {
fword.hi.hi ¬ GetByte[h];
fword.hi.lo ¬ GetByte[h];
fword.lo.hi ¬ GetByte[h];
fword.lo.lo ¬ GetByte[h];
}
ELSE {
fword ¬ d.in.GetFWord[];
d.index ¬ d.index+BYTES[Basics.FWORD];
};
};
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: UnsafeBlock]
~ UNCHECKED {
delta: INT ¬ block.startIndex / BYTES[WORD]; -- delta is in WORDs
block.base ¬ block.base + delta*UNITS[WORD]; -- base is in UNITs
block.startIndex ¬ (block.startIndex - delta*BYTES[WORD]); -- index is in BYTEs
IF (CARD[block.count] > maxUnsafeBlockLength) THEN ERROR Error[$outOfData];
UnsafeGetBlockInner[h, block];
};
UnsafeGetBlockInner: UNSAFE PROC [h: Handle, block: UnsafeBlock]
~ UNCHECKED {
d: StreamData ¬ NARROW[h.flavorData];
bytesMoved: INT ¬ 0;
WHILE bytesMoved < block.count DO
IF d.inLim-d.index = 0 THEN Refill[h];
{
try: CARD ¬ MIN[block.count-bytesMoved, d.inLim-d.index];
got: CARD ¬ d.in.UnsafeGetBlock[[base~block.base, startIndex~block.startIndex+bytesMoved, count~try]];
IF got#try THEN Error[$outOfData];
bytesMoved ¬ bytesMoved+got;
d.index ¬ d.index + got;
};
ENDLOOP;
};
GetBlock: PROC [h: Handle, block: REF TEXT, startIndex, count: CARDINAL] ~ {
IF startIndex > block.length THEN ERROR Error [$badStart];
count ¬ MIN[count, block.maxLength-startIndex];
TRUSTED {
UnsafeGetBlockInner[h,
[base~TextPtrFromRefText[block], startIndex~startIndex, count~count]];
};
block.length ¬ startIndex + count;
};
GetAlign: PROC [h: Handle] ~ {
d: StreamData ¬ NARROW[h.flavorData];
WHILE (d.index MOD 4) # 0 DO [] ¬ GetByte[h]; ENDLOOP;
};
PutByte: PROC [h: Handle, byte: BYTE] ~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
index: CARDINAL ¬ MakeRoom[h, BYTES[BYTE]];
dB.length ¬ index + BYTES[BYTE];
dB[index] ¬ VAL[byte];
};
PutH: PROC [h: Handle, hword: Basics.HWORD] ~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
index: CARDINAL ¬ MakeRoom[h, BYTES[Basics.HWORD]];
dB.length ¬ index + BYTES[Basics.HWORD];
dB[index] ¬ VAL[hword.hi];
dB[index+1] ¬ VAL[hword.lo];
};
PutF: PROC [h: Handle, fword: Basics.FWORD] ~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
index: CARDINAL ¬ MakeRoom[h, BYTES[Basics.FWORD]];
dB.length ¬ index + BYTES[Basics.FWORD];
dB[index] ¬ VAL[fword.hi.hi];
dB[index+1] ¬ VAL[fword.hi.lo];
dB[index+2] ¬ VAL[fword.lo.hi];
dB[index+3] ¬ VAL[fword.lo.lo];
};
MakeRoom: PROC [h: Handle, size: CARDINAL] RETURNS [index: CARDINAL] ~ {
d: StreamData ¬ NARROW[h.flavorData];
IF (d.sendBuf.length + size) > d.sendBuf.maxLength THEN {
SendSeg[h, FALSE];
};
index ¬ d.sendBuf.length;
};
SendSeg: PROC [h: Handle, endOfRec: BOOL] ~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
length: Basics.FWORD ¬ Basics.FFromCard32[dB.length - BYTES[Basics.FWORD]];
IF endOfRec THEN length.hi.hi ¬ length.hi.hi+80H;
dB[0] ¬ VAL[length.hi.hi];
dB[1] ¬ VAL[length.hi.lo];
dB[2] ¬ VAL[length.lo.hi];
dB[3] ¬ VAL[length.lo.lo];
{
ENABLE IO.Error => { Shutdown[d]; Error[$unreachable] };
d.out.PutBlock[dB];
dB.length ¬ BYTES[Basics.FWORD];
IF endOfRec THEN NetworkStream.SendSoon[d.out];
};
};
UnsafePutBlock: UNSAFE PROC [h: Handle, block: UnsafeBlock]
~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
bytesMoved: INT ¬ 0;
TRUSTED {
delta: INT ¬ block.startIndex / BYTES[WORD]; -- delta in WORDs
block.base ¬ block.base + delta*UNITS[WORD]; -- base in UNITs
block.startIndex ¬ (block.startIndex - delta*BYTES[WORD]); -- index in BYTEs
};
SELECT CARD[block.count] FROM
0 => RETURN;
> maxUnsafeBlockLength => ERROR Error[$outOfBufferSpace];
ENDCASE;
WHILE bytesMoved < block.count DO TRUSTED
{
thisTime: INT ¬ Basics.ByteBlt[
to~[TextPtrFromRefText[dB], dB.length, dB.maxLength], from~[block.base, block.startIndex+bytesMoved, block.startIndex+block.count]];
bytesMoved ¬ bytesMoved+thisTime;
dB.length ¬ dB.length + thisTime;
IF dB.length = dB.maxLength THEN SendSeg[h, FALSE];
};
ENDLOOP;
};
PutBlock: PROC [h: Handle, block: REF READONLY TEXT, startIndex: CARDINAL ¬ 0, count: CARDINAL] ~ TRUSTED {
IF startIndex > block.length THEN ERROR Error[$badStart];
count ¬ MIN[count, block.length-startIndex];
UnsafePutBlock[h, [base~TextPtrFromRefText[block], startIndex~startIndex, count~count]];
};
PutAlign: PROC [h: Handle, padValue: BYTE] ~ {
d: StreamData ¬ NARROW[h.flavorData];
dB: REF TEXT ~ d.sendBuf;
WHILE (dB.length MOD 4) # 0 DO
PutByte[h, padValue];
ENDLOOP;
};
PrepareForMessage: PROC [h: Handle] ~ {
};
Finalization
fQueue: FinalizeOps.CallQueue ¬ FinalizeOps.CreateCallQueue[Finalizer];
Finalizer: FinalizeOps.FinalizeProc = {
WITH object SELECT FROM
h: Handle => Destroy[h];
s: Server => FinalizeServer[s];
ENDCASE;
};
}...