阅读 723

canal数据同步

前面提到数据库缓存不一致的几种解决方案,但是在不同的场景下各有利弊,而今天我们使用的canal进行缓存与数据同步的方案是最好的,但是也有一个缺点,就是相对前面几种解决方案会引入阿里巴巴的canal组件,订阅消费binlog日志,增加的系统复杂度。

canal官网地址:https://github.com/alibaba/canal/wiki

简介:

img

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

     

第一步:创建MySQL配置文件

创建MySQL文件目录
mkdir -p /home/mysql/{master,slave}/{logs,data,conf,mysql-files}
在/home/mysql/master/conf目录下创建my.cnf配置文件并写入一下信息


#主从复制-主机配置
[mysqld]
#设置主机启动端口3316端口
port=3316
#主服务器唯一ID
server-id = 1
#启用二进制日志
log-bin=mysql-bin
#设置logbin格式
binlog_format = Row
#设置mysql的安装目录
#basedir=
#设置mysql数据库的数据的存放目录
datadir=/home/mysql/master/data
#允许最大连接数
max_connections=200
#允许连接失败的次数
max_connect_errors=10
#服务端使用的字符集默认为utf8mb4
character-set-server=utf8mb4
#创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
#默认使用mysql_native_password插件认证
#mysql_native_password
default_authentication_plugin=mysql_native_password
#忽略大小写
lower-case-table-names = 1
[mysql]
#设置mysql客户端默认字符集
default-character-set=utf8mb4
#MySQL导入导出文件限制
#secure_file_priv =

[client]
#设置mysql客户端连接服务端时默认使用的端口
port=3316
default-character-set=utf8mb4

 

第二步:拉取MySQL镜像

docker pull mysql

第三步:创建MySQL容器

docker run \
 -p 3316:3316 \
 -e MYSQL_ROOT_PASSWORD=123456 \
 -v /home/mysql/master/data:/home/mysql/master/data:rw \
 -v /home/mysql/master/logs:/var/log/mysql:rw \
 -v /home/mysql/master/conf/my.cnf:/etc/mysql/my.cnf:rw \
 -v /home/mysql/mster/mysql-files:/var/lib/mysql-files \
 -v /etc/localtime:/etc/localtime:ro \
 --name mysql3316_master \
 --privileged=true \
 --restart=always \
 -d mysql

第四步:创建canal进行数据同步的MySQL用户

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

第五步:创建canal

docker run -p 11111:11111 --name canal \
       -e canal.destinations=canal_test \
       -e canal.instance.master.address=192.168.0.100:3316  \
       -e canal.instance.dbUsername=canal  \
       -e canal.instance.dbPassword=canal  \
       -e canal.instance.connectionCharset=UTF-8 \
       -e canal.instance.tsdb.enable=true \
       -e canal.instance.gtidon=false  \
       -e canal.instance.filter.regex=canal_test\\..* \
       --privileged=true \
       -d canal/canal-server:v1.1.5

说明:

  • -p 11111:11111:这是canal的默认监听端口

  • -e canal.instance.master.address=mysql:3316

  • -e canal.instance.dbUsername=canal:数据库用户名

  • -e canal.instance.dbPassword=canal :数据库密码

  • -e canal.instance.filter.regex=:要监听的表名称

表名称监听支持的语法:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2

 

查看canal是否创建成功

docker exec -it canal bash 进入容器内部

查看日志,若显示canal正在进行数据同步,则说明搭建成功

 

 

第六步:测试

创建springboot项目

pom.xml添加依赖

<dependency>
   <groupId>top.javatool</groupId>
   <artifactId>canal-spring-boot-starter</artifactId>
   <version>1.2.1-RELEASE</version>
</dependency>

application.yml文件添加配置

canal:
 destination: canal_test
 server: master:11111

创建测试实体类

import org.springframework.data.annotation.Id;

import javax.persistence.Column;
import javax.persistence.Table;

@Table(name = "order")
public class Order {
   @Id
   @Column(name = "order_id")
   private Long orderId;

   @Column(name = "order_name")
   private String orderName;

   @Column(name = "order_status")
   private Integer orderStatus;

   @Column(name = "user_id")
   private Long userId;

   public Long getOrderId() {
       return orderId;
   }

   public void setOrderId(Long orderId) {
       this.orderId = orderId;
   }

   public String getOrderName() {
       return orderName;
   }

   public void setOrderName(String orderName) {
       this.orderName = orderName;
   }

   public Integer getOrderStatus() {
       return orderStatus;
   }

   public void setOrderStatus(Integer orderStatus) {
       this.orderStatus = orderStatus;
   }

   public Long getUserId() {
       return userId;
   }

   public void setUserId(Long userId) {
       this.userId = userId;
   }

   @Override
   public String toString() {
       return "Order{" +
               "orderId=" + orderId +
               ", orderName='" + orderName + '\'' +
               ", orderStatus=" + orderStatus +
               ", userId=" + userId +
               '}';
   }
}

创建测试handler

import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.context.CanalContext;
import top.javatool.canal.client.handler.EntryHandler;
import top.javatool.canal.client.model.CanalModel;

@Component
@CanalTable("order")
public class TestHandler implements EntryHandler<Order> {

   @Override
   public void insert(Order order) {
       System.out.println("insert:" + order);
       CanalModel canal = CanalContext.getModel();

       System.out.println("================================================");
       System.out.println(canal);
       System.out.println("================================================");

   }

   @Override
   public void update(Order before, Order after) {
       System.out.println("update before:" + before);
       System.out.println("update after:" + after);
       CanalModel canal = CanalContext.getModel();
       System.out.println("================================================");
       System.out.println(canal);
       System.out.println("================================================");

   }

   @Override
   public void delete(Order order) {
       System.out.println("delete:" + order);
       CanalModel canal = CanalContext.getModel();
       System.out.println("================================================");
       System.out.println(canal);
       System.out.println("================================================");

   }
}

启动springboot项目

随意一下修改数据则可以看到以下日志

  version: 1
 logfileName: "mysql-bin.000004"
 logfileOffset: 2217
 serverId: 1
 serverenCode: "UTF-8"
 executeTime: 1630830321000
 sourceType: MYSQL
 schemaName: ""
 tableName: ""
 eventLength: 90
}
entryType: TRANSACTIONBEGIN
storeValue: " +"
, header {
 version: 1
 logfileName: "mysql-bin.000004"
 logfileOffset: 2378
 serverId: 1
 serverenCode: "UTF-8"
 executeTime: 1630830321000
 sourceType: MYSQL
 schemaName: "canal_test"
 tableName: "order"
 eventLength: 67
 eventType: UPDATE
 props {
   key: "rowsCount"
   value: "1"
 }
}
entryType: ROWDATA
storeValue: "\bf\020\002P\000b\263\002\n\034\b\000\020\004\032\border_id \001(\0000\000B\0011R\003int\n\'\b\001\020\f\032\norder_name \000(\0000\000B\001aR\fvarchar(255)\n*\b\002\020\f\032\forder_status \000(\0000\000B\00212R\fvarchar(255)\n$\b\003\020\f\032\auser_id \000(\0000\000B\0011R\fvarchar(255)\022\034\b\000\020\004\032\border_id \001(\0000\000B\0011R\003int\022(\b\001\020\f\032\norder_name \000(\0010\000B\00212R\fvarchar(255)\022*\b\002\020\f\032\forder_status \000(\0000\000B\00212R\fvarchar(255)\022$\b\003\020\f\032\auser_id \000(\0000\000B\0011R\fvarchar(255)"
, header {
 version: 1
 logfileName: "mysql-bin.000004"
 logfileOffset: 2445
 serverId: 1
 serverenCode: "UTF-8"
 executeTime: 1630830321000
 sourceType: MYSQL
 schemaName: ""
 tableName: ""
 eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003300"
],raw=false,rawEntries=[]]
update before:Order{orderId=null, orderName='a', orderStatus=null, userId=null}
update after:Order{orderId=1, orderName='12', orderStatus=12, userId=1}
================================================
CanalModel{id=4, database='canal_test', table='order', executeTime=1630830321000, createTime=null}
================================================

至此说明我们的canal搭建和使用都没有问题,这种数据同步方式可以极大的减少缓存与数据库的数据不一致问题。

服务器评测 http://www.cncsto.com/ 

服务器测评 http://www.cncsto.com/ 

站长资源 https://www.cscnn.com/ 

小鱼创业 https://www.237fa.com/ 


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐