-- File: CourierImplF.mesa - last edit: -- AOF 11-Jan-88 19:50:46 -- SMA 10-Jan-85 14:40:41 -- Copyright (C) 1984, 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved. DIRECTORY ByteBlt USING [ByteBlt], Courier USING [Error, ErrorCode], CourierInternal USING [ AugmentedStream, AugmentedObj, ConnectionHandle, ConnectionObject, longZone, UserConnection, Redirect, RedirectedObject], CourierOps USING [], CourierProtocol USING [dataSST], Stream USING [ Attention, Block, CompletionCode, Handle, InputOptions, InvalidOperation, Object, Position, PutProcedure, SSTChange, SubSequenceType, defaultObject, GetProcedure, GetSSTProcedure, GetTimeoutProcedure, SendAttentionProcedure, SendNowProcedure, SetSSTProcedure, SetTimeoutProcedure, WaitAttentionProcedure]; CourierImplF: PROGRAM IMPORTS ByteBlt, Courier, CourierInternal, Stream EXPORTS CourierInternal, CourierOps = BEGIN Block: TYPE = Stream.Block; Handle: TYPE = Stream.Handle; Position: TYPE = Stream.Position; InputOptions: TYPE = Stream.InputOptions; CompletionCode: TYPE = Stream.CompletionCode; SubSequenceType: TYPE = Stream.SubSequenceType; RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE BEGIN RETURN[@LOOPHOLE[ sH, CourierInternal.AugmentedStream].back.transFilter.object]; END; --RealStream Get: Stream.GetProcedure = BEGIN gotSST: Stream.SubSequenceType; ch: CourierInternal.ConnectionHandle = LOOPHOLE[ sH, CourierInternal.AugmentedStream].back; sH ← RealStream[sH]; --get the real stream why ← normal; sst ← ch.lastSST; --in case the block is a null block bytesTransferred ← 0; --to make the loop invariant work UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO BEGIN [bytesTransferred, why, gotSST] ← sH.get[sH, block, options ! Stream.SSTChange => BEGIN bytesTransferred ← nextIndex; --moved that many bytes gotSST ← sst; --copy the new sst into local variable why ← sstChange; --that's what it will be if we process it IF ch.lastSST = CourierProtocol.dataSST THEN CONTINUE; --my data ch.lastSST ← sst; --copy this into our state REJECT; --and let the client deal with it END]; IF why = sstChange THEN SELECT TRUE FROM (gotSST = CourierProtocol.dataSST) => GOTO truncated; --short (ch.lastSST # CourierProtocol.dataSST) => GOTO notFinished; (bytesTransferred # 0) => GOTO lostData; --that was my data ENDCASE => GOTO notFinished --no bytes of my data moved ELSE SELECT TRUE FROM (gotSST # CourierProtocol.dataSST) => GOTO exit; --it wasn't my data (bytesTransferred # 0) => GOTO lostData; --that data was mine ENDCASE => LOOP; --my sst but no bytes - just ignore EXITS notFinished => block.startIndex ← block.startIndex + bytesTransferred; lostData => BEGIN RedirectStream[ch, block, bytesTransferred, why, gotSST]; --backtrack WITH vep: ch SELECT FROM user => SELECT TRUE FROM (vep.versExchProc = NIL) => GOTO truncated; --already been done (vep.versExchProc[ch] # noError) => GOTO truncated; ENDCASE; --version exchange and it's okay - do .get again ENDCASE => GOTO truncated; --wasn't a user connection END; END; ch.lastSST ← gotSST; --record the last sst we've seen bytesTransferred ← 0; --reset counter since we modified block REPEAT exit => sst ← ch.lastSST ← gotSST; --this is what the client sees truncated => {ch.lastSST ← gotSST; ERROR Courier.Error[truncatedTransfer]}; ENDLOOP; END; GetSST: Stream.GetSSTProcedure = BEGIN sH ← RealStream[sH]; --get the real stream RETURN[sH.getSST[sH]]; END; GetTimeout: Stream.GetTimeoutProcedure = BEGIN sH ← RealStream[sH]; --get the real stream RETURN[sH.getTimeout[sH]]; END; --GetTimeout Put: Stream.PutProcedure = BEGIN sH ← RealStream[sH]; --get the real stream sH.put[sH, block, endRecord]; END; --Put SendAttn: Stream.SendAttentionProcedure = BEGIN sH ← RealStream[sH]; --get the real stream sH.sendAttention[sH, byte]; END; --SendAttn SendNow: Stream.SendNowProcedure = BEGIN sH ← RealStream[sH]; --get the real stream sH.sendNow[sH, endRecord]; END; --SendNow SetSST: Stream.SetSSTProcedure = BEGIN sH ← RealStream[sH]; --get the real stream IF sst # CourierProtocol.dataSST THEN sH.setSST[sH, sst] ELSE ERROR Stream.InvalidOperation; END; SetTimeout: Stream.SetTimeoutProcedure = BEGIN sH ← RealStream[sH]; --get the real stream sH.setTimeout[sH, waitTime]; END; --SetTimeout RedirectStream: PUBLIC <<CourierOps>> PROC [ ch: CourierInternal.ConnectionHandle, block: Stream.Block, bytesTransferred: CARDINAL, why: Stream.CompletionCode, sst: Stream.SubSequenceType] = BEGIN WITH h: ch SELECT FROM user, server => BEGIN rH: CourierInternal.Redirect ← h.redirected; IF rH # NIL THEN RETURN; --we've already done this rH ← h.redirected ← CourierInternal.longZone.NEW[ CourierInternal.RedirectedObject[bytesTransferred] ← [ directGet: h.transFilter.object.get, --save direct .get proc connection: @h, --the owning connection why: why, --the original completion code sst: sst, --and the observed sst block: , text: ]]; --copy the captured data into the object rH.block ← [LOOPHOLE[@rH.text], 0, bytesTransferred]; block.stopIndexPlusOne ← block.startIndex + bytesTransferred; [] ← ByteBlt.ByteBlt[to: rH.block, from: block]; h.transFilter.object.get ← RedirectedGet; --replace get routine END; ENDCASE; END; RedirectedGet: Stream.GetProcedure = BEGIN --sH points to the direct stream object with this new Get procedure. ch: CourierInternal.UserConnection ← LOOPHOLE[ LOOPHOLE[sH, CourierInternal.AugmentedStream].back]; rH: CourierInternal.Redirect ← ch.redirected; bytesTransferred ← ByteBlt.ByteBlt[from: rH.block, to: block]; --move data sst ← CourierProtocol.dataSST; why ← normal; --set default values rH.block.startIndex ← rH.block.startIndex + bytesTransferred; --consume ours block.startIndex ← block.startIndex + bytesTransferred; --fill his IF block.startIndex = block.stopIndexPlusOne THEN RETURN; --all normal IF rH.block.startIndex = rH.block.stopIndexPlusOne THEN BEGIN why ← rH.why; sst ← rH.sst; --he should get these ch.transFilter.object.get ← rH.directGet; --the original .get proc. CourierInternal.longZone.FREE[@ch.redirected]; --free up and NIL SELECT why FROM normal => BEGIN --his original get was shorter than this one bytes: CARDINAL; [bytes, why, sst] ← sH.get[sH, block, options]; --from real .get proc bytesTransferred ← bytesTransferred + bytes; --add these bytes on END; sstChange => BEGIN --original get ended in sst change - so should this one IF options.signalSSTChange THEN SIGNAL Stream.SSTChange[sst, block.startIndex]; END; attention => BEGIN --original get ended with attention - so should this one IF options.signalAttention THEN SIGNAL Stream.Attention[block.startIndex]; END; ENDCASE; END; END; SetBulkStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] = BEGIN ch.object.sH ← @ch.bulkFilter.object; --that's the one the client gets ch.bulkFilter.back ← ch; --maintain back pointer to owning courier object ch.bulkFilter.object ← Stream.defaultObject; --just fill the slots << The following procedures are supported through the filtered stream. The other I/O procs (getByte, putWord, etc) are supported by the default stream via the Get and Put procs. >> ch.bulkFilter.object.get ← Get; ch.bulkFilter.object.put ← Put; ch.bulkFilter.object.setSST ← SetSST; ch.bulkFilter.object.getSST ← GetSST; ch.bulkFilter.object.getTimeout ← GetTimeout; ch.bulkFilter.object.sendNow ← SendNow; ch.bulkFilter.object.sendAttention ← SendAttn; ch.bulkFilter.object.setTimeout ← SetTimeout; ch.bulkFilter.object.waitAttention ← WaitAttn; END; --SetBulkStream WaitAttn: Stream.WaitAttentionProcedure = BEGIN sH ← RealStream[sH]; --get the real stream RETURN[sH.waitAttention[sH]]; END; --WaitAttn END.... --CourierImplF.mesa LOG 21-Jul-82 10:09:41 AOF Created file. 23-Jul-82 10:49:02 AOF Use raw stream procs when possible. 4-Aug-82 14:43:58 AOF Add SetStreamTimeout procedure. 11-Aug-82 10:02:33 AOF Add EXPORT Courier and pass Get's options to stream. 18-Aug-82 13:13:54 AOF Capture Courier data consumed by client. 29-Mar-83 13:50:48 AOF Klamath changes (default stream, ~unbound link). 6-Apr-83 13:52:07 AOF Klamath changes (stream.SetTimeout, etc). 29-Jun-84 10:23:39 SMA Factor Courier from NetworkStreams. 10-Jul-84 10:43:53 SMA Bulk data factoring. 21-Dec-84 13:12:23 SMA AugmentedStreams and new interfaces. 10-Jan-85 14:40:51 SMA SetBulkData does not create new object (it's in ch now). 14-Jan-86 11:37:14 AOF Cleanup of LOOPHOLES and filtered stream 4-Nov-87 12:23:50 AOF Trapping sst changes vs redirected streams, etc 8-Jan-88 15:43:39 AOF Allowing for Adobe's version of bulk data