解决方案:基于Binlog实时同步数据仓库问题总结

优采云 发布时间: 2022-11-25 01:19

  解决方案:基于Binlog实时同步数据仓库问题总结

  前言

  今天刷公众号的时候,看到了这篇文章:基于Binlog和Flink的实时同步数据仓库实践。主要解释是基于Flink对Mysql Binlog数据采集的规划。看了实用的方法和具体的代码操作,感觉有些情况考虑的还不够。作者之前对采集工具的实践也做过一些类似的总结,但是没有从整体上做一个系统的总结,所以我在想,能不能做一个个人的总结文章,总结一下Binlog采集中存在的问题以及相应的解决方法也总结了?

  很多人可能对Binlog还不够了解,有些人可能肤浅地想:“不就是mysql产生的吗,它有固定结构的日志,采集

数据,然后做成数据落地,它有什么这么困难?”

  的确,本质上确实是日志,但其实Binlog采集从场景分析到技术选型都有很多不为人知的坑,大家不要小看它。

  作者写这篇文章的目的是展示Binlog数据采集开发过程在实际工作中的原理、注意事项以及可能出现的问题。也会有一些笔者自己在资料采集

中的原则个人总结,供大家参考,都是干货。

  让我们开始吧!

  一、Binlog实时采集与汇总原理

  首先抛开技术框架的讨论,我个人总结一下Binlog日志数据采集的主要原则:

  解释这三个原则的具体含义

  原则一

  在数据采集中,数据落地一般采用时间分区的方式进行落地,因此我们需要确定一个固定的时间戳作为时间分区的基本时间序列。

  这种情况下,业务数据上的时间戳字段,无论从实际开发中获取这个时间戳,还是在实际表中都会有这样的时间戳来看,不可能所有的表都完全满足。

  举个反例:

  表:业务时间戳(或事件时间)

  表 A:创建时间、更新时间

  表 B:创建时间

  表 C:create_at

  表D:无

  像这样的情况,理论上在设计表的时候可以通过限制RD和DBA规范表结构来统一和限制时间戳和命名,但是在实际工作中,这样的情况基本是不可能的。是的,相信很多读者都会遇到这种情况。

  很多做数据采集的同学可能会想,能不能请他们来制定标准呢?

  我个人的看法是可以,但是大数据的底层数据采集不能完全依赖这样的相互标准。

  原因有以下三个:

  因此,如果要使用唯一的固定时间序列,就必须将其与业务数据分开。我们想要的时间戳不受业务数据变化的影响。

  原理二

  在一个业务数据库中,肯定存在表结构变化的问题。大多数情况下是添加列,但也有列重命名、列删除等情况,字段变化顺序不可控。

  这个原则想描述的是导入数据仓库的表要适应数据库表的各种操作,并保持其可用性和列数据的正确性。

  原则三

  这个数据是有溯源的,包括两个方面

  第一种描述是在binlog采集

端,可以重新按location采集

binlog。

  第二种描述是在consumer binlog登陆结束时,可以通过重复消费重新登陆数据。

  这是作者的个人总结。无论选择何种技术选型进行组合施工,都需要满足这些原则。

  二、实施方案及具体操作

  技术架构:Debezium + Confluent + Kafka + OSS/S3 + Hive

  基于原理一的解决方案:

  

" />

  Debezium 提供了 New Record State Extraction 的配置选项,相当于提供了一个 transform 算子,可以提取 binlog 中的元数据。

  对于0.10版本的配置,可以提取table、version、connector、name、ts_ms、db、server_id、file、pos、row等binlog元数据信息。

  其中ts_ms是binlog日志的生成时间,是binlog元数据,可以应用于所有数据表,在不知道数据表内部结构的情况下,可以使用这个固定的时间戳来充分实现我们的原理一.

  关于Debezium,不同版本之前的配置参数可能不同。如果读者需要练习,需要在官方文档中确认对应版本的配置参数。

  对于其他框架,比如市场上广泛使用的Canal,或者读者需要开发数据采集程序,建议将binlog的元数据全部提取出来,在本流程和后续流程中可能会用到。

  基于原则二的解决方案

  对于Hive,目前主流的数据存储格式有Parquet、ORC、Json、Avro。

  抛开数据存储的效率讨论。

  对于前两种数据格式,它是按列存储的,也就是说,这两种数据格式的数据读取将严格依赖于数据在我们的数据表中的存储顺序。这样的数据格式不能满足数据列的灵活性。添加、删除等操作。

  Avro格式是基于行的,但是需要依赖Schema Register服务。考虑到Hive的数据表读取完全依赖于外部服务,风险太大。

  最后确定使用Json格式进行数据存储。这种读取和存储效率虽然没有其他格式高,但是可以保证业务数据的任何变化都能在hive中读取到。

  Debezium组件采集的binlog数据为json格式,符合预期的设计方案,可以解决第二个原理带来的问题。

  对于其他框架,比如市场上使用比较广泛的Canal,可以设置为Json数据格式进行传输,或者需要读者自行开发数据采集程序,同理。

  基于原理三的解决方案

  在binlog采集

端,可以重新按位置采集

binlog。

  Debezium官网也给出了该方案的实现。大致描述了相应的解决方案,需要Kafkacat工具。

  对于每一个采集到的mysql实例,在创建数据采集任务时,Confluent会为connector(即采集程序)采集的元数据对应创建一个topic。

  相应的时间戳、文件位置、位置都会存储在里面。您可以通过修改该数据来重新设置采集

binlog日志的位置。

  值得注意的是,这个操作的时间节点也是有限制的,这跟mysql binlog日志的保存周期有关。所以这种方式回溯的时候,需要确认mysql的日志是否还存在。

  对于重复消费,数据重新落地。

  因为这个方案是基于Kafka的,所以网上有很多Kafka重新设计消费offset消费站点的方案,这里不再赘述。

  读者自己实现,需要确认的选择的MQ支持该功能就好了。

  #how_to_change_the_offsets_of_the_source_database

  3、业务场景不同

  本节仅介绍在笔者的技术架构下如何实现以下操作。读者可以根据自己选择的技术组件探索不同的技术方案。

  1)数据库分库分表的情况

  基于Debezium的架构,一个source只能对应一个mysql实例进行采集。对于同一个实例上的分表情况,可以使用Debezium Topic Routing功能。

  在采集

过滤binlog时,将对应的要采集

的表按照正则匹配写入到指定的topic中。

  在分库的情况下,还需要在sink端添加RegexRouter变换算子,进行topic之间的merge写操作。

  2)数据增量采集和全量采集

  对于采集组件,目前配置默认是基于增量的,所以无论选择Debezium还是Canal,都可以正常配置。

  但是有时候会出现需要采集

全表的情况,作者也给出了采集

全量数据的方案。

  选项一:

  Debezium本身就有这样的功能,需要

  snapshot.mode参数选择设置为when_needed,这样可以进行表的全量采集

操作。

  

" />

  在官方文档中,对这里的参数配置有比较详细的说明。

  #快照

  方案二:

  同时使用sqoop和增量采集

  该方案适用于表数据已经很多,但binlog数据当前频率不高的情况。

  值得注意的有两点:

  3)离线重删条件

  数据落地后,通过json表映射binlog原创

数据,那么问题来了,我们如何找到最新的一条数据呢?

  或许我们可以简单的认为,用刚才提取出来的ts_ms就好了,然后反演一下?

  在大多数情况下确实可以这样做。

  但是笔者在实际开发中发现这样的情况并不能满足所有的情况,因为在binlog中,可能真的有两条数据ts_ms和PK相同,但确实不同。

  那么我们如何同时求解两个数据呢?

  答案如上,我们只是建议提取所有binlog的元数据。

  选择 *

  从

  (

  选择 *,

  row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by

  从测试 t

  WHERE dt='{pt}'

  AND hour='{now_hour}'

  ) t1

  在哪里 t1。order_by = 1

  解释一下这条sql中row_number的情况:

  这样的条件组合检索到的数据是最新的。

  可能有读者会问,如果删除了这条数据,你这样取回的数据不就错了吗?

  这个Debezium也有相应的操作,有相应的配置选项供你选择如何处理删除行为的binlog数据。

  作为大家的参考,笔者选择了rewrite的参数配置,这样上面的SQL最外层只需要判断“delete = 'false'”是正确的数据即可。

  四、结构总结

  在技​​术的选择和整体与细节的结构上,笔者一直坚持一个原则——

  过程要尽量简单但不简单。数据链接越长,可能出错的链接就越多。后期的加锁问题和运维也会非常困难。

  所以笔者在技术选型上也考虑了Flink+Kafka的方式,但是基于当时的现状,笔者并没有选择这样的技术选型,笔者也说明了原因。

  总结一下,我当时对 Flink 的想法是,如果 Flink 没有一个开发和运维监控的平台,可以作为一个临时的解决方案。很容易出问题,或者只是大家在这样的程序框架下造轮子,越造越慢。而且后期的主要项目方向也没有把Flink平台提上日程,所以选择的时候也是考虑了一部分未来的情况。

  所以,最后确定技术选型的时候,我并没有选择Flink。

  5.结论

  解决方案:在线工具导航(一)

  工具 123 - 1750 工具收录

在整个工位

  

" />

  本工具导航网站共收录

1750个在线工具,包括转换工具、在线制作、在线生成、在线查询、图片工具、检测工具、PDF工具、SEO工具、站长推荐、生活工具、学习工具、娱乐工具、站长工具等。

  

" />

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线