文章实时采集( 实时数据流的python产生器选型?选型选型?)

优采云 发布时间: 2021-12-18 20:19

  文章实时采集(

实时数据流的python产生器选型?选型选型?)

  Flume和Kafka完成实时数据采集

  写在前面

  Flume和Kafka一般在生产环境中结合使用。可以将两者结合使用来采集实时日志信息,这一点非常重要。如果你不知道flume和kafka,可以先查一下我写的关于这两部分的知识。再学习一下,这部分操作也是可以的。

  采集 的实时数据面临一个问题。我们如何生成我们的实时数据源?因为我们可能想直接获取实时数据流不是那么方便。在文章之前写过一篇关于实时数据流的python*敏*感*词*的文章,文章地址:

  大家可以先看看,如何生成实时数据...

  主意?? 如何开始??

  分析:我们可以从数据的流向开始。数据一开始就在网络服务器上。我们的访问日志是nginx服务器实时采集到指定文件的。我们从这个文件中采集日志数据,即:webserver=>flume=>kafka

  网络服务器日志存储文件位置

  这个文件的位置一般是我们自己设置的

  我们的网络日志存储的目录是:

  /home/hadoop/data/project/logs/access.log

  [hadoop@hadoop000 logs]$ pwd

/home/hadoop/data/project/logs

[hadoop@hadoop000 logs]$ ls

access.log

[hadoop@hadoop000 logs]$

  水槽

  做flume其实就是写一个conf文件,所以面临选择的问题

  来源选择?频道选择?水槽选择?

  这里我们选择exec source memory channel kafka sink

  怎么写?

  按照前面提到的步骤 1234

  从官方网站上,我们可以找到我们的选择应该怎么写:

  1) 配置源

  执行源

  # Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

  2) 配置通道

  记忆通道

  a1.channels.c1.type = memory

  3) 配置*敏*感*词*

  卡夫卡水槽

  水槽1.6 版本可以参考

  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

  4) 将以上三个组件串在一起

  a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  **我们的新文件叫做 test3.conf

  粘贴我们分析的代码:**

  [hadoop@hadoop000 conf]$ vim test3.conf

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  这里不展开了,因为涉及kafka的东西,首先要部署kafka,

  Kafka部署

  Kafka是如何部署的??

  按照官网的说法,我们先启动一个zookeeper进程,然后就可以启动kafka服务器了

  第一步:启动zookeeper

  [hadoop@hadoop000 ~]$

[hadoop@hadoop000 ~]$ jps

29147 Jps

[hadoop@hadoop000 ~]$ zkServer.sh start

JMX enabled by default

Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[hadoop@hadoop000 ~]$ jps

29172 QuorumPeerMain

29189 Jps

[hadoop@hadoop000 ~]$

  第二步:启动服务器

  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

#外开一个窗口,查看jps

[hadoop@hadoop000 ~]$ jps

29330 Jps

29172 QuorumPeerMain

29229 Kafka

[hadoop@hadoop000 ~]$

  如果这部分不是很熟悉,可以参考

  第 3 步:创建主题

  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic "flume_kafka".

[hadoop@hadoop000 ~]$

  第四步:启动之前的代理

   [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

  第 5 步:启动消费者

  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

  执行完上面的第五步,就会收到刷新屏幕的结果了,哈哈哈!!

  

  上面的消费者会不断刷新屏幕,还是很有意思的!!!

  这里的消费者就是把接收到的数据放到屏幕上

  后面我们会介绍使用SparkStreaming作为消费者实时接收数据,接收的数据是为了简单的数据清洗而开发的,从随机生成的日志中过滤掉我们需要的数据……

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线