算法 自动采集列表( FlinkFlinkSQL实践遇到的典型问题以及解决方案2.实时计算平台建设过程中的思考)
优采云 发布时间: 2022-02-02 03:10算法 自动采集列表(
FlinkFlinkSQL实践遇到的典型问题以及解决方案2.实时计算平台建设过程中的思考)
文/张颖
概括
Flink 在 Homework 实时计算的演进中发挥了重要作用。尤其是在 FlinkSQL 的帮助下,实时任务的开发效率得到了极大的提升。
本文文章主要分享FlinkSQL在作业帮助中的使用和实践经验,以及随着任务规模的增加,从0到1搭建实时计算平台过程中遇到的问题和解决方法.
一、发展历程
Homework Help主要利用人工智能、大数据等技术为学生提供更高效的学习解决方案。因此,业务数据主要包括学生的出勤率和知识点的掌握情况。在整体架构中,无论是binlog还是普通日志,都是在采集之后写入Kafka,分别通过实时和离线计算写入存储层。基于OLAP,对外提供相应的产品化服务,如工作台、BI分析等。工具。
目前 Homework Help 的实时计算主要基于 Flink,开发过程分为三个阶段:
1. 2019 年实时计算包括少量 SparkStreaming 作业,提供给导师和讲师。在解决实时性需求的过程中,会发现开发效率很低,数据几乎无法复用。
2. 之后,正常的做法是在生产实践中逐步应用Flink JAR,积累经验后开始搭建平台和应用Flink SQL。但是在过去的20年里,业务对实时计算提出了很多要求,而我们开发的人力储备不足。当时,在 Flink SQL 1.9 发布后不久,SQL 功能发生了很大变化,所以我们的做法是直接将 Flink SQL 应用到实时数仓方向。目前整个实时数仓90%以上的任务都是使用Flink SQL实现的。
3. 到 2020 年 11 月,Flink 的 Job 数量迅速增加到数百个,我们开始构建从 0 到 1 的实时计算平台,已经支撑了公司所有的重要业务线,计算是部署在多个云中。在一个集群上。
介绍以下两个方面:
1. FlinkSQL 实践中遇到的典型问题及解决方案
2. 实时计算平台建设的几点思考
二、Flink SQL 应用实践
下面是基于 Flink SQL 的完整数据流架构:
binlog/log 采集 写入 Kafka 后,topic 会自动注册为元数据表,这是后续所有实时 SQL 作业的起点。用户可以在 SQL 作业中使用此表,而无需定义复杂的 DDL。
同时,在考虑实际应用时,还需要根据元数据表添加或替换表属性:
1. 新增:元数据记录表级属性,但 SQL 作业可能需要添加任务级属性。例如,对于Kafka源表,添加作业的group.id来记录偏移量。
2. 替换:离线测试时,在引用元数据表的基础上,只需要定义broker topic等属性覆盖源表,即可快速构建离线测试表。
框架还需要支持用户的SQL作业,方便输出指标和日志,实现全链路监控和跟踪。
这里主要介绍SQL添加Trace函数时的DAG优化实践,以及我们对Table底层物理存储的选择和封装。
2.1. SQL 增加 Trace 功能
SQL可以提高开发效率,但是业务逻辑的复杂度还是有的,复杂的业务逻辑写的DML会很长。在这种情况下,建议使用视图来提高可读性。因为视图的 SQL 比较短,所以不应该像代码规范中的单个函数那样太长。
下图左侧是一个示例任务的部分DAG,可以看到有很多SQL节点。这种情况下很难定位,因为如果是DataStream API实现的代码,也可以加日志。但是 SQL 做不到。用户可以干预的入口很少,只能看到整个作业的输入输出。
类似于在函数中打印日志,我们希望支持在视图中添加 Trace,以方便案例追踪。
但是我在尝试将 Trace 添加到 SQL 时遇到了一些问题,这是一个简化的示例:
右上角的SQL创建source_table为源表,prepare_data视图读取表,在sql中调用foo udf,然后使用StatementSet分别插入两个下游,同时将视图转换为DataStream调用 TraceSDK 写入跟踪系统。
注意:我们当时是基于 1.9 开发的。为了清楚起见,我们还使用了一些后来添加的功能。
上图下方的实际 DAG 看起来并不像预期的那样:
1. DAG分为上下不相关的两部分。Kafka源表是DataSource部分,读取两次。
2. foo 方法被调用了 3 次。
数据源压力和计算性能需要优化。
为了解决这个问题,我们需要从几个角度进行优化。这里主要介绍DAG合并的思想。无论是table还是stream的env,都会产生相应的transformation。我们的做法是统一合并到stream env中,这样就可以在stream env中得到一个完整的变换列表,然后生成StreamGraph提交。
左下角是我们优化的 DAG,读取源表并只调用一次 foo 方法:
优化后的 DAG 效果与我们写 SQL 时的逻辑图非常相似,性能自然符合预期。
回到问题本身,业务可以简单的使用一条语句给视图的某些字段添加trace,例如:prepare_data.trace.fields=f0,f1. 由于SQL中自然收录字段名,所以trace数据可读性甚至高于普通日志。
2.2. 表选择与设计
如前所述,我们的首要要求是提高人的效率。因此,Table 需要具备更好的分层和复用能力,并且支持模板化开发,以便 N 个端到端的 Flink 作业能够快速串联起来。
我们的解决方案是基于 Redis 实现的,它首先有几个优点:
1. 高qps,低延迟:这应该是所有实时计算的关注点。
2. TTL:用户无需关心数据如何退出该字段,只需给一个合理的TTL即可。
3. 通过使用protobuf等高性能紧凑的序列化方式,并使用TTL,整体存储小于200G,redis的内存压力可以接受。
4. 适配计算模型:为了保证计算本身的时序,会进行keyBy操作,同时需要处理的数据会被shuffle到同一个并发,所以它不会过多依赖存储来考虑锁的优化。
接下来,我们的场景主要是解决多索引和触发消息的问题。
上图显示了一个表格示例,显示学生是否出现在某个章节中:
1. 多索引:数据首先以字符串形式存储,比如key=(uid, course_id), value=serialize(is_attend, ...),这样我们就可以在SQL中JOIN ON uid AND course_id . 如果 JOIN ON 其他字段,比如 course_id 怎么办?我们的做法是同时写一个以lesson_id为key的集合,集合中的元素是对应的(uid,lesson_id)。接下来在找lesson_id = 123的时候,先取出集合下的所有元素,然后通过管道找到所有的VALUE并返回。
2. 触发消息:写入redis后,会同时向Kafka写入一条更新消息。在 Redis Connector 的实现中,保证了两个存储之间的一致性、顺序性和不丢失数据。
这些功能都封装在 Redis Connector 中,业务可以通过 DDL 简单定义这样的 Table。
DDL 中的几个重要属性:
1. primary 定义了主键,对应字符串数据结构,比如例子中的uid + course_id。
2. index.fields 定义了辅助搜索的索引字段,例如示例中的课程id;也可以定义多个索引。
3. poster.kafka 定义了接收触发消息的kafka 表。该表也在元数据中定义,用户可以直接读取该表,而无需在后续的 SQL 作业中定义。
因此,整个开发模式复用性高,用户可以轻松开发端到端的N个SQL作业,而无需担心如何追溯案例。
三、 平台搭建
上述数据流架构搭建完成后,2020.11实时作业数量迅速增加到几百个,比2019年快很多。这个时候我们开始搭建实时计算平台从0到1,然后分享了搭建过程中的一些想法。
平台支持的功能主要有三个起点:
1. 统一:统一不同云厂商的不同集群环境、Flink版本、提交方式等;之前hadoop客户端分散在用户的提交机器上,对集群数据和任务安全存在隐患。升级和迁移成本。我们希望通过平台统一任务的提交入口和提交方式。
2.易用性:平台交互可以提供更易用的功能,如调试、语义检测等,可以提高任务测试的人为效率,并记录任务的版本历史,支持方便在线和回滚操作。
3. 规范:权限控制、流程审批等,类似于在线服务的在线流程,通过平台可以规范实时任务的研发流程。
3.1.规范——实时任务进程管理
FlinkSQL 让开发变得非常简单和高效,但是越简单越难标准化,因为可能写一段 SQL 只需要两个小时,但通过规范却需要半天时间。
但是,该规范仍然需要执行。一些问题类似于在线服务,在实时计算中也遇到过:
1. 记不清了:任务上线一年了,最初的需求可能是口耳相传。最好记住wiki或email,但在任务交接时很容易记不清。
2. 不规则:UDF或DataStream代码,均不符合规范,可读性差。结果,后来接手的学生无法升级,也不敢改变,无法长期维持。还应该有一个关于如何编写包括实时任务在内的 SQL 的规范。
3. 找不到:线上运行的任务依赖一个jar,哪个git模块对应哪个commitId,有问题怎么第一时间找到对应的代码实现。
4.盲改:一直正常的任务,周末突然报警,原因是私下修改了线上任务的sql。
规范主要分为三个部分:
1. 开发:RD 可以从 UDF 原型项目中快速创建 UDF 模块,该项目基于 flink 快速入门。创建的 UDF 模块可以正常编译,包括 WordCount 之类的 udf 示例,以及 ReadMe 和 VersionHelper 等默认的 helper 方法。根据业务需求修改后,通过CR上传到Git。
2. 需求管理与编译:提交的代码将与需求卡片相关联。集群编译和QA测试后,即可下单上线。
3. 在线:根据模块和编译输出选择更新/创建哪些作业,并在作业所有者或领导批准后重新部署。
整个研发过程不能离线修改,比如更改jar包或者对哪个任务生效。一个实时任务,即使跑了几年,也能查到谁在线,谁批准了当前任务,当时的测试记录,对应的Git代码,以及提出的实时指标要求谁开始的。任务维持很长时间。
3.2 易用性监控
我们当前的 Flink 作业在 Yarn 上运行。作业启动后,预计 Prometheus 会抓取 Yarn 分配的 Container,然后连接到报警系统。用户可以根据告警系统配置Kafka延迟告警和Checkpoint故障告警。构建此路径时遇到两个主要问题:
1. PrometheusReporter启动HTTPServer后,Prometheus如何动态感知;它还需要能够控制度量的大小以避免采集大量无用数据。
2. 我们SQL的源表基本都是Kafka。相比第三方工具,在计算平台上配置Kafka延迟告警更方便。因为自然可以得到任务读取的topic和group.id,所以也可以和任务失败使用同一个告警组。结合告警模板,配置告警非常简单。
关于解决方案:
1. 添加了基于官方 PrometheusReporter 的发现功能。Container 的 HTTPServer 启动后,对应的 ip:port 以临时节点的形式注册到 zk 上,然后使用 Prometheus 的 discover 目标来监控 zk 节点的变化。由于是临时节点,当 Container 被销毁时,该节点就消失了,Prometheus 也能感应到它不再被抓取。这样一来,就很容易为普罗米修斯搭建一条抢夺的路径。
2. KafkaConsumer.records-lag 是一个比较实用和重要的延迟指标,主要做了两个任务。修改 KafkaConnector 并在 KafkaConsumer.poll 之后将其公开,以确保 records-lag 指示器可见。另外,在做这个的过程中,我们发现不同Kafka版本的metric格式是不同的()。我们的方法是将它们扁平化为一种格式,并将它们注册到 flink 的指标中。这样不同版本暴露的指标是一致的。
四、总结与展望
上一阶段使用 Flink SQL 来支持实时作业的快速开发,搭建了实时计算平台来支持数千个 Flink 作业。
更大的见解之一是 SQL 确实简化了开发,但它也阻止了更多的技术细节。对实时作业运维工具的要求,比如 Trace,或者任务的规范没有改变,对这些的要求更加严格。
因为在细节被屏蔽的同时,一旦出现问题,用户不知道如何处理。就像冰山一角,漏水越少,下沉越多,越需要做好周边系统的建设。
二是适应现状。一是能尽快满足当前的需求。比如,我们正在提高人的效率,降低发展门槛。同时还要继续探索更多的业务场景,比如用HBase和RPC服务代替Redis Connector。现在的好处是修改了底层存储,用户对SQL作业的感知很小,因为SQL作业基本都是业务逻辑,DDL定义了元数据。
接下来的计划主要分为三个部分:
1. 支持资源弹性伸缩,平衡实时作业的成本和时效性。
2. 我们从 1.9 开始*敏*感*词*应用 Flink SQL。现在版本升级发生了很大的变化,我们需要考虑如何让业务能够低成本的升级和使用新版本中的特性。
3. 探索流批集成在实际业务场景中的实现。
关于作者
张颖,2019年加入乔邦大数据中台研发部,负责实时计算相关工作。