文章实时采集(java最近项目中须要实时采集业务数据库CDC数据(这里数据) )

优采云 发布时间: 2022-04-01 00:27

  文章实时采集(java最近项目中须要实时采集业务数据库CDC数据(这里数据)

)

  业务描述:java

  最近项目中需要实时采集业务数据库CDC数据(这里的数据已经序列化成avro格式数据),这里我们使用Flume+Hdfs做技术架构。数据库

  了解 Flume 的朋友都知道,它的组件分为三个部分:source、channel、sink。具体原理部分在此不再赘述。可以查看官网或者flume技术博客。这里就讲讲实现过程和加坑之路。阿帕奇

  来自业务的数据存储在kafka中,所以source端使用kafkaSource,即kafkaConsumer,sink使用hdfsSink,channel使用file type。json

  hdfsSink 编写的文件格式有两种:文本文件和序列文件。无论选择哪种文件格式,登陆hdfs后都不能直接使用。前面说过,业务数据已经序列化成avro格式,但是要求是hdfs上的数据必须是直接可用的。建筑学

  考虑了几种解决方案:maven

  1、使用hive建立一个外部表来关联hdfs上的数据。这里有一个问题。虽然hive支持读取seq文件格式,但是seq文件中的数据(hdfsSink使用Sequence File格式存储)是avro格式的。我尝试建表查询,结果是乱码,文本文件也是这样。这个方法通过了。其实hive可以直接读取avro格式的指定数据的schema,但是。. . 我的文件格式不起作用,它可以通过实现接口本身将数据序列化为avro格式。哎呀

  2.使用API​​读取avro数据。这样,首先需要使用API​​读取seq文件数据,然后使用avro API进行反序列化。根据hadoop指导书hadoop IO章节中的demo,读取seq文件。然后我去avro官网的api,发现官网给出的demo是把数据序列化成avro文件,然后反序列化avro文件,和个人需求不一样,emmm。. . 继续翻API,好像找到了一个可以使用的类,但是最后还是不成功,这个方法也通过了。网址

  3.使用kafkaConsumer自带的参数反序列化avro。我以这种方式在互联网上阅读了很多博客。千篇一律的文章可能与实际需求不符。有的博客说直接配置这两个参数:code

  “key.deserializer”, "org.apache.kafka.common.serialization.StringDeserializer"

“value.deserializer”, "org.apache.kafka.common.serialization.ByteArrayDeserializer"

  首先,我不知道如何反序列化这样的数据,其次,kafkaConsumer的默认参数就是这两个。形式

  以下是正确配置的(在我看来):

  “key.deserializer”, "io.confluent.kafka.serializers.KafkaAvroDeserializer"

“value.deserializer”, "io.confluent.kafka.serializers.KafkaAvroDeserializer"

“schema.registry.url”, “http://avro-schema.user-defined.com”

  这里的key的反序列化方式可以根据业务给出的格式来确定。这里的键值是 avro 格式。

  看到这两个参数也给了,你可以根据自己的需要添加,我这里没用:

  kafka.consumer.specific.avro.reader = true

useFlumeEventFormat = true

  本以为这样可以,但结果往往不如预期,直接报错:

  

  解决了几个错误后,我终于发现这个错误是无法反转序列的根本问题。因此,查看kafkaSource源码,发现类型转换有问题(只有这一次),即图中提到的GenericRecord转换String错误。

  解决方法:挠头。. .

  Flume支持自定义源码,于是赶紧翻到flume书,按照书中的demo写了一个源码。具体实现其实就是这行代码:

  ConsumerRecords records = consumer.poll(100)

  改变消费者返回的记录类型,从而最终实现avro反序列化数据后的json格式。但这还没有结束。虽然实现了功能,但是自己写的代码肯定不如源码质量好。都想把源码的kafkaSource拿出来改一下看看效果。整个周期大约花了一周时间。. . 这不简单。以上如有错误,请指出并指正,谢谢~~

  下面是用到的pom文件,注意版本,注意版本,注意版本,重要的说三遍。由于版本不对,拿了一个老版本的源码,改了半天,各种坑。汇合的来源必须匹配。没有 Maven 存储库。Cloudera 取决于我的情况。

  

org.apache.flume.flume-ng-sources

flume-kafka-source

1.6.0-cdh5.16.2

${scope.version}

org.apache.flume

flume-ng-core

1.6.0-cdh5.16.2

${scope.version}

io.confluent

kafka-avro-serializer

5.2.2

${scope.version}

confluent

Confluent

http://packages.confluent.io/maven/

cloudera

https://repository.cloudera.com/artifactory/cloudera-repos/

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线