解决方案:复盘-Flume+Kafka实时数据采集

优采云 发布时间: 2020-08-30 04:28

  复盘-Flume+Kafka实时数据采集

  背景

  今天给你们分享一下我近来遇见的坑,也不能说是坑,严格来说是我知识广度不够,导致当时认为此问题无解,互联网没有解决不了的问题,只要认真对待就一定可以解决。

  下面我们来进行一次复盘,希望可以帮助到须要帮助的朋友。

  最近在关注Apache Druid(实时数据剖析),它有一个功能可以通过Kafka实时摄入数据,并提供了挺好的生态工具,在督查过程中所有的流程均已打通,已打算接入我司数据进行一下容量及响应速率测试,可以说:万事具备只欠东风。。。然而发觉这个东风,版本很低。。。最终结果可能会造成如此好用的数据剖析工具难以使用

  复盘&解决方案

  一、Kafka生产消费版本不一致未能将数据部门的数据搜集到我们工具中

  Apache Druid实时数据摄入Kafa最低要求版本0.11我司Kafka0.10

  二、这么看我司Kafka一定是不能升级的,工具也难以降级,怎么办呢?

  放弃apache druid(换工具),调研中也在关注Clickhouse,但还差一点就成功了舍弃有些可惜。。。这条路只能是无路可走时选择

  采用中转消费进程来处理,共有2个方案

  

  不能立刻解决的疼点:每天预计有N亿数据经过这个服务。这样就得先解决:高可用、高可靠问题,数据不能遗失,同时也带来维护成本,总之临时用用可以,待解决问题不少,待使用.

  

  这个方案比Golang轮询方案惟一用处数据保存到c盘了,感官上比较靠谱。。。好吧,那就这个方案,准备撸代码。

  三、我们大致总结下来解决方案,看起来没问题,但是问题仍然存在

  如果c盘坏了如何办? 单点问题服务不可靠,如处理N亿数据,要对这个服务进行监控,报警,故障处理等,需要大量人工介入,成本偏高单机io性能消耗严重,读写频繁很容易出问题,未知性很大

  四、以上方案还是不太满意,继续找解决办法,不舍弃

  基于以上缘由,能否有第三方开源软件解决呢。明知山有虎偏向虎山行,这么做不行,不管是做人做事,尽量不要给他人留下坑,同时这也是自己的学习过程。

  带着问题,加了会好多相关QQ群,找跟我同病相怜的人,看看有啥解决办法。

  有这么一句话只要你坚持了肯定都会有答案。。。Druid有一位美眉也遇见跟我同样的问题,她是使用Flume来解决

  

  Flume介绍

  详细介绍请自行百度

  优势Flume可以将应用形成的数据储存到任何集中存储器中,比如HDFS,HBase当搜集数据的速率超过将写入数据的时侯,也就是当搜集信息遇见峰值时,这时候搜集的信息十分大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间作出调整,保证其才能在二者之间提供平稳的数据.提供上下文路由特点Flume的管线是基于事务,保证了数据在传送和接收时的一致性.Flume是可靠的,容错性高的,可升级的,易管理的,并且可订制的。具有特点Flume可以高效率的将多个网站服务器中搜集的日志信息存入HDFS/HBase中使用Flume,我们可以将从多个服务器中获取的数据迅速的移交给Hadoop中不仅日志信息,Flume同时也可以拿来接入搜集规模宏大的社交网络节点风波数据,比如facebook,twitter,电商网站如亚马逊,flipkart等支持各类接入资源数据的类型以及接出数据类型支持多路径流量,多管线接入流量,多管线接出流量,上下文路由等可以被水平扩充Flume 验证方案可行性

  高可用,高可靠的第三方开源已找到,见证奇迹的时刻到了。

  首先安装两个版本Kafka,下载地址官网可以找到: *敏*感*词*安装kafka0.10 1. kafka_2.10-0.10.0.0.tgz 2. kafka_2.11-0.11.0.3.tgz

  Flume最新版本1.9 1、apache-flume-1.9.0-bin.tar.gz

  操作很多,我把配置文件贴下来,如果有问题可以加我QQ:979134,请注明缘由

  # 定义这个agent组件名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# =======================使用内置kafka source

a1.sources.r1.kafka.bootstrap.servers=ip:9092

a1.sources.r1.kafka.topics=kafka0-10-0

a1.sources.r1.kafka.consumer.security.protocol=SASL_PLAINTEXT

a1.sources.r1.kafka.consumer.sasl.mechanism=PLAIN

a1.sources.r1.kafka.consumer.group.id=groupid

a1.sources.r1.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";

a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize=5000

# =======================对sources进行*敏*感*词*操作 channel中会带有Header,我做了remove_header操作

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=remove_header

a1.sources.r1.interceptors.i1.fromList=timestamp,topic,offset,partition

a1.sources.r1.channels=c1

# channel设置 a1.sinks.k1.type有很多种,如、memory、file、jdbc、kafka 我使用kafka做为通道

# channel 先把数据发动到kafka缓存通道,处理完成sink接收,之后进行producer

a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers=127.0.0.1:9002

a1.channels.c1.kafka.topic=kafka-channel

# =======================目标生产数据

#a1.sinks.k1.type=logger 打开这里可以验证从channel传递过来的数据是否是你想要的

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic=kafka2-1

a1.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9002

a1.sinks.k1.kafka.flumeBatchSize= 1000

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

# Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

  流程回顾&最终解决,用上第三方开源软件,我也可以安安稳稳午睡啦

  

  最终数据完美接入到Druid

  

  看到这儿我已写了3小时,您是否可以给个赞和喜欢,感谢

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线