环境搭建:
nats源码学习记录--程序启动,使用对应的seed.conf中的配置。
添加新的ROUTER,gnatsd.exe -p 5222 -cluster nats://localhost:5248 -routes nats://0.0.0.0:4248 -D ,gnatsd.exe 为gnatsd程序。
服务开启的时候,会搜索本集群 router对应的url,对应于上面信息的nats://0.0.0.0:4248;找到后,就立即启动链接的建立。
func (s *Server) StartRouting(clientListenReady chan struct{}) { defer s.grWG.Done() // Wait for the client listen port to be opened, and // the possible ephemeral port to be selected. <-clientListenReady // Spin up the accept loop ch := make(chan struct{}) //本地监听开启,方便本集群的其他ROUTER发链接请求 go s.routeAcceptLoop(ch) <-ch // Solicit Routes if needed. //和本集群 内的其他router建立连接 s.solicitRoutes(s.getOpts().Routes) }
func (s *Server) solicitRoutes(routes []*url.URL) { for _, r := range routes { route := r fmt.Println("solicitRoutes:", r) //针对每个URL分别发出链接请求 s.startGoRoutine(func() { s.connectToRoute(route, true) }) } }
继续往后跟踪会走到一下函数,此函数主要时构建了一个与远端链接的CLIENT,及client对应的读写循环。
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Snapshot server options. opts := s.getOpts() didSolicit := rURL != nil r := &route{didSolicit: didSolicit} for _, route := range opts.Routes { if rURL != nil && (strings.EqualFold(rURL.Host, route.Host)) { r.routeType = Explicit } } //构建用于管理 与远端的链接的CLIENT c := &client{srv:s, nc: conn, opts: clientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r} // Grab server variables s.mu.Lock() s.generateRouteInfoJSON() infoJSON := s.routeInfoJSON authRequired := s.routeInfo.AuthRequired tlsRequired := s.routeInfo.TLSRequired s.mu.Unlock() // Grab lock c.mu.Lock() // Initialize c.initClient() if didSolicit { // Do this before the TLS code, otherwise, in case of failure // and if route is explicit, it would try to reconnect to 'nil'... r.url = rURL } 。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。 // Do final client initialization // Initialize the per-account cache. c.in.pacache = make(map[string]*perAccountCache, maxPerAccountCacheSize) if didSolicit { // Set permissions associated with the route user (if applicable). // No lock needed since we are already under client lock. c.setRoutePermissions(opts.Cluster.Permissions) } //监测链路是否断开的常用机制 // Set the Ping timer c.setPingTimer() //和远端通信需要的读协程 // Spin up the read loop. s.startGoRoutine(c.readLoop) //向远端写信息协程 // Spin up the write loop. s.startGoRoutine(c.writeLoop) 把请求发出去,以便和其他ROUTER建立链接 if didSolicit { c.Debugf("Route connect msg sent") c.sendConnect(tlsRequired) } 告知对端本地的信息,可以理解为协议握手 c.sendInfo(infoJSON) c.mu.Unlock() c.Noticef("Route connection created") return c }
//下面看下接收端,此函数在server启动的时候启动的,用于监听来自集群内的router的请求
func (s *Server) routeAcceptLoop(ch chan struct{}) { defer func() { if ch != nil { close(ch) } }() // Snapshot server options. opts := s.getOpts() // Snapshot server options. port := opts.Cluster.Port if port == -1 { port = 0 } //tcp协议写法,都比较熟悉 l, e := net.Listen("tcp", hp) s.mu.Lock() // For tests, we want to be able to make this server behave // as an older server so we use the variable which we can override. proto := testRouteProto // Check for TLSConfig tlsReq := opts.Cluster.TLSConfig != nil //握手协议信息准备 info := Info{ ID: s.info.ID, Version: s.info.Version, GoVersion: runtime.Version(), AuthRequired: false, TLSRequired: tlsReq, TLSVerify: tlsReq, MaxPayload: s.info.MaxPayload, Proto: proto, GatewayURL: s.getGatewayURL(), } // Set this if only if advertise is not disabled if !opts.Cluster.NoAdvertise { info.ClientConnectURLs = s.clientConnectURLs } 。。。。。。。。。。。。。。。。。。。。。。。。。 // Setup state that can enable shutdown s.routeListener = l s.mu.Unlock() // Let them know we are up close(ch) ch = nil tmpDelay := ACCEPT_MIN_SLEEP for s.isRunning() { conn, err := l.Accept() if err != nil { tmpDelay = s.acceptError("Route", err, tmpDelay) continue } tmpDelay = ACCEPT_MIN_SLEEP s.startGoRoutine(func() { //新的请求到来后,本地创建一个新的router,,,原理同发送端 s.createRoute(conn, nil) s.grWG.Done() }) } s.Debugf("Router accept loop exiting..") s.done <- true }
版权声明:本文为wxw1198原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。