核心方法:flink sql实战案例

优采云 发布时间: 2022-09-21 18:13

  核心方法:flink sql实战案例

  目录

  一、背景

  使用flink sql实时同步数据

  二、进程

  三个步骤

  源-->>汇->>插入

  三、案例1.flink sql读取Kafka写入MySQL源码

  CREATE TABLE source_table (

user_id VARCHAR,

item_id VARCHAR,

category_id VARCHAR,

behavior VARCHAR,

ts TIMESTAMP

) WITH (

'connector.type' = 'kafka', -- 使用 kafka connector

'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本

'connector.topic' = 'user_behavior', -- kafka topic

'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取

'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息

'connector.properties.0.value' = 'localhost:2181',

'connector.properties.1.key' = 'bootstrap.servers',

'connector.properties.1.value' = 'localhost:9092',

'update-mode' = 'append',

'format.type' = 'json', -- 数据源格式为 json

'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则

)

  下沉

  CREATE TABLE sink_table (

dt VARCHAR,

pv BIGINT,

uv BIGINT

) WITH (

'connector.type' = 'jdbc', -- 使用 jdbc connector

'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url

'connector.table' = 'pvuv_sink', -- 表名

'connector.username' = 'username', -- 用户名

<p>

&#39;connector.password&#39; = &#39;password&#39;, -- 密码

&#39;connector.write.flush.max-rows&#39; = &#39;1&#39; -- 默认5000条,为了演示改为1条

)</p>

  插入

  INSERT INTO sink_table

SELECT

DATE_FORMAT(ts, &#39;yyyy-MM-dd HH:00&#39;) as dt,

COUNT(*) as pv,

COUNT(DISTINCT user_id) as uv

FROM source_table

GROUP BY DATE_FORMAT(ts, &#39;yyyy-MM-dd HH:00&#39;)

  2.flinksql 读取 kafka 写入 kudu 源

  -- kafka source

drop table if exists source_table;

CREATE TABLE source_table (

user_id VARCHAR

,item_id VARCHAR

,category_id VARCHAR

,behavior INT

,ts TIMESTAMP(3)

,process_time as proctime()

, WATERMARK FOR ts AS ts

) WITH (

&#39;connector&#39; = &#39;kafka&#39;

,&#39;topic&#39; = &#39;user_behavior&#39;

,&#39;properties.bootstrap.servers&#39; = &#39;venn:9092&#39;

,&#39;properties.group.id&#39; = &#39;source_table&#39;

,&#39;scan.startup.mode&#39; = &#39;group-offsets&#39;

,&#39;format&#39; = &#39;json&#39;

);

  下沉

  -- kafka sink

drop table if exists sink_table;

CREATE TABLE sink_table (

user_id STRING

,item_id STRING

<p>

,category_id STRING

,ts TIMESTAMP(3)

) WITH (

&#39;connector.type&#39; = &#39;kudu&#39;

,&#39;kudu.masters&#39; = &#39;venn:7051,venn:7151,venn:7251&#39;

,&#39;kudu.table&#39; = &#39;source_table&#39;

,&#39;kudu.hash-columns&#39; = &#39;user_id&#39;

,&#39;kudu.primary-key-columns&#39; = &#39;user_id&#39;

,&#39;kudu.max-buffer-size&#39; = &#39;5000&#39;

,&#39;kudu.flush-interval&#39; = &#39;1000&#39;

);

</p>

  插入

  -- insert

insert into sink_table

select user_id, item_id, category_id,ts

from source_table;

  四、注​​释1.断点续传

  断点续传是指数据同步任务在运行过程中因各种原因失败。不需要重新同步数据,只需要从上次失败的位置继续同步即可。如果原因失败,则无需重新下载文件,继续下载即可,可大大节省时间和计算资源。

  默认关闭,如果启用,调整isRestore: true

  2.直播采集

  根据数据源的数据是否实时变化,数据同步可以分为离线数据同步和实时数据同步。上面介绍的断点恢复,就是离线数据同步的功能。实时采集其实是实时数据。同步,当数据源中的数据被添加、删除或修改时,同步任务会监控这些变化,并将变化的数据实时同步到目标数据源。除了实时数据变化之外,实时采集和离线数据同步的另一个区别是实时采集任务不会停止,任务会一直*敏*感*词*数据源变化。

  3.回溯问题

  例如,mysql 是一个事务数据库,它会更新。最新的消息被发送到过去,更新之前的消息必须被召回。 update-和update+这两条消息都在状态。

  举个简单的例子,统计男女人数,一开始 MySQL 是男性,然后 MySQL 更新为女性。这时候,你收到的kafka,消息就会来,状态最初收录男,然后男退出。 , 当女性进来时,删除男性并添加女性。状态一般在rocksdb中,table.exec.state.ttl的窗口时间可以设置。

  相关参数

val tEnv: TableEnvironment = ...

val configuration = tEnv.getConfig().getConfiguration()

configuration.setString("table.exec.mini-batch.enabled", "true") // 启用

configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // 缓存超时时长

configuration.setString("table.exec.mini-batch.size", "5000") // 缓存大小

  ps:因为我在这方面不是很专业,所以还处于学习阶段。有什么问题可以多多指教~

  核心方法:搜索引擎优化(SEO)常用工具

  

<p>华美商城华美导购推荐,搜索引擎优化(SEO)常用工具。内容和结构工具 搜索引擎爬取内容模拟器可以模拟蜘蛛爬取指定网页的文本、链接、关键词和描述信息 相似页面检测工具,检查两个页面的相似度。如果相似度超过80%,可能会被处罚在线创建GoogleSitemaps在线创建网站地图文件中文:英文:创建软件,可以轻松创建网站SitemapsGoogleAdwords关键词工具查询指定关键词的扩展匹配,搜索量、趋势和流行度。百度相关搜索按热门节目排序,列出指定关键词相关扩展匹配和热度关键词密度分析工具,分析指定关键词在指定页面的出现次数,以及对应百分比密度 中文:英文:关键词热门排名和指数百度排名:百度指数:排名:搜狗指数:搜搜龙虎排名:工具

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线