-- Copyright (C) 1985, 1986 by Xerox Corporation. All rights reserved. -- PipeImpl.mesa -- NFS 21-Aug-85 10:39:27 DIRECTORY Environment USING [Byte], Heap USING [Create], Pipe USING [defaultBufferSize, NWords], Process USING [EnableAborts], Stream USING [ defaultObject, DeleteProcedure, GetByteProcedure, GetProcedure, Handle, InvalidOperation, Object, PutByteProcedure]; PipeImpl: MONITOR LOCKS h USING h: Handle IMPORTS Heap, Process, Stream EXPORTS Pipe = { Handle: TYPE = LONG POINTER TO Object; Object: PUBLIC TYPE = MONITORED RECORD [ producer, consumer: PipeStreamHandle, inPos, outPos: BufferPosition, BytesInBuffer, SpaceInBuffer: CONDITION, buffer: PACKED SEQUENCE bufferSize: CARDINAL OF Environment.Byte]; BufferPosition: TYPE = CARDINAL; PipeStreamHandle: TYPE = LONG POINTER TO PipeStreamObject; PipeStreamObject: TYPE = RECORD [streamObject: Stream.Object, pipe: Handle]; z: UNCOUNTED ZONE = Heap.Create[initial: 3]; << 2 pages for 1 default sized buffer, and an extra page for pipe overhead.>> Create: PUBLIC PROCEDURE [bufferSize: Pipe.NWords ← Pipe.defaultBufferSize] RETURNS [h: Handle, producer, consumer: Stream.Handle] = { prod: PipeStreamHandle ← z.NEW[PipeStreamObject]; cons: PipeStreamHandle ← z.NEW[PipeStreamObject]; h ← z.NEW[Object [bufferSize]]; h.producer ← prod; h.consumer ← cons; h.inPos ← h.outPos ← BufferPosition.FIRST; prod.pipe ← cons.pipe ← h; prod.streamObject ← cons.streamObject ← Stream.defaultObject; prod.streamObject.putByte ← WriteByteToBuffer; prod.streamObject.getByte ← ErrorGetByte; prod.streamObject.delete ← cons.streamObject.delete ← NoOpDelete; cons.streamObject.getByte ← ReadByteFromBuffer; cons.streamObject.get ← ReadFromBuffer; cons.streamObject.putByte ← ErrorPutByte; Process.EnableAborts[@h.BytesInBuffer]; Process.EnableAborts[@h.SpaceInBuffer]; RETURN[h, LOOPHOLE[prod], LOOPHOLE[cons]]; }; Delete: PUBLIC PROCEDURE [h: Handle] = { z.FREE[@h.producer]; z.FREE[@h.consumer]; z.FREE[@h]; }; WriteByteToBuffer: Stream.PutByteProcedure = { LockedWriteByte: ENTRY PROC [h: Handle] = { WHILE BufferFull[h] DO WAIT h.SpaceInBuffer; ENDLOOP; h.inPos ← (h.inPos + 1) MOD h.bufferSize; h[h.inPos] ← byte; NOTIFY h.BytesInBuffer; }; LockedWriteByte[LOOPHOLE[sH, PipeStreamHandle].pipe]; }; ReadByteFromBuffer: Stream.GetByteProcedure = { LockedReadByte: ENTRY PROC [h: Handle] = { WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP; h.outPos ← (h.outPos + 1) MOD h.bufferSize; byte ← h[h.outPos]; NOTIFY h.SpaceInBuffer; }; LockedReadByte[LOOPHOLE[sH, PipeStreamHandle].pipe]; }; ReadFromBuffer: Stream.GetProcedure = { LockedRead: ENTRY PROC [h: Handle] = { WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP; WHILE block.startIndex < block.stopIndexPlusOne DO IF BufferEmpty[h] THEN {why ← endRecord; RETURN}; h.outPos ← (h.outPos + 1) MOD h.bufferSize; block.blockPointer[block.startIndex] ← h[h.outPos]; block.startIndex ← block.startIndex.SUCC; bytesTransferred ← bytesTransferred.SUCC; ENDLOOP; }; bytesTransferred ← sst ← 0; why ← normal; LockedRead[LOOPHOLE[sH, PipeStreamHandle].pipe]; }; BufferFull: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE { RETURN[((h.inPos + 1) MOD h.bufferSize) = h.outPos]; }; BufferEmpty: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE { RETURN[h.inPos = h.outPos]; }; NoOpDelete: Stream.DeleteProcedure = {}; ErrorPutByte: Stream.PutByteProcedure = {ERROR Stream.InvalidOperation; }; ErrorGetByte: Stream.GetByteProcedure = {ERROR Stream.InvalidOperation; }; GetProducer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = { RETURN[LOOPHOLE[h.producer]]; }; GetConsumer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = { RETURN[LOOPHOLE[h.consumer]]; }; }.