实时文章采集(环境说明准备工作flume安装暂略,后续更新(组图) )

优采云 发布时间: 2021-11-26 16:32

  实时文章采集(环境说明准备工作flume安装暂略,后续更新(组图)

)

  环境说明准备水槽安装

  暂时省略,后续更新

  水槽介绍

  Apache Flume 是一个分布式、可靠且可用的系统,用于有效地采集、聚合大量日志数据,并将其从许多不同来源移动到集中式数据存储。在大数据生态中,flume经常被用来完成数据采集的工作。

  

  它的实时性非常高,延迟在1-2s左右,可以是准实时的。

  并且因为mysql是程序员常用的数据库,所以以flume实时采集mysql数据库为例。要了解flume 采集 数据是如何处理的,我们必须先探索它的架构:

  Flume运行的核心是Agent。Flume 以 agent 为最小的独立运行单元。代理是一个 JVM。它是一个完整的数据采集工具,具有三个核心组件,即

  源,通道,汇。通过这些组件,Event 可以从一处流向另一处,如下图所示。

  

  三大组件

  来源

  Source是数据采集端,负责在数据被捕获后进行特殊格式化,将数据封装在一个事件中,然后将事件推送到Channel中。

  Flume提供了多种源码实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source等,如果内置的Source不能满足你的需求, Flume 还支持自定义 Source。

  可以看到原生flume的源码不支持sql源码,所以我们需要添加一个插件,如何添加后面会讲到。

  渠道

  Channel是连接Source和Sink的组件。您可以将其视为数据缓冲区(数据队列)。它可以将事件临时存储在内存中或将其持久保存在本地磁盘上,直到*敏*感*词*处理完事件。

  对于Channel,Flume提供了Memory Channel、JDBC Chanel、File Channel等。

  下沉

  Flume Sink 取出Channel 中的数据,存储在文件系统、数据库中,或者提交到远程服务器。

  Flume 还提供了各种 sink 实现,包括 HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink 等。

  当 Flume Sink 设置为存储数据时,您可以将数据存储在文件系统、数据库和 hadoop 中。当日志数据较小时,可以将数据存储在文件系统中,并设置一定的时间间隔来保存数据。当日志数据较多时,可以将相应的日志数据存储在Hadoop中,方便日后进行相应的数据分析。

  在这个例子中,我使用 kafka 作为*敏*感*词*

  下载flume-ng-sql-source插件

  在此下载flume-ng-sql-source,最新版本为1.5.3。

  下载后解压,通过idea运行程序,用maven打包成jar包,重命名为flume-ng-sql-source-1.5.3.jar

  编译好的jar包应该放在FLUME_HOME/lib下,FLUME_HOME是你linux下flume的文件夹,比如我的是/opt/install/flume

  卡夫卡安装

  我们使用flume将数据采集传输到kafka,并启动kafak消费监控,可以看到实时数据

  jdk1.8 安装

  暂时省略,后续更新

  动物园管理员安装

  暂时省略,后续更新

  卡夫卡安装

  暂时省略,后续更新

  mysql5.7.24安装

  暂时省略,后续更新

  Flume提取mysql数据到Kafka新建数据库和表

  完成以上安装工作后,就可以开始实现demo了。

  首先,如果我们要抓取mysql数据,必须要有一个数据库和表,并且记住数据库和表的名称,然后把这个信息写入flume配置文件中。

  创建数据库:

  create database test

  创建一个表:

  create table fk(

id int UNSIGNED AUTO_INCREMENT,

name VARCHAR(100) NOT NULL,

PRIMARY KEY ( id )

);

  新建配置文件(重要)

  cd到flume的conf文件夹并添加一个新文件mysql-flume.conf

  [root@localhost ~]# cd /opt/install/flume

[root@localhost flume]# ls

bin conf doap_Flume.rdf lib NOTICE RELEASE-NOTES tools

CHANGELOG DEVNOTES docs LICENSE README.md status

[root@localhost flume]# cd conf

[root@localhost conf]# ls

flume-conf.properties.template log4j.properties

flume-env.ps1.template mysql-connector-java-5.1.35

flume-env.sh mysql-connector-java-5.1.35.tar.gz

flume-env.sh.template mysql-flume.conf

  注:mysql-flume.conf原本不存在,是我自己生成的,具体配置如下

  在这个文件中写入:

  a1.channels = ch-1

a1.sources = src-1

a1.sinks = k1

###########sql source#################

# For each one of the sources, the type is defined

a1.sources.src-1.type = org.keedio.flume.source.SQLSource

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false

# Hibernate Database connection properties

a1.sources.src-1.hibernate.connection.user = root

a1.sources.src-1.hibernate.connection.password = xxxxxxxx

a1.sources.src-1.hibernate.connection.autocommit = true

a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

a1.sources.src-1.run.query.delay=5000

a1.sources.src-1.status.file.path = /opt/install/flume/status

a1.sources.src-1.status.file.name = sqlSource.status

# Custom query

a1.sources.src-1.start.from = 0

a1.sources.src-1.custom.query = select `id`, `name` from fk

a1.sources.src-1.batch.size = 1000

a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

a1.sources.src-1.hibernate.c3p0.min_size=1

a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################

a1.channels.ch-1.type = memory

a1.channels.ch-1.capacity = 10000

a1.channels.ch-1.transactionCapacity = 10000

a1.channels.ch-1.byteCapacityBufferPercentage = 20

a1.channels.ch-1.byteCapacity = 800000

################################################################

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic = testTopic

a1.sinks.k1.brokerList = 10.100.4.6:9092

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.batchSize = 20

  这是我的文件。一些私人信息已被其他字符串替换。写mysql-flume.conf的时候可以复制上面这段代码。下面是这段代码的详细注释,你可以用更多注释版本的代码修改你的conf文件

  # a1表示agent的名称

# source是a1的输入源

# channels是缓冲区

# sinks是a1输出目的地,本例子sinks使用了kafka

a1.channels = ch-1

a1.sources = src-1

a1.sinks = k1

###########sql source#################

# For each one of the sources, the type is defined

a1.sources.src-1.type = org.keedio.flume.source.SQLSource

# 连接mysql的一系列操作,youhost改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看

# url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false

# Hibernate Database connection properties

# mysql账号,一般都是root

a1.sources.src-1.hibernate.connection.user = root

# 填入你的mysql密码

a1.sources.src-1.hibernate.connection.password = xxxxxxxx

a1.sources.src-1.hibernate.connection.autocommit = true

# mysql驱动

a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

# 驱动版本过低会无法使用,驱动安装下文会提及

a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

a1.sources.src-1.run.query.delay=5000

# 存放status文件

a1.sources.src-1.status.file.path = /opt/install/flume/status

a1.sources.src-1.status.file.name = sqlSource.status

# Custom query

a1.sources.src-1.start.from = 0

# 填写需要采集的数据表信息,你也可以使用下面的方法:

# agent.sources.sql-source.table =table_name

# agent.sources.sql-source.columns.to.select = *

a1.sources.src-1.custom.query = select `id`, `name` from fk

a1.sources.src-1.batch.size = 1000

a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

a1.sources.src-1.hibernate.c3p0.min_size=1

a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################

a1.channels.ch-1.type = memory

a1.channels.ch-1.capacity = 10000

a1.channels.ch-1.transactionCapacity = 10000

a1.channels.ch-1.byteCapacityBufferPercentage = 20

a1.channels.ch-1.byteCapacity = 800000

################################################################

# 使用kafka

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# 这个项目中你创建的或使用的topic名字

a1.sinks.k1.topic = testTopic

# kafka集群,broker列表,由于我没有使用集群所以只有一个

# 如果你搭建了集群,代码如下:agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092

a1.sinks.k1.brokerList = 10.100.4.6:9092

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.batchSize = 20

  将mysql驱动添加到flume的lib目录下

  wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz

tar xzf mysql-connector-java-5.1.35.tar.gz

cp mysql-connector-java-5.1.35-bin.jar /你flume的位置/lib/

  启动动物园管理员

  在启动kafka之前启动zookeeper

  cd到zookeeper的bin目录

  启动:

  ./zkServer.sh start

  等待运行

  ./zkCli.sh

  启动卡夫卡

  在xshell中打开一个新窗口,cd到kafka目录,启动kafka

  bin/kafka-server-start.sh config/server.properties &

  创建一个新主题

  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

  注1:testTopic是你使用的主题名称,对应上面mysql-flume.conf中的内容。

  注2:可以使用bin/kafka-topics.sh --list --zookeeper localhost:2181 查看创建的topic。

  启动水槽

  在xshell中打开一个新窗口,cd到flume目录,启动flume

  bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

  等待他运行,同时我们可以打开一个新窗口连接数据库,使用我们新创建的test数据库和fk表。

  实时采集数据

  Flume会实时将采集数据发送到kafka,我们可以启动一个kafak消费监控,用于查看mysql的实时数据

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning

  这时候就可以查看数据了,kafka会打印mysql中的数据

  然后我们改变数据库中的一条数据,新读取的数据也会改变

  前:

  

  后:

  

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线