实时文章采集(基于Openresty+Lua+Kafka对日志进行实时实时的采集)
优采云 发布时间: 2022-04-12 21:00实时文章采集(基于Openresty+Lua+Kafka对日志进行实时实时的采集)
介绍
在很多数据采集场景中,Flume是一个高性能的采集日志工具,相信大家都知道。很多人想到的Flume组件,大部分人能想到的就是Flume和Kafka的结合,用于日志记录采集,这种方案有很多优点,比如高性能、高吞吐、数据可靠性等。但是如果我们需要实时的采集 日志,这显然不是一个好的解决方案。原因如下:
目前,Flume 可以支持对某个目录下的数据文件进行实时监控。一旦一个目录的文件采集完成,它就会被标记为完成。如果数据稍后进入这个文件,Flume 不会。会被检测到。
因此,我们更多的使用这个方案来计时采集,只要生成了一个新的数据目录,我们就会采集这个目录下的数据文件。
那么本文文章将介绍基于Openresty+Lua+Kafka采集的实时日志。
需要
很多时候,我们需要对用户的埋点数据进行一次实时的采集,然后利用这个数据对用户的行为做一些实时的分析。所以,第一步当然是解决如何对数据进行实时采集。
我们这里使用的解决方案是 Openresty+Lua+Kafka。
原理介绍
那么什么是 Openresty?以下是官方引用:
OpenResty 是一个基于 Nginx 和 Lua 的高性能 Web 平台,集成了大量复杂的 Lua 库、第三方模块和大部分依赖项。它用于轻松构建能够处理超高并发和可扩展性的动态 Web 应用程序、Web 服务和动态网关。
OpenResty 通过汇集各种精心设计的 Nginx 模块,有效地将 Nginx 变成了一个强大的通用 Web 应用平台。这样,Web 开发者和系统工程师就可以使用 Lu 脚本语言调动 Nginx 支持的各种 C 和 Lua 模块,快速构建出单机并发连接数达到 10K 甚至超过 1000 的高性能 Web 应用系统。
OpenResty 的目标是让你的 Web 服务直接在 Nginx 服务内部运行,充分利用 Nginx 的非阻塞 I/O 模型,不仅适用于 HTTP 客户端请求,还适用于远程后端,如 MySQL、PostgreSQL、Memcached 和Redis 等都提供一致的高性能响应。
简单来说就是通过Nginx传递客户端的请求(本文指的是用户的行为日志),将用户的数据传递到我们指定的地方(Kafka)。为了实现这个需求,我们使用 Lua 脚本,因为 Openresty 封装了各种 Lua 模块,其中之一就是对 Kafka 模块进行分包。我们只需要编写一个简单的脚本,通过 Nginx 将用户的数据转发到 Kafka,供后续的数据消费使用。
为了您的方便,这里是一个架构图:
这里简单总结一下使用Openresty+Lua+Kafka的优势:
1.支持多种业务数据,不同的业务数据,只需要配置不同的Lua脚本,就可以将不同的业务数据发送到Kafka中的不同topic。
2.用户触发埋点数据实时跟踪采集
3.高可靠集群,由于Openresty基于Nginx,其集群具有非常高的性能和稳定性。
4.高并发,与tomcat、apache等web服务器相比,Nginx的并发比其他两个要高很多。一般情况下,处理几万并发并不难。
那么让我们开始吧。
安装 Openresty
本例采用单机部署形式。单机部署成功后,集群搭建与单机相同,只是在不同的机器上执行相同的步骤。
注:本实验基于centos7.0操作系统
1.下载 Openresty 依赖:
yum install readline-devel pcre-devel openssl-devel gcc
2.编译安装Openresty:
#1.安装openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安装文件所在目录
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目录为/opt/openresty,默认在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
3.安装 lua-resty-kafka
因为我们需要通过nginx+lua脚本将数据转发到Kafka,所以在编写lua脚本的时候需要用到lua模块中对Kafka的一些依赖。
#下载lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
unzip master.zip -d /opt/module/
#拷贝kafka相关依赖脚本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
注意:由于kafka大家都比较熟悉,这里就不介绍它的安装了。
Openresty安装完成后,目录结构如下:
drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
4.配置文件
编辑 /opt/openresty/nginx/conf/nginx.conf
user nginx; #Linux的用户
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf这个文件名字可自定义
}
编辑 /opt/openresty/nginx/conf/conf.d/common.conf
##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #*敏*感*词*端口
server_name 192.168.3.215; #埋点日志的ip地址或域名,多个域名之间用空格分开
root html; #root指令用于指定虚拟主机的网页根目录,这个目录可以是相对路径,也可以是绝对路径。
lua_need_request_body on; #打开获取消息体的开关,以便能获取到消息体
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua脚本
}
}
编辑 /opt/openresty/nginx/lua/testMessage_kafka.lua
#创建目录mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#编辑内存如下:
-- require需要resty.kafka.producer的lua脚本,没有会报错
local producer = require("resty.kafka.producer")
-- kafka的集群信息,单机也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定义最终kafka接受到的数据是怎样的json格式
local log_json = {}
--增加read_body之后即可获取到消息体,默认情况下可能会是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定义kafka同步生产者,也可设置为异步 async
-- -- 注意!!!当设置为异步时,在测试环境需要修改batch_num,默认是200条,若大不到200条kafka端接受不到消息
-- -- encode()将log_json日志转换为字符串
-- -- 发送日志消息,send配套之第一个参数topic:
-- -- 发送日志消息,send配套之第二个参数key,用于kafka路由控制:
-- -- key为nill(空)时,一段时间向同一partition写入数据
-- -- 指定key,按照key的hash写入到对应的partition
-- -- batch_num修改为1方便测试
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知错误")
end
5.启动服务运行:
useradd nginx #创建用户
passwd nginx #设置密码
#设置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/
#启动服务
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看服务:
ps -aux | grep nginx
nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process
nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process
nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process
nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process
nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process
nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process
nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process
nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process
root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf
查看端口:
netstat -anput | grep 8887
tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
看到上面的流程证明服务运行正常
6.使用postman发送post请求进行简单测试,看看kafka能否接收数据
7.Kafka消费数据:
kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
如果数据被消费,则证明配置成功。如果没有调整,可以查看/var/log/nginx/message.access.log和/var/log/nginx/message.error.log中的相关错误日志进行调整。
总结
使用Openresty+Lua+Kafka,可以将用户的埋点数据实时发送到kafka集群,而Openresty是基于Nginx的,Nginx可以处理上万并发,所以即使用户数据很短一段时间这种架构也可以轻松应对内部浪涌而不会导致集群崩溃。另一方面,如果数据过多导致集群过载,我们可以随时添加另一台机器,非常方便。
另一个小扩展:如果业务数据很多,需要发送到不同的topic,我们不需要写多个脚本,而是可以联系后端添加一个json格式的字段,以及这个的值字段是主题的值名称。我们只需要编写一个通用脚本来解析Json数据并取出主题名称。