1. 简介
Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费功能。
工作原理
Mysql主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
mysql的binlog
它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。
binlog有三种模式: STATEMENT
、 ROW
、 MIXED
- STATEMENT 记录的是执行的sql语句
- ROW 记录的是真实的行数据记录
- MIXED 记录的是1+2,优先按照1的模式记录
比如:
update user set age = 33
对应 STATEMENT
模式只是记录了当前执行的sql,而对应 ROW
模式则有可能有成千上万条记录(当然这取决于你user表的记录数)
2. 可以干什么
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
3. 安装
原生安装请参考:https://github.com/alibaba/canal/wiki/QuickStart
3.1 docker-compose安装
3.1.1 创建同步用户
如果想查看mysql server的相关配置可以参考 https://www.cnblogs.com/ludangxin/p/16358928.html 中的master配置
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.1.2 修改配置文件
首先启动一个零时的容器用于 将容器中的配置文件信息copy到宿主机
run 零时容器
docker run --name canal-temp -d --rm canal/canal-server:v1.1.6
执行copy操作 copy配置文件到当前目录中
docker cp canal-temp:/home/admin/canal-server/conf ./canal-server/conf
canal-server/conf
配置文件目录结构如下
canal-server/conf
├── canal.properties # canal server 的配置文件参数信息 例如:服务的端口/集群参数/server 模式(# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)等
├── canal_local.properties
├── example # canal 实例相关配置信息,如果想要监听多个mysql server 可以copy此文件进行配置,当然也要在canal.properties的 canal.destinations 中添加对应的文件夹名称
│ ├── h2.mv.db
│ ├── instance.properties
│ └── meta.dat
├── logback.xml
...
修改配置文件信息
canal.properties
我们使用默认的配置信息 即:canal.serverMode = tcp
example/instance.properties
中配置mysql server连接信息 如下:
position info
mysql url 我这里直接使用的是 mysql容器name
canal.instance.master.address=my_mysql:3306
监听的binlog 文件名称 例:mysql-bin.000007
canal.instance.master.journal.name=
日志文件的Offset
canal.instance.master.position=
username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
table regex
默认配置是同步所有的库和表
#canal.instance.filter.regex=.*\\..*
配置只监听test库的user表,如果需要读取多个表可以使用正则表达式或者用逗号隔开
canal.instance.filter.regex=test.user
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠()
常见例子:
所有表:.* or ….
canal schema下所有表: canal…
canal下的以canal打头的表:canal.canal.
canal schema下的一张表:canal.test1
多个规则组合使用:canal…*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解
析sql,所以无法准确提取tableName进行过滤)
3.1.3 启动canal
docker-compose.yaml
如下
因为canal需要读取mysql server的bin-log所以需要设置加入到mysql server的网络中去
version: '3'
services:
canal:
image: canal/canal-server:v1.1.6
hostname: canal
container_name: canal
restart: "no"
ports:
- "11111:11111"
volumes:
- "./canal-server/conf:/home/admin/canal-server/conf"
- "./canal-server/logs:/home/admin/canal-server/logs"
networks:
- mysql_mysql
networks:
mysql_mysql:
external: true
4. springboot 测试
4.1 添加依赖
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
true
top.javatool
canal-spring-boot-starter
1.2.1-RELEASE
4.2 添加配置
canal:
server: localhost:11111
destination: example
logging:
level:
top.javatool.canal.client.client: OFF
4.3 监听canal数据
package com.ldx.canaldemo.handler;
import com.ldx.canaldemo.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Slf4j
@Component
// 监听user表
@CanalTable(value = "user")
public class UserHandler implements EntryHandler {
@Override
public void insert(User user) {
log.info("insert info {}", user);
}
@Override
public void update(User before, User after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(User user) {
log.info("delete {}", user);
}
}
package com.ldx.canaldemo.domain;
import lombok.Data;
import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
@Data
@Table(name = "user")
public class User implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private Integer id;
@Column(name = "username")
private String username;
@Column(name = "password")
private String password;
@Column(name = "sex")
private Integer sex;
}
4.4 测试
user表信息如下
CREATE TABLE user
(
id
int(11) NOT NULL,
username
varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
password
varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
age
int(1) DEFAULT NULL,
sex
int(1) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
测试新增
INSERT INTO test
.user
(id
, username
, password
, age
, sex
) VALUES (5, 'zhangtieniu', '123456', 28, 1);
查看控制台输出如下
2022-06-14 13:50:38.144 INFO 71053 --- [xecute-thread-1] com.ldx.canaldemo.handler.UserHandler : insert info User(id=5, username=zhangtieniu, password=123456, age=28, sex=1)
测试修改
UPDATE test
.user
SET username
= 'zhangsan', age = 23 WHERE id = 5;
查看控制台输出如下
2022-06-14 13:54:55.997 INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler : update before User(id=null, username=zhangtieniu, password=null, age=28, sex=null)
2022-06-14 13:54:55.997 INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler : update after User(id=5, username=zhangsan, password=123456, age=23, sex=1)
测试删除
DELETE FROM test
.user
WHERE id = 5;
查看控制台输出如下
2022-06-14 13:56:46.359 INFO 71053 --- [xecute-thread-3] com.ldx.canaldemo.handler.UserHandler : delete User(id=5, username=zhangsan, password=123456, age=23, sex=1)
5. rabbit mq 测试
5.1 修改canal配置
canal.properties
修改如下
将serverMode 修改成rabbitMQ
canal.serverMode = rabbitMQ
添加rabbitmq 配置信息
rabbitmq.host = rabbitmq:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin123
example/instance.properties
添加路由规则
canal.mq.topic=canal_routing_key
5.2 springboot consumer
5.2.1 添加依赖
org.springframework.boot
spring-boot-starter-amqp
5.2.2 添加配置
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
listener:
simple:
# manual 手动确认
acknowledge-mode: manual
5.2.3 添加 consumer
package com.ldx.canaldemo.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author ludangxin
* @date 2022/6/14
*/
@Slf4j
@Component
public class RabbitMQConsumer {
@RabbitListener(bindings =
{@QueueBinding(
value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
key = MqConstant.CANAL_ROUTING_KEY)
})
public void helloRabbitMq(Message message, Channel channel) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
try {
log.info(message.toString());
log.info(new String(message.getBody()));
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch(Exception e) {
// 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
if(messageProperties.getRedelivered()) {
log.info("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息Ò
channel.basicReject(messageProperties.getDeliveryTag(), false);
}
else {
log.info("消息即将再次返回队列处理...");
channel.basicNack(messageProperties.getDeliveryTag(), false, true);
}
}
}
}
package com.ldx.canaldemo.rabbitmq;
/**
* @author ludangxin
* @date 2022/6/14
*/
public interface MqConstant {
String CANAL_EXCHANGE = "canal.exchange";
String CANAL_QUEUE = "canal_queue";
String CANAL_ROUTING_KEY = "canal_routing_key";
}
5.3 启动测试
先启动项目让程序自动建立所需mq中的交换机和队列
测试新增
INSERT INTO test
.user
(id
, username
, password
, age
, sex
) VALUES (8, 'zhangtieniu', '123456', 28, 1);
查看控制台输出如下
2022-06-14 14:42:04.818 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : (Body:'[B@189a76a(byte[414])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=4, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:42:04.818 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : {"data":[{"id":"8","username":"zhangtieniu","password":"123456","age":"28","sex":"1"}],"database":"test","es":1655188924000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655188924822,"type":"INSERT"}
测试修改
UPDATE test
.user
SET username
= 'zhangsan', age = 23 WHERE id = 8;
查看控制台输出如下
2022-06-14 14:56:23.471 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : (Body:'[B@6a3a1f0(byte[446])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=5, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:56:23.471 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189783000,"id":7,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":[{"username":"zhangtieniu","age":"28"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189783493,"type":"UPDATE"}
测试删除
DELETE FROM test
.user
WHERE id = 8;
查看控制台输出如下
2022-06-14 14:57:06.407 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : (Body:'[B@628caa50(byte[411])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=6, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:57:06.408 INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189826000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189826419,"type":"DELETE"}
6. canal admin管理canal
详情查看:https://gitee.com/zhengqingya/docker-compose
使用手册:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
原生安装:https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
- 创建canal admin 数据库
canal_manager
- 运行初始化sql
Liunx/canal/canal_admin/canal_manager.sql
文件内容如下:
CREATE DATABASE /*!32312 IF NOT EXISTS*/ canal_manager
/*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE canal_manager
;
SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS canal_adapter_config
;
CREATE TABLE canal_adapter_config
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
category
varchar(45) NOT NULL,
name
varchar(45) NOT NULL,
status
varchar(45) DEFAULT NULL,
content
text NOT NULL,
modified_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS canal_cluster
;
CREATE TABLE canal_cluster
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
name
varchar(63) NOT NULL,
zk_hosts
varchar(255) NOT NULL,
modified_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS canal_config
;
CREATE TABLE canal_config
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
cluster_id
bigint(20) DEFAULT NULL,
server_id
bigint(20) DEFAULT NULL,
name
varchar(45) NOT NULL,
status
varchar(45) DEFAULT NULL,
content
text NOT NULL,
content_md5
varchar(128) NOT NULL,
modified_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
),
UNIQUE KEY sid_UNIQUE
(server_id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS canal_instance_config
;
CREATE TABLE canal_instance_config
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
cluster_id
bigint(20) DEFAULT NULL,
server_id
bigint(20) DEFAULT NULL,
name
varchar(45) NOT NULL,
status
varchar(45) DEFAULT NULL,
content
text NOT NULL,
content_md5
varchar(128) DEFAULT NULL,
modified_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
),
UNIQUE KEY name_UNIQUE
(name
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS canal_node_server
;
CREATE TABLE canal_node_server
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
cluster_id
bigint(20) DEFAULT NULL,
name
varchar(63) NOT NULL,
ip
varchar(63) NOT NULL,
admin_port
int(11) DEFAULT NULL,
tcp_port
int(11) DEFAULT NULL,
metric_port
int(11) DEFAULT NULL,
status
varchar(45) DEFAULT NULL,
modified_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS canal_user
;
CREATE TABLE canal_user
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
username
varchar(31) NOT NULL,
password
varchar(128) NOT NULL,
name
varchar(31) NOT NULL,
roles
varchar(31) NOT NULL,
introduction
varchar(255) DEFAULT NULL,
avatar
varchar(255) DEFAULT NULL,
creation_date
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO canal_user
VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
docker-compose.yaml
如下
version: '3'
services:
canal_admin:
image: canal/canal-admin:v1.1.6
container_name: canal_admin
restart: unless-stopped
volumes:
- "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
environment:
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.adminUser: admin
canal.adminPasswd: 123456
spring.datasource.address: my_mysql:3306
spring.datasource.database: canal_manager
spring.datasource.username: root
spring.datasource.password: 123456
ports:
- "8089:8089"
networks:
- canal
- mysql_mysql
canal_server:
image: canal/canal-server:v1.1.6
container_name: canal_server
restart: unless-stopped
volumes:
- "./canal/canal-server/logs:/home/admin/canal-server/logs"
environment:
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.admin.manager: canal_admin:8089
canal.admin.port: 11110
canal.admin.user: admin
canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
ports:
- "11110:11110"
- "11111:11111"
- "11112:11112"
depends_on:
- canal_admin
links:
- canal_admin
networks:
- canal
- mysql_mysql
networks:
canal:
mysql_mysql:
external: true
- 启动服务/访问
http://localhost:8089/#/canalServer/nodeServers
Original: https://www.cnblogs.com/ludangxin/p/16376932.html
Author: 张铁牛
Title: Canal-监听数据库表的变化
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/581124/
转载文章受原作者版权保护。转载请注明原作者出处!