spark读取hdfs存入mysql_Spark学习笔记——读写HDFS

使用Spark读写HDFS中的parquet文件

文件夹中的parquet文件

20180110230317795465.png

build.sbt文件

name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "2.1.0",

"mysql" % "mysql-connector-java" % "5.1.31",

"org.apache.spark" %% "spark-sql" % "2.1.0",

"org.apache.hbase" % "hbase-common" % "1.3.0",

"org.apache.hbase" % "hbase-client" % "1.3.0",

"org.apache.hbase" % "hbase-server" % "1.3.0",

"org.apache.hbase" % "hbase" % "1.2.1"

)

Scala实现方法

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql._

import java.util.Properties

import com.google.common.collect.Lists

import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.{Result, Scan}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/**

* Created by mi on 17-4-11.

*/

case class resultset(name: String,

info: String,

summary: String)

case class IntroItem(name: String, value: String)

case class BaikeLocation(name: String,

url: String = "",

info: Seq[IntroItem] = Seq(),

summary: Option[String] = None)

case class MewBaikeLocation(name: String,

url: String = "",

info: Option[String] = None,

summary: Option[String] = None)

object MysqlOpt {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("WordCount").setMaster("local")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

//定义数据库和表信息

val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"

val table = "baike_pages"

//读取parquetFile,并写入Mysql

val sparkSession = SparkSession.builder()

.master("local")

.appName("spark session example")

.getOrCreate()

val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")

// parquetDF.collect().take(20).foreach(println)

//parquetDF.show()

//BaikeLocation是读取的parquet文件中的case class

val ds = parquetDF.as[BaikeLocation].map { line =>

//把info转换为新的case class中的类型String

val info = line.info.map(item => item.name + ":" + item.value).mkString(",")

//注意需要把字段放在一个case class中,不然会丢失列信息

MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)

}.cache()

ds.show()

// ds.take(2).foreach(println)

//写入Mysql

// val prop = new Properties()

// prop.setProperty("user", "root")

// prop.setProperty("password", "123456")

// ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)

//写入parquetFile

ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")

}

}


版权声明:本文为weixin_29271053原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。