Filebeat日志采集使用方式

Filebeat日志采集使用方式

最近由于业务需求,需要通过filebeat采集日志文件到kafka中,供flink消费,其具体步骤如下:

  • 安装filebeat

  • 文件配置

    • filebeat-input.yml文件配置

      - backoff: 1s
        backoff_factor: 2
        clean_inactive: 25h
        close_inactive: 2h
        encoding: plain
        fields: {NODE_IP: '日志所在节点', target_topic: 目标kafka topic}
        fields_under_root: true
        force_close_files: false
        harvester_buffer_size: 16384
        ignore_older: 24h
        max_backoff: 10s
        max_bytes: 10485760
        multiline: {match: after, negate: true, pattern: '^\d{4}\-\d{1,2}\-\d{1,2}\s\d{2}:\d{2}:\d{2}.\d{3}\s\001'}
        paths: [日志所在文件]
        scan_frequency: 1s
        tail_files: true
        type: log
      
    • filebeat.yml文件配置

      queue:
        mem:
          events: 2048
          flush.min_events: 1024
      # 日志文件采集配置
      filebeat.config.inputs:
        enabled: true
        path: conf/filebeat-input.yml
        reload.enabled: true
        reload.period: 10s
      
      # 输出到终端,供调试用
      #output.console:
      #  pretty: ${CONSOLE:false}
      
      # 输出到kafka
      output.kafka:
        enabled: true
        hosts: ["kafka所在节点"]
        topic: '%{[target_topic]}'
        partition.round_robin:
          reachable_only: false
        required_acks: 1
        compression: gzip
        max_message_bytes: 1000000
      
  • 配置好上述文件以后,就可以启动filebeat进行日志采集:

    ./filebeat -e -c  /app/filebeat/conf/filebeat.yml
    
  • 在kafka测试,即可收到对应日志


版权声明:本文为wangxingxingalulu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。