InfluxDB数据模型和实践
本文是针对没有使用过InfluxDB的读者的一个入门介绍。
主要内容:介绍influx DB的数据模型
对influx DB 官方文档的摘写和导航
实践:使用JAVA对InfluxDB进行读写操作体验InfluxDB的性能
InfluxDB是什么
InfluxDB是开源的时间序列数据库,可用于实时查看堆栈,传感器和系统的指标查看。
优势:时序数据库的优势,在于处理指标数据的聚合,并且读写效率非常高。
特点:influxDB 不支持数据库的更新操作,毕竟时间数据只能随着时间产生新数据,肯定无法对过去的数据修改。
从数据结构上说,时间序列数据数据没有单一的主键标识,必须包含时间戳,数据只能和时间戳进行关联,不适合普通业务数据。
InfluxDB的数据模型InfluxDB里存储的数据被称为时间序列数据,其包含一个数值,就像CPU的load值或是温度值类似的。
时序数据有零个或多个数据点,每一个都是一个指标值。time(一个时间戳) 【类比SQL中的主键】
measurement(例如cpu_load) 【可以理解为SQL中的table】
至少一个k-v格式的field(也即指标的数值例如 “value=0.64”或者“temperature=21.2”) 【类比SQL中的列】
零个或多个tag,其一般是对于这个指标值的元数据(例如“host=server01”, “region=EMEA”, “dc=Frankfurt)。【可以理解为列,与field不同的是,tag理解为一个对象,例如server01的主机温度为21.2。并且tag是被索引起来的】
与传统的关系型数据库不同之处在于,在InfluxDB里,你可以有几百万的measurements,你不用事先定义数据的scheme,并且null值不会被存储。
将数据点写入InfluxDB,只需要遵守如下的行协议:ps: 注意 tag和field 之间有一个空格
[,=...] =[,=...] [unix-nano-timestamp]
例如:
cpu,host=serverA,region=us_west value=0.64
payment,device=mobile,product=Notepad,method=credit billed=33,licenses=3i 1434067467100293230
stock,symbol=AAPL bid=127.46,ask=127.48
temperature,machine=unit42,type=assembly external=25,internal=37 1434067467000000000
搭建InfluxDB实验环境
本文使用Docker快速搭建。 在dockerHub上搜索InfluxDB的镜像查看使用说明 :https://hub.docker.com/_/influxdb/ 这里安装influxDB最新版本。示例如下。将端口号8086暴露,并将数据映射到本地路径。
docker run -d --name influxdbdemo1 -p 8086:8086 -v $PWD:/var/lib/influxdb influxdb
验证 到此部署成功。
influx 命令行工具
有很多可以向InfluxDB写数据的方式,包括命令行、客户端还有一些像Graphite有一样数据格式的插件。influx这个工具包含在InfluxDB的安装包里,是一个操作数据库的轻量级工具。它直接通过InfluxDB的HTTP接口(如果没有修改,默认是8086)来和InfluxDB通信。 说明:也可以直接发送裸的HTTP请求来操作数据库,例如curl
进入到容器中.
docker exec -it influxdbdemo1 /bin/bash
root@09460d9e8053:/# influx -precision rfc3339
Connected to http://localhost:8086 version 1.8.1
InfluxDB shell version: 1.8.1-precision参数表明了任何返回的时间戳的格式和精度,在上面的例子里,rfc3339是让InfluxDB返回RFC339格式(YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ)的时间戳
influxdb的DDL命令和mysql的命令很相似创建数据库:
CREATE DATABASE [WITH [DURATION ] [REPLICATION ] [SHARD DURATION ] [NAME ]]切换数据库: use [数据库名称]
插入数据: insert [数据点]
查询数据: select * from [measurement]
示例如下:
root@09460d9e8053:/# influx -precision rfc3339
Connected to http://localhost:8086 version 1.8.1
InfluxDB shell version: 1.8.1
> CREATE DATABASE mydb
> show databases
name: databases
name
----
_internal
mydb
> use mydb
Using database mydb
> INSERT temperature,machine=unit42,type=assembly external=25,internal=37
> SELECT * FROM "temperature"
name: temperature
time external internal machine type
---- -------- -------- ------- ----
2020-08-06T02:33:08.927309504Z 25 37 unit42 assembly
当然还有更多示例和用法,本文没有提及,可以通过help命令查看
root@09460d9e8053:/# influx --help
Usage of influx:
-version
Display the version and exit.
-path-prefix 'url path'
Path that follows the host and port
-host 'host name'
Host to connect to.
-port 'port #'
Port to connect to.
-socket 'unix domain socket'
Unix socket to connect to.
-database 'database name'
Database to connect to the server.
-password 'password'
Password to connect to the server. Leaving blank will prompt for password (--password '').
-username 'username'
Username to connect to the server.
-ssl
Use https for requests.
-unsafeSsl
Set this when connecting to the cluster using https and not use SSL verification.
-execute 'command'
Execute command and quit.
-type 'influxql|flux'
Type specifies the query language for executing commands or when invoking the REPL.
-format 'json|csv|column'
Format specifies the format of the server responses: json, csv, or column.
-precision 'rfc3339|h|m|s|ms|u|ns'
Precision specifies the format of the timestamp: rfc3339, h, m, s, ms, u or ns.
-consistency 'any|one|quorum|all'
Set write consistency level: any, one, quorum, or all
-pretty
Turns on pretty print for the json format.
-import
Import a previous database export from file
-pps
How many points per second the import will allow. By default it is zero and will not throttle importing.
-path
Path to file to import
-compressed
Set to true if the import file is compressed
Examples:
# Use influx in a non-interactive mode to query the database "metrics" and pretty print json:
$ influx -database 'metrics' -execute 'select * from cpu' -format 'json' -pretty
# Connect to a specific database on startup and set database context:
$ influx -database 'metrics' -host 'localhost' -port '8086'
root@09460d9e8053:/#
內建的HTTP接口写入数据
上面介绍了使用influx自带的工具进行,下面介绍使用influx API 写入数据到influxDB到数据库中。 并且infulxDB API 是主要的写入数据的方法。
curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
1.8 API 版本
curl -i -XPOST 'http://localhost:8086/write?db=mydb'
--data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
2.0 API 版本
curl -i -XPOST 'http://localhost:8086/api/v2/write?bucket=db/rp&precision=ns' \ --header 'Authorization: Token username:password' \ --data-raw 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server02 value=0.67cpu_load_short,host=server02,region=us-west value=0.55 1422568543702900257cpu_load_short,direction=in,host=server01,region=us-west value=2.0 1422568543702900257'
PS C:\Users\dhht> curl -i -XPOST 'http://localhost:8086/write?db=mydb'
HTTP/1.1 204 No Content
Content-Type: application/json
Request-Id: 9007f7f0-d857-11ea-800e-0242ac110002
X-Influxdb-Build: OSS
X-Influxdb-Version: 1.8.1
X-Request-Id: 9007f7f0-d857-11ea-800e-0242ac110002
Date: Fri, 07 Aug 2020 02:41:57 GMT
如果收到 HTTP/1.1 204 No Content 说明写入成功。
要执行InfluxQL查询,请将GET请求发送到/ query端点,将URL参数db设置为目标数据库,并将URL参数q设置为您的查询。
curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "q=SELECT \"value\" FROM \"cpu_load_short\" WHERE \"region\"='us-west'"
返回json
{
"results": [
{
"statement_id": 0,
"series": [
{
"name": "cpu_load_short",
"columns": [
"time",
"value"
],
"values": [
[
"2015-01-29T21:55:43.702900257Z",
2
],
[
"2015-01-29T21:55:43.702900257Z",
0.55
],
[
"2015-06-11T20:46:02Z",
0.64
]
]
}
]
}
]
}
实验 InfluxDB 性能测试
Spring boot 整合 influxDB 插入1千万条数据
添加依赖
org.springframework.boot
spring-boot-starter
org.influxdb
influxdb-java
2.17
org.projectlombok
lombok
org.springframework.boot
spring-boot-starter-test
test
org.junit.vintage
junit-vintage-engine
初始Influxdb数据库测试类
package com.cll.influxdbtest;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@Slf4j
@SpringBootTest
public class InfluxInitTest {
/**
* 1千万条数据
*/
private final static int ROWS = 10000000;
/**
* 插入initDB数据
*/
@Test
public void initInfluxDB(){
long begin = System.currentTimeMillis();
OkHttpClient.Builder builder = new OkHttpClient().newBuilder()
.connectTimeout(1, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS);
try (InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086",builder)){
String db = "influxtest";
influxDB.query(new Query("DROP DATABASE "+db));
influxDB.query(new Query("CREATE DATABASE "+db));
influxDB.setDatabase(db);
influxDB.enableBatch(BatchOptions.DEFAULTS.actions(10000).flushDuration(1000));
IntStream.rangeClosed(1,ROWS).mapToObj(i -> Point.measurement("m")
.addField("value", ThreadLocalRandom.current().nextInt(10000))
.time(LocalDateTime.now().minusSeconds(5*i).toInstant(ZoneOffset.UTC).toEpochMilli(),
TimeUnit.MICROSECONDS).build()
).forEach(influxDB::write);
influxDB.flush();
}
System.out.println("消耗" + (System.currentTimeMillis()-begin));
}
}
输出
消耗89906
Spring boot 整合 jpa 插入1千万条数据
添加依赖
mysql
mysql-connector-java
建立 mysql 数据库初始类
package com.cll.influxdbtest;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
@SpringBootTest
public class MysqlInitTest {
/**
* 1千万条数据
*/
private final static int ROWS = 10000000;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* mysql 插入
*/
@Test
public void initMysql(){
long begin = System.currentTimeMillis();
jdbcTemplate.execute("DROP TABLE IF EXISTS `mysqltest`;");
//只有ID、值和时间戳三列
jdbcTemplate.execute("CREATE TABLE `mysqltest` (\n" +
" `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" +
" `value` bigint NOT NULL,\n" +
" `time` timestamp NOT NULL,\n" +
" PRIMARY KEY (`id`),\n" +
" KEY `time` (`time`) USING BTREE\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;");
String sql = "INSERT INTO `mysqltest` (`value`,`time`) VALUES (?,?)";
//批量插入数据
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
preparedStatement.setLong(1, ThreadLocalRandom.current().nextInt(10000));
preparedStatement.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now().minusSeconds(5 * i)));
}
@Override
public int getBatchSize() {
return ROWS;
}
});
log.info("消耗{}", System.currentTimeMillis() - begin);
}
}
输出:
消耗505675
耗时对比:mysql 耗时: 505675
influxdb 耗时:89906 毫秒。
可以看到,插入一千万条数据,influxdb比mysql快了6倍多。
Influx与mysql 查询数据耗时对比对这 1000 万数据进行一个统计,查询最近 60 天的数据,按照 1 小时的时间粒度聚合,统计 value 列的最大值、最小值和平均值
测试代码
package com.cll.influxdbtest;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.List;
import java.util.Map;
/**
* QueryTest
* 对 1000 万数据进行一个统计,查询最近 60 天的数据,按照 1 小时的时间粒度聚合,统
* 计 value 列的最大值、最小值和平均值
**/
@Slf4j
@SpringBootTest
public class QueryTest {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 查询influx数据库 耗时2097
*/
@Test
public void influxQuery(){
long begin = System.currentTimeMillis();
try(InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086")){
influxDB.setDatabase("influxtest");
QueryResult query = influxDB.query(new Query("SELECT MEAN(value),MIN(value),MAX(value) FROM m WHERE time > now() - 60d GROUP BY TIME(1h)"));
log.info("耗时{}",System.currentTimeMillis()-begin);
}
}
/**
* 查询mysql数据库 耗时:20323
*/
@Test
public void mysqlQuery(){
long begin = System.currentTimeMillis();
List> maps = jdbcTemplate.queryForList("SELECT date_format(time,'%Y%m%d%H'),max(value),min(value),avg(value) FROM mysqltest WHERE time>now()- INTERVAL 60 DAY GROUP BY date_format(time,'%Y%m%d%H')");
log.info("耗时:{}",System.currentTimeMillis()-begin);
}
}
时间对比:mysql 查询耗时:20323 毫秒
influx 查询耗时:2097 毫秒
influx的查询时间实际显著比mysql快。
总结 :influxDB 的注意事项对于 MySQL 而言,针对大量的数据使用全表扫描的方式来聚合统计指标数据,性能非常差,一般只能作为临时方案来使用。此时,引入 InfluxDB 之类的时间序列数据库,就很有必要了。时间序列数据库可以作为特定场景(比如监控、统计)的主存储,也可以和关系型数据库搭配使用,作为一个辅助数据源,保存业务系统的指标数据
使用场景InfluxDB是因为物联网而兴起的数据库,其天生具有IOT的特性。几乎所有的物联网数据都可以通过InfluxDB存储,分析与展示。
InfluxDB的具体使用场景包括:应用于DevOps监控、IoT监控、实时分析,智慧物联网监控分析系统,传统石油化工、采矿以及制造企业设备数据采集与分析,医疗数据采集与分析,车联网,智慧交通等。InfluxDB同时还可以用于日志数据存储与分析,各种服务、软件以及系统监控数据采集、分析与报警,金融数据采集与分析等。
只要符合写多读少、无事务要求、海量高并发持续写入、基于时间区间聚合分析以及基于时间区间快速查询的数据都可以使用Influx
参考资料: