Flume Architecture, Core Components, and Hands-On Examples



  Flume Overview

  Official documentation:

  Flume is a distributed, highly reliable, and highly available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple, flexible architecture based on streaming data flows. It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple, extensible data model that allows for online analytic applications.

  Flume Architecture and Core Components

  A Flume agent is built from three core components: the Source receives events from an external data producer, the Channel buffers those events, and the Sink delivers them to their destination (the console, HDFS, Kafka, another agent, and so on).

  Flume architecture diagram:

  
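  As a minimal sketch of how these three components are wired together in a configuration file (the agent name agent1 and the component names src1, ch1, and sink1 below are placeholders, not names used in the examples that follow):

# declare the components that belong to this agent
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
# each component then gets its own properties under agent1.<kind>.<name>.*,
# starting with a type (netcat, exec, avro, memory, logger, ...)
# finally, a source is attached to one or more channels and a sink to exactly one channel
agent1.sources.src1.channels = ch1
agent1.sinks.sink1.channel = ch1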

  Flume Deployment

  Prepare the JDK environment:

  [root@hadoop01 ~]# java -version

java version "11.0.8" 2020-07-14 LTS

Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)

Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)

[root@hadoop01 ~]#

  Download Flume:

  Copy the download link and fetch it with wget:

  [root@hadoop01 ~]# cd /usr/local/src

[root@hadoop01 /usr/local/src]# wget https://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.16.2.tar.gz

  Extract it to an appropriate directory:

  [root@hadoop01 /usr/local/src]# tar -zxvf flume-ng-1.6.0-cdh5.16.2.tar.gz -C /usr/local

[root@hadoop01 /usr/local/src]# cd /usr/local/apache-flume-1.6.0-cdh5.16.2-bin/

[root@hadoop01 /usr/local/apache-flume-1.6.0-cdh5.16.2-bin]# ls

bin CHANGELOG cloudera conf DEVNOTES docs lib LICENSE NOTICE README RELEASE-NOTES tools

[root@hadoop01 /usr/local/apache-flume-1.6.0-cdh5.16.2-bin]#

  Configure the environment variables:

  [root@hadoop01 /usr/local/apache-flume-1.6.0-cdh5.16.2-bin]# vim ~/.bash_profile

export FLUME_HOME=/usr/local/apache-flume-1.6.0-cdh5.16.2-bin

export PATH=$PATH:$FLUME_HOME/bin

[root@hadoop01 /usr/local/apache-flume-1.6.0-cdh5.16.2-bin]# source ~/.bash_profile

  Edit the Flume environment configuration file:

  [root@hadoop01 ~]# cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh

[root@hadoop01 ~]# vim $FLUME_HOME/conf/flume-env.sh

# Configure the JDK

export JAVA_HOME=/usr/local/jdk/11

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

  Test the flume-ng command:

  [root@hadoop01 ~]# flume-ng version

Flume 1.6.0-cdh5.16.2

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: df92badde3691ee3eb6074a177f0e96682345381

Compiled by jenkins on Mon Jun 3 03:49:33 PDT 2019

From source with checksum 9336bfa3ff8cfb5e20cd9d700135a2c1

[root@hadoop01 ~]#

  Flume Hands-On Example: Collecting Data from a Specified Network Port and Outputting It to the Console

  The key to using Flume is writing its configuration file:

  configure the Source, configure the Channel, configure the Sink, and wire the three components together.

  So first, create a configuration file. Note that Flume's properties-style configuration does not reliably support trailing comments after a value, so in the file below every comment sits on its own line:

  [root@hadoop01 ~]# vim $FLUME_HOME/conf/netcat-example.conf

# a1 is the agent name; r1, k1 and c1 are the names of its source, sink and channel

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe and configure the source: a netcat source listening on the given IP and port

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Define the sink: the logger type writes the received data to the console

a1.sinks.k1.type = logger

# Define a memory-based channel
# capacity is the channel's capacity; transactionCapacity is the maximum number of events per transaction

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel, wiring the three components together

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  Start the agent (the value passed to --name must match the agent name used in the configuration file, a1 here):

  [root@hadoop01 ~]# flume-ng agent --name a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/netcat-example.conf -Dflume.root.logger=INFO,console

  Then use the telnet command to send some data to port 44444:

  [root@hadoop01 ~]# telnet localhost 44444

...

hello flume

OK

  At this point you will see the received data printed in Flume's output:

  2020-11-02 16:08:47,965 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 0D hello flume. }
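  If telnet is not installed, nc (netcat) can be used in the same way to send test data, assuming the nc command is available on the machine:

[root@hadoop01 ~]# nc localhost 44444
hello from nc
OK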

  Flume Hands-On Example: Monitoring a File in Real Time and Outputting Newly Appended Data to the Console

  In the same way, first create a configuration file:

  [root@hadoop01 ~]# vim $FLUME_HOME/conf/file-example.conf

# a1 is the agent name

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe and configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /data/data.log

a1.sources.r1.shell = /bin/sh -c

# Define the sink

a1.sinks.k1.type = logger

# Define a memory-based channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel, wiring the three components together

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  Create a test file:

  [root@hadoop01 ~]# touch /data/data.log

  Start the agent:

  [root@hadoop01 ~]# flume-ng agent --name a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/file-example.conf -Dflume.root.logger=INFO,console

  Write some content to data.log:

  [root@hadoop01 ~]# echo "hello flume" >> /data/data.log

[root@hadoop01 ~]# echo "hello world" >> /data/data.log

  At this point, Flume's output prints the new data appended to the monitored file:

  2020-11-02 16:21:26,946 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }

2020-11-02 16:21:38,707 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
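  To watch a steadier stream of events, you can keep appending to the monitored file in a loop; this is just a testing convenience, not part of the original steps:

[root@hadoop01 ~]# for i in $(seq 1 10); do echo "log line $i" >> /data/data.log; sleep 1; done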

  Flume Hands-On Example: Collecting Logs from Server A to Server B in Real Time

  To achieve this, we need Flume's Avro Source and Avro Sink. The flow diagram is as follows:

  

  For convenience, I simulate both servers on a single machine here. First, the configuration file for machine A is as follows:

  [root@hadoop01 ~]# vim $FLUME_HOME/conf/exec-memory-avro.conf

# Define the names of the components

exec-memory-avro.sources = exec-source

exec-memory-avro.sinks = avro-sink

exec-memory-avro.channels = memory-channel

# Describe and configure the source

exec-memory-avro.sources.exec-source.type = exec

exec-memory-avro.sources.exec-source.command = tail -f /data/data.log

exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Define the sink

exec-memory-avro.sinks.avro-sink.type = avro

exec-memory-avro.sinks.avro-sink.hostname = hadoop01

exec-memory-avro.sinks.avro-sink.port = 44444

# Define a memory-based channel

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.channels.memory-channel.capacity = 1000

exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel, wiring the three components together

exec-memory-avro.sources.exec-source.channels = memory-channel

exec-memory-avro.sinks.avro-sink.channel = memory-channel

  The configuration file for machine B is as follows:

  [root@hadoop01 ~]# vim $FLUME_HOME/conf/avro-memory-logger.conf

# Define the names of the components

avro-memory-logger.sources = avro-source

avro-memory-logger.sinks = logger-sink

avro-memory-logger.channels = memory-channel

# Describe and configure the source

avro-memory-logger.sources.avro-source.type = avro

avro-memory-logger.sources.avro-source.bind = hadoop01

avro-memory-logger.sources.avro-source.port = 44444

# Define the sink

avro-memory-logger.sinks.logger-sink.type = logger

# Define a memory-based channel

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.channels.memory-channel.capacity = 1000

avro-memory-logger.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel, wiring the three components together

avro-memory-logger.sources.avro-source.channels = memory-channel

avro-memory-logger.sinks.logger-sink.channel = memory-channel

  Start the agent on machine B first; otherwise, the agent on machine A may report errors if it cannot connect to the port on the target machine:

  [root@hadoop01 ~]# flume-ng agent --name avro-memory-logger -c $FLUME_HOME/conf -f $FLUME_HOME/conf/avro-memory-logger.conf -Dflume.root.logger=INFO,console
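  Optionally, before starting the agent on machine A, confirm that the Avro source is actually listening on port 44444 (ss is used here; netstat -tlnp works as well):

[root@hadoop01 ~]# ss -tlnp | grep 44444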

  Start the agent on machine A:

  [root@hadoop01 ~]# flume-ng agent --name exec-memory-avro -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

  Write some content to data.log:

  [root@hadoop01 ~]# echo "hello flume" >> /data/data.log

[root@hadoop01 ~]# echo "hello world" >> /data/data.log

[root@hadoop01 ~]# echo "hello avro" >> /data/data.log

  At this point, the agent on machine B prints the following to its console, which means logs on server A are now being collected to server B in real time:

  2020-11-02 17:05:20,929 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }

2020-11-02 17:05:21,486 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2020-11-02 17:05:51,505 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 61 76 72 6F hello avro }
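  In this demo both agents run on hadoop01. When A and B really are two separate machines, only the Avro endpoints need to change: point the Avro sink on A at server B, and have the Avro source on B bind to an address reachable from A. The hostname hadoop02 below is a hypothetical name for server B:

# on machine A (exec-memory-avro.conf)
exec-memory-avro.sinks.avro-sink.hostname = hadoop02
exec-memory-avro.sinks.avro-sink.port = 44444
# on machine B (avro-memory-logger.conf)
avro-memory-logger.sources.avro-source.bind = 0.0.0.0
avro-memory-logger.sources.avro-source.port = 44444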

  Integrating Flume and Kafka for Real-Time Data Collection

  In the example above, Agent B sinks the collected data to the console, which is obviously not what a real application would do. Instead, the data is usually sunk to an external system such as HDFS, ES, or Kafka. In a real-time stream processing architecture, in most cases the sink is Kafka, and one or more downstream consumers then receive the data for real-time processing, as shown below:

  

  So, building on the previous example, here is how to integrate Kafka. It is actually quite simple: just replace the Logger Sink with a Kafka Sink. After switching to Kafka, the flow looks like this:

  

  Create a new configuration file with the following contents:

  [root@hadoop01 ~]# vim $FLUME_HOME/conf/avro-memory-kafka.conf

# Define the names of the components

avro-memory-kafka.sources = avro-source

avro-memory-kafka.sinks = kafka-sink

avro-memory-kafka.channels = memory-channel

# Describe and configure the source

avro-memory-kafka.sources.avro-source.type = avro

avro-memory-kafka.sources.avro-source.bind = hadoop01

avro-memory-kafka.sources.avro-source.port = 44444

# Define the sink

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink

avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092

avro-memory-kafka.sinks.kafka-sink.topic = flume-topic

# Number of messages to send per batch

avro-memory-kafka.sinks.kafka-sink.batchSize = 5

# The acknowledgement mode to use; see Kafka's ack mechanism for details

avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Define a memory-based channel

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.channels.memory-channel.capacity = 1000

avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel, wiring the three components together

avro-memory-kafka.sources.avro-source.channels = memory-channel

avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
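  If automatic topic creation is disabled on the Kafka cluster, create flume-topic before starting the agents. The partition and replication settings below are only illustrative, and older Kafka versions need --zookeeper <zk-host>:2181 instead of --bootstrap-server:

[root@kafka01 ~]# kafka-topics.sh --create --bootstrap-server kafka01:9092 --partitions 1 --replication-factor 1 --topic flume-topic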

  Once the configuration is complete, start this agent:

  [root@hadoop01 ~]# flume-ng agent --name avro-memory-kafka -c $FLUME_HOME/conf -f $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

  Then start the other agent:

  [root@hadoop01 ~]# flume-ng agent --name exec-memory-avro -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

  Start a Kafka console consumer so we can observe the data Kafka receives:

  [root@kafka01 ~]# kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic flume-topic --from-beginning

  Write some content to data.log:

  [root@hadoop01 ~]# echo "hello kafka sink" >> /data/data.log

[root@hadoop01 ~]# echo "hello flume" >> /data/data.log

[root@hadoop01 ~]# echo "hello agent" >> /data/data.log

  The Kafka consumer console then outputs the following, confirming that Flume and Kafka have been successfully integrated:

  [root@kafka01 ~]# kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic flume-topic --from-beginning

hello kafka sink

hello flume

hello agent
