-- File: CourierImplF.mesa - last edit: -- AOF 11-Jan-88 19:50:46 -- SMA 10-Jan-85 14:40:41 -- Copyright (C) 1984, 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved. DIRECTORY ByteBlt USING [ByteBlt], Courier USING [Error, ErrorCode], CourierInternal USING [ AugmentedStream, AugmentedObj, ConnectionHandle, ConnectionObject, longZone, UserConnection, Redirect, RedirectedObject], CourierOps USING [], CourierProtocol USING [dataSST], Stream USING [ Attention, Block, CompletionCode, Handle, InputOptions, InvalidOperation, Object, Position, PutProcedure, SSTChange, SubSequenceType, defaultObject, GetProcedure, GetSSTProcedure, GetTimeoutProcedure, SendAttentionProcedure, SendNowProcedure, SetSSTProcedure, SetTimeoutProcedure, WaitAttentionProcedure]; CourierImplF: PROGRAM IMPORTS ByteBlt, Courier, CourierInternal, Stream EXPORTS CourierInternal, CourierOps = BEGIN Block: TYPE = Stream.Block; Handle: TYPE = Stream.Handle; Position: TYPE = Stream.Position; InputOptions: TYPE = Stream.InputOptions; CompletionCode: TYPE = Stream.CompletionCode; SubSequenceType: TYPE = Stream.SubSequenceType; RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE BEGIN RETURN[@LOOPHOLE[ sH, CourierInternal.AugmentedStream].back.transFilter.object]; END; --RealStream Get: Stream.GetProcedure = BEGIN gotSST: Stream.SubSequenceType; ch: CourierInternal.ConnectionHandle = LOOPHOLE[ sH, CourierInternal.AugmentedStream].back; sH ¬ RealStream[sH]; --get the real stream why ¬ normal; sst ¬ ch.lastSST; --in case the block is a null block bytesTransferred ¬ 0; --to make the loop invariant work UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO BEGIN [bytesTransferred, why, gotSST] ¬ sH.get[sH, block, options ! Stream.SSTChange => BEGIN bytesTransferred ¬ nextIndex; --moved that many bytes gotSST ¬ sst; --copy the new sst into local variable why ¬ sstChange; --that's what it will be if we process it IF ch.lastSST = CourierProtocol.dataSST THEN CONTINUE; --my data ch.lastSST ¬ sst; --copy this into our state REJECT; --and let the client deal with it END]; IF why = sstChange THEN SELECT TRUE FROM (gotSST = CourierProtocol.dataSST) => GOTO truncated; --short (ch.lastSST # CourierProtocol.dataSST) => GOTO notFinished; (bytesTransferred # 0) => GOTO lostData; --that was my data ENDCASE => GOTO notFinished --no bytes of my data moved ELSE SELECT TRUE FROM (gotSST # CourierProtocol.dataSST) => GOTO exit; --it wasn't my data (bytesTransferred # 0) => GOTO lostData; --that data was mine ENDCASE => LOOP; --my sst but no bytes - just ignore EXITS notFinished => block.startIndex ¬ block.startIndex + bytesTransferred; lostData => BEGIN RedirectStream[ch, block, bytesTransferred, why, gotSST]; --backtrack WITH vep: ch SELECT FROM user => SELECT TRUE FROM (vep.versExchProc = NIL) => GOTO truncated; --already been done (vep.versExchProc[ch] # noError) => GOTO truncated; ENDCASE; --version exchange and it's okay - do .get again ENDCASE => GOTO truncated; --wasn't a user connection END; END; ch.lastSST ¬ gotSST; --record the last sst we've seen bytesTransferred ¬ 0; --reset counter since we modified block REPEAT exit => sst ¬ ch.lastSST ¬ gotSST; --this is what the client sees truncated => {ch.lastSST ¬ gotSST; ERROR Courier.Error[truncatedTransfer]}; ENDLOOP; END; GetSST: Stream.GetSSTProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream RETURN[sH.getSST[sH]]; END; GetTimeout: Stream.GetTimeoutProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream RETURN[sH.getTimeout[sH]]; END; --GetTimeout Put: Stream.PutProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream sH.put[sH, block, endRecord]; END; --Put SendAttn: Stream.SendAttentionProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream sH.sendAttention[sH, byte]; END; --SendAttn SendNow: Stream.SendNowProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream sH.sendNow[sH, endRecord]; END; --SendNow SetSST: Stream.SetSSTProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream IF sst # CourierProtocol.dataSST THEN sH.setSST[sH, sst] ELSE ERROR Stream.InvalidOperation; END; SetTimeout: Stream.SetTimeoutProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream sH.setTimeout[sH, waitTime]; END; --SetTimeout RedirectStream: PUBLIC <> PROC [ ch: CourierInternal.ConnectionHandle, block: Stream.Block, bytesTransferred: CARDINAL, why: Stream.CompletionCode, sst: Stream.SubSequenceType] = BEGIN WITH h: ch SELECT FROM user, server => BEGIN rH: CourierInternal.Redirect ¬ h.redirected; IF rH # NIL THEN RETURN; --we've already done this rH ¬ h.redirected ¬ CourierInternal.longZone.NEW[ CourierInternal.RedirectedObject[bytesTransferred] ¬ [ directGet: h.transFilter.object.get, --save direct .get proc connection: @h, --the owning connection why: why, --the original completion code sst: sst, --and the observed sst block: , text: ]]; --copy the captured data into the object rH.block ¬ [LOOPHOLE[@rH.text], 0, bytesTransferred]; block.stopIndexPlusOne ¬ block.startIndex + bytesTransferred; [] ¬ ByteBlt.ByteBlt[to: rH.block, from: block]; h.transFilter.object.get ¬ RedirectedGet; --replace get routine END; ENDCASE; END; RedirectedGet: Stream.GetProcedure = BEGIN --sH points to the direct stream object with this new Get procedure. ch: CourierInternal.UserConnection ¬ LOOPHOLE[ LOOPHOLE[sH, CourierInternal.AugmentedStream].back]; rH: CourierInternal.Redirect ¬ ch.redirected; bytesTransferred ¬ ByteBlt.ByteBlt[from: rH.block, to: block]; --move data sst ¬ CourierProtocol.dataSST; why ¬ normal; --set default values rH.block.startIndex ¬ rH.block.startIndex + bytesTransferred; --consume ours block.startIndex ¬ block.startIndex + bytesTransferred; --fill his IF block.startIndex = block.stopIndexPlusOne THEN RETURN; --all normal IF rH.block.startIndex = rH.block.stopIndexPlusOne THEN BEGIN why ¬ rH.why; sst ¬ rH.sst; --he should get these ch.transFilter.object.get ¬ rH.directGet; --the original .get proc. CourierInternal.longZone.FREE[@ch.redirected]; --free up and NIL SELECT why FROM normal => BEGIN --his original get was shorter than this one bytes: CARDINAL; [bytes, why, sst] ¬ sH.get[sH, block, options]; --from real .get proc bytesTransferred ¬ bytesTransferred + bytes; --add these bytes on END; sstChange => BEGIN --original get ended in sst change - so should this one IF options.signalSSTChange THEN SIGNAL Stream.SSTChange[sst, block.startIndex]; END; attention => BEGIN --original get ended with attention - so should this one IF options.signalAttention THEN SIGNAL Stream.Attention[block.startIndex]; END; ENDCASE; END; END; SetBulkStream: PUBLIC PROC [ch: CourierInternal.ConnectionHandle] = BEGIN ch.object.sH ¬ @ch.bulkFilter.object; --that's the one the client gets ch.bulkFilter.back ¬ ch; --maintain back pointer to owning courier object ch.bulkFilter.object ¬ Stream.defaultObject; --just fill the slots << The following procedures are supported through the filtered stream. The other I/O procs (getByte, putWord, etc) are supported by the default stream via the Get and Put procs. >> ch.bulkFilter.object.get ¬ Get; ch.bulkFilter.object.put ¬ Put; ch.bulkFilter.object.setSST ¬ SetSST; ch.bulkFilter.object.getSST ¬ GetSST; ch.bulkFilter.object.getTimeout ¬ GetTimeout; ch.bulkFilter.object.sendNow ¬ SendNow; ch.bulkFilter.object.sendAttention ¬ SendAttn; ch.bulkFilter.object.setTimeout ¬ SetTimeout; ch.bulkFilter.object.waitAttention ¬ WaitAttn; END; --SetBulkStream WaitAttn: Stream.WaitAttentionProcedure = BEGIN sH ¬ RealStream[sH]; --get the real stream RETURN[sH.waitAttention[sH]]; END; --WaitAttn END.... --CourierImplF.mesa LOG 21-Jul-82 10:09:41 AOF Created file. 23-Jul-82 10:49:02 AOF Use raw stream procs when possible. 4-Aug-82 14:43:58 AOF Add SetStreamTimeout procedure. 11-Aug-82 10:02:33 AOF Add EXPORT Courier and pass Get's options to stream. 18-Aug-82 13:13:54 AOF Capture Courier data consumed by client. 29-Mar-83 13:50:48 AOF Klamath changes (default stream, ~unbound link). 6-Apr-83 13:52:07 AOF Klamath changes (stream.SetTimeout, etc). 29-Jun-84 10:23:39 SMA Factor Courier from NetworkStreams. 10-Jul-84 10:43:53 SMA Bulk data factoring. 21-Dec-84 13:12:23 SMA AugmentedStreams and new interfaces. 10-Jan-85 14:40:51 SMA SetBulkData does not create new object (it's in ch now). 14-Jan-86 11:37:14 AOF Cleanup of LOOPHOLES and filtered stream 4-Nov-87 12:23:50 AOF Trapping sst changes vs redirected streams, etc 8-Jan-88 15:43:39 AOF Allowing for Adobe's version of bulk data