Flink开发项目中遇到的一些问题总汇(持续更新)

问题 解决:
1、(Constants.A, Constants.B) 是定位到具体配置
2、设置flink全局变量

env.getConfig.setGlobalJobParameters(C)

但是某些算子可能用到非全局变量中的一些配置例如:map、process中open 函数中的配置需要局部配置变量。
3、map和process的联系:
map方法不允许缺少数据,也就是原来多少条数据,处理后依然是多少条数据,只是用来做转换。
本次开发map函数用来将source流中的数据转换成json model 对象

.map(JsonUtil.fromJson[LogModel] _).setParallelism(dealParallelism)

为什么不在map中写逻辑规则,是因为map方法不允许缺少数据,数据不知道什么时候来数据调用map需要吃资源,采用process,可以懒加载???
process中:
events (流元素)
state (容错,一致性,只在Keyed Stream)
timers (事件时间和处理时间, 只在keyed stream)
eg:

  .process(new FProcess)

  class FProcess extends ProcessFunction[LogModel, (Object, Boolean)] {

    //mutable.HashSet可变的hashset
    val cmdSet: mutable.HashSet[String] = new mutable.HashSet[String]()

    override def open(parameters: Configuration): Unit = {
    //配置局部变量
      val globConf = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]
      val cmdStr = globConf.getString(Constants.D, "")
      val splits = cmdStr.split("\\|")
      for (cmd <- splits) {
        cmdSet.add(cmd)
      }
 
    }
    override def processElement(model: LogModel, ctx: ProcessFunction[LogModel, (Object, Boolean)]#Context,
                                out: Collector[(Object, Boolean)]): Unit = {

      val action = model.action
      if (action.length > 0) {
        breakable {
          for (cmd <- cmdSet) {
            if (action.contains(cmd)) {
              val entity = new E
              entity.setCmd()
              entity.setSourceIp(model.A)
              entity.setUserName(model.B)
              entity.setDestinationIp(model.C)
              entity.setWarnLevel(4)
              out.collect((entity, true))
              break()
            }
          }
        }

}

4、
E:RabbitMQ 中该模块的queue,类似kafka中的topic

val stream4A = env.addSource(new RMQSource(connectionConfig, E, false, new SimpleStringSchema()))    

5、个人理解:因为某些业务有时间限制,而数据源本身会因为网略等因素出现延迟等现象,采用时间戳得方式判断不够严谨,所以采用封装event采用判断事件发生的时间更加合理。

AssignerWithPeriodicWatermarks 周期性的生成watermark,生成间隔可配置,根据数据的eventTime来更新watermark时间
AssignerWithPunctuatedWatermarks 不会周期性生成watermark,只根据元素的eventTime来更新watermark。
当用EventTime和ProcessTime来计算时,元素本身都是不带时间戳的,只有以IngestionTime计算时才会打上进入系统的时间戳。

以下为AssignerWithPunctuatedWatermarks需要重写得两个方法

 override def checkAndGetNextWatermark(lastElement: LogModel, extractedTimestamp: Long): Watermark = {
    //设定一个10s的缓冲区
      new Watermark(extractedTimestamp - 10000)
    }

    override def extractTimestamp(element: LogModel, previousElementTimestamp: Long): Long = {
    //事件时间
      element.logTime
    }

无论何时一个特定的事件表明一个新的watermark可能需要被创建,都使用AssignerWithPunctuatedWatermarks来生成。
在这个类中Flink首先调用extractTimestamp(…)来为元素分配一个timestamp,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。
checkAndGetNextWatermark(…)方法传入在extractTimestamp(…)方法中分配的timestamp,并决定是否需要生产watermark。一旦checkAndGetNextWatermark(…)返回一个非空的watermark并且watermark比前一个watermark大的话,这个新的watermark将会被发送。

6、Timestamp和Watermark的关系和区别:

指定时间戳(Assigning Timestamps)
为了使用eventtime,Flink需要知道事件的时间戳,也就是说数据流中的元素需要分配一个事件时间戳。
这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。
时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。
这里有两种方式来分配时间戳和生成水印:
1、直接在数据流源中进行
2、通过timestamp assigner和watermark generator生成:在Flink中,timestamp 分配器也定义了用来发射的水印。
注意:timestamp和watermark都是通过从1970年1月1日0时0分0秒到现在的毫秒数来指定的。

有Timestamp和Watermark的源函数(Source Function with Timestamps And Watermarks)
数据流源可以直接为它们产生的数据元素分配timestamp,并且他们也能发送水印。
这样做的话,就没必要再去定义timestamp分配器了,需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何timestamp和watermark都会被重写。
为了通过源直接为一个元素分配一个timestamp,源需要调用SourceContext中的collectWithTimestamp(…)方法。为了生成watermark,源需要调用emitWatermark(Watermark)方法。
下面是一个简单的(无checkpoint)由源分配timestamp和产生watermark的例子:

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

7、TimeStamp分配器和Watermark生成器(Timestamp Assigners / Watermark Generators)
Timestamp分配器获取一个流并生成一个新的带有时间戳元素和水印的流。
如果原来的流中已经有了timestamp和/或水印的话,这个timestamp分配器会覆盖掉。
Timestamp分配器常常在数据源之后就立即指定了,但是并不是要严格这么做,一个常用的模式是先解析(MapFunction)和过滤(FilterFunction)后再指定timestamp 分配器。在任何情况下,时间戳分配器都必须在第一个在事件时间上运行的操作(如:第一个时间窗口操作)之前指定。有一个特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在源内部指定timestamp分配器和watermark生成器。更多关于如何进行的信息请参考Kafka Connector的文档。
接下来的部分展示了要创建自己的timestamp 抽取器和watermark发射器,程序员需要实现的主要接口。想要查看Flink预定义的抽取器,请前往预定于Timestamp Extractors/Watermark Emitter页面。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter());

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

8、周期性水印(With Periodic Watermarks):

AssignerWithPeriodicWatermarks周期性地分配timestamp和生成watermark(可能依赖于元素或者纯粹基于处理时间)。
watermark产生的事件间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)来定义的,每当分配器的getCurrentWatermark()方法被调用时,如果返回的watermark是非空并且大于上一个watermark的话,一个新的watermark将会被发射。

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L; // 3.5 seconds

    var currentMaxTimestamp: Long;

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L; // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

9、带断点的水印(With Punctuated Watermarks)
无论何时一个特定的事件表明一个新的watermark可能需要被创建,都使用AssignerWithPunctuatedWatermarks来生成。在这个类中Flink首先调用extractTimestamp(…)来为元素分配一个timestamp,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。
checkAndGetNextWatermark(…)方法传入在extractTimestamp(…)方法中分配的timestamp,并决定是否需要生产watermark。一旦checkAndGetNextWatermark(…)返回一个非空的watermark并且watermark比前一个watermark大的话,这个新的watermark将会被发送。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

#------------------------(2020年6月6日 11:37:36)-------------------------------
最近太忙了一直没有更新
10、本次项目开发数据流向
多Sink 写出 Flink有很好的支持,不要像我傻兮兮的去找,自己实际动手测试一下。

数据源:RMQ (读取数据源没什么大问题,读嘛,能有什么问题呢)

数据处理:根据业务处理做不同处理。

数据输出:这个是真的坑,还花我钱买专栏,Flink 支持多路输出直接将处理好的数据直接addsink 就好,没必要想太多,当时太菜了,以为中间可能会有什么坑,也懒得试就直接买了某个专栏 。啥也不是。

本次项目数据输出有点多:

1、输出到Mysql的告警表,做前端展示。
2、将产生告警的数据的原始数据写入ES,做数据追溯
3、将告警数据写入RMQ1,做告警得归并,入另一个系统。
项目迭代:新写入一个RMQ2 ,做复杂事件处理。

写入Mysql,调用写好得SqlHelper ,这个我会在后期项目结束,将完整的SqlHelper 发出来

//写入mysql
val alartId: Long = sqlHelper.insertGeneratedKey(entity)

//写入mysql的Help方法
 public long insertGeneratedKey(Object object) {
        if (object == null) {
            return -1L;
        }

        //获取类型类
        Class clazz = object.getClass();
        //获取连接
        conn = C3P0Util.getConnection();
        //获取该类的所有字段
        Field[] fields = clazz.getDeclaredFields();

        StringBuilder sql = new StringBuilder();
        StringBuilder sqlValue = new StringBuilder();

        //获取该类的数据库名和表名
        Table table = (Table) clazz.getAnnotation(Table.class); //得到类的注解
        String tableName = table.name();

        //编辑插入sql

        sql.append("INSERT INTO ").append(getDatabaseTableName(database,tableName)).
            append(" (");

        for (int i = 0; i < fields.length; i++) {

            //使其可以访问私有字段
            fields[i].setAccessible(true);
            //根据字段获取起get方法
            PropertyDescriptor propertyDescriptor = null;
            try {
                propertyDescriptor = new PropertyDescriptor(fields[i].getName(),
                    clazz);
            } catch (IntrospectionException e) {
                e.printStackTrace();
            }

            if (propertyDescriptor != null) {
                Method method = propertyDescriptor.getReadMethod();

                //获取其对应数据库的字段名
                Column column = method.getAnnotation(Column.class);
                sql.append(column.name());
                sqlValue.append("?");

                if (i != fields.length - 1) {
                    sql.append(",");
                    sqlValue.append(",");
                }
            }
        }

        sql.append(") ").append("VALUES (").append(sqlValue).append(")");

        try {
            ps = conn.prepareStatement(sql.toString(), Statement.RETURN_GENERATED_KEYS);
            logger.info(sql.toString());
            Field[] fieldsTemp = clazz.getDeclaredFields();

            for (int i = 0; i < fieldsTemp.length; i++) {
                fieldsTemp[i].setAccessible(true);
                ps.setObject(i + 1, fieldsTemp[i].get(object));
            }

            ps.execute();
            rs = ps.getGeneratedKeys();
            if (rs.next()) {
                return rs.getLong(1);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            C3P0Util.close(conn, ps, rs);
        }
        return -1L;
    }

    //写入es
    stream.flatMap(new FromModelToMapFunction).addSink(new ElasticsearchSink[java.util.Map[String, Object]](ESConfig, transportAddresses,
      new WarnLogESSink(esIndex, esType))).setParallelism(sinkParallelism)
//FromModelToMapFunction
class FromModelToMapFunction extends RichFlatMapFunction[(Object, String, Int, Long, ArrayBuffer[LogModel]),
  java.util.Map[String, Object]] {


  override def flatMap(value: (Object, String, Int, Long, ArrayBuffer[LogModel]),
                       out: Collector[java.util.Map[String, Object]]): Unit = {
    val ab = value._5
    for (model <- ab) {
      val option = LogModel.fromModelToMap(value._2, value._3, value._4, model)
      val map: util.Map[String, Object] = option.get
      //        if (!map.isEmpty) {
      //        }
      out.collect(map)

    }

  }
}

//WarnLogESSink
class WarnLogESSink(index: String, `type`: String)
//class AlarmLogESSink(index: String, `type`: String)
  extends ElasticsearchSinkFunction[java.util.Map[String, Object]] {

  def createIndexRequest(element: java.util.Map[String, Object]): IndexRequest = {

    return Requests.indexRequest()
      .index(index)
      //      .index(index)
      .`type`(`type`)
      .source(element)
  }

  override def process(t: util.Map[String, Object], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(t))

  }
}

//

 //写入rmq
    stream.map(t => JsonUtil.toJson(t._1)).addSink(new RichRMQSink(connectionConfig, rmqQueueName,
      new SimpleStringSchema())).setParallelism(sinkParallelism)

是因为Flink 源码中的问题,这里我粘出来。

//这是RmqSink中setupQueue方法

    /**
     * Sets up the queue. The default implementation just declares the queue. The user may override
     * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
     * defining custom queue parameters)
     */
    protected void setupQueue() throws IOException {
        if (queueName != null) {
            channel.queueDeclare(queueName, false, false, false, null);
        }
    }
//这是RmqSource 中的setupQueue  方法
    /**
     * Sets up the queue. The default implementation just declares the queue. The user may override
     * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
     * defining custom queue parameters)
     */
    protected void setupQueue() throws IOException {
        channel.queueDeclare(queueName, true, false, false, null);
    }

参数解释:
第二个参数是durable。队列是否持久化。false为不支持。一般在我们写入RabbitMQ之前,RabbitMQ的对列已经创建好,有些会指定durable为true。这个时候就有问题了,因为RabbitMQ队列durable属性不同,就会一直去连接,直到RabbitMQ挂掉或程序关掉。
发现问题了么????印象中的问题是:RabbitMQ Web UI,会发现自己创建RabbitMQ和系统创建RabbitMQ的队列属性不一致 ,就是序列化的问题

channel.queueDeclare(queueName, false, false, false, null);
channel.queueDeclare(queueName, true, false, false, null);
//重写setupQueue方法
public class RichRMQSink extends RMQSink<String> {
    public RichRMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<String> schema) {
        super(rmqConnectionConfig, queueName, schema);
    }

    /**
     * 此方法必须重写,如果RabbitMQ的queue的durable属性设置为true,则会导致RabbitMQ会一直connection,导致RabbitMQ耗尽资源挂掉
     */
    @Override
    protected void setupQueue() {
        try{
            if (queueName != null) {
                channel.queueDeclare(queueName, true, false, false, null);
            }

        }catch (IOException e){
            System.out.println("io exception");
        }
    }
}

11、并行度+定时器+触发口诀:单流取最大,多流取最小。
12、


版权声明:本文为weixin_42642502原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。