-- File: VoiceStreamBasic.mesa
-- This file contains part of the VoiceStream implementation.
-- Routines in this file provide basic client facilities such as
-- opening, closing, reading, and writing voice streams.
-- Last Edited by: Swinehart, April 14, 1983 10:48 am
-- Last Edited by: Ousterhout, March 8, 1983 11:37 am
-- Last Edited by: L. Stewart, December 30, 1983 11:19 am
DIRECTORY
IO USING [int, noWhereStream, PutF, rope, STREAM],
Jukebox USING [bytesPerChirp, CloseTune, Handle, pagesPerChirp],
PrincOps USING [ByteBltBlock],
PrincOpsUtils USING [ByteBlt],
Process USING [MsecToTicks, SetTimeout],
Rope USING [ROPE],
VM USING [AddressForPageNumber, Allocate, Free],
VoiceStream;
VoiceStreamBasic: MONITOR LOCKS Lock
IMPORTS Jukebox, IO, PrincOpsUtils, Process, VM
EXPORTS VoiceStream
SHARES VoiceStream =
BEGIN OPEN VoiceStream;
-- Module-level state exported to VoiceStream.
-- See VoiceStream.mesa for documentation on the following things.

-- The monitor lock named in this module's MONITOR LOCKS clause.
Lock: PUBLIC MONITORLOCK;

wholeTune: PUBLIC INT ← 100000000;

-- Notified whenever this module hands work to the server: a stream is
-- opened, a piece is added, or a buffer is moved to the server list.
serverCondition: PUBLIC CONDITION;

-- Get and Put wait here when no client buffer is available; Close and
-- flushProc broadcast it.
client: PUBLIC CONDITION;

-- WaitEmpty waits here; Get, Put, Close and flushProc broadcast it.
waitCondition: PUBLIC CONDITION;

-- Close waits here until the stream has no server buffers left.
-- NOTE(review): the notifier is not in this file, presumably the server.
closeCondition: PUBLIC CONDITION;

-- Destination for the trace messages written with PutF in this module.
ioStream: PUBLIC IO.STREAM ← IO.noWhereStream;

-- Head of the list of open voice streams, linked through Handle.next.
VSList: PUBLIC Handle ← NIL;

Error: PUBLIC ERROR[reason: ErrorCode, rope: Rope.ROPE] = CODE;
DemonError: PUBLIC ERROR = CODE;

-- The following variables keep track of how many streams have been
-- used and how often waiting occurred.
nOpens: INT ← 0;	-- incremented by OpenLocked
nWaits: INT ← 0;	-- incremented by Get each time it finds no client buffer
Open: PUBLIC PROC [jukebox: Jukebox.Handle, proc: NotifyProc ← NIL,
    clientData: REF ANY ← NIL]
  RETURNS [handle: Handle] = {
  -- Creates a new voice stream on the given jukebox and returns its handle.
  --   jukebox: the jukebox the stream will read from or write to.
  --   proc: optional procedure that Get and Put call (once) when they find
  --     the stream has no piece left to work on.
  --   clientData: passed back to proc unchanged.
  -- The stream gets buffersPerStream chirp buffers, all initially on the
  -- idle list, and is then linked into VSList under the monitor lock.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer ← NIL;
  -- Positional constructor: the field order is that of VSRecord in
  -- VoiceStream.mesa; the empty position takes that field's default.
  handle ← NEW[VSRecord ← [jukebox, NIL, NIL, NIL, NIL, NIL, NIL,
    Bug, proc, FALSE, clientData, NIL, ,FALSE]];
  -- Waits on the stream's newPiece condition time out after 400 msec.
  Process.SetTimeout[condition: @handle.newPiece, ticks: Process.MsecToTicks[400]];
  FOR i:INTEGER IN [0..buffersPerStream) DO
    buffer ← NEW[Buffer];
    buffer.valid ← FALSE;
    -- Each buffer owns a VM interval big enough for one chirp.
    buffer.chirpSpace ← VM.Allocate[count: Jukebox.pagesPerChirp];
    buffer.block.blockPointer ← LOOPHOLE[VM.AddressForPageNumber[buffer.chirpSpace.page]];
    -- The run data lives in the same interval at an offset of
    -- bytesPerChirp/2 from the block pointer.
    -- NOTE(review): presumably a word offset, i.e. just past the chirp's
    -- sample bytes; confirm against the chirp layout.
    buffer.runData ← buffer.block.blockPointer + (Jukebox.bytesPerChirp/2);
    -- Push onto the front of the idle list.
    buffer.next ← handle.firstIdleBuffer;
    handle.firstIdleBuffer ← buffer;
    ENDLOOP;
  -- Linking into the shared list must happen inside the monitor.
  OpenLocked[handle];
  RETURN [handle];
  };
OpenLocked: ENTRY PROC [handle: Handle] = {
  -- Monitor-protected tail of Open: counts the new stream, links it at
  -- the head of VSList, and wakes the server.
  ENABLE UNWIND => NULL;
  nOpens ← nOpens + 1;
  -- Push the stream onto the front of the list of open streams.
  handle.next ← VSList;
  VSList ← handle;
  -- Invoke the server to prepare the buffers for actual use.
  NOTIFY serverCondition;
  };
Close: PUBLIC ENTRY PROC [handle: Handle] = {
  -- This procedure closes out a voice stream.  It waits for
  -- pending I/O to complete, then deallocates the stream.
  -- Raises Error[StreamClosed, ...] if the stream was already closed, and
  -- Error[Bug, ...] if the stream cannot be found in VSList.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  record: Handle;
  IO.PutF[ioStream, "Closing tune.\n"];

  -- Make sure that the stream isn't already closed.
  IF (handle.errorRope # NIL) AND (handle.errorCode = StreamClosed)
    THEN ERROR Error[StreamClosed, handle.errorRope];

  -- Output any partially full buffers to the jukebox.
  flushBuffer[handle];

  -- Flush any pending pieces.
  flushProc[handle];

  -- Flush any pending client buffers.
  WHILE (handle.firstClientBuffer # NIL) DO
    handle.firstClientBuffer.valid ← FALSE;
    giveServerBuffer[handle:handle];
    ENDLOOP;

  -- Wait for the server to get completely caught up.  This code
  -- also synchronizes with the server so we're sure the server
  -- isn't touching the voice stream info anymore.  The wait is
  -- abandoned if an error has been posted on the stream.
  WHILE (handle.firstServerBuffer # NIL) AND (handle.errorRope = NIL) DO
    NOTIFY serverCondition;
    WAIT closeCondition;
    ENDLOOP;

  -- Close any tune that might be open.
  IF handle.piece # NIL THEN {
    IF handle.piece.tune # NIL THEN Jukebox.CloseTune[handle.jukebox, handle.piece.tune];
    };

  -- If there is a socket process, then signal it to die.
  IF handle.connection # NIL THEN handle.connection.socket ← NIL;

  -- Remove the VSRecord from our list, then de-allocate the spaces
  -- for the buffers (everything else is taken care of by the garbage
  -- collector).
  IF VSList = handle THEN VSList ← handle.next
  ELSE {
    -- Find the predecessor of handle.  The test on record itself (rather
    -- than on record.next after dereferencing it) means an empty list
    -- also ends in the Bug error below instead of a NIL fault.
    record ← VSList;
    WHILE (record # NIL) AND (record.next # handle) DO
      record ← record.next;
      ENDLOOP;
    IF record = NIL THEN ERROR Error[Bug, "Unexpected VSList end."];
    record.next ← handle.next;
    };
  -- Release the VM interval behind every buffer still on any of the
  -- stream's three lists.
  WHILE handle.firstClientBuffer # NIL DO
    buffer ← handle.firstClientBuffer;
    handle.firstClientBuffer ← buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;
  WHILE handle.firstServerBuffer # NIL DO
    buffer ← handle.firstServerBuffer;
    handle.firstServerBuffer ← buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;
  WHILE handle.firstIdleBuffer # NIL DO
    buffer ← handle.firstIdleBuffer;
    handle.firstIdleBuffer ← buffer.next;
    VM.Free[buffer.chirpSpace];
    ENDLOOP;

  -- Mark the VSRecord invalid.
  handle.errorCode ← StreamClosed;
  handle.errorRope ← "Can't use voice stream after it's closed.";
  handle.jukebox ← NIL;
  -- Wake anyone blocked in Get, Put or WaitEmpty so they see the error.
  BROADCAST client;
  BROADCAST waitCondition;
  };
Get: PUBLIC ENTRY PROC [handle: Handle, maxSilentBytes: NAT, block: PrincOps.ByteBltBlock, wait: BOOL ← FALSE]
  RETURNS [silence: NAT ← 0, bytesTransferred: NAT ← 0, keyIndex: NAT ← 0] = {
  -- Reads played-back voice from the stream.
  --   maxSilentBytes: upper bound on the leading silence to report.
  --   block: destination; voice bytes are copied into
  --     [block.startIndex .. block.stopIndexPlusOne).
  --   wait: if TRUE, wait on the client condition when no buffer is ready;
  --     otherwise return at once with whatever has been accumulated.
  -- Returns the number of silent bytes consumed, the number of voice bytes
  -- copied into block, and the keyIndex of the last buffer examined.
  -- Raises Error if an error is pending on the stream.
  -- A chirp's run array alternates run types: even run indexes are
  -- silence runs, odd ones are voice runs.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  silencePass: BOOL ← TRUE; -- state variable
  silenceRunType: BOOL;
  runSize: NAT;
  DO
    -- 1) Acquire a non-empty chirp
    -- If the server isn't doing its job, then we may have to wait here to
    -- get a chirp.  This shouldn't happen and should probably be reported.
    -- Rather than wait, we should play silence and try again later in
    -- order to keep the protocol inviolate.
    WHILE (handle.firstClientBuffer = NIL) DO
      IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
      IF handle.piece = NIL THEN {
        -- Nothing left to play: tell the client (only once) and return.
        IF handle.proc # NIL AND NOT handle.notified THEN {
          handle.notified ← TRUE;
          handle.proc[handle, handle.clientData];
          };
        BROADCAST waitCondition;
        RETURN;
        };
      nWaits ← nWaits + 1;
      IF wait THEN WAIT client
      ELSE RETURN;
      ENDLOOP;
    buffer ← handle.firstClientBuffer;
    handle.notified ← FALSE;
    keyIndex ← buffer.keyIndex;

    -- If the stream is currently going the wrong way, then just
    -- return immediately with whatever has been accumulated.  Typically
    -- this is a RETURN[0, 0].
    IF buffer.toJukebox THEN RETURN;

    -- If the current piece is being flushed, then just give the buffers
    -- back to the server immediately (ignore the data).
    IF handle.piece.flush THEN {
      handle.firstClientBuffer.valid ← FALSE;
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- If presently recording, then quietly return.  Typically this is a
    -- RETURN[0, 0].
    -- NOTE(review): this repeats the test made just above the flush
    -- check, so it can never fire here.
    IF buffer.toJukebox THEN RETURN;

    -- If we have used up all of this chirp, then give it back and get the
    -- next chirp.  This should be done at the end of the code, not the
    -- beginning.
    IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
      IO.PutF[ioStream, "Got chirp %d.\n", IO.int[buffer.chirp]];
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- 2) Acquire a non-empty run
    runSize ← buffer.runData.runArray[buffer.runIndex];
    IF runSize = 0 THEN {
      buffer.runIndex ← buffer.runIndex + 1;
      LOOP;
      };

    -- 3) build state variables
    silenceRunType ← (buffer.runIndex MOD 2) = 0;
    -- The first voice run ends the silence pass for good.
    IF silencePass AND NOT silenceRunType THEN silencePass ← FALSE;
    SELECT TRUE FROM
      -- Cannot happen: the statement above has just cleared silencePass.
      silencePass AND NOT silenceRunType => ERROR;
      silencePass AND silenceRunType => {
        -- Consume as much of this silence run as the caller will accept.
        silenceAmount: CARDINAL ← MIN[runSize, maxSilentBytes];
        silence ← silence + silenceAmount;
        buffer.runData.runArray[buffer.runIndex] ← runSize - silenceAmount;
        buffer.bytesAccountedFor ← buffer.bytesAccountedFor + silenceAmount;
        maxSilentBytes ← maxSilentBytes - silenceAmount;
        IF maxSilentBytes = 0 THEN silencePass ← FALSE;
        };
      NOT silencePass AND NOT silenceRunType => {
        -- Copy as much of this voice run as fits in the caller's block.
        voiceAmount: CARDINAL ← MIN[runSize, block.stopIndexPlusOne - block.startIndex];
        origStopIndex: CARDINAL ← block.stopIndexPlusOne;
        count: CARDINAL;
        -- Temporarily shrink the destination so ByteBlt moves exactly
        -- voiceAmount bytes.
        block.stopIndexPlusOne ← block.startIndex + voiceAmount;
        count ← PrincOpsUtils.ByteBlt[from: buffer.block, to: block];
        IF count # voiceAmount THEN ERROR;
        block.stopIndexPlusOne ← origStopIndex;
        block.startIndex ← block.startIndex + voiceAmount;
        buffer.block.startIndex ← buffer.block.startIndex + voiceAmount;
        buffer.bytesAccountedFor ← buffer.bytesAccountedFor + voiceAmount;
        buffer.runData.runArray[buffer.runIndex] ← runSize - voiceAmount;
        bytesTransferred ← bytesTransferred + voiceAmount;
        IF block.startIndex >= block.stopIndexPlusOne THEN {
          -- The caller's block is full.  Hand back the chirp if it is
          -- used up as well, then return.
          IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
            IO.PutF[ioStream, "Got chirp %d.\n", IO.int[buffer.chirp]];
            giveServerBuffer[handle: handle];
            };
          handle.action ← TRUE;
          RETURN;
          };
        };
      NOT silencePass AND silenceRunType => {
        -- Silence after the silence pass has ended: stop here and leave
        -- the run for the next call.
        RETURN;
        };
      ENDCASE => ERROR;
    handle.action ← TRUE;
    ENDLOOP;
  };
Put: PUBLIC ENTRY PROC [handle: Handle, silentBytes: NAT, block: PrincOps.ByteBltBlock] RETURNS [bytesTransferred: NAT ← 0] = {
  -- This procedure adds the bytes from block to the end of the
  -- voice stream.
  --   silentBytes: amount of silence to record ahead of the voice bytes.
  --   block: source; the voice bytes are taken from
  --     [block.startIndex .. block.stopIndexPlusOne).
  -- Returns the number of bytes (silence plus voice) accounted for.
  -- Raises Error if an error is pending on the stream.
  -- A chirp's run array alternates run types: even run indexes are
  -- silence runs, odd ones are voice runs.
  ENABLE UNWIND => NULL;
  buffer: REF Buffer;
  silencePass: BOOL ← silentBytes > 0;
  silenceRunType: BOOL;
  runSize: NAT;
  WHILE silentBytes > 0 OR block.startIndex < block.stopIndexPlusOne
    DO
    WHILE handle.firstClientBuffer = NIL DO
      IF handle.errorRope # NIL THEN ERROR Error[handle.errorCode, handle.errorRope];
      IF handle.piece = NIL THEN {
        -- Nothing left to record into: tell the client (only once) and
        -- return.
        IF handle.proc # NIL AND NOT handle.notified THEN {
          handle.notified ← TRUE;
          handle.proc[handle, handle.clientData];
          };
        BROADCAST waitCondition;
        RETURN;
        };
      WAIT client;
      ENDLOOP;

    -- If the stream is currently going the wrong way, then just
    -- return immediately.  Typically this is a RETURN[0].
    buffer ← handle.firstClientBuffer;
    handle.notified ← FALSE;
    IF NOT buffer.toJukebox THEN RETURN;

    -- If the current piece is being flushed, then get rid of the current
    -- buffer (throw it away if it's invalid, otherwise fill it with
    -- zeroes).
    IF handle.piece.flush THEN {
      IF handle.firstClientBuffer.valid THEN flushBuffer[handle]
      ELSE giveServerBuffer[handle: handle];
      LOOP;
      };

    -- If not presently recording, then quietly return.  Typically this is
    -- a RETURN[0].
    -- NOTE(review): this repeats the test made just above the flush
    -- check, so it can never fire here.
    IF NOT buffer.toJukebox THEN RETURN;

    -- If we have used up all of this chirp, then give it back and get the
    -- next chirp.  This should be done at the end of the code, not the
    -- beginning.
    IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
      IO.PutF[ioStream, "Got chirp %d.\n", IO.int[buffer.chirp]];
      giveServerBuffer[handle: handle];
      LOOP;
      };

    -- build state variable
    silenceRunType ← (buffer.runIndex MOD 2) = 0;
    runSize ← buffer.runData.runArray[buffer.runIndex];
    SELECT TRUE FROM
      silencePass AND NOT silenceRunType => {
        -- Need a silence run but are on a voice run: start the next run.
        buffer.runIndex ← buffer.runIndex + 1;
        buffer.runData.runArray[buffer.runIndex] ← 0;
        };
      silencePass AND silenceRunType => {
        -- Extend the silence run by as much as fits in this chirp.
        silenceAmount: CARDINAL ← MIN[silentBytes, Jukebox.bytesPerChirp - buffer.bytesAccountedFor];
        silentBytes ← silentBytes - silenceAmount;
        buffer.runData.runArray[buffer.runIndex] ← runSize + silenceAmount;
        buffer.bytesAccountedFor ← buffer.bytesAccountedFor + silenceAmount;
        bytesTransferred ← bytesTransferred + silenceAmount;
        IF silentBytes = 0 THEN silencePass ← FALSE;
        };
      NOT silencePass AND silenceRunType => {
        -- Need a voice run but are on a silence run: start the next run.
        buffer.runIndex ← buffer.runIndex + 1;
        buffer.runData.runArray[buffer.runIndex] ← 0;
        };
      NOT silencePass AND NOT silenceRunType => {
        -- Copy as much voice as fits in the rest of this chirp.
        voiceAmount: CARDINAL ← MIN[block.stopIndexPlusOne - block.startIndex, Jukebox.bytesPerChirp - buffer.bytesAccountedFor];
        origStopIndex: CARDINAL ← block.stopIndexPlusOne;
        count: CARDINAL;
        -- Temporarily shrink the source so ByteBlt moves exactly
        -- voiceAmount bytes.
        block.stopIndexPlusOne ← block.startIndex + voiceAmount;
        count ← PrincOpsUtils.ByteBlt[from: block, to: buffer.block];
        IF count # voiceAmount THEN ERROR;
        block.stopIndexPlusOne ← origStopIndex;
        -- A buffer only becomes valid once it holds real voice data.
        IF voiceAmount > 0 THEN buffer.valid ← TRUE;
        block.startIndex ← block.startIndex + voiceAmount;
        buffer.block.startIndex ← buffer.block.startIndex + voiceAmount;
        buffer.bytesAccountedFor ← buffer.bytesAccountedFor + voiceAmount;
        buffer.runData.runArray[buffer.runIndex] ← runSize + voiceAmount;
        bytesTransferred ← bytesTransferred + voiceAmount;
        IF block.startIndex >= block.stopIndexPlusOne THEN {
          -- The caller's block is used up.  Hand the chirp to the server
          -- if it is full as well, then return.
          IF buffer.bytesAccountedFor >= Jukebox.bytesPerChirp THEN {
            IO.PutF[ioStream, "Put chirp %d.\n", IO.int[buffer.chirp]];
            giveServerBuffer[handle: handle];
            };
          handle.action ← TRUE;
          RETURN;
          };
        };
      ENDCASE => ERROR;
    handle.action ← TRUE;
    ENDLOOP;
  };
AddPiece: PUBLIC ENTRY PROC [handle: Handle, tuneId: INT,
    firstByte: INT, nBytes: INT, create: BOOLEAN,
    playback: BOOLEAN ← TRUE, keyIndex: NAT ← 0, flush: BOOLEAN ← FALSE] = {
  -- This routine allocates another piece descriptor and adds it to the
  -- list for the voice stream.
  --   tuneId, firstByte, nBytes, create, keyIndex: stored in the new
  --     piece descriptor as given.
  --   playback: TRUE for a playback piece, FALSE for a record piece.
  --   flush: if TRUE, every piece already queued is marked flushed first.
  -- Raises Error if an error is pending on the stream.
  ENABLE UNWIND => NULL;
  piece, p2: REF Piece;
  IF handle.errorRope # NIL
    THEN ERROR Error[handle.errorCode, handle.errorRope];
  IF flush THEN flushProc[handle];
  -- Positional constructor: the field order is that of Piece in
  -- VoiceStream.mesa.
  piece ← NEW[Piece ← [tuneId, create, firstByte, nBytes,
    NIL, playback, keyIndex, NIL, FALSE]];
  -- Append at the tail so that pieces are processed in the order added.
  IF handle.piece = NIL THEN handle.piece ← piece
  ELSE {
    p2 ← handle.piece;
    WHILE p2.next # NIL DO p2 ← p2.next ENDLOOP;
    p2.next ← piece;
    };
  ioStream.PutF["%s: size %d\n", IO.rope[IF playback THEN "Playback" ELSE "Record"],
    IO.int[nBytes]];
  -- Wake the server so that it starts on the new piece.
  NOTIFY serverCondition;
  };
FlushPieces: PUBLIC ENTRY PROC [handle: Handle] = {
  -- Monitor entry point for flushProc: marks every piece queued on the
  -- stream as flushed.
  ENABLE UNWIND => NULL;
  flushProc[handle: handle];
  };
flushProc: INTERNAL PROC [handle: Handle] = {
  -- Discards any unused voice information in a stream by setting the
  -- flush flag on every piece queued for it.  Get and Put test the flag
  -- and throw the data away.  Buffers currently waiting to be written
  -- to disk will not be flushed, but everything else will be.  The
  -- pieces and buffers are not simply unlinked here because that would
  -- not be safe synchronization-wise; hence the flag.
  FOR piece: REF Piece ← handle.piece, piece.next WHILE piece # NIL DO
    piece.flush ← TRUE;
    ENDLOOP;
  -- Wake every waiter so that it notices the flush.
  BROADCAST client;
  BROADCAST waitCondition;
  };
IsEmpty: PUBLIC PROC [handle: Handle] RETURNS [BOOLEAN] = {
  -- Figure out whether the socket process has used up all the
  -- available pieces: TRUE only when the stream has neither a client
  -- buffer nor a piece left.  Not an ENTRY procedure, so the two fields
  -- are read without taking the monitor lock.
  RETURN[(handle.firstClientBuffer = NIL) AND (handle.piece = NIL)];
  };
WaitEmpty: PUBLIC ENTRY PROC [handle: Handle] = {
  -- Just wait until the stream empties completely, i.e. until it has
  -- neither a client buffer nor a piece left.  Also returns as soon as
  -- an error is pending on the stream.
  ENABLE UNWIND => NULL;
  WHILE (handle.errorRope = NIL)
      AND ((handle.firstClientBuffer # NIL) OR (handle.piece # NIL)) DO
    WAIT waitCondition;
    ENDLOOP;
  };
Check: PUBLIC ENTRY PROC [handle: Handle]
  RETURNS[BOOLEAN] = {
  -- If there are any errors pending for the given voice stream,
  -- they are signalled immediately.  If there are no errors and
  -- there is data ready for playback (a leading client buffer that is
  -- not headed to the jukebox), then TRUE is returned.
  -- Otherwise, FALSE is returned.
  ENABLE UNWIND => NULL;
  IF handle.errorRope # NIL THEN
    ERROR Error[handle.errorCode, handle.errorRope];
  -- AND does not evaluate its right operand when the left one is FALSE,
  -- so the buffer is only dereferenced when it exists.
  RETURN[(handle.firstClientBuffer # NIL) AND (NOT handle.firstClientBuffer.toJukebox)];
  };
giveServerBuffer: INTERNAL PROC [handle: Handle] = {
  -- This routine passes the leading client buffer to the server by
  -- moving it to the tail of the server list, and activates the server.
  -- The caller must make sure that the client list is not empty.
  moved: REF Buffer ← handle.firstClientBuffer;
  tail: REF Buffer ← handle.firstServerBuffer;
  -- Unlink from the head of the client list.
  handle.firstClientBuffer ← moved.next;
  moved.next ← NIL;
  -- Append at the end of the server list.
  IF tail = NIL THEN handle.firstServerBuffer ← moved
  ELSE {
    WHILE tail.next # NIL DO tail ← tail.next ENDLOOP;
    tail.next ← moved;
    };
  NOTIFY serverCondition;
  };
flushBuffer: PUBLIC INTERNAL PROC [handle: Handle] = {
  -- If there is a partially-filled leading client buffer headed for the
  -- jukebox, this routine pads it out to a whole chirp and sends it to
  -- the server.  The padding is recorded by lengthening a silence run
  -- by the number of bytes still unaccounted for.
  buffer: REF Buffer ← handle.firstClientBuffer;
  IF (buffer = NIL) OR (NOT buffer.toJukebox) THEN RETURN;
  -- Untouched and completely full buffers are left where they are.
  IF NOT (buffer.bytesAccountedFor IN (0..Jukebox.bytesPerChirp)) THEN RETURN;
  IF (buffer.runIndex MOD 2) # 0 THEN { -- voice run
    IF buffer.runData.runArray[buffer.runIndex] # 0 THEN {
      -- The voice run holds data: open a fresh silence run after it.
      buffer.runIndex ← buffer.runIndex + 1;
      buffer.runData.runArray[buffer.runIndex] ← 0;
      }
    -- The voice run is empty: fall back to the silence run before it.
    ELSE buffer.runIndex ← buffer.runIndex - 1;
    };
  buffer.runData.runArray[buffer.runIndex] ← buffer.runData.runArray[buffer.runIndex] + (Jukebox.bytesPerChirp - buffer.bytesAccountedFor);
  giveServerBuffer[handle: handle];
  };
END.
Last Edited by: L. Stewart, March 25, 1983 3:54 pm, VoiceStream change
L. Stewart, April 5, 1983 2:38 pm, Tioga formatting, rundata
L. Stewart, April 11, 1983 1:06 pm, bug fixing in rundata
Last Edited by: L. Stewart, April 19, 1983 3:17 pm, Silence encoding
L. Stewart, June 4, 1983 5:42 pm, call notifyProc only once
L. Stewart, December 27, 1983 1:44 pm, Cedar 5