文章采集调用(【开源项目】袋鼠云云原生一站式数据中台PaaS)
优采云 发布时间: 2022-01-16 20:25文章采集调用(【开源项目】袋鼠云云原生一站式数据中台PaaS)
数据栈是云原生的一站式数据中心PaaS。我们在 github 和 gitee 上有一个有趣的开源项目:FlinkX。 FlinkX 是一个基于 Flink 的批量流统一数据同步工具。可以是 采集static ,是一个集全局、异构、批处理流为一体的数据同步引擎。如果你喜欢它,请给我们一个star!星星!星星!
github开源项目:
gitee开源项目:
袋鼠云云原生的一站式数据中台PaaS——数据栈,涵盖了建设数据中心过程中所需的各种工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等),具有全面覆盖 离线计算和实时计算应用帮助企业大大缩短数据价值的提取过程,提高数据价值的提取能力。
目前,数据栈-离线开发平台(BatchWorks)中的数据离线同步任务和数据栈-实时开发平台(StreamWorks)中的数据实时采集任务已经基于FlinkX进行了统一. 离线采集和实时数据采集的基本原理是一样的。主要区别在于源流是否有界。因此,使用 Flink 的 Stream API 来同步这两个数据。场景,实现数据同步的批量流统一。
一、功能介绍
1、断点续传
断点续传是指数据同步任务在运行过程中由于各种原因而失败。它不需要重新同步数据,只需要从上次失败的位置继续同步即可。与网络原因下载文件失败类似,无需重新下载文件,继续下载即可,可大大节省时间和计算资源。断点续传是DataStack-离线开发平台(BatchWorks)中数据同步任务的功能,需要结合任务的错误重试机制来完成。当任务运行失败时,将在引擎中重试。重试时会从上次失败时读取的位置继续读取数据,直到任务运行成功。
2、直播采集
实时采集是数据栈实时开发平台(StreamWorks)中data采集任务的一个功能。数据实时同步到目标数据源。除了实时数据变化之外,实时采集和离线数据同步的另一个区别是实时采集任务不会停止,任务会一直*敏*感*词*数据源变化。这与 Flink 任务是一致的,所以实时 采集 任务是栈流计算应用中的任务类型,配置过程与离线计算中的同步任务基本相同。
二、Flink 中的检查点机制
断点续传和实时 采集 都依赖于 Flink 的 Checkpoint 机制,所以我们先简单看一下。
Checkpoint 是 Flink 容错机制的核心功能。它可以根据配置,根据 Stream 中各个算子的状态,定期生成 Snapshots,从而定期持久地存储这些状态数据。当 Flink 程序意外崩溃时,会重新运行。程序可以选择性地从这些快照中恢复,从而纠正因故障引起的程序数据状态中断。
当一个 Checkpoint 被触发时,一个屏障标记被插入到多个分布式 Stream Source 中,这些屏障与 Stream 中的数据记录一起流向下游算子。当操作员收到障碍时,它会暂停处理 Steam 中新收到的数据记录。因为一个 Operator 可能有多个输入 Stream,而每个 Stream 都会有一个对应的 Barrier,所以 Operator 必须等到输入 Stream 中的所有 Barrier 都到达。
当流中所有的barrier都到达operator时,那么所有的barrier就出现在同一个时间点(表示已经对齐),在等待所有barrier到达的过程中,buffer的operator 可能已经缓存了一些比 Barrier 更早到达 Operator 的数据记录(Outgoing Records),那么 Operator 会发出(Emit)这些数据记录(Outgoing Records)作为下游 Operator 的输入,最后发出( Emit) Barrier 对应的 Snapshot 作为 this checkpoint 的结果数据。
三、断点续传
1、先决条件
为了支持断点恢复上传,同步任务对数据源有一些强制性要求:
1)数据源(此处为关系数据库)必须收录升序字段,例如主键或日期类型字段。同步过程中会使用检查点机制记录该字段的值,当任务恢复运行时会使用该字段。构造查询条件,过滤已同步的数据。如果该字段的值不按升序排列,则任务恢复时过滤的数据会出错,最终导致数据丢失或重复;
2)数据源必须支持数据过滤。否则,任务将无法从断点处恢复运行,从而导致数据重复;
3)目标数据源必须支持事务,比如关系型数据库,也可以通过临时文件支持文件类型的数据源。
2、任务运行详细流程
让我们以一个具体的任务来详细介绍整个过程。任务详情如下:
1)读取数据
读取数据时,首先构建数据分片。构造数据分片就是根据通道索引和检查点记录的位置构造查询sql。sql模板如下:
select * from data_test
where id mod ${channel_num}=${channel_index}
and id > ${offset}
如果是第一次运行,或者上次任务失败时checkpoint还没有触发,则offset不存在,具体查询sql可以根据offset和channel确定:
存在偏移时
第一个频道:
select * from data_test
where id mod 2=0
and id > ${offset_0};
第二频道:
select * from data_test
where id mod 2=1
and id > ${offset_1};
当偏移不存在时
第一个频道:
select * from data_test
where id mod 2=0;
第二频道:
select * from data_test
where id mod 2=1;
数据分片构建完成后,每个通道根据自己的数据分片读取数据。
2)写入数据
在写数据之前,会先做几个操作:
一个。检查 /data_test 目录是否存在。如果目录不存在,则创建目录。如果目录存在,则执行2个操作;
湾。判断是否以overwrite方式写入数据,如果是,删除/data_test目录,然后创建目录,如果不是,执行3个操作;
C。检查/data_test/.data目录是否存在。如果存在,则先删除再创建,确保没有其他任务因异常失败而留下的脏数据文件;
数据单行写入hdfs,不支持批量写入。数据将首先写入 /data_test/.data/ 目录。数据文件的命名格式为:
channelIndex.jobId.fileIndex
收录三部分:通道索引、jobId、文件索引。
3)触发检查点时
在 FlinkX 中,“status”表示标识字段 id 的值。我们假设触发检查点时两个通道的读写情况如图所示:
触发检查点后,两个读取器首先生成一个 Snapshot 来记录读取状态。通道 0 的状态为 id=12,通道 1 的状态为 id=11。Snapshot生成后,在数据流中插入一个barrier,barrier和数据一起流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发送的数据。假设首先收到Reader_0 的屏障。此时Writer_0停止向HDFS写入数据,将接收到的数据先放入InputBuffer,等待Reader_1的barrier到来。之后,将Buffer中的所有数据写出,然后生成Writer的Snapshot。整个检查点结束后,记录的任务状态为:
阅读器_0:id=12
阅读器_1:id=11
Writer_0:id=无法确定
Writer_1:id=无法确定
任务状态会记录在配置的HDFS目录/flinkx/checkpoint/abc123中。因为每个Writer会从两个Reader接收数据,而且每个通道的数据读写速率可能不同,所以Writer接收数据的顺序是不确定的,但这并不影响数据的准确性,因为数据是read 当我们只需要Reader记录的状态时,我们可以构造查询sql,我们只需要保证数据真的写入HDFS即可。在 Writer 生成 Snapshot 之前,会执行一系列操作以确保将所有接收到的数据写入 HDFS:
一个。关闭写入HDFS文件的数据流。这时候会在/data_test/.data目录下生成两个文件:
/data_test/.data/0.abc123.0
/data_test/.data/1.abc123.0
湾。将生成的两个数据文件移动到/data_test目录下;
C。将文件名模板更新为:channelIndex.abc123.1;
快照生成后,任务继续读写数据。如果在生成快照的过程中出现异常,任务会直接失败,这样本次就不会生成快照,任务恢复时会从上次成功的快照恢复。
4)任务正常结束
当任务正常结束时,它会做与生成快照时相同的操作,关闭文件流,移动临时数据文件等。
5)任务异常终止
如果任务异常结束,则假设任务结束时最后一条检查点记录的状态为:
Reader_0:id=12Reader_1:id=11
那么当任务恢复时,会将每条通道记录的状态赋值给offset,再次读取数据时构造的sql为:
第一个频道:
select * from data_test
where id mod 2=0
and id > 12;
第二频道:
select * from data_test
where id mod 2=1
and id > 11;
这将允许您从上次失败的位置继续读取数据。
3、一个支持续传上传的插件
理论上,只要支持过滤数据的数据源和支持事务的数据源都可以支持断点续传功能,FlinkX目前支持的插件如下:
四、直播采集
目前 FlinkX 支持实时 采集 插件,包括 Kafka 和 binlog 插件。binlog 插件是为实时采集 mysql 数据库设计的。如果要支持其他数据源,只需要将数据发送到Kafka,然后使用FlinkX的Kafka插件消费数据,比如oracle,只需要使用oracle的ogg发送数据到Kafka即可。这里具体讲解mysql的实时采集插件binlog。
1、二进制日志
binlog是Mysql sever层维护的二进制日志,与innodb引擎中的redo/undo log完全不同;它主要用于记录更新或可能更新 MySQL 数据的 SQL 语句,并以“事务”格式存储在磁盘上。
binlog的主要功能有:
1)Replication:MySQL Replication在Master端开启binlog,Master将其binlog传递给slave,并replay,达到主从数据一致性的目的;
2)数据恢复:通过mysqlbinlog工具恢复数据;
3)增量备份。
2、MySQL主从复制
有记录数据变化的binlog日志是不够的,我们还需要用到MySQL的主备复制功能:主备复制是指一台服务器作为主库服务器,另一台或多台服务器作为从库服务器。数据自动复制到从服务器。
主从复制的过程:
1)MySQL master将数据变化写入二进制日志(binary log,这里的记录称为binary log events,可以通过show binlog events查看);
2)MySQL slave将master的二进制日志事件复制到它的relay log中;
3)MySQL slave 在中继日志中重放事件,反映数据变化到它自己。
3、写入 Hive
binlog 插件可以监控多个表的数据变化。解析后的数据收录表名信息。读取的数据可以写入目标数据库的表中,也可以根据数据中收录的表名信息写入。不同的表,目前只有 Hive 插件支持这个功能。Hive插件目前只有写插件,功能是基于HDFS写插件实现的。也就是说,从binlog读取写入hive的功能也支持故障恢复的功能。
写入Hive的过程:
1)从数据中解析出MySQL表名,然后根据表名映射规则转换成对应的Hive表名;
2)检查Hive表是否存在,如果不存在,则创建Hive表;
3)查询Hive表的相关信息,构造HdfsOutputFormat;
4)调用 HdfsOutputFormat 将数据写入 HDFS。