Spark向Mysql读写数据

初始操作

1.创建数据库 bigdata0407

2.创建表

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL COMMENT '用户名称',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `sex` char(1) DEFAULT NULL COMMENT '性别',
  `address` varchar(256) DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

向MySql写入数据

import java.sql.{Date, DriverManager}
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
object demo04 {
  def main(args: Array[String]): Unit = {
    // 定义配置文件
    val conf = new SparkConf().setMaster("local[*]").setAppName("mysql")
    // 根据配置文件 获取上下文对象
    val sc = new SparkContext(conf)
    // 定义要添加的数据
    var datas = sc.parallelize(List(("安荷", "1998/2/7", "女", "江苏省"), ("白秋", "2000/3/7", "女", "天津市"), ("雪莲", "1998/6/7", "女", "湖北省"), ("宾白", "1999/7/3", "男", "河北省"), ("宾实", "2000/8/7", "男", "河北省"), ("斌斌", "1998/3/7", "男", "江苏省")))
    // 实例连接
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
     // 循环遍历数据写入
    datas.toLocalIterator.foreach(x => {
      // 定义sql 添加语句
      val sql = "insert into `user` (id, username, birthday, sex, address) values (null, ? , ? , ? , ?);"
      val ps = conn.prepareStatement(sql)
      //数据复制
      ps.setString(1, x._1)
      ps.setDate(2, Date.valueOf(x._2.replaceAll("/", "-")))
      ps.setString(3, x._3)
      ps.setString(4, x._4)
      // 数据提交
      ps.execute()
    })
    // 关闭资源
    conn.close()
  }
}

向MySQL读取数据

import java.sql.{Connection, DriverManager}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object demo05 {
  def main(args: Array[String]): Unit = {
    // 定义配置文件
    val conf = new SparkConf().setAppName("mysql").setMaster("local[*]")
    // 通过配置文件实例spark上下文对象
    val sc = new SparkContext(conf)
    // 定义连接器
    def getCon(): Connection = {
      DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8", "root", "root")
    }
    //定义SQL 用来读取数据 username, birthday, sex, address
    var sql = "select * from `user` where ? <= id and id <= ?;"
    // 实例 JdbcRDD 进行读取数据
    new JdbcRDD(sc, getCon, sql, 0, 100, 3, re => {
      // 读取姓名
      val username = re.getString("username")
      // 读取日期
      val birthday = re.getDate("birthday")
      // 读取性别
      val sex = re.getString("sex")
      // 读取地址
      val address = re.getString("address")
      // 将结果输出
      println(s"${username} ${birthday} ${sex} ${address}")
    }).collect() // 结果收集
  }
}


版权声明:本文为qq_43791724原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。