文章实时采集(一个个人总结性的原则采集中的问题以及相应的含义)

优采云 发布时间: 2021-09-05 06:01

  文章实时采集(一个个人总结性的原则采集中的问题以及相应的含义)

  上一篇

  今天不小心刷新了技术公众号,看到了这样的一篇文章文章,是基于Flink关于Mysql Binlog数据采集的提议。看了实际的方法和具体的操作,有些考虑还不够。 ,缺少一些处理实际情况的操作。作者之前也做过一些类似采集tools的实践文章,但是没有做一个整体的系统总结,所以想知道是否可以做一个个人总结文章,把问题放到Binlog采集以及相应的解决方案也总结了?

  可能很多人对 Binlog 还不够了解。可能有些人表面上会想:“不是mysql生成的,有固定结构的日志,把数据采集拿过来做,数据登陆有什么难度?”

  的确,它本质上确实是一个日志,但实际上,关于Binlog采集从场景分析到技术选择,整体里面还有很多不为人知的坑,所以不要小看它。

  作者写这篇文章,目的是为了展示Binlog数据采集在实际工作中开发过程的原理、注意事项以及可能出现的问题,也会有作者的一些个人总结。数据采集中的原理,供大家参考,干货。

  那么让我们开始吧!

  个人总结原则

  首先抛开技术框架的讨论,亲自总结一下Binlog日志的数据采集主要原理:

  分别说明这三个原则的具体含义

  原则一

  在data采集中,数据登陆一般采用时间分区进行登陆,所以我们需要确定一个固定的时间戳作为时间分区的基本时间序列。

  这种情况下,好像是业务数据上的timestamp字段,无论是从实际开发中获取这个timestamp的角度,还是实际表中会有这样的timestamp,都是不可能的表完全满足。

  举个反例:

  表:商业时间戳

  表 A:create_time、update_time

  表 B:create_time

  表 C:create_at

  表 D:无

  这样的情况,理论上可以通过限制RD和DBA在设计表时对表结构的正则化来统一限制时间戳和命名,但是在实际工作中,这种情况基本是不可能的,并且相信很多读者都会遇到这种情况。

  可能很多做data采集的同学会想,能不能请他们制定标准?

  我个人的看法是,是的,但是底层的大数据采集不能完全依赖这种相互开发的标准。有以下三个原因:

  所以如果要使用唯一的固定时间序列,就必须将其与业务数据分开。我们想要的时间戳不受业务数据变化的影响。

  原则二

  在业务数据库中,肯定存在表结构变化的问题。大多数情况下是添加列,但也有列重命名、列删除等情况,字段变化的顺序不可控。

  这个原则想描述的是,导入数据仓库的表必须适应数据库表的各种操作,以保持其可用性和列数据的正确性。

  原则三

  这个数据可以追溯,包括两个方面

  第一个说明是在采集binlog采集端,可以再按一次采集binlog位置。

  第二种描述是消费binlog登陆结束时,可以通过重复消费重新登陆数据。

  这是作者的个人总结,无论选择什么样的技术选择组合构建,都需要具备这些原则。

  实现计划及具体操作

  技术架构:Debezium + Confluent + Kafka + OSS/S3 + Hive

  基于原则一的解决方案

  Debezium 提供了 New Record State Extraction 的配置选项,相当于提供了一个转换算子来提取 binlog 中的元数据。

  对于0.10版本的配置,可以提取表、版本、连接器、名称、ts_ms、db、server_id、file、pos、row等binlog元数据信息

  其中ts_ms是binlog日志的生成时间,这是binlog元数据,可以应用于所有数据表,可以利用这个固定的时间戳来完全实现我们的原则一。

  关于Debezium,不同版本之前的配置参数可能不同。读者如需练习,需在官方文档中确认对应版本的配置参数。

  对于其他框架,比如市面上比较常用的Canal,或者读者如果需要自己开发数据采集program,建议提取binlog的元数据。本流程及后续流程可能会用到。

  基于原理二的解决方案

  对于 Hive,目前主流的数据存储格式有 Parquet、ORC、Json、Avro。

  暂且不谈数据存储的效率。

  对于前两种数据格式,列存储,也就是说这两种数据格式的数据读取会严格依赖我们数据表中数据存储的顺序,这种数据格式不能满足数据列的灵活添加、删除等操作。

  Avro 格式是行存储,但需要依赖 Schema Register 服务。考虑到Hive的数据表读取完全依赖外部服务,风险太大。

  最后确定使用Json格式进行数据存储。虽然这种读取和存储效率不如其他格式高,但可以保证业务数据的任何变化都可以在hive中读取。

  Debezium组件采集binlog的数据为json格式,符合预期的设计方案,可以解决原理2带来的问题。

  对于其他框架,比如市面上比较常用的Canal,可以设置成Json数据格式进行传输,或者读者如果需要自己开发数据采集程序,同样适用。

  基于原则三的解决方案

  在采集binlog采集侧,可以再次按下位置采集binlog。

  Debezium 官方网站上也给出了这个方案的实现。也给出了相应的解决方案。大致描述一下,需要Kafkacat工具。

  对于采集的每个mysql实例,在创建数据采集任务时,Confluent会相应地创建连接器的采集元数据的topic(即采集program),

  对应的时间戳、文件位置、位置会存储在

  。您可以通过修改此数据来重置采集binlog 日志的位置。

  值得注意的是,本次操作的时间节点也是有限制的,这与mysql的binlog日志的存储周期有关,所以用这种方式回溯时,需要确认mysql日志还存在。

  重新登陆数据重复消费。

  这个方案是基于Kafka的,对于Kafka重新设计消费抵消消费网站的操作,网上有很多方案,这里不再赘述。

  为读者自己实现,选择的需要确认的MQ支持此功能。

  #how_to_change_the_offsets_of_the_source_database

  业务场景影响下的重要运营

  本节只介绍如何在作者的技术框架下实现以下操作。读者可以根据自己选择的技术组件探索不同的技术解决方案。

  数据库分库分表情况

  基于Debezium的架构,采集一个Source只能对应一个mysql实例。对于同一个实例的表拆分,可以使用Debezium Topic Routing功能。

  采集过滤binlog时,将需要采集的对应表按照正则匹配写入指定主题。

  子库的情况下,还需要在sink端添加RegexRouter变换算子,进行topic之间的合并和写入操作。

  数据增量采集和全额采集

  对于采集组件,当前配置默认是增量,所以无论选择Debezium还是Canal,正常配置都可以。

  但有时会出现需要采集full table 的情况。作者还给出了全数据的方案采集。

  方案一

  Debezium 本身自带这样的功能,需要改一下

  snapshot.mode的参数选择设置为when_needed,这样就可以完成表的所有采集操作。

  官方文档中,这里的参数配置有更详细的说明。

  #快照

  计划二

  同时使用sqoop和增量采集。

  此方案适用于表数据较多,但目前binlog数据频率不高的情况,使用此方案。

  值得注意的有两点:

  离线重复数据删除条件

  数据落地后,通过json表映射出原来的binlog数据,那么问题来了,我们如何找到最新的那条数据?

  也许我们可以简单地认为使用我们刚刚提取的ts_ms然后进行反演还不够?

  在大多数情况下,这确实是可能的。

  但是在实际开发中,笔者发现这样的情况并不能满足所有的情况,因为在binlog中,可能真的有两个数据和ts_ms和PK一样,但确实不同。

  那我们如何同时求解两条数据呢?

  答案就在上面,我们只是建议提取所有 binlog 元数据。

  SELECT *

FROM

(

SELECT *,

row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by

FROM test t

WHERE dt='{pt}'

AND hour='{now_hour}'

) t1

WHERE t1.order_by = 1

  说明本sql中row_number的条件

  __ts_ms:binlog中的ts_ms,即事件时间。

  __file:是binlog数据的文件名。

  __pos:是binlog中数据所在文件在文件中的位置,为数据类型。

  这样组合条件取出的数据是最新的。

  有的读者可能会问,如果这条数据被删除了怎么办?你这样检索的数据是不是错了?

  这个Debezium也有相应的操作,有相应的配置选项供你选择如何处理删除行为的binlog数据。

  作为给大家参考,作者选择了rewrite的参数配置,这样在上面sql最外层,只需要判断“delete = 'false'”是正确的数据即可。

  架构总结

  在技术选择和整体细节结构上,作者始终坚持一个原则——

  过程应该尽可能简单,而不是越简单越好。数据链路越长,可能出现问题的链路就越多。后期锁定问题和运维也会很困难。

  所以笔者在技术选型上考虑了Flink + Kafka的方式,但基于当时的情况,笔者没有选择这样的技术选型,笔者也会说明原因。

  总结一下,我当时就想到了 Flink。如果 Flink 不是基于平台的用于开发和运维监控,它可以作为一个临时解决方案。人类开发下很容易出问题,或者大家在这样的程序框架下造轮子,造的越多越慢。而且后期的主要项目方向并没有把Flink平台化提上日程,所以也是考虑到了部分未来情况的选择。

  所以当我最终决定技术选择的时候,我没有选择 Flink。

  结论

  文章的这篇文章比较理论,也是这个场景的技术理论总结。如果文章中还有其他不清楚的操作,可以参考作者之前的文章详细代码级操作。

  技术架构方案有很多种。我只是选择了其中之一来实施。也希望大家有其他的技术方案或理论交流。请纠正我。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线