<> <> <> DIRECTORY Basics USING [LowHalf], FS USING [defaultStreamOptions, Error, StreamOpen, StreamOptions], IntervalTimer USING [Microseconds, Now, WaitForExpirationTime], IO, LarkTest, PrincOpsUtils USING [LongCopy], Process USING [MsecToTicks, Pause, SetTimeout], PupDefs USING [GetFreePupBuffer, MsToTocks, PupBuffer, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, SetPupContentsWords, UniqueLocalPupSocketID], PupTypes USING [PupAddress, PupType], Rope USING [ROPE]; PlayBackImpl: CEDAR MONITOR IMPORTS Basics, FS, IntervalTimer, IO, PrincOpsUtils, Process, PupDefs EXPORTS LarkTest = BEGIN sendProbes: BOOL _ TRUE; BluejayVoiceType: PupTypes.PupType = LOOPHOLE[250]; ProbeReplyType: PupTypes.PupType = LOOPHOLE[251]; BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject; <> <> BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [ blank1: [0..77B], ignore: BOOL, probeRequest: BOOL, blank2: [0..17B], encryptionKeyIndex: [0..17B], energy: CARDINAL, silenceMS: CARDINAL, blank3: CARDINAL ]; ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject; <> <> ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [ replyID: CARDINAL, maxPacketSize: CARDINAL, maxBuffers: CARDINAL, blank1: CARDINAL ]; VoiceBuffer: TYPE = REF VoiceBufferObject; VoiceBufferObject: TYPE = RECORD [ valid: BOOL _ FALSE, cLength: NAT, data: SEQUENCE length: [0..4096) OF CARDINAL ]; PlayBack: TYPE = REF PlayBackObject; PlayBackObject: TYPE = RECORD [ fileName: Rope.ROPE, fileStream: IO.STREAM, him: PupTypes.PupAddress, lark: PupDefs.PupSocket, sequenceNumber: CARDINAL, probeSequenceNumber: CARDINAL, doneInitialProbing: BOOL, initialProbeDone: CONDITION, batchCount: NAT, probeLaunchTime: IntervalTimer.Microseconds, drift: INT, stop: BOOL, larkEpoch: CARDINAL, localEpoch: IntervalTimer.Microseconds, timeToPlay: CARDINAL, sendNextBatch: IntervalTimer.Microseconds, roundTripMS: CARDINAL, diskProcess: PROCESS, clockAdjustProcess: PROCESS, bufferA: VoiceBuffer, bufferB: VoiceBuffer ]; lastPB: PlayBack; SerialNumberProcType: TYPE = PROC RETURNS [CARDINAL]; PlayFile: PUBLIC PROC [fileName: Rope.ROPE, him: PupTypes.PupAddress] = TRUSTED { pb: PlayBack _ NEW[PlayBackObject]; rawOptions: FS.StreamOptions _ FS.defaultStreamOptions; rawOptions[tiogaRead] _ FALSE; lastPB _ pb; { pb.fileName _ fileName; pb.him _ him; pb.fileStream _ FS.StreamOpen[fileName: pb.fileName, streamOptions: rawOptions, streamBufferParms: [16, 4] ! FS.Error => TRUSTED { GOTO NoStart; }]; pb.lark _ PupDefs.PupSocketMake[local: PupDefs.UniqueLocalPupSocketID[], remote: pb.him, ticks: PupDefs.MsToTocks[250]]; pb.bufferA _ NEW[VoiceBufferObject[4000]]; pb.bufferA.valid _ FALSE; pb.bufferA.cLength _ 0; pb.bufferB _ NEW[VoiceBufferObject[4000]]; pb.bufferB.valid _ FALSE; pb.bufferB.cLength _ 0; pb.stop _ FALSE; pb.drift _ 0; pb.batchCount _ 0; pb.doneInitialProbing _ FALSE; Process.SetTimeout[condition: @pb.initialProbeDone, ticks: Process.MsecToTicks[100]]; pb.diskProcess _ FORK BufferFiller[pb]; pb.clockAdjustProcess _ FORK AdjustProcess[pb]; WHILE NOT pb.bufferA.valid DO Process.Pause[Process.MsecToTicks[100]]; ENDLOOP; EmptyProbe[pb: pb]; IF NOT pb.doneInitialProbing THEN { <> GOTO Finish; }; <> <> <> pb.timeToPlay _ pb.larkEpoch + 120; <> pb.sendNextBatch _ pb.localEpoch; DO IF NOT pb.bufferA.valid THEN EXIT; PlayABuffer[pb, pb.bufferA]; IF NOT pb.bufferB.valid THEN EXIT; PlayABuffer[pb, pb.bufferB]; ENDLOOP; GOTO Finish; EXITS Finish => { pb.stop _ TRUE; JOIN pb.diskProcess; JOIN pb.clockAdjustProcess; pb.fileStream.Close[]; PupDefs.PupSocketDestroy[pb.lark]; }; NoStart => NULL; }; }; BufferFiller: PROC [pb: PlayBack] = TRUSTED { end: BOOL; DO end _ FillABuf[pb, pb.bufferA, pb.fileStream]; IF end OR pb.stop THEN RETURN; end _ FillABuf[pb, pb.bufferB, pb.fileStream]; IF end OR pb.stop THEN RETURN; ENDLOOP; }; FillABuf: PROC [pb: PlayBack, vb: VoiceBuffer, s: IO.STREAM] RETURNS [eof: BOOL] = TRUSTED { read: INT; IF s.EndOf[] THEN { vb.cLength _ 0; vb.valid _ FALSE; RETURN [TRUE]; }; WHILE vb.valid AND NOT pb.stop DO Process.Pause[Process.MsecToTicks[100]]; ENDLOOP; read _ s.UnsafeGetBlock[block: [base: LOOPHOLE[BASE[DESCRIPTOR[vb.data]]], startIndex: 0, count: 8000]]; IF read < 8000 THEN FOR i: INT IN [read/2..4000) DO vb[i] _ 0; ENDLOOP; vb.cLength _ read/2; vb.valid _ TRUE; RETURN[read # 8000]; }; EmptyProbe: PROC [pb: PlayBack] = TRUSTED { b: PupDefs.PupBuffer; pp: BluejayPacket; trySN: CARDINAL; FOR i: NAT IN [0..4) DO b _ PupDefs.GetFreePupBuffer[]; pp _ LOOPHOLE[@b.pupBody]; trySN _ pb.sequenceNumber; pb.sequenceNumber _ pb.sequenceNumber + 1; b.pupID.a _ 0; b.pupID.b _ trySN; b.pupType _ BluejayVoiceType; pp^ _ [ blank1: 0, ignore: TRUE, probeRequest: TRUE, blank2: 0, encryptionKeyIndex: 0, energy: 0, silenceMS: 0, blank3: 0 ]; PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject]]; SendProbeLocked[pb, b]; pb.lark.put[b]; WaitLocked[pb]; IF pb.doneInitialProbing THEN EXIT; ENDLOOP; }; SendProbeLocked: ENTRY PROCEDURE [pb: PlayBack, b: PupDefs.PupBuffer] = TRUSTED { pb.probeLaunchTime _ IntervalTimer.Now[]; pb.probeSequenceNumber _ b.pupID.b; }; WaitLocked: ENTRY PROCEDURE [pb: PlayBack] = TRUSTED { WAIT pb.initialProbeDone; }; PlayABuffer: PROC [pb: PlayBack, vb: VoiceBuffer] = TRUSTED { b: PupDefs.PupBuffer; pp: BluejayPacket; FOR i: NAT IN [0..vb.cLength/80) DO b _ PupDefs.GetFreePupBuffer[]; pp _ LOOPHOLE[@b.pupBody]; b.pupType _ BluejayVoiceType; b.pupID.b _ pb.sequenceNumber; pb.sequenceNumber _ pb.sequenceNumber + 1; b.pupID.a _ pb.timeToPlay; pb.timeToPlay _ pb.timeToPlay + 20; pp^ _ [ blank1: 0, ignore: FALSE, probeRequest: FALSE, blank2: 0, encryptionKeyIndex: 0, energy: 0, silenceMS: 0, blank3: 0 ]; PrincOpsUtils.LongCopy[from: @vb.data[i * 80], nwords: 80, to: @b.pupWords[4]]; PupDefs.SetPupContentsWords[b, SIZE[BluejayPacketObject] + 80]; IntervalTimer.WaitForExpirationTime[pb.sendNextBatch]; PlayPacketLocked[pb, pp, b]; pb.lark.put[b]; ENDLOOP; vb.valid _ FALSE; }; PlayPacketLocked: ENTRY PROC [pb: PlayBack, pp: BluejayPacket, b: PupDefs.PupBuffer] = TRUSTED { IF pb.batchCount = 4 AND sendProbes THEN { pp^.probeRequest _ TRUE; pb.probeLaunchTime _ IntervalTimer.Now[]; pb.probeSequenceNumber _ b.pupID.b; }; pb.batchCount _ (pb.batchCount + 1) MOD 5; pb.sendNextBatch _ pb.sendNextBatch + 20000; }; AdjustProcess: PROC [pb: PlayBack] = TRUSTED { b: PupDefs.PupBuffer; DO IF pb.stop THEN EXIT; b _ pb.lark.get[]; IF b = NIL THEN LOOP; IF b.pupType # ProbeReplyType THEN { PupDefs.ReturnFreePupBuffer[b]; LOOP; }; AdjustLocked[pb, b]; PupDefs.ReturnFreePupBuffer[b]; ENDLOOP; }; AdjustLocked: ENTRY PROC [pb: PlayBack, b: PupDefs.PupBuffer] = TRUSTED { rp: ProbeReply; deltaLark, deltaLocal: INT; rp _ LOOPHOLE[@b.pupBody]; IF rp^.replyID = pb.probeSequenceNumber THEN { <> <> <> IF pb.doneInitialProbing THEN { pb.roundTripMS _ (Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000] + pb.roundTripMS) / 2; deltaLocal _ pb.probeLaunchTime - pb.localEpoch; deltaLark _ LONG[b.pupID.a - pb.larkEpoch] * 1000; <> pb.drift _ pb.drift + LOOPHOLE[deltaLocal - deltaLark, INT]; SELECT TRUE FROM pb.drift < -5000 => { pb.sendNextBatch _ pb.sendNextBatch - 5000; pb.drift _ pb.drift + 5000; }; pb.drift > 5000 => { <> <> pb.sendNextBatch _ pb.sendNextBatch + 5000; pb.drift _ pb.drift - 5000; }; ENDCASE; pb.larkEpoch _ b.pupID.a; pb.localEpoch _ pb.probeLaunchTime; } ELSE { -- initial probe case pb.roundTripMS _ Basics.LowHalf[(IntervalTimer.Now[] - pb.probeLaunchTime) / 1000]; pb.localEpoch _ pb.probeLaunchTime; pb.larkEpoch _ b.pupID.a; pb.doneInitialProbing _ TRUE; NOTIFY pb.initialProbeDone; }; }; }; END. March 17, 1983 2:02 pm, Stewart, created March 24, 1983 10:51 am, Stewart, condition variable and interval timer December 27, 1983 5:50 pm, Stewart, Cedar 5