一、介绍
**Canal [kə’næl]**,译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
- 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
- 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
- 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
常用场景:
- 更新缓存
- 抓取业务表的新增变化数据,用于制作实时统计
二、安装
1、开启MySQL主从同步
Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
修改文件:
vi /tmp/mysql/conf/my.cnf
添加内容:
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=test
配置解读:
log-bin=/var/lib/mysql/mysql-bin
:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=test
:指定对哪个database记录binary log events,这里记录heima这个库
最终效果:
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=test
设置用户权限
接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对test这个库的操作权限。
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
重启mysql即可
测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:
show master status;
2、安装 Canal
(1)下载
下载地址:https://github.com/alibaba/canal/releases
选择 deployer
下载,上传服务器,解压
注意:canal 解压后是分散的,需要自己制定解压目录
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
(2)canal.properties
canal.properties
的配置 解析
#################################################
######### common argument #############
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
...
#################################################
######### destinations #############
#################################################
# 配置监控的mysql服务,可配置多个,对应conf
canal.destinations = example
说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,canal.serverMode
: canal 的输出 model,默认 tcp
多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
如果使用默认实例,则不用修改该文件
(3)instance.properties
修改下面的参数
# canal作为从库的 id ,必须与 mysql 的id不一样
canal.instance.mysql.slaveId=100
# mysql ip端口
canal.instance.master.address=127.0.0.1:3306
# 开始同步的位置,全部同步的话无需配置,当需要忽略旧数据时要配置
canal.instance.master.position=
# 用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 配置要监听的表
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
canal.instance.filter.regex
的格式
过滤方式 | 格式 | 案例 |
---|---|---|
全库全表 | .*\\..* |
.*\\..* |
指定库全表 | 库名..* | test..* |
单表 | 库名.表名 | test.user |
多规则组合使用 | 库名1..*,库名2.表名1,库名3.表名2 (逗号分隔) | test..*,test2.user1,test3.user2 |
三、Canal客户端
此处演示 TCP 模式的 Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client
与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
1、依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2、配置
canal:
destination: example # canal的集群名字,要与安装canal时设置的名称一致
server: 127.0.0.1:11111 # canal服务地址
3、监听表的实体类
通过、、等注解完成Item与数据库表字段的映射:
@CanalTable
:指定监听的表@Id
:标记为 id 字段@Column
:当类的属性名和表字段名不一致时,需指定字段名@Transient
:标注属性在表中不存在
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import top.javatool.canal.client.annotation.CanalTable;
import javax.persistence.Column;
import java.util.Date;
@Data
@CanalTable("t_user")
public class TUser {
@Id
private Long id;
@Column(name = "user_name")
private String userName;
@Column(name = "user_age")
private Integer userAge;
@Column(name = "created_time")
private Date createdTime;
@Transient
private Integer stock;
}
4、监听器
通过实现EntryHandler<T>
接口编写监听器,监听Canal消息。注意两点:
- 实现类通过
@CanalTable("tb_item")
指定监听的表信息 - EntryHandler的泛型是与表对应的实体类
/**
* @author rewind
* @date 2022/3/27 20:45
* @desc 监听器
**/
@Slf4j
@Component
@CanalTable("t_user")
public class TUserHandler implements EntryHandler<TUser> {
@Autowired
private RedisTemplate redisTemplate;
/**
* 监听到数据库的新增操作时执行
*/
@Override
public void insert(TUser tUser) {
log.info("监听到数据新增:{}",tUser);
Map<String, Object> map = BeanUtil.beanToMap(tUser);
//HashMap<String, Object> map = new HashMap<>();
map.put("id",tUser.getId().toString());
redisTemplate.opsForHash().putAll("USER:" ,map);
}
/**
* 监听到数据库的修改操作时执行
*/
@Override
public void update(TUser before, TUser after) {
log.info("监听到数据修改:");
log.info("修改前数据:{}",before);
log.info("修改后数据:{}",after);
}
/**
* 监听到数据库的删除操作时执行
*/
@Override
public void delete(TUser tUser) {
log.info("监听到数据删除:{}",tUser);
}
}