利用采集器 采集的平台 当 RocketMQ 遇见 Elastic Stack | RocketMQ 使
优采云 发布时间: 2022-05-01 05:18利用采集器 采集的平台 当 RocketMQ 遇见 Elastic Stack | RocketMQ 使
Elastic Stack 是广泛使用的开源日志处理技术栈,最初亦称作 ELK;Elasticsearch 是一个搜索和分析引擎;Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中;Kibana 则可以让用户使用图形和图表对 Elasticsearch 中数据进行可视化。后来在采集器(L)中,加入了一系列轻量的单一功能数据采集器(称为 Beats),其中最常用的是 FileBeat,于是也有 EFK 一说。
为了实现 RocketMQ 与 Elastic Stack 方便对接,需要在采集器(Logstash、Beats)层面支持:
1、RocketMQ作为output:数据从其他数据源发送到RocketMQ2、RocketMQ作为input:数据从RocketMQ到ES等存储
相关项目:
RocketMQ对接Logstash
rocketmq-logstash-integration 项目在RocketMQ添加了对接Logtash的input和output插件,通过Logstash,既可将数据从其他数据源到RocketMQ,也可从RocketMQ消费消息写入ES。
使用方式
安装插件Logtash支持以插件方式在运行时安装 RocketMQ input 和 output,目前 rockemq-logstash-integration 支持 Logstash 8.0版本[3]。
1、参考如下文档,制作Logtash input或output插件gem:
2、进入logstash安装目录中,执行如下命令安装插件(gem文件路径替换成上一步解压出的文件路径):
bin/logstash-plugin install --no-verify --local /path/to/logstash-input-rocketmq-1.0.0.gembin/logstash-plugin install --no-verify --local /path/to/logstash-output-rocketmq-1.0.0.gem
如遇到安装过程很慢,可尝试替换Gemfile中的source为
将数据发送到 RocketMQ
如下配置将采集/var/log/及其子目录下文件名后缀为.log的文件内容,并以消息的形式发送到RocketMQ,topic为topic-test:
input { file { path => ["/var/log/**/*.log"] }}output { rocketmq { namesrv_addr => "localhost:9876" topic => "topic-test" }}
将上述配置保存到rocketmq_output.conf中,启动Logstash即可:
bin/logstash -f /path/to/rocketmq_output.conf
rocketmq_output详细配置说明
从RocketMQ消费消息并输出到ES
如下配置文件可实现消费topic-test中消费消息,并将其写入本地的ES,其中index指定为test-%{+YYYY.MM.dd}:
input { rocketmq { namesrv_addr => "localhost:9876" topic => "topic-test" group => "GID_input" }}output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "test-%{+YYYY.MM.dd}" }}
将上述配置保存到rocketmq_input.conf中,启动Logstash即可:
bin/logstash -f rocketmq_input.conf
rocketmq_input详细配置说明
RocketMQ对接Beats
Beats是一系列的采集器,最初是Filebeat[5],后来增加了Heartbeat[6]、 Metricbeat[7]等不同数据源采集器。
rocketmq-beats-integration项目支持Beats将采集到的数据发送到RocketMQ。
使用方式
安装Beats
与Logtash不同,目前Beats不支持在运行时加载插件,因而需要下载Beats v7.17.0版本[8],对其重新编译后增加支持 RocketMQ 整合。详细操作可参考:
将数据发送到 RocketMQ
以FileBeat为例,如下配置文件可实现从/var/log/messages采集数据,并发送到RocketMQ的TopicTesttopic中:
filebeat.inputs: - type: filestream enabled: true paths: - /var/log/messagesoutput.rocketmq: nameservers: [ "127.0.0.1:9876" ] topic: TopicTest
将上述内容保存至rocketmq_output.yml,运行如下命令:
./filebeat -c rocketmq_output.yml
对于其他类型的Beats,output.rocketmq部分的配置是一致的,只需按照特定Beats配置其数据源即可。
详细配置说明
整合示例
最后,我们介绍一个例子,将Nginx的访问日志导入RocketMQ,通过RocketMQ Streams按分钟统计HTTP返回码的数量,结果记录到RocketMQ,最后保存到Elasticsearch。
Nginx 访问日志 --> RocketMQ
Filebeat占用资源少,适合作为agent采集原始日志。下面的配置文件,可从/var/log/nginx/access.log采集日志,并发送到RocketMQ的TopicNginxAcctopic:
filebeat.inputs: - type: filestream enabled: true paths: - /var/log/nginx/access.logoutput.rocketmq: nameservers: [ "127.0.0.1:9876" ] topic: TopicNginxAcc
启动本文支持RocketMQ的filebeat,即可开启日志采集:
./filebeat -c nginx-beat.yml -e
Nginx 访问日志数据分析
在上一步中,我们将日志数据发送到RocketMQ。接下来,就可以利用RocketMQ Streams对这些日志数据进行处理。
我们从保存Nginx访问日志数据的topicTopicNginxAcc中消费,解析日志内容,只保留统计用到的时间和HTTP返回码字段,然后按1分钟为维度进行统计,最后将统计结果以消息形式发送到TopicNginxCount。
使用RocketMQ Streams需要引入依赖:
org.apache.rocketmq rocketmq-streams-clients 1.0.1-preview
代码如下:
package org.apache.rocketmq;<br /><br />import com.alibaba.fastjson.JSONObject;import org.apache.rocketmq.streams.client.StreamBuilder;import org.apache.rocketmq.streams.client.source.DataStreamSource;import org.apache.rocketmq.streams.client.strategy.WindowStrategy;import org.apache.rocketmq.streams.client.transform.window.Time;import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;<br />import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.Locale;import java.util.regex.Matcher;import java.util.regex.Pattern;<br />public class RocketMqStreamsDemo { // nginx访问日志pattern public static final String LOG_PATTERN = "^(?[^ ]*) (?[^ ]*) (?[^ ]*) \\[(?[^\\]]*)\\] \"(?\\S+)(?: +(?[^ ]*) +\\S*)?\" (?[^ ]*) (?[^ ]*)(?: \"(?[^\\\"]*)\" \"(?[^\\\"]*)\" \"(?[^\\\"]*)\")?";<br /> // nginx访问日志的日期格式 private static final DateTimeFormatter fromFormatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US); // rocektmq-streams window支持的日期格式 private static final DateTimeFormatter toFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA);<br /> private static final String NAMESRV_ADDRESS = "localhost:9876";<br /><br /> public static void main(String[] args) { Pattern pattern = Pattern.compile(LOG_PATTERN); DataStreamSource dataStream = StreamBuilder.dataStream("my-namespace", "nginx-acc-job"); dataStream .fromRocketmq("TopicNginxAcc", "GID-count-code", true, NAMESRV_ADDRESS) //消费TopicNginxAcc .map(event -> ((JSONObject) event).get("message")) .map(message -> { // 只保留有用的 code 和 time 字段 final String logStr = (String) message; final Matcher matcher = pattern.matcher(logStr); JSONObject logJson = new JSONObject(); if (matcher.find()) { logJson.put("code", matcher.group("code")); final String dateTime = matcher.group("time"); logJson.put("time", LocalDateTime.parse(dateTime, fromFormatter).format(toFormatter)); } System.out.println(logJson); return logJson; }) .window(TumblingWindow.of(Time.minutes(1))) .groupBy("code") //按code聚合 .setTimeField("time") .count("total") .waterMark(1) .toDataSteam() .toRocketmq("TopicNginxCount", "GID-result", NAMESRV_ADDRESS) // 保存到TopicNginxCount .with(WindowStrategy.highPerformance()) .start(); }}
RocketMQ ---> Elasticsearch
最后,我们用Logstash将上一步保存到TopicNginxCount的统计结果,导入到Elasticsearch。
安装完logstash-input-rocketmq插件后,启动Logstash,采用以下配置文件即可:
input { rocketmq { namesrv_addr => "localhost:9876" topic => "TopicNginxCount" group => "GID-nginx-result" }}<br />filter { json { source => "message" remove_field => ["message"] }}<br />output { elasticsearch { hosts => ["http://localhost:9200"] index => "nginx-access-%{+YYYY.MM.dd}" }}
数据将保存到Elasticsearch中以nginx-access开头的index,按天保存。
调用Elasticsearch的Rest API,可以列出index中的数据:
curl -X GET "localhost:9200/nginx-access-2022.03.11/_search?pretty" -H 'Content-Type: application/json' -d'{ "query": { "match_all": {} }}'
下面是部分结果,start_time和end_time分别是统计的起止时间,total是这段时间内HTTP code 为code的统计次数:
{ "hits" : [ { "_index" : "nginx-access-2022.03.11", "_id" : "er-ed38Bn85qBf4OFHDW", "_score" : 1.0, "_source" : { "@version" : "1", "end_time" : "2022-03-11 14:12:00", "start_time" : "2022-03-11 14:11:00", "fire_time" : "2022-03-11 14:12:01", "total" : 52, "@timestamp" : "2022-03-11T06:15:22.928736Z", "code" : "200" } }, { "_index" : "nginx-access-2022.03.11", "_id" : "eb-ed38Bn85qBf4OFHDW", "_score" : 1.0, "_source" : { "@version" : "1", "end_time" : "2022-03-11 14:12:00", "start_time" : "2022-03-11 14:11:00", "fire_time" : "2022-03-11 14:12:01", "total" : 1, "@timestamp" : "2022-03-11T06:15:22.928472Z", "code" : "404" } } ]}
引用链接
[1][2]
[3]
[4][5][6][7]
[8]
[9]
加入 Apache RocketMQ 社区
十年铸剑,Apache RocketMQ 的成长离不开全球接近 500 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。
社区 5.0 版本正在进行着如火如荼的开发,另外还有接近 30 个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学加入社区,添加社区开发者微信:rocketmq666 即可进群,参与贡献,打造下一代消息、事件、流融合处理平台。
微信扫码添加小火箭进群
另外还可以加入钉钉群与 RocketMQ 爱好者一起广泛讨论:
钉钉扫码加群