Canal数据同步


一、介绍

**Canal [kə’næl]**,译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

image-20220216211803090

  • 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
  • 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20210821115948395

常用场景:

  • 更新缓存
  • 抓取业务表的新增变化数据,用于制作实时统计

二、安装

1、开启MySQL主从同步

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

修改文件:

vi /tmp/mysql/conf/my.cnf

添加内容:

log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=test

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=test:指定对哪个database记录binary log events,这里记录heima这个库

最终效果:

[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=test

设置用户权限

接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对test这个库的操作权限。

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

重启mysql即可

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

show master status;

2、安装 Canal

(1)下载

下载地址:https://github.com/alibaba/canal/releases

选择 deployer下载,上传服务器,解压

注意:canal 解压后是分散的,需要自己制定解压目录

tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal

(2)canal.properties

canal.properties 的配置 解析

#################################################
#########         common argument        ############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp

...

#################################################
#########         destinations        ############# 
#################################################
# 配置监控的mysql服务,可配置多个,对应conf
canal.destinations = example

说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,canal.serverMode: canal 的输出 model,默认 tcp

多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

如果使用默认实例,则不用修改该文件

(3)instance.properties

修改下面的参数

# canal作为从库的 id ,必须与 mysql 的id不一样
canal.instance.mysql.slaveId=100
# mysql ip端口
canal.instance.master.address=127.0.0.1:3306
# 开始同步的位置,全部同步的话无需配置,当需要忽略旧数据时要配置
canal.instance.master.position=
# 用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# 配置要监听的表
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

canal.instance.filter.regex 的格式

过滤方式 格式 案例
全库全表 .*\\..* .*\\..*
指定库全表 库名..* test..*
单表 库名.表名 test.user
多规则组合使用 库名1..*,库名2.表名1,库名3.表名2 (逗号分隔) test..*,test2.user1,test3.user2

三、Canal客户端

此处演示 TCP 模式的 Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

1、依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

2、配置

canal:
  destination: example # canal的集群名字,要与安装canal时设置的名称一致
  server: 127.0.0.1:11111 # canal服务地址

3、监听表的实体类

通过、、等注解完成Item与数据库表字段的映射:

  • @CanalTable:指定监听的表

  • @Id:标记为 id 字段

  • @Column:当类的属性名和表字段名不一致时,需指定字段名

  • @Transient:标注属性在表中不存在

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import top.javatool.canal.client.annotation.CanalTable;

import javax.persistence.Column;
import java.util.Date;

@Data
@CanalTable("t_user")
public class TUser {

    @Id
    private Long id;

    @Column(name = "user_name")
    private String userName;

    @Column(name = "user_age")
    private Integer userAge;

    @Column(name = "created_time")
    private Date createdTime;

    @Transient
    private Integer stock;
}

4、监听器

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
/**
 * @author  rewind
 * @date  2022/3/27 20:45
 * @desc 监听器
 **/
@Slf4j
@Component
@CanalTable("t_user")
public class TUserHandler implements EntryHandler<TUser> {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 监听到数据库的新增操作时执行
     */
    @Override
    public void insert(TUser tUser) {
        log.info("监听到数据新增:{}",tUser);
        Map<String, Object> map = BeanUtil.beanToMap(tUser);
        //HashMap<String, Object> map = new HashMap<>();
        map.put("id",tUser.getId().toString());
        redisTemplate.opsForHash().putAll("USER:" ,map);
    }

    /**
     * 监听到数据库的修改操作时执行
     */
    @Override
    public void update(TUser before, TUser after) {
        log.info("监听到数据修改:");
        log.info("修改前数据:{}",before);
        log.info("修改后数据:{}",after);
    }

    /**
     * 监听到数据库的删除操作时执行
     */
    @Override
    public void delete(TUser tUser) {
        log.info("监听到数据删除:{}",tUser);
    }
}

  目录