袋鼠云研发手记:第五期和实时采集袋鼠云云引擎团队
优采云 发布时间: 2021-08-22 19:30袋鼠云研发手记:第五期和实时采集袋鼠云云引擎团队
袋鼠云研发笔记
作为一家创新驱动的科技公司,袋鼠云每年研发投入数千万,公司员工80%为技术人员,()、()等产品不断迭代。在产品研发的过程中,技术兄弟可以文武兼备,在不断提升产品性能和体验的同时,也记录了这些改进和优化的过程,现记录在“袋鼠云研发笔记”栏目,以跟上行业的步伐。童鞋分享交流。
Kangaroo 云数据堆栈引擎团队
袋鼠云数据栈引擎团队拥有多位专家级、经验丰富的后端开发工程师,分别支持公司大数据栈产品线不同子项目的开发需求。 FlinkX(基于Flink Data同步)、Jlogstash(java版logstash的实现)、FlinkStreamSQL(扩展原生FlinkSQL,实现流维表的join)多个项目。
在长期的项目实践和产品迭代过程中,团队成员不断探索和探索Hadoop技术栈,积累了丰富的经验和最佳实践。
第五期
FlinkX采集中可续传和实时性详解
袋鼠云云原生一站式数据中心PaaS-数据栈,涵盖数据中心建设过程中所需的各种工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等) ,全面覆盖离线计算和实时计算应用,帮助企业大大缩短数据价值的提取过程,提高数据价值的提取能力。
数据栈架构图 目前数据栈-离线开发平台(BatchWorks)中的数据离线同步任务和数据栈-实时开发平台(StreamWorks)中的数据实时采集任务有基于 FlinkX 统一。数据离线采集和实时采集的基本原理是一样的。主要区别在于源流是否有界,所以使用 Flink 的 Stream API 来实现这两个数据同步场景来实现数据。同步批处理流程统一。
1
功能介绍
断点后继续上传
断点续传是指数据同步任务在运行过程中由于各种原因失败。无需重新同步数据。您只需要从上次失败的位置继续同步,类似于由于网络原因下载文件时。如果原因失败,则无需再次下载文件,只需继续下载,可大大节省时间和计算资源。可续传是数据栈-离线开发平台(BatchWorks)中数据同步任务的一个功能,需要结合任务的错误重试机制来完成。当任务失败时,它会在引擎中重试。重试时,会从上次失败时读取的位置继续读取数据,直到任务运行成功。
实时采集
实时采集是数据栈-实时开发平台(StreamWorks)中数据采集任务的一个功能。当数据源中的数据被添加、删除或修改时,同步任务会监控这些变化,并将数据实时同步到目标数据源。除了实时数据变化,实时采集和离线数据同步的另一个区别是:实时采集任务不会停止,任务会一直监控数据源是否发生变化。这点和Flink任务是一致的,所以实时采集任务是数字栈流计算应用中的一种任务类型,配置过程与离线计算中的同步任务基本相同。
2
Flink 中的检查点机制
无论是可续传上传还是实时采集都依赖于Flink的Checkpoint机制,所以先简单介绍一下。 Checkpoint 是 Flink 容错机制的核心功能。它可以根据配置,根据Stream中各个Operator的状态,周期性的生成Snapshots,从而将这些状态数据定期持久化存储。当 Flink 程序意外崩溃时,它会重新运行 程序可以有选择地从这些 Snapshot 中恢复,从而纠正因故障导致的程序数据状态中断。
Checkpoint被触发时,会在多个分布式Stream Sources中插入一个barrier标签,这些barrier会随着Stream中的数据记录流向下游的算子。当运营商收到屏障时,它将暂停处理 Steam 中新收到的数据记录。因为一个Operator可能有多个输入Streams,每个Stream中都会有一个对应的barrier,所以Operator必须等待输入Stream中的所有barrier都到达。当流中的所有障碍都到达操作员时,所有障碍似乎都在同一时刻(表明它们已对齐)。在等待所有barrier到达的时候,operator的缓冲区可能已经缓存了一些比Barrier更早到达Operator的数据记录(Outgoing Records)。此时,Operator 会发出(Emit)数据记录(Outgoing Records)作为下游 Operator 的输入。最后,Barrier 会对应 Snapshot (Emit) 发送出去作为第二个 Checkpoint 的结果数据。
3
断点后继续上传
先决条件
同步任务必须支持可续传,对数据源有一些强制性要求:
1、 数据源(这里特指关系型数据库)必须收录升序字段,例如主键或日期类型字段。检查点机制会在同步过程中记录这个字段的值。这在任务恢复时使用。字段结构查询条件过滤已同步的数据。如果这个字段的值不是升序,那么在任务恢复时过滤的数据是错误的,最终会导致数据丢失或重复;
2、数据源必须支持数据过滤。否则,任务无法从断点处恢复,会造成数据重复;
3、目标数据源必须支持事务,比如关系数据库。临时文件也可以支持文件类型的数据源。
任务操作的详细流程
我们用一个具体的任务来详细介绍整个过程,任务详情如下:
数据来源
mysql表,假设表名为data_test,该表收录主键字段id
目标数据源
hdfs 文件系统,假设写入路径为 /data_test
并发数
2
检查点配置
时间间隔为60s,checkpoint的StateBackend为FsStateBackend,路径为/flinkx/checkpoint
工作 ID
用于构造数据文件的名称,假设是abc123
1) 读取数据 读取数据时,首先要构造数据片段。构造数据分片就是根据通道索引和检查点记录的位置构造查询sql。 sql模板如下:
select * from data_test where id mod ${channel_num}=${channel_index}and id > ${offset}
如果是第一次运行,或者最后一个任务失败时没有触发checkpoint,那么offset不存在。根据偏移量和通道,具体查询sql:偏移量存在时的第一个通道:
select * from data_testwhere id mod 2=0and id > ${offset_0};
第二个频道:
select * from data_testwhere id mod 2=1and id > ${offset_1};
偏移量不存在时的第一个通道:
select * from data_testwhere id mod 2=0;
第二个频道:
select * from data_testwhere id mod 2=1;
数据分片构建完成后,每个通道根据自己的数据分片来读取数据。 2)Write data before write data:检查/data_test目录是否存在,如果目录不存在,创建这个目录,如果目录存在,执行2次操作;判断是否以覆盖方式写入数据,如果是,则删除/data_test目录,然后创建目录,如果不是,则执行3次操作;检查/data_test/.data目录是否存在,如果存在,先将其删除,然后再创建,以确保没有其他任务因异常失败而遗留的dirty。数据文件;写入hdfs的数据是单片写入的,不支持批量写入。数据会先写入/data_test/.data/目录,数据文件的命名格式为:channelIndex.jobId.fileIndex 收录三个部分:通道索引、jobId、文件索引。当3)checkpoint 被触发时,FlinkX 中的“状态”代表标识字段 id 的值。我们假设触发检查点时两个通道的读写如图所示:
触发checkpoint后,两个reader首先生成Snapshot记录读取状态,channel 0的状态为id=12,channel 1的状态为id=11。快照生成后,会在数据流中插入一个barrier,barrier和数据一起流向Writer。以 Writer_0 为例。 Writer_0 接收 Reader_0 和 Reader_1 发送的数据。假设先收到了Reader_0的barrier,那么Writer_0就停止向HDFS写入数据,先把收到的数据放入InputBuffer,等待Reader_1的barrier到达。然后写出Buffer中的所有数据,然后生成Writer的Snapshot。整个checkpoint结束后,记录的任务状态为: Reader_0: id=12Reader_1: id=11Writer_0: id=无法确定 Writer_1:id=无法确定任务状态 会记录在配置的HDFS目录/flinkx/检查点/abc123。因为每个Writer接收两个Reader的数据,每个通道的数据读写速率可能不同,所以Writer接收数据的顺序是不确定的,但这不影响数据的准确性,因为数据是read 这个时候只能使用Reader记录的状态来构造查询sql,我们只需要确保数据真的写入HDFS即可。
Writer 在生成 Snapshot 之前,会做一系列的操作来保证所有接收到的数据都写入 HDFS: a.关闭写入 HDFS 文件的数据流,这时候会出现两对数据在 /data_test/.data 目录中生成。两个文件:/data_test/.data/0.abc123.0/data_test/.data/1.abc123.0b。将生成的两个数据文件移动到/data_test目录下; C.更新文件名称模板更新为:channelIndex.abc123.1;快照生成后,任务继续读写数据。如果在生成快照的过程中出现异常,任务会直接失败,所以这次不会生成快照,任务会被恢复。从上次成功的快照恢复。 4)任务正常结束。任务正常结束时,会执行与生成快照时相同的操作,关闭文件流,移动临时数据文件等5)任务异常终止如果任务异常结束,假设最后一个检查点的状态任务结束时的记录为:Reader_0: id=12Reader_1: id=11 那么当任务恢复时,每个通道记录的状态都会被赋值给offset,再次读取数据时构造的sql是:第一个通道:
select * from data_testwhere id mod 2=0and id > 12;
第二个频道:
select * from data_testwhere id mod 2=1and id > 11;
这样就可以从上次失败的位置继续读取数据了。
支持续传上传的插件
理论上只要支持过滤数据的数据源和支持事务的数据源都可以支持续传功能,FlinkX目前支持的插件如下:
读者
作家
关系数据读取插件如mysql
HDFS、FTP、mysql等关系型数据库写入插件
4
实时采集
目前FlinkX支持实时采集插件,包括KafKa和binlog插件。 binlog插件是专门为实时采集mysql数据库设计的。如果要支持其他数据源,只需要将数据输入到Kafka,然后就可以使用FlinkX的Kafka插件来消费数据。比如oracle,你只需要使用oracle的ogg将数据传输到Kafka即可。这里专门讲解mysql的实时采集插件binlog。
二进制日志
binlog 是由 Mysql 服务器层维护的二进制日志。它与innodb引擎中的redo/undo log是完全不同的日志;它主要用于记录更新或潜在更新mysql数据的SQL语句,并以“事务”的形式存储在磁盘上。 binlog的主要功能有:
Replication:MySQL Replication在Master端打开binlog,Master将自己的binlog传递给slave并重放,达到主从数据一致性的目的;
数据恢复:通过mysqlbinlog工具恢复数据;
增量备份。
MySQL 主备复制
仅仅有记录数据变化的binlog日志是不够的。我们还需要用到MySQL的主从复制功能:主从复制是指一台服务器作为主数据库服务器,另一台或多台服务器作为从数据库服务器。主服务器中的数据自动复制到从服务器。
主/从复制的过程:MySQL主将数据变化写入二进制日志(二进制日志,这里的记录称为二进制日志事件,可以通过show binlog events查看); MySQL slave将master的binary log events复制到它的relay log; MySQL slave 重放中继日志中的事件,并将数据变化反映到自己的数据中。
写入 Hive
binlog插件可以监控多张表的数据变化。解析的数据收录表名信息。读取的数据可以全部写入目标数据库中的一个表中,也可以根据数据中收录的表名信息写入不同的表中。目前只有 Hive 插件支持此功能。 Hive插件目前只有一个写插件,功能是基于HDFS写插件实现的,也就是说从binlog读取和写入hive也支持故障恢复功能。
写入Hive的过程:从数据中解析出MySQL表名,然后根据表名映射规则转换成对应的Hive表名;检查Hive表是否存在,如果不存在,则创建Hive表;查询Hive表相关信息,构造HdfsOutputFormat;调用 HdfsOutputFormat 将数据写入 HDFS。
欢迎了解袋鼠云数栈