Streaming Cassandra在WePay的应用

原作者:Joy Gao
翻译:知了小巷

WePay通过几乎没有纠纷和欺诈的「集成支付」帮助各平台增加收入。
WePay能够无缝地让平台上的用户(付款人和商家,买家和卖家,捐赠者和组织者等)之间完成付款。
WePay专为软件平台独特的在线、POS和全渠道需求而构建。
go.wepay.com
原文地址:https://wecode.wepay.com/posts/streaming-cassandra-at-wepay-part-1
https://vitess.io
cassandra_hook.py
operators/cassandra_to_gcs.py
change data capture (CDC) feature
How is data read?
Cassandra:节点间的信息交换使用了gossip协议,因此所有节点都可以快速了解集群中的所有其他节点。

引入Cassandra

 一直以来,MySQL都是WePay微服务事实上的首选数据库。随着WePay的业务增长,写入微服务数据库的数据量越积越多,大量的数据要求我们必须在对MySQL数据库进行分片(如使用vitess](https://vitess.io))和切换到具有本地分片机制的NoSQL数据库之间做出权衡。经过一系列的调研评估,我们选择了NoSQL数据库Cassandra,主要是因为它具有高可用性、水平可扩展性以及处理高写入吞吐量的能力。

批量ETL

 在将Cassandra引入我们的基础架构之后,我们的下一个挑战是找出一种方法,将Cassandra中的数据暴露给我们的数据仓库BigQuery,用于分析和报表输出。为此,我们很快在apache airflow上建立了一个cassandra_hook和cassandra_to_gcs算子来执行全部数据的加载任务。由于它在每次加载数据时都要重写整个数据库,这样显然不能扩展。为了扩展整个流程管道,我们评估了两种增量加载的方法,但两者都有它们的缺点:

  1. 范围查询
    【Clustering Key: 主要用于进行Range Query,并且使用的时候需要按照建表顺序进行提供信息,可以理解为索引键,是Primary Key的一部分,另一部分是Partition Key。】
    这是一种常见的ETL方法,其中数据是通过定期的范围查询(例如每小时或每天)提取的。任何熟悉Cassandra数据建模的人都会很快意识到这种方法是多么不切实际。需要对Cassandra表进行建模,以优化生产中使用的查询模式。在大多数情况下,添加这个用于分析的查询模式意味着要克隆大量不同的Clustering Key。关系数据库管理人员可能建议使用辅助索引(二级索引)来支持这种查询模式,但是Cassandra中的辅助索引是本地化的,因此这种方法本身会带来性能和伸缩性问题。
  2. 处理未合并的SSTables
    SSTables是Cassandra的不可变存储文件。Cassandra提供了一个sstabledump CLI命令,该命令将SSTable内容转换为人类可读的JSON格式的数据。然而,Cassandra构建在LSM树的概念之上,这意味着SSTables定期合并到新的压缩文件中。根据压缩策略,检测到还未合并的SSTable文件可能很有挑战性(稍后我们会了解Cassandra中的增量备份功能,它只备份未压缩的SSTable文件;因此,这种方法也会奏效。)

 考虑到上述这些挑战,而且已经为MySQL构建过并运行了一个流数据管道,我们开始为Cassandra探索流选项。

流处理

双写

image

 往Cassandra写数据的同时将数据发布给Kafka。这种双重写入,可以通过内置触发器或对客户端API进行自行封装来实现,不过这种方法存在性能问题。首先,由于我们现在需要对两个系统而不是一个系统进行写操作,因此会增加写延迟。更重要的是,当对一个系统的写操作由于超时而失败时,写操作是否成功是不确定的。为了保证两个系统上的数据一致性,我们必须实现分布式事务,但是为了达成共识而进行的多次往返会增加延迟并进一步降低吞吐量。这违背了高写入吞吐量数据库的目的。

将Kafka作为事件源

image

 直接将数据写入Kafka,而不是直接往Cassandra里面写;然后从Kafka中消费数据写入Cassandra。目前在业界,事件作为数据源是一种非常流行的方法。但是,如果您已经有直接将数据写入Cassandra的现有服务,则需要更改应用程序代码并进行非常重要的迁移。这种方法还违反了读写一致性:如果进程执行写操作,那么执行后续读操作的相同进程必须观察写操作的效果。由于写是通过Kafka路由的,所以在发出写和应用写之间会有一个延迟;在此期间,将导致从Cassandra读取到陈旧的数据。这可能会导致无法预料的生产问题。

解析Commit Logs

image

 Cassandra在3.0版本中引入了一个对有更新的数据进行捕获(CDC)的特性来公开其Commit Logs「类似MySQL binlog」。Commit Logs是Cassandra中的预写日志,用于在机器崩溃时提供数据持久化机制。在未启用CDC的情况下,Commit Logs通常在刷新时被丢弃。启用CDC后,它们在刷新时被转移到本地CDC目录下,然后Cassandra节点上的其他进程可以读取该目录。这允许我们使用与MySQL流管道相同的CDC机制。它使得生产操作与数据分析进行解耦,因此不需要应用程序工程师做额外的工作。

 最终,在考虑了吞吐量、数据一致性和关注点分离之后,最后一个选项——解析Commit Logs——成为了最有力的竞争者。

深入Commit Log

 除了公开Commit Log之外,Cassandra还提供了CommitLogReader和CommitLogReadHandler类来帮助我们对Commit Log进行反序列化。这样,似乎已经完成了艰苦的工作,剩下的是应用转换——将反序列化后的数据转换为Avro记录并将它们发布到Kafka。然而,随着我们进一步深入CDC特性和Cassandra本身的实现,我们意识到有许多新的挑战。

延迟处理

 Commit Log仅在其已写满时才到达CDC目录,在这种情况下,Commit Log将被刷新或者丢弃。这意味着在记录事件和捕获事件之间存在延迟。如果执行的写入很少或没有写操作,则事件捕获的延迟可以是任意长的。

空间管理

 在MySQL中,您可以设置binlog的保留时限,以便在配置的保留期后自动删除日志。然而在Cassandra中没有这样的选择。将Commit Log传输到CDC目录后,必须通过消费来清理处理后的Commit Log。如果CDC目录的可用磁盘空间超过给定阈值,则将拒绝对数据库的进一步写入。

重复的事件

 单个Cassandra节点上的Commit Log并不包含对整个集群的所有写操作;它们只包含对当前节点的写操作。这就需要处理所有节点上的Commit Log。但由于Cassandra的高可用副本机制,如果复制因子为N,则每个事件的N个副本将被发送到下游。

无序的事件

 对单个Cassandra节点的写入在到达Commit Log时会被连续记录。然而,这些事件在向外部发出时可能会出现乱序。这些事件的下游消费者必须了解事件时间,并实现与Cassandra的读取路径类似的last write wins(最后写入为准)逻辑,以获得正确的结果。

表结构发生改变

 表(模式)结构的更改通过Gossip协议进行通信,不会记录在Commit Log中。因此,只能尽最大努力去检测模式的变化。

不完整的数据行

 Cassandra不会执行写前读(read before write),因此更改事件不会捕获每个列的状态,它们只捕获修改列的状态。【意思就是更新一个字段,更新事件只有这个字段的数据-可能还有主键】这使得部分列的变更事件没有什么用了,只有一整行的数据都变更才有意义。

 一旦我们对Cassandra的Commit Log有了深入的了解,我们就可以根据给定的条件约束重新评估我们的需求,以设计最小可行的基础设施。

最小可行的基础设施

 借鉴「最小可行产品」的理念,我们想设计一个数据管道,用最小的功能和需求集来满足我们的紧急客户。对于Cassandra的CDC来说,这意味着:

  1. 引入CDC不能对生产数据库的健康和性能产生负面影响;生产库运行缓慢和系统停机时间要比分析管道中的数据延迟要昂贵得多。
  2. 查询数据仓库中的Cassandra表应该与查询生产数据库的结果相匹配(排除延迟);数据重复或不完整的行会增加每个最终用户的后续处理工作量。

有了这些标准,我们开始集思广益寻找解决方案,最终提出了三种方法:

无状态的流处理

 这个解决方案的灵感来自Datastax的一篇描述「高级复制」的博客文章。想法是在每个Cassandra节点上部署代理用来处理本地的Commit Log。对于基于分区键的写子集,每个代理都被认为是”主代理”,这样每个事件都只有一个主代理。然后在CDC期间,为了避免重复的事件,每个代理只向Kafka发送一个事件(如果它是事件的主代理)。为了处理最终的一致性,每个代理都会在事件到达时将事件分类到每个表的时间切片窗口中(但不会立即发布它们);当一个窗口过期时,将对该窗口中的事件进行hash计算,并将hash值与其他节点进行比较。如果它们不匹配,则从不一致的节点获取数据,以便通过last write wins(最后写入为准)来解析正确的值。最后,该窗口中更正后的事件将被发送到Kafka。任何超出时间切片窗口的无序事件都必须记录到一个无序文件中,并单独处理。由于重复数据删除和排序是在内存中完成的,所以担心代理故障转移会导致数据丢失、OOM问题会影响生产数据库,以及这个实现的总体复杂性使我们无法进一步探究它。

有状态的流处理

 这个解决方案是功能最丰富的。其思想是,每个Cassandra节点上的代理将处理Commit Log并将事件发布到Kafka,而无需进行去重和排序。然后,流处理引擎将使用这些原始事件并执行繁重的工作(如使用缓存过滤掉重复事件、使用事件时间窗口管理事件顺序,以及通过在写入状态存储之前执行读取来捕获未修改列的状态-即合并为完整一行数据),然后将这些派生事件发布到单独的Kafka主题。最后,使用KCBQ把单独Kafka主题里的事件同步到BigQuery。这种方法很有吸引力,因为它通常解决了这个问题——任何人都可以订阅后一个Kafka主题,而不需要自己再处理一遍去重和排序(即实时ETL)。然而,这种方法引入了大量的操作开销;我们必须维护一个流处理引擎、一个数据库和一个缓存。

读时处理

 与前面方法类似,其思想是在每个Cassandra节点上处理Commit Log,并将事件发送给Kafka,而无需去重和排序。不同的是,流处理部分被完全消除。相反,原始事件将通过KCBQ直接同步到BigQuery。在BigQuery原始表之上创建视图用来处理列的去重、排序和合并以形成完整的行。因为BigQuery视图是虚拟表,所以每次查询视图时处理都是延迟的。为了防止视图查询变得过于昂贵,视图将定期转为实体表。通过利用BigQuery的大规模并行查询引擎,这种方法消除了计算复杂性和代码复杂性。然而,缺点是非KCBQ下游消费者必须自己完成所有工作。

考虑到流式传输Cassandra的主要目的是数据仓库,我们最终决定实现读时处理。它为我们现有的用例提供了基本的功能,并且可以灵活地扩展到未来提到的其他两个更通用的解决方案。

总结

 在为Cassandra构建实时数据管道的过程中,我们收到了大量关于这个项目感兴趣的内容。因此,我们决定在Debezium体系下开放Cassandra CDC代理的源代码,作为孵化连接器。如果您想了解更多信息或作出贡献,请查看正在进行的pull request,以获得源代码和文档。

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%