Flink Writing to MySQL (mysqlSink)

  • 1. Required dependency
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
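
The JDBCSinkFunction in step 2 imports a com.leneovo.util.JDBCUtils helper that the original post does not show. A minimal sketch of such a helper, assuming a plain DriverManager-based connection factory against a local MySQL instance (the URL, user, and password are placeholder assumptions):

package com.leneovo.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class JDBCUtils {

    // Placeholder connection settings; adjust to your environment
    private static final String URL = "jdbc:mysql://localhost:3306/test?useSSL=false";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    public static Connection getConnection() throws SQLException {
        // Connector/J 5.1.x self-registers via the JDBC 4 ServiceLoader
        // mechanism, so an explicit Class.forName("com.mysql.jdbc.Driver")
        // call is not required.
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }
}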
  • 2. Obtain the MySQL sink (create a JDBCSinkFunction and instantiate it directly with new; a usage sketch follows the class)
import com.leneovo.util.JDBCUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;

public class JDBCSinkFunction extends RichSinkFunction<Tuple2<String, Long>> {

    private Connection conn = null;
    private PreparedStatement pst;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel subtask: establish the connection
        // and prepare the reusable INSERT statement
        conn = JDBCUtils.getConnection();
        pst = conn.prepareStatement(
                "INSERT INTO test_sink (content, event_time, update_time) VALUES (?, ?, ?)"
        );
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources; null checks guard against a failed open()
        if (pst != null) {
            pst.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // Execute one insert per incoming record
        pst.setString(1, value.f0);
        pst.setLong(2, value.f1);
        pst.setLong(3, System.currentTimeMillis());
        pst.execute();
    }

    // Batch insert - example
    /*
    private int count = 0;

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // Buffer the insert instead of executing it immediately
        pst.setString(1, value.f0);
        pst.setLong(2, value.f1);
        pst.setLong(3, System.currentTimeMillis());
        pst.addBatch();

        count++;

        // Flush once every 1000 records
        if (count == 1000) {
            pst.executeBatch();
            conn.commit(); // commit manually
            pst.clearBatch();
            count = 0;
        }
        // Remember to disable auto-commit in open() [conn.setAutoCommit(false);]
    }
    */
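
    // Note: with the batch variant above, records buffered since the last
    // executeBatch() would be silently dropped on shutdown; flushing the
    // remaining batch (and committing) in close() before closing the
    // connection avoids losing that tail.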

}
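
As the step 2 heading suggests, the sink is attached by instantiating it directly with new and passing it to addSink. A minimal job sketch; the socket source, host/port, and a test_sink table with columns content (VARCHAR), event_time (BIGINT), and update_time (BIGINT) are assumptions for illustration:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySQLSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source for illustration: lines from a socket,
        // paired with the current timestamp
        DataStream<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(line -> Tuple2.of(line, System.currentTimeMillis()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Step 2 in action: just new the sink function and add it to the stream
        stream.addSink(new JDBCSinkFunction());

        env.execute("mysql sink demo");
    }
}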
