-- PlayBackImpl.mesa
-- Streams a recorded voice file to a remote Lark as a sequence of Pup packets,
-- using probe packets and their replies to track the Lark's clock.

DIRECTORY
  Basics USING [LowHalf],
  FS USING [defaultStreamOptions, Error, StreamOpen, StreamOptions],
  IntervalTimer USING [Microseconds, Now, WaitForExpirationTime],
  IO,
  NewLark,
  PrincOpsUtils USING [LongCOPY],
  Process USING [MsecToTicks, Pause, SetTimeout],
  PupDefs USING [GetFreePupBuffer, MsToTocks, PupBuffer, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsWords, UniqueLocalPupSocketID],
  PupTypes USING [PupAddress, PupType],
  Rope USING [ROPE];

PlayBackImpl: MONITOR
  IMPORTS Basics, FS, IntervalTimer, IO, PrincOpsUtils, Process, PupDefs
  EXPORTS NewLark =
BEGIN

-- When TRUE, every fifth voice packet also carries a probe request.
sendProbes: BOOL _ TRUE;

-- Pup type codes: outgoing voice/probe packets and the probe replies coming back.
BluejayVoiceType: PupTypes.PupType = LOOPHOLE[250];
ProbeReplyType: PupTypes.PupType = LOOPHOLE[251];

-- Header overlaid on the body of every outgoing packet.
-- In the enclosing Pup:  id.a: timeToPlay,  id.b: bluejay sequence number.
BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;
BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  blank1: [0..77B],
  ignore: BOOL,  -- TRUE on the initial probes (sent without voice data), FALSE on voice packets
  probeRequest: BOOL,  -- TRUE when a ProbeReplyType answer is wanted
  blank2: [0..17B],
  encryptionKeyIndex: [0..17B],  -- always sent as 0 by this module
  energy: CARDINAL,  -- always sent as 0 by this module
  silenceMS: CARDINAL,  -- always sent as 0 by this module
  blank3: CARDINAL
  ];

-- Body of a probe reply.
-- In the enclosing Pup:  id.a: Lark clock,  id.b: Lark sequence number.
ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;
ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  replyID: CARDINAL,  -- compared against PlayBackObject.probeSequenceNumber
  maxPacketSize: CARDINAL,  -- not examined by this module
  maxBuffers: CARDINAL,  -- not examined by this module
  blank1: CARDINAL
  ];

-- One half of the double buffer shared by the disk reader and the packet sender.
VoiceBuffer: TYPE = REF VoiceBufferObject;
VoiceBufferObject: TYPE = RECORD [
  valid: BOOL _ FALSE,  -- set TRUE by the filler once loaded, FALSE by the player once sent
  cLength: NAT,  -- number of words of file data currently in data
  data: SEQUENCE length: [0..4096) OF CARDINAL
  ];

-- All state for one playback session.
PlayBack: TYPE = REF PlayBackObject;
PlayBackObject: TYPE = RECORD [
  fileName: Rope.ROPE,  -- name handed to FS.StreamOpen
  fileStream: IO.STREAM,  -- the opened voice file
  him: PupTypes.PupAddress,  -- remote address the socket talks to
  lark: PupDefs.PupSocket,  -- socket used for both sending and receiving
  sequenceNumber: CARDINAL,  -- goes into id.b of each outgoing packet, then incremented
  probeSequenceNumber: CARDINAL,  -- id.b of the most recently stamped probe
  doneInitialProbing: BOOL,  -- TRUE once the first matching probe reply has been seen
  initialProbeDone: CONDITION,  -- notified when doneInitialProbing becomes TRUE
  batchCount: NAT,  -- packet counter modulo 5; a probe is requested when it is 4
  probeLaunchTime: IntervalTimer.Microseconds,  -- IntervalTimer.Now[] when the latest probe was stamped
  drift: INT,  -- accumulated (local elapsed - Lark elapsed), corrected in steps of 5000
  stop: BOOL,  -- set by PlayFile to make the helper processes exit
  larkEpoch: CARDINAL,  -- id.a of the last matched probe reply
  localEpoch: IntervalTimer.Microseconds,  -- probeLaunchTime of the probe that produced larkEpoch
  timeToPlay: CARDINAL,  -- goes into id.a of each voice packet; advanced by 20 per packet
  sendNextBatch: IntervalTimer.Microseconds,  -- local time to wait for before sending the next packet
  roundTripMS: CARDINAL,  -- running average of probe round-trip time
  diskProcess: PROCESS,  -- forked BufferFiller
  clockAdjustProcess: PROCESS,  -- forked AdjustProcess
  bufferA: VoiceBuffer,
  bufferB: VoiceBuffer
  ];

-- Most recently created session record (assigned by PlayFile; not read in this module).
lastPB: PlayBack;
SerialNumberProcType: TYPE = PROC RETURNS [CARDINAL];

-- Plays the voice file fileName to the Lark at address him and returns when
-- playback has finished (or could not be started).
--   * If the file cannot be opened (FS.Error), or is empty, nothing is sent.
--   * If no probe reply arrives after the initial probes, nothing is played.
-- Two helper processes are forked: BufferFiller (disk -> bufferA/bufferB) and
-- AdjustProcess (probe replies -> clock correction).  Both are joined before return.
PlayFile: PUBLIC PROC [fileName: Rope.ROPE, him: PupTypes.PupAddress] = {
  pb: PlayBack _ NEW[PlayBackObject];
  rawOptions: FS.StreamOptions _ FS.defaultStreamOptions;
  rawOptions[tiogaRead] _ FALSE;
  lastPB _ pb;
  {
  pb.fileName _ fileName;
  pb.him _ him;
  pb.fileStream _ FS.StreamOpen[fileName: pb.fileName, streamOptions: rawOptions, streamBufferParms: [16, 4]
    ! FS.Error => TRUSTED { GOTO NoStart; }];
  -- An empty file would leave bufferA invalid forever (BufferFiller exits at once),
  -- so the wait loop below would never terminate.  Bail out before forking anything.
  IF pb.fileStream.EndOf[] THEN {
    pb.fileStream.Close[];
    GOTO NoStart;
    };
  pb.lark _ PupDefs.PupSocketMake[local: PupDefs.UniqueLocalPupSocketID[], remote: pb.him, ticks: PupDefs.MsToTocks[250]];
  pb.bufferA _ NEW[VoiceBufferObject[4000]];
  pb.bufferA.valid _ FALSE;
  pb.bufferA.cLength _ 0;
  pb.bufferB _ NEW[VoiceBufferObject[4000]];
  pb.bufferB.valid _ FALSE;
  pb.bufferB.cLength _ 0;
  pb.stop _ FALSE;
  pb.drift _ 0;
  pb.batchCount _ 0;
  pb.doneInitialProbing _ FALSE;
  Process.SetTimeout[condition: @pb.initialProbeDone, ticks: Process.MsecToTicks[100]];
  pb.diskProcess _ FORK BufferFiller[pb];
  pb.clockAdjustProcess _ FORK AdjustProcess[pb];
  -- Wait for the first buffer of voice data to come off the disk.
  WHILE NOT pb.bufferA.valid DO
    Process.Pause[Process.MsecToTicks[100]];
    ENDLOOP;
  EmptyProbe[pb: pb];
  IF NOT pb.doneInitialProbing THEN {
    -- probe failed
    GOTO Finish;
    };
  -- now we have a clock correspondence, a packet transmitted at localEpoch
  -- arrived at the lark at larkEpoch
  -- we want a packet transmitted now to be played in 120 mSec (6 packet times)
  pb.timeToPlay _ pb.larkEpoch + 120;
  -- send the first batch right away . . . is this unsafe? (no, it's a simple assignment)
  pb.sendNextBatch _ pb.localEpoch;
  -- Alternate between the two buffers until the filler stops supplying data.
  DO
    IF NOT pb.bufferA.valid THEN EXIT;
    PlayABuffer[pb, pb.bufferA];
    IF NOT pb.bufferB.valid THEN EXIT;
    PlayABuffer[pb, pb.bufferB];
    ENDLOOP;
  GOTO Finish;
  EXITS
    Finish => {
      pb.stop _ TRUE;
      JOIN pb.diskProcess;
      JOIN pb.clockAdjustProcess;
      pb.fileStream.Close[];
      PupDefs.PupSocketDestroy[pb.lark];
      };
    NoStart => NULL;
  };
  };

-- Disk process: refills bufferA and bufferB alternately until end of file
-- or until pb.stop is set.
BufferFiller: PROC [pb: PlayBack] = {
  end: BOOL;
  DO
    end _ FillABuf[pb, pb.bufferA, pb.fileStream];
    IF end OR pb.stop THEN RETURN;
    end _ FillABuf[pb, pb.bufferB, pb.fileStream];
    IF end OR pb.stop THEN RETURN;
    ENDLOOP;
  };

-- Loads up to 8000 bytes (4000 words) from s into vb once the player has released it.
-- Returns TRUE when no further data will follow (end of file, short read, or stop).
FillABuf: PROC [pb: PlayBack, vb: VoiceBuffer, s: IO.STREAM] RETURNS [eof: BOOL] = {
  read: INT;
  -- Wait for the player to finish with this buffer BEFORE looking at the stream.
  -- Testing EndOf first would clear vb.valid on a buffer that is filled but not
  -- yet played whenever the file length is an exact multiple of 8000 bytes.
  WHILE vb.valid AND NOT pb.stop DO
    Process.Pause[Process.MsecToTicks[100]];
    ENDLOOP;
  IF pb.stop THEN RETURN [TRUE];  -- leave the buffer untouched when shutting down
  IF s.EndOf[] THEN {
    vb.cLength _ 0;
    vb.valid _ FALSE;
    RETURN [TRUE];
    };
  read _ s.UnsafeGetBlock[block: [base: LOOPHOLE[BASE[DESCRIPTOR[vb.data]], LONG POINTER TO PACKED ARRAY [0..0) OF CHAR], startIndex: 0, count: 8000]];
  -- Zero the unread tail so that a final partial packet is padded with zeros.
  IF read < 8000 THEN
    FOR i: INT IN [read/2..4000) DO
      vb.data[i] _ 0;
      ENDLOOP;
  vb.cLength _ read/2;
  vb.valid _ TRUE;
  RETURN [read # 8000];
  };

-- Sends up to four data-less probe packets (ignore and probeRequest both TRUE),
-- waiting after each one until AdjustLocked reports a matching reply or the
-- condition times out.  On success pb.doneInitialProbing is TRUE on return.
EmptyProbe: PROC [pb: PlayBack] = {
  b: PupDefs.PupBuffer;
  pp: BluejayPacket;
  trySN: CARDINAL;
  FOR i: NAT IN [0..4) DO
    b _ PupDefs.GetFreePupBuffer[];
    pp _ LOOPHOLE[@b.pupBody];
    trySN _ pb.sequenceNumber;
    pb.sequenceNumber _ pb.sequenceNumber + 1;
    b.pupID.a _ 0;
    b.pupID.b _ trySN;
    b.pupType _ BluejayVoiceType;
    pp^ _ [
      blank1: 0,
      ignore: TRUE,
      probeRequest: TRUE,
      blank2: 0,
      encryptionKeyIndex: 0,
      energy: 0,
      silenceMS: 0,
      blank3: 0
      ];
    PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject]];
    SendProbeLocked[pb, b];  -- must run before put: it reads b.pupID.b
    pb.lark.put[b];
    WaitLocked[pb];
    IF pb.doneInitialProbing THEN EXIT;
    ENDLOOP;
  };

-- Records, under the monitor lock, the launch time and sequence number of the
-- probe about to be sent so that AdjustLocked can match its reply.
SendProbeLocked: ENTRY PROCEDURE [pb: PlayBack, b: PupDefs.PupBuffer] = {
  pb.probeLaunchTime _ IntervalTimer.Now[];
  pb.probeSequenceNumber _ b.pupID.b;
  };

-- Blocks until the initial probe is answered or pb.initialProbeDone times out.
-- The flag is tested under the monitor lock first: if the reply was processed
-- between the put and this call, its NOTIFY has already happened and an
-- unconditional WAIT would sit out the whole timeout for nothing.
WaitLocked: ENTRY PROCEDURE [pb: PlayBack] = {
  IF NOT pb.doneInitialProbing THEN WAIT pb.initialProbeDone;
  };

-- Sends the contents of vb as packets of 80 data words each, pacing every
-- packet against pb.sendNextBatch, then hands the buffer back to the filler.
PlayABuffer: PROC [pb: PlayBack, vb: VoiceBuffer] = {
  b: PupDefs.PupBuffer;
  pp: BluejayPacket;
  -- Round the packet count up so that a trailing partial packet is sent too;
  -- FillABuf has zero-filled the words beyond cLength, and 4000 is a multiple
  -- of 80, so the last packet never reads beyond the 4000 words allocated.
  FOR i: NAT IN [0..(vb.cLength + 79)/80) DO
    b _ PupDefs.GetFreePupBuffer[];
    pp _ LOOPHOLE[@b.pupBody];
    b.pupType _ BluejayVoiceType;
    b.pupID.b _ pb.sequenceNumber;
    pb.sequenceNumber _ pb.sequenceNumber + 1;
    b.pupID.a _ pb.timeToPlay;
    pb.timeToPlay _ pb.timeToPlay + 20;
    pp^ _ [
      blank1: 0,
      ignore: FALSE,
      probeRequest: FALSE,
      blank2: 0,
      encryptionKeyIndex: 0,
      energy: 0,
      silenceMS: 0,
      blank3: 0
      ];
    -- Voice data goes at pupWords[4], presumably directly after the header written through pp.
    PrincOpsUtils.LongCOPY[from: @vb.data[i * 80], nwords: 80, to: @b.pupWords[4]];
    PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject] + 80];
    IntervalTimer.WaitForExpirationTime[pb.sendNextBatch];
    PlayPacketLocked[pb, pp, b];
    pb.lark.put[b];
    ENDLOOP;
  vb.valid _ FALSE;
  };

-- Per-packet bookkeeping under the monitor lock: turns every fifth packet into
-- a probe request (when sendProbes is TRUE) and advances the send schedule.
PlayPacketLocked: ENTRY PROC [pb: PlayBack, pp: BluejayPacket, b: PupDefs.PupBuffer] = {
  IF pb.batchCount = 4 AND sendProbes THEN {
    pp^.probeRequest _ TRUE;
    pb.probeLaunchTime _ IntervalTimer.Now[];
    pb.probeSequenceNumber _ b.pupID.b;
    };
  pb.batchCount _ (pb.batchCount + 1) MOD 5;
  pb.sendNextBatch _ pb.sendNextBatch + 20000;
  };

-- Clock-adjust process: receives packets from the Lark until pb.stop is set,
-- passing probe replies to AdjustLocked and discarding everything else.
AdjustProcess: PROC [pb: PlayBack] = {
  b: PupDefs.PupBuffer;
  DO
    IF pb.stop THEN EXIT;
    b _ pb.lark.get[];
    IF b = NIL THEN LOOP;  -- nothing received; go round and re-check pb.stop
    IF b.pupType # ProbeReplyType THEN {
      PupDefs.ReturnFreePupBuffer[b];
      LOOP;
      };
    AdjustLocked[pb, b];
    PupDefs.ReturnFreePupBuffer[b];
    ENDLOOP;
  };

-- Processes one probe reply under the monitor lock.  Replies whose replyID does
-- not match the outstanding probe are ignored.  The first matching reply
-- establishes the (localEpoch, larkEpoch) correspondence and wakes EmptyProbe;
-- later ones update the round-trip average and nudge the send schedule.
AdjustLocked: ENTRY PROC [pb: PlayBack, b: PupDefs.PupBuffer] = {
  rp: ProbeReply;
  deltaLark, deltaLocal: INT;
  rp _ LOOPHOLE[@b.pupBody];
  IF rp^.replyID = pb.probeSequenceNumber THEN {
    IF pb.doneInitialProbing THEN {
      -- average roundtrip time and check for clock drift
      -- The initial association between Bluejay time and Lark time was localEpoch
      -- and larkEpoch.  The new data point is probeLaunchTime and b.pupID.a;
      pb.roundTripMS _ (Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000] + pb.roundTripMS) / 2;
      deltaLocal _ pb.probeLaunchTime - pb.localEpoch;
      deltaLark _ LONG[b.pupID.a - pb.larkEpoch] * 1000;
      -- positive drift means the local clock is too fast
      pb.drift _ pb.drift + LOOPHOLE[deltaLocal - deltaLark, INT];
      SELECT TRUE FROM
        pb.drift < -5000 => {
          pb.sendNextBatch _ pb.sendNextBatch - 5000;
          pb.drift _ pb.drift + 5000;
          };
        pb.drift > 5000 => {
          -- since the local clock is too fast, we need to wait some more ticks
          -- of the local clock to delay the same real time
          pb.sendNextBatch _ pb.sendNextBatch + 5000;
          pb.drift _ pb.drift - 5000;
          };
        ENDCASE;
      pb.larkEpoch _ b.pupID.a;
      pb.localEpoch _ pb.probeLaunchTime;
      }
    ELSE {
      -- initial probe case
      pb.roundTripMS _ Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000];
      pb.localEpoch _ pb.probeLaunchTime;
      pb.larkEpoch _ b.pupID.a;
      pb.doneInitialProbing _ TRUE;
      NOTIFY pb.initialProbeDone;
      };
    };
  };

END.
March 17, 1983 2:02 pm, Stewart, created March 24, 1983 10:51 am, Stewart, condition variable and interval timer December 27, 1983 5:50 pm, Stewart, Cedar 5 PlayBackImpl.mesa Last Edited by: Stewart, December 27, 1983 5:50 pm id.a: timeToPlay id.b: bluejay sequence number id.a: Lark clock id.b: Lark sequence number probe failed now we have a clock correspondence, a packet transmitted at localEpoch arrived at the lark at larkEpoch we want a packet transmitted now to be played in 120 mSec (6 packet times) send the first batch right away . . . is this unsafe? (no, it's a simple assignment) average roundtrip time and check for clock drift The initial association between Bluejay time and Lark time was localEpoch and larkEpoch. The new data point is probeLaunchTime and b.pupID.a; positive drift means the local clock is too fast since the local clock is too fast, we need to wait some more ticks of the local clock to delay the same real time Ê k˜Jšœ™Jšœ2™2J˜šÏk ˜ J˜J˜BJ˜?Jšœ˜J˜Jšœ˜Jšœ/˜/Jšœ¥˜¥Jšœ%˜%Jšœ˜J˜—šœ˜Jšœœ!˜FJšœ ˜Jš˜J˜Jšœ œœ˜J˜Jšœ%œ˜3Jšœ#œ˜1J˜Jš œœœœœ˜:J˜Jšœ™Jšœ™š œœœ œœ˜6J˜Jšœœ˜ Jšœœ˜J˜J˜Jšœœ˜Jšœ œ˜Jšœ˜J˜J˜—Jš œ œœœœ˜:J˜Jšœ™Jšœ™š œœœ œœ˜9Jšœ œ˜Jšœœ˜Jšœ œ˜Jšœ˜J˜J˜—Jšœ œœ˜*J˜šœœœ˜"Jšœœœ˜Jšœ œ˜ Jšœœœ˜,J˜J˜—Jšœ œœ˜$J˜šœœœ˜Jšœœ˜Jšœ œœ˜J˜J˜Jšœœ˜Jšœœ˜Jšœœ˜Jšœ œ˜Jšœ œ˜J˜,Jšœœ˜ Jšœœ˜ Jšœ œ˜J˜'Jšœ œ˜J˜*Jšœ œ˜Jšœ œ˜Jšœœ˜J˜J˜J˜J˜—J˜J˜Jš Ïnœœœœœ˜5J˜šžœœœœ˜IJšœœ˜#Jšœ7˜7Jšœ˜J˜ J˜J˜J˜ Jšœyœœ ˜”J˜xJšœ œ˜*Jšœœ˜J˜Jšœ œ˜*Jšœœ˜J˜Jšœ œ˜J˜ J˜Jšœœ˜JšœU˜UJšœœ˜'Jšœœ˜/Jšœœœ*œ˜OJ˜šœœœ˜#Jšœ ™ Jšœ˜ J˜—JšœF™FJšœ ™ JšœJ™JJ˜#JšœT™TJ˜!š˜Jšœœœœ˜"J˜Jšœœœœ˜"J˜Jšœ˜—Jšœ˜ Jš˜˜ Jšœ œ˜Jšœ˜Jšœ˜J˜J˜"J˜—Jšœ œ˜J˜J˜J˜—šž œœ˜%Jšœœ˜ š˜J˜.Jšœœ œœ˜J˜.Jšœœ œœ˜Jšœ˜—J˜J˜—š žœœ$œœœœ˜TJšœœ˜ šœ œ˜J˜Jšœ œ˜Jšœœ˜J˜—Jš œ œœ œ*œ˜SJšœ& œ œ œœœœœœœ ˜•Jšœ œœœœœ œ˜GJ˜Jšœ œ˜Jšœ˜J˜J˜—šž œœ˜#J˜J˜Jšœœ˜šœœœ˜J˜Jšœœ ˜J˜J˜*J˜J˜J˜˜J˜ Jšœœ˜ Jšœœ˜J˜ J˜J˜ J˜ J˜ J˜—Jšœœ˜:Jšœ˜J˜Jšœ˜Jšœœœ˜#Jšœ˜—J˜J˜—šžœœ œ)˜IJ˜)J˜#Jšœ˜—J˜šž œœ œ˜.Jšœ˜Jšœ˜—J˜šž œœ$˜5J˜J˜šœœœ˜#J˜Jšœœ ˜J˜J˜J˜*J˜J˜#˜J˜ Jšœœ˜Jšœœ˜J˜ J˜J˜ J˜ J˜ J˜—J˜OJšœœ˜?J˜6J˜J˜Jšœ˜—Jšœ 
œ˜J˜J˜—šžœœœ<˜Xšœœ œ˜*Jšœœ˜J˜)J˜#J˜—Jšœ$œ˜*J˜,J˜J˜—šž œœ˜&J˜š˜Jšœ œœ˜J˜Jšœœœœ˜šœœ˜$J˜Jšœ˜J˜—J˜J˜Jšœ˜—J˜J˜—šž œœœ)˜AJ˜Jšœœ˜Jšœœ ˜šœ&œ˜.Jšœ0™0JšœI™IJšœD™Dšœœ˜J˜jJ˜0Jšœ œ"˜2Jšœ0™0Jšœœœ˜<šœœ˜˜J˜+J˜J˜—˜JšœE™EJšœ+™+J˜+J˜J˜—Jšœ˜—J˜J˜#J˜—šœÏc˜J˜SJ˜#J˜Jšœœ˜Jšœ˜J˜—J˜—J˜J˜—Jšœ˜—J˜(J˜GJ˜+J˜J˜—…— *“