文章采集组合工具(同步mysql增量数据的工具Canal,本篇文章的大纲)
优采云 发布时间: 2021-08-30 03:13文章采集组合工具(同步mysql增量数据的工具Canal,本篇文章的大纲)
Lao Liu 是一名即将找到工作的二年级*敏*感*词*。写博客一方面是总结大数据开发的知识点,另一方面是希望能帮助小伙伴们在不求人的情况下进行自学。由于老刘是大数据开发自学,所以博客肯定会有一些不足,还望大家批评指正,一起进步!
背景
大数据领域的数据来源包括业务数据库的数据,以及移动终端的数据和服务器产生的日志数据。当我们采集数据时,可以根据下游数据的不同要求,使用不同的采集工具来做。今天老刘给大家介绍一下Canal,一款mysql增量数据同步工具。本文文章的大纲如下:
Canal的概念mysql中主备复制的原理Canal如何同步MySQL中的数据Canal的HA机制设计了各种数据同步方案的简单总结
老刘力求用这篇文章让大家直接使用Canal工具,而不是花其他时间学习。
mysql主从复制实现原理
既然是用Canal来同步mysql中的增量数据,那么小刘先讲一下mysql的主备复制原理,再讲一下Canal的核心知识。
根据这张图,老刘将mysql的主备复制原理分解为以下几个过程:
主服务器必须先启动二进制日志binlog,用于记录任何修改数据库数据的事件。主服务器将数据更改记录到二进制 binlog 日志中。从服务器会将主服务器的二进制日志复制到其本地中继日志(Relaylog)。这一步详细说,首先slave server会启动一个worker thread I/O线程,I/O线程会和主库建立一个普通的client单连接,然后启动一个特殊的二进制转储(binlog dump)线程,这个binlog dump 线程会读取主服务器上二进制日志中的事件,然后将二进制事件发送到I/O线程并保存在从服务器上的中继日志中。从服务器启动SQL线程,从中继日志中读取二进制日志,在从服务器本地进行数据修改操作,从服务器更新数据。
那么mysql主/备复制的实现原理就结束了。看完这个流程,你能猜到Canal是怎么工作的吗?
Canal 核心知识点 Canal 工作原理
Canal 的工作原理是模拟 MySQL slave 的交互协议,伪装成 MySQL slave,向 MySQL master 发起 dump 协议。 MySQL master收到转储请求后,会开始推送binlog到Canal。最后,Canal 会解析 binlog 对象。
运河概念
Canal,美国的[kəˈnæl],读成这样,意思是水路/管道/渠道。主要目的是同步MySQL中的增量数据(可以理解为实时数据)。它是阿里巴巴的子公司。用纯 Java 开发的开源项目。
运河建筑
server代表canal的一个运行实例,对应一个JVM。 Instance对应一个数据队列,1个canal server对应instanceinstance下的1..n子模块:
EventParser:数据源访问,模拟salve协议和master的交互,协议分析EventSink:Parser和Store链接器,数据过滤、处理、分发。 EventStore:数据存储 MetaManager:增量订阅消费信息管理
说完了Canal的基本概念,接下来我们来谈谈Canal是如何同步mysql的增量数据的。
canal同步mysql增量数据并打开mysql binlog
我们使用Canal同步mysql增量数据的前提是开启了mysql的binlog。阿里云的mysql数据库默认开启了binlog,但是如果我们自己安装mysql,需要手动开启binlog日志功能。
首先找到mysql配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
这里有一个关于binlog格式的知识点,老刘来告诉你。
binlog的三种格式:STATEMENT、ROW、MIXED
ROW 模式(通常使用它)
日志会记录每行数据被修改的形式。它不会记录 SQL 语句执行的上下文相关信息。它只会记录要修改的数据,修改了哪些数据,修改了哪些数据。只有值,没有SQL多表关联的情况。
优点:它只需要记录修改了哪条数据,是什么样子的,所以它的日志内容会非常清楚地记录每一行数据修改的细节,非常容易理解。
缺点:在ROW模式下,尤其是添加数据时,所有执行的语句都会被记录到日志中,并且会随着每一行记录的修改而被记录下来,会产生大量的日志内容。
语句模式
每一条修改数据的 SQL 语句都会被记录。
缺点:因为是记录的执行语句,为了让这些语句在slave端正确执行,他还必须记录每条语句执行过程中的一些相关信息,即上下文信息。确保所有语句在从端执行时都能得到与在主端执行时相同的结果。
但是目前,例如在某些版本中无法正确复制 step() 函数。存储过程中使用了last-insert-id()函数,可能会导致slave和master上的ID不一致。 ROW模式下没有数据不一致。
混合模式
同时使用上述两种模式。
canal实时同步首先我们需要配置环境,在conf/example/instance.properties下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要修改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要修改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中canal.instance.connectionCharset表示java中编码类型对应的数据库的编码方式,如UTF-8、GBK、ISO-8859-1。
配置完成后,就要开始了
sh bin/startup.sh
关闭使用 bin/stop.sh
观察日志
一般用cat查看canal/canal.log,example/example.log
启动客户端
IDEA业务代码中,如果mysql中有增量数据,拉入IDEA控制台打印出来
在 pom.xml 文件中添加:
com.alibaba.otter
canal.client
1.0.12
添加客户端代码:
public class Demo {
public static void main(String[] args) {
//创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//订阅
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
在mysql中写入数据,客户端将增量数据打印到控制台。 Canal的HA机制设计
大数据领域的很多框架都会有HA机制。运河的HA分为两部分。 Canal 服务器和 Canal 客户端有相应的 HA 实现:
canal server:为了减少mysql dump的请求,不同服务器上的实例要求同一时间只有一个实例在运行,其他的都处于standby状态。 Canal客户端:为了保证有序,一个canal客户端只能同时对一个实例进行get/ack/rollback操作,否则无法保证客户端的接收顺序。
整个HA机制的控制主要依赖ZooKeeper的几个特性,ZooKeeper在此不再赘述。
运河服务器:
canal server 想要启动canal 实例时,会先尝试对ZooKeeper 进行启动判断(创建一个EPHEMERAL 节点,谁启动谁成功)。 ZooKeeper节点创建成功后,对应的canal服务器会启动对应的canal实例,未成功创建的canal实例会进入standby状态。 ZooKeeper一旦发现canal server创建的节点消失了,会立即通知其他canal server重新执行步骤1,重新选择canal server启动实例。 canal客户端每次连接时,都会先询问谁启动了canal实例的ZooKeeper,然后再与其建立连接。一旦连接不可用,它将尝试再次连接。 canal client的方法和canal server类似,也是使用ZooKeeper抢占EPHEMERAL节点的方法进行控制。
配置Canal HA并实时同步数据到Kafka。
修改conf/canal.properties文件
canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode = kafka
canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
配置 conf/example/example.instance
canal.instance.mysql.slaveId = 790 /两台canal server的slaveID唯一
canal.mq.topic = canal_log //指定将数据发送到kafka的topic
数据同步计划总结
说完Canal工具,现在给大家简单总结一下目前常用的数据采集tool。不涉及架构知识,简单总结给大家一个印象。
常见的data采集工具包括:DataX、Flume、Canal、Sqoop、LogStash等
DataX(处理离线数据)
DataX是阿里巴巴开源的异构数据源离线同步工具。异构数据源的离线同步是指将数据从源同步到目的地。但是,端到端数据源的类型很多。在DataX存在之前,端到端的链路会形成复杂的网状结构,非常碎片化,无法抽象出同步核心逻辑。
为了解决异构数据源的同步问题,DataX将复杂的Mesh同步链路改造成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。
所以,当你需要连接一个新的数据源时,你只需要将这个数据源连接到DataX,就可以和现有的数据源无缝同步数据。
DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。数据源读写被抽象为Reader/Writer插件,并纳入整个同步框架。
Reader:是data采集模块,负责采集data源的数据,并将数据发送给Framework。 Writer:是数据写入模块,负责不断地从Framework中获取数据,并将数据写入目的地。 Framework:用于连接Reader和Writer,作为两者之间的数据传输通道,处理缓冲、并发、数据转换等问题。
DataX的核心架构如下图:
核心模块介绍:
DataX 完成单个数据同步作业。我们称之为工作。 DataX收到一个作业后,会启动一个进程来完成整个作业同步过程。 DataX Job启动后,会根据不同的源切分策略,分成多个小Task(子任务),方便并发执行。拆分多个任务后,DataX Job 会调用Scheduler 模块,根据配置的并发数据量重新组合拆分的任务,组装成一个TaskGroup(任务组)。每个 TaskGroup 负责以一定的并发量运行所有分配的任务。单个任务组的默认并发任务数为 5。每个任务由 TaskGroup 启动。 Task启动后,会启动Reader->Channel->Writer线程完成任务同步工作。 DataX作业完成后,作业会*敏*感*词*并等待多个TaskGroup模块任务的完成,等待所有TaskGroup任务完成后作业成功退出。否则异常退出。 Flume(处理实时数据)
Flume 的主要应用场景是同步日志数据,主要收录三个组件:Source、Channel、Sink。
Flume 最大的优势是官网提供了丰富的 Source、Channel、Sink。根据不同的业务需求,我们可以在官网找到相关的配置。此外,Flume 还提供了自定义这些组件的接口。
Logstash(处理离线数据)
Logstash 是一个具有实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;同时,这个pipeline还允许你根据自己的需要在中间添加过滤Net,Logstash提供了很多强大的过滤器来满足各种应用场景。
Logstash 由 JRuby 编写,使用简单的基于消息的架构,并在 JVM 上运行。管道中的数据流称为事件,分为输入阶段、过滤阶段和输出阶段。
Sqoop(处理离线数据)
Sqoop 是一种用于在 Hadoop 和关系数据库之间传输数据的工具。用于将数据从MySQL等关系数据库导出到Hadoop的HDFS,从Hadoop文件系统导出到关系数据库。 Sqoop底层还是MapReducer,使用时一定要注意数据倾斜。
总结
老刘的文章文章主要介绍了Canal工具的核心知识点以及它们的数据对比采集tools。其中data采集tools只是大体讲概念和应用,目的是为了让大家有个印象。老刘敢保证,看完这个文章,基本就相当于入门了,剩下的就是练习了。
好了,mysql增量数据同步工具Canal的内容就讲完了。虽然现在的水平可能不如大哥们,但老刘会努力变得更好,让所有的朋友都自学,从不求人!
如有相关问题,请联系公众号:努力工作的老刘。 文章看到了,喜欢这个,关注支持一波!