实时文章采集( 【干货】业务数据采集工具选型实时数仓相关工具的选型)
优采云 发布时间: 2022-04-11 10:27实时文章采集(
【干货】业务数据采集工具选型实时数仓相关工具的选型)
Flink 实时数仓项目-业务数据采集
前言
前面完成日志数据的采集,下面进行业务数据的采集。
一、采集工具选择
实时数仓相关工具的选择采集在这个文章中提到:Flink实时数仓中各种CDC的比较
二、Flink-CDC 学习
业务数据采集选择了Flink-CDC,Flink-CDC的学习与使用在此:Flink-CDC 2.0 学习与使用
三、MySQL 数据准备
前提条件:业务数据存储在MySQL中。首先在MySQL中创建一个名为gmall2022的数据库,然后执行相关的sql文件导入建表语句。
1.binlog 配置
修改/etc/f文件如下:
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2022
这里打开gmall2022数据库的binlog,格式为row。
配置完成后重启MySQL使配置生效:sudo systemctl restart mysqld
可以查看/var/lib/mysql目录下的binlog文件:
2.模拟生成数据
业务数据是用户真实的订单数据等,所以不可用,所以也使用脚本模拟来生成真实数据。
1)将生成的数据对应的脚本上传到/opt/module/gmall-flink/rt_db目录
2)修改application.properties中的数据库连接信息如下:
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall-flink-2022?characterEncoding=utf-8& useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root spring.datasource.password=000000
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null #业务日期
mock.date=2021-03-06
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=0
… …
3)运行jar包:java -jar gmall2020-mock-db-2020-11-27.jar
再次进入/var/lib/mysql目录,可以看到索引文件发生了变化。
四、业务数据采集模块
在 IDEA 中新建一个模块 gmall2021-realtime,并创建如下包结构:
目录角色
应用程序
在每一层处理数据的 Flink 程序
豆子
数据对象
常见的
公共常数
实用程序
工具
1.配置文件
在 pom.xml 中导入以下依赖项:
1.8
${java.version}
${java.version}
1.13.0
2.12
3.1.3
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.apache.flink
flink-java
${flink.version}
org.apache.flink
flink-streaming-java_${scala.version}
${flink.version}
org.apache.flink
flink-connector-kafka_${scala.version}
${flink.version}
org.apache.flink
flink-clients_${scala.version}
${flink.version}
org.apache.flink
flink-cep_${scala.version}
${flink.version}
org.apache.flink
flink-json
${flink.version}
com.alibaba
fastjson
1.2.75
org.apache.hadoop
hadoop-client
${hadoop.version}
mysql
mysql-connector-java
5.1.49
com.ververica
flink-connector-mysql-cdc
2.0.0
org.apache.flink
flink-table-planner-blink_2.12
${flink.version}
org.projectlombok
lombok
1.18.20
org.slf4j
slf4j-api
1.7.25
org.slf4j
slf4j-log4j12
1.7.25
org.apache.logging.log4j
log4j-to-slf4j
2.14.0
在资源目录下创建 log4j.properties 配置文件:
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
2.代码实现
实现目标:使用 Flink-CDC 实时监控 MySQL 中的数据,然后封装变化的数据发送给 Kafka。
主程序代码如下:
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception {
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置检查点和状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop102:8020/gmall-flink-20220410/ck"));
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//2、创建Flink-CDC的Source
DebeziumSourceFunction mysqlSource = MySqlSource.builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-flink-2022")
.startupOptions(StartupOptions.latest())
.deserializer(new MyDeserializationSchema())
.build();
//3、使用CDC Source从MySQL中读取数据
DataStreamSource mysqlDataStream = env.addSource(mysqlSource);
//4、将从MySQL中读取到并序列化后的数据发送到Kafka中
mysqlDataStream.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));
//5、执行任务
env.execute();
}
}
自定义序列化器如下:
public class MyDeserializationSchema implements DebeziumDeserializationSchema {
/*
数据包装格式:
{
"database":"",
"tableName":"",
"operation":"",
"before":{"id":"","tm_name":""...},
"after":{"id":"","tm_name":""...},
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
//创建JSON对象,用于封装最终的返回值数据信息
JSONObject result = new JSONObject();
//获取数据库名和表名
//topic中包含了数据库名和表名,格式为: mysql_binlog_source.gmall-flink.z_user_info
String[] fields = sourceRecord.topic().split("\\.");
String database=fields[1];
String tableName = fields[2];
result.put("database",database);
result.put("tableName",tableName);
//获取操作类型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
//把create类型的操作转化为insert类型,方便使用
String s = operation.toString().toLowerCase();
if("create".equals(s)){
s="insert";
}
result.put("operation",s);
//拿到before和after的数据
Struct value = (Struct) sourceRecord.value();
//获取操作前数据
result.put("before",getValue(value.getStruct("before")));
//获取操作后数据
result.put("after",getValue(value.getStruct("after")));
//输出到下游
collector.collect(result.toString());
}
//提取出来了一个方法
//将before和after的数据封装到JSON对象里
public JSONObject getValue(Struct struct){
JSONObject valueJSON = new JSONObject();
//如果里面有数据,就获取对应的元数据信息,即列名,然后依次将数据放入到JSON对象里
if(struct!=null){
Schema schema = struct.schema();
for (Field field : schema.fields()) {
valueJSON.put(field.name(),struct.get(field));
}
}
return valueJSON;
}
@Override
public TypeInformation getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
公开课如下:
public class MyKafkaUtil {
//Kafka链接地址
private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";
//Kafka相关配置信息
private static Properties properties=new Properties();
static{
properties.setProperty("bootstrap.servers",KAFKA_SERVE);
}
public static FlinkKafkaProducer getKafkaSink(String topic){
return new FlinkKafkaProducer(topic,new SimpleStringSchema(),properties);
}
}
至此,MySQL中业务数据的采集就完成了。