STPStreamsImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
A package for producing IO streams on FTP remote files, suited for ChipNDale I/O.
written by Ch. Jacobi, January 13, 1984 10:54 am
based on module RemoteFiles written by E. McCreight, November 9, 1983 12:42 pm
Last Edited by: Jacobi, May 23, 1985 4:37:54 pm PDT
DIRECTORY
BasicTime,
FS,
FSRemoteFile,
IO,
Process,
Rope,
STPStreams;
STPStreamsImpl: CEDAR MONITOR
IMPORTS BasicTime, IO, FSRemoteFile, Process
EXPORTS STPStreams =
BEGIN
buffSize: NAT = 200;
STPStreamDataRef: TYPE = REF STPStreamData;
STPStreamData: TYPE = RECORD [-- describes both, an input stream and an outputstream
nonFull: CONDITION,
nonEmpty: CONDITION,
eof: BOOLFALSE,
hasBlock: BOOL,
--if hasBlock
block: REF READONLY TEXTNIL,
startIndex: NAT, count: NAT,
--if not hasBlock
in, out, num: NAT𡤀,
buff: ARRAY [0..buffSize) OF CHAR
];
Confirm: FSRemoteFile.ConfirmProc = {RETURN [TRUE]};
--Users--OpenSTP: PUBLIC PROC [ fileName: Rope.ROPE, accessOptions: FS.AccessOptions ← read, host: Rope.ROPENIL ] RETURNS [ self: IO.STREAM ] =
BEGIN -- simplified for output to Oliver servers like [vice]
SELECT accessOptions FROM
create => NULL;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, NIL];
BEGIN
sd: STPStreamDataRef ← NEW[STPStreamData];
userProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $output, class: $JacobiSTPUsersOutputStreams,
putChar: UsersSendCharToSTP,
putBlock: UsersSendBlockToSTP,
close: UsersCloseSTP
];
invertedProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $input, class: $JacobiSTPsGetFromUserStreams,
getChar: STPsGetCharFromUser,
getBlock: STPsGetBlockFromUser,
endOf: STPsEndOf
];
inverted: IO.STREAM;
SELECT accessOptions FROM
create => NULL;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, NIL];
IF host=NIL THEN host ← "indigo";
--initialize conditions; is it necessary, or is something done automaticelly???????
--DisableTimeout is not necessary really
TRUSTED {
Process.DisableTimeout[@sd.nonEmpty];
Process.EnableAborts[@sd.nonEmpty];
Process.DisableTimeout[@sd.nonFull];
Process.EnableAborts[@sd.nonFull];
};
self ← IO.CreateStream[userProcs, sd];
inverted ← IO.CreateStream[invertedProcs, sd];
TRUSTED { Process.Detach[
FORK ForkedSTPStore[fileName: fileName, host: host, inStream: inverted] ]
};
END
END;
ForkedSTPStore: PROC [fileName: Rope.ROPE, host: Rope.ROPE, inStream: IO.STREAM] =
BEGIN
FSRemoteFile.Store[
server: host,
file: fileName,
str: inStream,
created: BasicTime.Now[],--may raise TimeNotKnown
proc: Confirm
];
END;
UsersSendBlockToSTP: ENTRY PROC [ self: IO.STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT ] =
BEGIN
ENABLE UNWIND => NULL;
sd: STPStreamDataRef = NARROW[self.streamData];
IF count=0 THEN RETURN;
WHILE sd.num>0 DO WAIT sd.nonFull ENDLOOP;
IF sd.hasBlock THEN ERROR;
sd.hasBlock ← TRUE;
sd.block ← block;
sd.startIndex ← startIndex;
sd.count ← count;
BROADCAST sd.nonEmpty;
WHILE sd.hasBlock DO WAIT sd.nonFull ENDLOOP
END;
STPsGetBlockFromUser: ENTRY PROC [ self: IO.STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [ nBytesRead: NAT ] =
BEGIN
ENABLE UNWIND => NULL;
sd: STPStreamDataRef = NARROW[self.streamData];
WHILE sd.num<=0 AND NOT sd.hasBlock DO
IF sd.eof THEN RETURN WITH ERROR IO.EndOfStream[self]
ELSE WAIT sd.nonEmpty
ENDLOOP;
IF sd.num>0 THEN {
sd.num ← sd.num-1;
sd.out ← (sd.out+1) MOD buffSize;
block[startIndex] ← sd.buff[sd.out];
nBytesRead ← 1
}
ELSE IF sd.hasBlock THEN {
n: NAT = MAX[count, sd.count];
THROUGH [0..n) DO
block[startIndex] ← sd.block[sd.startIndex];
startIndex ← startIndex+1;
sd.startIndex ← sd.startIndex+1;
ENDLOOP;
sd.count ← sd.count-n;
IF sd.count=0 THEN sd.hasBlock←FALSE;
}
ELSE ERROR;
BROADCAST sd.nonFull
END;
UsersSendCharToSTP: ENTRY PROC [ self: IO.STREAM, char: CHAR ] =
BEGIN
ENABLE UNWIND => NULL;
sd: STPStreamDataRef = NARROW[self.streamData];
IF sd.hasBlock THEN ERROR;
WHILE sd.num>=buffSize OR sd.hasBlock DO WAIT sd.nonFull ENDLOOP;
sd.num ← sd.num+1;
sd.in ← (sd.in+1) MOD buffSize;
sd.buff[sd.in] ← char;
BROADCAST sd.nonEmpty
END;
STPsGetCharFromUser: ENTRY PROC [ self: IO.STREAM] RETURNS [ char: CHAR ] =
BEGIN
ENABLE UNWIND => NULL;
sd: STPStreamDataRef = NARROW[self.streamData];
WHILE sd.num<=0 AND NOT sd.hasBlock DO
IF sd.eof THEN RETURN WITH ERROR IO.EndOfStream[self]
ELSE WAIT sd.nonEmpty
ENDLOOP;
IF sd.num>0 THEN {
sd.num ← sd.num-1;
sd.out ← (sd.out+1) MOD buffSize;
char ← sd.buff[sd.out];
}
ELSE IF sd.hasBlock AND sd.count>0 THEN {
char ← sd.block[sd.startIndex];
sd.startIndex ← sd.startIndex+1;
sd.count ← sd.count-1;
IF sd.count=0 THEN sd.hasBlock←FALSE;
}
ELSE ERROR;
BROADCAST sd.nonFull
END;
STPsEndOf: ENTRY PROC [ self: IO.STREAM] RETURNS [ BOOL ] =
BEGIN
ENABLE UNWIND => NULL;
sd: STPStreamDataRef = NARROW[self.streamData];
RETURN[sd.num<=0 AND sd.eof]
END;
UsersCloseSTP: ENTRY PROC [ self: IO.STREAM, abort: BOOL ] =
BEGIN
ENABLE UNWIND => NULL;
WHAT DO WITH ABORT??
ED'S VERSION CATCHES ERROR
sd: STPStreamDataRef = NARROW[self.streamData];
sd.eof ← TRUE;
END;
END.