Flink学习-实验4-State Backends

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
9. Flink学习-实验3-时间和水印
10. Flink学习-实验3-窗口
11. Flink学习-实验3-ProcessFunction
12. Flink学习-实验3-Side Outputs
flink-quickstart
flink-training-exercises

状态后端

  1. keyed state
  2. operator state

 由Flink管理的状态(keyed)是一种共享的键/值存储,并且key状态中的每一项工作副本都会保存在负责该key的任务管理器所在机器的本地存储中,算子在进行计算时会使用该状态。
 算子状态也是放在需要用到它的本地存储中。
 Flink会定期获取所有状态的持久快照,并将这些快照复制到更耐久的地方,例如分布式文件系统。

 如果Job发生故障,Flink可以恢复应用程序的完整状态并恢复处理,就好像没有出错一样。

 Flink管理的状态存储在状态后端。
 可以使用两种状态后端实现:

  1. 基于RocksDB,将其工作状态保持在磁盘上的嵌入式键/值存储。
  2. 基于堆的状态后端,它在Java堆上保持其工作状态在内存中。
    基于堆的状态后端有两种形式:将状态快照保存到分布式文件系统的FsStateBackend,以及使用JobManager堆的MemoryStateBackend。

image

 使用状态保持在基于堆的状态后端时,访问和更新涉及在堆上读取和写入对象。
 但是对于RocksDBStateBackend中保存的对象,访问和更新涉及序列化和反序列化,因此更加昂贵。
 但是使用RocksDB的状态数量仅受本地磁盘大小的限制。
 另请注意,只有RocksDBStateBackend能够执行增量快照,这对于具有大量缓慢变化状态的应用程序来说是一个重要的好处。

 两个状态后端都能够执行异步快照,这意味着它们可以在不妨碍正在进行的流处理的情况下生成快照。
 这是RocksDBStateBackend的默认行为,而基于堆的状态后端默认是同步的。

可用的状态后端

如上图:

  1. MemoryStateBackend
  2. FsStateBackend
  3. RocksDBStateBackend

 没有指定配置时,默认使用MemoryStateBackend。

MemoryStateBackend

 MemoryStateBackend在内部将数据保存为Java堆上的对象。
 键/值状态、保存着「存储着数据和触发器等哈希表」的窗口算子。

 在进行checkpoint时,此状态后端将对状态进行快照,并将其作为检查点确认消息的一部分发送到JobManager(主节点),JobManager也将其存储在其堆上。

 可以将MemoryStateBackend配置为使用异步快照。
 虽然我们强烈建议使用异步快照来避免阻塞管道,但请注意,默认情况下,此功能目前处于启用状态。
 要禁用此功能,用户可以将构造函数中相应的布尔标志设置为false来实例化MemoryStateBackend(这应仅用于调试)。

1
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

MemoryStateBackend的局限性:

  1. 默认情况下,每个状态的大小限制为5MB。可以在MemoryStateBackend的构造函数中增加此值。
  2. 无论配置的最大状态大小如何,状态都不能大于akka帧大小(请参阅配置)。
  3. 聚合状态必须与JobManager内存相适应。

推荐MemoryStateBackend用于以下情况:

  1. 本地开发和调试。
  2. 几乎没有状态的作业,例如仅包含一次记录功能的作业(Map,FlatMap,Filter,…)。 Kafka消费者端需要很少的状态。

FsStateBackend

 FsStateBackend配置有文件系统URL(类型,地址,路径),例如”hdfs://namenode:8020/flink/checkpoints”或”file:///data/flink/checkpoints”。
 FsStateBackend将正在运行的数据保存在TaskManager的内存中。
 在检查点时,它将状态快照写入配置的文件系统和目录中的文件。
 最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

 FsStateBackend默认使用异步快照,以避免在编写状态检查点时阻塞处理管道。
 要禁用此功能,用户可以将构造函数中相应的布尔标志设置为false来实例化FsStateBackend,例如:

1
new FsStateBackend(path, false);

FsStateBackend使用场景:

  1. 具有大状态,长窗口,大键/值状态的作业。
  2. 所有高可用性设置。

RocksDBStateBackend

 RocksDBStateBackend配置有文件系统URL(类型,地址,路径),例如”hdfs://namenode:8020/flink/checkpoints”或”file:///data/flink/checkpoints”。
 RocksDBStateBackend在RocksDB数据库中保存正在运行的数据,该数据库(默认情况下)存储在TaskManager数据目录中。
 在检查点时,整个RocksDB数据库将被检查点到配置的文件系统和目录中。
 最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

 RocksDBStateBackend始终执行异步快照。

RocksDBStateBackend的局限性:

  1. 由于RocksDB的JNI网桥API基于byte数组,因此每个key和每个值支持的最大大小为2^31个字节。
    重要信息:在RocksDB中使用合并操作的状态(例如ListState)可以静默地累积>2^31字节的值大小,然后在下次检索时失败。这是目前RocksDB JNI的一个限制。

RocksDBStateBackend使用场景:

  1. 具有非常大的状态,长窗口,大键/值状态的作业。
  2. 所有高可用性设置。

 我们可以保留的状态量仅受可用磁盘空间量的限制。
 与将状态保持在内存中的FsStateBackend相比,这允许保持非常大的状态。
 但是,这也意味着使用此状态后端可以实现的最大吞吐量更低。
 对此后端的所有读/写都必须通过「序列化和反序列化」来检索和存储状态对象,这比使用基于堆的状态后端更昂贵。

 RocksDBStateBackend是目前唯一提供增量检查点的后端。

配置State Backend

 如果我们什么都不指定,则默认状态后端是作业管理器。
 如果要为集群上的所有作业建立不同的默认值,可以通过在flink-conf.yaml中定义新的默认状态后端来实现。
 可以基于每个作业覆盖默认状态后端,如下所示。
 每个作业状态后端在作业的StreamExecutionEnvironment上设置,如下例所示:

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend)new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

 如果使用RocksDBStateBackend,增加如下依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.8.0</version>
</dependency>

设置默认的State Backend

 flink-conf.yaml
 state.backend

1
2
3
4
5
6
7
8
# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%