Spark RDD Operators (Part 4): Basic Actions (first, take, collect, count, countByValue, reduce, fold, top)
1. first, take, collect, count, countByValue, reduce, fold, top
Scala version:
import org.apache.spark.{SparkConf, SparkContext}

object ActionRDDScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ActionRDDScala").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1,2,3,3))
//rdd.first() returns the first element of the RDD
println(rdd.first())
println("----------------------")
//rdd.take(n) returns the first n elements
rdd.take(2).foreach(println)
println("----------------------")
//rdd.collect() returns all elements of the RDD to the driver (use with care on large datasets)
rdd.collect.foreach(println)
println("----------------------")
//rdd.count() returns the number of elements in the RDD
println(rdd.count())
println("----------------------")
//rdd.countByValue() returns the number of times each element occurs in the RDD, as {(value1, count1), (value2, count2), ..., (valueN, countN)}
rdd.countByValue().foreach(println)
println("----------------------")
//rdd.reduce(func)
//aggregates all elements of the RDD in parallel, similar to reduce on Scala collections
println(rdd.reduce(_ + _))
println("----------------------")
//rdd.fold(num)(func), rarely used in practice
//like reduce(), but with an initial value num: each partition is folded starting from num, and the partition results are then folded together, again starting from num
println(rdd.fold(1)(_ + _))
println("----------------------")
//rdd.top(n)
//returns the first n elements in descending order, or by an implicitly supplied ordering
rdd.top(2).foreach(println)
println("----------------------")
//rdd.takeOrdered(n)
//sorts the RDD elements in ascending order and returns the first n; a custom ordering can also be supplied (see the sketch below). It is the ascending counterpart of top
rdd.takeOrdered(2).foreach(println)
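//Hedged sketch of the custom ordering mentioned above: passing
//Ordering[Int].reverse explicitly makes takeOrdered(n) behave like top(n)
rdd.takeOrdered(2)(Ordering[Int].reverse).foreach(println)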
println("----------------------")
//rdd.foreach(func) applies the given function to each element of the RDD
rdd.foreach(println)
println("----------------------")
}
}
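A practical difference between fold and reduce, sketched below (reusing the sc from the listing above): reduce throws on an empty RDD, while fold simply returns its initial value.

val empty = sc.parallelize(Seq.empty[Int])
println(empty.fold(0)(_ + _)) //prints 0
//empty.reduce(_ + _) //would throw java.lang.UnsupportedOperationException: empty collection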
Java version:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
public class ActionRDDJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("ActionRDDJava").setMaster("local[3]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));
Integer firstRDD = rdd.first();
System.out.println(firstRDD);
System.out.println("------------------");
System.out.println(rdd.take(2));
System.out.println("------------------");
System.out.println(rdd.collect());
System.out.println("------------------");
System.out.println(rdd.count());
System.out.println("------------------");
System.out.println(rdd.countByValue());
System.out.println("------------------");
Integer reduceRDD = rdd.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(reduceRDD);
System.out.println("------------------");
Integer foldRDD = rdd.fold(1,new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(foldRDD);
System.out.println("------------------");
System.out.println(rdd.top(2));
System.out.println("------------------");
System.out.println(rdd.takeOrdered(2));
System.out.println("------------------");
rdd.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.print(integer);
}
});
}
}
2. countByKey, collectAsMap
Scala version:
import org.apache.spark.{SparkConf, SparkContext}

object ActionPairRDDScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDScala")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array((1,2),(2,4),(2,5),(3,4),(3,5),(3,6)))
//countByKey(): number of values for each key
println(rdd.countByKey())
//collectAsMap(): collect the pair RDD to the driver as a Map
println(rdd.collectAsMap())
}
}
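Note that collectAsMap keeps only one value per key, so the duplicate keys above ((2,4)/(2,5), and the three pairs with key 3) collapse to single map entries, while countByKey counts every occurrence. A minimal sketch, reusing the sc from the listing above:

val pairs = sc.parallelize(Array((2, 4), (2, 5)))
println(pairs.countByKey()) //Map(2 -> 2): both pairs are counted
println(pairs.collectAsMap()) //a single entry for key 2; which value is kept is not guaranteed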
Java version:
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Map;
public class ActionPairRDDJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionPairRDDJava");
JavaSparkContext sc = new JavaSparkContext(conf);
//build a JavaPairRDD directly with parallelizePairs
JavaPairRDD<Integer,Integer> tupRDD = sc.parallelizePairs(Lists.newArrayList(new Tuple2<Integer,Integer>(1, 2),
new Tuple2<Integer,Integer>(2, 4),
new Tuple2<Integer,Integer>(2, 5),
new Tuple2<Integer,Integer>(3, 4),
new Tuple2<Integer,Integer>(3, 5),
new Tuple2<Integer,Integer>(3, 6)
));
// JavaPairRDD<Integer, Integer> mapRDD = JavaPairRDD.fromJavaRDD(tupRDD); //awkward to use here
//countByKey
Map<Integer, Long> countByKeyRDD = tupRDD.countByKey();
for (Integer key : countByKeyRDD.keySet()) {
System.out.println(key + "," + countByKeyRDD.get(key));
}
System.out.println("------------------------");
//collectAsMap, printing approach 1: iterate over entrySet
Map<Integer, Integer> collectRDD = tupRDD.collectAsMap();
for (Map.Entry<Integer, Integer> entry : collectRDD.entrySet()) {
System.out.println(entry.getKey() + "->" + entry.getValue());
}
//collectAsMap, printing approach 2: iterate over keySet
System.out.println("-----------------");
Map<Integer, Integer> collectRDD1 = tupRDD.collectAsMap();
for (Integer key : collectRDD1.keySet()) {
System.out.println(key + "," + collectRDD1.get(key));
}
}
}