文章采集工具(同步mysql增量数据的工具Canal,本篇文章的大纲)
优采云 发布时间: 2022-01-19 10:14文章采集工具(同步mysql增量数据的工具Canal,本篇文章的大纲)
老刘是一名即将找工作的*敏*感*词*二年级。一方面,他写博客总结大数据开发的知识点。由于老刘是自学大数据开发的,博客肯定会有一些不足。也希望大家多多批评指正,共同进步!
背景
大数据领域的数据源包括来自业务库的数据,以及移动端嵌入的点数据和服务器端生成的日志数据。我们在对数据进行采集的时候,可以根据下游对数据的需求,使用不同的采集工具。今天老刘说的是canal,一个mysql增量数据同步的工具。这个文章的大纲如下:
Canal的概念mysql中的主备复制原理Canal如何从MySQL同步数据Canal的HA机制的设计各种数据同步方案的小结
老刘力求用这篇文章文章让大家直接上手Canal,不花其他时间学习。
mysql主从复制实现原理
由于使用canal来同步mysql中的增量数据,所以老刘先讲mysql的主备复制原理,再讲canal的核心知识点。
根据这张图,老刘将mysql主备复制的原理分解为以下过程:
主服务器必须先启动二进制日志binlog,用于记录任何修改数据库数据的事件。主服务器将数据更改记录到二进制 binlog 日志中。从服务器会将主服务器的二进制日志复制到其本地中继日志(Relaylog)中。这一步,从服务器会先启动一个工作线程I/O线程,该I/O线程会与主库建立普通的客户端单连接,然后在主库上启动一个特殊的二进制转储(binlog)服务器。dump) 线程,这个binlog dump线程会读取主服务器上二进制日志中的事件,然后将二进制事件发送到I/O线程并保存到从服务器上的中继日志中。从服务器启动SQL线程,从中继日志中读取二进制日志,
至此mysql主备复制的实现原理就讲完了。看完这个流程,你能猜出Canal的工作原理吗?
运河核心知识 运河如何运作
Canal的工作原理是模拟MySQL slave的交互协议,伪装成MySQL slave,向MySQL master发起dump协议。MySQL master 收到 dump 请求后,会开始将 binlog 推送到 Canal。最后 Canal 解析 binlog 对象。
运河概念
canal,美[kəˈnæl],读法是这样的,意思是waterway/pipe/channel,主要目的是同步MySQL中的增量数据(可以理解为实时数据),是阿里巴巴开源下的纯Java开发项目。
运河建筑
server代表一个canal运行实例,对应一个JVM。instance对应一个数据队列,一个canal server对应instance实例下的1..n个子模块:
EventParser:数据源访问,模拟slave协议与master交互,协议分析 EventSink:Parser和Store连接器,数据过滤、处理、分发工作 EventStore:数据存储 MetaManager:增量订阅&消费信息管理器
说完了canal的基本概念,接下来就来说说canal是如何同步mysql的增量数据的。
Canal 同步 MySQL 增量数据 打开 mysql binlog
Canal同步mysql增量数据的前提是开启了mysql binlog,而阿里云的mysql数据库默认开启了binlog,但是如果我们自己安装mysql,需要手动开启binlog日志功能。
首先找到mysql配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
这里有一个关于binlog格式的知识点,老刘会告诉你。
binlog的三种格式:STATEMENT、ROW、MIXED
ROW 模式(通常使用)
日志会记录每一行数据的修改形式。它不会记录执行 SQL 语句的上下文相关信息。它只记录要修改的数据,修改了哪些数据,修改后的样子。只有价值,不会再有SQL。表关联。优点:它只需要记录哪些数据被修改了,是什么样子的,所以它的日志内容会清楚的记录每一行数据修改的细节,非常容易理解。缺点:在ROW模式下,尤其是数据添加的情况下,所有执行的语句都会被记录在日志中,并且会被记录为每行记录的修改,会产生大量的日志内容
声明模式
每个修改数据的 SQL 语句都会被记录下来。缺点:由于是记录的执行语句,为了让这些语句在slave端正确执行,他还必须在执行过程中记录每条语句的一些相关信息,即上下文信息,以保证所有语句在从端执行时,可以获得与在主端执行时相同的结果。但是目前,比如step()函数在某些版本中无法正确复制,而存储过程中使用了last-insert-id()函数,可能会导致slave和master的ID不一致,即就是会出现数据不一致的情况。在 ROW 模式中情况并非如此。
混合模式
以上两种模式都使用。
运河实时同步
第一:首先我们要配置环境,在conf/example/instance.properties下编辑如下代码
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要修改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要修改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中,canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1。
第二:配置完成后,即将启动。
sh bin/startup.sh
关闭使用 bin/stop.sh
第三:观察日志一般用cat查看canal/canal.log,example/example.log
第四:在IDEA中启动客户端的业务代码。如果mysql有增量数据,拉过来,在IDEA控制台打印出来,添加到pom.xml文件中:
com.alibaba.otter
canal.client
1.0.12
添加客户端代码:
public class Demo {
public static void main(String[] args) {
//创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//订阅
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
第五:在mysql中写入数据,客户端将增量数据打印到控制台。
Canal 的 HA 机构设计
在大数据领域,很多框架都有HA机制。Canal的HA分为两部分。Canal 服务器和 Canal 客户端有相应的 HA 实现:
canal server:为了减少对mysql dump的请求,只需要不同服务器上的一个实例同时运行,其他的都处于standby状态。canal客户端:为了保证有序性,一个实例只能有一个canal客户端同时进行get/ack/rollback操作,否则无法保证客户端接收的顺序。
整个HA机制的控制主要依赖于ZooKeeper的几个特性,ZooKeeper这里不再赘述。
运河服务器:
canal服务器要启动canal实例时,首先尝试用ZooKeeper启动判断(创建一个EPHEMERAL节点,谁创建成功谁就允许启动)。ZooKeeper节点创建成功后,对应的canal服务器会启动对应的canal实例,未成功创建的canal实例将处于standby状态。一旦 ZooKeeper 发现 canal server 创建的节点消失,它立即通知其他 canal server 再次执行步骤 1 中的操作,并重新选择一个 canal server 启动实例。canal客户端每次连接时,都会先询问是谁启动了canal实例的ZooKeeper,然后与之建立连接。一旦连接不可用,它将尝试再次连接。
Canal HA 配置,实时同步数据到kafka。
第一:修改conf/canal.properties文件
canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode = kafka
canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
二:配置conf/example/example.instance
canal.instance.mysql.slaveId = 790 /两台canal server的slaveID唯一
canal.mq.topic = canal_log //指定将数据发送到kafka的topic
数据同步方案总结
说完Canal工具,简单总结一下目前常用的data采集工具。它不会涉及架构知识,而是一个简短的总结,给你一个印象。
常用的data采集工具包括:DataX、Flume、Canal、Sqoop、LogStash等。
DataX(处理离线数据)
DataX是阿里巴巴开源的异构数据源离线同步工具。异构数据源的离线同步是指将源数据同步到目的地。但是,有许多类型的端到端数据源。在DataX之前,端到端的数据源在末端的链接会形成复杂的网状结构,非常碎片化,无法抽象出同步核心逻辑。
为了解决异构数据源的同步问题,DataX将复杂的网状同步链路变成星形数据链路,DataX作为中间传输载体负责连接各种数据源。
因此,当您需要访问一个新的数据源时,只需要将这个数据源连接到DataX,就可以实现与现有数据源的无缝数据同步。
DataX作为离线数据同步框架,本身是采用Framework+plugin架构构建的。数据源读写被抽象为Reader/Writer插件,并入整个同步框架。
Reader:是data采集模块,负责采集数据源的数据,并将数据发送给Framework。Writer:是一个数据写入模块,负责不断地从Framework中取出数据,并将数据写入目的地。框架:用于连接Reader和Writer,作为两者的数据传输通道,处理缓冲、并发、数据转换等问题。
DataX的核心架构如下:
核心模块介绍:
DataX 完成一个单一的数据同步作业,我们称之为 Job。DataX收到Job后,会启动一个进程,完成整个Job同步过程。DataX Job启动后,会根据不同的源端切分策略将Job分成多个小Task(子任务),方便并发执行。多个任务拆分后,DataX Job会调用Scheduler模块将拆分后的任务重新组合,并根据配置的并发数据量组装成一个TaskGroup(任务组)。每个任务组负责以一定的并发性运行所有分配的任务。单个任务组的默认并发任务数为 5。每个任务由任务组启动。Task启动后,Reader->Channel->的线程 Writer会固定启动,完成任务同步。DataX作业完成后,Job*敏*感*词*并等待多个TaskGroup模块任务完成,待所有TaskGroup任务完成后Job成功退出。否则异常退出。Flume(处理实时数据)
Flume的主要应用场景是同步日志数据,主要包括三个组件:Source、Channel、Sink。
Flume最大的优势在于官网提供了丰富的Source、Channel、Sink。根据不同的业务需求,我们可以在官网找到相关配置。此外,Flume 还提供了自定义这些组件的接口。
Logstash(处理离线数据)
Logstash是一个具有实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;同时,这个管道还可以让你根据自己的需要在中间添加一个过滤器,Logstash 提供了很多强大的过滤器来满足各种应用场景。
Logstash 是用 JRuby 编写的,使用简单的基于消息的架构,并在 JVM 上运行。管道中的数据流称为事件,分为输入阶段、过滤器阶段和输出阶段。
Sqoop(处理离线数据)
Sqoop 是一种用于在 Hadoop 和关系数据库之间传输数据的工具。它用于将数据从 MySQL 等关系数据库导出到 Hadoop 的 HDFS,从 Hadoop 文件系统导出到关系数据库。Sqoop 底层还是使用了 MapReducer,所以在使用的时候一定要注意数据倾斜。
总结
老刘的文章文章主要介绍了Canal工具的核心知识点及其data采集工具的对比,其中data采集工具只简单的说一下概念和应用,以及目的是让每个人都有印象。老刘敢保证,看完这个文章基本就相当于入门了,剩下的就是练习了。
好了,同步mysql增量数据的工具canal的内容就讲完了。虽然现在的水平可能比不上大佬,但是老刘会努力变得更好,让你自己学习,从不求人!
如有相关问题,请联系公众号:努力工作的老刘。文章我看到了,点赞、关注、支持一波!