-- File: ExpeditedCourierImpl.mesa - last edit:
-- AOF 4-Mar-87 17:56:28
-- DxG 30-Aug-85 10:50:18
-- DxW 4-Mar-87 17:56:20
-- Copyright (C) 1985, 1986, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
NSBuffer USING [Buffer],
ByteBlt USING [StartIndexGreaterThanStopIndexPlusOne],
Courier USING [Description, DeserializeParameters, Error,
Parameters, SerializeParameters, VersionRange],
CourierProtocol USING [Protocol3Body, RejectCode],
Environment USING [Block, PageCount],
ExpeditedCourier USING [
AddrList, DispatcherProc, ElapsedTime, ExpandingRingAction, Header,
Hop, NewRadiusNotifyProc, ResponseProc, RingBound, Service, Services],
Heap USING [Create, Delete, GetAttributes, systemZone],
Inline USING [LowHalf],
MemoryStream USING [Create, Destroy, GetIndex, IndexOutOfRange],
NSConstants USING [newClearinghouseSocket],
NSTypes USING [
bytesPerExchangeHeader, bytesPerIDPHeader, maxDataBytesPerExchange],
PacketExchange USING [
Delete, Error, ExchangeClientType, ExchangeHandle, maxBlockLength,
RequestObject, SetWaitTimes, ExchWords],
PacketExchangeInternal USING [
Create, GetFreeSendPacket, SendReplyPacket, WaitForRequestPacket],
PacketTimer USING [SendPackets, ResponseProc, Checkout],
Process USING [
Abort, GetPriority, InitializeCondition, MsecToTicks, Pause, Priority,
priorityBackground, SetPriority, Ticks],
Router USING [
endEnumeration, EnumerateRoutingTable, FillRoutingTable,
FindDestinationRelativeNetID, GetDelayToNet,
infinity, NoTableEntryForNet, startEnumeration],
Socket USING [
ChannelHandle, Create, Delete, GetPacket, GetSendBuffer, PutPacket,
BroadcastPacketToAllConnectedNets, ReturnBuffer, SetWaitTime, TimeOut],
Stream USING [Handle],
System USING [
broadcastHostNumber, GetClockPulses, HostNumber, isUtilityPilot,
Microseconds, NetworkAddress, NetworkNumber, nullNetworkAddress,
nullNetworkNumber, nullSocketNumber, Pulses, PulsesToMicroseconds,
SocketNumber];
ExpeditedCourierImpl: MONITOR
IMPORTS
ByteBlt, Courier, Heap, Inline, MemoryStream, PacketExchange,
PacketExchangeInternal, PacketTimer, Process, Router, Socket, System
EXPORTS ExpeditedCourier =
BEGIN
-- Compile-time debugging switch. Dispatcher consults it when
-- WaitForRequestPacket raises a PacketExchange.Error that it does not list
-- explicitly: while this is FALSE the wait is simply retried.
doDebug: BOOLEAN = FALSE;
-- Exchange client type written into every request by PrepareCallBuffer,
-- checked on replies in CallToAddresses and CommonCallBroadcaster, and passed
-- to WaitForRequestPacket by Dispatcher.
expeditedCourierExchangeType: PacketExchange.ExchangeClientType = clearinghouseService;
-- Socket number handed out by GetDefaultSocketNumber.
defaultExpeditedCourierPacketSocket: System.SocketNumber = NSConstants.newClearinghouseSocket;
-- Bytes of IDP header plus exchange header; added to or subtracted from
-- ns.pktLength to convert between packet length and exchange data length.
bytesPerExchangePacketHeader: CARDINAL =
NSTypes.bytesPerIDPHeader + NSTypes.bytesPerExchangeHeader;
-- Returns defaultExpeditedCourierPacketSocket, the socket number this module
-- treats as the default for expedited Courier traffic.
GetDefaultSocketNumber: PUBLIC PROCEDURE RETURNS [System.SocketNumber] =
  BEGIN
  RETURN[defaultExpeditedCourierPacketSocket];
  END;
-- NOTE(review): not referenced anywhere in the visible part of this file.
waitTimeInSeconds: CARDINAL = 40;
-- Basic pause between transmissions: 250 msec converted to ticks, plus one.
-- SendToRing and CallToAddresses also use seven times this value.
reasonableDelay: Process.Ticks ← Process.MsecToTicks[250] + 1;
-- Broadcast by the RequeueTheBuffer procedures and waited on by the
-- RecoverTheBuffer procedures; its timeout is set in the mainline code.
bufferFreed: CONDITION;
expeditedCalls: LONG CARDINAL ← 0; -- total # of calls
expeditedResponses: LONG CARDINAL ← 0; -- number of responses
routerGiveups: LONG CARDINAL ← 0; -- # times router says unreachable
-- Variant sizes handed to notes.noteChoice in DescribeHeader for the body of
-- an ExpeditedCourier.Header, listed in the order call, reject, return, abort.
objects: ARRAY CARDINAL[0..4) OF CARDINAL ← [
SIZE[call CourierProtocol.Protocol3Body],
SIZE[reject CourierProtocol.Protocol3Body],
SIZE[return CourierProtocol.Protocol3Body],
SIZE[abort CourierProtocol.Protocol3Body] ];
-- Variant sizes handed to notes.noteChoice for the reject arm.
-- NOTE(review): presumably noteChoice indexes this table by the variant tag.
-- The Courier standard appears to number the reject codes as program (0),
-- version (1), procedure (2), arguments (3), which differs from the order
-- used here. Confirm against the declaration of CourierProtocol.RejectCode.
rejects: ARRAY CARDINAL[0..4) OF CARDINAL ← [
SIZE[noSuchProgramNumber reject CourierProtocol.Protocol3Body],
SIZE[noSuchProcedureValue reject CourierProtocol.Protocol3Body],
SIZE[invalidArguments reject CourierProtocol.Protocol3Body],
SIZE[noSuchVersionNumber reject CourierProtocol.Protocol3Body] ];
-- Courier description routine for ExpeditedCourier.Header. It is the
-- description paired with a header in every SerializeParameters and
-- DeserializeParameters call made on a header in this module.
-- The identifier notes is not declared here; it comes from the
-- Courier.Description procedure type.
DescribeHeader: PUBLIC Courier.Description =
BEGIN
-- noteSize is told the size of the record and its result is used as a
-- pointer to the header being described.
header: LONG POINTER TO ExpeditedCourier.Header ← notes.noteSize[
SIZE[ExpeditedCourier.Header]];
-- The body is a variant record; objects supplies the size of each variant.
notes.noteChoice[
@header.body, SIZE[CourierProtocol.Protocol3Body], DESCRIPTOR[objects]];
WITH header.body SELECT FROM
-- The program number of a call is noted explicitly as a long cardinal.
call => notes.noteLongCardinal[@program]; -- fixes AR 13784
-- A reject body is itself a variant record, sized by the rejects table.
reject => notes.noteChoice[@rejectBody,
SIZE[reject CourierProtocol.Protocol3Body], DESCRIPTOR[rejects]];
-- return and abort bodies get no further notes.
ENDCASE;
END;
-- Writes header onto the stream rmsH in Courier form, using DescribeHeader
-- as the description of the record layout.
SerializeHeader: PUBLIC PROCEDURE[
  rmsH: Stream.Handle, header: ExpeditedCourier.Header] =
  BEGIN
  headerParms: Courier.Parameters ← [@header, DescribeHeader];
  Courier.SerializeParameters[headerParms, rmsH];
  END;
-- Deserializes parms from the bytes described by blk, allocating from heap.
-- Returns TRUE when deserialization completes. If Courier.Error,
-- MemoryStream.IndexOutOfRange or
-- ByteBlt.StartIndexGreaterThanStopIndexPlusOne is raised, the signal is
-- absorbed and the result keeps its default of FALSE.
-- The temporary memory stream is destroyed on both paths.
DeserializeFromBlock: PUBLIC PROCEDURE [
  parms: Courier.Parameters, heap: UNCOUNTED ZONE, blk: Environment.Block]
  RETURNS [succeeded: BOOLEAN ← FALSE] =
  BEGIN
  ENABLE
    MemoryStream.IndexOutOfRange,
    ByteBlt.StartIndexGreaterThanStopIndexPlusOne,
    Courier.Error => CONTINUE; -- leave with succeeded still FALSE
  stream: Stream.Handle ← MemoryStream.Create[blk];
  Courier.DeserializeParameters[parms, stream, heap !
    UNWIND => MemoryStream.Destroy[stream]]; -- destroy on the error path
  MemoryStream.Destroy[stream]; -- destroy on the normal path
  succeeded ← TRUE;
  END;
-- Fills in buffer b as a packet exchange request carrying a Courier call.
-- The call header (transaction 0, programNumber, versionNumber,
-- procedureNumber) and then arguments are serialized into the exchange data
-- area of b; the packet length, destination, exchange type and packet type
-- are set; and the current clock reading is stored as the exchange ID.
-- Returns that same clock reading in tH.
-- Signals raised while serializing propagate to the caller after the
-- temporary stream has been destroyed.
PrepareCallBuffer: PROCEDURE[
  b: NSBuffer.Buffer, programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  address: System.NetworkAddress] RETURNS [tH: System.Pulses]=
  BEGIN
  dataArea: Environment.Block ← [
    LOOPHOLE[@b.ns.exchangeWords], 0, NSTypes.maxDataBytesPerExchange];
  stream: Stream.Handle ← MemoryStream.Create[dataArea];
  callHeader: ExpeditedCourier.Header ← [
    body: [call[0, programNumber, versionNumber, procedureNumber]]];
  dataBytes: CARDINAL; -- bytes of exchange data actually written
  BEGIN
    ENABLE UNWIND => MemoryStream.Destroy[stream];
    SerializeHeader[stream, callHeader];
    Courier.SerializeParameters[arguments, stream];
    dataBytes ←
      Inline.LowHalf[MemoryStream.GetIndex[stream]] - dataArea.startIndex;
  END;
  MemoryStream.Destroy[stream];
  b.ns.packetType ← packetExchange;
  b.ns.exchangeType ← expeditedCourierExchangeType;
  b.ns.destination ← address;
  b.ns.pktLength ← dataBytes + bytesPerExchangePacketHeader;
  -- the clock reading doubles as the exchange ID and the send timestamp
  tH ← System.GetClockPulses[];
  b.ns.exchangeID ← LOOPHOLE[tH];
  END;
-- Given two System.Pulses values, returns the time elapsed from sent to
-- received. The pulse difference is converted to microseconds and divided
-- by 1024, so the result is only approximately in milliseconds.
GetElapsedTime: PROCEDURE [sent: System.Pulses, received: System.Pulses]
  RETURNS [ExpeditedCourier.ElapsedTime] =
  INLINE
  BEGIN
  elapsed: System.Pulses ← [received - sent];
  RETURN[System.PulsesToMicroseconds[elapsed] / 1024];
  END;
-- Makes a Courier Call. If the foreign host responds in an understandable
-- form of Courier, the ResponseProc will be called back.
-- The call is counted in expeditedCalls. If the router has no table entry for
-- address.net, routerGiveups is incremented and Call returns without sending
-- anything and without calling response.
-- Otherwise the request is built by PrepareCallBuffer and handed to
-- PacketTimer.SendPackets together with the local procedure GotAResponse.
-- NOTE(review): Call is not an ENTRY procedure, so the statistics counters
-- are updated without holding the monitor lock.
Call: PUBLIC PROCEDURE [programNumber: LONG CARDINAL, versionNumber: CARDINAL,
procedureNumber: CARDINAL, arguments: Courier.Parameters,
address: System.NetworkAddress, response: ExpeditedCourier.ResponseProc ] =
BEGIN
requestBuffer: NSBuffer.Buffer; -- output packet
sH: Stream.Handle ← NIL; -- deserialization stream
tH: System.Pulses; -- low-level timing
-- Reply handler passed to PacketTimer.SendPackets. The names replyBuffer and
-- hops used below are not declared in this file; presumably they are the
-- parameters of PacketTimer.ResponseProc.
GotAResponse: PacketTimer.ResponseProc =
BEGIN
-- Any deserialization failure abandons the reply: control leaves this block,
-- so neither response nor the expeditedResponses count is reached.
ENABLE
ByteBlt.StartIndexGreaterThanStopIndexPlusOne, Courier.Error,
MemoryStream.IndexOutOfRange => CONTINUE;
receptionTime: System.Pulses ← System.GetClockPulses[];
header: ExpeditedCourier.Header;
blk: Environment.Block;
l: CARDINAL;
-- read through the packet, check the protocol version, and skip over it.
-- Then return an Environment.Block starting directly afterward.
l ← replyBuffer.ns.pktLength - bytesPerExchangePacketHeader;
-- never look beyond the largest legal exchange data area
l ← MIN[l, NSTypes.maxDataBytesPerExchange];
blk ← [LOOPHOLE[@replyBuffer.ns.exchangeWords], 0, l]; -- Make Block
sH ← MemoryStream.Create[blk];
Courier.DeserializeParameters[[@header, DescribeHeader], sH, NIL !
UNWIND => MemoryStream.Destroy[sH]]; --this is only proc that errors
-- results start at the first byte after the Courier header
blk.startIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
MemoryStream.Destroy[sH];
-- Only replies whose protocol range includes protocol3 are delivered.
IF header.protRange.low<=protocol3 AND header.protRange.high>=protocol3 THEN
WITH header.body SELECT FROM
reject, return, abort => -- got a procedure return
-- the value returned by response is discarded
[] ← response[hops, GetElapsedTime[tH, receptionTime], header, blk];
ENDCASE => NULL;
expeditedResponses ← expeditedResponses + 1;
END; -- got a response
-- mainline code
expeditedCalls ← expeditedCalls + 1;
BEGIN ENABLE -- don't send to unreachable nets
{Router.NoTableEntryForNet => {routerGiveups ← routerGiveups + 1; CONTINUE};};
hops:CARDINAL ← Router.GetDelayToNet[address.net]; -- unreachable nets
-- the exchange handle is selected by the hop count to the destination net
pxH:PacketExchange.ExchangeHandle ← PacketTimer.Checkout[hops];
requestBuffer ← PacketExchangeInternal.GetFreeSendPacket[pxH];
-- tH remembers when the request was stamped, for the elapsed time report
tH ← PrepareCallBuffer[requestBuffer,
programNumber, versionNumber, procedureNumber, arguments, address];
PacketTimer.SendPackets[pxH, requestBuffer, GotAResponse];
END; -- enabled
END;
-- Transmits the prepared request buffer b on socket soc. The destination host
-- and socket number of the buffer have already been filled in by the caller.
--   isDirected:  send b as addressed, tries times.
--   distance=0:  broadcast b once to all directly connected nets.
--   otherwise:   send b, tries times, to every net that the routing table
--                enumeration yields for the given distance.
-- With action=reliablyFindAllServers there is one pass with five tries per
-- destination and pauseTime between tries; otherwise there are two passes
-- with one try each and a single tick between sends. Each pass ends with a
-- pause of pauseTime.
-- After every transmission the procedure waits until RequeueTheBuffer has
-- been called for b before touching the buffer again.
SendIt: PRIVATE PROCEDURE[
  b: NSBuffer.Buffer, soc: Socket.ChannelHandle,
  distance: ExpeditedCourier.Hop, pauseTime: Process.Ticks,
  isDirected: BOOLEAN, action: ExpeditedCourier.ExpandingRingAction] =
  BEGIN
  -- Installed as b.requeueProcedure; presumably invoked by the buffer
  -- machinery once a transmission is finished with the buffer.
  RequeueTheBuffer: ENTRY PROCEDURE [theBuf: NSBuffer.Buffer] =
    BEGIN
    <<ENABLE UNWIND => NULL;>>
    IF theBuf#b THEN ERROR;
    requeued ← TRUE; BROADCAST bufferFreed;
    END;
  -- Blocks until RequeueTheBuffer has run since requeued was last cleared.
  RecoverTheBuffer: ENTRY PROCEDURE =
    BEGIN
    <<ENABLE UNWIND => NULL; --bufferFreed is not abortable>>
    UNTIL requeued DO WAIT bufferFreed ENDLOOP;
    END;
  requeued: BOOLEAN;
  tries: CARDINAL ← 1;
  outerLoopCount: CARDINAL ← 2;
  netDelayTime: Process.Ticks ← 1;
  net: System.NetworkNumber ← Router.startEnumeration;
  sendBuffer: PROC[Socket.ChannelHandle, NSBuffer.Buffer] ← Socket.PutPacket;
  IF action=reliablyFindAllServers THEN
    BEGIN
    netDelayTime ← pauseTime;
    tries ← 5;
    outerLoopCount ← 1;
    END;
  b.requeueProcedure ← RequeueTheBuffer;
  FOR i: CARDINAL IN [0 .. outerLoopCount) DO
    SELECT TRUE FROM
      isDirected =>
        FOR t: CARDINAL IN [0 .. tries) DO
          requeued ← FALSE; sendBuffer[soc, b]; RecoverTheBuffer[];
          Process.Pause[netDelayTime];
          ENDLOOP;
      distance=0 =>
        BEGIN
        sendBuffer ← Socket.BroadcastPacketToAllConnectedNets;
        requeued ← FALSE; sendBuffer[soc, b]; RecoverTheBuffer[];
        END;
      ENDCASE => -- not directed and distance not zero
        BEGIN
        -- Restart the enumeration on every pass. The previous pass left net
        -- equal to Router.endEnumeration, so without this reset the second
        -- pass would revisit the nets only if the start and end markers
        -- happened to have the same value.
        net ← Router.startEnumeration;
        WHILE (net ← Router.EnumerateRoutingTable[
          net, distance]) # Router.endEnumeration DO
          b.ns.destination.net ← net;
          FOR t: CARDINAL IN [0 .. tries) DO
            requeued ← FALSE; sendBuffer[soc, b]; RecoverTheBuffer[];
            Process.Pause[netDelayTime];
            ENDLOOP;
          Process.Pause[netDelayTime];
          ENDLOOP;
        END; -- case that hop is not zero
    Process.Pause[pauseTime];
    ENDLOOP; -- simple send loop
  END; -- of procedure SendIt
-- Sends the prepared request b either to one directed net or to successive
-- rings of nets at hop distances ring.low through ring.high.
-- The destination host of b is forced to the broadcast host number.
-- Directed case: b is sent once through SendIt if the router knows the
-- destination net, and the result is the default TRUE either way.
-- Ring case: before each radius newRadiusNotify, if supplied, is asked
-- whether to go on; a FALSE answer stops the expansion and is returned.
-- The routing table is filled (to ring.high) only once a nonzero radius is
-- reached, and is reset with FillRoutingTable[0] on exit or unwind.
SendToRing: PUBLIC PROCEDURE[
b: NSBuffer.Buffer, localSocketHandle: Socket.ChannelHandle,
ring: ExpeditedCourier.RingBound,
action: ExpeditedCourier.ExpandingRingAction,
isDirected: BOOLEAN, newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc]
RETURNS [continue: BOOLEAN ← TRUE] =
BEGIN
-- TRUE once Router.FillRoutingTable[ring.high] has been called
filled: BOOLEAN ← FALSE;
-- the destination host and socket number of the buffer have been filled in.
-- Every action other than findMostServersInShortTime pauses seven times
-- longer.
pauseTime: Process.Ticks ← SELECT action FROM
findMostServersInShortTime => reasonableDelay,
ENDCASE => reasonableDelay*7;
b.ns.destination.host ← System.broadcastHostNumber;
IF isDirected THEN
BEGIN
foundNet: BOOLEAN ← TRUE;
[] ← Router.GetDelayToNet[b.ns.destination.net
! Router.NoTableEntryForNet => {foundNet ← FALSE; CONTINUE}]; -- primes the routing table
IF foundNet THEN SendIt[b, localSocketHandle, 0, pauseTime, TRUE, action];
RETURN;
END;
{ ENABLE UNWIND => IF filled THEN Router.FillRoutingTable[0];
FOR i: ExpeditedCourier.Hop IN [ring.low .. ring.high] DO
<<
We don't have to fill the table for local net stuff. If the client
is looking for the 'first' answer, then we'll never have to fill the
table and we'll not incur all that overhead.
>>
IF ~filled AND (i # 0) THEN
{filled ← TRUE; Router.FillRoutingTable[ring.high]};
-- give the client a chance to stop before each new radius
IF newRadiusNotify#NIL THEN continue ← newRadiusNotify[i];
IF NOT continue THEN EXIT;
SendIt[b, localSocketHandle, i, pauseTime, FALSE, action];
Process.Pause[pauseTime];
ENDLOOP;
};
IF filled THEN Router.FillRoutingTable[0]; -- unfill routing table
<<Process.Pause[ring.high*pauseTime];>>
END;
-- Makes an expedited call that is broadcast on the single net named in
-- address, reporting each acceptable reply through eachResponse.
-- All of the work is done by CommonCallBroadcaster with isDirected TRUE and
-- no radius notification procedure.
DirectedBroadcastCall: PUBLIC PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  address: System.NetworkAddress,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  responseBufferCount: CARDINAL ] =
  {
  -- Deliberately left with whatever defaults ExpeditedCourier.RingBound
  -- declares: the wait time depends on the maximum ring size, which per the
  -- original author's note defaults to the maximum hop count.
  defaultRing: ExpeditedCourier.RingBound;
  CommonCallBroadcaster[
    programNumber: programNumber, versionNumber: versionNumber,
    procedureNumber: procedureNumber, arguments: arguments,
    ring: defaultRing, address: address, action: action,
    eachResponse: eachResponse, newRadiusNotify: NIL,
    responseBufferCount: responseBufferCount, isDirected: TRUE];
  };
-- Makes an expedited call that is broadcast to the nets lying within the hop
-- distances given by ring, addressed to the given socket on each net.
-- Acceptable replies are reported through eachResponse; newRadiusNotify, if
-- not NIL, is consulted before each new radius. All of the work is done by
-- CommonCallBroadcaster with isDirected FALSE.
CallToInternetRing: PUBLIC PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  ring: ExpeditedCourier.RingBound, socket: System.SocketNumber,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc,
  responseBufferCount: CARDINAL] =
  BEGIN
  -- Start from the null address so that the net and host fields, which are
  -- copied into the packet by PrepareCallBuffer before the send procedures
  -- overwrite them, never hold uninitialized storage.
  address: System.NetworkAddress ← System.nullNetworkAddress;
  address.socket ← socket;
  CommonCallBroadcaster[
    programNumber, versionNumber, procedureNumber, arguments,
    ring, address, action, eachResponse, newRadiusNotify,
    responseBufferCount, FALSE];
  END;
-- Makes the same expedited call to every address on the list addresses,
-- with the socket of each address replaced by the socket argument.
-- A forked Sender process transmits the request; this process collects the
-- replies. Each reply that matches the request (packet type, exchange type
-- and exchange ID), carries a Courier header whose protocol range includes
-- protocol3, and has a reject, return or abort body is passed to response
-- together with the hop count to the responder, the elapsed time, the header
-- and a block that starts just after the header.
-- Collection stops when response returns FALSE, when a receive times out
-- after the Sender has finished, or when this process is aborted.
-- All resources are released by FreeResources on exit or unwind.
CallToAddresses: PUBLIC PROCEDURE[
programNumber: LONG CARDINAL, versionNumber: CARDINAL,
procedureNumber: CARDINAL, arguments: Courier.Parameters,
socket: System.SocketNumber, addresses: ExpeditedCourier.AddrList,
response: ExpeditedCourier.ResponseProc,
responseBufferCount: CARDINAL ← 5 ] =
BEGIN
-- sendInProgress: the Sender has not yet finished its loop
-- ok: the current reply is acceptable so far
-- continue: cleared when response asks for no more replies
sendInProgress, ok, continue: BOOLEAN ← TRUE;
sendProcess: PROCESS ← NIL;
localSocket: Socket.ChannelHandle ← Socket.Create[
System.nullSocketNumber, 1, responseBufferCount];
requestBuffer: NSBuffer.Buffer ← Socket.GetSendBuffer[localSocket];
blk: Environment.Block;
sH: Stream.Handle ← NIL;
header: ExpeditedCourier.Header;
l: CARDINAL;
tH: System.Pulses;
elpseTime: ExpeditedCourier.ElapsedTime;
replyBuffer: NSBuffer.Buffer ← NIL;
hopsToResponder: ExpeditedCourier.Hop;
-- Releases whatever is still held: the stream, the Sender process (aborted
-- first if it is still sending, then joined), both buffers and the socket.
FreeResources: PROCEDURE =
BEGIN
IF sH # NIL THEN {MemoryStream.Destroy[sH]; sH ← NIL};
IF sendInProgress AND (sendProcess # NIL) THEN Process.Abort[sendProcess];
IF sendProcess # NIL THEN {JOIN sendProcess; sendProcess ← NIL};
IF requestBuffer # NIL THEN
{Socket.ReturnBuffer[requestBuffer]; requestBuffer ← NIL};
IF replyBuffer # NIL THEN
{Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL};
Socket.Delete[localSocket];
END;
-- Body of the forked process: up to nine passes over the address list.
Sender: PROCEDURE =
BEGIN
requeued: BOOLEAN;
-- Installed as requestBuffer.requeueProcedure; records that the buffer has
-- been handed back and wakes RecoverTheBuffer.
RequeueTheBuffer: ENTRY PROCEDURE [theBuf: NSBuffer.Buffer] =
BEGIN
<<ENABLE UNWIND => NULL;>>
IF theBuf#requestBuffer THEN ERROR;
requeued ← TRUE; BROADCAST bufferFreed;
END;
-- Blocks until RequeueTheBuffer has run since requeued was last cleared.
RecoverTheBuffer: ENTRY PROCEDURE =
BEGIN
<<ENABLE UNWIND => NULL; --bufferFreed is not abortable>>
UNTIL requeued DO WAIT bufferFreed ENDLOOP;
END;
FOR i: CARDINAL IN [0 .. 9) DO
-- an abort (see FreeResources) ends the sending early
ENABLE ABORTED => EXIT;
FOR a: ExpeditedCourier.AddrList ← addresses, a.next UNTIL a=NIL DO
requestBuffer.ns.destination ← a.address;
requestBuffer.ns.destination.socket ← socket;
requestBuffer.requeueProcedure ← RequeueTheBuffer;
requeued ← FALSE;
Socket.PutPacket[localSocket, requestBuffer];
-- wait for the buffer to come back before readdressing it
RecoverTheBuffer[];
IF NOT continue THEN EXIT;
Process.Pause[reasonableDelay];
ENDLOOP;
IF NOT continue THEN EXIT;
-- longer pause between complete passes over the list
Process.Pause[reasonableDelay*7];
ENDLOOP;
sendInProgress ← FALSE;
END;
Socket.SetWaitTime[localSocket, 2500];
{ ENABLE UNWIND => FreeResources[];
-- The destination is a placeholder here; Sender fills in the real one.
tH ← PrepareCallBuffer[
requestBuffer, programNumber, versionNumber, procedureNumber,
arguments, System.nullNetworkAddress];
sendProcess ← FORK Sender[];
WHILE continue DO
-- A timeout is retried while the Sender is still working; afterwards it
-- ends the collection.
replyBuffer ← Socket.GetPacket[localSocket !
Socket.TimeOut => IF sendInProgress THEN RETRY ELSE EXIT;
ABORTED => EXIT];
l ← replyBuffer.ns.pktLength - bytesPerExchangePacketHeader;
-- never look beyond the largest legal exchange data area
l ← MIN[l, NSTypes.maxDataBytesPerExchange];
blk ← [LOOPHOLE[@replyBuffer.ns.exchangeWords], 0, l];
sH ← MemoryStream.Create[blk];
-- the reply must answer this very request
ok ← replyBuffer.ns.packetType=packetExchange
AND replyBuffer.ns.exchangeType=expeditedCourierExchangeType
AND replyBuffer.ns.exchangeID=requestBuffer.ns.exchangeID;
-- a header that cannot be deserialized just makes the reply unacceptable
IF ok THEN Courier.DeserializeParameters[
[@header, DescribeHeader], sH, NIL !
MemoryStream.IndexOutOfRange => {ok ← FALSE; CONTINUE};
Courier.Error => {ok ← FALSE; CONTINUE};
ByteBlt.StartIndexGreaterThanStopIndexPlusOne => {ok ← FALSE; CONTINUE}];
-- l now becomes the index of the first byte after the header
l ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
MemoryStream.Destroy[sH]; sH ← NIL;
IF ok AND header.protRange.low<=protocol3
AND header.protRange.high>=protocol3 THEN
BEGIN
blk.startIndex ← l;
WITH header.body SELECT FROM
reject, return, abort => BEGIN
elpseTime ← GetElapsedTime[tH, replyBuffer.fo.time];
-- an unknown source net is reported as infinitely far away
hopsToResponder ← Router.GetDelayToNet[replyBuffer.ns.source.net
! Router.NoTableEntryForNet =>
{ hopsToResponder ← Router.infinity; CONTINUE }];
-- the client decides whether more replies are wanted
continue ← response[hopsToResponder, elpseTime, header, blk];
END;
ENDCASE => NULL;
END;
Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL;
ENDLOOP;
}; -- enable unwind
FreeResources[];
END;
-- Common body of DirectedBroadcastCall and CallToInternetRing.
-- A forked Sender process broadcasts the request through SendToRing while
-- this process collects replies on a private socket. Every reply that
-- matches the request (packet type, exchange type and exchange ID) and whose
-- Courier header deserializes with a return, reject or abort body is passed
-- to eachResponse together with the hop count to the responder, the elapsed
-- time, the header and a block that starts just after the header.
-- Collection ends when eachResponse or newRadiusNotify answers FALSE, or
-- when Finished reports TRUE after a receive that produced no packet.
-- All resources are released by FreeResources on exit or unwind.
CommonCallBroadcaster: PRIVATE PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  ring: ExpeditedCourier.RingBound, address: System.NetworkAddress,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc,
  responseBufferCount: CARDINAL, isDirected: BOOLEAN ] =
  BEGIN
  -- Body of the forked process. It runs one priority level below its
  -- creator unless that is already the background priority.
  Sender: PROCEDURE =
    BEGIN
    myPriority: Process.Priority ← Process.GetPriority[];
    keepSending: BOOLEAN ← TRUE;
    IF myPriority > Process.priorityBackground THEN
      Process.SetPriority[myPriority - 1];
    keepSending ← SendToRing[
      requestBuffer, localSocket, ring, action, isDirected, newRadiusNotify !
      ABORTED => CONTINUE];
    -- Only ever clear the shared flag here. Storing the TRUE that SendToRing
    -- returns on normal completion could overwrite a FALSE that the
    -- receiving process has just stored from eachResponse.
    IF NOT keepSending THEN continue ← FALSE;
    sendInProgress ← FALSE;
    END;
  -- Releases whatever is still held: the stream, the Sender process (aborted
  -- and joined), both buffers and the socket.
  FreeResources: PROCEDURE =
    BEGIN
    IF sH # NIL THEN {MemoryStream.Destroy[sH]; sH ← NIL};
    IF sendProcess # NIL THEN
      {Process.Abort[sendProcess]; JOIN sendProcess; sendProcess ← NIL};
    IF requestBuffer # NIL THEN
      {Socket.ReturnBuffer[requestBuffer]; requestBuffer ← NIL};
    IF replyBuffer # NIL THEN
      {Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL};
    Socket.Delete[localSocket];
    END;
  finWaitCount: CARDINAL ← 0;
  -- Called after a receive that produced no packet. Answers TRUE when the
  -- client asked to stop, when enough empty waits have accumulated (the
  -- count grows by two per call and is compared with ring.high), or when
  -- sending is over on a machine with no network number.
  Finished: PROCEDURE RETURNS [finished: BOOLEAN] =
    BEGIN
    IF NOT continue OR finWaitCount>=ring.high OR
      (~sendInProgress AND standAlone) THEN RETURN[TRUE];
    finished ← FALSE;
    finWaitCount ← finWaitCount + 2;
    END;
  tH: System.Pulses;
  blk: Environment.Block;
  sH: Stream.Handle ← NIL;
  sendProcess: PROCESS ← NIL;
  header: ExpeditedCourier.Header;
  replyBuffer: NSBuffer.Buffer ← NIL;
  hopsToResponder: ExpeditedCourier.Hop;
  elpseTime: ExpeditedCourier.ElapsedTime;
  continue, sendInProgress: BOOLEAN ← TRUE;
  localSocket: Socket.ChannelHandle ← Socket.Create[
    System.nullSocketNumber, 1, responseBufferCount];
  requestBuffer: NSBuffer.Buffer ← Socket.GetSendBuffer[localSocket];
  -- TRUE when the router maps the null net number to itself
  standAlone: BOOLEAN ←
    Router.FindDestinationRelativeNetID[System.nullNetworkNumber] =
    System.nullNetworkNumber;
  Socket.SetWaitTime[localSocket, 2500];
  { ENABLE UNWIND => FreeResources[];
  tH ← PrepareCallBuffer[
    requestBuffer, programNumber, versionNumber,
    procedureNumber, arguments, address];
  sendProcess ← FORK Sender[];
  WHILE continue DO
    -- A timeout is retried while the Sender is still working. Afterwards a
    -- timeout or an abort leaves replyBuffer NIL and Finished decides.
    replyBuffer ← Socket.GetPacket[localSocket !
      Socket.TimeOut => IF sendInProgress THEN RETRY ELSE CONTINUE;
      ABORTED => CONTINUE];
    IF replyBuffer=NIL THEN IF Finished[] THEN EXIT ELSE LOOP;
    -- never look beyond the largest legal exchange data area
    blk ← [LOOPHOLE[@replyBuffer.ns.exchangeWords], 0,
      MIN[(replyBuffer.ns.pktLength - bytesPerExchangePacketHeader),
        NSTypes.maxDataBytesPerExchange]];
    -- the reply must answer this very request
    IF replyBuffer.ns.packetType=packetExchange
      AND replyBuffer.ns.exchangeType=expeditedCourierExchangeType
      AND replyBuffer.ns.exchangeID=requestBuffer.ns.exchangeID THEN
      BEGIN
      sH ← MemoryStream.Create[blk];
      BEGIN
        -- a header that cannot be deserialized just drops the reply
        Courier.DeserializeParameters[
          [@header, DescribeHeader], sH, NIL !
          MemoryStream.IndexOutOfRange, Courier.Error,
          ByteBlt.StartIndexGreaterThanStopIndexPlusOne => GOTO notOk];
        blk.startIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
        -- Protocol version numbers are ignored here. Return, reject and
        -- abort bodies are all delivered to the client; a call body is
        -- dropped.
        WITH header.body SELECT FROM
          return, reject, abort =>
            BEGIN
            elpseTime ← GetElapsedTime[tH, replyBuffer.fo.time];
            -- an unknown source net is reported as infinitely far away
            hopsToResponder ← Router.GetDelayToNet[replyBuffer.ns.source.net !
              Router.NoTableEntryForNet =>
                {hopsToResponder ← Router.infinity; CONTINUE}];
            continue ← eachResponse[hopsToResponder, elpseTime, header, blk];
            END;
          ENDCASE;
        EXITS notOk => NULL;
      END;
      MemoryStream.Destroy[sH]; sH ← NIL;
      END;
    Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL;
    ENDLOOP;
  }; -- enable unwind
  FreeResources[];
  END;
-- - - - - - - - - - - - - - - -
-- Server side of the deal...
-- - - - - - - - - - - - - - - -
-- Number of Dispatcher processes forked for each exported set of services.
processesPerService: CARDINAL = 2;
-- Buffer count passed to PacketExchangeInternal.Create for each exchanger.
buffersPerService: CARDINAL = 2;
-- A request is remembered as raw words so that two requests can be compared
-- for equality in FindServiceDispatcher.
requestObjectSize: CARDINAL = SIZE[PacketExchange.RequestObject];
RequestObject: TYPE = ARRAY [0 .. requestObjectSize) OF WORD;
-- Per process state: the process itself and the request it is working on
-- (all zero while it is idle; see Dispatcher).
ProcessInfo: TYPE = RECORD [
pid: PROCESS,
currentRequest: RequestObject ];
ExpeditedServiceHandle: PUBLIC TYPE = LONG POINTER TO ExpeditedServiceRecord;
-- One record per call of ExportExpeditedPrograms, chained through next.
ExpeditedServiceRecord: TYPE = RECORD [
next: ExpeditedServiceHandle,
services: ExpeditedCourier.Services,
pleaseStop: BOOLEAN,
exH: PacketExchange.ExchangeHandle,
state: ARRAY [0 .. processesPerService) OF ProcessInfo ];
-- Zone used for service records; set up by MakeAnExchanger and cleared by
-- FreeAnExchanger.
heap: UNCOUNTED ZONE ← NIL;
-- Head of the list of exported service records.
head: ExpeditedServiceHandle ← NIL;
-- Exports services on the given socket. Creates a packet exchanger, makes a
-- private copy of the services array, links a new service record at the
-- head of the list and forks processesPerService Dispatcher processes.
-- Returns the handle to pass to UnexportExpeditedPrograms.
-- Signals raised by the exchanger creation or by storage allocation
-- propagate to the caller.
-- NOTE(review): if allocation fails after the exchanger was made, the
-- exchanger and its count are not rolled back.
ExportExpeditedPrograms: PUBLIC ENTRY PROCEDURE[
  services: ExpeditedCourier.Services, socket: System.SocketNumber]
  RETURNS[h: ExpeditedServiceHandle] =
  BEGIN
  -- Without an UNWIND catch phrase a signal passing through an ENTRY
  -- procedure leaves the monitor locked, which would block every later
  -- export, unexport and dispatch.
  ENABLE UNWIND => NULL;
  i: CARDINAL ← LENGTH[services];
  sequenceOfService: TYPE = RECORD[
    SEQUENCE COMPUTED CARDINAL OF ExpeditedCourier.Service];
  sequencePtr: LONG POINTER TO sequenceOfService;
  exchanger: PacketExchange.ExchangeHandle;
  -- must come first: MakeAnExchanger also sets up heap
  exchanger ← MakeAnExchanger[socket];
  h ← heap.NEW[ExpeditedServiceRecord];
  h.pleaseStop ← FALSE;
  sequencePtr ← heap.NEW[sequenceOfService[i]];
  h.services ← DESCRIPTOR[sequencePtr, i];
  FOR j: CARDINAL IN [0 .. i) DO h.services[j] ← services[j]; ENDLOOP;
  h.exH ← exchanger;
  -- Link the record only once it is completely filled in, so that a failed
  -- allocation above cannot leave a half built record on the list.
  h.next ← head;
  head ← h;
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    h.state[j].pid ← FORK Dispatcher[h, j];
    ENDLOOP;
  END;
-- Withdraws the services exported under h. The Dispatcher processes are
-- asked to stop, aborted and joined outside the monitor; then, under the
-- monitor lock, the record is unlinked, its storage is freed and its
-- exchanger is released. h must not be used afterwards.
UnexportExpeditedPrograms: PUBLIC PROCEDURE [h: ExpeditedServiceHandle] =
  BEGIN
  DoItLocked: ENTRY PROCEDURE =
    BEGIN
    prev, this: ExpeditedServiceHandle;
    exchanger: PacketExchange.ExchangeHandle;
    -- find h on the list; prev is only meaningful when h is not the head
    FOR this ← head, this.next UNTIL this = h DO prev ← this ENDLOOP;
    IF this=head THEN head ← head.next ELSE prev.next ← this.next;
    h.next ← NIL;
    -- Free the storage while the zone is certainly still there, and only
    -- then release the exchanger, which may give the zone up. The handle
    -- is saved first because freeing h clears it.
    exchanger ← h.exH;
    heap.FREE[@h.services.BASE];
    h.services ← DESCRIPTOR[NIL, 0];
    heap.FREE[@h];
    FreeAnExchanger[exchanger];
    END;
  h.pleaseStop ← TRUE;
  -- shorten the exchange wait times while the dispatchers are being stopped
  PacketExchange.SetWaitTimes[h.exH, 100, 100];
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    Process.Abort[h.state[j].pid]; -- wake them all before waiting to join any
    ENDLOOP;
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    JOIN h.state[j].pid; -- must do this outside of monitor
    ENDLOOP;
  DoItLocked[];
  END;
-- Number of exchangers currently in existence; the zone is set up when the
-- first one is made and given up when the last one is freed.
exchangerCount: CARDINAL ← 0;
-- Creates a packet exchanger on socket. When it is the first one, heap is
-- set up as well: the system zone under utility Pilot, otherwise a newly
-- created private zone. Must be called with the monitor lock held.
MakeAnExchanger: INTERNAL PROCEDURE[
  socket: System.SocketNumber]
  RETURNS[exH: PacketExchange.ExchangeHandle] =
  BEGIN
  SELECT TRUE FROM
    (exchangerCount # 0) => NULL;
    (System.isUtilityPilot) => heap ← Heap.systemZone;
    ENDCASE =>
      heap ← Heap.Create[
        initial: 10, increment: 6, swapUnitSize: 1,
        largeNodeThreshold: 512, checking: FALSE,
        ownerChecking: Heap.GetAttributes[Heap.systemZone].ownerChecking];
  exH ← PacketExchangeInternal.Create[socket, buffersPerService, ];
  exchangerCount ← exchangerCount + 1;
  END;
-- Deletes the exchanger exH. When it was the last one, heap is given up: a
-- private zone is deleted, while the system zone (used under utility Pilot,
-- see MakeAnExchanger) is merely forgotten, since this module does not own
-- it. Must be called with the monitor lock held.
FreeAnExchanger: INTERNAL PROCEDURE [exH: PacketExchange.ExchangeHandle] =
  BEGIN
  PacketExchange.Delete[exH];
  exchangerCount ← exchangerCount - 1;
  IF exchangerCount=0 THEN
    BEGIN
    IF NOT System.isUtilityPilot THEN Heap.Delete[heap];
    heap ← NIL;
    END;
  END;
-- Body of each server process forked by ExportExpeditedPrograms.
-- Until myH.pleaseStop is set it waits for a request packet, deserializes
-- the Courier header, looks up the client dispatcher for a call body through
-- FindServiceDispatcher and lets that dispatcher write its reply into the
-- very packet the request arrived in. When the dispatcher answers TRUE the
-- packet is sent back to its source; otherwise the buffer is returned.
-- myIndex selects this process's slot in myH.state.
Dispatcher: PRIVATE PROCEDURE [myH: ExpeditedServiceHandle, myIndex: CARDINAL] =
BEGIN OPEN myH;
b: NSBuffer.Buffer;
theRequestData: PacketExchange.RequestObject;
sH: Stream.Handle;
header: ExpeditedCourier.Header;
serviceDispatcher: ExpeditedCourier.DispatcherProc;
ok, wasBroadcasted: BOOLEAN;
dataStartIndex: CARDINAL;
UNTIL myH.pleaseStop DO
-- mark this process as idle for the duplicate check
state[myIndex].currentRequest ← ALL[0];
ok ← TRUE;
-- The listed fatal errors and an abort end the process; timeouts and
-- resource shortages just wait again, as does any other error unless
-- doDebug is set.
b ← PacketExchangeInternal.WaitForRequestPacket[
exH, expeditedCourierExchangeType !
PacketExchange.Error => SELECT why FROM
aborted, noReceiverAtDestination, rejectedByReceiver, hardwareProblem => EXIT;
timeout, insufficientResourcesAtDestination => LOOP;
ENDCASE => IF ~doDebug THEN LOOP;
ABORTED => EXIT; ];
-- identity of this request, used to detect one already being processed
theRequestData ← [
nBytes: b.ns.pktLength - bytesPerExchangePacketHeader,
requestType: b.ns.exchangeType,
requestorsAddress: b.ns.source,
requestorsExchangeID: b.ns.exchangeID];
-- use pkt exch buffer here!!
wasBroadcasted ← b.ns.destination.host=System.broadcastHostNumber;
sH ← MemoryStream.Create[[@b.ns.exchangeBytes, 0, theRequestData.nBytes]];
-- a header that cannot be deserialized makes the request unacceptable
Courier.DeserializeParameters[[@header, DescribeHeader], sH, NIL !
MemoryStream.IndexOutOfRange => {ok ← FALSE; CONTINUE};
Courier.Error => {ok ← FALSE; CONTINUE};
ByteBlt.StartIndexGreaterThanStopIndexPlusOne => {ok ← FALSE; CONTINUE}];
-- the arguments start right after the header
dataStartIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
MemoryStream.Destroy[sH];
ok ← ok AND header.protRange.low<=protocol3
AND header.protRange.high>=protocol3;
IF ok THEN WITH header.body SELECT FROM
call => BEGIN
-- program is replaced by the program number actually registered
[ok, serviceDispatcher, program] ← FindServiceDispatcher[
program, version, procedure, LOOPHOLE[LONG[@theRequestData]],
myH, myIndex];
IF ok THEN
BEGIN
-- note this is same storage passed to client
sH ← MemoryStream.Create[
[@b.ns.exchangeBytes, 0, PacketExchange.maxBlockLength]];
-- the client reads its arguments from the block and writes its reply to sH
ok ← serviceDispatcher[
program, version, procedure,
[@b.ns.exchangeBytes, dataStartIndex, theRequestData.nBytes],
sH, wasBroadcasted];
-- the reply is as long as what the client wrote to the stream
b.ns.pktLength ← Inline.LowHalf[
MemoryStream.GetIndex[sH]] + bytesPerExchangePacketHeader;
MemoryStream.Destroy[sH];
END; -- all was ok
END; -- header was a "call" header
ENDCASE => ok ← FALSE; -- header was not a "call" header
IF ok AND (NOT myH.pleaseStop) THEN
-- A failure to send falls out of this block to the ReturnBuffer below.
BEGIN ENABLE PacketExchange.Error, ABORTED => CONTINUE;
--b ← PacketExchangeInternal.GetFreeSendPacket[exH];
--b.ns.pktLength ← replyBlock.stopIndexPlusOne + bytesPerExchangePacketHeader;
--b.ns.packetType ← packetExchange;
--b.ns.exchangeType ← expeditedCourierExchangeType;
--b.ns.exchangeID ← rH.requestorsExchangeID;
b.ns.destination ← b.ns.source;
PacketExchangeInternal.SendReplyPacket[exH, b];
-- the buffer went out with the reply, so it is not returned here
LOOP;
END;
Socket.ReturnBuffer[b]; b ← NIL;
ENDLOOP;
END;
-- Looks up the dispatcher registered for program, version and procedure,
-- first among the services of h and then among those of every record on
-- the list starting at head.
-- Results:
--   proc            the dispatcher found, or NIL when there is none.
--   theRealProgram  the program number as registered; assigned only when a
--                   match is found.
--   ok              TRUE when the request should be dispatched. A request
--                   for the bind procedure of the matching service is
--                   always ok. Any other request is ok only when no process
--                   of h is already working on an identical request, in
--                   which case it is recorded in h.state[index].
FindServiceDispatcher: PRIVATE ENTRY PROCEDURE[
program: LONG CARDINAL, version: CARDINAL, procedure: CARDINAL,
currentRequest: LONG POINTER TO RequestObject, h: ExpeditedServiceHandle,
index: CARDINAL]
RETURNS [
ok: BOOLEAN, proc: ExpeditedCourier.DispatcherProc,
theRealProgram: LONG CARDINAL] =
BEGIN
-- Searches the services of one record. Yields the dispatcher, or NIL, and
-- whether the requested procedure is the bind procedure of that service.
FindProgramInHandle: PROCEDURE [exH: ExpeditedServiceHandle]
RETURNS [isBindProc: BOOLEAN ← FALSE,
proc: ExpeditedCourier.DispatcherProc ← NIL] =
BEGIN OPEN exH↑;
currentProgram: LONG CARDINAL;
FOR i: CARDINAL IN [0 .. LENGTH[services]) DO
currentProgram ← services[i].programNumber;
-- the program number is also accepted with its two words exchanged
IF (program=currentProgram OR PacketExchange.ExchWords[program]=currentProgram) -- backward hack for AR 13784
AND services[i].versionRange.low<=version
AND services[i].versionRange.high>=version THEN
BEGIN
theRealProgram ← currentProgram; -- backward hack for AR 13784
proc ← services[i].dispatcher;
isBindProc ← procedure=services[i].bindRequestProcedure;
RETURN;
END;
ENDLOOP;
END;
currentH: ExpeditedServiceHandle ← head;
-- ok starts out as the answer to whether this is a bind request
[ok, proc] ← FindProgramInHandle[h];
WHILE proc=NIL AND currentH#NIL DO
[ok, proc] ← FindProgramInHandle[currentH];
currentH ← currentH.next;
ENDLOOP;
IF proc#NIL AND NOT ok THEN
-- is really ok only if the request is not already being processed
BEGIN
FOR i: CARDINAL IN [0 .. processesPerService) DO
IF currentRequest↑ = h.state[i].currentRequest THEN RETURN; -- not ok
ENDLOOP;
-- remember the request so that a duplicate can be recognized
h.state[index].currentRequest ← currentRequest↑;
ok ← TRUE;
END;
END;
-- mainline code
-- Module initialization: sets the timeout of bufferFreed to 60 msec worth of
-- ticks. The RecoverTheBuffer procedures wait on this condition in a loop
-- that rechecks their requeued flag after every wakeup.
Process.InitializeCondition[@bufferFreed, Process.MsecToTicks[60]];
END..
LOG
??-???-85 ??:??:?? - DxG - Created file (copied mostly from CourierExpediter)
19-Jun-85 11:24:44 - JxS - Added Copyright. Changed CourierInternal to CourierProtocol. Changes for Socket interface changes.
12-Nov-85 8:17:56 - AOF - Cleanup
17-Jan-86 11:12:27 - AOF - Removed ML deadlock transmitting and requeuing b's
26-Feb-86 14:52:00 - AOF - Don't fill routing table for local net rings
30-May-86 19:24:20 - AOF - New buffer package
11-Feb-87 20:31:12 - AOF - Don't delete instance unless in UNWIND
11-Feb-87 20:31:12 - Woods - Join processes when unexporting (else can end up freeing buffer after buffer pool has been destroyed).