DIRECTORY
Basics USING [LowHalf],
Endian,
FS USING [defaultStreamOptions, Error, StreamOpen, StreamOptions],
IntervalTimer USING [Microseconds, Now, WaitForExpirationTime],
IO,
LarkTest,
PrincOpsUtils USING [LongCopy],
Process USING [MsecToTicks, Pause, SetTimeout],
Pup USING [ Address ],
PupBuffer USING [ Buffer ],
PupSocket USING [ AllocBuffer, CreateEphemeral, Destroy, FreeBuffer, Get, Put, Socket, SetUserSize ],
PupType USING [Type],
Rope USING [ROPE];
PlayBackImpl:
CEDAR MONITOR
IMPORTS Basics, FS, IntervalTimer, IO, PrincOpsUtils, Process, PupSocket
EXPORTS LarkTest =
BEGIN
-- When TRUE, PlayPacketLocked marks one voice packet in every five as a probe request.
sendProbes: BOOL ← TRUE;
-- Pup type stamped on the packets built by EmptyProbe and PlayABuffer.
BluejayVoiceType: PupType.Type = LOOPHOLE[250];
-- Pup type that AdjustProcess accepts as a probe reply; buffers of any other type are freed unread.
ProbeReplyType: PupType.Type = LOOPHOLE[251];
-- Overlay laid over the body of an outgoing packet by EmptyProbe and PlayABuffer.
BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;
-- Use of the Pup id field in packets carrying this record:
--   id.high: timeToPlay
--   id.low: bluejay sequence number
BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  blank1: [0..77B],  -- always written as 0 by this module
  ignore: BOOL,  -- TRUE in the empty probe sent by EmptyProbe, FALSE in voice packets
  probeRequest: BOOL,  -- TRUE when a probe reply is wanted for this packet
  blank2: [0..17B],  -- always written as 0 by this module
  encryptionKeyIndex: [0..17B],  -- always written as 0 by this module
  energy: CARDINAL,  -- always written as 0 by this module
  silenceMS: CARDINAL,  -- always written as 0 by this module
  blank3: CARDINAL  -- always written as 0 by this module
  ];
-- Overlay laid over the body of a received probe reply by AdjustLocked.
ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;
-- Use of the Pup id field in packets carrying this record:
--   id.high: Lark clock
--   id.low: Lark sequence number
ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  replyID: CARDINAL,  -- compared with PlayBackObject.probeSequenceNumber to match reply to probe
  maxPacketSize: CARDINAL,  -- not read anywhere in this module
  maxBuffers: CARDINAL,  -- not read anywhere in this module
  blank1: CARDINAL  -- not read anywhere in this module
  ];
-- One of the two buffers handed back and forth between the disk process
-- (BufferFiller and FillABuf) and the player (PlayABuffer).
VoiceBuffer: TYPE = REF VoiceBufferObject;
VoiceBufferObject: TYPE = RECORD [
  valid: BOOL ← FALSE,  -- set TRUE by FillABuf after a read, set FALSE by PlayABuffer when it has finished
  cLength: NAT,  -- FillABuf stores (bytes read)/2 here; PlayABuffer sends cLength/80 packets
  data: SEQUENCE length: [0..4096) OF CARDINAL  -- PlayFile allocates 4000 elements
  ];
-- All state for one PlayFile call. It is shared by the caller's process, the
-- disk process and the clock adjust process.
PlayBack: TYPE = REF PlayBackObject;
PlayBackObject: TYPE = RECORD [
  fileName: Rope.ROPE,  -- name passed to PlayFile
  fileStream: IO.STREAM,  -- stream opened by PlayFile and read by FillABuf
  him: Pup.Address,  -- remote address given to PupSocket.CreateEphemeral
  lark: PupSocket.Socket,  -- socket used for every send and receive
  sequenceNumber: CARDINAL,  -- id.low for the next outgoing packet; bumped once per packet
  probeSequenceNumber: CARDINAL,  -- id.low of the latest probe; matched against replyID
  doneInitialProbing: BOOL,  -- set TRUE by AdjustLocked when the first matching reply arrives
  initialProbeDone: CONDITION,  -- notified together with doneInitialProbing; PlayFile sets its timeout
  batchCount: NAT,  -- packet counter kept modulo 5 by PlayPacketLocked
  probeLaunchTime: IntervalTimer.Microseconds,  -- IntervalTimer.Now[] when the latest probe was marked
  drift: INT,  -- running sum of (deltaLocal - deltaLark); AdjustLocked corrects it in steps of 5000
  stop: BOOL,  -- set TRUE by PlayFile on exit; polled by both forked processes
  larkEpoch: CARDINAL,  -- id.high of the latest matching probe reply
  localEpoch: IntervalTimer.Microseconds,  -- probeLaunchTime of the probe that produced larkEpoch
  timeToPlay: CARDINAL,  -- id.high for the next voice packet; advanced by 20 per packet
  sendNextBatch: IntervalTimer.Microseconds,  -- time PlayABuffer waits for before each packet; advanced by 20000 per packet
  roundTripMS: CARDINAL,  -- averaged probe round trip, computed from elapsed time divided by 1000
  diskProcess: PROCESS,  -- FORK BufferFiller[pb]
  clockAdjustProcess: PROCESS,  -- FORK AdjustProcess[pb]
  bufferA: VoiceBuffer,  -- filled and played first
  bufferB: VoiceBuffer  -- filled and played second
  ];
-- PlayFile stores its most recently created PlayBack record here.
-- Nothing in the code shown reads it back; presumably kept for debugger inspection (confirm).
lastPB: PlayBack;
-- Not referenced anywhere else in the code shown.
SerialNumberProcType: TYPE = PROC RETURNS [CARDINAL];
-- PlayFile opens fileName as a raw stream and sends its contents as voice
-- packets to the Lark at address him.
--   fileName: file to play; if FS.Error is raised while opening it, PlayFile returns silently.
--   him: Pup address of the destination.
-- The call blocks until playback ends. It forks a disk process (BufferFiller)
-- and a clock adjust process (AdjustProcess), joins both before returning,
-- then closes the stream and destroys the socket.
PlayFile: PUBLIC PROC [fileName: Rope.ROPE, him: Pup.Address] = TRUSTED {
  pb: PlayBack ← NEW[PlayBackObject];
  rawOptions: FS.StreamOptions ← FS.defaultStreamOptions;
  rawOptions[tiogaRead] ← FALSE;
  lastPB ← pb;
  {
    pb.fileName ← fileName;
    pb.him ← him;
    pb.fileStream ← FS.StreamOpen[fileName: pb.fileName, streamOptions: rawOptions, streamBufferParms: [16, 4] ! FS.Error => TRUSTED { GOTO NoStart; }];
    -- An empty file would never make bufferA valid: FillABuf returns at end of
    -- file without setting valid, BufferFiller exits, and the polling loop
    -- below would spin forever. Give up here, before anything is forked.
    IF pb.fileStream.EndOf[] THEN {
      pb.fileStream.Close[];
      GOTO NoStart;
      };
    pb.lark ← PupSocket.CreateEphemeral[remote: pb.him, getTimeout: 250];
    pb.bufferA ← NEW[VoiceBufferObject[4000]];
    pb.bufferA.valid ← FALSE;
    pb.bufferA.cLength ← 0;
    pb.bufferB ← NEW[VoiceBufferObject[4000]];
    pb.bufferB.valid ← FALSE;
    pb.bufferB.cLength ← 0;
    pb.stop ← FALSE;
    pb.drift ← 0;
    pb.batchCount ← 0;
    pb.doneInitialProbing ← FALSE;
    -- WaitLocked gives up after 100 ms so that EmptyProbe can retry.
    Process.SetTimeout[condition: @pb.initialProbeDone, ticks: Process.MsecToTicks[100]];
    pb.diskProcess ← FORK BufferFiller[pb];
    pb.clockAdjustProcess ← FORK AdjustProcess[pb];
    -- wait for the disk process to deliver the first buffer
    WHILE NOT pb.bufferA.valid DO Process.Pause[Process.MsecToTicks[100]]; ENDLOOP;
    EmptyProbe[pb: pb];
    IF NOT pb.doneInitialProbing THEN {
      -- probe failed
      GOTO Finish;
      };
    -- now we have a clock correspondence, a packet transmitted at localEpoch
    -- arrived at the lark at larkEpoch
    -- we want a packet transmitted now to be played in 120 mSec (6 packet times)
    pb.timeToPlay ← pb.larkEpoch + 120;
    -- send the first batch right away . . . is this unsafe? (no, it's a simple assignment)
    pb.sendNextBatch ← pb.localEpoch;
    -- alternate between the two buffers until one is found not valid
    DO
      IF NOT pb.bufferA.valid THEN EXIT;
      PlayABuffer[pb, pb.bufferA];
      IF NOT pb.bufferB.valid THEN EXIT;
      PlayABuffer[pb, pb.bufferB];
      ENDLOOP;
    GOTO Finish;
    EXITS
      Finish => {
        -- both forked processes poll stop, so setting it lets the JOINs complete
        pb.stop ← TRUE;
        JOIN pb.diskProcess;
        JOIN pb.clockAdjustProcess;
        pb.fileStream.Close[];
        PupSocket.Destroy[pb.lark];
        };
      NoStart => NULL;
    };
  };
-- Body of the disk process forked by PlayFile. It refills bufferA and bufferB
-- in turn and finishes as soon as FillABuf reports the end of the file or
-- pb.stop is found set.
BufferFiller: PROC [pb: PlayBack] = TRUSTED {
  DO
    -- FillABuf is always called first; pb.stop is looked at only when it returns FALSE.
    IF FillABuf[pb, pb.bufferA, pb.fileStream] OR pb.stop THEN EXIT;
    IF FillABuf[pb, pb.bufferB, pb.fileStream] OR pb.stop THEN EXIT;
    ENDLOOP;
  };
-- FillABuf reads up to 8000 bytes (4000 words) from s into vb.
--   pb: playback state; only pb.stop is consulted, to abandon the wait.
--   vb: buffer to fill; it is not touched until the player has released it or pb.stop is set.
--   s: stream to read from.
-- Returns eof = TRUE when the stream was already at its end (vb is left not
-- valid with cLength 0) or when fewer than 8000 bytes were read.
FillABuf: PROC [pb: PlayBack, vb: VoiceBuffer, s: IO.STREAM] RETURNS [eof: BOOL] = TRUSTED {
  read: INT;
  -- Wait for the player to release vb BEFORE looking at the stream. Testing for
  -- end of file first would let the branch below clear valid and cLength on a
  -- buffer that has not been played yet, which can happen when the file length
  -- is an exact multiple of 8000 bytes.
  WHILE vb.valid AND NOT pb.stop DO Process.Pause[Process.MsecToTicks[100]]; ENDLOOP;
  IF s.EndOf[] THEN {
    vb.cLength ← 0;
    vb.valid ← FALSE;
    RETURN [TRUE];
    };
  read ← s.UnsafeGetBlock[block: [base: LOOPHOLE[BASE[DESCRIPTOR[vb.data]]], startIndex: 0, count: 8000]];
  -- after a short read, zero the words that were not overwritten
  IF read < 8000 THEN FOR i: INT IN [read/2..4000) DO vb[i] ← 0; ENDLOOP;
  vb.cLength ← read/2;
  vb.valid ← TRUE;
  RETURN[read # 8000];
  };
-- EmptyProbe sends up to four data-less probe packets (ignore and probeRequest
-- both TRUE). After each one it waits on initialProbeDone and stops as soon as
-- pb.doneInitialProbing is found TRUE. The caller inspects that flag to learn
-- whether any probe was answered.
EmptyProbe: PROC [pb: PlayBack] = TRUSTED {
  buf: PupBuffer.Buffer;
  header: BluejayPacket;
  probeSN: CARDINAL;
  FOR attempt: NAT IN [0..4) DO
    buf ← PupSocket.AllocBuffer[pb.lark];
    header ← LOOPHOLE[@buf.body];
    -- take the next sequence number for this probe
    probeSN ← pb.sequenceNumber;
    pb.sequenceNumber ← probeSN + 1;
    buf.type ← BluejayVoiceType;
    buf.id.low ← probeSN;
    buf.id.high ← 0;
    header^ ← [
      blank1: 0,
      ignore: TRUE,
      probeRequest: TRUE,
      blank2: 0,
      encryptionKeyIndex: 0,
      energy: 0,
      silenceMS: 0,
      blank3: 0
      ];
    PupSocket.SetUserSize[buf, SIZE[BluejayPacketObject]];
    -- record launch time and sequence number under the monitor lock before
    -- the packet goes out
    SendProbeLocked[pb, buf];
    pb.lark.Put[buf];
    WaitLocked[pb];
    IF pb.doneInitialProbing THEN EXIT;
    ENDLOOP;
  };
-- Entry procedure: under the monitor lock, notes the current time and the
-- sequence number (id.low) of the probe held in b. AdjustLocked later uses
-- both to match the reply and to time the round trip.
SendProbeLocked: ENTRY PROC [pb: PlayBack, b: PupBuffer.Buffer] = TRUSTED {
  pb.probeLaunchTime ← IntervalTimer.Now[];
  pb.probeSequenceNumber ← b.id.low;
  };
-- Entry procedure: waits on pb.initialProbeDone until it is notified or its
-- timeout (set by PlayFile) expires. The caller re-reads pb.doneInitialProbing
-- afterwards to find out which happened.
WaitLocked: ENTRY PROC [pb: PlayBack] = TRUSTED {
  -- Test the flag first. AdjustLocked may already have set it and issued its
  -- NOTIFY between the caller's Put and this call; waiting regardless would
  -- miss that wakeup and stall for the whole timeout.
  IF NOT pb.doneInitialProbing THEN WAIT pb.initialProbeDone;
  };
-- PlayABuffer sends the contents of vb as a series of voice packets, each
-- carrying 80 words of data, and then marks vb not valid so that the disk
-- process may refill it.
-- NOTE(review): cLength/80 truncates, so a trailing group of fewer than 80
-- words is never sent even though FillABuf zero-fills past a short read.
-- Confirm that this is intended.
PlayABuffer: PROC [pb: PlayBack, vb: VoiceBuffer] = TRUSTED {
  buf: PupBuffer.Buffer;
  header: BluejayPacket;
  FOR chunk: NAT IN [0..vb.cLength/80) DO
    buf ← PupSocket.AllocBuffer[pb.lark];
    header ← LOOPHOLE[@buf.body];
    buf.type ← BluejayVoiceType;
    -- id.low carries the sequence number
    buf.id.low ← pb.sequenceNumber;
    pb.sequenceNumber ← pb.sequenceNumber + 1;
    -- id.high carries the time to play; successive packets are 20 apart
    buf.id.high ← pb.timeToPlay;
    pb.timeToPlay ← pb.timeToPlay + 20;
    header^ ← [
      blank1: 0,
      ignore: FALSE,
      probeRequest: FALSE,
      blank2: 0,
      encryptionKeyIndex: 0,
      energy: 0,
      silenceMS: 0,
      blank3: 0
      ];
    -- copy this packet's 80 words of data to hWord[4] of the buffer
    PrincOpsUtils.LongCopy[from: @vb.data[chunk * 80], nwords: 80, to: @buf.hWord[4]];
    PupSocket.SetUserSize[buf, SIZE[BluejayPacketObject] + 80];
    -- hold the packet back until its scheduled send time
    IntervalTimer.WaitForExpirationTime[pb.sendNextBatch];
    -- may turn this packet into a probe request; also advances the send time
    PlayPacketLocked[pb, header, buf];
    pb.lark.Put[buf];
    ENDLOOP;
  vb.valid ← FALSE;
  };
-- Entry procedure called once per voice packet just before it is sent. When
-- sendProbes is TRUE, every fifth packet is marked as a probe request and its
-- launch time and sequence number are recorded. In every case the batch
-- counter and the next send time are advanced.
PlayPacketLocked: ENTRY PROC [pb: PlayBack, pp: BluejayPacket, b: PupBuffer.Buffer] = TRUSTED {
  IF sendProbes AND pb.batchCount = 4 THEN {
    pp^.probeRequest ← TRUE;
    pb.probeLaunchTime ← IntervalTimer.Now[];
    pb.probeSequenceNumber ← b.id.low;
    };
  -- the counter cycles through 0..4
  pb.batchCount ← (pb.batchCount + 1) MOD 5;
  -- the next packet is due 20000 timer units after this one
  pb.sendNextBatch ← pb.sendNextBatch + 20000;
  };
-- Body of the clock adjust process forked by PlayFile. It receives packets
-- from the Lark until pb.stop is set, passes probe replies to AdjustLocked,
-- and frees every buffer it receives.
AdjustProcess: PROC [pb: PlayBack] = TRUSTED {
  reply: PupBuffer.Buffer;
  UNTIL pb.stop DO
    reply ← pb.lark.Get[];
    -- nothing was received; go round again so that pb.stop is rechecked
    IF reply = NIL THEN LOOP;
    IF reply.type = ProbeReplyType THEN AdjustLocked[pb, reply];
    PupSocket.FreeBuffer[reply];
    ENDLOOP;
  };
-- Entry procedure: handles one probe reply held in b. A reply whose replyID
-- differs from pb.probeSequenceNumber is ignored. The first matching reply
-- establishes the clock correspondence (localEpoch, larkEpoch) and notifies
-- initialProbeDone. Later matching replies update the averaged round trip
-- time, accumulate drift between the two clocks, and move pb.sendNextBatch by
-- 5000 whenever the accumulated drift exceeds 5000 in either direction.
AdjustLocked: ENTRY PROC [pb: PlayBack, b: PupBuffer.Buffer] = TRUSTED {
  rp: ProbeReply;
  deltaLark, deltaLocal: INT;
  rp ← LOOPHOLE[@b.body];
  IF rp^.replyID = pb.probeSequenceNumber THEN {
    -- average roundtrip time and check for clock drift
    -- The initial association between Bluejay time and Lark time was localEpoch
    -- and larkEpoch. The new data point is probeLaunchTime and b.id.high;
    IF pb.doneInitialProbing THEN {
      pb.roundTripMS ← (Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000] + pb.roundTripMS) / 2;
      deltaLocal ← pb.probeLaunchTime - pb.localEpoch;
      -- the Lark difference is multiplied by 1000 before being compared with deltaLocal
      deltaLark ← LONG[b.id.high - pb.larkEpoch] * 1000;
      -- positive drift means the local clock is too fast
      pb.drift ← pb.drift + LOOPHOLE[deltaLocal - deltaLark, INT];
      SELECT TRUE FROM
        pb.drift < -5000 => {
          pb.sendNextBatch ← pb.sendNextBatch - 5000;
          pb.drift ← pb.drift + 5000;
          };
        pb.drift > 5000 => {
          -- since the local clock is too fast, we need to wait some more ticks of
          -- the local clock to delay the same real time
          pb.sendNextBatch ← pb.sendNextBatch + 5000;
          pb.drift ← pb.drift - 5000;
          };
        ENDCASE;
      -- the latest data point becomes the new reference pair
      pb.larkEpoch ← b.id.high;
      pb.localEpoch ← pb.probeLaunchTime;
      }
    ELSE {
      -- initial probe case
      pb.roundTripMS ← Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000];
      pb.localEpoch ← pb.probeLaunchTime;
      pb.larkEpoch ← b.id.high;
      pb.doneInitialProbing ← TRUE;
      NOTIFY pb.initialProbeDone;
      };
    };
  };
END.