-- VoiceStreamSocketImpl
-- Socket side of VoiceStream: one detached process per direction moves voice
-- packets between a Pup socket and a VoiceStream handle.
--   ReceiveProc: socket => VoiceStream.Put (incoming voice and probe replies)
--   SendProc:    VoiceStream.Get => socket (paced outgoing voice, with probes)
-- All ENTRY procedures in this module lock the shared monitor VoiceStream.Lock.

DIRECTORY
  Basics USING [BITAND, LowHalf],
  BasicTime,
  IntervalTimer USING [NowInPulses, Pulses, WaitForExpirationTimeInPulses],
  Process USING [Detach, GetCurrent, MsecToTicks, Priority, priorityForeground, SetPriority, SetTimeout],
  PupBuffer USING [ Buffer ],
  PupSocket USING [ AllocBuffer, FreeBuffer, Get, GetUserBytes, Put, SetUserBytes, SetUserHWords, Socket ],
  PupType USING [Type],
  Rope USING [ROPE],
  VoiceStream;

VoiceStreamSocketImpl: MONITOR LOCKS VoiceStream.Lock
  IMPORTS Basics, BasicTime, IntervalTimer, Process, PupSocket, VoiceStream
  EXPORTS VoiceStream = BEGIN

  -- Priority given to both the receive and the send process.
  socketPriority: Process.Priority _ Process.priorityForeground;

  -- Interval timer conversion factors.
  MicrosecondsPerPulse: LONG CARDINAL = 32;
  PulsesPerMillisecond: LONG CARDINAL = 31;  -- really 31.25

  -- Pup packet types used on the voice socket.
  BluejayVoiceType: PupType.Type = VAL[250];
  ProbeReplyType: PupType.Type = VAL[251];
  LarkFirstVoiceType: PupType.Type = VAL[252];
  RetransmitRequestType: PupType.Type = VAL[253];
  LarkVoiceType: PupType.Type = VAL[254];

  BluejayPacket: TYPE = LONG POINTER TO BluejayPacketObject;

  -- Header of an outgoing voice packet. The code below treats it as four
  -- half words (8 bytes) with the voice data starting at hWord[4].
  BluejayPacketObject: TYPE = MACHINE DEPENDENT RECORD [
    blank1: [0..77B],
    ignore: BOOL,
    probeRequest: BOOL,
    blank2: [0..17B],
    encryptionKeyIndex: [0..17B],
    energy: CARDINAL,
    silenceMS: CARDINAL,
    blank3: CARDINAL
    ];

  ProbeReply: TYPE = LONG POINTER TO ProbeReplyPacketObject;

  -- Body of a probe reply. ReceiveProc reads maxPacketSize and maxBuffers
  -- through hWord[1] and hWord[2].
  ProbeReplyPacketObject: TYPE = MACHINE DEPENDENT RECORD [
    replyID: CARDINAL,
    maxPacketSize: CARDINAL,
    maxBuffers: CARDINAL,
    blank1: CARDINAL
    ];

  -- Header of an incoming voice packet. ReceiveProc reads energy and
  -- silenceMS through hWord[1] and hWord[2]; this record is not otherwise
  -- referenced in this module.
  LarkPacketObject: TYPE = MACHINE DEPENDENT RECORD [
    blank1: [0..377B],
    blank2: [0..17B],
    encryptionKeyIndex: [0..17B],
    energy: CARDINAL,
    silenceMS: CARDINAL,  -- preceding data
    blank3: CARDINAL
    ];

  -- Voice data bytes requested per outgoing packet, and bytes per millisecond.
  bytesPerPacket: INTEGER = 160;
  bytesPerMs: NAT = 8;

  debugSwitch: BOOL _ FALSE;
  -- 100000 microseconds expressed in pulses: a wakeup later than this is counted in timesLate.
  maxLostTime: LONG CARDINAL _ 100000 / MicrosecondsPerPulse;
  now: LONG CARDINAL;
  timesLate: LONG CARDINAL _ 0;
  -- Milliseconds of silence sent when VoiceStream.Get returns neither silence nor voice.
  noPieceMS: NAT _ 200;

  SetSocket: PUBLIC ENTRY PROC [handle: VoiceStream.Handle, socket: PupSocket.Socket] = {
    -- Attaches socket to handle (creating the connection object on first use)
    -- and forks the receive and send processes for it. A NIL socket only
    -- clears the connection's socket, which makes CheckHandle report
    -- "Socket changed." to processes started for an earlier socket.
    ENABLE UNWIND => NULL;
    IF handle.errorRope # NIL THEN VoiceStream.ReportError[handle];
    IF handle.connection = NIL THEN handle.connection _ NEW[VoiceStream.ConnectionObject];
    handle.connection.socket _ NIL;
    IF socket # NIL THEN {
      handle.connection.socket _ socket;
      Process.Detach[FORK ReceiveProc[handle, socket]];
      Process.Detach[FORK SendProc[handle, socket]];
      };
    };

  CheckHandle: ENTRY PROC [handle: VoiceStream.Handle, socket: PupSocket.Socket] RETURNS [Rope.ROPE] = {
    -- Returns NIL while the handle is still bound to socket and has no error;
    -- otherwise returns a rope giving the reason the caller should stop.
    ENABLE UNWIND => NULL;
    IF handle.connection.socket # socket THEN RETURN ["Socket changed."];
    IF handle.errorRope # NIL THEN RETURN [handle.errorRope];
    RETURN [NIL];
    };

  ForceToJukebox: ENTRY PROC [handle: VoiceStream.Handle] = {
    -- Flushes the handle's buffer, then waits on VoiceStream.client until
    -- there is no server buffer left, an error is recorded, or there is no
    -- current piece.
    ENABLE UNWIND => NULL;
    VoiceStream.FlushBuffer[handle];
    WHILE (handle.firstServerBuffer # NIL) AND (handle.errorRope = NIL) AND (handle.piece # NIL) DO
      WAIT VoiceStream.client
      ENDLOOP;
    };

  ReceiveProc: PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
    -- Receive loop, run as a detached process. Handles probe replies and
    -- voice packets until CheckHandle reports a reason to stop or
    -- VoiceStream.Error is raised.
    ENABLE {
      VoiceStream.Error => {
        -- Leave the loop and let the process end.
        CONTINUE;
        };
      };
    input: PupBuffer.Buffer;
    expLarkTime: CARDINAL _ 0;  -- Expected time for next data.
    larkTime: CARDINAL _ 0;  -- Time of current packet.
    silenceMs: CARDINAL _ 0;
    dataBytes: CARDINAL;
    first: BOOL _ TRUE;  -- first packet of recording
    reason: Rope.ROPE _ NIL;
    Process.SetPriority[socketPriority];
    WHILE (reason _ CheckHandle[handle, origSocket]) = NIL DO
      input _ handle.connection.socket.Get[];
      IF input = NIL THEN LOOP;
      larkTime _ input.id.high;
      SELECT input.type FROM
        = ProbeReplyType => {
          -- Update round trip and drift estimates, then record the sizes
          -- carried in the reply.
          AdjustLocked[handle.connection, input];
          handle.connection.packetSize _ input.hWord[1];
          handle.connection.bufferSpace _ input.hWord[2] * (handle.connection.packetSize / bytesPerMs);
          };
        = LarkVoiceType, = LarkFirstVoiceType => {
          dataBytes _ PupSocket.GetUserBytes[input];
          -- A packet shorter than the 8 byte header has no valid header
          -- fields and would underflow the data length below, so drop it.
          IF dataBytes >= 8 THEN {
            silenceMs _ larkTime - expLarkTime;
            silenceMs _ MIN[silenceMs, 1000];
            input.hWord[2] _ MIN[260, input.hWord[2]];
            IF NOT first AND silenceMs NOT IN [0..250) THEN {
              -- A gap of 250 or more is not treated as silence.
              silenceMs _ 0;
              };
            -- Add the silence that the packet itself reports before its data.
            IF first THEN silenceMs _ 0 ELSE silenceMs _ silenceMs + input.hWord[2];
            -- Data length excludes the header and is truncated to a multiple of 8 bytes.
            dataBytes _ Basics.BITAND[dataBytes - 8, 0177770B];
            -- If Put fails, release the buffer before the handler above
            -- ends the process; otherwise the buffer would be leaked.
            [] _ VoiceStream.Put[handle: handle, silentBytes: silenceMs * bytesPerMs, block: [@input.hWord[4], 0, dataBytes], energy: input.hWord[1]
              ! VoiceStream.Error => {PupSocket.FreeBuffer[input]; REJECT}];
            expLarkTime _ larkTime + input.hWord[2] + (dataBytes/bytesPerMs);
            first _ FALSE;
            };
          };
        ENDCASE;
      PupSocket.FreeBuffer[input];
      ENDLOOP;
    };

  AdjustLocked: ENTRY PROC [c: VoiceStream.Connection, b: PupBuffer.Buffer] = {
    -- Processes a probe reply in buffer b. Replies whose replyID does not
    -- match the outstanding probe are ignored. The first matching reply
    -- sets the epochs and notifies initialProbeDone; later ones average the
    -- round trip time and accumulate clock drift.
    ENABLE UNWIND => NULL;
    rp: ProbeReply;
    deltaLark, deltaLocal: INT;
    rp _ LOOPHOLE[@b.body];
    IF rp^.replyID = c.probeSequenceNumber THEN {
      IF c.initialProbeFinished THEN {
        c.roundTripMS _ (Basics.LowHalf[(IntervalTimer.NowInPulses[] - c.probeLaunchTime) / PulsesPerMillisecond] + c.roundTripMS) / 2;
        deltaLocal _ c.probeLaunchTime - c.localEpoch;
        deltaLark _ LONG[b.id.high - c.larkEpoch] * PulsesPerMillisecond;
        -- Both deltas are in pulses; drift is local elapsed time minus Lark elapsed time.
        c.drift _ c.drift + deltaLocal - deltaLark;
        SELECT TRUE FROM
          c.drift < -312 => {
            c.sendNext _ c.sendNext - 150;
            c.drift _ c.drift + 150;
            };
          c.drift > 312 => {
            -- 312 pulses is roughly 10 ms; move the send time by 150 pulses per correction.
            c.sendNext _ c.sendNext + 150;
            c.drift _ c.drift - 150;
            };
          ENDCASE;
        c.larkEpoch _ b.id.high;
        c.localEpoch _ c.probeLaunchTime;
        }
      ELSE {  -- initial probe case
        c.roundTripMS _ Basics.LowHalf[(IntervalTimer.NowInPulses[] - c.probeLaunchTime) / PulsesPerMillisecond];
        c.localEpoch _ c.probeLaunchTime;
        c.larkEpoch _ b.id.high;
        c.drift _ 0;
        c.initialProbeFinished _ TRUE;
        NOTIFY c.initialProbeDone;
        };
      };
    };

  SendProc: PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
    -- Send loop, run as a detached process. After an initial probe exchange
    -- it repeatedly fetches voice from the stream, paces itself with the
    -- interval timer, and sends one packet per iteration until CheckHandle
    -- reports a reason to stop or VoiceStream.Error is raised.
    ENABLE {
      VoiceStream.Error => {
        -- Leave the loop and let the process end.
        CONTINUE;
        };
      };
    output: PupBuffer.Buffer _ NIL;
    pp: BluejayPacket;
    c: VoiceStream.Connection _ handle.connection;
    silenceLength: NAT;
    voiceLength: NAT;
    reason: Rope.ROPE;
    silenceMS: NAT _ 0;
    voiceMS: NAT _ 0;
    lastPlayed: IntervalTimer.Pulses;  -- time the previous packet was handed to the socket
    noPieceThisTime: BOOL _ FALSE;
    wakeTime: IntervalTimer.Pulses;
    {
      Process.SetPriority[socketPriority];
      c.initialProbeFinished _ FALSE;
      Process.SetTimeout[condition: @c.initialProbeDone, ticks: Process.MsecToTicks[100]];
      -- Send probes until one is answered or the handle goes bad.
      EmptyProbe[handle: handle, origSocket: origSocket];
      IF NOT c.initialProbeFinished THEN {
        handle.errorRope _ "Lark failed to respond";
        handle.errorCode _ Timeout;
        GOTO Cleanup;
        };
      -- First play time on the Lark clock: the epoch plus a lead of at most 120.
      c.timeToPlay _ c.larkEpoch + MIN[120, c.bufferSpace];
      -- First send time on the local clock.
      c.sendNext _ c.localEpoch;
      -- Pretend the previous packet went out 10000 microseconds ago.
      lastPlayed _ IntervalTimer.NowInPulses[] - (10000 / MicrosecondsPerPulse);
      WHILE (reason _ CheckHandle[handle, origSocket]) = NIL DO
        IF output = NIL THEN {
          output _ c.socket.AllocBuffer[];
          pp _ LOOPHOLE[@output.body];
          output.type _ BluejayVoiceType;
          output.id.low _ c.sequenceNumber;
          c.sequenceNumber _ c.sequenceNumber + 1;
          pp^ _ [
            blank1: 0,
            ignore: FALSE,
            probeRequest: FALSE,
            blank2: 0,
            encryptionKeyIndex: 0,
            energy: 0,
            silenceMS: 0,
            blank3: 0
            ];
          };
        output.id.high _ c.timeToPlay;
        -- If Get fails, release the buffer before the handler above ends
        -- the process; otherwise the buffer would be leaked.
        [silence: silenceLength, bytesTransferred: voiceLength, keyIndex: pp.encryptionKeyIndex] _ VoiceStream.Get[handle: handle, maxSilentBytes: 500 * bytesPerMs, block: [@output.hWord[4], 0, bytesPerPacket]
          ! VoiceStream.Error => {PupSocket.FreeBuffer[output]; REJECT}];
        -- Truncate the voice length to a multiple of 8 bytes.
        voiceLength _ Basics.BITAND[voiceLength, 0177770B];
        silenceMS _ silenceLength/bytesPerMs;
        voiceMS _ voiceLength/bytesPerMs;
        -- Nothing available at all: send noPieceMS of silence instead.
        noPieceThisTime _ (silenceLength = 0) AND (voiceLength = 0);
        IF noPieceThisTime THEN silenceMS _ noPieceMS;
        c.packetMS _ silenceMS + voiceMS;
        pp.silenceMS _ silenceMS;
        PupSocket.SetUserBytes[output, SIZE[BluejayPacketObject] * 2 + voiceLength];
        now _ IntervalTimer.NowInPulses[];
        IF LOOPHOLE[(LOOPHOLE[lastPlayed, LONG CARDINAL] + (5000 / MicrosecondsPerPulse)) - now, INT] > 0 THEN {
          IntervalTimer.WaitForExpirationTimeInPulses[lastPlayed + (5000 / MicrosecondsPerPulse)];  -- wait at least 5 mS
          now _ IntervalTimer.NowInPulses[];
          IF (now - (lastPlayed + (5000 / MicrosecondsPerPulse))) > maxLostTime THEN {
            timesLate _ timesLate + 1;
            };
          };
        -- Then wait until the scheduled send time, if it is still in the future.
        now _ IntervalTimer.NowInPulses[];
        wakeTime _ c.sendNext;
        IF LOOPHOLE[wakeTime - now, INT] > 0 THEN {
          IntervalTimer.WaitForExpirationTimeInPulses[wakeTime];
          now _ IntervalTimer.NowInPulses[];
          IF now - wakeTime > maxLostTime THEN {
            timesLate _ timesLate + 1;
            };
          };
        PlayPacketLocked[c, pp, output];
        IF (reason _ CheckHandle[handle, origSocket]) # NIL THEN {
          PupSocket.FreeBuffer[output];
          output _ NIL;
          EXIT;
          };
        c.socket.Put[output];
        output _ NIL;
        lastPlayed _ IntervalTimer.NowInPulses[];
        ENDLOOP;
      GOTO Cleanup;
      EXITS
        Cleanup => {
          -- Nothing to release here: output is NIL on every path that reaches this exit.
          };
      };
    };

  PlayPacketLocked: ENTRY PROC [c: VoiceStream.Connection, pp: BluejayPacket, b: PupBuffer.Buffer] = {
    -- Final bookkeeping for the packet about to be sent. Marks it as a probe
    -- request when the send time is not within 450000 microseconds after
    -- the last probe launch, then advances timeToPlay and sendNext by the
    -- duration of this packet.
    ENABLE UNWIND => NULL;
    IF (c.sendNext - c.probeLaunchTime) NOT IN [0..450000 / MicrosecondsPerPulse) THEN {
      pp^.probeRequest _ TRUE;
      c.probeLaunchTime _ IntervalTimer.NowInPulses[];
      c.probeSequenceNumber _ b.id.low;
      };
    c.timeToPlay _ c.timeToPlay + c.packetMS;
    c.sendNext _ c.sendNext + (LONG[c.packetMS] * PulsesPerMillisecond);
    };

  EmptyProbe: PROC [handle: VoiceStream.Handle, origSocket: PupSocket.Socket] = {
    -- Sends header-only packets marked ignore and probeRequest, waiting on
    -- initialProbeDone after each one, until a reply has been processed
    -- (initialProbeFinished) or CheckHandle reports a reason to stop.
    b: PupBuffer.Buffer;
    pp: BluejayPacket;
    trySN: CARDINAL;
    c: VoiceStream.Connection _ handle.connection;
    WHILE CheckHandle[handle, origSocket] = NIL DO
      b _ c.socket.AllocBuffer[];
      pp _ LOOPHOLE[@b.body];
      trySN _ c.sequenceNumber;
      c.sequenceNumber _ c.sequenceNumber + 1;
      b.id.high _ 0;
      b.id.low _ trySN;
      b.type _ BluejayVoiceType;
      pp^ _ [
        blank1: 0,
        ignore: TRUE,
        probeRequest: TRUE,
        blank2: 0,
        encryptionKeyIndex: 0,
        energy: 0,
        silenceMS: 0,
        blank3: 0
        ];
      PupSocket.SetUserHWords[b, SIZE[BluejayPacketObject]];
      SendProbeLocked[c, b];
      c.socket.Put[b];
      WaitLocked[c];
      IF c.initialProbeFinished THEN EXIT;
      ENDLOOP;
    };

  SendProbeLocked: ENTRY PROC [c: VoiceStream.Connection, b: PupBuffer.Buffer] = {
    -- Records the launch time and sequence number of the probe in b so that
    -- AdjustLocked can match the reply.
    ENABLE UNWIND => NULL;
    c.probeLaunchTime _ IntervalTimer.NowInPulses[];
    c.probeSequenceNumber _ b.id.low;
    };

  WaitLocked: ENTRY PROC [c: VoiceStream.Connection] = {
    -- Waits for initialProbeDone to be notified or to time out (SendProc
    -- sets the timeout). The UNWIND handler ensures the monitor lock is
    -- released if the WAIT is aborted, as in every other ENTRY procedure here.
    ENABLE UNWIND => NULL;
    WAIT c.initialProbeDone;
    };

  END.

L. Stewart, June 5, 1983 4:30 pm, no notify on SetSocket
L. Stewart, June 6, 1983 1:15 pm, LarkFirstVoiceType packet type
L. Stewart, June 20, 1983 4:52 pm, Try using Pulses instead of Microseconds
L. Stewart, July 9, 1983 4:48 pm, added noPieceMS in place of constant
L. Stewart, May 14, 1985 6:29:20 pm PDT, Cedar 5, L