flink写入 mysql_flink写入mysql的两种方式

方式一 通过JDBCOutputFormat

在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。

JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。

JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername(“com.mysql.jdbc.Driver”)

.setDBUrl(“jdbc:mysql://localhost:1234/test?user=xxx&password=xxx”)

.setQuery(query)

.finish();

如下的sql语句可以作为prepared statement:

String query = “INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)”;

对应的表的结构:

CREATE TABLE cases

(

caseid VARCHAR(255),

tracehash VARCHAR(255)

);

但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。

DataStream cases = …

DataStream rows = cases.map((MapFunction) aCase -> {

Row row = new Row(2); // our prepared statement has 2 parameters

row.setField(0, aCase.getId()); //first parameter is case ID

row.setField(1, aCase.getTraceHash()); //second paramater is tracehash

return row;

});

这样,我们就能添加sink了:

rows.writeUsingOutputFormat(jdbcOutput);

这样,你就可以将数据写入mysql了。

但是在你在流上附加了窗口之后,可能会得到下面的报错:

“Unknown column type for column %s. Best effort approach to set its value: %s.”

因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername(“com.mysql.jdbc.Driver”)

.setDBUrl(“jdbc:mysql://localhost:1234/test?user=xxx&password=xxx”)

.setQuery(query)

.setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types

.finish();

JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

JDBCOutputFormat 还有一个很有用的参数,batchInterval,见名知意,就是多少数据提交一次,尽量高效率的向数据库提交数据。当然还有比如timeout等其他参数,可以探索。

方式二 通过自定义sink提交

我们通过继承RichSinkFunction来实现自定义sink:

public class RichCaseSink extends RichSinkFunction {

private static final String UPSERT_CASE = “INSERT INTO public.cases (caseid, tracehash) “

+ “VALUES (?, ?) “

+ “ON CONFLICT (caseid) DO UPDATE SET “

+ ” tracehash=?”;

private PreparedStatement statement;

@Override

public void invoke(Case aCase) throws Exception {

statement.setString(1, aCase.getId());

statement.setString(2, aCase.getTraceHash());

statement.setString(3, aCase.getTraceHash());

statement.addBatch();

statement.executeBatch();

}

@Override

public void open(Configuration parameters) throws Exception {

Class.forName(“com.mysql.jdbc.Driver”);

Connection connection =

DriverManager.getConnection(“jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio”);

statement = connection.prepareStatement(UPSERT_CASE);

}

}

这样,就可以在流上添加sink 了:

DataStream cases = …

cases.addSink(new RichCaseSink());

当然,上面的实现很简略,没有给出批量提交或者超时提交,这个都可以很容易的添加,比如close()中关闭连接。

但是上面的实现中,最大的问题还是没有跟flink的状态管理相结合,这个才是重头戏。

方式二 加强版的自定义sink

在checkpoint的时候保存数据,继承接口CheckpointedFunction :

@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {

long checkpointId = context.getCheckpointId();

List cases = pendingCasesPerCheckpoint.get(checkpointId);

if(cases == null){

cases = new ArrayList<>();

pendingCasesPerCheckpoint.put(checkpointId, cases);

}

cases.addAll(pendingCases);

pendingCases.clear();

}

在消息到达的时候不插入数据,只是留存数据:

@Override

public void invoke(Case aCase) throws Exception {

pendingCases.add(aCase);

}

这样,通过继承CheckpointListener,我们就能在某个checkpoint完成的时候插入数据:

@Override

public void notifyCheckpointComplete(long checkpointId) throws Exception {

Iterator>> pendingCheckpointsIt =

pendingCasesPerCheckpoint.entrySet().iterator();

while (pendingCheckpointsIt.hasNext()) {

Map.Entry> entry = pendingCheckpointsIt.next();

Long pastCheckpointId = entry.getKey();

List pendingCases = entry.getValue();

if (pastCheckpointId <= checkpointId) {

for (Case pendingCase : pendingCases) {

statement.setString(1, pendingCase.getId());

statement.setString(2, pendingCase.getTraceHash());

statement.setString(3, pendingCase.getTraceHash());

statement.addBatch();

}

pendingCheckpointsIt.remove();

}

}

statement.executeBatch();

}

前提,是需要设置checkpoint,比如:

ExecutionEnvironment env = …

env.enableCheckpointing(10000L);

这样,每隔10s,当一个checkpoint做成功,就会插入一次数据。

当然,上面的代码验证可用,但不建议在生产环境使用,生产环境需要考虑更多的问题。

flask 操作mysql的两种方式-sqlalchemy操作

flask 操作mysql的两种方式-sqlalchemy操作 二.ORM sqlalchemy操作 #coding=utf-8 # model.py from app import db class …

flask 操作mysql的两种方式-sql操作

flask 操作mysql的两种方式-sql操作 一.用常规的sql语句操作 # coding=utf-8 # model.py import MySQLdb def get_conn(): conn …

C&plus;&plus;连接mysql的两种方式&lpar;ADO连接和mysql api连接&rpar;

一.ADO连接mysql 1.安装mysql-5.5.20-win32.msi和mysql-connector-odbc-5.3.4-win32.msi(一般两个安装程序要匹配,否则可能连接不上)  …

MySql入门(2-1)windows下安装mysql的两种方式

一.下载mysql 1.下载解压MySQL 登录oracle主页,需要用户名和口令: lshengqi@netease.com/1wsx**** 下载路径:: https://dev.mysql.co …

spark2&period;2jdbc写入mysql 的两种方法(append&comma;Overriedwrite&rpar;-不用Mysql建表

import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{SQLContext, SaveMode} …

Spark:DataFrame批量导入Hbase的两种方式&lpar;HFile、Hive&rpar;

Spark处理后的结果数据resultDataFrame可以有多种存储介质,比较常见是存储为文件.关系型数据库,非关系行数据库. 各种方式有各自的特点,对于海量数据而言,如果想要达到实时查询的目的,使 …

不停止MySQL服务增加从库的两种方式

不停止MySQL服务增加从库的两种方式 转载自:http://lizhenliang.blog.51cto.com/7876557/1669829 现在生产环境MySQL数据库是一主一从,由于业务量访 …

mysql级联更新的两种方式:触发器更新和外键

1.mysql级联更新有两种方式:触发器更新和外键更新. 2.触发器更新和外键更新的目的都是为了保证数据完整性. 我们通常有这样的需求:删除表Table 1中记录,需要同时删除其它表中与Table 1 …

不停止MySQL服务增加从库的两种方式【转载】

现在生产环境MySQL数据库是一主一从,由于业务量访问不断增大,故再增加一台从库.前提是不能影响线上业务使用,也就是说不能重启MySQL服务,为了避免出现其他情况,选择在网站访问量低峰期时间段操作. …

随机推荐

Golang游戏服务器

我对和GOLANG写MMO服务器的一些遐想: 1.沙盒(隔离性) SKYNET:原生LUA STATE作为沙盒, 进行服务器间隔离安全性高: 服务可以很容易的配置到不同节点之上. GO:估计用RECO …

java集合框架之List

一.List: 1.  特有的常见方法:(有个共性特点就是都可以操作角标) (1).添加 void add(int Index , E element):在list的指定位置插入元素 void add …

set和map和pair 转自ACdreamers

set与map容器         分类:             C/C++              2013-08-25 19:21     560人阅读     评论(0)     收藏    …

codeforces &num;305 div1 done

总算搞定了这一场比赛的题目,感觉收获蛮大 其中A,B,C都能通过自己的思考解决掉 D题思路好神,E题仔细想想也能想出来 以后坚持每两天或者一天做一场CF的div1的全套题目 除非有实在无法做出来的题目 …

XCOPY&colon; Access denied

用 XCOPY 拷贝文件,出现 “Access denied” 提示信息. 原因: 在如上拷贝的目标路径下,存在 ReadMe.txt, 并且是 只读 文件,这种情况下,需要增加一个 拷贝选项: /R …

pycharm(windows)安装及其设置中文菜单

pycharm(windows)安装及其设置中文菜单 1.下载 在官网(http://www.jetbrains.com/pycharm/download/#section=windows)进行下载 …

如何通过rsync&plus;sersync 实现同步备份

3.rsync+sersync更快更节约资源实现web数据同步4.unison+inotify实现web数据双向同步 一:为什么要实现同步备份 服务器上有些重要文件或数据时,可以把他们多备份一份到其他 …

apollo在liunx环境实战(三)

1. apollo在liunx环境实战(三) 1.1. 准备 下载apollo源码 https://github.com/ctripcorp/apollo 1.2. 创建数据库 在自己的liunx环境 …

&lbrack;T-ARA&rsqb;&lbrack;ORGR&rsqb;

歌词来源:http://music.163.com/#/song?id=29343993 作曲 : 4번타자/에스킴 [作曲 : 4p/beon-Ta-c/ja-/e-seu-Kim] 作词 : 4번 …

NuGet的几个小技巧(转)

NuGet的几个小技巧   因为可视化库程序包管理器的局限性,有很多需要的功能在界面中无法完成. 以下技巧均需要在“程序包管理器控制台”中使用命令来完成. 一.改变项目目标框架后,更新程序包 当改变项 …


版权声明:本文为weixin_39933414原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。