nats 源码记录-集群内ROUTER链接建立

环境搭建:

nats源码学习记录--程序启动,使用对应的seed.conf中的配置。

添加新的ROUTER,gnatsd.exe -p 5222 -cluster nats://localhost:5248 -routes nats://0.0.0.0:4248 -D ,gnatsd.exe 为gnatsd程序。

 

服务开启的时候,会搜索本集群 router对应的url,对应于上面信息的nats://0.0.0.0:4248;找到后,就立即启动链接的建立。

func (s *Server) StartRouting(clientListenReady chan struct{}) {
   defer s.grWG.Done()

   // Wait for the client listen port to be opened, and
   // the possible ephemeral port to be selected.
   <-clientListenReady

   // Spin up the accept loop
   ch := make(chan struct{})

//本地监听开启,方便本集群的其他ROUTER发链接请求
   go s.routeAcceptLoop(ch)  
   <-ch

   // Solicit Routes if needed.
//和本集群 内的其他router建立连接
   s.solicitRoutes(s.getOpts().Routes)
}
func (s *Server) solicitRoutes(routes []*url.URL) {
   for _, r := range routes {
      route := r
      fmt.Println("solicitRoutes:", r)
     //针对每个URL分别发出链接请求
      s.startGoRoutine(func() { s.connectToRoute(route, true) })
   }
}

继续往后跟踪会走到一下函数,此函数主要时构建了一个与远端链接的CLIENT,及client对应的读写循环。

func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
   // Snapshot server options.
   opts := s.getOpts()

   didSolicit := rURL != nil
   r := &route{didSolicit: didSolicit}
   for _, route := range opts.Routes {
      if rURL != nil && (strings.EqualFold(rURL.Host, route.Host)) {
         r.routeType = Explicit
      }
   }

//构建用于管理 与远端的链接的CLIENT
   c := &client{srv:s, nc: conn, opts: clientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r}

   // Grab server variables
   s.mu.Lock()
   s.generateRouteInfoJSON()
   infoJSON := s.routeInfoJSON
   authRequired := s.routeInfo.AuthRequired
   tlsRequired := s.routeInfo.TLSRequired
   s.mu.Unlock()

   // Grab lock
   c.mu.Lock()

   // Initialize
   c.initClient()

   if didSolicit {
      // Do this before the TLS code, otherwise, in case of failure
      // and if route is explicit, it would try to reconnect to 'nil'...
      r.url = rURL
   }

。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

   // Do final client initialization

   // Initialize the per-account cache.
   c.in.pacache = make(map[string]*perAccountCache, maxPerAccountCacheSize)
   if didSolicit {
      // Set permissions associated with the route user (if applicable).
      // No lock needed since we are already under client lock.
      c.setRoutePermissions(opts.Cluster.Permissions)
   }

//监测链路是否断开的常用机制
   // Set the Ping timer
   c.setPingTimer()

 //和远端通信需要的读协程
   // Spin up the read loop.
   s.startGoRoutine(c.readLoop)
//向远端写信息协程
   // Spin up the write loop.
   s.startGoRoutine(c.writeLoop)

   把请求发出去,以便和其他ROUTER建立链接
   if didSolicit {
      c.Debugf("Route connect msg sent")
      c.sendConnect(tlsRequired)
   }

  告知对端本地的信息,可以理解为协议握手
   c.sendInfo(infoJSON)
   c.mu.Unlock()

   c.Noticef("Route connection created")
   return c
}

//下面看下接收端,此函数在server启动的时候启动的,用于监听来自集群内的router的请求

func (s *Server) routeAcceptLoop(ch chan struct{}) {
   defer func() {
      if ch != nil {
         close(ch)
      }
   }()

   // Snapshot server options.
   opts := s.getOpts()

   // Snapshot server options.
   port := opts.Cluster.Port

   if port == -1 {
      port = 0
   }
   //tcp协议写法,都比较熟悉
   l, e := net.Listen("tcp", hp)

   s.mu.Lock()
   // For tests, we want to be able to make this server behave
   // as an older server so we use the variable which we can override.
   proto := testRouteProto
   // Check for TLSConfig
   tlsReq := opts.Cluster.TLSConfig != nil
//握手协议信息准备
   info := Info{
      ID:           s.info.ID,
      Version:      s.info.Version,
      GoVersion:    runtime.Version(),
      AuthRequired: false,
      TLSRequired:  tlsReq,
      TLSVerify:    tlsReq,
      MaxPayload:   s.info.MaxPayload,
      Proto:        proto,
      GatewayURL:   s.getGatewayURL(),
   }
   // Set this if only if advertise is not disabled
   if !opts.Cluster.NoAdvertise {
      info.ClientConnectURLs = s.clientConnectURLs
   }
。。。。。。。。。。。。。。。。。。。。。。。。。
   // Setup state that can enable shutdown
   s.routeListener = l
   s.mu.Unlock()

   // Let them know we are up
   close(ch)
   ch = nil

   tmpDelay := ACCEPT_MIN_SLEEP

   for s.isRunning() {
      conn, err := l.Accept()
      if err != nil {
         tmpDelay = s.acceptError("Route", err, tmpDelay)
         continue
      }
      tmpDelay = ACCEPT_MIN_SLEEP
      s.startGoRoutine(func() {
//新的请求到来后,本地创建一个新的router,,,原理同发送端
         s.createRoute(conn, nil)
         s.grWG.Done()
      })
   }
   s.Debugf("Router accept loop exiting..")
   s.done <- true
}

版权声明:本文为wxw1198原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。