实时文章采集( 前面Flume和Kafka的实时数据源,怎么产生呢??)

优采云 发布时间: 2021-09-15 20:03

  实时文章采集(

前面Flume和Kafka的实时数据源,怎么产生呢??)

  水槽和卡夫卡完成实时数据处理采集

  写在前面

  Flume和Kafka通常在生产环境中一起使用。能够结合使用它们来采集实时日志信息非常重要。如果你不知道flume和Kafka,你可以先看看我对这两部分的了解。同样,这部分操作也是可能的。html

  实时数据采集,它面临一个问题。我们如何生成实时数据源?因为我们可能需要直接获取实时数据流,所以不太方便。我之前写过一篇文章文章,关于实时数据流的python*敏*感*词*文章地址:

  您可以看看如何生成实时数据。。。蟒蛇

  思路??如何开始??nginx

  分析:我们可以从数据流开始。数据在开始时位于Web服务器中。我们的访问日志由nginx服务器实时采集到指定的文件。我们从这个文件中采集日志数据,即:webserver=>水槽=>卡夫卡韦布

  web服务器日志文件的位置

  这个文件的位置通常是我们自己设置的shell

  我们的web日志存储在:

  /Apache在家/Hadoop/data/project/logs/access.log

  [hadoop@hadoop000 logs]$ pwd

/home/hadoop/data/project/logs

[hadoop@hadoop000 logs]$ ls

access.log

[hadoop@hadoop000 logs]$

  氟美芬

  Flume实际上是编写conf文件,它面临着类型选择的问题

  来源选择?频道选择?水槽选择?*敏*感*词*

  这里我们选择exec源内存通道Kafka*敏*感*词*服务器

  怎么写

  如前所述,步骤1234应用程序

  从官方网站上,我们可以了解如何编写我们的车型选择:

  1)configure source

  执行源

  # Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

  2)configure通道

  存储通道

  a1.channels.c1.type = memory

  3)configure*敏*感*词*

  卡夫卡水槽

  flume1.Version 6可以被引用

  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

  4)串上述三个组件

  a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  **让我们创建一个名为test3.conf

  发布我们分析的代码:**

  [hadoop@hadoop000 conf]$ vim test3.conf

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log

a1.sources.r1.shell = /bin/sh -c

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = hadoop000:9092

a1.sinks.k1.topic = flume_kafka

a1.sinks.k1.batchSize = 5

a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  我们不要从这里开始。卡夫卡牵涉其中,我们必须首先部署卡夫卡

  卡夫卡的部署

  如何部署卡夫卡

  参考官方网站,让我们先启动zookeeper进程,然后启动Kafka的服务器

  步骤1:启动zookeeper

  [hadoop@hadoop000 ~]$

[hadoop@hadoop000 ~]$ jps

29147 Jps

[hadoop@hadoop000 ~]$ zkServer.sh start

JMX enabled by default

Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[hadoop@hadoop000 ~]$ jps

29172 QuorumPeerMain

29189 Jps

[hadoop@hadoop000 ~]$

  步骤2:启动服务器

  [hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

#外开一个窗口,查看jps

[hadoop@hadoop000 ~]$ jps

29330 Jps

29172 QuorumPeerMain

29229 Kafka

[hadoop@hadoop000 ~]$

  如果是,这部分不是很熟悉,可以参考

  步骤3:创建一个主题

  [hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic "flume_kafka".

[hadoop@hadoop000 ~]$

  步骤4:启动上一个代理

  [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

  步骤5:启动消费者

  kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

  执行上述第五步后,您将收到屏幕刷屏结果,哈哈

  

  上面的消费者总是会刷屏幕,这仍然很有趣

  此处的消费者将接收到的数据发送到屏幕

  稍后,我们将介绍sparkstreaming用于为消费者实时接收数据,并且所接收的数据用于简单的数据清理,以从随机生成的日志中过滤我们需要的数据

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线