Kafka从入门到放弃(三)—— 详说消费者

之前介绍了Kafka以及生产者,包括它的一些特性和参数,这回写一下消费者。

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:4498b50e-ca35-4f53-9ff8-8c3197ab4940

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:35516686-8022-497c-85e7-0c78260df119

Kafka从入门到放弃(一) —— 初识Kafka

Kafka从入门到放弃(二) —— 详说生产者

消费者与消费者组

在Kafka中消费者是消费消息的对象。假设目前有一个消费者正在消费消息,但生产数据的速度突然上升,这时候消费者会有点力不从心,跟不上消息生产的速度,这时候咋办呢?

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:a6226355-59f3-4326-85ce-c460320950d4

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:47a800d3-dae5-4c31-8406-6525d16d4ba5

在Kafka中的消费者组就是如此,一个消费者组内的消费者订阅同一个Topic的数据,但消费不同分区的数据,提高了消费能力。

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:a491b898-24a5-4a20-9b6d-2ba814011e11

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:3d6e8d75-756b-407c-a359-567a1e201b7f

Kafka从入门到放弃(三)—— 详说消费者

Kafka从入门到放弃(三)—— 详说消费者

LEO & HW

Kafka中的分区是可以有多个副本的,我们把每个副本中待写入的那个offset称为LEO(Log End Offset),把最少消息的那个副本的LEO称为HW(High Watermark)

Kafka从入门到放弃(三)—— 详说消费者

对于消费者而言,消费者所能消费的区间就是小于HW那部分,即图中 0-3 部分。这样消费者不管是哪个副本,订阅到的消息都是一致的,即使换了leader也能接着消费。

提交偏移量

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:07c1acc9-6ec2-461a-8357-337bd2a28ea8

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:7efda5cb-3898-45c6-923c-99d01a99c784

在Kafka中,有一个名为_consumer_offset的主题,消费者会往里面发送消息,提交偏移量,这个时候消费者也是生产者。

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:454df25b-e42d-43d5-9e7d-c6454139e3b1

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:19c66733-77d8-4250-a3a5-99e822416e23

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:db9d7cf7-821f-4f8b-be8e-0956d109cc6e

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:4d2c234c-a69b-4f82-9ec3-d31158a47296

如果提交的偏移量小于处理的最后一个消息的偏移量,会造成重复消费。比如消费者提交了 6 的offset,此时又拉取了2条数据,还没等提交,消费者就挂掉了,然后就发生了再均衡。新的消费者获取到 6 的偏移量,接着处理,这就造成了重复消费。

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:647bae36-9cd8-462c-93b2-ed5620fa804e

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:2cd1b50c-3701-4763-84e1-d8fddba29b69

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:0993783c-7314-4811-83b4-33da7a5e36d3

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:6883223f-2693-4ef6-a8a3-64f1c3da9b04

在Kafka中,有几种提交偏移量的方式。

自动提交

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:f41a7833-3ff2-4803-915b-5f31b5ccf749

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:40cf30ea-e922-44d7-9c0e-1fbb6b396915

enable.auto.commit=true(是否开启自动提交,true or false)

auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:cacb659f-e138-4417-bb56-ec7760cd05e9

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:ba2c00c9-34bb-49c5-a3ce-372ef5ff8fc7

通过CommitSync()方法手动提交当前偏移量

在处理完所有消息后提交,前提要把enable.auto.commit设置为false。

while (true) {
    ConsumerRecords records = consumer.poll(100);
    for(ConsumerRecords record: records){
        System.out.println("topic=%s, offset=%s,partition=%s",
                          record.topic(), record.offset(),record.partition());
    }
    try{
        consumer.commitSync();
    } catch(Exception e){
        log.error(e);
    }
}

消费者通过poll方法轮询获取消息,poll里的参数是一个超时时间,用于控制阻塞的时间,如果没有数据则会阻塞这么久,如果设置为0则会立即放回。

使用这种方法一定要在处理完所有记录后调用CommitSync()方法,避免数据丢失。如果发生错误,会进行重试。

异步提交

CommitSync() 提交偏移量的方式会造成阻塞,即需要等客户端处理完所有消息后才提交偏移量,限制了吞吐量。因此可以使用异步提交的方式,通过调用commitAsync()方法实现。

while (true) {
    ConsumerRecords<string, string> records = consumer.poll(100);
    for(ConsumerRecords<string, string> record: records){
        System.out.println("topic=%s, offset=%s,partition=%s",
                          record.topic(), record.offset(),record.partition());
    }
    consumer.commitAsync();
}
</string,></string,>

提交偏移量后就可以去做其他事了。CommitSync()方式发生错误会重试,但CommitAsync()不会。

之所以不重试,是因为有可能在收到broker响应前有其它偏移量提交了。

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:b1d7a657-8692-40d2-83aa-925d496cd79d

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:f5c81c6f-efe8-4f19-a0aa-defacde7ca66

之所以说这个问题,是因为异步提交支持在broker响应时回调,常被用于记录错误或生成度量指标。如果用他重试的话一定要注意提交的顺序。

while (true) {
    ConsumerRecords records = consumer.poll(100);
    for(ConsumerRecords record: records){
        System.out.println("topic=%s, offset=%s,partition=%s",
                          record.topic(), record.offset(),record.partition());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map offsets,             Exception e){
            if(e != null){
                log.error("Error");
            }
        }
    });
}

异步与同步组合提交

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:7ef29dcb-256c-48e1-91ee-5a7075ccc3aa

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:22047475-5b70-4c2b-896e-7784e86e0cac

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:15c89fbb-4456-4b82-92a6-d4616d3d6779

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:1a18f506-cc49-41c3-a4e7-d702adc47722

try{
    while (true) {
        ConsumerRecords records = consumer.poll(100);
        for(ConsumerRecords record: records){
            System.out.println("topic=%s, offset=%s,partition=%s",
            record.topic(),record.offset(),record.partition());
        }
        consumer.commitAsync();
    }
}catch(Exception e){
    log.error(e);
}finally {
    try {
        consumer.commitSync();
    }
    finally{
        consumer.close();
    }
}

提交特定偏移量

commitSync() 和 commitAsync() 方法一般是在处理完一个批次后提交偏移量。如果需要更频繁的提交偏移量,需要在处理的过程中间提交的话,消费者 API 允许在调用 commitSync()和 commitAsync () 方法时传进去希望提交的分区和偏移量的 map

Map currentOffsets = new HashMap();
int count = 0;
try {
    while(true){
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()){
           continue;
        }
        for (ConsumerRecord record : records){
            System.out.println("topic=%s, offset=%s,partition=%s",
                record.topic(),record.offset(),record.partition());
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset(), "no metadata"));
            // 每处理完1000条消息后就提交偏移量
            if (count%1000==0) {
                consumer.commitAsync(currentOffsets, null);
            }
            count++;
        }
    }
} finally {
    try{
        consumer.commitSync();
    } finally{
        consumer.close();
    }
}

消费者分区分配策略

分区会被分配给消费者组里的消费者进行消费,在Kafka种可以通过配置参数 partition.assignment.strategy选择分区分配策略。

Kafka从入门到放弃(三)—— 详说消费者
* RoundRobin 轮询分区 假设有10个分区,3个消费者,第一个分区给第一个消费者,第二个给第二个消费者,第三个分区给第三个消费者,第四个给第一个消费者… 以此类推

Kafka从入门到放弃(三)—— 详说消费者

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:2e8970bb-5c12-4c58-8fe0-99d52b1d9a85

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:7b906b44-0d76-4f20-b30f-0fb89fc6fec8

转载请注明出处:工众号” 大数据的奇妙冒险

Original: https://www.cnblogs.com/lyuzt/p/15713863.html
Author: 大数据的奇妙冒险
Title: Kafka从入门到放弃(三)—— 详说消费者

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/563008/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球