Debezium的基本使用(以MySQL为例)

  • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
  • GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。

一、Debezium介绍

摘自官网:

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。

二、基本使用

下面以MySQL为例介绍Debezium的基本使用。

CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz' IDENTIFIED BY 'dbzpwd';
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';

-- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...

-- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;

如果是 OFF则需要修改MySQL配置文件,类似下面这样:

server-id         = 223344      #必须有
log_bin           = mysql-bin   #log_bin的值是binlog文件序列的基本名称
binlog_format     = ROW             #必须是ROW
binlog_row_image  = FULL            #必须是FULL
expire_logs_days  = 10              #依据实际情况而定
create database inventory;
create table inventory.a (id bigint primary key auto_increment, name varchar(32));
insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');

pom.xml


    io.debezium
    debezium-api
    ${version.debezium}

    io.debezium
    debezium-embedded
    ${version.debezium}

    io.debezium
    debezium-connector-mysql
    ${version.debezium}

目前Debezium最新稳定版本为: 1.9.5.Final

create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');
package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

/**
 * @author wang.jianwen
 * @version 1.0
 * @date 2022/07/29
 */
public class DebeziumTest {

    private static DebeziumEngine> engine;

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 使用文件来存储已处理的binlog偏移量
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        //offset config end

        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122112");  //需要与MySQL的server-id不同
        props.setProperty("database.hostname", "tmg");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser");
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.include.list", "inventory");//要捕获的数据库名
        props.setProperty("table.include.list", "inventory.a");//要捕获的数据表

        props.setProperty("snapshot.mode", "initial");//全量+增量

        // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    System.out.println(record);//输出到控制台
                })
                .using((success, message, error) -> {
                    if (error != null) {
                        // 报错回调
                        System.out.println("------------error, message:" + message + "exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);

        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine> engine) {
        try {
            engine.close();
        } catch (IOException ignored) {
        }
    }

    private static void addShutdownHook(DebeziumEngine> engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {
        if (executor != null) {
            try {
                executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

程序运行后,您可以看到控制台输出:

[En]

After the program runs, you can see the console output:

...(省略)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
...(省略)

可以看到,所有数据都已输出,关键数据如下:

[En]

You can see that all the data has been output, and the key data are as follows:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...

..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...

..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...

  • 接下来 新增一条数据:
insert into inventory.a values (4, 'n4');

控制台输出:

..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...

  • 修改一条数据:
update inventory.a set name = 'n4-upd' where id = 4;

控制台输出:

..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...

  • 删除一条数据:
delete from inventory.a where id = 1;

控制台输出:

..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

三、总结

本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。

Original: https://www.cnblogs.com/greatsql/p/16607170.html
Author: GreatSQL
Title: Debezium的基本使用(以MySQL为例)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/504980/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mysql约束

    一、表约束 PK主键约束(索引)唯一约束 非空 默认值 在关系数据库,一个表中,只能有一个主键(Primary Key),有些数据库没有pk,系统报出错误。 在myql数据库,建立…

    数据库 2023年6月9日
    0115
  • MySQL第1章——数据库概述

    数据库概述 为什么要使用数据库 什么是数据持久化? 数据持久化就是把数据保存到可掉电式存储设备中供以后使用。大多数情况下,特别是企业级应用,数据持久化意味着将内存中的数据保存到硬盘…

    数据库 2023年6月14日
    0137
  • MySQL8.0 新特性 Hash Join

    概述&背景 MySQL一直被人诟病没有实现HashJoin,最新发布的8.0.18已经带上了这个功能,令人欣喜。有时候在想,MySQL为什么一直不支持HashJoin呢?我…

    数据库 2023年6月9日
    0142
  • 异步线程里的日志不好追踪?小支一招,轻松搞定!

    众所周知,通过唯一的链路id来追踪一次请求的所有日志,对于排查生产问题来说,会是非常给力的。这个比较容易实现。我之前的博客也有多次提及 ▄︻┻┳═一 https://www.cnb…

    数据库 2023年6月9日
    0162
  • 【黄啊码】MySQL复制以及调优

    一. 简介 MySQL自带复制方案,带来好处有: 数据备份。负载均衡。分布式数据。 概念介绍: 主机(master):被复制的数据库。从机(slave):复制主机数据的数据库。 复…

    数据库 2023年6月16日
    0163
  • TypeScript语言基础

    一、什么是TypeScript 编程语言包括动态类型语言和静态类型语言。动态类型语言是指在程序运行阶段才检查变量数据类型的语言,在定义变量时不需要指定变量的数据类型,通常在编译时不…

    数据库 2023年6月14日
    0137
  • Java面向对象(上)

    Java面向对象(上) 一、面向对象的思想 1、面向过程: 面向过程就是分析出解决问题所需要的步骤,然后用函数把这些步骤逐一实现,使用的时候依次调用就可以了。 2、面向对象: 面向…

    数据库 2023年6月11日
    0145
  • 数据结构与算法-农夫过河问题

    农夫过河问题——最短路径算法 问题描述:农夫用小木筏将狼、羊、菜从起始岸运到目标岸,小木筏每次只能带一种物品,也可以什么都不带,因为食物链的关系,人不在的时候,狼会吃羊,羊会吃菜,…

    数据库 2023年6月14日
    0172
  • 配置中心的设计-nacos vs apollo

    和 apollo 一样,nacos 也是一款配置中心,同样可以实现配置的集中管理、分环境管理、即时生效等等。不过,nacos 还具备了服务发现的功能。 分析 apollo 时,我们…

    数据库 2023年6月6日
    0201
  • MySQL中读页缓冲区buffer pool

    Buffer pool 我们都知道我们读取页面是需要将其从磁盘中读到内存中,然后等待CPU对数据进行处理。我们直到从磁盘中读取数据到内存的过程是十分慢的,所以我们读取的页面需要将其…

    数据库 2023年5月24日
    0162
  • Element UI的第一个程序(标签使用)

    1:Element UI 官方文档:https://element.faas.ele.me/ 2:Element UI是什么? 网站快速成型工具 *Element,一套为开发者、设…

    数据库 2023年6月6日
    0157
  • 百万数据 mysql count(*)优化

    Original: https://www.cnblogs.com/sunshine-blast/p/16272978.htmlAuthor: 魁哥2020Title: 百万数据 …

    数据库 2023年5月24日
    0134
  • SQLZOO练习7–Using NULL

    teacher表: iddeptnamephonemobile 101 1 Shrivell 2753 07986 555 1234 102 1 Throd 2754 07122 …

    数据库 2023年6月16日
    0102
  • mysql对属性的增删改

    修改表 alter table 创建表db 查看表 desc与describe desc table 查看建表语句show create table t1; 修改表名 alter …

    数据库 2023年6月9日
    0164
  • 23种设计模式之中介者模式(Mediator)

    文章目录 概述 中介者模式的优缺点 中介者模式的使用场景 中介者模式的结构和实现 * 模式结构 模式实现 总结 概述 用一个中介对象来封装一系列的对象交互,中介者使各对象不需要显式…

    数据库 2023年6月6日
    0141
  • 如何识别 SQL Server 的版本

    本文介绍如何识别当前的Microsoft SQL Server 版本号和相应的产品或Service Pack 级别。同时介绍如何识别正在使用的SQL Server 具体版本。 如何…

    数据库 2023年6月11日
    0103
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球