Datahub小结

往datahub发送数据时,建议使用Producer。好处是不用设置shardId,这样datahub在增加或减少shard时,业务代码都不需要变更。
另外datahub的shardId只会往前增,老的数据不用,只能停用。

<dependency>
    <groupId>com.aliyun.datahubgroupId>
    <artifactId>aliyun-sdk-datahubartifactId>
    <version>2.18.0-publicversion>
dependency>
<dependency>
      <groupId>com.aliyun.datahubgroupId>
      <artifactId>datahub-client-libraryartifactId>
      <version>1.1.12-publicversion>
dependency>
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行处理异常
        }
    }

    private static List genRecords(RecordSchema schema) {
        List recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();
            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // 自动选择shard写入
                producer.send(recordEntries, maxRetry);

                // 指定写入shard "0"
                // producer.send(recordEntries, "0", maxRetry);
                LOG.error("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // record 格式非法,根据业务场景选择忽略或直接抛异常
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // 请求参数非法
                // 签名不正确
                // 没有权限
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // shard 不存在, 如果不是写入自己指定的shard,可以不用处理
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // project, topic 或 shard 不存在
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // 基类异常,包含网络问题等,可以选择重试
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
                String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
                String accessId = "";
                String accessKey = "";
                String projectName = "";
                String topicName = "";

        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("field1", FieldType.STRING));
        schema.addField(new Field("field2", FieldType.BIGINT));

        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // 根据场景控制循环
        boolean stop = false;
        try {
            while (!stop) {
                List recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // 确保资源正确释放
            producer.close();
        }
    }
}

上面示例中的RecordSchema也可以通过datahubclient动态获取:
RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();

初始化datahubClient的办法:

// https://help.aliyun.com/document_detail/158841.html// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "";
String accessKey = "";
// 创建DataHubClient实例
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                new DatahubConfig(endpoint,
                        // 是否开启二进制传输,服务端2.12版本开始支持
                        new AliyunAccount(accessId, accessKey), true))
                        //专有云使用出错尝试将参数设置为           false
        // HttpConfig可不设置,不设置时采用默认值
        .setHttpConfig(new HttpConfig()
                .setCompressType(HttpConfig.CompressType.LZ4) // 读写数据推荐打开网络传输 LZ4压缩
                .setConnTimeout(10000))
        .build();

可能会出现的报错:

[{"errorcode":"LimitExceeded","index":1,"message":"The limit of throughput rate is exceeded."},{"errorcode":"LimitExceeded","index":2,"message":"The limit of throughput rate is exceeded."}],"requestId":"202203101111111111111"}

报错的原因,超过了相关指标。

指标描述及查看:

Web Console目前提供Metric功能,用户可以通过Metric界面查看准实时的Topic级别流量等信息,目前提供的指标有:

QPS:读写Request/Second
RPS: 读写Record/Second
Throughput:读写Throughput/Second (单位KB)
Latency:读写请求Latency/Request (单位微秒)
https://help.aliyun.com/document_detail/158786.html

相关限制描述【超过会报错】:

Datahub小结

https://help.aliyun.com/document_detail/47441.html

以下三个指标都是基于数据包大小的,只是不同的维度:
单个String长度:是针对单个filed的
Http BodySize:这个限制是针对单个写入请求
Throughput限制 :是某个时间点,所有请求加起来的大小 。如果超限,则报错:

The limit of throughput rate is exceeded.

关于RPS批量提交场景的统计规则:

List recordEntries
producer.send(recordEntries, maxRetry);

调批量接口时,是不是List中的多少条,就有多少条Record。
从api调用看只有1次,但RPS可能很大,譬如List有1万条,那么RPS就是1万

sdk中批量api将待推送的数据作为一个整体发给datahub,然后DataHub收到后一条一条处理

Original: https://www.cnblogs.com/softidea/p/15988645.html
Author: 沧海一滴
Title: Datahub小结

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/545846/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球