File: VoiceStreamSocketImpl.mesa
Copyright © 1985, 1986 by Xerox Corporation. All rights reserved.
This file provides part of the implementation of voicestreams. Its procedures provide for transfer of voice data to and from remote sockets on the network.
Swinehart, October 17, 1986 1:35:57 pm PDT
Ades, January 28, 1986 7:05:42 pm PST
Ousterhout, March 15, 1983 3:22 pm compile
L. Stewart, December 27, 1983 2:06 pm
DIRECTORY
Basics USING [BITAND, LowHalf],
BasicTime,
IntervalTimer USING [Microseconds, Now, WaitForExpirationTime],
IntervalTimer USING [NowInPulses, Pulses, WaitForExpirationTimeInPulses],
IO USING [card, int, PutF, PutRope, rope],
Process USING [Detach, GetCurrent, MsecToTicks, Priority, priorityForeground, SetPriority, SetTimeout],
PupBuffer USING [ Buffer ],
PupSocket USING [ AllocBuffer, FreeBuffer, Get, GetUserBytes, Put, SetUserBytes, SetUserHWords, Socket ],
PupType USING [Type],
Rope USING [ROPE],
VoiceStream;
VoiceStreamSocketImpl: MONITOR LOCKS VoiceStream.Lock
IMPORTS Basics, BasicTime, IntervalTimer, Process, PupSocket, VoiceStream
BEGIN
socketPriority: Process.Priority ← Process.priorityForeground;
Truth for the constants immediately following is ProcessorFace.microsecondsPerHundredPulses.
MicrosecondsPerPulse: LONG CARDINAL = 32;
PulsesPerMillisecond: LONG CARDINAL = 31; -- really 31.25
packet types and formats
BluejayVoiceType: PupType.Type = VAL[250];
ProbeReplyType: PupType.Type = VAL[251];
LarkFirstVoiceType: PupType.Type = VAL[252];
RetransmitRequestType: PupType.Type = VAL[253];
LarkVoiceType: PupType.Type = VAL[254];
BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;
BluejayVoiceType
id.a: timeToPlay
id.b: bluejay sequence number
BluejayPacketObject:
TYPE =
MACHINE
DEPENDENT
RECORD [
blank1: [0..77B],
ignore: BOOL,
probeRequest: BOOL,
blank2: [0..17B],
encryptionKeyIndex: [0..17B],
energy: CARDINAL,
silenceMS: CARDINAL,
blank3: CARDINAL
];
ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;
ProbeReplyType
id.a: Lark clock
id.b: Lark sequence number
ProbeReplyPacketObject:
TYPE =
MACHINE
DEPENDENT
RECORD [
replyID: CARDINAL,
maxPacketSize: CARDINAL,
maxBuffers: CARDINAL,
blank1: CARDINAL
];
LarkVoiceType
id.a: Lark time corresponding to first byte of silence.
id.b: Monotonic increasing packet sequence number (ignored).
LarkPacketObject:
TYPE =
MACHINE
DEPENDENT
RECORD [
blank1: [0..377B],
blank2: [0..17B],
encryptionKeyIndex: [0..17B],
energy: CARDINAL,
silenceMS: CARDINAL, -- preceding data
blank3: CARDINAL
];
RetransmitRequestType
id.a: Lark time (in milliseconds) of first data not received.
id.b: Monotonic increasing packet sequence number.
Some constants for packet shipment on the net:
bytesPerPacket: INTEGER = 160;
bytesPerMs: NAT = 8;
debugSwitch: BOOL ← FALSE;
maxLostTime: LONG CARDINAL ← 100000 / MicrosecondsPerPulse;
now: LONG CARDINAL;
timesLate: LONG CARDINAL ← 0;
noPieceMS: NAT ← 200;
SetSocket:
PUBLIC
ENTRY
PROC [handle: VoiceStream.Handle, socket: PupSocket.Socket] = {
ENABLE UNWIND => NULL;
IF handle.errorRope # NIL THEN VoiceStream.ReportError[handle];
Create a new process, if that is needed.
IF handle.connection = NIL THEN handle.connection ← NEW[VoiceStream.ConnectionObject];
handle.connection.socket ← NIL;
IF socket #
NIL
THEN {
handle.connection.socket ← socket;
Process.Detach[FORK ReceiveProc[handle, socket]];
Process.Detach[FORK SendProc[handle, socket]];
};
};
CheckHandle:
ENTRY
PROC [handle: VoiceStream.Handle, socket: PupSocket.Socket]
RETURNS [Rope.
ROPE] = {
ENABLE UNWIND => NULL;
This procedure just provides a safe way to interrogate handle information to make sure the current socket process should continue to run. A NIL return value means the socket process should continue to run. Otherwise, a rope is returned to desribe why the process should quit.
IF handle.connection.socket # socket THEN RETURN ["Socket changed."];
IF handle.errorRope # NIL THEN RETURN [handle.errorRope];
RETURN [NIL];
};
ForceToJukebox:
ENTRY
PROC [handle: VoiceStream.Handle] = {
This procedure outputs to the server any partially-full write buffer, then waits for the server to write it to disk.
ENABLE UNWIND => NULL;
VoiceStream.FlushBuffer[handle];
WHILE (handle.firstServerBuffer #
NIL)
AND (handle.errorRope = NIL)
AND (handle.piece # NIL) DO WAIT VoiceStream.client ENDLOOP;
};
ReceiveProc:
PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
This process sets up a connection with an Etherphone, then receives incoming packets from the Etherphone.
ENABLE {
VoiceStream.Error => {
IF reason # StreamClosed THEN IO.PutF[ioStream, "Rx Network connection closed: %s.\n", IO.rope[rope]];
CONTINUE;
};
};
input: PupBuffer.Buffer;
expLarkTime: CARDINAL ← 0; -- Expected time for next data.
larkTime: CARDINAL ← 0; -- Time of current packet.
silenceMs: CARDINAL ← 0;
dataBytes: CARDINAL;
first: BOOL ← TRUE; -- first packet of recording
reason: Rope.ROPE ← NIL;
Process.SetPriority[socketPriority];
Read packets continuously until either the voice stream is closed or we cease to be the official socket for the stream. Note that we never time the stream out.
WHILE (reason ← CheckHandle[handle, origSocket]) =
NIL
DO
input ← handle.connection.socket.Get[];
IF input = NIL THEN LOOP;
Decode the Lark packet.
larkTime ← input.id.high;
SELECT input.type FROM
On probe replies, just update information about the connection.
= ProbeReplyType => {
AdjustLocked[handle.connection, input];
handle.connection.packetSize ← input.hWord[1];
handle.connection.bufferSpace ← input.hWord[2] * (handle.connection.packetSize / bytesPerMs);
};
On Lark data, throw away packets that don't correspond to the immediate future, then add silence to the tune (if necessary), then add the data from the packet.
= LarkVoiceType, = LarkFirstVoiceType => {
silenceMs ← larkTime - expLarkTime;
silenceMs ← MIN[silenceMs, 1000];
input.hWord[2] ← MIN[260, input.hWord[2]];
IF
NOT first
AND silenceMs
NOT
IN [0..250)
THEN {
IO.PutF[ioStream, "Got packet time %d, expected %d.\n", IO.int[larkTime], IO.int[expLarkTime]];
silenceMs ← 0;
};
Write silence and data.
IF first THEN silenceMs ← 0
ELSE silenceMs ← silenceMs + input.hWord[2];
dataBytes ← PupSocket.GetUserBytes[input] - 8;
dataBytes ← Basics.BITAND[dataBytes, 0177770B];
[] ← VoiceStream.Put[handle: handle, silentBytes: silenceMs * bytesPerMs, block: [@input.hWord[4], 0, dataBytes], energy: input.hWord[1]];
expLarkTime ← larkTime + input.hWord[2] + (dataBytes/bytesPerMs);
first ← FALSE;
};
ENDCASE;
PupSocket.FreeBuffer[input];
ENDLOOP;
IO.PutF[ioStream, "Rx process %b quitting: %g\n", IO.card[LOOPHOLE[Process.GetCurrent[], CARDINAL]], IO.rope[reason]];
};
AdjustLocked:
ENTRY
PROC [c: VoiceStream.Connection, b: PupBuffer.Buffer] = {
ENABLE UNWIND => NULL;
rp: ProbeReply;
deltaLark, deltaLocal: INT;
rp ← LOOPHOLE[@b.body];
IF rp^.replyID = c.probeSequenceNumber
THEN {
average roundtrip time and check for clock drift The initial association between Bluejay time and Lark time was localEpoch and larkEpoch. The new data point is probeLaunchTime and b.id.high;
IF c.initialProbeFinished
THEN {
c.roundTripMS ← (Basics.LowHalf[(IntervalTimer.NowInPulses[] - c.probeLaunchTime) / PulsesPerMillisecond] + c.roundTripMS) / 2;
deltaLocal ← c.probeLaunchTime - c.localEpoch;
deltaLark ← LONG[b.id.high - c.larkEpoch] * PulsesPerMillisecond;
positive drift means the local clock is too fast
c.drift ← c.drift + deltaLocal - deltaLark;
SELECT
TRUE
FROM
c.drift < -312 => {
c.sendNext ← c.sendNext - 150;
c.drift ← c.drift + 150;
};
c.drift > 312 => {
since the local clock is too fast, we need to wait some more ticks of
the local clock to delay the same real time
c.sendNext ← c.sendNext + 150;
c.drift ← c.drift - 150;
};
ENDCASE;
c.larkEpoch ← b.id.high;
c.localEpoch ← c.probeLaunchTime;
}
ELSE {
-- initial probe case
c.roundTripMS ← Basics.LowHalf[(IntervalTimer.NowInPulses[] - c.probeLaunchTime) / PulsesPerMillisecond];
c.localEpoch ← c.probeLaunchTime;
c.larkEpoch ← b.id.high;
c.drift ← 0;
c.initialProbeFinished ← TRUE;
NOTIFY c.initialProbeDone;
};
};
};
SendProc:
PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
This procedure runs the second process for each connection between Bluejay and a Lark. It is responsible for sending voice to Lark.
ENABLE {
VoiceStream.Error => {
IF reason # StreamClosed THEN IO.PutF[ioStream, "Tx Network connection closed: %s.\n", IO.rope[rope]];
CONTINUE;
};
};
output: PupBuffer.Buffer ← NIL;
pp: BluejayPacket;
c: VoiceStream.Connection ← handle.connection;
silenceLength: NAT;
voiceLength: NAT;
reason: Rope.ROPE;
silenceMS: NAT ← 0;
voiceMS: NAT ← 0;
lastPlayed: IntervalTimer.Microseconds;
lastPlayed: IntervalTimer.Pulses;
noPieceThisTime: BOOL ← FALSE;
wakeTime: IntervalTimer.Pulses;
{
Process.SetPriority[socketPriority];
c.initialProbeFinished ← FALSE;
Process.SetTimeout[condition: @c.initialProbeDone, ticks: Process.MsecToTicks[100]];
For starters, send a probe request to Lark, and keep sending it every quarter second until we get a probe packet back again.
EmptyProbe[handle: handle, origSocket: origSocket];
IF
NOT c.initialProbeFinished
THEN {
probe failed
handle.errorRope ← "Lark failed to respond";
handle.errorCode ← Timeout;
GOTO Cleanup;
};
now we have a clock correspondence, a packet transmitted at localEpoch arrived at the lark at larkEpoch
we want to buffer up as much as possible, but not too much
c.timeToPlay ← c.larkEpoch + MIN[120, c.bufferSpace];
send the first batch right away . . . is this unsafe? (no, it's a simple assignment)
c.sendNext ← c.localEpoch;
lastPlayed ← IntervalTimer.Now[] - 10000;
lastPlayed ← IntervalTimer.NowInPulses[] - (10000 / MicrosecondsPerPulse);
Now enter a loop to send voice data and/or silence to Lark.
WHILE (reason ← CheckHandle[handle, origSocket]) =
NIL
DO
IF output =
NIL
THEN {
output ← c.socket.AllocBuffer[];
pp ← LOOPHOLE[@output.body];
output.type ← BluejayVoiceType;
output.id.low ← c.sequenceNumber;
c.sequenceNumber ← c.sequenceNumber + 1;
pp^ ← [
blank1: 0,
ignore: FALSE,
probeRequest: FALSE,
blank2: 0,
encryptionKeyIndex: 0,
energy: 0,
silenceMS: 0,
blank3: 0
];
};
output.id.high ← c.timeToPlay;
Poll the voice stream to see if there is any data there.
[silence: silenceLength, bytesTransferred: voiceLength, keyIndex: pp.encryptionKeyIndex] ← VoiceStream.Get[handle: handle, maxSilentBytes: 500 * bytesPerMs, block: [@output.hWord[4], 0, bytesPerPacket]];
voiceLength ← Basics.BITAND[voiceLength, 0177770B];
silenceMS ← silenceLength/bytesPerMs;
voiceMS ← voiceLength/bytesPerMs;
If either return value is non-zero, then there is data.
noPieceThisTime ← (silenceLength = 0) AND (voiceLength = 0);
IF noPieceThisTime THEN silenceMS ← noPieceMS;
c.packetMS ← silenceMS + voiceMS;
pp.silenceMS ← silenceMS;
PupSocket.SetUserBytes[output, SIZE[BluejayPacketObject] * 2 + voiceLength];
now ← IntervalTimer.NowInPulses[];
IF
LOOPHOLE[(
LOOPHOLE[lastPlayed,
LONG
CARDINAL] + (5000 / MicrosecondsPerPulse)) - now,
INT] > 0
THEN {
IntervalTimer.WaitForExpirationTimeInPulses[lastPlayed + (5000 / MicrosecondsPerPulse)]; -- wait at least 5 mS
now ← IntervalTimer.NowInPulses[];
IF (now - (lastPlayed + (5000 / MicrosecondsPerPulse))) > maxLostTime
THEN {
timesLate ← timesLate + 1;
};
};
Else wait to send the packet
now ← IntervalTimer.NowInPulses[];
wakeTime ← c.sendNext;
IF
LOOPHOLE[wakeTime - now,
INT] > 0
THEN {
IntervalTimer.WaitForExpirationTimeInPulses[wakeTime];
now ← IntervalTimer.NowInPulses[];
IF now - wakeTime > maxLostTime
THEN {
timesLate ← timesLate + 1;
};
};
PlayPacketLocked[c, pp, output];
IF (reason ← CheckHandle[handle, origSocket]) #
NIL
THEN {
PupSocket.FreeBuffer[output];
output ← NIL;
EXIT;
};
c.socket.Put[output];
output ← NIL;
lastPlayed ← IntervalTimer.NowInPulses[];
ENDLOOP;
GOTO Cleanup;
EXITS
Cleanup => {
IO.PutF[ioStream, "Tx process %b quitting: %g\n", IO.card[LOOPHOLE[Process.GetCurrent[], CARDINAL]], IO.rope[reason]];
};
};
};
Call PlayPacketLocked only if the packet will be sent!
PlayPacketLocked:
ENTRY
PROC [c: VoiceStream.Connection, pp: BluejayPacket, b: PupBuffer.Buffer] = {
ENABLE UNWIND => NULL;
IF (c.sendNext - c.probeLaunchTime)
NOT
IN [0..450000 / MicrosecondsPerPulse)
THEN {
pp^.probeRequest ← TRUE;
c.probeLaunchTime ← IntervalTimer.NowInPulses[];
c.probeSequenceNumber ← b.id.low;
};
c.timeToPlay ← c.timeToPlay + c.packetMS;
c.sendNext ← c.sendNext + (LONG[c.packetMS] * PulsesPerMillisecond);
};
EmptyProbe:
PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
b: PupBuffer.Buffer;
pp: BluejayPacket;
trySN: CARDINAL;
c: VoiceStream.Connection ← handle.connection;
WHILE CheckHandle[handle, origSocket] =
NIL
DO
b ← c.socket.AllocBuffer[];
pp ← LOOPHOLE[@b.body];
trySN ← c.sequenceNumber;
c.sequenceNumber ← c.sequenceNumber + 1;
b.id.high ← 0;
b.id.low ← trySN;
b.type ← BluejayVoiceType;
pp^ ← [
blank1: 0,
ignore: TRUE,
probeRequest: TRUE,
blank2: 0,
encryptionKeyIndex: 0,
energy: 0,
silenceMS: 0,
blank3: 0
];
PupSocket.SetUserHWords[b, SIZE[BluejayPacketObject]];
SendProbeLocked[c, b];
c.socket.Put[b];
WaitLocked[c];
IF c.initialProbeFinished THEN EXIT;
ENDLOOP;
};
SendProbeLocked:
ENTRY
PROC [c: VoiceStream.Connection, b: PupBuffer.Buffer] = {
ENABLE UNWIND => NULL;
c.probeLaunchTime ← IntervalTimer.NowInPulses[];
c.probeSequenceNumber ← b.id.low;
};
WaitLocked:
ENTRY
PROC [c: VoiceStream.Connection] = {
WAIT c.initialProbeDone;
};
END.
L. Stewart, June 5, 1983 4:30 pm, no notify on SetSocket
L. Stewart, June 6, 1983 1:15 pm, LarkFirstVoiceType packet type
L. Stewart, June 20, 1983 4:52 pm, Try using Pulses instead of Microseconds
L. Stewart, July 9, 1983 4:48 pm, added noPieceMS in place of constant
L. Stewart, May 14, 1985 6:29:20 pm PDT, Cedar 5, L
Swinehart, May 14, 1985 6:29:12 pm PDT
Cedar 6.0
Ades, May 10, 1986 5:21:14 pm PDT
modified Put call to supply energy value, DIRECTORY, IMPORTS, AdjustLocked, PlayPacketLocked, SendProbeLocked
Swinehart, May 10, 1986 4:18:44 pm PDT
Convert to new communications package
changes to: VoiceStreamSocketImpl, EXPORTS, BEGIN, SetSocket, CheckHandle, ForceToJukebox, ReceiveProc, SendProc, EmptyProbe