ES用logstash批量导入csv数据

logstash和es的版本要一致,这里使用6.2.4

一、在logstash的bin目录下创建conf文件夹再创建csv.conf文件:

input {
  file {
    path => ["C:\Users\Desktop\test.csv"]  
    start_position => "beginning"
  }
}
filter {
  csv {
    separator => ","
    columns => ["name","age"]
  }
  mutate {
    convert => {
      "name" => "string"
      "age" => "integer"
    }
  }
 }

output {
  elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "test2"
		document_type => "test2"
  }
}
其中:
input
input组件负责读取数据,使用以下插件读取不同的数据类型。
       file插件读取本地文本文件,
       stdin插件读取标准输入数据,
       tcp插件读取网络数据,
       log4j插件读取log4j发送过来的数据等等。
path:csv文件路径
start_position:可以设置为beginning或者end,beginning表示从头开始读取文件,end表示读取最新的,这个也要和ignore_older一起使用。

filter
filter插件负责过滤解析input读取的数据
读取csv文件:
separator:拆分符
columns:csv文件中的字段,注意:要和 csv文件中字段顺序一致

mutate:字段类型转换,具体可转换的数据类型,根据es版本而定,可查看官网:https://www.elastic.co/guide/en/logstash/5.5/plugins-filters-mutate.html

output
hosts:主机ip
index:设置es中的索引名称
document_type:索引下的type名称

对于csv文件需要注意一下几点:
1、第一行不需要存储字段名称,直接就是字段值信息
2、最后一行要换行

二、在logstash的bin目录下运行 logstash -f logstash.conf,出现如下信息则表示运行成功:

在这里插入图片描述
在这里插入图片描述

注意:这个进程会一直处于运行状态,若csv文件被修改,程序会自动重新导入数据;
若需要重新导入数据,则需要删除logstash安装目录下\data\plugins\inputs\file下的文件。重新运行logstash -f logstash.conf

再贴一个完整点的conf文件

 
#1.读取数据csv文件,数据输入阶段
input {
    file{
        #设置csv文件路径,多个文件路径可设置成数组[],模糊匹配用*
        #指定单一文件
        path => "/data/es/logstash-5.6.1/files/test.csv"
        #指定数组文件
        #path => ["/data/es/logstash-5.6.1/files/test-1.csv","/data/es/logstash-5.6.1/files/test-2.csv"]
        #指定同级目录模糊匹配
        #path => "/data/es/logstash-5.6.1/files/test*.csv"
        #指定多级目录模糊匹配
        #path => "/data/es/logstash-5.6.1/files/**/test*.csv"
        
        #可设置成begining或end,begining表示从头开始读取文件,end表示读取最新数据,可和ignore_older一起使用
        #begining只针对首次启动是否需要读取所有的历史数据,而当csv文件修改了之后,同样会自动增量更新新数据
        start_position => beginning
        
        #表示针对多久的文件进行监控,默认一天内的文件,单位为秒,0表示不忽略任何过期日志
        #ignore_older => 86400
        
        #表示多久检查一次被监听文件的变化,默认1s
        #stat_interval => 1
        
        #当存在多个文件的时候可使用type指定输入输出路径
        type => "csv_index"
    }
 }  
  
#2.过滤格式化数据阶段
filter {
 
    #读取csv文件
    csv {
     #设置拆分符为逗号
     separator => ","
 
     #指定csv文件的字段,必须要和csv文件中的字段顺序一致
     columns => ["name","age","sex","updatetime"]
 
     #指定字段的数据格式,也可在mutate转换:integer, float, date, date_time, boolean
     convert => {"age" => "float"}
     
   }
   
   #转换其他数据格式
   mutate {
    convert => {
      "name" => "string"
      "sex" => "string"
    }
   }
   
   mutate{
        #将lat和lng两字段数据合并成mergestr字段
        add_field => ["mergestr", "%{age}, %{sex}"]
        
        #删除无效的字段,可自定义
        remove_field => ["@version","message","host","path"]
    }
    
    #获取日志中的时间字符串,转换成时间格式,或者替换成其他的字段
    date {
        match => ["updatetime", "yyyy/MM/dd HH:mm:ss"]
        #直接将updatetime转换时间格式
        target => "updatetime"
        #使用updatetime转换后的时间格式替换掉@timestamp时间
        #target => "@timestamp"
    }
    
    #新增timestamp字段,将@timestamp时间增加8小时
    ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" }
    
}
 
#3.数据输出到ES阶段
output {
 
    #日志输出格式,json_lines;rubydebug等
    stdout {
        codec => rubydebug
    }
    
    #输出到es
    if[type] == "csv_index"{
    
        #无法解析的json不记录到elasticsearch中
        if "_csvparsefailure" not in [tags] {    
            elasticsearch {
                #es地址ip端口
                hosts => "127.0.0.1:9200"
                #索引
                index => "csv_index"
                #类型
                document_type => "csv_index"
                #覆盖模板,不需要可注释掉,通用模板下载:https://download.csdn.net/download/alan_liuyue/11241484
                template_overwrite=>true
                template=>"/data/es/logstash-5.6.1/template/logstash.json"         
                
            }
        }
    }
    
}

版权声明:本文为weixin_44665501原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。