本文采集数据实时存储的相关方案

优采云 发布时间: 2021-08-27 18:07

  本文采集数据实时存储的相关方案

  注:本文不仅提供了两种解决方案,还详细记录了一些相关信息。

  方案一

  这个方案的核心是flume采集数据。根据hive表的结构,将采集数据发送到对应的地址,达到实时数据存储的目的。这种实时实际上是一种准实时。

  假设hadoop集群已经正常启动,hive也已经正常启动,hive的文件地址为/hive/warehouse,那么hive中有如下建表语句创建的表

   create table flume_test(uuid string);

  可以推断flume_test表的地址在/hive/warehouse/flume_test,下面介绍flume:

  Flume 安装步骤

  #下载

cd /opt

mkdir flume

wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

tar xvzf apache-flume-1.6.0-bin.tar.gz

cd apache-flume-1.6.0-bin/conf

cp flume-env.sh.template flume-env.sh

  打开flume-env文件并添加java变量

  export JAVA_HOME=/usr/java/jdk1.8.0_111

  然后添加环境变量。一次性使用,在profile和bashrc末尾添加。

  export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin

export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$PATH:$FLUME_HOME/bin

  然后

  source /etc/profile

  既然flume安装好了,下面进行配置,切换到conf文件夹,将flume-conf.properties.template复制为agent.conf,然后编辑

  #定义活跃列表

agent.sources=avroSrc

agent.channels=memChannel

agent.sinks=hdfsSink

#定义source

agent.sources.avroSrc.type=avro

agent.sources.avroSrc.channels=memChannel

agent.sources.avroSrc.bind=0.0.0.0

agent.sources.avroSrc.port=4353

agent.sources.avroSrc.interceptors=timestampinterceptor

agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp

agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

#定义channel

agent.channels.memChannel.type=memory

agent.channels.memChannel.capacity = 1000

agent.channels.memChannel.transactionCapacity = 100

#定义sink

agent.sinks.hdfsSink.type=hdfs

agent.sinks.hdfsSink.channel=memChannel

#agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H

agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test

agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume

agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume

agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp

agent.sinks.hdfsSink.hdfs.rollInterval=0

agent.sinks.hdfsSink.hdfs.rollSize=10240000

agent.sinks.hdfsSink.hdfs.rollCount=0

agent.sinks.hdfsSink.hdfs.idleTimeout=0

agent.sinks.hdfsSink.hdfs.batchSize=100

agent.sinks.hdfsSink.hdfs.minBlockReplicas=1

# agent.sinks.hdfsSink.hdfs.writeFormat = Text

agent.sinks.hdfsSink.hdfs.fileType = DataStream

  每个具体配置请参考以下博客。需要警惕的四个属性是rollInterval、rollSize、rollCount、idleTimeout。如果发现配置无效,请检查是否配置了minBlockReplicas属性。 , 并且该值是否为1,以下连接是原因

  配置好就可以启动了,启动命令

  ./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console

  注意:-n 是代理的名称,需要对应配置文件的第一个值。这个启动命令也开启了监控,监控地址:5563/metrics; -f 指的是配置文件的路径和名称。修改flume的conf后,不需要重启。默认情况下,它每 30 秒刷新一次并自动加载最新配置。

  flume安装启动后,编写测试程序。打开eclipse并创建一个maven项目

  

4.0.0

scc

stu-flume

0.0.1-SNAPSHOT

war

stu-flume

log4j

log4j

1.2.9

org.apache.flume.flume-ng-clients

flume-ng-log4jappender

1.6.0

  测试 servlet

  public class GenerLogServlet extends HttpServlet {

private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class);

private static final long serialVersionUID = 1L;

@Override

protected void doGet(HttpServletRequest request, HttpServletResponse response)

throws ServletException, IOException {

for (;;) {

LOGGER.info(UUID.randomUUID().toString());

try {

Thread.sleep(100);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

@Override

protected void doPost(HttpServletRequest request, HttpServletResponse response)

throws ServletException, IOException {

this.doGet(request, response);

}

}

  log4j.properties

  #log4j settings

#log4j.rootLogger=debug, CONSOLE

log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet

#log4j.rootLogger=INFO

log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender

log4j.appender.GenerLogServlet.Hostname=10.5.3.100

log4j.appender.GenerLogServlet.Port=4353

log4j.appender.GenerLogServletUnsafeMode=false

  启动项目,访问:8080/log开始生产数据。需要注意的是,如果flume配置是根据时间戳对文件进行分组(此时hive可以根据时间进行分区),那么agent.conf中的source必须配置

  agent.sources.avroSrc.interceptors=timestampinterceptor

agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp

agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

  否则flume sink会报timestamp not found错误,因为源码org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定义的timestamp的key是flume.client.log4j.timestamp而不是timestamp,所以需要手动添加一个时间戳,如果这个时间戳的需求必须是数据产生的时间,可以修改源码或者为源码添加*敏*感*词*手动配置。

  Flume 有非常灵活的使用方式,可以自定义source、sink、interceptor、channel selector等,适应大部分采集、数据缓冲等场景。

  观察hadoop目录,发现flume已经按照配置把数据移动到了对应的hive表目录下,如下图:

  打开hive客户端,使用数据查询命令,发现可以查询到数据!并且可以根据hive表的数据规则写入hive的分区表和桶表flume,然后实时插入数据。至此,解决方案结束。

  这个程序的缺点:

  因为flume在写文件的时候独占了正在写的文件资源,所以hive无法读取正在写的文件的内容,也就是说,如果每5分钟生成一个文件,那么正在写的文件的内容就不会被hive读取,这意味着hive最多有5分钟的延迟。而且如果减少时间,延迟也会减少,但即使设置为30分钟或1小时,在flume流量不大的情况下,也会产生很多零散的小文件。这与hive的特长背道而驰,hive擅长处理大文件,对于分散的小文件,hive性能会下降很多。

  计划二

  对比方案一,测试程序和source不变,sink改为hbase-sink,数据实时插入hbase,然后在hive中创建hbase映射表,hive从hbase读取数据,可以实现实时插入的效果。由于字数限制,计划2记录在以下博客链接中:

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线