Part 2: Flink Quick Start

We use a word-count example to quickly get hands-on with Flink, covering both stream processing (Streaming) and batch processing (Batch).

1. Word Count Example (Batch Processing)

1.1 Requirement

Count how many times each word appears in a file and write the result to an output file.

1.2 Implementation

  • Add the dependencies

    <dependencies>
            <!-- Flink core -->
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.14.4</version>
            </dependency>
            <!-- Flink streaming (DataStream API) -->
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.14.4</version>
                <scope>provided</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.14.4</version>
            </dependency>
        </dependencies>
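
Note: because flink-streaming-java is declared with scope provided, its classes are not on the runtime classpath when the streaming example below is launched straight from the IDE, which typically surfaces as a NoClassDefFoundError. A common workaround for local development (a suggestion made here, not part of the original setup) is to drop the provided scope for local runs, or to enable your IDE's option to include provided-scope dependencies:

    <!-- Local IDE runs only: the same dependency without the provided scope -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.4</version>
    </dependency>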
    
  • Java program

    package com.sugar.test;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.operators.UnsortedGrouping;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class BatchDemo {
    
        public static void main(String[] args) throws Exception {
            // 1. Obtain the Flink batch execution environment
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            // 2. Load the data to analyze
            String input = "E:\\hello.txt";
            String output = "E:\\out.txt";
            DataSource<String> dataSource = executionEnvironment.readTextFile(input);
            // 3. Process the data
            FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    // 3.1 Split each line into individual words
                    String[] split = s.split(" ");
                    for (String word : split) {
                        Tuple2<String, Integer> tuple2 = new Tuple2<>(word, 1);
                        collector.collect(tuple2);
                    }
                }
            });
            // 3.2 Group identical words together (tuple field 0)
            UnsortedGrouping<Tuple2<String, Integer>> group = flatMapOperator.groupBy(0);
            // 3.3 Sum the counts (tuple field 1) within each group
            AggregateOperator<Tuple2<String, Integer>> sum = group.sum(1);
            DataSet<Tuple2<String, Integer>> result = sum;
            // 4. Write the result (with parallelism > 1, writeAsText creates a directory of part files)
            result.writeAsText(output);
            // 5. Trigger job execution
            executionEnvironment.execute("wordCount batch process");
        }
    
    }
    
    
  • Result

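For illustration only (this input is a made-up example, not the author's actual file): if hello.txt contained the single line "hello flink hello world", the output written by writeAsText would hold one tuple per line, for example:

    (flink,1)
    (hello,2)
    (world,1)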

2. Word Count Example (Streaming)

Use nc (netcat) to listen on a port and continuously send data:

yum install nc -y
nc -lp 7777
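
Note: netcat flags differ between implementations. With the nmap-ncat package that yum usually installs, adding -k keeps the listener open across reconnects, which is convenient when restarting the Flink job:

nc -lk 7777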

2.1 Requirement

A socket sends words in real time to simulate a live stream. Flink receives the data in real time, aggregates the data inside a given time window (e.g. 5 s), recomputes the summary every 1 s, and prints the result for each window. The program below implements the simpler continuously updated running count; a windowed variant is sketched after the code.

2.2 Implementation

package com.sugar.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamDemo {
    public static void main(String[] args) throws Exception {
        // Obtain the streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "192.168.13.10";
        int port = 7777;
        // Create the data source: read text lines from the socket
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);
        // Split each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        // Key by the word (tuple position 0) and keep a running sum of the counts (position 1);
        // keyBy(int) is deprecated in newer APIs in favor of a KeySelector such as t -> t.f0
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = singleOutputStreamOperator.keyBy(0).sum(1);
        // Print the result stream to stdout
        sum.print();
        // Trigger job execution
        streamExecutionEnvironment.execute("wordCount stream process");
    }
}
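
The requirement in 2.1 asks for a 5 s window evaluated every 1 s, while StreamDemo above keeps a continuously updated running count. Below is a minimal sketch of the windowed variant, assuming a processing-time sliding window (the SlidingProcessingTimeWindows assigner and the 5 s / 1 s sizes are choices made here to match the requirement, not code from the original article):

package com.sugar.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same socket source as StreamDemo; hostname/port must match your nc listener
        SingleOutputStreamOperator<Tuple2<String, Integer>> words = env
                .socketTextStream("192.168.13.10", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                        for (String word : s.split(" ")) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });
        // Key by the word, then sum the counts over a 5 s window that slides every 1 s
        words.keyBy(t -> t.f0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .sum(1)
                .print();
        env.execute("wordCount sliding window");
    }
}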


The workflow of developing a Flink program can be summarized as follows:

1) Obtain an execution environment

2) Load/create the initial data

3) Specify the operators that transform the data

4) Specify where to store the results

5) Call execute() to trigger the program

Note: Flink programs are lazily evaluated; the program is only actually executed when execute() is finally called.
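
The five steps map onto a minimal skeleton like the one below (the hostname, port, and the line-length operator are placeholders chosen for illustration, not part of the original examples):

package com.sugar.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonDemo {
    public static void main(String[] args) throws Exception {
        // 1) Obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Load/create the initial data
        env.socketTextStream("localhost", 9999)
                // 3) Specify the operators that transform the data
                .map(new MapFunction<String, Integer>() {
                    @Override
                    public Integer map(String line) {
                        return line.length();
                    }
                })
                // 4) Specify where to store the results (here: print to stdout)
                .print();
        // 5) Call execute() to trigger the program; nothing runs before this line
        env.execute("five step skeleton");
    }
}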


Copyright notice: this is an original article by qq_45420103, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.