Writing Data from Spark into MySQL
Dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
</dependencies>
PS: Make sure the Spark dependency versions correspond to each other: spark-core and spark-sql should use the same Spark version, and the _2.12 artifact suffix must match your Scala version. Mismatches typically show up as missing-class errors at runtime.
Imports:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.*;
Java code:
public class ToMySQL {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark_mysql");
        // Create a SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        // Wrap the underlying SparkContext as a JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        sc.setLogLevel("ERROR");
        // JDBC connection properties
        Properties properties = new Properties();
        properties.setProperty("user", "root");       // username
        properties.setProperty("password", "123456"); // password
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("numPartitions", "10"); // cap on parallel JDBC connections for the write
        // Load the input text file
        JavaRDD<String> rdd = sc.textFile("datas/1.txt");
        // Strip punctuation and split each line into words
        JavaRDD<String> rdd1 = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String s1 = s.replace(",", "").replace(".", "").replace("?", "");
                return Arrays.asList(s1.split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        // Sum the counts for each word
        JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Convert each (word, count) tuple into a Row so it can be stored in the database
        JavaRDD<Row> rdd4 = rdd3.map(new Function<Tuple2<String, Integer>, Row>() {
            @Override
            public Row call(Tuple2<String, Integer> ss) throws Exception {
                return RowFactory.create(ss._1, ss._2);
            }
        });
        // Build the DataFrame metadata dynamically
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("word", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        // Build the StructType that describes the DataFrame schema
        StructType schema = DataTypes.createStructType(structFields);
        // Create the DataFrame from the Row RDD and the schema
        Dataset<Row> dataFrame = sparkSession.createDataFrame(rdd4, schema);
        // Write the DataFrame into the target table of the target database
        dataFrame.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        sparkSession.stop();
    }
}
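A note on SaveMode.Overwrite: by default it drops the words1 table and recreates it from the DataFrame's schema, so any indexes or hand-tuned column definitions are lost. If the existing table definition should be kept and only its rows replaced, the JDBC writer's truncate option can be used; a minimal variant of the write call:

        // Keep the existing table definition; truncate and refill it instead of drop/recreate
        dataFrame.write()
                .mode(SaveMode.Overwrite)
                .option("truncate", "true")
                .jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);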
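Depending on the MySQL server configuration, Connector/J 8.x can fail at connect time with a timezone error. A common workaround is to pin the timezone in the JDBC URL; useSSL and serverTimezone below are standard Connector/J URL parameters, and the values shown are just illustrative:

        // Variant of the write with explicit Connector/J URL parameters
        String url = "jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC";
        dataFrame.write().mode(SaveMode.Overwrite).jdbc(url, "words1", properties);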
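To verify the write, the table can be read back over the same JDBC connection. A quick check, assuming the same database and credentials as above, placed before sparkSession.stop():

        // Read the table back and show the ten most frequent words
        Dataset<Row> check = sparkSession.read()
                .jdbc("jdbc:mysql://localhost:3306/test", "words1", properties);
        check.orderBy(check.col("count").desc()).show(10);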
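Since the pipeline is plain map/reduce, the anonymous inner classes can also be replaced with Java 8 lambdas. The following sketch (class name ToMySQLLambda is just illustrative) is functionally equivalent to the listing above and reuses the same imports:

public class ToMySQLLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark_mysql_lambda");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        sc.setLogLevel("ERROR");

        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "123456");
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");

        // Word count as a single lambda chain: clean, split, pair, sum, convert to Row
        JavaRDD<Row> rows = sc.textFile("datas/1.txt")
                .flatMap(s -> Arrays.asList(
                        s.replace(",", "").replace(".", "").replace("?", "").split(" ")).iterator())
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey(Integer::sum)
                .map(t -> RowFactory.create(t._1, t._2));

        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("word", DataTypes.StringType, true),
                DataTypes.createStructField("count", DataTypes.IntegerType, true)));

        spark.createDataFrame(rows, schema).write()
                .mode(SaveMode.Overwrite)
                .jdbc("jdbc:mysql://localhost:3306/test", "words1", props);

        spark.stop();
    }
}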