阅读 153

Canal+RabbitMQ数据异构填坑指南

此文章主要补充Canal整合RabbitMQ时填过的坑。

1、完整数据处理流程

用户中心数据处理流程

这里核心处理点,在于Canal的部署和RabbitMQ的整合。

Canal主要模拟MySQL的Slave,实现主从复制,拉取Mysql的binlog进行解析转换成相应的数据,获取到数据后,将数据推送到RabbitMQ(默认支持的是RocketMQ和Kafka,新版加入对RabbitMQ的支持)。

Canal整合RabbitMQ主要采用的是 路由模式

2、Canal 配置及部署

2.1、Canal Admin 安装及配置

新版本Canal已经支持在线管理,我们可以先安装一个canal-admin。参考:Canal Admin QuickStart

canal-admin的核心模型主要有:

  1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列
  2. server,对应canal-server,一个server里可以包含多个instance
  3. 集群,对应一组canal-server,组合在一起面向高可用HA的运维

简单解释:

  1. instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,
  2. 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务。动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件
  3. 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)

集群模式配置

  1. 先配置一个zk地址和集群名称。


    集群模式
  2. 配置canal.properties
    2.1. 选择主配置

    集群模式主配置

    2.2. 设置canal.properties,可以先载入模板,然后进行修改。
    canal.properties

canal.properties核心配置:

...
# register ip to zookeeper
canal.register.ip = 10.8.158.4
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 10.8.158.4:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = rabbitmq
...
# table meta tsdb info
canal.instance.tsdb.enable = false
...
# 一下几项均为 1.1.5 新版本新增支持 rabbitmq 的配置
rabbitmq.host = 10.224.45.9:25674
rabbitmq.virtual.host = /
# 指定 rabbitmq 上的 exchange 名称, "新建 `Exchange`" 步骤新建的名称
rabbitmq.exchange = usercenter
# 连接 rabbitmq 的用户名
rabbitmq.username = guest
# 连接 rabbitmq 的密码
rabbitmq.password = guest
...

3、 Canal Server配置及部署

canal-server接入canal-admin,参考:Canal Admin ServerGuide

3.1. 核心配置

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties 
# register ip
canal.register.ip = 10.8.158.4

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = usercenter

3.2. 启动canal-server,注册canal-admin

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local

注册后在admin中的展现。


server注册admin

4.Instance实例配置

4.1. 新增instance实例

Instance实例启动后,即可监听mysql的binlog,并将数据推到RabbitMQ。推送的规则参考:Canal Kafka/RocketMQ QuickStart
RabbitMQ的配置的topic,事实上是设置到routeKey上。

新增instance实例
instance列表

5. 最后我们调试一下

MQ监控处理

@RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "usercenter", durable = "true"),
                    exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),
                    key = "usercenter"
            )
    }, concurrency = "10")
    public void test(String message) {
        log.info("test接收到消息。message:{}", message);
    }

修改表数据

修改数据库数据

MQ监控输入日志如下

2021-02-04 11:34:53.037 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.050 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.051 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}

作者:DreamsonMa

原文链接:https://www.jianshu.com/p/d7c1a7160c60

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐