官方数据:使用Debezium、Postgres和Kafka进行数据实时采集(CDC)
优采云 发布时间: 2022-11-02 23:53官方数据:使用Debezium、Postgres和Kafka进行数据实时采集(CDC)
1. 背景
他一直在改进自己的微服务架构,包括分布式工作流服务的构建,目前使用的是 Camunda 工作流引擎。使用Camunda工作流,会涉及到工作流引擎的用户系统如何与现有的用户系统集成的问题(Flowable、Activity也类似)。在现有的设计中,工作流导向侧重于企业内部流程的流动,因此系统设计了单位、部门、人员、人员归属,以对应Camunda工作流用户系统。
功能设计完成后,又面临另一个问题,如何解决现有人事系统数据如何同步到Camunda工作流引擎[`real-time`]的问题。如果现有的系统数据和工作流数据在同一个库中,则相对容易解决。在微服务架构中,不同服务的数据通常存储在不同的数据库中,因此需要数据同步。不同的方法可以达到相同的效果。
最初考虑了以下两个选项,但都略有不足:
经过大量数据的查询和对比,最终选择了德贝子木来解决以上问题以及以后更多的数据同步问题。
2. Debezium 简介
RedHat 的开源 Debezium 是一个开源工具,可以从多个数据源捕获实时变化数据并形成数据流输出。
它是一个 CDC(变更数据捕获)工具。其工作原理类似于著名的Canal、DataBus、Maxwell等,通过提取数据库日志来获取变化。
官方介绍是:
Debezium 是一个用于变更数据捕获的开源分布式平台。启动它,将其指向您的数据库,您的应用程序可以开始响应其他应用程序提交到您的数据库的所有插入、更新和删除操作。Debezium 耐用且快速,因此您的应用程序可以快速响应并且不会错过任何事件,即使出现问题也是如此
Debezium 是一个分布式平台,可将您现有的数据库转换为事件流,因此应用程序可以看到数据库中的每一个行级更改并立即做出响应。Debezium 构建在 Apache Kafka 之上,并提供与 Kafka Connect 兼容的连接器来监控特定的数据库管理系统。
Debezium 现在支持以下数据库:
与ETL不同的是,Debezimu只支持生产端连接数据库,消费者端不支持连接数据库。相反,您需要编写自己的代码来接收 Kafka 消息数据。经过分析,这种方式比较灵活,也可以很好的利用现有微服务架构中的Kafka。
3.快速搭建Debezimu测试环境。
目前,Debezium 的最新稳定版是 1.6。Debezium 已经打包了要用作 Docker 镜像的组件。因此,我们只需要安装并启动 Docker 即可按照以下步骤快速搭建测试环境。
3.1 运行 Zookeeper
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6
3.2 运行卡夫卡
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6
3.3 运行 PostgreSQL
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6
上面使用的代码是:debezium/example-postgres:1.6,查看Debezimu官方文档,其他例子都是这个。事实上,Debezimu 已经 Docker 打包了 PostgreSQL 9~13,你可以根据需要在 Docker Hub 中选择对应的 PostgreSQL 版本。
debezium/postgres 非常小巧,使用方便,而且也有必要的设置,所以可以直接使用,无需额外配置。
3.4 运行 Debezimu Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6
Debezium 的容器在启动时需要传入以下环境变量:
3.5 创建连接器
经过以上4个步骤,Debezium的测试环境就搭建好了。现在需要调用 Debezium 提供的 API 来创建一个连接器来建立 Debezium 和数据库之间的关系。我们将以下有效负载发布到`:8083/connectors/`。
{
"name": "fulfillment-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.99.100",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "fulfillment",
"table.include.list": "public.inventory"
}
}
“name”:注册到 Kafka Connect 服务的连接器名称 “connector.class”:PostgreSQL 连接器类名称 “database.hostname”:PostgreSQL 数据库地址 “database.port”:PostgreSQL 数据库端口 “database.user”:PostgreSQL 数据库用户名 “database.password”:PostgreSQL 数据密码 “database.dbname”:连接的 PostgreSQL 数据库 “database.server.name”:虚拟数据库服务器名称,可根据实际需要定义,消费 Kafka 时应使用该值data "table.include.list":监控的数据表列表,用","分隔。PostgreSQL 应该以“.”格式写入整个表名。如果没有特定的模式,那么默认的 `public`
以下是完成的 curl 命令:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.99.100", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres", "database.server.name": "fulfillment", "table.include.list": "public.inventory" }}'
上面是一个例子,因为我用的是windows,个人觉得curl不方便,所以改用postman:
3.6 Docker Compose 配置
为方便使用,将以上 Docker 命令集成到 Docker Compose 配置中,如下:
version: "3"
services:
postgres:
image: debezium/postgres:13
container_name: postgres
hostname: postgres
environment:
POSTGRES_USER: herodotus
POSTGRES_PASSWORD: herodotus
ports:
- 5432:5432
zookeeper:
image: debezium/zookeeper:1.6
container_name: zookeeper
restart: always
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.6
container_name: kafka
restart: always
ports:
- 9092:9092
environment:
ZOOKEEPER_CONNECT: zookeeper:2181
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- zookeeper
connect:
image: debezium/connect:1.6
container_name: connect
restart: always
ports:
- 8083:8083
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: herodotus_connect_configs
OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
STATUS_STORAGE_TOPIC: herodotus_connect_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- kafka
4.外部数据库配置
上一章介绍了Debezimu测试环境。其中使用的debezium/postgres已经配置好,使用起来比较方便。在实际使用过程中,经常使用PostgreSQL独立构建,所以需要对PostgreSQL进行配置。
4.1 以 Docker 运行基本组件
本章主要介绍Debezimu与独立PostgreSQL数据库的连接。因此,除了 PostgreSQL,Zookeeper、Kafka 和 Debezimu Connect 仍然使用 Docker 进行部署。具体部署的Docker Compose配置如下:
version: "3"
services:
zookeeper:
image: debezium/zookeeper:1.6
container_name: zookeeper
hostname: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.6
container_name: kafka
hostname: kafka
ports:
- 9092:9092
environment:
BROKER_ID: 1
ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092
<p>
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER
KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
connect:
image: debezium/connect:1.6
container_name: connect
hostname: connect
ports:
- 8083:8083
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: herodotus_connect_configs
OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
STATUS_STORAGE_TOPIC: herodotus_connect_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- kafka</p>
Kafka Listener相关的配置是为了解决Spring Kafka会连接Kafka的问题:`无法建立到节点-1的连接。经纪人可能不可用。`。
4.2 修改PostgreSQL配置
逻辑解码功能是 PostgreSQL 在 9.4 中添加的,是一种允许提取提交到事务日志的更改并在输出插件的帮助下以用户友好的方式处理这些更改的机制。输出插件使客户端能够使用更改。
PostgreSQL 连接器读取和处理数据库更改主要由两部分组成:
decoderbufs:基于 `Protobuf`,目前由 Debezimu 社区维护
wal2json :基于 `JSON`,目前由 wal2json 社区维护
pgoutput:PostgreSQL 10 及更高版本中的标准逻辑解码输出插件。它由 PostgreSQL 社区维护,并由 PostgreSQL 本身用于逻辑复制。这个插件是内置安装的,所以不需要额外安装。
逻辑解码输出插件不支持 DDL 更改,这意味着连接器无法向消费者发送 DDL 更改事件
逻辑解码复制槽支持数据库的“主”服务器。因此,如果是 PostgreSQL 服务集群,只能在 `primary` 服务器上激活 Connector。如果“主”服务器出现问题,连接器将停止。
4.2.1 修改PostgreSQL配置
在 ${PostgreSQL_HOME}/13/data 目录中,找到 postgresql.conf。
修改以下配置:
wal_level=logical
max_wal_senders=1
max_replication_slots=1
配置完成记得重启数据库
4.2.2 设置数据库权限
需要为 PostgreSQL 用户分配复制权限。定义一个 PostgreSQL 角色并分配至少两个权限,REPLICATION 和 LOGION。示例代码如下:
CREATE ROLE REPLICATION LOGIN;
具体操作可以参考以下脚本:
-- pg新建用户
CREATE USER user WITH PASSWORD 'pwd';
-- 给用户复制流权限
ALTER ROLE user replication;
-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;
-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
4.3 创建连接器
将以下有效负载发布到:8083/connectors/
{
"name": "herodotus-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.101.10",
"database.port": "15432",
"database.user": "athena",
"database.password": "athena",
"database.dbname" : "athena",
"database.server.name": "herodotus",
"slot.name": "herodotus_slot",
"table.include.list": "public.sys_organization",
"publication.name": "herodotus_public_connector",
"publication.autocreate.mode": "filtered",
"plugin.name": "pgoutput"
}
}
postman界面操作如下:
下面根据本例中连接器的配置信息,对几个关键属性做进一步的说明:
插槽名称
根据上面的例子,Debezium 会在 PostgreSQL 中创建一个名为 `herodotus_slot` 的复制槽。本例中创建的connector需要通过这个replication slot获取数据变化信息。
可以通过如下sql查看replication slot的信息:
select * from pg_replication_slots;
上图中active_pid为14200,即进程ID为14200的wal_sender进程已经在使用这个replication slot与Debezium交互
database.server.name 和 table.include.list
连接器在获取到数据变化的信息后,将信息转换成统一的数据格式,发布到Kafka的topic上。Debezium 指定一个表对应一个主题。主题名称的格式为 .
接收到的信息可以用以下代码查看:
@KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium")
public void received(String message) {
log.info("[Herodotus] |- Recived message from Debezium : [{}]", message);
}
5.运行测试
现在,您可以根据上述环境的配置来测试 Debezium 抓取数据的效果。可以进入Kafka容器,使用Kafka提供的kafka-console-consumer.sh查看Topic接收到的数据。具体命令如下:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization
5.1 插入测试
向数据库 sys_organization 表中插入一条数据
Kafka 的消费者命令行工具接收到 Debezium 发布的数据更改消息:
格式化后的消息体如下,这里忽略schema字段,重点放在payload.before、payload.after和payload.op字段:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "AAAAA",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
<p>
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594964405,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461608\",\"63461608\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2460,
"lsn": 63461896,
"xmin": null
},
"op": "c",
"ts_ms": 1626594964846,
"transaction": null
}
}</p>
由于是插入操作,op为c(create),before为null,after为我们插入的数据。
5.2 更新测试
修改数据库sys_organization表中的一条数据
Kafka 的消费者命令行工具接收到 Debezium 发布的数据更改消息:
格式化后的消息体如下:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "BBBBB",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626595173601,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63466888\",\"63466888\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2461,
"lsn": 63467176,
"xmin": null
},
"op": "u",
"ts_ms": 1626595173825,
"transaction": null
}
}
更新产品信息的操作后,消费者会收到一条消息,其op为u(update),after为修改后的数据。
5.3 删除测试
删除数据库sys_organization表中的一条数据
Kafka 的消费者命令行工具接收到 Debezium 发布的数据更改消息:
格式化后的消息体如下:
{
"schema": {
...
},
"payload": {
"before": {
"organization_id": "3",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": null,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": null,
"parent_id": null,
"partition_code": null,
"short_name": null
},
"after": null,
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594566933,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461120\",\"63461120\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2458,
"lsn": 63461176,
"xmin": null
},
"op": "d",
"ts_ms": 1626594567136,
"transaction": null
}
}
删除商品信息后,消费者会收到一条op为d(delete)的消息,before为删除前的数据,after为null。
6.总结
通过Debezimu进行数据同步,不仅解决了传统ETL时效性低的问题,还解决了基于消息队列需要在两端编写代码的工程量,基于容器的方式更适合微服务架构的使用,使用Kafka进行消费终端的集成,使得集成方式更加灵活方便,终端类型更加丰富。
示例代码地址:
官方数据:WordPress插件 Automatic Plugin v3.56.2自动采集
WordPress自动插件自动采集有针对性的高质量文章文章,例如亚马逊产品,Clickbank产品,Youtube视频,Vimeo视频,Feeds帖子,eBay拍卖,闪烁图像,Instagram图像,Pinterest图钉,推文,Facebook 网站和SoundCloud歌曲,只需安装和许可,为您的博客工作,它将24/7全天候工作
WordPress自动采集插件
WordPress自动插件简介
WordPress自动插件会自动抓取并将几乎所有网站发布到WordPress。
它可以使用其API从YouTube和Twitter等流行网站导入,也可以使用其抓取模块从您选择的几乎任何网站导入。
最近更新
V3.56.2修复:更改后,单击银行模块更新以再次工作 修复:使永久链接直接指向源现在可用于eBay和全球速卖通会员链接
下载地址: