通过copyIn向PostgreSQL或Greenplum写入数据

     最近需要向PG写入百万级的数据,分别尝试了Spark、jdbc写法,发现性能都不是特别好,差不多都得花上几分钟,有可能是PG本身没有进行优化,或者编程过程中有欠缺的地方。总之,达不到业务需求。后面改用PG的copyIn方法,发现写入百万数据只需要秒级别。下面将demo代码附上,与君共享,有不足的地方,万望指正。

pom依赖:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.6</version>
</dependency>

注意:虽然GP的依赖和PG有区别(具体区别请各位自行查阅资料),但是GP的依赖包里面并没有copyIn方法。所以如果要用copyIn方法向GP中写入数据,依赖要用PG的。

导入方法:

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.sql.DriverManager
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

 demo代码:

object CopyInTest {
  def main(args: Array[String]): Unit = {
    val tableName = "copyIn_test"
    val data: Array[Array[String]] = Array(Array("1111","ddddd","2019-10-30"))
    Class.forName("org.postgresql.Driver")
    val conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test","postgres", "postgres")
    val manager = new CopyManager(conn.asInstanceOf[BaseConnection])
    manager.copyIn(s"COPY $tableName from STDIN",genInputStream(data))
    conn.close()
  }

  /**
    * 将数据转为字节流
    * @param arr
    * @return
    */
  def genInputStream(arr: Array[Array[String]]): InputStream = {
    val stringBuilder = new StringBuilder
    if (arr.length != 0) {
      val rowcount = arr.length
      val columncount = arr(0).length
      for (i <- 0 until rowcount; j <- 0 until columncount) {
        stringBuilder.append(arr(i)(j) + (if (j == columncount-1) "\r\n" else "\t"))
      }
    }
    val str = stringBuilder.toString
    new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))//注意设置与数据库对应的编码格式
  }
}

     最后再分享一个小知识点,在实际操作中发现GP可以使用PG的依赖对数据库进行操作(如建表、写入数据等),但是在查询数据的时候,当列数超过30之后查询程序就会卡住。所以建议除了copyIn这种操作之外,对GP的操作还是用GP自己的依赖。

GP依赖:

<dependency>
    <groupId>com.pivotal</groupId>
    <artifactId>greenplum-jdbc</artifactId>
    <version>5.1.4</version>
</dependency>

 


版权声明:本文为Programmer_Xiaoming原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。