实时文章采集( 【干货】业务数据采集工具选型实时数仓相关工具的选型)

优采云 发布时间: 2022-04-11 10:27

  实时文章采集(

【干货】业务数据采集工具选型实时数仓相关工具的选型)

  Flink 实时数仓项目-业务数据采集

  前言

  前面完成日志数据的采集,下面进行业务数据的采集。

  一、采集工具选择

  实时数仓相关工具的选择采集在这个文章中提到:Flink实时数仓中各种CDC的比较

  二、Flink-CDC 学习

  业务数据采集选择了Flink-CDC,Flink-CDC的学习与使用在此:Flink-CDC 2.0 学习与使用

  三、MySQL 数据准备

  前提条件:业务数据存储在MySQL中。首先在MySQL中创建一个名为gmall2022的数据库,然后执行相关的sql文件导入建表语句。

  1.binlog 配置

  修改/etc/f文件如下:

  server-id = 1

log-bin=mysql-bin

binlog_format=row

binlog-do-db=gmall2022

  这里打开gmall2022数据库的binlog,格式为row。

  配置完成后重启MySQL使配置生效:sudo systemctl restart mysqld

  可以查看/var/lib/mysql目录下的binlog文件:

  

  2.模拟生成数据

  业务数据是用户真实的订单数据等,所以不可用,所以也使用脚本模拟来生成真实数据。

  1)将生成的数据对应的脚本上传到/opt/module/gmall-flink/rt_db目录

  2)修改application.properties中的数据库连接信息如下:

  logging.level.root=info

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall-flink-2022?characterEncoding=utf-8& useSSL=false&serverTimezone=GMT%2B8

spring.datasource.username=root spring.datasource.password=000000

logging.pattern.console=%m%n

mybatis-plus.global-config.db-config.field-strategy=not_null #业务日期

mock.date=2021-03-06

#是否重置

mock.clear=1

#是否重置用户

mock.clear.user=0

… …

  3)运行jar包:java -jar gmall2020-mock-db-2020-11-27.jar

  

  再次进入/var/lib/mysql目录,可以看到索引文件发生了变化。

  四、业务数据采集模块

  在 IDEA 中新建一个模块 gmall2021-realtime,并创建如下包结构:

  

  目录角色

  应用程序

  在每一层处理数据的 Flink 程序

  豆子

  数据对象

  常见的

  公共常数

  实用程序

  工具

  1.配置文件

  在 pom.xml 中导入以下依赖项:

  

1.8

${java.version}

${java.version}

1.13.0

2.12

3.1.3

org.springframework.boot

spring-boot-starter-web

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

org.apache.flink

flink-java

${flink.version}

org.apache.flink

flink-streaming-java_${scala.version}

${flink.version}

org.apache.flink

flink-connector-kafka_${scala.version}

${flink.version}

org.apache.flink

flink-clients_${scala.version}

${flink.version}

org.apache.flink

flink-cep_${scala.version}

${flink.version}

org.apache.flink

flink-json

${flink.version}

com.alibaba

fastjson

1.2.75

org.apache.hadoop

hadoop-client

${hadoop.version}

mysql

mysql-connector-java

5.1.49

com.ververica

flink-connector-mysql-cdc

2.0.0

org.apache.flink

flink-table-planner-blink_2.12

${flink.version}

org.projectlombok

lombok

1.18.20

org.slf4j

slf4j-api

1.7.25

org.slf4j

slf4j-log4j12

1.7.25

org.apache.logging.log4j

log4j-to-slf4j

2.14.0

  在资源目录下创建 log4j.properties 配置文件:

  log4j.rootLogger=error,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.target=System.out

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

  2.代码实现

  实现目标:使用 Flink-CDC 实时监控 MySQL 中的数据,然后封装变化的数据发送给 Kafka。

  主程序代码如下:

  public class Flink_CDCWithCustomerSchema {

public static void main(String[] args) throws Exception {

//1、创建流式执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

//设置检查点和状态后端

env.setStateBackend(new HashMapStateBackend());

env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop102:8020/gmall-flink-20220410/ck"));

env.enableCheckpointing(5000L);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointTimeout(10000L);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);

//2、创建Flink-CDC的Source

DebeziumSourceFunction mysqlSource = MySqlSource.builder()

.hostname("hadoop102")

.port(3306)

.username("root")

.password("000000")

.databaseList("gmall-flink-2022")

.startupOptions(StartupOptions.latest())

.deserializer(new MyDeserializationSchema())

.build();

//3、使用CDC Source从MySQL中读取数据

DataStreamSource mysqlDataStream = env.addSource(mysqlSource);

//4、将从MySQL中读取到并序列化后的数据发送到Kafka中

mysqlDataStream.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));

//5、执行任务

env.execute();

}

}

  自定义序列化器如下:

  public class MyDeserializationSchema implements DebeziumDeserializationSchema {

/*

数据包装格式:

{

"database":"",

"tableName":"",

"operation":"",

"before":{"id":"","tm_name":""...},

"after":{"id":"","tm_name":""...},

*/

@Override

public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

//创建JSON对象,用于封装最终的返回值数据信息

JSONObject result = new JSONObject();

//获取数据库名和表名

//topic中包含了数据库名和表名,格式为: mysql_binlog_source.gmall-flink.z_user_info

String[] fields = sourceRecord.topic().split("\\.");

String database=fields[1];

String tableName = fields[2];

result.put("database",database);

result.put("tableName",tableName);

//获取操作类型

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

//把create类型的操作转化为insert类型,方便使用

String s = operation.toString().toLowerCase();

if("create".equals(s)){

s="insert";

}

result.put("operation",s);

//拿到before和after的数据

Struct value = (Struct) sourceRecord.value();

//获取操作前数据

result.put("before",getValue(value.getStruct("before")));

//获取操作后数据

result.put("after",getValue(value.getStruct("after")));

//输出到下游

collector.collect(result.toString());

}

//提取出来了一个方法

//将before和after的数据封装到JSON对象里

public JSONObject getValue(Struct struct){

JSONObject valueJSON = new JSONObject();

//如果里面有数据,就获取对应的元数据信息,即列名,然后依次将数据放入到JSON对象里

if(struct!=null){

Schema schema = struct.schema();

for (Field field : schema.fields()) {

valueJSON.put(field.name(),struct.get(field));

}

}

return valueJSON;

}

@Override

public TypeInformation getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

  公开课如下:

  public class MyKafkaUtil {

//Kafka链接地址

private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";

//Kafka相关配置信息

private static Properties properties=new Properties();

static{

properties.setProperty("bootstrap.servers",KAFKA_SERVE);

}

public static FlinkKafkaProducer getKafkaSink(String topic){

return new FlinkKafkaProducer(topic,new SimpleStringSchema(),properties);

}

}

  至此,MySQL中业务数据的采集就完成了。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线