In the ELK environment built previously, the log pipeline was: filebeat --> logstash --> elasticsearch. As business volume grows, the architecture needs to be extended further by introducing a Kafka cluster. The pipeline then becomes: filebeat --> kafka --> logstash --> elasticsearch.
(Architecture diagram: filebeat --> kafka cluster --> logstash --> elasticsearch)
The installation steps are skipped here; consult the official documentation if you are unfamiliar with them.
This post focuses on how to get RabbitMQ logs into Elasticsearch and have Kibana display the data.
First, go to the RabbitMQ server and confirm the location of its logs: /var/log/rabbitmq.
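A quick sanity check that the log files exist (the exact file names depend on the node name, so rabbit*.log is used as a pattern):

# list the RabbitMQ log files that the filebeat input will pick up
ls -l /var/log/rabbitmq/rabbit*.log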
Edit the filebeat configuration file:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/rabbitmq/rabbit*.log
  fields:
    log_topic: rabbitmq-log
  # If you have more than one topic, duplicate this input block and change log_topic accordingly.

#------------------------------ Kafka output ---------------------------------
output.kafka:
  enabled: true
  hosts: ["10.11.10.9:9092", "10.11.10.70:9092", "10.11.10.1:9092"]
  #topic: userinfo
  topics:
    - topic: "rabbitmq-log"
      when.regexp:
        fields.log_topic: "rabbitmq-log"
    # If you have more than one topic, duplicate this entry and change the topic name.
  partition.round_robin:
    reachable_only: false
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
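Before moving on, it is worth validating the file and the broker connection with filebeat's built-in test subcommands, then restarting the service (assuming filebeat is managed by systemd):

filebeat test config        # validate the configuration file syntax
filebeat test output        # check connectivity to the Kafka brokers
systemctl restart filebeat  # pick up the new configuration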
At this point filebeat is configured.
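To confirm that events are actually reaching Kafka before configuring Logstash, you can consume a few messages with the stock Kafka CLI tools (the script path depends on how Kafka was installed):

# confirm the topic exists
kafka-topics.sh --list --bootstrap-server 10.11.10.9:9092

# tail the topic; each message should be a JSON event from filebeat
kafka-console-consumer.sh --bootstrap-server 10.11.10.9:9092 --topic rabbitmq-log --from-beginning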
Log in to the Logstash server and edit the Logstash configuration file:
input {
  kafka {
    # comma-separated broker list (a single string, not an array, for this plugin)
    bootstrap_servers => "10.11.10.9:9092,10.11.10.70:9092,10.11.10.1:9092"
    group_id => "rabbitmq-log"
    topics => ["rabbitmq-log"]
    # filebeat ships events as JSON; decode them so [fields][log_topic] is usable below
    codec => "json"
    type => "info"
  }
}

filter {
  # drop empty lines
  if [message] == "" {
    drop {}
  }
}

output {
  if [fields][log_topic] == "rabbitmq-log" {
    elasticsearch {
      # Elasticsearch HTTP endpoints (port 9200)
      hosts => ["10.11.10.9:9200", "10.11.10.70:9200", "10.11.10.1:9200"]
      # daily index for the RabbitMQ logs
      index => "rabbitmq-log-%{+YYYY.MM.dd}"
    }
  }
  stdout {
    codec => rubydebug
  }
}
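Logstash can check the pipeline file without starting a pipeline, which catches syntax errors before the restart:

/usr/share/logstash/bin/logstash -f /etc/logstash2/conf.d/logstash.conf --config.test_and_exit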
Restart Logstash:
nohup /usr/share/logstash/bin/logstash -f /etc/logstash2/conf.d/logstash.conf >> /var/log/logstash2-stdout.log 2>>/var/log/logstash2-stderr.log &
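Once Logstash is up, one way to verify it is consuming is to describe the consumer group on the Kafka side; the LAG column should stay close to zero (again, the script path depends on your Kafka installation):

kafka-consumer-groups.sh --bootstrap-server 10.11.10.9:9092 --describe --group rabbitmq-log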
Open a browser and log in to Kibana.
Create an index pattern for the newly created index (rabbitmq-log-*).
Go back to Discover and you should see the data.
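If Discover shows no documents, first confirm that the daily index was created on the Elasticsearch side (assuming the HTTP API on port 9200):

curl -s 'http://10.11.10.9:9200/_cat/indices?v' | grep rabbitmq-log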
Reposted from: https://blog.51cto.com/536410/2335661