技术文章:Filebeat+Kafka+ELK日志采集(二)——Filebeat
优采云 发布时间: 2022-10-04 01:06技术文章:Filebeat+Kafka+ELK日志采集(二)——Filebeat
一、Filebeat概述
日志采集使用filebeat,采集的日志经过简单处理(多行合并)发送到Kafka、Logstash、Elasticsearch等。
2. 快速上手
让我们从最简单的模型快速开始,然后谈谈原理和细节。
2.1。下载、安装、配置、启动:
1. 下载
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.3.2-linux-x86_64.tar.gz
2.减压
tar xzvf filebeat-8.3.2-linux-x86_64.tar.gz
3.配置
进入filebeat解压目录,编辑filebean.yml
#输入配置
filebeat.inputs:
#输入类型
- type: log
#开启输入
enabled: true
#日志文件路径
paths:
- /usr/share/filebeat/log/test.log
#输出到控制台
output.console:
pretty: true
enable: true
4.开始:
./filebeat -e -c filebeat.yml
五、成功案例
启动成功后,将信息写入配置中的日志文件(/usr/share/filebeat/log/test.log),控制台会打印日志采集,如图1-1以下:
如上图所示,最简单的Filebeat日志采集已经构建成功(指定文件路径,直接输出到控制台)。message字段是日志文件中的日志信息,其他数据是Filebeat附加的信息,包括采集time@TimeStamp、日志文件路径路径等。
6. 实际工作开发
Filebeat的工作原理,采集发送到Kafka/Logstash/Elasticsearch的数据,日志格式和字段处理等详细介绍如下。
三、什么是Filebeat 3.1,Filebeat和Beats的关系
首先filebeat是Beats的一员。
Beats 是一个轻量级的日志采集器。事实上,Beats 家族有 6 个成员。在早期的 ELK 架构中,Logstash 用于采集和解析日志,但 Logstash 消耗的内存、cpu、io 等资源较多。与 Logstash 相比,Beats 占用的系统 CPU 和内存几乎可以忽略不计。
Beats 目前包括六种工具:
3.2. 什么是文件节拍
Filebeat 是一个用于转发和集中日志数据的轻量级交付工具。Filebeat 监控您指定的日志文件或位置,采集日志事件,并将它们转发到 Elasticsearch 或 Logstash 进行索引。
Filebeat 的工作原理是这样的:当您启动 Filebeat 时,它会启动一个或多个输入,并在为日志数据指定的位置中查找这些输入。对于 Filebeat 找到的每个日志,Filebeat 都会启动一个采集器。每个采集器读取单个日志以获取新内容并将新日志数据发送到 libbeat,libbeat 将聚合事件并将聚合数据发送到为 Filebeat 配置的输出。
工作流程图3-1如下:
4.filebeat的原理是什么 4.1、filebeat的组成
filebeat 结构:由两个组件组成,输入(输入)和收割机(采集器),它们一起工作以跟踪文件并将事件数据发送到您指定的输出。收割机负责读取单个文件的内容。收割机逐行读取每个文件并将内容发送到输出。为每个文件启动一个收割机。收割机负责打开和关闭文件,这意味着文件描述符在收割机运行时保持打开状态。如果文件在采集过程中被删除或重命名,Filebeat 将继续读取该文件。这样做的一个副作用是磁盘上的空间被保留,直到收割机关闭。默认情况下,Filebeat 会保持文件打开,直到达到 close_inactive。
关闭收割机可以产生结果:
文件处理程序关闭,如果收割机仍在读取文件,则将其删除,则释放底层资源。
只有在 scan_frequency 过期后,才会重新开始采集文件。
如果在收割机关闭时移动或删除文件,则不会继续采集文件。
输入负责管理收割机并查找所有要读取的资源。如果输入类型是日志,输入将查找驱动器上与定义的路径匹配的所有文件,并为每个文件启动收割机。每个输入都运行在自己的 Go 进程中,Filebeat 目前支持多种输入类型。每种输入类型都可以定义多次。日志输入检查每个文件以查看是否需要启动收割机,收割机是否已在运行,或者该文件是否可以忽略
4.2、filebeat如何保存文件的状态
Filebeat 会保存每个文件的状态,并经常将状态刷新到磁盘上的注册表文件中。此状态用于记住收割机读取的最后一个偏移量,并确保发送所有日志行。如果输出不可访问(如 Elasticsearch 或 Logstash),Filebeat 将跟踪发送的最后一行,并在输出再次可用时继续读取文件。当 Filebeat 运行时,每个输入的状态信息也保存在内存中。当 Filebeat 重新启动时,来自注册表文件的数据用于重建状态,并且 Filebeat 在最后一个已知位置继续每个收割机。对于每个输入,Filebeat 都会保留它找到的每个文件的状态。由于文件可以重命名或移动,因此文件名和路径不足以识别文件。对于每个文件,
4.3. filebeat如何保证至少有一次数据消费
Filebeat 保证事件将至少传递到配置的输出一次,并且不会丢失任何数据。因为它将每个事件的传递状态存储在注册表文件中。在定义的输出被阻塞并且所有事件都未被确认的情况下,Filebeat 将继续尝试发送事件,直到输出确认已接收到事件。如果 Filebeat 在发送事件的过程中关闭,它不会在关闭之前等待输出确认所有事件。当 Filebeat 重新启动时,在 Filebeat 关闭之前未确认的所有事件都会再次发送到输出。这可确保每个事件至少发送一次,但您最终可能会将重复的事件发送到输出。
5. Filebeat使用详细说明
本节将介绍Filebeat采集多数据源(多输入)、原创日志处理、字段过滤、搭配输出到Kafka/Logstash/Elasticsearch等功能。
完整的配置如下。后续对输入、输出、过滤等功能的分析将根据完整的配置进行。
# ====================== Inputs =====================
#日志采集类型及路径(可配置多个)
filebeat.inputs:
- type: log
enabled: true
#每次采集缓冲大小,默认16k(16384),可手动调大,提供吞吐量
#harvester_buffer_size: 1638400
#每条日志最大字节数,默认10M,超过该设置将丢弃剩余信息。
# max_bytes: 10485760
#日志文件路径
paths:
#采集该具体日志文件
- /var/log/test.log
#添加新字段可发送至不同topic
fields:
kafka_topic: firstTopic
#第二个采集配置
- type: log
enabled: true
paths:
#采集该目录下所有.log文件
- /var/log/*.log
#添加新字段可发送至不同topic
fields:
kafka_topic: secondTopic
#多行合并规则,以时间开头的为一条完整日志,否则合并到上一行(java、python日志都以日期开头)
multiline.type: pattern
#中括号日期开头:[2015-08-24 11:49:14,389]
#multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
#日期开头:2015-08-24 11:49:14,389
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
#合并最大条数,默认500
mutiline.max_lines: 1000
# 这个文件记录日志读取的位置,如果容器重启,可以从记录的位置开始取日志
# registry_file: /usr/soft/filebeat/data/registry
# ============= Filebeat modules ====================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# ==================== Outputs =========================
#kafka地址,可配置多个用逗号隔开
output.kafka:
enabled: true
hosts: ["192.168.154.128:9092","192.168.154.129:9092"]
<p>
#根据上面添加字段发送不同topic
topic: '%{[fields.kafka_topic]}'
#控制台输出
#output.console:
# pretty: true
# enable: true
# ===================== Processors ===========================
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
#设置忽略字段,以下字段不显示在日志中
- drop_fields:
fields: ["host","input","agent","ecs","log","@version","flags"]
ignore_missing: false
</p>
5.1。输入配置
Filebeat输入类型包括:log/filestream(日志文件)、Kafka、Redis、UDP、TCP、HTTP等20多种输入类型,具体请参考官方文档:输入配置。
本文以多个日志输入的形式进行说明,如下配置所示,采集两个不同文件地址的日志信息。
# ====================== Inputs =====================
#日志采集类型及路径(可配置多个)
filebeat.inputs:
- type: log
enabled: true
#日志文件路径
paths:
#采集该具体日志文件
- /var/log/test.log
#添加新字段可发送至不同topic
fields:
kafka_topic: firstTopic
#第二个采集配置
- type: log
enabled: true
paths:
#采集该目录下所有.log文件
- /var/log/*.log
#添加新字段可发送至不同topic
fields:
kafka_topic: secondTopic
如上代码所示,第一个采集source采集具体文件/var/log/test.log;
第二个采集sources采集 /var/log/ 目录下的所有.log 文件。
将fields.kafka_topic 字段添加到每个采集 源中,然后可以根据该字段动态发送到不同的主题。
5.2. 多行日志合并
实际项目中完整的日志可能收录多行信息,比如下面的Java错误日志。
2022-01-07 14:21:31.616 [main] [org.springframework.boot.SpringApplication]
ERROR: Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.Config': Injection of autowired dependencies failed;
Caused by: java.lang.IllegalArgumentException: Could not resolve placeholder at org.springframework.util.PropertyPlaceholderHelper.parseStringValue(PropertyPlaceholderHelper.java:178)
因为Filebeat采集每次都是以行为单位,默认每一行都被认为是一条消息,所以需要将多行日志组合成一个完整的日志。
操作方法:信息收集之 操作系统识别
《作者主页》:志别三天wyx
《作者简介》:CSDN top100、阿里云博客专家、华为云分享专家、网络安全领域优质创造者
《专栏介绍》:此文章已收录在《网络安全快速入门》专栏
为什么要识别操作系统?
不同的操作系统,同一操作系统的不同版本,默认开放的服务和开放的漏洞都是不同的。
操作系统识别
1.人工识别
通过改变路径和Ping命令的大小写,可以大致区分操作系统。
1.更改案例
Windows 系统不区分大小写,Linux 系统区分大小写。
根据这个特性,改变地址栏中路径的大小写,如果页面不受影响,则为Windows系统;如果没有找到该页面,则为Linux系统。
1) 例如这个 网站:
将地址栏中的路径由小写改为大写,页面不受影响,说明网站不区分大小写,是Windows系统。
2)看下面的网站:
将地址栏中的路径由大写改为小写,页面变为404,说明网站区分大小写,是Linux系统。
2. TTL
TTL(Time To Live)是IPv4请求包的一个字段,用来表示一个IP数据包在网络中可以转发的最大跳数(最大255)。
Windows系统默认TTL为128,Linux系统默认TTL为64。我们可以通过TTL来判断目标操作系统。
1)直接ping目标网站,如果TTL在65~128之间,则表示Windows系统。
以下是我ping通的该网段的Windows系统。由于没有网络,所以ttl没有减少,是128。
2)如果TTL在1到64之间,说明是Linux系统。
下面是我ping这个网段的Linux系统。由于没有网络,所以ttl没有减少,是64。
TTL只能粗略判断操作系统,不能判断操作系统的版本。
由于TTL的默认值是可以修改的,所以根据TTL值判断的操作系统类型只能作为参考。
二、工具识别 1. Nmap
Nmap(Network Mapper)是一个网络检测和嗅探工具,可以根据特征行为指纹匹配特征库判断操作系统和版本;
-O 参数扫描目标 网站 的操作系统。
语法:nmap -O IP
1)我们去网上找一个网站来测试一下。从下图中的扫描结果可以看出,目标网站是Linux系统,版本大概在2.4或2.6之间。
值得一提的是,我们使用ping命令来测试这个网站的操作系统。根据TTL(128),是Windows系统,如下图所示:
很明显,目标主机修改了TTL的默认值,这也说明操作系统很容易根据TTL进行欺骗。
2)我们拿自己的虚拟机来测试一下。从下图中我们可以发现nmap扫描的结果是win XP、win 7或者win2012:
其实我的虚拟机是win 10:
可以看出,Nmap的扫描结果并不是100%正确,但是参考度还是比较高的。
2.p0f
p0f 是一种被动指纹识别工具,可捕获通过的流量并根据数据包确定操作系统。
在命令行输入p0f回车,进入被动检测状态,然后使用浏览器访问目标网站。
如下图,os栏显示p0f识别的操作系统。
p0f工具不能保证100%的准确率,大多数识别工具的结果只能提供一定的参考价值。