Flink学习-实验5-Broadcast

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
9. Flink学习-实验3-时间和水印
10. Flink学习-实验3-窗口
11. Flink学习-实验3-ProcessFunction
12. Flink学习-实验3-Side Outputs
13. Flink学习-实验4-State Backends
14. Flink学习-实验4-Checkpoints
15. Flink学习-实验5-Broadcast
flink-quickstart
flink-training-exercises

找到最近的出租车

 给定要广播的位置,此练习的目标是查看并报告离要求地点最近的已完成行程的出租车。

我们将连接两个流:

  1. 出租车出行事件流。
  2. 分类查询流。

 在这种情况下,查询是两个以逗号分隔的浮点数,表示(经度,纬度)对。
 预期的输出是一个在接收到查询后行程结束的出租车流,每个连续的出行都更接近请求的位置。

 几个比较好的位置:

1
2
3
-74, 41                   // Near, but outside the city to the NNW
-73.7781, 40.6413 // JFK Airport
-73.977664, 40.761484 // The Museum of Modern Art

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Java reference implementation for the "Nearest Future Taxi" exercise of the Flink training
* (http://training.data-artisans.com). This solution doesn't worry about leaking state.
*
* Given a location that is broadcast, the goal of this exercise is to watch the stream of
* taxi rides, and report on the taxis that complete their ride closest to the requested location.
*
* Parameters:
* -input path-to-input-file
*
* Use nc -lk 9999 to establish a socket stream from stdin on port 9999
*
* Some good locations:
*
* -74, 41 (Near, but outside the city to the NNW)
* -73.7781, 40.6413 (JFK Airport)
* -73.977664, 40.761484 (Museum of Modern Art)
*/

com.dataartisans.flinktraining.solutions.datastream_java.broadcast.NearestTaxiSolution
com.dataartisans.flinktraining.solutions.datastream_java.broadcast.NearestTaxiWithCleanupSolution

 另外,在实际应用程序中,考虑如何最终清除与过时查询关联的所有状态是有意义的。

行驶中的出租车

 此练习的目标是报告所有出租车的情况,当查询广播流(值为n)时,意思是当前行驶时间至少持续了n分钟。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Java reference implementation for the "Ongoing Rides" exercise of the Flink training
* (http://training.data-artisans.com).
*
* The goal of this exercise is to report on all taxis whose current ride has been ongoing
* for at least n minutes whenever the broadcast stream is queried (with the value of n).
*
* Parameters:
* -input path-to-input-file
*
*/

com.dataartisans.flinktraining.solutions.datastream_java.broadcast.NearestTaxiSolution.OngoingRidesSolution

规则引擎

 出租车流由出租车id来进行分组,对于每辆出租车,最近的出行事件都保留在key的状态中。
 这意味着,对于每一辆出租车,我们都有其当前状况的记录:要么是在某个地方、某个时间开始的一段行程的中途,要么是我们接收的最后一个事件是一段出行结束了。

 查询是一个java表达式,在计算时得出一个布尔值。
 这些查询表达式的作用域包括一个出行事件和当前水印。
 当一个新的查询表达式到达时,它将对所有存储的分组后的事件进行迭代计算,当查询表达式对某些迭代计算结果为True时,KeyedBroadcastProcessFunction将发出该事件。
 类似地,当新的出租车事件到达它们的流时,查询表达式也会对它们进行计算,如果表达式返回True,就会发出它们。

例如:

1
2
3
4
5
6
7
true

false

ride.isStart && (watermark - ride.getEventTime()) > 100 * 60000

!ride.isStart && ride.getEuclideanDistance(-74, 41) < 10.0

 说明:上面的第三个查询匹配100分钟前开始的正在进行的行程,最后一个查询匹配距离指定位置10公里以内的行程。如果我们只是想查看出行事件是否基本正常,”true”和”false”是方便的查询。

 为了简单起见,实现「一次只应该有一个查询」—一个新的查询将替换前面的查询。

 TaxiQueryExercise类创建了两个流,并使用Janino编译java表达式。
 剩下要做的是完成KeyedBroadcastProcessFunction的processElement和processBroadcastElement方法的实现。

1
com.dataartisans.flinktraining.solutions.datastream_java.broadcast.TaxiQuerySolution
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%