Automated Log Collection and Push (Windows Server 2008 R2 Enterprise)
Published 2021-12-26 09:03
Foreword
The ELK trio (Elasticsearch, Logstash, Kibana) can handle most log-analysis tasks: collection, processing, statistics, and visual reporting. For us, though, it is too heavy, and that stack is not the only road. Our scenario is collecting logs from the business systems running on servers owned by various business departments, so the collector should affect server performance as little as possible, be minimally intrusive, and do no redundant work. After comparing Logstash, Flume, and other shippers, we settled on the lightweight **Filebeat** as the collection tool. Filebeat is written in Go and runs as a single binary with no extra runtime, unlike the JVM-based Logstash and Flume.
The pipeline is: Filebeat collects, processes, and transforms the logs, then pushes them to Kafka; ClickHouse consumes and stores them through its Kafka engine. For now I will call this the KFC combo (Kafka + Filebeat + ClickHouse).
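On the ClickHouse side, consumption via the Kafka engine typically needs three objects: a Kafka "queue" table, a storage table, and a materialized view that moves rows between them. A minimal sketch, assuming the broker and one of the topics configured below; the table names, consumer group, and column set are illustrative, not the production schema:

```sql
-- Kafka engine table: ClickHouse consumes the topic through this queue table.
CREATE TABLE logs_queue
(
    log_datetime String,
    log_index    String,
    log_level    String,
    message      String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '192.168.1.10:9092',
    kafka_topic_list  = 'dlbZcZGBSyslogs',
    kafka_group_name  = 'ch_log_consumer',   -- illustrative consumer group
    kafka_format      = 'JSONEachRow';       -- Filebeat's Kafka output sends JSON

-- Storage table that actually holds the rows.
CREATE TABLE logs
(
    log_datetime DateTime64(3),
    log_index    String,
    log_level    String,
    message      String
)
ENGINE = MergeTree
ORDER BY log_datetime;

-- Materialized view: continuously drains the queue table into storage.
CREATE MATERIALIZED VIEW logs_mv TO logs AS
SELECT parseDateTime64BestEffort(log_datetime) AS log_datetime,
       log_index, log_level, message
FROM logs_queue;
```

The column names match the fields the script processor below puts on each event (`log_datetime`, `log_index`, `log_level`).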
Filebeat Deployment
Target environment:
OS: Windows Server 2008 R2 Enterprise
Log types: IIS logs, business-system logs
Log paths: D:/IIS/www.A.com/logs/*.txt, D:/IIS/www.B.com/logs/*.txt, D:/IIS/www.C.com/logs/*.txt
Filebeat: 7.12.1
Since the target machines run Windows, I recommend downloading the Filebeat ZIP archive and running it as a Windows service; the MSI installer is awkward for debugging because it forces frequent uninstall/reinstall cycles. After downloading, unpack the archive and edit the configuration file filebeat.yml.
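Registering the unpacked ZIP as a Windows service can be done with the PowerShell script shipped inside the Filebeat archive; an execution-policy override is usually needed on Server 2008 R2. Run from the extracted folder in an elevated PowerShell prompt:

```shell
# Install Filebeat as a Windows service (script ships in the Filebeat ZIP).
PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1

# Then manage it like any other service:
Start-Service filebeat
# Stop-Service filebeat
```

This keeps the install/uninstall cycle cheap while debugging the configuration, which is the main reason to prefer the ZIP over the MSI here.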
Sample business-system log format:
2021-04-06 11:21:17,940 [39680] DEBUG Zc - time:0ms update XXX set ModifyTime=GETDATE(), [State] = 190, [FuZeRen] = '张三' where [ID] = '90aa9a69-7a33-420e-808c-624693c65aef' and [CompanyID] = '9e52867e-2035-4148-b09e-55a90b3020d5'
2021-04-06 11:21:21,612 [22128] DEBUG Service ModelBase - time:0ms (/api/XXX/XXX/XXX?InfoID=6d43b831-6169-46d2-9518-f7c9ed6fe39c&ValidateStatus=1) update material status
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms select ID from XXX where InfoRelationID='6d43b831-6169-46d2-9518-f7c9ed6fe39c'
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms insert into XXXX(ValidateDate ,[ID],[ValidateState],[ValidateUser],[ValidateUserID],[ValidateUnit],[ValidateUnitID],[ValidateUnitType],[InfoRelationID]) values( GETDATE(),'c77cf4ab-71b5-46c7-b91b-2829d73aa700',1,'XXXX','0387f889-e1d4-48aa-b275-2241da1d2c9e','XXXXX','2f2a94c8-c23c-4e8a-98b3-c32a9b0487f7',0,'6d43b831-6119-46d2-9518-f7c9ed6fe39c')
2021-04-06 03:25:22,237 [46840] ERROR ASP.global_asax - time:0ms Client info: Ip:116.238.55.21, 173.131.245.61 Browser: Chrome Version: 68 OS: WinNT Server-side error:
Page: http://www.A.com:803/dbapp_53475dbapp_e524534.php
Error source: System.Web.Mvc
Stack trace: at System.Web.Mvc.DefaultControllerFactory.GetControllerInstance(RequestContext requestContext, Type controllerType)
at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName)
at System.Web.Mvc.MvcHandler.ProcessRequestInit(HttpContextBase httpContext, IController& controller, IControllerFactory& factory)
at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state)
at System.Web.HttpApplication.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute()
at System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously)
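Stack traces like the one above span many lines, which is why every input in the configuration below uses the same multiline settings: any line that does not start with a date is glued onto the preceding event. A quick sketch of how that pattern classifies lines (sample lines shortened):

```javascript
// multiline.pattern from the config: a new event starts with "YYYY-M-D".
// With negate: true and match: after, any line NOT matching the pattern
// is appended to the previous event, so a stack trace stays in one event.
var pattern = /^\d{4}-\d{1,2}-\d{1,2}/;

var lines = [
  "2021-04-06 11:21:17,940 [39680] DEBUG Zc - time:0ms update ...",
  "Page: http://www.A.com:803/...",
  "   at System.Web.Mvc.DefaultControllerFactory.CreateController(...)"
];

var kinds = lines.map(function (line) {
  return pattern.test(line) ? "new event" : "continuation";
});

console.log(kinds); // → [ 'new event', 'continuation', 'continuation' ]
```

Without these settings, each frame of an ASP.NET stack trace would arrive in Kafka as its own event and the error would be unreadable downstream.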
Filebeat configuration:
max_procs: 2
queue:
  mem:
    events: 2048
    flush.min_events: 2048

# ============================== Filebeat inputs ===============================
filebeat.inputs:

# Management system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.A.com/logs/*.txt
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcZGBSyslogs'
  fields_under_root: true

# Organization system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.B.com/logs/*.txt
  ### Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcDWSyslogs'
  fields_under_root: true

# Personal system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.C.com/logs/*.txt
  ### Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcMySyslogs'
  fields_under_root: true

# Debug output
#output.console:
#  pretty: true
#output.file:
#  path: "D:/bigData"
#  filename: filebeat.log

# -------------------------------- Kafka Output --------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true
  hosts: ["192.168.1.10:9092"]
  # The Kafka topic used for produced events. The setting can be a format string
  # using any event field. To set the topic from document type use `%{[type]}`.
  topic: '%{[topic]}'
  # Authentication details. Password is required if username is set.
  #username: ''
  #password: ''
  # The number of concurrent load-balanced Kafka output workers.
  worker: 2
  max_message_bytes: 10000000

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: >
        function process(event) {
            var str = event.Get("message");
            var sp = str.split(" ");
            var log_datetime = sp.slice(0, 2).join(" ");
            var regEx = /^\d{4}-\d{2}-\d{2}$/;
            var prefix_date = log_datetime.substring(0, 10);
            if (prefix_date.match(regEx) != null) {
                event.Put("server", "221");
                log_datetime = log_datetime.replace(",", ".");
                log_datetime = log_datetime.replace("'", "");
                regEx = /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}$/;
                if (log_datetime.match(regEx) != null) {
                    event.Put("log_datetime", log_datetime);
                    event.Put("log_index", sp.slice(2, 3).join(" ").replace("[", "").replace("]", ""));
                    event.Put("log_level", sp.slice(3, 4).join(" "));
                    if(str.match(/(?