智能采集平台(OPPO实时计算平台架构实践(一):开源+自研相结合)
优采云 发布时间: 2022-03-11 20:13智能采集平台(OPPO实时计算平台架构实践(一):开源+自研相结合)
导读:今天分享的内容是OPPO实时计算平台架构的实践。它将围绕以下四个内容展开:
01
背景介绍
首先介绍一下OPPO大数据所涵盖的业务范围以及大数据平台的概况。
1.OPPO大数据业务范围
说到OPPO,大家应该都不陌生了。它是中国三大智能手机制造商之一。智能手机的设计、制造和销售是OPPO非常重要的一环,用户群体也非常庞大。目前,ColorOS(OPPO定制的系统)月活跃用户超过3亿。依托手机,衍生出很多商业服务。主要类别包括用户服务、商店和游戏、内容产品和智能服务。我们的大数据几乎服务于所有这些业务。其中,典型的服务对象包括软件商店、浏览器、商城(OPPO电商业务)。目前电商业务主要以手机、数码产品、物联网产品的销售为主。
2. 大数据计算(开源+自研相结合)
大数据平台存储的数据量已超过600P,日增量数据量超过万亿条,日增量数据量达数PB。这是我们大数据平台的能力矩阵,列出了一些主要的。我们主要采用开源+自研相结合的方式来构建我们的大数据计算系统。开源包括Flink、Spark、Trino、Yarn等系统和组件。在这些开源系统和组件的基础上,我们构建了自主研发的数据接入、实时计算、离线计算、交互分析系统和数据质量等系统。
02
平台架构
1. 实时平台架构
OPPO 实时计算平台是基于 Flink 构建的。计算引擎为 Flink,目前支持 SQL 和 JAR 开发操作。架构图的最上层是面向数据开发者的交互式开发页面,包括SQL开发IDE、JAR作业开发IDE、作业监控管理工具等。下一层是 Data API 和 Open Api。该层处理各种业务逻辑。Data API 主要处理与我们平台中各种作业相关的逻辑。Open API 是一组暴露我们计算服务的接口,主要是为公司服务的。其他平台,让他们可以基于我们的计算能力快速构建一些自己的产品。再往下是 Job GateWay。Job GateWay 执行与作业编译、在线和离线等相关的操作。通过 Job GateWay,作业提交到 Yarn 集群或 K8s 集群运行。另一个模块是Backend模块,主要处理在线作业监控逻辑。架构上的服务将与左侧的 MetaData 模块进行交互,该模块存储了我们所有作业的元数据信息。图中右侧的智能监控是一个贯穿所有模块的外部服务,所有模块的监控数据进入监控系统。智能监控提供指标或日志查询和监控功能。图中右侧的智能监控是一个贯穿所有模块的外部服务,所有模块的监控数据进入监控系统。智能监控提供指标或日志查询和监控功能。图中右侧的智能监控是一个贯穿所有模块的外部服务,所有模块的监控数据进入监控系统。智能监控提供指标或日志查询和监控功能。
从整个系统层面进行设计,从以下几个方面构建整个系统:
这是实时计算平台的基本情况。
2. 实时开发过程
模块如何在工作生命周期中协同工作?
这就是系统中的各个模块在作业的整个生命周期中如何协作。
3. SQL IDE
以上是 SQL IDE 的截图。可以看到图的左侧显示了作业元数据信息,包括作业可以使用的库、表等。中间是SQL开发窗口,可以编写SQL,格式化SQL,提供SQL自动补全。右侧是作业参数编辑窗口和作业版本管理窗口。底部是 SQL 调试的结果反馈窗口。当前显示的作业是调试成功状态。目前平台上3000+的作业,80%以上都是用SQL开发的。
4. 发展阶段
一个工作有自己的生命周期,同一个平台也有自己的发展阶段。让我们看看我们现在所处的阶段。
首先,我根据可用性、易用性、易用性和使用意愿定义了平台的四个不同阶段:
对应我们的计算平台,在我们提供了SQL作业和JAR作业以及一些基础的监控报告之后,已经可以使用了。另外,提供了比较完善的运维工具,监控告警也比较好用。一个好用的系统,我认为,应该提供作业诊断、调优建议,以及更完善的操作工具。在他想用的阶段,业务基本可以无视一切。他正常提交作业后,系统会帮他维护。工作生命周期完成后,将运行业务报告发送给业务人员,这是最先进的。阶段。按照之前定义的评价标准,我认为我们的平台应该处于从好用到好用的过渡阶段。
5. 作业诊断
① 诊断目标
作业诊断目标可以将作业的运行状态实时反馈给业务。整个作业运行过程中有很多监控指标,每个指标都有不同的含义。如果只展示监控指标,业务可能无法理解,所以我们需要将相关指标实时、可读,业务可读的格式。提供反馈的方式。另外,当作业运行过程中出现问题时,应给出作业调优建议,作业诊断就是为了实现这两个目标。通过监控采集作业生命周期中产生的各种指标和日志信息来实现、构建诊断系统和分析的基本路径。
② 诊断分析
首先,我们分析工作生命周期中的指标和状态,了解从工作发展到工作终止的生命周期中会产生什么样的信息。不同的阶段有不同的信息。在作业开发阶段,会出现语法错误、参数错误等一些提示。在作业调试阶段,权限检查失败,环境检查失败。这样的信息可以提供给用户。在作业提交阶段,有资源检查异常、参数检查异常等。现在,所有三个阶段都直接向 IDE 提供信息,以便向作业开发人员提供反馈。在job运行阶段,会出现source异常、sink异常、序列化和反序列化异常、数据延迟、OOM异常、checkpoint异常、数据倾斜和其他信息。作业诊断主要关注作业运行和作业终止两个阶段。该工作将因各种原因终止。这时候因为job没有监控信息,所以需要分析一些终止的日志信息。
③ 诊断过程
作业诊断的一般结构如上图所示。从 IDE 提交的作业很容易提交到计算集群通过 Job GateWay 运行。这里直接提交到 Yarn 集群。该工作有两个角色:JM 和 TM。每个 JM 都有自己的度量系统,并向外界公开一个 REST API,TM 也是类似的。作业的指标通过作业节点自身的监控系统上报给智能监控平台进行存储和处理。另一个是日志信息。我们在每个 Yarn 节点上部署 LogAgent,它将节点上的日志采集聚合到智能监控平台进行存储,并提供检索服务。
此外,智能监控平台还可以配置各种度量触发策略。以作业重启指标为例,我们在平台上配置作业重启告警和回调策略,注册回调接口。如果发生作业重启,它会形成一个警报并回调我们注册的接口通知给我们的作业诊断模块。
诊断模块收到回调后,会首先尝试通过MetaData提供的REST接口获取作业信息。获取作业信息后,会通过JM REST接口获取作业异常信息。因为重启可能是内部重启,也就是在自身重启策略范围内的重启,实际上并没有挂掉。这时候就可以通过JM的REST接口获取准确的异常信息了。有了准确的异常信息,可以通过分析得到作业重启的原因,然后将分析结果和异常信息写入DB和ES。DB主要存储分析结果,ES存储异常的具体信息。,便于后续跟进。
因为这个诊断有可能是不准确的,我们可以通过再次分析ES中的日志信息来修正诊断结果。如果现阶段无法从JM REST获取到异常信息,则很有可能该作业实际上已经被挂起。这时候LogAgent之前上报的日志就派上用场了。此时,可以通过监控平台提供的日志检索来检索日志。拿出来,分析一下log,最后得到一个结果,把分析的结果和具体的log保存下来。
这是工作诊断的一般流程。
④ 诊断结果
诊断结果出来后,平台可以在页面上展示部分诊断结果和调优建议。此外,您还可以通过日志查询查看具体的作业日志信息。
目前可以做到,大致就是图中的结果,显示任务的当前状态,比如当前使用了多少核,是什么状态,最近重启过,原因重启是内存溢出。然后给出内存溢出的调优建议,建议适当调整TM的内存。
6. 链路监控
从数据访问系统OBUS,数据经过初步处理后写入kafka,然后Flink接收kafka数据进行处理。核心链路的流量很大,很重要。我们做了一个核心链路延迟监控。延迟可以分为几个阶段。第一阶段是在 OBUS 内处理服务数据的延迟。OBUS已经处理了发送kafka的延迟。一般来说,kafka处理完后是同步发送的,但是有可能是这个地方发生了失败再发起发送,在重试的过程中会有很大的延迟。另一个是Kafka已经收到消息,Flink有足够的计算能力产生延迟。三个延迟加在一起就是整个链路的延迟。
首先OBUS接收到数据时会记录一个接收到的时间,记为server_time,OBUS在数据处理结束时会记录一个时间parse_time,然后发送给kafka。kafka本身不需要记录时间,kafka是存储消息的。写入时将记录存储的时间时间戳。最后,Flink在这个阶段接收消息的时候,有一个process_time,这样就可以得到四次。分析清楚后,下一步就是执行。我们优化了 flink KafkaSource 模块中的代码。在这里,我们将接收到的信息进行计算,并将计算结果作为自定义指标报告给监控平台,您可以将其存储起来。
最后,我遇到了这样的链路监控情况。除了图表,我们还可以配置一些告警策略进行监控。当链路出现延迟时,我可以及时报警,方便我们准确定位和定位问题。快速恢复。
7. 实时 SLA
在链路监控的基础上,进一步保证了实时的作业SLA准时率。
业务准入进来了,除了想看每个工作的运行情况,他还想看一份整体的运行报告。为此,我们做了实时的准时保证报告。前期准备工作需要去采集业务对不同job延迟的容忍度,结合这个指标采集,结合链路延迟的延迟数据采集上面我们做的监控,我们可以很容易的在某一个准点的时间得到工作,大局可以上报。如果准时率不是100%,可以找出准时工作,再结合工作诊断,甚至可以快速找出导致准时的工作原因是什么.
03
应用实践
1. 实时数据仓库
实时计算的一个典型应用场景是实时数据仓库。实时数仓比对的核心逻辑是数据拆分、数据清洗和数据聚合。数据源从应用端嵌入,业务同步数据,MySQL数据,Oracle数据,数据写入Kafka。数据仓库团队编写SQL通过实时平台访问Kafka数据,并对数据进行拆分。图层ODS数据,根据整个平台的表格做一些关联和清洗,得到DWD图层数据。再往下,对DWD数据做一些汇总和聚合操作,得到一些业务真正想要的数据。
目前,实时数仓已在公司内部全面推广。几乎所有业务访问数据都经过实时数仓,很少有业务去Kafka接收原创访问数据。
2. 实时大屏
实时大屏在电商推广活动中占有非常重要的地位。比如618、双十一等活动,刚过半夜,各大电商就已经开始发布战报。为什么他们可以这么快发送?? 在很大程度上,它也得益于实时计算的强大计算能力。在类似的活动中,OPPO也做了自己的大屏。这里的大屏幕实际上是相似的。一般的计算无非就是GMV、PV、UV,以及订单量等等都是一些指标。
电子商务的核心数据一般都写在 MySQL 等 DB 中。如何将数据导入我们的一个计算平台进行计算,是一个需要解决的问题。
比较经典的链接之一是数据在 MySQL 中。通过Canal等工具,将数据写入Kafka。Flink 从 Kafka 拉取数据进行计算,并将计算结果输出到 DB 上报。
这个环节的好处是整个解决方案用到的组件,比如Canal和Kafka,已经开发运行多年,非常成熟。其次,基于这些成熟的组件,一般公司都开发了一些比较完善的监控*敏*感*词*。另外,如果 MySQL 数据导出后,下游计算不止一次,Kafka 消息可以被消费多次进行计算,因此它的扩展性会相对更好。
但它也有一些明显的缺点。我们可以看到,这里的一个数据至少可以通过Canal和Kafka计算到计算层。链接还是很长的。较长的链接相对难以保证。需要保证每个节点都正常。如果某一点有问题,则无法生成数据。另外,这个环节主要支持增量场景。在电商推广的过程中,主要是增量计算,所以问题不大。
还有一个比较新的环节,Flink CDC。Flink CDC 是社区去年才支持的能力。从图中可以看出,这个链接很短。Flink 可以直接提取 MySQL Binlog,然后进行分析计算。它最大的优点是链路短,涉及的元件少,所以理论上稳定性会更高,数据延迟会更低。此链接同时支持完整和增量。但是有一个明显的缺点,就是比较新,还没有形成比较完整的解决方案。比如我们要聚合一些复杂的链接数据,就没有那么简单了。
在做大屏之前,我们也对这两个环节做了一些验证,最终选择了第一个经典环节。主要原因是我们的数据量不是很大,各方面的延迟都是可以接受的。也比较成熟,各个环节的连接和监控都比较完善,实际运行效果还是比较好的。
04
规划
OPPO实时计算平台背后的演进有两个方向,一是库湖一体化建设,二是云原生支持。
1. 沧湖综合建设
从目前业界的实践经验来看,数仓与湖的一体化不仅可以节省大量的存储资源,还可以简化大数据系统的架构。上面我们也看到,目前系统下的数据仓库建设整个链条很长,中间要流过几次Kafka和Flink,而且由于数据链长,存储资源的浪费也很严重。
我们选择Iceberg进行仓湖一体化建设。典型的应用场景是准实时数据仓库。准实时数据仓库不需要数据传输,可以通过很短的链接提供现有数据仓库的能力。但这受到技术限制。它可能只能达到近乎实时的效果。目前已经开通了kafka到lceberg的链接,现在部分数据已经通过这种方式进入存储了。另外,CDC到lceberg的链接也已经打开,数据也已经存入storage。lceberg 将这个 block 读到 Flink 之后,还在进行中。这块完成后,我们基本可以在此基础上搭建我们的准实时数仓了。
2. 云原生
另一个发展方向是支持云原生,实现弹性伸缩,充分利用云资源。目前,计算资源主要由 Yarn 管理。接下来,我们将支持 K8s 调度。公司有很多大型的K8s集群,上面运行着很多在线服务。那时,Flink 任务可以与这些服务混合使用。资源得到充分利用。
05
精彩问答
Q:Kafka表字段元数据是如何管理的?
A:元数据管理分为两种版本架构:
第一种方法是将数据写入MySQL表中进行独立管理。这种方法的缺点是实时元数据只能实时使用,不能与离线结合。
第二种方法是使用 FlinkHive Catalog 进行管理。在这个阶段,元数据管理是两种方法的结合。原有业务继续在MySQL中管理,新业务使用HMS管理。
Q:如何在kafka表中添加新字段?
A:由于数据格式有很多种,比如avro、json格式,所以不同格式的数据类型的操作方法是不一样的。如果在页面上编辑表格为json格式的处理方式,然后在写入数据时添加相应的字段。是的,需要的时候使用json格式的序列化或者反序列化。由于该表被编辑,所有涉及该表的作业都需要重新发布才能生效。
Q:你们公司是怎么做MySQL for Kafka join 分库分表的?
A:维度表使用单表,很少使用分库分表。如果想做,能不能先把分库分表做一个union,再加入维表?
Q:K8s 如何做云原生?
A:K8s处于实现阶段,在研究阶段发现了一个问题。Yarn 支持 Perjob 模式提交 JAR 和 SQL 作业,但 K8s 不支持 Perjob 模式提交 SQL 作业。因此,我们将 K8s 的 Application 模式改造为支持类似于 Yarn 的 perjob 模式提交 SQL 作业。