sarama Kafka客户端生产者与消费者梳理

生产者

sarama 库提供了同步生产者和异步生产者。

SyncProducer 是在 AsyncProducer 基础上加以条件限制实现的。

type SyncProducer interface {

    // SendMessage produces a given message, and returns only when it either has
    // succeeded or failed to produce. It will return the partition and the offset
    // of the produced message, or an error if the message failed to produce.
    SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

    // SendMessages produces a given set of messages, and returns only when all
    // messages in the set have either succeeded or failed. Note that messages
    // can succeed and fail individually; if some succeed and some fail,
    // SendMessages will return an error.
    SendMessages(msgs []*ProducerMessage) error

    // Close shuts down the producer; you must call this function before a producer
    // object passes out of scope, as it may otherwise leak memory.
    // You must call this before calling Close on the underlying client.
    Close() error
}

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
    if config == nil {
        config = NewConfig()
        config.Producer.Return.Successes = true
    }

    if err := verifyProducerConfig(config); err != nil {
        return nil, err
    }

    p, err := NewAsyncProducer(addrs, config)
    if err != nil {
        return nil, err
    }
    return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}

func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
    sp := &syncProducer{producer: p}

    sp.wg.Add(2)
    go withRecover(sp.handleSuccesses)
    go withRecover(sp.handleErrors)

    return sp
}

可以看到 newSyncProducerFromAsyncProducer 方法中开启两个 goroutine 处理消息成功与失败的”回调”。这里使用 waitGroup 进行等待,从而将异步操作转变为同步操作。

SyncProducer 提供了批量发送 API, 同一批次中的消息失败与否是独立的,当出现部分失败时,SendMessages 会返回类型为 []*ProducerError 的error。

type ProducerError struct {
    Msg *ProducerMessage
    Err error
}
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
    expectations := make(chan chan *ProducerError, len(msgs))
    go func() {
        for _, msg := range msgs {
            expectation := make(chan *ProducerError, 1)
            msg.expectation = expectation
            sp.producer.Input() <- msg expectations <- expectation } close(expectations) }() var errors producererrors for :="range" { if perr !="nil" perr) len(errors)> 0 {
        return errors
    }
    return nil
}

</->

所以每个我们可以清楚的知道是哪一个消息发生错误以及得到错误的具体信息。

type AsyncProducer interface {

    // AsyncClose triggers a shutdown of the producer. The shutdown has completed
    // when both the Errors and Successes channels have been closed. When calling
    // AsyncClose, you *must* continue to read from those channels in order to
    // drain the results of any messages in flight.
    AsyncClose()

    // Close shuts down the producer and waits for any buffered messages to be
    // flushed. You must call this function before a producer object passes out of
    // scope, as it may otherwise leak memory. You must call this before process
    // shutting down, or you may lose messages. You must call this before calling
    // Close on the underlying client.
    Close() error

    // Input is the input channel for the user to write messages to that they
    // wish to send.
    Input() chan<- *producermessage successes is the success output channel back to user when return.successes enabled. if true, you must read from this or producer will deadlock. it suggested that send and messages together in a single select statement. successes() <-chan errors error user. deadlock full. alternatively, can set producer.return.errors your config false, which prevents be returned. errors() *producererror } < code></->

AsyncProducer 提供5个操作行为,分别提供了异步关闭、同步关闭和操作消息的三个行为。我们往 Input() 里丢数据,在 Successe() 和 Errors()处理”回调” 结果。这种接口设计对用调用者十分友好。

不管是 SyncProducer 还是 AsyncProducer,我们都需要关注”回调”结果,及时的处理提交失败的消息是保障消息不丢失的前提。

消费者

type ConsumerGroup interface {
    Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

    // Errors returns a read channel of errors that occurred during the consumer life-cycle.
    // By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan error close stops the consumergroup and detaches any running sessions. it is required to call this function before object passes out of scope, as will otherwise leak memory. close() pause suspends fetching from requested partitions. future calls broker not return records these partitions until they have been resumed using resume() resumeall(). note that method does affect partition subscription. in particular, cause a group rebalance when automatic assignment used. pause(partitions map[string][]int32) resume resumes specified which paused with pause() pauseall(). new if there are be fetched. resume(partitions all pauseall() resumeall() } < code></-chan>

从上述代码得知,一个消费者组实例提供了上述方法,当我们调用 Consume 时,我们需要消费的主题以及处理函数

type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

这个 ConsumerGroupHandler 对象是我们传入的,也就是说我们要实现 ConsumerGroupHandler 这个接口中约定的行为。其中 ConsumeClaim 是我们的主体逻辑。ConsumeClaim 有两个入参,分别是

type ConsumerGroupSession interface {
    // Claims returns information about the claimed partitions by topic.
    Claims() map[string][]int32

    // MemberID returns the cluster member ID.
    MemberID() string

    // GenerationID returns the current generation ID.
    GenerationID() int32

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling MarkOffset
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
    MarkOffset(topic string, partition int32, offset int64, metadata string)

    // Commit the offset to the backend
    //
    // Note: calling Commit performs a blocking synchronous operation.
    Commit()

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows to
    // reset an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. cf MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

    // MarkMessage marks a message as consumed.
    MarkMessage(msg *ConsumerMessage, metadata string)

    // Context returns the session context.
    Context() context.Context
}

type ConsumerGroupClaim interface {
    // Topic returns the consumed topic name.
    Topic() string

    // Partition returns the consumed partition.
    Partition() int32

    // InitialOffset returns the initial offset that was used as a starting point for this claim.
    InitialOffset() int64

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
    HighWaterMarkOffset() int64

    // Messages returns the read channel for the messages that are returned by
    // the broker. The messages channel will be closed when a new rebalance cycle
    // is due. You must finish processing and mark offsets within
    // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
    // re-assigned to another group member.
    Messages() <-chan *consumermessage } < code></-chan>

其中 ConsumerGroupSession 是消费者组连接信息,包括我们的消费位移管理,ConsumerGroupClaim 我们消费的消息数据管理

简单梳理一下消费者组主干流程:

client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)

2.请求消费数据

 err := client.Consume(ctx, strings.Split(topics, ","), &consumer)

其中 consumer 为我们自定义的 Handler 实例

3.Consume 方法里初始化连接信息

sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)

  1. newSession 方法中处理从 kafka 集群的协调者获取的各种信息、各种逻辑,最终返回 newConsumerGroupSession
return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)

5.newConsumerGroupSession 里会初始化位移管理实例、处理心跳、管理分区等,

offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
&#x540C;&#x65F6;&#x8C03;&#x7528; sess.consume(topic, partition)

6.consume 方法里初始化 PartitionConsumer

claim, err := newConsumerGroupClaim(s, topic, partition, offset)

PartitionConsumer 的声明如下

type PartitionConsumer interface {
    AsyncClose()
    Close() error
    Messages() <-chan *consumermessage errors() <-chan *consumererror highwatermarkoffset() int64 pause() resume() ispaused() bool } < code></-chan>

其实 PartitionConsumer 才是最终消费数据的地方。

7.最终 PartitionConsumer 接口类型的实例会传递给实现 ConsumerGroupClaim 的实体。然后这个实体的实例会作为我们自定义 Handler 的第二个参数,这时就回到了我们自身实现的逻辑上。

if err := s.handler.ConsumeClaim(s, claim); err != nil {
        s.parent.handleError(err, topic, partition)
    }

其中需要注意的是

ConsumerGroupSession 提供的 Commit 是一个阻塞方法,如果要使用非阻塞标记位移提交则可以选择MarkMessage。其实就是只做了一个标记,在需要的时候在调用 Commit。sarama 会在在新增 newOffsetManagerFromClient 判断段是否开启自动位移提交,当开启自动位移提交时会启动一个 goroutine 定期检测执行位移提交操作。

if conf.Consumer.Offsets.AutoCommit.Enable {
        om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
        go withRecover(om.mainLoop)
    }
func (om *offsetManager) mainLoop() {
    defer om.ticker.Stop()
    defer close(om.closed)

    for {
        select {
        case <-om.ticker.c: om.commit() case <-om.closing: return } func (om *offsetmanager) commit() { om.flushtobroker() om.releasepoms(false) < code></-om.ticker.c:>

一般来说消费者端消息丢失较为少见,消息的重复处理场景较多。这种情况下一般需要业务方做幂等性处理。

推荐阅读

Original: https://www.cnblogs.com/arvinhuang/p/16437948.html
Author: 平凡键客
Title: sarama Kafka客户端生产者与消费者梳理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/684863/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • P3224 [HNOI2012]永无乡 题解

    题意概括 有若干集合,每个集合最初包含一个值,和一个编号1~n。两个操作:合并两个集合,查询包含值x的集合中第k大值最初的集合编号。 维护集合之间关系显然用并查集,但怎么处理询问,…

    技术杂谈 2023年6月21日
    0125
  • Vue Element-ui表单校验规则,你掌握了哪些?

    1、前言 Element-ui表单校验规则,使得错误提示可以直接在form-item下面显示,无需弹出框,因此还是很好用的。 我在做了登录页面的表单校验后,一度以为我已经很了解表单…

    技术杂谈 2023年6月21日
    082
  • RuntimeError: No response returned.报错分析与解决方案

    这是在做开源项目的时候遇到的问题,程序部署上线后不定时会突然出现这样一条报错,终于被搞烦了决定彻底查清原因。 这是我正在使用的版本 fastapi==0.78.0 uvicorn=…

    技术杂谈 2023年6月21日
    0115
  • 从决策辅助者到决策制定者 — CHRO新角色

    CEO都知道:公司的成功仰赖人力资源。创造价值的不是企业,而是人才。但若对绝大多数公司抽丝剥茧仔细分析,你会发现,CEO通常与首席人力资源官(CHRO)和人力资源部门(HR)关系疏…

    技术杂谈 2023年5月31日
    078
  • 记:大学同学的婚礼邀约

    晚上在家上网课的时候,大约9点左右的样子,手机突然响了,看了下是宜昌的号码,心里揣测,估计又是骚扰电话,本来想直接挂掉的。但是还是接了,电话那头,瞬间传来一种油腻的打招呼声音。大学…

    技术杂谈 2023年7月11日
    070
  • Qt学习笔记

    联系方式 QQ: 2653728884 ,加Q请注明添加原因! Original: https://www.cnblogs.com/arminker/p/5121596.htmlA…

    技术杂谈 2023年7月11日
    072
  • 离线安装 Dapr

    Dapr 官方从 1.7 版本开始提供了离线安装Dapr 的支持。 Dapr CLI 工具和 自宿主模式安装可以参考以下几个链接: Dapr 离线安装 & 在线执行 dap…

    技术杂谈 2023年5月31日
    078
  • MybatisPlus——全网配置最全的代码生成器

    MybatisPlus代码生成器 这里讲解的是新版 (mybatis-plus 3.5.1+版本),旧版不兼容 官方文档:https://baomidou.com/(建议多看看官方…

    技术杂谈 2023年7月11日
    072
  • Rust:axum学习笔记(3) extract 

    预备知识:json序列化/反序列化 鉴于现在web开发中,json格式被广泛使用,先熟悉下rust中如何进行json序列化/反序列化。 先加入serde_json依赖项,然后就可以…

    技术杂谈 2023年5月31日
    090
  • canvas 常用 api 及 设计

    canvas元素 可被用来通过脚本(通常是JavaScript)绘制图形。比如,它可以被用来绘制图形,制作图片集合,甚至用来实现动画效果。你可以(也应该)在元素标签内写入可提供替代…

    技术杂谈 2023年5月30日
    0113
  • Spring基于注解的AOP的切面优先级

    每一个切面都有一个默认的优先级(默认值为Integer的最大值) @Order(1)通过这个注解设置一个正整数数值,数值越小,优先级越高 @Component @Aspect//&…

    技术杂谈 2023年7月11日
    097
  • 计算机网络-王道考研 002 物理层

    物理层 物理层 物理层基本概念 数据通信基础知识 典型的数据通信模型 数据通信相关术语 三种通信方式 两种数据传输方式 小章总结 -数据通信基础知识 码元、波特、速率、带宽 码元 …

    技术杂谈 2023年7月11日
    079
  • HDU 4831 Scenic Popularity

    Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others)Total Subm…

    技术杂谈 2023年5月30日
    091
  • 笔记:Java集合框架(一)

    Java集合框架(一) Collection接口 继承结构 Iterator接口 Iterator接口定义了迭代器的基本方法: java;gutter:true; hasNext(…

    技术杂谈 2023年7月25日
    060
  • use IFS in bash

    function dfd() { #http://www.cnblogs.com/hunterfu/archive/2010/02/23/1672129.html IFS=$’\n…

    技术杂谈 2023年5月31日
    094
  • 树莓派用python打开摄像头测试的两种方式附例程

    简介 树莓派用python打开摄像头常用两种方法,一种是使用picamare库,另一种是使用OpenCV的库,本文分别介绍两种方式 方式一:picamare 1.代码如下: fro…

    技术杂谈 2023年7月23日
    081
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球