官方数据:flink-cdc实时增量同步mysql数据到hive
优采云 发布时间: 2022-09-21 18:09官方数据:flink-cdc实时增量同步mysql数据到hive
本文首发于我的个人博客网站等待下一个秋天——Flink
什么是疾病预防控制中心?
CDC 是 (Change Data Capture) 的缩写。其核心思想是监控和捕获数据库的变化(包括数据或数据表的INSERT、更新UPDATE、删除DELETE等),将这些变化按发生的顺序完整记录下来,写入消息中间件供其他服务使用。订阅和消费。
1.环境准备
注意:如果没有安装hadoop,可以不用yarn直接使用flink独立环境。
2. 下载以下依赖项
从以下两个地址下载flink的依赖,放到lib目录下。
flink-sql-connector-hive-2.2.0_2.11-1.13.5.jar
如果你的 Flink 是其他版本,可以在这里下载。
说明:我的hive版本是2.1.1,为什么我选择的版本号是2.2.0,这是官方给出的版本文件通信:
元存储版本Maven依赖SQL Client JAR
1.0.0 - 1.2.2
flink-sql-connector-hive-1.2.2
下载
2.0.0 - 2.2.0
flink-sql-connector-hive-2.2.0
下载
2.3.0 - 2.3.6
flink-sql-connector-hive-2.3.6
下载
3.0.0 - 3.1.2
flink-sql-connector-hive-3.1.2
下载
官方文档地址在这里,大家可以自行查看。
3.启动flink-sql客户端首先在yarn上启动一个应用,进入flink13.5目录,执行:
bin/yarn-session.sh -d -s 2 -jm 1024 -tm 2048 -qu root.sparkstreaming -nm flink-cdc-hive
进入flink sql命令行
bin/sql-client.sh embedded -s flink-cdc-hive
4.操作蜂巢
1)首选创建目录
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);
这里注意:hive-conf-dir是你hive配置文件的地址,需要主配置文件hive-site.xml。您可以将这些配置文件从 hive 节点复制到这台机器上。 .
2)查询
此时我们应该做一些常规的DDL操作来验证配置是否有问题:
use catalog hive_catalog;
show databases;
随便查询一张表
use test
show tables;
select * from people;
可能会报错:
把hadoop-mapreduce-client-core-3.0.0.jar放到flink的lib目录下,这个是我的,要根据你的hadoop版本来选择。
注意:很重要,把这个jar包放到Lib下后,需要重启应用,然后再用yarn-session启动一个应用,因为我发现好像有缓存,kill掉应用并重新启动它:
然后,可以查询数据,查询结果:
5.mysql数据同步到hive
flink sql中不能直接将mysql数据导入hive,需要分两步:
mysql数据同步kafka; kafka数据同步hive;
关于mysql数据到kafka的增量同步,前面有文章的分析,这里不做概述;重点是同步kafka数据到hive。
1) 创建一个与kafka关联的表:
之前的mysql同步到kafka,表是flink sql建表,connector='upsert-kafka',这里有区别:
CREATE TABLE product_view_mysql_kafka_parser(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc-kafka',
'properties.bootstrap.servers' = 'kafka-001:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
2)创建一个 hive 表
创建hive需要指定SET table.sql-dialect=hive;,否则flink sql命令行无法识别这种建表语法。为什么需要这样做,请参阅此文档 Hive Dialects。
-- 创建一个catalag用户hive操作
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);
use catalog hive_catalog;
-- 可以看到我们的hive里面有哪些数据库
show databases;
use test;
show tables;
上面我们现在可以看到hive中有哪些数据库和表;然后创建一个 hive 表:
CREATE TABLE product_view_kafka_hive_cdc (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) STORED AS parquet TBLPROPERTIES (
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='0S',
'sink.partition-commit.policy.kind'='metastore,success-file',
'auto-compaction'='true',
'compaction.file-size'='128MB'
);
然后做数据同步:
insert into hive_catalog.test.product_view_kafka_hive_cdc
select *
from
default_catalog.default_database.product_view_mysql_kafka_parser;
注意:这里指定表名,我使用catalog.database.table,这种格式,因为这是两个不同的库,需要显式指定catalog-database-table。
网上还有其他解决方案,关于mysql实时增量同步到hive:
在网上看到一个实时数仓架构图,觉得还行:
参考文献
解决方案:整合Flume和Kafka完成实时数据采集
大家好,又见面了,我是你们的朋友全栈君。
注意:引用的网站应该和你的kafka版本一致,因为里面的字段会不一致。例如:#kafka-sink 这是1.6的版本,如果需要检查 data.log
复制
发布者:全栈程序员栈负责人,转载请注明出处:原文链接: