Flink学习-实验3-时间和水印

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
flink-quickstart
flink-training-exercises

事件时间-处理时间-摄取时间

  1. 处理时间:是指执行相应Flink算子的机器的系统时间,如TaskManager所在机器的系统时间。

 当Flink流处理程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用执行相应算子所在机器的系统时钟。
 在每小时的处理时间窗口中,将包括在系统时钟所显示的完整小时(不跨小时比如1:30~2:30)之内到达特定算子的所有数据记录。
 例如,如果Flink应用程序在上午9:15开始运行,则第一个每小时处理时间窗口将包括在上午9:15到10:00之间处理的事件,下一个窗口将包括在上午10:00到11:00之间处理的事件,等等以此类推。
 处理时间是最简单的时间概念,不需要流和机器之间进行协调。它提供了最好的性能和最低的延迟。
 但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度的影响,既包括比如到达消息系统的速度、也包括到达flink soruce和算子以及算子之间流动的速度、还有可能遇到中断等。

 实际上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public TimeWindow(long start, long end) {
this.start = start;
this.end = end;
}

/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}

ProcessingTimeSessionWindows

1
2
3
4
5
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}

TumblingProcessingTimeWindows

1
2
3
4
5
6
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}

SlidingProcessingTimeWindows

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
  1. 事件时间

 事件时间是每个单独事件在其生产设备上发生的时间,如订单的创建时间。
 事件时间通常在进入Flink之前是数据记录的一部分,如创建时间字段,并且可以从每个记录中提取该事件时间戳(创建时间)。
 基于事件时间,时间的向前演进取决于数据,而不是任何时钟【然而时间窗口的时间仍然是自然时间-即系统时钟】。
 如果使用事件时间,必须指定如何生成事件时间水印,这是用来表示事件时间进度的机制。
 在一个完美的世界中【注意:这个世界显然并不完美..so】,事件时间处理将产生完全一致和确定的结果,无论事件何时到达或无论其以什么顺序到达。
 但是,除非事件已知按顺序到达(按时间戳),否则事件时间处理会在等待无序事件时产生一些延迟。
 由于只能等待一段有限的时间,因此限制了确定性事件时间应用程序的运行方式。
 假设所有数据都已到达,事件时间算子将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时也会产生正确且一致的结果。
 例如,每小时事件时间窗口将包含带有落入该小时的事件时间戳的所有记录,无论它们到达的顺序如何,或者何时处理它们。

 请注意,有时当事件时间程序实时处理实时数据时,它们将使用一些处理时间操作,以保证它们及时进行【如果数据源产生数据较慢,最后一个或几个窗口是不能及时触发的。因为窗口触发依赖事件时间,而事件时间依赖数据,如果没有数据实时到达,就一定会有最后一个或几个窗口不能及时触发】。

  1. 摄取时间

 摄取时间是事件进入Flink的时间。在进入Flink Source算子后,每个记录将Source的当前时间作为时间戳,并且基于时间的操作(如时间窗口)引用该时间戳。
 摄取时间在概念上位于事件时间和处理时间之间。
 与处理时间相比,它稍贵一些,但可以提供更可预测的结果。
 因为摄取时间使用稳定的时间戳(在Source处分配一次),所以对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个窗口算子可以将记录分配给不同的窗口(基于本地系统时钟和任何延迟传输)。
 与事件时间相比,基于摄取时间的Flink程序无法处理任何无序事件或延迟到达的数据,但程序不必指定如何生成水印。
 在Flink内部,摄取时间与事件时间非常相似,但具有自动时间戳分配和自动水印生成功能。

image

设置时间特性

 Flink程序中通常首先要设置时间特性,定义好数据流Source的行为方式(例如,它们是否将分配时间戳),以及像KeyedStream.timeWindow(Time.seconds(30))这样的窗口操作应该使用什么时间概念。

 以下示例展示了一个Flink程序,该程序在每小时时间窗口中聚合事件。窗口的行为和时间特性相匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);

事件时间和水印

 支持事件时间的流处理需要一种方法来衡量事件时间的演进(进度)。例如,当事件时间超过一小时结束时,需要通知构建每小时窗口的窗口算子,以便算子可以关闭正在进行的窗口。
 事件时间可以独立于处理时间(系统时钟的自然时间)向前演进。 例如,在一个程序中,算子的当前事件时间可能略微落后于处理时间(考虑到接收事件的延迟),而两者都以相同的速度向前演进。
 另一方面,通过快速转发已经缓存在Kafka主题中的一些历史数据,另一个流处理程序可能会在几周的事件时间内进行演进,但只需几秒钟的处理时间(花费几秒钟处理时间处理缓存了很久事件时间的历史数据)。
 Flink中用于衡量事件时间进度的机制是水印。
 水印作为数据流的一部分流动并带有时间戳t。水印WaterMark(t)表明事件时间已到达该流中的时间为t,这意味着不应该有来自流的具有时间戳t’<=t的元素。
 下图显示了具有(逻辑)时间戳的事件流,以及内部流动的水印。在该示例中,事件是有序的(关于它们的时间戳),意味着水印仅是流中的周期性标记。

image

 水印对于无序流是至关重要的,如下所示,其中事件不按时间戳排序。通常,水印是在流中声明一个时间点,到达某个时间戳的所有事件都应该到达。 一旦水印到达算子,算子就可以将其内部事件时钟上涨到水印的值。

image

并行流中的水印

 在Source函数或之后生成水印。
 Source函数每个并行子任务通常独立地生成其水印。
 这些水印定义了该特定并行Source的事件时间。

 当水印通过流处理程序时,它们会在他们到达的算子处推进事件时间。
 每当算子向前推进其事件时间时,它就为其后继算子生成下游的新水印。
 一些算子会同时处理多个输入流,例如,一个union,或者跟在keyBy(…)后面或partition(…)函数的算子。
 这样的算子当前事件时间是其输入流的事件时间的最小值。
 由于其输入流更新其事件时间,因此算子也是如此。
 不是很好理解,需要消化一下。

迟到的数据

 某些数据可能不符合水印条件,这意味着即使在水印WaterMark(t)发生之后,也会出现更多具有时间戳t’<=t的数据。 实际上,在现实世界中,某些数据可以被任意延迟,从而无法为所有已经发生过的有了确定时间戳的数据指定某个时间(水印)。 此外,即使可以跳过一些延迟的数据,也不希望出现太多延迟水印的情况,因为它在事件时间窗口的评估中造成了太多延迟(就是不准确了)。
 出于这个原因,流处理程序需要显示地预期会有一些迟到的数据。
 迟到的数据是在系统的事件时钟之后到达的数据(由水印发出信号)已经超过了延迟数据的时间戳时间。
 如:水印时间是t,数据时间戳是t’ t’<=t,数据到达的时间t’’ > t 意思就是水印的时间戳在时钟时间到了之后,又进来的数据事件时间是属于刚刚结束了的水印范围里的。

空转数据

 目前,对于纯事件时间水印生成器,如果没有要处理的数据,则水印就不能向前推进了。 这意味着在输入数据存在间隙的情况下,事件时间将不会往前推进,如此以来,窗口算子将不会被触发,现有窗口将不能产生任何输出数据【这么明显的痛点?还是数据规模不适合实时处理】。

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%