ElasticSearch日志系统 使用Logstash简单处理日志信息

目标:

使用Logstash的配置文件中的过滤器filter,对日志内容进行简单解析,并处理输出时生成字段。

Logstash实现多路接收。

观察日志格式

系统中的日志,能够大概分为以上两种类型,一种是纯系统操作信息日志,另一种是系统操作信息后,附带JSON格式的具体实体信息。

两种日志中,开头的信息及结构都是相同的。包括 日志编码、日志产生时间、日志产生线程、日志类型、日志产生类路径、日志内容。

grok正则捕获

我们可以通过gork,解析抽离出日志中的日志编码、日志产生时间、日志产生线程、日志类型、日志产生类路径、日志内容。

其中,grok可以通过关键字匹配,和正则表达式进行匹配,两个可在一条匹配语句中混合使用。关键字匹配的实现逻辑也是正则表达式匹配。

在关键字匹配中,其语法规则是:

%{语法:语义}

语法为关键字,语义为自定义字段名称。

在本系统日志格式中,日志编码、日志产生时间、日志产生线程、日志类型均可以使用关键字匹配来解析日志信息。filter中我们可以编写为:

filter{

         grok{

                match => ["message", "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} " ]

        }

}

看看匹配效果

成功分析出所需要的字段

日志产生类路径和日志内容属于较为复杂的字符串,此时我们用正则表达式进行匹配。filter中我们编写为:

filter{

         grok{

                match => ["message",

                        "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"

                ]

        }

}

看看匹配效果

成功分析出所需要的字段。以上匹配方式只是将最后的日志内容放在了一个字段中,在以JSON结尾的日志中,我们还可以对JSON进行解析。

首先需要提取JSON部分内容。根据观察,内容被嵌套在冒号后边的花括号至最后一个花括号处,正则表达式我们可以这样写:

"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))"

目前,有两种匹配语句对应匹配两种日志。Logstash可在grok中写入一条或多条匹配语句,并且智能地帮我们匹配到相应类型的日志记录中。

写法如下:

grok{

        match => ["message",

        "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))",

       "message",

        "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"

        ]

}

JSON格式解析

匹配到JSON的内容到字段 log_json 后,则还需要使用filter中自带的json插件,对 log_json 进行解析,由字符串转化为JSON格式。

在filter{}中加入:

json {

       source => "log_json" #对应log_json字段内容

       target => "json_content" #将内容转化为JSON格式后,放入命名为 json_content 的字段

       remove_field=>["log_json"] #将 log_json 字段删除

 }

json_content解析后,输出效果如下。发送至ElasticSearch服务器是也为JSON格式,即刻按照JSON中属性进行相关匹配、搜索。

日期格式解析

匹配到TIMESTAMP_ISO8601的内容到字段 log_time 后,则还需要使用filter中自带的date插件,对 log_time 进行解析,由字符串转化为时间格式。

在filter{}中加入:

date {

       match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]

}

此时则完成对 log_time 字段的匹配并转义,替代@timestamp字段。(原@timestamp字段意义为Logstash接收到该日志的时间戳。)

删除多余字段

经过对Logstash默认生成日志记录的观察,有部分字段我们并不需要使用到。并且这些字段会给传输和储存带来更大的压力,所以决定删除。

使用 remove_field 删除需要删除的字段,写法如下,在filter{}中加入:

 mutate{

       remove_field => ["host"]

       remove_field => ["agent"]

       remove_field => ["ecs"]

       remove_field => ["tags"]

       remove_field => ["fields"]

       remove_field => ["@version"]

       remove_field => ["input"]

       remove_field => ["log"]

}

最后效果

完整的 filter 如下:

filter{

        grok{

                match => ["message",

                "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))",

                "message",

                "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"

                ]

        }

        json {

                source => "log_json"

                target => "json_content"

                remove_field=>["log_json"]

        }

        date {

                match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]

        }

        mutate{

                remove_field => ["host"]

                remove_field => ["agent"]

                remove_field => ["ecs"]

                remove_field => ["tags"]

                remove_field => ["fields"]

                remove_field => ["@version"]

                remove_field => ["input"]

                remove_field => ["log"]

        }

}

发送到 ElasticSearch 服务器后,我们可以在 Kibana 中看到如下效果:

Logstash多路接收

由于需求需要,在ELK服务机上,需要使用 Logstash 接收两台日志生成机上,三个项目的日志文件,并且将该三个项目分别建成三条不同的索引存入 ElasticSearch 中。以下将使用两种方式区分不同项目的日志。

使用不同端口接收

在 Logstash 输入输出配置文件中,input下,创建若干个 beats{} ,通过 port 指定接收的端口,并在信息中加入字段用于区分不同端口接收到的日志消息。

input {

    beats {

        port => "5045"

        add_field => {"beat_machine" => "log1"}

    }

    beats {

        port => "5044"

        add_field => {"beat_machine" => "log2"}

    }

}

然后通过判断语句,在output下,将日志区分,写入不同的索引发送至 ElasticSearch 中。

output {

    if [beat_machine] == "log1" {

        elasticsearch {

            hosts => ["ELK服务机IP:9200"]

            index => "log1_%{+YYYY.MM.dd}"

        }

    }

    else if [beat_machine] == "log2" {

        elasticsearch {

            hosts => ["ELK服务机IP:9200"]

            index => "log2_%{+YYYY.MM.dd}"

        }

    }

}

通过Filebeat添加识别字段

Filebeat 传送 Logstash 只可以指定一个端口,该种处理方式只可以区分不同机器上的日志文件,若两个系统生成的日志文件都在一台机器上那怎么办呢?在 Filebeat 中为日志添加识别字段可解决该问题。

在 Filebeat 配置文件中 filebeat.inputs 下,可以指定读取多个路径的日志文件。假设两个系统中的日志文件分别放在两个不同的路径下,则可在每一条指定路径下添加识别字段信息。

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /home/Log/app/*

  fields:                                    # 设置一个 fields,用于标记这个日志

    project_type: app

  fields_under_root: true         # 新增fields成为顶级目,被Logstash读取

- type: log

  enabled: true

  paths:

    - /home/Log/web/*

  fields:                                    # 设置一个 fields,用于标记这个日志

    project_type: web

  fields_under_root: true         # 新增fields成为顶级目,被Logstash读取

此时,在 Logstash 的 output 中即可像之前一样,通过判断,将日志区分,写入不同的索引发送至 ElasticSearch 中。

output {

        if [project_type] == "app"{

            elasticsearch {

                hosts => ["ELK服务机IP:9200"]

                index => "app_%{+YYYY.MM.dd}"

            }

        }

       else if [project_type] == ""{

            elasticsearch {

                hosts => ["ELK服务机IP:9200"]

                index => "web_%{+YYYY.MM.dd}"

            }

      }

}

完成。

亦可两种方式搭配使用。


版权声明:本文为Beyond1536原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。