TDengine 如何批量插入?

【本文正在参与 “拥抱开源 | 涛思数据 TDengine有奖征稿】拥抱开源 涛思数据 TDengine有奖征稿活动-CSDN 链接:https://marketing.csdn.net/p/0ada836ca30caa924b9baae0fd33857c


      最近在使用涛思旗下tdengine数据库的时候,因为需求,需要做一个批量插入,遇到了些问题,记录一下。

正常插入

      在测试时,数据都是单条插入的,代码如下:

package test;

import java.sql.*;

public class jdbcTest {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
//        Class.forName("com.taosdata.jdbc.TSDBDriver");

        // j?charset=UTF-8&locale=en_US.UTF-8
        String jdbcUrl = "jdbc:TAOS://:6030/restful_test?user=zhangsan&password=123456";
        Connection conn = DriverManager.getConnection(jdbcUrl);

        Statement stmt = conn.createStatement();
        stmt.executeUpdate("use restful_test");
        System.out.println( 111 );


//        int affectedRows = stmt.executeUpdate("insert into restful_test.l_test values(now,1) ");

        int affectedRows = stmt.executeUpdate("insert into student values(1622685792584, \"zhangsan\", \"lisi\", 90) ");
        System.out.println("insert " + affectedRows + " rows."); 
        stmt.close();
        conn.close(); 
    }
}

      该代码运行正常,可以插入。但是生产环境下这样去插入数据肯定是不妥的,于是尝试批量插入。

批量插入

      根据以前的开发经验,批量插入,使用PrepareStatement即可。代码百度即可,这里不再演示如下:

PreparedStatement ps = conn.prepareStatement(  
   "INSERT into student values (?, ?, ?)");  
  
long l = new Date().getTime();
for (n = 0; n < 100; n++) {  
  
  ps.setLong(l);  
  ps.setString("zhangsan");  
  ps.setString("lisi");  
  ps.setInt(90);  
  l ++;
  ps.addBatch();  
}  
ps.executeBatch();  

      但实际运行时,会报错。

解决错误

查询官方文档

      这个我实在想不通,这么简单的代码,我应该没写错吧。再次检查代码,仍然报错;再次百度,写法仍然是这样的。
      那就查查官方文档吧。
      搜索批量插入后,只搜索到三篇文章。
文档 | 涛思数据 | 连接器 链接:https://www.taosdata.com/cn/documentation/connector
文档 | 涛思数据 | 高效写入数据 链接:https://www.taosdata.com/cn/documentation/insert
文档 | 涛思数据 | 常见问题 链接:https://www.taosdata.com/cn/documentation/faq
      根据标体来看,肯定是高效写入数据比较靠谱。但既然都碰到问题了,就都看一遍吧。

常见问题官方文档

      该篇官方文档比较简短,只找到如下相关描述:

11. 最有效的写入数据的方法是什么?
批量插入。每条写入语句可以一张表同时插入多条记录,也可以同时插入多张表的多条记录。

      虽然简短,但根据这条信息可以确定,tdengine是支持一次插入多条记录的。
      

连接器官方文档

      顾名思义,该篇官方文章主要是讲各种语言的连接器的创建和使用的。找到java的连接器,查看相关官方文档。在Python客户端安装的windows代码示例下,找到如下描述:

import datetime
# 创建数据库
c1.execute('create database db')
c1.execute('use db')
# 建表
c1.execute('create table tb (ts timestamp, temperature int, humidity float)')
# 插入数据
start_time = datetime.datetime(2019, 11, 1)
affected_rows = c1.execute('insert into tb values (\'%s\', 0, 0.0)' %start_time)
# 批量插入数据
time_interval = datetime.timedelta(seconds=60)
sqlcmd = ['insert into tb values']
for irow in range(1,11):
  start_time += time_interval
  sqlcmd.append('(\'%s\', %d, %f)' %(start_time, irow, irow*1.2))
affected_rows = c1.execute(' '.join(sqlcmd))

      所幸之前接触过python,能看懂啥意思。这段代码中的批量插入,并没有使用prepareStatement的方式,而是将sql拼接为insert into tb values(‘xxx’,xxx)(‘xxx’,xxx)的形式,再去执行sql。
      从该篇官方文章中,再没有得到更具体的关于批量插入的信息。

高效写入数据官方文档

      这篇官方文章,看名字就像要找的答案。很容易就找到了如下描述:

高效写入数据
TDengine支持多种接口写入数据,包括SQL, Prometheus, Telegraf, EMQ MQTT Broker, HiveMQ Broker, CSV文件等,后续还将提供Kafka, OPC等接口。数据可以单条插入,也可以批量插入,可以插入一个数据采集点的数据,也可以同时插入多个数据采集点的数据。支持多线程插入,支持时间乱序数据插入,也支持历史数据插入。

      再次确定了连接器官方文章小节中的结论: 以及如下官方描述:tdengine是支持一次插入多条记录的。

SQL写入
应用通过C/C++, JDBC, GO, 或Python Connector 执行SQL insert语句来插入数据,用户还可以通过TAOS Shell,手动输入SQL insert语句插入数据。比如下面这条insert 就将一条记录写入到表d1001中:

INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31);
TDengine支持一次写入多条记录,比如下面这条命令就将两条记录写入到表d1001中:

INSERT INTO d1001 VALUES (1538548684000, 10.2, 220, 0.23) (1538548696650, 10.3, 218, 0.25);
TDengine也支持一次向多个表写入数据,比如下面这条命令就向d1001写入两条记录,向d1002写入一条记录:

INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) d1002 VALUES (1538548696800, 12.3, 221, 0.31);
详细的SQL INSERT语法规则请见 TAOS SQL 的数据写入 章节。

      在这里,可以查到一次插入多条数据的sql写法。需要注意的是:多条数据之间并没有用逗号分隔。

      继续向下查看还有如下官方描述:

Tips:

要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为1M)。

TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程切频繁切换,带来额外开销。

对同一张表,如果新插入记录的时间戳已经存在,默认(没有使用 UPDATE 1 创建数据库)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。如果在创建数据库时使用 UPDATE 1 选项,插入相同时间戳的新记录将覆盖原有记录。

写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天,那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2,那么无法写入比当前时间还晚2天的数据。

      这里包含很重要的信息:要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为1M)。 以及: TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。
      虽然与批量插入无关,但这条信息也很重要:对同一张表,如果新插入记录的时间戳已经存在,默认(没有使用 UPDATE 1 创建数据库)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。

最终结论

      tdengine是支持批量插入的,但只支持sql形式的批量插入:insert into tb values(xxx,xxx)(xxx,xxx)。最重要的是:一条数据不能超过16K,一条sql不能超过64K(可配置)。如果需要更快的写入速度,可以考虑多线程同时写入;官方文档对线程数量的描述为:20个以上。
      同时,需要注意:没有使用 UPDATE 1 创建数据库,同一张表中,若需插入的数据的时间戳已存在,则该条需插入的数据被废弃。

修改后的代码

      经过如上文章查询后,最终代码如下:
      (用scala写的,语言不是问题,理解思路即可,注释也还是挺够用的)

  def insert(size: Int, dataArray: Array[Row]): Unit ={

    // j?charset=UTF-8&locale=en_US.UTF-8
    val jdbcUrl = "jdbc:TAOS://:6030/restful_test?user=zhangsan&password=123456"
    val conn = DriverManager.getConnection(jdbcUrl)


    var ts = System.currentTimeMillis();
    val sql = "insert into restful_test.student values"
    var jdbcSql = new StringBuffer (sql )
    val stmt = conn.createStatement();


	// 当前一条sql插入的数据量
    var i = 0;
    var start = 0L
    var end = 0L
    // 共插入数据量
    var allCount = 0
	// 插入数据总消耗时间
    var allTime = 0L

    for ( row <- dataArray ) {
      i = i + 1
      // 同一张表中,若需插入的数据的时间戳已存在,则该条需插入的数据被废弃。所以需要手动该表ts。
      ts = ts + 1l;
      jdbcSql.append(
        "(%d, \"%s\", \"%s\", %f)".format(ts, row.getString(0), row.getString(1), row.getDouble(2))
      )

	  // 保证一条sql不超过16K,大于15K时就执行
      if( jdbcSql.length > 15 * 1024 ){
        start = System.currentTimeMillis();
        stmt.executeUpdate(jdbcSql.toString)
        end = System.currentTimeMillis();
        println( i  + ", " + ( end - start) )
        allCount += i
        allTime += ( end - start)
        i = 0
        jdbcSql = new StringBuffer(sql )
      }

    }

	// 并不能保证数据中最后一条sql也大于15K,如果不添加下面的执行逻辑,则最后一条sql不会执行,会导致数据没有完全插入。
    start = System.currentTimeMillis();
    stmt.executeUpdate(jdbcSql.toString)
    end = System.currentTimeMillis();

    allCount += i
    allTime += ( end - start)
    println( i  + ", " + ( end - start) )
    println( allCount + ", " + allTime )

    println( "  " + size + ",  -----------------" )
    println( "-----------------------, ------------------------------------")

    stmt.close()
    conn.close()
  }

      【本文正在参与 “拥抱开源 | 涛思数据 TDengine有奖征稿】拥抱开源 涛思数据 TDengine有奖征稿活动-CSDN 链接:https://marketing.csdn.net/p/0ada836ca30caa924b9baae0fd33857c


版权声明:本文为weixin_42845682原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。