-- File: PupByteStreams.mesa - last edit:
-- AOF 17-Feb-88 15:26:30
-- SMA 14-Dec-84 17:47:26
-- Copyright (C) 1983, 1986, 1988 by Xerox Corporation. All rights reserved.
DIRECTORY
Environment USING [Byte],
Runtime USING [GlobalFrame],
ByteBlt USING [ByteBlt],
Stream USING [
Byte, Word, Block, CompletionCode, defaultInputOptions, defaultObject,
Handle, InputOptions, LongBlock, Object, ShortBlock, SSTChange,
SubSequenceType, TimeOut],
CommFlags USING [doDebug],
Driver USING [Glitch],
PrincOpsMinus USING [NewSelf],
PupPktDefs USING [
PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake],
PupStream USING [PupOpenMode],
PupDefs USING [
GetBuffer, ReturnBuffer, GetPupContentsBytes, SetPupContentsBytes,
Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
PupPktOps USING [pupBuffers],
PupTypes USING [fillInSocketID];
-- PupByteStreams implements the byte stream operations exported through
-- PupStream on top of a packet stream obtained from PupPktDefs.
-- PupByteStreamMake hands every stream its own instance (global frame) of this
-- module, so the module level variables declared below are per stream state.
-- The stream procedures recover the instance from the stream handle by asking
-- Runtime.GlobalFrame about the handle's put procedure.
PupByteStreams: MONITOR
IMPORTS
PrincOpsMinus, Runtime, Stream, ByteBlt, Driver, PupPktDefs, PupDefs,
PupPktOps
EXPORTS PupStream =
BEGIN OPEN PupDefs;
-- Manager data: a free list of module instances, maintained by GetInstance
-- and FreeInstance under the monitor lock.
-- free is the head of the list. It starts out as the frame that
-- Runtime.GlobalFrame reports for PupByteStreamCreate.
free: LONG POINTER TO FRAME[PupByteStreams] ←
LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[PupByteStreamCreate]]];
-- next links an instance to the one behind it on the free list.
next: LONG POINTER TO FRAME[PupByteStreams] ← NIL;
-- GetInstance hands out a module instance for a new stream. An instance is
-- taken from the free list when one is available; otherwise a new copy of the
-- module is created and started so that its initialization code runs.
GetInstance: ENTRY PROC RETURNS [him: LONG POINTER TO FRAME[PupByteStreams]] =
  BEGIN
  -- Reuse path: pop the head of the free list.
  IF free # NIL THEN {him ← free; free ← him.next; RETURN};
  -- Nothing to reuse. NewSelf stands in for NEW PupByteStreams.
  him ← PrincOpsMinus.NewSelf[];
  START him;
  END;
-- FreeInstance pushes him onto the front of the free list so that a later
-- GetInstance can reuse it.
FreeInstance: ENTRY PROC[him: LONG POINTER TO FRAME[PupByteStreams]] =
  BEGIN
  him.next ← free;
  free ← him;
  END;
-- Stream object of this instance; PupByteStreamMake returns its address as the
-- stream handle. The procedure fields are bound to the implementations in this
-- module, except getPosition and setPosition, which are copied from
-- Stream.defaultObject. The options and delete fields are left unspecified
-- here and are assigned by PupByteStreamMake.
myPupByteStream: Stream.Object ← [
options:, getByte: GetByte, putByte: PutByte, getWord: GetWord,
putWord: PutWord, get: GetBlock, put: PutBlock, setSST: SendMark,
sendAttention: SendAttention, waitAttention: WaitAttention, delete:,
setTimeout: SetTimeout,
getTimeout: GetTimeout,
getPosition: Stream.defaultObject.getPosition,
setPosition: Stream.defaultObject.setPosition,
sendNow: SendNow, clientData: NIL];
-- Packet stream underneath this byte stream; assigned by PupByteStreamMake.
myPs: PupPktDefs.PupPktStream;
-- Beware, things don't get reinitialized when a module is reused
-- Input side. inputBuffer is the received packet currently being consumed, or
-- NIL when there is none. inputFinger indexes the next unread byte of that
-- packet and inputBufferSize is the packet's content length in bytes.
inputBuffer: PupBuffer;
inputFinger: CARDINAL;
inputBufferSize: CARDINAL;
-- SST taken from the most recent mark packet seen by GetBlock.
inputSST: Stream.SubSequenceType;
-- Output side. outputBuffer is the packet being filled, or NIL when there is
-- none. outputFinger is the number of bytes placed in it so far.
-- outputBufferSize is the size limit obtained from the packet stream; 0 means
-- it has not been fetched yet.
outputBuffer: PupBuffer;
outputFinger: CARDINAL; -- 0 if aData/aMark sent
outputBufferSize: CARDINAL;
-- Passed to Driver.Glitch by PupByteStreamAbort and Destroy when debugging is
-- enabled and the handle's delete procedure is not Destroy.
HandleLooksLikeGarbage: PUBLIC ERROR = CODE;
-- Two byte overlay used by GetWord and PutWord to split a Stream.Word.
LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte];
-- PupByteStreamCreate opens a byte stream to remote. It is a convenience form
-- of PupByteStreamMake that supplies PupTypes.fillInSocketID as the local
-- socket, sendRfc as the open mode and [0, 0] as the id.
PupByteStreamCreate: PUBLIC PROC[remote: PupAddress, ticks: Tocks]
  RETURNS [Stream.Handle] =
  BEGIN
  noId: Pair = [0, 0];
  RETURN[
    PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, noId]];
  END;
-- PupByteStreamMake builds a byte stream on a freshly made packet stream.
--   local, remote, ticks, mode, id: passed straight to PupPktStreamMake.
--   Returns the address of the Stream.Object inside the instance that was
--   assigned to the new stream.
PupByteStreamMake: PUBLIC PROC[
  local: PupSocketID, remote: PupAddress, ticks: Tocks,
  mode: PupStream.PupOpenMode, id: Pair] RETURNS [Stream.Handle] =
  BEGIN
  -- The packet stream is made first, before an instance is claimed, so it is
  -- ok to UNWIND out of here if PupPktStreamMake doesn't work.
  newPs: PupPktDefs.PupPktStream =
    PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
  him: LONG POINTER TO FRAME[PupByteStreams] = GetInstance[];
  -- A reused instance keeps its old variable values, so reset what matters.
  him.myPs ← newPs;
  -- Stream object fields that the declaration leaves unspecified.
  him.myPupByteStream.options ← Stream.defaultInputOptions;
  him.myPupByteStream.delete ← Destroy;
  -- Input side.
  him.inputBuffer ← NIL;
  him.inputSST ← 0;
  -- Output side.
  him.outputBuffer ← NIL;
  him.outputFinger ← 0;
  him.outputBufferSize ← 0;
  RETURN[@him.myPupByteStream];
  END;
-- PupByteStreamAbort aborts the packet stream underneath bs, passing along the
-- text e. With debugging enabled, a handle whose delete procedure is not
-- Destroy is reported through Driver.Glitch.
PupByteStreamAbort: PUBLIC --ENTRY-- PROC[bs: Stream.Handle, e: LONG STRING] =
  BEGIN
  ENABLE UNWIND => NULL;
  -- The instance is the global frame that owns the handle's put procedure.
  instance: LONG POINTER TO FRAME[PupByteStreams] =
    LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
  IF CommFlags.doDebug THEN
    IF bs.delete # Destroy THEN Driver.Glitch[HandleLooksLikeGarbage];
  PupPktDefs.PupPktStreamAbort[instance.myPs, e];
  END;
-- Destroy is the delete procedure of the stream object. It destroys the packet
-- stream, gives back any input or output buffer still held, and puts the
-- instance on the free list. With debugging enabled, a handle whose delete
-- procedure is not Destroy is reported through Driver.Glitch.
Destroy: PUBLIC --ENTRY-- PROC[bs: Stream.Handle] =
  BEGIN
  ENABLE UNWIND => NULL;
  -- The instance is the global frame that owns the handle's put procedure.
  instance: LONG POINTER TO FRAME[PupByteStreams] =
    LOOPHOLE[Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
  IF CommFlags.doDebug THEN
    IF bs.delete # Destroy THEN Driver.Glitch[HandleLooksLikeGarbage];
  -- Clearing delete makes a second Destroy on this handle fail the check above.
  bs.delete ← NIL;
  PupPktDefs.PupPktStreamDestroy[instance.myPs];
  IF instance.inputBuffer # NIL THEN ReturnBuffer[@instance.inputBuffer];
  IF instance.outputBuffer # NIL THEN ReturnBuffer[@instance.outputBuffer];
  FreeInstance[instance];
  END;
-- SendAttention asks the packet stream to send an attention. The byte
-- argument is not used.
SendAttention: PROC[sH: Stream.Handle, byte: Stream.Byte] =
  {myPs.sendAttention[]};
-- WaitAttention waits on the packet stream for an attention and then returns
-- 0; no attention byte is passed back from the packet stream.
WaitAttention: PROC[sH: Stream.Handle] RETURNS [Stream.Byte] =
  {myPs.waitForAttention[]; RETURN[0]};
-- GetByte returns the next byte of the stream. Bytes are taken straight from
-- the current input packet while more than two remain in it; otherwise the
-- byte is fetched through the handle's get procedure with a one byte block.
GetByte: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [byte: Stream.Byte] =
  BEGIN
  ENABLE UNWIND => NULL;
  single: PACKED ARRAY [0..1] OF Stream.Byte;
  -- Fast path. The "+2" margin leaves the last bytes to GetBlock, which gives
  -- the buffer back when the final byte is taken.
  IF inputBuffer # NIL AND inputFinger + 2 < inputBufferSize THEN
    {byte ← inputBuffer.pup.pupBytes[inputFinger];
     inputFinger ← inputFinger + 1;
     RETURN};
  -- Slow path through the block interface.
  [] ← sH.get[sH, [@single, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE, TRUE]];
  byte ← single[0];
  END;
-- GetTimeout reports the timeout currently held by the packet stream.
GetTimeout: PROC [sH: Stream.Handle] RETURNS [waitTime: LONG CARDINAL] =
  BEGIN
  waitTime ← myPs.getTimeout[];
  END;
-- GetWord reads two bytes with GetByte and returns them as one word; the
-- first byte read becomes the left half and the second the right half.
GetWord: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [word: Stream.Word] =
  BEGIN
  halves: LeftAndRight;
  halves.left ← GetByte[sH];
  halves.right ← GetByte[sH];
  word ← LOOPHOLE[halves, Stream.Word];
  END;
-- GetBlock copies received bytes into block, taking packets from the packet
-- stream as needed, until block is full or one of the conditions below ends
-- the transfer early.
--   block: destination; bytes go to [startIndex .. stopIndexPlusOne).
--   options: selects which conditions are signalled and which end the call.
--   bytesTransferred: number of bytes copied by this call.
--   why: normal, or sstChange when a mark packet ended the call, or endRecord
--     when a packet was used up and options.terminateOnEndRecord is set.
--   sst: the current input SST, updated if a mark packet arrived.
-- Raises SIGNAL Stream.TimeOut when the packet stream returns no packet,
-- SIGNAL Stream.SSTChange and SIGNAL Stream.LongBlock as selected by options,
-- and ERROR Stream.ShortBlock when selected and packet data is left over.
GetBlock: --ENTRY-- PROC[
sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
RETURNS [
bytesTransferred: CARDINAL, why: Stream.CompletionCode,
sst: Stream.SubSequenceType] =
BEGIN
ENABLE UNWIND => NULL;
input: Stream.Block;
moved: CARDINAL;
bytesTransferred ← 0;
sst ← inputSST;
why ← normal;
WHILE block.startIndex < block.stopIndexPlusOne DO
-- Make sure there is a data packet in hand. If a signal raised in this inner
-- loop is resumed, inputBuffer is still NIL and the loop asks for another
-- packet.
UNTIL inputBuffer # NIL DO
inputFinger ← 0;
inputBuffer ← myPs.get[];
IF inputBuffer = NIL THEN SIGNAL Stream.TimeOut[block.startIndex]
ELSE
IF inputBuffer.pup.pupType = mark OR inputBuffer.pup.pupType = aMark THEN
BEGIN
-- A mark packet carries the new SST in its first byte and holds no data.
sst ← inputSST ← inputBuffer.pup.pupBytes[0];
ReturnBuffer[@inputBuffer];
IF options.signalSSTChange THEN
SIGNAL Stream.SSTChange[inputSST, block.startIndex]
ELSE BEGIN why ← sstChange; RETURN; END;
END;
ENDLOOP;
inputBufferSize ← GetPupContentsBytes[inputBuffer];
-- Describe the unread part of the packet as a block and copy from it into the
-- caller's block; moved is the byte count reported by ByteBlt.
input ← [
blockPointer: --Krock in Environment-- LOOPHOLE[
@inputBuffer.pup.pupBytes], startIndex: inputFinger,
stopIndexPlusOne: inputBufferSize];
moved ← ByteBlt.ByteBlt[block, input];
bytesTransferred ← bytesTransferred + moved;
block.startIndex ← block.startIndex + moved;
inputFinger ← inputFinger + moved;
-- A fully consumed packet is given back, which also sets inputBuffer to NIL.
IF inputFinger = inputBufferSize THEN BEGIN ReturnBuffer[@inputBuffer]; END;
-- The packet ran out while the caller's block still has room.
IF inputBuffer = NIL AND block.startIndex < block.stopIndexPlusOne
AND options.signalLongBlock THEN
BEGIN SIGNAL Stream.LongBlock[block.startIndex]; END;
-- NOTE(review): the end of any packet is reported as endRecord here; the
-- packet type is not consulted. Confirm that this is intended.
IF inputBuffer = NIL AND options.terminateOnEndRecord THEN
BEGIN why ← endRecord; EXIT; END;
ENDLOOP;
-- The caller's block is finished but the current packet still holds data.
IF inputBuffer # NIL AND options.signalShortBlock THEN
BEGIN ERROR Stream.ShortBlock; END;
END;
-- PutByte appends one byte to the stream. The byte is stored straight into
-- the current output packet while room for more than two bytes remains;
-- otherwise it is handed to PutBlock as a one byte block.
PutByte: --ENTRY-- PROC[sH: Stream.Handle, byte: Stream.Byte] =
  BEGIN
  ENABLE UNWIND => NULL;
  IF outputBuffer = NIL OR outputFinger + 2 >= outputBufferSize THEN
    BEGIN
    -- Slow path: PutBlock takes care of allocating and flushing buffers.
    single: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ];
    PutBlock[sH, [@single, 0, 1], FALSE];
    END
  ELSE
    BEGIN
    -- Fast path. The "+2" margin leaves the last bytes to PutBlock, which
    -- flushes the buffer when it fills.
    outputBuffer.pup.pupBytes[outputFinger] ← byte;
    outputFinger ← outputFinger + 1;
    END;
  END;
-- PutWord writes word as two bytes through the handle's putByte procedure,
-- left half first and right half second.
PutWord: --ENTRY-- PROC[sH: Stream.Handle, word: Stream.Word] =
  BEGIN
  halves: LeftAndRight = LOOPHOLE[word, LeftAndRight];
  sH.putByte[sH, halves.left];
  sH.putByte[sH, halves.right];
  END;
-- PutBlock copies the bytes of block into outgoing packets and sends a packet
-- whenever it has been filled.
--   block: source; bytes come from [startIndex .. stopIndexPlusOne).
--   endRecord: when TRUE, the data written since the last aData or mark is
--     closed off with a packet of type aData, which may be empty.
PutBlock: --ENTRY-- PROC[
sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN] =
BEGIN
ENABLE UNWIND => NULL;
output: Stream.Block;
moved: CARDINAL;
-- PupByteStreamMake sets outputBufferSize to 0, so the limit is fetched from
-- the packet stream the first time it is needed.
IF outputBufferSize = 0 THEN outputBufferSize ← myPs.getSenderSizeLimit[];
WHILE block.startIndex < block.stopIndexPlusOne DO
-- A full buffer is sent before any more bytes are accepted.
IF outputFinger = outputBufferSize THEN FlushOutputBuffer[];
IF outputBuffer = NIL THEN
BEGIN
outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
outputBuffer.pup.pupType ← data;
outputFinger ← 0;
END;
-- Describe the unused part of the packet as a block and copy into it from the
-- caller's block; moved is the byte count reported by ByteBlt.
output ← [
blockPointer: --Krock-- LOOPHOLE[
@outputBuffer.pup.pupBytes], startIndex: outputFinger,
stopIndexPlusOne: outputBufferSize];
moved ← ByteBlt.ByteBlt[output, block];
block.startIndex ← block.startIndex + moved;
outputFinger ← outputFinger + moved;
ENDLOOP;
-- Send now if the record is ending and bytes were written since the last
-- aData or mark, or if the finger has reached the size limit.
IF (endRecord AND outputFinger # 0) OR outputFinger = outputBufferSize
THEN
BEGIN
-- With no buffer in hand the earlier bytes have already been sent, so an
-- empty packet is allocated to go out in their place.
IF outputBuffer = NIL THEN
BEGIN
outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
outputBuffer.pup.pupType ← data;
outputFinger ← 0;
END;
IF endRecord THEN outputBuffer.pup.pupType ← aData;
FlushOutputBuffer[];
-- Keeps the rule that outputFinger is 0 once an aData has been sent.
IF endRecord THEN outputFinger ← 0;
END;
END;
-- SendNow sends whatever is in the output buffer right away, allocating an
-- empty packet first if no buffer is in hand.
--   endRecord: when TRUE (the default) the packet goes out with type aData;
--     when FALSE it goes out with type data.
SendNow: PROC[sH: Stream.Handle, endRecord: BOOLEAN ← TRUE] =
  BEGIN
  IF outputBuffer = NIL THEN
    BEGIN
    outputBuffer ← PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
    outputBuffer.pup.pupType ← data;
    outputFinger ← 0;
    END;
  IF endRecord THEN outputBuffer.pup.pupType ← aData
  ELSE outputBuffer.pup.pupType ← data;
  FlushOutputBuffer[];
  -- outputFinger must be 0 once an aData has been sent, as PutBlock and
  -- SendMark already ensure. Without this reset a later PutBlock with an empty
  -- block and endRecord set would see a stale nonzero finger and send a second,
  -- empty aData packet.
  IF endRecord THEN outputFinger ← 0;
  END;
-- SendMark is the setSST procedure of the stream object. Pending output is
-- sent first, then a mark carrying sst is put on the packet stream.
SendMark: --ENTRY-- PROC[sH: Stream.Handle, sst: Stream.SubSequenceType] =
  BEGIN
  ENABLE UNWIND => NULL;
  -- Data written before the mark has to go out ahead of it.
  FlushOutputBuffer[];
  -- outputFinger is kept at 0 once a mark has been sent.
  outputFinger ← 0;
  myPs.putMark[sst];
  END;
-- SetTimeout passes waitTime on to the packet stream as its new timeout.
SetTimeout: PROC [sH: Stream.Handle, waitTime: LONG CARDINAL] =
  {myPs.setTimeout[waitTime]};
-- FlushOutputBuffer sends the current output packet, if there is one, with
-- its content length set to outputFinger. It does nothing when no buffer is
-- in hand.
FlushOutputBuffer: PROC =
  BEGIN
  IF outputBuffer # NIL THEN
    BEGIN
    -- Detach the buffer before sending so that outputBuffer is not left
    -- dangling in case of StreamClosing.
    pending: PupBuffer = outputBuffer;
    outputBuffer ← NIL;
    SetPupContentsBytes[pending, outputFinger];
    myPs.put[pending];
    END;
  END;
-- ReturnBuffer gives the buffer that p refers to back through
-- PupDefs.ReturnBuffer. The caller's variable is set to NIL before the buffer
-- is handed over.
ReturnBuffer: PROC[p: LONG POINTER TO PupBuffer] =
  BEGIN
  doomed: PupBuffer = p↑;
  p↑ ← NIL;
  PupDefs.ReturnBuffer[doomed];
  END;
END.
LOG
Time: 14-Dec-81 10:55:04 By: AOF Action: Adjust to new stream interface
Time: 6-Apr-83 13:53:47 By: SMA Action: Update for Klamath integration.
Time: 19-May-83 10:11:02 By: SMA Action: Convert to new BufferMgr.
Time: 20-Jun-83 13:15:19 By: SMA Action: Set pupType for data buffer.
Time: 12-Jul-83 11:47:03 By: AOF Action: 32-bit procs.
Time: 13-Dec-84 13:40:10 By: SMA Action: SendNow will flush if not endRecord.
Time: 14-Dec-84 17:47:49 By: SMA Added GetTimeout and SetTimeout.
Time: 2-Sep-86 14:47:38 By: AOF New Buffer mgmt.
Time: 17-Feb-88 15:23:25 By: AOF Fix for compiler bug NEWing self.