最近需要向PG写入百万级的数据,分别尝试了Spark、jdbc写法,发现性能都不是特别好,差不多都得花上几分钟,有可能是PG本身没有进行优化,或者编程过程中有欠缺的地方。总之,达不到业务需求。后面改用PG的copyIn方法,发现写入百万数据只需要秒级别。下面将demo代码附上,与君共享,有不足的地方,万望指正。
pom依赖:
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.6</version>
</dependency>注意:虽然GP的依赖和PG有区别(具体区别请各位自行查阅资料),但是GP的依赖包里面并没有copyIn方法。所以如果要用copyIn方法向GP中写入数据,依赖要用PG的。
导入方法:
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.sql.DriverManager
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnectiondemo代码:
object CopyInTest {
def main(args: Array[String]): Unit = {
val tableName = "copyIn_test"
val data: Array[Array[String]] = Array(Array("1111","ddddd","2019-10-30"))
Class.forName("org.postgresql.Driver")
val conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test","postgres", "postgres")
val manager = new CopyManager(conn.asInstanceOf[BaseConnection])
manager.copyIn(s"COPY $tableName from STDIN",genInputStream(data))
conn.close()
}
/**
* 将数据转为字节流
* @param arr
* @return
*/
def genInputStream(arr: Array[Array[String]]): InputStream = {
val stringBuilder = new StringBuilder
if (arr.length != 0) {
val rowcount = arr.length
val columncount = arr(0).length
for (i <- 0 until rowcount; j <- 0 until columncount) {
stringBuilder.append(arr(i)(j) + (if (j == columncount-1) "\r\n" else "\t"))
}
}
val str = stringBuilder.toString
new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))//注意设置与数据库对应的编码格式
}
}最后再分享一个小知识点,在实际操作中发现GP可以使用PG的依赖对数据库进行操作(如建表、写入数据等),但是在查询数据的时候,当列数超过30之后查询程序就会卡住。所以建议除了copyIn这种操作之外,对GP的操作还是用GP自己的依赖。
GP依赖:
<dependency>
<groupId>com.pivotal</groupId>
<artifactId>greenplum-jdbc</artifactId>
<version>5.1.4</version>
</dependency>
版权声明:本文为Programmer_Xiaoming原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。