-- File: CourierImplU.mesa - last edit:
-- AOF 27-Oct-87 21:15:13
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
Courier USING [
Arguments, Description, Error, ErrorCode, Handle, InvalidArguments,
LocalSystemElement, Parameters, RemoteErrorSignalled, VersionMismatch,
VersionRange],
CourierInternal USING [
activeTimeout, AugmentedStream, Closed, CreateDefaultStream, doStats,
ExchWords, inputOptions, RegisterTransport, seal, VersExchProcType,
SetBulkStream, SetMessageProtocolVersion, stats, streamDefaultWaitTime,
ttDefault, UserConnection],
CourierOps USING [
DeleteStream, Fetch, FlushToEndOfMessage, GetBlock, PutBlock,
SetIdleWatcher, Store],
CourierProtocol USING [
dataSST, Protocol, Protocol3Body, ProtocolRange, pvHigh, RejectCode],
Environment USING [bytesPerWord],
Process USING [CancelAbort, GetCurrent],
Stream USING [Handle, TimeOut],
System USING [GetGreenwichMeanTime, nullNetworkNumber];
CourierImplU: PROGRAM
IMPORTS Courier, CourierInternal, CourierOps, Process, Stream, System
EXPORTS Courier =
BEGIN --Implementation of the Courier User.
--LOCAL VARIABLES/CONSTANTS
-- transaction number placed in the header of every call message built by Call
uniqueTransaction: CARDINAL = 0;
-- bytes per word; multiplies SIZE[...] results to form the byte counts passed
-- to CourierOps.PutBlock and CourierOps.GetBlock
bpw: CARDINAL = Environment.bytesPerWord;
-- largest timeout, in seconds, that Call will multiply by 1000 for the stream
-- timeout; anything larger is replaced by LAST[LONG CARDINAL]
maxSecondsWaitable: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;
-- the protocol version this module uses: Call sets it as the message protocol
-- version and VersExchProc checks that it lies within the peer's range
pvEqual: CourierProtocol.Protocol = CourierProtocol.pvHigh;
-- Call: issue one remote procedure call on the Courier connection cH and
-- collect its results.
-- Steps, in order: validate the handle; lock the stream; if no transport
-- filter exists yet, create the transport and send our protocol range; send
-- the protocol3 call header followed by the client's arguments; optionally
-- lend the stream to streamCheckoutProc; arm the timeout; finish any pending
-- version exchange; read the reply header; store the client's results.
-- Parameters:
--   cH: client handle, LOOPHOLEd to CourierInternal.UserConnection.
--   procedureNumber: sent as the `procedure` field of the call message.
--   arguments, results: location/description pairs; each is skipped when
--     either its location or its description is NIL.
--   timeoutInSeconds: wait limit for the reply; values greater than
--     maxSecondsWaitable are replaced by LAST[LONG CARDINAL].
--   requestDataStream: when TRUE the stream is left in state `out` and is
--     handed back to the caller through sH.
--   streamCheckoutProc: when not NIL, called with cH after the call message
--     has been sent and before the reply is awaited.
-- Returns sH: the transport stream when requestDataStream is TRUE, else NIL.
-- Raises Courier.Error (invalidHandle, streamNotYours, invalidMessage,
-- parameterInconsistency, or the code held in `error` at the `delete` exit)
-- and ABORTED; P3Abort and P3Reject raise Courier.RemoteErrorSignalled,
-- Courier.VersionMismatch or Courier.Error for non-return replies.
Call: PUBLIC PROC[
cH: Courier.Handle, procedureNumber: CARDINAL,
arguments, results: Courier.Parameters, timeoutInSeconds: LONG CARDINAL,
requestDataStream: BOOLEAN, streamCheckoutProc: PROC[cH: Courier.Handle]]
RETURNS[sH: Stream.Handle] =
BEGIN
-- NOTE(review): versExchDone is never referenced again in this procedure.
versExchDone: BOOLEAN ← TRUE;
-- error code raised by the `delete` exit at the bottom of the procedure
error: Courier.ErrorCode ← noError;
ch: CourierInternal.UserConnection ← LOOPHOLE[cH];
connectionless: BOOLEAN ← FALSE; --delete stream after call?
-- Handle and ownership checks. They sit outside the ENABLE block below, so
-- none of its catch phrases (and no stream deletion) apply to these errors.
SELECT TRUE FROM
(ch = NIL), (ch.seal # CourierInternal.seal) =>
RETURN WITH ERROR Courier.Error[invalidHandle];
(ch.streamState = busy), (ch.streamState = out) =>
RETURN WITH ERROR Courier.Error[streamNotYours];
ENDCASE;
BEGIN
ENABLE
BEGIN
UNWIND => CourierOps.SetIdleWatcher[ch]; --start clocking the stream
Courier.Error =>
--delete the stream on those errors that leave a broken stream around
SELECT errorCode FROM
transportTimeout => CourierOps.DeleteStream[ch];
noRouteToSystemElement => CourierOps.DeleteStream[ch];
ENDCASE;
Stream.TimeOut =>
--timeout waiting for results or processing same
BEGIN
-- current time still before ch.clock: report transportTimeout;
-- otherwise the caller's deadline has passed: report returnTimedOut
error ← IF (System.GetGreenwichMeanTime[] < ch.clock) THEN
transportTimeout ELSE returnTimedOut;
GOTO delete; --gun the stream in any case
END;
--signal raised locally; translate to Error
Courier.InvalidArguments => {error ← invalidArguments; GOTO delete};
--cases where server closed stream | pseudo-version mismatch
-- RETRY re-executes the block this ENABLE is attached to after the stream is
-- deleted. NOTE(review): presumably DeleteStream clears ch.transFilter so the
-- transport is rebuilt on the retry; nothing visible here bounds the retries.
CourierInternal.Closed => {CourierOps.DeleteStream[ch]; RETRY};
END;
ch.owner ← Process.GetCurrent[]; --record current owner
ch.streamState ← busy; --and lock the stream
-- no transport filter yet: build the transport and start version exchange
IF (ch.transFilter = NIL) THEN
BEGIN
--set up default transport if nobody has registered one for this connection.
IF ch.createTransport = NIL THEN CourierInternal.RegisterTransport[
cH, CourierInternal.CreateDefaultStream, CourierInternal.ttDefault];
[ch.transFilter, ch.protocolRange, connectionless] ← ch.createTransport[ch];
<<
Protocol version arbitration. The transport filter tells us what our
low and high is, because transports that are not connection-oriented need
to piggyback the version numbers (which can only be done if low = high).
>>
-- send our own protocol range ahead of the call message
CourierOps.PutBlock[
ch, [@ch.protocolRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
CourierInternal.SetMessageProtocolVersion[ch, pvEqual]; --has to be
CourierInternal.SetBulkStream[ch]; --build up the bulk stream
ch.versExchProc ← VersExchProc; --proc to deal with version exchanges
END;
-- sH addresses the stream object embedded in the transport filter
sH ← @ch.transFilter.object;
--Transmit call message
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
protocol3Body ← [call[
transaction: uniqueTransaction,
program: CourierInternal.ExchWords[ch.object.programNumber],
version: ch.object.versionNumber, procedure: procedureNumber]];
CourierOps.PutBlock[ch,
[@protocol3Body, 0, SIZE[call CourierProtocol.Protocol3Body] * bpw]];
END;
ENDCASE;
--Append client arguments
IF (arguments.location # NIL) AND (arguments.description # NIL) THEN
CourierOps.Fetch[ch, arguments.location, arguments.description];
sH.sendNow[sH, TRUE]; --transmit and set endRecord
IF CourierInternal.doStats THEN
BEGIN
CourierInternal.stats[callsTransmitted] ←
CourierInternal.stats[callsTransmitted] + 1;
-- a call counts as local when the remote net is the null network number
-- or equals the net of the local system element
IF (ch.object.remote.net = System.nullNetworkNumber) OR
(ch.object.remote.net = Courier.LocalSystemElement[].net) THEN
CourierInternal.stats[localCallsTransmitted] ←
CourierInternal.stats[localCallsTransmitted] + 1;
END;
--Bulk data, version 2
IF streamCheckoutProc # NIL THEN
BEGIN
<<
Bulk stream is already set in the object, here we are just allowing
client to use it. This is really a crock!
>>
-- stream is marked `out` only for the duration of the client procedure,
-- then re-locked as `busy` and its SST set back to dataSST
ch.streamState ← out; ch.lastSST ← CourierProtocol.dataSST;
sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
streamCheckoutProc[cH];
Process.CancelAbort[Process.GetCurrent[]];
ch.streamState ← busy;
sH.setSST[sH, CourierProtocol.dataSST];
END;
sH.options ← CourierInternal.inputOptions;
-- the stream timeout is timeoutInSeconds * 1000; this test keeps that
-- product from exceeding LAST[LONG CARDINAL]
IF timeoutInSeconds > maxSecondsWaitable THEN
BEGIN
ch.clock ← [LAST[LONG CARDINAL]];
sH.setTimeout[sH, LAST[LONG CARDINAL]];
END
ELSE
BEGIN
ch.clock ← [System.GetGreenwichMeanTime[] + timeoutInSeconds];
sH.setTimeout[sH, timeoutInSeconds * 1000];
END;
<<
IF ch.versExchProc is NIL then the version exchange is complete. If it
isn't then we still have to deal with this situation.
>>
-- the second arm calls the pending exchange procedure and leaves through
-- `delete` when it returns anything other than noError
SELECT TRUE FROM
(ch.versExchProc = NIL) => NULL; --this has already been done
((error ← ch.versExchProc[ch]) # noError) => GOTO delete;
ENDCASE; --if it endcased then it passed the version test
BEGIN
-- ABORTED raised while reading the reply header leaves through `aborted`,
-- which deletes the stream and raises ABORTED again
ENABLE ABORTED => GOTO aborted;
--Wait for return message to arrive
ch.endRecord ← FALSE; --just in case it was left over
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
-- NOTE(review): the literal 4 is the stop index of the block read into
-- protocol3Body; presumably the byte length of the reply header. Confirm.
CourierOps.GetBlock[ch, [@protocol3Body, 0, 4]];
ch.clock ← [LAST[LONG CARDINAL]]; --disallow 'returnTimedOut'
sH.setTimeout[sH, CourierInternal.activeTimeout];
-- only a `return` reply falls through; the other arms raise errors
WITH p3b: protocol3Body SELECT FROM
return => NULL;
abort => P3Abort[ch, @p3b];
reject => P3Reject[ch, @p3b];
ENDCASE => ERROR Courier.Error[invalidMessage];
END;
ENDCASE;
END; --enable aborted
--only return message types get here...all others signal and never return
IF CourierInternal.doStats THEN
CourierInternal.stats[returnsReceived] ←
CourierInternal.stats[returnsReceived] + 1;
--now receive and store the client's results
-- if Store is unwound, the rest of the message is flushed first
IF (results.location # NIL) AND (results.description # NIL) THEN
CourierOps.Store[ch, results.location, results.description !
UNWIND => [] ← CourierOps.FlushToEndOfMessage[ch]];
-- a TRUE result from the flush is reported as parameterInconsistency
IF CourierOps.FlushToEndOfMessage[ch] THEN
ERROR Courier.Error[parameterInconsistency];
--Bulk data, version 1
IF requestDataStream THEN
BEGIN --the bulkFilter.object has already been set.
-- the caller keeps the stream: sH is returned and recorded in ch.object
ch.streamState ← out;
sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
ch.object.sH ← sH;
END
ELSE
BEGIN
--transport is not connection oriented, delete stream now.
IF connectionless THEN CourierOps.DeleteStream[ch]
ELSE CourierOps.SetIdleWatcher[ch]; --idles the stream
sH ← NIL; --client gets nothing if he doesn't ask
END;
EXITS
-- both exits delete the stream before raising
delete => {CourierOps.DeleteStream[ch]; ERROR Courier.Error[error]};
aborted => {CourierOps.DeleteStream[ch]; ERROR ABORTED};
END; --encompasses entire module
END; --Call
VersExchProc: CourierInternal.VersExchProcType =
  {
  -- Completes the protocol version exchange that Call starts when it creates
  -- a transport. Reads the peer's protocol range from the stream and sets
  -- `code` to noError when pvEqual lies inside that range, protocolMismatch
  -- otherwise. The procedure then unhooks itself from the connection so the
  -- exchange is not repeated on later calls.
  peerRange: CourierProtocol.ProtocolRange;
  -- begin reading as though no end-of-record had been seen
  ch.endRecord ← FALSE;
  CourierOps.GetBlock[
    ch, [@peerRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
  IF pvEqual IN [peerRange.low..peerRange.high] THEN code ← noError
  ELSE code ← protocolMismatch;
  -- the peer may have ended the record right after its version numbers
  ch.endRecord ← FALSE;
  -- exchange finished: clear the hook that Call tests before each reply
  NARROW[ch, CourierInternal.UserConnection].versExchProc ← NIL;
  }; --VersExchProc
P3Abort: PROC [
  ch: CourierInternal.UserConnection,
  abort: LONG POINTER TO abort CourierProtocol.Protocol3Body] =
  {
  -- The remote procedure answered with an "abort" message. Read the error
  -- number that follows the reply header, count the abort when statistics are
  -- enabled, and raise Courier.RemoteErrorSignalled. The error carries
  -- StoreAbortArguments, which the client can call to extract the data that
  -- accompanies the error; it reads the message the same way the results of a
  -- normal return are read. This procedure never returns normally.
  StoreAbortArguments: Courier.Arguments =
    --PROC[argumentsRecord: Courier.Parameters];
    {
    -- nothing is stored unless both a location and a description are given
    IF argumentsRecord.location # NIL THEN
      {IF argumentsRecord.description # NIL THEN
        CourierOps.Store[
          ch, argumentsRecord.location, argumentsRecord.description]};
    -- a TRUE result from the flush is reported as parameterInconsistency
    IF CourierOps.FlushToEndOfMessage[ch] THEN
      ERROR Courier.Error[parameterInconsistency];
    };
  -- fetch the rest of the abort message: the error number itself
  CourierOps.GetBlock[ch, [@abort.abort, 0, SIZE[CARDINAL] * bpw]];
  IF CourierInternal.doStats THEN
    CourierInternal.stats[abortsReceived] ←
      CourierInternal.stats[abortsReceived] + 1;
  ERROR Courier.RemoteErrorSignalled[abort.abort, StoreAbortArguments];
  };
P3Reject: PROC[
  ch: CourierInternal.UserConnection,
  reject: LONG POINTER TO reject CourierProtocol.Protocol3Body] =
  {
  -- The remote system answered with a "reject" message. Read the reject code,
  -- read the supported version range when the code is noSuchVersionNumber,
  -- discard whatever remains of the message, count the reject when statistics
  -- are enabled, and finally raise the error that corresponds to the code.
  -- This procedure never returns normally.
  CourierOps.GetBlock[
    ch, [@reject.rejectBody, 0, SIZE[CourierProtocol.RejectCode] * bpw]];
  -- only a version rejection carries extra data: the acceptable range
  WITH mismatch: reject SELECT FROM
    noSuchVersionNumber =>
      CourierOps.GetBlock[
        ch, [@mismatch.range, 0, SIZE[Courier.VersionRange] * bpw]];
    ENDCASE;
  -- the result of the flush is deliberately ignored here
  [] ← CourierOps.FlushToEndOfMessage[ch];
  IF CourierInternal.doStats THEN
    CourierInternal.stats[rejectsReceived] ←
      CourierInternal.stats[rejectsReceived] + 1;
  -- translate the reject code into the client-visible error
  WITH why: reject SELECT FROM
    noSuchProgramNumber => ERROR Courier.Error[noSuchProgramNumber];
    noSuchVersionNumber => ERROR Courier.VersionMismatch[why.range];
    noSuchProcedureValue => ERROR Courier.Error[noSuchProcedureNumber];
    invalidArguments => ERROR Courier.Error[invalidArguments];
    ENDCASE => ERROR Courier.Error[unknownErrorInRemoteProcedure];
  };
END..... -- of CourierImplU.mesa
LOG
30-Jan-85 19:55:55 AOF Post Klamath.
29-Jun-84 10:23:39 SMA Factor Courier from NetworkStreams.
2-Jul-84 16:33:35 SMA Get rid of ch.sH.
9-Oct-84 12:59:06 SMA Remove dependency on NSTypes.WaitTime.
26-Nov-84 15:08:21 SMA Version arbitration back into Courier.
29-Nov-84 9:32:57 SMA Delete stream after call if transport is connectionless.
21-Dec-84 13:08:06 SMA AugmentedStreams and new interfaces.
31-Dec-84 8:29:12 AOF SetSST before placing call on old transport.
7-Jan-85 17:05:04 SMA No more callerAborted translating.
10-Jan-85 14:36:21 SMA bulkFilter set when transport created.
30-Jan-85 19:56:07 AOF Code byte squeezing.
5-Feb-85 18:06:48 SMA Don't try to delete stream when error is invalidHandle.
12-Mar-85 10:57:28 AOF Moved createTimeout and hopWeight to CourierImplM.
8-Apr-85 16:59:47 AOF Delete stream when client aborts waiting for results.
31-May-85 16:50:14 AOF Check Courier.errorCode for parmInc before delete sH.
12-Nov-86 15:30:23 AOF P3.Abort: First get the error code, then test it
16-Jan-87 10:14:10 AOF Removal of Courier Version 2.0
23-Oct-87 19:03:14 AOF And then try to figure out a way to support vers exch.