一、环境准备
1、jdk 8+
2、mysql 5.7+
3、Elasticsearch 7+
4、kibana 7+
5、canal.adapter 1.1.5
二、部署
一、创建数据库CanalDb和表UserInfo
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for UserInfo
-- ----------------------------
DROP TABLE IF EXISTS UserInfo
;
CREATE TABLE UserInfo
(
id
int(11) NOT NULL AUTO_INCREMENT,
user_name
varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
phone
varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
age
int(11) DEFAULT NULL,
PRIMARY KEY (id
) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
二、kibana创建索引
PUT canal_product
{
"mappings": {
"properties": {
"user_name": {
"type": "text"
},
"phone": {
"type": "text"
},
"age": {
"type": "integer"
}
}
}
}
三、下载安装canal.adapter
github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5
额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)
分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,并重命名,再将该jar赋予权限 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar
1、解压并修改配置文件 conf/application.yml
只需要修改特定的几处即可,关于各节点说明可参考官方说明: https://help.aliyun.com/document_detail/135297.html
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: canal
password: canal
- name: es7
hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode
properties:
mode: rest #transport or rest
security.auth: es:22222 # only used for rest mode
cluster.name: elasticsearch # es集群节点名称
2、启动服务
启动服务
./bin/startup.sh
3、查看日志是否启动成功
cat logs/adapter/adapter.log
如图所示
4、实时同步
向数据库中插入一条数据
INSERT INTO CanalDb
.UserInfo
( user_name
, phone
, age
) VALUES ('张三', '10086', 99);
查看日志
kibana查看索引数据
GET canal_product/_search
5、全量同步,修改conf/es7/mytest_user.yml配置文件,或者新建一个yml文件也可
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: canal_product # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT
p.id as _id,
p.user_name,
p.phone,
p.age
FROM
UserInfo p " # sql映射
etlCondition: "where p.id>={}" #etl的条件参数
commitBatch: 3000 # 提交批大小
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml
学习链接:https://help.aliyun.com/document_detail/135297.html
Original: https://www.cnblogs.com/sportsky/p/16536264.html
Author: SportSky
Title: canal-1.1.5实时同步MySQL数据到Elasticsearch
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/609206/
转载文章受原作者版权保护。转载请注明原作者出处!