-- File: CourierImplS.mesa - last edit:
-- AOF                 27-Oct-87 21:15:25
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.

--Server side of the Courier implementation.  Exports Receiver (to
--CourierInternal), which runs the receive/dispatch loop for one transport,
--plus the private helpers SendReject and SendAbort used to answer a call
--that cannot be completed normally.

DIRECTORY
  Courier USING [
    Arguments, Description, Dispatcher, Error, ErrorCode, InvalidArguments,
    LocalSystemElement, NoSuchProcedureNumber, Parameters, Results,
    SignalRemoteError, SystemElement, VersionRange],
  CourierInternal USING [
    activeTimeout, AugmentedStream, Closed, doStats, ExchWords, inputOptions,
    ServerConnection, serverIdleTimeout, SetBulkStream,
    SetMessageProtocolVersion, stats, streamDefaultWaitTime],
  CourierOps USING [
    CreateInternal, DeleteConnection, DeleteStream, Fetch, FlushToEndOfMessage,
    GetBlock, PutBlock, SearchForExport, Store],
  CourierProtocol USING [
    dataSST, Protocol, Protocol3Body, ProtocolRange, pvHigh, RejectCode],
  Environment USING [Byte, bytesPerWord],
  NetworkStream USING [closeSST],
  NSConstants USING [courierSocket],
  Process USING [Abort, CancelAbort, GetCurrent],
  Stream USING [
    Byte, CompletionCode, defaultInputOptions, Handle, InputOptions,
    SubSequenceType, TimeOut],
  System USING [NetworkAddress, nullNetworkNumber];

CourierImplS: PROGRAM
  IMPORTS Process, Courier, CourierInternal, CourierOps, Stream
  EXPORTS CourierInternal =
  BEGIN

  --Local Variables/Constants
  --bpw converts the word counts returned by SIZE into the byte counts used
  --in the block descriptors passed to GetBlock/PutBlock below.
  bpw: CARDINAL = Environment.bytesPerWord;
  --Compile-time switch: when TRUE our version range is sent right after the
  --peer's range is read (see Receiver) and is not sent again in 'results'.
  exchangeVersionNumbersImmediately: BOOLEAN = TRUE;
  --The single protocol version this module is prepared to speak.
  pvEqual: CourierProtocol.Protocol = CourierProtocol.pvHigh;

  --Returns TRUE when pvEqual lies inside the closed interval
  --[range.low..range.high].
  IntersectingRanges: PROC[range: CourierProtocol.ProtocolRange]
    RETURNS[BOOLEAN] = INLINE {RETURN[pvEqual IN[range.low..range.high]]};

  --Repeatedly calls sH.waitAttention until ABORTED is raised.  The procedure
  --first requests an abort of its own process, so the loop ends via the
  --ABORTED catch phrase.
  --NOTE(review): presumably waitAttention returns immediately while an
  --attention is pending and raises ABORTED once it would have to wait, which
  --makes this a drain of pending attentions.  Confirm against the Stream and
  --Process interfaces.
  FlushOutOfBandAttn: PROC[sH: Stream.Handle] =
    BEGIN
    Process.Abort[Process.GetCurrent[]];
    --UNTIL ABORTED-- DO
      [] ← sH.waitAttention[sH ! ABORTED => EXIT];
      ENDLOOP;
    END;  --FlushOutOfBandAttn

  --Receive/dispatch loop for one server transport.
  --  aH:             augmented stream supplied by the transport client; it is
  --                  installed as ch.transFilter.
  --  remote:         address of the caller; copied into ch.object.remote, after
  --                  which the socket field is set to NSConstants.courierSocket.
  --  myPv:           protocol version range transmitted to the peer.
  --  connectionless: when TRUE the stream and connection are deleted after the
  --                  first call that reaches the dispatcher.
  --On every exit path (delete or error) the stream and the connection object
  --are deleted.
  Receiver: PUBLIC PROC [
    aH: CourierInternal.AugmentedStream, remote: System.NetworkAddress,
    myPv: CourierProtocol.ProtocolRange, connectionless: BOOLEAN] =
    BEGIN

    arguments: Courier.Arguments =
      --PROCEDURE [argumentsRecord: Courier.Parameters];
      --Remote program's dispatcher calls here to get the procedure arguments.
      --Raises Courier.Error[transportTimeout] on Stream.TimeOut and
      --Courier.Error[parameterInconsistency] when FlushToEndOfMessage
      --returns TRUE after the arguments have been stored.
      BEGIN
      ENABLE
        BEGIN
        --If the client unwinds through here, skip the rest of the message.
        UNWIND => [] ← CourierOps.FlushToEndOfMessage[ch];
        Stream.TimeOut => {error ← transportTimeout; GOTO stream};
        END;
      --Only call Store when the client supplied both a place to put the
      --arguments and a description of them.
      IF (argumentsRecord.location # NIL)
        AND (argumentsRecord.description # NIL) THEN
        CourierOps.Store[
          ch, argumentsRecord.location, argumentsRecord.description];
      --NOTE(review): a TRUE result presumably means data remained in the
      --message beyond what the description consumed.  Confirm in CourierOps.
      IF CourierOps.FlushToEndOfMessage[ch] THEN
        {error ← parameterInconsistency; GOTO stream};
      --the bulkFilter object is already set - here we are just allowing the
      --client to use it.
      ch.streamState ← out;  --until further notice
      ch.lastSST ← CourierProtocol.dataSST;  --to filter
      ch.transFilter.object.options ← Stream.defaultInputOptions;
      ch.transFilter.object.setTimeout[
        @ch.transFilter.object, CourierInternal.streamDefaultWaitTime];
      EXITS stream => Courier.Error[error];
      END;  --arguments

    results: Courier.Results =
      --PROCEDURE [
      --  resultsRecord: Courier.Parameters,
      --  requestDataStream: BOOLEAN]
      --  RETURNS [sH: Stream.Handle];
      --Remote program's dispatcher calls here to send the return message.
      --Leftover input is drained first, then the return header carrying tid
      --is written, Fetch is applied to resultsRecord (when both of its fields
      --are non-NIL) and the message is pushed out with sendNow.  The stream
      --handle is returned only when requestDataStream is TRUE, otherwise NIL.
      BEGIN
      ENABLE
        --Problems with the stream.  The signal generated if the stream is
        --suspended will propagate thru here to the client.
        Stream.TimeOut=> {error ← transportTimeout; GOTO transportError};
      bytes: CARDINAL;
      sst: Stream.SubSequenceType;
      status: Stream.CompletionCode;
      --Every signalling option is off, so the outcome of each get in the
      --drain loop is reported through the returned status and sst values.
      specialOptions: Stream.InputOptions = [
        terminateOnEndRecord: TRUE, signalLongBlock: FALSE,
        signalShortBlock: FALSE, signalSSTChange: FALSE,
        signalEndOfStream: FALSE, signalAttention: FALSE,
        signalTimeout: FALSE, signalEndRecord:FALSE];
      --Two byte scratch buffer; whatever lands in it is discarded.
      trash: PACKED ARRAY INTEGER[0..2) OF Environment.Byte;

      sH ← @ch.transFilter.object;
      ch.streamState ← busy;
      Process.CancelAbort[ch.owner];  --just in case client ...
      sH.setSST[sH, CourierProtocol.dataSST];  --in case he didn't
      --Drain loop: poll the input with a zero timeout until the get times out.
      --UNTIL EXIT | ERROR-- DO
        sH.setTimeout[sH, 0];  --set very short timeout
        [bytes, status, sst] ← sH.get[
          sH, [@trash, 0, LENGTH[trash]], specialOptions];
        sH.setTimeout[sH, CourierInternal.activeTimeout];
        --The arms are tested in order; the first one that holds is taken.
        SELECT TRUE FROM
          (status = timeout) => EXIT;  --this is as good as it gets
          (status = attention) => FlushOutOfBandAttn[sH];  --and this is as bad
          (sst = NetworkStream.closeSST) =>  --caller tired of waiting
            {streamClosed ← TRUE; ERROR Courier.Error[callerAborted]};
          (sst # CourierProtocol.dataSST) => NULL;  --leftover from bulk data?
          (bytes > 0) => ERROR Courier.Error[parameterInconsistency];
          ENDCASE;
        ENDLOOP;
      --With exchangeVersionNumbersImmediately = TRUE this PutBlock is never
      --executed; the range has already been sent by Receiver.
      IF ~exchangeVersionNumbersImmediately THEN
        CourierOps.PutBlock[  --send the protocol version numbers
          ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
      WITH ch.message SELECT FROM
        protocol3 =>
          BEGIN
          --Return header echoes the transaction id captured from the call.
          protocol3Body ← [return[tid]];
          CourierOps.PutBlock[
            ch, [@protocol3Body, 0,
            SIZE[return CourierProtocol.Protocol3Body] * bpw]];
          END;
        ENDCASE;
      IF CourierInternal.doStats THEN
        CourierInternal.stats[returnsTransmitted] ←
          CourierInternal.stats[returnsTransmitted] + 1;
      IF (resultsRecord.location # NIL)
        AND (resultsRecord.description # NIL) THEN
        CourierOps.Fetch[
          ch, resultsRecord.location, resultsRecord.description];
      sH.sendNow[sH, TRUE];
      IF requestDataStream THEN
        BEGIN
        --the bulkFilter.object has already been set.
        ch.streamState ← out;
        sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
        ch.object.sH ← sH;
        END
      ELSE sH ← NIL;
      EXITS transportError => Courier.Error[error];
      END;  --results

    --State shared between Receiver's main body and the nested procedures.
    tid: CARDINAL;  --transaction id of the call being serviced
    streamClosed: BOOLEAN ← FALSE;  --set by 'results' when closeSST is seen
    error: Courier.ErrorCode ← noError;  --code raised at the nested EXITS
    ch: CourierInternal.ServerConnection;

    BEGIN
    --A transport timeout or CourierInternal.Closed anywhere below ends the
    --connection through the 'error' exit.
    ENABLE Stream.TimeOut, CourierInternal.Closed => GOTO error;
    procedureNumber: CARDINAL;
    range: Courier.VersionRange;
    dispatcher: Courier.Dispatcher;
    sH: Stream.Handle;

    --The transport client has created a transport for us, now create the
    --connection object and set the back pointer in the augmented stream object.
    ch ← LOOPHOLE[CourierOps.CreateInternal[server, remote]];
    ch.streamState ← busy;  --it will be real soon
    ch.transFilter ← aH;
    ch.transFilter.back ← ch;
    ch.object.remote ← remote;
    ch.owner ← Process.GetCurrent[];
    CourierInternal.SetBulkStream[ch];
    sH ← @ch.transFilter.object;
    ch.object.remote.socket ← NSConstants.courierSocket;

    BEGIN
    <<
    Protocol version arbitration.
    There doesn't seem to be any way to piggyback our version numbers with the
    return message safely.  The version numbers have to go before the bulk
    data, but what if the bulk data is sent to us in the first call of a brand
    new stream?  The server here can't tell that the client is going to do
    that.  If he's sending (i.e., the other end is receiving) then he is
    obligated to change the SST, thus flushing the version numbers.  This is
    the pits!  Let's just force out the damn things and forget it.

    Ya, it may be the pits, and it also has to be done.
    [AOF 26-Oct-87  9:21:58]
    >>
    ENABLE Courier.Error => GOTO error;  --just can't use this stream
    CourierOps.GetBlock[  --be careful, we haven't set input options
      ch, [@ch.protocolRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
    --Because exchangeVersionNumbersImmediately is TRUE, our range is always
    --sent here regardless of what the peer offered.
    IF exchangeVersionNumbersImmediately
      OR (ch.protocolRange.low # pvEqual)
      OR (ch.protocolRange.high # pvEqual) THEN
      BEGIN
      CourierOps.PutBlock[
        ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
      sH.sendNow[sH, FALSE];  --force the packet out - no EOM set
      END;
    IF ~IntersectingRanges[ch.protocolRange] THEN GOTO delete;  --failure
    CourierInternal.SetMessageProtocolVersion[ch, pvEqual];
    END;

    DO  --until connection ends via close or timeout.
      --A parameterInconsistency restarts the current iteration; any other
      --Courier.Error ends the connection.
      ENABLE Courier.Error =>
        IF errorCode = parameterInconsistency THEN RETRY ELSE GOTO error;
      Process.CancelAbort[ch.owner];  --just in case client ...
      IF streamClosed THEN GOTO error;  --caller aborted (in receiver)
      ch.endRecord ← FALSE;
      ch.streamState ← busy;  --set state of stream
      --Wait for the next call using the idle timeout.
      sH.setTimeout[sH, CourierInternal.serverIdleTimeout];
      sH.options ← CourierInternal.inputOptions;
      sH.setSST[sH, CourierProtocol.dataSST];
      WITH ch.message SELECT FROM
        protocol3 =>
          BEGIN
          CourierOps.GetBlock[
            ch, [@protocol3Body, 0,
            SIZE[call CourierProtocol.Protocol3Body] * bpw]];
          WITH protocol3Body SELECT FROM
            call =>
              BEGIN
              --Capture the call header fields used for export lookup,
              --dispatch and the eventual reply.
              ch.object.programNumber ← CourierInternal.ExchWords[program];
              ch.object.versionNumber ← version;
              procedureNumber ← procedure;
              tid ← transaction;
              END;
            --anything else is invalid
            ENDCASE => {[] ← CourierOps.FlushToEndOfMessage[ch]; LOOP};
          END;
        ENDCASE;
      --now we have something to do
      sH.setTimeout[sH, CourierInternal.activeTimeout];
      IF CourierInternal.doStats THEN
        BEGIN
        CourierInternal.stats[callsReceived] ←
          CourierInternal.stats[callsReceived] + 1;
        --A call counts as local when the caller's net is the null network
        --number or equals this system element's net.
        IF (ch.object.remote.net = System.nullNetworkNumber)
          OR (ch.object.remote.net = Courier.LocalSystemElement[].net) THEN
          CourierInternal.stats[localCallsReceived] ←
            CourierInternal.stats[localCallsReceived] + 1;
        END;
      [dispatcher, range] ← CourierOps.SearchForExport[ch];
      IF dispatcher = NIL THEN
        {SendReject[ch, range, noSuchProgramNumber, tid]; LOOP};
      IF ch.object.versionNumber ~IN[range.low..range.high] THEN
        {SendReject[ch, range, noSuchVersionNumber, tid]; LOOP};
      --let client code take over
      --NOTE(review): inside the SignalRemoteError catch phrase the names
      --errorNumber and arguments are presumably the signal's own parameters
      --(SendAbort expects Courier.Parameters), not the local 'arguments'
      --procedure.  Confirm against the Courier interface.
      dispatcher[@ch.object, procedureNumber, arguments, results !
        Courier.NoSuchProcedureNumber =>
          {SendReject[ch, range, noSuchProcedureValue, tid]; CONTINUE};
        Courier.InvalidArguments =>
          {SendReject[ch, range, invalidArguments, tid]; CONTINUE};
        --Don't UNWIND yet - client may have local frames of interest
        Courier.SignalRemoteError =>
          {SendAbort[ch, errorNumber, tid, arguments]; CONTINUE}];
      --delete stream if transport is connectionless.
      IF connectionless THEN GOTO delete;
      ENDLOOP;
    EXITS
      --Both exits tear down the stream first, then the connection object.
      delete, error =>
        BEGIN
        CourierOps.DeleteStream[ch];
        CourierOps.DeleteConnection[ch];
        END;
    END;
    END;  --Receiver

  --Builds and transmits a reject message for transaction tid.
  --  ch:         connection to reply on.
  --  range:      version range; placed in the message only for
  --              noSuchVersionNumber.
  --  rejectCode: selects which reject variant is built.
  --  tid:        transaction id placed in the reject header.
  --Stream.TimeOut and Courier.Error raised while sending are swallowed.
  SendReject: PROCEDURE [
    ch: CourierInternal.ServerConnection, range: Courier.VersionRange,
    rejectCode: CourierProtocol.RejectCode, tid: CARDINAL] =
    BEGIN
    --Send error message to the remote found in the connection specified.
    --These errors are discovered short of receiving the data from the link,
    --so the link is in an unknown state requiring we flush the input
    --stream's data.
    ENABLE Stream.TimeOut, Courier.Error =>  --Can only be a stream problem.
      GOTO error;
    msgSize: CARDINAL;
    msgAddr: LONG POINTER;
    IF CourierInternal.doStats THEN
      CourierInternal.stats[rejectsTransmitted] ←
        CourierInternal.stats[rejectsTransmitted] + 1;
    --NOTE(review): msgAddr and msgSize are assigned only inside the protocol3
    --arm and the four reject codes listed below.  Any other variant or code
    --would reach PutBlock with them unset.  Confirm that RejectCode has no
    --further values.
    WITH ch.message SELECT FROM
      protocol3 =>
        BEGIN
        msgAddr ← @protocol3Body;
        --Each arm builds the matching reject variant and records its size in
        --bytes, since the variants differ in length.
        SELECT rejectCode FROM
          noSuchProgramNumber =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchProgramNumber[]]];
            msgSize ← SIZE[
              noSuchProgramNumber reject CourierProtocol.Protocol3Body] * bpw;
            END;
          noSuchProcedureValue =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchProcedureValue[]]];
            msgSize ← SIZE[
              noSuchProcedureValue reject CourierProtocol.Protocol3Body] * bpw;
            END;
          invalidArguments =>
            BEGIN
            protocol3Body ← [reject[tid, invalidArguments[]]];
            msgSize ← SIZE[
              invalidArguments reject CourierProtocol.Protocol3Body] * bpw;
            END;
          noSuchVersionNumber =>
            BEGIN
            protocol3Body ← [reject[tid, noSuchVersionNumber[range]]];
            msgSize ← SIZE[
              noSuchVersionNumber reject CourierProtocol.Protocol3Body] * bpw;
            END;
          ENDCASE;
        END;
      ENDCASE;
    --Discard the remainder of the incoming call before replying.
    [] ← CourierOps.FlushToEndOfMessage[ch];
    CourierOps.PutBlock[ch, [msgAddr, 0, msgSize]];
    ch.transFilter.object.sendNow[@ch.transFilter.object, TRUE];
    --Ignore problems with the stream.  They get noticed on the next get.
    EXITS error => NULL;
    END;  --SendReject

  --Builds and transmits an abort message for transaction tid.
  --  ch:        connection to reply on.
  --  abortCode: error number placed in the abort header.
  --  tid:       transaction id placed in the abort header.
  --  arguments: error arguments; Fetch is applied to them only when both
  --             location and description are non-NIL.
  --Stream.TimeOut and Courier.Error raised while sending are swallowed.
  SendAbort: PROCEDURE [
    ch: CourierInternal.ServerConnection, abortCode: CARDINAL, tid: CARDINAL,
    arguments: Courier.Parameters] =
    BEGIN
    --Send error message to the remote found in the connection specified.
    --This is an error generated by the remote program.  The stream will(?)
    --survive as it is.
    ENABLE Stream.TimeOut, Courier.Error =>  --Can only be a stream problem.
      GOTO error;
    IF CourierInternal.doStats THEN
      CourierInternal.stats[abortsTransmitted] ←
        CourierInternal.stats[abortsTransmitted] + 1;
    ch.streamState ← busy;
    --Make sure the reply goes out on the data subsequence type.
    ch.transFilter.object.setSST[@ch.transFilter.object, CourierProtocol.dataSST];
    WITH ch.message SELECT FROM
      protocol3 =>
        BEGIN
        protocol3Body ← [abort[tid, abortCode]];
        CourierOps.PutBlock[ch, [@protocol3Body, 0,
          SIZE[abort CourierProtocol.Protocol3Body] * bpw]];
        END;
      ENDCASE;
    IF (arguments.location # NIL) AND (arguments.description # NIL) THEN
      CourierOps.Fetch[ch, arguments.location, arguments.description];
    ch.transFilter.object.sendNow[@ch.transFilter.object, TRUE];
    EXITS error => NULL;  --Ignore problems with the stream.  They will be
                          --noticed on the next operation.
    END;  --SendAbort

  END.....  --of CourierImplS.mesa

LOG
 4-Feb-85 18:34:44  AOF  Post Klamath.
29-Jun-84 10:25:01  SMA  Factor Courier from NetworkStreams.
28-Nov-84 16:39:01  SMA  Delete stream after return if connectionless.
28-Nov-84 16:39:01  SMA  AugmentedStreams and new interfaces.
31-Dec-84  8:32:50  AOF  SetSST = dataSST before sending results.
10-Jan-85 14:24:23  SMA  bulkFilter set when connection and transport created.
10-Jan-85 17:48:06  SMA  Delete stream if protocol versions don't intersect.
 4-Feb-85 18:35:07  AOF  More care about RETRY'ng Courier.Error.
21-Sep-85 12:03:39  AOF  Allow bulk data on stream in 'results'.
20-Mar-86 15:43:28  AOF  Force version # exchange ~piggybacked
 8-Jun-86 17:21:41  AOF  Undo ~piggyback version numbers
12-Nov-86 15:36:47  AOF  FlushToEOS on UNWIND in arguments proc
16-Jan-87 10:14:31  AOF  Removal of Courier Version 2.0
26-Oct-87 11:19:49  AOF  And hopefully making it right (sorting version exchs)