Flink学习-实验2-Stateful Enrichment

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
flink-quickstart
flink-training-exercises

对出租车出行数据和出租车费用数据进行关联(join)。

输入数据

本实验有两个输入流:一个是来自TaxiRideSource的TaxiRide事件,另一个是来自TaxiFareSource的TaxiFare事件。
如果需要调整容错逻辑,可以关注和使用CheckpointedTaxiRideSource和CheckpointedTaxiFareSource。

期望输出

输出结果的类型是Tuple2<TaxiRide, TaxiFare>:这就是一个简单的join,把出行数据和费用数据关联到一起。

CoFlatMapFunction接口

下面是CoFlatMapFunction接口的源码:
/**
 * A CoFlatMapFunction implements a flat-map transformation over two
 * connected streams.
 *
 * <p>The same instance of the transformation function is used to transform
 * both of the connected streams. That way, the stream transformations can
 * share state.
 *
 * <p>An example for the use of connected streams would be to apply rules that change over time
 * onto elements of a stream. One of the connected streams has the rules, the other stream the
 * elements to apply the rules to. The operation on the connected stream maintains the
 * current set of rules in the state. It may receive either a rule update (from the first stream)
 * and update the state, or a data element (from the second stream) and apply the rules in the
 * state to the element. The result of applying the rules would be emitted.
 *
 * <p>Neither method returns a value; results are handed to the supplied
 * {@code Collector} instead.
 *
 * @param <IN1> Type of the first input.
 * @param <IN2> Type of the second input.
 * @param <OUT> Output type.
 */
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

/**
 * This method is called for each element in the first of the connected streams.
 *
 * @param value The stream element (from the first input)
 * @param out The collector to emit resulting elements to
 * @throws Exception The function may throw exceptions which cause the streaming program
 *                   to fail and go into recovery.
 */
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

/**
 * This method is called for each element in the second of the connected streams.
 *
 * @param value The stream element (from the second input)
 * @param out The collector to emit resulting elements to
 * @throws Exception The function may throw exceptions which cause the streaming program
 *                   to fail and go into recovery.
 */
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}

简单例子:AJoinBExerise

下面是AJoinBExerise的完整代码:
package com.dataartisans.flinktraining.exercises.datastream_java.state;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;

public class AJoinBExerise {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, String>> a = env.fromElements(
new Tuple2<String, String>("ss", "StringString"),
new Tuple2<String, String>("ls", "list")
).keyBy(0);

DataStream<Tuple3<String, Integer, Double>> b = env.fromElements(
new Tuple3<String, Integer, Double>("ss", new Integer(2), new Double(3.3)),
new Tuple3<String, Integer, Double>("ls", new Integer(5), new Double(3.9))
).keyBy(0);

DataStream<Tuple4<String, String, Integer, Double>> res = a.connect(b)
.flatMap(new ABEnrichmentFunction());
res.print();

/**
* 1> (ls,list,5,3.9)
* 2> (ss,StringString,2,3.3)
*/

env.execute("AJoinBExerise Job");
}

public static class ABEnrichmentFunction extends RichCoFlatMapFunction<Tuple2<String, String>, Tuple3<String, Integer, Double>, Tuple4<String, String, Integer, Double>> {

private ValueState<Tuple2State> tuple2ValueState;
private ValueState<Tuple3State> tuple3ValueState;

@Override
public void open(Configuration parameters) throws Exception {
tuple2ValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved a", Tuple2State.class));
tuple3ValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved b", Tuple3State.class));
}

// a Tuple2
@Override
public void flatMap1(Tuple2<String, String> value, Collector<Tuple4<String, String, Integer, Double>> out) throws Exception {
// 判断b的状态 并输出
Tuple3State tuple3State = tuple3ValueState.value();
if (tuple3State != null && tuple3State.getTuple3() != null) {
tuple3ValueState.clear();
out.collect(new Tuple4(value.f0, value.f1, tuple3State.getTuple3().f1, tuple3State.getTuple3().f2));
} else {
// 更新a的状态
tuple2ValueState.update(new Tuple2State().setTuple2(value));
}
}

// b Tuple3
@Override
public void flatMap2(Tuple3<String, Integer, Double> value, Collector<Tuple4<String, String, Integer, Double>> out) throws Exception {
// 判断a的状态 并输出
Tuple2State tuple2State = tuple2ValueState.value();
if (tuple2State != null && tuple2State.getTuple2() != null) {
tuple2ValueState.clear();
out.collect(new Tuple4(tuple2State.getTuple2().f0, tuple2State.getTuple2().f1, value.f1, value.f2));
} else {
// 更新b的状态
tuple3ValueState.update(new Tuple3State().setTuple3(value));
}
}
}

private static class Tuple2State implements Serializable {
private Tuple2<String, String> tuple2;

public Tuple2<String, String> getTuple2() {
return tuple2;
}

public Tuple2State setTuple2(Tuple2<String, String> tuple2) {
this.tuple2 = tuple2;
return this;
}
}

private static class Tuple3State implements Serializable {
private Tuple3<String, Integer, Double> tuple3;

public Tuple3<String, Integer, Double> getTuple3() {
return tuple3;
}

public Tuple3State setTuple3(Tuple3<String, Integer, Double> tuple3) {
this.tuple3 = tuple3;
return this;
}
}

}

出租车数据-官方示例

练习:com.dataartisans.flinktraining.exercises.datastream_java.state.RidesAndFaresExercise
参考答案:com.dataartisans.flinktraining.solutions.datastream_java.state.RidesAndFaresSolution
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%