Flink学习-实验1-有状态的转换

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
flink-quickstart
flink-training-exercises

为什么Flink要管理状态?

 我们自己编写的Flink程序是可以自己来使用和管理状态,而不需要Flink参与其中,但是Flink为它管理的状态提供了一些引人注目的特性。

  1. 本地性:Flink状态保存在处理它的机器上面,并且可以以内存速度进行访问
  2. 可持续性: Flink状态能够自动进行检查点保存和自动恢复
  3. 垂直可伸缩:通过添加更多的本地磁盘,可以将Flink状态保存在嵌入式RocksDB实例中
  4. 水平可伸缩:Flink状态会随着集群的增长和收缩而重新分布
  5. 可查询:可以通过REST API查询Flink状态

Rich Functions 丰富的功能? 😄

 前面已经用过的几个函数接口FilterFunction, MapFunction, 和FlatMapFunction,这些是单一的抽象模板方法模式(接口)。
 Flink还提供加了Rich的所谓“富”的概念,RichFunction,由此扩展开来AbstractRichFunction,ProcessFunction、ProcessJoinFunction、ProcessWindowFunction、ProcessAllWindowFunction、KeyedProcessFunction、CoProcessFunction、BaseBroadcastProcessFunction、RichAsyncFunction、RichCoFlatMapFunction、RichCoMapFunction、RichParallelSourceFunction、RichSinkFunction、RichSourceFunction、RichWindowFunction、RichFlatMapFunction、RichMapFunction、RichFilterFunction…。

1
2
3
4
5
6
7
8
9
10
11
12
@Public
public interface RichFunction extends Function {
void open(Configuration var1) throws Exception;

void close() throws Exception;

RuntimeContext getRuntimeContext();

IterationRuntimeContext getIterationRuntimeContext();

void setRuntimeContext(RuntimeContext var1);
}
  1. open():在算子初始化期间调用open()一次。例如,加载一些静态数据或打开外部服务的连接等。
  2. getRuntimeContext():getRuntimeContext()提供了对「一整套潜在有趣内容」的访问,但最值得注意的是如何创建和访问由Flink管理的状态。

Keyed State的例子

 使用最简单的ValueState来举例。

  1. ValueState:对于每个key,Flink将存储一个对象——在本例中,是一个类型为MovingAverage的对象。
  2. 出于性能的考虑,Flink还提供了ListState和MapState。
  3. MovingAverage:滑动平均,或者理解为移动平均数。

 现在有一个传感器数据的DataStream,String是传感器ID,Double是传感器读到的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 19000, "\n");
DataStream<Tuple2<String, Double>> input = text.map(new MapFunction<String, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(String s) throws Exception {
// S001 2.0
// S001 5.0
// S001 9.0
// nc -l 19000
String[] record = s.split("\\W+");
return new Tuple2<>(record[0], Double.valueOf(record[1]));
}
});
DataStream<Tuple2<String, Double>> smoothed = input.keyBy(0).map(new Smoother());
smoothed.print();
/**
* 2> (S001,0.0)
* 2> (S001,0.0)
* 2> (S001,7.0)
*/

env.execute("MovingAverageKeyedStateExercise Job");

 实现一个有状态的转换RichMapFunction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
private ValueState<MovingAverage> averageState;

@Override
public void open(Configuration conf) {
ValueStateDescriptor<MovingAverage> descriptor =
new ValueStateDescriptor<>("moving average", MovingAverage.class);
averageState = getRuntimeContext().getState(descriptor);
}

@Override
public Tuple2<String, Double> map(Tuple2<String, Double> item) throws Exception {
// access the state for this key
MovingAverage average = averageState.value();

// create a new MovingAverage (with window size 2) if none exists for this key
if (average == null) {
average = new MovingAverage(2);
}

// add this event to the moving average
average.add(item.f1);
averageState.update(average);

// return the smoothed result
return new Tuple2(item.f0, average.getAverage());
}
}

 MovingAverage实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MovingAverage {
double sum;
int size;
LinkedList<Double> list;
double average;

/**
* 构造方法
*
* @param size
*/
public MovingAverage(int size) {
this.list = new LinkedList<>();
this.size = size;
}

public double add(double val) {
sum += val;
list.offer(val);
if (list.size() <= size) {
return sum / list.size();
}
sum -= list.poll();
return average = sum / size;
}

public double getAverage() {
return average;
}
}

清除状态

 如果数据是无界的,即持续不断增长的数据流,每个key的状态会存在某个地方且无限增长,而磁盘是有限的,且很多时候需要清除不再需要的状态。
 在ProcessFunction中使用Timer定时器来处理清除状态的操作。
 Flink1.6版本之后增加了TTL功能,即存活时间,过期自动删除。

1
averageState.clear()

Non-keyed状态

 算子状态。
 所涉及的接口有些不同,用户定义的函数通常不需要算子状态,不做讨论。

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%