Binlog实时数据采集、落地数据使用的思索总结
优采云 发布时间: 2020-08-12 14:57前文
今天碰巧刷新技术公众号的时侯,看到一篇这样文章,是基于Flink有关于Mysql Binlog数据采集的方案,看了一下实践方式和具体操作有一些考虑情况不足的情况,缺少一些处理实际情况的操作。笔者之前有些过一些类似的采集工具实践的文章,但是并没有在整体上作出一个系统性的总结,所以我在想,是不是可以做一个个人总结性的文章,把Binlog采集中的问题以及相应的解决方案也进行总结呢?
可能很多人对于Binlog的认识还不是太充足,可能有些人会浅显的觉得:“它不就是mysql形成的,有固定结构的log嘛,把数据采集过来,然后把它做一下数据落地,它有哪些难的呢?”
的确,它本质上确实就是个log,可是实际上,关于Binlog采集从场景剖析,再到技术选型,整体内部有很多不为人知的坑,不要轻视了它。
笔者写这篇文章,目的是把实际工作中对于Binlog数据采集的开发流程的原则、注意事项、可能存在的问题点展示下来,其中也会有笔者自己的一些个人总结数据采集中的原则,为你们作参考,都是干货。
所以开始吧!
个人总结原则
首先摒弃技术框架的讨论,个人总结Binlog 日志的数据采集主要原则:
分别论述一下这三个原则的具体含意
原则一
在数据采集中,数据落地通常还会使用时间分区进行落地,那就须要我们确定一下固定的时间戳作为时间分区的基础时间序列。
在这些情况下看来,业务数据上的时间戳数组,无论是从实际开发中获取此时间戳的角度,还是现实表中就会存在这样的时间戳,都不可能所有表完全满足。
举一下例子:
表 :业务时间戳
table A : create_time,update_time
table B : create_time
table C : create_at
table D : 无
像这样的情况,理论上可以通过限制 RD 和 DBA 的在设计表时规则化表结构来实现时间戳以及命名的统一化、做限制,但是是在实际工作中,这样的情况基本上是做不到的,相信好多读者也会碰到这样的情况。
可能好多做数据采集的同学会想,我们能不能要求她们去制订标准呢?
个人的看法是,可以,但是不能把大数据底层数据采集完全借助这样相互制订的标准。原因有以下三点:
所以假如想要使用惟一固定的时间序列,就要和业务的数据剥离开,我们想要的时间戳不受业务数据的变动的影响。
原则二
在业务数据库中,一定会存在表结构变更的问题,绝大部分情况为降低列,但是也会存在列重命名、列删掉这类情况,而其中数组变更的次序是不可控的。
此原则想描述的是,导入到数据库房中的表,要适应数据库表的各类操作,保持其可用性与列数据的正确性。
原则三
此数据可回溯,其中包括两个方面
第一个描述的是,在采集binlog采集端,可以重新按位置采集binlog。
第二个描述的是,在消费binlog落地的一端,可以重复消费把数据重新落地。
此为笔者个人总结,无论是选择什么样的技术选型进行组合搭建,这几点原则是须要具备的。
实现方案以及具体操作
技术构架 : Debezium + Confluent + Kafka + OSS/S3 + Hive
基于原则一的解决方案
Debezium 提供了 New Record State Extraction 的配置选项,相当于提供了一个transform 算子,可以抽取出binlog 中的元数据。
对于 0.10 版本的配置,可以抽取 table,version,connector,name,ts_ms,db,server_id,file,pos,row 等binlog元数据信息。
其中ts_ms为binlog日志的形成时间,此为binlog元数据,可以应用于所有数据表,而且可以在完全对数据表内部结构不了解的情况下,使用此固定时间戳,完全实现我们的原则一。
关于Debezium,不同版本之前的配置参数有可能是不同的,如果读者有须要实践的话须要在官方文档上确认相应版本的配置参数。
对于其他框架,例如市面上用的较多的Canal,或者读者有自己须要开发数据采集程序的话,binlog的元数据建议全部抽取下来,在此过程以及后续过程中都可能会被用到。
基于原则二的解决方案
对于 Hive ,目前主流的数据储存格式为Parquet,ORC,Json,Avro 这几种。
抛开数据储存的效率讨论。
对于前两中数据格式,为列存,也就是说,这两种数据格式的数据读取,会严格依赖于我们数据表中的数据储存的次序,这样的数据格式,是难以满足数据列灵活降低、删除等操作的。
Avro 格式为行存,但是它须要依赖于Schema Register服务,考虑Hive的数据表读取完全要依赖一个外部服务,风险偏高。
最后确定使用Json格式进行数据储存,虽然这样的读取和储存效率没有其他格式高,但是这样可以保证业务数据的任何变更都可以在hive中读取下来。
Debezium 组件采集binlog 的数据就是为json格式,和预期的设计方案是吻合的,可以解决原则二带来的问题。
对于其他框架,例如市面上用的较多的Canal,可以设置为Json数据格式进行传输,或者读者有自己须要开发数据采集程序的话,也是相同的道理。
基于原则三的解决方案
在采集binlog采集端,可以重新按位置采集binlog。
此方案实现方法在Debezium官方网站上也给出了相应的解决方案,大概描述一下,需要用到 Kafkacat工具。
对于每一个采集的mysql实例,创建数据采集任务时,Confluent就会相应的创建connector(也就是采集程序)的采集的元数据的topic,
里面会储存相应的时间戳、文件位置、以及位置,可以通过更改此数据,重置采集binlog日志的位置。
值得注意的是,此操作的时间节点也是有限制的,和mysql的binlog日志保存周期有关,所以此方法回溯时,需要确认的是mysql日志还存在。
对于重复消费把数据重新落地。
此方案由于基于kafka,对于kafka重新制订消费offset消费位点的操作网上有很多方案,此处不再赘言。
对于读者自己实现的话,需要确认的选择的MQ支持此特点就好了。
#how_to_change_the_offsets_of_the_source_database
业务场景影响下的重要操作
此部份只描述在笔者技术构架下怎样实现以下操作,读者可以按照自己选择的技术组件探究不同的技术方案。
数据库分库分表的情况
基于Debezium的构架,一个Source 端只能对应一个mysql实例进行采集,对于同一实例上的分表情况,可以使用 Debezium Topic Routing 功能,
在采集过滤binlog时把相应须要采集的表根据正则匹配写入一个指定的topic中。
在分库的情况下,还须要在 sink 端 增加 RegexRouter transform算子进行topic 间的合并写入操作。
数据增量采集与全量采集
对于采集组件,目前目前的配置都是以增量为默认,所以无论是选择 Debezium 还是 Canal的话,正常配置就好。
但是有些时侯会存在须要采集全表的情况,笔者也给出一下全量的数据采集的方案。
方案一
Debezium 本身自带了这样的功能,需要将
snapshot.mode 参数选型设置为 when_needed,这样可以做表的全量采集操作。
官方文档中,在此处的参数配置有愈发细致的描述。
#snapshots
方案二
使用sqoop和增量采集同时使用的方法进行。
此方案适用于表数据已存在好多,而目前binlog数据频度不频繁的情况下,使用此方案。
值得注意的是有两点:
离线数据去重条件
数据落地后,通过json表映射出binlog原创数据,那么问题也就来了,我们怎么找到最新的一条数据呢?
也许我们可以简单的觉得,用我们刚才的抽取的那种ts_ms,然后做倒排不就好了吗?
大部分情况下这样做确实是可以的。
但是笔者在实际开发中,发现这样的情况是不能满足所有情况的,因为在binlog中,可能真的会存在 ts_ms 与 PK 相同,但是确实不同的两条数据。
那我们如何去解决时间都相同的两条数据呢?
答案就在上文,我们刚才建议的把binlog 的元数据都抽取下来。
SELECT *
FROM
(
SELECT *,
row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by
FROM test t
WHERE dt='{pt}'
AND hour='{now_hour}'
) t1
WHERE t1.order_by = 1
解释一下这个sql 中row_number的的条件
__ts_ms : 为binlog中的ts_ms,也就是风波时间。
__file : 为binlog此条数据所在file name。
__pos : 为binlog中此数据所在文件中的位置,为数据类型。
这样的条件组合取出的数据,就是最新的一条。
也许有读者会问,如果这条数据被删除了如何办,你这样取下来的数据不就是错的了吗?
这个Debezium也有相应的操作,有相应的配置选项使你怎么选择处理删掉行为的binlog数据。
作为给你们的参考,笔者选择 rewrite 的参数配置,这样在前面的sql最内层只须要判定 “delete = ’false‘“ 就是正确的数据啦。
架构上的总结
在技术选型以及整体与细节的构架中,笔者一直在坚持一个原则——
流程尽量简洁而不简单,数据环节越长,出问题的环节就可能越多。对于后期锁定问题与运维难度也会很高。
所以笔者在技术选型也曾考虑过Flink + Kafka 的这些方法,但是基于当时的现况,笔者并没有选择这样的技术选型,笔者也探讨一下缘由。
总结上去,我当时对于Flink的思索,如果Flink没有做开发和运维监控的平台化的情况下,可以作为一个临时方案,但是后期假如仍然在这样一个开发流程下缝缝补补,多人开发下很容易出现问题,或者就是你们都这样一个程序框架下造轮子,而且越造越慢。而且后期的主要项目方向并没有把Flink平台化提上日程,所以也是考虑了一部分未来的情况进行的选择。
所以个人最后确定技术选型的时侯,并没有选用Flink。
结束语
此篇文章笔者写的较为理论化,也是对此场景的一个技术理论总结。如果文中有其他不明晰的操作的话,可以参考笔者之前的文章,有详尽的代码级操作。
技术构架上的方案多种多样,笔者只是选择了其中一种进行实现,也希望你们有其他的技术方案或则理论进行交流,烦请见谅。