Flink Learning - Lab 1 - Filtering Stream Data

1. Flink Learning - Introduction to Streaming
2. Flink Learning - Setting Up the Development Environment
flink-quickstart
flink-training-exercises

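  1. Keep only the taxi rides whose start and end points are both in New York City.
  2. GeoUtils already provides a check for whether a coordinate lies in New York City.
  3. The job itself is just a very simple Filter.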
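public static boolean isInNYC(float lon, float lat) {
    // In NYC = neither east/west of the longitude bounds nor north/south of the latitude bounds.
    // LON_EAST, LON_WEST, LAT_NORTH and LAT_SOUTH are constants defined in GeoUtils.
    return !(lon > LON_EAST || lon < LON_WEST) &&
            !(lat > LAT_NORTH || lat < LAT_SOUTH);
}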
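The check above is a plain longitude/latitude bounding box. The standalone sketch below reproduces it outside Flink so it can be tried directly. The boundary values are assumed for illustration only; the real constants live in GeoUtils in flink-training-exercises. It also shows the equivalent inclusive-range form obtained with De Morgan's law, which differs only for NaN coordinates (every comparison with NaN is false, so the original form accepts NaN).

```java
/**
 * Standalone sketch of a GeoUtils-style bounding-box test.
 * The boundary values are ASSUMED for illustration; the real ones
 * are the constants in GeoUtils (flink-training-exercises).
 */
final class NycBoundingBox {
    // Assumed bounding box around New York City, in degrees.
    static final float LON_EAST = -73.7f;
    static final float LON_WEST = -74.05f;
    static final float LAT_NORTH = 41.0f;
    static final float LAT_SOUTH = 40.5f;

    // Same shape as GeoUtils.isInNYC: "not outside" on both axes.
    static boolean isInNYC(float lon, float lat) {
        return !(lon > LON_EAST || lon < LON_WEST)
                && !(lat > LAT_NORTH || lat < LAT_SOUTH);
    }

    // De Morgan's law turns "not outside" into inclusive range checks.
    // Identical for ordinary coordinates, but this form rejects NaN.
    static boolean isInNYCRange(float lon, float lat) {
        return lon >= LON_WEST && lon <= LON_EAST
                && lat >= LAT_SOUTH && lat <= LAT_NORTH;
    }

    public static void main(String[] args) {
        System.out.println(isInNYC(-73.9855f, 40.7580f)); // Times Square: true
        System.out.println(isInNYC(-0.1278f, 51.5074f));  // London: false
    }
}
```

The range form is arguably easier to read; the negated form is kept here only because it mirrors the GeoUtils code quoted above.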

Change the taxi data file paths in ExerciseBase so they point at your local copies of the training data:

public final static String pathToRideData = "/Users/shaozhipeng/Resources/2019/trainingData/nycTaxiRides.gz";
public final static String pathToFareData = "/Users/shaozhipeng/Resources/2019/trainingData/nycTaxiFares.gz";
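Editing ExerciseBase is not the only option: the RideCleansingExercise code later in this post calls `params.get("input", ExerciseBase.pathToRideData)`, so passing `-input path-to-input-file` on the command line overrides the default. The sketch below imitates that fallback in plain Java, without Flink. `InputPath.resolveInput` is a hypothetical helper and only a simplification of ParameterTool, and the default path in `main` is a placeholder.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical helper mimicking params.get("input", default); not Flink's ParameterTool. */
final class InputPath {

    // Return the value after "-input" or "--input", or the default when absent.
    static String resolveInput(String[] args, String defaultPath) {
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("-input") || args[i].equals("--input")) {
                return args[i + 1];
            }
        }
        return defaultPath;
    }

    public static void main(String[] args) {
        // Placeholder default; in the exercise this is ExerciseBase.pathToRideData.
        String path = resolveInput(args, "/path/to/nycTaxiRides.gz");
        // Fail early with a readable message instead of a stack trace from the source.
        if (!Files.exists(Paths.get(path))) {
            System.err.println("Taxi ride data not found: " + path);
        } else {
            System.out.println("Using taxi ride data: " + path);
        }
    }
}
```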

Data input

// events are out of order by at most 60 seconds; 10 minutes of events are served in 1 second
final int maxDelay = 60;
final int servingSpeed = 600;

// get a StreamExecutionEnvironment
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
        new TaxiRideSource("/Users/shaozhipeng/Resources/2019/trainingData/nycTaxiRides.gz", maxDelay, servingSpeed));
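The last two TaxiRideSource arguments control the replay. According to the comments in RideCleansingExercise, a max delay of 60 means events arrive out of order by at most 60 seconds, and a serving speed factor of 600 means 10 minutes of events are served in 1 second. The speed-up is therefore a plain ratio. The sketch below only illustrates that arithmetic with `java.time.Duration`; `ServingSpeed.replayDelay` is a hypothetical helper, not part of TaxiRideSource, and it ignores the source's own scheduling overhead.

```java
import java.time.Duration;

final class ServingSpeed {

    // Wall-clock time needed to replay a span of event time at the given speed-up factor.
    static Duration replayDelay(Duration eventTimeSpan, int servingSpeedFactor) {
        return eventTimeSpan.dividedBy(servingSpeedFactor);
    }

    public static void main(String[] args) {
        int servingSpeedFactor = 600;
        // 10 minutes of event time -> 1 second of wall-clock time
        System.out.println(replayDelay(Duration.ofMinutes(10), servingSpeedFactor)); // PT1S
        // one full day of rides -> 144 seconds
        System.out.println(replayDelay(Duration.ofDays(1), servingSpeedFactor));     // PT2M24S
        // the 60-second out-of-order bound shrinks to 100 ms of wall-clock time
        System.out.println(replayDelay(Duration.ofSeconds(60), servingSpeedFactor)); // PT0.1S
    }
}
```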

Data output

 The expected output is the stream of rides that both start and end in New York City, printed to the console.
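Flink's `DataStream.filter` keeps an element exactly when its FilterFunction returns true, the same contract as `java.util.stream.Stream.filter`. The Flink-free sketch below applies the ride-cleansing condition to a few hand-made rides to show which records should reach the console. The `Ride` class is a hypothetical stand-in for TaxiRide holding only the coordinate fields the filter reads, the bounding box values are assumed, and the place names in the comments are approximate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Flink-free sketch of the ride-cleansing predicate, applied with java.util.stream. */
final class RideFilterSketch {

    /** Hypothetical stand-in for TaxiRide, holding only the fields NYCFilter reads. */
    static final class Ride {
        final long rideId;
        final float startLon, startLat, endLon, endLat;

        Ride(long rideId, float startLon, float startLat, float endLon, float endLat) {
            this.rideId = rideId;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }

        @Override
        public String toString() {
            return "Ride(" + rideId + ")";
        }
    }

    // ASSUMED NYC bounding box; the real bounds are the constants in GeoUtils.
    static boolean isInNYC(float lon, float lat) {
        return lon >= -74.05f && lon <= -73.7f && lat >= 40.5f && lat <= 41.0f;
    }

    // Same condition as NYCFilter: keep a ride only if BOTH endpoints are in NYC.
    static boolean keep(Ride ride) {
        return isInNYC(ride.startLon, ride.startLat) && isInNYC(ride.endLon, ride.endLat);
    }

    static List<Ride> cleanse(List<Ride> rides) {
        return rides.stream().filter(RideFilterSketch::keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Ride> rides = Arrays.asList(
                new Ride(1, -73.98f, 40.75f, -73.95f, 40.78f),  // roughly Midtown -> Upper East Side
                new Ride(2, -74.17f, 40.73f, -73.98f, 40.75f),  // starts west of the box (Newark area)
                new Ride(3, -73.98f, 40.75f, -75.16f, 39.95f)); // ends far outside (Philadelphia area)
        System.out.println(cleanse(rides)); // [Ride(1)]
    }
}
```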

RideCleansingExercise

 com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansingExercise

package com.dataartisans.flinktraining.exercises.datastream_java.basics;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * The "Ride Cleansing" exercise from the Flink training
 * (http://training.data-artisans.com).
 * The task of the exercise is to filter a data stream of taxi ride records to keep only rides that
 * start and end within New York City. The resulting stream should be printed.
 * <p>
 * Parameters:
 * -input path-to-input-file
 */
public class RideCleansingExercise extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.get("input", ExerciseBase.pathToRideData);

        final int maxEventDelay = 60;       // events are out of order by max 60 seconds
        final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(
                rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

        DataStream<TaxiRide> filteredRides = rides
                // filter out rides that do not start or stop in NYC
                .filter(new NYCFilter());

        // print the filtered stream
        printOrTest(filteredRides);

        // run the cleansing pipeline
        env.execute("Taxi Ride Cleansing");
    }

    private static class NYCFilter implements FilterFunction<TaxiRide> {

        @Override
        public boolean filter(TaxiRide taxiRide) throws Exception {
            // keep the ride only if both the start point and the end point are in NYC
            return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                    && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
        }
    }

}

 Run RideCleansingExercise directly, or run the unit test RideCleansingTest
 (com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansingTest).

The corresponding reference solution is RideCleansingSolution.java.

 In the exercise, replace the throw new MissingSolutionException(); inside filter with the actual filtering condition.

邵志鹏