实现MySQL同步数据到ES构建宽表

Ceven,德勤乐融(北京)科技有限公司
邮箱:likailin@deqinyuerong.com

能力特点包括:

  • 灵活,支持反查打宽表,特定逻辑数据清洗,对账,告警等场景
  • 调试方便,通过任务参数配置自动打开 debug 端口,对接 IDE 调试
  • SDK 接口清晰,提供丰富的上下文信息,方便数据逻辑开发

本文基于我们业务中的实际需求(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,希望对大家有所帮助。

MySQL 擅长关系型数据操作,我们在其中存储了 product, tag, product_tag_mapping 表数据,用以表示 产品标签之间多对多关系。精简的数据结构如下:

ElasticSearch 擅长搜索,但是并不支持不同索引间的联合查询, 所以构造宽表是业界刚需。我们存储其上的产品索引结构如下:

PUT es_product
{
  "mappings" : {
    "properties" : {
      "id" : {
        "type" : "integer"
      },
      "name" : {
        "type" : "text"
      },
      "tags" : {
        "type" : "nested",
        "properties" : {
          "id" : {
            "type" : "integer"
          },
          "name" : {
            "type" : "text"
          }
        }
      }
    }
  }
}

CloudCanal 在 同步 MySQL -> ElasticSearch 数据过程中,会兼顾 全量增量两种情况,我们可以创建 两个独立的任务,分别同步产品的基础信息和附加信息(即标签信息)。

  • 基础信息任务
  • 使用基本的映射关系,将 MySQL 中的 product 数据表,映射到 es_product 索引中,即可保证全量和增量的数据同步。
  • 附加信息任务
  • 创建 CloudCanal 任务将 MySQL 中的 product_tag_mapping 数据表映射到 es_product 索引中,同步过程中反查源数据库中的 tag 信息,构造宽表数据,填充进 es_product 索引,实现附加信息全量和增量的数据同步。

1. MySQL 表结构初始化

创建产品信息表
CREATE TABLE product (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '' COMMENT '名称',
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='产品信息记录表';

创建标签信息表
CREATE TABLE tag (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '' COMMENT '名称',
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='标签信息记录表';

创建产品标签关系表
CREATE TABLE product_tag_mapping (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  product_id bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '产品ID',
  tag_id bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '标签ID',
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='产品标签关系表';

2. MySQL 填充测试数据

填充产品信息
INSERT INTO product (name)
VALUES
    ('product_1');

填充标签信息
INSERT INTO tag (name)
VALUES
    ('tag_1'),
    ('tag_2');

填充产品标签关系信息
INSERT INTO product_tag_mapping (product_id, tag_id)
VALUES
    (1, 1);

3. ElasticSearch 索引创建(也可以使用 CloudCanal 结构迁移)

PUT es_product
{
  "mappings" : {
    "properties" : {
      "id" : {
        "type" : "integer"
      },
      "name" : {
        "type" : "text"
      },
      "tags" : {
        "type" : "nested",
        "properties" : {
          "id" : {
            "type" : "integer"
          },
          "name" : {
            "type" : "text"
          }
        }
      }
    }
  }
}

4. 编写自定义代码

初始化的项目需要手工配置一下 pom.xml 文件,将 sdk 指向本地目录文件,代码片段如下


    com.clougence.cloudcanal
    cloudcanal-sdk
    1.0.0-SNAPSHOT
    system

        /path/to/your/project/src/main/resources/lib/cloudcanal-sdk-2.0.0.9-SNAPSHOT.jar

public class Tag {
    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
        @Override
        public List process(List list, CustomProcessorContext context) {
            DataSource dataSource = (DataSource) context.getProcessorContextMap().get(RdbContextKey.SOURCE_DATASOURCE);
            String stage = context.getProcessorContextMap().get("currentTaskStage").toString();

            for (CustomRecord record : list) {
                try (Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) {

                    // 由于 ES 的嵌套结构会被认为是独立的文档,故需要填充旧的数据
                    ResultSet rs = statement.executeQuery("SELECT tag.id, tag.name" +
                            " FROM product.product_tag_mapping AS mapping" +
                            " LEFT JOIN product.tag AS tag ON tag.id = mapping.tag_id" +
                            " WHERE mapping.product_id = " + record.getFieldMapAfter().get("product_id").getValue()
                    );

                    List tags = buildTags(rs);
                    if ("INCREMENT".equals(stage)) {
                        // 增量创建的 product_tag_mapping 处于内存中,无法通过 SQL 语句查询得到,故需要单独处理
                        rs = statement.executeQuery("SELECT id, name FROM product.tag WHERE id = " + record.getFieldMapAfter().get("tag_id").getValue().toString());
                        List newTags = buildTags(rs);
                        tags.add(newTags.get(0));
                    }

                    ObjectMapper mapper = new ObjectMapper();
                    String json = mapper.writeValueAsString(tags);
                    Map tagField = new LinkedHashMap<>();
                    tagField.put("tags", json);
                    RecordBuilder.modifyRecordBuilder(record)
                            .addField(tagField)
                            .build();
                } catch (SQLException | JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            return list;
        }

        private List buildTags(ResultSet rs) throws SQLException {
            List tags = new ArrayList<>();
            while (rs.next()) {
                Tag tag = new Tag();
                tag.setId(rs.getInt("id"));
                tag.setName(rs.getString("name"));
                tags.add(tag);
            }
            return tags;
        }

执行如下命令编译生成自定义代码包, 之后会在 target 目录中生成 jar 文件

mvn clean package -Dmaven.test.skip=true -Dmaven.compile.fork=true

5. 创建 CloudCanal 任务

全量增量同步 product 信息到 es_product 索引,在此就不做具体描述,详情请参考 CloudCanal 文档。

此时查询产品数据,得到结果

可去掉自动启动任务选项,以便于单步追踪调试

Tips: &#x53EA;&#x914D;&#x7F6E;&#x589E;&#x52A0;&#x64CD;&#x4F5C;&#xFF0C;&#x4E0D;&#x8981;&#x914D;&#x7F6E;&#x7F16;&#x8F91;&#x548C;&#x5220;&#x9664;&#xFF0C;&#x5426;&#x5219;&#x53EF;&#x80FD;&#x9020;&#x6210;&#x5BF9;&#x6570;&#x636E;&#x7684;&#x8BEF;&#x5220;&#xFF1B;
      &#x7F16;&#x8F91;&#x548C;&#x5220;&#x9664;&#x64CD;&#x4F5C;&#xFF0C;&#x53EA;&#x6700;&#x597D;&#x4F7F;&#x7528; ES &#x8C03;&#x7528;&#x7684;&#x65B9;&#x5F0F;&#x8FDB;&#x884C;&#x5904;&#x7406;&#xFF1B;
      &#x589E;&#x52A0;&#x64CD;&#x4F5C;&#x6700;&#x597D;&#x4E0D;&#x8981;&#x4F7F;&#x7528; ES &#x8C03;&#x7528;&#x7684;&#x65B9;&#x5F0F;&#x5904;&#x7406;&#xFF0C;&#x4F1A;&#x5F15;&#x8D77;&#x9AD8;&#x5E76;&#x53D1;&#x95EE;&#x9898;&#x3002;
Tips: &#x521B;&#x5EFA;&#x4EFB;&#x52A1;&#x65F6;&#x5982;&#x679C;&#x4E0D;&#x4E0A;&#x4F20;&#x81EA;&#x5B9A;&#x4E49;&#x4EE3;&#x7801;&#x5305;&#xFF0C;&#x4E4B;&#x540E;&#x5C06;&#x65E0;&#x6CD5;&#x4E0A;&#x4F20;&#xFF0C;&#x9664;&#x975E;&#x91CD;&#x5EFA;&#x4EFB;&#x52A1;&#x3002;
      &#x4E0A;&#x4F20;&#x81EA;&#x5B9A;&#x4E49;&#x4EE3;&#x7801;&#xFF0C;&#x610F;&#x5473;&#x7740;&#x521B;&#x5EFA;&#x7279;&#x6B8A;&#x7C7B;&#x578B;&#x7684;&#x4EFB;&#x52A1;&#xFF0C;&#x7136;&#x540E;&#x624D;&#x4F1A;&#x51FA;&#x73B0;&#x7279;&#x6B8A;&#x7684;&#x9009;&#x9879;&#x8FDB;&#x884C;&#x5B57;&#x6BB5;&#x6620;&#x5C04;&#x3002;

将 id 和 tag_id 调整为 “只订阅不同步”(老版本此处会显示为仅供自定义代码使用),实现只订阅这两个字段,而不会真正写入到 ES 索引,而将 product_id 映射到对端的 id。

设置映射 _id,以指定目标 ES 索引中的 id 为 product_id

Tips: product_id &#x5B57;&#x6BB5;&#x5FC5;&#x987B;&#x505A;&#x6620;&#x5C04;&#xFF0C;&#x5426;&#x5219;&#x5373;&#x4F7F;&#x914D;&#x7F6E;&#x4E86; _id &#x4FE1;&#x606F;&#xFF0C;&#x4F9D;&#x65E7;&#x65E0;&#x6CD5;&#x6B63;&#x5E38;&#x6267;&#x884C;&#xFF0C;&#x4F1A;&#x5FFD;&#x7565; product_id &#x5B57;&#x6BB5;&#x7684;&#x503C;&#x3002;

6. 同步结果

自定义代码在开发阶段最麻烦的事情是如何高效进行调试,CloudCanal 能够比较友好的让开发在本地直接调试代码逻辑。

修改任务参数

任务详情-> 参数修改

Tips&#xFF1A;&#x6BCF;&#x6B21;&#x4FEE;&#x6539;&#x5B8C;&#x53C2;&#x6570;&#x4FE1;&#x606F;&#x4E4B;&#x540E;&#xFF0C;&#x5FC5;&#x987B;&#x70B9;&#x51FB;&#x751F;&#x6548;&#x914D;&#x7F6E;&#x548C;&#x91CD;&#x542F;&#x4EFB;&#x52A1;&#xFF1B;
      &#x5728;&#x4EFB;&#x52A1;&#x8BE6;&#x60C5;&#x914D;&#x7F6E;&#x4E2D;&#xFF0C;&#x4E5F;&#x53EF;&#x4EE5;&#x4E0A;&#x4F20;&#x65B0;&#x7684;&#x4EE3;&#x7801;&#x5305;&#xFF0C;&#x6FC0;&#x6D3B;&#x548C;&#x91CD;&#x542F;&#x4EFB;&#x52A1;&#x540E;&#x53EF;&#x4EE5;&#x4F7F;&#x7528;&#x3002;

配置 IntelliJ IDEA Debug 模式

Tips: &#x8BBE;&#x7F6E;&#x597D;&#x65AD;&#x70B9;&#x4EE5;&#x540E;&#xFF0C;&#x9700;&#x8981;&#x5148;&#x542F;&#x52A8; CloudCanal &#x4EFB;&#x52A1;&#xFF0C;&#x518D;&#x70B9;&#x51FB; debug &#x6309;&#x94AE;&#xFF0C;&#x624D;&#x80FD; Attach &#x5230;&#x8FDC;&#x7A0B;&#x7684; 8787 &#x7AEF;&#x53E3;&#xFF1B;
      CloudCanal &#x4F1A;&#x4E00;&#x76F4; pending&#xFF0C;&#x76F4;&#x5230;&#x6709; Attachment&#xFF0C;&#x624D;&#x4F1A;&#x7EE7;&#x7EED;&#x6267;&#x884C;&#xFF0C;&#x6240;&#x4EE5;&#x4E0D;&#x9700;&#x8981;&#x5355;&#x6B65;&#x8DDF;&#x8E2A;&#x8C03;&#x8BD5;&#x65F6;&#xFF0C;&#x4E00;&#x5B9A;&#x8BB0;&#x5F97;&#x5173;&#x95ED;&#x8C03;&#x8BD5;&#x6A21;&#x5F0F;&#xFF0C;&#x5426;&#x5219;&#x4EFB;&#x52A1;&#x65E0;&#x6CD5;&#x6267;&#x884C;&#x3002;

CloudCanal 自定义代码能够拓展的能力具有不错的想象空间,我们甚至能加入一些在线业务逻辑的处理,让业务需求能够更好的满足,同时配合社区版调试也很方便。希望未来这块能力在便利功能,性能等层面有更好的表现。

CloudCanal 会不断提供一些预览的能力,包括新数据链路, 优化能力,功能插件。本文所描述的自定义代码能力目前也处于内测阶段。如需体验,可添加我们小助手(微信号:suhuayue001)进行了解和试用。

Original: https://www.cnblogs.com/clougence/p/15773827.html
Author: clougence
Title: 实现MySQL同步数据到ES构建宽表

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/711053/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球