-- File: CourierImplS.mesa - last edit:
-- AOF 27-Oct-87 21:15:25
-- Copyright (C) 1984, 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
Courier USING [
Arguments, Description, Dispatcher, Error, ErrorCode,
InvalidArguments, LocalSystemElement, NoSuchProcedureNumber,
Parameters, Results, SignalRemoteError, SystemElement, VersionRange],
CourierInternal USING [
activeTimeout, AugmentedStream, Closed, doStats, ExchWords,
inputOptions, ServerConnection, serverIdleTimeout, SetBulkStream,
SetMessageProtocolVersion, stats, streamDefaultWaitTime],
CourierOps USING [
CreateInternal, DeleteConnection, DeleteStream, Fetch,
FlushToEndOfMessage, GetBlock, PutBlock, SearchForExport, Store],
CourierProtocol USING [
dataSST, Protocol, Protocol3Body, ProtocolRange, pvHigh, RejectCode],
Environment USING [Byte, bytesPerWord],
NetworkStream USING [closeSST],
NSConstants USING [courierSocket],
Process USING [Abort, CancelAbort, GetCurrent],
Stream USING [
Byte, CompletionCode, defaultInputOptions, Handle, InputOptions,
SubSequenceType, TimeOut],
System USING [NetworkAddress, nullNetworkNumber];
CourierImplS: PROGRAM
IMPORTS Process, Courier, CourierInternal, CourierOps, Stream
EXPORTS CourierInternal =
BEGIN
--Local Variables/Constants
bpw: CARDINAL = Environment.bytesPerWord;
exchangeVersionNumbersImmediately: BOOLEAN = TRUE;
pvEqual: CourierProtocol.Protocol = CourierProtocol.pvHigh;
IntersectingRanges: PROC[range: CourierProtocol.ProtocolRange]
RETURNS[BOOLEAN] = INLINE {RETURN[pvEqual IN[range.low..range.high]]};
FlushOutOfBandAttn: PROC[sH: Stream.Handle] =
BEGIN
Process.Abort[Process.GetCurrent[]];
--UNTIL ABORTED-- DO
[] ← sH.waitAttention[sH ! ABORTED => EXIT]; ENDLOOP;
END; --FlushOutOfBandAttn
Receiver: PUBLIC PROC [
aH: CourierInternal.AugmentedStream, remote: System.NetworkAddress,
myPv: CourierProtocol.ProtocolRange, connectionless: BOOLEAN] =
BEGIN
arguments: Courier.Arguments =
--PROCEDURE [argumentsRecord: Courier.Parameters];
--Remote program's dispatcher calls here to get the procedure arguments.
BEGIN
ENABLE
BEGIN
UNWIND => [] ← CourierOps.FlushToEndOfMessage[ch];
Stream.TimeOut => {error ← transportTimeout; GOTO stream};
END;
IF (argumentsRecord.location # NIL)
AND (argumentsRecord.description # NIL) THEN
CourierOps.Store[
ch, argumentsRecord.location, argumentsRecord.description];
IF CourierOps.FlushToEndOfMessage[ch] THEN
{error ← parameterInconsistency; GOTO stream};
--the bulkFilter object is already set - here we are just allowing the
--client to use it.
ch.streamState ← out; --until further notice
ch.lastSST ← CourierProtocol.dataSST; --to filter
ch.transFilter.object.options ← Stream.defaultInputOptions;
ch.transFilter.object.setTimeout[
@ch.transFilter.object, CourierInternal.streamDefaultWaitTime];
EXITS stream => Courier.Error[error];
END; --arguments
results: Courier.Results =
--PROCEDURE [
-- resultsRecord: Courier.Parameters,
-- requestDataStream: BOOLEAN]
-- RETURNS [sH: Stream.Handle];
--Remote program's dispatcher calls here to get the procedure arguments.
BEGIN
ENABLE
--Problems with the stream. The signal generated if the stream is
--suspended will proprogate thru here to the client.
Stream.TimeOut=> {error ← transportTimeout; GOTO transportError};
bytes: CARDINAL;
sst: Stream.SubSequenceType;
status: Stream.CompletionCode;
specialOptions: Stream.InputOptions = [
terminateOnEndRecord: TRUE, signalLongBlock: FALSE,
signalShortBlock: FALSE, signalSSTChange: FALSE,
signalEndOfStream: FALSE, signalAttention: FALSE,
signalTimeout: FALSE, signalEndRecord:FALSE];
trash: PACKED ARRAY INTEGER[0..2) OF Environment.Byte;
sH ← @ch.transFilter.object; ch.streamState ← busy;
Process.CancelAbort[ch.owner]; --just in case client ...
sH.setSST[sH, CourierProtocol.dataSST]; --in case he didn't
--UNTIL EXIT | ERROR-- DO
sH.setTimeout[sH, 0]; --set very short timeout
[bytes, status, sst] ← sH.get[
sH, [@trash, 0, LENGTH[trash]], specialOptions];
sH.setTimeout[sH, CourierInternal.activeTimeout];
SELECT TRUE FROM
(status = timeout) => EXIT; --this is as good as it gets
(status = attention) => FlushOutOfBandAttn[sH]; --and this is as bad
(sst = NetworkStream.closeSST) => --caller tired of waiting
{streamClosed ← TRUE; ERROR Courier.Error[callerAborted]};
(sst # CourierProtocol.dataSST) => NULL; --leftover from bulk data?
(bytes > 0) => ERROR Courier.Error[parameterInconsistency];
ENDCASE;
ENDLOOP;
IF ~exchangeVersionNumbersImmediately THEN
CourierOps.PutBlock[ --send the protocol version numbers
ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
protocol3Body ← [return[tid]];
CourierOps.PutBlock[
ch, [@protocol3Body, 0,
SIZE[return CourierProtocol.Protocol3Body] * bpw]];
END;
ENDCASE;
IF CourierInternal.doStats THEN
CourierInternal.stats[returnsTransmitted] ←
CourierInternal.stats[returnsTransmitted] + 1;
IF (resultsRecord.location # NIL) AND (resultsRecord.description # NIL) THEN
CourierOps.Fetch[
ch, resultsRecord.location, resultsRecord.description];
sH.sendNow[sH, TRUE];
IF requestDataStream THEN
BEGIN --the bulkFilter.object has already been set.
ch.streamState ← out;
sH.setTimeout[sH, CourierInternal.streamDefaultWaitTime];
ch.object.sH ← sH;
END
ELSE sH ← NIL;
EXITS transportError => Courier.Error[error];
END; --results
tid: CARDINAL;
streamClosed: BOOLEAN ← FALSE;
error: Courier.ErrorCode ← noError;
ch: CourierInternal.ServerConnection;
BEGIN
ENABLE Stream.TimeOut, CourierInternal.Closed => GOTO error;
procedureNumber: CARDINAL;
range: Courier.VersionRange;
dispatcher: Courier.Dispatcher;
sH: Stream.Handle;
--The transport client has created a transport for us, now create the
--connection object and set the back pointer in the augmented stream object.
ch ← LOOPHOLE[CourierOps.CreateInternal[server, remote]];
ch.streamState ← busy; --it will be real soon
ch.transFilter ← aH;
ch.transFilter.back ← ch;
ch.object.remote ← remote;
ch.owner ← Process.GetCurrent[];
CourierInternal.SetBulkStream[ch];
sH ← @ch.transFilter.object;
ch.object.remote.socket ← NSConstants.courierSocket;
BEGIN
<<
Protocol version arbitration.
There doesn't seem to be any way to piggyback our version numbers with
the return message safely. The version numbers have to go before the
bulk data, but what if the bulk data is sent to us in the first call of
a brand new stream? The server here can't tell that the client is going
to do that. If he's sending (i.e., the other end is receiving) then he
is obligated to change the SST, thus flushing the version numbers.
This is the pits! Let's just force out the damn things and forget it.
Ya, it may be the pits, and it also has to be done. [AOF 26-Oct-87 9:21:58]
>>
ENABLE Courier.Error => GOTO error; --just can't use this stream
CourierOps.GetBlock[ --be careful, we haven't set input options
ch, [@ch.protocolRange, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
IF exchangeVersionNumbersImmediately OR
(ch.protocolRange.low # pvEqual) OR
(ch.protocolRange.high # pvEqual) THEN
BEGIN
CourierOps.PutBlock[
ch, [@myPv, 0, SIZE[CourierProtocol.ProtocolRange] * bpw]];
sH.sendNow[sH, FALSE]; --force the packet out - no EOM set
END;
IF ~IntersectingRanges[ch.protocolRange] THEN GOTO delete; --failure
CourierInternal.SetMessageProtocolVersion[ch, pvEqual];
END;
DO --until connection ends via close or timeout.
ENABLE Courier.Error =>
IF errorCode = parameterInconsistency THEN RETRY ELSE GOTO error;
Process.CancelAbort[ch.owner]; --just in case client ...
IF streamClosed THEN GOTO error; --caller aborted (in receiver)
ch.endRecord ← FALSE; ch.streamState ← busy; --set state of stream
sH.setTimeout[sH, CourierInternal.serverIdleTimeout];
sH.options ← CourierInternal.inputOptions;
sH.setSST[sH, CourierProtocol.dataSST];
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
CourierOps.GetBlock[
ch, [@protocol3Body, 0,
SIZE[call CourierProtocol.Protocol3Body] * bpw]];
WITH protocol3Body SELECT FROM
call =>
BEGIN
ch.object.programNumber ← CourierInternal.ExchWords[program];
ch.object.versionNumber ← version;
procedureNumber ← procedure;
tid ← transaction;
END;
--anything else is invalid
ENDCASE => {[] ← CourierOps.FlushToEndOfMessage[ch]; LOOP};
END;
ENDCASE;
--now we have something to do
sH.setTimeout[sH, CourierInternal.activeTimeout];
IF CourierInternal.doStats THEN
BEGIN
CourierInternal.stats[callsReceived] ←
CourierInternal.stats[callsReceived] + 1;
IF (ch.object.remote.net = System.nullNetworkNumber) OR
(ch.object.remote.net = Courier.LocalSystemElement[].net) THEN
CourierInternal.stats[localCallsReceived] ←
CourierInternal.stats[localCallsReceived] + 1;
END;
[dispatcher, range] ← CourierOps.SearchForExport[ch];
IF dispatcher = NIL THEN
{SendReject[ch, range, noSuchProgramNumber, tid]; LOOP};
IF ch.object.versionNumber ~IN[range.low..range.high] THEN
{SendReject[ch, range, noSuchVersionNumber, tid]; LOOP};
--let client code take over
dispatcher[@ch.object, procedureNumber, arguments, results !
Courier.NoSuchProcedureNumber =>
{SendReject[ch, range, noSuchProcedureValue, tid]; CONTINUE};
Courier.InvalidArguments =>
{SendReject[ch, range, invalidArguments, tid]; CONTINUE};
--Don't UNWIND yet - client may have local frames of interest
Courier.SignalRemoteError =>
{SendAbort[ch, errorNumber, tid, arguments]; CONTINUE}];
--delete stream if transport is connectionless.
IF connectionless THEN GOTO delete;
ENDLOOP;
EXITS delete, error =>
BEGIN
CourierOps.DeleteStream[ch];
CourierOps.DeleteConnection[ch];
END;
END;
END; --Receiver
SendReject: PROCEDURE [
ch: CourierInternal.ServerConnection,
range: Courier.VersionRange,
rejectCode: CourierProtocol.RejectCode,
tid: CARDINAL] =
BEGIN
--Send error message to the remote found in the connection specified.
--These errors are discovered short of receiving the data from the link,
--so the link is in an unknown state requiring we flush the input
--stream's data.
ENABLE Stream.TimeOut, Courier.Error => --Can only be a stream problem.
GOTO error;
msgSize: CARDINAL;
msgAddr: LONG POINTER;
IF CourierInternal.doStats THEN
CourierInternal.stats[rejectsTransmitted] ←
CourierInternal.stats[rejectsTransmitted] + 1;
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
msgAddr ← @protocol3Body;
SELECT rejectCode FROM
noSuchProgramNumber =>
BEGIN
protocol3Body ← [reject[tid, noSuchProgramNumber[]]];
msgSize ← SIZE[
noSuchProgramNumber reject CourierProtocol.Protocol3Body] * bpw;
END;
noSuchProcedureValue =>
BEGIN
protocol3Body ← [reject[tid, noSuchProcedureValue[]]];
msgSize ← SIZE[
noSuchProcedureValue reject CourierProtocol.Protocol3Body] * bpw;
END;
invalidArguments =>
BEGIN
protocol3Body ← [reject[tid, invalidArguments[]]];
msgSize ← SIZE[
invalidArguments reject CourierProtocol.Protocol3Body] * bpw;
END;
noSuchVersionNumber =>
BEGIN
protocol3Body ← [reject[tid, noSuchVersionNumber[range]]];
msgSize ← SIZE[
noSuchVersionNumber reject CourierProtocol.Protocol3Body] * bpw;
END;
ENDCASE;
END;
ENDCASE;
[] ← CourierOps.FlushToEndOfMessage[ch];
CourierOps.PutBlock[ch, [msgAddr, 0, msgSize]];
ch.transFilter.object.sendNow[@ch.transFilter.object, TRUE];
--Ignore problems with the stream. They get noticed on the next get.
EXITS error => NULL;
END; --SendReject
SendAbort: PROCEDURE [
ch: CourierInternal.ServerConnection,
abortCode: CARDINAL, tid: CARDINAL,
arguments: Courier.Parameters] =
BEGIN
--Send error message to the remote found in the connection specified.
--This is an error generated by the remote program. The stream will(?)
--survive as it is.
ENABLE Stream.TimeOut, Courier.Error => --Can only be a stream problem.
GOTO error;
IF CourierInternal.doStats THEN
CourierInternal.stats[abortsTransmitted] ←
CourierInternal.stats[abortsTransmitted] + 1;
ch.streamState ← busy;
ch.transFilter.object.setSST[@ch.transFilter.object, CourierProtocol.dataSST];
WITH ch.message SELECT FROM
protocol3 =>
BEGIN
protocol3Body ← [abort[tid, abortCode]];
CourierOps.PutBlock[ch,
[@protocol3Body, 0, SIZE[abort CourierProtocol.Protocol3Body] * bpw]];
END;
ENDCASE;
IF (arguments.location # NIL) AND (arguments.description # NIL) THEN
CourierOps.Fetch[ch, arguments.location, arguments.description];
ch.transFilter.object.sendNow[@ch.transFilter.object, TRUE];
EXITS error => NULL; --Ignore problems with the stream. They will be
--noticed on the next operation.
END; --SendAbort
END..... --of CourierImplS.mesa
LOG
4-Feb-85 18:34:44 AOF Post Klamath.
29-Jun-84 10:25:01 SMA Factor Courier from NetworkStreams.
28-Nov-84 16:39:01 SMA Delete stream after return if connectionless.
28-Nov-84 16:39:01 SMA AugmentedStreams and new interfaces.
31-Dec-84 8:32:50 AOF SetSST = dataSST before sending results.
10-Jan-85 14:24:23 SMA bulkFilter set when connection and transport created.
10-Jan-85 17:48:06 SMA Delete stream if protocol versions don't intersect.
4-Feb-85 18:35:07 AOF More care about RETRY'ng Courier.Error.
21-Sep-85 12:03:39 AOF Allow bulk data on stream in 'results'.
20-Mar-86 15:43:28 AOF Force version # exchange ~piggybacked
8-Jun-86 17:21:41 AOF Undo ~piggyback version numbers
12-Nov-86 15:36:47 AOF FlushToEOS on UNWIND in arguments proc
16-Jan-87 10:14:31 AOF Removal of Courier Version 2.0
26-Oct-87 11:19:49 AOF And hopefully making it right (sorting version exchs)