让您了解zabbix集成了ELK来采集系统异常日志以触发警报〜

优采云 发布时间: 2020-08-08 02:57

  让我们今天了解ELK的“ L” -Logstash. 是的,这就是神奇的小组成部分. 众所周知,它是ELK不可或缺的组成部分. 它完成输入,过滤和输出. (输出)工作量也是我们作为运维人员需要掌握的困难. 说到这一点,我们充满爱与恨. “爱是美好,仇恨是困难的”;这个Logstash具有强大的插件功能,除了对我们进行过滤外,高效的日志输出还可以帮助我们与Zabbix监视集成吗?

  由于我们的Logstash支持多种输出类型,因此它可以采集Web服务日志,系统日志和内核日志;但是,有日志输出,这肯定无法避免错误日志的出现;当出现错误日志时尽管可以通过ELK找到它,但ELK无法提供实时警报,这有点尴尬. 我们要做的是既要像zabbix和nagios一样进行监控,也要发出警报. ELK仅对此进行监视,但不对其发出警报;但是没关系,我们的Logstash插件可以与zabbix结合使用,以采集需要警报的日志(例如,带有错误标识的日志)以完成日志监视并触发警报〜

  Logstash支持多种输出介质,例如syslog,http,tcp,elasticsearch,kafka等. 如果我们将logstash采集的日志输出到zabbix警报,则必须使用logstash-output-zabbix插件,并通过此插件集成使用zabbix的logstash,过滤logstash采集的数据,将错误信息的日志输出到zabbix,最后通过zabbix告警机制触发;

  [root@localhost ~]# /usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix #安装logstash-output-zabbix插件

Validating logstash-output-zabbix

Installing logstash-output-zabbix

Installation successful

  环境案例要求:

  通过读取系统日志文件监控信息,过滤掉日志信息中的错误关键字,如ERR,错误,失败,警告等信息,用异常关键字过滤掉这些异常日志信息,然后输出到zabbix,通过zabbix警报机制触发警报;以下环境为filebeat作为采集终端;输出到kafaka消息队列,最后将日志由logsatsh提取和过滤,并输出到zabbix

  [filebeat]日志采集终端

  filebeat.inputs:

- type: log

enabled: true

paths:

- /var/log/secure

- /var/log/messages

- /var/log/cron

fields:

log_topic: system_log

processors:

- drop_fields:

fields: ["beat", "input", "source", "offset", "prospector"] #这里在filebeat中直接去掉不需要的字段。

filebeat.config.modules:

path: ${path.config}/modules.d/*.yml

reload.enabled: false

name: 192.168.37.147 #这是日志输出标识,表明日志来自哪个主机,后面再logstash会用到。

output.kafka:

enabled: true

hosts: ["192.168.37.147:9092", "192.168.37.148:9092", "192.168.37.149:9092"] #日志输出到kafka集群

version: "0.10"

topic: '%{[fields.log_topic]}'

partition.round_robin:

reachable_only: true

worker: 2

required_acks: 1

compression: gzip

max_message_bytes: 10000000

logging.level: debug

  [Logstash端]

  

  [root @ localhost〜]#vim /usr/local/logstash/config/etc/system_log.conf

  input {

kafka {

bootstrap_servers => "192.168.37.147:9092,192.168.37.148:9092,192.168.37.149:9092"

topics => ["system_log"]

codec => "json"

}

}

filter {

if [fields][log_topic] == "system_log" { #指定filebeat产生的日志主题

mutate {

add_field => [ "[zabbix_key]", "oslogs" ] #新增的字段,字段名是zabbix_key,值为oslogs。

add_field => [ "[zabbix_host]", "%{[host][name]}" ] #新增的字段,字段名是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{[host][name]获取的就是日志数据的来源IP,这个来源IP在filebeat配置中的name选项进行定义。

}

}

grok {

match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" } #这里通过grok对message字段的数据进行字段划分,这里将message字段划分了5个子字段。其中,message_content字段会在output中用到。

}

mutate { #这里是删除不需要的字段

remove_field => "@version"

remove_field => "message"

}

date { #这里是对日志输出中的日期字段进行转换,其中message_timestamp字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp字段。

match => [ "message_timestamp","MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]

}

}

output {

if [message_content] =~ /(ERR|error|ERROR|Failed)/ { #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix。

zabbix {

zabbix_host => "[zabbix_host]" #这个zabbix_host将获取上面filter部分定义的字段变量%{[host][name]的值

zabbix_key => "[zabbix_key]" #这个zabbix_key将获取上面filter部分中给出的值

zabbix_server_host => "192.168.37.149" #这是指定zabbix server的IP地址

zabbix_server_port => "10051" #这是指定zabbix server的*敏*感*词*端口

zabbix_value => "message_content" #定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容。

}

}

}

  [root @ localhost logstash]#nohup / usr / local / logstash / bin / logstash -f config / etc / system_log.conf --path.data = / tmp /#在此,--path.data指定单词logstash进程的数据存储目录,用于在服务器上启动多个logstash进程环境

  

  [测试]不确定事件配置文件是否正确,我们可以在前台运行并输出标准输出;验证是否成功过滤了文件拍采集的日志〜

  stdout {codec => ruby​​debug}#我们将这条指令添加到输出终端,在前台运行测试,看它是否可以过滤出错误日志输出. 效果如下〜(记得在ok run后注释掉该指令并在后台运行)

  #/ usr / local / logstash / bin / logstash -f config / etc / system_log.conf --path.data = / tmp /

  

  [创建了zabbix监视模板以立即发出警报]

  1. 创建模板

  将单词模板链接到192.168.37.147,创建的模板上的监视项将自动在192.168.37.147上生效,

  

  2. 创建一个应用程序集,单击“应用程序集”-“创建应用程序集”

  

  3. 创建监控项,单击监控项,创建监控项

  

  4. 警报触发器,创建触发器

  

  

  将我们创建的日志采集模板连接到需要采集日志以验证警报触发效果的主机

  

  [模拟警报]

  ssh连接到192.168.37.147日志采集主机,故意输入错误的密码以使系统生成错误日志,验证是否足以发送到zabbix端,以下是我们过滤后的错误日志信息,例如诸如“错误”,“失败”等. 〜到目前为止,错误日志输出已成功采集〜

  

  

  [摘要]

  首先,让我们尝试一下想法:

  我们的架构基本上没有变化. 仍然是filebat采集日志并将其推送到kibana消息队列,然后Logstash去提取日志数据,并在处理后最终将其传输出去;它只是转移到zabbix的输出;这可以实现功能,核心英雄是Logsatsh插件(logstash-output-zabbix);

  这里需要注意的是: filebeat采集终端的IP必须与zabbix监控主机的IP对应,否则日志将不通过〜

  分享一些技巧: 通过此命令,您可以测试zabbix上定义的键值;以下输出变为正常〜,如果失败为非零,则表示失败

  [root @ localhost zabbix_sender]#/ usr / local / zabbix / bin / zabbix_sender -s 192.168.37.147 -z 192.168.37.149 -k“ oslogs” -o 1

  来自服务器的信息: “已处理: 1;失败: 0;总计: 1;花费的时间: 0.000081”

  已发送: 1;跳过: 0总计: 1

  详细说明: -s: 指定本地代理方

  -z: 指定zabbix服务器

  -k: 指定键值

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线