1. Linux command to write input:
nc -lk 9999
nc is the netcat utility (not the node's IP name): -l puts it in listen mode, -k keeps it listening after a client disconnects, and 9999 is a port number you choose yourself.
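Before wiring up Spark, you can sanity-check the listener from plain Java with a minimal socket client. This is only a sketch: the NcProbe class name is mine, and the host/port are whatever nc -lk is actually using.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class NcProbe {
    // Connects to a line-oriented text server (e.g. nc -lk 9999)
    // and returns the first line it sends, or null on EOF.
    public static String readOneLine(String host, int port) throws Exception {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            return in.readLine();
        }
    }
}
```

With nc -lk 9999 running, typing a line into the nc terminal should make readOneLine("hadoop66", 9999) return that line.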
2. In Java, create a Maven project and add the Maven dependencies:
<properties>
    <java.version>1.8</java.version>
    <spark.version>2.1.0</spark.version>
    <scala.version>2.11</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>
</dependencies>
3. Java code:
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SparkStream {
    public static void main(String[] args) throws InterruptedException {
        // 1. Configure Spark; local[2] reserves one thread for the receiver and one for processing
        SparkConf conf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
        // 2. Create the streaming context with a 10-second batch interval
        JavaStreamingContext stream = new JavaStreamingContext(conf, Durations.seconds(10));
        // 3. Open a TCP connection to receive the live text data
        JavaReceiverInputDStream<String> dStream = stream.socketTextStream("hadoop66", 9999, StorageLevel.MEMORY_AND_DISK());
        // 4. Split each line on "," and flatten the words into the stream
        JavaDStream<String> stringJavaDStream = dStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] word = s.split(",");
                // wrap the array in a List and return its iterator
                List<String> list = Arrays.asList(word);
                return list.iterator();
            }
        });
        // 5. Map each word to a (word, 1) key-value pair
        JavaPairDStream<String, Integer> hostName = stringJavaDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // 6. Sum the counts for each word within a batch
        JavaPairDStream<String, Integer> javaPairDStream = hostName.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 7. Print each batch's result
        javaPairDStream.print();
        // 8. Start the streaming computation
        stream.start();
        // 9. Block until the streaming job is stopped (e.g. Ctrl-C) or fails;
        //    a stop() call after this line would never be reached in a normal run
        stream.awaitTermination();
    }
}
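Per batch, the flatMap → mapToPair → reduceByKey chain above is just a comma-separated word count. The same logic in plain Java (a sketch without Spark; the BatchWordCount class name is mine) is handy for checking by hand what a 10-second batch should print:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BatchWordCount {
    // Equivalent of one batch: split each line on ",", emit (word, 1), sum per word.
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] batch = {"ge,he,ge,le,we,te", "hello,hi,hello,hi"};
        System.out.println(count(batch));
        // prints {ge=2, he=1, le=1, we=1, te=1, hello=2, hi=2}
    }
}
```

Spark's reduceByKey makes no ordering guarantee across keys; the LinkedHashMap here just keeps the output readable.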
4. In the Linux terminal, type data separated by "," and press Enter to send each line:
hadoop66:~$ nc -lk 9999
ge,he,ge,le,we,te
hello,hi,hello,hi
th,th
hahh,hah,hah
Copyright notice: this is an original article by weixin_43365615, licensed under the CC 4.0 BY-SA agreement; when reposting, please include a link to the original source and this notice.