-- VoiceStreamSocket
-- Network side of VoiceStream: attaches a Pup socket to a voice stream handle and
-- runs one detached receive process and one detached send process per attached socket.

DIRECTORY
  Basics USING [BITAND, LowHalf],
  BasicTime,
  IntervalTimer USING [NowInPulses, Pulses, WaitForExpirationTimeInPulses],
  Process USING [Detach, GetCurrent, MsecToTicks, Priority, priorityForeground, SetPriority, SetTimeout],
  PupDefs USING [GetFreePupBuffer, GetPupContentsBytes, PupBuffer, PupSocket, ReturnFreePupBuffer, SetPupContentsBytes, SetPupContentsWords],
  PupTypes USING [PupType],
  Rope USING [ROPE],
  SpyOps USING [Record, StackType],
  VoiceStream;

VoiceStreamSocket: MONITOR LOCKS Lock
  IMPORTS Basics, BasicTime, IntervalTimer, Process, PupDefs, SpyOps, VoiceStream
  EXPORTS VoiceStream
  SHARES VoiceStream =
BEGIN
OPEN VoiceStream;

-- Priority adopted by receiveProc when it starts.
socketPriority: Process.Priority _ Process.priorityForeground;

-- Conversion factors between interval-timer pulses and wall-clock time.
MicrosecondsPerPulse: LONG CARDINAL = 32;
PulsesPerMillisecond: LONG CARDINAL = 31; -- really 31.25

-- Pup type codes used by the voice protocol.
-- receiveProc acts on ProbeReplyType, LarkVoiceType and LarkFirstVoiceType only;
-- packets of any other type are returned to the pool unprocessed.
BluejayVoiceType: PupTypes.PupType = LOOPHOLE[250];
ProbeReplyType: PupTypes.PupType = LOOPHOLE[251];
LarkFirstVoiceType: PupTypes.PupType = LOOPHOLE[252];
RetransmitRequestType: PupTypes.PupType = LOOPHOLE[253];
LarkVoiceType: PupTypes.PupType = LOOPHOLE[254];

BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;

-- Header of a BluejayVoiceType packet.
-- The bit fields of the first word add up to 16 bits (6+1+1+4+4); with the three
-- CARDINAL fields the header is four words.
BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  blank1: [0..77B],
  ignore: BOOL,
  probeRequest: BOOL,
  blank2: [0..17B],
  encryptionKeyIndex: [0..17B],
  energy: CARDINAL,
  silenceMS: CARDINAL,
  blank3: CARDINAL
  ];

ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;

-- Body of a ProbeReplyType packet.
-- AdjustLocked compares replyID with the connection's probeSequenceNumber;
-- receiveProc reads words 1 and 2 (maxPacketSize, maxBuffers) through pupWords.
ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  replyID: CARDINAL,
  maxPacketSize: CARDINAL,
  maxBuffers: CARDINAL,
  blank1: CARDINAL
  ];

-- Header of a LarkVoiceType / LarkFirstVoiceType packet: four words, the third of
-- which is the silence interval.  receiveProc reads that word as pupWords[2] and
-- takes the voice data from pupWords[4] onward.
LarkPacketObject: TYPE = MACHINE DEPENDENT RECORD [
  blank1: [0..377B],
  blank2: [0..17B],
  encryptionKeyIndex: [0..17B],
  energy: CARDINAL,
  silenceMS: CARDINAL, -- preceding data
  blank3: CARDINAL
  ];

-- Tuning parameters and module state.
bytesPerPacket: INTEGER = 160;
bytesPerMs: NAT = 8; -- converts milliseconds to voice bytes in receiveProc
debugSwitch: BOOL _ FALSE;
maxLostTime: LONG CARDINAL _ 100000 / MicrosecondsPerPulse; -- 100000 microseconds, in pulses
now: LONG CARDINAL;
timesLate: LONG CARDINAL _ 0;
noPieceMS: NAT _ 200;

SetSocket: PUBLIC ENTRY PROC [handle: Handle, socket: PupDefs.PupSocket] = {
  -- Attaches socket to handle, replacing any socket that was attached before.
  -- Raises Error[handle.errorCode, handle.errorRope] if the handle already records an error.
  -- A connection object is created on first use.  With a non-NIL socket, a receive
  -- process and a send process are forked and detached for it.  With a NIL socket the
  -- connection is simply left without a socket; checkHandle then reports a change to
  -- any process that was started for a previous non-NIL socket.
  ENABLE UNWIND => NULL;
  IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
  IF handle.connection = NIL THEN handle.connection _ NEW[VoiceStream.ConnectionObject];
  handle.connection.socket _ NIL;
  IF socket # NIL THEN {
    handle.connection.socket _ socket;
    handle.notified _ TRUE;
    Process.Detach[FORK receiveProc[handle, socket]];
    Process.Detach[FORK sendProc[handle, socket]];
    };
  };

checkHandle: ENTRY PROC [handle: Handle, socket: PupDefs.PupSocket] RETURNS [Rope.ROPE] = {
  -- Returns NIL while the caller should keep running, otherwise a rope saying why not:
  -- either the connection's socket is no longer the one the caller was started with,
  -- or the handle has recorded an error.
  -- Dereferences handle.connection, so it must only be called once SetSocket has created it.
  ENABLE UNWIND => NULL;
  IF handle.connection.socket # socket THEN RETURN ["Socket changed."];
  IF handle.errorRope # NIL THEN RETURN [handle.errorRope];
  RETURN [NIL];
  };

forceToJukebox: ENTRY PROC [handle: Handle] = {
  -- Calls flushBuffer for the handle, then waits on the client condition until the
  -- handle has no first server buffer, has recorded an error, or has no current piece.
  ENABLE UNWIND => NULL;
  flushBuffer[handle];
  WHILE (handle.firstServerBuffer # NIL) AND (handle.errorRope = NIL) AND (handle.piece # NIL) DO
    WAIT client
    ENDLOOP;
  };

receiveProc: PROC [handle: Handle, origSocket: PupDefs.PupSocket] = {
  -- Receive loop, forked and detached by SetSocket.
  -- Runs until checkHandle reports that the socket changed or the handle has an error,
  -- or until Error is raised inside the loop (in which case the process just exits).
  -- Every buffer obtained from the socket is returned to the free pool, including the
  -- one in hand when Put raises Error.
  ENABLE {
    Error => {
      CONTINUE;
      };
    };
  input: PupDefs.PupBuffer;
  expLarkTime: CARDINAL _ 0; -- Expected time for next data.
  larkTime: CARDINAL _ 0; -- Time of current packet.
  silenceMs: CARDINAL _ 0;
  dataBytes: CARDINAL;
  first: BOOL _ TRUE; -- first packet of recording
  reason: Rope.ROPE _ NIL;

  Process.SetPriority[socketPriority];

  WHILE (reason _ checkHandle[handle, origSocket]) = NIL DO
    input _ handle.connection.socket.get[];
    -- No packet this time round: go back and re-check the handle.
    IF input = NIL THEN LOOP;
    larkTime _ input.pupID.a;
    SELECT input.pupType FROM

      = ProbeReplyType => {
        -- Update the timing state under the monitor, then record the sizes the reply carries.
        AdjustLocked[handle.connection, input];
        handle.connection.packetSize _ input.pupWords[1];
        handle.connection.bufferSpace _ input.pupWords[2] * (handle.connection.packetSize / bytesPerMs);
        };

      = LarkVoiceType, = LarkFirstVoiceType => {
        -- Gap between where the previous packet ended and where this one starts.
        -- CARDINAL subtraction wraps, so a packet that is earlier than expected shows
        -- up as a very large value and is dealt with by the range test below.
        silenceMs _ larkTime - expLarkTime;
        silenceMs _ MIN[silenceMs, 1000];
        -- Bound the silence interval claimed by the packet itself.
        input.pupWords[2] _ MIN[260, input.pupWords[2]];
        IF NOT first AND silenceMs NOT IN [0..250) THEN {
          -- Implausible gap: ignore it.
          silenceMs _ 0;
          };
        -- The first packet of a recording carries no leading silence at all.
        IF first THEN silenceMs _ 0 ELSE silenceMs _ silenceMs + input.pupWords[2];
        -- Voice data is whatever follows the 8 byte header.  A packet shorter than the
        -- header must not wrap around to a huge length, so clamp before subtracting.
        dataBytes _ MAX[PupDefs.GetPupContentsBytes[input], 8] - 8;
        -- Round down to a multiple of 8 bytes.
        dataBytes _ Basics.BITAND[dataBytes, 0177770B];
        -- If Put raises Error the procedure-level handler exits the loop, so give the
        -- buffer back first and then pass the error on to that handler.
        [] _ Put[handle: handle, silentBytes: silenceMs * bytesPerMs,
          block: [@input.pupWords[4], 0, dataBytes]
          ! Error => {PupDefs.ReturnFreePupBuffer[input]; REJECT}];
        expLarkTime _ larkTime + input.pupWords[2] + (dataBytes/bytesPerMs);
        first _ FALSE;
        };

      ENDCASE;
    PupDefs.ReturnFreePupBuffer[input];
    ENDLOOP;
  };

AdjustLocked: ENTRY PROC [c: VoiceStream.Connection, b: PupDefs.PupBuffer] = {
  -- Processes a probe reply held in b, updating the timing fields of connection c
  -- under the monitor lock.  Replies whose replyID does not match the connection's
  -- current probeSequenceNumber are skipped by the test below.
  ENABLE UNWIND => NULL;
  rp: ProbeReply;
  deltaLark, deltaLocal: INT;
  rp _ LOOPHOLE[@b.pupBody];
  IF rp^.replyID = c.probeSequenceNumber THEN {
    IF c.initialProbeFinished THEN {
      -- New round trip estimate: mean of the previous estimate and the time since the
      -- probe was launched, converted from pulses with PulsesPerMillisecond.
      c.roundTripMS _ (Basics.LowHalf[(IntervalTimer.NowInPulses[] - c.probeLaunchTime) / PulsesPerMillisecond] + c.roundTripMS) / 2;
      -- Local pulses elapsed between the local epoch and the probe launch.
      deltaLocal _ c.probeLaunchTime - c.localEpoch;
      -- Time elapsed at the other end since its epoch, scaled to pulses.
      -- NOTE(review): this treats pupID.a as a millisecond count; confirm.
      deltaLark _ LONG[b.pupID.a - c.larkEpoch] * PulsesPerMillisecond;
      c.drift _ c.drift + deltaLocal - deltaLark;
      -- Once the accumulated drift exceeds 312 pulses in either direction, move
      -- sendNext by 150 pulses and take the same amount off the drift.
      SELECT TRUE FROM
        c.drift < -312 => {
          c.sendNext _ c.sendNext - 150;
          c.drift _ c.drift + 150;
          };
        c.drift > 312 => {
          c.sendNext _ c.sendNext + 150;
          c.drift _ c.drift - 150;
          };
        ENDCASE; <