实时文章采集(发展历程FlinkSQL应用实践平台建设总结展望(组图))
优采云 发布时间: 2022-01-31 09:14实时文章采集(发展历程FlinkSQL应用实践平台建设总结展望(组图))
摘要:本文组织了 Homework Group 实时计算负责人张颖在 Flink Forward Asia 2021 上的分享。Flink 在 Homework 实时计算的演进中发挥了重要作用。尤其是在 FlinkSQL 的帮助下,实时任务的开发效率得到了极大的提升。本文文章主要分享FlinkSQL在作业帮助中的使用和实践经验,以及随着任务规模的增加,从0到1搭建实时计算平台过程中遇到的问题和解决方法. 内容包括:
发展历程 Flink SQL 应用实践平台建设总结与展望
FFA 2021 现场重播和演讲 PDF 下载
一、发展历程
Homework Help主要利用人工智能、大数据等技术为学生提供更高效的学习解决方案。因此,业务数据主要包括学生的出勤率和知识点的掌握情况。在整体架构中,无论是binlog还是普通日志,都是在采集之后写入Kafka,分别通过实时和离线计算写入存储层。基于OLAP,对外提供相应的产品化服务,如工作台、BI分析等。工具。
目前 Homework Help 的实时计算主要基于 Flink,开发过程分为三个阶段:
2019 年实时计算包括少量 SparkStreaming 作业,提供给导师和讲师。在解决实时性需求的过程中,会发现开发效率很低,数据很难复用;之后,常规的做法是在生产实践中逐步应用 Flink JAR,积累经验后开始搭建平台,应用 Flink SQL。但是,20年来,业务提出了很多实时计算的需求,我们的开发人力储备不足。当时,在 Flink SQL 1.9 发布后不久,SQL 功能发生了很大变化,所以我们的做法是直接将 Flink SQL 应用到实时数仓方向。现在,整个实时数仓90%以上的任务都是使用Flink SQL实现的。; 到了 2020 年 11 月,Flink 的 Job 数量迅速增加到数百个,我们开始构建从 0 到 1 的实时计算平台,已经支撑了公司所有重要业务线,并且计算部署在多个集群上在多云中。
介绍以下两个方面:
FlinkSQL实践中遇到的典型问题及解决方案;搭建实时计算平台过程中的一些思考。二、Flink SQL 应用实践
下面是基于 Flink SQL 的完整数据流架构:
binlog/log 采集 写入 Kafka 后,topic 会自动注册为元数据表,这是后续所有实时 SQL 作业的起点。用户可以在 SQL 作业中使用此表,而无需定义复杂的 DDL。
同时,在考虑实际应用时,还需要根据元数据表添加或替换表属性:
新增:元数据记录表级属性,但 SQL 作业可能需要添加任务级属性。比如对于Kafka源表,添加job的group.id来记录offset;替换:离线测试时,在引用元数据表的基础上,只需要定义broker topic等属性覆盖源表,即可快速构建一行。测试表格如下。
框架还需要支持用户的SQL作业,方便输出指标和日志,实现全链路监控和跟踪。
这里主要介绍SQL添加Trace函数时的DAG优化实践,以及我们对Table底层物理存储的选择和封装。
2.1 SQL 添加 Trace 功能
SQL可以提高开发效率,但是业务逻辑的复杂度还是有的,复杂的业务逻辑写的DML会很长。在这种情况下,建议使用视图来提高可读性。因为视图的 SQL 比较短,所以不应该像代码规范中的单个函数那样太长。
下图左侧是一个示例任务的部分DAG,可以看到有很多SQL节点。这种情况下很难定位,因为如果是DataStream API实现的代码,也可以加日志。但是 SQL 做不到。用户可以干预的入口很少,只能看到整个作业的输入输出。
类似于在函数中打印日志,我们希望支持在视图中添加 Trace,以方便案例追踪。
但是我在尝试将 Trace 添加到 SQL 时遇到了一些问题,这是一个简化的示例:
右上角的SQL创建source_table为源表,prepare_data视图读取表,在sql中调用foo udf,然后使用StatementSet分别插入两个下游,同时将视图转换为DataStream调用 TraceSDK 写入跟踪系统。
注意:我们当时是基于 1.9 开发的。为了清楚起见,我们还使用了一些后来添加的功能。
上图下方的实际 DAG 看起来并不像预期的那样:
DAG分为上下不相关的两部分。Kafka源表是DataSource部分,读取两次;foo 方法被调用了 3 次。
数据源压力和计算性能需要优化。
为了解决这个问题,我们需要从几个角度进行优化。这里主要介绍DAG合并的思想。无论是table还是stream的env,都会产生相应的transformation。我们的做法是统一合并到stream env中,这样就可以在stream env中得到一个完整的变换列表,然后生成StreamGraph提交。
左下角是我们优化的 DAG,读取源表并只调用一次 foo 方法:
优化后的 DAG 效果与我们写 SQL 时的逻辑图非常相似,性能自然符合预期。
回到问题本身,业务可以简单地用一条语句给视图的某些字段添加trace,例如:prepare_data.trace.fields=f0,f1. 由于SQL自然收录字段名,所以trace数据的可读性甚至高于普通原木。
2.2 表的选择与设计
如前所述,我们的首要要求是提高人的效率。因此,Table 需要具备更好的分层和复用能力,并且支持模板化开发,以便 N 个端到端的 Flink 作业能够快速串联起来。
我们的解决方案是基于 Redis 实现的,它首先有几个优点:
高qps,低延迟:这应该是所有实时计算的关注点;TTL:用户无需关心数据如何退出,可以给出合理的TTL;通过使用protobuf等高性能紧凑的序列化方式,使用TTL,整体存储小于200G,redis的内存压力可以接受;适合计算模型:为了保证计算本身的时序,会进行keyBy操作,同时需要处理的数据会被shuffle到同一个并发,所以不依赖存储过多考虑锁优化。
接下来,我们的场景主要是解决多索引和触发消息的问题。
上图显示了一个表格示例,显示学生是否出现在某个章节中:
多索引:数据首先以字符串形式存储,比如key=(uid, course_id), value=serialize(is_attend, ...),这样我们就可以在SQL中JOIN ON uid AND course_id。如果 JOIN ON 其他字段,比如 course_id 怎么办?我们的做法是同时写一个以lesson_id为key的集合,集合中的元素是对应的(uid,lesson_id)。接下来,在找lesson_id = 123的时候,先取出集合下的所有元素,然后通过管道找到所有的VALUE并返回;触发消息:写入redis后,会同时向Kafka写入一条更新消息。两个存储在 Redis Connector 的实现中都保证了一致性、顺序性和不丢失数据。
这些功能都封装在 Redis Connector 中,业务可以通过 DDL 简单定义这样的 Table。
DDL 中的几个重要属性:
primary 定义主键,对应字符串数据结构,如示例中的uid + course_id;index.fields 定义了辅助搜索的索引字段,如示例中的课程id;也可以定义多个索引;poster.kafka定义了接收触发消息kafka表也定义在元数据中,用户可以在后续的SQL作业中直接读取该表而无需定义。
因此,整个开发模式复用性高,用户可以轻松开发端到端的N个SQL作业,而无需担心如何追溯案例。
三、平台搭建
上述数据流架构搭建完成后,2020.11的实时作业数量迅速增加到几百个,比2019年快很多。这个时候我们开始搭建实时计算平台从0到1,然后分享了搭建过程中的一些想法。
平台支持的功能主要有三个起点:
统一:统一不同云厂商的不同集群环境、Flink版本、提交方式等。之前hadoop客户端分散在用户的提交机器上,给集群数据和任务安全带来隐患,增加了后续集群升级和迁移的成本。我们希望通过平台统一任务的提交入口和提交方式;易用性:平台交互可以提供更易用的功能,比如调试、语义检测等,可以提高任务测试的人为效率,记录任务的版本历史。方便的在线和回滚操作;规范:权限控制、流程审批等,类似于线上服务的线上流程,通过平台,实时任务的研发流程可以标准化。3.1 规范——实时任务进程管理
FlinkSQL 让开发变得非常简单高效,但是越简单越难标准化,因为写一段 SQL 可能只需要两个小时,但通过规范却需要半天时间。
但是,该规范仍然需要执行。一些问题类似于在线服务,在实时计算中也遇到过:
记不清了:任务上线已经一年了,最初的需求可能是口耳相传。最好记住wiki或email,但在任务交接时容易记住;不规范:UDF或DataStream代码 两种方式都没有按照规范,可读性差,以至于后来接手的同学不能升级,或者不敢改,长期维护不了。还应该有一个关于如何编写包括实时任务的 SQL 的规范;找不到:线上运行的任务依赖一个jar,哪个git模块对应哪个commitId,如果有问题如何第一时间找到对应的代码实现;盲改:一直正常的任务,周末突然报警,
规范主要分为三个部分:
开发:RD 可以从 UDF 原型项目快速创建 UDF 模块,该项目基于 flink 快速入门。创建的 UDF 模块可以正常编译,包括 WordCount 之类的 udf 示例,以及 ReadMe 和 VersionHelper 等默认的 helper 方法。根据业务需求修改后,通过CR上传到Git;需求管理和编译:提交的代码会与需求卡片关联,集群编译和QA测试后,可以在线下单;在线:根据模块和编译输出,选择更新/被job owner或leader批准后创建和重新部署哪些job。
整个研发过程不能离线修改,比如更改jar包或者对哪个任务生效。一个实时任务,即使运行几年,也能查到谁在线,谁批准了当前任务,当时的测试记录,对应的Git代码,提出的实时指标要求由谁开始。任务维持很长时间。
3.2 易用性 - 监控
我们当前的 Flink 作业在 Yarn 上运行。作业启动后,预计 Prometheus 会抓取 Yarn 分配的 Container,然后连接到报警系统。用户可以根据告警系统配置Kafka延迟告警和Checkpoint故障告警。构建此路径时遇到两个主要问题:
PrometheusReporter启动HTTPServer后,Prometheus如何动态感知;它还需要能够控制度量的大小,以避免采集大量无用数据;我们的 SQL 源表基本上是基于 Kafka 的。相比第三方工具,在计算平台上配置Kafka延迟告警更方便。因为自然可以得到任务读取的topic和group.id,所以也可以和任务失败使用同一个告警组。结合告警模板,配置告警非常简单。
关于解决方案:
发现功能是在官方PrometheusReporter的基础上增加的。Container 的 HTTPServer 启动后,对应的 ip:port 以临时节点的形式注册到 zk 上,然后使用 Prometheus 的 discover 目标来监控 zk 节点的变化。由于是临时节点,当 Container 被销毁时,该节点就消失了,Prometheus 也能感应到它不再被抓取。这样一来,就很容易为普罗米修斯搭建一条抢夺的路径。KafkaConsumer.records-lag 是一个比较实用和重要的延迟指标,它主要做了两个任务。修改 KafkaConnector 并在 KafkaConsumer.poll 之后将其公开,以确保 records-lag 指示器可见。此外,在这样做的过程中,我们发现不同Kafka版本的metric格式是不同的()。我们的方法是将它们扁平化为一种格式,并将它们注册到 flink 的指标中。这样不同版本暴露的指标是一致的。四、总结与展望
上一阶段使用 Flink SQL 来支持实时作业的快速开发,搭建了实时计算平台来支持数千个 Flink 作业。
更大的见解之一是 SQL 确实简化了开发,但它也阻止了更多的技术细节。对实时作业运维工具的要求,比如 Trace,或者任务的规范没有改变,对这些的要求更加严格。因为在细节被屏蔽的同时,一旦出现问题,用户不知道如何处理。就像冰山一角,漏水越少,下沉越多,越需要做好周边系统的建设。
二是适应现状。一是能尽快满足当前的需求。比如,我们正在提高人的效率,降低发展门槛。同时还要继续探索更多的业务场景,比如用HBase和RPC服务代替Redis Connector。现在的好处是修改了底层存储,用户对SQL作业的感知很小,因为SQL作业基本都是业务逻辑,DDL定义了元数据。
接下来的计划主要分为三个部分:
支持资源弹性伸缩,平衡实时作业的成本和时效;我们从 1.9 开始*敏*感*词*应用 Flink SQL,现在版本升级变化很大,需要考虑如何让业务能够低成本升级和使用新版本。版本中的功能;探索流批融合在实际业务场景中的实现。
FFA 2021 现场重播和演讲 PDF 下载
更多Flink相关技术问题,可以扫码加入社区钉钉交流群