spark从mysql读取数据_Spark读取数据库(Mysql)的四种方式讲解

现在Spark支撑四种办法从数据库中读取数据,这里以Mysql为例进行介绍。

一、不指定查询条件

这个办法连接MySql的函数原型是:

def jdbc(url: String, table: String, properties: Properties): DataFrame

咱们只需求供给Driver的url,需求查询的表名,以及连接表有关特点properties。下面是详细比如:

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", prop )

println(df.count())

println(df.rdd.partitions.size)

咱们运转上面的程序,能够看到df.rdd.partitions.size输出成果是1,这个成果的意义是iteblog表的所有数据都是由RDD的一个分区处理的,所以说,假如你这个表很大,很可能会呈现OOM

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):

java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

这种办法在数据量大的时分不主张运用。

二、指定数据库字段的规模

这种办法即是经过指定数据库中某个字段的规模,可是惋惜的是,这个字段有必要是数字,来看看这个函数的函数原型:

def jdbc(

url: String,

table: String,

columnName: String,

lowerBound: Long,

upperBound: Long,

numPartitions: Int,

connectionProperties: Properties): DataFrame

前两个字段的意义和办法一相似。columnName即是需求分区的字段,这个字段在数据库中的类型有必要是数字;lowerBound即是分区的下界;upperBound即是分区的上界;numPartitions是分区的个数。同样,咱们也来看看怎么运用:

val lowerBound = 1

val upperBound = 100000

val numPartitions = 5

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)

这个办法能够将iteblog表的数据散布到RDD的几个分区中,分区的数量由numPartitions参数决议,在抱负情况下,每个分区处理一样数量的数据,咱们在运用的时分不主张将这个值设置的比较大,由于这可能致使数据库挂掉!可是根据前面介绍,这个函数的缺陷即是只能运用整形数据字段作为分区关键词。

这个函数在极端情况下,也即是设置将numPartitions设置为1,其意义和第一种办法共同。

三、根据恣意字段进行分区

根据前面两种办法的约束,Spark还供给了根据恣意字段进行分区的办法,函数原型如下:

def jdbc(

url: String,

table: String,

predicates: Array[String],

connectionProperties: Properties): DataFrame

这个函数相比第一种办法多了predicates参数,咱们能够经过这个参数设置分区的根据,来看看比如:

val predicates = Array[String]("reportDate <= '2014-12-31'",

"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)

最终rdd的分区数量就等于predicates.length。

四、经过load获取

Spark还供给经过load的办法来读取数据。

sqlContext.read.format("jdbc").options(

Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",

"dbtable" -> "iteblog")).load()

options函数支撑url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,仔细的同学必定发现这个和办法二的参数共同。是的,其内部完成原理部分和办法二大体共同。一起load办法还支撑json、orc等数据源的读取

(责任编辑:最模板)


版权声明:本文为weixin_39844880原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。