5.2.SpringBoot整合Kafka(开整)

1、准备工作

pom配置:

    <dependency>
        <groupid>org.springframework.kafka</groupid>
        <artifactid>spring-kafka</artifactid>
        <version>2.8.8</version>
    </dependency>

application-dev.yml配置 (配置在spring下):

 kafka:
    bootstrap-servers:
      - localhost:9092
    template:
      default-topic: demo.topic
    producer:
      retries: 3 # &#x91CD;&#x8BD5;&#x6B21;&#x6570;&#xFF0C;&#x8BBE;&#x7F6E;&#x5927;&#x4E8E;0&#x7684;&#x503C;&#xFF0C;&#x5219;&#x5BA2;&#x6237;&#x7AEF;&#x4F1A;&#x5C06;&#x53D1;&#x9001;&#x5931;&#x8D25;&#x7684;&#x8BB0;&#x5F55;&#x91CD;&#x65B0;&#x53D1;&#x9001;
      batch-size: 16384 #&#x6279;&#x91CF;&#x5904;&#x7406;&#x5927;&#x5C0F;&#xFF0C;16K
      buffer-memory: 33554432 #&#x7F13;&#x51B2;&#x5B58;&#x50A8;&#x5927;&#xFF0C;32M
      acks: 1
      # &#x6307;&#x5B9A;&#x6D88;&#x606F;key&#x548C;&#x6D88;&#x606F;&#x4F53;&#x7684;&#x7F16;&#x89E3;&#x7801;&#x65B9;&#x5F0F;
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # &#x662F;&#x5426;&#x81EA;&#x52A8;&#x63D0;&#x4EA4;(&#x624B;&#x52A8;&#x63D0;&#x4EA4;&#x8981;&#x5173;&#x95ED;&#xFF0C;&#x4E0D;&#x7136;&#x4F1A;&#x62A5;&#x9519;)
      group-id: spring_customer
      enable-auto-commit: false
      # &#x6D88;&#x8D39;&#x504F;&#x79FB;&#x914D;&#x7F6E;
      # none&#xFF1A;&#x5982;&#x679C;&#x6CA1;&#x6709;&#x4E3A;&#x6D88;&#x8D39;&#x8005;&#x627E;&#x5230;&#x5148;&#x524D;&#x7684;offset&#x7684;&#x503C;,&#x5373;&#x6CA1;&#x6709;&#x81EA;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x4E5F;&#x6CA1;&#x6709;&#x624B;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x5219;&#x629B;&#x51FA;&#x5F02;&#x5E38;
      # earliest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x5934;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      # latest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x6700;&#x65B0;&#x7684;&#x6570;&#x636E;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # &#x76D1;&#x542C;
    listener:
      # record&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6761;&#x8BB0;&#x5F55;&#x88AB;&#x6D88;&#x8D39;&#x8005;&#x76D1;&#x542C;&#x5668;&#xFF08;ListenerConsumer&#xFF09;&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # batch&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # time&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x8DDD;&#x79BB;&#x4E0A;&#x6B21;&#x63D0;&#x4EA4;&#x65F6;&#x95F4;&#x5927;&#x4E8E;TIME&#x65F6;&#x63D0;&#x4EA4;
      # count&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x88AB;&#x5904;&#x7406;record&#x6570;&#x91CF;&#x5927;&#x4E8E;&#x7B49;&#x4E8E;COUNT&#x65F6;&#x63D0;&#x4EA4;
      # count_time&#xFF1A;TIME&#x6216;COUNT&#x4E2D;&#x6709;&#x4E00;&#x4E2A;&#x6761;&#x4EF6;&#x6EE1;&#x8DB3;&#x65F6;&#x63D0;&#x4EA4;
      # manual&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;, &#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x63D0;&#x4EA4;
      # manual_immediate&#xFF1A;&#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x7ACB;&#x5373;&#x63D0;&#x4EA4;&#xFF0C;&#x4E00;&#x822C;&#x63A8;&#x8350;&#x4F7F;&#x7528;&#x8FD9;&#x79CD;
      ack-mode: manual_immediate
      # &#x5728;&#x4FA6;&#x542C;&#x5668;&#x5BB9;&#x5668;&#x4E2D;&#x8FD0;&#x884C;&#x7684;&#x7EBF;&#x7A0B;&#x6570;&#x3002;
      concurrency: 5
 # &#x5176;&#x5B83;&#x5C5E;&#x6027;&#x914D;&#x7F6E;
 properties:
 # &#x8BBE;&#x7F6E;&#x53D1;&#x9001;&#x6D88;&#x606F;&#x7684;&#x5927;&#x5C0F;
    max.request.size: 10240000

2、创建生产者和消费者

生产者:

/**
 * cf
 * &#x751F;&#x4EA7;&#x8005;
 */
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<string, object> kafkaTemplate;

    /**
     * &#x53D1;&#x9001;&#x6D88;&#x606F;
     * @param message
     */
    @PostMapping("/kafka")
    public void sendMessage1(@RequestBody String message) {
        kafkaTemplate.send("topic_test01", "cfKey",message);
    }

}
</string,>

消费者:

/**
 * cf
 * &#x6D88;&#x8D39;&#x8005;
 */
@Component
@Slf4j
public class KafkaConsumer {
    /**
     * kafka&#x7684;&#x76D1;&#x542C;&#x5668; &#x6D88;&#x8D39;&#x6D88;&#x606F;
     * @param record
     * @param item
     */
    @KafkaListener(topics = "topic_test01", groupId = "spring_customer")
    public void topicListener(ConsumerRecord<string, string> record, Acknowledgment item) {
        log.info("&#x6211;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1A;{}==,{}==,{};",record.topic(),record.partition(),record.value());
        //&#x624B;&#x52A8;&#x63D0;&#x4EA4;
        item.acknowledge();
    }
}
</string,>

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

@KafkaListener 注解实现监听器,就是我们用到的。

源码:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    //消息 Topic
    String[] topics() default {};
    //Topic 的模式匹配表达式
    String topicPattern() default "";
    //Topic 分区
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    //消息分组 Id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}

在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。
在这里,强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念: 消费者分组(Consumer Group)。

设计消费者组的目的是应对集群环境下的多服务实例问题。

显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

为了解决这个问题,Kafka 中提供了消费者组的概念。 一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费

3.测试一下

5.2.SpringBoot整合Kafka(开整)

成功:

5.2.SpringBoot整合Kafka(开整)
  1. 复杂操作1 (ConcurrentKafkaListenerContainerFactory)应用

kafka过滤器

1.我们在使用kafka的时候,会遇到某一些消息过滤的情况,所以我们需要配置过滤器,来清洗kafka中的数据。
2.有时候我们需要配置并发消费,所以要Specify the container concurrency.指定容器并发性

配置如下:


/**
 * cf
 * 消费者消息过滤器
 */
@Configuration
public class CustomKafkaFilter {
    @Autowired
    private ConsumerFactory consumerFactory;
    @Bean("concurrentKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //指定容器并发性(配置文件配置也可)
        factory.setConcurrency(3);
        //开启批量消费 (配置文件配置也可)
        factory.setBatchListener(true);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 设置记录筛选策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String msg = consumerRecord.value().toString();
                 //这里写筛选规则
                return true;// 返回true消息将会被丢弃
            }
        });
        return factory;
    }
}

注意: @Bean(“concurrentKafkaListenerContainerFactory”)一定要指定名称,否则spring会认为没有这个bean自动创建一个,会出现重复执行问题。

  1. 复杂操作2 封装 kafka生产者

工具类KafkaAnalyzeProducer:


import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * kafka生产者
 */
@Slf4j
@Component
public class KafkaAnalyzeProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据到kafka
     *
     * @param topic        topic名称
     * @param message      发送信息字符串
     * @param sendCallBack 发送回调
     */
    public void send(String topic, String message, SendCallBack sendCallBack) {

        ListenableFuture listenableFuture = kafkaTemplate.send(topic, message);
        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @SneakyThrows
            @Override
            public void onSuccess(Object result) {
                sendCallBack.sendSuccessCallBack(topic, message);
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @SneakyThrows
            @Override
            public void onFailure(Throwable ex) {
                sendCallBack.sendFailCallBack(topic, message, ex);
            }
        };

        listenableFuture.addCallback(successCallback, failureCallback);
    }

    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendSynchronize(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
            }

            @Override
            public void onSuccess(Object o) {
                log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
            }
        });
    }

    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }
}

SendCallBack:

import java.text.ParseException;

public interface SendCallBack {

    /**
     * 生产成功回调
     * @param topic topic
     * @param msg 信息字符串
     */
    void sendSuccessCallBack(String topic,String msg) throws ParseException;

    /**
     * 生产失败回调
     * @param topic topic
     * @param msg 信息字符串
     * @param ex 异常
     */
    void sendFailCallBack(String topic,String msg,Throwable ex) throws ParseException;

}

使用方式:

    @Autowired
    private KafkaAnalyzeProducer kafkaAnalyzeProducer;

// 下面的放方法里面就好了
kafkaAnalyzeProducer.send(topic,JSON.toJSONString(itemMap,SerializerFeature.WriteMapNullValue),new SendCallBack() {
        @Override
        public void sendSuccessCallBack (String topic, String msg){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.info("----sendSuccessCallBack事件kafka记录解析完成放入topic:{},发送成功:{}", topic, jsonObject.getOrDefault("id", "").toString());
        }

        @Override
        public void sendFailCallBack (String topic, String msg, Throwable ex){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.error("----sendFailCallBack事件kafka记录解析完成放入topic:{},发送失败{}", topic, jsonObject.getOrDefault("id", "").toString(), ex);
        }
    });

ok基本上就分享这些就够用了。

Original: https://www.cnblogs.com/daohangtaiqian/p/16537782.html
Author: 道行太浅
Title: 5.2.SpringBoot整合Kafka(开整)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713389/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • centos 7 快速安装 nload 流量工具

    yum install -y gcc gcc-c++ ncurses-devel make wget wget http://www.roland-riegel.de/nload/…

    技术杂谈 2023年5月30日
    099
  • http代理连接

    基于Linux服务器的http代理连接 1. 准备工作 &#x76EE;&#x6807;&#x670D;&#x52A1;&#x5668; &…

    技术杂谈 2023年6月21日
    0123
  • Git简介

    Git是一个开源的分布式版本控制系统,是目前主流的版本控制系统,很多软件项目都会用它做源代码管理。Git的常用操作想必很多人都会,但是可能了解Git内部原理的人并不多。了解一些底层…

    技术杂谈 2023年7月11日
    099
  • 技术管理进阶——把控基建与业务的比例和节奏

    原创不易,求分享、求一键三连 前段时间有个粉丝问了一个问题: 小钗你好,我十分喜欢技术,但真的转到工程团队后又十分困惑:工作没人评价也没人push!做得好没人夸奖,做得差没人批评,…

    技术杂谈 2023年6月1日
    0120
  • Vue基础-文本显示,v-html插入html代码

    显示内容 {{}} {{ message }} 效果: Hello Vue.js! v-html 效果: aaa Original: https://www.cnblogs.com…

    技术杂谈 2023年5月31日
    0114
  • Docker容器网络(七)

    文章目录 概述 docker创建的默认网络 查看当前运行容器的网络 常用的网络驱动程序 * 自定义的network bridge(桥接网络驱动程序) overlay(覆盖网络驱动程…

    技术杂谈 2023年7月24日
    0136
  • jquery常用获取属性的方法有哪些(attr prop区别)

    jquery常用获取属性的方法有:1、attr()方法,可获取并返回指定属性的值,语法”$(selector).attr(“属性名”)&#822…

    技术杂谈 2023年5月31日
    0109
  • Hexo博客系列(三)-将Hexo v3.x个人博客发布到GitLab Pages

    【原文链接】:https://blog.tecchen.xyz ,博文同步发布到博客园。由于精力有限,对文章的更新可能不能及时同步,请点击上面的原文链接访问最新内容。欢迎访问我的个…

    技术杂谈 2023年7月11日
    0111
  • JAVA多线程实战

    由于项目上要和其他系统交互,而该系统采用同步接口,我们采用单线程调用,接收一条数据平均需要4~6s。而我们需要汇总近三个月的订单信息,大约一次有几千条数据,所以进行一次交互大概需要…

    技术杂谈 2023年7月24日
    0101
  • SpringBoot-Mybatis

    SpringBoot 整合 Mybatis SpringBoot-Mybatis 10.1 导入 MyBatis 所需要的依赖 org.mybatis.spring.boot my…

    技术杂谈 2023年6月21日
    0102
  • 文件系统预读【转】

    转自:https://www.cnblogs.com/linhaostudy/p/16126723.html 正文 所谓预读,是指文件系统为应用程序一次读出比预期更多的文件内容并缓…

    技术杂谈 2023年5月31日
    0112
  • LeetCode58.最后一个单词的长度

    给你一个字符串 s,由若干单词组成,单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1: 输入:…

    技术杂谈 2023年7月24日
    093
  • 阿里云有奖体验:如何通过ECS挂载NAS文件系统

    实验简介 本实验提供CentOS系统ECS一台和NAS文件服务。 NAS基于POSIX文件接口,天然适配原生操作系统,提供共享访问,同时保证数据一致性和锁互斥。它提供了简单的可扩展…

    技术杂谈 2023年7月11日
    0125
  • Java动态脚本Groovy读取配置文件

    前言:请各大网友尊重本人原创知识分享,谨记本人博客: 南国以南i 核心涉及: @Value:作用是通过注解将常量、配置文件中的值、其他bean的属性值注入到变量中,作为变量的初始值…

    技术杂谈 2023年7月11日
    0119
  • 在springboot中使用拦截器

    在springboot中使用拦截器 拦截器Interceptor,是SpringMVC中的核心内容,利用spring的AOP(Aspect Oriented Programming…

    技术杂谈 2023年7月11日
    0111
  • Redis

    一、Redis的介绍 什么是Redis Redis是一个开源的使用ANSI C语言编写Key-Value内存数据库 读写性能强,支持多种数据类型 把数据存储在内存中的高速缓存 Re…

    技术杂谈 2023年7月11日
    0117
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球