-- File: PupByteStreams.mesa - last edit:
-- AOF                 17-Feb-88 15:26:30
-- SMA                 14-Dec-84 17:47:26
-- Copyright (C) 1983, 1986, 1988 by Xerox Corporation. All rights reserved.

-- Purpose: implements the byte stream flavour of a Pup stream by layering a
-- Stream.Object on top of a PupPktDefs.PupPktStream.  One instance of this
-- module (one global frame) holds the state of one byte stream; instances
-- are recycled through a free list kept in the exporting instance.

DIRECTORY
  Environment USING [Byte],
  Runtime USING [GlobalFrame],
  ByteBlt USING [ByteBlt],
  Stream USING [
    Byte, Word, Block, CompletionCode, defaultInputOptions, defaultObject,
    Handle, InputOptions, LongBlock, Object, ShortBlock, SSTChange,
    SubSequenceType, TimeOut],
  CommFlags USING [doDebug],
  Driver USING [Glitch],
  PrincOpsMinus USING [NewSelf],
  PupPktDefs USING [
    PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake],
  PupStream USING [PupOpenMode],
  PupDefs USING [
    GetBuffer, ReturnBuffer, GetPupContentsBytes, SetPupContentsBytes, Pair,
    PupAddress, PupBuffer, PupSocketID, Tocks],
  PupPktOps USING [pupBuffers],
  PupTypes USING [fillInSocketID];

PupByteStreams: MONITOR
  IMPORTS
    PrincOpsMinus, Runtime, Stream, ByteBlt, Driver, PupPktDefs, PupDefs,
    PupPktOps
  EXPORTS PupStream =
  BEGIN OPEN PupDefs;

  -- Manager data
  -- free heads a singly linked list of idle module instances, chained through
  -- each instance's own next variable.  It starts out holding the frame that
  -- Runtime.GlobalFrame reports for PupByteStreamCreate.
  -- NOTE(review): presumably that is this instance itself, so the first
  -- stream made reuses the original frame.  Confirm.
  free: LONG POINTER TO FRAME[PupByteStreams] ¬ LOOPHOLE[
    Runtime.GlobalFrame[LOOPHOLE[PupByteStreamCreate]]];
  next: LONG POINTER TO FRAME[PupByteStreams] ¬ NIL;

  -- Hands out an instance frame for a new stream: pops the free list when it
  -- is not empty, otherwise makes and STARTs a fresh copy of this module.
  GetInstance: ENTRY PROC
    RETURNS [him: LONG POINTER TO FRAME[PupByteStreams]] =
    BEGIN
    IF free = NIL THEN
      BEGIN
      him ¬ --NEW PupByteStreams-- PrincOpsMinus.NewSelf[];
      START him;  -- Do initialization code
      RETURN;
      END;
    him ¬ free;
    free ¬ free.next;
    END;

  -- Pushes an instance frame back onto the head of the free list.
  FreeInstance: ENTRY PROC[him: LONG POINTER TO FRAME[PupByteStreams]] =
    {him.next ¬ free; free ¬ him};

  -- The Stream.Object handed to clients.  The options and delete fields are
  -- left void here and are filled in by PupByteStreamMake.  Position
  -- operations fall through to the Stream defaults.
  myPupByteStream: Stream.Object ¬ [
    options:, getByte: GetByte, putByte: PutByte, getWord: GetWord,
    putWord: PutWord, get: GetBlock, put: PutBlock, setSST: SendMark,
    sendAttention: SendAttention, waitAttention: WaitAttention, delete:,
    setTimeout: SetTimeout, getTimeout: GetTimeout,
    getPosition: Stream.defaultObject.getPosition,
    setPosition: Stream.defaultObject.setPosition, sendNow: SendNow,
    clientData: NIL];

  -- The underlying packet stream of this instance.
  myPs: PupPktDefs.PupPktStream;

  -- Beware, things don't get reinitialized when a module is reused
  -- (PupByteStreamMake resets the fields that matter explicitly.)
  inputBuffer: PupBuffer;  -- packet currently being read, or NIL
  inputFinger: CARDINAL;  -- index of the next unread byte in inputBuffer
  inputBufferSize: CARDINAL;  -- content byte count of inputBuffer
  inputSST: Stream.SubSequenceType;  -- SST taken from the last mark received
  outputBuffer: PupBuffer;  -- packet currently being filled, or NIL
  outputFinger: CARDINAL;  -- 0 if aData/aMark sent
  outputBufferSize: CARDINAL;  -- sender size limit; 0 until first PutBlock

  HandleLooksLikeGarbage: PUBLIC ERROR = CODE;

  -- Overlay used to view a Stream.Word as its two bytes.
  LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte];

  -- Convenience form of PupByteStreamMake: local socket fillInSocketID, mode
  -- sendRfc and connection id [0, 0].
  PupByteStreamCreate: PUBLIC PROC[remote: PupAddress, ticks: Tocks]
    RETURNS [Stream.Handle] =
    BEGIN
    RETURN[
      PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, [0, 0]]];
    END;

  -- Opens a packet stream with the given parameters, binds it to an instance
  -- of this module, resets that instance's per stream state and returns a
  -- handle on the instance's Stream.Object.
  PupByteStreamMake: PUBLIC PROC[
    local: PupSocketID, remote: PupAddress, ticks: Tocks,
    mode: PupStream.PupOpenMode, id: Pair]
    RETURNS [Stream.Handle] =
    BEGIN
    newPs: PupPktDefs.PupPktStream;
    him: LONG POINTER TO FRAME[PupByteStreams];
    -- It is ok to UNWIND here if PupPktStreamMake doesn't work.
    -- (No instance has been claimed yet, so there is nothing to give back.)
    newPs ¬ PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
    him ¬ GetInstance[];
    -- Initialization
    him.myPs ¬ newPs;
    him.myPupByteStream.options ¬ Stream.defaultInputOptions;
    him.myPupByteStream.delete ¬ Destroy;
    him.inputBuffer ¬ NIL;
    him.inputSST ¬ 0;
    him.outputBuffer ¬ NIL;
    him.outputFinger ¬ 0;
    him.outputBufferSize ¬ 0;
    RETURN[@him.myPupByteStream];
    END;

  -- Aborts the packet stream underneath bs, passing the text e along.  The
  -- owning instance is recovered from the global frame of bs.put.  In debug
  -- builds a handle whose delete proc is not Destroy is reported as garbage.
  PupByteStreamAbort: PUBLIC --ENTRY-- PROC[bs: Stream.Handle, e: LONG STRING] =
    BEGIN
    ENABLE UNWIND => NULL;
    him: LONG POINTER TO FRAME[PupByteStreams] = LOOPHOLE[
      Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
    IF CommFlags.doDebug AND bs.delete # Destroy THEN
      Driver.Glitch[HandleLooksLikeGarbage];
    PupPktDefs.PupPktStreamAbort[him.myPs, e];
    END;

  -- Delete proc of the stream: destroys the packet stream, returns any
  -- buffers still held and puts the instance back on the free list.
  Destroy: PUBLIC --ENTRY-- PROC[bs: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    him: LONG POINTER TO FRAME[PupByteStreams] = LOOPHOLE[
      Runtime.GlobalFrame[LOOPHOLE[bs.put]]];
    IF CommFlags.doDebug AND bs.delete # Destroy THEN
      Driver.Glitch[HandleLooksLikeGarbage];
    -- Clearing delete makes a second Destroy on the same handle fail the
    -- debug check above.
    bs.delete ¬ NIL;
    PupPktDefs.PupPktStreamDestroy[him.myPs];
    IF him.inputBuffer # NIL THEN ReturnBuffer[@him.inputBuffer];
    IF him.outputBuffer # NIL THEN ReturnBuffer[@him.outputBuffer];
    FreeInstance[him];
    END;

  -- Sends an attention on the packet stream.  The byte argument is not used.
  SendAttention: PROC[sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN myPs.sendAttention[]; END;

  -- Waits for an attention on the packet stream.  Always returns 0.
  WaitAttention: PROC[sH: Stream.Handle] RETURNS [Stream.Byte] =
    BEGIN myPs.waitForAttention[]; RETURN[0]; END;

  -- Reads one byte.  Fast path: take it straight out of the current input
  -- buffer.  Otherwise go through the stream's get proc with a one byte block.
  GetByte: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    IF inputBuffer # NIL AND inputFinger + 2 < inputBufferSize THEN
      BEGIN
      -- "+2" lets GetBlock give back the buffer if we take the last byte
      byte ¬ inputBuffer.pup.pupBytes[inputFinger];
      inputFinger ¬ inputFinger + 1;
      RETURN;
      END
    ELSE
      BEGIN
      array: PACKED ARRAY [0..1] OF Stream.Byte;
      -- NOTE(review): the options are given positionally; their meaning
      -- depends on the field order of Stream.InputOptions.  Confirm.
      [] ¬ sH.get[sH, [@array, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE, TRUE]];
      RETURN[array[0]];
      END;
    END;

  -- Reports the timeout of the underlying packet stream.
  GetTimeout: PROC [sH: Stream.Handle] RETURNS [waitTime: LONG CARDINAL] =
    {RETURN [myPs.getTimeout[]]};

  -- Reads two bytes with GetByte; the first becomes the left byte of the
  -- word and the second the right byte.
  GetWord: --ENTRY-- PROC[sH: Stream.Handle] RETURNS [word: Stream.Word] =
    BEGIN OPEN w: LOOPHOLE[word, LeftAndRight];
    w.left ¬ GetByte[sH];
    w.right ¬ GetByte[sH];
    END;

  -- Fills block from incoming packets.
  -- Returns the number of bytes moved, a completion code (normal, sstChange
  -- or endRecord) and the current subsequence type.
  -- Signals Stream.TimeOut when the packet stream yields no buffer,
  -- Stream.SSTChange or Stream.LongBlock when the matching option is set, and
  -- raises ERROR Stream.ShortBlock when data is left in the packet after the
  -- block is full and signalShortBlock is set.
  GetBlock: --ENTRY-- PROC[
    sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    input: Stream.Block;
    moved: CARDINAL;
    bytesTransferred ¬ 0;
    sst ¬ inputSST;
    why ¬ normal;
    WHILE block.startIndex < block.stopIndexPlusOne DO
      -- Acquire a data packet, dealing with timeouts and marks on the way.
      UNTIL inputBuffer # NIL DO
        inputFinger ¬ 0;
        inputBuffer ¬ myPs.get[];
        IF inputBuffer = NIL THEN SIGNAL Stream.TimeOut[block.startIndex]
        ELSE
          IF inputBuffer.pup.pupType = mark OR inputBuffer.pup.pupType = aMark
            THEN
            BEGIN
            -- A mark carries the new SST in its first byte and no data; the
            -- buffer is handed back, which also leaves inputBuffer NIL.
            sst ¬ inputSST ¬ inputBuffer.pup.pupBytes[0];
            ReturnBuffer[@inputBuffer];
            IF options.signalSSTChange THEN
              SIGNAL Stream.SSTChange[inputSST, block.startIndex]
            ELSE BEGIN why ¬ sstChange; RETURN; END;
            END;
        ENDLOOP;
      inputBufferSize ¬ GetPupContentsBytes[inputBuffer];
      input ¬ [
        blockPointer: --Krock in Environment-- LOOPHOLE[
          @inputBuffer.pup.pupBytes],
        startIndex: inputFinger, stopIndexPlusOne: inputBufferSize];
      moved ¬ ByteBlt.ByteBlt[block, input];
      bytesTransferred ¬ bytesTransferred + moved;
      block.startIndex ¬ block.startIndex + moved;
      inputFinger ¬ inputFinger + moved;
      -- Packet used up: give it back (ReturnBuffer also NILs inputBuffer).
      IF inputFinger = inputBufferSize THEN
        BEGIN ReturnBuffer[@inputBuffer]; END;
      -- Packet ended before the client's block was full.
      IF inputBuffer = NIL AND block.startIndex < block.stopIndexPlusOne
        AND options.signalLongBlock THEN
        BEGIN SIGNAL Stream.LongBlock[block.startIndex]; END;
      -- The end of a packet is treated as the end of a record.
      IF inputBuffer = NIL AND options.terminateOnEndRecord THEN
        BEGIN why ¬ endRecord; EXIT; END;
      ENDLOOP;
    -- Data remains in the current packet that did not fit into the block.
    IF inputBuffer # NIL AND options.signalShortBlock THEN
      BEGIN ERROR Stream.ShortBlock; END;
    END;

  -- Writes one byte.  Fast path: store it straight into the current output
  -- buffer.  Otherwise go through PutBlock with a one byte block.
  PutByte: --ENTRY-- PROC[sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    ENABLE UNWIND => NULL;
    IF outputBuffer # NIL AND outputFinger + 2 < outputBufferSize THEN
      BEGIN
      -- "+2" lets PutBlock flush the buffer if we fill the last byte
      outputBuffer.pup.pupBytes[outputFinger] ¬ byte;
      outputFinger ¬ outputFinger + 1;
      RETURN;
      END
    ELSE
      BEGIN
      array: PACKED ARRAY [0..1] OF Stream.Byte ¬ [byte, ];
      PutBlock[sH, [@array, 0, 1], FALSE];
      END;
    END;

  -- Writes a word as two bytes through the stream's putByte proc, left byte
  -- first.
  PutWord: --ENTRY-- PROC[sH: Stream.Handle, word: Stream.Word] =
    BEGIN OPEN w: LOOPHOLE[word, LeftAndRight];
    sH.putByte[sH, w.left];
    sH.putByte[sH, w.right];
    END;

  -- Copies block into outgoing packets, sending each packet as it fills.
  -- With endRecord the final packet goes out as aData, and outputFinger is
  -- zeroed afterwards to record that the record has been terminated.
  PutBlock: --ENTRY-- PROC[
    sH: Stream.Handle, block: Stream.Block, endRecord: BOOLEAN] =
    BEGIN
    ENABLE UNWIND => NULL;
    output: Stream.Block;
    moved: CARDINAL;
    -- The size limit is fetched lazily; PupByteStreamMake zeroes it.
    IF outputBufferSize = 0 THEN outputBufferSize ¬ myPs.getSenderSizeLimit[];
    WHILE block.startIndex < block.stopIndexPlusOne DO
      IF outputFinger = outputBufferSize THEN FlushOutputBuffer[];
      IF outputBuffer = NIL THEN
        BEGIN
        outputBuffer ¬ PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
        outputBuffer.pup.pupType ¬ data;
        outputFinger ¬ 0;
        END;
      output ¬ [
        blockPointer: --Krock-- LOOPHOLE[@outputBuffer.pup.pupBytes],
        startIndex: outputFinger, stopIndexPlusOne: outputBufferSize];
      moved ¬ ByteBlt.ByteBlt[output, block];
      block.startIndex ¬ block.startIndex + moved;
      outputFinger ¬ outputFinger + moved;
      ENDLOOP;
    -- Send now if the packet is exactly full, or if the record ends and data
    -- has gone out since the last aData/aMark (outputFinger # 0).
    IF (endRecord AND outputFinger # 0) OR outputFinger = outputBufferSize THEN
      BEGIN
      -- An earlier full packet may already have been flushed; a record end
      -- then needs an empty packet to carry the aData type.
      IF outputBuffer = NIL THEN
        BEGIN
        outputBuffer ¬ PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
        outputBuffer.pup.pupType ¬ data;
        outputFinger ¬ 0;
        END;
      IF endRecord THEN outputBuffer.pup.pupType ¬ aData;
      FlushOutputBuffer[];
      IF endRecord THEN outputFinger ¬ 0;
      END;
    END;

  -- Forces out whatever is buffered; an empty packet is sent when nothing is
  -- buffered.  The packet type is aData when endRecord is set, else data.
  SendNow: PROC[sH: Stream.Handle, endRecord: BOOLEAN ¬ TRUE] =
    BEGIN
    IF outputBuffer = NIL THEN
      BEGIN
      outputBuffer ¬ PupDefs.GetBuffer[PupPktOps.pupBuffers, send];
      outputBuffer.pup.pupType ¬ data;
      outputFinger ¬ 0;
      END;
    IF endRecord THEN outputBuffer.pup.pupType ¬ aData
    ELSE outputBuffer.pup.pupType ¬ data;
    FlushOutputBuffer[];
    -- Keep the outputFinger invariant (0 once an aData has been sent), as
    -- PutBlock and SendMark do.  Without this a following PutBlock of an
    -- empty block with endRecord set would send a second, empty aData.
    IF endRecord THEN outputFinger ¬ 0;
    END;

  -- Flushes pending output, then sends a mark carrying the new SST.
  SendMark: --ENTRY-- PROC[sH: Stream.Handle, sst: Stream.SubSequenceType] =
    BEGIN
    ENABLE UNWIND => NULL;
    FlushOutputBuffer[];
    outputFinger ¬ 0;
    myPs.putMark[sst];
    END;

  -- Sets the timeout of the underlying packet stream.
  SetTimeout: PROC [sH: Stream.Handle, waitTime: LONG CARDINAL] =
    BEGIN myPs.setTimeout[waitTime]; END;  --SetTimeout

  -- Sends the current output buffer, if there is one, with its content
  -- length set to outputFinger.  outputFinger itself is left unchanged.
  FlushOutputBuffer: PROC =
    BEGIN
    b: PupBuffer;
    -- don't leave outputBuffer dangling in case of StreamClosing
    IF outputBuffer = NIL THEN RETURN;
    b ¬ outputBuffer;
    outputBuffer ¬ NIL;
    SetPupContentsBytes[b, outputFinger];
    myPs.put[b];
    END;

  -- Local wrapper that hides PupDefs.ReturnBuffer inside this module: it
  -- NILs the caller's buffer variable before giving the buffer back.
  ReturnBuffer: PROC[p: LONG POINTER TO PupBuffer] =
    {b: PupBuffer; b ¬ p­; p­ ¬ NIL; PupDefs.ReturnBuffer[b]};

  END.

LOG
Time: 14-Dec-81 10:55:04  By: AOF  Action: Adjust to new stream interface
Time:  6-Apr-83 13:53:47  By: SMA  Action: Update for Klamath integration.
Time: 19-May-83 10:11:02  By: SMA  Action: Convert to new BufferMgr.
Time: 20-Jun-83 13:15:19  By: SMA  Action: Set pupType for data buffer.
Time: 12-Jul-83 11:47:03  By: AOF  Action: 32-bit procs.
Time: 13-Dec-84 13:40:10  By: SMA  SendNow will flush if not endRecord.
Time: 14-Dec-84 17:47:49  By: SMA  Added GetTimeout and SetTimeout.
Time:  2-Sep-86 14:47:38  By: AOF  New Buffer mgmt.
Time: 17-Feb-88 15:23:25  By: AOF  Fix for compiler bug NEWing self.