文章实时采集(1.实时数据采集3.Kafka实时流数据接入-吐血梳理)

优采云 发布时间: 2022-04-18 10:48

  文章实时采集(1.实时数据采集3.Kafka实时流数据接入-吐血梳理)

  各位朋友您好,我最近写了几篇关于实时数据分析的文章文章,都是基于当时的问题分析。今天打开这篇文章文章是因为实时分析工具已经实现了从0到1的业务数据对接,如果任何工具或功能不能与业务融合,那它所做的一切都是无用的,无法体现它的价值。所有的痛点和解决方案也来自于业务的使用。

  这个文章我就不讲怎么选型号了,因为网上有很多类似的文章,只要细心就能找到,但不管是什么型号选择,重点是“行业研究”,防止错误选择。我一般会根据以下三大前提来选款(以下陈述纯属个人观点,如有不妥请在下方评论)

  有很多公司在使用它,并且有很好的数据显示开源软件。关注社区情况。最近有没有继续迭代,防止自己进入深坑?最重要的是有没有和你类似的场景,你用过你要使用的工具。方便技术咨询

  行业研究,行业研究,行业研究重要的事情说三遍

  实时数据分析目前主要应用在业务场景中(很多公司对实时性有很强的需求)

  1. 实时数据访问共6个数据源

  2. 由于刚刚访问的日均数据量约为160万,当前摄取的数据量约为400万

  以往的文章直通车(链接地址这里就不贴了,可以百度搜索)

  1. 插件编写——Flume海量数据实时数据转换

  2.回顾-Flume+Kafka实时数据采集

  3.Kafka实时流式数据接入-吐血梳理与实践-Druid实时数据分析

  4. 实时数据分析 Druid - 环境部署&试用

  好了,以上就是简单的介绍,我们来说说今天的话题。

  一. 为什么要做实时流数据分析?

  以前不太喜欢碰数据,但总觉得没什么用。只有当我因为工作原因触及数据的门槛时,我才知道数据的重要性。

  通常我们根据过去的经验做出决定。俗话说“做这个应该没问题”,但没有数据支持往往不够准确,大概率会出现问题,所以我们要从【经验决策】走向【真实-时间数据驱动的决策],使所有行动都以数据为事实。

  

  二. 整体架构流程及分解

  首先介绍一下我要解决的需求和痛点:

  1. 实时流式数据摄取、显示图表、导出实时报告

  2. 分析以往报告,90% 数据汇总,无需详细数据

  根据上面的分析,选择了olap,最终选择了Apache Druid。

  什么是阿帕奇德鲁伊

  Druid 是一个分布式数据处理系统,支持实时多维 OLAP 分析。它既支持高速实时数据摄取处理,又支持实时灵活的多维数据分析查询。因此,Druid 最常用的场景是大数据背景下灵活快速的多维 OLAP 分析。此外,Druid 有一个关键特性:支持基于时间戳的数据预聚合摄取和聚合分析,因此一些用户经常在有时序数据处理和分析的场景中使用它。

  为什么来自 Druid 的亚秒级响应的交互式查询支持更高的并发性。支持实时导入,导入可查询,支持高并发导入。使用分布式无共享架构,它可以扩展到 PB 级别。支持聚合函数、count 和 sum,以及使用 javascript 实现自定义 UDF。支持复杂的聚合器,用于近似查询的聚合器,例如 HyperLoglog 和 6. 雅虎的开源 DataSketches。支持 Groupby、Select、Search 查询。不支持大表之间的join,但是它的lookup功能满足Join with dimension tables。(最新版本0.18已经支持Join,具体性能有待测试) 架构

  

  需求分析和核心引擎选型基本完成。先说一下整体架构

  建筑设计的三个原则

  适应原理 简单原理 进化原理

  选择合适的架构,切记不要过度设计,过度设计未必实用。

  架构图

  

  结构意图

  

  实时计算分析如何形成数据闭环,以下三点最重要

  1. 数据清洗改造:需要通过一定的规则和规范,保证业务方传输的数据实时清洗改造或建模

  2. 实时计算引擎:OLAP在线分析引擎选型

  3.离线存储:深度存储,保证实时OLAP性能,也可作为日常数据容灾

  

  三、 踩坑及解决方法

  由于第一次接触数据分析相关的场景,很多工具和知识都是从零开始的。我知道我应该尽快补足功课,尤其是实时场景应用。

  由于缺乏知识,在整体架构的构建和开发过程中存在许多问题。让我用图形的方式解释一下,这样就不会有学生对实时流数据不熟悉了。

  数据清洗和转换

  访问标准和规范非常重要。由于业务方数量众多,只要有标准的切割方法,每个业务方的日志规格很可能不一致(我们的工具不能要求业务方修改大量的日志规格)。

  在这种情况下,我们可以梳理出两种业务业态:

  1. 文本 -> Json

  原始日志

2019-02-11 19:03:30.123|INFO|1.0|10.10.10.10|push-service|trace_id:0001|msg:错误信息|token:abcd

清洗后

{"ts":"2020-05-07 16:29:05","times":"2019-02-11 19:03:30.123", "errLevel": "INFO", "version":"1.0" , "ip":"10.10.10.10", "service-name":"push-service", "trace_id": "trace_id:0001","msg": "msg:错误信息"}

  Json 结构 A -> Json 结构 B

  原始日志 -> Json结构A

{"ts":"2020-05-07 16:29:05","times":"2019-02-11 19:03:30.123", "errLevel": "INFO", "version":"1.0" , "ip":"10.10.10.10", "service-name":"push-service", "trace_id": "trace_id:0001","msg": "msg:错误信息"}

清洗、转换后

{"ts—time":"2020-05-07 16:29:05", "errLevel": "INFO"}

  最终统一输出JSON(标准化输入输出)

  流程图

  

  以上是标准的整体流程,为此我开发了两个Flume插件

  1. 文本 -> Json 插件

  a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=***.flume.textToJson.TextToJsonBuilder

a1.sources.r1.interceptors.i1.textToJson={"times":"#0", "errLevel": "#1", "version":"#2" , "ip":"#3", "service-name":"#4", "trace_id": "#5","msg": "#6"}

a1.sources.r1.interceptors.i1.separator=\\,

  Json 结构 A -> Json 结构 B

  a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=flume.***.StringTransJsonBuilder

a1.sources.r1.interceptors.i1.template={"scid":"data.data.data.scid","tpc":"data.data.tpc", "did": "data.data.did"}

a1.sources.r1.interceptors.i1.where={"key1":"value1", "data.key2":"value2"}

a1.sources.r1.interceptors.i1.addheader=comment

  上述过程没有任何问题。. . 但问题来了。

  由于我们是消费者业务端Kafka Topics,所以有这样一种场景,所有业务方都将数据放到一个大topic中,我们需要对数据进行清洗转换成我们需要的数据源。见下图:

  

  在上图的*敏*感*词*部分,接收到的主题数据必须经过*敏*感*词*的清洗和转换。由于业务topic有10个partition,如果我们启动一个Flume NG去消费,就会造成数据的积压。. .

  1. 业务主题有10个分区,单个Flume NG进程可以理解为1个分区。. . 严重不足

  2. 测试结果从业务端接收数据7小时,数据实际清洗2小时,数据继续被挤压

  对应解决方案:

  1. 启动 10 个 Flume 进程,相当于 10 个 Topic 分区,但这会消耗资源。. .

  2. Python 进行数据清理和转换。

  Flume NG 在内部为我们做了很多高可用。高可靠性保证,有限的资源只能暂时放弃这个计划。

  所以选择了方案2,放弃了高可用和高可靠,但是最终的结果还是很不错的,用Python的消费速度是10个Flume NG的两倍。

  结论:我们自己处理ETL,短期内是可行的,但长期来看还是要选择工具来处理。毕竟已经为我们准备了很多保障(要想做好工作,就必须先利好工具)。这句话不无道理。

  目前遇到的最大困难是清洗和转换。其他的小坑在之前的文章里已经写过了,大家可以搜索一下。

  阿帕奇德鲁伊

  我使用最新版本的 0.18。该版本官方公告已宣布加入支持,但尚未进行测试。

  该工具已经使用了一个多月,到目前为止它看起来很完美。

  待续

  很高兴这个项目能迈出一小步,我们的架构还要迭*敏*感*词*发,以后会继续更新这个系列文章哈哈

  特别感谢老板给我机会开发这个项目。. . 给我机会从我的工作中成长

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线