-- File: ExpeditedCourierImpl.mesa - last edit:
-- AOF                  4-Mar-87 17:56:28
-- DxG                 30-Aug-85 10:50:18
-- DxW                  4-Mar-87 17:56:20

-- Copyright (C) 1985, 1986, 1987 by Xerox Corporation. All rights reserved.

DIRECTORY
  NSBuffer USING [Buffer],
  ByteBlt USING [StartIndexGreaterThanStopIndexPlusOne],
  Courier USING [
    Description, DeserializeParameters, Error, Parameters,
    SerializeParameters, VersionRange],
  CourierProtocol USING [Protocol3Body, RejectCode],
  Environment USING [Block, PageCount],
  ExpeditedCourier USING [
    AddrList, DispatcherProc, ElapsedTime, ExpandingRingAction, Header, Hop,
    NewRadiusNotifyProc, ResponseProc, RingBound, Service, Services],
  Heap USING [Create, Delete, GetAttributes, systemZone],
  Inline USING [LowHalf],
  MemoryStream USING [Create, Destroy, GetIndex, IndexOutOfRange],
  NSConstants USING [newClearinghouseSocket],
  NSTypes USING [
    bytesPerExchangeHeader, bytesPerIDPHeader, maxDataBytesPerExchange],
  PacketExchange USING [
    Delete, Error, ExchangeClientType, ExchangeHandle, maxBlockLength,
    RequestObject, SetWaitTimes, ExchWords],
  PacketExchangeInternal USING [
    Create, GetFreeSendPacket, SendReplyPacket, WaitForRequestPacket],
  PacketTimer USING [SendPackets, ResponseProc, Checkout],
  Process USING [
    Abort, GetPriority, InitializeCondition, MsecToTicks, Pause, Priority,
    priorityBackground, SetPriority, Ticks],
  Router USING [
    endEnumeration, EnumerateRoutingTable, FillRoutingTable,
    FindDestinationRelativeNetID, GetDelayToNet, infinity,
    NoTableEntryForNet, startEnumeration],
  Socket USING [
    ChannelHandle, Create, Delete, GetPacket, GetSendBuffer, PutPacket,
    BroadcastPacketToAllConnectedNets, ReturnBuffer, SetWaitTime, TimeOut],
  Stream USING [Handle],
  System USING [
    broadcastHostNumber, GetClockPulses, HostNumber, isUtilityPilot,
    Microseconds, NetworkAddress, NetworkNumber, nullNetworkAddress,
    nullNetworkNumber, nullSocketNumber, Pulses, PulsesToMicroseconds,
    SocketNumber];

ExpeditedCourierImpl: MONITOR
  IMPORTS
    ByteBlt, Courier, Heap, Inline, MemoryStream, PacketExchange,
    PacketExchangeInternal, PacketTimer, Process, Router, Socket, System
  EXPORTS ExpeditedCourier =
  BEGIN

-- Compile-time switch consulted by Dispatcher when it sees an unclassified
-- PacketExchange.Error.
doDebug: BOOLEAN = FALSE;

-- Exchange client type stamped on every request and required on every reply.
expeditedCourierExchangeType: PacketExchange.ExchangeClientType =
  clearinghouseService;

-- Socket handed out by GetDefaultSocketNumber.
defaultExpeditedCourierPacketSocket: System.SocketNumber =
  NSConstants.newClearinghouseSocket;

-- Bytes of IDP header plus exchange header that precede the exchange data.
bytesPerExchangePacketHeader: CARDINAL =
  NSTypes.bytesPerIDPHeader + NSTypes.bytesPerExchangeHeader;

-- Returns the default socket number used for expedited Courier packets.
GetDefaultSocketNumber: PUBLIC PROCEDURE RETURNS [System.SocketNumber] =
  BEGIN
  RETURN[defaultExpeditedCourierPacketSocket];
  END;

waitTimeInSeconds: CARDINAL = 40;

-- Base pause between successive transmissions (250 msec in ticks, plus one).
reasonableDelay: Process.Ticks ← Process.MsecToTicks[250] + 1;

-- Broadcast by the RequeueTheBuffer procedures when a send buffer comes back;
-- its timeout is set by the module's mainline code.
bufferFreed: CONDITION;

-- Statistics counters.
expeditedCalls: LONG CARDINAL ← 0;  -- total # of calls
expeditedResponses: LONG CARDINAL ← 0;  -- number of responses
routerGiveups: LONG CARDINAL ← 0;  -- # times router says unreachable

-- Size of each variant of CourierProtocol.Protocol3Body; handed to noteChoice
-- by DescribeHeader.
objects: ARRAY CARDINAL[0..4) OF CARDINAL ← [
  SIZE[call CourierProtocol.Protocol3Body],
  SIZE[reject CourierProtocol.Protocol3Body],
  SIZE[return CourierProtocol.Protocol3Body],
  SIZE[abort CourierProtocol.Protocol3Body] ];

-- Size of each reject sub-variant; handed to noteChoice for a reject body.
rejects: ARRAY CARDINAL[0..4) OF CARDINAL ← [
  SIZE[noSuchProgramNumber reject CourierProtocol.Protocol3Body],
  SIZE[noSuchProcedureValue reject CourierProtocol.Protocol3Body],
  SIZE[invalidArguments reject CourierProtocol.Protocol3Body],
  SIZE[noSuchVersionNumber reject CourierProtocol.Protocol3Body] ];

-- Courier description routine for ExpeditedCourier.Header. Notes the header
-- record, then the body as a choice over "objects"; a call body additionally
-- notes its program number and a reject body its nested choice over "rejects".
DescribeHeader: PUBLIC Courier.Description =
  BEGIN
  hdr: LONG POINTER TO ExpeditedCourier.Header ←
    notes.noteSize[SIZE[ExpeditedCourier.Header]];
  notes.noteChoice[
    @hdr.body, SIZE[CourierProtocol.Protocol3Body], DESCRIPTOR[objects]];
  WITH hdr.body SELECT FROM
    call => notes.noteLongCardinal[@program];  -- fixes AR 13784
    reject =>
      notes.noteChoice[
        @rejectBody, SIZE[reject CourierProtocol.Protocol3Body],
        DESCRIPTOR[rejects]];
    ENDCASE;
  END;

-- Serializes "header" onto the stream rmsH using DescribeHeader.
SerializeHeader: PUBLIC PROCEDURE[
  rmsH: Stream.Handle, header: ExpeditedCourier.Header] =
  BEGIN
  parms: Courier.Parameters ← [@header, DescribeHeader];
  Courier.SerializeParameters[parms, rmsH];
  END;

DeserializeFromBlock: PUBLIC PROCEDURE [
  parms: Courier.Parameters, heap: UNCOUNTED ZONE, blk: Environment.Block]
  RETURNS [succeeded: BOOLEAN ← FALSE] =
  -- Deserializes "parms" from the bytes described by "blk", passing "heap" to
  -- Courier.DeserializeParameters. Returns TRUE on success; the three errors
  -- named in the ENABLE are absorbed and reported as FALSE.
  BEGIN
  ENABLE
    Courier.Error, ByteBlt.StartIndexGreaterThanStopIndexPlusOne,
    MemoryStream.IndexOutOfRange => CONTINUE;  -- leave with succeeded = FALSE
  stream: Stream.Handle ← MemoryStream.Create[blk];
  Courier.DeserializeParameters[
    parms, stream, heap ! UNWIND => MemoryStream.Destroy[stream]];
  MemoryStream.Destroy[stream];  -- normal destroy
  succeeded ← TRUE;  -- only reached when nothing was signalled
  END;

-- fills in call buffer for Packet Exchange packets
PrepareCallBuffer: PROCEDURE[
  b: NSBuffer.Buffer, programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  address: System.NetworkAddress]
  RETURNS [tH: System.Pulses] =
  -- Serializes a Courier call header followed by "arguments" into the
  -- exchange data area of b, then fills in packet length, destination,
  -- exchange type and packet type. The clock reading taken at the end is used
  -- both as the exchange ID and as the returned send time tH.
  BEGIN
  body: Environment.Block ←
    [LOOPHOLE[@b.ns.exchangeWords], 0, NSTypes.maxDataBytesPerExchange];
  stream: Stream.Handle ← MemoryStream.Create[body];
  header: ExpeditedCourier.Header ←
    [body: [call[0, programNumber, versionNumber, procedureNumber]]];
  length: CARDINAL;
  BEGIN
  ENABLE UNWIND => MemoryStream.Destroy[stream];
  SerializeHeader[stream, header];
  Courier.SerializeParameters[arguments, stream];
  length ← Inline.LowHalf[MemoryStream.GetIndex[stream]] - body.startIndex;
  END;
  MemoryStream.Destroy[stream];
  b.ns.pktLength ← length + bytesPerExchangePacketHeader;
  b.ns.destination ← address;
  b.ns.exchangeType ← expeditedCourierExchangeType;
  b.ns.packetType ← packetExchange;
  tH ← System.GetClockPulses[];
  b.ns.exchangeID ← LOOPHOLE[tH];
  END;

-- given two System.Pulses values, returns the elapsed time between them in
-- (approximate) milliseconds: the microsecond count is divided by 1024.
GetElapsedTime: PROCEDURE [sent: System.Pulses, received: System.Pulses]
  RETURNS [ExpeditedCourier.ElapsedTime] = INLINE
  BEGIN
  RETURN[System.PulsesToMicroseconds[[received - sent]] / 1024];
  END;

-- Makes a Courier Call. If the foreign host responds in an understandable
-- form of Courier, the ResponseProc will be called back.
Call: PUBLIC PROCEDURE [
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  address: System.NetworkAddress, response: ExpeditedCourier.ResponseProc ] =
  -- Sends one expedited call to "address" through PacketTimer. Each reply
  -- whose header parses, whose protocol range covers protocol3 and whose body
  -- is reject, return or abort is handed to "response". If the router has no
  -- entry for address.net nothing is sent and routerGiveups is bumped.
  BEGIN
  requestBuffer: NSBuffer.Buffer;  -- output packet
  sH: Stream.Handle ← NIL;  -- deserialization stream
  tH: System.Pulses;  -- low-level timing

  GotAResponse: PacketTimer.ResponseProc =
    BEGIN
    ENABLE
      ByteBlt.StartIndexGreaterThanStopIndexPlusOne, Courier.Error,
      MemoryStream.IndexOutOfRange => CONTINUE;  -- drop unparseable replies
    receptionTime: System.Pulses ← System.GetClockPulses[];
    header: ExpeditedCourier.Header;
    blk: Environment.Block;
    l: CARDINAL;
    -- read through the packet, check the protocol version, and skip over it.
    -- Then return an Environment.Block starting directly afterward.
    l ← replyBuffer.ns.pktLength - bytesPerExchangePacketHeader;
    l ← MIN[l, NSTypes.maxDataBytesPerExchange];
    blk ← [LOOPHOLE[@replyBuffer.ns.exchangeWords], 0, l];  -- Make Block
    sH ← MemoryStream.Create[blk];
    Courier.DeserializeParameters[
      [@header, DescribeHeader], sH, NIL !
      UNWIND => MemoryStream.Destroy[sH]];  --this is only proc that errors
    blk.startIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
    MemoryStream.Destroy[sH];
    IF header.protRange.low<=protocol3 AND header.protRange.high>=protocol3 THEN
      WITH header.body SELECT FROM
        reject, return, abort =>  -- got a procedure return
          [] ← response[hops, GetElapsedTime[tH, receptionTime], header, blk];
        ENDCASE => NULL;
    expeditedResponses ← expeditedResponses + 1;
    END;  -- got a response

  -- mainline code
  expeditedCalls ← expeditedCalls + 1;
  BEGIN
  ENABLE  -- don't send to unreachable nets
    {Router.NoTableEntryForNet => {routerGiveups ← routerGiveups + 1; CONTINUE};};
  hops: CARDINAL ← Router.GetDelayToNet[address.net];  -- unreachable nets
  pxH: PacketExchange.ExchangeHandle ← PacketTimer.Checkout[hops];
  requestBuffer ← PacketExchangeInternal.GetFreeSendPacket[pxH];
  tH ← PrepareCallBuffer[
    requestBuffer, programNumber, versionNumber, procedureNumber,
    arguments, address];
  PacketTimer.SendPackets[pxH, requestBuffer, GotAResponse];
  END;  -- enabled
  END;

SendIt: PRIVATE PROCEDURE[
  b: NSBuffer.Buffer, soc: Socket.ChannelHandle,
  distance: ExpeditedCourier.Hop, pauseTime: Process.Ticks,
  isDirected: BOOLEAN, action: ExpeditedCourier.ExpandingRingAction] =
  -- Transmits buffer b on soc and waits for it to be requeued after every
  -- transmission. isDirected: send to the destination already in b.
  -- distance=0: broadcast to all connected nets. Otherwise: send once to every
  -- net the router enumerates for "distance". reliablyFindAllServers uses
  -- 5 tries in 1 pass with pauseTime between tries; any other action uses
  -- 1 try in each of 2 passes.
  BEGIN
  RequeueTheBuffer: ENTRY PROCEDURE [theBuf: NSBuffer.Buffer] =
    BEGIN
    <<ENABLE UNWIND => NULL;>>
    IF theBuf#b THEN ERROR;
    requeued ← TRUE;
    BROADCAST bufferFreed;
    END;
  RecoverTheBuffer: ENTRY PROCEDURE =
    BEGIN
    <<ENABLE UNWIND => NULL;  --bufferFreed is not abortable>>
    UNTIL requeued DO WAIT bufferFreed ENDLOOP;
    END;
  -- the destination host and socket number of the buffer have been filled in.
  requeued: BOOLEAN;
  tries: CARDINAL ← 1;
  outerLoopCount: CARDINAL ← 2;
  netDelayTime: Process.Ticks ← 1;
  net: System.NetworkNumber ← Router.startEnumeration;
  sendBuffer: PROC[Socket.ChannelHandle, NSBuffer.Buffer] ← Socket.PutPacket;
  IF action=reliablyFindAllServers THEN
    BEGIN
    netDelayTime ← pauseTime;
    tries ← 5;
    outerLoopCount ← 1;
    END;
  b.requeueProcedure ← RequeueTheBuffer;
  FOR i: CARDINAL IN [0 .. outerLoopCount) DO
    SELECT TRUE FROM
      isDirected =>
        FOR t: CARDINAL IN [0 .. tries) DO
          requeued ← FALSE;
          sendBuffer[soc, b];
          RecoverTheBuffer[];
          Process.Pause[netDelayTime];
          ENDLOOP;
      distance=0 =>
        BEGIN
        sendBuffer ← Socket.BroadcastPacketToAllConnectedNets;
        requeued ← FALSE;
        sendBuffer[soc, b];
        RecoverTheBuffer[];
        END;
      ENDCASE =>  -- not directed and distance not zero
        BEGIN
        -- Restart the enumeration on every pass. Without this the second
        -- pass would resume from Router.endEnumeration, which only works if
        -- that sentinel happens to equal Router.startEnumeration.
        net ← Router.startEnumeration;
        WHILE (net ← Router.EnumerateRoutingTable[net, distance]) #
          Router.endEnumeration DO
          b.ns.destination.net ← net;
          FOR t: CARDINAL IN [0 .. tries) DO
            requeued ← FALSE;
            sendBuffer[soc, b];
            RecoverTheBuffer[];
            Process.Pause[netDelayTime];
            ENDLOOP;
          Process.Pause[netDelayTime];
          ENDLOOP;
        END;  -- case that hop is not zero
    Process.Pause[pauseTime];
    ENDLOOP;  -- simple send loop
  END;  -- of procedure SendIt

SendToRing: PUBLIC PROCEDURE[
  b: NSBuffer.Buffer, localSocketHandle: Socket.ChannelHandle,
  ring: ExpeditedCourier.RingBound,
  action: ExpeditedCourier.ExpandingRingAction, isDirected: BOOLEAN,
  newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc]
  RETURNS [continue: BOOLEAN ← TRUE] =
  -- Broadcasts b either to the single net already in b.ns.destination
  -- (isDirected) or to each hop count in [ring.low .. ring.high] in turn.
  -- newRadiusNotify, when not NIL, is called before each radius; a FALSE
  -- result stops the expansion and is returned as "continue". The routing
  -- table is filled to ring.high the first time a non-zero radius is needed
  -- and unfilled (FillRoutingTable[0]) on the way out, including on UNWIND.
  BEGIN
  filled: BOOLEAN ← FALSE;
  -- the destination host and socket number of the buffer have been filled in.
  pauseTime: Process.Ticks ← SELECT action FROM
    findMostServersInShortTime => reasonableDelay,
    ENDCASE => reasonableDelay*7;
  b.ns.destination.host ← System.broadcastHostNumber;
  IF isDirected THEN
    BEGIN
    foundNet: BOOLEAN ← TRUE;
    [] ← Router.GetDelayToNet[b.ns.destination.net !
      Router.NoTableEntryForNet => {foundNet ← FALSE; CONTINUE}];
      -- primes the routing table
    IF foundNet THEN SendIt[b, localSocketHandle, 0, pauseTime, TRUE, action];
    RETURN;
    END;
  {
  ENABLE UNWIND => IF filled THEN Router.FillRoutingTable[0];
  FOR i: ExpeditedCourier.Hop IN [ring.low .. ring.high] DO
    <<
    We don't have to fill the table for local net stuff.  If the client
    is looking for the 'first' answer, then we'll never have to fill the
    table and we'll not incur all that overhead.
    >>
    IF ~filled AND (i # 0) THEN
      {filled ← TRUE; Router.FillRoutingTable[ring.high]};
    IF newRadiusNotify#NIL THEN continue ← newRadiusNotify[i];
    IF NOT continue THEN EXIT;
    SendIt[b, localSocketHandle, i, pauseTime, FALSE, action];
    Process.Pause[pauseTime];
    ENDLOOP;
  };
  IF filled THEN Router.FillRoutingTable[0];  -- unfill routing table
  <<Process.Pause[ring.high*pauseTime];>>
  END;

DirectedBroadcastCall: PUBLIC PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  address: System.NetworkAddress,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  responseBufferCount: CARDINAL ] =
  -- Broadcasts the call to the one net named in "address" and reports each
  -- acceptable reply through eachResponse (see CommonCallBroadcaster).
  BEGIN
  dummyRing: ExpeditedCourier.RingBound;
  -- declared here because wait time depends on the max ring size,
  -- which is defaulted to max hop count.
  CommonCallBroadcaster[
    programNumber, versionNumber, procedureNumber, arguments, dummyRing,
    address, action, eachResponse, NIL, responseBufferCount, TRUE];
  END;

CallToInternetRing: PUBLIC PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  ring: ExpeditedCourier.RingBound, socket: System.SocketNumber,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc,
  responseBufferCount: CARDINAL] =
  -- Expanding-ring broadcast of the call to "socket" over the hop range
  -- "ring" (see CommonCallBroadcaster).
  BEGIN
  -- Start from the null address so net and host are well defined; previously
  -- only the socket field was assigned and the rest was left uninitialized.
  address: System.NetworkAddress ← System.nullNetworkAddress;
  address.socket ← socket;
  CommonCallBroadcaster[
    programNumber, versionNumber, procedureNumber, arguments, ring,
    address, action, eachResponse, newRadiusNotify, responseBufferCount,
    FALSE];
  END;

CallToAddresses: PUBLIC PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  socket: System.SocketNumber, addresses: ExpeditedCourier.AddrList,
  response: ExpeditedCourier.ResponseProc,
  responseBufferCount: CARDINAL ← 5 ] =
  -- Sends the call to every entry of "addresses" (with its socket replaced by
  -- "socket"), up to 9 rounds, from a forked Sender, while this process
  -- collects replies on a private socket. Replies with matching packet type,
  -- exchange type and exchange ID, a parseable header covering protocol3 and
  -- a reject/return/abort body are passed to "response"; a FALSE result ends
  -- the call. The receive loop also ends on a timeout once the Sender is done.
  BEGIN
  sendInProgress, ok, continue: BOOLEAN ← TRUE;
  sendProcess: PROCESS ← NIL;
  localSocket: Socket.ChannelHandle ← Socket.Create[
    System.nullSocketNumber, 1, responseBufferCount];
  requestBuffer: NSBuffer.Buffer ← Socket.GetSendBuffer[localSocket];
  blk: Environment.Block;
  sH: Stream.Handle ← NIL;
  header: ExpeditedCourier.Header;
  l: CARDINAL;
  tH: System.Pulses;
  elpseTime: ExpeditedCourier.ElapsedTime;
  replyBuffer: NSBuffer.Buffer ← NIL;
  hopsToResponder: ExpeditedCourier.Hop;

  -- Releases the stream, stops and joins the Sender, returns both buffers and
  -- deletes the socket. Used on normal exit and on UNWIND.
  FreeResources: PROCEDURE =
    BEGIN
    IF sH # NIL THEN {MemoryStream.Destroy[sH]; sH ← NIL};
    IF sendInProgress AND (sendProcess # NIL) THEN Process.Abort[sendProcess];
    IF sendProcess # NIL THEN {JOIN sendProcess; sendProcess ← NIL};
    IF requestBuffer # NIL THEN
      {Socket.ReturnBuffer[requestBuffer]; requestBuffer ← NIL};
    IF replyBuffer # NIL THEN
      {Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL};
    Socket.Delete[localSocket];
    END;

  Sender: PROCEDURE =
    BEGIN
    requeued: BOOLEAN;
    RequeueTheBuffer: ENTRY PROCEDURE [theBuf: NSBuffer.Buffer] =
      BEGIN
      <<ENABLE UNWIND => NULL;>>
      IF theBuf#requestBuffer THEN ERROR;
      requeued ← TRUE;
      BROADCAST bufferFreed;
      END;
    RecoverTheBuffer: ENTRY PROCEDURE =
      BEGIN
      <<ENABLE UNWIND => NULL;  --bufferFreed is not abortable>>
      UNTIL requeued DO WAIT bufferFreed ENDLOOP;
      END;
    FOR i: CARDINAL IN [0 .. 9) DO
      ENABLE ABORTED => EXIT;
      FOR a: ExpeditedCourier.AddrList ← addresses, a.next UNTIL a=NIL DO
        requestBuffer.ns.destination ← a.address;
        requestBuffer.ns.destination.socket ← socket;
        requestBuffer.requeueProcedure ← RequeueTheBuffer;
        requeued ← FALSE;
        Socket.PutPacket[localSocket, requestBuffer];
        RecoverTheBuffer[];
        IF NOT continue THEN EXIT;
        Process.Pause[reasonableDelay];
        ENDLOOP;
      IF NOT continue THEN EXIT;
      Process.Pause[reasonableDelay*7];
      ENDLOOP;
    sendInProgress ← FALSE;
    END;

  Socket.SetWaitTime[localSocket, 2500];
  {
  ENABLE UNWIND => FreeResources[];
  tH ← PrepareCallBuffer[
    requestBuffer, programNumber, versionNumber, procedureNumber,
    arguments, System.nullNetworkAddress];
  sendProcess ← FORK Sender[];
  WHILE continue DO
    replyBuffer ← Socket.GetPacket[localSocket !
      Socket.TimeOut => IF sendInProgress THEN RETRY ELSE EXIT;
      ABORTED => EXIT];
    l ← replyBuffer.ns.pktLength - bytesPerExchangePacketHeader;
    l ← MIN[l, NSTypes.maxDataBytesPerExchange];
    blk ← [LOOPHOLE[@replyBuffer.ns.exchangeWords], 0, l];
    sH ← MemoryStream.Create[blk];
    ok ← replyBuffer.ns.packetType=packetExchange
      AND replyBuffer.ns.exchangeType=expeditedCourierExchangeType
      AND replyBuffer.ns.exchangeID=requestBuffer.ns.exchangeID;
    IF ok THEN
      Courier.DeserializeParameters[
        [@header, DescribeHeader], sH, NIL !
        MemoryStream.IndexOutOfRange => {ok ← FALSE; CONTINUE};
        Courier.Error => {ok ← FALSE; CONTINUE};
        ByteBlt.StartIndexGreaterThanStopIndexPlusOne => {ok ← FALSE; CONTINUE}];
    l ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
    MemoryStream.Destroy[sH];
    sH ← NIL;
    IF ok AND header.protRange.low<=protocol3
      AND header.protRange.high>=protocol3 THEN
      BEGIN
      blk.startIndex ← l;  -- results start right after the header
      WITH header.body SELECT FROM
        reject, return, abort =>
          BEGIN
          elpseTime ← GetElapsedTime[tH, replyBuffer.fo.time];
          hopsToResponder ← Router.GetDelayToNet[replyBuffer.ns.source.net !
            Router.NoTableEntryForNet =>
              { hopsToResponder ← Router.infinity; CONTINUE }];
          continue ← response[hopsToResponder, elpseTime, header, blk];
          END;
        ENDCASE => NULL;
      END;
    Socket.ReturnBuffer[replyBuffer];
    replyBuffer ← NIL;
    ENDLOOP;
  };  -- enable unwind
  FreeResources[];
  END;

CommonCallBroadcaster: PRIVATE PROCEDURE[
  programNumber: LONG CARDINAL, versionNumber: CARDINAL,
  procedureNumber: CARDINAL, arguments: Courier.Parameters,
  ring: ExpeditedCourier.RingBound, address: System.NetworkAddress,
  action: ExpeditedCourier.ExpandingRingAction,
  eachResponse: ExpeditedCourier.ResponseProc,
  newRadiusNotify: ExpeditedCourier.NewRadiusNotifyProc,
  responseBufferCount: CARDINAL, isDirected: BOOLEAN ] =
  -- Shared engine for DirectedBroadcastCall and CallToInternetRing. A forked
  -- Sender runs SendToRing at reduced priority while this process collects
  -- replies on a private socket and feeds acceptable ones to eachResponse.
  -- The call ends when eachResponse or newRadiusNotify says stop, or when
  -- Finished reports that enough idle waits have passed.
  BEGIN
  Sender: PROCEDURE =
    BEGIN
    myPriority: Process.Priority ← Process.GetPriority[];
    keepGoing: BOOLEAN ← TRUE;
    IF myPriority > Process.priorityBackground THEN
      Process.SetPriority[myPriority - 1];
    keepGoing ← SendToRing[
      requestBuffer, localSocket, ring, action, isDirected, newRadiusNotify !
      ABORTED => CONTINUE];
    -- Only ever clear "continue" from here. Assigning SendToRing's result
    -- directly could store TRUE over a FALSE the receive loop had just
    -- obtained from eachResponse, restarting a call the client cancelled.
    IF NOT keepGoing THEN continue ← FALSE;
    sendInProgress ← FALSE;
    END;

  -- Releases the stream, aborts and joins the Sender, returns both buffers
  -- and deletes the socket. Used on normal exit and on UNWIND.
  FreeResources: PROCEDURE =
    BEGIN
    IF sH # NIL THEN {MemoryStream.Destroy[sH]; sH ← NIL};
    IF sendProcess # NIL THEN
      {Process.Abort[sendProcess]; JOIN sendProcess; sendProcess ← NIL};
    IF requestBuffer # NIL THEN
      {Socket.ReturnBuffer[requestBuffer]; requestBuffer ← NIL};
    IF replyBuffer # NIL THEN
      {Socket.ReturnBuffer[replyBuffer]; replyBuffer ← NIL};
    Socket.Delete[localSocket];
    END;

  finWaitCount: CARDINAL ← 0;

  -- Called each time the receive loop comes up empty. TRUE once the client
  -- has asked to stop, once finWaitCount has reached ring.high, or when the
  -- sender is done on a stand-alone machine; otherwise counts the idle wait
  -- (in steps of 2) and returns FALSE.
  Finished: PROCEDURE RETURNS [finished: BOOLEAN] =
    BEGIN
    IF NOT continue OR finWaitCount>=ring.high
      OR (~sendInProgress AND standAlone) THEN RETURN[TRUE];
    finished ← FALSE;
    finWaitCount ← finWaitCount + 2;
    END;

  tH: System.Pulses;
  blk: Environment.Block;
  sH: Stream.Handle ← NIL;
  sendProcess: PROCESS ← NIL;
  header: ExpeditedCourier.Header;
  replyBuffer: NSBuffer.Buffer ← NIL;
  hopsToResponder: ExpeditedCourier.Hop;
  elpseTime: ExpeditedCourier.ElapsedTime;
  continue, sendInProgress: BOOLEAN ← TRUE;
  localSocket: Socket.ChannelHandle ← Socket.Create[
    System.nullSocketNumber, 1, responseBufferCount];
  requestBuffer: NSBuffer.Buffer ← Socket.GetSendBuffer[localSocket];
  standAlone: BOOLEAN ←
    Router.FindDestinationRelativeNetID[System.nullNetworkNumber] =
      System.nullNetworkNumber;

  Socket.SetWaitTime[localSocket, 2500];
  {
  ENABLE UNWIND => FreeResources[];
  tH ← PrepareCallBuffer[
    requestBuffer, programNumber, versionNumber, procedureNumber,
    arguments, address];
  sendProcess ← FORK Sender[];
  WHILE continue DO
    -- On CONTINUE the assignment is skipped, so replyBuffer keeps the NIL it
    -- was reset to at the end of the previous iteration.
    replyBuffer ← Socket.GetPacket[localSocket !
      Socket.TimeOut => IF sendInProgress THEN RETRY ELSE CONTINUE;
      ABORTED => CONTINUE];
    IF replyBuffer=NIL THEN IF Finished[] THEN EXIT ELSE LOOP;
    blk ← [
      LOOPHOLE[@replyBuffer.ns.exchangeWords], 0,
      MIN[
        (replyBuffer.ns.pktLength - bytesPerExchangePacketHeader),
        NSTypes.maxDataBytesPerExchange]];
    IF replyBuffer.ns.packetType=packetExchange
      AND replyBuffer.ns.exchangeType=expeditedCourierExchangeType
      AND replyBuffer.ns.exchangeID=requestBuffer.ns.exchangeID THEN
      BEGIN
      sH ← MemoryStream.Create[blk];
      BEGIN
      Courier.DeserializeParameters[
        [@header, DescribeHeader], sH, NIL !
        MemoryStream.IndexOutOfRange, Courier.Error,
        ByteBlt.StartIndexGreaterThanStopIndexPlusOne => GOTO notOk];
      blk.startIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
      -- here we ignore protocol version numbers; the response body must be a
      -- "return", "reject" or "abort" body, anything else is dropped.
      WITH header.body SELECT FROM
        return, reject, abort =>
          BEGIN
          elpseTime ← GetElapsedTime[tH, replyBuffer.fo.time];
          hopsToResponder ← Router.GetDelayToNet[replyBuffer.ns.source.net !
            Router.NoTableEntryForNet =>
              {hopsToResponder ← Router.infinity; CONTINUE}];
          continue ← eachResponse[hopsToResponder, elpseTime, header, blk];
          END;
        ENDCASE;
      EXITS notOk => NULL;
      END;
      MemoryStream.Destroy[sH];
      sH ← NIL;
      END;
    Socket.ReturnBuffer[replyBuffer];
    replyBuffer ← NIL;
    ENDLOOP;
  };  -- enable unwind
  FreeResources[];
  END;

-- - - - - - - - - - - - - - - -
-- Server side of the deal...
-- - - - - - - - - - - - - - - -

processesPerService: CARDINAL = 2;  -- Dispatcher processes forked per export
buffersPerService: CARDINAL = 2;  -- buffer count passed to the exchange
requestObjectSize: CARDINAL = SIZE[PacketExchange.RequestObject];
-- Word-array image of a PacketExchange.RequestObject, so that two requests
-- can be compared and cleared as plain arrays.
RequestObject: TYPE = ARRAY [0 .. requestObjectSize) OF WORD;
ProcessInfo: TYPE = RECORD [
  pid: PROCESS,
  currentRequest: RequestObject ];
ExpeditedServiceHandle: PUBLIC TYPE = LONG POINTER TO ExpeditedServiceRecord;
ExpeditedServiceRecord: TYPE = RECORD [
  next: ExpeditedServiceHandle,
  services: ExpeditedCourier.Services,
  pleaseStop: BOOLEAN,
  exH: PacketExchange.ExchangeHandle,
  state: ARRAY [0 .. processesPerService) OF ProcessInfo ];

heap: UNCOUNTED ZONE ← NIL;  -- set by MakeAnExchanger, cleared by FreeAnExchanger
head: ExpeditedServiceHandle ← NIL;  -- list of exported service records

ExportExpeditedPrograms: PUBLIC ENTRY PROCEDURE[
  services: ExpeditedCourier.Services, socket: System.SocketNumber]
  RETURNS[h: ExpeditedServiceHandle] =
  -- Creates an exchanger on "socket", copies "services" into the module heap,
  -- links a new record at the head of the list and forks
  -- processesPerService Dispatcher processes for it.
  BEGIN
  i: CARDINAL ← LENGTH[services];
  sequenceOfService: TYPE = RECORD[
    SEQUENCE COMPUTED CARDINAL OF ExpeditedCourier.Service];
  sequencePtr: LONG POINTER TO sequenceOfService;
  exchanger: PacketExchange.ExchangeHandle ← MakeAnExchanger[socket];
  h ← heap.NEW[ExpeditedServiceRecord];
  h.next ← head;
  head ← h;
  h.pleaseStop ← FALSE;
  sequencePtr ← heap.NEW[sequenceOfService[i]];
  h.services ← DESCRIPTOR[sequencePtr, i];
  FOR j: CARDINAL IN [0 .. i) DO
    h.services[j] ← services[j];
    ENDLOOP;
  h.exH ← exchanger;
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    h.state[j].pid ← FORK Dispatcher[h, j];
    ENDLOOP;
  END;

UnexportExpeditedPrograms: PUBLIC PROCEDURE [h: ExpeditedServiceHandle] =
  -- Stops and joins the Dispatcher processes of h, then, under the monitor,
  -- unlinks the record, frees its storage and releases its exchanger.
  BEGIN
  DoItLocked: ENTRY PROCEDURE =
    BEGIN
    prev, this: ExpeditedServiceHandle;
    -- Remember the exchanger now; the record holding it is freed below.
    exchanger: PacketExchange.ExchangeHandle ← h.exH;
    FOR this ← head, this.next UNTIL this = h DO prev ← this ENDLOOP;
    IF this=head THEN head ← head.next ELSE prev.next ← this.next;
    h.next ← NIL;
    -- Free the nodes BEFORE releasing the exchanger. FreeAnExchanger may
    -- delete the private heap and always clears "heap" on the last release,
    -- so freeing afterwards was impossible and the old code simply skipped
    -- it; that leaked both nodes when the heap is the shared system zone.
    heap.FREE[@h.services.BASE];
    h.services ← DESCRIPTOR[NIL, 0];
    heap.FREE[@h];
    FreeAnExchanger[exchanger];
    END;
  h.pleaseStop ← TRUE;
  PacketExchange.SetWaitTimes[h.exH, 100, 100];
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    Process.Abort[h.state[j].pid];  -- wake them all before waiting to join any
    ENDLOOP;
  FOR j: CARDINAL IN [0 .. processesPerService) DO
    JOIN h.state[j].pid;  -- must do this outside of monitor
    ENDLOOP;
  DoItLocked[];
  END;

exchangerCount: CARDINAL ← 0;  -- number of live exchangers (exports)

MakeAnExchanger: INTERNAL PROCEDURE[
  socket: System.SocketNumber]
  RETURNS[exH: PacketExchange.ExchangeHandle] =
  -- Creates a packet exchange on "socket". On the first call also selects
  -- the module heap: the system zone under UtilityPilot, otherwise a private
  -- heap.
  BEGIN
  SELECT TRUE FROM
    (exchangerCount # 0) => NULL;
    (System.isUtilityPilot) => heap ← Heap.systemZone;
    ENDCASE => heap ← Heap.Create[
      initial: 10, increment: 6, swapUnitSize: 1,
      largeNodeThreshold: 512, checking: FALSE,
      ownerChecking: Heap.GetAttributes[Heap.systemZone].ownerChecking];
  exH ← PacketExchangeInternal.Create[socket, buffersPerService, ];
  exchangerCount ← exchangerCount + 1;
  END;

FreeAnExchanger: INTERNAL PROCEDURE [exH: PacketExchange.ExchangeHandle] =
  -- Deletes the exchange; when it was the last one, also gives up the heap.
  BEGIN
  PacketExchange.Delete[exH];
  exchangerCount ← exchangerCount - 1;
  IF exchangerCount=0 THEN
    BEGIN
    -- Under UtilityPilot "heap" is Heap.systemZone (see MakeAnExchanger),
    -- which this module does not own and must never delete.
    IF ~System.isUtilityPilot THEN Heap.Delete[heap];
    heap ← NIL;
    END;
  END;

Dispatcher: PRIVATE PROCEDURE [myH: ExpeditedServiceHandle, myIndex: CARDINAL] =
  -- Server loop run by each forked process of an export. Waits for a request
  -- packet, parses the Courier header, looks up the service dispatcher for a
  -- "call" body and lets it build the reply in the same buffer. The buffer is
  -- sent back as the reply when all went well, otherwise returned to the pool.
  -- The loop ends when pleaseStop is set, on ABORTED, or on the
  -- PacketExchange.Error reasons listed below.
  BEGIN OPEN myH;
  b: NSBuffer.Buffer;
  theRequestData: PacketExchange.RequestObject;
  sH: Stream.Handle;
  header: ExpeditedCourier.Header;
  serviceDispatcher: ExpeditedCourier.DispatcherProc;
  ok, wasBroadcasted: BOOLEAN;
  dataStartIndex: CARDINAL;
  UNTIL myH.pleaseStop DO
    state[myIndex].currentRequest ← ALL[0];  -- nothing in progress
    ok ← TRUE;
    b ← PacketExchangeInternal.WaitForRequestPacket[
      exH, expeditedCourierExchangeType !
      PacketExchange.Error =>
        SELECT why FROM
          aborted, noReceiverAtDestination, rejectedByReceiver,
          hardwareProblem => EXIT;
          timeout, insufficientResourcesAtDestination => LOOP;
          ENDCASE => IF ~doDebug THEN LOOP;
      ABORTED => EXIT;
      ];
    theRequestData ← [
      nBytes: b.ns.pktLength - bytesPerExchangePacketHeader,
      requestType: b.ns.exchangeType,
      requestorsAddress: b.ns.source,
      requestorsExchangeID: b.ns.exchangeID];
    -- use pkt exch buffer here!!
    wasBroadcasted ← b.ns.destination.host=System.broadcastHostNumber;
    sH ← MemoryStream.Create[[@b.ns.exchangeBytes, 0, theRequestData.nBytes]];
    Courier.DeserializeParameters[
      [@header, DescribeHeader], sH, NIL !
      MemoryStream.IndexOutOfRange => {ok ← FALSE; CONTINUE};
      Courier.Error => {ok ← FALSE; CONTINUE};
      ByteBlt.StartIndexGreaterThanStopIndexPlusOne => {ok ← FALSE; CONTINUE}];
    dataStartIndex ← Inline.LowHalf[MemoryStream.GetIndex[sH]];
    MemoryStream.Destroy[sH];
    ok ← ok AND header.protRange.low<=protocol3
      AND header.protRange.high>=protocol3;
    IF ok THEN
      WITH header.body SELECT FROM
        call =>
          BEGIN
          [ok, serviceDispatcher, program] ← FindServiceDispatcher[
            program, version, procedure, LOOPHOLE[LONG[@theRequestData]],
            myH, myIndex];
          IF ok THEN
            BEGIN
            -- note this is same storage passed to client
            sH ← MemoryStream.Create[
              [@b.ns.exchangeBytes, 0, PacketExchange.maxBlockLength]];
            ok ← serviceDispatcher[
              program, version, procedure,
              [@b.ns.exchangeBytes, dataStartIndex, theRequestData.nBytes],
              sH, wasBroadcasted];
            b.ns.pktLength ← Inline.LowHalf[
              MemoryStream.GetIndex[sH]] + bytesPerExchangePacketHeader;
            MemoryStream.Destroy[sH];
            END;  -- all was ok
          END;  -- header was a "call" header
        ENDCASE => ok ← FALSE;  -- header was not a "call" header
    IF ok AND (NOT myH.pleaseStop) THEN
      BEGIN
      ENABLE PacketExchange.Error, ABORTED => CONTINUE;
      --b ← PacketExchangeInternal.GetFreeSendPacket[exH];
      --b.ns.pktLength ← replyBlock.stopIndexPlusOne + bytesPerExchangePacketHeader;
      --b.ns.packetType ← packetExchange;
      --b.ns.exchangeType ← expeditedCourierExchangeType;
      --b.ns.exchangeID ← rH.requestorsExchangeID;
      b.ns.destination ← b.ns.source;
      PacketExchangeInternal.SendReplyPacket[exH, b];
      LOOP;  -- buffer was handed to the exchange; skip the ReturnBuffer below
      END;
    Socket.ReturnBuffer[b];
    b ← NIL;
    ENDLOOP;
  END;

FindServiceDispatcher: PRIVATE ENTRY PROCEDURE[
  program: LONG CARDINAL, version: CARDINAL, procedure: CARDINAL,
  currentRequest: LONG POINTER TO RequestObject,
  h: ExpeditedServiceHandle, index: CARDINAL]
  RETURNS [
    ok: BOOLEAN, proc: ExpeditedCourier.DispatcherProc,
    theRealProgram: LONG CARDINAL] =
  -- Looks for a service matching program/version, first in h and then in
  -- every exported record. A match on the service's bindRequestProcedure is
  -- accepted at once. Any other match is accepted only if no process of h is
  -- already working on an identical request, in which case the request is
  -- recorded in h.state[index]. theRealProgram is the program number as the
  -- matching service declares it, or "program" itself when nothing matched.
  BEGIN
  FindProgramInHandle: PROCEDURE [exH: ExpeditedServiceHandle]
    RETURNS [
      isBindProc: BOOLEAN ← FALSE,
      proc: ExpeditedCourier.DispatcherProc ← NIL] =
    BEGIN OPEN exH↑;
    currentProgram: LONG CARDINAL;
    FOR i: CARDINAL IN [0 .. LENGTH[services]) DO
      currentProgram ← services[i].programNumber;
      -- the ExchWords comparison is the backward hack for AR 13784
      IF (program=currentProgram
        OR PacketExchange.ExchWords[program]=currentProgram)
        AND services[i].versionRange.low<=version
        AND services[i].versionRange.high>=version THEN
        BEGIN
        theRealProgram ← currentProgram;  -- backward hack for AR 13784
        proc ← services[i].dispatcher;
        isBindProc ← procedure=services[i].bindRequestProcedure;
        RETURN;
        END;
      ENDLOOP;
    END;
  currentH: ExpeditedServiceHandle ← head;
  -- Give the result a defined value for the no-match case; the caller stores
  -- it back into the header it is holding.
  theRealProgram ← program;
  [ok, proc] ← FindProgramInHandle[h];
  WHILE proc=NIL AND currentH#NIL DO
    [ok, proc] ← FindProgramInHandle[currentH];
    currentH ← currentH.next;
    ENDLOOP;
  IF proc#NIL AND NOT ok THEN
    -- is really ok only if the request is not already being processed
    BEGIN
    FOR i: CARDINAL IN [0 .. processesPerService) DO
      IF currentRequest↑ = h.state[i].currentRequest THEN RETURN;  -- not ok
      ENDLOOP;
    h.state[index].currentRequest ← currentRequest↑;
    ok ← TRUE;
    END;
  END;

-- mainline code
Process.InitializeCondition[@bufferFreed, Process.MsecToTicks[60]];

END..

LOG
??-???-85 ??:??:?? - DxG - Created file (copied mostly from CourierExpediter)
19-Jun-85 11:24:44 - JxS - Added Copyright. Changed CourierInternal to CourierProtocol. Changes for Socket interface changes.
12-Nov-85  8:17:56 - AOF - Cleanup
17-Jan-86 11:12:27 - AOF - Removed ML deadlock transmitting and requeuing b's
26-Feb-86 14:52:00 - AOF - Don't fill routing table for local net rings
30-May-86 19:24:20 - AOF - New buffer package
11-Feb-87 20:31:12 - AOF - Don't delete instance unless in UNWIND
11-Feb-87 20:31:12 - Woods - Join processes when unexporting (else can end up freeing buffer after buffer pool has been destroyed).