Flink lookup join TPS test: HBase / MySQL / StarRocks

Dimension-table (lookup) joins against the DW layer of a data warehouse are the most important way to widen a stream table. The dimension-table stores used most often in projects are HBase, MySQL, and Redis, covering three lookup scenarios: large tables, small tables, and high-performance point queries.

After researching and using StarRocks for a while recently, it also seems to work well for storing dimension tables, so this test measures the lookup TPS that HBase / MySQL / StarRocks / Redis can sustain as dimension-table stores, and whether StarRocks could replace MySQL and HBase.

Test servers

CDH 6.2.1
Servers: 3 × 64 GB RAM, 16 cores

All CDH components are installed on these 3 servers.

Stream data

The test stream reads data from Kafka and contains the following fields:

CREATE TABLE user_log
(
    user_id     STRING,
    item_id     STRING,
    category_id STRING,
    behavior    STRING,
    page        STRING,
    `position`  STRING,
    sort        STRING,
    last_page   STRING,
    next_page   STRING,
    ts          TIMESTAMP(3),
    process_time as proctime(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_behavior'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
);
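To measure TPS, the user_log topic has to be fed faster than the lookup join can drain it. The post does not show its load generator; below is a minimal sketch using Flink's datagen connector (the table name and rate are assumptions, not the author's code):

-- hypothetical feeder: random events pushed into the topic that user_log reads
CREATE TEMPORARY TABLE user_log_gen
(
    user_id     STRING,
    item_id     STRING,
    category_id STRING,
    behavior    STRING,
    page        STRING,
    `position`  STRING,
    sort        STRING,
    last_page   STRING,
    next_page   STRING,
    ts          TIMESTAMP(3)
) WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '20000'        -- assumed rate; tune until the join saturates
      ,'fields.user_id.length' = '10'
);

-- computed column and watermark on user_log are excluded from the sink schema
INSERT INTO user_log
SELECT * FROM user_log_gen;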

Dimension table design

The dimension table contains the following fields:

user_id: 10-digit numeric string, zero-padded to 10 digits, then reversed
sex: man, woman, unknown
age: 1-100
degree: primary school, junior high school, senior high school, undergraduate, postgraduate, doctoral student
address: uuid
work_address: uuid
income_range: random number from 1 to 10
default_shipping_address: uuid
register_date: data write time
udpate_date: data write time
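The post does not include the generator for these rows; here is a sketch of how the user_id rule (zero-pad to 10 digits, then reverse) and a few of the random fields could be produced in Flink SQL (table name, rate, and the field subset are assumptions):

-- hypothetical generator: sequential ids 1..1,000,000
CREATE TEMPORARY TABLE user_info_gen
(
    id BIGINT
) WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'fields.id.kind' = 'sequence'
      ,'fields.id.start' = '1'
      ,'fields.id.end' = '1000000'
);

SELECT REVERSE(LPAD(CAST(id AS STRING), 10, '0'))          AS user_id  -- e.g. 42 -> '2400000000'
     , CASE CAST(FLOOR(RAND() * 3) AS INT)
           WHEN 0 THEN 'man' WHEN 1 THEN 'woman'
           ELSE 'unknown' END                              AS sex
     , CAST(FLOOR(RAND() * 100) + 1 AS INT)                AS age
     , UUID()                                              AS address
     , CAST(CAST(FLOOR(RAND() * 10) + 1 AS INT) AS STRING) AS income_range
     , LOCALTIMESTAMP                                      AS register_date
FROM user_info_gen;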

The job that writes the dimension table reads from the following stream:

CREATE TABLE user_info
(
    user_id                  STRING,
    sex                      STRING,
    age                      INTEGER,
    degree                   STRING,
    address                  STRING,
    work_address             STRING,
    income_range             STRING,
    default_shipping_address STRING,
    register_date            TIMESTAMP(3),
    udpate_date              TIMESTAMP(3)
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_behavior'
      ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
);
  • Note: the dimension table was designed to hold 1 million rows; 990,000 were actually written, so roughly 10,000 of the simulated stream records will find no match.

HBase table

Create the HBase table:

create 'user_info', { NAME => 'f',IN_MEMORY => 'true'}

Write the data:

drop table if exists hbase_user_info_sink;
CREATE TABLE hbase_user_info_sink
(
    user_id STRING,
    f       ROW(sex                      STRING,
        age                      INTEGER,
        degree                   STRING,
        address                  STRING,
        work_address             STRING,
        income_range             STRING,
        default_shipping_address STRING,
        register_date            TIMESTAMP(3),
        udpate_date              TIMESTAMP(3))
) WITH (
      'connector' = 'hbase-2.2'
      ,'zookeeper.quorum' = 'dcmp10:2181,dcmp11:2181,dcmp12:2181'
      ,'zookeeper.znode.parent' = '/hbase'
      ,'table-name' = 'user_info'
      ,'sink.buffer-flush.max-size' = '10mb'
      ,'sink.buffer-flush.max-rows' = '2000'
      );

insert into hbase_user_info_sink
select user_id,
       row(sex, age, degree, address, work_address, income_range, default_shipping_address, register_date, udpate_date)
from user_info;

HBase data:

hbase(main):002:0> count 'user_info',INTERVAL=100000
(hbase):2: warning: already initialized constant INTERVAL
Current count: 100000, row: 1010090000
Current count: 200000, row: 2020190000
Current count: 300000, row: 3030290000
Current count: 400000, row: 4040390000
Current count: 500000, row: 5050490000
Current count: 600000, row: 6060590000
Current count: 700000, row: 7070690000
Current count: 800000, row: 8080790000
Current count: 900000, row: 9090890000
990000 row(s)
Took 30.5924 seconds
=> 990000

TPS test

The test SQL is as follows:

-- Lookup Source: Sync Mode
-- kafka source
CREATE TABLE user_log
(
    user_id     STRING,
    item_id     STRING,
    category_id STRING,
    behavior    STRING,
    page        STRING,
    `position`  STRING,
    sort        STRING,
    last_page   STRING,
    next_page   STRING,
    ts          TIMESTAMP(3),
    process_time as proctime(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_log'
      ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
      );

drop table if exists hbase_behavior_conf;
CREATE TEMPORARY TABLE hbase_behavior_conf (
    user_id STRING,
    f      ROW(sex                      STRING,
        age                      INTEGER,
        degree                   STRING,
        address                  STRING,
        work_address             STRING,
        income_range             STRING,
        default_shipping_address STRING,
        register_date            TIMESTAMP(3),
        udpate_date              TIMESTAMP(3))
) WITH (
      'connector' = 'hbase-2.2'
      ,'zookeeper.quorum' = 'dcmp10:2181,dcmp11:2181,dcmp12:2181'
      ,'zookeeper.znode.parent' = '/hbase'
      ,'table-name' = 'user_info'
   ,'lookup.cache.max-rows' = '100000'
   ,'lookup.cache.ttl' = '10 minute' -- cached rows expire this long after being loaded
   ,'lookup.async' = 'true'
);

CREATE TABLE user_log_sink
(
    user_id                  STRING,
    item_id                  STRING,
    category_id              STRING,
    behavior                 STRING,
    page                     STRING,
    `position`               STRING,
    sort                     STRING,
    last_page                STRING,
    next_page                STRING,
    ts                       TIMESTAMP(3),
    sex                      STRING,
    age                      INTEGER,
    degree                   STRING,
    address                  STRING,
    work_address             STRING,
    income_range             STRING,
    default_shipping_address STRING,
    register_date            TIMESTAMP(3),
    udpate_date              TIMESTAMP(3)
--   ,primary key (user_id) not enforced
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_log_sink'
      ,'properties.bootstrap.servers' = 'dcmp10:9092,dcmp11:9092,dcmp12:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'group-offsets'
      ,'format' = 'json'
      );

INSERT INTO user_log_sink
SELECT a.user_id
     ,a.item_id
     ,a.category_id
     ,a.behavior
     ,a.page
     ,a.`position`
     ,a.sort
     ,a.last_page
     ,a.next_page
     ,a.ts
     ,b.sex
     ,b.age
     ,b.degree
     ,b.address
     ,b.work_address
     ,b.income_range
     ,b.default_shipping_address
     ,b.register_date
     ,b.udpate_date
FROM user_log a
         left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS b
                   ON a.user_id = b.user_id
where a.behavior is not null;
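The script above uses the HBase lookup table; the MySQL run swaps only that definition for a JDBC one, and the join then references mysql_behavior_conf instead of hbase_behavior_conf. A minimal sketch of that table (URL, database, and credentials are assumptions, not from the post):

-- hypothetical JDBC lookup table for the MySQL test; note the flat schema (no ROW)
CREATE TEMPORARY TABLE mysql_behavior_conf (
    user_id                  STRING,
    sex                      STRING,
    age                      INTEGER,
    degree                   STRING,
    address                  STRING,
    work_address             STRING,
    income_range             STRING,
    default_shipping_address STRING,
    register_date            TIMESTAMP(3),
    udpate_date              TIMESTAMP(3)
) WITH (
      'connector' = 'jdbc'
      ,'url' = 'jdbc:mysql://dcmp10:3306/test'   -- assumed host and database
      ,'table-name' = 'user_info'
      ,'username' = 'root'                       -- assumed credentials
      ,'password' = '******'
      ,'lookup.cache.max-rows' = '100000'
      ,'lookup.cache.ttl' = '10 minute'
);

An index on user_info(user_id) on the MySQL side is what keeps the point lookups cheap, which matters for the TPS figure below.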

With MySQL holding the million rows (100k behaves about the same), an index on user_id, and the lookup cache enabled, TPS stabilized around 3,600 (GC looked fine, so no screenshot).

  • Note: a single degree of parallelism already hits MySQL's performance bottleneck, so higher-parallelism scenarios were not tested.

StarRocks

Unexpectedly, StarRocks turned out to be weak as a dimension-table store: the initial test TPS was only a little over 200, which is consistent with what the official site describes. After all, this is a different usage scenario from what StarRocks targets. Skipped.
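StarRocks speaks the MySQL wire protocol, so in the absence of a dedicated lookup connector, one plausible way to run this test is to point the same JDBC lookup table at a StarRocks FE. A sketch with assumed connection details (9030 is the FE MySQL port by default):

-- hypothetical JDBC lookup table against StarRocks
CREATE TEMPORARY TABLE starrocks_behavior_conf (
    user_id STRING,
    sex     STRING,
    age     INTEGER
    -- ... remaining columns as in the MySQL variant above
) WITH (
      'connector' = 'jdbc'
      ,'url' = 'jdbc:mysql://dcmp10:9030/test'   -- assumed FE host and database
      ,'table-name' = 'user_info'
      ,'username' = 'root'                       -- assumed credentials
      ,'password' = '******'
      ,'lookup.cache.max-rows' = '100000'
      ,'lookup.cache.ttl' = '10 minute'
);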

Redis

No Redis environment was available, so it was skipped.

Complete code reference: github sqlSubmit

Welcome to follow the Flink Cainiao (Flink菜鸟) official account; posts on Flink development topics are published from time to time.

Original: https://www.cnblogs.com/Springmoon-venn/p/16502855.html
Author: Flink菜鸟
Title: flink lookup join tps 测试 hbase mysql starrocks
