汇总:【盘点】七个常用的网页数据抽取工具

优采云 发布时间: 2022-11-28 06:43

  汇总:【盘点】七个常用的网页数据抽取工具

  作为大数据从业者和研究者,我们经常需要从网页中获取数据。如果不想自己写爬虫程序,可以借助一些专业的网页数据提取工具来达到这个目的。接下来小编就为大家盘点七款常用的网页数据提取工具。

  1.导入.io

  本工具是一款不需要客户端的爬虫工具。所有工作都可以在浏览器中进行。操作方便简单。爬取数据后,可以在可视化界面进行筛选。

  2.解析中心

  本工具需要下载客户端才能运行。打开后,该工具类似于浏览器。输入 URL 后,可以提取数据。它支持 Windows、MacOS 和 Linux 操作系统。

  

" />

  3.网络抓取工具

  本工具是基于Chrome浏览器的插件,可直接通过谷歌应用商店免费获取并安装。可以轻松抓取静态网页,也可以用js动态加载网页。

  如果想详细了解这个工具的使用方法,可以参考下面的教程:关于webscraper的问题,这个就够了

  4. 80条腿

  这个工具的背后是一个由 50,000 台计算机组成的 Plura 网格。功能强大,但更多的是面向企业级客户。商业用途明显,监控能力强,价格相对昂贵。

  5. 优采云

采集

  

" />

  该工具是目前国内最成熟的网页数据采集工具。需要下载客户端,可以在客户端进行可视化数据抓取。该工具还有国际版的 Octoparse 软件。根据采集能力,该工具分为免费版、专业版、旗舰版、私有云、企业定制版五个版本。支付。

  6.做数字

  这是一款针对起步晚但爬取效率高的企业的基于Web的云爬取工具,无需额外下载客户端。

  7. 优采云

采集器

  这是中国老牌的采集器

公司。很早就商业化了,但是学习成本比较高,规则制定也比较复杂。收费方式为软件收费,旗舰版售价1000元左右,付款后无上限。

  汇总:浅析数据采集工具Flume

  标题:水槽系列

  第一章 Flume 基础理论 1.1 数据采集工具的背景

  Hadoop业务的大致整体开发流程:

  任何一个完整的大数据平台一般都包括以下基本处理流程:

  数据采集

数据 ETL

数据存储

数据计算/分析

数据展现

  其中,数据采集是所有数据系统不可或缺的。随着大数据越来越受到关注,数据采集的挑战变得尤为突出。这包括:

  数据源多种多样

数据量大,变化快

如何保证数据采集的可靠性的性能

如何避免重复数据

如何保证数据的质量

  今天我们就来看看目前市面上的一些数据采集产品,重点关注它们是如何实现高可靠性、高性能和高扩展性的。

  总结:

  数据来源一般包括:

  1、业务数据

2、爬取的网络公开数据

3、购买数据

4、自行采集日志数据

  1.1 Flume简介

  Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

  Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持自定义系统中的各种数据发送方来采集

数据。同时,Flume提供了对数据进行简单处理和写入各种数据接收方的能力。

  1、Apache Flume是一个分布式、可靠、高可用的海量日志采集

、聚合、传输系统。与Sqoop属于同一个数据采集系统组件,只不过Sqoop是用来采集关系型数据库数据,而Flume是用来采集流式数据。

  2. Flume的名字来源于最初的近实时日志数据采集

工具,现在被广泛用于任何流式事件数据的采集

。它支持将来自许多数据源的数据聚合到HDFS。

  3、一般的采集需求,通过flume的简单配置即可实现。Flume对于特殊场景也有很好的自定义扩展能力,所以Flume可以适用于大部分日常的数据采集场景。

  4、Flume最初由Cloudera开发,2011年贡献给Apache基金会,2012年成为Apache的顶级项目。Flume OG(Original Generation)是Flume的原创

版本,后来升级为Flume NG(Next/新一代)。

  5、Flume的优点:水平可扩展性、可扩展性、可靠性。

  1.2 水槽版本

  Flume 在 0.9.x 和 1.x 之间有重大的架构调整:

  在 1.x 版本后重命名为 Flume NG

  0.9.x版本叫做Flume OG,最后一个版本是0.94,之后被Apache重构

  N是新的,O是旧的

  Flume1.7版本要求:

  Flume OG Old/Original Generation

Flume NG New/Next Generation

  注意以上是flume1.7的要求,其他版本的要求可能不同!!

  本文使用版本链接:

  官网链接:

  Flume1.9版本要求:

  系统要求

  Java Runtime Environment - Java 1.8 or later

Memory - Sufficient memory for configurations used by sources, channels or sinks

Disk Space - Sufficient disk space for configurations used by channels or sinks

Directory Permissions - Read/Write permissions for directories used by agent

  第二章 Flume架构/核心组件

  agent:能独立执行一个数据收集任务的JVM进程

source : agent中的一个用来跟数据源对接的服务

channel : agent内部的一个中转组件

sink : agent中的一个用来跟数据目的地对接的服务

event: 消息流转的一个载体/对象

header body

常见source的类型

Avro source :接收网络端口中的数据

exec source: *敏*感*词*文件新增内容 tail -f

spooldir source :监控文件夹的,如果这个文件夹里面的文件发送了变化,就可以采集

Taildir source: 多目录多文件实时监控

常见的channel的类型

memory : 内存中 , 快 , 但不安全

file : 相对来说安全些,但是效率低些

jdbc: 使用数据库进行数据的保存

常见的sink的类型

logger 做测试使用

HDFS 离线数据的sink 一般

Kafka 流式数据的sink

以上仅仅是常见的一些,官网中有完整的。

  2.1 简介

  Flume的数据流是由事件贯穿的。Event是Flume的基本数据单元。它携带日志数据(以字节数组的形式)并携带头信息。这些事件由代理外部的源生成。当Source捕获到事件后,会进行特定的格式化,然后Source将事件Push到(单个或多个)Channel中。您可以将 Channel 视为一个缓冲区,用于保存事件,直到 Sink 完成对事件的处理。Sink 负责持久化日志或将事件推送到另一个 Source。

  Flume以agent为最小的独立运行单元

  一个代理就是一个JVM

  单个代理由三个组件组成:Source、Sink和Channel。

  如下官网图片

  解释:

  2.2 Flume的三大核心组件

  事件

  Event是Flume数据传输的基本单位。

  Flume 以事件的形式将数据从源传输到最终目的地。

  事件由可选的标头和收录

数据的字节数组组成。

  加载的数据对 Flume 是不透明的。

  Header 是一个收录

键值字符串对的无序集合,key 在集合内是唯一的。

  可以使用上下文路由来扩展标头。

  客户

  客户端是一个将原创

日志包装成事件并将它们发送给一个或多个代理的实体

  目的是将Flume与数据源系统解耦

  在 Flume 的拓扑中不需要

  代理人

  一个Agent收录

source、channel、sink等组件。

  它利用这些组件将事件从一个节点传输到另一个节点或传输到最终目的地。

  代理是 Flume 流的基础部分。

  Flume 为这些组件提供配置、生命周期管理和监控支持。

  代理来源

  Source负责接收事件或通过特殊机制产生事件,将事件批处理成一个或多个

  收录

两种类型的事件驱动和轮询

  不同类型的来源

  与系统集成的源:Syslog、Netcat、监控目录池

  自动生成事件的来源:Exec

  Agent与Agent之间通信的IPC源:avro、thrift

  来源必须与至少一个频道相关联

  代理商渠道

  Channel位于Source和Sink之间,用于缓存传入的事件

  当 sink 成功将事件发送到下一个通道或最终目的地时,事件从通道中删除

  不同的渠道提供不同程度的持久性

  内存通道:volatile(不稳定)

  文件通道:基于WAL(Write-Ahead Logging)实现

  JDBC Channel:基于嵌入式数据库实现

  Channel支持交易,提供较弱的订单保障

  可以使用任意数量的源和*敏*感*词*

  代理的水槽

  Sink负责将事件传递到下一层或最终目的地,成功后从通道中移除事件

  不同类型的*敏*感*词*,例如 HDFS、HBase

  2.3 Flume经典部署方案

  1.单Agent采集数据

  代理负责从Web服务器采集

数据到HDFS。

  2. Multi-Agent串联

  在采集数据的过程中,可以将多个agent串联起来,组成一条事件数据线进行传输,但需要注意的是,相邻两个agent的前一个agent的sink类型必须与本次的source类型相同后者代理一致。

  3.合并连接多个Agent

  多个agent串并联,构成一个复杂的数据采集架构。体现了flume的灵活部署。并且对于关键节点,也可以进行高可用配置。

  4.复用

  一个数据流可以被复制成多个数据流,交给多个不同的组件处理。一般用于计算,同时永久存储。

  第三章Flume安装与案例 3.1 安装与部署 3.1.1 Flume1.7 安装与部署

  1、将apache-flume-1.7.0-bin.tar.gz上传到hadoop0的/software目录下,并解压

  [root@hadoop0 software]# tar -zxvf apache-flume-1.7.0-bin.tar.gz

  2.重命名为flume

  [root@hadoop0 software]# mv apache-flume-1.7.0-bin flume

  3.修改flume-env.sh文件

  [root@hadoop0 conf]# mv flume-env.sh.template flume-env.sh

  然后vim flume-env.sh,修改jdk路径

  export JAVA_HOME=/software/jdk

  3.1.2 Flume1.9安装部署

  1、将apache-flume-1.9.0-bin.tar.gz上传到hadoop10的/software目录下,并解压

  [root@hadoop10 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz

  2.重命名为flume

  [root@hadoop10 software]# mv apache-flume-1.9.0-bin flume

  3.修改flume-env.sh文件

  [root@hadoop10 conf]# mv flume-env.sh.template flume-env.sh

  然后vim flume-env.sh,修改jdk路径

  export JAVA_HOME=/software/jdk

  4.看Flume版本

  [root@hadoop10 bin]# flume-ng version

Flume 1.9.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: d4fcab4f501d41597bc616921329a4339f73585e

Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018

From source with checksum 35db629a3bda49d23e9b3690c80737f9

[root@hadoop10 bin]# pwd

/software/flume/bin

[root@hadoop10 bin]#

  3.2 案例 3.2.1 监控端口数据(官方案例)

  1、在flume的目录下面创建文件夹

[root@hadoop0 flume]# mkdir job

[root@hadoop0 flume]# cd job

2、定义配置文件telnet-logger.conf

[root@hadoop0 job]# vim telnet-logger.conf

添加内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

<p>

" />

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3、先开启flume*敏*感*词*端口

退到flume目录

官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

实际操作:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console

4、执行telnet localhost 44444

telnet localhost 44444

会先报找不到telnet

[root@hadoop10 flume]# telnet localhost 44444

bash: telnet: command not found...

[root@hadoop10 flume]#

然后执行yum -y install telnet

5、发送命令测试即可

</p>

  以上配置telnet-logger.conf文件内容说明:

  # example.conf: A single-node Flume configuration

# Name the components on this agent #a1: 表示的是agent的名字

a1.sources = r1 #r1 : 表示的是a1的输入源

a1.sinks = k1 #k1 : 表示的a1的输出目的地

a1.channels = c1 #c1 : 表示的a1的缓冲区

# Describe/configure the source #配置source

a1.sources.r1.type = netcat #表示a1的输入源r1的类型是netcat类型

a1.sources.r1.bind = localhost #表示a1*敏*感*词*的主机

a1.sources.r1.port = 44444 #表示a1*敏*感*词*的端口号

# Describe the sink #描述sink

a1.sinks.k1.type = logger #表示a1的输入目的地k1的类型是logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory #表示a1的channel的类型是memory类型

a1.channels.c1.capacity = 1000 #表示a1的channel总容量1000个event

a1.channels.c1.transactionCapacity = 100 #表示a1的channel传输的时候收集到了100个event以后再去提交事务

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 #表示将r1和c1 连接起来

a1.sinks.k1.channel = c1 #表示将k1和c1 连接起来

3、先开启flume*敏*感*词*端口

退到flume目录

官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

实际操作:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger2.conf -Dflume.root.logger=INFO,console

参数说明:

--conf conf : 表示配置文件在conf目录

--name a1 : 表示给agent起名为a1

--conf-file job/telnet-logger.conf : flume本次启动所要读取的配置文件在job文件夹下面的telnet-logger.conf文件

-Dflume.root.logger=INFO,console : -D 表示flume运行时候的动态修改flume.root.logger参数值,并将日志打印到控制台,级别是INFO级别。

日志级别: log、info、warn、error

  3.2.2 监控目录下的文件到HDFS

  1、创建配置文件dir-hdfs.conf

在job目录下面 vim dir-hdfs.conf

添加下面的内容:

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = spooldir

a3.sources.r3.spoolDir = /software/flume/upload

a3.sources.r3.fileSuffix = .COMPLETED

a3.sources.r3.fileHeader = true

a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H

a3.sinks.k3.hdfs.filePrefix = upload-

a3.sinks.k3.hdfs.round = true

a3.sinks.k3.hdfs.roundValue = 1

a3.sinks.k3.hdfs.roundUnit = hour

a3.sinks.k3.hdfs.useLocalTimeStamp = true

a3.sinks.k3.hdfs.batchSize = 100

a3.sinks.k3.hdfs.fileType = DataStream

a3.sinks.k3.hdfs.rollInterval = 600

a3.sinks.k3.hdfs.rollSize = 134217700

a3.sinks.k3.hdfs.rollCount = 0

a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2、启动监控目录命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

  以上配置dir-hdfs.conf文件内容说明:

  1、创建配置文件dir-hdfs.conf

在job目录下面 vim dir-hdfs.conf

添加下面的内容:

a3.sources = r3 #定义source为r3

a3.sinks = k3 #定义sink为k3

a3.channels = c3 #定义channel为c3

# Describe/configure the source #配置source相关的信息

a3.sources.r3.type = spooldir #定义source的类型是spooldir类型

a3.sources.r3.spoolDir = /software/flume/upload #定义监控的具体的目录

a3.sources.r3.fileSuffix = .COMPLETED #文件上传完了之后的后缀

a3.sources.r3.fileHeader = true #是否有文件头

a3.sources.r3.ignorePattern = ([^ ]*\.tmp) #忽略以tmp结尾的文件,不进行上传

# Describe the sink #配置sink相关的信息

a3.sinks.k3.type = hdfs #定义sink的类型是hdfs

a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H #文件上传到hdfs的具体的目录

a3.sinks.k3.hdfs.filePrefix = upload- #文件上传到hdfs之后的前缀

a3.sinks.k3.hdfs.round = true #是否按照时间滚动生成文件

a3.sinks.k3.hdfs.roundValue = 1 #多长时间单位创建一个新的文件

a3.sinks.k3.hdfs.roundUnit = hour #时间单位

a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地时间

a3.sinks.k3.hdfs.batchSize = 100 #积累多少个event才刷写到hdfs一次

a3.sinks.k3.hdfs.fileType = DataStream #文件类型

a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件

a3.sinks.k3.hdfs.rollSize = 134217700 #多大生成新文件

a3.sinks.k3.hdfs.rollCount = 0 #多少event生成新文件

a3.sinks.k3.hdfs.minBlockReplicas = 1 #副本数

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2、启动监控目录命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

  在执行上面命令的过程中遇到了一个小问题:

  ......

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)

at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)

at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)

at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)

at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)

at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)

at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)

at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)

at java.lang.Thread.run(Thread.java:748)

  解决方法:删除lib文件夹下的guava-11.0.2.jar,以兼容Hadoop版本。可以通过重命名将其注释掉(达到删除的效果)。

  [root@hadoop10 lib]# mv guava-11.0.2.jar guava-11.0.2.jar.backup

  3.2.3 监控文件到HDFS

  1、创建一个自动化文件

[root@hadoop0 job]# vim mydateauto.sh

写入:

#!/bin/bash

while true

do

echo `date`

sleep 1

done

然后运行测试:

[root@hadoop0 job]# sh mydateauto.sh

Wed Aug 19 18:34:19 CST 2020

Wed Aug 19 18:34:20 CST 2020

<p>

" />

然后修改配置,将输出的日志追加到某个文件中

#!/bin/bash

while true

do

echo `date` >> /software/flume/mydate.txt

sleep 1

done

再次执行[root@hadoop0 job]# sh mydateauto.sh

就会在flume的文件夹下面生成了mydate.txt文件

通过tail -f mydate.txt 查看

再次执行sh mydateauto.sh 查看输出。

2、创建配置vim file-hdfs.conf

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /software/flume/mydate.txt

a2.sources.r2.shell = /bin/bash -c

# Describe the sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H

a2.sinks.k2.hdfs.filePrefix = logs-

a2.sinks.k2.hdfs.round = true

a2.sinks.k2.hdfs.roundValue = 1

a2.sinks.k2.hdfs.roundUnit = hour

a2.sinks.k2.hdfs.useLocalTimeStamp = true

a2.sinks.k2.hdfs.batchSize = 1000

a2.sinks.k2.hdfs.fileType = DataStream

a2.sinks.k2.hdfs.rollInterval = 600

a2.sinks.k2.hdfs.rollSize = 134217700

a2.sinks.k2.hdfs.rollCount = 0

a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

3、启动

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

</p>

  上述配置文件-hdfs.conf文件内容说明:

  # Name the components on this agent

a2.sources = r2 #定义source为r2

a2.sinks = k2 #定义sink为k2

a2.channels = c2 #定义channel为c2

# Describe/configure the source

a2.sources.r2.type = exec #定义source的类型是exec 可执行命令

a2.sources.r2.command = tail -F /software/flume/mydate.txt #具体文件位置

a2.sources.r2.shell = /bin/bash -c #命令开头

# Describe the sink #sink相关配置

a2.sinks.k2.type = hdfs #定义sink的类型是hdfs

a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H #具体的位置

a2.sinks.k2.hdfs.filePrefix = logs-

a2.sinks.k2.hdfs.round = true

a2.sinks.k2.hdfs.roundValue = 1

a2.sinks.k2.hdfs.roundUnit = hour

a2.sinks.k2.hdfs.useLocalTimeStamp = true

a2.sinks.k2.hdfs.batchSize = 100

a2.sinks.k2.hdfs.fileType = DataStream

a2.sinks.k2.hdfs.rollInterval = 600 #单位是秒!!

a2.sinks.k2.hdfs.rollSize = 134217700

a2.sinks.k2.hdfs.rollCount = 0

a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

3、启动

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

  过程中遇到的一个小问题:

  18 Oct 2021 14:32:24,340 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42) - Creating instance of sink: k2, type: hdfs

18 Oct 2021 14:32:24,348 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:469) - Sink k2 has been removed due to an error during configuration

java.lang.InstantiationException: Incompatible sink and channel settings defined. sink&#39;s batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100

at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403)

at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462)

at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)

at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

  解决方案:

  问题原因:原因其实很明了了,就是字面的意思,channel 与 sink的设置不匹配,sink的batch size大于channel的transaction capacity

解决方案:将a2.sinks.k2.hdfs.batchSize设置为小于等于100 。 或者注释掉也可以。

  3.2.4 多目录多文件实时监控(Taildir源码)

  与之前使用的 Source 的比较

  Spooldir Source 用于同步新文件,但不适合对实时追加日志的文件进行*敏*感*词*并同步。

Exec source 用于监控一个实时追加的文件,不能实现断点续传;

Taildir Source 用于*敏*感*词*多个实时追加的文件,并且能够实现断点续传。

  操作案例:

  1、在job下面创建 vim taildir-hdfs.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = TAILDIR

a3.sources.r3.positionFile = /software/flume/taildir.json

a3.sources.r3.filegroups = f1 f2

a3.sources.r3.filegroups.f1 = /software/flume/taildirtest/filedir/.*file.*

a3.sources.r3.filegroups.f2 = /software/flume/taildirtest/logdir/.*log.*

# Describe the sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/uploadtaildir/%Y%m%d/%H

a3.sinks.k3.hdfs.filePrefix = upload-

a3.sinks.k3.hdfs.round = true

a3.sinks.k3.hdfs.roundValue = 1

a3.sinks.k3.hdfs.roundUnit = hour

a3.sinks.k3.hdfs.useLocalTimeStamp = true

a3.sinks.k3.hdfs.batchSize = 100

a3.sinks.k3.hdfs.fileType = DataStream

a3.sinks.k3.hdfs.rollInterval = 600

a3.sinks.k3.hdfs.rollSize = 134217700

a3.sinks.k3.hdfs.rollCount = 0

a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2、创建文件文件夹,注意需要在启动之前创建监控的文件夹

[root@hadoop10 flume]# mkdir taildirtest

[root@hadoop10 flume]# cd taildirtest/

[root@hadoop10 taildirtest]# ll

total 0

[root@hadoop10 taildirtest]# mkdir filedir

[root@hadoop10 taildirtest]# mkdir logdir

[root@hadoop10 taildirtest]# ll

total 0

drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir

drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir

[root@hadoop10 taildirtest]# vim file.txt

[root@hadoop10 taildirtest]# vim log.txt

[root@hadoop10 taildirtest]# ll

total 8

drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir

-rw-r--r--. 1 root root 35 Oct 18 16:45 file.txt

drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir

-rw-r--r--. 1 root root 35 Oct 18 16:46 log.txt

3、启动监控目录命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-hdfs.conf

4、测试

[root@hadoop10 taildirtest]# cp file.txt filedir/

[root@hadoop10 taildirtest]# cp log.txt logdir/

[root@hadoop10 taildirtest]# cd filedir/

[root@hadoop10 filedir]# echo hello1 >> file.txt

[root@hadoop10 filedir]# cd ../logdir/

[root@hadoop10 logdir]# echo hello2 >> log.txt

[root@hadoop10 logdir]#

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线