- 1.所需依赖
<!-- MySQL Connector/J (the MySQL JDBC driver), pinned to version 5.1.38. -->
<!-- NOTE(review): presumably this is the driver that JDBCUtils.getConnection() loads; confirm against that utility. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
- 2.获取 MySQL Sink(自定义 JDBCSinkFunction,使用时直接 new 一个实例即可)
import com.leneovo.util.JDBCUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
/**
 * Flink sink that writes each incoming {@code Tuple2<String, Long>} as one row
 * into the {@code test_sink} table through a JDBC {@link PreparedStatement}.
 *
 * <p>Column mapping: {@code content <- value.f0}, {@code event_time <- value.f1},
 * {@code update_time <- System.currentTimeMillis()} at the moment of the insert.
 *
 * <p>The connection and statement are created in {@link #open(Configuration)} and
 * released in {@link #close()}.
 */
public class JDBCSinkFunction extends RichSinkFunction<Tuple2<String, Long>> {

    /** Parameterized insert statement executed once per record. */
    private static final String INSERT_SQL =
            "INSERT INTO test_sink (content, event_time, update_time) VALUES (?, ?, ?)";

    // JDBC handles are not serializable; they are created in open(), so they are
    // marked transient to keep them out of the function's serialized form.
    private transient Connection conn;
    private transient PreparedStatement pst;

    /**
     * Obtains a connection from {@code JDBCUtils} and prepares the insert statement.
     *
     * @param parameters configuration passed in by the runtime (not read here)
     * @throws Exception if the connection cannot be obtained or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = JDBCUtils.getConnection();
        pst = conn.prepareStatement(INSERT_SQL);
    }

    /**
     * Releases the prepared statement and then the connection.
     *
     * <p>Both handles are null-checked so that this method is safe to call even when
     * {@link #open(Configuration)} failed part-way; the connection is closed in a
     * {@code finally} block so a failure while closing the statement cannot leak it.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (pst != null) {
                pst.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * Inserts a single record.
     *
     * @param value   the record; {@code f0} is stored as content, {@code f1} as event time
     * @param context sink context supplied by the runtime (not read here)
     * @throws Exception if binding the parameters or executing the insert fails
     */
    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        pst.setString(1, value.f0);
        pst.setLong(2, value.f1);
        pst.setLong(3, System.currentTimeMillis());
        // executeUpdate is the JDBC call intended for DML statements such as INSERT.
        pst.executeUpdate();
    }

    // Batch insert - example (kept for reference, not active code).
    // Remember to disable auto-commit in open(): conn.setAutoCommit(false);
    // NOTE: as written, rows buffered since the last flush (fewer than 1000) are never
    // written; flush them (executeBatch + commit) in close() before releasing resources.
    /*
    private int count = 0;

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // Bind the parameters and queue the row.
        pst.setString(1, value.f0);
        pst.setLong(2, value.f1);
        pst.setLong(3, System.currentTimeMillis());
        pst.addBatch();
        count++;
        // Flush once every 1000 records.
        if (count == 1000) {
            pst.executeBatch();
            conn.commit(); // manual commit
            pst.clearBatch();
            count = 0;
        }
    }
    */
}
版权声明:本文为qq_37195258原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。