文章采集工具(同步mysql增量数据的工具Canal,本篇文章的大纲)

优采云 发布时间: 2022-01-19 10:14

  文章采集工具(同步mysql增量数据的工具Canal,本篇文章的大纲)

  老刘是一名即将找工作的*敏*感*词*二年级。一方面,他写博客总结大数据开发的知识点。由于老刘是自学大数据开发的,博客肯定会有一些不足。也希望大家多多批评指正,共同进步!

  背景

  大数据领域的数据源包括来自业务库的数据,以及移动端嵌入的点数据和服务器端生成的日志数据。我们在对数据进行采集的时候,可以根据下游对数据的需求,使用不同的采集工具。今天老刘说的是canal,一个mysql增量数据同步的工具。这个文章的大纲如下:

  Canal的概念mysql中的主备复制原理Canal如何从MySQL同步数据Canal的HA机制的设计各种数据同步方案的小结

  老刘力求用这篇文章文章让大家直接上手Canal,不花其他时间学习。

  mysql主从复制实现原理

  由于使用canal来同步mysql中的增量数据,所以老刘先讲mysql的主备复制原理,再讲canal的核心知识点。

  

  根据这张图,老刘将mysql主备复制的原理分解为以下过程:

  主服务器必须先启动二进制日志binlog,用于记录任何修改数据库数据的事件。主服务器将数据更改记录到二进制 binlog 日志中。从服务器会将主服务器的二进制日志复制到其本地中继日志(Relaylog)中。这一步,从服务器会先启动一个工作线程I/O线程,该I/O线程会与主库建立普通的客户端单连接,然后在主库上启动一个特殊的二进制转储(binlog)服务器。dump) 线程,这个binlog dump线程会读取主服务器上二进制日志中的事件,然后将二进制事件发送到I/O线程并保存到从服务器上的中继日志中。从服务器启动SQL线程,从中继日志中读取二进制日志,

  至此mysql主备复制的实现原理就讲完了。看完这个流程,你能猜出Canal的工作原理吗?

  运河核心知识 运河如何运作

  Canal的工作原理是模拟MySQL slave的交互协议,伪装成MySQL slave,向MySQL master发起dump协议。MySQL master 收到 dump 请求后,会开始将 binlog 推送到 Canal。最后 Canal 解析 binlog 对象。

  运河概念

  canal,美[kəˈnæl],读法是这样的,意思是waterway/pipe/channel,主要目的是同步MySQL中的增量数据(可以理解为实时数据),是阿里巴巴开源下的纯Java开发项目。

  运河建筑

  

  server代表一个canal运行实例,对应一个JVM。instance对应一个数据队列,一个canal server对应instance实例下的1..n个子模块:

  EventParser:数据源访问,模拟slave协议与master交互,协议分析 EventSink:Parser和Store连接器,数据过滤、处理、分发工作 EventStore:数据存储 MetaManager:增量订阅&消费信息管理器

  说完了canal的基本概念,接下来就来说说canal是如何同步mysql的增量数据的。

  Canal 同步 MySQL 增量数据 打开 mysql binlog

  Canal同步mysql增量数据的前提是开启了mysql binlog,而阿里云的mysql数据库默认开启了binlog,但是如果我们自己安装mysql,需要手动开启binlog日志功能。

  首先找到mysql配置文件:

  etc/my.cnf

server-id=1

log-bin=mysql-bin

binlog-format=ROW

  这里有一个关于binlog格式的知识点,老刘会告诉你。

  binlog的三种格式:STATEMENT、ROW、MIXED

  ROW 模式(通常使用)

  日志会记录每一行数据的修改形式。它不会记录执行 SQL 语句的上下文相关信息。它只记录要修改的数据,修改了哪些数据,修改后的样子。只有价值,不会再有SQL。表关联。优点:它只需要记录哪些数据被修改了,是什么样子的,所以它的日志内容会清楚的记录每一行数据修改的细节,非常容易理解。缺点:在ROW模式下,尤其是数据添加的情况下,所有执行的语句都会被记录在日志中,并且会被记录为每行记录的修改,会产生大量的日志内容

  声明模式

  每个修改数据的 SQL 语句都会被记录下来。缺点:由于是记录的执行语句,为了让这些语句在slave端正确执行,他还必须在执行过程中记录每条语句的一些相关信息,即上下文信息,以保证所有语句在从端执行时,可以获得与在主端执行时相同的结果。但是目前,比如step()函数在某些版本中无法正确复制,而存储过程中使用了last-insert-id()函数,可能会导致slave和master的ID不一致,即就是会出现数据不一致的情况。在 ROW 模式中情况并非如此。

  混合模式

  以上两种模式都使用。

  运河实时同步

  第一:首先我们要配置环境,在conf/example/instance.properties下编辑如下代码

  ## mysql serverId

canal.instance.mysql.slaveId = 1234

#position info,需要修改成自己的数据库信息

canal.instance.master.address = 127.0.0.1:3306

canal.instance.master.journal.name =

canal.instance.master.position =

canal.instance.master.timestamp =

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#username/password,需要修改成自己的数据库信息

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.defaultDatabaseName =

canal.instance.connectionCharset = UTF-8

#table regex

canal.instance.filter.regex = .\*\\\\..\*

其中,canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1。

  第二:配置完成后,即将启动。

  sh bin/startup.sh

关闭使用 bin/stop.sh

  第三:观察日志一般用cat查看canal/canal.log,example/example.log

  第四:在IDEA中启动客户端的业务代码。如果mysql有增量数据,拉过来,在IDEA控制台打印出来,添加到pom.xml文件中:

  

com.alibaba.otter

canal.client

1.0.12

  添加客户端代码:

  public class Demo {

public static void main(String[] args) {

//创建连接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),

"example", "", "");

connector.connect();

//订阅

connector.subscribe();

connector.rollback();

int batchSize = 1000;

int emptyCount = 0;

int totalEmptyCount = 100;

while (totalEmptyCount > emptyCount) {

Message msg = connector.getWithoutAck(batchSize);

long id = msg.getId();

List entries = msg.getEntries();

if(id == -1 || entries.size() == 0){

emptyCount++;

System.out.println("emptyCount : " + emptyCount);

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}else{

emptyCount = 0;

printEntry(entries);

}

connector.ack(id);

}

}

// batch -> entries -> rowchange - rowdata -> cols

private static void printEntry(List entries) {

for (CanalEntry.Entry entry : entries){

if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||

entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){

continue;

}

CanalEntry.RowChange rowChange = null;

try {

rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

CanalEntry.EventType eventType = rowChange.getEventType();

System.out.println(entry.getHeader().getLogfileName()+" __ " +

entry.getHeader().getSchemaName() + " __ " + eventType);

List rowDatasList = rowChange.getRowDatasList();

for(CanalEntry.RowData rowData : rowDatasList){

for(CanalEntry.Column column: rowData.getAfterColumnsList()){

System.out.println(column.getName() + " - " +

column.getValue() + " - " +

column.getUpdated());

}

}

}

}

}

  第五:在mysql中写入数据,客户端将增量数据打印到控制台。

  Canal 的 HA 机构设计

  在大数据领域,很多框架都有HA机制。Canal的HA分为两部分。Canal 服务器和 Canal 客户端有相应的 HA 实现:

  canal server:为了减少对mysql dump的请求,只需要不同服务器上的一个实例同时运行,其他的都处于standby状态。canal客户端:为了保证有序性,一个实例只能有一个canal客户端同时进行get/ack/rollback操作,否则无法保证客户端接收的顺序。

  整个HA机制的控制主要依赖于ZooKeeper的几个特性,ZooKeeper这里不再赘述。

  运河服务器:

  canal服务器要启动canal实例时,首先尝试用ZooKeeper启动判断(创建一个EPHEMERAL节点,谁创建成功谁就允许启动)。ZooKeeper节点创建成功后,对应的canal服务器会启动对应的canal实例,未成功创建的canal实例将处于standby状态。一旦 ZooKeeper 发现 canal server 创建的节点消失,它立即通知其他 canal server 再次执行步骤 1 中的操作,并重新选择一个 canal server 启动实例。canal客户端每次连接时,都会先询问是谁启动了canal实例的ZooKeeper,然后与之建立连接。一旦连接不可用,它将尝试再次连接。

  Canal HA 配置,实时同步数据到kafka。

  第一:修改conf/canal.properties文件

  canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181

canal.serverMode = kafka

canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092

  二:配置conf/example/example.instance

  canal.instance.mysql.slaveId = 790 /两台canal server的slaveID唯一

canal.mq.topic = canal_log //指定将数据发送到kafka的topic

  数据同步方案总结

  说完Canal工具,简单总结一下目前常用的data采集工具。它不会涉及架构知识,而是一个简短的总结,给你一个印象。

  常用的data采集工具包括:DataX、Flume、Canal、Sqoop、LogStash等。

  DataX(处理离线数据)

  DataX是阿里巴巴开源的异构数据源离线同步工具。异构数据源的离线同步是指将源数据同步到目的地。但是,有许多类型的端到端数据源。在DataX之前,端到端的数据源在末端的链接会形成复杂的网状结构,非常碎片化,无法抽象出同步核心逻辑。

  

  为了解决异构数据源的同步问题,DataX将复杂的网状同步链路变成星形数据链路,DataX作为中间传输载体负责连接各种数据源。

  因此,当您需要访问一个新的数据源时,只需要将这个数据源连接到DataX,就可以实现与现有数据源的无缝数据同步。

  

  DataX作为离线数据同步框架,本身是采用Framework+plugin架构构建的。数据源读写被抽象为Reader/Writer插件,并入整个同步框架。

  Reader:是data采集模块,负责采集数据源的数据,并将数据发送给Framework。Writer:是一个数据写入模块,负责不断地从Framework中取出数据,并将数据写入目的地。框架:用于连接Reader和Writer,作为两者的数据传输通道,处理缓冲、并发、数据转换等问题。

  DataX的核心架构如下:

  

  核心模块介绍:

  DataX 完成一个单一的数据同步作业,我们称之为 Job。DataX收到Job后,会启动一个进程,完成整个Job同步过程。DataX Job启动后,会根据不同的源端切分策略将Job分成多个小Task(子任务),方便并发执行。多个任务拆分后,DataX Job会调用Scheduler模块将拆分后的任务重新组合,并根据配置的并发数据量组装成一个TaskGroup(任务组)。每个任务组负责以一定的并发性运行所有分配的任务。单个任务组的默认并发任务数为 5。每个任务由任务组启动。Task启动后,Reader->Channel->的线程 Writer会固定启动,完成任务同步。DataX作业完成后,Job*敏*感*词*并等待多个TaskGroup模块任务完成,待所有TaskGroup任务完成后Job成功退出。否则异常退出。Flume(处理实时数据)

  

  Flume的主要应用场景是同步日志数据,主要包括三个组件:Source、Channel、Sink。

  Flume最大的优势在于官网提供了丰富的Source、Channel、Sink。根据不同的业务需求,我们可以在官网找到相关配置。此外,Flume 还提供了自定义这些组件的接口。

  Logstash(处理离线数据)

  

  Logstash是一个具有实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;同时,这个管道还可以让你根据自己的需要在中间添加一个过滤器,Logstash 提供了很多强大的过滤器来满足各种应用场景。

  Logstash 是用 JRuby 编写的,使用简单的基于消息的架构,并在 JVM 上运行。管道中的数据流称为事件,分为输入阶段、过滤器阶段和输出阶段。

  Sqoop(处理离线数据)

  

  Sqoop 是一种用于在 Hadoop 和关系数据库之间传输数据的工具。它用于将数据从 MySQL 等关系数据库导出到 Hadoop 的 HDFS,从 Hadoop 文件系统导出到关系数据库。Sqoop 底层还是使用了 MapReducer,所以在使用的时候一定要注意数据倾斜。

  总结

  老刘的文章文章主要介绍了Canal工具的核心知识点及其data采集工具的对比,其中data采集工具只简单的说一下概念和应用,以及目的是让每个人都有印象。老刘敢保证,看完这个文章基本就相当于入门了,剩下的就是练习了。

  好了,同步mysql增量数据的工具canal的内容就讲完了。虽然现在的水平可能比不上大佬,但是老刘会努力变得更好,让你自己学习,从不求人!

  如有相关问题,请联系公众号:努力工作的老刘。文章我看到了,点赞、关注、支持一波!

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线