Spark RDD Operators (4): Action operations take, collect, count, countByValue, reduce, fold, top, countByKey, collectAsMap

1. first, take, collect, count, countByValue, reduce, fold, top

Scala version

import org.apache.spark.{SparkConf, SparkContext}

object ActionRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ActionRDDScala").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1,2,3,3))

    //first() returns the first element of the RDD
    println(rdd.first())
    println("----------------------")

    //take(n) returns the first n elements
    rdd.take(2).foreach(println)
    println("----------------------")

    //collect() returns all elements of the RDD to the driver
    rdd.collect.foreach(println)
    println("----------------------")

    //count() returns the number of elements in the RDD
    println(rdd.count())
    println("----------------------")


    //countByValue() returns how many times each element occurs in the RDD,
    //as a map {(value1, count1), (value2, count2), ..., (valueN, countN)}
    rdd.countByValue().foreach(println)
    println("----------------------")


    //reduce(func) aggregates all elements of the RDD in parallel, similar to reduce
    //on a Scala collection; func should be commutative and associative
    println(rdd.reduce(_ + _))
    println("----------------------")

    //fold(num)(func) is rarely used in practice.
    //Like reduce(), but with an initial value num: the value is folded into every
    //partition and once more when the per-partition results are merged (see the sketch below)
    println(rdd.fold(1)(_ + _))
    println("----------------------")

    //top(n) returns the first n elements in descending order,
    //or according to an explicitly supplied Ordering
    rdd.top(2).foreach(println)
    println("----------------------")
	
    //takeOrdered(n) sorts the RDD elements in ascending order and returns the first n;
    //a custom Ordering can also be supplied (see the sketch below); it is the counterpart of top
    rdd.takeOrdered(2).foreach(println)
    println("----------------------")

    //foreach(func) applies the given function to each element of the RDD
    rdd.foreach(println)
    println("----------------------")
  }
}

Java version

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;


public class ActionRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ActionRDDJava").setMaster("local[3]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));

        Integer firstRDD = rdd.first();
        System.out.println(firstRDD);
        System.out.println("------------------");

        System.out.println(rdd.take(2));
        System.out.println("------------------");

        System.out.println(rdd.collect());
        System.out.println("------------------");

        System.out.println(rdd.count());
        System.out.println("------------------");

        System.out.println(rdd.countByValue());
        System.out.println("------------------");

        Integer reduceRDD = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(reduceRDD);
        System.out.println("------------------");

        Integer foldRDD = rdd.fold(1,new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(foldRDD);
        System.out.println("------------------");

        System.out.println(rdd.top(2));
        System.out.println("------------------");

        System.out.println(rdd.takeOrdered(2));
        System.out.println("------------------");

        rdd.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.print(integer);
            }
        });
    }
}

2. countByKey, collectAsMap

Scala version

import org.apache.spark.{SparkConf, SparkContext}

object ActionPairRDDScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDScala")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Array((1,2),(2,4),(2,5),(3,4),(3,5),(3,6)))
    println(rdd.countByKey())
    println(rdd.collectAsMap())
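    //A couple of points worth noting: countByKey returns a Map[K, Long] with the number of
    //pairs per key (here Map(1 -> 1, 2 -> 2, 3 -> 3)), while collectAsMap keeps only one value
    //per duplicated key, so values are lost for keys 2 and 3 above; both bring everything
    //back to the driver. If every value per key is needed, collect the pair RDD itself:
    rdd.collect().foreach(println)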
  }
}

Java version

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Map;


public class ActionPairRDDJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Build a JavaPairRDD directly with parallelizePairs
        JavaPairRDD<Integer,Integer> tupRDD = sc.parallelizePairs(Lists.newArrayList(new Tuple2<Integer,Integer>(1, 2),
                new Tuple2<Integer,Integer>(2, 4),
                new Tuple2<Integer,Integer>(2, 5),
                new Tuple2<Integer,Integer>(3, 4),
                new Tuple2<Integer,Integer>(3, 5),
                new Tuple2<Integer,Integer>(3, 6)
        ));

//        JavaPairRDD.fromJavaRDD(...) can also turn a JavaRDD<Tuple2<K, V>> into a JavaPairRDD, but it is less convenient than parallelizePairs

        //countByKey
        Map<Integer, Long> countByKeyRDD =tupRDD.countByKey();
        for (Integer key :
                countByKeyRDD.keySet()) {
            System.out.println(key+","+countByKeyRDD.get(key));
        }
        System.out.println("------------------------");

        // collectAsMap, printed by iterating over the entrySet (approach 1)
        Map<Integer, Integer> collectRDD = tupRDD.collectAsMap();
        for (Map.Entry<Integer,Integer> entry :
                collectRDD.entrySet()) {
            System.out.println(entry.getKey()+"->"+entry.getValue());
        }

        // collectAsMap, printed by iterating over the keySet (approach 2)
        System.out.println("-----------------");
        Map<Integer, Integer> collectRDD1 = tupRDD.collectAsMap();
        for (Integer key :
                collectRDD1.keySet()) {
            System.out.println(key+","+collectRDD1.get(key));
        }
    }
}
