Kafka (Part 4): Simulating a Real-Time Data Dashboard Project with Kafka

Real-Time Data Dashboard Project

Real-Time Data Dashboard

Put simply, a real-time data dashboard collects business data as it is produced, runs some analysis on it, and displays the results as text or charts.

Requirements Analysis

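This article simulates a real-time dashboard for analyzing order data:
1. A jar program deployed on a Linux server simulates a stream of real-time order data.
2. A tool is needed to collect this order data as it arrives. Here the jar program writes the data to a log file, and Flume collects the data from that log file. Because the goal is real-time analysis, the Flume sink should not be HDFS or Hive: analyzing data stored in HDFS typically requires a layer such as Hive on top of it, HDFS is not well suited to frequent small appends to files, Hive is not well suited to frequent INSERTs, and Hive queries are comparatively slow. This article therefore has Flume deliver the collected data to Kafka.
3. As soon as Kafka receives the data, a consumer reads it and writes it to an in-memory database such as Redis, where the real-time analysis happens. This article stops once the data has been consumed from Kafka; storing it in the database and analyzing it are not simulated.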

Project Code Development

Preparation

1. Create a topic in Kafka that receives the data sent by Flume and from which the consumer will read:

bin/kafka-topics.sh  --create --replication-factor 2 \
--topic itcast_order --zookeeper node01:2181,node02:2181,node03:2181 --partitions 5
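The topic above is created with 5 partitions, which matters for the consumer later on: within one consumer group, each partition is read by at most one consumer, so the partition count caps how many consumers in a group can share the work. The stdlib-only sketch below models range-style assignment for a single topic, which should correspond to the Java consumer's default RangeAssignor in clients of this era (worth confirming against the documentation for your version). It is a simplification: the real assignor sorts consumers by member ID and handles multiple topics, and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of range-style partition assignment for ONE topic.
// Assumption: the consumer list is already in the order the real assignor would use
// (it sorts by member ID); consumer names are hypothetical.
public class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size(); // base number of partitions each
        int extra = numPartitions % consumers.size();       // the first `extra` consumers get one more
        for (int i = 0; i < consumers.size(); i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(consumers.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // itcast_order has 5 partitions; two consumers in the same group
        System.out.println(assign(Arrays.asList("consumer-0", "consumer-1"), 5));
        // prints {consumer-0=[0, 1, 2], consumer-1=[3, 4]}

        // six consumers for five partitions: the last one receives nothing and sits idle
        System.out.println(assign(Arrays.asList("c0", "c1", "c2", "c3", "c4", "c5"), 5));
        // prints {c0=[0], c1=[1], c2=[2], c3=[3], c4=[4], c5=[]}
    }
}
```

In other words, adding more consumers to "order group" than there are partitions does not increase throughput; the extra consumers only act as standbys.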

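2. Create a Maven project in IDEA. It needs three classes: one that writes order data to a log file, a JavaBean for the order data itself, and a Kafka consumer.
The pom file is as follows: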

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.41</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>


</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Fully qualified name of the class that simulates real-time data; change it to match your project -->
                                <mainClass>realboard.LoggerPrint</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Fully qualified name of the class that simulates real-time data; change it to match your project -->
                        <mainClass>realboard.LoggerPrint</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Code Development

Order Data Class

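The main purpose of this class is to generate a random order record through its random method, which returns the record as a JSON string.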

package realboard;

import com.alibaba.fastjson.JSONObject;

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

public class PaymentInfo implements Serializable
{
    private static final long serialVersionUID = -7958315778386204397L;
    private String orderId;         // order ID
    private Date createOrderTime;   // order creation time
    private String paymentId;       // payment ID
    private Date paymentTime;       // payment time
    private String productId;       // product ID
    private String productName;     // product name
    private long productPrice;      // product price
    private long promotionPrice;    // promotional price
    private String shopId;          // shop ID
    private String shopName;        // shop name
    private String shopMobile;      // shop phone number
    private long payPrice;          // amount paid for the order
    private int num;                // quantity ordered
    /**
     * Region codes, for example:
     * <Province>19</Province>
     * <City>1657</City>
     * <County>4076</County>
     */
    private String province; // province
    private String city;     // city
    private String county;   // county
    // comma-separated category IDs, e.g. 102,144,114
    private String catagorys;
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCounty() {
        return county;
    }

    public void setCounty(String county) {
        this.county = county;
    }

    public String getCatagorys() {
        return catagorys;
    }

    public void setCatagorys(String catagorys) {
        this.catagorys = catagorys;
    }

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() {
        // One Random instance is enough; creating a new one for every field is wasteful
        Random rnd = new Random();
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = rnd.nextInt(1000);
        this.promotionPrice = rnd.nextInt(500);
        this.payPrice = rnd.nextInt(480);
        this.shopId = rnd.nextInt(200000) + "";

        this.catagorys = rnd.nextInt(10000) + "," + rnd.nextInt(10000) + "," + rnd.nextInt(10000);
        this.province = rnd.nextInt(23) + "";
        this.city = rnd.nextInt(265) + "";
        this.county = rnd.nextInt(1489) + "";

        // The order creation time is a fixed timestamp in this simulation
        String date = "2015-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Serialize this object (through its getters) to a JSON string with fastjson
        return JSONObject.toJSONString(this);
    }

}

Class That Writes Order Data to the Log File

First, create a log4j configuration file (log4j.properties) and place it in the Maven project's resources directory, with the following content:

### Settings ###
log4j.rootLogger = debug,stdout,D,E

### Output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### Write DEBUG-level and higher logs to the order data file ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
# Output path for the order data
log4j.appender.D.File = /export/servers/orderLogs/orderinfo.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
#log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
# Write only the message itself, so each line is exactly one JSON order record
log4j.appender.D.layout.ConversionPattern = %m%n

### Write ERROR-level and higher logs to the error file ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
# Output path for program exceptions
log4j.appender.E.File = /export/servers/orderLogs/ordererror.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
#log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
log4j.appender.E.layout.ConversionPattern = %m%n

Then write a class that calls the data class's random method and writes each generated record to the log file:

package realboard;

import org.apache.log4j.Logger;

public class LoggerPrint
{
    private static final Logger logger = Logger.getLogger(LoggerPrint.class);

    public static void main(String[] args) throws InterruptedException {

        PaymentInfo paymentInfo = new PaymentInfo();
        while (true){
            // Generate one random order as JSON and log it; appender D writes it to orderinfo.log
            String random = paymentInfo.random();
            logger.info(random);
            // Produce one order every 500 ms
            Thread.sleep(500);
        }
    }

}
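The while (true) plus Thread.sleep(500) loop works; one alternative is a ScheduledExecutorService, which fixes the emission rate and makes the producer easy to stop. The sketch below is a stdlib-only illustration, not the original code: the emitted record is a placeholder JSON string that only contains an orderId, and ScheduledOrderEmitter and start are hypothetical names. In the project, the scheduled task would call paymentInfo.random() and logger.info(...) instead.

```java
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical alternative to while(true) + Thread.sleep: emit one placeholder
// order record every periodMs milliseconds on a single scheduler thread.
public class ScheduledOrderEmitter {

    static ScheduledExecutorService start(long periodMs, Consumer<String> sink) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                // placeholder record; the real project would log paymentInfo.random()
                () -> sink.accept("{\"orderId\":\"" + UUID.randomUUID().toString().replace("-", "") + "\"}"),
                0, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        ScheduledExecutorService scheduler = start(500, line -> {
            System.out.println(line);
            latch.countDown();
        });
        latch.await();       // wait until three records have been emitted
        scheduler.shutdown(); // stop producing
    }
}
```

One caveat of scheduleAtFixedRate: if an execution of the task throws, later executions are suppressed, so a real task body should catch and log its own exceptions.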

Packaging the Project and Running It on Linux

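Package the Maven project as a jar (for example with mvn package; the jar-with-dependencies jar is written to the target directory). Here the resulting jar is named flume_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar. Upload it to the Linux machine and run the following command in the jar's directory; you should see the log file being created and its content growing continuously.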

java -jar flume_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

Configuring Flume

In this article, Flume collects the data from the log file and sends it to Kafka. The Flume conf file is configured as follows:

# Name the source, channel, and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Channel that the source sends its collected data to
a1.sources.r1.channels = c1
# Collection strategy: TAILDIR tails the file and records its read position in positionFile
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/orderLogs/orderinfo.log

# Use a memory channel, i.e. all buffered data is held in memory
a1.channels.c1.type = memory
# Use the Kafka sink, and specify the channel the sink reads from
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = itcast_order
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
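The TAILDIR source lets Flume pick up only the lines appended since its last read, and resume after a restart, by recording a read position per file in the positionFile. The stdlib-only sketch below illustrates that idea under simplifying assumptions: a single file that only grows, a byte offset kept by the caller rather than in a JSON position file, and no handling of log rotation or inodes. It is not Flume's implementation; TailFromOffset, Result, and readNewLines are hypothetical names.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Illustration only: read complete lines appended after a saved byte offset.
public class TailFromOffset {

    // Lines read in this call plus the offset to persist for the next call.
    static final class Result {
        final List<String> lines;
        final long newOffset;

        Result(List<String> lines, long newOffset) {
            this.lines = lines;
            this.newOffset = newOffset;
        }
    }

    static Result readNewLines(String path, long offset) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (offset > length) {
                offset = 0; // file shrank (e.g. was replaced); simplification: start over
            }
            byte[] buf = new byte[(int) (length - offset)];
            file.seek(offset);
            file.readFully(buf);
            int lineStart = 0;
            for (int i = 0; i < buf.length; i++) {
                if (buf[i] == '\n') {
                    lines.add(new String(buf, lineStart, i - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            // Bytes after the last '\n' are an incomplete line; leave them for the next call.
            return new Result(lines, offset + lineStart);
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("orderinfo", ".log");
        Files.write(log, "line1\nline2\npart".getBytes(StandardCharsets.UTF_8));
        Result first = readNewLines(log.toString(), 0);
        System.out.println(first.lines + " offset=" + first.newOffset); // [line1, line2] offset=12

        Files.write(log, "ial\nline4\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        Result second = readNewLines(log.toString(), first.newOffset);
        System.out.println(second.lines + " offset=" + second.newOffset); // [partial, line4] offset=26
        Files.delete(log);
    }
}
```

Holding back the trailing partial line is what keeps a half-written JSON record from being shipped to Kafka before LoggerPrint finishes writing it.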

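Once the conf file is ready, start Flume so that it sends the data to Kafka, for example with bin/flume-ng agent -n a1 -c conf -f followed by the path of the conf file above (the agent name passed to -n must match the a1 prefix used in the file).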

Kafka Consumer Class

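Continue in the same Maven project: create a new package and, in it, a class for the Kafka consumer. Instead of processing each polled batch as one undifferentiated list, this class handles the records one partition at a time and commits the offset manually for each partition once that partition's records have been processed.
The code is as follows: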

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import realboard.PaymentInfo;

import java.util.*;

public class KafkaConsumer
{
    public static void main(String[] args)
    {
        Properties props=new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group; the name is arbitrary
        props.put("group.id", "order group");

        // Option A: let the consumer commit offsets automatically
        // props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        // props.put("auto.commit.interval.ms",  "1000");
        // Option B (used here): commit offsets manually
        props.put("enable.auto.commit", "false");
        // Deserializer for the key
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer for the value
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Fully qualified name, because this class is itself called KafkaConsumer
        org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer=new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        // Subscribe to the itcast_order topic
        consumer.subscribe(Arrays.asList("itcast_order"));

        while (true)
        {
            ConsumerRecords<String, String> records=consumer.poll(100);
            // Consume partition by partition
            Set<TopicPartition> partitions = records.partitions(); // partitions that returned records in this poll
            for (TopicPartition partition : partitions)
            {
                List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords)
                {
                    String value=record.value();
                    // Each value is one JSON order line written by LoggerPrint
                    PaymentInfo paymentInfo=JSONObject.parseObject(value,PaymentInfo.class);
                    System.out.println(paymentInfo.toString());
                }
                // Offset of the last record processed in this partition
                long lastOffset=partitionRecords.get(partitionRecords.size()-1).offset();
                // Commit per partition; the committed offset is the NEXT record to read, hence +1
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset+1)));
            }
        }
    }
}
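The commit step in the loop above boils down to this: for each partition in the batch, commit the offset of the last processed record plus one, because the committed offset is the position of the next record the group should read. The stdlib-only model below isolates that step so it can be run without a broker; Rec is a hypothetical stand-in for ConsumerRecord that keeps only a partition number and an offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of the per-partition manual commit in the consumer loop.
public class PerPartitionCommitSketch {

    // Hypothetical stand-in for ConsumerRecord: just a partition and an offset.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    static Map<Integer, Long> offsetsToCommit(List<Rec> polled) {
        // Group records by partition, preserving their order within each partition
        Map<Integer, List<Rec>> byPartition = new LinkedHashMap<>();
        for (Rec r : polled) {
            byPartition.computeIfAbsent(r.partition, k -> new ArrayList<>()).add(r);
        }
        // For each partition: last processed offset + 1 = next offset to read
        Map<Integer, Long> commits = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<Rec>> e : byPartition.entrySet()) {
            List<Rec> recs = e.getValue();
            long lastOffset = recs.get(recs.size() - 1).offset;
            commits.put(e.getKey(), lastOffset + 1);
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Rec> polled = Arrays.asList(new Rec(0, 41), new Rec(3, 7), new Rec(0, 42), new Rec(3, 8));
        System.out.println(offsetsToCommit(polled)); // {0=43, 3=9}
    }
}
```

Committing only after a partition's records have been processed gives at-least-once delivery: if the process dies after printing records but before commitSync returns, those records are delivered again after a restart, so downstream writes (for example into Redis) should tolerate duplicates.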

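Run this class directly and you will see the data being consumed from Kafka. Storing the data in an in-memory database and analyzing it are not simulated further.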
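For a rough idea of what that next step computes, here is a hypothetical in-memory stand-in for the analysis that would otherwise run in Redis: running revenue and order counts per province, updated once per consumed order. ProvinceRevenueSketch and its methods are illustrative names only; in the consumer loop one could call onOrder(paymentInfo.getProvince(), paymentInfo.getPayPrice()). With Redis, a comparable counter could live in a hash and be incremented with HINCRBY (Jedis exposes hincrBy), but that design is an assumption, not something the original project implements.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical downstream step: running totals per province, kept in memory
// as a stand-in for the Redis-based analysis the article does not implement.
public class ProvinceRevenueSketch {
    private final Map<String, Long> revenueByProvince = new TreeMap<>();
    private final Map<String, Long> ordersByProvince = new TreeMap<>();

    // Called once per consumed order (province and payPrice as in PaymentInfo)
    void onOrder(String province, long payPrice) {
        revenueByProvince.merge(province, payPrice, Long::sum);
        ordersByProvince.merge(province, 1L, Long::sum);
    }

    long revenue(String province) {
        return revenueByProvince.getOrDefault(province, 0L);
    }

    long orders(String province) {
        return ordersByProvince.getOrDefault(province, 0L);
    }

    public static void main(String[] args) {
        ProvinceRevenueSketch board = new ProvinceRevenueSketch();
        board.onOrder("19", 120);
        board.onOrder("19", 30);
        board.onOrder("7", 99);
        System.out.println(board.revenueByProvince); // {19=150, 7=99}
        System.out.println(board.orders("19"));      // 2
    }
}
```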


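Copyright notice: This article is an original work by hwq317622817, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.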