package ToMysql
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import java.util.Properties
/**
 * Flink streaming job that reads space-separated student records from the Kafka
 * topic "test", converts each line into a `student` entity and inserts it into the
 * MySQL `student` table through [[MysqlTest.SinkToMySql]].
 */
object MysqlTest {

  /**
   * Builds the Kafka -> map -> MySQL pipeline and submits it under the job name
   * "MyJdbcSink2".
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Use a parallelism of 1 for the whole job.
    env.setParallelism(1)

    // Kafka connection settings.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node2:9092")
    properties.setProperty("zookeeper.connect", "node2:2181")
    properties.setProperty("group.id", "jos")

    // Consume the "test" topic as plain strings and echo every record to stdout.
    val kafkaStream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer08[String]("test", new SimpleStringSchema(), properties))
    kafkaStream.print()

    val students: SingleOutputStreamOperator[student] =
      kafkaStream.map(new MapFunction[String, student]() {
        // Expected line layout: "<id> <name> <age>", separated by single spaces.
        // A line with fewer than three fields throws ArrayIndexOutOfBoundsException,
        // and a non-numeric age throws NumberFormatException.
        override def map(value: String): student = {
          val fields: Array[String] = value.split(" ")
          // Instantiate the table entity and populate it from the parsed fields.
          val stu: student = new student
          stu.setId(fields(0))
          stu.setName(fields(1))
          stu.setAge(fields(2).toInt)
          stu
        }
      })

    students.addSink(new SinkToMySql())
    env.execute("MyJdbcSink2")
  }

  /**
   * Sink that inserts every incoming `student` into the MySQL `student` table.
   *
   * The JDBC connection and the insert statement are created once in `open`,
   * reused for every record in `invoke`, and released in `close`. Only the `name`
   * and `age` columns are written; the entity's id is not part of the insert.
   */
  class SinkToMySql() extends RichSinkFunction[student] {
    // Both handles stay null until open() runs; close() checks for that.
    var conn: Connection = null
    var ps: PreparedStatement = null
    val driver: String = "com.mysql.jdbc.Driver"
    val url: String = "jdbc:mysql://node01:3306/test"
    val username: String = "root"
    val password: String = "root"
    // Not read anywhere in this class; kept so existing references still compile.
    val maxActive: String = "20"

    private val insertSql: String = "insert into student(name,age) values(?,?)"

    /**
     * Initialisation: loads the JDBC driver, opens the connection with auto-commit
     * disabled and prepares the insert statement once so `invoke` can reuse it.
     *
     * @param parameters configuration forwarded to the superclass
     */
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      Class.forName(driver)
      conn = DriverManager.getConnection(url, username, password)
      conn.setAutoCommit(false)
      ps = conn.prepareStatement(insertSql)
    }

    /**
     * Called for every record: binds name and age to the prepared statement,
     * executes the insert and commits it.
     *
     * @param value the student to insert
     */
    override def invoke(value: student): Unit = {
      //ps.setString(0,value.getId)
      ps.setString(1, value.getName)
      ps.setString(2, value.getAge.toString)
      ps.executeUpdate()
      conn.commit()
    }

    /**
     * Releases the statement and the connection. The nested `finally` blocks make
     * sure the connection is closed even if closing the superclass or the
     * statement throws.
     */
    override def close(): Unit = {
      try {
        super.close()
      } finally {
        try {
          if (ps != null) {
            ps.close()
          }
        } finally {
          if (conn != null) {
            conn.close()
          }
        }
      }
    }
  }
}
// 版权声明:本文为weixin_44694973原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。