采集文章系统(这是关于变更数据采集(CDC)系列的第二部分。)
优采云 发布时间: 2022-02-20 19:08采集文章系统(这是关于变更数据采集(CDC)系列的第二部分。)
这是有关更改数据采集 (CDC) 的系列文章的第二部分。在这个 文章 中,让我们讨论 CDC 用例,看看有哪些工具可以帮助您将 CDC 添加到您的架构中。
更改数据采集 促进了事件驱动的应用程序架构。它允许应用程序侦听数据库、数据仓库等中的变化并根据这些变化采取行动。
概括地说,以下是对数据更改采取行动所产生的用例和架构。
让我们探索一下。
提取、转换、加载
到目前为止,CDC 最常见的用例之一是提取、转换、加载 (ETL)。ETL 是一个从源获取数据(提取)、以某种方式对其进行处理(转换)并将其发送到目标(加载)的过程。
数据复制(一次性同步)和镜像(连续复制)是 ETL 过程的好例子。ETL 是一个涵盖非常不同的用例的总称,例如:.
CDC 不仅可以帮助解决这些用例,而且还是解决这些问题的最佳方式。例如,为了将数据镜像到数据仓库,您必须捕获发生的任何更改并将其应用于源数据库。正如本系列第 1 部分讨论的流复制日志系列的第 1 部分中所讨论的,数据库使用 CDC 来使备用实例保持最新以进行故障转移,因为它高效且可扩展。在更广泛的架构中挖掘这些事件时,您的数据仓库可以与备用数据库实例一样保持最新,以进行灾难恢复。
保持缓存和搜索索引系统更新也是 ETL 问题和 CDC 的一个很好的用例。今天创建的大型应用程序由许多不同的数据存储组成。例如,一些架构利用 Postgres、Redis 和 Elasticsearch 作为关系数据库、缓存层和搜索引擎。所有这些都是为特定数据用例设计的记录系统,但数据需要在每个存储中进行镜像。
您永远不希望用户搜索产品并发现它不再存在。陈旧的缓存和搜索索引会导致糟糕的用户体验。CDC 可用于构建数据管道,使这些存储与其上游依赖项保持同步。
理论上,一个应用程序可以同时向 Postgres、Redis 和 Elasticsearch 写入数据,但是“双写入”的管理很困难,并且可能导致系统不同步。CDC 提供了更健壮且更易于维护的实现。与其将更新索引和缓存的逻辑添加到单个单体应用程序中,不如创建一个事件驱动的微服务,该微服务可以独立于面向用户的系统进行构建、维护、改进和部署。该微服务保持索引和缓存更新,以确保用户操作最相关的数据。
集成和自动化
SaaS 的兴起导致生成数据或需要更新数据的工具数量激增。CDC 可以提供更好的模型来保持 Salesforce、Hubspot 等的更新,并允许需要响应这些数据变化的业务逻辑自动化。
我们上面描述的每个用例都将数据发送到特定的目的地。但是,最强大的目的地是具有云功能的目的地。捕获数据更改和触发云功能可用于执行本文中提到(或未提及)的每个用例。
由于无需维护服务器,云功能大幅增长;它们自动扩展,易于使用和部署。这种流行性和实用性在 JAMStack 等架构中得到了清晰的证明。CDC 非常适合这种架构模式。
今天,云功能是由事件触发的。此事件可能发生在文件上传到 Amazon S3 或 HTTP 请求时。但是,正如您可能已经猜到的那样,这个触发事件可能是由 CDC 系统发出的。
例如,这是一个 AWS Lambda 函数,它接受数据更改事件并索引 Algolia 的搜索数据:
const algoliasearch = require("algoliasearch");
const client = algoliasearch(process.env.ALGOLIA_APP_ID, process.env.ALGOLIA_API_KEY);
const index = client.initIndex(process.env.ALGOLIA_INDEX_NAME);
exports.handler = async function(event, context) {
console.log("EVENT: \\n" + JSON.stringify(event, null, 2))
const request = event.Records[0].cf.request;
// Accessing the Data Record
//
const body = Buffer.from(request.body.data, 'base64').toString();
const { schema, payload } = body;
const { before, after, source, op } = payload;
if (req.method === 'POST') {
try {
// if read, create, or update operation create o update index
if (op === 'r' || op === 'c' || op === 'u') {
console.log(`operation: ${op}, id: ${after.id}`)
after.objectID = after.id
await index.saveObject(after)
} else if (op === 'd') {
console.log(`operation: d, id: ${before.id}`)
await index.deleteObject(before.id)
}
return res.status(200).send()
} catch (error) {
console.log(`error: ${JSON.stringify(error)}`)
return res.status(500).send()
}
}
return context.logStreamName
}
每次触发这个函数,它都会查看数据变化(op),并在 Algolia 中执行相应的动作。例如,如果数据库发生了删除操作,我们可以在 Algolia 中执行一个 deleteObject。
响应 CDC 事件的函数可以小而简单。但是,CDC 以及基于事件的架构也可以简化原本非常复杂的架构。
例如,在应用程序中实现 Webhook 的功能成为 CDC 中更紧迫的问题。Webhook 允许用户在某些事件发生时触发 POST 请求,通常是数据更改。例如,使用 Github,您可以在合并拉取请求时触发云功能。合并的拉取请求是对数据存储的 UPDATE 操作,这意味着 CDC 系统可以捕获此事件。一般来说,大多数 webhook 事件都可以转换为 CDC 系统可以捕获的 INSERT UPDATE 和 DELETE 操作。
历史
在某些情况下,您可能不想对 CDC 事件采取行动,而只想存储原创更改。使用 CDC,数据管道可以将所有更改事件存储到云存储桶中,以进行长期处理和分析。存储用于历史分析的数据的最佳位置是在云存储桶中,称为数据湖。
数据湖是一个集中式存储,可让您以任意规模存储所有结构化和非结构化数据。数据湖通常使用云对象存储桶解决方案,例如 Amazon S3 或 Digital Ocean Spaces。
例如,一旦数据进入数据湖,Amazon Presto 等 SQL 查询引擎就可以针对不断变化的数据集运行分析查询。
在存储原创更改时,您不仅拥有数据的当前状态,还拥有所有以前的状态(历史)。这就是 CDC 为历史分析增加很多价值的原因。
拥有历史数据可以让您支持灾难恢复工作,还可以让您回答有关数据的回顾性问题。例如,假设您的团队重新定义了每月活跃用户 (MAU) 的计算方式。借助用户数据集的完整历史记录,可以根据过去的任何日期进行新的 MAU 计算,并将结果与当前状态进行比较。
这种丰富的历史也具有面向用户的价值。审核日志和活动日志是向用户显示数据更改的功能。
捕获和存储更改事件为实现这些功能提供了更好的框架。与 webhook 一样,审计日志和活动日志都源于可被 CDC 系统捕获的操作。
警报
任何警报系统的工作都是将事件通知利益相关者。例如,当您收到新的电子邮件通知时,系统会通知您对电子邮件数据存储的 INSERT 操作。通常,大多数警报都与数据存储的变化有关,这意味着 CDC 非常适合电力警报系统。
例如,假设您有一家电子商务商店。在采购表上启用 CDC 后,您可以捕获更改事件并通过在进行新采购时执行 Slack 警报来通知团队。
就像审计或活动日志一样,CDC 提供的通知不仅提供有关发生情况的信息,还提供有关更改本身的详细信息。
Tom 将标题从“会议纪要”更新为“我的新会议”。
这种警报行为也具有内在价值。从基础设施监控的角度来看,CDC 事件可以深入了解用户如何与您的应用程序和数据进行交互。例如,您可以查看用户添加、更新或删除信息的时间和方式。可以将此数据发送到 Prometheus UI 以监控此信息并采取措施。
开始使用 CDC
在第一部分中,我们讨论了 CDC 的各种常见实现。
这些都可以用来构建我们在本文中讨论的用例。最重要的是,由于 CDC 专注于数据,因此该过程与编程语言无关,并且可以集成到大多数架构中。
轮询和触发器
使用轮询或数据库触发器时,没有开销,也不需要安装。您可以从构建查询开始,以轮询或利用数据库的触发器(如果支持)。
流日志处理
数据库使用流复制日志进行备份和恢复,这意味着大多数数据库提供了一些开箱即用的 CDC 行为。挖掘这些事件的难易程度取决于数据存储本身。最好的起点是深入研究数据库的复制功能。下面是一些最流行的数据库的复制日志资源。
要开始使用流式日志记录,答案与相关数据库相关联。在未来文章,我将探索每种情况的样子。
直接实施任何这些确实需要一些时间、计划和努力。如果您想开始使用 CDC,最低门槛是采用知道如何从您使用的数据存储中进行通信和捕获更改的 CDC 工具。
更改数据采集工具
这里有一些很棒的工具供您评估。
地比西
Debezium 是迄今为止最受欢迎的 CDC 工具。它维护良好、开源,并建立在 Apache Kafka 之上。它支持 MongoDB、MySQL、PostgreSQL 和更多开箱即用的数据库。
在高层次上,Debezium 使用 Hook 数据库的复制日志并将更改事件发送到 Kafka。你甚至可以在没有 Kafka 的情况下独立运行 Debezium。
真正的好处是 Debezium 都是基于配置的。安装和配置 Debezium 后,您可以使用基于 JSON 的配置来配置与数据存储的连接。
{
"name": "fulfillment-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.99.100",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "fulfillment",
"table.include.list": "public.inventory"
}
}
连接后,Debezium 会对您的数据进行初始快照,并将更改事件发送到 Kafka 主题。然后服务可以使用主题并对其采取行动。
这里有一些开始使用 Debeizium 的好地方。
梅罗沙
Meroxa 是一个实时数据协调平台,可为您提供实时基础架构。Meroxa 消除了与配置和管理代理、连接器、转换、功能和流式基础设施相关的时间和开销。您所要做的就是添加资源并构建管道。Meroxa 支持 PostgreSQL、MongoDB、SQL Server 等。
可以在 Visual Dashboard 中或使用 Meroxa CLI 建立 CDC 管道:
# Add Resource
$ meroxa resource add my-postgres --type postgres -u postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB
# Add Webhook
$ meroxa resource add my-url --type url -u $CUSTOM_HTTP_URL
# Create CDC Pipeline
$ meroxa connect --from my-postgres --input $TABLE_NAME --to my-url
上面的用例有更详细的介绍。
有一些很好的资源可以帮助您开始使用 Meroxa。
我迫不及待地想看看你建造了什么。