实时文章采集(基于Flink实时处理实时处理海量日志需求分析(图))

优采云 发布时间: 2022-04-08 22:15

  实时文章采集(基于Flink实时处理实时处理海量日志需求分析(图))

  第12章-Flink案例

  本章将介绍 Flink 已经在多个场景中实现的*敏*感*词*案例。一是实时处理海量日志,包括日志采集、日志传输、实时日志清理与异常检测、日志存储、日志展示。介绍一下Flink在其中的作用,希望整个日志处理架构可以在自己公司灵活使用;二是在百亿级数据的情况下,如何使用Flink实时去重,本例将进行对比。其他几种常见的去重实现方案;三是Flink在监控报警系统中的实现。在这种情况下,还详细介绍了一个监控报警系统的整个环节,每一个环节都缺一不可。, 并且还介绍了Flink未来会结合机器学习算法做一些AIOps。这三个案例比较典型。如果你也在做类似的项目,希望对你的技术选型有所帮助。

  12.1 基于Flink的海量日志实时处理

  Section 11.5 讲解了 Flink 如何实时处理异常日志,并对比分析了几种常用的 log采集 工具。我们也知道,日志是排查在线异常必不可少的一部分。通过异常日志,我们可以快速定位问题的根源。那么企业对日志处理有哪些要求呢?

  12.1.1 海量日志实时处理需求分析

  现在,公司正在构建分布式、微服务和云原生架构。在这样的架构下,项目应用的日志分布在不同的机器上,使得日志查询更加困难。因此,统一的日志采集几乎是不可能的。每个公司都必须的。据笔者研究,现在很多公司都是统一采集日志,也做日志的实时ETL,使用ELK等一些主流技术来展示、搜索、分析日志,但是缺乏实时日志警报。综上所述,大部分公司对于日志的现状是:

  本节作者将讲解日志的完整链接,包括日志的实时采集、日志的ETL、日志的实时监控和告警、日志的存储日志,日志的可视化图表展示和搜索分析等。

  12.1.2 海量日志架构设计的实时处理

  在分析了我们案例的需求之后,接下来就是对整个项目的架构进行合理的设计。

  

  整个架构分为五层:日志访问层、日志调峰层、日志处理层、日志存储层、日志展示层。

  12.1.3 实时记录采集

  在11.5.1中,我对比了这些流行的log采集工具(Logstash、Filebeat、Fluentd、Logagent),从功能完整性、性能、成本、使用难度等方面综合考虑其他方面,这里的演示使用了Filebeat。

  安装 Filebeat

  在服务器上下载Fliebeat6.3.2安装包(请根据你的服务器和你需要的版本下载),下载后解压。

  1

  tar xzf filebeat-6.3.2-linux-x86_64.tar.gz

  配置 Filebeat

  配置 Filebeat,需要编辑 Filebeat 配置文件 filebeat.yml。不同的安装方式,配置文件的存放路径不同。解压包的安装方式,解压目录下有配置文件;对于 rpm 和 deb 方法,配置文件路径是 /etc/filebeat/filebeat.yml。

  因为Filebeat要实时登录采集,所以要让Filebeat知道日志的路径在哪里。在下面的配置文件中定义日志文件的路径。通常建议在服务器上固定存放日志的路径,然后应用程序的所有日志都放在这个固定的路径下,这样Filebeat的日志路径配置只需要填写一次,和配置一样可以复制到其他机器上运行Filebeat。配置如下。

  1

2

3

4

5

6

  - type: log

# 配置为 true 表示开启

enabled: true

# 日志的路径

paths:

- /var/logs/*.log

  以上配置意味着/var/logs目录下所有以.log结尾的文件都会是采集,然后配置日志输出方式。这里使用的是Kafka,配置如下。

  1

2

3

4

5

6

7

8

  output.kafka:

# 填写 Kafka 地址信息

hosts: ["localhost:9092"]

# 数据发到哪个 topic

topic: zhisheng-log

partition.round_robin:

reachable_only: false

required_acks: 1

  对于上面解释的两个配置,作者将它们写在一个新创建的配置文件kafka.yml中,然后在启动Filebeat的时候使用这个配置。

  1

2

3

4

5

6

7

8

9

10

11

  filebeat.inputs:

- type: log

enabled: true

paths:

- /var/logs/*.log

output.kafka:

hosts: ["localhost:9092"]

topic: zhisheng_log

partition.round_robin:

reachable_only: false

required_acks: 1

  启动 Filebeat

  日志路径的配置和Kafka的配置写完之后,再用如下命令启动Filebeat:

  1

  bin/filebeat -e -c kafka.yml

  执行命令后出现的日志如下,说明启动成功,在终端上也可以看到会打印出metrics数据。

  

  验证 Filebeat 是否正在向 Kafka 发送日志数据

  然后你要检查这个时候日志数据是否真的发送到了Kafka。可以通过kafka自带的命令来消费这个topic,看数据是否持续发送。命令如下:

  1

  bin/kafka-console-consumer.sh --zookeeper 106.54.248.27:2181 --topic zhisheng_log --from-beginning

  如果有数据,则表示数据已经发送到Kafka。如果不喜欢用这种方式验证,可以写一个 Flink Job 来读取 Kafka 中 topic 的数据。比如你写一个job,运行结果如下。日志数据已成功发送到Kafka。

  

  发送到 Kafka 的日志结构

  现在数据已经发送到Kafka,通过在Kafka中消费topic的数据,可以判断数据的格式是否为JSON。结构如下:

  1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

  {

"@timestamp": "2019-10-26T08:18:18.087Z",

"@metadata": {

"beat": "filebeat",

"type": "doc",

"version": "6.8.4",

"topic": "zhisheng_log"

},

"prospector": {

"type": "log"

},

"input": {

"type": "log"

},

"beat": {

"name": "VM_0_2_centos",

"hostname": "VM_0_2_centos",

"version": "6.8.4"

},

"host": {

"name": "VM_0_2_centos"

},

"source": "/var/logs/middleware/kafka.log",

"offset": 9460,

"log": {

"file": {

"path": "/var/logs/middleware/kafka.log"

}

},

"message": "2019-10-26 16:18:11 TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)"

}

  这个日志结构收录很多字段,例如时间戳、元数据、主机、来源、消息等,但其中有些字段根本不需要。您可以根据公司的需要丢弃部分字段,并配置要丢弃的字段。在 kafka.yml 中如下图所示。

  1

2

3

  processors:

- drop_fields:

fields: ["prospector","input","beat","log","offset","@metadata"]

  然后再次启动Filebeat,发现上面配置的字段不在新数据中(@metadata除外)。另外,作者验证了不仅@metadata字段不能丢弃,而且如果在drop_fields中配置了@timestamp字段,则不Works,两者都不允许丢弃。一般来说,一行日志就足够长了。添加这么多我们不需要的字段会增加数据的大小。对于生产环境来说,日志数据量非常大,无疑会对后续的所有环节造成损害。有一定的影响,所以一定要做好底层数据源的精简。另外,发送Kafka时可以压缩数据,可以在配置文件中配置一个压缩:gzip。简化的日志数据结构如下:

  1

2

3

4

5

6

7

8

9

10

11

12

13

14

  {

"@timestamp": "2019-10-26T09:23:16.848Z",

"@metadata": {

"beat": "filebeat",

"type": "doc",

"version": "6.8.4",

"topic": "zhisheng_log"

},

"host": {

"name": "VM_0_2_centos"

},

"source": "/var/logs/middleware/kafka.log",

"message": "2019-10-26 17:23:11 TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)"

}

  12.1.4 统一日志格式

  因为Filebeat是机器上的采集日志,所以这些日志的种类很多,比如应用运行日志、作业构建、编译打包日志、中间件服务运行日志等。通常在公司,可以约定开发日志打印的规则,但是中间件等服务的日志是不固定的。如果 Kafka 中的消息直接存储在 ElasticSearch 中,以后想区分过滤可能会出现问题。. 为了避免这个问题,我们必须在将日志存储到 ElasticSearch 之前进行数据格式化和清理工作。因为 Flink 处理数据的速度更快,并且可以实时完成,所以我们选择在 Flink Job 中完成这项工作。

  解析这个作业中的消息,这一行的日志信息一般收录很多信息,比如日志打印时间、日志级别、应用名称、唯一ID(用来关联每个请求)、请求上下文等。那么我们需要一个新的日志结构对象来统一日志的格式,定义如下:

  12.1.5 实时日志清理12.1.6 实时日志报警12.1.7 实时日志存储1 2.1.8 日志实时展示12.1.9 总结与反思

  加入知识星球看上面的文章:

  

  纯粹的乐趣

  

  扫码打赏,如你所说

  

  

  打开支付宝扫一扫,即可扫码打赏

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线