整体实时数据处理链路的设计模型-上海怡健医学
优采云 发布时间: 2021-06-30 23:11整体实时数据处理链路的设计模型-上海怡健医学
1、设计背景
闲鱼当前的实际生产部署环境越来越复杂,对各种服务托盘的横向依赖错综复杂,对运行环境的纵向依赖也越来越复杂。当服务出现问题时,能否及时在海量数据中定位问题的根源,成为检验闲鱼服务能力的严峻挑战。
当网上出现问题时,往往需要十多分钟甚至更长时间才能找到问题的原因。因此,需要应用一个能够快速进行自动诊断的系统,而快速诊断的基础是一个高性能的实时数据处理系统。
这个实时数据处理系统需要具备以下能力:
1、Data real-time采集,实时分析,复杂计算,分析结果持久化。
2、 可以处理多种数据。收录应用日志、主机性能监控指标、调用链接图。
3、高可靠性。系统没有问题,数据不会丢失。
4、高性能,底层延迟。数据处理延迟不超过3秒,支持每秒千万条数据处理。
本文不涉及自动问题诊断的具体分析模型,只讨论整体实时数据处理环节的设计。
2、输入输出定义
为了便于理解系统的运行情况,我们将系统的整体输入输出定义如下:
输入:
服务请求日志(包括traceid、时间戳、客户端ip、服务器ip、耗时、返回码、服务名、方法名)
环境监测数据(索引名称、ip、时间戳、索引值)。比如cpu、jvm gc次数、jvm gc耗时、数据库指标。
输出:
一段时间内某个服务错误的根本原因,每个服务的错误分析结果用有向无环图表示。 (根节点是被分析的错误节点,叶节点是错误节点的根本原因。叶节点可能是外部依赖的服务错误,也可能是JVM异常等)。
3、建筑设计
在实际系统运行过程中,随着时间的推移,会不断产生日志数据和监控数据。生成的每条数据都有自己的时间戳。而这些带时间戳的数据的实时传输就像水在不同的管道中流动一样。
如果将连续不断的实时数据比作自来水,数据处理过程类似于自来水生产过程:
自然,我们也将实时数据处理过程分解为采集、传输、预处理、计算和存储阶段。
整体系统架构设计如下:
采集
使用阿里自研的sls日志服务产品(包括logtail+loghub组件),logtail为采集客户端。之所以选择logtail,是因为它的性能优异,可靠性高,以及灵活的插件扩展机制。宇可定制自己的采集插件,实现各种数据的实时采集。
转移
loghub可以理解为一个数据发布订阅组件,功能类似于kafka。作为数据传输通道,更稳定、更安全。详细对比文章reference:
预处理
实时数据预处理部分使用了blink流计算处理组件(开源版本叫flink,blink是阿里内部基于flink的增强版)。目前常用的实时流计算开源产品有Jstorm、SparkStream、Flink。由于Jstorm没有中间计算状态,计算过程中需要的中间结果必须依赖外部存储,会导致频繁的io影响其性能; SparkStream本质上是使用tiny批处理来模拟实时计算,实际上还是有一定的延迟; Flink以其优秀的状态管理机制保证了其计算性能和实时性,同时提供了完整的SQL表达式,让流计算变得更简单。
计算和持久化
数据经过预处理后,最终生成调用链路聚合日志和主机监控数据。主机监控数据将独立存储在tsdb时序数据库中,用于后续统计分析。由于对时间指标数据有特殊的存储结构设计,tsdb非常适合时间序列数据的存储和查询。调用链路日志聚合数据,提供给cep/graph服务进行诊断模型分析。 cep/graph service是闲鱼开发的一款应用,实现模型分析、复杂数据处理以及与外部服务的交互,借助rdb实现图数据的实时聚合。
最后将cep/graph服务分析的结果作为graph数据,dump到lindorm中实时在线查询。 Lindorm 可以看作是 hbase 的增强版,充当系统中的持久化存储。
4、设计细节和性能优化采集
日志和指标数据采集使用logtail,整个数据采集流程如图:
它提供了非常灵活的插件机制。插件有四种类型:
由于索引数据(如cpu、内存、jvm索引)需要通过调用本地机器上的服务接口来获取,因此应尽量减少请求次数。在 logtail 中,一个输入占用一个 goroutine。闲鱼使用自定义输入插件和处理器插件,通过服务请求(指标获取接口由基础监控组提供)获取输入插件中的多个指标数据(如cpu、内存、jvm指标),并格式化为在处理器插件中将一个json数组对象拆分为多条数据,以减少系统IO次数,提高性能。
转移
数据传输使用LogHub。 logtail写入数据后,blink直接消费数据。您只需要设置合理数量的分区。分区数必须大于等于bink读任务并发数,避免blink任务空闲。
预处理
预处理主要通过bink实现,主要设计和优化点:
1:写一个高效的计算过程
blink 是一个有状态的流计算框架,非常适合实时聚合、join 等操作。
在我们的应用中,我们只需要关注有错误的请求上相关服务链接的调用,所以整个日志处理流程分为两个流程:
一个。服务的请求入口日志作为单独的流处理,过滤掉请求错误数据。
B.其他中间环节的调用日志作为另一个独立的流处理,错误服务所依赖的请求数据选择是通过在traceid上加入上述流来实现的。
如上图所示,双流join后,输出的是请求错误相关的所有链接的完整数据。
2:设置合理的状态生命周期
Blink 本质上是在做join的时候通过state缓存中间数据状态,然后做数据匹配。如果状态的生命周期过长,会造成数据膨胀,影响性能。如果状态的生命周期太短,将无法正常关联一些延迟的数据。因此,需要合理配置状态生命周期,并为应用允许最大的数据延迟。 1 分钟。
使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒
state.backend.type=niagara
state.backend.niagara.ttl.ms=60000
3:开启微批次/迷你批次
MicroBatch 和 MiniBatch 都是微批处理,但微批处理的触发机制略有不同。原则上,在缓存一定量的数据后触发处理,以减少对状态的访问,从而显着提高吞吐量并减少输出数据量。
开启join
blink.miniBatch.join.enabled=true
使用 microbatch 时需要保留以下两个 minibatch 配置
blink.miniBatch.allowLatencyMs=5000
防止OOM,每个批次最多缓存多少条数据
blink.miniBatch.size=20000
4: 动态负载使用 Dynamic-Rebalance 而不是 Rebalance
运行时闪烁任务最忌讳的是计算热点的存在。为了保证Dynamic Rebalance均匀使用数据,可以根据当前子分区累积的buffer数量,选择负载较轻的子分区写入,从而实现动态负载均衡。与静态再平衡策略相比,当下游各个任务的计算能力不平衡时,可以更加平衡各个任务的相对负载,从而提高整个作业的性能。
开启动态负载
task.dynamic.rebalance.enabled=true
5:自定义输出插件
数据关联后,统一请求链路上的数据需要作为数据包通知下游图分析节点。传统的方式是通过消息服务传递数据。但是,消息传递服务有两个缺点:
1、它的吞吐量相比RDB等内存数据库还是有很大差距的(大约一个数量级)。
2、在接收端,还需要根据traceid做数据关联。
我们使用自定义插件将数据异步写入RDB,并设置数据过期时间。存储在 RDB 中的数据结构中。写入时只使用traceid作为消息内容,通过metaQ通知下游计算服务,大大降低了metaQ的数据传输压力。
图聚合计算
cep/graph计算服务节点收到metaQ的通知后,会根据请求的链路数据和依赖的环境监测数据,实时生成诊断结果。诊断结果简化如下:
表示这个请求是下游JVM的线程池已满导致的,但是单个调用无法解释服务不可用的根本原因。如果需要分析整体错误情况,需要实时聚合图数据。
聚合设计如下(为了说明基本思想,简化处理):
1、首先利用redis的zrank能力,根据服务名或ip信息为每个节点分配一个全局唯一的排序号。
2、为图中的每个节点生成对应的图节点代码,编码格式:
对于头节点:头节点序号|归一化时间戳|节点代码
对于普通节点:|修正时间戳|节点代码
3、 由于每个节点在一个时间段内都有一个唯一的key,可以使用节点code作为key,redis可以用来统计每个节点。同时消除了并发读写的问题。
4、 使用redis中的set集合轻松叠加图的边。
5、记录根节点,可以通过遍历还原聚合图结构。
聚合后的结果大致如下:
这样,最终产生了服务不可用的整体原因,可以通过计算叶子节点来对根本原因进行排序。
5、收入
系统上线后,整个实时处理数据链路的延迟不超过三秒。定位闲鱼服务器问题的时间从十几分钟甚至更长的时间缩短到了不到五秒。大大提高了问题定位的效率。
6、未来展望
目前的系统可以支持闲鱼每秒几千万的数据处理能力。后续的问题自动定位服务可能会扩展到阿里巴巴内部更多的业务场景。结果,数据量翻了一番。因此,对效率和成本提出了更好的要求。
我们未来可能会做出的改进:
1、 可以自动减少或压缩处理后的数据。
2、Complex模型分析计算也可以在blink中完成,减少io,提高性能。
3、 支持多租户数据隔离。
原文链接