文章采集组合工具(同步mysql增量数据的工具Canal,本篇文章的大纲)

优采云 发布时间: 2021-08-30 03:13

  文章采集组合工具(同步mysql增量数据的工具Canal,本篇文章的大纲)

  Lao Liu 是一名即将找到工作的二年级*敏*感*词*。写博客一方面是总结大数据开发的知识点,另一方面是希望能帮助小伙伴们在不求人的情况下进行自学。由于老刘是大数据开发自学,所以博客肯定会有一些不足,还望大家批评指正,一起进步!

  背景

  大数据领域的数据来源包括业务数据库的数据,以及移动终端的数据和服务器产生的日志数据。当我们采集数据时,可以根据下游数据的不同要求,使用不同的采集工具来做。今天老刘给大家介绍一下Canal,一款mysql增量数据同步工具。本文文章的大纲如下:

  Canal的概念mysql中主备复制的原理Canal如何同步MySQL中的数据Canal的HA机制设计了各种数据同步方案的简单总结

  老刘力求用这篇文章让大家直接使用Canal工具,而不是花其他时间学习。

  mysql主从复制实现原理

  既然是用Canal来同步mysql中的增量数据,那么小刘先讲一下mysql的主备复制原理,再讲一下Canal的核心知识。

  

  根据这张图,老刘将mysql的主备复制原理分解为以下几个过程:

  主服务器必须先启动二进制日志binlog,用于记录任何修改数据库数据的事件。主服务器将数据更改记录到二进制 binlog 日志中。从服务器会将主服务器的二进制日志复制到其本地中继日志(Relaylog)。这一步详细说,首先slave server会启动一个worker thread I/O线程,I/O线程会和主库建立一个普通的client单连接,然后启动一个特殊的二进制转储(binlog dump)线程,这个binlog dump 线程会读取主服务器上二进制日志中的事件,然后将二进制事件发送到I/O线程并保存在从服务器上的中继日志中。从服务器启动SQL线程,从中继日志中读取二进制日志,在从服务器本地进行数据修改操作,从服务器更新数据。

  那么mysql主/备复制的实现原理就结束了。看完这个流程,你能猜到Canal是怎么工作的吗?

  Canal 核心知识点 Canal 工作原理

  Canal 的工作原理是模拟 MySQL slave 的交互协议,伪装成 MySQL slave,向 MySQL master 发起 dump 协议。 MySQL master收到转储请求后,会开始推送binlog到Canal。最后,Canal 会解析 binlog 对象。

  运河概念

  Canal,美国的[kəˈnæl],读成这样,意思是水路/管道/渠道。主要目的是同步MySQL中的增量数据(可以理解为实时数据)。它是阿里巴巴的子公司。用纯 Java 开发的开源项目。

  运河建筑

  

  server代表canal的一个运行实例,对应一个JVM。 Instance对应一个数据队列,1个canal server对应instanceinstance下的1..n子模块:

  EventParser:数据源访问,模拟salve协议和master的交互,协议分析EventSink:Parser和Store链接器,数据过滤、处理、分发。 EventStore:数据存储 MetaManager:增量订阅消费信息管理

  说完了Canal的基本概念,接下来我们来谈谈Canal是如何同步mysql的增量数据的。

  canal同步mysql增量数据并打开mysql binlog

  我们使用Canal同步mysql增量数据的前提是开启了mysql的binlog。阿里云的mysql数据库默认开启了binlog,但是如果我们自己安装mysql,需要手动开启binlog日志功能。

  首先找到mysql配置文件:

  etc/my.cnf

server-id=1

log-bin=mysql-bin

binlog-format=ROW

  这里有一个关于binlog格式的知识点,老刘来告诉你。

  binlog的三种格式:STATEMENT、ROW、MIXED

  ROW 模式(通常使用它)

  日志会记录每行数据被修改的形式。它不会记录 SQL 语句执行的上下文相关信息。它只会记录要修改的数据,修改了哪些数据,修改了哪些数据。只有值,没有SQL多表关联的情况。

  优点:它只需要记录修改了哪条数据,是什么样子的,所以它的日志内容会非常清楚地记录每一行数据修改的细节,非常容易理解。

  缺点:在ROW模式下,尤其是添加数据时,所有执行的语句都会被记录到日志中,并且会随着每一行记录的修改而被记录下来,会产生大量的日志内容。

  语句模式

  每一条修改数据的 SQL 语句都会被记录。

  缺点:因为是记录的执行语句,为了让这些语句在slave端正确执行,他还必须记录每条语句执行过程中的一些相关信息,即上下文信息。确保所有语句在从端执行时都能得到与在主端执行时相同的结果。

  但是目前,例如在某些版本中无法正确复制 step() 函数。存储过程中使用了last-insert-id()函数,可能会导致slave和master上的ID不一致。 ROW模式下没有数据不一致。

  混合模式

  同时使用上述两种模式。

  canal实时同步首先我们需要配置环境,在conf/example/instance.properties下:

   ## mysql serverId

 canal.instance.mysql.slaveId = 1234

 #position info,需要修改成自己的数据库信息

 canal.instance.master.address = 127.0.0.1:3306

 canal.instance.master.journal.name =

 canal.instance.master.position =

 canal.instance.master.timestamp =

 #canal.instance.standby.address =

 #canal.instance.standby.journal.name =

 #canal.instance.standby.position =

 #canal.instance.standby.timestamp =

 #username/password,需要修改成自己的数据库信息

 canal.instance.dbUsername = canal

 canal.instance.dbPassword = canal

 canal.instance.defaultDatabaseName =

 canal.instance.connectionCharset = UTF-8

 #table regex

 canal.instance.filter.regex = .\*\\\\..\*

  其中canal.instance.connectionCharset表示java中编码类型对应的数据库的编码方式,如UTF-8、GBK、ISO-8859-1。

  配置完成后,就要开始了

   sh bin/startup.sh

 关闭使用 bin/stop.sh

  观察日志

  一般用cat查看canal/canal.log,example/example.log

  启动客户端

  IDEA业务代码中,如果mysql中有增量数据,拉入IDEA控制台打印出来

  在 pom.xml 文件中添加:

   

   com.alibaba.otter

   canal.client

   1.0.12

 

  添加客户端代码:

  public class Demo {

 public static void main(String[] args) {

     //创建连接

     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),

             "example", "", "");

     connector.connect();

     //订阅

     connector.subscribe();

     connector.rollback();

     int batchSize = 1000;

     int emptyCount = 0;

     int totalEmptyCount = 100;

     while (totalEmptyCount > emptyCount) {

         Message msg = connector.getWithoutAck(batchSize);

         long id = msg.getId();

         List entries = msg.getEntries();

         if(id == -1 || entries.size() == 0){

             emptyCount++;

             System.out.println("emptyCount : " + emptyCount);

             try {

                 Thread.sleep(3000);

             } catch (InterruptedException e) {

                 e.printStackTrace();

             }

         }else{

             emptyCount = 0;

             printEntry(entries);

         }

         connector.ack(id);

     }

 }

 // batch -> entries -> rowchange - rowdata -> cols

 private static void printEntry(List entries) {

     for (CanalEntry.Entry entry : entries){

         if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||

                 entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){

             continue;

         }

         CanalEntry.RowChange rowChange = null;

         try {

             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

         } catch (InvalidProtocolBufferException e) {

             e.printStackTrace();

         }

         CanalEntry.EventType eventType = rowChange.getEventType();

         System.out.println(entry.getHeader().getLogfileName()+" __ " +

                 entry.getHeader().getSchemaName() + " __ " + eventType);

         List rowDatasList = rowChange.getRowDatasList();

         for(CanalEntry.RowData rowData : rowDatasList){

             for(CanalEntry.Column column: rowData.getAfterColumnsList()){

                 System.out.println(column.getName() + " - " +

                         column.getValue() + " - " +

                         column.getUpdated());

             }

         }

     }

 }

}

  在mysql中写入数据,客户端将增量数据打印到控制台。 Canal的HA机制设计

  大数据领域的很多框架都会有HA机制。运河的HA分为两部分。 Canal 服务器和 Canal 客户端有相应的 HA 实现:

  canal server:为了减少mysql dump的请求,不同服务器上的实例要求同一时间只有一个实例在运行,其他的都处于standby状态。 Canal客户端:为了保证有序,一个canal客户端只能同时对一个实例进行get/ack/rollback操作,否则无法保证客户端的接收顺序。

  整个HA机制的控制主要依赖ZooKeeper的几个特性,ZooKeeper在此不再赘述。

  运河服务器:

  canal server 想要启动canal 实例时,会先尝试对ZooKeeper 进行启动判断(创建一个EPHEMERAL 节点,谁启动谁成功)。 ZooKeeper节点创建成功后,对应的canal服务器会启动对应的canal实例,未成功创建的canal实例会进入standby状态。 ZooKeeper一旦发现canal server创建的节点消失了,会立即通知其他canal server重新执行步骤1,重新选择canal server启动实例。 canal客户端每次连接时,都会先询问谁启动了canal实例的ZooKeeper,然后再与其建立连接。一旦连接不可用,它将尝试再次连接。 canal client的方法和canal server类似,也是使用ZooKeeper抢占EPHEMERAL节点的方法进行控制。

  配置Canal HA并实时同步数据到Kafka。

  修改conf/canal.properties文件

   canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181

 canal.serverMode = kafka

 canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092

  配置 conf/example/example.instance

    canal.instance.mysql.slaveId = 790 /两台canal server的slaveID唯一

  canal.mq.topic = canal_log //指定将数据发送到kafka的topic

  数据同步计划总结

  说完Canal工具,现在给大家简单总结一下目前常用的数据采集tool。不涉及架构知识,简单总结给大家一个印象。

  常见的data采集工具包括:DataX、Flume、Canal、Sqoop、LogStash等

  DataX(处理离线数据)

  DataX是阿里巴巴开源的异构数据源离线同步工具。异构数据源的离线同步是指将数据从源同步到目的地。但是,端到端数据源的类型很多。在DataX存在之前,端到端的链路会形成复杂的网状结构,非常碎片化,无法抽象出同步核心逻辑。

  

  为了解决异构数据源的同步问题,DataX将复杂的Mesh同步链路改造成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。

  所以,当你需要连接一个新的数据源时,你只需要将这个数据源连接到DataX,就可以和现有的数据源无缝同步数据。

  

  DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。数据源读写被抽象为Reader/Writer插件,并纳入整个同步框架。

  Reader:是data采集模块,负责采集data源的数据,并将数据发送给Framework。 Writer:是数据写入模块,负责不断地从Framework中获取数据,并将数据写入目的地。 Framework:用于连接Reader和Writer,作为两者之间的数据传输通道,处理缓冲、并发、数据转换等问题。

  DataX的核心架构如下图:

  

  核心模块介绍:

  DataX 完成单个数据同步作业。我们称之为工作。 DataX收到一个作业后,会启动一个进程来完成整个作业同步过程。 DataX Job启动后,会根据不同的源切分策略,分成多个小Task(子任务),方便并发执行。拆分多个任务后,DataX Job 会调用Scheduler 模块,根据配置的并发数据量重新组合拆分的任务,组装成一个TaskGroup(任务组)。每个 TaskGroup 负责以一定的并发量运行所有分配的任务。单个任务组的默认并发任务数为 5。每个任务由 TaskGroup 启动。 Task启动后,会启动Reader->Channel->Writer线程完成任务同步工作。 DataX作业完成后,作业会*敏*感*词*并等待多个TaskGroup模块任务的完成,等待所有TaskGroup任务完成后作业成功退出。否则异常退出。 Flume(处理实时数据)

  

  Flume 的主要应用场景是同步日志数据,主要收录三个组件:Source、Channel、Sink。

  Flume 最大的优势是官网提供了丰富的 Source、Channel、Sink。根据不同的业务需求,我们可以在官网找到相关的配置。此外,Flume 还提供了自定义这些组件的接口。

  Logstash(处理离线数据)

  

  Logstash 是一个具有实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;同时,这个pipeline还允许你根据自己的需要在中间添加过滤Net,Logstash提供了很多强大的过滤器来满足各种应用场景。

  Logstash 由 JRuby 编写,使用简单的基于消息的架构,并在 JVM 上运行。管道中的数据流称为事件,分为输入阶段、过滤阶段和输出阶段。

  Sqoop(处理离线数据)

  

  Sqoop 是一种用于在 Hadoop 和关系数据库之间传输数据的工具。用于将数据从MySQL等关系数据库导出到Hadoop的HDFS,从Hadoop文件系统导出到关系数据库。 Sqoop底层还是MapReducer,使用时一定要注意数据倾斜。

  总结

  老刘的文章文章主要介绍了Canal工具的核心知识点以及它们的数据对比采集tools。其中data采集tools只是大体讲概念和应用,目的是为了让大家有个印象。老刘敢保证,看完这个文章,基本就相当于入门了,剩下的就是练习了。

  好了,mysql增量数据同步工具Canal的内容就讲完了。虽然现在的水平可能不如大哥们,但老刘会努力变得更好,让所有的朋友都自学,从不求人!

  如有相关问题,请联系公众号:努力工作的老刘。 文章看到了,喜欢这个,关注支持一波!

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线