实时文章采集(基于Flink实时处理实时处理海量日志需求分析(图))
优采云 发布时间: 2022-04-08 22:15实时文章采集(基于Flink实时处理实时处理海量日志需求分析(图))
第12章-Flink案例
本章将介绍 Flink 已经在多个场景中实现的*敏*感*词*案例。一是实时处理海量日志,包括日志采集、日志传输、实时日志清理与异常检测、日志存储、日志展示。介绍一下Flink在其中的作用,希望整个日志处理架构可以在自己公司灵活使用;二是在百亿级数据的情况下,如何使用Flink实时去重,本例将进行对比。其他几种常见的去重实现方案;三是Flink在监控报警系统中的实现。在这种情况下,还详细介绍了一个监控报警系统的整个环节,每一个环节都缺一不可。, 并且还介绍了Flink未来会结合机器学习算法做一些AIOps。这三个案例比较典型。如果你也在做类似的项目,希望对你的技术选型有所帮助。
12.1 基于Flink的海量日志实时处理
Section 11.5 讲解了 Flink 如何实时处理异常日志,并对比分析了几种常用的 log采集 工具。我们也知道,日志是排查在线异常必不可少的一部分。通过异常日志,我们可以快速定位问题的根源。那么企业对日志处理有哪些要求呢?
12.1.1 海量日志实时处理需求分析
现在,公司正在构建分布式、微服务和云原生架构。在这样的架构下,项目应用的日志分布在不同的机器上,使得日志查询更加困难。因此,统一的日志采集几乎是不可能的。每个公司都必须的。据笔者研究,现在很多公司都是统一采集日志,也做日志的实时ETL,使用ELK等一些主流技术来展示、搜索、分析日志,但是缺乏实时日志警报。综上所述,大部分公司对于日志的现状是:
本节作者将讲解日志的完整链接,包括日志的实时采集、日志的ETL、日志的实时监控和告警、日志的存储日志,日志的可视化图表展示和搜索分析等。
12.1.2 海量日志架构设计的实时处理
在分析了我们案例的需求之后,接下来就是对整个项目的架构进行合理的设计。
整个架构分为五层:日志访问层、日志调峰层、日志处理层、日志存储层、日志展示层。
12.1.3 实时记录采集
在11.5.1中,我对比了这些流行的log采集工具(Logstash、Filebeat、Fluentd、Logagent),从功能完整性、性能、成本、使用难度等方面综合考虑其他方面,这里的演示使用了Filebeat。
安装 Filebeat
在服务器上下载Fliebeat6.3.2安装包(请根据你的服务器和你需要的版本下载),下载后解压。
1
tar xzf filebeat-6.3.2-linux-x86_64.tar.gz
配置 Filebeat
配置 Filebeat,需要编辑 Filebeat 配置文件 filebeat.yml。不同的安装方式,配置文件的存放路径不同。解压包的安装方式,解压目录下有配置文件;对于 rpm 和 deb 方法,配置文件路径是 /etc/filebeat/filebeat.yml。
因为Filebeat要实时登录采集,所以要让Filebeat知道日志的路径在哪里。在下面的配置文件中定义日志文件的路径。通常建议在服务器上固定存放日志的路径,然后应用程序的所有日志都放在这个固定的路径下,这样Filebeat的日志路径配置只需要填写一次,和配置一样可以复制到其他机器上运行Filebeat。配置如下。
1
2
3
4
5
6
- type: log
# 配置为 true 表示开启
enabled: true
# 日志的路径
paths:
- /var/logs/*.log
以上配置意味着/var/logs目录下所有以.log结尾的文件都会是采集,然后配置日志输出方式。这里使用的是Kafka,配置如下。
1
2
3
4
5
6
7
8
output.kafka:
# 填写 Kafka 地址信息
hosts: ["localhost:9092"]
# 数据发到哪个 topic
topic: zhisheng-log
partition.round_robin:
reachable_only: false
required_acks: 1
对于上面解释的两个配置,作者将它们写在一个新创建的配置文件kafka.yml中,然后在启动Filebeat的时候使用这个配置。
1
2
3
4
5
6
7
8
9
10
11
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/logs/*.log
output.kafka:
hosts: ["localhost:9092"]
topic: zhisheng_log
partition.round_robin:
reachable_only: false
required_acks: 1
启动 Filebeat
日志路径的配置和Kafka的配置写完之后,再用如下命令启动Filebeat:
1
bin/filebeat -e -c kafka.yml
执行命令后出现的日志如下,说明启动成功,在终端上也可以看到会打印出metrics数据。
验证 Filebeat 是否正在向 Kafka 发送日志数据
然后你要检查这个时候日志数据是否真的发送到了Kafka。可以通过kafka自带的命令来消费这个topic,看数据是否持续发送。命令如下:
1
bin/kafka-console-consumer.sh --zookeeper 106.54.248.27:2181 --topic zhisheng_log --from-beginning
如果有数据,则表示数据已经发送到Kafka。如果不喜欢用这种方式验证,可以写一个 Flink Job 来读取 Kafka 中 topic 的数据。比如你写一个job,运行结果如下。日志数据已成功发送到Kafka。
发送到 Kafka 的日志结构
现在数据已经发送到Kafka,通过在Kafka中消费topic的数据,可以判断数据的格式是否为JSON。结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
"@timestamp": "2019-10-26T08:18:18.087Z",
"@metadata": {
"beat": "filebeat",
"type": "doc",
"version": "6.8.4",
"topic": "zhisheng_log"
},
"prospector": {
"type": "log"
},
"input": {
"type": "log"
},
"beat": {
"name": "VM_0_2_centos",
"hostname": "VM_0_2_centos",
"version": "6.8.4"
},
"host": {
"name": "VM_0_2_centos"
},
"source": "/var/logs/middleware/kafka.log",
"offset": 9460,
"log": {
"file": {
"path": "/var/logs/middleware/kafka.log"
}
},
"message": "2019-10-26 16:18:11 TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)"
}
这个日志结构收录很多字段,例如时间戳、元数据、主机、来源、消息等,但其中有些字段根本不需要。您可以根据公司的需要丢弃部分字段,并配置要丢弃的字段。在 kafka.yml 中如下图所示。
1
2
3
processors:
- drop_fields:
fields: ["prospector","input","beat","log","offset","@metadata"]
然后再次启动Filebeat,发现上面配置的字段不在新数据中(@metadata除外)。另外,作者验证了不仅@metadata字段不能丢弃,而且如果在drop_fields中配置了@timestamp字段,则不Works,两者都不允许丢弃。一般来说,一行日志就足够长了。添加这么多我们不需要的字段会增加数据的大小。对于生产环境来说,日志数据量非常大,无疑会对后续的所有环节造成损害。有一定的影响,所以一定要做好底层数据源的精简。另外,发送Kafka时可以压缩数据,可以在配置文件中配置一个压缩:gzip。简化的日志数据结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"@timestamp": "2019-10-26T09:23:16.848Z",
"@metadata": {
"beat": "filebeat",
"type": "doc",
"version": "6.8.4",
"topic": "zhisheng_log"
},
"host": {
"name": "VM_0_2_centos"
},
"source": "/var/logs/middleware/kafka.log",
"message": "2019-10-26 17:23:11 TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)"
}
12.1.4 统一日志格式
因为Filebeat是机器上的采集日志,所以这些日志的种类很多,比如应用运行日志、作业构建、编译打包日志、中间件服务运行日志等。通常在公司,可以约定开发日志打印的规则,但是中间件等服务的日志是不固定的。如果 Kafka 中的消息直接存储在 ElasticSearch 中,以后想区分过滤可能会出现问题。. 为了避免这个问题,我们必须在将日志存储到 ElasticSearch 之前进行数据格式化和清理工作。因为 Flink 处理数据的速度更快,并且可以实时完成,所以我们选择在 Flink Job 中完成这项工作。
解析这个作业中的消息,这一行的日志信息一般收录很多信息,比如日志打印时间、日志级别、应用名称、唯一ID(用来关联每个请求)、请求上下文等。那么我们需要一个新的日志结构对象来统一日志的格式,定义如下:
12.1.5 实时日志清理12.1.6 实时日志报警12.1.7 实时日志存储1 2.1.8 日志实时展示12.1.9 总结与反思
加入知识星球看上面的文章:
纯粹的乐趣
扫码打赏,如你所说
打开支付宝扫一扫,即可扫码打赏