采集自动组合(如下在流批一体的探索构建一体准实时数仓应用)

优采云 发布时间: 2022-01-26 07:16

  采集自动组合(如下在流批一体的探索构建一体准实时数仓应用)

  基于Hive的离线数据仓库往往是企业大数据生产系统中不可或缺的一部分。Hive数据仓库成熟度和稳定性高,但是因为离线,延迟非常大。在一些时延要求较高的场景下,需要构建基于 Flink 的实时数仓,将链路时延降低到秒级。但是一套离线数仓和一套实时数仓的架构会带来一倍以上的资源消耗,甚至导致重复开发。

  我是否必须放弃现有的 Hive 数据仓库来构建流式链接?不,在 Flink 的帮助下,现有的 Hive 离线数仓可以做到准实时。本文由 Apache Flink Committer 和阿里巴巴技术专家李劲松在 InfoQ 技术公开课上的分享整理而成。文章 将分析当前离线实时数仓的难点,并详细讲解 Flink 如何解决 Hive 流审批和准实时数仓的问题。,实现更高效、更合理的资源配置。文章大纲如下:

  实时离线数仓难点 Flink 流批融合与准实时数仓的探索与建设 实时离线数仓应用实践难点

  线下仓库

  

  上图是一个典型的离线数据仓库。假设公司现在有需求。目前,公司拥有大量数据。它需要每天生成一个报表,并输出到业务数据库中。第一种是刚刚入库的业务数据,大致分为两种,一种是MySQL binlog,一种是业务系统中的业务管理。这个日志管理信息可以通过Flume等工具发送到采集,然后离线存储到仓库。那么随着业务越来越多,业务中的每个表都可以抽象出来。抽象的好处是更好的管理和更有效的数据重用和计算重用。因此,数据仓库分为多层(细节层、中间层、服务层等),

  不仅仅是HiveSQL,Hive只是静态批量计算,业务需要每天上报,也就是说每天都在进行计算。在这种情况下,它将取决于调度工具和血统管理:

  当任务非常大的时候,我们往往需要很长时间才能得到结果,也就是我们常说的T+1、H+1,这就是离线数仓的问题。

  第三方工具

  

  如前所述,离线数仓不仅仅是简单的 Hive 计算,它还依赖于其他第三方工具,例如:

  无论是离线数仓还是第三方工具,主要的问题其实是“慢”。如何解决慢的问题,是时候出现实时数据仓库了。

  实时数据仓库

  

  实时数仓其实是从 Hive+HDFS 的组合变成了 Kafka,而 ETL 的功能是通过 Flink 的流处理来解决的。这个时候,调度和血缘管理就没有问题了。通过实时不断的增量更新,最终输出到业务DB。

  虽然延迟降低了,但是这时候我们会面临一些其他的问题:

  Lambda 架构

  

  所以这个时候很多人会选择一套实时和一套离线的方式,互不干扰,根据任务是否需要遵循实时的要求来分离需求。

  这种架构看似解决了所有问题,但实际上带来了很多问题。首先,Lambda 架构将离线和实时分开。他们解决的业务问题是相同的,但是两种解决方案从同一个数据源产生不同的计算结果。不同层级的表结构可能不一致,当出现数据不一致时,需要对比检查。

  随着这个 Lambda 架构越走越远,开发团队、表结构表依赖、计算模型等可能会分离。越走越会发现成本越来越高,统一的成本也越来越大。.

  

  那么问题来了,实时数仓会消耗这么大的资源,历史数据无法保留。Lambda 架构中存在很多问题。有哪些解决方案可以解决?

  数据湖

  

  数据湖有很多优势。原子性使我们能够实现准实时批流集成,并支持对现有数据的修改。不过,数据湖毕竟是新一代的数仓存储架构,各方面都不是完美的。现有的数据湖强烈依赖 Spark(当然 Flink 也在拥抱数据湖)。将数据迁移到数据湖需要团队考虑迁移成本和人员学习成本。

  如果没有这么大的迁移数据湖的决心,有没有稍微温和一点的方案来加速现有的离线数仓呢?

  Flink 对批流融合的探索

  统一元数据

  

  Flink 一直在不断致力于离线和实时的统一,从统一元数据开始。简单来说就是将Kafka表的元数据信息存储在HiveMetaStore中,统一离线和实时表Meta。

  (目前开源的实时计算还没有比较完善的持久化MetaStore,Hive MetaStore不仅可以保存离线表,还可以承担实时计算的MetaStore能力)。

  统一计算引擎

  

  元数据相同后,实时和离线的表结构和层次可以设计成一样,接下来就是共享了:

  统一数据

  

  分析元数据和计算引擎的统一,进一步分析是否可以统一实时和离线数据,避免数据不一致,避免数据的重复存储和重复计算。ETL计算可以统一吗?既然实时表的设计可以和离线表完全一样,那我们能不能简单的只有实时表的ETL计算,离线表从实时表中获取数据呢?

  此外,离线链路的数据准备可以通过实时链路加速,批量计算可以用流式输入代替调度。

  

  Flink Hive/File Streaming Sink就是为了解决这个问题,实时的Kafka表可以实时同步到对应的离线表:

  这时,离线批量计算也可以交给实时调度。在实时任务处理中,某个机会(Partition Commit,见后续)会自动调度离线部分任务进行数据同步。

  这时候实时表和离线表已经基本统一了,那么问题来了,Kafka中的表和Hive中的表可以共用一张表吗?我的想法是,以后可能会出现以下几种情况。在数据仓库中定义一个表,对应Kafka和Hive+HDFS这两个物理存储:

  Hive Streaming Sink 的实现

  

  Flink 在 1.11 之前已经有 StreamingFileSink。在 1.11 中,它不仅将其集成到 SQL 中,还让这个 Hive Streaming Sink 像离线的 Hive SQL 一样。所有业务逻辑都由 SQL 处理。处理,并带来进一步的增量。

  接下来介绍Hive/File Streaming Sink,它分为两个组件,FileWriter和PartitionCommitter:

  

  由于流式作业是不间断运行的,如何设置分区提交时间,分区何时提交?

  如果当前时间Current时间>分区产生的时间+commitDelay延迟,则为分区提交可以开始的时间。一个简单的例子是每小时分区。比如现在是12:00过了1分钟,11:00+1小时的分区已经过去了,那么可以说11:00的分区不会再有数据了,所以我们可以提交11: 00. 划分。(如果有LateEvent怎么办?所以分区的提交也要求是幂等的。)

  

  接下来介绍分区提交的具体作用。最直接的就是写 SuccessFile 和 Add partition 到 Hive metastore。

  Flink 内置了对 Hive-MetaStore 和 SuccessFile 的支持。只要将“sink.partition-commit.policy.kind”配置为“metastore,success-file”,就可以在提交分区时自动将分区添加到Hive,并写入SuccessFile,当添加操作完成后,分区为实际上对 Hive 可见。

  自定义机制允许您自定义分区提交策略类。这个类的实现可以在这个分区的任务处理完成后进行:比如触发下游调度,统计分析,或者触发Hive的小文件合并。(当然,触发Hive的小文件合并,不仅需要重新开始一个job,也不能保证一致性,Flink在后续会有进一步的探索,在Flink jobs中会主动完成小文件的合并)。

  实时消费

  

  不仅仅是准实时的数据摄取,Flink 还带来了维度表,将 Hive 表和流关联起来,实时消费 Hive 表。

  我们知道 Flink 支持通过维表关联查询 MySQL 和 HBase,在计算中维护了一个 LRU 缓存,查询 MySQL 或者 HBase 会漏掉。但是如果没有查找功能怎么办?数据一般都放在离线数据仓库中,所以我们在业务上一般使用Hive Table来定期同步到HBase或者MySQL。Flink 还可以允许直接维度表与 Hive 表相关联。当前的实现非常简单。它需要在每个并发中加载Hive表的所有数据,仅用于小表的关联。

  传统的 Hive Table 只支持批量读取和计算,但我们现在可以使用流式的方式来监控 Hive 中的分区/文件生成,即每条数据都可以实时消费。充分复用 Flink Streaming SQL 方式,可以对 HBase、MySQL、Hive Table 进行 Join 操作,最终通过 FileWriter 实时写入 Hive Table。

  构建流批处理准实时数仓的应用实践

  

  案例如下:通过Flume采集日志,查看Logs,计算每个年龄段的PV。这时候,我们有两个链接:

  

  这里就是我们刚才提到的,虽然它对应两个数据库:realtime_db和offline_db,但是它们共享一个元数据。

  对于 Hive 表,我们可以通过 Flink SQL 提供的 Hive 方言语法在 Flink 中创建 Hive 表,然后使用 Hive 的 DDL 语法在 Flink 中创建 Hive 表。这里设置PARTITION BY day 和hour 与实时链接不同,因为实时链接没有分区的概念。

  如何避免表结构中的分区导致的schema差异?一个可以解决的方案是考虑引入隐藏分区的定义。Partition的字段可以是某个字段的Computed Column,也可以和实际常见的情况进行比较,比如day或者hour是通过time字段计算出来的,然后是下面三个参数:

  然后设置回默认的 Flink 方言,创建 Kafka 的实时表,通过 insert into 将 Kafka 中的数据同步到 Hive。

  

  这部分是关于Kafka中的表是如何通过Dim join获取User表的age字段的。图中需要关注的是参数lookup.join.cache.ttl。我们会将用户表以与广播类似的方式广播到每个任务,但在此过程中可能会对 Hive 中的表进行更新操作。,这里的1h表示数据有效期只有1小时。创建视图的目的是增加 Dim Join 所需的处理时间(Dim Join 需要定义 Process 时间是一个不自然的过程,我们还将考虑如何在不破坏 SQL 语义的情况下简化 DimJoin 的语法。)

  

  通过实时Pipeline的方式消费Hive Table,而不是过去通过调度或者手动触发批处理作业,第一个参数streaming-source.enable,开启流处理机制,然后使用start-offset参数指定哪个分区/file 开始消费。至此,整个流-审批-一体化准实时数仓应用基本完成。

  未来计划

  Hive 在分区级别管理的 Table Format 在方便性上存在一些限制。如果是Iceberg等新的Table Format,会有更好的支持。未来,Flink 将加强以下几个方面:

  更多详情,请查看 InfoQ 公开课的完整视频回放:

  直播回放:基于 Flink+Hive 搭建流审批和准实时数据仓库 | InfoQ

  导师:

  李劲松,信花名,阿里巴巴技术专家,Apache Flink Committer。从2014年开始,一直专注于阿里巴巴内部的Galaxy流计算框架;2017年起开始Flink研发,主要关注批量计算、数据结构和类型。

  关注我转发本文文章,私信我“获取资讯”,即可免费获得价值4999元的InfoQ迷你书,点击文末“了解更多”,即可前往InfoQ官网获取最新资讯~

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线