官方数据:flink-cdc实时增量同步mysql数据到hive

优采云 发布时间: 2022-09-21 18:09

  官方数据:flink-cdc实时增量同步mysql数据到hive

  本文首发于我的个人博客网站等待下一个秋天——Flink

  什么是疾病预防控制中心?

  CDC 是 (Change Data Capture) 的缩写。其核心思想是监控和捕获数据库的变化(包括数据或数据表的INSERT、更新UPDATE、删除DELETE等),将这些变化按发生的顺序完整记录下来,写入消息中间件供其他服务使用。订阅和消费。

  1.环境准备

  注意:如果没有安装hadoop,可以不用yarn直接使用flink独立环境。

  2. 下载以下依赖项

  从以下两个地址下载flink的依赖,放到lib目录下。

  flink-sql-connector-hive-2.2.0_2.11-1.13.5.jar

  如果你的 Flink 是其他版本,可以在这里下载。

  说明:我的hive版本是2.1.1,为什么我选择的版本号是2.2.0,这是官方给出的版本文件通信:

  元存储版本Maven依赖SQL Client JAR

  1.0.0 - 1.2.2

  flink-sql-connector-hive-1.2.2

  下载

  2.0.0 - 2.2.0

  flink-sql-connector-hive-2.2.0

  下载

  2.3.0 - 2.3.6

  flink-sql-connector-hive-2.3.6

  下载

  3.0.0 - 3.1.2

  flink-sql-connector-hive-3.1.2

  下载

  官方文档地址在这里,大家可以自行查看。

  3.启动flink-sql客户端首先在yarn上启动一个应用,进入flink13.5目录,执行:

  bin/yarn-session.sh -d -s 2 -jm 1024 -tm 2048 -qu root.sparkstreaming -nm flink-cdc-hive

  进入flink sql命令行

  bin/sql-client.sh embedded -s flink-cdc-hive

  4.操作蜂巢

  1)首选创建目录

  CREATE CATALOG hive_catalog WITH (

'type' = 'hive',

'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'

);

  这里注意:hive-conf-dir是你hive配置文件的地址,需要主配置文件hive-site.xml。您可以将这些配置文件从 hive 节点复制到这台机器上。 .

  

  2)查询

  此时我们应该做一些常规的DDL操作来验证配置是否有问题:

  use catalog hive_catalog;

show databases;

  随便查询一张表

  use test

show tables;

select * from people;

  可能会报错:

  把hadoop-mapreduce-client-core-3.0.0.jar放到flink的lib目录下,这个是我的,要根据你的hadoop版本来选择。

  注意:很重要,把这个jar包放到Lib下后,需要重启应用,然后再用yarn-session启动一个应用,因为我发现好像有缓存,kill掉应用并重新启动它:

  然后,可以查询数据,查询结果:

  5.mysql数据同步到hive

  flink sql中不能直接将mysql数据导入hive,需要分两步:

  mysql数据同步kafka; kafka数据同步hive;

  关于mysql数据到kafka的增量同步,前面有文章的分析,这里不做概述;重点是同步kafka数据到hive。

  1) 创建一个与kafka关联的表:

  之前的mysql同步到kafka,表是flink sql建表,connector='upsert-kafka',这里有区别:

  CREATE TABLE product_view_mysql_kafka_parser(

`id` int,

`user_id` int,

`product_id` int,

`server_id` int,

`duration` int,

`times` string,

`time` timestamp

) WITH (

'connector' = 'kafka',

'topic' = 'flink-cdc-kafka',

'properties.bootstrap.servers' = 'kafka-001:9092',

'scan.startup.mode' = 'earliest-offset',

'format' = 'json'

);

  2)创建一个 hive 表

  创建hive需要指定SET table.sql-dialect=hive;,否则flink sql命令行无法识别这种建表语法。为什么需要这样做,请参阅此文档 Hive Dialects。

  

  -- 创建一个catalag用户hive操作

CREATE CATALOG hive_catalog WITH (

'type' = 'hive',

'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'

);

use catalog hive_catalog;

-- 可以看到我们的hive里面有哪些数据库

show databases;

use test;

show tables;

  上面我们现在可以看到hive中有哪些数据库和表;然后创建一个 hive 表:

  CREATE TABLE product_view_kafka_hive_cdc (

`id` int,

`user_id` int,

`product_id` int,

`server_id` int,

`duration` int,

`times` string,

`time` timestamp

) STORED AS parquet TBLPROPERTIES (

'sink.partition-commit.trigger'='partition-time',

'sink.partition-commit.delay'='0S',

'sink.partition-commit.policy.kind'='metastore,success-file',

'auto-compaction'='true',

'compaction.file-size'='128MB'

);

  然后做数据同步:

  insert into hive_catalog.test.product_view_kafka_hive_cdc

select *

from

default_catalog.default_database.product_view_mysql_kafka_parser;

  注意:这里指定表名,我使用catalog.database.table,这种格式,因为这是两个不同的库,需要显式指定catalog-database-table。

  网上还有其他解决方案,关于mysql实时增量同步到hive:

  在网上看到一个实时数仓架构图,觉得还行:

  参考文献

  解决方案:整合Flume和Kafka完成实时数据采集

  大家好,又见面了,我是你们的朋友全栈君。

  注意:引用的网站应该和你的kafka版本一致,因为里面的字段会不一致。例如:#kafka-sink 这是1.6的版本,如果需要检查 data.log

  复制

  发布者:全栈程序员栈负责人,转载请注明出处:原文链接:

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线