// GrapevineLocate.bcpl
// Copyright Xerox Corporation 1981, 1982, 1983
// Last modified September 19, 1983  9:37 AM by Taft

get "Pup0.decl"
get "Pup1.decl"
get "Grapevine.decl"
get "GrapevineProtocol.decl"
get "GrapevineInternal.decl"

external
[
// outgoing procedures
FindServer

// incoming procedures
ReadRList; DestroyRList; ReadRString; GVCreateStream
OpenLevel1Socket; CloseLevel1Socket; CompletePup
GetPBI; ReleasePBI; SetPupDPort; LocateNet
EnumeratePupAddresses
InitializeContext; Enqueue; Dequeue; Unqueue; QueueLength
StringCompare
Block; Dismiss; SetTimer; TimerHasExpired
Allocate; Free; MoveBlock; Zero

// incoming statics
@gus; pupCtxQ; ndbQ
]

structure ARec:	// ATable record for R-Server probing
[
requestPort @Port // port to prod
hops word	// distance from here; zero if preSorted
// The replyPort doesn't have anything to do with the corresponding requestPort.
// They are kept in the same data structure in order to avoid having two separate
// tables.  (The number of potential replies is essentially the same as
// the number of requests.)
replyPort @Port
]
manifest lenARec = size ARec/16

structure ATable: // Address table for R-Server probing
[
count word	// number of ARecs
maxCount word	// maximum number of ARecs that will fit in ATable
preSorted word	// true iff ARecs are already sorted by hop count
nReplies word	// number of replies received
rec↑0,0 @ARec
]
manifest lenATableHeader = offset ATable.rec/16

//----------------------------------------------------------------------------
let FindServer(serviceName, pollingSocket, proc, arg) = valof
//----------------------------------------------------------------------------
// Attempts to find an instance of the service with the specified name.
// If serviceName contains a ".", treats it as an RName (which must be
// a Grapevine group) and attempts to locate the nearest functioning instance
// of the service among the group's members (which must be individuals
// whose connect sites are valid Pup address constants).
// If serviceName does not contain a ".", treats it as an NLS name and
// attempts to locate the nearest functioning instance of the service without
// consulting Grapevine (a local broadcast is also issued).
// In either case, for each potential instance of the service, calls proc(port, arg),
// which should attempt to open a connection to the given port and return true
// if successful and false if not (arg may be used to communicate additional
// parameters and/or results).  Proc should at least default and perhaps
// unconditionally set the port's socket field to the appropriate well-known
// socket number before using it, since in general the service socket is distinct
// from the polling socket.  Note that proc will be called repeatedly until either
// it returns true or the list of potential instances is exhausted.

// If FindServer is successful, it returns zero; if unsuccessful, it returns one of
// ecBadRName (there is no such service) or ecAllDown (can't contact any instance of
// the service).

// The service to be located must respond to EchoMe requests on a well-known
// socket, the low 16 bits of which are given as pollingSocket and the
// high 16 bits of which are gus>>GUS.socketHigh (zero except when testing).  Any
// service (Grapevine or non-Grapevine) obeying this convention may be located,
// regardless of whether it is named by an RName or an NLS name.

// The caller is assumed NOT to have done a GVClaimStream(), since FindServer may
// call the Grapevine package recursively to read group memberships and such.
// If proc manipulates the stream then it must call GVClaimStream internally.
[
let zone = gus>>GUS.zone
let ec = ecAllDown

// see what kind of name, and set up some defaults
if StringCompare(serviceName, "GV.GV") eq 0 then
   // prevent infinite recursion in the Grapevine name space.  (This should
   // not actually happen, but this test is here anyway for safety.)
   serviceName = "GrapevineRServer"
let haveRName = false
for i = 1 to serviceName>>String.length do
   if serviceName>>String.char↑i eq $. then [ haveRName = true; break ]

// FindServer (cont'd)

// obtain table of connect ports for service
let aTable = 0
test haveRName
   ifnot
      [ // Obtain list of addresses by looking up serviceName as an NLS-name,
      // and also by broadcasting.
      manifest maxAddrs = 10  // we'll take the 10 closest
      manifest lenATable = lenATableHeader + maxAddrs*lenARec
      aTable = Allocate(zone, lenATable); Zero(aTable, lenATable)
      aTable>>ATable.maxCount = maxAddrs
      // First request will be a local broadcast.
      // Remaining requests will be to ports registered in NLS.
      // Note that EnumeratePupAddresses gives them closest first.
      aTable>>ATable.preSorted = true
      AccumulateGVPort(table [ 0; 0; 0 ], aTable)
      EnumeratePupAddresses(serviceName, AccumulateGVPort, aTable)
      ]
   ifso
      [ // obtain the list by enumerating serviceName as a Grapevine group
      // and then obtaining the connect site for each member.
      let rList = ReadRList(serviceName, opReadMembers, lv ec)
      if rList ne 0 then
         [
         let count = QueueLength(lv rList>>RList.queue)
         let lenATable = lenATableHeader + count*lenARec
         aTable = Allocate(zone, lenATable); Zero(aTable, lenATable)
         aTable>>ATable.maxCount = count
         let rItem = rList>>RList.queue.head
         while rItem ne 0 do
            [ // get connect site string for member and convert to address
	    let connect = ReadRString(lv rItem>>RItem.rName, opReadConnect)
	    if connect ne 0 then
	       [
               // suppress usual reachability check, since we will do our own
               // computation based on hop counts later on.
	       let port = vec lenPort
	       if EnumeratePupAddresses(connect, 0, port, true) eq 0 then
	          AccumulateGVPort(port, aTable, nil)
	       Free(zone, connect)
	       ]
	    rItem = rItem>>RItem.next
	    ]
         DestroyRList(rList)
         ]
      ]

if aTable eq 0 resultis ec  // ecBadRName or ecAllDown

// FindServer (cont'd)

// Now things start to get tricky...
// Send Echo packets to each socket in aTable, in order of hop count so as
// to bias the choice to nearby servers.  As replies come back from each
// server, attempt to establish a byte stream to that server; and terminate
// the polling process as soon as this is successful.
// This is done by three processes in order to prevent deadlocks due to
// hogging PBIs.  The Prodder sends out EchoMe Pups to the addresses in
// the table.  The Slurper receives the IAmEcho responses, and marks the
// table.  Meanwhile, the original process reads the table and attempts to
// establish connections to the marked addresses.

let soc = vec lenPupSoc
OpenLevel1Socket(soc, 0, 0, true)  // transient default local socket
let nRoutesFound = 0

for try = 1 to 4 do  // try the whole process up to 4 times
   [
   unless nRoutesFound eq aTable>>ATable.count % aTable>>ATable.preSorted do
      // try to get all the hop counts.  Note: hop counts were initialized
      // to zero if preSorted, maxHops+1 otherwise.
      for routeTry = 0 to 10 do
         [
         let dontProbe = (routeTry & 1) eq 0
         for i = 0 to aTable>>ATable.count-1 do
            [
            let aRec = lv aTable>>ATable.rec↑i
            if aRec>>ARec.hops gr maxHops then
               [
               let rte = LocateNet(aRec>>ARec.requestPort.net, dontProbe)
               if rte ne 0 then
	          [
	          aRec>>ARec.hops = rte>>RTE.hops
	          if aRec>>ARec.hops le maxHops then nRoutesFound = nRoutesFound+1
	          ]
	       ]
            ]
         if nRoutesFound eq aTable>>ATable.count %
	  nRoutesFound gr 0 & routeTry ge 2 then break
         unless dontProbe do Dismiss(100)
         ]

   aTable>>ATable.nReplies = 0  // no replies yet
   let nextReplyToExamine = 0

   // create Prodder & Slurper processes
   let ctxTable = vec 1
   for i = 0 to 1 do
      [
      let ctx = InitializeContext(Allocate(zone, 150), 150,
       (i eq 0? Prodder, Slurper), 3)
      ctx!3 = aTable; ctx!4 = soc  // args needed by Prodder & Slurper
      ctx!5 = pollingSocket  // arg needed by Prodder only
      Enqueue(pupCtxQ, ctx)
      ctxTable!i = ctx
      ]

// FindServer (cont'd)

   // collect responses produced by Prodder & Slurper
      [ // repeat
      // wait for reply to be returned by Slurper
      let timer = nil; SetTimer(lv timer, 150)  // 1.5 seconds
      Dismiss(1) repeatuntil aTable>>ATable.nReplies gr nextReplyToExamine %
       TimerHasExpired(lv timer)
      if aTable>>ATable.nReplies eq nextReplyToExamine then break  // timed out

      // have a candidate server; try to access it.
      let port = lv aTable>>ATable.rec↑nextReplyToExamine.replyPort
      nextReplyToExamine = nextReplyToExamine+1
      if proc(port, arg) then [ ec = 0; break ]  // successfully accessed service
      ] repeat

   // destroy Prodder & Slurper processes
   for i = 0 to 1 do
      [ Unqueue(pupCtxQ, ctxTable!i); Free(zone, ctxTable!i) ]

   if ec eq 0 then break  // successfully opened stream
   ]

CloseLevel1Socket(soc)
Free(zone, aTable)

resultis ec  // zero (successful) or ecAllDown
]

//----------------------------------------------------------------------------
and AccumulateGVPort(port, aTable, nil; numargs na) = valof
//----------------------------------------------------------------------------
// 2-argument call is from NLS expansion: don't accumulate specific addresses on
// the directly-connected network, since they will be reached by the initial broadcast.
// 3-argument call is from R-Name expansion: set hops field to maxHops+1.
[
let count = aTable>>ATable.count
if count eq aTable>>ATable.maxCount resultis true
let aRec = lv aTable>>ATable.rec↑count
test na eq 2
   ifso if port>>Port.net eq (ndbQ!0)>>NDB.localNet &
       port>>Port.host ne 0 then resultis false
   ifnot aRec>>ARec.hops = maxHops+1
MoveBlock(lv aRec>>ARec.requestPort, port, lenPort)
aTable>>ATable.count = count+1
resultis false
]

// Prodder and Slurper need not be OEPs, because the processes that execute
// them exist only during the lifetime of FindServer, which is in the
// same module as these.

//----------------------------------------------------------------------------
and Prodder(ctx) be
//----------------------------------------------------------------------------
[
let aTable = ctx!3
let soc = ctx!4
let pollingSocket = ctx!5

manifest lastHops = 3  // treat >3 hops as equivalent to 3
for hops = 0 to lastHops do
   for i = 0 to aTable>>ATable.count-1 do
      [
      let aRec = lv aTable>>ATable.rec↑i
      if aRec>>ARec.hops eq hops %
       (hops eq lastHops & aRec>>ARec.hops gr hops
        & aRec>>ARec.hops le maxHops) then
         [
	 let pbi = GetPBI(soc)
	 SetPupDPort(pbi, lv aRec>>ARec.requestPort)
	 pbi>>PBI.pup.dPort.socket↑1 = gus>>GUS.socketHigh
	 pbi>>PBI.pup.dPort.socket↑2 = pollingSocket
	 CompletePup(pbi, ptEchoMe, pupOvBytes)
	 // space requests 100 ms apart so as to favor earlier
	 // requests and avoid hogging PBIs.
	 Dismiss(10)
	 ]
      ]

Block() repeat
]

//----------------------------------------------------------------------------
and Slurper(ctx) be
//----------------------------------------------------------------------------
[
let aTable = ctx!3
let soc = ctx!4

   [ // repeat
   Block() repeatuntil soc>>PupSoc.iQ.head ne 0
   let pbi = Dequeue(lv soc>>PupSoc.iQ)
   if pbi>>PBI.pup.type eq ptImAnEcho then
      [
      let i = aTable>>ATable.nReplies
      if i uls aTable>>ATable.count then
         [
	 let aRec = lv aTable>>ATable.rec↑i
	 MoveBlock(lv aRec>>ARec.replyPort, lv pbi>>PBI.pup.sPort, lenPort)
         aTable>>ATable.nReplies = i+1
	 ]
      ]
   ReleasePBI(pbi)
   ] repeat
]