logstash和es的版本要一致,这里使用6.2.4
一、在logstash的bin目录下创建conf文件夹再创建csv.conf文件:
input {
file {
path => ["C:\Users\Desktop\test.csv"]
start_position => "beginning"
}
}
filter {
csv {
separator => ","
columns => ["name","age"]
}
mutate {
convert => {
"name" => "string"
"age" => "integer"
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "test2"
document_type => "test2"
}
}
其中:
input
input组件负责读取数据,使用以下插件读取不同的数据类型。
file插件读取本地文本文件,
stdin插件读取标准输入数据,
tcp插件读取网络数据,
log4j插件读取log4j发送过来的数据等等。
path:csv文件路径
start_position:可以设置为beginning或者end,beginning表示从头开始读取文件,end表示读取最新的,这个也要和ignore_older一起使用。
filter
filter插件负责过滤解析input读取的数据
读取csv文件:
separator:拆分符
columns:csv文件中的字段,注意:要和 csv文件中字段顺序一致
mutate:字段类型转换,具体可转换的数据类型,根据es版本而定,可查看官网:https://www.elastic.co/guide/en/logstash/5.5/plugins-filters-mutate.html
output
hosts:主机ip
index:设置es中的索引名称
document_type:索引下的type名称
对于csv文件需要注意一下几点:
1、第一行不需要存储字段名称,直接就是字段值信息
2、最后一行要换行
二、在logstash的bin目录下运行 logstash -f logstash.conf,出现如下信息则表示运行成功:
注意:这个进程会一直处于运行状态,若csv文件被修改,程序会自动重新导入数据;
若需要重新导入数据,则需要删除logstash安装目录下\data\plugins\inputs\file下的文件。重新运行logstash -f logstash.conf
再贴一个完整点的conf文件
#1.读取数据csv文件,数据输入阶段
input {
file{
#设置csv文件路径,多个文件路径可设置成数组[],模糊匹配用*
#指定单一文件
path => "/data/es/logstash-5.6.1/files/test.csv"
#指定数组文件
#path => ["/data/es/logstash-5.6.1/files/test-1.csv","/data/es/logstash-5.6.1/files/test-2.csv"]
#指定同级目录模糊匹配
#path => "/data/es/logstash-5.6.1/files/test*.csv"
#指定多级目录模糊匹配
#path => "/data/es/logstash-5.6.1/files/**/test*.csv"
#可设置成begining或end,begining表示从头开始读取文件,end表示读取最新数据,可和ignore_older一起使用
#begining只针对首次启动是否需要读取所有的历史数据,而当csv文件修改了之后,同样会自动增量更新新数据
start_position => beginning
#表示针对多久的文件进行监控,默认一天内的文件,单位为秒,0表示不忽略任何过期日志
#ignore_older => 86400
#表示多久检查一次被监听文件的变化,默认1s
#stat_interval => 1
#当存在多个文件的时候可使用type指定输入输出路径
type => "csv_index"
}
}
#2.过滤格式化数据阶段
filter {
#读取csv文件
csv {
#设置拆分符为逗号
separator => ","
#指定csv文件的字段,必须要和csv文件中的字段顺序一致
columns => ["name","age","sex","updatetime"]
#指定字段的数据格式,也可在mutate转换:integer, float, date, date_time, boolean
convert => {"age" => "float"}
}
#转换其他数据格式
mutate {
convert => {
"name" => "string"
"sex" => "string"
}
}
mutate{
#将lat和lng两字段数据合并成mergestr字段
add_field => ["mergestr", "%{age}, %{sex}"]
#删除无效的字段,可自定义
remove_field => ["@version","message","host","path"]
}
#获取日志中的时间字符串,转换成时间格式,或者替换成其他的字段
date {
match => ["updatetime", "yyyy/MM/dd HH:mm:ss"]
#直接将updatetime转换时间格式
target => "updatetime"
#使用updatetime转换后的时间格式替换掉@timestamp时间
#target => "@timestamp"
}
#新增timestamp字段,将@timestamp时间增加8小时
ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" }
}
#3.数据输出到ES阶段
output {
#日志输出格式,json_lines;rubydebug等
stdout {
codec => rubydebug
}
#输出到es
if[type] == "csv_index"{
#无法解析的json不记录到elasticsearch中
if "_csvparsefailure" not in [tags] {
elasticsearch {
#es地址ip端口
hosts => "127.0.0.1:9200"
#索引
index => "csv_index"
#类型
document_type => "csv_index"
#覆盖模板,不需要可注释掉,通用模板下载:https://download.csdn.net/download/alan_liuyue/11241484
template_overwrite=>true
template=>"/data/es/logstash-5.6.1/template/logstash.json"
}
}
}
}
版权声明:本文为weixin_44665501原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。