Flink学习-实验1-相互连接的流

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
flink-quickstart
flink-training-exercises

多个数据流

有时候不是简单的对单一的输入流做预处理转换,而是使用额外的动态的数据流通过阈值、规则、参数对常规的数据流进行动态的转换,在Flink里面,比如一个单一的算子有两个输入流。
连接流用于实现流join。

image

image

举个栗子

RichCoFlatMapFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package org.apache.flink.streaming.api.functions.co;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RichFunction;

/**
* A RichCoFlatMapFunction represents a FlatMap transformation with two different input
* types. In addition to that the user can use the features provided by the
* {@link RichFunction} interface.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
*/
@Public
public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
CoFlatMapFunction<IN1, IN2, OUT> {

private static final long serialVersionUID = 1L;
}

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// control流用于指定必须从streamOfWords流中过滤掉的单词
DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
// data和artisans不在control流中,状态是不会被记录为true的(flatMap1),即为null,所以streamOfWords在调用flatMap2时会被out输出
DataStream<String> streamOfWords = env.fromElements("data", "DROP", "artisans", "IGNORE").keyBy(x -> x);

// 两个流要想被连接在一块,要么两个流都是未分组的,要么都是分组的即keyed-都做了keyby操作;如果都做了keyby,「key的值必须是相同的」
control.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();

/**
* 1> data
* 4> artisans
*/
env.execute("ConnectedStreamsExerise Job");
}

ControlFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
// key的状态用Boolean值来保存,是被两个流共享的
// Boolean的blocked用于记住单词是否在control流中,而且这些单词会从streamOfWords流中被过滤掉
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}

// control.connect(streamOfWords)顺序决定了control流中的元素会被Flink运行时执行flatMap1时传入处理;streamOfWords流中的元素会被Flink运行时执行flatMap2时传入处理
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}

重要的是要认识到我们无法控制flatMap1和flatMap2回调的调用顺序(先后)。
这两个输入流相互竞争,Flink运行时将对它想要的来自一个流或另一个流的事件进行消费处理。
在时序或排序很重要的情况下,您可能会发现有必要在管理的Flink状态中缓冲事件,直到您的应用程序准备好处理它们为止。

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%