实时文章采集(实时采集Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来)

优采云 发布时间: 2022-04-10 23:17

  实时文章采集(实时采集Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来)

  文章目录

  01 简介

  最近在做实时采集Kafka发布内容到MySQL,本文记录重点,不详述,希望对大家有所帮助。

  02 实现2.1 添加依赖

  在项目中,除了添加基本的Flink环境依赖外,还需要添加flink-connector-kafka的依赖:

  

org.apache.flink

flink-connector-kafka_2.11

1.13.6

  另外,由于 Flink 使用 Kafka 作为源,读取到的字符串都会被解析。本文主要使用“json”方式,所以需要引入序列化包,但是flink-connector -kafka已经自带了,就不用再介绍了。

  好的,这里,如果我们写FlinkSQL来启动它,它只会闪退,为什么?因为我们缺少了'kafka-clients-2.1.0.jar'这个包,但是没有必要导入它,因为它已经收录在flink-connector-kafka中了。

  为什么在这里特别提到“序列化包”和“kafka-clients 包”?因为如果我们部署 Flink On Yarn,这两个包需要放在 HDFS 中,如下:

  

  2.2 Flink SQL

  好了,关键的FlinkSQL来了,怎么写?

  先看Source,也就是我们的Kafka,如下:

  CREATE TABLE t_student (

id INT,

name STRING

) WITH (

'connector' = 'kafka',

'topic' = 'cdc_user',

'properties.bootstrap.servers' = '10.194.166.92:9092',

'properties.group.id' = 'flink-cdc-mysql-kafka',

'scan.startup.mode' = 'earliest-offset',

'format' = 'json'

)

  然后sink输出,我这里需要输出到MySQL:

  CREATE TABLE t_student_copy (

id INT,

name STRING,

PRIMARY KEY (id) NOT ENFORCED

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',

'username' = 'root',

'password' = '123456',

'table-name' = 't_student_copy'

)

  最后用INSERT INTO来声明怎么写:

  INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student

  2.3 配置Kafka域名

  另外需要注意的是,当我们运行Flink程序时,会出现类似如下的错误:

  无法连接代理…

  这时候我们需要在运行Flink程序的服务器上配置Kafka的域名,具体在hosts文件中:

  vi /etc/hosts

  好的,到这里,我们只要使用Kafka工具发送json格式的数据,Flink程序就可以实时接收,写入MySQL数据库。

  03 正文结束

  本文主要记录Kafka实时写入MySQL的一些坑。完整的源代码将不会发布。希望能给你一些启发和帮助。感谢您的阅读,本文结束!

  附件:KafkaTool教程:

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线