Spark06:【案例】创建RDD:使用集合创建RDD、使用本地文件和HDFS文件创建RDD

一、创建RDD

RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD
这样就相当于设置了Spark应用程序的输入源数据
然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件

1、使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
2、使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
3、使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。

二、使用集合创建RDD

首先来看一下如何使用集合创建RDD
如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。
相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。

调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。
Spark会为每一个partition运行一个task来进行处理。
Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

1、scala代码如下:

package com.imooc.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Scala使用集合创建RDD
  */
object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()

    conf.setAppName("CreateRddByArrayScala")//local表示在本地执行
        .setMaster("local")//local表示在本地执行

    val sc = new SparkContext(conf)
    //创建集合
    val arr = Array(1,2,3,4,5)
    //基于集合创建RDD
    val rdd = sc.parallelize(arr)

    val sum = rdd.reduce(_ + _)

    print(sum)
    //停止SparkContext
    sc.stop()

  }
}

在这里插入图片描述

2、java代码如下:

package com.imooc.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

/**
 * Java使用集合创建RDD
 */
public class CreateRddByArrayJava {
    public static void main(String[] args) {
        //创建SparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //创建集合
        List<Integer> arr = Arrays.asList(1,2,3,4,5);

        JavaRDD<Integer> rdd = sc.parallelize(arr);

        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        System.out.println(sum);
        //停止sparkContext
        sc.stop();

    }
}

在这里插入图片描述

三、使用本地文件和HDFS文件创建RDD

下面我们来看一下使用本地文件和HDFS文件创建RDD
通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容

textFile()方法支持针对目录、压缩文件以及通配符创建RDD

Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少,比Block数量少的话你的设置是不生效的

1、scala代码如下:

package com.imooc.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需求:通过文件创建RDD
  *   1:本地文件
  *   2:HDFS文件
  */
object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")//设置任务名称
      .setMaster("local")//local表示在本地执行

    val sc = new SparkContext(conf)

    val path = "D:\\hello.txt"
    //path = "hdfs://bigdata01:9000/test/hello.txt"
    //读取文件数据,可以在textFile中指定生成的RDD的分区数量
    val rdd = sc.textFile(path,2)
    //获取每一行数据的长度,计算文件内数据的总长度
    val length = rdd.map(_.length).reduce(_+_)

    println(length)

    sc.stop()

  }
}

在这里插入图片描述

2、java代码如下:

package com.imooc.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;


/**
 * 需求:通过文件创建RDD
 *   1:本地文件
 *   2:HDFS文件
 */
public class CreateRddByFileJava {
    public static void main(String[] args) {
        //创建SparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByFileJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "D:\\hello.txt";
        //path = "hdfs://bigdata01:9000/test/hello.txt";
        //读取文件数据,可以在textFile中指定生成的RDD的分区数量
        JavaRDD<String> rdd = sc.textFile(path,2);
        //获取每一行数据的长度
        JavaRDD<Integer> lengthRDD = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String line) throws Exception {
                return line.length();
            }
        });
        //计算文件内数据的总长度
        Integer length = lengthRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        System.out.println(length);

        sc.stop();
    }
}

在这里插入图片描述


版权声明:本文为weixin_40612128原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。