汇总:日志系统之 Flume 采集加 morphline 解析

优采云 发布时间: 2022-11-09 18:58

  汇总:日志系统之 Flume 采集加 morphline 解析

  概述

  这段时间花费了一部分时间来处理消息总线和日志之间的接口。以下是日志采集和日志解析中遇到的一些问题和处理解决方案。

  Log 采集-flumelogstash VS flume

  让我们从选择日志采集器开始。我们选择使用 ElasticSearch 作为日志存储和搜索引擎。基于 ELK 的技术栈(ElasticSearch、Logstash、Kibana)在日志系统方向上如此流行,所以将 Logstash 纳入调查对象是顺理成章的,Logstash 是几个主流日志采集器中的后起之秀,被 Elastic 收购后更加成熟,社区也更加活跃。

  Logstash 的设计:输入、过滤、输出。水槽设计源,通道,水槽,当然水槽也有一个*敏*感*词*。具体设计没有多废话,基本就是分拆、解耦、流水线的思路。同时,它们都支持分布式扩展,例如 Logstash 既可以用作*敏*感*词*也可以用作索引器,而 flume 可以与多个代理形成分布式事件流。

  我与水槽的接触早于Logstash。当我最近研究Logstash时,它强大的过滤器给我留下了深刻的印象,尤其是grok。之前 Flume 阵营最强调的是,它对各种开源组件的源、下沉、通道扩展支持非常强大。

  Logstash 是一个很好的,

  但是它对JRuby(一种类似于Ruby语法的JVM平台语言)的实现使其定制不太灵活,这是我放弃Logstash的主要原因。出于生态原因,我确实需要 Java 技术堆栈提供的可扩展性(这里的主要目标是将消息总线用作日志采集的缓存队列),这就是 flume 的优势。但是,flume 很少提到对日志的解析支持,即使有支持正则表达式的*敏*感*词*,也只是非常有限的搜索和替换。经过一番研究,发现Flume实际上提供了这样的*敏*感*词*,即*敏*感*词*。它可以完成日志的解析。

  对数解析 - *敏*感*词*吗啉简介

  Morphline是由Flume的母公司cloudera开源的ETL框架。它用于构建和更改基于 Hadoop 的 ETL(提取、传输、加载)流处理程序。(值得一提的是,flume是由cloudera捐赠给Apache的,后来被改造为flume-ng。Morphline允许你构建ETL作业,无需编码,也不需要广泛的MapReduce技能。

  Morphline是一个丰富的配置文件,可以像定义一个转换链一样简单,用于使用来自任何数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。它用简单的配置步骤取代了Java编程。

  Morphline是一个可以嵌入任何Java程序中的类库。Morphline是一个可以存储转换命令的内存容器。这些命令作为插件加载到 Morphline 中,以执行加载、解析、转换或处理单个记录等任务。记录是内存中具有名称-值对的数据结构。Morphline是可扩展的,集成了现有的功能和第三方系统。

  此文章不是吗啉软文,因此请参阅 cloudera 的官方 CDK 文档以获取更多信息。

  

  下面是一张子图,生动地展示了Morphline的粗略处理模型:

  *敏*感*词*

  这是另一张图表,显示了Morphline在大数据生态系统中的架构模型:

  形态线-架构

  后来,morphline主要由Kite领导,Kite是一个基于Hadoop构建的抽象数据模型层的API接口。这是关于形态线的风筝SDK文档。

  强大的常规提取器 – grok

  事实上,我寻找Morphline是为了找到grok,或者找到一种方法来提供grok。Grok 使用常规解析从非结构化日志数据中提取结构化字段。因为 Logstash 已经提供了大量经过验证的 grok 规则,这是 Logstash 的优势所在,如果可以直接在 flume 中使用这些规则,就可以直接集成 Logstash 的能力(其实只要有规律的文本,就可以提取正则,但已经有成熟的东西不需要花很多精力自己去验证了)。在这里,我不会详细介绍。

  服务器使用*敏*感*词*线

  Flume在代理中使用*敏*感*词*。客户端ETL日志的优势可以利用客户端PC分散的算力,省去服务端解析的麻烦,但代理数量非常多,分散在各个生产服务器上,日志格式也多种多样。也就是说,在代理中做太多会让我们在应对变化时变得不灵活。因此,我们只在客户端采集,不解析。在服务器端,morphline 用于解析日志。这相当于启动解析服务,从日志采集队列中获取日志,使用 morphline 进行解析转换,然后将解析后的结构化日志发送到索引队列,直到索引服务将它们存储在 ElasticSearch 中。整个过程大致如下:

  带队列的管道

  这种基于队列的异步管道本质上与 Storm 等流处理器的同步管道相同,后者使用廉价的 PC 来传播计算。

  

  程序示例

  为了在程序中使用 Morphline,您首先需要添加对 Morphline 的 Maven 依赖:

   org.kitesdk kite-morphlines-all ${kite.version} org.apache.hadoop hadoop-common pom true

  版本为 1.0.0。请注意,这里面有一些依赖关系,需要从Twitter的存储库下载,所以你知道:请自带梯子。

  示例程序:

  private void process(Message message) { msgBuffer.add(message); if (msgBuffer.size() < MESSAGE_BUFFER_SIZE) return; try { Notifications.notifyBeginTransaction(morphline); for (Message msg : msgBuffer) { Event logEvent = GSON.fromJson(new String(msg.getContent()), Event.class); String originalLog = new String(logEvent.getBody()); logEvent.getHeaders().put(MORPHLINE_GROK_FIELD_NAME, originalLog); logEvent.setBody(null); Record record = new Record(); for (Map.Entry entry : logEvent.getHeaders().entrySet()) { record.put(entry.getKey(), entry.getValue()); } byte[] bytes = logEvent.getBody(); if (bytes != null && bytes.length > 0) { logger.info("original : " + new String(bytes)); record.put(Fields.ATTACHMENT_BODY, bytes); } Notifications.notifyStartSession(morphline); boolean success = morphline.process(record); if (!success) { logger.error("failed to process record! from : " + morphlineFileAndId); logger.error("record body : " + new String(logEvent.getBody())); } } //do some ETL jobs List records = this.extract(); List events = this.transfer(records); this.load(events); } catch (JsonSyntaxException e) { logger.error(e); Notifications.notifyRollbackTransaction(morphline); } finally { //clear buffer and extractor this.extracter.getRecords().clear(); this.msgBuffer.clear(); Notifications.notifyCommitTransaction(morphline); Notifications.notifyShutdown(morphline); } }

  这里只是一个部分代码,显示了*敏*感*词*的一般用法。主要逻辑在配置文件中:

  morphlines : [ { id : morphline1 importCommands : ["org.kitesdk.**"] commands : [ { grok { dictionaryString : """ """ expressions : { original : """""" } extract : true numRequiredMatches : atLeastOnce # default is atLeastOnce findSubstrings : false addEmptyStrings : false } } { logInfo { format : "output record: {}", args : ["@{}"] } } ] } ]

  如上所述,我们的主要内容是使用 grok 来解析日志,logstash 已经提供了许多开箱即用的 grok 模式供您使用,但对于自定义日志格式类型,您通常需要自己解析它。这是其一。

  回顾

  事实上,业内使用Flume的都是大型互联网公司,比如美团。他们通常使用flume+kafka+storm+hadoop生态系统。使用 Storm Stream 进行实时解析,使用 MapReduce 进行离线分析是一个高度定制的用例,几乎不需要 Flume 的代理在客户端上进行解析,因此很少提及 Flume 的 Morphline。

  但是*敏*感*词*

  依然是少有的文本ETL工具,无论是在采集时直接使用morphline做ETL,还是在服务器端做,flume+morphline带来的灵活性都不逊色于Logstash。

  干货内容:伪原创内容采集方法 伪原创内容怎么采集

  伪原创内容采集方法,伪原创内容采集?有的朋友会为采集使用伪原创工具,但是现在市面上的伪原创工具不全,有的好,有的坏,有的假。大部分伪原创软件伪原创实力越强,其内容的可读性就越差,一句话可能根本不流畅,更别说理解意思了,只好自己修改. 以下是详细内容,有需要的一起来看看吧!

  

  采集、伪原创被很多人羞辱,不管怎样,存在都是有道理的。万物皆有阴阳,既然有原创,那一定有错原创,关键是如何把握这个度。

  伪原创工具

  

  慢慢地,原创搜索引擎对内容的识别度越来越强,于是各种伪原创工具和软件应运而生,大部分伪原创工具直接被打乱了伪原创文章数字序列是用来迷惑搜索引擎的。后来,短语被打乱,然后短语被拆分并替换为同义词。伪原创 越强,内容的可读性越差。可能完全无法理解,更别说理解其中的意思了。

  这对用户来说是浪费时间,而对于搜索引擎来说,它为用户提供了质量非常低的推荐,大大降低了用户体验,所以这种方法今天不再使用,因为搜索引擎对内容的语义和可读性得到了很大的改善。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线