-- Copyright (C) 1985, 1986 by Xerox Corporation. All rights reserved.
-- PipeImpl.mesa
-- NFS 21-Aug-85 10:39:27
DIRECTORY
Environment USING [Byte],
Heap USING [Create],
Pipe USING [defaultBufferSize, NWords],
Process USING [EnableAborts],
Stream USING [
defaultObject, DeleteProcedure, GetByteProcedure, GetProcedure, Handle,
InvalidOperation, Object, PutByteProcedure];
PipeImpl: MONITOR LOCKS h USING h: Handle
IMPORTS Heap, Process, Stream EXPORTS Pipe =
{
Handle: TYPE = LONG POINTER TO Object;
Object: PUBLIC TYPE = MONITORED RECORD [
producer, consumer: PipeStreamHandle,
inPos, outPos: BufferPosition,
BytesInBuffer, SpaceInBuffer: CONDITION,
buffer: PACKED SEQUENCE bufferSize: CARDINAL OF Environment.Byte];
BufferPosition: TYPE = CARDINAL;
PipeStreamHandle: TYPE = LONG POINTER TO PipeStreamObject;
PipeStreamObject: TYPE = RECORD [streamObject: Stream.Object, pipe: Handle];
z: UNCOUNTED ZONE = Heap.Create[initial: 3]; << 2 pages for 1 default sized
buffer, and an extra page for pipe overhead.>>
Create: PUBLIC PROCEDURE [bufferSize: Pipe.NWords ← Pipe.defaultBufferSize]
RETURNS [h: Handle, producer, consumer: Stream.Handle] = {
prod: PipeStreamHandle ← z.NEW[PipeStreamObject];
cons: PipeStreamHandle ← z.NEW[PipeStreamObject];
h ← z.NEW[Object [bufferSize]];
h.producer ← prod;
h.consumer ← cons;
h.inPos ← h.outPos ← BufferPosition.FIRST;
prod.pipe ← cons.pipe ← h;
prod.streamObject ← cons.streamObject ← Stream.defaultObject;
prod.streamObject.putByte ← WriteByteToBuffer;
prod.streamObject.getByte ← ErrorGetByte;
prod.streamObject.delete ← cons.streamObject.delete ← NoOpDelete;
cons.streamObject.getByte ← ReadByteFromBuffer;
cons.streamObject.get ← ReadFromBuffer;
cons.streamObject.putByte ← ErrorPutByte;
Process.EnableAborts[@h.BytesInBuffer];
Process.EnableAborts[@h.SpaceInBuffer];
RETURN[h, LOOPHOLE[prod], LOOPHOLE[cons]];
};
Delete: PUBLIC PROCEDURE [h: Handle] = {
z.FREE[@h.producer]; z.FREE[@h.consumer]; z.FREE[@h]; };
WriteByteToBuffer: Stream.PutByteProcedure = {
LockedWriteByte: ENTRY PROC [h: Handle] = {
WHILE BufferFull[h] DO WAIT h.SpaceInBuffer; ENDLOOP;
h.inPos ← (h.inPos + 1) MOD h.bufferSize;
h[h.inPos] ← byte;
NOTIFY h.BytesInBuffer;
};
LockedWriteByte[LOOPHOLE[sH, PipeStreamHandle].pipe];
};
ReadByteFromBuffer: Stream.GetByteProcedure = {
LockedReadByte: ENTRY PROC [h: Handle] = {
WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP;
h.outPos ← (h.outPos + 1) MOD h.bufferSize;
byte ← h[h.outPos];
NOTIFY h.SpaceInBuffer;
};
LockedReadByte[LOOPHOLE[sH, PipeStreamHandle].pipe];
};
ReadFromBuffer: Stream.GetProcedure = {
LockedRead: ENTRY PROC [h: Handle] = {
WHILE BufferEmpty[h] DO WAIT h.BytesInBuffer; ENDLOOP;
WHILE block.startIndex < block.stopIndexPlusOne DO
IF BufferEmpty[h] THEN {why ← endRecord; RETURN};
h.outPos ← (h.outPos + 1) MOD h.bufferSize;
block.blockPointer[block.startIndex] ← h[h.outPos];
block.startIndex ← block.startIndex.SUCC;
bytesTransferred ← bytesTransferred.SUCC;
ENDLOOP;
};
bytesTransferred ← sst ← 0;
why ← normal;
LockedRead[LOOPHOLE[sH, PipeStreamHandle].pipe];
};
BufferFull: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE {
RETURN[((h.inPos + 1) MOD h.bufferSize) = h.outPos]; };
BufferEmpty: PROCEDURE [h: Handle] RETURNS [BOOLEAN] = INLINE {
RETURN[h.inPos = h.outPos]; };
NoOpDelete: Stream.DeleteProcedure = {};
ErrorPutByte: Stream.PutByteProcedure = {ERROR Stream.InvalidOperation; };
ErrorGetByte: Stream.GetByteProcedure = {ERROR Stream.InvalidOperation; };
GetProducer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = {
RETURN[LOOPHOLE[h.producer]]; };
GetConsumer: PUBLIC PROCEDURE [h: Handle] RETURNS [Stream.Handle] = {
RETURN[LOOPHOLE[h.consumer]]; };
}.