Writing Data from Spark to MySQL

Dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

PS: Make sure the Spark artifact versions (including the Scala suffix, e.g. _2.12) match each other; mismatched versions can lead to missing-class errors such as NoClassDefFoundError at runtime.
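
One way to keep these versions aligned is to declare them once as Maven properties (a minimal sketch; the property names are illustrative):

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
    </properties>

and then reference them in the dependencies above, e.g. spark-core_${scala.binary.version} with <version>${spark.version}</version>.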

Imports:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.*;

Java code:

public class toMySQL {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark_mysql");
        // Create the SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        // Wrap the underlying SparkContext as a JavaSparkContext for the RDD API
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        sc.setLogLevel("ERROR");

        // JDBC connection properties
        Properties properties = new Properties();
        properties.setProperty("user", "root"); // MySQL user
        properties.setProperty("password", "123456"); // MySQL password
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("numPartitions", "10"); // max parallel JDBC connections for the write


        // Read the input text file as an RDD of lines
        JavaRDD<String> rdd = sc.textFile("datas/1.txt");

        // Strip punctuation and split each line into words
        JavaRDD<String> rdd1 = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String s1 = s.replace(",", "").replace(".", "").replace("?", "");
                return Arrays.asList(s1.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
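
        // The anonymous classes above can also be written as Java 8 lambdas, e.g.:
        //   rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
        //      .mapToPair(w -> new Tuple2<>(w, 1))
        //      .reduceByKey(Integer::sum);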

        // Convert each (word, count) tuple into a Row so it can be stored in the database
        JavaRDD<Row> rdd4 = rdd3.map(new Function<Tuple2<String, Integer>, Row>() {
            @Override
            public Row call(Tuple2<String, Integer> ss) throws Exception {
                return RowFactory.create(ss._1, ss._2);
            }
        });

        // Dynamically build the field list for the DataFrame schema
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("word", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));

        // Build the StructType that describes the DataFrame schema
        StructType schema = DataTypes.createStructType(structFields);
        // Create the DataFrame from the Row RDD and the schema
        Dataset<Row> dataFrame = sparkSession.createDataFrame(rdd4, schema);
        // Write the DataFrame to the words1 table in the test database;
        // SaveMode.Overwrite drops and recreates the table if it already exists
        dataFrame.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        sparkSession.stop();
    }
}
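
Assuming datas/1.txt contains plain English text, the words1 table will end up holding one (word, count) row per distinct word. To verify the write, you can read the table back with the same connection properties, placed before sparkSession.stop() (a minimal sketch, assuming the same local MySQL instance and credentials as above):

        // Read the words1 table back and print the first rows
        Dataset<Row> readBack = sparkSession.read()
                .jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        readBack.show();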
