Flink Learning - Lab 1 - Transforming Stream Data

1. Flink Learning - Introduction to Streaming
2. Flink Learning - Setting Up the Development Environment
3. Flink Learning - Lab 1 - Filtering Stream Data
flink-quickstart
flink-training-exercises

one-to-one map()

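Grid area: the area is divided into cells numbered from the northwest corner to the southeast corner, that is, from (0,0) at the top left to the bottom right, forming a grid of roughly 100 × 100 cells.
GeoUtils.mapToGridCell(float lon, float lat) converts a point's longitude and latitude into the ID of the grid cell that contains it.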
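The index arithmetic is easier to see with concrete numbers. The following is a minimal standalone sketch that reuses the formula of mapToGridCell. The constants LON_WEST, LAT_NORTH, DELTA_LON, DELTA_LAT and NUMBER_OF_GRID_X are hypothetical values chosen for illustration (a 0.01-degree grid with 100 columns), not the values defined in the training repository's GeoUtils, and fromCell is a hypothetical helper that inverts the numbering.

```java
// Minimal sketch of row-major grid numbering; all constants are hypothetical.
final class GridCellSketch {
    // Hypothetical bounding box corner and cell size in degrees, NOT taken from GeoUtils.
    static final float LON_WEST = -74.05f;
    static final float LAT_NORTH = 41.0f;
    static final double DELTA_LON = 0.01;
    static final double DELTA_LAT = 0.01;
    static final int NUMBER_OF_GRID_X = 100;

    // Same formula as GeoUtils.mapToGridCell: column counted from the western edge,
    // row counted from the northern edge, then combined into a row-major cell id.
    static int mapToGridCell(float lon, float lat) {
        int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON);
        int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return xIndex + (yIndex * NUMBER_OF_GRID_X);
    }

    // Hypothetical inverse: recover {x, y} from a row-major cell id.
    static int[] fromCell(int cellId) {
        return new int[] {cellId % NUMBER_OF_GRID_X, cellId / NUMBER_OF_GRID_X};
    }

    public static void main(String[] args) {
        int cell = mapToGridCell(-73.995f, 40.755f);
        int[] xy = fromCell(cell);
        System.out.println("cell=" + cell + " x=" + xy[0] + " y=" + xy[1]); // prints "cell=2405 x=5 y=24"
    }
}
```

Because every longitude in this area is negative, Math.abs(LON_WEST) - Math.abs(lon) equals lon - LON_WEST, the distance east of the western edge. With row-major numbering, moving one cell east adds 1 to the id and moving one cell south adds NUMBER_OF_GRID_X.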

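// From GeoUtils: converts a (lon, lat) point into a row-major grid cell id.
public static int mapToGridCell(float lon, float lat) {
    // Column index, counted eastward from the western edge (longitudes in this area are negative).
    int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON);
    // Row index, counted southward from the northern edge.
    int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);

    return xIndex + (yIndex * NUMBER_OF_GRID_X);
}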

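/**
 * EnrichedRide: a TaxiRide extended with the grid cells of its start and end points.
 */
public static class EnrichedRide extends TaxiRide {
    public int startCell;
    public int endCell;

    // Public no-argument constructor, required for Flink to treat this class as a POJO.
    public EnrichedRide() {
    }

    public EnrichedRide(TaxiRide ride) {
        this.rideId = ride.rideId;
        this.isStart = ride.isStart;
        this.startTime = ride.startTime;
        this.endTime = ride.endTime;
        // Also copy the coordinates so the enriched ride keeps the original locations.
        this.startLon = ride.startLon;
        this.startLat = ride.startLat;
        this.endLon = ride.endLon;
        this.endLat = ride.endLat;

        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
    }

    @Override
    public String toString() {
        return super.toString() + "," +
            Integer.toString(this.startCell) + "," +
            Integer.toString(this.endCell);
    }
}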

/**
 * One-to-one: map() turns each TaxiRide into exactly one EnrichedRide.
 */
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

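Because every event in the taxi ride data contains both a start location and an end location, each enriched ride naturally carries both a start grid cell and an end grid cell.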
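map() is one-to-one: every input ride produces exactly one enriched ride, which carries both a start cell and an end cell. The sketch below imitates the Enrichment function with java.util.stream rather than Flink, assuming a simplified model is enough to show the shape. Ride, Enriched and toyCell are hypothetical stand-ins for TaxiRide, EnrichedRide and GeoUtils.mapToGridCell, and the toy grid (0.01-degree cells, 100 columns, origin at 74.05°W, 41.0°N) is made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java imitation of the one-to-one Enrichment map (no Flink involved).
final class OneToOneSketch {
    // Hypothetical stand-in for TaxiRide: an id plus start and end coordinates.
    static final class Ride {
        final long rideId;
        final float startLon, startLat, endLon, endLat;

        Ride(long rideId, float startLon, float startLat, float endLon, float endLat) {
            this.rideId = rideId;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }
    }

    // Hypothetical stand-in for EnrichedRide: the ride id plus its two grid cells.
    static final class Enriched {
        final long rideId;
        final int startCell;
        final int endCell;

        Enriched(Ride ride) {
            this.rideId = ride.rideId;
            this.startCell = toyCell(ride.startLon, ride.startLat);
            this.endCell = toyCell(ride.endLon, ride.endLat);
        }
    }

    // Toy grid (hypothetical): 0.01-degree cells, 100 columns, origin at (-74.05, 41.0).
    static int toyCell(float lon, float lat) {
        int x = (int) Math.floor((lon + 74.05) / 0.01); // columns counted eastward
        int y = (int) Math.floor((41.0 - lat) / 0.01);  // rows counted southward
        return x + y * 100;
    }

    // map(): exactly one Enriched per input Ride, so input and output sizes always match.
    static List<Enriched> enrich(List<Ride> rides) {
        return rides.stream().map(Enriched::new).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
            new Ride(1L, -73.995f, 40.755f, -73.985f, 40.745f),
            new Ride(2L, -73.955f, 40.775f, -73.995f, 40.755f));
        for (Enriched e : enrich(rides)) {
            System.out.println("ride " + e.rideId + ": startCell=" + e.startCell + ", endCell=" + e.endCell);
        }
    }
}
```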

one-to-many flatMap()

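/**
 * One-to-many: filter, then map, inside a single flatMap().
 */
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        // This example is not ideal: each input yields either one output or none
        // (one-to-one or one-to-zero). The string split in WordCount is a more typical flatMap.
        FilterFunction<TaxiRide> valid = new NYCFilter();
        if (valid.filter(taxiRide)) {
            // Emit an enriched ride only if the ride passes the NYC filter; otherwise emit nothing.
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}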
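As the comment in NYCEnrichment notes, a more typical flatMap emits a variable number of records per input, as the line split in WordCount does. This plain-Java sketch shows that shape without a Flink dependency. The Out interface is a hypothetical stand-in for org.apache.flink.util.Collector, and splitWords and flatMapAll are illustrative names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of flatMap semantics (no Flink dependency).
final class FlatMapSketch {
    // Hypothetical stand-in for org.apache.flink.util.Collector.
    interface Out<T> {
        void collect(T record);
    }

    // WordCount-style flatMap: one line in, zero or more lowercase words out.
    static void splitWords(String line, Out<String> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) { // "".split(...) yields one empty token; skip it
                out.collect(word);
            }
        }
    }

    // Applies splitWords to every line and gathers everything it emits, in order.
    static List<String> flatMapAll(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            splitWords(line, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(flatMapAll(List.of("To be, or not", "", "Flink"))); // prints [to, be, or, not, flink]
    }
}
```

Here the empty line yields no words and each other line yields one or more, which is the zero-to-many behavior that flatMap allows and map does not.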

Applying the functions to the rides stream

// Filter only.
DataStream<TaxiRide> filteredRides = rides
    // filter out rides that do not start or stop in NYC
    .filter(new NYCFilter());

// Filter, then map (one-to-one enrichment).
DataStream<EnrichedRide> filteredAndMappedRides = rides
    // filter out rides that do not start or stop in NYC
    .filter(new NYCFilter())
    .map(new Enrichment());

// Filter and map combined in a single flatMap.
DataStream<EnrichedRide> filteredAndFlatMappedRides = rides
    .flatMap(new NYCEnrichment());
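The last two pipelines should produce the same enriched rides: filter().map() on one side, and on the other a flatMap() that emits a record only when the filter passes. The sketch below checks that equivalence with java.util.stream instead of a DataStream. Ride, inNyc, enrich and the longitude bounds are hypothetical simplifications (only longitudes are tested), not NYCFilter's actual logic.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Compares filter().map() with an equivalent flatMap() on plain Java streams.
final class FilterMapVsFlatMap {
    // Hypothetical longitude bounds, used only by this sketch.
    static final float MIN_LON = -74.05f;
    static final float MAX_LON = -73.7f;

    // Hypothetical ride: an id plus start and end longitudes.
    static final class Ride {
        final int id;
        final float startLon;
        final float endLon;

        Ride(int id, float startLon, float endLon) {
            this.id = id;
            this.startLon = startLon;
            this.endLon = endLon;
        }
    }

    // Stand-in for NYCFilter: both endpoints must lie inside the longitude band.
    static boolean inNyc(Ride r) {
        return r.startLon >= MIN_LON && r.startLon <= MAX_LON
            && r.endLon >= MIN_LON && r.endLon <= MAX_LON;
    }

    // Stand-in for the Enrichment map.
    static String enrich(Ride r) {
        return "ride-" + r.id;
    }

    static List<Ride> sampleRides() {
        return List.of(
            new Ride(1, -73.99f, -73.95f),  // inside
            new Ride(2, -73.99f, -72.0f),   // ends outside
            new Ride(3, -75.0f, -73.9f),    // starts outside
            new Ride(4, -73.8f, -73.75f));  // inside
    }

    // Like rides.filter(new NYCFilter()).map(new Enrichment()).
    static List<String> filterThenMap(List<Ride> rides) {
        return rides.stream()
            .filter(FilterMapVsFlatMap::inNyc)
            .map(FilterMapVsFlatMap::enrich)
            .collect(Collectors.toList());
    }

    // Like rides.flatMap(new NYCEnrichment()): emit one record or none per ride.
    static List<String> viaFlatMap(List<Ride> rides) {
        return rides.stream()
            .flatMap(r -> inNyc(r) ? Stream.of(enrich(r)) : Stream.<String>empty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Ride> rides = sampleRides();
        System.out.println(filterThenMap(rides)); // prints [ride-1, ride-4]
        System.out.println(viaFlatMap(rides));    // prints [ride-1, ride-4]
    }
}
```

Chaining filter and map keeps each function small and reusable; folding both into one flatMap trades that reuse for a single user function.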
邵志鹏