sarama Kafka客户端生产者与消费者梳理

生产者

sarama 库提供了同步生产者和异步生产者。

SyncProducer 是在 AsyncProducer 基础上加以条件限制实现的。

type SyncProducer interface {

    // SendMessage produces a given message, and returns only when it either has
    // succeeded or failed to produce. It will return the partition and the offset
    // of the produced message, or an error if the message failed to produce.
    SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

    // SendMessages produces a given set of messages, and returns only when all
    // messages in the set have either succeeded or failed. Note that messages
    // can succeed and fail individually; if some succeed and some fail,
    // SendMessages will return an error.
    SendMessages(msgs []*ProducerMessage) error

    // Close shuts down the producer; you must call this function before a producer
    // object passes out of scope, as it may otherwise leak memory.
    // You must call this before calling Close on the underlying client.
    Close() error
}

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
    if config == nil {
        config = NewConfig()
        config.Producer.Return.Successes = true
    }

    if err := verifyProducerConfig(config); err != nil {
        return nil, err
    }

    p, err := NewAsyncProducer(addrs, config)
    if err != nil {
        return nil, err
    }
    return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}

func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
    sp := &syncProducer{producer: p}

    sp.wg.Add(2)
    go withRecover(sp.handleSuccesses)
    go withRecover(sp.handleErrors)

    return sp
}

可以看到 newSyncProducerFromAsyncProducer 方法中开启两个 goroutine 处理消息成功与失败的”回调”。这里使用 waitGroup 进行等待,从而将异步操作转变为同步操作。

SyncProducer 提供了批量发送 API, 同一批次中的消息失败与否是独立的,当出现部分失败时,SendMessages 会返回类型为 []*ProducerError 的error。

type ProducerError struct {
    Msg *ProducerMessage
    Err error
}
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
    expectations := make(chan chan *ProducerError, len(msgs))
    go func() {
        for _, msg := range msgs {
            expectation := make(chan *ProducerError, 1)
            msg.expectation = expectation
            sp.producer.Input() <- msg expectations <- expectation } close(expectations) }() var errors producererrors for :="range" { if perr !="nil" perr) len(errors)> 0 {
        return errors
    }
    return nil
}

</->

所以每个我们可以清楚的知道是哪一个消息发生错误以及得到错误的具体信息。

type AsyncProducer interface {

    // AsyncClose triggers a shutdown of the producer. The shutdown has completed
    // when both the Errors and Successes channels have been closed. When calling
    // AsyncClose, you *must* continue to read from those channels in order to
    // drain the results of any messages in flight.
    AsyncClose()

    // Close shuts down the producer and waits for any buffered messages to be
    // flushed. You must call this function before a producer object passes out of
    // scope, as it may otherwise leak memory. You must call this before process
    // shutting down, or you may lose messages. You must call this before calling
    // Close on the underlying client.
    Close() error

    // Input is the input channel for the user to write messages to that they
    // wish to send.
    Input() chan<- *producermessage successes is the success output channel back to user when return.successes enabled. if true, you must read from this or producer will deadlock. it suggested that send and messages together in a single select statement. successes() <-chan errors error user. deadlock full. alternatively, can set producer.return.errors your config false, which prevents be returned. errors() *producererror } < code></->

AsyncProducer 提供5个操作行为,分别提供了异步关闭、同步关闭和操作消息的三个行为。我们往 Input() 里丢数据,在 Successe() 和 Errors()处理”回调” 结果。这种接口设计对用调用者十分友好。

不管是 SyncProducer 还是 AsyncProducer,我们都需要关注”回调”结果,及时的处理提交失败的消息是保障消息不丢失的前提。

消费者

type ConsumerGroup interface {
    Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

    // Errors returns a read channel of errors that occurred during the consumer life-cycle.
    // By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan error close stops the consumergroup and detaches any running sessions. it is required to call this function before object passes out of scope, as will otherwise leak memory. close() pause suspends fetching from requested partitions. future calls broker not return records these partitions until they have been resumed using resume() resumeall(). note that method does affect partition subscription. in particular, cause a group rebalance when automatic assignment used. pause(partitions map[string][]int32) resume resumes specified which paused with pause() pauseall(). new if there are be fetched. resume(partitions all pauseall() resumeall() } < code></-chan>

从上述代码得知,一个消费者组实例提供了上述方法,当我们调用 Consume 时,我们需要消费的主题以及处理函数

type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

这个 ConsumerGroupHandler 对象是我们传入的,也就是说我们要实现 ConsumerGroupHandler 这个接口中约定的行为。其中 ConsumeClaim 是我们的主体逻辑。ConsumeClaim 有两个入参,分别是

type ConsumerGroupSession interface {
    // Claims returns information about the claimed partitions by topic.
    Claims() map[string][]int32

    // MemberID returns the cluster member ID.
    MemberID() string

    // GenerationID returns the current generation ID.
    GenerationID() int32

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling MarkOffset
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
    MarkOffset(topic string, partition int32, offset int64, metadata string)

    // Commit the offset to the backend
    //
    // Note: calling Commit performs a blocking synchronous operation.
    Commit()

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows to
    // reset an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. cf MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

    // MarkMessage marks a message as consumed.
    MarkMessage(msg *ConsumerMessage, metadata string)

    // Context returns the session context.
    Context() context.Context
}

type ConsumerGroupClaim interface {
    // Topic returns the consumed topic name.
    Topic() string

    // Partition returns the consumed partition.
    Partition() int32

    // InitialOffset returns the initial offset that was used as a starting point for this claim.
    InitialOffset() int64

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
    HighWaterMarkOffset() int64

    // Messages returns the read channel for the messages that are returned by
    // the broker. The messages channel will be closed when a new rebalance cycle
    // is due. You must finish processing and mark offsets within
    // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
    // re-assigned to another group member.
    Messages() <-chan *consumermessage } < code></-chan>

其中 ConsumerGroupSession 是消费者组连接信息,包括我们的消费位移管理,ConsumerGroupClaim 我们消费的消息数据管理

简单梳理一下消费者组主干流程:

client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)

2.请求消费数据

 err := client.Consume(ctx, strings.Split(topics, ","), &consumer)

其中 consumer 为我们自定义的 Handler 实例

3.Consume 方法里初始化连接信息

sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)

  1. newSession 方法中处理从 kafka 集群的协调者获取的各种信息、各种逻辑,最终返回 newConsumerGroupSession
return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)

5.newConsumerGroupSession 里会初始化位移管理实例、处理心跳、管理分区等,

offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
&#x540C;&#x65F6;&#x8C03;&#x7528; sess.consume(topic, partition)

6.consume 方法里初始化 PartitionConsumer

claim, err := newConsumerGroupClaim(s, topic, partition, offset)

PartitionConsumer 的声明如下

type PartitionConsumer interface {
    AsyncClose()
    Close() error
    Messages() <-chan *consumermessage errors() <-chan *consumererror highwatermarkoffset() int64 pause() resume() ispaused() bool } < code></-chan>

其实 PartitionConsumer 才是最终消费数据的地方。

7.最终 PartitionConsumer 接口类型的实例会传递给实现 ConsumerGroupClaim 的实体。然后这个实体的实例会作为我们自定义 Handler 的第二个参数,这时就回到了我们自身实现的逻辑上。

if err := s.handler.ConsumeClaim(s, claim); err != nil {
        s.parent.handleError(err, topic, partition)
    }

其中需要注意的是

ConsumerGroupSession 提供的 Commit 是一个阻塞方法,如果要使用非阻塞标记位移提交则可以选择MarkMessage。其实就是只做了一个标记,在需要的时候在调用 Commit。sarama 会在在新增 newOffsetManagerFromClient 判断段是否开启自动位移提交,当开启自动位移提交时会启动一个 goroutine 定期检测执行位移提交操作。

if conf.Consumer.Offsets.AutoCommit.Enable {
        om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
        go withRecover(om.mainLoop)
    }
func (om *offsetManager) mainLoop() {
    defer om.ticker.Stop()
    defer close(om.closed)

    for {
        select {
        case <-om.ticker.c: om.commit() case <-om.closing: return } func (om *offsetmanager) commit() { om.flushtobroker() om.releasepoms(false) < code></-om.ticker.c:>

一般来说消费者端消息丢失较为少见,消息的重复处理场景较多。这种情况下一般需要业务方做幂等性处理。

推荐阅读

Original: https://www.cnblogs.com/arvinhuang/p/16437948.html
Author: 平凡键客
Title: sarama Kafka客户端生产者与消费者梳理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/621462/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mysql安装及主从复制配置

    一、安装 mysql8.0 下载mysql 安装包http://mirrors.sohu.com/mysql/MySQL-8.0/ wget http://mirrors.sohu…

    数据库 2023年5月24日
    056
  • Dubbo源码(九)-服务调用过程

    前言 本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo 源码分析均基于官方Demo,路径:dubbo/dubbo-demo 如果没有…

    数据库 2023年6月11日
    0109
  • idea 导入项目MAVEN报错,jdk与jms问题,以及@override报错

    一: Plugins报错: compiler Failed to execute goal org.apache.maven.plugins:maven-compiler-plug…

    数据库 2023年6月11日
    097
  • Docker安装和卸载(centos)

    Docker安装和卸载 一,已安装Docker,卸载Docker 1.方法一 sudo yum remove docker \ docker-client \ docker-cli…

    数据库 2023年6月11日
    071
  • 【Java代码之美】 — 通过Value获取Map中的键值Key的四种方法

    1.简介 最近在项目中遇到一个EasyExcel中需要取invokeHeadMap中headMap里面的具体列名的集合Index,就遇到了需要从Map从反向通过Value取对应的K…

    数据库 2023年6月6日
    0111
  • windows安装mysql8.0.29(ZIP解压安装版本)

    一. 下载mysql 8.0.29软件包 二. 解压,初始化安装 1,打开下载后文件所在目录,使用解压软件解压,打开文件夹!(如图,文件路径不要出现中文!) 2,创建my.ini文…

    数据库 2023年5月24日
    076
  • html简单学习!

    博主学习html的随记 1.常用标签 1.基础标签 2.格式标签 3.表单 4.超文本标签 5.列表 6.表格 7.样式 8.特殊符号 9.内联框架(网页嵌套) 1.常用标签 1….

    数据库 2023年6月16日
    085
  • 手把手教你写一个SpringMVC框架

    一、介绍 在日常的 web 开发中,熟悉 java 的同学一定知道,Spring MVC 可以说是目前最流行的框架,之所以如此的流行,原因很简单: 编程简洁、上手简单! 我记得刚开…

    数据库 2023年6月14日
    080
  • 多态:向上转型和向下转型

    1)本质:父类的引用指向了子类的对象 2)语法:父类类型 引用名 = new 子类类型(); 3)特点:编译类型看左边,运行类型看右边。 可以调用父类中的所有成员(需遵守访问权限)…

    数据库 2023年6月11日
    082
  • 重写Feign编码器

    有个spring cloud 架构的项目需要调用php小组的api接口,但php提供的接口入参大部分是下划线命名,而Java这边的实体类是按照驼峰编写,如果使用Fegin调用会导致…

    数据库 2023年6月6日
    080
  • H5、C3基础知识笔记

    HTML5 本文内容参考于”HTML5|W3scool”教程 简介 是最新的 HTML 标准,拥有新的语义、图形以及多媒体元素 提供了新的 API 简化了 …

    数据库 2023年6月11日
    0100
  • buuctf 派大星的烦恼

    题目如下 首先找到伤疤并提取出来,发现一共有256个数据,根据题目中的提示答案为32位的字符串,再根据伤疤只有两种状态22和44,联想到每8个伤疤拼成8位二进制,22表示0,44表…

    数据库 2023年6月11日
    0142
  • Mysql 手册

    MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQ…

    数据库 2023年5月24日
    0113
  • python-tkinter 自定义tkinter风格的提示框

    博客园的密码终于找回了 前言 偶尔使用python要绘制个简单输入提示框或者复选框窗体,使用tkinter的话绘制窗体也是很麻烦的,想着能不能把它自定义一个简单可复用的提示框。然后…

    数据库 2023年6月11日
    067
  • 接口和抽象类有什么区别

    1.抽象类和接口都不能直接实例化,如果要实例化,抽象类变量必须指向实现所有抽象方法的子类对象,接口变量必须指向实现所有接口方法的类对象 2.抽象类要被子类继承,接口要被类实现 3….

    数据库 2023年6月6日
    064
  • MySQL实战45讲 4,5

    04 | 深入浅出索引(上) 索引的出现其实就是为了提高数据查询的效率,就像书的目录一样 索引的常见模型 哈希表、有序数组和搜索树 哈希表 User2 和 User4 根据身份证号…

    数据库 2023年6月16日
    0104
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球