文章实时采集(1.实时数据采集3.Kafka实时流数据接入-吐血梳理)
优采云 发布时间: 2022-04-18 10:48文章实时采集(1.实时数据采集3.Kafka实时流数据接入-吐血梳理)
各位朋友您好,我最近写了几篇关于实时数据分析的文章文章,都是基于当时的问题分析。今天打开这篇文章文章是因为实时分析工具已经实现了从0到1的业务数据对接,如果任何工具或功能不能与业务融合,那它所做的一切都是无用的,无法体现它的价值。所有的痛点和解决方案也来自于业务的使用。
这个文章我就不讲怎么选型号了,因为网上有很多类似的文章,只要细心就能找到,但不管是什么型号选择,重点是“行业研究”,防止错误选择。我一般会根据以下三大前提来选款(以下陈述纯属个人观点,如有不妥请在下方评论)
有很多公司在使用它,并且有很好的数据显示开源软件。关注社区情况。最近有没有继续迭代,防止自己进入深坑?最重要的是有没有和你类似的场景,你用过你要使用的工具。方便技术咨询
行业研究,行业研究,行业研究重要的事情说三遍
实时数据分析目前主要应用在业务场景中(很多公司对实时性有很强的需求)
1. 实时数据访问共6个数据源
2. 由于刚刚访问的日均数据量约为160万,当前摄取的数据量约为400万
以往的文章直通车(链接地址这里就不贴了,可以百度搜索)
1. 插件编写——Flume海量数据实时数据转换
2.回顾-Flume+Kafka实时数据采集
3.Kafka实时流式数据接入-吐血梳理与实践-Druid实时数据分析
4. 实时数据分析 Druid - 环境部署&试用
好了,以上就是简单的介绍,我们来说说今天的话题。
一. 为什么要做实时流数据分析?
以前不太喜欢碰数据,但总觉得没什么用。只有当我因为工作原因触及数据的门槛时,我才知道数据的重要性。
通常我们根据过去的经验做出决定。俗话说“做这个应该没问题”,但没有数据支持往往不够准确,大概率会出现问题,所以我们要从【经验决策】走向【真实-时间数据驱动的决策],使所有行动都以数据为事实。
二. 整体架构流程及分解
首先介绍一下我要解决的需求和痛点:
1. 实时流式数据摄取、显示图表、导出实时报告
2. 分析以往报告,90% 数据汇总,无需详细数据
根据上面的分析,选择了olap,最终选择了Apache Druid。
什么是阿帕奇德鲁伊
Druid 是一个分布式数据处理系统,支持实时多维 OLAP 分析。它既支持高速实时数据摄取处理,又支持实时灵活的多维数据分析查询。因此,Druid 最常用的场景是大数据背景下灵活快速的多维 OLAP 分析。此外,Druid 有一个关键特性:支持基于时间戳的数据预聚合摄取和聚合分析,因此一些用户经常在有时序数据处理和分析的场景中使用它。
为什么来自 Druid 的亚秒级响应的交互式查询支持更高的并发性。支持实时导入,导入可查询,支持高并发导入。使用分布式无共享架构,它可以扩展到 PB 级别。支持聚合函数、count 和 sum,以及使用 javascript 实现自定义 UDF。支持复杂的聚合器,用于近似查询的聚合器,例如 HyperLoglog 和 6. 雅虎的开源 DataSketches。支持 Groupby、Select、Search 查询。不支持大表之间的join,但是它的lookup功能满足Join with dimension tables。(最新版本0.18已经支持Join,具体性能有待测试) 架构
需求分析和核心引擎选型基本完成。先说一下整体架构
建筑设计的三个原则
适应原理 简单原理 进化原理
选择合适的架构,切记不要过度设计,过度设计未必实用。
架构图
结构意图
实时计算分析如何形成数据闭环,以下三点最重要
1. 数据清洗改造:需要通过一定的规则和规范,保证业务方传输的数据实时清洗改造或建模
2. 实时计算引擎:OLAP在线分析引擎选型
3.离线存储:深度存储,保证实时OLAP性能,也可作为日常数据容灾
三、 踩坑及解决方法
由于第一次接触数据分析相关的场景,很多工具和知识都是从零开始的。我知道我应该尽快补足功课,尤其是实时场景应用。
由于缺乏知识,在整体架构的构建和开发过程中存在许多问题。让我用图形的方式解释一下,这样就不会有学生对实时流数据不熟悉了。
数据清洗和转换
访问标准和规范非常重要。由于业务方数量众多,只要有标准的切割方法,每个业务方的日志规格很可能不一致(我们的工具不能要求业务方修改大量的日志规格)。
在这种情况下,我们可以梳理出两种业务业态:
1. 文本 -> Json
原始日志
2019-02-11 19:03:30.123|INFO|1.0|10.10.10.10|push-service|trace_id:0001|msg:错误信息|token:abcd
清洗后
{"ts":"2020-05-07 16:29:05","times":"2019-02-11 19:03:30.123", "errLevel": "INFO", "version":"1.0" , "ip":"10.10.10.10", "service-name":"push-service", "trace_id": "trace_id:0001","msg": "msg:错误信息"}
Json 结构 A -> Json 结构 B
原始日志 -> Json结构A
{"ts":"2020-05-07 16:29:05","times":"2019-02-11 19:03:30.123", "errLevel": "INFO", "version":"1.0" , "ip":"10.10.10.10", "service-name":"push-service", "trace_id": "trace_id:0001","msg": "msg:错误信息"}
清洗、转换后
{"ts—time":"2020-05-07 16:29:05", "errLevel": "INFO"}
最终统一输出JSON(标准化输入输出)
流程图
以上是标准的整体流程,为此我开发了两个Flume插件
1. 文本 -> Json 插件
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=***.flume.textToJson.TextToJsonBuilder
a1.sources.r1.interceptors.i1.textToJson={"times":"#0", "errLevel": "#1", "version":"#2" , "ip":"#3", "service-name":"#4", "trace_id": "#5","msg": "#6"}
a1.sources.r1.interceptors.i1.separator=\\,
Json 结构 A -> Json 结构 B
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=flume.***.StringTransJsonBuilder
a1.sources.r1.interceptors.i1.template={"scid":"data.data.data.scid","tpc":"data.data.tpc", "did": "data.data.did"}
a1.sources.r1.interceptors.i1.where={"key1":"value1", "data.key2":"value2"}
a1.sources.r1.interceptors.i1.addheader=comment
上述过程没有任何问题。. . 但问题来了。
由于我们是消费者业务端Kafka Topics,所以有这样一种场景,所有业务方都将数据放到一个大topic中,我们需要对数据进行清洗转换成我们需要的数据源。见下图:
在上图的*敏*感*词*部分,接收到的主题数据必须经过*敏*感*词*的清洗和转换。由于业务topic有10个partition,如果我们启动一个Flume NG去消费,就会造成数据的积压。. .
1. 业务主题有10个分区,单个Flume NG进程可以理解为1个分区。. . 严重不足
2. 测试结果从业务端接收数据7小时,数据实际清洗2小时,数据继续被挤压
对应解决方案:
1. 启动 10 个 Flume 进程,相当于 10 个 Topic 分区,但这会消耗资源。. .
2. Python 进行数据清理和转换。
Flume NG 在内部为我们做了很多高可用。高可靠性保证,有限的资源只能暂时放弃这个计划。
所以选择了方案2,放弃了高可用和高可靠,但是最终的结果还是很不错的,用Python的消费速度是10个Flume NG的两倍。
结论:我们自己处理ETL,短期内是可行的,但长期来看还是要选择工具来处理。毕竟已经为我们准备了很多保障(要想做好工作,就必须先利好工具)。这句话不无道理。
目前遇到的最大困难是清洗和转换。其他的小坑在之前的文章里已经写过了,大家可以搜索一下。
阿帕奇德鲁伊
我使用最新版本的 0.18。该版本官方公告已宣布加入支持,但尚未进行测试。
该工具已经使用了一个多月,到目前为止它看起来很完美。
待续
很高兴这个项目能迈出一小步,我们的架构还要迭*敏*感*词*发,以后会继续更新这个系列文章哈哈
特别感谢老板给我机会开发这个项目。. . 给我机会从我的工作中成长