Spark SQL 读写数据详解
开始啦,和SparkCore不一样,现在我们现总结一下Spark SQL读写数据。

一、前奏
SparkSession读取数据,可以直接使用spark.read.csv(“path”),也可以使用spark.read.format(“csv”).load(“path”)
spark.read.csv("data.csv")
spark.read.format("csv").load("data.csv")
read:创建一个DataFrame的加载数据对象
csv:直接读取csv文件。saprksql提供了部分直接读取的数据源。
format:要读取的数据文件类型
load:加载数据
有人疑问,为啥有format的存在,我们直接读不就好了吗?那是因为官方仅仅提供了部分文件类型api。所以要加载不存在api的文件类型时候,需要用到format(file_type).load(path)。
二、读写数据
SparkSQL默认的读写数据文件格式parquet,当我们没有指定要写的文件类型时候,SparkSQL会以parquet形式读写。如下两种方式效果一样:
\
读数据:
val data: DataFrame = spark.read.option("header","true").format("parquet").load("data/parquet_file")
val data1: DataFrame = spark.read.option("header","true").load("data/parquet_file")
写数据:
data.write.format("parquet").save("data/parquet_file")
data.write.save("data/parquet_file")
选项设置
在读写中,我们还可以添加一下选项设置:
option("key","values") // 详细见官网 https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
贴别是对于csv文件的读写,设计option操作较多,常见的是:
option("header","true") //即保留原csv文件的列名
其他详见官网 :csv读写
强调一下:关于写文件,有一些插入模式需要注意:
| Scala/Java | Any Language | Meaning |
|---|---|---|
| SaveMode.ErrorIfExists (default) | “error” or “errorifexists” (default) | 将DataFrame保存到数据源时,如果数据已经存在,预计会抛出异常。 |
| SaveMode.Append | “append” | 将 DataFrame 保存到数据源时,如果数据/表已存在,则希望将 DataFrame 的内容附加到现有数据中。 |
| SaveMode.Overwrite | “overwrite” | 覆盖模式是指在将DataFrame 保存到数据源时,如果data/table 已经存在,现有的数据会被DataFrame 的内容覆盖。 |
| SaveMode.Ignore | “ignore” | 忽略模式是指在将DataFrame 保存到数据源时,如果数据已经存在,则保存操作预计不会保存DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS。 |
关于保存到持久表,也可以使用 saveAsTable 命令将数据帧作为持久表保存到 Hive 元存储中。请注意,使用此功能不需要现有的 Hive 部署。 Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将具体化 DataFrame 的内容并创建一个指向 Hive 元存储中数据的指针。即使您的 Spark 程序重新启动,持久表仍然存在,只要您保持与同一个元存储的连接。可以通过使用表的名称调用 SparkSession 上的 table 方法来创建持久表的 DataFrame。对于基于文件的数据源,例如text、parquet、json 等,您可以通过 path 选项指定自定义表路径,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”).删除表时,不会删除自定义表路径,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入到仓库目录下的默认表路径中。当表被删除时,默认的表路径也将被删除。
分桶、排序和分区
对于基于文件的数据源,还可以对输出进行存储分区和排序或分区。分桶和排序仅适用于持久表。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
当使用数据集 API 时,分区可以与 save 和 saveAsTable 一起使用。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
可以对单个表同时使用分区和分桶:
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
partitionBy 创建一个目录结构,如 Partition Discovery 部分所述。因此,它对具有高基数的列的适用性有限。相比之下,bucketBy 将数据分布在固定数量的桶中,并且可以在唯一值的数量不受限制时使用。bucketBy的基本原理比较好理解,它会根据你指定的列(可以是一个也可以是多个)计算哈希值,然后具有相同哈希值的数据将会被分到相同的分区。
目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一起使用,如下。这个操作其实是将数据保存到了文件中(如果不指定path,也会保存到一个临时目录中)。