STPStreamData: TYPE = RECORD [
  -- State shared by the user's output stream and the inverted input stream
  -- that the STP side reads from. Both streams carry the same record as
  -- their streamData.
  nonFull: CONDITION,  -- waited on by the sending procedures, broadcast by the getting procedures
  nonEmpty: CONDITION,  -- waited on by the getting procedures, broadcast by the sending procedures
  eof: BOOL←FALSE,  -- when TRUE and nothing is pending, the getting procedures raise IO.EndOfStream
  hasBlock: BOOL←FALSE,  -- TRUE while a whole block handed over by UsersSendBlockToSTP is pending
  --if hasBlock
  block: REF READONLY TEXT←NIL,  -- the sender's text, read in place
  startIndex: NAT, count: NAT,  -- next unread position in block and number of characters left
  --if not hasBlock
  -- Ring buffer for single characters: num characters are stored; in and out
  -- are advanced (MOD buffSize) before the slot is written or read.
  in, out, num: NAT←0,
  buff: ARRAY [0..buffSize) OF CHAR
  ];
--Users--
OpenSTP: PUBLIC PROC [fileName: Rope.ROPE, accessOptions: FS.AccessOptions ← read, host: Rope.ROPE ← NIL]
  RETURNS [self: IO.STREAM] =
  BEGIN -- simplified for output to Oliver servers like [vice]
  -- Creates an output stream whose characters are consumed by a forked
  -- ForkedSTPStore process storing them as fileName on host.
  --   fileName: passed unchanged to ForkedSTPStore.
  --   accessOptions: only create is accepted; any other value, including the
  --     default read, raises IO.Error[NotImplementedForThisStream, NIL].
  --   host: NIL is replaced by "indigo".
  --   self: the stream the caller writes to (putChar, putBlock, close).
  -- Reject unsupported access modes before anything is allocated or forked.
  SELECT accessOptions FROM
    create => NULL;
    ENDCASE => ERROR IO.Error[NotImplementedForThisStream, NIL];
  BEGIN
  -- One shared record is the streamData of both streams created below.
  sd: STPStreamDataRef ← NEW[STPStreamData];
  userProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
    variety: $output, class: $JacobiSTPUsersOutputStreams,
    putChar: UsersSendCharToSTP,
    putBlock: UsersSendBlockToSTP,
    close: UsersCloseSTP
    ];
  invertedProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
    variety: $input, class: $JacobiSTPsGetFromUserStreams,
    getChar: STPsGetCharFromUser,
    getBlock: STPsGetBlockFromUser,
    endOf: STPsEndOf
    ];
  inverted: IO.STREAM;
  IF host=NIL THEN host ← "indigo";
  -- Initialize the conditions; is this necessary, or is something done automatically?
  -- DisableTimeout is not really necessary.
  TRUSTED {
    Process.DisableTimeout[@sd.nonEmpty];
    Process.EnableAborts[@sd.nonEmpty];
    Process.DisableTimeout[@sd.nonFull];
    Process.EnableAborts[@sd.nonFull];
    };
  self ← IO.CreateStream[userProcs, sd];
  inverted ← IO.CreateStream[invertedProcs, sd];
  -- The forked process reads from the inverted stream; it is detached, so it is never joined here.
  TRUSTED {
    Process.Detach[FORK ForkedSTPStore[fileName: fileName, host: host, inStream: inverted]]
    };
  END
  END;
UsersSendBlockToSTP: ENTRY PROC [self: IO.STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] = {
  -- putBlock procedure of the user's output stream: publishes count characters
  -- of block, starting at startIndex, to the reading side and returns only
  -- after the reader has taken all of them. A count of zero does nothing.
  ENABLE UNWIND => NULL;
  data: STPStreamDataRef = NARROW[self.streamData];
  IF count#0 THEN {
    -- Let single characters queued earlier drain first.
    UNTIL data.num=0 DO WAIT data.nonFull ENDLOOP;
    -- A block that is still pending at this point is treated as an error.
    IF data.hasBlock THEN ERROR;
    data.block ← block;
    data.startIndex ← startIndex;
    data.count ← count;
    data.hasBlock ← TRUE;
    BROADCAST data.nonEmpty;
    -- The text is read in place, so wait here until the reader clears hasBlock.
    WHILE data.hasBlock DO WAIT data.nonFull ENDLOOP;
    };
  };
STPsGetBlockFromUser: ENTRY PROC [self: IO.STREAM, block: REF TEXT, startIndex: NAT, count: NAT]
  RETURNS [nBytesRead: NAT] =
  BEGIN
  -- getBlock procedure of the inverted input stream: stores up to count
  -- characters from the user into block, starting at startIndex.
  --   nBytesRead: number of characters stored. This is 1 when a character is
  --     taken from the ring buffer, otherwise the smaller of count and what
  --     is left of the pending block. A count of zero returns 0 at once.
  --   Raises IO.EndOfStream[self] when nothing is pending and eof is set.
  -- NOTE(review): block.length is not updated here, and count is not clamped
  -- to the capacity of block; confirm what callers expect.
  ENABLE UNWIND => NULL;
  sd: STPStreamDataRef = NARROW[self.streamData];
  -- A zero-length request must neither wait nor store anything.
  IF count=0 THEN RETURN [0];
  WHILE sd.num<=0 AND NOT sd.hasBlock DO
    IF sd.eof THEN RETURN WITH ERROR IO.EndOfStream[self]
    ELSE WAIT sd.nonEmpty
    ENDLOOP;
  IF sd.num>0
    THEN {
      -- Buffered single characters are served first, one per call.
      sd.num ← sd.num-1;
      sd.out ← (sd.out+1) MOD buffSize;
      block[startIndex] ← sd.buff[sd.out];
      nBytesRead ← 1
      }
    ELSE
      IF sd.hasBlock
        THEN {
          -- Copy no more than the caller asked for and no more than is left.
          n: NAT = MIN[count, sd.count];
          THROUGH [0..n) DO
            block[startIndex] ← sd.block[sd.startIndex];
            startIndex ← startIndex+1;
            sd.startIndex ← sd.startIndex+1;
            ENDLOOP;
          sd.count ← sd.count-n;
          -- Clearing hasBlock is what releases the waiting sender.
          IF sd.count=0 THEN sd.hasBlock←FALSE;
          nBytesRead ← n
          }
        ELSE ERROR;
  BROADCAST sd.nonFull
  END;
UsersSendCharToSTP: ENTRY PROC [self: IO.STREAM, char: CHAR] = {
  -- putChar procedure of the user's output stream: appends char to the ring
  -- buffer, waiting while the buffer is full.
  ENABLE UNWIND => NULL;
  data: STPStreamDataRef = NARROW[self.streamData];
  -- A block that is still pending on entry is treated as an error.
  IF data.hasBlock THEN ERROR;
  UNTIL data.num<buffSize AND NOT data.hasBlock DO WAIT data.nonFull ENDLOOP;
  -- The in index is advanced first and the new slot is then filled.
  data.in ← (data.in+1) MOD buffSize;
  data.buff[data.in] ← char;
  data.num ← data.num+1;
  BROADCAST data.nonEmpty;
  };
STPsGetCharFromUser: ENTRY PROC [self: IO.STREAM] RETURNS [char: CHAR] = {
  -- getChar procedure of the inverted input stream: returns the next
  -- character from the user, waiting until one is available.
  -- Raises IO.EndOfStream[self] when nothing is pending and eof is set.
  ENABLE UNWIND => NULL;
  data: STPStreamDataRef = NARROW[self.streamData];
  UNTIL data.num>0 OR data.hasBlock DO
    IF data.eof THEN RETURN WITH ERROR IO.EndOfStream[self];
    WAIT data.nonEmpty;
    ENDLOOP;
  SELECT TRUE FROM
    data.num>0 => {
      -- Characters in the ring buffer are served before a pending block.
      data.out ← (data.out+1) MOD buffSize;
      char ← data.buff[data.out];
      data.num ← data.num-1;
      };
    data.hasBlock AND data.count>0 => {
      char ← data.block[data.startIndex];
      data.startIndex ← data.startIndex+1;
      data.count ← data.count-1;
      -- Clearing hasBlock once the block is used up releases the waiting sender.
      IF data.count=0 THEN data.hasBlock ← FALSE;
      };
    ENDCASE => ERROR;
  BROADCAST data.nonFull;
  };