-- VoiceStreamBasic
-- Client-side half of the VoiceStream interface: opening and closing voice
-- streams, moving bytes between client blocks and chirp buffers (Get and Put),
-- and queueing or flushing the pieces of tunes that a stream plays or records.
-- NOTE(review): the original header and most in-line comments were lost from this
-- copy of the file; the comments below describe only what the code visibly does.

DIRECTORY
  IO USING [noWhereStream, STREAM],
  Jukebox USING [bytesPerChirp, CloseTune, Handle, pagesPerChirp],
  PrincOps USING [ByteBltBlock],
  PrincOpsUtils USING [ByteBlt],
  Process USING [MsecToTicks, SetTimeout],
  Rope USING [ROPE],
  VM USING [AddressForPageNumber, Allocate, Free],
  VoiceStream;

VoiceStreamBasic: MONITOR LOCKS Lock
  IMPORTS Jukebox, IO, PrincOpsUtils, Process, VM
  EXPORTS VoiceStream
  SHARES VoiceStream =
BEGIN OPEN VoiceStream;

-- Exported monitor state.  One lock protects every stream.

Lock: PUBLIC MONITORLOCK;
wholeTune: PUBLIC INT _ 100000000;
serverCondition: PUBLIC CONDITION;  -- notified whenever there is new work for the server
client: PUBLIC CONDITION;  -- Get and Put wait here for a client buffer
waitCondition: PUBLIC CONDITION;  -- WaitEmpty waits here
closeCondition: PUBLIC CONDITION;  -- Close waits here for the server buffers to drain
ioStream: PUBLIC IO.STREAM _ IO.noWhereStream;
VSList: PUBLIC Handle _ NIL;  -- head of the list of open streams, linked through handle.next
Error: PUBLIC ERROR[reason: ErrorCode, rope: Rope.ROPE] = CODE;
DemonError: PUBLIC ERROR = CODE;

-- Statistics.

nOpens: INT _ 0;  -- streams opened so far
nWaits: INT _ 0;  -- times Get found no client buffer ready

Open: PUBLIC PROC [jukebox: Jukebox.Handle, proc: NotifyProc _ NIL, clientData: REF ANY _ NIL] RETURNS [handle: Handle] = {
  -- Creates a stream record for jukebox, allocates buffersPerStream chirp buffers
  -- onto its idle list, and links the stream into VSList.
  -- proc and clientData are stored in the record; Get and Put call
  -- proc[handle, clientData] when they find nothing left to transfer.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer _ NIL;
  -- Positional constructor: the field order is that of VoiceStream.VSRecord.
  handle _ NEW[VSRecord _ [jukebox, NIL, NIL, NIL, NIL, NIL, NIL, Bug, proc, FALSE, clientData, NIL, ,FALSE]];
  Process.SetTimeout[condition: @handle.newPiece, ticks: Process.MsecToTicks[400]];
  FOR i:INTEGER IN [0..buffersPerStream) DO
    buffer _ NEW[Buffer];
    buffer.valid _ FALSE;
    buffer.chirpSpace _ VM.Allocate[count: Jukebox.pagesPerChirp];
    buffer.block.blockPointer _ LOOPHOLE[VM.AddressForPageNumber[buffer.chirpSpace.page]];
    -- The run data starts bytesPerChirp/2 pointer units past the start of the
    -- chirp (presumably a word offset, placing it just after the sample bytes).
    buffer.runData _ buffer.block.blockPointer + (Jukebox.bytesPerChirp/2);
    buffer.next _ handle.firstIdleBuffer;
    handle.firstIdleBuffer _ buffer;
    ENDLOOP;
  OpenLocked[handle];
  RETURN [handle];
  };

OpenLocked: ENTRY PROC [handle: Handle] = {
  -- The part of Open that needs the monitor: pushes handle on the front of
  -- VSList, counts the open, and wakes the server.
  ENABLE UNWIND => NULL;
  handle.next _ VSList;
  VSList _ handle;
  nOpens _ nOpens + 1;
  NOTIFY serverCondition;
  };

Close: PUBLIC ENTRY PROC [handle: Handle] = {
  -- Shuts the stream down: flushes outstanding work, waits for the server to
  -- finish with its buffers, unlinks the stream from VSList, frees all chirp
  -- buffers, and marks the handle closed.
  -- Raises Error[StreamClosed, ...] if the stream was already closed, and
  -- Error[Bug, ...] if the handle is not on VSList.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  record: Handle;

  -- Closing twice is an error.
  IF (handle.errorRope # NIL) AND (handle.errorCode = StreamClosed) THEN ERROR Error[StreamClosed, handle.errorRope];

  -- Pad out and hand over a partially written chirp, if there is one.
  flushBuffer[handle];

  -- Mark every queued piece as flushed.
  flushProc[handle];

  -- Return all remaining client buffers to the server, marked invalid.
  WHILE (handle.firstClientBuffer # NIL) DO
    handle.firstClientBuffer.valid _ FALSE;
    giveServerBuffer[handle:handle];
    ENDLOOP;

  -- Wait until the server buffer list is empty or an error has been recorded.
  -- The code that empties the list and notifies closeCondition is not in this module.
  WHILE (handle.firstServerBuffer # NIL) AND (handle.errorRope = NIL) DO
    NOTIFY serverCondition;
    WAIT closeCondition;
    ENDLOOP;

  -- Close the tune of the current piece, if any.
  IF handle.piece # NIL THEN {
    IF handle.piece.tune # NIL THEN Jukebox.CloseTune[handle.jukebox, handle.piece.tune];
    };

  -- Detach the connection from its socket.
  IF handle.connection # NIL THEN handle.connection.socket _ NIL;

  -- Unlink the stream from VSList.
  IF VSList = handle THEN VSList _ handle.next
  ELSE {
    record _ VSList;
    -- The record = NIL tests make an empty list raise the intended Bug error
    -- rather than dereference NIL.
    WHILE record = NIL OR record.next # handle DO
      IF record = NIL OR record.next = NIL THEN ERROR Error[Bug, "Unexpected VSList end."];
      record _ record.next;
      ENDLOOP;
    record.next _ handle.next;
    };

  -- Release the chirp storage of every buffer on every list.
  WHILE handle.firstClientBuffer # NIL DO
    buffer _ handle.firstClientBuffer;
    handle.firstClientBuffer _ buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;
  WHILE handle.firstServerBuffer # NIL DO
    buffer _ handle.firstServerBuffer;
    handle.firstServerBuffer _ buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;
  WHILE handle.firstIdleBuffer # NIL DO
    buffer _ handle.firstIdleBuffer;
    handle.firstIdleBuffer _ buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;

  -- Mark the handle closed and wake anyone still waiting on it.
  handle.errorCode _ StreamClosed;
  handle.errorRope _ "Can't use voice stream after it's closed.";
  handle.jukebox _ NIL;
  BROADCAST client;
  BROADCAST waitCondition;
  };

Get: PUBLIC ENTRY PROC [handle: Handle, maxSilentBytes: NAT, block: PrincOps.ByteBltBlock, wait: BOOL _ FALSE] RETURNS [silence: NAT _ 0, bytesTransferred: NAT _ 0, keyIndex: NAT _ 0] = {
  -- Reads from the stream: first up to maxSilentBytes of leading silence, then
  -- voice bytes copied into block.
  -- Returns the silence consumed, the voice bytes copied, and the keyIndex of
  -- the most recently examined buffer.
  -- If no buffer is ready, waits on client when wait is TRUE and returns
  -- immediately otherwise.
  -- Raises Error[handle.errorCode, handle.errorRope] if the stream has recorded
  -- an error and no buffer is ready.
  -- Runs are indexed by buffer.runIndex: even indexes are silence runs, odd
  -- indexes are voice runs.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  silencePass: BOOL _ TRUE;  -- state variable: TRUE while still collecting leading silence
  silenceRunType: BOOL;
  runSize: NAT;
  DO
    -- 1) Acquire a non-empty chirp.
    WHILE (handle.firstClientBuffer = NIL) DO
      IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
      IF handle.piece = NIL THEN {
        -- Nothing queued and nothing buffered: tell the client (once) and return.
        IF handle.proc # NIL AND NOT handle.notified THEN {
          handle.notified _ TRUE;
          handle.proc[handle, handle.clientData];
          };
        BROADCAST waitCondition;
        RETURN;
        };
      nWaits _ nWaits + 1;
      IF wait THEN WAIT client ELSE RETURN;
      ENDLOOP;
    buffer _ handle.firstClientBuffer;
    handle.notified _ FALSE;
    keyIndex _ buffer.keyIndex;

    -- A buffer headed for the jukebox belongs to Put, not Get.
    IF buffer.toJukebox THEN RETURN;

    -- If the current piece is being flushed, discard this buffer.
    -- The NIL test guards against a buffer that is queued with no current piece.
    IF handle.piece # NIL AND handle.piece.flush THEN {
      handle.firstClientBuffer.valid _ FALSE;
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- (A second toJukebox test stood here; it could never succeed after the one above.)

    -- A fully consumed chirp goes back to the server.
    IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- 2) Acquire a non-empty run.
    runSize _ buffer.runData.runArray[buffer.runIndex];
    IF runSize = 0 THEN {
      buffer.runIndex _ buffer.runIndex + 1;
      LOOP;
      };

    -- 3) Build state variables.
    silenceRunType _ (buffer.runIndex MOD 2) = 0;
    -- The first voice run ends the silence pass.
    IF silencePass AND NOT silenceRunType THEN silencePass _ FALSE;

    SELECT TRUE FROM
      silencePass AND NOT silenceRunType => ERROR;  -- excluded by the statement above
      silencePass AND silenceRunType => {
        -- Consume as much of this silence run as the caller will accept.
        silenceAmount: CARDINAL _ MIN[runSize, maxSilentBytes];
        silence _ silence + silenceAmount;
        buffer.runData.runArray[buffer.runIndex] _ runSize - silenceAmount;
        buffer.bytesAccountedFor _ buffer.bytesAccountedFor + silenceAmount;
        maxSilentBytes _ maxSilentBytes - silenceAmount;
        IF maxSilentBytes = 0 THEN silencePass _ FALSE;
        };
      NOT silencePass AND NOT silenceRunType => {
        -- Copy voice bytes: the smaller of the run and the room left in block.
        voiceAmount: CARDINAL _ MIN[runSize, block.stopIndexPlusOne - block.startIndex];
        origStopIndex: CARDINAL _ block.stopIndexPlusOne;
        count: CARDINAL;
        -- Temporarily shorten block so that ByteBlt moves exactly voiceAmount bytes.
        block.stopIndexPlusOne _ block.startIndex + voiceAmount;
        count _ PrincOpsUtils.ByteBlt[from: buffer.block, to: block];
        IF count # voiceAmount THEN ERROR;
        block.stopIndexPlusOne _ origStopIndex;
        block.startIndex _ block.startIndex + voiceAmount;
        buffer.block.startIndex _ buffer.block.startIndex + voiceAmount;
        buffer.bytesAccountedFor _ buffer.bytesAccountedFor + voiceAmount;
        buffer.runData.runArray[buffer.runIndex] _ runSize - voiceAmount;
        bytesTransferred _ bytesTransferred + voiceAmount;
        IF block.startIndex >= block.stopIndexPlusOne THEN {
          -- The client block is full.  Return the chirp too if it is used up.
          IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
            giveServerBuffer[handle: handle];
            };
          handle.action _ TRUE;
          RETURN;
          };
        };
      NOT silencePass AND silenceRunType => {
        -- Silence after voice: stop here; the next Get reports it as leading silence.
        RETURN;
        };
      ENDCASE => ERROR;
    handle.action _ TRUE;
    ENDLOOP;
  };

Put: PUBLIC ENTRY PROC [handle: Handle, silentBytes: NAT, block: PrincOps.ByteBltBlock] RETURNS [bytesTransferred: NAT _ 0] = {
  -- Writes to the stream: first silentBytes of silence, then the voice bytes in
  -- block.  Waits on client whenever no buffer is available.
  -- Returns the silence plus voice bytes recorded.
  -- Raises Error[handle.errorCode, handle.errorRope] if the stream has recorded
  -- an error and no buffer is ready.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  silencePass: BOOL _ silentBytes > 0;  -- TRUE while silence remains to be recorded
  silenceRunType: BOOL;
  runSize: NAT;
  -- Precondition (reconstructed from a damaged comment, confirm against the
  -- interface): silentBytes > 0 OR block.startIndex < block.stopIndexPlusOne
  DO
    WHILE handle.firstClientBuffer = NIL DO
      IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
      IF handle.piece = NIL THEN {
        -- Nothing queued and nothing buffered: tell the client (once) and return.
        IF handle.proc # NIL AND NOT handle.notified THEN {
          handle.notified _ TRUE;
          handle.proc[handle, handle.clientData];
          };
        BROADCAST waitCondition;
        RETURN;
        };
      WAIT client;
      ENDLOOP;

    buffer _ handle.firstClientBuffer;
    handle.notified _ FALSE;

    -- A buffer coming from the jukebox belongs to Get, not Put.
    IF NOT buffer.toJukebox THEN RETURN;

    -- If the current piece is being flushed, dispose of this buffer: hand it back
    -- untouched if nothing valid was written, otherwise let flushBuffer pad it out.
    -- The NIL test guards against a buffer that is queued with no current piece.
    IF handle.piece # NIL AND handle.piece.flush THEN {
      IF handle.firstClientBuffer.valid THEN flushBuffer[handle] ELSE giveServerBuffer[handle: handle];
      LOOP;
      };

    -- (A second toJukebox test stood here; it could never succeed after the one above.)

    -- A completely filled chirp goes to the server.
    IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- Even run indexes hold silence, odd run indexes hold voice.
    silenceRunType _ (buffer.runIndex MOD 2) = 0;
    runSize _ buffer.runData.runArray[buffer.runIndex];
    SELECT TRUE FROM
      silencePass AND NOT silenceRunType => {
        -- Silence to record but the current run is voice: start a new silence run.
        buffer.runIndex _ buffer.runIndex + 1;
        buffer.runData.runArray[buffer.runIndex] _ 0;
        };
      silencePass AND silenceRunType => {
        -- Extend the silence run, limited by the space left in the chirp.
        silenceAmount: CARDINAL _ MIN[silentBytes, Jukebox.bytesPerChirp - buffer.bytesAccountedFor];
        silentBytes _ silentBytes - silenceAmount;
        buffer.runData.runArray[buffer.runIndex] _ runSize + silenceAmount;
        buffer.bytesAccountedFor _ buffer.bytesAccountedFor + silenceAmount;
        bytesTransferred _ bytesTransferred + silenceAmount;
        IF silentBytes = 0 THEN silencePass _ FALSE;
        };
      NOT silencePass AND silenceRunType => {
        -- Voice to record but the current run is silence: start a new voice run.
        buffer.runIndex _ buffer.runIndex + 1;
        buffer.runData.runArray[buffer.runIndex] _ 0;
        };
      NOT silencePass AND NOT silenceRunType => {
        -- Copy voice bytes: the smaller of what block holds and the space left in the chirp.
        voiceAmount: CARDINAL _ MIN[block.stopIndexPlusOne - block.startIndex, Jukebox.bytesPerChirp - buffer.bytesAccountedFor];
        origStopIndex: CARDINAL _ block.stopIndexPlusOne;
        count: CARDINAL;
        -- Temporarily shorten block so that ByteBlt moves exactly voiceAmount bytes.
        block.stopIndexPlusOne _ block.startIndex + voiceAmount;
        count _ PrincOpsUtils.ByteBlt[from: block, to: buffer.block];
        IF count # voiceAmount THEN ERROR;
        block.stopIndexPlusOne _ origStopIndex;
        IF voiceAmount > 0 THEN buffer.valid _ TRUE;
        block.startIndex _ block.startIndex + voiceAmount;
        buffer.block.startIndex _ buffer.block.startIndex + voiceAmount;
        buffer.bytesAccountedFor _ buffer.bytesAccountedFor + voiceAmount;
        buffer.runData.runArray[buffer.runIndex] _ runSize + voiceAmount;
        bytesTransferred _ bytesTransferred + voiceAmount;
        IF block.startIndex >= block.stopIndexPlusOne THEN {
          -- The client block is used up.  Hand over the chirp too if it is full.
          IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
            giveServerBuffer[handle: handle];
            };
          handle.action _ TRUE;
          RETURN;
          };
        };
      ENDCASE => ERROR;
    handle.action _ TRUE;
    ENDLOOP;
  };

AddPiece: PUBLIC ENTRY PROC [handle: Handle, tuneId: INT, firstByte: INT, nBytes: INT, create: BOOLEAN, playback: BOOLEAN _ TRUE, keyIndex: NAT _ 0, flush: BOOLEAN _ FALSE] = {
  -- Appends a new piece to the end of the stream's piece list and wakes the server.
  -- If flush is TRUE, every piece already queued is first marked as flushed.
  -- Raises Error[handle.errorCode, handle.errorRope] if the stream has recorded an error.
  ENABLE UNWIND => NULL;
  piece, p2: REF Piece;
  IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
  IF flush THEN flushProc[handle];
  -- Positional constructor: the field order is that of VoiceStream.Piece.
  piece _ NEW[Piece _ [tuneId, create, firstByte, nBytes, NIL, playback, keyIndex, NIL, FALSE]];
  IF handle.piece = NIL THEN handle.piece _ piece
  ELSE {
    p2 _ handle.piece;
    WHILE p2.next # NIL DO p2 _ p2.next ENDLOOP;
    p2.next _ piece;
    };
  NOTIFY serverCondition;
  };

FlushPieces: PUBLIC ENTRY PROC [handle: Handle] = {
  -- Monitor entry point for flushProc.
  ENABLE UNWIND => NULL;
  flushProc[handle];
  };

flushProc: INTERNAL PROC [handle: Handle] = {
  -- Sets the flush flag of every piece queued on the stream, then wakes all
  -- processes waiting on client and on waitCondition.
  -- Get and Put discard or pad out their buffers when they see the flag.
  -- The caller must hold the monitor lock.
  p: REF Piece _ handle.piece;
  WHILE p # NIL DO
    p.flush _ TRUE;
    p _ p.next;
    ENDLOOP;
  BROADCAST client;
  BROADCAST waitCondition;
  };

IsEmpty: PUBLIC PROC [handle: Handle] RETURNS [BOOLEAN] = {
  -- TRUE when the stream has neither a client buffer nor a queued piece.
  -- Not an ENTRY procedure: the fields are read without the monitor lock.
  IF handle.firstClientBuffer # NIL THEN RETURN[FALSE];
  IF handle.piece # NIL THEN RETURN[FALSE];
  RETURN[TRUE];
  };

WaitEmpty: PUBLIC ENTRY PROC [handle: Handle] = {
  -- Blocks until the stream has neither a client buffer nor a queued piece, or
  -- until the stream has recorded an error.
  ENABLE UNWIND => NULL;
  WHILE TRUE DO
    IF handle.errorRope # NIL THEN RETURN;
    IF (handle.firstClientBuffer = NIL) AND (handle.piece = NIL) THEN RETURN;
    WAIT waitCondition;
    ENDLOOP;
  };

Check: PUBLIC ENTRY PROC [handle: Handle] RETURNS[BOOLEAN] = {
  -- TRUE when the first client buffer exists and is not headed for the jukebox,
  -- which is the case in which Get goes on to examine the buffer.
  -- Raises Error[handle.errorCode, handle.errorRope] if the stream has recorded an error.
  ENABLE UNWIND => NULL;
  IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
  IF handle.firstClientBuffer = NIL THEN RETURN[FALSE];
  IF handle.firstClientBuffer.toJukebox THEN RETURN[FALSE];
  RETURN[TRUE];
  };

giveServerBuffer: INTERNAL PROC [handle: Handle] = {
  -- Moves the first client buffer to the end of the server buffer list and
  -- wakes the server.
  -- The caller must hold the monitor lock and handle.firstClientBuffer must not be NIL.
  buffer, b2: REF Buffer;
  buffer _ handle.firstClientBuffer;
  handle.firstClientBuffer _ buffer.next;
  buffer.next _ NIL;
  IF handle.firstServerBuffer = NIL THEN handle.firstServerBuffer _ buffer
  ELSE {
    b2 _ handle.firstServerBuffer;
    WHILE b2.next # NIL DO b2 _ b2.next ENDLOOP;
    b2.next _ buffer;
    };
  NOTIFY serverCondition;
  };

flushBuffer: PUBLIC INTERNAL PROC [handle: Handle] = {
  -- If the first client buffer is a partially written chirp headed for the
  -- jukebox, accounts for the rest of the chirp as silence and hands the buffer
  -- to the server.  Does nothing if there is no client buffer, if the buffer is
  -- not headed for the jukebox, or if the chirp is untouched or already full.
  -- The caller must hold the monitor lock.
  buffer: REF Buffer;
  buffer _ handle.firstClientBuffer;
  IF buffer = NIL THEN RETURN;
  IF ~buffer.toJukebox THEN RETURN;
  IF buffer.bytesAccountedFor IN (0..Jukebox.bytesPerChirp) THEN {
    IF (buffer.runIndex MOD 2) # 0 THEN {  -- voice run
      -- An empty voice run is dropped, falling back to the silence run before it;
      -- otherwise a new, empty silence run is started after it.
      IF buffer.runData.runArray[buffer.runIndex] = 0 THEN buffer.runIndex _ buffer.runIndex - 1
      ELSE {
        buffer.runIndex _ buffer.runIndex + 1;
        buffer.runData.runArray[buffer.runIndex] _ 0;
        };
      };
    -- Charge the remainder of the chirp to the current silence run.
    buffer.runData.runArray[buffer.runIndex] _ buffer.runData.runArray[buffer.runIndex] + (Jukebox.bytesPerChirp - buffer.bytesAccountedFor);
    giveServerBuffer[handle: handle];
    };
  };

END.

-- Last Edited by:
-- L. Stewart, March 25, 1983 3:54 pm, VoiceStream change
-- L. Stewart, April 5, 1983 2:38 pm, Tioga formatting, rundata
-- L. Stewart, April 11, 1983 1:06 pm, bug fixing in rundata
-- Last Edited by:
-- L. Stewart, April 19, 1983 3:17 pm, Silence encoding
-- L. Stewart, June 4, 1983 5:42 pm, call notifyProc only once
-- L. Stewart, December 27, 1983 1:44 pm, Cedar 5