Approach 1: Via JDBCOutputFormat
Flink ships no ready-made sink for writing to MySQL, but it does provide a class, JDBCOutputFormat, that can serve as a sink as long as you supply a JDBC driver.
JDBCOutputFormat is actually part of Flink's batch API, but it can also be used from the streaming API, and the community recommends this approach.
JDBCOutputFormat is simple to use: all you need is a prepared statement, a JDBC driver, and a database connection, and you are ready to go.
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .finish();
The following SQL statement can serve as the prepared statement:
String query = "INSERT INTO cases (caseid, tracehash) VALUES (?, ?)";
The corresponding table structure:
CREATE TABLE cases
(
    caseid    VARCHAR(255),
    tracehash VARCHAR(255)
);
One thing must be clear, though: JDBCOutputFormat can only handle Row, which is a wrapper around the parameters of the prepared statement. That means we have to turn each Case in the stream into a Row, which a simple map takes care of.
DataStream<Case> cases = ...

DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
    Row row = new Row(2);                  // our prepared statement has 2 parameters
    row.setField(0, aCase.getId());        // first parameter is the case ID
    row.setField(1, aCase.getTraceHash()); // second parameter is the trace hash
    return row;
});
With that, we can add the sink:
rows.writeUsingOutputFormat(jdbcOutput);
And with that, you can write data into MySQL.
However, once you attach a window to the stream, you may run into the following error:
1 "Unknown column type for column %s. Best effort approach to set its value: %s."
This is because the rows coming out of the windowed stream carry no explicit type information. Modify the earlier definition to specify the SQL types explicitly:
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) // set the types
    .finish();
JDBCOutputFormat also has a very useful parameter, batchInterval, which you can specify on the JDBCOutputFormatBuilder; as the name suggests, it controls how many records are buffered before they are committed to the database in one batch, which keeps writes efficient. Be aware, though, that with a large batch interval such as 5000, a slow stream might never fill a batch, so rows could wait a very long time before anything is written. There are other parameters worth exploring too, such as timeouts.
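For illustration, here is the earlier builder with a batch interval added (a minimal sketch; the value 100 is arbitrary):

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR })
    .setBatchInterval(100) // buffer 100 rows per commit; a slow stream may take a long time to fill this
    .finish();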
Approach 2: Via a custom sink
We implement a custom sink by extending RichSinkFunction:
public class RichCaseSink extends RichSinkFunction<Case> {

    // upsert: assumes a unique key on caseid
    // (MySQL syntax; PostgreSQL would use ON CONFLICT (caseid) DO UPDATE)
    private static final String UPSERT_CASE =
        "INSERT INTO cases (caseid, tracehash) "
            + "VALUES (?, ?) "
            + "ON DUPLICATE KEY UPDATE tracehash = ?";

    private PreparedStatement statement;

    @Override
    public void invoke(Case aCase) throws Exception {
        statement.setString(1, aCase.getId());
        statement.setString(2, aCase.getTraceHash());
        statement.setString(3, aCase.getTraceHash());
        statement.addBatch();
        statement.executeBatch();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection connection = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/casedb?user=signavio&password=signavio");
        statement = connection.prepareStatement(UPSERT_CASE);
    }
}
Now the sink can be added to the stream:
DataStream<Case> cases = ...
cases.addSink(new RichCaseSink());
Of course, the implementation above is minimal: there is no batched or timeout-based committing, and the connection is never closed. All of that is easy to add, for example closing the connection in close(), as in the sketch below.
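A minimal close() sketch, assuming the Connection from open() is also stored in a field (the class above keeps only the statement):

@Override
public void close() throws Exception {
    if (statement != null) {
        statement.executeBatch(); // flush anything still buffered
        statement.close();
    }
    if (connection != null) {     // assumes a `private Connection connection;` field set in open()
        connection.close();
    }
    super.close();
}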
The biggest problem with the implementation above, however, is that it is not integrated with Flink's state management, and that is where the real work lies.
Approach 2, enhanced: a checkpoint-aware custom sink
To hold on to data at checkpoint time, implement the CheckpointedFunction interface:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
    if (cases == null) {
        cases = new ArrayList<>();
        pendingCasesPerCheckpoint.put(checkpointId, cases);
    }
    cases.addAll(pendingCases);
    pendingCases.clear();
}
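The two buffers referenced here are ordinary instance fields on the sink; their exact types are not shown in the snippets, so the following declarations are an assumption:

private final List<Case> pendingCases = new ArrayList<>();
private final SortedMap<Long, List<Case>> pendingCasesPerCheckpoint = new TreeMap<>();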
When a message arrives, don't insert it; just buffer it:
@Override
public void invoke(Case aCase) throws Exception {
    pendingCases.add(aCase);
}
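Note that CheckpointedFunction also requires an initializeState method; a minimal sketch that simply starts with empty buffers (for real fault tolerance you would persist pendingCases to managed operator state in snapshotState and restore it here):

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // nothing restored in this sketch; see context.getOperatorStateStore()
    // for keeping the buffered cases across failures
}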
Then, by also implementing CheckpointListener, we can insert the data once a given checkpoint has completed:
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {

    Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
        pendingCasesPerCheckpoint.entrySet().iterator();

    while (pendingCheckpointsIt.hasNext()) {

        Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
        Long pastCheckpointId = entry.getKey();
        List<Case> pendingCases = entry.getValue();

        if (pastCheckpointId <= checkpointId) {

            for (Case pendingCase : pendingCases) {
                statement.setString(1, pendingCase.getId());
                statement.setString(2, pendingCase.getTraceHash());
                statement.setString(3, pendingCase.getTraceHash());
                statement.addBatch();
            }
            pendingCheckpointsIt.remove();
        }
    }
    statement.executeBatch();
}
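Putting the pieces together, the sink implements all three contracts; the class name below is my own, and the method bodies are the snippets shown above:

public class CheckpointedCaseSink extends RichSinkFunction<Case>
        implements CheckpointedFunction, CheckpointListener {
    // open(), invoke(), snapshotState(), initializeState(),
    // and notifyCheckpointComplete() as shown above
}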
The prerequisite is that checkpointing is enabled, for example:
StreamExecutionEnvironment env = ...
env.enableCheckpointing(10000L);
This way, every 10 seconds, whenever a checkpoint completes successfully, a round of inserts happens.
The code above has been verified to work, but it is not recommended for production as-is; a production deployment has many more concerns to address.