SparkSQL 读写数据(十四)

Spark SQL 读写数据详解



开始啦,和SparkCore不一样,现在我们现总结一下Spark SQL读写数据。

在这里插入图片描述

一、前奏

SparkSession读取数据,可以直接使用spark.read.csv(“path”),也可以使用spark.read.format(“csv”).load(“path”)

    spark.read.csv("data.csv")
    spark.read.format("csv").load("data.csv")

read:创建一个DataFrame的加载数据对象
csv:直接读取csv文件。saprksql提供了部分直接读取的数据源。
format:要读取的数据文件类型
load:加载数据
有人疑问,为啥有format的存在,我们直接读不就好了吗?那是因为官方仅仅提供了部分文件类型api。所以要加载不存在api的文件类型时候,需要用到format(file_type).load(path)。

二、读写数据

SparkSQL默认的读写数据文件格式parquet,当我们没有指定要写的文件类型时候,SparkSQL会以parquet形式读写。如下两种方式效果一样:
\

读数据:

    val data: DataFrame = spark.read.option("header","true").format("parquet").load("data/parquet_file")
    val data1: DataFrame = spark.read.option("header","true").load("data/parquet_file")

写数据:

    data.write.format("parquet").save("data/parquet_file")
    data.write.save("data/parquet_file")

选项设置

在读写中,我们还可以添加一下选项设置:

option("key","values") // 详细见官网 https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

贴别是对于csv文件的读写,设计option操作较多,常见的是:

option("header","true") //即保留原csv文件的列名

其他详见官网 :csv读写
强调一下:关于写文件,有一些插入模式需要注意:

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists (default)“error” or “errorifexists” (default)将DataFrame保存到数据源时,如果数据已经存在,预计会抛出异常。
SaveMode.Append“append”将 DataFrame 保存到数据源时,如果数据/表已存在,则希望将 DataFrame 的内容附加到现有数据中。
SaveMode.Overwrite“overwrite”覆盖模式是指在将DataFrame 保存到数据源时,如果data/table 已经存在,现有的数据会被DataFrame 的内容覆盖。
SaveMode.Ignore“ignore”忽略模式是指在将DataFrame 保存到数据源时,如果数据已经存在,则保存操作预计不会保存DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS。

关于保存到持久表,也可以使用 saveAsTable 命令将数据帧作为持久表保存到 Hive 元存储中。请注意,使用此功能不需要现有的 Hive 部署。 Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将具体化 DataFrame 的内容并创建一个指向 Hive 元存储中数据的指针。即使您的 Spark 程序重新启动,持久表仍然存在,只要您保持与同一个元存储的连接。可以通过使用表的名称调用 SparkSession 上的 table 方法来创建持久表的 DataFrame。对于基于文件的数据源,例如text、parquet、json 等,您可以通过 path 选项指定自定义表路径,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”).删除表时,不会删除自定义表路径,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入到仓库目录下的默认表路径中。当表被删除时,默认的表路径也将被删除。

分桶、排序和分区

对于基于文件的数据源,还可以对输出进行存储分区和排序或分区。分桶和排序仅适用于持久表。

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

当使用数据集 API 时,分区可以与 save 和 saveAsTable 一起使用。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

可以对单个表同时使用分区和分桶:

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

partitionBy 创建一个目录结构,如 Partition Discovery 部分所述。因此,它对具有高基数的列的适用性有限。相比之下,bucketBy 将数据分布在固定数量的桶中,并且可以在唯一值的数量不受限制时使用。bucketBy的基本原理比较好理解,它会根据你指定的列(可以是一个也可以是多个)计算哈希值,然后具有相同哈希值的数据将会被分到相同的分区。
在这里插入图片描述
目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一起使用,如下。这个操作其实是将数据保存到了文件中(如果不指定path,也会保存到一个临时目录中)。


版权声明:本文为weixin_45025143原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。