spark常用的linux命令,Spark基本命令 - Tsmilk的个人空间 - OSCHINA - 中文开源技术交流社区...

一、spark所在目录

cd usr/local/spark

a415383455814d4a0f2067e7c262f00c.png

二、启动spark

/usr/local/spark/sbin/start-all.sh

9a87664359be64dd70159e3fb172208f.png

启动Hadoop以及Spark:

bash ./starths.sh

da0ae90a27ce30fa8070708667838360.png

浏览器查看:

172.16.31.17:8080

152a676957fdff5dd9e98c0ae2340f92.png

停止Hadoop以及Spark

bash ./stophs.sh

三、基础使用

1.运行Spark示例(SparkPi)

在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。

c6e684c64d6a50cc1c1e094d16910d23.png

我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:

./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

35c121880ce0e8852f0899b416ecbe80.png

执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中)

Python 版本的 SparkPi 则需要通过 spark-submit 运行:

./bin/spark-submit examples/src/main/python/pi.py

f71a4407eec5d782291ed3e045a206fc.png

2.通过Spark-shell进行交互分析

Spark Shell 支持 Scala 和 Python

Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。

Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。 使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。

2.1 启动Spark Shell

./bin/spark-shell

a0b2010c212fdb8004f605bcfa5da3c2.png

2.2 基础操作

Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。

2.2.1 RDD创建方式

参考链接:

(1)使用本地文件创建RDD:

val textFile = sc.textFile("file:///usr/local/spark/README.md")

bc4eb2ada6c9314e9a15de63403d0026.png

代码中通过 “file://” 前缀指定读取本地文件。

(2)使用hdfs文件创建RDD:

val textfile = sc.textFile("/winnie/htest/test01.txt")

7678ef0197e92afb7ddfa7d617245f3f.png

Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/local/spark/README.md”的错误。

74a1d6237ab668d34a5eaca0180b0d00.png

(3)使用集合创建RDD:

从集合中创建RDD主要有下面两个方法:makeRDD 和 parallelize

makeRDD示例:

sc.makeRDD(1 to 50)

8baf3aa7ec21d57bccc46b2d41eef013.png

sc.makeRDD(Array("1", "2", "3"))

0a1395dd55833813773f3ece7726921f.png

sc.makeRDD(List(1,3,5))

b42a6c4d58988638b985f7f1cfdb7c3e.png

parallelize示例:

同上

2.2.2 RDD两种操作

(1)RDDs支持两种类型的操作:

actions: 执行操作,在数据集上运行计算后返回值

transformations: 转化操作,从现有数据集创建一个新的数据集

转化操作并不会立即执行,而是到了执行操作才会被执行

转化操作:

map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

flatMap() 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD

filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

distinct() 没有参数,将RDD里的元素进行去重操作

union() 参数是RDD,生成包含两个RDD所有元素的新RDD

intersection() 参数是RDD,求出两个RDD的共同元素

subtract() 参数是RDD,将原RDD里和参数RDD里相同的元素去掉

cartesian() 参数是RDD,求两个RDD的笛卡儿积

行动操作:

collect() 返回RDD所有元素

count() RDD里元素个数,对于文本文件,为总行数

first()RRD中的第一个item,对于文本文件,为首行内容

countByValue() 各元素在RDD中出现次数

reduce() 并行整合所有RDD数据,例如求和操作

fold(0)(func) 和reduce功能一样,不过fold带有初始值

aggregate(0)(seqOp,combop) 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样

foreach(func) 对RDD每个元素都是使用特定函数

行动操作每次的调用是不会存储前面的计算结果的,若想要存储前面的操作结果,要把需要缓存中间结果的RDD调用cache(),cache()方法是把中间结果缓存到内存中,也可以指定缓存到磁盘中(也可以只用persisit())

(2)实例

textFile.count()

textFile.first()

e96d07021f3d3a78f605c88d06e3e1db.png

val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行

linesWithSpark.count() // 统计行数

7a572227c43ae065fb55026ff6a1f407.png

action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:

textFile.filter(line => line.contains("Spark")).count()

找到包含单词最多的那一行内容共有几个单词:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

e824ef1908d91c98cdd61ca1617f07ad.png

代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。

Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount):

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 实现单词统计

wordCounts.collect() // 输出单词统计结果

053c4d400c30b5f28bccedb15a0f6cae.png

2.2.3 Spark shell退出

:quit

3.Spark SQL 和 DataFrames

Spark SQL 是 Spark 内嵌的模块,用于结构化数据。

在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。

DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。

3.1 Spark SQL 基本操作

Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。

在 Spark shell 启动时,输出日志的最后有这么几条信息:

2f9ab086b797ad4c97e340c176c1347c.png

这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。

使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。

通过 Spark 提供的 JSON 格式的数据源文件 ./examples/src/main/resources/people.json 来进行演示,该数据源内容如下:

bcee23abc211343eb1ee2ab717140d3e.png

【补充:xshell上传文件】

rz

【补充:xshell下载文件】

sz