2.框架实时采集处理方案,Streaming处理

优采云 发布时间: 2021-08-15 21:23

  2.框架实时采集处理方案,Streaming处理

  1.Background

  在实际生产中,我们经常会遇到像kafka这样的流式数据,而原创数据并不是我们想要的,需要通过一定的逻辑处理转换成我们需要的数据。针对这种需求,本文采用NiFi+Spark Streaming技术方案设计了一种通用的实时采集处理各种外部数据源的方法。

  2.Framework

  实时采集处理方案由两部分组成:数据采集和流处理。数据采集来自NiFi中的任务流采集外部数据源,数据写入指定端口。在流式处理中,Spark Streaming从NiFi中的指定端口读取数据,进行相关的数据转换,然后写入Kafka。整个流式采集处理框架如下:

  流媒体框架

  3.数据采集

  NiFi 是一个易于使用、功能强大且可靠的数据提取、数据处理和分发系统。 NiFi 专为数据流而设计。支持指标图的高度可配置的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。它由 NSA 开源,是顶级 Apache 项目之一。详情请见:。

  在NiFi中,会根据不同的数据源创建相应的模板,然后通过模板部署任务流。任务流将采集data 源数据,然后写入指定端口。对于不同的数据源,数据采集的方法是不同的。比如数据库类型的数据源需要对采集使用记录水位和增量拉取的方法。为了方便后续的数据转换,这里将数据统一转换为csv格式。比如mongodb的json数据会根据字段展开到第一层,对象值会被序列化成字符串。

  最简单的任务流程如下:

  任务流程

  GetFile本身读取的文件是带有标题的csv格式,如下图:

  id,name,age

1000,name1,20

1001,name2,21

1002,name3,22

  UpdateAttribute 将设置目标字段名称、类型和转换规则,如下所示:

  tid|string|.select(df("*"), df("id").cast("string").as("tid"))

tname|string|.select(df("*"), df("name").cast("string").as("tname"))

tage|string|.select(df("*"), df("age").cast("int").as("tage"))

  4.流处理

  Spark Streaming 是一个基于 Spark 的实时计算框架。它是 Spark Core API 的扩展。可实现流式数据的实时处理,具有良好的可扩展性、高吞吐量和容错性。

  Spark Streaming 连接 NiFi 数据并执行流式处理步骤:

  1.初始化上下文

  final SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(sparkMaster);

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));

  2.连接nifi中指定的输出端口

  SiteToSiteClientConfig config = new SiteToSiteClient.Builder().url(nifiUrl).portName(nifiPort).buildConfig();

final JavaReceiverInputDStream packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_AND_DISK()));

  3.读取端口上的流数据和属性

  JavaDStream ds = packetStream.map(new Function() {

@Override

public NifiFeed call(NiFiDataPacket dataPacket) throws Exception {

return new NifiFeed(new String(dataPacket.getContent()), dataPacket.getAttributes());

}

});

  NifiFeed 是一种用于存储数据和属性的自定义数据结构。

  4.数据转换

  ds.foreachRDD(new VoidFunction() {

@Override

public void call(JavaRDD rdd) throws Exception {

rdd.foreachPartition(new VoidFunction() {

@Override

public void call(Iterator iterator) throws Exception {

try {

while (iterator.hasNext()) {

//TODO:执行数据转换

}

} catch (Exception e) {

//TODO:异常处理

}

}

});

}

});

  数据转换需要动态执行属性中的代码。这里使用jexl开源库动态执行java代码。详情请见:。

  5.启动服务

  ssc.start();

ssc.awaitTermination();

  5.Summary

  本方案使用NiFi处理采集数据,然后通过Spark Streaming流引擎,将采集数据按照规定进行转换,生成新数据发送到Kafka系统进行后续服务或处理,如Kylin Streaming模型构建。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线