flink数据同步mysql到hive_Flink 实现同步kafka数据到hive表

KafkaToHive.java 程序入口

import config.GlobConfig;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import utils.CoverTypeToSql;

import utils.SinkHive;

import java.util.List;

import java.util.Properties;

import java.util.stream.Collectors;

/**

* @author 暮雪

* @title: KafkaToHive

* @projectName mysql2hive

* @description: flink -> kafka ==> flink stream ==> hive

* @date 2021.1.21

*/

public class KafkaToHive {

public static void main(String[] args) throws Exception {

// 创建执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

GlobConfig.putTableData();

//读取kafka数据

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", GlobConfig.KAFKA_ADDR);

properties.setProperty("group.id", GlobConfig.KAFKA_CUSTOMER_GROUP);

DataStreamSource kafkaDS = env.addSource(

new FlinkKafkaConsumer(

GlobConfig.KAFKA_TOPIC,

new SimpleStringSchema(),

properties)

.setStartFromLatest()

);

// 过滤掉无效数据

SingleOutputStreamOperator jsonData = kafkaDS.filter(json -> {

try {

JSONObject jsonStr = JSONObject.parseObject(json);

String mysqlName = jsonStr.getJSONObject("source").getString("table");

if (mysqlName == null || mysqlName.equals("") || GlobConfig.tableMap.get(mysqlName) == null) {

return false;

}

return true;

} catch (Exception e) {

return false;

}

});

SingleOutputStreamOperator outputStreamOperator = jsonData.map(line -> {

JSONObject jsonObject = JSONObject.parseObject(line);

//数据来源的表名

String tableName = GlobConfig.tableMap.get(jsonObject.getJSONObject("source").getString("table"));

JSONObject afterData = jsonObject.getJSONObject("after");

List columnList = afterData.keySet().stream().collect(Collectors.toList());

//INSERT INTO table (c1,c2...) values (v1,v2..)

String sql = "INSERT INTO " + tableName + "( ";

String sql2 = " VALUES( ";

for (String column : columnList) {

sql += column + ", ";

sql2 += CoverTypeToSql.cover(afterData.getObject(column, Object.class)) + ", ";

}

String s1 = sql.substring(0, sql.length() - 2);

String s2 = sql2.substring(0, sql2.length() - 2);

String s3 = s1 + ") " + s2 + ")";

System.out.println(s3);

return s3;

});

//通过自定义sink 写到hive

outputStreamOperator.addSink(new SinkHive());

env.execute();

}

}

GlobConfig.java 全局配置

import java.util.HashMap;

/**

* @author 暮雪

* @title: GlobConfig

* @projectName mysql2hive

* @description: 配置类

* @date 2021.1.20

*/

public class GlobConfig {

//kafka 参数

public static String KAFKA_ADDR = "computer-28:9092,computer-29:9092,computer-30:9092";

public static String KAFKA_CUSTOMER_GROUP = "customer_muxue"; //可修改

public static String KAFKA_TOPIC = "sync_demo"; //需与数栖平台中建立的Kafka topic保持一致

//hive 连接信息

public static String HIVE_ADDR = "jdbc:hive2://computer-28:10000/data_collection";

public static String HIVE_USER = "admin";

public static String HIVE_PASSWORD = "";

//数据源 表名映射

public static HashMap tableMap = new HashMap<>();

/***

* 装载表对应关系

* tableMap.put("mysql_table_name","hive_table_name");

*/

public static void putTableData() {

//示例对应表

tableMap.put("demo", "sync_demo");

//todo 在这里添加MySQL表与hive表的对应关系

}

}

CoverTypeToSql.java 类型转换工具类

/**

* @author 暮雪

* @title: CoverTypeToSql

* @projectName mysql2hive

* @description: 不同数据类型转为sql

* @date 2021.1.21

*/

public class CoverTypeToSql {

public static String cover(Object value) {

if (value instanceof String || value instanceof Character) {

return "'" + value + "'";

} else {

return value + "";

}

}

}

SinkHive.java 自定义sink类

import config.GlobConfig;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

/**

* @author 暮雪

* @title: SinkHive

* @projectName mysql2hive

* @description: 自定义sink写到hive表中

* @date 2021.1.20

*/

public class SinkHive extends RichSinkFunction implements SinkFunction {

private static String driverName = "org.apache.hive.jdbc.HiveDriver"; //驱动名称

private static String url = GlobConfig.HIVE_ADDR; //

private static String user = GlobConfig.HIVE_USER;

private static String password = GlobConfig.HIVE_PASSWORD;

private Connection connection;

private PreparedStatement statement;

// 1,初始化

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

Class.forName(driverName);

connection = DriverManager.getConnection(url, user, password);

}

// 2,执行

@Override

public void invoke(String value, Context context) throws Exception {

if (value != null && value != "") {

statement = connection.prepareStatement(value);

statement.execute();

}

}

// 3,关闭

@Override

public void close() throws Exception {

super.close();

if (statement != null)

statement.close();

if (connection != null)

connection.close();

}

}


版权声明:本文为weixin_31000273原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。