Complete Solution: Lakehouse E-commerce Project (16): Business Implementation, Writing the Code That Loads the ODS Layer
优采云 Published: 2022-09-25 01:48
Writing the ODS-Layer Code for the Business Implementation
This business involves both MySQL business data and user-log data, and the two kinds of data are collected into different Kafka topics, so the ODS-layer code written here consists of two segments.
1. Coding
The code that handles the MySQL binlog data reuses the first business segment: we only need to add the statements that write into the Iceberg ODS-layer tables. Add the following code to "ProduceKafkaDBDataToODS.scala":
//Insert data into the Iceberg ODS-layer table ODS_PRODUCT_CATEGORY
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY
|select
| data['id'] as id ,
| data['p_id'] as p_id,
| data['name'] as name,
| data['pic_url'] as pic_url,
| data['gmt_create'] as gmt_create
| from kafka_db_bussiness_tbl where `table` = 'pc_product_category'
""".stripMargin)
//Insert data into the Iceberg ODS-layer table ODS_PRODUCT_INFO
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO
|select
| data['product_id'] as product_id ,
| data['category_id'] as category_id,
| data['product_name'] as product_name,
| data['gmt_create'] as gmt_create
| from kafka_db_bussiness_tbl where `table` = 'pc_product'
""".stripMargin)
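Both inserts above read from the `kafka_db_bussiness_tbl` source table, which the first business segment registered. For orientation, here is a minimal sketch of what that Kafka connector DDL looks like; the topic name `KAFKA-DB-BUSSINESS-DATA` and the exact column list are assumptions, modeled on the log-table DDL shown later in this article, and the real DDL lives in "ProduceKafkaDBDataToODS.scala":

```scala
//Sketch only: topic name and columns are assumptions; the binlog JSON is expected
//to carry the source `table` name and a `data` map of column values.
tblEnv.executeSql(
  """
    |create table kafka_db_bussiness_tbl(
    | database string,
    | `table` string,
    | type string,
    | data map<string,string>
    |) with (
    | 'connector' = 'kafka',
    | 'topic' = 'KAFKA-DB-BUSSINESS-DATA',
    | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
    | 'scan.startup.mode'='earliest-offset',
    | 'properties.group.id' = 'my-group-id',
    | 'format' = 'json'
    |)
  """.stripMargin)
```

With this shape, `where `table` = 'pc_product_category'` in the inserts above filters the binlog stream down to a single source table.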
The code that handles the user logs has to be written from scratch. Its main logic is to read the user-browse-log data from the topic "KAFKA-USER-LOG-DATA", use Flink to turn the different kinds of user logs into JSON, and then, besides writing the JSON results into the corresponding Iceberg ODS-layer table, also write them into the Kafka topic "KAFKA-ODS-TOPIC" for downstream processing. The full code is in "ProduceKafkaLogDataToODS.scala"; the main logic is as follows:
object ProduceKafkaLogDataToODS {
private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.enableCheckpointing(5000)
import org.apache.flink.streaming.api.scala._
/**
* 1. The catalog must be created first.
* The tables themselves are created ahead of time in Hive rather than in this code,
* because creating an Iceberg table from Flink does not support the
* "create table if not exists ..." syntax.
*/
tblEnv.executeSql(
"""
|create catalog hadoop_iceberg with (
| 'type'='iceberg',
| 'catalog-type'='hadoop',
| 'warehouse'='hdfs://mycluster/lakehousedata'
|)
""".stripMargin)
/**
* {
* "logtype": "browselog",
* "data": {
* "browseProductCode": "eSHd1sFat9",
* "browseProductTpCode": "242",
* "userIp": "251.100.236.37",
* "obtainPoints": 32,
* "userId": "uid208600",
* "frontProductUrl": "https://f/dcjp/nVnE",
* "logTime": 1646980514321,
* "browseProductUrl": "https://kI/DXSNBeP/"
* }
* }
*/
/**
* 2. Create the Kafka connector table to consume the data from Kafka.
* Note: 1) reserved keywords must be quoted with backticks (`);
*       2) receive the JSON object with map<string,string>.
*/
tblEnv.executeSql(
"""
|create table kafka_log_data_tbl(
| logtype string,
| data map<string,string>
|) with (
| 'connector' = 'kafka',
| 'topic' = 'KAFKA-USER-LOG-DATA',
| 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
| 'scan.startup.mode'='earliest-offset', --latest-offset can be specified as well
| 'properties.group.id' = 'my-group-id',
| 'format' = 'json'
|)
""".stripMargin)
/**
* 3. Write the log data into its corresponding Iceberg table.
*/
tblEnv.executeSql(
"""
|insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG
|select
| data['logTime'] as log_time ,
| data['userId'] as user_id,
| data['userIp'] as user_ip,
| data['frontProductUrl'] as front_product_url,
| data['browseProductUrl'] as browse_product_url,
| data['browseProductTpCode'] as browse_product_tpcode,
| data['browseProductCode'] as browse_product_code,
| data['obtainPoints'] as obtain_points
| from kafka_log_data_tbl where `logtype` = 'browselog'
""".stripMargin)
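The insert above assumes the table `ODS_BROWSELOG` was created ahead of time, as the note in step 1 explains. A sketch of the corresponding DDL under the `hadoop_iceberg` catalog; the column types are assumptions (all strings, since the values come out of a `map<string,string>`):

```scala
//Sketch only: per step 1, this is run ahead of time, not in the streaming job,
//because Flink's Iceberg DDL does not support "create table if not exists".
tblEnv.executeSql(
  """
    |create table hadoop_iceberg.icebergdb.ODS_BROWSELOG (
    | log_time string,
    | user_id string,
    | user_ip string,
    | front_product_url string,
    | browse_product_url string,
    | browse_product_tpcode string,
    | browse_product_code string,
    | obtain_points string
    |)
  """.stripMargin)
```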
//4. Assemble all user-log data into JSON and write it to the Kafka topic KAFKA-ODS-TOPIC
//Read the data from Kafka; the dimension data is additionally stored to Kafka
val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")
//Convert the kafkaLogTbl Table into a DataStream
val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl)
//Convert userLogDS into JSON strings and write them out to the Kafka topic KAFKA-ODS-TOPIC
val odsSinkDS: DataStream[String] = userLogDS.map(row => {
//The JSON object for the log data that is finally returned to Kafka
val returnJsonObj = new JSONObject()
val logType: String = row.getField(0).toString
val data: String = row.getField(1).toString
val nObject = new JSONObject()
//Row renders the map field as "{k1=v1, k2=v2, ...}"; split it back into key/value pairs
val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",")
for (elem <- arr) {
  val kv: Array[String] = elem.split("=")
  if (kv.length == 2) nObject.put(kv(0).trim, kv(1).trim)
}
returnJsonObj.put("logtype", logType)
returnJsonObj.put("data", nObject)
returnJsonObj.toJSONString
})
//Write the assembled JSON out to the Kafka ODS topic
odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaBrokers, kafkaOdsTopic, new SimpleStringSchema()))
env.execute()
}
}
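The map step above hinges on how Flink's `Row` prints a map-typed field, roughly `{k1=v1, k2=v2}`. A minimal, Flink-free sketch of just that parsing step (the object name and sample string are made up for illustration):

```scala
object RowMapParseDemo {
  // Parse a Row-style map rendering "{k1=v1, k2=v2}" back into a Scala Map.
  // Assumes no key or value contains a comma, which holds for the sample log fields.
  def parseRowMapString(data: String): Map[String, String] = {
    data.stripPrefix("{").stripSuffix("}")
      .split(",")
      .map(_.trim)
      .filter(_.contains("="))
      .map { kv =>
        val idx = kv.indexOf('=')
        kv.substring(0, idx).trim -> kv.substring(idx + 1).trim
      }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val sample = "{userId=uid208600, userIp=251.100.236.37, obtainPoints=32}"
    println(parseRowMapString(sample))
  }
}
```

Splitting on commas would break if a value itself contained a comma; for robust pipelines, keeping the field as `map<string,string>` inside Table SQL (as the inserts in step 3 do) avoids round-tripping through the string form entirely.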