IDEA中实现 Spark WordCount(包含Maven中增加scala)

1、新建maven工程

步骤:File–>New–>Project–>Maven
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2、下载jdk,junit,Spark相关的jar包

jdk,junit修改版本号
jdk,junit的相关信息原本就有,根据实际修改版本号即可,我用的是jdk1.8和junit4.12
在这里插入图片描述
在此提供jdk,junit的相关信息,方便复制

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

Spark的jar包信息百度搜索复制
Spark的jar包信息怎么来?也就是这个中间的信息怎么来,不用死记硬背,百度搜索。
百度搜索maven,进maven repository:maven,搜索Spark,按需要点进去,找到对应的版本号复制即可。
该网站比较难进,在此提供一下网址,
网址:https://mvnrepository.com/artifact/org.apache.spark
在此演示scala的jar包查找,Spark和log4j自己查找复制。
我用的是2.6.0版本,在此演示一下。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在此提供一下所需依赖包信息:

	<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>

 	<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>

右下角点击下载jar包,Enable Auto-Import是自动下载,随便点哪个都可以~
在这里插入图片描述
以上下载好之后,我们需要干活了,先new一个scala吧。
我们发现,src下任何一个文件夹没办法new出scala,需要我们设置。
在这里插入图片描述
在这里插入图片描述

在main文件夹下新建一个scala目录(目录名随便起哈),与java平级,并将scala设置为Sources root。

(说明一下Sources root的用处:通过这个类指定一个文件夹,你告诉IntelliJ IDEA,这个文件夹及其子文件夹中包含的源代码,可以编译为构建过程的一部分。)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
进入File–>Project Structure–>Libraries–>±->Scala SDK–>选中–>OK–>OK–>OK(一直Ok就行了)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
另一种Maven中新增scala的方法:工程名右击–>AddFramework Support–>选择Scala即可。

3、实现wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf=new SparkConf().setMaster("local[2]").setAppName("wordcount")
    val sc:SparkContext = SparkContext.getOrCreate(conf)
    val rdd1:RDD[String]=sc.parallelize(List("hello world","hello java","hello scala"))
    rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
  }
}

结果如下:
在这里插入图片描述
日志也一并打出来了,看着很累赘,可以将日志部分去掉:

  • 步骤1:建一个与src平级的目录resource,将其设置为资源文件夹(设置方式:右击–>Mark Directory as–>source root)
  • 步骤2:找到log4j(全称:log4j-defaults.properties),把它复制到刚刚新建的resource资源文件夹中,改名为log4j.properties(必须要改)。
    log4j的位置:
    External Libraries–>maven:org.apache.spark:spark-core_2.11:2.1.1–>spark-core_2.11-2.1.1.jar–>org–>apache.spark–>log4j-defaults.properties。
    在这里插入图片描述
  • 步骤3:将log4j文件第一行的INFO修改为ERROR。
    在这里插入图片描述
    这个时候再运行WordCount,结果如下:
    在这里插入图片描述

4、RDD两种创建方式

1、使用集合创建RDD
2、加载文件创建RDD(包括加载本地文件和加载HDFS文件)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf=new SparkConf().setMaster("local[2]").setAppName("wordcount")
    val sc:SparkContext=SparkContext.getOrCreate(conf)
    //使用集合创建RDD
    val rdd1:RDD[String]=sc.parallelize(List("hello world","hello java","hello scala"),3) //第二个参数设置的是分区,不设置默认是上面的local[2]
   //sc.makeRDD(List("hello world","hello java","hello scala"),3)
    rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
   val partitions = rdd1.partitions
    println("rdd分区:"+partitions.length)  //可以试着打印出分区数

    //通过加载本地文件产生RDD
    println("--------------------------------------")
    val lines:RDD[String] = sc.textFile("D:\\test\\a.txt ") //本地目录
    lines.collect().foreach(println)
    //通过加载HDFS文件产生RDD
    println("--------------------------------------")
    val linesHDFS:RDD[String] = sc.textFile("hdfs://hadoop20:9000/wctest/a.txt") //HDFS目录
    linesHDFS.collect().foreach(println)
  }
}

版权声明:本文为weixin_48185778原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。