目标:
使用Logstash的配置文件中的过滤器filter,对日志内容进行简单解析,并处理输出时生成字段。
Logstash实现多路接收。
观察日志格式

系统中的日志,能够大概分为以上两种类型,一种是纯系统操作信息日志,另一种是系统操作信息后,附带JSON格式的具体实体信息。
两种日志中,开头的信息及结构都是相同的。包括 日志编码、日志产生时间、日志产生线程、日志类型、日志产生类路径、日志内容。
grok正则捕获
我们可以通过gork,解析抽离出日志中的日志编码、日志产生时间、日志产生线程、日志类型、日志产生类路径、日志内容。
其中,grok可以通过关键字匹配,和正则表达式进行匹配,两个可在一条匹配语句中混合使用。关键字匹配的实现逻辑也是正则表达式匹配。
在关键字匹配中,其语法规则是:
%{语法:语义}
语法为关键字,语义为自定义字段名称。
在本系统日志格式中,日志编码、日志产生时间、日志产生线程、日志类型均可以使用关键字匹配来解析日志信息。filter中我们可以编写为:
filter{
grok{
match => ["message", "%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} " ]
}
}
看看匹配效果
成功分析出所需要的字段
日志产生类路径和日志内容属于较为复杂的字符串,此时我们用正则表达式进行匹配。filter中我们编写为:
filter{
grok{
match => ["message",
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"
]
}
}
看看匹配效果

成功分析出所需要的字段。以上匹配方式只是将最后的日志内容放在了一个字段中,在以JSON结尾的日志中,我们还可以对JSON进行解析。
首先需要提取JSON部分内容。根据观察,内容被嵌套在冒号后边的花括号至最后一个花括号处,正则表达式我们可以这样写:
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))"
目前,有两种匹配语句对应匹配两种日志。Logstash可在grok中写入一条或多条匹配语句,并且智能地帮我们匹配到相应类型的日志记录中。
写法如下:
grok{
match => ["message",
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))",
"message",
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"
]
}
JSON格式解析
匹配到JSON的内容到字段 log_json 后,则还需要使用filter中自带的json插件,对 log_json 进行解析,由字符串转化为JSON格式。
在filter{}中加入:
json {
source => "log_json" #对应log_json字段内容
target => "json_content" #将内容转化为JSON格式后,放入命名为 json_content 的字段
remove_field=>["log_json"] #将 log_json 字段删除
}
json_content解析后,输出效果如下。发送至ElasticSearch服务器是也为JSON格式,即刻按照JSON中属性进行相关匹配、搜索。

日期格式解析
匹配到TIMESTAMP_ISO8601的内容到字段 log_time 后,则还需要使用filter中自带的date插件,对 log_time 进行解析,由字符串转化为时间格式。
在filter{}中加入:
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]
}
此时则完成对 log_time 字段的匹配并转义,替代@timestamp字段。(原@timestamp字段意义为Logstash接收到该日志的时间戳。)
删除多余字段
经过对Logstash默认生成日志记录的观察,有部分字段我们并不需要使用到。并且这些字段会给传输和储存带来更大的压力,所以决定删除。
使用 remove_field 删除需要删除的字段,写法如下,在filter{}中加入:
mutate{
remove_field => ["host"]
remove_field => ["agent"]
remove_field => ["ecs"]
remove_field => ["tags"]
remove_field => ["fields"]
remove_field => ["@version"]
remove_field => ["input"]
remove_field => ["log"]
}
最后效果
完整的 filter 如下:
filter{
grok{
match => ["message",
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) .*\w+\:(?<log_json>(\{[\s\S]+\}))",
"message",
"%{NUMBER:log_id} %{TIMESTAMP_ISO8601:log_time} \[%{DATA:log_thread}\] %{LOGLEVEL:log_type} \s*(?<class_info>([\S+]*)) %{GREEDYDATA:log_msg}"
]
}
json {
source => "log_json"
target => "json_content"
remove_field=>["log_json"]
}
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]
}
mutate{
remove_field => ["host"]
remove_field => ["agent"]
remove_field => ["ecs"]
remove_field => ["tags"]
remove_field => ["fields"]
remove_field => ["@version"]
remove_field => ["input"]
remove_field => ["log"]
}
}
发送到 ElasticSearch 服务器后,我们可以在 Kibana 中看到如下效果:

Logstash多路接收
由于需求需要,在ELK服务机上,需要使用 Logstash 接收两台日志生成机上,三个项目的日志文件,并且将该三个项目分别建成三条不同的索引存入 ElasticSearch 中。以下将使用两种方式区分不同项目的日志。
使用不同端口接收
在 Logstash 输入输出配置文件中,input下,创建若干个 beats{} ,通过 port 指定接收的端口,并在信息中加入字段用于区分不同端口接收到的日志消息。
input {
beats {
port => "5045"
add_field => {"beat_machine" => "log1"}
}
beats {
port => "5044"
add_field => {"beat_machine" => "log2"}
}
}
然后通过判断语句,在output下,将日志区分,写入不同的索引发送至 ElasticSearch 中。
output {
if [beat_machine] == "log1" {
elasticsearch {
hosts => ["ELK服务机IP:9200"]
index => "log1_%{+YYYY.MM.dd}"
}
}
else if [beat_machine] == "log2" {
elasticsearch {
hosts => ["ELK服务机IP:9200"]
index => "log2_%{+YYYY.MM.dd}"
}
}
}
通过Filebeat添加识别字段
Filebeat 传送 Logstash 只可以指定一个端口,该种处理方式只可以区分不同机器上的日志文件,若两个系统生成的日志文件都在一台机器上那怎么办呢?在 Filebeat 中为日志添加识别字段可解决该问题。
在 Filebeat 配置文件中 filebeat.inputs 下,可以指定读取多个路径的日志文件。假设两个系统中的日志文件分别放在两个不同的路径下,则可在每一条指定路径下添加识别字段信息。
filebeat.inputs:
- type: log
enabled: true
paths:
- /home/Log/app/*
fields: # 设置一个 fields,用于标记这个日志
project_type: app
fields_under_root: true # 新增fields成为顶级目,被Logstash读取
- type: log
enabled: true
paths:
- /home/Log/web/*
fields: # 设置一个 fields,用于标记这个日志
project_type: web
fields_under_root: true # 新增fields成为顶级目,被Logstash读取
此时,在 Logstash 的 output 中即可像之前一样,通过判断,将日志区分,写入不同的索引发送至 ElasticSearch 中。
output {
if [project_type] == "app"{
elasticsearch {
hosts => ["ELK服务机IP:9200"]
index => "app_%{+YYYY.MM.dd}"
}
}
else if [project_type] == ""{
elasticsearch {
hosts => ["ELK服务机IP:9200"]
index => "web_%{+YYYY.MM.dd}"
}
}
}
完成。
亦可两种方式搭配使用。