-- XCommunicationsTCPPCedarMonitored.mesa
-- Copyright © 1988, 1991 by Xerox Corporation. All rights reserved.
-- Created by Christian Jacobi, July 11, 1988 2:41:31 pm PDT
-- Christian Jacobi, August 29, 1988 3:42:00 pm PDT
DIRECTORY
Basics, XCommunications, UXStrings, IO, Rope;
XCommunicationsTCPPCedarMonitored: CEDAR MONITOR LOCKS d USING d: REF MyDataRec
IMPORTS Basics, IO, XCommunications--, UXStrings
~ BEGIN
STREAM: TYPE = IO.STREAM;
-- From Sun manual:
-- Network Programming
-- Revision A, of 9 May 1988
-- Chapter 8.8, Figure 8.9, page 206
-- Initiating an Internet Domain Stream Connection
-- Cedar-side declaration of the host entry record from the Sun manual section cited below.
-- In this file it is referenced by MyDataRec.hp and by GetHostByRope.
--C-- HostEnt: TYPE = MACHINE DEPENDENT RECORD [  
--see chapter 9.2, page 233
hName: UXStrings.UnixString, --official name of host
hAliases: POINTER TO PACKED ARRAY [0..0) OF UXStrings.UnixString, --alias list
hAddrtype: AFTypes,  --host address type (e.g. AF¬INET)
hLength: INT32,  --length of address in bytes
hAddrList: POINTER TO PACKED ARRAY [0..0) OF POINTER TO Basics.RawBytes --list of addresses, list null terminated; each address is hLength bytes long
];
-- PortNumber wraps one Basics.HWORD; Create builds it from Basics.HFromInt16.
--C-- PortNumber: TYPE = MACHINE DEPENDENT RECORD [val: Basics.HWORD];
-- InAddr is a four byte array; Create assigns it with a four element constructor.
--C-- InAddr: TYPE = PACKED ARRAY [0..4) OF BYTE; --internet address
-- Internet domain socket address record.
-- An instance is stored in MyDataRec.server; Create fills sinFamily, sinAddr and sinPort
-- and passes the record's address and BYTES[SockAddrIn] to Connect.
-- sinZero is never written by the code in this file.
--C-- SockAddrIn: TYPE = MACHINE DEPENDENT RECORD [
--see chapter 8.7, page 203 ...<netinet/in.h>
sinFamily: AFTypes16,
sinPort: PortNumber,
sinAddr: InAddr,
sinZero: PACKED ARRAY [0..8) OF BYTE
];
-- Overlaid variant record with a single arm holding a SockAddrIn.
-- Connect takes a POINTER TO SockAddr; Create supplies one by LOOPHOLE from the address of a SockAddrIn.
--C-- SockAddr: TYPE = MACHINE DEPENDENT RECORD [
SELECT OVERLAID * FROM
in => [in: SockAddrIn],
ENDCASE
];
-- Include procs: each body is a MACHINE CODE string naming one C header file.
-- They are called from the module start code under the comment "get include files".
-- NOTE(review): presumably the translator turns each string into an include directive for the generated C; confirm against the PCedar documentation.
-- Header of the threads package I/O layer (absolute path).
IncludeUIO: PROC = TRUSTED MACHINE CODE {
"""/usr/cedar/threads/xr/threads/UIO.h""."
};
-- NOTE(review): IncludeTypes is declared here but is not called by the start code visible in this file.
IncludeTypes: PROC = TRUSTED MACHINE CODE {
"<sys/types.h>."
};
IncludeSocket: PROC = TRUSTED MACHINE CODE {
"<sys/socket.h>."
};
IncludeIn: PROC = TRUSTED MACHINE CODE {
"<netinet/in.h>."
};
IncludeNetDB: PROC = TRUSTED MACHINE CODE {
"<netdb.h>."
};
IncludeSTDIO: PROC = TRUSTED MACHINE CODE {
"<stdio.h>."
};
-- Integer aliases used in the socket declarations of this file.
-- AFTypes16 is the 16 bit form used for SockAddrIn.sinFamily.
AFTypes16: TYPE = INT16;
-- AFTypes is the 32 bit form used for HostEnt.hAddrtype, the result of AfInet and the domain argument of CreateSocket.
AFTypes: TYPE = INT32;
-- SockTypes is the result type of SockStream and the type argument of CreateSocket.
SockTypes: TYPE = INT32;
-- Yields the address family value that Create passes to CreateSocket and stores in sinFamily.
-- The MACHINE CODE string names the C symbol AF INET, spelled with this file's arrow glyph in place of the underscore.
-- NOTE(review): presumably the leading @ marks the name as a value rather than a function to call; confirm.
AfInet: PROC RETURNS [AFTypes] = TRUSTED MACHINE CODE {
"@AF←INET"
};
-- Yields the socket type value that Create passes to CreateSocket; same string convention as AfInet.
SockStream: PROC RETURNS [SockTypes] = TRUSTED MACHINE CODE {
"@SOCK←STREAM"
};
-- MACHINE CODE binding to the external routine named in the string.
-- protocol defaults to 0. Create treats a negative result as failure and otherwise stores the result as the socket descriptor.
CreateSocket: PROC [domain: AFTypes, type: SockTypes, protocol: INT32 ¬ 0] RETURNS [INT32] = TRUSTED MACHINE CODE {
"XR←USocket"
};
-- Converts name with UXStrings.Create and passes the resulting string to the C routine gethostbyname, returning its result.
-- NOTE(review): this proc uses the older arrow glyph for assignment and needs UXStrings, whose entry in the IMPORTS list is commented out;
-- presumably it is disabled code whose comment marking was lost in this text form. Confirm before relying on it.
GetHostByRope: PROC [name: Rope.ROPE] RETURNS [p: POINTER TO HostEnt] = {
GetHostByName: PROC [UXStrings.UnixString] RETURNS [POINTER TO HostEnt] = TRUSTED MACHINE CODE {
"gethostbyname"
};
p ← GetHostByName[UXStrings.Create[name].string]
};
-- MACHINE CODE binding to the external routine named in the string.
-- bytes is the size of the record that server points to (Create passes BYTES[SockAddrIn]).
-- Create treats a negative errCode as a failed connection.
Connect: PROC [socket: INT32, server: POINTER TO SockAddr, bytes: INT32] RETURNS [errCode: INT] = TRUSTED MACHINE CODE {
"XR←UConnect"
};
-- MACHINE CODE binding to the external routine named in the string; no result is declared, so callers cannot observe failure.
-- Called by MyClose for both descriptors and by Create on its failure path.
Close: PROC [socket: INT] = TRUSTED MACHINE CODE {
"XR←UClose"
};
-- MACHINE CODE binding to the external routine named in the string.
-- InternalFlush treats the result as the number of bytes accepted and raises ERROR when it is zero or negative.
Write: PROC [socket: INT32, buf: POINTER, nbytes: INT32] RETURNS [bytesWritten: INT] = TRUSTED MACHINE CODE {
"XR←UWrite"
};
-- MACHINE CODE binding to the external routine named in the string.
-- Although the result is named errorCode, the callers in this file use it as a byte count:
-- positive means that many bytes were read, zero means end of stream, negative means failure.
Read: PROC [socket: INT32, buf: POINTER, nbytes: INT32] RETURNS [errorCode: INT] = TRUSTED MACHINE CODE {
"XR←URead"
};
-- MACHINE CODE binding to the external routine named in the string.
-- Create uses the result as a second descriptor (socket2) for the reading side of the stream.
XrUDup: PROC [socket: INT32] RETURNS [INT] = TRUSTED MACHINE CODE {
"XR←UDup"
};
-- MACHINE CODE binding to the external routine named in the string; flags defaults to 0.
-- NOTE(review): no call to Recv appears in the visible source; the read path uses Read.
Recv: PROC [socket: INT32, buf: POINTER, nbytes: INT32, flags: INT32 ¬ 0] RETURNS [errorCode: INT] = TRUSTED MACHINE CODE {
"XR←URecv"
};
-- Procedure table for streams of class $MyTCP; Create passes this one table to IO.CreateStream for every stream it makes.
-- Input operations go to MyGetChar, MyEndOf and MyUnsafeGetBlock; output operations to MyPutChar and MyUnsafePutBlock.
-- flush is SendNow (writes out the buffered output) and close is MyClose.
myStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety: inputOutput,
class: $MyTCP,
getChar: MyGetChar,
endOf: MyEndOf,
unsafeGetBlock: MyUnsafeGetBlock,
putChar: MyPutChar,
unsafePutBlock: MyUnsafePutBlock,
flush: SendNow,
close: MyClose
];
-- Capacity of MyDataRec.outBuffer in characters; the put procedures flush when bufCnt reaches this value.
bufferSize: INT = 2000;
-- bufferSize minus one. NOTE(review): not referenced anywhere in the visible source.
bufferSizeM1: INT = 1999;
-- Per stream state, stored as the streamData of each stream made by Create.
-- The record carries its own monitor lock; the ENTRY procedures of this module lock it through their parameter d.
MyDataRec: TYPE = MONITORED RECORD [
socket1: INT, --writer...
socket2: INT, --reader...
-- Set by MyClose; the get and put procedures raise IO.Error[StreamClosed] once it is TRUE.
closed: BOOL ¬ FALSE,
-- Number of characters currently held in outBuffer.
bufCnt: INT ¬ 0,
-- Address record handed to Connect by Create.
server: SockAddrIn,
-- NOTE(review): only assigned in the host lookup lines of Create that use the older arrow glyph; presumably unused at present. Confirm.
hp: POINTER TO HostEnt,
-- Pending output, written to socket1 by InternalFlush.
outBuffer: PACKED ARRAY [0..bufferSize) OF CHAR
];
-- endOf procedure of the stream class: always answers FALSE and never inspects the stream.
-- End of input is instead reported by MyGetChar (IO.EndOfStream) and by a short count from MyUnsafeGetBlock.
MyEndOf: PROC [self: STREAM] RETURNS [BOOL] = {
RETURN [FALSE];
};
MyPutChar: PROC [self: STREAM, char: CHAR] = {
  -- putChar procedure of the stream class.
  -- Raises IO.Error[StreamClosed, self] if the stream was closed; otherwise hands the character
  -- to EntryPutChar, which buffers it under the monitor lock of the stream data.
  data: REF MyDataRec = NARROW[self.streamData];
  IF ~data.closed
    THEN EntryPutChar[data, char]
    ELSE ERROR IO.Error[StreamClosed, self];
};
MyUnsafePutBlock: PROC [self: STREAM, block: Basics.UnsafeBlock] = {
  -- unsafePutBlock procedure of the stream class.
  -- Raises IO.Error[StreamClosed, self] if the stream was closed; otherwise hands the block
  -- to EntryUnsafePutBlock, which copies it into the output buffer under the monitor lock.
  data: REF MyDataRec = NARROW[self.streamData];
  IF ~data.closed
    THEN EntryUnsafePutBlock[data, block]
    ELSE ERROR IO.Error[StreamClosed, self];
};
-- MACHINE CODE binding to the external routine named in the string; copies len bytes from src to dest.
-- NOTE(review): the routine name suggests the two ranges must not overlap; confirm in the runtime documentation.
-- Used by EntryUnsafePutBlock to copy caller data into outBuffer.
MoveBytes: UNSAFE PROC [dest: POINTER, src: POINTER, len: CARDINAL] ~ UNCHECKED MACHINE CODE{
"XR←MoveBytesDisjoint"
};
-- Appends one character to d.outBuffer while holding the monitor lock of d.
-- The buffer is flushed before the store if it is already full, and flushed again right after the store if that filled it.
EntryPutChar: ENTRY PROC [d: REF MyDataRec, char: CHAR] = {
-- Lets an ERROR from InternalFlush unwind through this entry procedure with no extra cleanup.
ENABLE UNWIND => NULL;
IF d.bufCnt>=bufferSize THEN InternalFlush[d];
d.outBuffer[d.bufCnt] ¬ char;
d.bufCnt ¬ d.bufCnt+1;
IF d.bufCnt>=bufferSize THEN InternalFlush[d];
};
-- Copies the bytes described by block into d.outBuffer while holding the monitor lock of d,
-- flushing to the socket each time the buffer fills.
-- block is a value parameter, so the updates to startIndex and count below are local to this call.
EntryUnsafePutBlock: ENTRY PROC [d: REF MyDataRec, block: Basics.UnsafeBlock] = TRUSTED {
ENABLE UNWIND => NULL;
IF d.bufCnt>=bufferSize THEN InternalFlush[d];
DO
-- Copy as much as fits in the free part of the buffer.
cnt: INT ¬ MIN[bufferSize-d.bufCnt, block.count];
MoveBytes[@d.outBuffer+d.bufCnt, block.base+block.startIndex, cnt];
d.bufCnt ¬ d.bufCnt + cnt;
block.startIndex ¬ block.startIndex + cnt;
block.count ¬ block.count - cnt;
-- Bytes left over mean the buffer is full: flush and go round again; otherwise the block is done.
IF block.count>0 THEN InternalFlush[d] ELSE EXIT
ENDLOOP;
-- The last copy may have filled the buffer exactly.
IF d.bufCnt>=bufferSize THEN InternalFlush[d];
};
MyGetChar: PROC [self: STREAM] RETURNS [ch: CHAR] = TRUSTED {
  -- getChar procedure of the stream class: reads exactly one byte from the reader socket.
  -- Raises IO.Error[StreamClosed, self] on a closed stream, IO.EndOfStream[self] when Read
  -- reports zero bytes, and an unnamed ERROR when Read reports a negative result.
  data: REF MyDataRec = NARROW[self.streamData];
  oneWord: PACKED ARRAY [0..BYTES[WORD]) OF CHAR;  -- word sized landing area; only element 0 is used
  got: INT;
  IF data.closed THEN ERROR IO.Error[StreamClosed, self];
  got ¬ Read[data.socket2, @oneWord, 1];
  SELECT got FROM
    = 0 => ERROR IO.EndOfStream[self];
    < 0 => ERROR;
    ENDCASE => ch ¬ oneWord[0];
};
MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: Basics.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] = {
  -- unsafeGetBlock procedure of the stream class.
  -- Keeps calling Read on the reader socket until block.count bytes have arrived or Read reports
  -- zero bytes, in which case the count gathered so far is returned.
  -- Raises IO.Error[StreamClosed, self] on a closed stream and an unnamed ERROR when Read reports a negative result.
  data: REF MyDataRec = NARROW[self.streamData];
  IF data.closed THEN ERROR IO.Error[StreamClosed, self];
  UNTIL block.count<=0 DO
    -- Next byte lands just past what has been read so far; block.count is what is still wanted.
    got: INT ¬ Read[data.socket2, LOOPHOLE[block.base+block.startIndex+nBytesRead], block.count];
    IF got=0 THEN RETURN;
    IF got<0 THEN ERROR;
    nBytesRead ¬ nBytesRead+got;
    block.count ¬ block.count-got;
  ENDLOOP
};
MyClose: PROC [self: STREAM, abort: BOOL] = {
  -- close procedure of the stream class. Does nothing if the stream is already closed.
  -- Unless abort is TRUE the buffered output is sent first; then the stream is marked closed
  -- and both descriptors (writer, then reader) are closed.
  data: REF MyDataRec = NARROW[self.streamData];
  IF ~data.closed THEN {
    IF ~abort THEN SendNow[self];
    data.closed ¬ TRUE;
    Close[data.socket1];
    Close[data.socket2];
  };
};
-- Writes the buffered output of d to socket1; must be called with the monitor lock of d held.
-- Write may accept fewer bytes than offered, so the loop advances addr and repeats until nothing is left.
InternalFlush: INTERNAL PROC [d: REF MyDataRec] = TRUSTED {
addr: POINTER ¬ @d.outBuffer;
-- bufCnt is cleared before writing, so if Write fails and ERROR is raised the buffered bytes are discarded, not retried.
cnt: INT ¬ d.bufCnt; d.bufCnt ¬ 0;
WHILE cnt>0 DO
n: INT ¬ Write[d.socket1, addr, cnt];
IF n<=0 THEN ERROR;
cnt ¬ cnt-n; addr ¬ addr+n;
ENDLOOP
};
-- Monitor entry wrapper: acquires the lock of d and runs InternalFlush.
EntryFlush: ENTRY PROC [d: REF MyDataRec] = {
ENABLE UNWIND => NULL;
InternalFlush[d];
};
-- flush procedure of the stream class: writes out whatever is buffered for this stream.
-- Unlike the put procedures it does not test d.closed first; MyClose calls it before setting that flag.
SendNow: PROC [self: STREAM] = {
d: REF MyDataRec = NARROW[self.streamData];
EntryFlush[d];
};
Create: PROC [base: Rope.ROPE, port: REF ANY] RETURNS [sd: XCommunications.StreamData] = TRUSTED {
  -- Creation procedure registered with XCommunications for protocol $TCP.
  -- Opens a stream socket, connects it, duplicates the descriptor for the reading side and
  -- wraps both in one input/output stream of class $MyTCP.
  --   base: host name. NOTE(review): currently ignored, see the hard coded address below.
  --   port: must be a REF INT holding the port number; any other type fails with "bad port type".
  -- On success sd.in and sd.out are the same stream, sd.success is TRUE and sd.errorFromStream is set.
  -- On failure sd.errorMsg describes the step that failed, the other fields keep their defaults,
  -- and every descriptor opened so far has been closed again.
  portNo: INT;
  d: REF MyDataRec ¬ NEW[MyDataRec];
  WITH port SELECT FROM
    ri: REF INT => portNo ¬ ri­;
    ENDCASE => {sd.errorMsg ¬ "bad port type"; GOTO Oops};
  --create socket
  d.socket1 ¬ CreateSocket[AfInet[], SockStream[]];
  IF d.socket1<0 THEN {sd.errorMsg ¬ "socket not created"; GOTO Oops};
  --bind socket to name
  d.server.sinFamily ¬ AfInet[];
  -- Disabled host lookup, kept for reference. These lines refer to names that are not declared
  -- here (err, hp, BCopy) and use the older assignment glyph, so they cannot be compiled as they stand:
  --   d.hp ← GetHostByRope[base];
  --   IF d.hp=NIL THEN {err ← "socket not bound"; GOTO Oops};
  --   d.server.sinAddr ← LOOPHOLE[d.hp.hAddrList[0], POINTER TO InAddr]^;
  --   BCopy[hp.hAddr^, @server+BYTES[WORD], hp.hLength];
  -- NOTE(review): with the lookup disabled, every connection goes to this fixed address whatever base says.
  d.server.sinAddr ¬ [13, 1, 100, 140]; --clarissa
  d.server.sinPort ¬ [Basics.HFromInt16[portNo]];
  --connect socket with remote socket
  IF Connect[d.socket1, LOOPHOLE[@d.server], BYTES[SockAddrIn]] < 0 THEN {
    Close[d.socket1];  -- do not leak the descriptor when the connection fails
    sd.errorMsg ¬ "not connected"; GOTO Oops
  };
  d.socket2 ¬ XrUDup[d.socket1];
  -- Failure of the duplication is signalled by a negative result; descriptor 0 is a valid result.
  IF d.socket2<0 THEN {
    Close[d.socket1];
    sd.errorMsg ¬ "not connected: duplication of socket failed"; GOTO Oops
  };
  --create stream
  sd.in ¬ sd.out ¬ IO.CreateStream[streamProcs: myStreamProcs, streamData: d];
  sd.success ¬ TRUE;
  sd.errorFromStream ¬ ErrorFromStream;
  EXITS Oops => {}
};
ErrorFromStream: PROC [s: IO.STREAM] RETURNS [reason: Rope.ROPE] = {
  -- Error explanation hook installed in sd.errorFromStream by Create.
  -- Gives the same fixed text for every stream; s is not examined.
  RETURN ["failure"];
};
-- Module start code.
-- First the Include procs are called; IncludeTypes is declared in this file but is not among them.
--get include files
IncludeUIO[]; IncludeSocket[]; IncludeIn[]; IncludeNetDB[]; IncludeSTDIO[];
-- Then Create is registered with XCommunications as the creation procedure for protocol $TCP.
XCommunications.RegisterCommunication[[create: Create, protocol: $TCP]];
END.