Flink学习-实验3-窗口

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
9. Flink学习-实验3-时间和水印
flink-quickstart
flink-training-exercises

Flink具有非常富表现力的窗口语义。
窗口是如何对无界数据流进行聚合计算的?
Flink支持哪些类型的窗口?
如何使用窗口聚合实现DataStream程序?

简介

例如使用流处理来计算以下问题。

  1. 每分钟页面的查看次数。
  2. 每位用户每周的会话数量。
  3. 每分钟传感器的最高温度。

 使用Flink计算窗口化的分析取决于两个主要抽象:为窗口分配事件的窗口分配器-Window Assigners(根据需要创建新的窗口对象),以及用于分配给窗口事件的窗口函数-Window Functions。
 Flink的窗口API还包含触发器-Triggers的概念,它决定何时调用窗口函数,以及Evictors,它可以删除窗口中收集的元素。

 窗口的基本形式中,我们可以将窗口应用于分组的流中,如下所示:

1
2
3
4
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)

 我们还可以使用非分组的流窗口,但需要注意的是,在这种情况下,流处理不会并行完成:

1
2
3
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)

Window Assigners

 窗口分配器。
Flink有几种内置类型的窗口分配器,如下图所示:

  1. 滚动时间窗口
  2. 滑动时间窗口
  3. 滚动计数窗口
  4. 滑动计数窗口
  5. 会话窗口
  6. 全局窗口

image

可以使用这些窗口分配器的一些示例,以及如何指定窗口分配器:

  1. 滚动时间窗口
    每分钟页面浏览量
    TumblingEventTimeWindows.of(Time.minutes(1))

  2. 滑动时间窗口
    每10秒计算一次1分钟内页面浏览量
    SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))

  3. 会话窗口
    每个会话的页面查看,其中会话由会话之间至少30分钟的间隔定义
    EventTimeSessionWindows.withGap(Time.minutes(30))

 可以使用Time.milliseconds(n),Time.seconds(n),Time.minutes(n),Time.hours(n)和Time.days(n)之一指定时间长度。
 基于时间的窗口分配器(包括会话窗口)具有事件时间和处理时间的风格。
 这两种时间窗口之间存在显著的权衡。

使用处理时间窗口,您必须接受以下限制:

  1. 无法处理历史数据。
  2. 无法正确处理无序数据。
  3. 结果将是不确定的。
    但具有较低延迟的优点。

 使用基于计数的窗口时,需要注意,在批处理完成之前,这些窗口不会触发。
 虽然我们可以使用自定义触发器自行实现该行为,但是没有选项可以超时并处理部分窗口。

 全局窗口分配器将每个事件(使用相同的key)分配给同一个全局窗口。
 这仅在您使用自定义触发器进行自定义窗口时才有用。
 在大多数情况下,这可能看起来很有用,最好使用ProcessFunction。

Window Functions

有三种基本方式可用于处理窗口内容:

  1. 作为批处理,使用ProcessWindowFunction,迭代式的将窗口内容传递下去;
  2. 使用ReduceFunction或AggregateFunction,在每个事件分配给窗口时递增调用;
  3. 或者上面两者的组合,其中当窗口被触发时,将ReduceFunction或AggregateFunction的预聚合结果提供给ProcessWindowFunction。

 以下是方法1和3的示例。在每种情况下,我们在1分钟事件时间窗口中找到每个传感器的峰值,并生成包含(key,窗口结束时间戳,max_value)的元组流。

ProcessWindowFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(“key”)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // input type
Tuple3<String, Long, Integer>, // output type
Tuple, // key type
TimeWindow> { // window type

@Override
public void process(
Tuple key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {
int max = 0;
for (SensorReading event : events) {
if (event.value > max) max = event.value;
}
// note the rather hideous cast
out.collect(new Tuple3<>((Tuple1<String>)key).f0, context.window().getEnd(), max));
}
}

这个实现中需要注意的几点:

  1. key选择器被指定编码为String的字段名称。
    这使得编译器无法知道我们的key是字符串,因此我们必须在ProcessWindowFunction中使用的key类型是元组。
    意思就是Tuple key,这里不能直接写成String key。最后一行(Tuple1)key这里做了强制转换。
  2. 分配给窗口的所有事件都必须在分组后的Flink状态中进行缓冲,直到窗口被触发。这代价可能非常昂贵。
  3. 我们的ProcessWindowFunction会传递一个Context对象,我们可以从中获取有关窗口的信息。 具体如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public abstract class Context implements java.io.Serializable {
    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    }

 windowState和globalState可以存储每个key,每个窗口或全局每个key的信息。
 这可能很有用,例如,如果要记录当前窗口的某些内容并在处理后续窗口时使用它。

增量聚合实例

计算每分钟内传感器的峰值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}

private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {

SensorReading max = maxReading.iterator().next();
out.collect(new Tuple3<String, Long, SensorReading>(key, context.window().getEnd(), max));
}
}

 通过上面的实现,我们选择使用更强大的KeySelector。
 另请注意,Iterable将只包含一个读数 - 由MyReducingMax计算的预聚合最大值。

延迟事件

 默认情况下,使用事件时间窗口时,会删除延迟事件。
 窗口API有两个可选部分,可以让您更好地控制它。

 我们可以使用名为Side Outputs的机制将要删除的事件转移到备用输出流。 以下是可能的示例:

1
2
3
4
5
6
7
8
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

 我们还可以指定允许延迟的间隔,在此期间将继续将延迟事件分配给适当的窗口(其状态将被保留)。 默认情况下,每个延迟事件都会导致窗口函数延迟触发。
 默认情况下,允许的延迟为0。换句话说,水印后面的元素被丢弃(或发送到边缘(侧面Side Outputs)输出)。

1
2
3
4
5
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);

不完美的窗口

 Flink的窗口API的某些方面可能不会以您期望的方式运行。
 根据有关Stack Overflow和flink-user邮件列表的常见问题,以下是一些可能让您感到惊讶的窗口事实。

  1. 滑动窗口导致重复拷贝
    滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关窗口中。
    例如,如果您每15分钟有一个24小时的滑动窗口,则每个事件将被复制到(24 60) / 15 = 4 24 = 96个窗口中。
  2. 时间窗口与时钟对齐【小时】
    仅仅因为我们正在使用长达一小时的处理时间窗口并开始在12:05运行我们的应用程序并不意味着第一个窗口将在1:05关闭。
    实际上是第一个窗口长55分钟,凌晨1点关闭。
  3. 窗口后面又有窗口
    例如:
    1
    2
    3
    4
    5
    6
    stream
    .keyBy(t -> t.key)
    .timeWindow(<time specification>)
    .reduce(<reduce function>)
    .timeWindowAll(<same time specification>)
    .reduce(<same reduce function>)

  我们可能希望Flink的运行时足够聪明,可以为我们执行上面的并行预聚合(假设我们使用的是ReduceFunction或AggregateFunction),但事实并非如此。

  1. 空的时间窗口没有结果
    只有在为其分配事件时才会创建Windows。
    因此,如果在给定的时间范围内没有事件,则不会输出任何结果,而且如果事件不够及时连续,最后一个或多个窗口永远不会被及时触发。
  2. 延迟事件会导致延迟合并
    会话窗口基于可以合并的窗口的抽象。
    每个元素最初都分配给一个新窗口,之后只要它们之间的间隙足够小,就会合并窗口。
    通过这种方式,延迟事件可以弥合分隔两个先前单独的会话的间隙,从而产生延迟合并。
  3. Evictors与增量聚合不兼容
    根据定义,这是事实 - 我们不能驱逐没有存储的元素。
    但这意味着依赖于使用Evictors的设计正在采用一种反面模式。

学习实例

1
2
com.dataartisans.flinktraining.exercises.datastream_java.windows.HourlyTipsExercise
com.dataartisans.flinktraining.solutions.datastream_java.windows.HourlyTipsSolution
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%