-- File: CourierImplF.mesa - last edit:
-- AOF 11-Jan-88 19:50:46
-- SMA 10-Jan-85 14:40:41
-- Copyright (C) 1984, 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved.
DIRECTORY
ByteBlt USING [ByteBlt],
Courier USING [Error, ErrorCode],
CourierInternal USING [
AugmentedStream, AugmentedObj, ConnectionHandle, ConnectionObject,
longZone, UserConnection, Redirect, RedirectedObject],
CourierOps USING [],
CourierProtocol USING [dataSST],
Stream USING [
Attention, Block, CompletionCode, Handle, InputOptions,
InvalidOperation, Object, Position, PutProcedure, SSTChange,
SubSequenceType, defaultObject, GetProcedure, GetSSTProcedure,
GetTimeoutProcedure, SendAttentionProcedure, SendNowProcedure,
SetSSTProcedure, SetTimeoutProcedure, WaitAttentionProcedure];
CourierImplF: PROGRAM
IMPORTS ByteBlt, Courier, CourierInternal, Stream
EXPORTS CourierInternal, CourierOps =
BEGIN
Block: TYPE = Stream.Block;
Handle: TYPE = Stream.Handle;
Position: TYPE = Stream.Position;
InputOptions: TYPE = Stream.InputOptions;
CompletionCode: TYPE = Stream.CompletionCode;
SubSequenceType: TYPE = Stream.SubSequenceType;
RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE
BEGIN
RETURN[@LOOPHOLE[
sH, CourierInternal.AugmentedStream].back.transFilter.object];
END; --RealStream
Get: Stream.GetProcedure =
BEGIN
gotSST: Stream.SubSequenceType;
ch: CourierInternal.ConnectionHandle = LOOPHOLE[
sH, CourierInternal.AugmentedStream].back;
sH ← RealStream[sH]; --get the real stream
why ← normal; sst ← ch.lastSST; --in case the block is a null block
bytesTransferred ← 0; --to make the loop invariant work
UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO
BEGIN
[bytesTransferred, why, gotSST] ← sH.get[sH, block, options !
Stream.SSTChange =>
BEGIN
bytesTransferred ← nextIndex; --moved that many bytes
gotSST ← sst; --copy the new sst into local variable
why ← sstChange; --that's what it will be if we process it
IF ch.lastSST = CourierProtocol.dataSST THEN CONTINUE; --my data
ch.lastSST ← sst; --copy this into our state
REJECT; --and let the client deal with it
END];
IF why = sstChange THEN
SELECT TRUE FROM
(gotSST = CourierProtocol.dataSST) => GOTO truncated; --short
(ch.lastSST # CourierProtocol.dataSST) => GOTO notFinished;
(bytesTransferred # 0) => GOTO lostData; --that was my data
ENDCASE => GOTO notFinished --no bytes of my data moved
ELSE
SELECT TRUE FROM
(gotSST # CourierProtocol.dataSST) => GOTO exit; --it wasn't my data
(bytesTransferred # 0) => GOTO lostData; --that data was mine
ENDCASE => LOOP; --my sst but no bytes - just ignore
EXITS
notFinished => block.startIndex ← block.startIndex + bytesTransferred;
lostData =>
BEGIN
RedirectStream[ch, block, bytesTransferred, why, gotSST]; --backtrack
WITH vep: ch SELECT FROM
user => SELECT TRUE FROM
(vep.versExchProc = NIL) => GOTO truncated; --already been done
(vep.versExchProc[ch] # noError) => GOTO truncated;
ENDCASE; --version exchange and it's okay - do .get again
ENDCASE => GOTO truncated; --wasn't a user connection
END;
END;
ch.lastSST ← gotSST; --record the last sst we've seen
bytesTransferred ← 0; --reset counter since we modified block
REPEAT
exit => sst ← ch.lastSST ← gotSST; --this is what the client sees
truncated =>
{ch.lastSST ← gotSST; ERROR Courier.Error[truncatedTransfer]};
ENDLOOP;
END;
GetSST: Stream.GetSSTProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
RETURN[sH.getSST[sH]];
END;
GetTimeout: Stream.GetTimeoutProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
RETURN[sH.getTimeout[sH]];
END; --GetTimeout
Put: Stream.PutProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
sH.put[sH, block, endRecord];
END; --Put
SendAttn: Stream.SendAttentionProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
sH.sendAttention[sH, byte];
END; --SendAttn
SendNow: Stream.SendNowProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
sH.sendNow[sH, endRecord];
END; --SendNow
SetSST: Stream.SetSSTProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
IF sst # CourierProtocol.dataSST THEN sH.setSST[sH, sst]
ELSE ERROR Stream.InvalidOperation;
END;
SetTimeout: Stream.SetTimeoutProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
sH.setTimeout[sH, waitTime];
END; --SetTimeout
RedirectStream: PUBLIC <<CourierOps>> PROC [
ch: CourierInternal.ConnectionHandle, block: Stream.Block,
bytesTransferred: CARDINAL, why: Stream.CompletionCode,
sst: Stream.SubSequenceType] =
BEGIN
WITH h: ch SELECT FROM
user, server =>
BEGIN
rH: CourierInternal.Redirect ← h.redirected;
IF rH # NIL THEN RETURN; --we've already done this
rH ← h.redirected ← CourierInternal.longZone.NEW[
CourierInternal.RedirectedObject[bytesTransferred] ← [
directGet: h.transFilter.object.get, --save direct .get proc
connection: @h, --the owning connection
why: why, --the original completion code
sst: sst, --and the observed sst
block: , text: ]];
--copy the captured data into the object
rH.block ← [LOOPHOLE[@rH.text], 0, bytesTransferred];
block.stopIndexPlusOne ← block.startIndex + bytesTransferred;
[] ← ByteBlt.ByteBlt[to: rH.block, from: block];
h.transFilter.object.get ← RedirectedGet; --replace get routine
END;
ENDCASE;
END;
RedirectedGet: Stream.GetProcedure =
BEGIN
--sH points to the direct stream object with this new Get procedure.
ch: CourierInternal.UserConnection ← LOOPHOLE[
LOOPHOLE[sH, CourierInternal.AugmentedStream].back];
rH: CourierInternal.Redirect ← ch.redirected;
bytesTransferred ← ByteBlt.ByteBlt[from: rH.block, to: block]; --move data
sst ← CourierProtocol.dataSST; why ← normal; --set default values
rH.block.startIndex ← rH.block.startIndex + bytesTransferred; --consume ours
block.startIndex ← block.startIndex + bytesTransferred; --fill his
IF block.startIndex = block.stopIndexPlusOne THEN RETURN; --all normal
IF rH.block.startIndex = rH.block.stopIndexPlusOne THEN
BEGIN
why ← rH.why; sst ← rH.sst; --he should get these
ch.transFilter.object.get ← rH.directGet; --the original .get proc.
CourierInternal.longZone.FREE[@ch.redirected]; --free up and NIL
SELECT why FROM
normal =>
BEGIN
--his original get was shorter than this one
bytes: CARDINAL;
[bytes, why, sst] ← sH.get[sH, block, options]; --from real .get proc
bytesTransferred ← bytesTransferred + bytes; --add these bytes on
END;
sstChange =>
BEGIN
--original get ended in sst change - so should this one
IF options.signalSSTChange THEN
SIGNAL Stream.SSTChange[sst, block.startIndex];
END;
attention =>
BEGIN
--original get ended with attention - so should this one
IF options.signalAttention THEN
SIGNAL Stream.Attention[block.startIndex];
END;
ENDCASE;
END;
END;
SetBulkStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] =
BEGIN
ch.object.sH ← @ch.bulkFilter.object; --that's the one the client gets
ch.bulkFilter.back ← ch; --maintain back pointer to owning courier object
ch.bulkFilter.object ← Stream.defaultObject; --just fill the slots
<<
The following procedures are supported through the filtered stream.
The other I/O procs (getByte, putWord, etc) are supported by the default
stream via the Get and Put procs.
>>
ch.bulkFilter.object.get ← Get;
ch.bulkFilter.object.put ← Put;
ch.bulkFilter.object.setSST ← SetSST;
ch.bulkFilter.object.getSST ← GetSST;
ch.bulkFilter.object.getTimeout ← GetTimeout;
ch.bulkFilter.object.sendNow ← SendNow;
ch.bulkFilter.object.sendAttention ← SendAttn;
ch.bulkFilter.object.setTimeout ← SetTimeout;
ch.bulkFilter.object.waitAttention ← WaitAttn;
END; --SetBulkStream
WaitAttn: Stream.WaitAttentionProcedure =
BEGIN
sH ← RealStream[sH]; --get the real stream
RETURN[sH.waitAttention[sH]];
END; --WaitAttn
END.... --CourierImplF.mesa
LOG
21-Jul-82 10:09:41 AOF Created file.
23-Jul-82 10:49:02 AOF Use raw stream procs when possible.
4-Aug-82 14:43:58 AOF Add SetStreamTimeout procedure.
11-Aug-82 10:02:33 AOF Add EXPORT Courier and pass Get's options to stream.
18-Aug-82 13:13:54 AOF Capture Courier data consumed by client.
29-Mar-83 13:50:48 AOF Klamath changes (default stream, ~unbound link).
6-Apr-83 13:52:07 AOF Klamath changes (stream.SetTimeout, etc).
29-Jun-84 10:23:39 SMA Factor Courier from NetworkStreams.
10-Jul-84 10:43:53 SMA Bulk data factoring.
21-Dec-84 13:12:23 SMA AugmentedStreams and new interfaces.
10-Jan-85 14:40:51 SMA SetBulkData does not create new object (it's in ch now).
14-Jan-86 11:37:14 AOF Cleanup of LOOPHOLES and filtered stream
4-Nov-87 12:23:50 AOF Trapping sst changes vs redirected streams, etc
8-Jan-88 15:43:39 AOF Allowing for Adobe's version of bulk data