一 。Sink源
1 官网提供的连接器
在这里插入图片描述
2 bahir提供的连接器 (更新速度落后版本 现在我用的scala 2.12 现在最新的是2.11 在引入的时候会与 自己重新ProcessFunction(会有冲突注 需要用哪个 就 使用哪个就行 这里我继承这个函数主要是为了实现 sideoutput 就是分流 因为在flink 1.12 中 split 和 select 已经移除了)
3 自定义sink
二 代码示例:
import java.sql.{Connection, DriverManager, PreparedStatement}
import guigu.source.PerSource
import guigu.transfrom.SensorReading
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object SinkDemo {
def main (args:Array[String]):Unit={
val env = StreamExecutionEnvironment.getExecutionEnvironment
//val inputStream = env.socketTextStream("localhost",7777)
//下面的这个数据源是我在flink source里面自定义的 如果懒得弄可以使用上面的监听端口号的这种形式来测试方便
val inputStream: DataStream[SensorReading] = env.addSource(new PerSource)
env.setRestartStrategy(RestartStrategies.noRestart())
//往文件里面写入数据 writeAsCsv 这个方式已经被标记了也行在更新就弃用了 这里有一句注释 Please use the * [[org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink]] explicitly using the [[addSink()]] method.
//注意path 路劲文件夹 flink 会在这个路径下在创建一个 日期的文件夹 之后把数据写入示例 图一
//往文件里面写 实现
inputStream.addSink(StreamingFileSink.forRowFormat(new Path("C:\Users\Administrator\Desktop\flinktest\out"),new SimpleStringEncoder[SensorReading]())).builder())
//往kafka里面写入 引入
/* <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.1</version>
</dependency>*/
/* val properties =new Perperties;
properties.set("bootstrap.server","localhost:9200")
inputStream.addSink(new FlinkKafkaProducer[String]("quickstart-events",new SimpleStringSchema(),properties)) //引入bahir redis 连接器会冲突报错*/
//redis 引入pom 文件
/*<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>*/
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")//redis 地址
.setPort(6379) //连接端口
.build()
//MaperRedis 就是你要往redis 里面写入的规则
inputStream.addSink(new RedisSink[SensorReading](conf,new MaperRedis))
//自定义输出sink mysql 需要引入 查看下自己的mysql 的版本 一定要注意版本问题 这个对于我们初学者来说非常致命 或者说怎么按照步骤来 却点不出来任何东西 需要检测版本 和 引入pom 文件的 jar 冲突问题 这里首先 要创建好一个test 数据库然后 创建一个sensor 表 字段以我这个为例 id varchar(30) temp double
/*<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>*/
inputStream.addSink(new MyJdbcSinkFun)
env.execute("textfile ")
}
}
/定义一个mapper
class MaperRedis extends RedisMapper[SensorReading] {
//定义当前写入数据的命令 HSET 表名 实际也是 表是K 其余都是 V key-value
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"sensor_tamp")
}
//将温度指定位value
override def getKeyFromData(data: SensorReading): String = {
data.tenperature.toString
}
override def getValueFromData(data: SensorReading): String = {
data.id
}
}
class MyJdbcSinkFun extends RichSinkFunction[SensorReading] {
// 定义连接,预编译语句
var connec:Connection=_;
//预编译语句
var insertStmt: PreparedStatement=_;
var updateStmt: PreparedStatement=_;
//打开预编译
override def open(parameters: Configuration): Unit = {
connec=DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root")
insertStmt = connec.prepareStatement("insert into sensor(id,temp) value (?,?)")
updateStmt=connec.prepareStatement("update sensor set temp=? where id=? ")
}
override def invoke(in: SensorReading): Unit = {
//先执行更新操作 ,查到就更新
updateStmt.setString(2,in.id)
updateStmt.setDouble(1,in.tenperature)
updateStmt.execute()
//如果更新没有查到
if(updateStmt.getUpdateCount ==0 ){
insertStmt.setString(1,in.id)
insertStmt.setDouble(2,in.tenperature)
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
connec.close()
}
}
图一


版权声明:本文为newbrid007原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。