文章目录
运行环境
- VMware
- Centos7 虚拟机操作系统
- Docker
- Jdk8
- Hadoop3.1.3
- scala 2.11.8
- flink-1.9.1
- Maven3.6.3
- IDEA
注:在搭建Flink集群时需有Hadoop环境,尽管案例里没有用到HDFS,但通过资源调度管理的yarn组件启动Flink-shell 会有更好的效果。
关于Docker搭建Hadoop集群可以参考这篇博文 基于Docker搭建完全分布式集群
1. 安装、配置 Flink
1.1使用Xftp工具将Flink资源传入虚拟机

1.2 在虚拟机中将Flink资源传输到docker容器
$ sudo docker cp flink-1.9.1-bin-scala_2.11.tgz hadoop101:/opt/download

1.3 解压Flink资源并配置环境变量
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C ./
mv flink-1.9.2 flink
vim /etc/profile.d/home.sh
source /etc/profile

2.Flink单节点测试
2.1 在hadoop101节点启动zookeeper 和 hadoop dfs 和 yarn
zkServer.sh start
start-dfs.sh
start-yarn.sh

2.2 以 Yarn方式启动 Flink Scala
./start-cluster.sh
./start-scala-shell.sh yarn

测试完毕后, :q 退出
3.配置 flink集群
在flink根目录下的conf文件夹中修改 masters 文件的内容为
hadoop101:8081
然后再修改workers文件夹的内容为
Hadoop101
Hadoop102
Hadoop103
配置 flink-conf.yaml文件:
将 jobmanager.rpc.address 的属性值设置为 hadoop101
再添加一行 taskmanager.tmp.dirs: /opt/module/flink/tmp 指定flink任务的缓存目录
接下来,分发配置好的 Flink资源到其他节点,即 hadoop102 和 hadoop103
scp -r ./flink hadoop103:/opt/module/flink
scp -r ./flink hadoop102:/opt/module/flink

4. 外部测试FlinkStreaming词频统计程序
创建IDEA项目,需下载 scala 插件,给项目配置scala的SDK,笔者这里的版本号是2.11.8
4.1 配置pom.xml 依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ycc</groupId>
<artifactId>wordcount</artifactId>
<name>WordCount</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>maven1</id>
<name>maven1</name>
<url>http://repo.maven.apache.org/maven2</url>
</repository>
</repositories>
<properties>
<flink.version>1.9.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<pluginManagement></pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>compile-scala</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.11.8</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 编写词频统计的Flink程序
WordCount.scala
这个程序主要就是使用 Flink 的API方法进行词频统计,通过flatMap转换操作将每一行的数据源根据空格进行分隔,并把所有字母统一为小写,之后再使用map操作使其变为key-value键值对,即(单词, 出现频数),最后在进行分组、求和,就得出了最终的词频统计结果 。当在IDEA中执行时,使用的是IDEA本地环境,没有发布到Flink集群环境上,故可直接在IDEA中进行测试。
package com.ycc
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]): Unit = {
//第1步:建立执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//第2步:创建数据源
val text = env.fromElements(
"hello, world!",
"hello, world!",
"hello, world!")
//第3步:对数据集指定转换操作
val counts = text.flatMap { _.toLowerCase.split(" ") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
// 第4步:输出结果
counts.print()
}
}
StreamWordCount.scala
与上个程序的不同是其数据源来自于IP:端口号的形式,通过在Flink集群的节点,即hadoop101节点向一个端口发送消息,然后程序向这个端口获取数据,获取后进行词频套机的操作,不同于之前,它用到了timeWindow窗口,而且最终有使用execute方法提交这个Job任务。
package com.ycc
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWordCount{
def main(args: Array[String]): Unit = {
//第1步:建立执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//第2步:创建数据源
val source = env.socketTextStream("hadoop101",9999,'\n')
//第3步:对数据集指定转换操作逻辑
val dataStream = source.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
.timeWindow(Time.seconds(2),Time.seconds(2))
.sum(1)
//第4步:指定计算结果输出位置
dataStream.print()
//第5步:指定名称并触发流计算
env.execute("Flink Streaming Word Count")
}
}
4.3 启动 Flink集群 ,测试词频统计程序

第一个程序,IDEA中的执行结果

测试第二个程序前需在hadoop101节点监听9999端口号 并发送消息
nc -lk 9999

第二个程序, FlinkStreaming程序在IDEA中的执行结果
5. 本地测试 将打包的FlinkStream程序 提交到Flink集群
5.1 在IDEA使用Maven打包FLink项目

5.2 在Flink集群节点执行jar包
在代码中,需监听hadoop101节点的端口,故在hadoop102节点提交Flink包
测试第一个程序的结果:
flink run --class com.ycc.WordCount ./wordcount-1.0.jar

测试第二个程序出现报错

通过访问Flink web 页面,地址: http://hadoop101:8081/查看报错信息

根据报错提示,发现出错原因是端口占用,可能是因为之前测试9999端口时,没有正常退出,现尝试将StreamWordCount.scala程序val source = env.socketTextStream(“hadoop101”,9999,’\n’)
这部分中的9999端口号改为12345,并重新打包,传输到docker容器,在提交任务前,需先打开hadoop101节点的监听端口 nc -lk 12345,再提交jar包,命令如下:
nc -lk 12345
flink run --class com.ycc.StreamWordCount ./wordcount-1.0.jar
查询词频统计输出的结果
tail -f ./log/flink*.out
因为没有使用后台执行的参数,现使用第三个节点通过ssh命令远程登陆hadoop102查看结果,最后的词频统计结果如下:
至此,已完成了 Flink集群的搭建以及 FlinkStreaming操作进行词频统计。