通过api获取jsonfilelog格式的日志信息时,时不时地发现读取速度比较慢,于是试着去理解docker内部的实现原理。
Container logs的api的使用方法:
https://docs.docker.com/engine/api/v1.26/#operation/ContainerLogs
https://docs.docker.com/engine/admin/logging/overview/#options
Docker daemon会把容器日志所在路径下的所有日志文件拼接成一个ReadSeeker,然后从中按顺序读取解析。
由于日志文件都是由一条一条的json格式的日志记录组成,每条日志都得先被解析,然后才能发给客户端;
当指定since参数的值时,会把每一条日志解析完之后与since参数做对比,只有满足条件才会发给客户端;
当不指定tail参数时,默认是读取所有的日志;
当tail指定一个值时,如果这个值相对比较大,比如100000,Docker daemon的响应速度会比较慢,可以通过增大代码里的blockSize来提高响应速度。
blockSize的默认值是1024字节,自己曾经试着把这个值改成32K,发现增速效果明显,下面是自己的一组测试数据:
blockSize=1k
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 ab8e412c37eb5d > test.log 2>&1;date
Fri Mar 17 17:32:30 CST 2017
Fri Mar 17 17:34:23 CST 2017
blockSize=10k
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 c76b9481028cdf0b > test.log 2>&1;date
Fri Mar 17 17:53:07 CST 2017
Fri Mar 17 17:53:19 CST 2017
blockSize=32K
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 3e82f16be274443b > test.log 2>&1;date
Fri Mar 17 18:07:40 CST 2017
Fri Mar 17 18:07:45 CST 2017
下面是关键代码的分析:
container/
container.go
func (container *Container)startLogging()error {
ifcontainer.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}
l,err :=container.StartLogger()
iferr != nil {
returnfmt.Errorf("failed to initialize logging driver: %v",err)
}
copier :=logger.NewCopier(map[string]io.Reader{"stdout":container.StdoutPipe(), "stderr":container.StderrPipe()},l)
container.LogCopier =copier
copier.Run()
container.LogDriver =l
// set LogPath field only for json-file logdriver
ifjl,ok :=l.(*jsonfilelog.JSONFileLogger);ok {
container.LogPath =jl.LogPath()
}
return nil
}
// initRoutes fills r.routes with the container router's route table.
// Entries are listed by HTTP method: HEAD, GET, POST, PUT, DELETE.
func (r *containerRouter) initRoutes() {
	table := []router.Route{
		// HEAD
		router.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),

		// GET
		router.NewGetRoute("/containers/json", r.getContainersJSON),
		router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),
		router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
		router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
		router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
		// The logs and stats routes are the only ones wrapped in router.Cancellable.
		router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
		router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
		router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
		router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
		router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),

		// POST
		router.NewPostRoute("/containers/create", r.postContainersCreate),
		router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),
		router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),
		router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),
		router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
		router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
		router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
		router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
		router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
		router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
		router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12
		router.NewPostRoute("/containers/{name:.*}/exec", r.postContainerExecCreate),
		router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),
		router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
		router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
		router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
		router.NewPostRoute("/containers/prune", r.postContainersPrune),

		// PUT
		router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),

		// DELETE
		router.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),
	}

	r.routes = table
}
api/
server/
router/
container/
container_routes.go
func (s *containerRouter)getContainersLogs(ctx context.Context,w http.ResponseWriter,r *http.Request,vars map[string]string)error {
iferr :=httputils.ParseForm(r);err != nil {
returnerr
}
// Args are validated before the stream starts because when it starts we're
// sending HTTP 200 by writing an empty chunk of data to tell the client that
// daemon is going to stream. By sending this initial HTTP 200 we can't report
// any error after the stream starts (i.e. container not found, wrong parameters)
// with the appropriate status code.
stdout,stderr :=httputils.BoolValue(r, "stdout"),httputils.BoolValue(r, "stderr")
if !(stdout ||stderr) {
returnfmt.Errorf("Bad parameters: you must choose at least one stream")
}
containerName :=vars["name"]
logsConfig := &backend.ContainerLogsConfig{
ContainerLogsOptions:types.ContainerLogsOptions{
Follow:httputils.BoolValue(r, "follow"),
Timestamps:httputils.BoolValue(r, "timestamps"),
Since:r.Form.Get("since"),
Tail:r.Form.Get("tail"),
ShowStdout:stdout,
ShowStderr:stderr,
Details:httputils.BoolValue(r, "details"),
},
OutStream:w,
}
chStarted :=make(chan struct{})
iferr :=s.backend.ContainerLogs(ctx,containerName,logsConfig,chStarted);err != nil {
select {
case <-chStarted:
// The client may be expecting all of the data we're sending to
// be multiplexed, so mux it through the Systemerr stream, which
// will cause the client to throw an error when demuxing
stdwriter :=stdcopy.NewStdWriter(logsConfig.OutStream,stdcopy.Systemerr)
fmt.Fprintf(stdwriter, "Error running logs job: %v\n",err)
default:
returnerr
}
}
return nil
}
daemon/
logs.go
// ContainerLogs hooks up a container's stdout and stderr streams
// configured with the given struct.
func (daemon *Daemon)ContainerLogs(ctx context.Context,containerName string,config *backend.ContainerLogsConfig,started chan struct{})error {
if !(config.ShowStdout ||config.ShowStderr) {
returnerrors.New("You must choose at least one stream")
}
container,err :=daemon.GetContainer(containerName)
iferr != nil {
returnerr
}
………..
logs :=logReader.ReadLogs(readConfig)
// Close logWatcher on exit
defer func() {
logs.Close()
ifcLog !=container.LogDriver {
// Since the logger isn't cached in the container, which
// occurs if it is running, it must get explicitly closed
// here to avoid leaking it and any file handles it has.
iferr :=cLog.Close();err != nil {
logrus.Errorf("Error closing logger: %v",err)
}
}
}()
}
logger/
jsonfilelog/
read.go
// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
//
// It creates a new LogWatcher, starts readLogs in its own goroutine to feed
// that watcher, and returns the watcher without waiting for readLogs.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	logWatcher := logger.NewLogWatcher()
	go l.readLogs(logWatcher, config)
	return logWatcher
}
func (l *JSONFileLogger)readLogs(logWatcher *logger.LogWatcher,config logger.ReadConfig) {
deferclose(logWatcher.Msg)
……..
followLogs()
}
funcfollowLogs(f *os.File,logWatcher *logger.LogWatcher,notifyRotate chan interface{},since time.Time) {
dec :=json.NewDecoder(f)
l := &jsonlog.JSONLog{}
// main loop
for {
msg,err := decodeLogLine(dec,l)
iferr != nil {
iferr :=handleDecodeErr(err);err != nil {
iferr ==errDone {
return
}
// we got an unrecoverable error, so return
logWatcher.Err <-err
return
}
// ready to try again
continue
}
}