9. [Flink] Custom Watermark Generators in Flink

9.1. What is a watermark

Flink is a stateful computation engine over bounded and unbounded data streams and is widely used for real-time stream processing. Data in a real-time stream often arrives with problems such as out-of-order events; the watermark mechanism was introduced to handle event-time semantics in this situation.

The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark).

In short, watermarks are inserted into the data stream and travel as part of it, acting as markers that indicate event time has advanced to a certain point.

The implementation and internals of watermarks are not covered in depth here; see the official Flink documentation for details.

9.2. A problem

A real-time streaming job (keyed stream) that uses event-time watermarks typically has the following structure:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

Watermarks are assigned near the start of the stream; a keyBy operator then splits the stream into partitions, windows are assigned, and finally a computation operator such as reduce or a low-level process function is applied.

When a stream carries little data and the upstream suddenly stops sending at some point, some data will never trigger a window computation. By the way watermarks work, the watermark only advances when new data arrives; only when a new event's timestamp (under event-time semantics) pushes the watermark past a window's end time does that window fire and its data get processed.

To simplify the problem:

If a job starts and the upstream sends only a single record, the window containing that record will never fire.

Before solving this problem, let's first look at WatermarkStrategy and how the Flink framework generates watermarks.

9.3. WatermarkStrategy

To work with event time, Flink needs to know each event's timestamp, which means every element in the stream must be assigned one. This is usually done with a TimestampAssigner that accesses/extracts the timestamp from some field of the element.

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

Watermark generation therefore consists of two parts:

1. A timestamp assigner, which assigns a timestamp to each element; for event time, the timestamp is extracted from a field of the element itself.

2. A watermark generator, which produces and updates watermarks.

Let's look at a concrete piece of code and what happens behind it.

source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                // Custom watermark generator (commented out): a lambda must be used here,
                // otherwise an error is thrown that the WatermarkStrategy cannot be serialized.
                //.<String>forGenerator((WatermarkGeneratorSupplier<String>) context -> new BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration.ofSeconds(5), Duration.ofSeconds(60), autoWatermarkInterval))
                //.forGenerator(new WatermarkGeneratorSupplier<String>() {
                //    @Override
                //    public WatermarkGenerator<String> createWatermarkGenerator(Context context) {
                //        return new BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration.ofSeconds(1), Duration.ofSeconds(100), autoWatermarkInterval);
                //    }
                //})
                //.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        if (element != null) {
                            return JSONObject.parseObject(element).getLong("timestamp") * 1000;
                        }
                        return recordTimestamp;
                    }
                })
                .withIdleness(Duration.ofSeconds(60))
        );

1. The timestamp is parsed from each element and used as the event time. This uses a SerializableTimestampAssigner, which extends the TimestampAssigner interface:

/** A {@link TimestampAssigner} that is also {@link java.io.Serializable}. */
@PublicEvolving
@FunctionalInterface
public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

Users implement the extractTimestamp method themselves:

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

    /**
     * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
     * attached to the record.
     */
    long NO_TIMESTAMP = Long.MIN_VALUE;

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
     * any particular time zone or calendar.
     *
     * <p>The method is passed the previously assigned timestamp of the element. That previous
     * timestamp may have been assigned from a previous assigner. If the element did not carry a
     * timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
     * Long#MIN_VALUE}).
     *
     * @param element The element that the timestamp will be assigned to.
     * @param recordTimestamp The current internal timestamp of the element, or a negative value, if
     *     no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long recordTimestamp);
}

2. The watermark generator is Flink's built-in BoundedOutOfOrdernessWatermarks, which tolerates a bounded amount of out-of-orderness. Let's look at its source:

/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
 * the periodic interval length, plus the out-of-orderness bound.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

When BoundedOutOfOrdernessWatermarks is instantiated, maxTimestamp is initialized to Long.MIN_VALUE + outOfOrdernessMillis + 1, which determines the initial watermark value.

BoundedOutOfOrdernessWatermarks also implements the WatermarkGenerator interface:

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}

The interface contains two methods: onEvent, which is called once for every element that flows through, and onPeriodicEmit, which is triggered periodically, every 200 milliseconds by default. The current interval can be read with streamExecutionEnvironment.getConfig().getAutoWatermarkInterval() and changed with streamExecutionEnvironment.getConfig().setAutoWatermarkInterval(1000L).
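
For example (a minimal sketch of reading and changing the interval):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The default is 200 ms; onPeriodicEmit of the watermark generator runs at this interval.
long interval = env.getConfig().getAutoWatermarkInterval();
// Emit watermarks once per second instead.
env.getConfig().setAutoWatermarkInterval(1000L);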

With the logic of this generator interface in mind, let's return to BoundedOutOfOrdernessWatermarks and look at the details of watermark generation.

outOfOrdernessMillis is the amount of out-of-orderness the user allows.

When the job first starts and no data has arrived yet, onPeriodicEmit is already being called; it emits a watermark into the stream with value maxTimestamp - outOfOrdernessMillis - 1, which at that point works out to exactly Long.MIN_VALUE.

When data flows through, onEvent is called; maxTimestamp is set to the maximum of its current value and the event timestamp, so maxTimestamp gets updated.

As onPeriodicEmit keeps being called periodically, it keeps emitting watermarks into the stream, so the watermark is continuously generated and updated.

If no new data arrives, the watermark simply stays at the same value. For example, with a 2-second out-of-orderness bound and a single event at timestamp T, the watermark is stuck at T - 2001 ms and never reaches the end of the window that contains T.

9.4. Solving the non-firing window problem

Back to the original problem: if the stream contains only one element, no newer watermark can be generated, so the window never fires.

To still trigger the window computation, new watermarks must keep being generated and advanced. That calls for a custom watermark generator.

9.5. A custom WatermarkGenerator

The core idea for solving the problem above is to keep generating and advancing watermarks even when no new data arrives.

9.5.1. The custom generator

The class below is a custom watermark generator. It tolerates a configurable amount of out-of-orderness and additionally accepts a waiting time: if the waiting time expires and still no new data has arrived, the watermark is advanced anyway. The code is adapted from the BoundedOutOfOrdernessWatermarks shown earlier.

The rough logic: whenever a record arrives, its event time is stored as the current logical time (currentLogicalEventTimeMills). On every onPeriodicEmit call, this logical time is advanced by a fixed step equal to the onPeriodicEmit invocation interval. If no new data arrives, currentLogicalEventTimeMills drifts further and further ahead of the last event time (lastEventTimestamp), and once the gap exceeds the allowed waiting time, the watermark is advanced anyway. Because the logical clock grows by autoWatermarkInterval per periodic call, it effectively tracks wall-clock time after the last event, so after roughly waitTimeInMillsToEmitWatermark of silence the emitted watermark starts moving forward again and eventually crosses the window boundary.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A watermark generator that keeps the watermark advancing even when no new events arrive.
 * Modified from {@link BoundedOutOfOrdernessWatermarks}.
 */
public class BoundedOutOfOrdernessWatermarkBasedOnEventTime implements WatermarkGenerator<String> {
    /**
     * The maximum event timestamp encountered so far.
     */
    private long maxTimestamp;

    /**
     * The maximum out-of-orderness that this watermark generator assumes.
     */
    private final long outOfOrdernessMillis;

    /**
     * Event timestamp of the most recently seen element (equal to maxTimestamp after each event).
     */
    private long lastEventTimestamp;

    /**
     * How long to wait for the next event before the watermark is advanced anyway.
     * The goal is to trigger the computation of the window even when no record is coming.
     */
    private final Duration waitTimeInMillsToEmitWatermark;

    /**
     * Interval at which {@link #onPeriodicEmit} is called (the auto watermark interval).
     */
    private final long autoWatermarkInterval;

    /**
     * A logical clock: set to the event time of the last element and advanced by
     * {@code autoWatermarkInterval} on every periodic emit.
     */
    private long currentLogicalEventTimeMills;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness              The bound for the out-of-orderness of the event timestamps.
     * @param waitTimeInMillsToEmitWatermark How long Flink may wait for the next element. When this time
     *                                       is up and still no element has arrived, the watermark is
     *                                       generated and emitted anyway.
     * @param autoWatermarkInterval          The watermark generation interval.
     */
    public BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration maxOutOfOrderness, Duration waitTimeInMillsToEmitWatermark, long autoWatermarkInterval) {
        this.waitTimeInMillsToEmitWatermark = waitTimeInMillsToEmitWatermark;
        this.autoWatermarkInterval = autoWatermarkInterval;
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // Start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        this.lastEventTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        this.currentLogicalEventTimeMills = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    public BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration maxOutOfOrderness, Duration waitTimeInMillsToEmitWatermark) {
        this(maxOutOfOrderness, waitTimeInMillsToEmitWatermark, 200L);
    }

    /**
     * Called by Flink for every incoming record.
     *
     * @param event          the element in the stream
     * @param eventTimestamp the time the event happened
     * @param output         an output for watermarks; it accepts watermarks and idleness (inactivity) status
     */
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventTimestamp = maxTimestamp;
        currentLogicalEventTimeMills = maxTimestamp;
    }

    /**
     * Called by Flink periodically; the interval is defined by {@code ExecutionConfig.setAutoWatermarkInterval(...)},
     * for example:
     * <pre>{@code env.getConfig().setAutoWatermarkInterval(400L);}
     * </pre>
     *
     * <p>If Flink has waited long enough and still has not received the next record, we emit a new
     * watermark to trigger the computation of the last window. Without this, the last window would
     * never fire, which matters when we need the state derived from the last event.
     *
     * @param output an output for watermarks; it accepts watermarks and idleness (inactivity) status
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        currentLogicalEventTimeMills = currentLogicalEventTimeMills + autoWatermarkInterval;
        if ((currentLogicalEventTimeMills - lastEventTimestamp) > waitTimeInMillsToEmitWatermark.toMillis()) {
            output.emitWatermark(new Watermark(currentLogicalEventTimeMills - outOfOrdernessMillis - 1));
            maxTimestamp = currentLogicalEventTimeMills;
        } else {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
}

9.5.2. How to use it

source.assignTimestampsAndWatermarks(WatermarkStrategy
                // Custom watermark generator: a lambda must be used here, otherwise an error
                // is thrown that the WatermarkStrategy cannot be serialized.
                .<String>forGenerator((WatermarkGeneratorSupplier<String>) context -> new BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration.ofSeconds(5), Duration.ofSeconds(60), autoWatermarkInterval))
                //.forGenerator(new WatermarkGeneratorSupplier<String>() {
                //    @Override
                //    public WatermarkGenerator<String> createWatermarkGenerator(Context context) {
                //        return new BoundedOutOfOrdernessWatermarkBasedOnEventTime(Duration.ofSeconds(1), Duration.ofSeconds(100), autoWatermarkInterval);
                //    }
                //})
                //.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        if (element != null) {
                            return JSONObject.parseObject(element).getLong("timestamp") * 1000;
                        }
                        return recordTimestamp;
                    }
                })
        );
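
One practical note: the generator advances its logical clock by exactly autoWatermarkInterval on every onPeriodicEmit call, so the value passed to the constructor should match the interval configured on the execution environment. A minimal sketch of keeping the two in sync (the variable names here are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
long autoWatermarkInterval = 200L; // milliseconds
// onPeriodicEmit is scheduled at this interval, and the generator adds the same amount
// to its logical clock on each call, so pass one shared value to both.
env.getConfig().setAutoWatermarkInterval(autoWatermarkInterval);
// ... then build the WatermarkStrategy shown above with this autoWatermarkInterval.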

9.6. Handling idle sources

A practical example: Flink consumes a Kafka topic with three partitions, so it creates three consumer threads. Because of data skew, at least one of the three partitions receives no new data for a long time, leaving its consumer thread idle. This is called an idle source, and the watermark of that consumer thread stops advancing.

Because of watermark alignment, Flink takes the minimum watermark across the parallel tasks as the effective watermark, which breaks downstream computation, for example, windows that never fire.

To solve this, Flink provides a way to detect idle sources: you allow a certain waiting time, and if no new data has arrived when it expires, that task's stream is ignored for watermark purposes.

Usage:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

9.6.1. A closer look

Let's see what withIdleness actually does by digging into the source.

withIdleness is a method on WatermarkStrategy; internally it creates a WatermarkStrategyWithIdleness, passing in the WatermarkStrategy itself:

    /**
     * Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
     * created {@link WatermarkGenerator}.
     *
     * <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
     * stream for that amount of time, then that partition is considered "idle" and will not hold
     * back the progress of watermarks in downstream operators.
     *
     * <p>Idleness can be important if some partitions have little data and might not have events
     * during some periods. Without idleness, these streams can stall the overall event time
     * progress of the application.
     */
    default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
    }

Inside WatermarkStrategyWithIdleness, both the timestamp assigner and the watermark generator are reused from the base WatermarkStrategy; the difference is that createWatermarkGenerator wraps the base generator in a WatermarksWithIdleness:

/** A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy. */
final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> {

    private static final long serialVersionUID = 1L;

    private final WatermarkStrategy<T> baseStrategy;
    private final Duration idlenessTimeout;

    WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
        this.baseStrategy = baseStrategy;
        this.idlenessTimeout = idlenessTimeout;
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return baseStrategy.createTimestampAssigner(context);
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarksWithIdleness<>(
                baseStrategy.createWatermarkGenerator(context), idlenessTimeout);
    }
}

Stepping into WatermarksWithIdleness, we see that it implements WatermarkGenerator, so it is itself essentially a watermark generator:

1. onEvent delegates to the wrapped generator's onEvent and records activity on the idleness timer (idlenessTimer).

2. onPeriodicEmit is quite different: if the timer reports that the source is idle, the output is marked idle so downstream computation no longer waits on it; otherwise it keeps delegating to the wrapped generator to periodically emit new watermarks.

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {

    private final WatermarkGenerator<T> watermarks;

    private final IdlenessTimer idlenessTimer;

    /**
     * Creates a new WatermarksWithIdleness generator to the given generator idleness detection with
     * the given timeout.
     *
     * @param watermarks The original watermark generator.
     * @param idleTimeout The timeout for the idleness detection.
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }

    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
        idlenessTimer.activity();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }
}

9.7. A caveat

If withIdleness(Duration idleTimeout) is also specified on the watermark strategy, the solution above needs extra care: once the upstream goes silent, the custom generator's onPeriodicEmit is only invoked about idleTimeout / autoWatermarkInterval more times before the partition is marked idle.

The reason:

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }

The outer onPeriodicEmit still runs every autoWatermarkInterval, but once the source is considered idle it calls output.markIdle() instead of delegating, so the wrapped (custom) generator's onPeriodicEmit is no longer called periodically.

In this situation the waitTimeInMillsToEmitWatermark parameter of the custom generator above needs careful consideration; it must not be larger than the actual window length.
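
To make the interaction concrete, here is a sketch (not from the original post; the source variable and all durations are placeholders) that combines the custom generator with idleness detection:

WatermarkStrategy<String> strategy = WatermarkStrategy
        .<String>forGenerator((WatermarkGeneratorSupplier<String>) context ->
                new BoundedOutOfOrdernessWatermarkBasedOnEventTime(
                        Duration.ofSeconds(5),   // allowed out-of-orderness
                        Duration.ofSeconds(60),  // wait time before the watermark is advanced anyway
                        autoWatermarkInterval))
        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return element != null
                        ? JSONObject.parseObject(element).getLong("timestamp") * 1000
                        : recordTimestamp;
            }
        })
        // Once this partition is marked idle, the wrapped generator's onPeriodicEmit is no
        // longer invoked (see above), so choose waitTimeInMillsToEmitWatermark with care.
        .withIdleness(Duration.ofMinutes(5));

source.assignTimestampsAndWatermarks(strategy);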

9.8. Summary

1. A custom watermark generator can solve the problem of windows not firing when the source produces no further input; in production this situation is actually fairly rare.

2. When a source partition becomes idle, it stops producing new watermarks, while partitions that still receive data keep advancing theirs. During downstream watermark alignment, the idle sources are simply ignored.

If anything here is wrong, corrections are welcome!
