5.2.SpringBoot整合Kafka(开整)

1、准备工作

pom配置:

    <dependency>
        <groupid>org.springframework.kafka</groupid>
        <artifactid>spring-kafka</artifactid>
        <version>2.8.8</version>
    </dependency>

application-dev.yml配置 (配置在spring下):

 kafka:
    bootstrap-servers:
      - localhost:9092
    template:
      default-topic: demo.topic
    producer:
      retries: 3 # &#x91CD;&#x8BD5;&#x6B21;&#x6570;&#xFF0C;&#x8BBE;&#x7F6E;&#x5927;&#x4E8E;0&#x7684;&#x503C;&#xFF0C;&#x5219;&#x5BA2;&#x6237;&#x7AEF;&#x4F1A;&#x5C06;&#x53D1;&#x9001;&#x5931;&#x8D25;&#x7684;&#x8BB0;&#x5F55;&#x91CD;&#x65B0;&#x53D1;&#x9001;
      batch-size: 16384 #&#x6279;&#x91CF;&#x5904;&#x7406;&#x5927;&#x5C0F;&#xFF0C;16K
      buffer-memory: 33554432 #&#x7F13;&#x51B2;&#x5B58;&#x50A8;&#x5927;&#xFF0C;32M
      acks: 1
      # &#x6307;&#x5B9A;&#x6D88;&#x606F;key&#x548C;&#x6D88;&#x606F;&#x4F53;&#x7684;&#x7F16;&#x89E3;&#x7801;&#x65B9;&#x5F0F;
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # &#x662F;&#x5426;&#x81EA;&#x52A8;&#x63D0;&#x4EA4;(&#x624B;&#x52A8;&#x63D0;&#x4EA4;&#x8981;&#x5173;&#x95ED;&#xFF0C;&#x4E0D;&#x7136;&#x4F1A;&#x62A5;&#x9519;)
      group-id: spring_customer
      enable-auto-commit: false
      # &#x6D88;&#x8D39;&#x504F;&#x79FB;&#x914D;&#x7F6E;
      # none&#xFF1A;&#x5982;&#x679C;&#x6CA1;&#x6709;&#x4E3A;&#x6D88;&#x8D39;&#x8005;&#x627E;&#x5230;&#x5148;&#x524D;&#x7684;offset&#x7684;&#x503C;,&#x5373;&#x6CA1;&#x6709;&#x81EA;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x4E5F;&#x6CA1;&#x6709;&#x624B;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x5219;&#x629B;&#x51FA;&#x5F02;&#x5E38;
      # earliest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x5934;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      # latest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x6700;&#x65B0;&#x7684;&#x6570;&#x636E;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # &#x76D1;&#x542C;
    listener:
      # record&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6761;&#x8BB0;&#x5F55;&#x88AB;&#x6D88;&#x8D39;&#x8005;&#x76D1;&#x542C;&#x5668;&#xFF08;ListenerConsumer&#xFF09;&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # batch&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # time&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x8DDD;&#x79BB;&#x4E0A;&#x6B21;&#x63D0;&#x4EA4;&#x65F6;&#x95F4;&#x5927;&#x4E8E;TIME&#x65F6;&#x63D0;&#x4EA4;
      # count&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x88AB;&#x5904;&#x7406;record&#x6570;&#x91CF;&#x5927;&#x4E8E;&#x7B49;&#x4E8E;COUNT&#x65F6;&#x63D0;&#x4EA4;
      # count_time&#xFF1A;TIME&#x6216;COUNT&#x4E2D;&#x6709;&#x4E00;&#x4E2A;&#x6761;&#x4EF6;&#x6EE1;&#x8DB3;&#x65F6;&#x63D0;&#x4EA4;
      # manual&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;, &#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x63D0;&#x4EA4;
      # manual_immediate&#xFF1A;&#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x7ACB;&#x5373;&#x63D0;&#x4EA4;&#xFF0C;&#x4E00;&#x822C;&#x63A8;&#x8350;&#x4F7F;&#x7528;&#x8FD9;&#x79CD;
      ack-mode: manual_immediate
      # &#x5728;&#x4FA6;&#x542C;&#x5668;&#x5BB9;&#x5668;&#x4E2D;&#x8FD0;&#x884C;&#x7684;&#x7EBF;&#x7A0B;&#x6570;&#x3002;
      concurrency: 5
 # &#x5176;&#x5B83;&#x5C5E;&#x6027;&#x914D;&#x7F6E;
 properties:
 # &#x8BBE;&#x7F6E;&#x53D1;&#x9001;&#x6D88;&#x606F;&#x7684;&#x5927;&#x5C0F;
    max.request.size: 10240000

2、创建生产者和消费者

生产者:

/**
 * cf
 * &#x751F;&#x4EA7;&#x8005;
 */
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<string, object> kafkaTemplate;

    /**
     * &#x53D1;&#x9001;&#x6D88;&#x606F;
     * @param message
     */
    @PostMapping("/kafka")
    public void sendMessage1(@RequestBody String message) {
        kafkaTemplate.send("topic_test01", "cfKey",message);
    }

}
</string,>

消费者:

/**
 * cf
 * &#x6D88;&#x8D39;&#x8005;
 */
@Component
@Slf4j
public class KafkaConsumer {
    /**
     * kafka&#x7684;&#x76D1;&#x542C;&#x5668; &#x6D88;&#x8D39;&#x6D88;&#x606F;
     * @param record
     * @param item
     */
    @KafkaListener(topics = "topic_test01", groupId = "spring_customer")
    public void topicListener(ConsumerRecord<string, string> record, Acknowledgment item) {
        log.info("&#x6211;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1A;{}==,{}==,{};",record.topic(),record.partition(),record.value());
        //&#x624B;&#x52A8;&#x63D0;&#x4EA4;
        item.acknowledge();
    }
}
</string,>

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

@KafkaListener 注解实现监听器,就是我们用到的。

源码:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    //消息 Topic
    String[] topics() default {};
    //Topic 的模式匹配表达式
    String topicPattern() default "";
    //Topic 分区
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    //消息分组 Id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}

在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。
在这里,强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念: 消费者分组(Consumer Group)。

设计消费者组的目的是应对集群环境下的多服务实例问题。

显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

为了解决这个问题,Kafka 中提供了消费者组的概念。 一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费

3.测试一下

5.2.SpringBoot整合Kafka(开整)

成功:

5.2.SpringBoot整合Kafka(开整)
  1. 复杂操作1 (ConcurrentKafkaListenerContainerFactory)应用

kafka过滤器

1.我们在使用kafka的时候,会遇到某一些消息过滤的情况,所以我们需要配置过滤器,来清洗kafka中的数据。
2.有时候我们需要配置并发消费,所以要Specify the container concurrency.指定容器并发性

配置如下:


/**
 * cf
 * 消费者消息过滤器
 */
@Configuration
public class CustomKafkaFilter {
    @Autowired
    private ConsumerFactory consumerFactory;
    @Bean("concurrentKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //指定容器并发性(配置文件配置也可)
        factory.setConcurrency(3);
        //开启批量消费 (配置文件配置也可)
        factory.setBatchListener(true);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 设置记录筛选策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String msg = consumerRecord.value().toString();
                 //这里写筛选规则
                return true;// 返回true消息将会被丢弃
            }
        });
        return factory;
    }
}

注意: @Bean(“concurrentKafkaListenerContainerFactory”)一定要指定名称,否则spring会认为没有这个bean自动创建一个,会出现重复执行问题。

  1. 复杂操作2 封装 kafka生产者

工具类KafkaAnalyzeProducer:


import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * kafka生产者
 */
@Slf4j
@Component
public class KafkaAnalyzeProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据到kafka
     *
     * @param topic        topic名称
     * @param message      发送信息字符串
     * @param sendCallBack 发送回调
     */
    public void send(String topic, String message, SendCallBack sendCallBack) {

        ListenableFuture listenableFuture = kafkaTemplate.send(topic, message);
        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @SneakyThrows
            @Override
            public void onSuccess(Object result) {
                sendCallBack.sendSuccessCallBack(topic, message);
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @SneakyThrows
            @Override
            public void onFailure(Throwable ex) {
                sendCallBack.sendFailCallBack(topic, message, ex);
            }
        };

        listenableFuture.addCallback(successCallback, failureCallback);
    }

    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendSynchronize(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
            }

            @Override
            public void onSuccess(Object o) {
                log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
            }
        });
    }

    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }
}

SendCallBack:

import java.text.ParseException;

public interface SendCallBack {

    /**
     * 生产成功回调
     * @param topic topic
     * @param msg 信息字符串
     */
    void sendSuccessCallBack(String topic,String msg) throws ParseException;

    /**
     * 生产失败回调
     * @param topic topic
     * @param msg 信息字符串
     * @param ex 异常
     */
    void sendFailCallBack(String topic,String msg,Throwable ex) throws ParseException;

}

使用方式:

    @Autowired
    private KafkaAnalyzeProducer kafkaAnalyzeProducer;

// 下面的放方法里面就好了
kafkaAnalyzeProducer.send(topic,JSON.toJSONString(itemMap,SerializerFeature.WriteMapNullValue),new SendCallBack() {
        @Override
        public void sendSuccessCallBack (String topic, String msg){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.info("----sendSuccessCallBack事件kafka记录解析完成放入topic:{},发送成功:{}", topic, jsonObject.getOrDefault("id", "").toString());
        }

        @Override
        public void sendFailCallBack (String topic, String msg, Throwable ex){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.error("----sendFailCallBack事件kafka记录解析完成放入topic:{},发送失败{}", topic, jsonObject.getOrDefault("id", "").toString(), ex);
        }
    });

ok基本上就分享这些就够用了。

Original: https://www.cnblogs.com/daohangtaiqian/p/16537782.html
Author: 道行太浅
Title: 5.2.SpringBoot整合Kafka(开整)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713389/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • RuntimeError: No response returned.报错分析与解决方案

    这是在做开源项目的时候遇到的问题,程序部署上线后不定时会突然出现这样一条报错,终于被搞烦了决定彻底查清原因。 这是我正在使用的版本 fastapi==0.78.0 uvicorn=…

    技术杂谈 2023年6月21日
    0117
  • 词表示

    在NLP领域,自然语言通常是指以文本的形式存在,但是计算无法对这些文本数据进行计算,通常需要将这些文本数据转换为一系列的数值进行计算。那么具体怎么做的呢?这里就用到词向量的概念。 …

    技术杂谈 2023年7月11日
    085
  • MIUI 蓝牙不能微信钉钉通话

    MIUI 系统不能 使用蓝牙耳机进行通话(微信,钉钉),音乐没有问题; 问题: 打电话能用蓝牙,听音乐也没有问题; 发现只有 开会的时候和微信语音视频的时候不能,只能听筒和外放; …

    技术杂谈 2023年5月31日
    0212
  • java学习之springboot

    0x00前言 呀呀呀时隔好久我又来做笔记了,上个月去大型保密活动了,这里在网上看了一些教程如果说不是去做java开发我就不做ssm的手动整合了采用springboot去一并开发。S…

    技术杂谈 2023年6月21日
    0114
  • 如何找回QQ聊天记录、语音、图片?

    多图长图预警,本教程适用于 安卓手机 认真仔细看完答案的成功几率翻倍哟! 请各位认真看答案!求您了~ 2020年/4/4日 更新 人民不会忘记,祖国不会忘记,我们不会忘记,先烈不朽…

    技术杂谈 2023年6月21日
    098
  • 内存泄漏过度释放检測

    假设出现exc_bad_access错误,基本上是因为内存泄漏,错误释放。对一个已经释放的对象进行release操作。可是xcode有时候不会告诉你错误在什么地方(Visual S…

    技术杂谈 2023年5月31日
    088
  • TCP/IP和UDP

    TCP/IP即传输控制/网络协议,是面向连接的协议,发送数据前要先建立连接(发送方和接收方的成对的两个之间必须建 立连接),TCP提供可靠的服务,也就是说,通过TCP连接传输的数据…

    技术杂谈 2023年7月24日
    065
  • 关于测试的讨论从未停止过

    关于测试的讨论从未停止过,最近忙着推荐 seldom 测试框架,所以,上testerhome 次数增加。 有人发贴子 &#x6280;&#x672F;&#x…

    技术杂谈 2023年5月31日
    098
  • 全新升级的AOP框架Dora.Interception[6]: 框架设计和实现原理

    目录一、调用链抽象二、基于约定的拦截器定义三、基于调用上下文的依赖注入容器四、拦截器的提供五、调用链的构建六、方法拦截的实现原理七、依赖注入框架的整合八、看看生成的代理类 从设计模…

    技术杂谈 2023年5月31日
    0114
  • label文本框

    *label文本框私有属性 属性 说明 value 值,通用用$引用变量 prefix value的前缀 suffix value的后缀 lines 文本行数0或不设置,则默认1行…

    技术杂谈 2023年6月1日
    098
  • Mac下Charles踩坑记录

    初次使用Charles,摸索着抓包的过程中遇到了很多问题。在这里一一记录一下,避免其他初学者踩坑。 问题1:不显示request和response? 在Charles的配置页面可以…

    技术杂谈 2023年7月24日
    067
  • 2022.21 Web页面和本地程序通信的方法

    因为安全限制,web页面不能直接操作本地电脑资源,因此一些访问本地文件或操作本地设备之类的操作需要客户端本地程序来做,这就涉及到web页面与客户端电脑本地程序通信交互。 以前主流浏…

    技术杂谈 2023年5月30日
    094
  • 树莓派OLED模块的使用教程大量例程详解

    简介 Python有两个可以用的OLED库 [Adafruit_Python_SSD1306库]—>只支持SSD1306 [Luma.oled库]—>支持 SSD130…

    技术杂谈 2023年7月23日
    094
  • PyTorch 介绍 | 优化模型参数

    既然已经有模型和数据了,是时候在数据上优化模型参数来训练、验证和测试它了。模型训练是一个迭代过程;在每一次迭代( epoch),模型会作出一个预测,计算其预测误差( loss),收…

    技术杂谈 2023年7月25日
    085
  • Docker基本命令

    &#x6CE8;&#x610F;:&#x5220;&#x9664;&#x955C;&#x50CF;,&#x6B63;&amp…

    技术杂谈 2023年6月21日
    099
  • lsmod——显示已载入系统的模块

    lsmod——显示已载入系统的模块 lsmod 其实就是list modules的缩写,即 列出所有模块. 功能说明:显示已载入系统的模块。 语法:lsmod 说明:执行lsmod…

    技术杂谈 2023年5月31日
    090
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球