go-zero的链路追踪

链路追踪

Trace 和 Span

在OpenTracing(基本是行业标准了~)中,一个重要的概念是“trace”,它表示从头到尾的一个请求的调用链,它的标识符是“traceID”。 一个“trace”包含有许多跨度(span),每个跨度捕获调用链内的一个工作单元,并由“spanId”标识。 每个跨度具有一个父跨度,并且一个“trace”的所有跨度形成有向无环图(DAG)。 以下是跨度之间的关系图。

  • span:链路中的一个操作,存储时间和某些信息

一个 REST 调用或者数据库操作等,都可以作为一个 spanspan 是分布式追踪的最小跟踪单位,一个 Trace 可由多段 Span 组成

type Span struct {
	ctx           spanContext
	serviceName   string
	operationName string
	startTime     time.Time
	flag          string
	children      int
}
  • spanContext:保存链路的上下文信息「traceid,spanid,或者是其他想要传递的内容」实现了 tracer接口
type spanContext struct {
	traceId string
	spanId  string
}
  • noop:空的 tracer 实现
var emptyNoopSpan = noopSpan{}

type noopSpan struct{}

func (s noopSpan) Finish() {
}

func (s noopSpan) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
	return ctx, emptyNoopSpan
}

func (s noopSpan) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
	return ctx, emptyNoopSpan
}

func (s noopSpan) SpanId() string {
	return ""
}

func (s noopSpan) TraceId() string {
	return ""
}

func (s noopSpan) Visit(fn func(key, val string) bool) {
}

Span操作

  • fork span (只要调用一次rpc client 就会 从当前span fork 出新的一份span)
// core/trace/span.go
func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
	span := &Span{
		ctx: spanContext{
			traceId: s.ctx.traceId,
			spanId:  s.forkSpanId(),
		},
		serviceName:   serviceName,
		operationName: operationName,
		startTime:     timex.Time(),
		flag:          clientFlag,
	}
	return context.WithValue(ctx, tracespec.TracingKey, span), span
}

func (s *Span) forkSpanId() string {
   s.children++
   return fmt.Sprintf("%s.%d", s.ctx.spanId, s.children)
}
  • follow span
  // core/trace/span.go
  func (s *Span) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
     span := &Span{
        ctx: spanContext{
           traceId: s.ctx.traceId,
           spanId:  s.followSpanId(),
        },
        serviceName:   serviceName,
        operationName: operationName,
        startTime:     timex.Time(),
        flag:          s.flag,
     }
     return context.WithValue(ctx, tracespec.TracingKey, span), span
  }
  
  func (s *Span) followSpanId() string {
  	fields := strings.FieldsFunc(s.ctx.spanId, func(r rune) bool {
  		return r == spanSepRune
  	})
  	if len(fields) == 0 {
  		return s.ctx.spanId
  	}
  
  	last := fields[len(fields)-1]
  	val, err := strconv.Atoi(last)
  	if err != nil {
  		return s.ctx.spanId
  	}
  
  	last = strconv.Itoa(val + 1)
  	fields[len(fields)-1] = last
  
  	return strings.Join(fields, spanSep)
  }

举例:

原本Span的 traceId为 1,spanId为1.2,children为2

现在经过客户端的拦截器后,会fork出新的span(具体步骤看?源码),此时新的span traceId为 1,spanId为1.2.3,children为3

如果是使用follow的话,则新的span traceId为 1,spanId为1.3,children为2

RPC Server端(HTTP Server类似)

// zrpc/internal/serverinterceptors/tracinginterceptor.go
func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (resp interface{}, err error) {
     // 从上下文中获取metadata
		md, ok := metadata.FromIncomingContext(ctx
		if !ok {
			return handler(ctx, req)
		}

    // 把metadata组装成carrier类型,carrier有可能是空的                                        
		carrier, err := trace.Extract(trace.GrpcFormat, md) 
		if err != nil {
			return handler(ctx, req)
		}
                                           
    // 开启新span,并把「traceId,spanId」封装在 context 中
		ctx, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod)
		defer span.Finish()
		return handler(ctx, req)
	}
}

// core/trace/span.go
func StartServerSpan(ctx context.Context, carrier Carrier, serviceName, operationName string) (
	context.Context, tracespec.Trace) {
  // 开启新span
	span := newServerSpan(carrier, serviceName, operationName) 
   // 将span信息写入上下文中
	return context.WithValue(ctx, tracespec.TracingKey, span), span
}

func newServerSpan(carrier Carrier, serviceName, operationName string) tracespec.Trace {
   // 从上下文中获取「traceId」,如果没有就新创建一个
  traceId := stringx.TakeWithPriority(func() string {
		if carrier != nil {
			return carrier.Get(traceIdKey)
		}
		return ""
	}, stringx.RandId)
   // 同理获取「spanId」
	spanId := stringx.TakeWithPriority(func() string {
		if carrier != nil {
			return carrier.Get(spanIdKey)
		}
		return ""
	}, func() string {
		return initSpanId
	})

	return &Span{
		ctx: spanContext{
			traceId: traceId,
			spanId:  spanId,
		},
		serviceName:   serviceName,
		operationName: operationName,
		startTime:     timex.Time(),
		flag:          serverFlag, // 标记flag,还有client的flag
	}
}

RPC Client端

在 rpc 中存在 client, server ,所以从 tracing 上也有 clientTracing, serverTracing

往grpc自带的Interceptor拦截器中注入TracingInterceptor方法。该方法实现根据ctx来生成子span

 func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
	var clientOptions ClientOptions
	for _, opt := range opts {
		opt(&clientOptions)
	}
 
	options := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(),
		WithUnaryClientInterceptors(
			clientinterceptors.TracingInterceptor,
			clientinterceptors.DurationInterceptor,
			clientinterceptors.BreakerInterceptor,
			clientinterceptors.PrometheusInterceptor,
			clientinterceptors.TimeoutInterceptor(clientOptions.Timeout),
		),
	}
	for _, interceptor := range c.interceptors {
		options = append(options, WithUnaryClientInterceptors(interceptor))
	}
 
	return append(options, clientOptions.DialOptions...)
}

// zrpc/internal/clientinterceptors/tracinginterceptor.go
func TracingInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)
	defer span.Finish()

	var pairs []string
	span.Visit(func(key, val string) bool {
		pairs = append(pairs, key, val)
		return true
	})
	ctx = metadata.AppendToOutgoingContext(ctx, pairs...)

	return invoker(ctx, method, req, reply, cc, opts...)
}

// core/trace/span.go
func StartClientSpan(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
	if span, ok := ctx.Value(tracespec.TracingKey).(*Span); ok {
		return span.Fork(ctx, serviceName, operationName)
	}

	return ctx, emptyNoopSpan
}

func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
	span := &Span{
		ctx: spanContext{
			traceId: s.ctx.traceId,
			spanId:  s.forkSpanId(),
		},
		serviceName:   serviceName,
		operationName: operationName,
		startTime:     timex.Time(),
		flag:          clientFlag,
	}
	return context.WithValue(ctx, tracespec.TracingKey, span), span
}

获取信息

上面两步都是获取或创建链路信息并将其写入到上下文中,下面这步则是获取链路信息的方法(直接copy了单元测试的代码过来?)

// rpc 获取 「traceId、spanId」
func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
	interceptor := UnaryTracingInterceptor("foo")
	var wg sync.WaitGroup
	wg.Add(1)
	var md metadata.MD
	ctx := metadata.NewIncomingContext(context.Background(), md)
	_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
		FullMethod: "/",
	}, func(ctx context.Context, req interface{}) (interface{}, error) {
		defer wg.Done()
		assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).TraceId()) > 0)
		assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).SpanId()) > 0)
		return nil, nil
	})
	wg.Wait()
	assert.Nil(t, err)
}

// http 获取 「traceId、spanId」
func TestTracingHandler(t *testing.T) {
	req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
	req.Header.Set("X-Trace-ID", "theid")
	handler := TracingHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		span, ok := r.Context().Value(tracespec.TracingKey).(tracespec.Trace)
		assert.True(t, ok)
		assert.Equal(t, "theid", span.TraceId())
	}))

	resp := httptest.NewRecorder()
	handler.ServeHTTP(resp, req)
	assert.Equal(t, http.StatusOK, resp.Code)
}

调用链

逻辑调用关系

  • httpServer->grpcClient1->grpcServer1->grpcClient2->grpcServer2
  • grpcClient1->grpcServer1

同样,用户信息等也可通过context传递。流量染色,限流熔断等也是类似思路。

监控

我们可以自己实现实现「tracer」接口,然后把数据report到例如Prometheus或者Jaeger中进行可视化展示,并且可以很直观的看出整条链路中哪个span是比较耗时的,可以有针对性的进行优化

全链路超时控制

2层

  • 拦截器每次判断ctx是否超时,超时直接返回超时错误,不进行服务的调用
const reason = "Request Timeout"

func TimeoutHandler(duration time.Duration) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		if duration > 0 {
			return http.TimeoutHandler(next, duration, reason)
		} else {
			return next
		}
	}
}

const defaultTimeout = time.Second * 2

func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
	if timeout <= 0 {
		timeout = defaultTimeout
	}

	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
		defer cancel()
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}
  • 每次调用下游函数/服务前select ctx.done一下,同样是由于拦截器对ctx做了处理,会获取剩余的时间并调用withtimeout,因此同样可以做到超时的控制
func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (resp interface{}, err error) {
		ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
		defer cancel()
		return handler(ctx, req)
	}
}

版权声明:本文为zhetmdoubeizhanyong原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。