Complete Solution: Lakehouse E-commerce Project (16): Writing the ODS Layer Code for the Business Implementation




Writing the ODS Layer Code for the Business Implementation

Because this business involves both MySQL business data and user log data, and these two kinds of data are collected into different Kafka topics, the ODS layer code here consists of two pieces of code.

1. Coding

The code that processes the MySQL binlog data reuses the code from the first business module. We only need to add, in "ProduceKafkaDBDataToODS.scala", the statements that write the data into the corresponding Iceberg-ODS tables. Add the following code to "ProduceKafkaDBDataToODS.scala":

//Insert data into the Iceberg ODS table ODS_PRODUCT_CATEGORY
tblEnv.executeSql(
  """
    |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY
    |select
    | data['id'] as id,
    | data['p_id'] as p_id,
    | data['name'] as name,
    | data['pic_url'] as pic_url,
    | data['gmt_create'] as gmt_create
    | from kafka_db_bussiness_tbl where `table` = 'pc_product_category'
  """.stripMargin)

//Insert data into the Iceberg ODS table ODS_PRODUCT_INFO
tblEnv.executeSql(
  """
    |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO
    |select
    | data['product_id'] as product_id,
    | data['category_id'] as category_id,
    | data['product_name'] as product_name,
    | data['gmt_create'] as gmt_create
    | from kafka_db_bussiness_tbl where `table` = 'pc_product'
  """.stripMargin)

The code that handles the user logs has to be written from scratch. Its business logic is to read the user browse-log data from the Kafka topic "KAFKA-USER-LOG-DATA", use Flink to transform the different types of user logs into JSON, and then, in addition to writing the JSON results to the corresponding Iceberg-ODS tables, also write them to the Kafka topic "KAFKA-ODS-TOPIC" so that downstream business processing can consume them. See "ProduceKafkaLogDataToODS.scala" for the full code; the main logic is as follows:

object ProduceKafkaLogDataToODS {
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
  private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.scala._

    /**
     * 1. The catalog must be created first.
     *    The Iceberg tables themselves have to be created in Hive ahead of time, not in this code,
     *    because creating Iceberg tables from Flink does not support the
     *    "create table if not exists ..." syntax.
     */
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    /**
     * Sample browse-log message:
     * {
     *   "logtype": "browselog",
     *   "data": {
     *     "browseProductCode": "eSHd1sFat9",
     *     "browseProductTpCode": "242",
     *     "userIp": "251.100.236.37",
     *     "obtainPoints": 32,
     *     "userId": "uid208600",
     *     "frontProductUrl": "https://f/dcjp/nVnE",
     *     "logTime": 1646980514321,
     *     "browseProductUrl": "https://kI/DXSNBeP/"
     *   }
     * }
     */

    /**
     * 2. Create the Kafka connector table to consume the data from Kafka.
     *    Note: 1) reserved keywords must be quoted with backticks (`);
     *          2) JSON objects are received as map<string,string>.
     */
    tblEnv.executeSql(
      """
        |create table kafka_log_data_tbl(
        | logtype string,
        | data map<string,string>
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-USER-LOG-DATA',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='earliest-offset', --earliest-offset or latest-offset can be specified
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    /**
     * 3. Write the different kinds of log data into their respective Iceberg tables.
     */
    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG
        |select
        | data['logTime'] as log_time,
        | data['userId'] as user_id,
        | data['userIp'] as user_ip,
        | data['frontProductUrl'] as front_product_url,
        | data['browseProductUrl'] as browse_product_url,
        | data['browseProductTpCode'] as browse_product_tpcode,
        | data['browseProductCode'] as browse_product_code,
        | data['obtainPoints'] as obtain_points
        | from kafka_log_data_tbl where `logtype` = 'browselog'
      """.stripMargin)

    //4. Assemble all the user log data into JSON and write it to the Kafka topic KAFKA-ODS-TOPIC
    //Read the data from Kafka so the dimension data can also be stored separately into Kafka
    val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")

    //Convert the kafkaLogTbl Table into a DataStream
    val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl)

    //Convert the userLogDS rows into JSON strings to be written to the Kafka ODS topic
    val odsSinkDS: DataStream[String] = userLogDS.map(row => {
      //The JSON object that is finally returned to Kafka for each log record
      val returnJsonObj = new JSONObject()
      val logType: String = row.getField(0).toString
      val data: String = row.getField(1).toString
      val nObject = new JSONObject()
      val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",")

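The remaining steps of "ProduceKafkaLogDataToODS.scala" rebuild each row's map data into a JSON string and write the result to the Kafka ODS topic. A rough sketch of how that continuation could look is given below; the loop body, the key/value splitting on "=", the fastjson JSONObject calls, and the FlinkKafkaProducer sink are illustrative assumptions rather than the project's exact code:

      //---- sketch of the remaining map body (assumption, not the original code) ----
      //Each element of arr has the form "key=value" in the Row's string rendering of map<string,string>
      for (elem <- arr) {
        val kv: Array[String] = elem.split("=")
        if (kv.length == 2) {
          nObject.put(kv(0).trim, kv(1).trim)
        }
      }
      //Wrap the log type and the rebuilt data object into the outgoing JSON record
      returnJsonObj.put("logtype", logType)
      returnJsonObj.put("data", nObject)
      returnJsonObj.toJSONString
    })

    //Sketch of the Kafka sink: write the assembled JSON strings to the ODS topic.
    //FlinkKafkaProducer and SimpleStringSchema are the standard Flink Kafka connector classes
    //(requires java.util.Properties, org.apache.flink.api.common.serialization.SimpleStringSchema,
    // org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer);
    //kafkaBrokers and kafkaOdsTopic come from ConfigUtil as declared at the top of the object.
    val props = new Properties()
    props.setProperty("bootstrap.servers", kafkaBrokers)
    odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic, new SimpleStringSchema(), props))

    env.execute("ProduceKafkaLogDataToODS")
  }
}

Writing the same records both into the Iceberg-ODS tables and back into "KAFKA-ODS-TOPIC" keeps the raw layer queryable while letting the later streaming layers subscribe to the ODS topic for further processing.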
