PupChatting:
CEDAR
MONITOR
LOCKS pc USING pc: PupConversation
IMPORTS IO, Process, Protocols, PupDefs, PupStream
= {OPEN Protocols;
-- Protocol descriptor for Pup Chat. It carries the name "PupChat" and binds the
-- Connect and Disconnect entries to the procedures of the same names in this module.
-- The module's start code hands this descriptor to Protocols.RegProtocol.
pupChat: Protocol ←
NEW [ProtocolRep ← [
name: "PupChat",
Connect: Connect,
Disconnect: Disconnect
]];
-- Per-connection state. Connect allocates one of these and stores it as the data
-- field of the Conversation it returns. The record is MONITORED; the ENTRY procedures
-- in this module (SetLooping, LastOneOut, AbortEm) lock it through their pc parameter.
PupConversation: TYPE = REF PupConversationRep;
PupConversationRep:
TYPE =
MONITORED
RECORD [
-- Server name as given to Connect; used when looking up the address and in messages.
serverName: ROPE,
-- fromClient: UserToServer reads characters here and forwards them to serverStream.
-- toClient: ServerToUser writes received characters here; status messages go here too.
-- serverStream: byte stream to the server, created by Connect; Exit closes it and sets it to NIL.
fromClient, toClient, serverStream: IO.STREAM ← NIL,
-- Characters put to serverStream since UserToServer last flushed it.
charsSentSinceFlush: NAT ← 0,
-- Process handles of the two forked copy processes, assigned by Connect.
procs: ARRAY Who OF PROCESS,
-- procing[w] stays TRUE until process w has gone through Exit.
procing: ARRAY Who OF BOOL ← ALL[TRUE],
-- looping[w] is TRUE while process w is inside its copy loop; AbortEm aborts only looping processes.
looping: ARRAY Who OF BOOL ← ALL[FALSE],
-- Set FALSE by CloseConnection; both copy loops stop once they observe it FALSE.
open: BOOL ← TRUE];
-- Names the two copy processes of a conversation: uToS runs UserToServer, sToU runs ServerToUser.
Who: TYPE = {uToS, sToU};
-- Maps each process to the other one; Exit uses it to find its partner.
whoBar: ARRAY Who OF Who ← [uToS: sToU, sToU: uToS];
-- Value of a mark sent or consumed on the server stream.
-- NOTE(review): the numeric values below presumably come from the Pup Telnet/Chat protocol; confirm against its specification.
MarkByte: TYPE = [0..256);
-- Sent by Connect right after the stream is opened, followed by a single 0C data byte.
setLineWidth: MarkByte = 2;
-- Not referenced in the code visible here.
setPageLength: MarkByte = 3;
-- When ServerToUser consumes a timingMark it answers by sending timingMarkReply.
timingMark: MarkByte = 5;
timingMarkReply: MarkByte = 6;
Connect:
PROC
  [
  serverName: ROPE,
  fromClient, toClient: IO.STREAM,
  noteDisconnect: PROC [clientData: REF ANY] ← NIL,
  clientData: REF ANY ← NIL]
RETURNS [c: Conversation] =
  {
  -- Opens a Pup Chat connection to serverName and starts the two copy processes.
  --   serverName: name of the server, resolved with PupStream.GetPupAddress.
  --   fromClient: stream whose characters are forwarded to the server.
  --   toClient: stream that receives server output and progress/error messages.
  --   noteDisconnect, clientData: stored in the Conversation; Exit calls
  --     noteDisconnect[clientData] when the last copy process finishes.
  -- Returns the new Conversation, or NIL if the name lookup or the stream creation
  -- failed (an explanatory message has then been written to toClient).
  pc: PupConversation ←
    NEW [PupConversationRep ← [
      serverName: serverName,
      fromClient: fromClient,
      toClient: toClient]];
  addr: PupDefs.PupAddress;
  c ← NIL;
  -- Balanced by PupPackageDestroy: in Exit on the success path, in EXITS below on failure.
  PupDefs.PupPackageMake[];
  toClient.PutF["Opening connection to %g ... ", IO.rope[serverName]];
  addr ← PupStream.GetPupAddress[PupTypes.telnetSoc, serverName
    ! PupStream.PupNameTrouble =>
      {
      toClient.PutRope["PUP name error"];
      FinishStringWithErrorMsg[toClient, e];
      GOTO Return;
      }
    ];
  pc.serverStream ← PupStream.PupByteStreamCreate[addr, PupStream.SecondsToTocks[1]
    ! PupStream.StreamClosing =>
      {
      toClient.PutRope["Can't connect"];
      FinishStringWithErrorMsg[toClient, text];
      GOTO Return;
      }
    ];
  toClient.PutF["open.\015\012"];
  c ←
    NEW [ConversationRep ← [
      protocol: pupChat,
      noteDisconnect: noteDisconnect, clientData: clientData,
      data: pc]];
  -- Initial negotiation: a setLineWidth mark followed by one 0C data byte, then flush.
  PupStream.SendMark[pc.serverStream, setLineWidth];
  pc.serverStream.PutChar[0C];
  pc.serverStream.Flush[];
  TRUSTED {
    Process.Detach[pc.procs[uToS] ← FORK UserToServer[c, pc]];
    Process.Detach[pc.procs[sToU] ← FORK ServerToUser[c, pc]];
    };
  EXITS
    -- Failure exit taken by the two catch phrases above. c is still NIL here and no
    -- copy process was forked, so nobody will reach Exit: undo PupPackageMake ourselves.
    Return => PupDefs.PupPackageDestroy[];
  };
Disconnect:
PROC [c: Conversation] = {
  -- Client-requested shutdown of conversation c. Does nothing when the
  -- connection is already marked closed; otherwise closes it and reports
  -- the closing on the client stream.
  conv: PupConversation ← NARROW[c.data];
  IF NOT conv.open THEN RETURN;
  CloseConnection[conv, TRUE];
  };
-- UserToServer flushes the server stream once charsSentSinceFlush has reached this
-- value, or earlier whenever the client stream has no more characters available.
flushPeriod: NAT ← 50;
-- Body of the uToS copy process forked by Connect: copies characters from
-- pc.fromClient to pc.serverStream until the conversation is closed or a stream fails.
--   c: the Conversation, passed on to Exit.
--   pc: the connection state shared with ServerToUser.
UserToServer:
PROC [c: Conversation, pc: PupConversation] = {
char: CHAR;
-- Whether CloseConnection should print its own "Closing connection" message;
-- cleared when the StreamClosing handler below has already reported the close.
report:
BOOL ←
TRUE;
-- If this frame is unwound, Exit still runs so the partner process is not left waiting.
{ENABLE UNWIND => Exit[c, pc, uToS];
-- While looping is set, AbortEm is allowed to abort this process.
SetLooping[pc, uToS, TRUE];
WHILE pc.open
DO
ENABLE {
-- Server side is closing the stream: tell the client (once) and leave the loop.
PupStream.StreamClosing => {
IF pc.open
THEN {
pc.toClient.PutF["\015\012Connection being closed by %s", IO.rope[pc.serverName]];
FinishStringWithErrorMsg[pc.toClient, text, " ... "];
report ← FALSE;
};
EXIT;
};
-- End of input, a stream error, or an abort: leave the loop quietly.
-- (ABORTED is presumably what Process.Abort in AbortEm raises here.)
IO.EndOfStream, IO.Error, ABORTED => EXIT;
};
char ← pc.fromClient.GetChar[];
-- The connection may have been closed while GetChar was waiting: put the
-- character back on the client stream instead of sending it.
IF
NOT pc.open
THEN {
pc.fromClient.Backup[char];
EXIT;
};
pc.serverStream.PutChar[char];
-- Flush when enough characters have accumulated or the client has nothing more pending.
IF pc.charsSentSinceFlush >= flushPeriod
OR pc.fromClient.CharsAvail[] = 0
THEN {
pc.serverStream.Flush[];
pc.charsSentSinceFlush ← 0;
}
ELSE pc.charsSentSinceFlush ← pc.charsSentSinceFlush + 1;
ENDLOOP;
SetLooping[pc, uToS, FALSE];
-- If the loop ended for a reason other than an explicit close, close the connection now.
IF pc.open THEN CloseConnection[pc, report];
};
Exit[c, pc, uToS];
};
-- Body of the sToU copy process forked by Connect: copies characters from
-- pc.serverStream to pc.toClient until the conversation is closed or a stream fails.
--   c: the Conversation, passed on to Exit.
--   pc: the connection state shared with UserToServer.
ServerToUser:
PROC [c: Conversation, pc: PupConversation] = {
char: CHAR;
-- Whether CloseConnection should print its own "Closing connection" message;
-- cleared when the StreamClosing handler below has already reported the close.
report:
BOOL ←
TRUE;
-- If this frame is unwound, Exit still runs so the partner process is not left waiting.
{ENABLE UNWIND => Exit[c, pc, sToU];
-- While looping is set, AbortEm is allowed to abort this process.
SetLooping[pc, sToU, TRUE];
WHILE pc.open
DO
ENABLE {
-- Server side is closing the stream: tell the client (once) and leave the loop.
PupStream.StreamClosing => {
IF pc.open
THEN {
pc.toClient.PutF["\015\012Connection being closed by %s", IO.rope[pc.serverName]];
FinishStringWithErrorMsg[pc.toClient, text, " ... "];
report ← FALSE;
};
EXIT;
};
-- End of stream, a stream error, or an abort: leave the loop quietly.
IO.EndOfStream, IO.Error, ABORTED => EXIT;
};
-- When EndOf is TRUE a mark is consumed from the server stream; a timingMark is
-- answered with timingMarkReply, any other mark value is dropped.
-- NOTE(review): presumably EndOf is TRUE exactly when a mark is next in the stream; confirm against PupStream.
IF pc.serverStream.EndOf[]
THEN {
mySST: MarkByte;
mySST ← PupStream.ConsumeMark[pc.serverStream];
IF mySST = timingMark THEN PupStream.SendMark[pc.serverStream, timingMarkReply];
};
-- A TimeOut signal is resumed so GetChar keeps waiting; EndOfStream raised here
-- restarts the loop, which re-tests pc.open and EndOf.
char ← pc.serverStream.GetChar[!
PupStream.TimeOut => RESUME;
IO.EndOfStream => LOOP;
];
-- The connection may have been closed while GetChar was waiting: drop the character.
IF NOT pc.open THEN EXIT;
pc.toClient.PutChar[char];
ENDLOOP;
SetLooping[pc, sToU, FALSE];
-- If the loop ended for a reason other than an explicit close, close the connection now.
IF pc.open THEN CloseConnection[pc, report];
};
Exit[c, pc, sToU];
};
SetLooping:
ENTRY PROC [pc: PupConversation, who: Who, ing: BOOL] = {
  -- Under pc's monitor lock, records whether copy process who is currently
  -- inside its copy loop. AbortEm reads this flag to decide whom to abort.
  pc.looping[who] ← ing;
  };
-- Called by each copy process as it finishes (normally or through UNWIND).
-- The process that finishes last performs the final teardown of the conversation.
--   c: the Conversation, for its noteDisconnect callback and clientData.
--   pc: the connection state.
--   who: which copy process is exiting.
Exit:
PROC [c: Conversation, pc: PupConversation, who: Who] = {
whoElse: Who ← whoBar[who];
-- Under pc's monitor lock: marks who as finished and reports whether the
-- partner process had already finished.
LastOneOut:
ENTRY
PROC [pc: PupConversation]
RETURNS [last:
BOOL] = {
pc.procing[who] ← FALSE;
pc.looping[who] ← FALSE;
last ← NOT pc.procing[whoElse];
};
-- Sanity check on the whoBar table.
IF whoElse = who THEN ERROR;
-- The first process out leaves the teardown to its partner.
IF NOT LastOneOut[pc] THEN RETURN;
pc.toClient.PutRope[" ... closed\015\012" !IO.Error => CONTINUE];
IF c.noteDisconnect # NIL THEN c.noteDisconnect[c.clientData];
pc.serverStream.Close[!IO.Error => CONTINUE];
pc.serverStream ← NIL; -- could cause pointer faults!
-- Balances the PupPackageMake done in Connect.
PupDefs.PupPackageDestroy[];
};
FinishStringWithErrorMsg:
PROC [toClient: IO.STREAM, errorMsg: ROPE, ending: ROPE ← ".\015\012"] = {
  -- Completes a status line already begun on toClient: appends a colon, a tab
  -- and errorMsg when a message is supplied, then always appends ending
  -- (by default a period followed by CR LF).
  haveMsg: BOOL ← errorMsg # NIL;
  IF haveMsg THEN toClient.PutF[":\t%s", IO.rope[errorMsg]];
  toClient.PutRope[ending];
  };
CloseConnection:
PROC [pc: PupConversation, report: BOOL] = {
  -- Marks the conversation closed, closes the server stream and aborts whichever
  -- copy processes (other than the caller) are still inside their copy loops.
  --   pc: the connection state.
  --   report: when TRUE, a "Closing connection" message is written to the client first.
  rawSelf: UNSAFE PROCESS;
  self: PROCESS;
  -- Snapshot of pc.serverStream. Exit sets that field to NIL once both copy
  -- processes are done, so it must be read once and checked before use.
  server: IO.STREAM;
  -- Under pc's monitor lock: aborts each copy process that is not the caller and
  -- is still looping. A process handle that is no longer valid is ignored.
  AbortEm:
    ENTRY PROC [pc: PupConversation] =
    TRUSTED {
    IF pc.procs[uToS] # self
      AND pc.looping[uToS]
      THEN Process.Abort[pc.procs[uToS] !Process.InvalidProcess => CONTINUE];
    IF pc.procs[sToU] # self
      AND pc.looping[sToU]
      THEN Process.Abort[pc.procs[sToU] !Process.InvalidProcess => CONTINUE];
    };
  TRUSTED {
    rawSelf ← Process.GetCurrent[];
    self ← LOOPHOLE[rawSelf]};
  IF report THEN pc.toClient.PutF["\015\012Closing connection to %s ...", IO.rope[pc.serverName] ! IO.Error => CONTINUE];
  -- Cleared before the stream is closed so the copy loops see the close as intentional.
  pc.open ← FALSE;
  server ← pc.serverStream;
  -- Skip the close when Exit has already torn the stream down (e.g. Exit ran via
  -- UNWIND while open was still TRUE, followed by a later Disconnect).
  IF server # NIL THEN server.Close[TRUE !IO.Error => CONTINUE];
  AbortEm[pc];
  };
-- Module start code: hand the pupChat descriptor to Protocols.RegProtocol.
Protocols.RegProtocol[pupChat];
}.