Openresty + Lua + Kafka实现实时日志采集

优采云 发布时间: 2020-08-06 09:06

  简介

  在许多数据采集方案中,Flume是高性能的日志采集工具. 我相信每个人都知道. 许多人认为Flume是一个组件,可以将它们中的大多数与Flume和Kafka的组合相关联以进行日志采集. 该解决方案具有许多优势,例如高性能,高吞吐量和数据可靠性. 但是,如果我们需要实时采集日志,显然这不是一个好的解决方案. 原因如下:

  目前,Flume可以支持对目录中数据文件的实时监视. 某个目录的文件采集完成后,将使用完成的符号进行标记. 如果以后有数据输入此文件,则不会检测到Flume.

  因此,我们通常使用这种方案进行计时采集. 只要生成新的数据目录,我们就会将数据文件采集到该目录中.

  然后,本文将向您介绍基于Openresty + Lua + Kafka的实时日志采集.

  要求

  很多时候,我们需要实时采集用户的掩埋点数据,然后使用这些数据对用户的行为进行一些实时分析. 因此,第一步当然是解决如何实时采集数据.

  我们在这里使用的解决方案是Openresty + Lua + Kafka.

  原理介绍

  那么什么是Openresty?这是官方报价:

  OpenResty是基于Nginx和Lua的高性能Web平台. 它集成了许多复杂的Lua库,第三方模块及其大多数依赖项. 它用于方便地构建可处理超高并发性和高可伸缩性的动态Web应用程序,Web服务和动态网关.

  OpenResty通过融合各种精心设计的Nginx模块,有效地将Nginx变成了功能强大的通用Web应用程序平台. 这样,Web开发人员和系统工程师可以使用Lu脚本语言来调动Nginx支持的各种C和Lua模块,并快速构建一个具有10K甚至1000个以上的单机并发连接的高性能Web应用程序系统.

  OpenResty的目标是使您的Web服务直接在Nginx服务内部运行,充分利用Nginx的非阻塞I / O模型,不仅用于HTTP客户端请求,甚至用于远程后端(例如MySQL,PostgreSQL) ,Memcached和Redis等具有一致的高性能响应.

  简单来说,就是通过Nginx发送客户端的请求(本文指的是用户的行为日志),以将用户的数据传递到我们指定的位置(卡夫卡),为了达到这一要求,我们使用Lua脚本,因为Openresty封装了各种Lua模块,其中之一是子安装Kafka模块,所以我们只需要编写一个简单的脚本即可通过Nginx将用户数据转发到Kafka,以便随后使用数据.

  以下是供大家理解的体系*敏*感*词*:

  

  以下是使用Openresty + Lua + Kafka的优点的简要摘要:

  1. 支持多种业务数据,不同的业务数据,只需要配置不同的Lua脚本,就可以将不同的业务数据发送到Kafka中的不同主题.

  2. 实时采集用户触发的埋藏点数据

  3. 高度可靠的集群. 由于Openresty基于Nginx,因此其群集具有非常高的性能和稳定性.

  4. 高并发. 与tomcat,apache和其他Web服务器相比,Nginx的并发性比其他两个要高得多. 在正常情况下处理数万个并发并不难.

  接下来让我们做一些实际的工作.

  安装Openresty

  此示例使用独立部署表单. 成功完成独立部署后,将在独立计算机上构建集群,只是在不同的计算机上执行相同的步骤.

  注意: 本实验基于centos7.0操作系统

  1. 下载Openresty依赖项:

  yum install readline-devel pcre-devel openssl-devel gcc

  2. 编译并安装Openresty:

  #1.安装openresty:

mkdir /opt/software

mkdir /opt/module

cd /opt/software/ # 安装文件所在目录

wget https://openresty.org/download/openresty-1.9.7.4.tar.gz

tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/

cd /opt/module/openresty-1.9.7.4

#2.配置:

# 指定目录为/opt/openresty,默认在/usr/local。

./configure --prefix=/opt/openresty \

--with-luajit \

--without-http_redis2_module \

--with-http_iconv_module

make

make install

  3. 安装lua-resty-kafka

  因为我们需要通过nginx + lua脚本将数据转发到Kafka,所以在编写lua脚本时需要在lua模块中使用一些Kafka依赖项.

  #下载lua-resty-kafka:

cd /opt/software/

wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

unzip master.zip -d /opt/module/

#拷贝kafka相关依赖脚本到openresty

cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/

  注意: 由于每个人都熟悉Kafka,因此这里不会介绍其安装.

  安装Openresty之后,目录结构如下:

  drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin

drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit

drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib

drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx

  4. 配置文件

  编辑/opt/openresty/nginx/conf/nginx.conf

  user nginx; #Linux的用户

worker_processes auto;

worker_rlimit_nofile 100000;

#error_log logs/error.log;

#error_log logs/error.log notice;

#error_log logs/error.log info;

#pid logs/nginx.pid;

events {

worker_connections 102400;

multi_accept on;

use epoll;

}

http {

include mime.types;

default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '

'$status $body_bytes_sent "$http_referer" '

'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;

resolver 8.8.8.8;

#resolver 127.0.0.1 valid=3600s;

sendfile on;

keepalive_timeout 65;

underscores_in_headers on;

gzip on;

include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf这个文件名字可自定义

}

  编辑/opt/openresty/nginx/conf/conf.d/common.conf

  ##api

lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";

lua_package_cpath "/opt/openresty/lualib/?.so;;";

lua_shared_dict ngx_cache 128m; # cache

lua_shared_dict cache_lock 100k; # lock for cache

server {

listen 8887; #*敏*感*词*端口

server_name 192.168.3.215; #埋点日志的ip地址或域名,多个域名之间用空格分开

root html; #root指令用于指定虚拟主机的网页根目录,这个目录可以是相对路径,也可以是绝对路径。

lua_need_request_body on; #打开获取消息体的开关,以便能获取到消息体

access_log /var/log/nginx/message.access.log main;

error_log /var/log/nginx/message.error.log notice;

location = /lzp/message {

lua_code_cache on;

charset utf-8;

default_type 'application/json';

content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua脚本

}

}

  编辑/opt/openresty/nginx/lua/testMessage_kafka.lua

  #创建目录mkdir /opt/openresty/nginx/lua/

vim /opt/openresty/nginx/lua/testMessage_kafka.lua<br />#编辑内存如下:

  -- require需要resty.kafka.producer的lua脚本,没有会报错

local producer = require("resty.kafka.producer")

-- kafka的集群信息,单机也是可以的

local broker_list = {

{host = "192.168.3.215", port = 9092},

}

-- 定义最终kafka接受到的数据是怎样的json格式

local log_json = {}

--增加read_body之后即可获取到消息体,默认情况下可能会是nil

log_json["body"] = ngx.req.read_body()

log_json["body_data"] = ngx.req.get_body_data()

-- 定义kafka同步生产者,也可设置为异步 async

-- -- 注意!!!当设置为异步时,在测试环境需要修改batch_num,默认是200条,若大不到200条kafka端接受不到消息

-- -- encode()将log_json日志转换为字符串

-- -- 发送日志消息,send配套之第一个参数topic:

-- -- 发送日志消息,send配套之第二个参数key,用于kafka路由控制:

-- -- key为nill(空)时,一段时间向同一partition写入数据

-- -- 指定key,按照key的hash写入到对应的partition

-- -- batch_num修改为1方便测试

local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })

-- local bp = producer:new(broker_list)

local cjson = require("cjson.safe")

local sendMsg = cjson.encode(log_json)

local ok, err = bp:send("testMessage",nil, sendMsg)

if not ok then

ngx.log(ngx.ERR, 'kafka send err:', err)

elseif ok then

ngx.say("the message send successful")

else

ngx.say("未知错误")

end

  5. 开始服务操作:

  useradd nginx #创建用户

passwd nginx #设置密码

#设置openresty的所有者nginx

chown -R nginx:nginx /opt/openresty/

#启动服务

cd /opt/openresty/nginx/sbin

./nginx -c /opt/openresty/nginx/conf/nginx.conf

查看服务:

ps -aux | grep nginx

nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process

nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process

nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process

nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process

nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process

nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process

nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process

nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process

root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf

查看端口:

netstat -anput | grep 8887

tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke

  看到上述过程,就可以证明服务正常运行

  6. 使用邮递员发送发帖请求以进行简单测试,以查看Kafka是否可以接受数据

  

  7.kafka消费数据:

  kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning

<p>如果消耗了数据,则说明配置成功. 如果未调整,则可以检查与/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相关的错误日志以进行调整

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线