链路追踪
Trace 和 Span
在OpenTracing(基本是行业标准了~)中,一个重要的概念是“trace”,它表示从头到尾的一个请求的调用链,它的标识符是“traceID”。 一个“trace”包含有许多跨度(span),每个跨度捕获调用链内的一个工作单元,并由“spanId”标识。 每个跨度具有一个父跨度,并且一个“trace”的所有跨度形成有向无环图(DAG)。 以下是跨度之间的关系图。

- span:链路中的一个操作,存储时间和某些信息
一个 REST 调用或者数据库操作等,都可以作为一个 span 。 span 是分布式追踪的最小跟踪单位,一个 Trace 可由多段 Span 组成
type Span struct {
ctx spanContext
serviceName string
operationName string
startTime time.Time
flag string
children int
}
- spanContext:保存链路的上下文信息「traceid,spanid,或者是其他想要传递的内容」实现了
tracer接口
type spanContext struct {
traceId string
spanId string
}
- noop:空的
tracer实现
var emptyNoopSpan = noopSpan{}
type noopSpan struct{}
func (s noopSpan) Finish() {
}
func (s noopSpan) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return ctx, emptyNoopSpan
}
func (s noopSpan) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return ctx, emptyNoopSpan
}
func (s noopSpan) SpanId() string {
return ""
}
func (s noopSpan) TraceId() string {
return ""
}
func (s noopSpan) Visit(fn func(key, val string) bool) {
}
Span操作
- fork span (只要调用一次rpc client 就会 从当前span fork 出新的一份span)
// core/trace/span.go
func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
span := &Span{
ctx: spanContext{
traceId: s.ctx.traceId,
spanId: s.forkSpanId(),
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: clientFlag,
}
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
func (s *Span) forkSpanId() string {
s.children++
return fmt.Sprintf("%s.%d", s.ctx.spanId, s.children)
}
- follow span
// core/trace/span.go
func (s *Span) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
span := &Span{
ctx: spanContext{
traceId: s.ctx.traceId,
spanId: s.followSpanId(),
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: s.flag,
}
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
func (s *Span) followSpanId() string {
fields := strings.FieldsFunc(s.ctx.spanId, func(r rune) bool {
return r == spanSepRune
})
if len(fields) == 0 {
return s.ctx.spanId
}
last := fields[len(fields)-1]
val, err := strconv.Atoi(last)
if err != nil {
return s.ctx.spanId
}
last = strconv.Itoa(val + 1)
fields[len(fields)-1] = last
return strings.Join(fields, spanSep)
}
举例:
原本Span的 traceId为 1,spanId为1.2,children为2
现在经过客户端的拦截器后,会fork出新的span(具体步骤看?源码),此时新的span traceId为 1,spanId为1.2.3,children为3
如果是使用follow的话,则新的span traceId为 1,spanId为1.3,children为2
RPC Server端(HTTP Server类似)
// zrpc/internal/serverinterceptors/tracinginterceptor.go
func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
// 从上下文中获取metadata
md, ok := metadata.FromIncomingContext(ctx
if !ok {
return handler(ctx, req)
}
// 把metadata组装成carrier类型,carrier有可能是空的
carrier, err := trace.Extract(trace.GrpcFormat, md)
if err != nil {
return handler(ctx, req)
}
// 开启新span,并把「traceId,spanId」封装在 context 中
ctx, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod)
defer span.Finish()
return handler(ctx, req)
}
}
// core/trace/span.go
func StartServerSpan(ctx context.Context, carrier Carrier, serviceName, operationName string) (
context.Context, tracespec.Trace) {
// 开启新span
span := newServerSpan(carrier, serviceName, operationName)
// 将span信息写入上下文中
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
func newServerSpan(carrier Carrier, serviceName, operationName string) tracespec.Trace {
// 从上下文中获取「traceId」,如果没有就新创建一个
traceId := stringx.TakeWithPriority(func() string {
if carrier != nil {
return carrier.Get(traceIdKey)
}
return ""
}, stringx.RandId)
// 同理获取「spanId」
spanId := stringx.TakeWithPriority(func() string {
if carrier != nil {
return carrier.Get(spanIdKey)
}
return ""
}, func() string {
return initSpanId
})
return &Span{
ctx: spanContext{
traceId: traceId,
spanId: spanId,
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: serverFlag, // 标记flag,还有client的flag
}
}
RPC Client端
在 rpc 中存在 client, server ,所以从 tracing 上也有 clientTracing, serverTracing 。
往grpc自带的Interceptor拦截器中注入TracingInterceptor方法。该方法实现根据ctx来生成子span
func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
var clientOptions ClientOptions
for _, opt := range opts {
opt(&clientOptions)
}
options := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBlock(),
WithUnaryClientInterceptors(
clientinterceptors.TracingInterceptor,
clientinterceptors.DurationInterceptor,
clientinterceptors.BreakerInterceptor,
clientinterceptors.PrometheusInterceptor,
clientinterceptors.TimeoutInterceptor(clientOptions.Timeout),
),
}
for _, interceptor := range c.interceptors {
options = append(options, WithUnaryClientInterceptors(interceptor))
}
return append(options, clientOptions.DialOptions...)
}
// zrpc/internal/clientinterceptors/tracinginterceptor.go
func TracingInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)
defer span.Finish()
var pairs []string
span.Visit(func(key, val string) bool {
pairs = append(pairs, key, val)
return true
})
ctx = metadata.AppendToOutgoingContext(ctx, pairs...)
return invoker(ctx, method, req, reply, cc, opts...)
}
// core/trace/span.go
func StartClientSpan(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
if span, ok := ctx.Value(tracespec.TracingKey).(*Span); ok {
return span.Fork(ctx, serviceName, operationName)
}
return ctx, emptyNoopSpan
}
func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
span := &Span{
ctx: spanContext{
traceId: s.ctx.traceId,
spanId: s.forkSpanId(),
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: clientFlag,
}
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
获取信息
上面两步都是获取或创建链路信息并将其写入到上下文中,下面这步则是获取链路信息的方法(直接copy了单元测试的代码过来?)
// rpc 获取 「traceId、spanId」
func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
interceptor := UnaryTracingInterceptor("foo")
var wg sync.WaitGroup
wg.Add(1)
var md metadata.MD
ctx := metadata.NewIncomingContext(context.Background(), md)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
defer wg.Done()
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).TraceId()) > 0)
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).SpanId()) > 0)
return nil, nil
})
wg.Wait()
assert.Nil(t, err)
}
// http 获取 「traceId、spanId」
func TestTracingHandler(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
req.Header.Set("X-Trace-ID", "theid")
handler := TracingHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span, ok := r.Context().Value(tracespec.TracingKey).(tracespec.Trace)
assert.True(t, ok)
assert.Equal(t, "theid", span.TraceId())
}))
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
}
调用链
逻辑调用关系
- httpServer->grpcClient1->grpcServer1->grpcClient2->grpcServer2
- grpcClient1->grpcServer1
同样,用户信息等也可通过context传递。流量染色,限流熔断等也是类似思路。
监控
我们可以自己实现实现「tracer」接口,然后把数据report到例如Prometheus或者Jaeger中进行可视化展示,并且可以很直观的看出整条链路中哪个span是比较耗时的,可以有针对性的进行优化
全链路超时控制
2层
- 拦截器每次判断ctx是否超时,超时直接返回超时错误,不进行服务的调用
const reason = "Request Timeout"
func TimeoutHandler(duration time.Duration) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
if duration > 0 {
return http.TimeoutHandler(next, duration, reason)
} else {
return next
}
}
}
const defaultTimeout = time.Second * 2
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
if timeout <= 0 {
timeout = defaultTimeout
}
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
defer cancel()
return invoker(ctx, method, req, reply, cc, opts...)
}
}
- 每次调用下游函数/服务前select ctx.done一下,同样是由于拦截器对ctx做了处理,会获取剩余的时间并调用withtimeout,因此同样可以做到超时的控制
func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
defer cancel()
return handler(ctx, req)
}
}