Implementing Timeout State Monitoring with Apache FlinkCEP

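CEP stands for Complex Event Processing. Typical timeout-monitoring cases include:
An order has been placed, but payment has still not been confirmed after a given time.
A ride-hailing order has been created, but pickup has still not been confirmed after a given time.
A food delivery is past its scheduled delivery time by some margin and has still not been confirmed as delivered.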

Apache FlinkCEP API
CEPTimeoutEventJob

A Brief Look at the FlinkCEP Source Code

DataStream and PatternStream

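A DataStream generally consists of events or elements of the same type. One DataStream can be turned into another through a series of transformations such as Filter and Map.
PatternStream is the abstraction of a stream under CEP pattern matching: it combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not itself a DataStream. Its methods emit each matched pattern sequence, as a map from pattern names to their associated events (Map<pattern name, List<event>>), into a SingleOutputStreamOperator, which is a DataStream.
The methods and variables in the CEPOperatorUtils utility class are nevertheless named after "PatternStream", for example: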

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...){...}
public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...){...}

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator is itself a DataStream:

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

The PatternStream constructors:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
this.inputStream = inputStream;
this.pattern = pattern;
this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
this.inputStream = inputStream;
this.pattern = pattern;
this.comparator = comparator;
}

Pattern, Quantifier and EventComparator

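Pattern is the base class for pattern definitions and follows the Builder pattern; NFACompiler uses the finished pattern to generate an NFA.
To implement your own method similar to next and followedBy, for example a timeEnd, extending Pattern and overriding its methods should be feasible.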

public class Pattern<T, F extends T> {

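/** Name of the pattern */
private final String name;

/** The preceding pattern */
private final Pattern<T, ? extends T> previous;

/** Condition an event must satisfy to be matched by this pattern */
private IterativeCondition<F> condition;

/** Window length: the pattern must be matched within this time span */
private Time windowTime;

/** Quantifier of the pattern, e.g. how many events it matches; by default exactly one */
private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

/** Condition an event must satisfy to stop collecting events into the looping state */
private IterativeCondition<F> untilCondition;

/**
* Applies to {@code times} patterns: the number of times events may occur in a row in the pattern
*/
private Times times;

// Skip strategy applied after a match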
private final AfterMatchSkipStrategy afterMatchSkipStrategy;

...
}

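Quantifier describes the concrete behavior of a pattern. There are three main kinds:
Single (matches one event), Looping (matches repeatedly), and Times (matches a fixed number of times, or any number of times within a range).
Each Pattern can be optional (single or looping patterns), and a ConsumingStrategy can be set on it.
Looping and times patterns additionally have an internal ConsumingStrategy, which applies between the events accepted within the pattern.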

public class Quantifier {
...
/**
* Five properties; they can be combined, but not every combination is valid
*/
public enum QuantifierProperty {
SINGLE,
LOOPING,
TIMES,
OPTIONAL,
GREEDY
}

/**
* Strategy describing which events are matched in this pattern
*/
public enum ConsumingStrategy {
STRICT,
SKIP_TILL_NEXT,
SKIP_TILL_ANY,

NOT_FOLLOW,
NOT_NEXT
}

/**
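* Describes how many times events may occur in a row in the current pattern. For example, a pattern condition is just a boolean; events that satisfy it may occur exactly `times` times in a row, or any count within a range: with 2~4, runs of 2, 3 and 4 events are all matched by the current pattern, so the same event can be matched more than once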
*/
public static class Times {
private final int from;
private final int to;

private Times(int from, int to) {
Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
this.from = from;
this.to = to;
}

public int getFrom() {
return from;
}

public int getTo() {
return to;
}

// range of counts
public static Times of(int from, int to) {
return new Times(from, to);
}

// exact count
public static Times of(int times) {
return new Times(times, times);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Times times = (Times) o;
return from == times.from &&
to == times.to;
}

@Override
public int hashCode() {
return Objects.hash(from, to);
}
}
...
}
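The Times comment above can be made concrete with a small plain-Java model (no Flink dependency). It is a simplified sketch that assumes strict contiguity between the repeated events and no after-match skip strategy; `TimesRangeSketch` and `contiguousRuns` are hypothetical names, not FlinkCEP API. It lists every run that a `times(2, 4)`-style quantifier could report over consecutive matching events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of FlinkCEP. Models which runs a times(from, to) quantifier
// would report over consecutive matching events, ASSUMING strict contiguity and no
// after-match skip strategy (every event may start a new run).
final class TimesRangeSketch {
    private TimesRangeSketch() {}

    static <E> List<List<E>> contiguousRuns(List<E> events, int from, int to) {
        // Same preconditions as Quantifier.Times: 0 < from <= to.
        if (from <= 0 || to < from) {
            throw new IllegalArgumentException("require 0 < from <= to");
        }
        List<List<E>> runs = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            // Each start position yields one run per allowed length that still fits.
            for (int len = from; len <= to && start + len <= events.size(); len++) {
                runs.add(new ArrayList<>(events.subList(start, start + len)));
            }
        }
        return runs;
    }
}
```

For four consecutive matching events a1 to a4 and the range 2~4, this yields six runs, and a2 appears in five of them, which is the repeated matching the comment describes. Under relaxed contiguity or a different skip strategy the real result set differs.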

EventComparator: to supply a custom event comparator, implement the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
long serialVersionUID = 1L;
}
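EventComparator is simply a serializable Comparator. As a hedged illustration, the sketch below defines a comparator with the same shape in plain Java; the `StatusEvent` type and its `seq` tie-breaker field are hypothetical. The Javadoc of CEP.pattern(...) describes the comparator as sorting events with equal timestamps, so the sketch compares only the tie-breaker. In a Flink job the class would implement org.apache.flink.cep.EventComparator<StatusEvent> instead of Comparator plus Serializable.

```java
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical event type for illustration only.
final class StatusEvent {
    final String aid;
    final long timestamp;
    final int seq; // hypothetical tie-breaker, e.g. a producer-side sequence number

    StatusEvent(String aid, long timestamp, int seq) {
        this.aid = aid;
        this.timestamp = timestamp;
        this.seq = seq;
    }
}

// Same shape as EventComparator<T> (Comparator<T> + Serializable).
final class SeqComparator implements Comparator<StatusEvent>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(StatusEvent a, StatusEvent b) {
        // Intended for events that already share a timestamp, so only the tie-breaker is compared.
        return Integer.compare(a.seq, b.seq);
    }
}
```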

NFACompiler and NFA

NFACompiler provides methods that compile a Pattern into an NFA or an NFAFactory; an NFAFactory can create multiple NFAs.

public class NFACompiler {
...
/**
* NFAFactory: the interface for creating NFAs
*
* @param <T> Type of the input events which are processed by the NFA
*/
public interface NFAFactory<T> extends Serializable {
NFA<T> createNFA();
}

/**
* NFAFactoryImpl: the concrete implementation of NFAFactory
*
* <p>The implementation takes the input type serializer, the window time and the set of
* states and their transitions to be able to create an NFA from them.
*
* @param <T> Type of the input events which are processed by the NFA
*/
private static class NFAFactoryImpl<T> implements NFAFactory<T> {

private static final long serialVersionUID = 8939783698296714379L;

private final long windowTime;
private final Collection<State<T>> states;
private final boolean timeoutHandling;

private NFAFactoryImpl(
long windowTime,
Collection<State<T>> states,
boolean timeoutHandling) {

this.windowTime = windowTime;
this.states = states;
this.timeoutHandling = timeoutHandling;
}

@Override
public NFA<T> createNFA() {
// An NFA consists of the set of states, the window length, and whether timeouts are handled
return new NFA<>(states, windowTime, timeoutHandling);
}
}
}

NFA: Non-deterministic finite automaton.
For more details, see https://zh.wikipedia.org/wiki/非确定有限状态自动机

public class NFA<T> {
/**
* All valid NFA states returned by the NFACompiler
* These are directly derived from the user-specified pattern.
*/
private final Map<String, State<T>> states;

/**
* The window length specified by Pattern.within(Time)
*/
private final long windowTime;

/**
* Flag indicating whether timed-out (partial) matches are handled
*/
private final boolean handleTimeout;
...
}
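To show how the windowTime and handleTimeout fields interact, here is a hypothetical plain-Java model, not Flink's NFA class. It tracks only the start timestamp of each partial match: advancing time prunes partial matches whose window has elapsed, and reports them only when handleTimeout is set. The `>=` comparison and the reported timeout timestamp of start + windowTime follow my reading of NFA.advanceTime in Flink 1.x and may differ between versions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model, NOT org.apache.flink.cep.nfa.NFA: keeps only what is needed to show
// how windowTime and handleTimeout decide whether expired partial matches are reported.
final class NfaTimeoutSketch {
    /** A partial match that has started but not yet reached the final state. */
    static final class PartialMatch {
        final String name;
        final long startTimestamp;

        PartialMatch(String name, long startTimestamp) {
            this.name = name;
            this.startTimestamp = startTimestamp;
        }
    }

    private final long windowTime;
    private final boolean handleTimeout;
    private final List<PartialMatch> partialMatches = new ArrayList<>();

    NfaTimeoutSketch(long windowTime, boolean handleTimeout) {
        this.windowTime = windowTime;
        this.handleTimeout = handleTimeout;
    }

    void start(String name, long timestamp) {
        partialMatches.add(new PartialMatch(name, timestamp));
    }

    /**
     * Advances time: partial matches whose window has elapsed are removed. They are returned
     * as "name@timeoutTimestamp" (start + windowTime) only if handleTimeout is set.
     */
    List<String> advanceTime(long timestamp) {
        List<String> timedOut = new ArrayList<>();
        Iterator<PartialMatch> it = partialMatches.iterator();
        while (it.hasNext()) {
            PartialMatch pm = it.next();
            // windowTime <= 0 models "no within(...)": nothing ever expires.
            if (windowTime > 0 && timestamp - pm.startTimestamp >= windowTime) {
                it.remove();
                if (handleTimeout) {
                    timedOut.add(pm.name + "@" + (pm.startTimestamp + windowTime));
                }
            }
        }
        return timedOut;
    }

    int pendingCount() {
        return partialMatches.size();
    }
}
```

With handleTimeout set to false, expired partial matches are still pruned but dropped silently, which is why the timeout path has to be requested explicitly.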

PatternSelectFunction and PatternFlatSelectFunction

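PatternSelectFunction's select() method is called with a map of the matched events, in which the events can be looked up by pattern name; the pattern names are the ones given when the Pattern was defined. select() returns exactly one result; if you need to return multiple results, implement PatternFlatSelectFunction instead.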

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

/**
* Generates one result from the given map of events. The events are identified by the name of the pattern they are associated with
*/
OUT select(Map<String, List<IN>> pattern) throws Exception;
}

PatternFlatSelectFunction does not return an OUT; instead it emits its results through a Collector.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

/**
* Generates zero or more results
*/
void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
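The difference between the two interfaces is output cardinality. The following plain-Java sketch models it under stated assumptions: java.util.function.Function stands in for PatternSelectFunction, and a BiConsumer that receives a Consumer stands in for PatternFlatSelectFunction with its Collector; `SelectVsFlatSelect` is a hypothetical helper, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins: Function ~ PatternSelectFunction,
// BiConsumer<match, Consumer> ~ PatternFlatSelectFunction with a Collector.
final class SelectVsFlatSelect {
    private SelectVsFlatSelect() {}

    /** select(): exactly one output per match. */
    static <IN, OUT> List<OUT> applySelect(List<Map<String, List<IN>>> matches,
                                           Function<Map<String, List<IN>>, OUT> select) {
        List<OUT> out = new ArrayList<>();
        for (Map<String, List<IN>> match : matches) {
            out.add(select.apply(match));
        }
        return out;
    }

    /** flatSelect(): zero or more outputs per match, pushed through a collector. */
    static <IN, OUT> List<OUT> applyFlatSelect(List<Map<String, List<IN>>> matches,
                                               BiConsumer<Map<String, List<IN>>, Consumer<OUT>> flatSelect) {
        List<OUT> out = new ArrayList<>();
        for (Map<String, List<IN>> match : matches) {
            flatSelect.accept(match, out::add); // the list's add method plays the Collector
        }
        return out;
    }
}
```

select() always yields exactly one element per match, while flatSelect() may emit several, one, or none; that is why the timeout demo at the end of this article can use a flat-select function that emits nothing.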

SelectTimeoutCepOperator and PatternTimeoutFunction

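SelectTimeoutCepOperator is created when createTimeoutPatternStream() is called in CEPOperatorUtils.
The methods of SelectTimeoutCepOperator that the operator calls repeatedly are processMatchedSequences() and processTimedOutSequences().
These are template methods… they correspond to the processEvent() and advanceTime() methods of the abstract class AbstractKeyedCEPPatternOperator.
There is also FlatSelectTimeoutCepOperator with its corresponding PatternFlatTimeoutFunction.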

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

private OutputTag<OUT2> timedOutOutputTag;

public SelectTimeoutCepOperator(
TypeSerializer<IN> inputSerializer,
boolean isProcessingTime,
NFACompiler.NFAFactory<IN> nfaFactory,
final EventComparator<IN> comparator,
AfterMatchSkipStrategy skipStrategy,
// the parameter names misleadingly say flat... and so do the member names in the SelectWrapper class...
PatternSelectFunction<IN, OUT1> flatSelectFunction,
PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
OutputTag<OUT2> outputTag,
OutputTag<IN> lateDataOutputTag) {
super(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
skipStrategy,
new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
lateDataOutputTag);
this.timedOutOutputTag = outputTag;
}
...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {

OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}
public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {

void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}
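A timeout function receives only the partial match collected before the window expired, together with the timeout timestamp. The hypothetical handler below mirrors the PatternFlatTimeoutFunction parameters in plain Java, with a returned List standing in for the Collector; the pattern names "init" and "end" are taken from the demo later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical flat-timeout handler: same inputs as
// PatternFlatTimeoutFunction.timeout(pattern, timeoutTimestamp, out), returning a List
// instead of writing to a Collector.
final class TimeoutAlertSketch {
    private TimeoutAlertSketch() {}

    static List<String> timeout(Map<String, List<String>> partialMatch, long timeoutTimestamp) {
        List<String> alerts = new ArrayList<>();
        // Only stages reached before the timeout are present; a missing stage such as
        // "end" has no entry at all, so fall back to an empty list.
        for (String initEvent : partialMatch.getOrDefault("init", Collections.emptyList())) {
            alerts.add(initEvent + " timed out at " + timeoutTimestamp);
        }
        return alerts;
    }
}
```

A stage that was never reached has no entry in the map, which is consistent with the `timeout end: null` lines in the test results.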

CEP and CEPOperatorUtils

CEP is the utility class for creating a PatternStream; a PatternStream is simply the combination of a DataStream and a Pattern.

public class CEP {

public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
return new PatternStream<>(input, pattern);
}

public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
return new PatternStream<>(input, pattern, comparator);
}
}

CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when PatternStream's select() or flatSelect() method is called.

public class CEPOperatorUtils {
...
private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
final DataStream<IN> inputStream,
final Pattern<IN, ?> pattern,
final TypeInformation<OUT> outTypeInfo,
final boolean timeoutHandling,
final EventComparator<IN> comparator,
final OperatorBuilder<IN, OUT> operatorBuilder) {
final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time
final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

// compile our pattern into a NFAFactory to instantiate NFAs later on
final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

final SingleOutputStreamOperator<OUT> patternStream;

if (inputStream instanceof KeyedStream) {
KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

patternStream = keyedStream.transform(
operatorBuilder.getKeyedOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()));
} else {
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

patternStream = inputStream.keyBy(keySelector).transform(
operatorBuilder.getOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()
)).forceNonParallel();
}

return patternStream;
}
...
}

FlinkCEP Implementation Steps

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where…times…
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) or PatternStream.flatSelect(PatternFlatSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP Match Timeout Implementation Steps

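The timeout CEP stream should be keyed with keyBy, i.e. be a KeyedStream. If inputStream is not a KeyedStream, CEPOperatorUtils keys it with a NullByteKeySelector, which assigns every element the same Byte key 0, and forces the operator to be non-parallel (see the CEPOperatorUtils source above):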

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
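The effect of this constant key can be modelled in plain Java. In the sketch below, grouping a list stands in for keyBy partitioning (an assumption made for illustration), and `nullByteKey` returns the same Byte value 0 for every element, as NullByteKeySelector does; `KeyingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model: grouping by a key function stands in for keyBy partitioning.
final class KeyingSketch {
    private KeyingSketch() {}

    static <E, K> Map<K, List<E>> groupBy(List<E> events, Function<E, K> keySelector) {
        Map<K, List<E>> groups = new LinkedHashMap<>();
        for (E e : events) {
            groups.computeIfAbsent(keySelector.apply(e), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /** Same idea as NullByteKeySelector: every element gets the key (byte) 0. */
    static Byte nullByteKey(Object ignored) {
        return (byte) 0;
    }
}
```

Keying by aid gives one group per entity, while the constant key collapses everything into a single group, so one CEP operator instance has to process the whole stream; the CEPOperatorUtils code above also calls forceNonParallel() in that case.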

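Finally, call within on the Pattern to set the window time.
If the stream is keyed by the primary key, at most one timed-out match is produced per time window (assuming each key has a single initial event), so PatternStream.select(…) is sufficient.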

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where…within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(…)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink
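The steps above can be pictured with a simplified plain-Java model of the demo's rule (status "02" followed by "00" or "01" for the same aid within a window). It is not FlinkCEP: `KeyedTimeoutMonitor` is hypothetical, it keeps at most one pending init event per key, and its two lists stand in for the main output and the OutputTag side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a keyed timeout rule: "02" followed by "00"/"01" for
// the same aid within windowMillis. ASSUMPTION: at most one pending match per key.
final class KeyedTimeoutMonitor {
    private final long windowMillis;
    private final Map<String, Long> pendingInit = new HashMap<>(); // aid -> init timestamp
    final List<String> matched = new ArrayList<>();  // stands in for the main output
    final List<String> timedOut = new ArrayList<>(); // stands in for the side output

    KeyedTimeoutMonitor(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onEvent(String aid, String status, long timestamp) {
        if ("02".equals(status)) {
            pendingInit.putIfAbsent(aid, timestamp);
        } else if ("00".equals(status) || "01".equals(status)) {
            Long start = pendingInit.get(aid);
            if (start != null && timestamp - start < windowMillis) {
                pendingInit.remove(aid);
                matched.add(aid);
            }
        }
        // Other statuses (e.g. "03") are skipped, similar to followedBy's relaxed contiguity.
    }

    /** Expires pending matches once the watermark shows their window has elapsed. */
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = pendingInit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (watermark - e.getValue() >= windowMillis) {
                timedOut.add(e.getKey());
                it.remove();
            }
        }
    }
}
```

For example, with a one-minute window, feeding it ID000-0 ("02" at 1563419728242, "00" at 1563419749259) and ID000-1 ("02" at 1563419728783) puts ID000-0 in the main output, while ID000-1 moves to the side output only once onWatermark receives a value of at least 1563419788783.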

Limitations of FlinkCEP Timeouts

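As with Flink window aggregation, when event time is used and the watermark is advanced by the events themselves, later events must arrive before a timeout is detected and emitted, just as a window only fires and outputs its result once the watermark has passed its end.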
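A minimal sketch of why this happens, assuming a bounded-out-of-orderness style assigner whose watermark is the highest event timestamp seen so far minus an allowed delay (the demo below uses IngestionTimeExtractor, which assigns ingestion time instead). `EventDrivenWatermark` is a hypothetical class, not Flink API.

```java
// Hypothetical model of an event-driven periodic watermark:
// watermark = highest event timestamp seen so far - maxOutOfOrderness.
final class EventDrivenWatermark {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    EventDrivenWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the subtraction below cannot underflow before the first event.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Polled periodically; returns the same value until a newer event arrives. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    /** Whether a match that started at startTimestamp is already known to have timed out. */
    boolean timeoutVisible(long startTimestamp, long windowMillis) {
        return currentWatermark() - startTimestamp >= windowMillis;
    }
}
```

Repeated calls to currentWatermark() return the same value until a newer event arrives, so a pending match that has logically expired is not reported until some later event advances the watermark.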

Complete FlinkCEP Timeout Demo

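import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// POJO and POJOSchema are the author's own event class and Kafka deserialization schema; they are not shown in the article.
public class CEPTimeoutEventJob {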
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
private static final String GROUP_TOPIC = GROUP_ID;

public static void main(String[] args) throws Exception {
// Parameters
ParameterTool params = ParameterTool.fromArgs(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

// Do not use the time carried in the POJO: assign ingestion time instead
final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<POJO>();

// Match the number of partitions of the Kafka topic
env.setParallelism(3);

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
kafkaProps.setProperty("group.id", GROUP_ID);

// Consume messages from Kafka
FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
DataStream<POJO> pojoDataStream = env.addSource(consumer)
.assignTimestampsAndWatermarks(extractor);

pojoDataStream.print();

// Key by the primary key aid, i.e. run match detection per POJO entity (different POJO types can use different within times)
// 1.
DataStream<POJO> keyedPojos = pojoDataStream
.keyBy("aid");

// From the initial state to a final state: one complete POJO event sequence
// 2.
Pattern<POJO, POJO> completedPojo =
Pattern.<POJO>begin("init")
.where(new SimpleCondition<POJO>() {
private static final long serialVersionUID = -6847788055093903603L;

@Override
public boolean filter(POJO pojo) throws Exception {
return "02".equals(pojo.getAstatus());
}
})
.followedBy("end")
// .next("end")
.where(new SimpleCondition<POJO>() {
private static final long serialVersionUID = -2655089736460847552L;

@Override
public boolean filter(POJO pojo) throws Exception {
return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
}
});

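// Find the aids that have not reached a final state within 1 minute (kept short for testing)
// If different types need different within times (e.g. 1 minute for some, 1 hour for others), create multiple PatternStreams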
// 3.
PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

// Define the side output "timedout"
// 4.
OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
private static final long serialVersionUID = 773503794597666247L;
};

// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
// 5.
SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
timedout,
new POJOTimedOut(),
new FlatSelectNothing<>()
);

// Print the timed-out POJOs
// 6.7.
timeoutPojos.getSideOutput(timedout).print();
timeoutPojos.print();

env.execute(CEPTimeoutEventJob.class.getSimpleName());
}

/**
* Collects the timed-out events
*/
public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
private static final long serialVersionUID = -4214641891396057732L;

@Override
public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
if (null != map.get("init")) {
for (POJO pojoInit : map.get("init")) {
System.out.println("timeout init:" + pojoInit.getAid());
collector.collect(pojoInit);
}
}
// The match timed out before any "end" event arrived, so "end" is not available here
System.out.println("timeout end: " + map.get("end"));
}
}

/**
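* Usually does nothing, but could also send all matched events downstream; with relaxed contiguity, events that were ignored or skipped over cannot be selected and sent downstream
* Receives the data that went through both init and end within one minute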
*
* @param <T> type of the matched events
*/
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
private static final long serialVersionUID = -3029589950677623844L;

@Override
public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
System.out.println("flatSelect: " + pattern);
}
}
}

Test results (followedBy):

3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
timeout init:ID000-1
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
timeout init:ID000-4
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null

邵志鹏 wechat