-- PlayBackImpl.mesa
-- Last Edited by: Stewart, December 27, 1983 5:50 pm
-- Last Edited by: Swinehart, February 15, 1985 12:26:48 pm PST
--
-- Plays a file of voice data to a remote Pup socket (the "Lark").
-- PlayFile opens the file, forks one process that reads the file into two alternating
-- buffers and one process that receives probe replies, and then sends the buffered data
-- as a paced stream of voice packets.  Probe replies are used to relate the local
-- interval timer to the Lark clock and to correct the pacing for clock drift.

DIRECTORY
  Basics USING [LowHalf],
  FS USING [defaultStreamOptions, Error, StreamOpen, StreamOptions],
  IntervalTimer USING [Microseconds, Now, WaitForExpirationTime],
  IO,
  LarkTest,
  PrincOpsUtils USING [LongCOPY],
  Process USING [MsecToTicks, Pause, SetTimeout],
  PupDefs USING [GetFreePupBuffer, MsToTocks, PupBuffer, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsWords, UniqueLocalPupSocketID],
  PupTypes USING [PupAddress, PupType],
  Rope USING [ROPE];

PlayBackImpl: CEDAR MONITOR
  IMPORTS Basics, FS, IntervalTimer, IO, PrincOpsUtils, Process, PupDefs
  EXPORTS LarkTest =
BEGIN

  -- When TRUE, PlayPacketLocked marks one voice packet in every five as a probe request.
  sendProbes: BOOL _ TRUE;

  -- Pup types used on the wire: outgoing voice packets and incoming probe replies.
  BluejayVoiceType: PupTypes.PupType = LOOPHOLE[250];
  ProbeReplyType: PupTypes.PupType = LOOPHOLE[251];

  -- Header overlaid on the Pup body of every outgoing packet (four words).
  -- In the Pup header of a voice packet:
  --   id.a: timeToPlay
  --   id.b: bluejay sequence number
  BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;
  BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [
    blank1: [0..77B],
    ignore: BOOL,         -- set on the initial (empty) probes, clear on voice packets
    probeRequest: BOOL,   -- set when a probe reply is wanted for this packet
    blank2: [0..17B],
    encryptionKeyIndex: [0..17B],
    energy: CARDINAL,
    silenceMS: CARDINAL,
    blank3: CARDINAL
    ];

  -- Body overlaid on an incoming probe reply.
  -- In the Pup header of a probe reply:
  --   id.a: Lark clock
  --   id.b: Lark sequence number
  ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;
  ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [
    replyID: CARDINAL,    -- compared against PlayBackObject.probeSequenceNumber
    maxPacketSize: CARDINAL,
    maxBuffers: CARDINAL,
    blank1: CARDINAL
    ];

  -- One of the two buffers handed back and forth between BufferFiller and PlayFile.
  -- valid = TRUE means the filler has loaded it and the player may send it;
  -- the player sets valid back to FALSE when it has sent the contents.
  VoiceBuffer: TYPE = REF VoiceBufferObject;
  VoiceBufferObject: TYPE = RECORD [
    valid: BOOL _ FALSE,
    cLength: NAT,         -- number of words of file data in the buffer (bytes read / 2)
    data: SEQUENCE length: [0..4096) OF CARDINAL
    ];

  -- All state of one playback.
  PlayBack: TYPE = REF PlayBackObject;
  PlayBackObject: TYPE = RECORD [
    fileName: Rope.ROPE,
    fileStream: IO.STREAM,
    him: PupTypes.PupAddress,                   -- remote address given to PupSocketMake
    lark: PupDefs.PupSocket,
    sequenceNumber: CARDINAL,                   -- next value to place in pupID.b
    probeSequenceNumber: CARDINAL,              -- pupID.b of the most recent probe request
    doneInitialProbing: BOOL,                   -- set once the first matching probe reply arrives
    initialProbeDone: CONDITION,                -- notified when doneInitialProbing becomes TRUE
    batchCount: NAT,                            -- cycles 0..4; the packet sent at 4 carries a probe
    probeLaunchTime: IntervalTimer.Microseconds, -- local time recorded for the most recent probe
    drift: INT,                                 -- accumulated local-minus-Lark difference, microseconds
    stop: BOOL,                                 -- asks the forked processes to finish
    larkEpoch: CARDINAL,                        -- Lark clock value from the latest accepted reply
    localEpoch: IntervalTimer.Microseconds,     -- local launch time of the probe that produced larkEpoch
    timeToPlay: CARDINAL,                       -- pupID.a for the next voice packet
    sendNextBatch: IntervalTimer.Microseconds,  -- local time at which the next packet is sent
    roundTripMS: CARDINAL,                      -- running average probe round trip, milliseconds
    diskProcess: PROCESS,
    clockAdjustProcess: PROCESS,
    bufferA: VoiceBuffer,
    bufferB: VoiceBuffer
    ];

  -- State of the most recently started playback.  Only assigned in this module
  -- (presumably kept for inspection from a debugger; confirm before removing).
  lastPB: PlayBack;

  SerialNumberProcType: TYPE = PROC RETURNS [CARDINAL];

  -- Plays the contents of fileName to the Pup address him and returns when done.
  -- Returns without doing anything if the file cannot be opened or is empty.
  -- If no probe reply arrives during initial probing, nothing is played and all
  -- resources are released.
  PlayFile: PUBLIC PROC [fileName: Rope.ROPE, him: PupTypes.PupAddress] = TRUSTED {
    pb: PlayBack _ NEW[PlayBackObject];
    rawOptions: FS.StreamOptions _ FS.defaultStreamOptions;
    rawOptions[tiogaRead] _ FALSE;
    lastPB _ pb;
    {
    pb.fileName _ fileName;
    pb.him _ him;
    pb.fileStream _ FS.StreamOpen[fileName: pb.fileName, streamOptions: rawOptions, streamBufferParms: [16, 4] !
      FS.Error => TRUSTED { GOTO NoStart; }];
    -- An empty file would never produce a valid buffer: FillABuf returns at end of file
    -- without setting valid, so the wait below would never terminate.  Check here, before
    -- the socket and the helper processes exist, so there is nothing else to undo.
    IF pb.fileStream.EndOf[] THEN {
      pb.fileStream.Close[];
      GOTO NoStart;
      };
    pb.lark _ PupDefs.PupSocketMake[local: PupDefs.UniqueLocalPupSocketID[], remote: pb.him, ticks: PupDefs.MsToTocks[250]];
    pb.bufferA _ NEW[VoiceBufferObject[4000]];
    pb.bufferA.valid _ FALSE;
    pb.bufferA.cLength _ 0;
    pb.bufferB _ NEW[VoiceBufferObject[4000]];
    pb.bufferB.valid _ FALSE;
    pb.bufferB.cLength _ 0;
    pb.stop _ FALSE;
    pb.drift _ 0;
    pb.batchCount _ 0;
    pb.doneInitialProbing _ FALSE;
    -- WaitLocked gives up after 100 ms so that EmptyProbe can retry.
    Process.SetTimeout[condition: @pb.initialProbeDone, ticks: Process.MsecToTicks[100]];
    pb.diskProcess _ FORK BufferFiller[pb];
    pb.clockAdjustProcess _ FORK AdjustProcess[pb];
    -- Wait for the first buffer of file data before starting the clock exchange.
    WHILE NOT pb.bufferA.valid DO
      Process.Pause[Process.MsecToTicks[100]];
      ENDLOOP;
    EmptyProbe[pb: pb];
    IF NOT pb.doneInitialProbing THEN {
      -- probe failed
      GOTO Finish;
      };
    -- now we have a clock correspondence, a packet transmitted at localEpoch arrived at
    -- the lark at larkEpoch
    -- we want a packet transmitted now to be played in 120 mSec (6 packet times)
    pb.timeToPlay _ pb.larkEpoch + 120;
    -- send the first batch right away
    pb.sendNextBatch _ pb.localEpoch;
    -- Alternate between the two buffers; stop at the first one that is not valid
    -- when its turn comes.
    DO
      IF NOT pb.bufferA.valid THEN EXIT;
      PlayABuffer[pb, pb.bufferA];
      IF NOT pb.bufferB.valid THEN EXIT;
      PlayABuffer[pb, pb.bufferB];
      ENDLOOP;
    GOTO Finish;
    EXITS
      Finish => {
        pb.stop _ TRUE;
        JOIN pb.diskProcess;
        JOIN pb.clockAdjustProcess;
        pb.fileStream.Close[];
        PupDefs.PupSocketDestroy[pb.lark];
        };
      NoStart => NULL;
    };
    };

  -- Body of the forked disk process: fills bufferA and bufferB alternately until
  -- FillABuf reports the end of the file or pb.stop is set.
  BufferFiller: PROC [pb: PlayBack] = TRUSTED {
    end: BOOL;
    DO
      end _ FillABuf[pb, pb.bufferA, pb.fileStream];
      IF end OR pb.stop THEN RETURN;
      end _ FillABuf[pb, pb.bufferB, pb.fileStream];
      IF end OR pb.stop THEN RETURN;
      ENDLOOP;
    };

  -- Loads up to 8000 bytes (4000 words) from s into vb and marks vb valid.
  -- Waits, polling every 100 ms, until the player has released vb or pb.stop is set.
  -- Returns TRUE when no further data will follow: either s was already at end of file
  -- (vb is then left invalid with cLength 0) or fewer than 8000 bytes were read.
  FillABuf: PROC [pb: PlayBack, vb: VoiceBuffer, s: IO.STREAM] RETURNS [eof: BOOL] = TRUSTED {
    read: INT;
    IF s.EndOf[] THEN {
      vb.cLength _ 0;
      vb.valid _ FALSE;
      RETURN [TRUE];
      };
    WHILE vb.valid AND NOT pb.stop DO
      Process.Pause[Process.MsecToTicks[100]];
      ENDLOOP;
    read _ s.UnsafeGetBlock[block: [base: LOOPHOLE[BASE[DESCRIPTOR[vb.data]], LONG POINTER TO PACKED ARRAY [0..0) OF CHAR], startIndex: 0, count: 8000]];
    -- Zero the unused tail of a short final buffer.
    IF read < 8000 THEN FOR i: INT IN [read/2..4000) DO vb[i] _ 0; ENDLOOP;
    vb.cLength _ read/2;
    vb.valid _ TRUE;
    RETURN[read # 8000];
    };

  -- Initial clock exchange: sends up to four header-only packets marked ignore and
  -- probeRequest, waiting after each for AdjustLocked to report a matching reply.
  -- On return pb.doneInitialProbing tells whether any reply arrived.
  EmptyProbe: PROC [pb: PlayBack] = TRUSTED {
    b: PupDefs.PupBuffer;
    pp: BluejayPacket;
    trySN: CARDINAL;
    FOR i: NAT IN [0..4) DO
      b _ PupDefs.GetFreePupBuffer[];
      pp _ LOOPHOLE[@b.pupBody];
      trySN _ pb.sequenceNumber;
      pb.sequenceNumber _ pb.sequenceNumber + 1;
      b.pupID.a _ 0;
      b.pupID.b _ trySN;
      b.pupType _ BluejayVoiceType;
      pp^ _ [
        blank1: 0,
        ignore: TRUE,
        probeRequest: TRUE,
        blank2: 0,
        encryptionKeyIndex: 0,
        energy: 0,
        silenceMS: 0,
        blank3: 0
        ];
      PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject]];
      -- Record launch time and sequence number before the packet leaves, so that the
      -- reply can be matched whenever it arrives.
      SendProbeLocked[pb, b];
      pb.lark.put[b];
      WaitLocked[pb];
      IF pb.doneInitialProbing THEN EXIT;
      ENDLOOP;
    };

  -- Records, under the monitor lock, the launch time and sequence number of probe b.
  SendProbeLocked: ENTRY PROCEDURE [pb: PlayBack, b: PupDefs.PupBuffer] = TRUSTED {
    pb.probeLaunchTime _ IntervalTimer.Now[];
    pb.probeSequenceNumber _ b.pupID.b;
    };

  -- Waits for the initial probe reply, or for the timeout set on the condition in PlayFile.
  -- The flag is tested first, under the monitor lock: if the reply was processed between
  -- the caller's put and this call, the NOTIFY has already happened and an unconditional
  -- WAIT would sleep for the whole timeout.  The caller rechecks doneInitialProbing.
  WaitLocked: ENTRY PROCEDURE [pb: PlayBack] = TRUSTED {
    IF NOT pb.doneInitialProbing THEN WAIT pb.initialProbeDone;
    };

  -- Sends the contents of vb as voice packets of 80 data words each, one packet per
  -- pb.sendNextBatch deadline, then releases vb to the filler.  Words beyond the last
  -- complete group of 80 are not sent.
  PlayABuffer: PROC [pb: PlayBack, vb: VoiceBuffer] = TRUSTED {
    b: PupDefs.PupBuffer;
    pp: BluejayPacket;
    FOR i: NAT IN [0..vb.cLength/80) DO
      b _ PupDefs.GetFreePupBuffer[];
      pp _ LOOPHOLE[@b.pupBody];
      b.pupType _ BluejayVoiceType;
      b.pupID.b _ pb.sequenceNumber;
      pb.sequenceNumber _ pb.sequenceNumber + 1;
      -- Each packet is scheduled 20 Lark clock units (mSec) after the previous one.
      b.pupID.a _ pb.timeToPlay;
      pb.timeToPlay _ pb.timeToPlay + 20;
      pp^ _ [
        blank1: 0,
        ignore: FALSE,
        probeRequest: FALSE,
        blank2: 0,
        encryptionKeyIndex: 0,
        energy: 0,
        silenceMS: 0,
        blank3: 0
        ];
      -- The data words are copied to word 4 onward, after the four-word header.
      PrincOpsUtils.LongCOPY[from: @vb.data[i * 80], nwords: 80, to: @b.pupWords[4]];
      PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject] + 80];
      IntervalTimer.WaitForExpirationTime[pb.sendNextBatch];
      PlayPacketLocked[pb, pp, b];
      pb.lark.put[b];
      ENDLOOP;
    -- NOTE(review): the original source carries a detached remark
    -- ". . . is this unsafe? (no, it's a simple assignment)" that may belong to this
    -- unmonitored store; confirm against an intact copy of the file.
    vb.valid _ FALSE;
    };

  -- Per-packet bookkeeping under the monitor lock.  When batchCount is 4 and sendProbes
  -- is set, the packet is turned into a probe request and its launch time and sequence
  -- number are recorded.  Always advances batchCount (mod 5) and moves the next send
  -- deadline 20000 microseconds later.
  PlayPacketLocked: ENTRY PROC [pb: PlayBack, pp: BluejayPacket, b: PupDefs.PupBuffer] = TRUSTED {
    IF pb.batchCount = 4 AND sendProbes THEN {
      pp^.probeRequest _ TRUE;
      pb.probeLaunchTime _ IntervalTimer.Now[];
      pb.probeSequenceNumber _ b.pupID.b;
      };
    pb.batchCount _ (pb.batchCount + 1) MOD 5;
    pb.sendNextBatch _ pb.sendNextBatch + 20000;
    };

  -- Body of the forked clock-adjust process: receives packets from the Lark socket until
  -- pb.stop is set, passes probe replies to AdjustLocked and discards everything else.
  -- pb.stop is only examined between calls of get, so the process ends after the get
  -- that is in progress when stop is set has returned.
  AdjustProcess: PROC [pb: PlayBack] = TRUSTED {
    b: PupDefs.PupBuffer;
    DO
      IF pb.stop THEN EXIT;
      b _ pb.lark.get[];
      IF b = NIL THEN LOOP;
      IF b.pupType # ProbeReplyType THEN {
        PupDefs.ReturnFreePupBuffer[b];
        LOOP;
        };
      AdjustLocked[pb, b];
      PupDefs.ReturnFreePupBuffer[b];
      ENDLOOP;
    };

  -- Processes one probe reply under the monitor lock.  Replies whose replyID does not
  -- match the most recent probe are ignored.  The first matching reply establishes the
  -- (localEpoch, larkEpoch) correspondence and wakes EmptyProbe; later ones update the
  -- round-trip average and the drift correction.
  AdjustLocked: ENTRY PROC [pb: PlayBack, b: PupDefs.PupBuffer] = TRUSTED {
    rp: ProbeReply;
    deltaLark, deltaLocal: INT;
    rp _ LOOPHOLE[@b.pupBody];
    IF rp^.replyID = pb.probeSequenceNumber THEN {
      IF pb.doneInitialProbing THEN {
        -- average roundtrip time and check for clock drift
        pb.roundTripMS _ (Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000] + pb.roundTripMS) / 2;
        -- The initial association between Bluejay time and Lark time was localEpoch and
        -- larkEpoch.  The new data point is probeLaunchTime and b.pupID.a;
        -- positive drift means the local clock is too fast
        deltaLocal _ pb.probeLaunchTime - pb.localEpoch;
        -- The Lark clock difference is scaled by 1000 to match the microsecond local clock.
        deltaLark _ LONG[b.pupID.a - pb.larkEpoch] * 1000;
        pb.drift _ pb.drift + LOOPHOLE[deltaLocal - deltaLark, INT];
        -- Correct in steps of 5000 microseconds once the accumulated drift exceeds that.
        SELECT TRUE FROM
          pb.drift < -5000 => {
            pb.sendNextBatch _ pb.sendNextBatch - 5000;
            pb.drift _ pb.drift + 5000;
            };
          pb.drift > 5000 => {
            -- since the local clock is too fast, we need to wait some more ticks of the
            -- local clock to delay the same real time
            pb.sendNextBatch _ pb.sendNextBatch + 5000;
            pb.drift _ pb.drift - 5000;
            };
          ENDCASE;
        pb.larkEpoch _ b.pupID.a;
        pb.localEpoch _ pb.probeLaunchTime;
        }
      ELSE {
        -- initial probe case
        pb.roundTripMS _ Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000];
        pb.localEpoch _ pb.probeLaunchTime;
        pb.larkEpoch _ b.pupID.a;
        pb.doneInitialProbing _ TRUE;
        NOTIFY pb.initialProbeDone;
        };
      };
    };

END.
-- Change log:
--   March 17, 1983 2:02 pm, Stewart, created
--   March 24, 1983 10:51 am, Stewart, condition variable and interval timer
--   December 27, 1983 5:50 pm, Stewart, Cedar 5
--
-- PlayBackImpl.mesa
-- Last Edited by: Stewart, December 27, 1983 5:50 pm
-- Last Edited by: Swinehart, February 15, 1985 12:26:48 pm PST
--
-- Comment text from the module, in its original order:
--   id.a: timeToPlay
--   id.b: bluejay sequence number
--   id.a: Lark clock
--   id.b: Lark sequence number
--   probe failed
--   now we have a clock correspondence, a packet transmitted at localEpoch arrived at the lark at larkEpoch
--   we want a packet transmitted now to be played in 120 mSec (6 packet times)
--   send the first batch right away
--   . . . is this unsafe? (no, it's a simple assignment)
--   average roundtrip time and check for clock drift
--   The initial association between Bluejay time and Lark time was localEpoch and larkEpoch.
--   The new data point is probeLaunchTime and b.pupID.a;
--   positive drift means the local clock is too fast
--   since the local clock is too fast, we need to wait some more ticks of the local clock to delay the same real time