文章实时采集(mysql实时数据采集框架基本介绍-mysql日志 )

优采云 发布时间: 2022-03-16 15:15

  文章实时采集(mysql实时数据采集框架基本介绍-mysql日志

)

  MySQL实时数据采集框架maxwell基本介绍

  maxwell 是一个可以实时读取 MySQL 二进制日志 binlog 的应用程序,并生成 JSON 格式的消息作为生产者发送到 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其他平台。其常见的应用场景包括ETL、缓存维护、表级dml指标采集、增量到搜索引擎、数据分区迁移、数据库切割的binlog回滚方案。

  官网地址:

  GitHub地址:

  maxwell主要提供以下功能:

  支持 SELECT * FROM table 方式的 44 行全数据初始化 支持主库故障转移后 binlog position (GTID) 的自动恢复。可以对数据进行分区,解决数据倾斜的问题。发给kafka的数据支持数据库、表、列。同级数据分区的工作方式是伪装成Slave,接收binlog事件,然后根据schema信息进行组装,可以接受ddl、xid、row等各种eventbinlog引入。

  binlog 是 mysql 中的二进制日志。主要用于记录mysql数据库中发生或可能改变数据的SQL语句,并以二进制形式保存在磁盘中。如果我们后面需要配置主从数据库,如果我们需要从slave数据库同步主库的内容,我们可以通过binlog进行同步。说白了,binlog可以用来解决mysql数据库中数据的实时同步。binlog也有三种格式:STATEMENT、ROW、MIXED。

  基于行的复制(RBR):不记录每条SQL语句的上下文信息,只需要记录哪些数据被修改过,修改后的样子。基于混合的复制(MBR):混合使用上述两种模式。一般复制使用STATEMENT模式保存binlog,ROW模式用于STATEMENT模式下无法复制的操作保存binlog。MySQL 会根据执行的 SQL 语句选择如何保存日志。

  因为语句只有sql,没有数据,无法获取原创的变更日志,一般推荐使用ROW方式)MySQL数据实时同步。我们可以通过解析mysql的bin-log来实现。有很多方法可以解析 bin-log。,可以通过各种方式实现,例如 canal 或 max-well。下面是各种提取方法的比较

  

  开启binlog功能第一步:添加普通用户maxwell

  为mysql添加一个普通用户maxwell,因为maxwell软件默认用户是maxwell,进入mysql客户端,然后执行如下命令授权

  mysql -uroot  -p <br >set global validate_password_policy=LOW;<br >set global validate_password_length=6;<br >CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';<br >GRANT ALL ON maxwell.* TO 'maxwell'@'%';<br >GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';<br >flush privileges;<br >

  第二步:开启binlog机制

  对于我们的mysql数据库,我们可以通过修改mysql的配置文件来启用mysql的binlog功能。修改配置文件后,我们需要重启mysql服务。

  sudo vim /etc/my.cnf<br ># 添加或修改以下三行配置<br >log-bin= /var/lib/mysql/mysql-bin # 日志存放位置<br >binlog-format=ROW # binlog格式<br >server_id=1 # server id<br ># 重启mysql<br >sudo service mysqld restart<br >

  第三步:验证mysql服务

  # 进入mysql客户端,并执行以下命令进行验证<br >mysql -uroot -p<br >mysql> show variables like '%log_bin%';<br >+---------------------------------+-----------------------------+<br >| Variable_name                   | Value                       |<br >+---------------------------------+-----------------------------+<br >| log_bin                         | ON                          |<br >| log_bin_basename                | /var/lib/mysql/binlog       |<br >| log_bin_index                   | /var/lib/mysql/binlog.index |<br >| log_bin_trust_function_creators | OFF                         |<br >| log_bin_use_v1_row_events       | OFF                         |<br >| sql_log_bin                     | ON                          |<br >+---------------------------------+-----------------------------+<br ><br >root@0f3525c74115:/# cd /var/lib/mysql/<br >root@0f3525c74115:/var/lib/mysql# ls -la<br >-rw-r-----  1 mysql mysql      324 Jul  2 14:54  binlog.000007<br >-rw-r-----  1 mysql mysql      156 Jul  2 14:54  binlog.000008<br >-rw-r-----  1 mysql mysql       32 Jul  2 14:54  binlog.index<br ># 以上是binlog文件说明已经开启<br >

  安装maxwell实现实时采集mysql数据第一步:下载maxwell

  下载:

  第二步:修改配置

  cp config.properties.example config.properties<br >vim config.properties<br > #-----------------<br >producer=kafka<br >kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092<br >host=node03<br >user=maxwell<br >password=123456<br >producer=kafka<br >host=node03<br >port=3306<br >user=maxwell<br >password=123456<br >kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092<br >kafka_topic=maxwell_kafka<br ># 一定要注意:一定要保证我们使用maxwell用户和123456密码能够连接上mysql数据库<br >

  启动服务 启动我们的zookeeper服务 启动kafka服务,为kafka创建topic 启动maxwell服务,测试插入数据到数据库,看kafka能不能同步到mysql数据

  # 创建kafka topic 分区 3 副本 2<br >bin/kafka-topics.sh  --create --topic maxwell_kafka --partitions 3 --replication-factor 2 --zookeeper node01:2181<br ><br ># 创建kafka 消费者 <br >bin/kafka-console-consumer.sh --topic maxwell_kafka --from-beginning --bootstrap-server node01:9092,node02:9092,node03:9092<br ><br ># 启动maxwell<br >bin/maxwell<br >

  插入数据测试

  -- 创建目标库<br >CREATE DATABASE /*!32312 IF NOT EXISTS*/`test` /*!40100 DEFAULT CHARACTER SET utf8 */;<br >USE `test`;<br ><br >-- 创建表<br >CREATE TABLE `myuser` (<br >  `id` int(12) NOT NULL,<br >  `name` varchar(32) DEFAULT NULL,<br >  `age` varchar(32) DEFAULT NULL,<br >  PRIMARY KEY (`id`)<br >) ENGINE=InnoDB DEFAULT CHARSET=utf8;<br ><br >-- 插入数据<br />insert  into `myuser`(`id`,`name`,`age`) values (1,'zhangsan',NULL),(2,'xxx',NULL),(3,'ggg',NULL),(5,'xxxx',NULL),(8,'skldjlskdf',NULL),(10,'ggggg',NULL),(99,'ttttt',NULL),(114,NULL,NULL),(121,'xxx',NULL);<br /><br />-- 岂容kafka消费者,会看到消费的数据<br />bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic  maxwell_kafka<br />

  

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线