【技术干货】大数据日志分析统计

优采云 发布时间: 2022-05-14 10:30

  【技术干货】大数据日志分析统计

  作者:罗广,网易资深开发工程师,目前从事分布式视频处理系统的开发和维护。曾经参与斯凯手游平台服务端开发和海康威视交警平台组的平台架构开发及JVM调优工作,熟悉以SOA为基础的分布式系统架构设计和storm流式处理框架。在lucene和solr搜索方面有着丰富的经验,对hadoop和spark有浓厚的兴趣。

  

  一

  日志概述

  日志来源于第三方CDK厂商,主要内容为推流和拉流的日志,包括rtmp推拉流、flv拉流、hls拉流,flv和hls拉流的协议为http,rtmp推拉流的协议为rtmp,大概的格式如下:

  

  分析指标为在线人数、总时长、总流量(客户端到服务器的流量和服务器到客户端到流量之和)。需要用要到的数据也在上图中标红显示,初一乍看,distinct和sum就可以搞定,但实际上上述流日志在纵向方面显示较为一致,在横向方面上反差较大。本次统计的粒度要求精确到频道及分钟,频道在日志数据中不能直接获取。RTMP数据在一个会话中(具备同一个会话ID)大概有10来条,但是只有2条数据能间接得到;FLV数据一条数据代表一个会话,能较为轻松的获取到频道;HLS数据是客户端主动向服务器先获取一条Meta信息,比如有多少条data信息数据可以获取,然后不停的获取data数据。一个会话可能产生数以万计的日志记录。

  日志分析特点:数据量大、数据内容格式不一、延时性(只能获取隔天的数据)、统计的粒度较细(后续可能5分钟、1小时、日、月等)。一天的数据量4000万左右,完全不均衡分布在大概250个域名上,1个月左右数据量翻倍(取决新增用户及转化率);在单位分钟内进行统计,向后扩展兼容其他粒度。

  二

  分析历程

  阶段一:数据模型分析

  

  从CDN厂商收集日志,解析,存储到MySQL,发起分析统计任务。该阶段为需求调研,到建立数据模型的阶段,并不涉及到数据量大小。

  阶段二:数据清洗、MongoDB存储及MapReduce

  

  在模型分析阶段,随着数据量的增大和对分析指标的聚合操作,MySQL存储在检索、聚合、及后续的再度聚合,已经满足不了系统需求,更换存储已经势在必行。MongoDB因其文档方式存储、API简易(JSON)、高性能、集群Sharding方便、内置MapReduce等功能,成为本系统的替代数据库,网易在MongoDB方面也具备较为资深的实践经验。

  通过对存储数据的反复分析对比发现,Uri、Url相关的数据未产生任何价值,极大的增加了存储容量,故此对原始数据进行清洗,只保留客户端IP、日期、时间、时长、流量等对结果产生影响的字段,并且记录不符合原始日志分析规则的数据,以便后续进行数据订正。根据不同的协议规则(Rtmp除外),设置其频道ID,方便后续在频道的基础上进行分析。Rtmp数据保存在单独的表中,根据同一会话,同一频道,对rtmp数据进行订正。此时,日志数据都已经被矫正到同一平面,编写相关维度的map、reduce、finalize函数进行MapReduce计算。作业调度器(schduler)对任务执行单元(worker)进行协调管理,根据任务链判断是否派发下一任务,及任务状态的变更。

  三

  总体架构

  即紧接上文的阶段三,在阶段二的基础上,针对系统的高可用性、分布式、容错等维度考虑,采用无中心化方案设计,依赖于RabbitMQ进行水平扩展,增加了对外接口层及任务的手动发布处理。

  

  1

  collector

  日志数据采集器(定时),从CDN方获取各域名对应的数据包下载地址,把具备同样后缀的域名归类为一个整体,建立一个任务(Job)。根据下载地址下载日志包(原始数据包经过gzip压缩),该过程中为防止并发访问过高,遭受CDN方请求拒绝,进行熔断处理,即在指定的时间内,抑制访问次数。

  获得数据包后,调用linux系统命令gzip命令进行解压;顺序读取日志文件,若单个日志文件较大,则进行分包处理,带上包头号;对该批次的数据进行gzip压缩,扔到Rabbitmq中。以上建立Job,一个Job最多对应4个域名,分别建立sub job。每次的下载,解压缩,数据发送,分别记录状态(起始时间、耗时、失败/成功),方便后续的失败任务检测和性能分析。

  2

  worker

  

  任务执行单元。从rabbitmq处订阅获取日志包数据,如果为HTTP协议(上文中的hls、flv),则先对数据进行清洗,再根据频道维度进行java方式的map reduce计算,ip集合数据结构使用hashset进行ip去重,最后把结果存于mongodb中;若是RTMP协议则对数据进行更新(设置频道),运用mongodb对数据进行聚合,产生的数据格式与HTTP日志相同,方便后续的频道分天聚合运算。在聚合运算中涉及到按分钟粒度进行分析,这里运用了一个小技巧,把日期date和time时间合并增加一个字段ymdhm,该字段表示分,该字段数据极度离散,大大提高了map reduce的运行效率。

  MongoDB中的MapReduce相当于关系数据库中的group by。使用MapReduce要实现两个函数Map和Reduce函数。Map函数调用emit(key,value),遍历Collection中所有的记录,将key与value传递给Reduce函数进行处理。本文中使用javascript代码编写map和reduce函数,主要处理了ip集合去重和流量的累加。

  3

  scheduler

  worker执行调度器,负责对各个子任务的状态记录,判断子任务是否作业完成,如果执行完成,从作业链中获取下一子任务的队列,扔到rabbitmq中供worker消费。此外还包括失败任务检测,对当天的任务列表进行轮询判断任务是否执行完成,若是有任务执行失败,则从分析表中攫取数据扔到临时集合中,重新进行聚合分析,最后将结果归档。

  4

  mongodb

  MongoDB的版本为3.0.1,对应的java客户端为mongo-java-driver3.0.1。高可用性方面使用官方推荐的主从复制和高可用方案Replicat Set,Replicat Set具有自动切换功能,当Primary挂掉之后,可以自动由Replica Set中的某一个Secondary来切换到Primary,以实现高可用的目的。比如配置了一个由3台服务器组成的mongo集群,1个primary和2个secondry,客户端在连接的时候需要把全部的IP都写上,并且设置读操作从副本集读取,达到主从分离,减少主节点的访问压力。

  

  5

  rabbitmq

  为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕,RabbitMQ才可以删除它。如果一个消费者宕机没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,依赖与RabbitMQ的所有节点都不会丢失消息,保证了整个分析过程的完整性。如果没有任何消息超时限制,那么只有当消费者宕机时,RabbitMQ才会重新投递,即便处理一条消息会花费很长的时间。

  Mirror queue是RabbitMQ高可用的一种,queue中的消息每个节点都会存在一份copy,这个在单个节点失效的情况下,整个集群仍旧可以提供服务。但是由于数据需要在多个节点复制,在增加可用性的同时,系统的吞吐量会有所下降。在实现机制上,mirror queue内部实现了一套选举算法,有一个master和多个slave,queue中的消息以master为主。对于publish,可以选择任意一个节点进行连接,rabbitmq内部若该节点不是master,则转发给master,master向其他slave节点发送该消息,后进行消息本地化处理,并组播复制消息到其他节点存储;对于consumer,可以选择任意一个节点进行连接,消费的请求会转发给master,为保证消息的可靠性,consumer需要进行ack确认,master收到ack后,才会删除消息,ack消息会同步(默认异步)到其他各个节点,进行slave节点删除消息。若master节点失效,则mirror queue会自动选举出一个节点(slave中消息队列最长者)作为master,作为消息消费的基准参考;若slave节点失效,mirror queue集群中其他节点的状态无需改变。

  Mirror queue使用较为简单,先把当前节点加入之前已经启动的RabbitMQ节点,再设置HA的策略,如下图为镜像节点的启动脚本:

  

  镜像节点设置成功之后,可以看到整个集群状态。

  

  四

  运行状况

  部署到线上一共使用了7台服务器,其中2台云主机,每一台云主机都是4核8G,部署schduler、collector和rabbitmq;3台云主机,每一台也是4核8G,部署mongodb集群Replica Set;2台云主机,每台8核32G,部署16个worker实例,4000万的数据,20分钟完成分析。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线