环境搭建:
nats源码学习记录--程序启动,使用对应的seed.conf中的配置。
添加新的ROUTER,gnatsd.exe -p 5222 -cluster nats://localhost:5248 -routes nats://0.0.0.0:4248 -D ,gnatsd.exe 为gnatsd程序。
服务开启的时候,会搜索本集群 router对应的url,对应于上面信息的nats://0.0.0.0:4248;找到后,就立即启动链接的建立。
func (s *Server) StartRouting(clientListenReady chan struct{}) {
defer s.grWG.Done()
// Wait for the client listen port to be opened, and
// the possible ephemeral port to be selected.
<-clientListenReady
// Spin up the accept loop
ch := make(chan struct{})
//本地监听开启,方便本集群的其他ROUTER发链接请求
go s.routeAcceptLoop(ch)
<-ch
// Solicit Routes if needed.
//和本集群 内的其他router建立连接
s.solicitRoutes(s.getOpts().Routes)
}
func (s *Server) solicitRoutes(routes []*url.URL) {
for _, r := range routes {
route := r
fmt.Println("solicitRoutes:", r)
//针对每个URL分别发出链接请求
s.startGoRoutine(func() { s.connectToRoute(route, true) })
}
}继续往后跟踪会走到一下函数,此函数主要时构建了一个与远端链接的CLIENT,及client对应的读写循环。
func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Snapshot server options.
opts := s.getOpts()
didSolicit := rURL != nil
r := &route{didSolicit: didSolicit}
for _, route := range opts.Routes {
if rURL != nil && (strings.EqualFold(rURL.Host, route.Host)) {
r.routeType = Explicit
}
}
//构建用于管理 与远端的链接的CLIENT
c := &client{srv:s, nc: conn, opts: clientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r}
// Grab server variables
s.mu.Lock()
s.generateRouteInfoJSON()
infoJSON := s.routeInfoJSON
authRequired := s.routeInfo.AuthRequired
tlsRequired := s.routeInfo.TLSRequired
s.mu.Unlock()
// Grab lock
c.mu.Lock()
// Initialize
c.initClient()
if didSolicit {
// Do this before the TLS code, otherwise, in case of failure
// and if route is explicit, it would try to reconnect to 'nil'...
r.url = rURL
}
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
// Do final client initialization
// Initialize the per-account cache.
c.in.pacache = make(map[string]*perAccountCache, maxPerAccountCacheSize)
if didSolicit {
// Set permissions associated with the route user (if applicable).
// No lock needed since we are already under client lock.
c.setRoutePermissions(opts.Cluster.Permissions)
}
//监测链路是否断开的常用机制
// Set the Ping timer
c.setPingTimer()
//和远端通信需要的读协程
// Spin up the read loop.
s.startGoRoutine(c.readLoop)
//向远端写信息协程
// Spin up the write loop.
s.startGoRoutine(c.writeLoop)
把请求发出去,以便和其他ROUTER建立链接
if didSolicit {
c.Debugf("Route connect msg sent")
c.sendConnect(tlsRequired)
}
告知对端本地的信息,可以理解为协议握手
c.sendInfo(infoJSON)
c.mu.Unlock()
c.Noticef("Route connection created")
return c
}//下面看下接收端,此函数在server启动的时候启动的,用于监听来自集群内的router的请求
func (s *Server) routeAcceptLoop(ch chan struct{}) {
defer func() {
if ch != nil {
close(ch)
}
}()
// Snapshot server options.
opts := s.getOpts()
// Snapshot server options.
port := opts.Cluster.Port
if port == -1 {
port = 0
}
//tcp协议写法,都比较熟悉
l, e := net.Listen("tcp", hp)
s.mu.Lock()
// For tests, we want to be able to make this server behave
// as an older server so we use the variable which we can override.
proto := testRouteProto
// Check for TLSConfig
tlsReq := opts.Cluster.TLSConfig != nil
//握手协议信息准备
info := Info{
ID: s.info.ID,
Version: s.info.Version,
GoVersion: runtime.Version(),
AuthRequired: false,
TLSRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
Proto: proto,
GatewayURL: s.getGatewayURL(),
}
// Set this if only if advertise is not disabled
if !opts.Cluster.NoAdvertise {
info.ClientConnectURLs = s.clientConnectURLs
}
。。。。。。。。。。。。。。。。。。。。。。。。。
// Setup state that can enable shutdown
s.routeListener = l
s.mu.Unlock()
// Let them know we are up
close(ch)
ch = nil
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
tmpDelay = s.acceptError("Route", err, tmpDelay)
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
s.startGoRoutine(func() {
//新的请求到来后,本地创建一个新的router,,,原理同发送端
s.createRoute(conn, nil)
s.grWG.Done()
})
}
s.Debugf("Router accept loop exiting..")
s.done <- true
}
版权声明:本文为wxw1198原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。