Flink学习-实验1-keyby分组

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
flink-quickstart
flink-training-exercises

keyBy

 keyBy: 能够围绕其中一个属性对流进行分区通常非常有用,以便将具有该属性相同值的所有事件组合在一起。
 例如,假设我们想要在每个出租车出行开始的网格区域中找到行驶时间最长的出租车。
 每个网格区域,即根据网格区域进行分组;然后DESC排序limit 1即可。

 在Flink中使用API:

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy("startCell")

 简单看下源码DataStream中其中一个keyBy方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* It creates a new {@link KeyedStream} that uses the provided key for partitioning
* its operator states.
*
* @param key
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}

 DataStream->KeyedStream

1
2
public class DataStream<T> {}
public class KeyedStream<T, KEY> extends DataStream<T> {}

 每个keyBy都会引发网络shuffle,对流进行重新分区(repartition)。
 通常,shuffle是非常昂贵的操作,因为它涉及网络通信以及序列化和反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Creates a new {@link KeyedStream} using the given {@link KeySelector}
* to partition operator state by key.
*
* @param dataStream
* Base stream of data
* @param keySelector
* Function for determining state partitions
*/
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
this(
dataStream,
new PartitionTransformation<>(
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
keySelector,
keyType);
}

/**
* Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
* to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
*
* @param stream
* Base stream of data
* @param partitionTransformation
* Function that determines how the keys are distributed to downstream operator(s)
* @param keySelector
* Function to extract keys from the base stream
* @param keyType
* Defines the type of the extracted keys
*/
@Internal
KeyedStream(
DataStream<T> stream,
PartitionTransformation<T> partitionTransformation,
KeySelector<T, KEY> keySelector,
TypeInformation<KEY> keyType) {

super(stream.getExecutionEnvironment(), partitionTransformation);
this.keySelector = clean(keySelector);
this.keyType = validateKeyType(keyType);
}

keySelector

 keyBy(“startCell”)有个缺点,编译器无法推断出key字段的类型,因此Flink会将键值作为元组Tuple进行传递。
 通常最好的做法是使用正确类型的KeySelector。比如:

1
2
3
4
5
6
7
8
9
rides
.flatMap(new NYCEnrichment())
.keyBy(
new KeySelector<EnrichedRide, int>() {
@Override
public int getKey(EnrichedRide ride) throws Exception {
return ride.startCell;
}
})

 或者使用Lambda表达式:

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy(ride -> ride.startCell)

对KeyedStream进行聚合

  1. 每个出租车出行开始区域中开走的出租车。
  2. 出租车出行从开始到结束时所花费的时间(分钟)。

 下面使用了Tuple2<Integer, Minutes>,所以可以使用位置position来标记字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Integer-> startCell Minutes-> duration
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
// 出租车出行结束事件
if (!ride.isStart) {
// 计算从开始到结束的时间间隔
Interval rideInterval = new Interval(ride.startTime.toDate().getTime(), ride.endTime.toDate().getTime());
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});

// fields position
minutesByStartCell
// startCell
.keyBy(0)
// duration
.maxBy(1).print();

 输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
3> (55032,11)
3> (55041,9)
3> (37568,7)
3> (49543,12)
3> (42817,9)
2> (55291,4)
2> (56029,6)
2> (57529,0)
2> (53295,10)
2> (44572,4)
2> (54286,7)
2> (37819,4)
2> (54545,8)
2> (48540,7)
2> (40077,4)
2> (38324,2)
1> (55048,1)
4> (49548,10)
2> (47061,8)
...

状态

 这里的keyBy操作是有状态的流处理。虽然状态处理是透明的,但Flink必须跟踪每个不同键的最大持续时间。
 在Flink程序中涉及到状态时,要考虑状态所占用空间可能会变多大。如果key的空间是无界的,那么状态的空间也应是无界的。
 在处理流数据时,通常更有意义的是考虑有限窗口上的聚合,而不是整个流。

reduce()和其他聚合操作

 使用reduce()可以自定义聚合操作。

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%