汇总:离线数仓(四)1、网站流量日志数据采集Flume采集
优采云 发布时间: 2022-10-16 12:09汇总:离线数仓(四)1、网站流量日志数据采集Flume采集
网站流量日志数据采集Flume采集
在网站流量日志分析场景中,对数据采集部分的可靠性和容错性要求通常不是很严格,需要注意对数据采集进行上下文分析>:
需要为数据移动的操作
1. Flume 版本选择
nginx日志生成场景
Flume采集系统搭建
在服务器上部署代理节点并修改配置文件
启动代理节点,将采集中的数据聚合到指定的HDFS目录中
2.Flume核心配置(一)tailDirSource配置
核心配置如下
a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
配置说明
位置文件
文件组。
通过以上配置,可以监控文件内容的增加和文件的增加。与配置的文件名正则表达式不匹配的文件不会被拖尾。
(2)*敏*感*词*配置
这一次,要将日志 采集 传输到 HDFS,您需要使用 HDFSSink 文件。HDFSSink 需要配置滚动属性。
示例说明:
如果hdfs的replica为3,配置的滚动时间为10秒,那么第二秒flume检测到hdfs正在复制blocks,此时flume会滚动,会影响flume的滚动方式。所以通常hdfs.minBlockReplicas配置为1,就检测不到replica的replication。但是hdfs的实际副本还是3
(3) 完整版配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
# Describe the sink
#指定hdfs sink
a1.sinks.k1.type = hdfs
#hdfs目录,带有时间信息
a1.sinks.k1.hdfs.path = /flume/tailout/%Y-%m-%d/
#生成的hdfs文件名的前缀
a1.sinks.k1.hdfs.filePrefix = events-
#指定滚动时间,默认是30秒,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollInterval = 0
#指定滚动大小,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollSize = 200000000
#指定滚动条数
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#副本策略
a1.sinks.k1.hdfs.minBlockReplicas=1
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4) 启动flume agent采集数据
创建目录
mkdir -p /var/log/test1/
mkdir -p /var/log/test2/
将测试数据上传到上面创建的目录!!
水槽启动命令
bin/flume-ng agent --conf-file job/log2hdfs.conf -name a1 -Dflume.root.logger=INFO,console
<p>
</p>
(5) hdfs 路径是否正确?
问题
根据上面flume代理的配置文件,会出现数据存储的路径信息不正确,需要按照日志时间存储的情况。
3. Flume 自定义*敏*感*词*
实施步骤
创建maven项目
创建一个新类来实现flume提供的Interceptor接口
打个jar包,上传到flume安装目录下的lib文件夹
开发flume代理配置文件参考header中的日期信息
执行
创建一个maven java项目并导入jar包
org.apache.flume
flume-ng-core
1.8.0
provided
org.apache.maven.plugins
maven-compiler-plugin
3.0
1.8
1.8
UTF-8
自定义水槽*敏*感*词*
package com.yyds.interceptor;
import org.apache.commons.compress.utils.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class CustomerInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//获得body的内容
String eventBody = new String(event.getBody(), Charsets.UTF_8);
final String[] bodyArr = eventBody.split(" ");
String time_local = "";
if (bodyArr.length > 11) {
time_local = bodyArr[4] ;
}
final Map headers = event.getHeaders();
// 添加时间信息 到event的header
if (StringUtils.isNotBlank(time_local)) {
headers.put("event_time", time_local);
} else {
headers.put("event_time", "unkown");
}
event.setHeaders(headers);
return event;
}
@Override
public List intercept(List events) {
<p>
List out = new ArrayList();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomerInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
</p>
包上传服务器
把我们的*敏*感*词*打成一个jar包,放到flume的lib目录下
开发flume配置文件
开发flume配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
#interceptor
a1.sources.r1.interceptors =i1
a1.sources.r1.interceptors.i1.type =com.yyds.interceptor.CustomTimeInterceptor$Builder
# Describe the sink
#指定hdfs sink
a1.sinks.k1.type = hdfs
#hdfs目录,带有时间信息
a1.sinks.k1.hdfs.path = /flume/tailout/%{event_time}/
#生成的hdfs文件名的前缀
a1.sinks.k1.hdfs.filePrefix = events-
#指定滚动时间,默认是30秒,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollInterval = 0
#指定滚动大小,设置为0表示禁用该策略
a1.sinks.k1.hdfs.rollSize = 200000000
#指定滚动条数
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
#副本策略
a1.sinks.k1.hdfs.minBlockReplicas=1
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
汇总:采集站如何做SEO排名?
采集站SEO对于很多小伙伴来说可能是个大问题,但也有很多小伙伴想把工作做好,我觉得现在做SEO很多站长估计都不会写自己的原创文章,大多数人都是各种抄袭各种伪原创,会采集直接采集。但是搜索引擎几乎无法容忍垃圾采集网站,找到一个就会淘汰一个。但是我们也可以发现,还有一些采集站一直排名比较好,怎么回事,下面陈阳SEO谈谈采集站的优化技巧。
采集如何进行SEO排名
1、一定要清楚知道,新网站(域名new)直接开始做采集,起床的概率基本上是0,因为新网站对搜索引擎没有信任,没有一段时间的磨合,上来做采集,百度等搜索引擎会认为这是垃圾网站,不回馈排名和权重, 甚至收录。
2、所以采集站必须网站已经有一定的基础,经常是蜘蛛爬,在搜索引擎上有一定的信任度和排名,这类站通常是一个比较老网站,也就是老网站。这种网站确实采集更有效。
3、由于现在各大搜索引擎都非常重视网站内容,所以网站想要排名好,更新文章的质量一定要高,原创程度要高,但是采集站一定不能达到原创,但质量我们可以控制,采集网站采集内容必须是优质内容,垃圾内容不采集。
4、直接采集后进入网站在搜索引擎去处理后的重量也很低,所以我们需要做伪原创处理,市场上一般都使用伪原创插件。
5、采集站必须注重文章的质量,这种质量 文章体现在排版好、错别字少、图片清晰、字体合理、句子流畅、文章文章等具有现实意义。
6.当一个采集站很好地处理文章的布局,文章,伪原创质量时,这个网站的排名和权重不会差。
7、由于飓风算法的冲击,采集网站必须注意不能跨领域采集,每个分类只能采集与分类主题相关的内容,同时注重资源的整合,采集更有价值的内容,比如时效性新闻,这件事情搜索引擎需要缺乏的内容,这样才能得到更好的收录和排名。
8、采集站主要是解决内容问题,当然其他优化细节和普通站应该做的优化来做优化。