大数据应用技术 | 使用 Docker搭建 Flink 集群 | FlinkStreaming实时数据流处理程序实现词频统计

运行环境


  • VMware
  • Centos7 虚拟机操作系统
  • Docker
  • Jdk8
  • Hadoop3.1.3
  • scala 2.11.8
  • flink-1.9.1
  • Maven3.6.3
  • IDEA

:在搭建Flink集群时需有Hadoop环境,尽管案例里没有用到HDFS,但通过资源调度管理的yarn组件启动Flink-shell 会有更好的效果。

关于Docker搭建Hadoop集群可以参考这篇博文 基于Docker搭建完全分布式集群

1. 安装、配置 Flink


1.1使用Xftp工具将Flink资源传入虚拟机

在这里插入图片描述

1.2 在虚拟机中将Flink资源传输到docker容器

$ sudo docker cp flink-1.9.1-bin-scala_2.11.tgz hadoop101:/opt/download

在这里插入图片描述

1.3 解压Flink资源并配置环境变量

tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C ./

mv flink-1.9.2 flink

vim /etc/profile.d/home.sh

source /etc/profile

在这里插入图片描述

2.Flink单节点测试


2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn

zkServer.sh start

start-dfs.sh

start-yarn.sh

在这里插入图片描述

2.2 以 Yarn方式启动 Flink Scala

./start-cluster.sh
./start-scala-shell.sh yarn

在这里插入图片描述

测试完毕后, :q 退出

3.配置 flink集群


在flink根目录下的conf文件夹中修改 masters 文件的内容为

hadoop101:8081

然后再修改workers文件夹的内容为

Hadoop101
Hadoop102
Hadoop103

配置 flink-conf.yaml文件:
jobmanager.rpc.address 的属性值设置为 hadoop101
再添加一行 taskmanager.tmp.dirs: /opt/module/flink/tmp 指定flink任务的缓存目录

接下来,分发配置好的 Flink资源到其他节点,即 hadoop102 和 hadoop103

scp -r ./flink hadoop103:/opt/module/flink
scp -r ./flink hadoop102:/opt/module/flink

在这里插入图片描述

4. 外部测试FlinkStreaming词频统计程序


创建IDEA项目,需下载 scala 插件,给项目配置scala的SDK,笔者这里的版本号是2.11.8

4.1 配置pom.xml 依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.ycc</groupId>
        <artifactId>wordcount</artifactId>
        <name>WordCount</name>
        <packaging>jar</packaging>
        <version>1.0</version>
        <repositories>
            <repository>
                <id>alimaven</id>
                <name>aliyun maven</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            </repository>
	        <repository>
	            <id>maven1</id>
	            <name>maven1</name>
	            <url>http://repo.maven.apache.org/maven2</url>
	        </repository>
        </repositories>

    <properties>
        <flink.version>1.9.1</flink.version>
    </properties>
    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.8</version>
            </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <pluginManagement></pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4.2 编写词频统计的Flink程序

WordCount.scala

这个程序主要就是使用 Flink 的API方法进行词频统计,通过flatMap转换操作将每一行的数据源根据空格进行分隔,并把所有字母统一为小写,之后再使用map操作使其变为key-value键值对,即(单词, 出现频数),最后在进行分组、求和,就得出了最终的词频统计结果 。当在IDEA中执行时,使用的是IDEA本地环境,没有发布到Flink集群环境上,故可直接在IDEA中进行测试。

package com.ycc

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    //第1步:建立执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //第2步:创建数据源
    val text = env.fromElements(
      "hello, world!",
      "hello, world!",
      "hello, world!")

    //第3步:对数据集指定转换操作
    val counts = text.flatMap { _.toLowerCase.split(" ") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // 第4步:输出结果
    counts.print()
  }
}

StreamWordCount.scala

与上个程序的不同是其数据源来自于IP:端口号的形式,通过在Flink集群的节点,即hadoop101节点向一个端口发送消息,然后程序向这个端口获取数据,获取后进行词频套机的操作,不同于之前,它用到了timeWindow窗口,而且最终有使用execute方法提交这个Job任务。

package com.ycc

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWordCount{
  def main(args: Array[String]): Unit = {

    //第1步:建立执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //第2步:创建数据源
    val source = env.socketTextStream("hadoop101",9999,'\n')

    //第3步:对数据集指定转换操作逻辑
    val dataStream = source.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(0)
      .timeWindow(Time.seconds(2),Time.seconds(2))
      .sum(1)

    //第4步:指定计算结果输出位置
    dataStream.print()

    //第5步:指定名称并触发流计算
    env.execute("Flink Streaming Word Count")
  }
}

4.3 启动 Flink集群 ,测试词频统计程序

在这里插入图片描述

第一个程序,IDEA中的执行结果

在这里插入图片描述
测试第二个程序前需在hadoop101节点监听9999端口号 并发送消息

nc -lk 9999

在这里插入图片描述
第二个程序, FlinkStreaming程序在IDEA中的执行结果
在这里插入图片描述

5. 本地测试 将打包的FlinkStream程序 提交到Flink集群


5.1 在IDEA使用Maven打包FLink项目

在这里插入图片描述

5.2 在Flink集群节点执行jar包

在代码中,需监听hadoop101节点的端口,故在hadoop102节点提交Flink包
测试第一个程序的结果:

flink run --class com.ycc.WordCount ./wordcount-1.0.jar

在这里插入图片描述
测试第二个程序出现报错
在这里插入图片描述
在这里插入图片描述
通过访问Flink web 页面,地址: http://hadoop101:8081/查看报错信息

在这里插入图片描述
根据报错提示,发现出错原因是端口占用,可能是因为之前测试9999端口时,没有正常退出,现尝试将StreamWordCount.scala程序val source = env.socketTextStream(“hadoop101”,9999,’\n’)
这部分中的9999端口号改为12345,并重新打包,传输到docker容器,在提交任务前,需先打开hadoop101节点的监听端口 nc -lk 12345,再提交jar包,命令如下:

nc -lk 12345

flink run --class com.ycc.StreamWordCount ./wordcount-1.0.jar

查询词频统计输出的结果

tail -f ./log/flink*.out

因为没有使用后台执行的参数,现使用第三个节点通过ssh命令远程登陆hadoop102查看结果,最后的词频统计结果如下:
在这里插入图片描述
至此,已完成了 Flink集群的搭建以及 FlinkStreaming操作进行词频统计。


版权声明:本文为Unirithe原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。