Architecture Design and Go Implementation of a Real-Time Collection System for Massive Logs
Posted on 优采云, 2022-06-29 14:18
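A log collection system is practically standard equipment once a company reaches a certain size. A stable log collection system that meets business needs at a low operating cost makes life much easier both for the ops engineers and for the teams that consume the logs. That, however, is the ideal; reality is often different... This article covers three things: first, a rant about our company's old log collection and upload setup; second, the architecture of the new real-time log collection system; third, its implementation in Go. To be clear, not everything is written in Go: we use Kafka, and we are certainly not going to rewrite Kafka...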
All of the logagent code has been uploaded to GitHub:
1 A Rant About the Old System
Our company's previous log collection system worked roughly as follows:
Logs were collected at one of three frequencies: hourly, every 5 minutes, or in real time. Most logs were collected and uploaded hourly.
(1) Uploads every 5 minutes or every hour worked like this:
Every machine needed a log collection agent and a log upload agent, and every machine had to mount a client for the Hadoop cluster.
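The collection agent split (rotated) the logs. On the hour, the upload agent started and used the Hadoop client to package the split logs from the previous hour or previous 5 minutes and upload them to the Hadoop cluster.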
(2) Real-time transfer worked like this:
Every machine ran yet another agent, which collected logs in real time and shipped them to Kafka.
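By now you may not want to read any further: how did anyone come up with such a complex, bloated and laborious log collection system? Well... in its defense, this system is more than four years old, and the options available back then really were limited. Defense over, we still have to complain about its problems: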
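(1) First, the agents deployed on each machine had no single configuration entry point. They had to be configured machine by machine, according to each business, so the operating cost was far too high. With ten machines that might be tolerable, but we now have tens of thousands of machines and thousands of services.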
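(2) The most absurd part: each Hadoop cluster required its own Hadoop client to be mounted, so a single machine could end up with several Hadoop clients deployed on it. Again, the operating cost was far too high...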
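(3) There was no rate limiting, so transfer pressure spiked on the hour. Some machines produce a lot of logs, and their load climbs the moment the hour strikes. Pictures or it didn't happen, so let's take a look: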
[Figures: CPU (see the green line), system load, network interface traffic]
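This group of machines is fairly typical (these are the machines with several Hadoop clients mentioned above). The screenshots cover the period from the early hours to mid-morning, before the real peak. Even so, it is clear that the load on the hour is much higher than at other times, to the point of being intolerable.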
(4) n more complaints omitted...
2 New System Architecture
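First, there is no need to split collection on the client into hourly, 5-minute and real-time modes. Real-time collection alone covers all three needs, since hourly or 5-minute batches can be assembled downstream from the real-time stream.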
Second, we can stop mounting Hadoop clients on the machines and move the upload-to-Hadoop step somewhere else.
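Third, build a unified configuration management system with a friendly web UI. A user only needs to configure, in the web UI, which logs a group of services should collect, and the log collection agents on all machines in that service group are notified.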
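Fourth, peak shaving. Real-time collection should already avoid the old system's on-the-hour overload, but the agent still needs rate limiting so that it does not consume too many resources at peak times and affect the business.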
Fifth, backfilling missed logs...
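Some departments in our company actually use Flume for log collection, but we found it too heavyweight. After some research, and given the characteristics of our own business, we concluded that it would be better to build on open-source software and add some development of our own where appropriate. Go should be well suited to this job, and it is easy to operate. With that, here is the architecture diagram:
[Figure: architecture diagram]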
We will implement three parts in Go: logagent, web and transfer.
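logagent collects logs in real time according to its configuration and sends them to Kafka. It also has to watch its configuration in etcd and hot-reload it whenever it changes.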
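The web part is mainly used to update the configuration in etcd. etcd already provides an API, so we only need to integrate it into the management UI of our resource management system or CMDB.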
transfer consumes the logs from the Kafka queue and sends them on to ES/Hadoop/Storm.
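Hot reloading means that when the value under a machine's etcd key changes, logagent has to work out which collection tasks to stop and which to start. The sketch below shows one possible way to compute that diff. It assumes a task is identified by its log_path plus topic and that a changed send_rate is handled by restarting the task; LogConf, taskKey and diffConf are hypothetical names, and the actual file tailing and Kafka sending are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// LogConf mirrors one entry of the per-machine config stored in etcd (hypothetical type).
type LogConf struct {
	Service  string
	LogPath  string
	Topic    string
	SendRate int
}

// taskKey identifies a collection task; using path + topic is an assumption.
func taskKey(c LogConf) string { return c.LogPath + "|" + c.Topic }

// diffConf compares the running tasks (keyed by taskKey) with a newly received
// config. Tasks that disappeared or changed are stopped; new or changed ones are started.
func diffConf(running map[string]LogConf, next []LogConf) (toStop []string, toStart []LogConf) {
	wanted := make(map[string]LogConf, len(next))
	for _, c := range next {
		wanted[taskKey(c)] = c
	}
	for key, old := range running {
		if c, ok := wanted[key]; !ok || c != old {
			toStop = append(toStop, key)
		}
	}
	for key, c := range wanted {
		if old, ok := running[key]; !ok || old != c {
			toStart = append(toStart, c)
		}
	}
	// Map iteration order is random; sort so the result is deterministic.
	sort.Strings(toStop)
	sort.Slice(toStart, func(i, j int) bool { return taskKey(toStart[i]) < taskKey(toStart[j]) })
	return toStop, toStart
}

func main() {
	running := map[string]LogConf{
		"/a.log|nginx_log": {Service: "svc", LogPath: "/a.log", Topic: "nginx_log", SendRate: 1000},
		"/b.log|nginx_log": {Service: "svc", LogPath: "/b.log", Topic: "nginx_log", SendRate: 1000},
	}
	next := []LogConf{
		{Service: "svc", LogPath: "/a.log", Topic: "nginx_log", SendRate: 2000}, // rate changed
		{Service: "svc", LogPath: "/c.log", Topic: "nginx_log", SendRate: 1000}, // new file
	}
	stop, start := diffConf(running, next)
	fmt.Println(stop)  // [/a.log|nginx_log /b.log|nginx_log]
	fmt.Println(start) // [{svc /a.log nginx_log 2000} {svc /c.log nginx_log 1000}]
}
```

Restarting on any field change keeps the logic simple; a finer-grained version could adjust a running task's rate limiter in place instead.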
3 Implementing logagent
3.1 Configuration Design
First, let's think about what logagent's configuration file should contain:
```ini
# etcd address
etcd_addr = 10.134.123.183:2379
# timeout for connecting to etcd
etcd_timeout = 5
# etcd key format (%s is filled in with the machine's IP)
etcd_watch_key = /logagent/%s/logconfig

# Kafka address
kafka_addr = 10.134.123.183:9092

# number of threads
thread_num = 4
# the agent's own log file
log = ./log/logagent.log
# log level
level = debug

# Which logs to monitor, the log rate limit, and which Kafka topic to send to can be kept in etcd instead.
```
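The %s in etcd_watch_key implies one configuration key per machine. As a rough sketch, assuming the placeholder is filled with each machine's IP (which is what the etcd.go code below does with fmt.Sprintf), the web side could expand the template for every machine in a service group and write the same config under each key, while each agent reads only its own keys. watchKeys is a hypothetical helper, and the second IP in the example is made up.

```go
package main

import (
	"fmt"
	"strings"
)

// watchKeys (hypothetical) expands the etcd_watch_key template into one key per machine IP.
// It rejects templates that do not contain exactly one %s placeholder.
func watchKeys(format string, ips []string) ([]string, error) {
	if strings.Count(format, "%s") != 1 {
		return nil, fmt.Errorf("etcd key format %q must contain exactly one %%s", format)
	}
	keys := make([]string, 0, len(ips))
	for _, ip := range ips {
		keys = append(keys, fmt.Sprintf(format, ip))
	}
	return keys, nil
}

func main() {
	// Hypothetical IPs of the machines in one service group.
	ips := []string{"10.134.123.183", "10.134.123.184"}
	keys, err := watchKeys("/logagent/%s/logconfig", ips)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, k := range keys {
		fmt.Println(k)
	}
	// /logagent/10.134.123.183/logconfig
	// /logagent/10.134.123.184/logconfig
}
```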
As mentioned above, which logs to monitor, the log rate limit, and which Kafka topic to send to can be kept in etcd. The value stored in etcd has the following format:
```json
[
  {
    "service": "test_service",
    "log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",
    "topic": "nginx_log",
    "send_rate": 1000
  },
  {
    "service": "srv.android.shouji.sogou.com",
    "log_path": "/search/nginx/logs/srv.android.shouji.sogou.com_access_log",
    "topic": "nginx_log",
    "send_rate": 2000
  }
]
```

- "service": service name
- "log_path": the log file to monitor
- "topic": Kafka topic
- "send_rate": limit on the number of log lines sent
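Because the etcd value is a JSON array, decoding it in Go only needs a struct with matching json tags. The sketch below assumes send_rate is an integer, as in the example above; LogConf and parseLogConf are hypothetical names. etcd's Go client returns values as []byte, so the function takes bytes rather than a string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogConf is one entry of the etcd value; the tags match the JSON keys above.
type LogConf struct {
	Service  string `json:"service"`
	LogPath  string `json:"log_path"`
	Topic    string `json:"topic"`
	SendRate int    `json:"send_rate"`
}

// parseLogConf (hypothetical) decodes the JSON array stored under a machine's etcd key.
func parseLogConf(value []byte) ([]LogConf, error) {
	var confs []LogConf
	if err := json.Unmarshal(value, &confs); err != nil {
		return nil, fmt.Errorf("invalid log config: %w", err)
	}
	return confs, nil
}

func main() {
	value := []byte(`[
	  {"service": "test_service",
	   "log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",
	   "topic": "nginx_log", "send_rate": 1000}
	]`)
	confs, err := parseLogConf(value)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, c := range confs {
		fmt.Printf("%s -> topic %s, send_rate %d\n", c.LogPath, c.Topic, c.SendRate)
	}
}
```

Returning an error for a malformed value lets the agent keep its previous config instead of stopping collection when someone saves a broken value from the web UI.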
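More of the configuration could in fact be moved into etcd, defined according to your own business needs; for this article we keep the design above. Next, we can write the code that parses the configuration file.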
config.go
```go
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

// AppConfig holds the settings read from the local ini configuration file.
type AppConfig struct {
	EtcdAddr     string
	EtcdTimeOut  int
	EtcdWatchKey string

	KafkaAddr string

	ThreadNum int
	LogFile   string
	LogLevel  string
}

var appConf = &AppConfig{}

// initConfig parses the ini file and fills the global appConf.
func initConfig(file string) (err error) {
	conf, err := config.NewConfig("ini", file)
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}
	appConf.EtcdAddr = conf.String("etcd_addr")
	appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout", 5) // falls back to 5 if missing
	appConf.EtcdWatchKey = conf.String("etcd_watch_key")

	appConf.KafkaAddr = conf.String("kafka_addr")

	appConf.ThreadNum = conf.DefaultInt("thread_num", 4) // falls back to 4 if missing
	appConf.LogFile = conf.String("log")
	appConf.LogLevel = conf.String("level")
	return
}
```
The code defines an AppConfig struct, reads the configuration file, and stores the values in that struct.
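The rest of the configuration lives in etcd, which requires two things: when the program first starts, pull the configuration down from etcd; then start a goroutine that watches the configuration in etcd and, whenever it changes, pulls it again and updates the in-memory copy. The code is as follows: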
etcd.go:
<p><p style="padding: 0.5em;max-width: 100%;font-size: 14px;letter-spacing: 0px;font-family: Consolas, Inconsolata, Courier, monospace;border-radius: 0px;color: rgb(169, 183, 198);background-image: none;background-attachment: scroll;background-color: rgb(40, 43, 46);background-position: 0% 0%;background-repeat: repeat;margin-left: 8px;margin-right: 8px;line-height: 1.75em;box-sizing: border-box !important;overflow-wrap: normal !important;display: block !important;word-break: normal !important;overflow: auto !important;">package main<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" />import (<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> "context"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> "fmt"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> "sync"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> "time"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> "github.com/astaxie/beego/logs"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> client "github.com/coreos/etcd/clientv3"<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" />)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" />var (<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> confChan = 
make(chan string, 10)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> cli *client.Client<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> waitGroup sync.WaitGroup<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" />)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" />func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> // init a global var cli and can not close<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> cli, err = client.New(client.Config{<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> Endpoints: addr,<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> DialTimeout: timeout,<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> })<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> if err != nil {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> fmt.Println("connect etcd error:", err)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> return<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> }<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> logs.Debug("init etcd success")<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" 
/> // defer cli.Close() //can not close<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> var etcdKeys []string<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> ips, err := getLocalIP()<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> if err != nil {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> fmt.Println("get local ip error:", err)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> return<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> }<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> for _, ip := range ips {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> key := fmt.Sprintf(keyFormat, ip)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> etcdKeys = append(etcdKeys, key)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> }<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> // first, pull conf from etcd<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> for _, key := range etcdKeys {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> ctx, cancel := context.WithTimeout(context.Background(), time.Second)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word 
!important;" /> resp, err := cli.Get(ctx, key)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> cancel()<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> if err != nil {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> fmt.Println("get etcd key failed, error:", err)<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> continue<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> }<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /><br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> for _, ev := range resp.Kvs {<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> // return result is not string<br style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;" /> confChan