Flink学习-实验3-ProcessFunction

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
9. Flink学习-实验3-时间和水印
10. Flink学习-实验3-窗口
flink-quickstart
flink-training-exercises

ProcessFunction

 ProcessFunction将事件处理与定时器和状态相结合,使其成为流处理应用程序的强大构建模块。
 这是使用Flink创建事件驱动应用程序的基础。
 它与RichFlatMap非常相似,但增加了定时器。

 下面是出租车排序的例子:

1
2
events.keyBy((ConnectedCarEvent event) -> event.carId)
.process(new SortFunction())

 在此代码段中,我们看到一个名为SortFunction的ProcessFunction应用于分过组的流。
 这意味着我们将单独(通过时间戳)对每辆汽车的事件进行排序,而不是试图实现整个流的全局排序 - 这是无法并行完成的。
 SortFunction如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class SortFunction extends KeyedProcessFunction<String, ConnectedCarEvent, ConnectedCarEvent> {
/* we'll use a PriorityQueue to buffer not-yet-fully-sorted events */
private ValueState<PriorityQueue<ConnectedCarEvent>> queueState = null;

@Override
public void open(Configuration config) {
/* set up the state we want to use */
...
}

@Override
public void processElement(ConnectedCarEvent event, Context context, Collector<ConnectedCarEvent> out) throws Exception {
/* add/sort this event into the queue */
...

/* set an event-time timer for when the stream is complete up to the event-time of this event */
...
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<ConnectedCarEvent> out) throws Exception {
/* release the items at the head of the queue that are now ready, based on the CurrentWatermark */
...
}
}

需要注意的事项:

  1. 有几种类型的ProcessFunctions - 这是一个KeyedProcessFunction,但也有CoProcessFunctions,BroadcastProcessFunctions等。
  2. KeyedProcessFunction是一种RichFunction。 作为RichFunction,它可以访问使用被管理的key的状态所需要的open和getRuntimeContext方法。
  3. 要实现两个回调方法:processElement和onTimer。 每个传入事件都会调用processElement;定时器触发时会调用onTimer。 这些可以是事件时间或处理时间计时器。 两个回调都提供了一个上下文对象,可用于与TimerService(以及其他)进行交互。 两个回调都传递了一个可用于发出结果的收集器。

open()

1
2
3
4
5
6
7
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<ConnectedCarEvent>> descriptor = new ValueStateDescriptor<>(
"sorted-events", TypeInformation.of(new TypeHint<PriorityQueue<ConnectedCarEvent>>() {})
);
queueState = getRuntimeContext().getState(descriptor);
}

processElement()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void processElement(ConnectedCarEvent event, Context context, Collector<ConnectedCarEvent> out) throws Exception {
TimerService timerService = context.timerService();

if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<ConnectedCarEvent> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}

注意以下两点:

  1. 延迟事件会发生什么? 水印背后的事件(即迟到的)正在被丢弃。 如果您想要做一些比这更好的事情,请考虑使用边缘输出(side output)。
  2. 我们为event.timestamp设置事件时间计时器。 这实际上是一种非常常见的模式。 以将此视为”当所有影响早期事件的无序性已得到解决时,请将我叫醒。”

onTimer()

 当时间到来时,所有那些可能影响早期事件的无序性不再是问题,我们可以释放队列中位于水印之前的所有事件。
 假设我们可以信任自己的水印,它们已经正确排序,准备好了,而且现在应该已经到达了。

 可能会有多个事件要释放出来,因为可能有多个事件具有相同的时间戳。
 Flink重复删除计时器 - 它只会为给定的时间戳和key创建一个计时器 - 因此我们无法保证计时器和事件之间的一对一关系。

1
2
3
4
5
6
7
8
9
10
11
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<ConnectedCarEvent> out) throws Exception {
PriorityQueue<ConnectedCarEvent> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
ConnectedCarEvent head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%