-- File: VoiceStreamServerImpl.mesa
-- Copyright © 1986 by Xerox Corporation. All rights reserved.
-- This file contains part of the implementation of voicestreams. The
-- routines in this file implement the voicestream server, a background
-- process that transfers information between disk and buffers in
-- voicestreams.
-- Ades, February 10, 1986 4:01:59 pm PST
-- Swinehart, June 17, 1986 11:56:14 pm PDT
-- Last Edited by: Ousterhout, March 8, 1983 11:39 am compile
-- Last Edited by: Stewart, January 3, 1984 11:47 am
DIRECTORY
FS USING [Read, Write],
IO USING [int, PutFR, STREAM],
Jukebox USING [bytesPerChirp, CloseTune, EOF, Error, FindChirp, MissingChirp, OpenTune, TuneSize, RunComponent, singlePktLength, RunElementType, RunArrayRange, Tune],
Process USING [Detach, Priority, priorityNormal, SetPriority],
Rope USING [ROPE],
RuntimeError USING [ UNCAUGHT ],
VM USING [AddressForPageNumber],
VoiceStream;
VoiceStreamServerImpl: MONITOR LOCKS VoiceStream.Lock
IMPORTS FS, IO, Jukebox, Process, RuntimeError, VM, VoiceStream
EXPORTS VoiceStream =
BEGIN
Foo: SIGNAL = CODE;
debugRuns: BOOL ← FALSE;
serverPriority: Process.Priority ← Process.priorityNormal;
GetServerBuffer: ENTRY PROC RETURNS [handle: VoiceStream.Handle] = {
-- This is a monitor procedure used by the server to wait for work to do.
-- For the server to have work to do, there must not be any errors associated
-- with the stream. Furthermore, either there must be a buffer waiting for the
-- server, or there must be an idle buffer and there must be a valid piece
-- waiting for the server. If there is a server buffer waiting, this routine
-- transfers the frontmost server buffer to the front of the idle list, from
-- which position the server will process it. Note: it doesn't make any
-- difference how the buffers on the idle list are ordered.
ENABLE UNWIND => NULL;
buffer: REF VoiceStream.Buffer;
WHILE TRUE DO
handle ← VoiceStream.vSList;
WHILE handle # NIL DO
IF (handle.errorRope = NIL) THEN {
IF (handle.firstServerBuffer # NIL) THEN {
buffer ← handle.firstServerBuffer;
handle.firstServerBuffer ← buffer.next;
buffer.next ← handle.firstIdleBuffer;
handle.firstIdleBuffer ← buffer;
RETURN [handle];
}
ELSE IF (handle.piece # NIL) THEN {
IF handle.firstClientBuffer = NIL THEN RETURN [handle];
IF (NOT handle.piece.flush
AND (handle.piece.intervalSpec.length > 0)
AND (handle.firstIdleBuffer # NIL))
THEN RETURN [handle];
};
};
handle ← handle.next;
ENDLOOP;
-- Notify anybody trying to close a stream that we've done all
-- the work we can, and are waiting.
BROADCAST VoiceStream.closeCondition;
WAIT VoiceStream.serverCondition;
ENDLOOP;
};
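GiveClientBuffer: ENTRY PROC [handle: VoiceStream.Handle] = {
-- Moves the frontmost idle buffer to the tail of the client buffer list
-- and wakes up any clients waiting for a buffer.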
ENABLE UNWIND => NULL;
buffer, b2: REF VoiceStream.Buffer;
buffer ← handle.firstIdleBuffer;
handle.firstIdleBuffer ← buffer.next;
buffer.next ← NIL;
IF handle.firstClientBuffer = NIL
THEN {
handle.firstClientBuffer ← buffer;
NOTIFY handle.newPiece;
}
ELSE {
b2 ← handle.firstClientBuffer;
WHILE b2.next # NIL DO b2 ← b2.next ENDLOOP;
b2.next ← buffer;
};
BROADCAST VoiceStream.client;
};
SetError: PUBLIC ENTRY PROC [handle: VoiceStream.Handle, rope: Rope.ROPE, code: VoiceStream.ErrorCode] = {
-- This procedure just records error information in a voice
-- stream and wakes up waiting clients. This is made a separate
-- procedure because exclusive access is needed on the voice
-- stream, which the server doesn't have.
ENABLE UNWIND => NULL;
handle.errorRope ← rope;
handle.errorCode ← code;
BROADCAST VoiceStream.client;
BROADCAST VoiceStream.waitCondition;
};
GetPiece: ENTRY PROC [handle: VoiceStream.Handle] RETURNS [VoiceStream.Piece] = {
-- This procedure is called only by the server.
-- It is made a separate procedure in order to guarantee exclusive access to
-- the voice stream information. Because of the high overheads in opening and
-- closing tunes, we check the next piece and if it's the same tune, then don't
-- bother closing the current one. NIL is returned if either there are no
-- pieces left, or if all the space of the frontmost piece is allocated to
-- buffers but the buffers haven't been processed completely. This means we
-- can't buffer ahead across pieces, but the alternative results in messy
-- synchronization problems.
-- Client notification of the start-of-transfer ($started) and end-of-transfer
-- ($finished) for each piece is done here.
ENABLE UNWIND => NULL;
piece: VoiceStream.Piece;
tune: Jukebox.Tune←NIL;
WHILE TRUE DO
piece ← handle.piece;
IF piece = NIL THEN { handle.pieceInProgress ← FALSE; RETURN [NIL]; };
IF piece.flush THEN piece.intervalSpec.length ← 0;
IF handle.pieceInProgress
THEN {
nxP: VoiceStream.Piece ← piece.next;
IF piece.intervalSpec.length > 0 THEN RETURN [piece];
-- Piece in progress is now finished.
tune ← piece.tune;
IF (handle.firstClientBuffer # NIL)
OR (handle.firstServerBuffer # NIL) THEN RETURN [NIL]; -- No buffer-ahead
IF handle.proc # NIL THEN
-- Report completion of a piece
handle.proc[handle, $finished, piece.clientData];
IF tune # NIL
AND (nxP = NIL OR nxP.intervalSpec.tuneID # piece.intervalSpec.tuneID)
THEN {
Jukebox.CloseTune[handle.jukebox, tune];
tune ← NIL;
};
handle.piece ← piece ← piece.next;
};
handle.pieceInProgress ← TRUE;
IF piece = NIL THEN { BROADCAST VoiceStream.client; LOOP; };
IF tune = NIL THEN tune ←
Jukebox.OpenTune[handle.jukebox, piece.intervalSpec.tuneID, ~piece.playback];
piece.tune ← tune;
IF handle.proc # NIL THEN
-- Report start of next piece (see AddPiece for report of first)
handle.proc[handle, $started, handle.piece.clientData];
ENDLOOP;
RETURN [NIL];
};
Server: PROC [] = {
-- This procedure is responsible for buffering ahead and behind the client
-- processes. Be VERY careful with this code. The procedure runs unmonitored,
-- so that other processes can be getting and putting voice data while we do
-- disk operations. However, you must be extremely careful to make sure that
-- the processes don't get in each others' way.
handle: VoiceStream.Handle;
buffer: REF VoiceStream.Buffer;
ourSize: INT;
skipLeading: INT;
piece: VoiceStream.Piece;
noChirp: BOOLEAN;
Process.SetPriority[serverPriority];
WHILE TRUE DO
ENABLE {
Jukebox.Error => {
SetError[handle: handle, code: BadPiece,
rope:
IO.PutFR["Piece for tune id %d starting at byte %d can't be accessed.",
IO.int[piece.intervalSpec.tuneID], IO.int[piece.intervalSpec.start]]];
CONTINUE;
};
-- On EOF, just discard the rest of the current piece.
Jukebox.EOF => {
piece.intervalSpec.length ← 0;
CONTINUE;
};
RuntimeError.UNCAUGHT => {
SetError[handle: handle, code: Bug,
rope: "Unknown error in voice stream demon."];
-- IO.PutRope[ioStream, "Unknown error in voice stream demon.\n"];
REJECT;
};
};
handle ← GetServerBuffer[];
buffer ← handle.firstIdleBuffer;
-- If this buffer is to be written, and the buffer is valid, then write
-- it out.
IF buffer.toJukebox AND buffer.valid THEN {
buffer.valid ← FALSE;
buffer.window ← Jukebox.FindChirp[self: handle.jukebox, tune: handle.piece.tune,
chirp: buffer.chirp, signalMissingChirp: FALSE, signalEOF: FALSE];
-- make sure that no partial chirps are recorded!
IF buffer.bytesAccountedFor < Jukebox.bytesPerChirp
THEN {
-- fill the rest of the chirp with silence
IF buffer.runData.runArray[buffer.runIndex].elementType ~= silence
THEN {
-- last thing placed in chirp was non-silent
buffer.runIndex ← buffer.runIndex +
(IF buffer.runData.runArray[buffer.runIndex].elementType = soundEnergy THEN 2 ELSE 1);
buffer.runData.runArray[buffer.runIndex] ← [silence[0]];
};
buffer.runData.runArray[buffer.runIndex]←[
silence[
NARROW[buffer.runData.runArray[buffer.runIndex],
Jukebox.RunComponent[silence]].length
+(Jukebox.bytesPerChirp - buffer.bytesAccountedFor)]
];
buffer.bytesAccountedFor ← Jukebox.bytesPerChirp -- just for purity
};
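-- Worked example of the padding above (illustrative only; it assumes
-- Jukebox.bytesPerChirp = 8000, a value this file does not state).
-- Suppose bytesAccountedFor = 6000 and runIndex = 3 on entry:
--   runArray[3] is singlePkt: runIndex becomes 4 and runArray[4] ends as silence[2000].
--   runArray[3] is soundEnergy (its soundLength sits in runArray[4]): runIndex
--     becomes 5 and runArray[5] ends as silence[2000].
--   runArray[3] is silence[500]: runIndex stays 3 and the entry becomes silence[2500].
-- In every case bytesAccountedFor ends at 8000.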
CheckRunData[buffer];
FS.Write[file: buffer.window.file, to: buffer.window.base, nPages: buffer.chirpSpace.count, from: VM.AddressForPageNumber[buffer.chirpSpace.page]];
-- IO.PutF[ioStream, "Wrote chirp %d to disk.\n", IO.int[buffer.chirp]];
};
-- Collect information for the next buffer's worth of voice. First of all,
-- find a piece that isn't empty.
piece ← GetPiece[handle];
IF piece = NIL THEN LOOP;
-- ioStream.PutF["Number of bytes left: %d\n", IO.int[piece.intervalSpec.length]];
-- Set up the encryption key index (copy over from piece).
buffer.keyIndex ← piece.intervalSpec.keyIndex;
-- Make sure that the tune is open.
IF piece.tune = NIL
OR NOT (piece.playback OR piece.tune.writable)
THEN ERROR;
-- Figure out which chirp this is.
IF piece.intervalSpec.start < 0 THEN piece.intervalSpec.start ← Jukebox.TuneSize[piece.tune] * Jukebox.bytesPerChirp;
buffer.chirp ← piece.intervalSpec.start / Jukebox.bytesPerChirp;
-- Figure out which portion of the chirp we'll actually use.
skipLeading ← piece.intervalSpec.start MOD Jukebox.bytesPerChirp;
buffer.bytesAccountedFor ← skipLeading;
ourSize ← Jukebox.bytesPerChirp - skipLeading;
-- Trim the tail if needed
IF piece.intervalSpec.length < ourSize
THEN {
-- disallow a recording that ends in the middle of a chirp
IF piece.playback THEN ourSize ← piece.intervalSpec.length
ELSE piece.intervalSpec.length ← ourSize;
IF piece.playback THEN buffer.bytesAccountedFor ← Jukebox.bytesPerChirp - ourSize;
};
-- Account for the part of the piece in this chirp.
piece.intervalSpec.length ← piece.intervalSpec.length - ourSize;
piece.intervalSpec.start ← piece.intervalSpec.start + ourSize;
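-- Worked example of the arithmetic above (illustrative only; it assumes
-- Jukebox.bytesPerChirp = 8000, a value this file does not state).
-- A playback piece with start = 20500 and length = 30000 is consumed in five passes:
--   pass 1: chirp = 2, skipLeading = 4500, ourSize = 3500; then length = 26500, start = 24000
--   passes 2-4: chirp = 3, 4, 5, skipLeading = 0, ourSize = 8000; after pass 4
--     length = 2500, start = 48000
--   pass 5: chirp = 6, skipLeading = 0; length (2500) < 8000, so ourSize = 2500 and
--     bytesAccountedFor = 8000 - 2500 = 5500; then length = 0, start = 50500
-- The sizes sum to 3500 + 3*8000 + 2500 = 30000, the original length.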
-- Read in the chirp, if necessary. Note: we don't look up the disk location
-- until it's time to actually transfer information. This way, the jukebox
-- routines won't allocate the space until it's actually needed. Also, for
-- playback we substitute a homemade all-silent chirp if there isn't a chirp
-- on the disk.
buffer.runIndex ← 0;
buffer.playedBytes ← 0;
IF (ourSize # Jukebox.bytesPerChirp) OR piece.playback THEN {
noChirp ← FALSE;
buffer.window ← Jukebox.FindChirp[self: handle.jukebox, tune: piece.tune,
chirp: buffer.chirp, signalEOF: piece.playback,
signalMissingChirp: TRUE
! Jukebox.MissingChirp => {noChirp ← TRUE; CONTINUE}];
IF noChirp
THEN {
IF piece.playback
THEN {
-- Construct a chirp with a single run of silence
buffer.block.startIndex ← 0;
buffer.block.stopIndexPlusOne ← 0;
buffer.runData.runArray[0] ← [silence[ourSize]];
}
ELSE {
-- build a fake chirp with the right structure
buffer.block.startIndex ← 0;
buffer.block.stopIndexPlusOne ← ourSize;
buffer.runData.runArray[0] ← [silence[skipLeading]];
};
}
ELSE {
-- Read in the whole chirp, including its runData
FS.Read[file: buffer.window.file, from: buffer.window.base, nPages: buffer.chirpSpace.count, to: VM.AddressForPageNumber[buffer.chirpSpace.page]];
IF skipLeading > 0
THEN {
runSize: CARDINAL;
-- The value of skipLeading is the amount to chase down the runData
buffer.block.startIndex ← 0;
DO
WITH curr: buffer.runData.runArray[buffer.runIndex] SELECT FROM
silence => runSize ← curr.length;
singlePkt => runSize ← Jukebox.singlePktLength;
soundEnergy => runSize ← NARROW[buffer.runData.runArray[buffer.runIndex+1], Jukebox.RunComponent[soundLength]].length;
ENDCASE => ERROR;
IF skipLeading >= runSize
THEN {
-- skip the whole run
skipLeading ← skipLeading - runSize;
IF buffer.runData.runArray[buffer.runIndex].elementType # silence THEN buffer.block.startIndex ← buffer.block.startIndex + runSize;
buffer.runIndex ← buffer.runIndex + (IF buffer.runData.runArray[buffer.runIndex].elementType=soundEnergy THEN 2 ELSE 1);
}
ELSE {
-- last run:
WITH curr: buffer.runData.runArray[buffer.runIndex] SELECT FROM
silence => curr.length ← runSize - skipLeading;
soundEnergy =>
buffer.runData.runArray[buffer.runIndex+1] ← [soundLength[IF piece.playback
THEN NARROW[buffer.runData.runArray[buffer.runIndex+1], Jukebox.RunComponent[soundLength]].length - skipLeading
-- i.e. on playback this is what is left to be played back
ELSE skipLeading]]; -- but on record this is what is already there
singlePkt => IF piece.playback THEN buffer.playedBytes ← skipLeading
ELSE
-- since we are recording it is okay to rewrite this and the next runArray entries
{buffer.runData.runArray[buffer.runIndex] ← [soundEnergy[NARROW[buffer.runData.runArray[buffer.runIndex], Jukebox.RunComponent[singlePkt]].energy]];
buffer.runData.runArray[buffer.runIndex+1] ← [soundLength[skipLeading]];
};
ENDCASE;
EXIT;
};
ENDLOOP;
buffer.block.stopIndexPlusOne ← buffer.block.startIndex + ourSize;
}
ELSE {
buffer.block.startIndex ← 0;
-- Not all of this is needed, since some might be silence, but we don't know
-- how much and don't want to spend the time analyzing the runData.
buffer.block.stopIndexPlusOne ← ourSize;
};
};
-- IO.PutF[ioStream, "Read chirp %d from disk.\n", IO.int[buffer.chirp]];
}
ELSE {
-- whole chirp is involved and it is record
buffer.block.startIndex ← 0;
buffer.block.stopIndexPlusOne ← ourSize;
buffer.runData.runArray[0] ← [silence[0]]
};
buffer.toJukebox ← NOT piece.playback;
IF buffer.toJukebox THEN buffer.runData.ambientLevel ← piece.intervalSpec.ambientLevel;
GiveClientBuffer[handle];
ENDLOOP;
};
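CheckRunData: PROC [buffer: REF VoiceStream.Buffer] = {
-- Sanity-checks the run data of a chirp that is about to be written: every
-- soundEnergy entry must be followed by a soundLength entry, and the last run
-- is trimmed so that the runs total at most Jukebox.bytesPerChirp.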
total: CARDINAL ← 0;
run: CARDINAL;
currElement: Jukebox.RunElementType ← silence; -- must simply start as ~soundEnergy
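-- Worked example of the loop below (illustrative only; it assumes
-- Jukebox.bytesPerChirp = 8000, a value this file does not state, and a
-- hypothetical energy value e).
-- runArray = silence[1000], soundEnergy[e], soundLength[5000], silence[3000]:
--   ri = 0: run = 1000, total = 1000
--   ri = 1: run = 5000 (taken from the soundLength at ri+1), total = 6000
--   ri = 2: soundLength following soundEnergy, skipped by LOOP
--   ri = 3: run = 3000, total = 9000 > 8000, so the entry is trimmed to
--     silence[3000 - (9000 - 8000)] = silence[2000] and the loop exits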
FOR ri: Jukebox.RunArrayRange IN [0..95) DO
IF currElement = soundEnergy
-- actually it is prevElement in this context
THEN
IF buffer.runData.runArray[ri].elementType = soundLength
THEN {currElement ← soundLength; LOOP}
ELSE ERROR;
currElement ← buffer.runData.runArray[ri].elementType;
WITH curr: buffer.runData.runArray[ri] SELECT FROM
silence => run ← curr.length;
singlePkt => run ← Jukebox.singlePktLength;
soundEnergy => run ← NARROW[buffer.runData.runArray[ri+1],
Jukebox.RunComponent[soundLength]].length;
ENDCASE => ERROR;
total ← total + run;
IF total > Jukebox.bytesPerChirp
THEN {
WITH curr: buffer.runData.runArray[ri] SELECT FROM
silence => curr.length ← run - (total - Jukebox.bytesPerChirp);
soundEnergy => buffer.runData.runArray[ri+1] ← [soundLength[run - (total - Jukebox.bytesPerChirp)]];
singlePkt => {
-- since this is last valid entry in chirp, okay to trample 'nextElement'
buffer.runData.runArray[ri] ← [soundEnergy[NARROW[buffer.runData.runArray[ri], Jukebox.RunComponent[singlePkt]].energy]];
buffer.runData.runArray[ri+1] ← [soundLength[Jukebox.singlePktLength - (total - Jukebox.bytesPerChirp)]]
};
ENDCASE;
IF debugRuns THEN SIGNAL Foo;
EXIT;
};
IF total = Jukebox.bytesPerChirp THEN EXIT;
ENDLOOP;
};
StartServer: PUBLIC PROC [stream: IO.STREAM] = { VoiceStream.ioStream ← stream; };
Init: PROC = {
-- This procedure simply forks the stream buffer server and
-- initializes shared state.
p: PROCESS;
p ← FORK Server[];
Process.Detach[p];
};
Init[];
END.
-- 1983, Stewart, run-encoded chirps
-- May 10, 1986 4:14:09 pm PDT, Stewart, Cedar 5, December
-- Swinehart, May 10, 1986 4:13:59 pm PDT
-- Convert to new communications package
-- changes to: VoiceStreamServerImpl, EXPORTS, BEGIN, GetServerBuffer, GiveClientBuffer, SetError, GetPiece, Server, CheckRunData
-- Swinehart, June 9, 1986 9:52:06 am PDT
-- create option no longer supported. Tunes must be open by arrival at server, GetServerBuffer, GetPiece