有状态的计算

流式计算分为无状态和有状态两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。
有状态的计算则会基于多个事件输出结果。
《Flink基础教程》。

一致性

 当在分布式系统中引入状态时,自然也引入了一致性问题。
 一致性实际上是”正确性级别”的另一种说法,即在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者有多正确?

 在流处理中,一致性分为3个级别:

  1. at-most-once:这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
  2. at-least-once:这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  3. exactly-once:这指的是系统保证在发生故障后得到的计数结果与正确值一致。

 曾经,at-least-once非常流行。
 第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:

  1. 保证 exactly-once 的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。
  2. 流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补 (例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

 最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。

 Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。

image

检查点:保证exactly-once

 一种被称为“检查点”的特性,在 出现故障时将系统重置回正确状态。
 一种标记,或者记号。

 检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能。
 Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。

1
2
3
4
5
6
7
8
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(record => record._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )
case None => ( (in._1, in._2), Some(in._2) )
})

 该程序有两个算子:keyBy算子用来将记录按照第一个元素(一个字符串)进行分组,根据该key将数据进行重新分区,然后将记录再发送给下一个算子:有状态的map算子(mapWithState)。
 map算子在接收到每个元素后,将输入记录的第二个字段的数据加到现有总数中,再将更新过的元素发射出去。
 如程序的初始状态:输入流中的6条记录被检查点屏障(checkpoint barrier)隔开,所有的map算子状态均为0(计数还未开始)。
 所有key为a的记录将被顶层的map算子处理,所有key为b的记录将被中间层的map算子处理,所有key为c的记录则将被底层的map算子处理。

image

 检查点该如何保证exactly-once呢?
 检查点屏障和普通记录类似。
 它们由算子处理,但并不参与计算,而是会触发与检查点相关的行为。
 当读取输入流的数据源(与keyBy算子内联)遇到检查点屏障时,它将其在输入流中的位置保存到稳定存储中。
 如果输入流来自消息传输系统(Kafka或MapRStreams),这个位置就是偏移量。
 Flink的存储机制是插件化的,稳定存储可以是分布式文件系统,如HDFS、S3或MapR-FS。

image

 检查点屏障像普通记录一样在算子之间流动。
 当map算子处理完前3条记录并收到检查点屏障时,它们会将状态以异步的方式写入稳定存储。

image

 当map算子的状态备份和检查点屏障的位置备份被确认之后,该检查点操作就可以被标记为完成。
 我们在无须停止或者阻断计算的条件下,在一个逻辑时间点(对应检查点屏障在输入流中的位置)为计算状态拍了快照。
 通过确保备份的状态和位置指向同一个逻辑时间点,从而保证exactly-once。
 当没有出现故障时,Flink检查点的开销极小,检查点操作的速度由稳定存储的可用带宽决定。

image

 检查点操作完成,状态和位置均已备份到稳定存储中。
 输入流中的所有记录都已处理完成。值得注意的是,备份的状态值与实际的状态值是不同的。备份反映的是检查点的状态。
 如果检查点操作失败,Flink会丢弃该检查点并继续正常执行,因为之后的某一个检查点可能会成功。
 虽然恢复时间可能更长,但是对于状态的保证依旧很有力。
 只有在一系列连续的检查点操作失败之后,Flink才会抛出错误,因为这通常预示着发生了严重且持久的错误。

image

 故障紧跟检查点,导致最底部的实例丢失。
 在这种情况下,Flink会重新拓扑(可能会获取新的执行资源),将输入流倒回到上一个检查点,然后恢复状态值并从该处开始继续计算。
 [“a”,2]、[“a”,2] 和 [“c”,2] 这几条记录将被重播。

image

 Flink将输入流倒回到上一个检查点屏障的位置,同时恢复map算子的状态值。
 然后,Flink从此处开始重新处理。这样做保证了在记录被处理之后,map算子的状态值与没有发生故障时的一致。
 Flink检查点算法的正式名称是异步屏障快照(asynchronous barrier snapshotting)。
 该算法大致基于Chandy-Lamport分布式快照算法。

保存点:状态版本控制

 检查点由Flink自动生成,用来在故障发生时重新处理记录,从而修正状态。
 Flink用户还可以通过另一个特性有意识地管理状态版本,这个特性叫作保存点(savepoint)。
 保存点与检查点的工作方式完全相同,只不过它由用户通过Flink命令行工具或者Web控制台手动触发,而不由Flink自动触发。
 和检查点一样,保存点也被保存在稳定存储中。
 用户可以从保存点重启作业,而不用从头开始。
 保存点可以被视为作业在某一个特定时间点的快照(该时间点即为保存点被触发的时间点)。
 对保存点的另一种理解是,它在明确的时间点保存应用程序状态的版本。
 这和用版本控制系统保存应用程序的版本很相似。
 最简单的例子是在不修改应用程序代码的情况下,每隔固定的时间拍快照,即照原样保存应用程序状态的版本。

image

 分别在t1时刻和t2时刻触发了保存点。
 因此,可以在任何时候返回到这两个时间点,并且重启程序。
 更重要的是,可以从保存点启动被修改过的程序版本。
 举例来说,可以修改应用程序的代码(假设称新版本为v.1),然后从t1时刻开始运行改动过的代码。
 这样一来,v.0和v.1 这两个版本同时运行,并在之后的时间里获取各自的保存点。

image

 使用保存点更新Flink应用程序的版本。
 新版本可以从旧版本生成的一个保存点处开始执行。
 保存点可用于应对流处理作业在生产环境中遇到的许多挑战:

  1. 应用程序代码升级:假设你在已经处于运行状态的应用程序中发现了一个 bug,并且希望之后的事件都可以用修复后的新版本来处理。通过触发保存点并从该保存点处运行新版本,下游的应用程序并不会察觉到不同(当然,被更新的部分除外)。
  2. Flink版本更新:Flink自身的更新也变得简单,因为可以针对正在运行 的任务触发保存点,并从保存点处用新版本的Flink重启任务。
  3. 维护和迁移:使用保存点,可以轻松地“暂停和恢复”应用程序。这对于集群维护以及向新集群迁移的作业来说尤其有用。此外,它还有利于开发、测试和调试,因为不需要重播整个事件流。
  4. 假设模拟与恢复:在可控的点上运行其他的应用逻辑,以模拟假设的场景,这样做在很多时候非常有用。
  5. A/B测试:从同一个保存点开始,并行地运行应用程序的两个版本,有助于进行A/B测试。

 Flink内部的检查点机制以保存点的形式呈现给用户,用来应对上述挑战。
 这反映了Flink检查点本质上是一个可持续升级状态版本的可编程机制,这一点很像具有多版本并发控制的数据库系统。

端到端的一致性和作为数据库的流处理器

image

 输入数据来自一个分区存储系统(如Kafka或者MapRStreams这样的消息队列)。
 source读取输入数据,根据key分区,并将数据路由到有状态的算子实例。
 有状态的算子将状态内容(比如前例中的计数结果)或者一些衍生结果写入sink,再由 sink将结果传送到输出存储系统中(例如文件系统或数据库)。
 接着,查询服务(比如数据库查询API)就可以允许用户对状态进行查询(最简单的例子就是查询计数结果),因为状态已经被写入输出存储系统了。

 在将状态内容传送到输出存储系统的过程中,如何保证exactly-once呢?这叫作端到端的一致性。
 本质上有两种实现方法,用哪一种方法则取决于输出存储系统的类型,以及应用程序的需求。

  1. 在sink环节缓冲所有输出,并在sink收到检查点记录时,将输出”原子提交”到存储系统。这种方法保证输出存储系统中只存在有一致性保障的结果,并且不会出现重复的数据。从本质上说,输出存储系统会参与Flink的检查点操作。要做到这一点,输出存储系统需要具备”原子提交”的能力。
  2. 急切地将数据写入输出存储系统,同时牢记这些数据可能是”脏”的,而且需要在发生故障时重新处理。如果发生故障,就需要将输出、输入和Flink作业全部回滚,从而将”脏”数据覆盖,并将已经写入输出的”脏”数据删除。注意,在很多情况下,其实并没有发生删除操作。例如,如果新记录只是覆盖旧纪录(而不是添加到输出中),那么
    “脏”数据只在检查点之间短暂存在,并且最终会被修正过的新数据覆盖。

 这两种方法恰好对应关系数据库系统中的两种为人所熟知的事务隔离级别:已提交读(read committed)和未提交读(read uncommitted)。
 已提交读保证所有读取(查询输出)都只读取已提交的数据,而不会读取中间、传输中或”脏”的数据。
 之后的读取可能会返回不同的结果,因为数据可能已被改变。
 未提交读则允许读取”脏”数据;换句话说,查询总是看到被处理过的最新版本的数据。
 之所以本例需要有输出存储系统,是因为外部无法访问Flink的内部状态,所以输出存储系统成了查询目标。
 但是,如果可以直接查询状态,则在某些情况下根本就不需要输出存储系统,因为状态本身就已经包含了查询所需的信息。
 这种情况在许多应用程序中真实存在,直接查询状态可以大大地简化架构,同时大幅提升性能。

image

 Flink提供一个查询API,通过该API可以对Flink发出查询请求,然后得到当前的状态值。
 从某种意义上说,在有限的情景下,Flink可以替代数据库,并同时提供写路径(输入流不断更新状态)和读路径(可查询状态)。尽管这对于许多应用程序都行得通,但可查询状态受到的限制还是比通用数据库大得多。

Flink的性能

Yahoo! Streaming Benchmark

image
image

  1. 使用Flink状态
  2. 改进数据生成器并增加吞吐量
  3. 消除网络瓶颈
  4. 使用MapR Streams
  5. 增加key基数

 总之…性能优异【并不代表能够直接用于生产】

【完】

GitHub flink-training-exercises
GitHub sql-training
本文知识来源于书本,仅供快速浏览学习,勿盲目转载
GitHub flink-training-course/课程表(持续更新)

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%