Analysis of the key code Docker uses to read container logs

When fetching json-file formatted logs through the API, I noticed that reads were slow from time to time, so I tried to understand how Docker implements this internally.

How to use the container logs API:

https://docs.docker.com/engine/api/v1.26/#operation/ContainerLogs

https://docs.docker.com/engine/admin/logging/overview/#options
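For reference, the logs endpoint can also be called directly over the daemon's unix socket. The sketch below is only an illustration: it assumes the default socket path /var/run/docker.sock, API version v1.26 and a made-up container ID, and it simply dumps the raw (multiplexed) response body.

package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
    "os"
)

func main() {
    // Route all HTTP requests through the daemon's unix socket
    // (assumes the default /var/run/docker.sock).
    tr := &http.Transport{
        DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, "unix", "/var/run/docker.sock")
        },
    }
    client := &http.Client{Transport: tr}

    // stdout, stderr, tail, since, timestamps and follow are the query
    // parameters documented for GET /containers/{id}/logs.
    // "ab8e412c37eb" is a placeholder container ID.
    url := "http://localhost/v1.26/containers/ab8e412c37eb/logs?stdout=1&stderr=1&timestamps=1&tail=100000"
    resp, err := client.Get(url)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    defer resp.Body.Close()

    // For a non-TTY container the body is a multiplexed stream; here we just
    // write the raw bytes to stdout.
    io.Copy(os.Stdout, resp.Body)
}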

The Docker daemon stitches all of the log files under the container's log directory together into a single ReadSeeker, and then reads and parses them in order.

Since each log file consists of one JSON-formatted record after another, every record has to be parsed before it can be sent to the client.

When the since parameter is given, each record is compared against since after it has been parsed, and only the records that satisfy the condition are sent to the client.
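As a rough illustration of that parse-then-filter flow, here is a minimal sketch (not the daemon's actual code, which goes through jsonlog.JSONLog and a shared decoder) that decodes json-file records and drops everything older than a since timestamp. The log path and the since value are made up.

package main

import (
    "encoding/json"
    "fmt"
    "os"
    "time"
)

// entry mirrors the three fields of a json-file record:
// {"log":"...","stream":"stdout","time":"2017-03-17T09:32:30.000000000Z"}
type entry struct {
    Log     string    `json:"log"`
    Stream  string    `json:"stream"`
    Created time.Time `json:"time"`
}

func main() {
    // Placeholder path; the real file lives under /var/lib/docker/containers/<id>/.
    f, err := os.Open("container-json.log")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    defer f.Close()

    since := time.Date(2017, 3, 17, 9, 0, 0, 0, time.UTC) // made-up cut-off

    dec := json.NewDecoder(f)
    for {
        var e entry
        if err := dec.Decode(&e); err != nil {
            break // io.EOF (or a decode error) ends this sketch
        }
        // Every record is decoded first; only then is the since filter applied,
        // which is why a large, mostly-filtered log is still expensive to read.
        if !since.IsZero() && e.Created.Before(since) {
            continue
        }
        fmt.Printf("[%s] %s", e.Stream, e.Log)
    }
}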

When the tail parameter is not specified, all of the logs are read by default.

When tail is given a relatively large value, say 100000, the docker daemon responds quite slowly; the response time can be improved by increasing blockSize in the code.

The default blockSize is 1024 bytes. I tried raising it to 32K and the speedup was obvious. Here is a set of my own measurements (a simplified sketch of the block-based tail read follows the test data):

blockSize=1k

[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 ab8e412c37eb5d > test.log 2>&1;date
Fri Mar 17 17:32:30 CST 2017
Fri Mar 17 17:34:23 CST 2017

blockSize=10k

[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 c76b9481028cdf0b > test.log 2>&1;date
Fri Mar 17 17:53:07 CST 2017
Fri Mar 17 17:53:19 CST 2017

blockSize=32K

[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 3e82f16be274443b > test.log 2>&1;date
Fri Mar 17 18:07:40 CST 2017
Fri Mar 17 18:07:45 CST 2017
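The elapsed times above work out to roughly 113s, 12s and 5s with --tail=100000. The reason the block size matters is in how the tail is collected: the last N lines are found by reading the file backwards from the end in blockSize-sized chunks and counting newlines (in the Docker source this lives in pkg/tailfile, where blockSize defaults to 1024). With 1K blocks and a 100000-line tail that means a very large number of small reads; a bigger block does the same work in far fewer iterations. Below is a simplified sketch of the idea, not Docker's actual implementation.

package main

import (
    "bytes"
    "fmt"
    "os"
)

// tailLastLines reads a file backwards in chunks of blockSize and stops once
// it has seen enough newlines to cover the last n lines. This mirrors the
// approach of pkg/tailfile in spirit only; it is not the real implementation.
func tailLastLines(f *os.File, n int, blockSize int64) ([][]byte, error) {
    fi, err := f.Stat()
    if err != nil {
        return nil, err
    }

    var buf []byte
    for off := fi.Size(); off > 0 && bytes.Count(buf, []byte{'\n'}) <= n; {
        step := blockSize
        if off < step {
            step = off
        }
        off -= step

        chunk := make([]byte, step)
        if _, err := f.ReadAt(chunk, off); err != nil {
            return nil, err
        }
        buf = append(chunk, buf...) // prepend the newly read block
    }

    lines := bytes.Split(bytes.TrimSuffix(buf, []byte{'\n'}), []byte{'\n'})
    if len(lines) > n {
        lines = lines[len(lines)-n:]
    }
    return lines, nil
}

func main() {
    // Placeholder path; point this at a container's json log file.
    f, err := os.Open("container-json.log")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    defer f.Close()

    // A 32K block needs roughly 32x fewer iterations than the default 1K block.
    lines, err := tailLastLines(f, 100000, 32*1024)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    fmt.Printf("collected %d lines\n", len(lines))
}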


Below is a walkthrough of the key code:

container/container.go

func (container *Container) startLogging() error {
    if container.HostConfig.LogConfig.Type == "none" {
        return nil // do not start logging routines
    }

    l, err := container.StartLogger()
    if err != nil {
        return fmt.Errorf("failed to initialize logging driver: %v", err)
    }

    copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
    container.LogCopier = copier
    copier.Run()
    container.LogDriver = l

    // set LogPath field only for json-file logdriver
    if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
        container.LogPath = jl.LogPath()
    }

    return nil
}
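The Copier created above is what feeds the log driver: it starts one goroutine per stream (stdout, stderr), reads the stream line by line and hands every line to the driver's Log method. A very rough sketch of that idea (not the real logger.Copier; logFn here stands in for the driver):

package main

import (
    "bufio"
    "io"
    "strings"
    "time"
)

// copyStream mimics, in spirit, what one Copier goroutine does for a single
// stream: split the output into lines and pass each line on to the logger.
func copyStream(source string, r io.Reader, logFn func(source, line string, t time.Time)) {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        logFn(source, scanner.Text(), time.Now())
    }
}

func main() {
    fake := strings.NewReader("hello\nworld\n") // stands in for container stdout
    copyStream("stdout", fake, func(source, line string, t time.Time) {
        println(t.Format(time.RFC3339), source, line)
    })
}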


api/server/router/container/container.go

// initRoutes initializes the routes in container router
func (r *containerRouter) initRoutes() {
    r.routes = []router.Route{
        // HEAD
        router.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),
        // GET
        router.NewGetRoute("/containers/json", r.getContainersJSON),
        router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),
        router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
        router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
        router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
        router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
        router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
        router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
        router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
        router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
        // POST
        router.NewPostRoute("/containers/create", r.postContainersCreate),
        router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),
        router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),
        router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),
        router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
        router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
        router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
        router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
        router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
        router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
        router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12
        router.NewPostRoute("/containers/{name:.*}/exec", r.postContainerExecCreate),
        router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),
        router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
        router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
        router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
        router.NewPostRoute("/containers/prune", r.postContainersPrune),
        // PUT
        router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),
        // DELETE
        router.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),
    }
}

api/server/router/container/container_routes.go

func (s *containerRouter) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    if err := httputils.ParseForm(r); err != nil {
        return err
    }

    // Args are validated before the stream starts because when it starts we're
    // sending HTTP 200 by writing an empty chunk of data to tell the client that
    // daemon is going to stream. By sending this initial HTTP 200 we can't report
    // any error after the stream starts (i.e. container not found, wrong parameters)
    // with the appropriate status code.
    stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
    if !(stdout || stderr) {
        return fmt.Errorf("Bad parameters: you must choose at least one stream")
    }

    containerName := vars["name"]
    logsConfig := &backend.ContainerLogsConfig{
        ContainerLogsOptions: types.ContainerLogsOptions{
            Follow:     httputils.BoolValue(r, "follow"),
            Timestamps: httputils.BoolValue(r, "timestamps"),
            Since:      r.Form.Get("since"),
            Tail:       r.Form.Get("tail"),
            ShowStdout: stdout,
            ShowStderr: stderr,
            Details:    httputils.BoolValue(r, "details"),
        },
        OutStream: w,
    }

    chStarted := make(chan struct{})
    if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
        select {
        case <-chStarted:
            // The client may be expecting all of the data we're sending to
            // be multiplexed, so mux it through the Systemerr stream, which
            // will cause the client to throw an error when demuxing
            stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr)
            fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err)
        default:
            return err
        }
    }

    return nil
}
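On the client side, a non-TTY logs stream is normally demultiplexed with the pkg/stdcopy package; a frame written with stdcopy.Systemerr (as done above for late errors) makes the demuxer return an error. A minimal consumer sketch, assuming the multiplexed stream arrives on stdin:

package main

import (
    "fmt"
    "os"

    "github.com/docker/docker/pkg/stdcopy"
)

func main() {
    // StdCopy splits the multiplexed stream into stdout and stderr frames.
    // A Systemerr frame surfaces here as a non-nil error.
    if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, os.Stdin); err != nil {
        fmt.Fprintln(os.Stderr, "logs stream error:", err)
        os.Exit(1)
    }
}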


daemon/logs.go

// ContainerLogs hooks up a container's stdout and stderr streams
// configured with the given struct.
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
    if !(config.ShowStdout || config.ShowStderr) {
        return errors.New("You must choose at least one stream")
    }

    container, err := daemon.GetContainer(containerName)
    if err != nil {
        return err
    }

……

    logs := logReader.ReadLogs(readConfig)

    // Close logWatcher on exit
    defer func() {
        logs.Close()
        if cLog != container.LogDriver {
            // Since the logger isn't cached in the container, which
            // occurs if it is running, it must get explicitly closed
            // here to avoid leaking it and any file handles it has.
            if err := cLog.Close(); err != nil {
                logrus.Errorf("Error closing logger: %v", err)
            }
        }
    }()

……
}

logger/jsonfilelog/read.go

// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
    logWatcher := logger.NewLogWatcher()
    go l.readLogs(logWatcher, config)
    return logWatcher
}

func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
    defer close(logWatcher.Msg)

……

    followLogs()
}


func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
    dec := json.NewDecoder(f)
    l := &jsonlog.JSONLog{}
    // main loop
    for {
        msg, err := decodeLogLine(dec, l)
        if err != nil {
            if err := handleDecodeErr(err); err != nil {
                if err == errDone {
                    return
                }
                // we got an unrecoverable error, so return
                logWatcher.Err <- err
                return
            }
            // ready to try again
            continue
        }

……
    }
}



Copyright notice: this is an original article by weixin_37763985, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.