Flume和Kafka完成实时数据采集

优采云 发布时间: 2020-08-06 16:17

  Flume和Kafka完成实时数据采集

  写在前面

  在生产环境中,通常将水槽和Kafka结合使用. 可以同时使用它们两者来采集实时日志信息非常重要. 如果您不懂水槽和卡夫卡,可以先检查一下我写的关于这两部分的知识. 让我们再次学习,这部分操作也是可能的.

  实时数据的采集面临一个问题. 我们如何生成实时数据源?因为我们可能想直接获取实时数据流不是那么方便. 我之前写了一篇有关实时数据流的python*敏*感*词*的文章,文章地址:

  您可以先看看如何生成实时数据...

  在想什么? ?如何开始? ?

  分析: 我们可以从数据流开始. 数据首先位于Web服务器中. 我们的访问日志由Nginx服务器实时采集到指定文件中. 我们从该文件采集日志数据,即: webserver => flume => kafka

  Web服务器日志存储文件位置

  此文件的位置通常由我们自己设置

  我们的网络日志的存储目录为:

  /home/hadoop/data/project/logs/access.log下面

  [hadoop@hadoop000 logs]$ pwd

/home/hadoop/data/project/logs

[hadoop@hadoop000 logs]$ ls

access.log

[hadoop@hadoop000 logs]$

  水槽

  做水槽实际上是写一个conf文件,所以面临选择的问题

  来源选择?频道选择?选择*敏*感*词*?

  在这里,我们选择exec源存储通道kafka sink

  怎么写?

  按照前面提到的步骤1234

  在官方网站上,我们可以找到应如何选择:

  1)配置源

  exec来源

  # Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

  2)配置频道

  内存频道

  a1.channels.c1.type = memory

  3)配置*敏*感*词*

  卡夫卡水槽

  对于flume1.6版本,请参阅#kafka-sink

  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

  将以上三个组成部分组合在一起

  a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  我们的新文件称为test3.conf

  粘贴我们自己分析的代码:

  [hadoop@hadoop000 conf]$ vim test3.conf

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  我们不会从这里开始,因为它涉及kafka的事情,我们必须首先部署kafka,

  Kafka部署

  如何部署Kafka? ?

  请访问官方网站,我们首先启动Zookeeper流程,然后才能启动kafka服务器

  第1步: 启动动物园管理员

  [hadoop@hadoop000 ~]$

[hadoop@hadoop000 ~]$ jps

29147 Jps

[hadoop@hadoop000 ~]$ zkServer.sh start

JMX enabled by default

Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[hadoop@hadoop000 ~]$ jps

29172 QuorumPeerMain

29189 Jps

[hadoop@hadoop000 ~]$

  第2步: 启动服务器

  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

#外开一个窗口,查看jps

[hadoop@hadoop000 ~]$ jps

29330 Jps

29172 QuorumPeerMain

29229 Kafka

[hadoop@hadoop000 ~]$

  如果这部分不是很熟悉,您可以参考

  第3步: 创建主题

  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic "flume_kafka".

[hadoop@hadoop000 ~]$

  第4步: 启动上一个代理

   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

  第5步: 启动消费者

  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

  执行上述第五步后,您将收到刷新屏幕的结果,哈哈哈! !

  

  上面的消费者将不断刷新屏幕,这仍然非常有趣!!!

  这里的使用者将接收到的数据放在屏幕上

  稍后,我们将介绍使用SparkStreaming作为使用者来实时接收数据,并对接收到的数据进行简单的数据清理,并从随机生成的日志中过滤出所需的数据...

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线