实时文章采集(实时采集Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来)
优采云 发布时间: 2022-04-10 23:17实时文章采集(实时采集Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来)
文章目录
01 简介
最近在做实时采集Kafka发布内容到MySQL,本文记录重点,不详述,希望对大家有所帮助。
02 实现2.1 添加依赖
在项目中,除了添加基本的Flink环境依赖外,还需要添加flink-connector-kafka的依赖:
org.apache.flink
flink-connector-kafka_2.11
1.13.6
另外,由于 Flink 使用 Kafka 作为源,读取到的字符串都会被解析。本文主要使用“json”方式,所以需要引入序列化包,但是flink-connector -kafka已经自带了,就不用再介绍了。
好的,这里,如果我们写FlinkSQL来启动它,它只会闪退,为什么?因为我们缺少了'kafka-clients-2.1.0.jar'这个包,但是没有必要导入它,因为它已经收录在flink-connector-kafka中了。
为什么在这里特别提到“序列化包”和“kafka-clients 包”?因为如果我们部署 Flink On Yarn,这两个包需要放在 HDFS 中,如下:
2.2 Flink SQL
好了,关键的FlinkSQL来了,怎么写?
先看Source,也就是我们的Kafka,如下:
CREATE TABLE t_student (
id INT,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'cdc_user',
'properties.bootstrap.servers' = '10.194.166.92:9092',
'properties.group.id' = 'flink-cdc-mysql-kafka',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
然后sink输出,我这里需要输出到MySQL:
CREATE TABLE t_student_copy (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
'username' = 'root',
'password' = '123456',
'table-name' = 't_student_copy'
)
最后用INSERT INTO来声明怎么写:
INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student
2.3 配置Kafka域名
另外需要注意的是,当我们运行Flink程序时,会出现类似如下的错误:
无法连接代理…
这时候我们需要在运行Flink程序的服务器上配置Kafka的域名,具体在hosts文件中:
vi /etc/hosts
好的,到这里,我们只要使用Kafka工具发送json格式的数据,Flink程序就可以实时接收,写入MySQL数据库。
03 正文结束
本文主要记录Kafka实时写入MySQL的一些坑。完整的源代码将不会发布。希望能给你一些启发和帮助。感谢您的阅读,本文结束!
附件:KafkaTool教程: