Flink-Based Simulation of the Tmall Double 11 Real-Time Statistics System

I. Project Background
In the field of big-data real-time computing, Tmall's Double 11 real-time transaction counter is the best-known reference, and its architecture is correspondingly complex. This project is a simplified recreation: the real Double 11 pipeline spans many dimensions and many systems at a much finer real-time granularity, but the overall technical architecture is similar, and at the heart of both sits the real-time computing engine Flink.
II. Preparation
1. Install and configure Filebeat
Write a beat-kafka.yml file that specifies Filebeat's input (the log file to tail) and output (the Kafka topic), as sketched below.
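The configuration screenshot is not reproduced here, so below is a minimal sketch of what beat-kafka.yml could look like, assuming the log path /root/double11.log and the broker address hadoop01:9092 used later in this article. Filebeat's config keys changed across major versions (this shape matches Filebeat 5.x), so adjust for your version.

filebeat.prospectors:
- input_type: log
  paths:
    - /root/double11.log          # log file produced by double11.sh (assumed path)

output.kafka:
  hosts: ["hadoop01:9092"]        # Kafka broker, matches the Flink job's bootstrap.servers
  topic: test                     # topic the Flink job consumes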
2. Install Kafka

Modify the broker configuration file (config/server.properties), as sketched below.
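The edits shown in the original screenshots are not reproduced; for a single-broker setup like this one, the properties typically modified are along these lines (the data directory is a placeholder, and 0.8-era brokers use host.name/port rather than the later listeners key):

broker.id=0
host.name=hadoop01
port=9092
log.dirs=/opt/kafka/kafka-logs    # placeholder data directory
zookeeper.connect=hadoop01:2181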
3. Write a shell script
The script simulates one transaction record per second; each record contains the user ID, the payment amount, the user's city, and the purchased product.
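The script itself appeared as a screenshot; here is a minimal sketch of what double11.sh might look like. The field order userId,price,city,product is inferred from the Flink code below (the price is read from index 1 and the city from index 2); the city and product lists are made up for illustration.

#!/bin/bash
# double11.sh - emit one simulated transaction per second in the format:
#   userId,price,city,product
cities=(beijing shanghai guangzhou shenzhen hangzhou)
products=(phone laptop watch shoes book)
while true; do
    user_id=$((RANDOM % 100000))
    price=$((RANDOM % 1000 + 1))    # whole-yuan amount, parseable as Integer in the Flink job
    city=${cities[$((RANDOM % ${#cities[@]}))]}
    product=${products[$((RANDOM % ${#products[@]}))]}
    echo "${user_id},${price},${city},${product}"
    sleep 1
done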
Make the script executable:
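For example:

chmod +x double11.sh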
Append the script's output to the double11.log file:
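A typical way to run it in the background, appending to the file Filebeat watches (the path is an assumption and must match the Filebeat input above):

nohup ./double11.sh >> /root/double11.log 2>&1 &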

4. Write a Flink program that consumes the Kafka data in real time
(1) Project structure
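The structure screenshot is omitted; from the pom and the class names below, the layout is essentially:

flink_double11/
├── pom.xml
└── src/main/java/com/fastweb/
    ├── Double11Sum.java
    └── Double11SumByCity.java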

(2) pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_double11</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>

        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/repositories/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>
</project>

(3) Code to compute the total real-time transaction volume

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11Sum {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop01:9092");
        properties.setProperty("zookeeper.connect", "hadoop01:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
        counts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    // Sums the total real-time transaction volume under a single key
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Filebeat wraps each log line in a JSON event; the raw line is in the "message" field
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            // Record format: userId,price,city,product -- field 1 is the payment amount
            Integer price = Integer.parseInt(message.split(",")[1]);
            // The constant key "price" routes every record into one running sum
            out.collect(new Tuple2<String, Integer>("price", price));
        }
    }
}
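For reference, Filebeat wraps each log line in a JSON event whose message field holds the raw line, which is why the flatMap first parses the JSON and then splits message. A simplified, hypothetical event and the tuple it yields:

{"@timestamp":"2021-11-11T00:00:01.000Z","message":"10023,532,beijing,phone", ...}

=> ("price", 532), which the keyed sum folds into the running total.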

(4) Fixing a runtime error

Running the job from the IDE initially fails with:

java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema

The missing class lives in flink-streaming-java, which the pom declares with provided scope, so it is absent from the runtime classpath when the main class is launched from the IDE. Enabling the run-configuration option that includes provided-scope dependencies (in IntelliJ IDEA, the "Include dependencies with 'Provided' scope" checkbox) resolves the error.

III. Implementation
1. Start the processes
Start them in this order: ZooKeeper, the Hadoop cluster, Flink, and finally the Flink history server.
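A sketch of the start-up commands, assuming each script is run from its component's installation directory (paths are assumptions):

zkServer.sh start                # ZooKeeper
start-dfs.sh && start-yarn.sh    # Hadoop cluster
bin/start-cluster.sh             # Flink
bin/historyserver.sh start       # Flink history server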

2. Start Filebeat
Filebeat tails double11.log and ships each new transaction record to the Kafka topic in real time.
Run Filebeat:
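A typical invocation with the configuration written earlier (-e logs to stderr, -c selects the config file):

./filebeat -e -c beat-kafka.yml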
3. Start Kafka
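For example (the --zookeeper-style flags match the 0.8-era broker implied by the FlinkKafkaConsumer08 connector):

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test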
4. Run Double11Sum to compute the total real-time transaction volume
5. Run the double11.sh script
The double11.log file appears and begins to fill with records.
6. Filebeat ships the transaction records to Kafka
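Arrival of the records can be verified with the console consumer:

bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic test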
7. View the total real-time transaction volume
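The result screenshots are not reproduced; because every record is keyed to the constant "price", print() shows a single running total, along the lines of (illustrative values):

1> (price,532)
1> (price,1298)
1> (price,1745)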
8. Run Double11SumByCity to aggregate the real-time transaction volume by city

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11SumByCity {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop01:9092");
        properties.setProperty("zookeeper.connect", "hadoop01:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> cityCounts = stream.flatMap(new CitySplitter()).keyBy(0).sum(1);
        cityCounts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    // Aggregates the real-time transaction volume per city
    public static final class CitySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Extract the raw log line from the Filebeat JSON event
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            // Record format: userId,price,city,product
            Integer price = Integer.parseInt(message.split(",")[1]);
            String city = message.split(",")[2];
            // Keying by city yields one running sum per city
            out.collect(new Tuple2<String, Integer>(city, price));
        }
    }
}

9. Run the double11.sh script again
10. View the per-city real-time transaction volume

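The result screenshots are not reproduced; the output now carries one running total per city key, along the lines of (illustrative values):

2> (beijing,532)
3> (shanghai,766)
2> (beijing,1113)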


Copyright notice: this is an original article by bb123116, released under the CC 4.0 BY-SA license; include a link to the original source and this notice when reposting.