文章实时采集(java最近项目中须要实时采集业务数据库CDC数据(这里数据) )
优采云 发布时间: 2022-04-01 00:27文章实时采集(java最近项目中须要实时采集业务数据库CDC数据(这里数据)
)
业务描述:java
最近项目中需要实时采集业务数据库CDC数据(这里的数据已经序列化成avro格式数据),这里我们使用Flume+Hdfs做技术架构。数据库
了解 Flume 的朋友都知道,它的组件分为三个部分:source、channel、sink。具体原理部分在此不再赘述。可以查看官网或者flume技术博客。这里就讲讲实现过程和加坑之路。阿帕奇
来自业务的数据存储在kafka中,所以source端使用kafkaSource,即kafkaConsumer,sink使用hdfsSink,channel使用file type。json
hdfsSink 编写的文件格式有两种:文本文件和序列文件。无论选择哪种文件格式,登陆hdfs后都不能直接使用。前面说过,业务数据已经序列化成avro格式,但是要求是hdfs上的数据必须是直接可用的。建筑学
考虑了几种解决方案:maven
1、使用hive建立一个外部表来关联hdfs上的数据。这里有一个问题。虽然hive支持读取seq文件格式,但是seq文件中的数据(hdfsSink使用Sequence File格式存储)是avro格式的。我尝试建表查询,结果是乱码,文本文件也是这样。这个方法通过了。其实hive可以直接读取avro格式的指定数据的schema,但是。. . 我的文件格式不起作用,它可以通过实现接口本身将数据序列化为avro格式。哎呀
2.使用API读取avro数据。这样,首先需要使用API读取seq文件数据,然后使用avro API进行反序列化。根据hadoop指导书hadoop IO章节中的demo,读取seq文件。然后我去avro官网的api,发现官网给出的demo是把数据序列化成avro文件,然后反序列化avro文件,和个人需求不一样,emmm。. . 继续翻API,好像找到了一个可以使用的类,但是最后还是不成功,这个方法也通过了。网址
3.使用kafkaConsumer自带的参数反序列化avro。我以这种方式在互联网上阅读了很多博客。千篇一律的文章可能与实际需求不符。有的博客说直接配置这两个参数:code
“key.deserializer”, "org.apache.kafka.common.serialization.StringDeserializer"
“value.deserializer”, "org.apache.kafka.common.serialization.ByteArrayDeserializer"
首先,我不知道如何反序列化这样的数据,其次,kafkaConsumer的默认参数就是这两个。形式
以下是正确配置的(在我看来):
“key.deserializer”, "io.confluent.kafka.serializers.KafkaAvroDeserializer"
“value.deserializer”, "io.confluent.kafka.serializers.KafkaAvroDeserializer"
“schema.registry.url”, “http://avro-schema.user-defined.com”
这里的key的反序列化方式可以根据业务给出的格式来确定。这里的键值是 avro 格式。
看到这两个参数也给了,你可以根据自己的需要添加,我这里没用:
kafka.consumer.specific.avro.reader = true
useFlumeEventFormat = true
本以为这样可以,但结果往往不如预期,直接报错:
解决了几个错误后,我终于发现这个错误是无法反转序列的根本问题。因此,查看kafkaSource源码,发现类型转换有问题(只有这一次),即图中提到的GenericRecord转换String错误。
解决方法:挠头。. .
Flume支持自定义源码,于是赶紧翻到flume书,按照书中的demo写了一个源码。具体实现其实就是这行代码:
ConsumerRecords records = consumer.poll(100)
改变消费者返回的记录类型,从而最终实现avro反序列化数据后的json格式。但这还没有结束。虽然实现了功能,但是自己写的代码肯定不如源码质量好。都想把源码的kafkaSource拿出来改一下看看效果。整个周期大约花了一周时间。. . 这不简单。以上如有错误,请指出并指正,谢谢~~
下面是用到的pom文件,注意版本,注意版本,注意版本,重要的说三遍。由于版本不对,拿了一个老版本的源码,改了半天,各种坑。汇合的来源必须匹配。没有 Maven 存储库。Cloudera 取决于我的情况。
org.apache.flume.flume-ng-sources
flume-kafka-source
1.6.0-cdh5.16.2
${scope.version}
org.apache.flume
flume-ng-core
1.6.0-cdh5.16.2
${scope.version}
io.confluent
kafka-avro-serializer
5.2.2
${scope.version}
confluent
Confluent
http://packages.confluent.io/maven/
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/