5.2.SpringBoot整合Kafka(开整)

1、准备工作

pom配置:

    <dependency>
        <groupid>org.springframework.kafka</groupid>
        <artifactid>spring-kafka</artifactid>
        <version>2.8.8</version>
    </dependency>

application-dev.yml配置 (配置在spring下):

 kafka:
    bootstrap-servers:
      - localhost:9092
    template:
      default-topic: demo.topic
    producer:
      retries: 3 # &#x91CD;&#x8BD5;&#x6B21;&#x6570;&#xFF0C;&#x8BBE;&#x7F6E;&#x5927;&#x4E8E;0&#x7684;&#x503C;&#xFF0C;&#x5219;&#x5BA2;&#x6237;&#x7AEF;&#x4F1A;&#x5C06;&#x53D1;&#x9001;&#x5931;&#x8D25;&#x7684;&#x8BB0;&#x5F55;&#x91CD;&#x65B0;&#x53D1;&#x9001;
      batch-size: 16384 #&#x6279;&#x91CF;&#x5904;&#x7406;&#x5927;&#x5C0F;&#xFF0C;16K
      buffer-memory: 33554432 #&#x7F13;&#x51B2;&#x5B58;&#x50A8;&#x5927;&#xFF0C;32M
      acks: 1
      # &#x6307;&#x5B9A;&#x6D88;&#x606F;key&#x548C;&#x6D88;&#x606F;&#x4F53;&#x7684;&#x7F16;&#x89E3;&#x7801;&#x65B9;&#x5F0F;
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # &#x662F;&#x5426;&#x81EA;&#x52A8;&#x63D0;&#x4EA4;(&#x624B;&#x52A8;&#x63D0;&#x4EA4;&#x8981;&#x5173;&#x95ED;&#xFF0C;&#x4E0D;&#x7136;&#x4F1A;&#x62A5;&#x9519;)
      group-id: spring_customer
      enable-auto-commit: false
      # &#x6D88;&#x8D39;&#x504F;&#x79FB;&#x914D;&#x7F6E;
      # none&#xFF1A;&#x5982;&#x679C;&#x6CA1;&#x6709;&#x4E3A;&#x6D88;&#x8D39;&#x8005;&#x627E;&#x5230;&#x5148;&#x524D;&#x7684;offset&#x7684;&#x503C;,&#x5373;&#x6CA1;&#x6709;&#x81EA;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x4E5F;&#x6CA1;&#x6709;&#x624B;&#x52A8;&#x7EF4;&#x62A4;&#x504F;&#x79FB;&#x91CF;,&#x5219;&#x629B;&#x51FA;&#x5F02;&#x5E38;
      # earliest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x5934;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      # latest&#xFF1A;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x6709;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;offset&#x5904;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1B;&#x5728;&#x5404;&#x5206;&#x533A;&#x4E0B;&#x65E0;&#x63D0;&#x4EA4;&#x7684;offset&#x65F6;&#xFF1A;&#x4ECE;&#x6700;&#x65B0;&#x7684;&#x6570;&#x636E;&#x5F00;&#x59CB;&#x6D88;&#x8D39;
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # &#x76D1;&#x542C;
    listener:
      # record&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6761;&#x8BB0;&#x5F55;&#x88AB;&#x6D88;&#x8D39;&#x8005;&#x76D1;&#x542C;&#x5668;&#xFF08;ListenerConsumer&#xFF09;&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # batch&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#x63D0;&#x4EA4;
      # time&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x8DDD;&#x79BB;&#x4E0A;&#x6B21;&#x63D0;&#x4EA4;&#x65F6;&#x95F4;&#x5927;&#x4E8E;TIME&#x65F6;&#x63D0;&#x4EA4;
      # count&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;&#xFF0C;&#x88AB;&#x5904;&#x7406;record&#x6570;&#x91CF;&#x5927;&#x4E8E;&#x7B49;&#x4E8E;COUNT&#x65F6;&#x63D0;&#x4EA4;
      # count_time&#xFF1A;TIME&#x6216;COUNT&#x4E2D;&#x6709;&#x4E00;&#x4E2A;&#x6761;&#x4EF6;&#x6EE1;&#x8DB3;&#x65F6;&#x63D0;&#x4EA4;
      # manual&#xFF1A;&#x5F53;&#x6BCF;&#x4E00;&#x6279;poll()&#x7684;&#x6570;&#x636E;&#x88AB;ListenerConsumer&#x5904;&#x7406;&#x4E4B;&#x540E;, &#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x63D0;&#x4EA4;
      # manual_immediate&#xFF1A;&#x624B;&#x52A8;&#x8C03;&#x7528;Acknowledgment.acknowledge()&#x540E;&#x7ACB;&#x5373;&#x63D0;&#x4EA4;&#xFF0C;&#x4E00;&#x822C;&#x63A8;&#x8350;&#x4F7F;&#x7528;&#x8FD9;&#x79CD;
      ack-mode: manual_immediate
      # &#x5728;&#x4FA6;&#x542C;&#x5668;&#x5BB9;&#x5668;&#x4E2D;&#x8FD0;&#x884C;&#x7684;&#x7EBF;&#x7A0B;&#x6570;&#x3002;
      concurrency: 5
 # &#x5176;&#x5B83;&#x5C5E;&#x6027;&#x914D;&#x7F6E;
 properties:
 # &#x8BBE;&#x7F6E;&#x53D1;&#x9001;&#x6D88;&#x606F;&#x7684;&#x5927;&#x5C0F;
    max.request.size: 10240000

2、创建生产者和消费者

生产者:

/**
 * cf
 * &#x751F;&#x4EA7;&#x8005;
 */
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<string, object> kafkaTemplate;

    /**
     * &#x53D1;&#x9001;&#x6D88;&#x606F;
     * @param message
     */
    @PostMapping("/kafka")
    public void sendMessage1(@RequestBody String message) {
        kafkaTemplate.send("topic_test01", "cfKey",message);
    }

}
</string,>

消费者:

/**
 * cf
 * &#x6D88;&#x8D39;&#x8005;
 */
@Component
@Slf4j
public class KafkaConsumer {
    /**
     * kafka&#x7684;&#x76D1;&#x542C;&#x5668; &#x6D88;&#x8D39;&#x6D88;&#x606F;
     * @param record
     * @param item
     */
    @KafkaListener(topics = "topic_test01", groupId = "spring_customer")
    public void topicListener(ConsumerRecord<string, string> record, Acknowledgment item) {
        log.info("&#x6211;&#x5F00;&#x59CB;&#x6D88;&#x8D39;&#xFF1A;{}==,{}==,{};",record.topic(),record.partition(),record.value());
        //&#x624B;&#x52A8;&#x63D0;&#x4EA4;
        item.acknowledge();
    }
}
</string,>

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

@KafkaListener 注解实现监听器,就是我们用到的。

源码:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    //消息 Topic
    String[] topics() default {};
    //Topic 的模式匹配表达式
    String topicPattern() default "";
    //Topic 分区
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    //消息分组 Id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}

在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。
在这里,强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念: 消费者分组(Consumer Group)。

设计消费者组的目的是应对集群环境下的多服务实例问题。

显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

为了解决这个问题,Kafka 中提供了消费者组的概念。 一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费

3.测试一下

5.2.SpringBoot整合Kafka(开整)

成功:

5.2.SpringBoot整合Kafka(开整)
  1. 复杂操作1 (ConcurrentKafkaListenerContainerFactory)应用

kafka过滤器

1.我们在使用kafka的时候,会遇到某一些消息过滤的情况,所以我们需要配置过滤器,来清洗kafka中的数据。
2.有时候我们需要配置并发消费,所以要Specify the container concurrency.指定容器并发性

配置如下:


/**
 * cf
 * 消费者消息过滤器
 */
@Configuration
public class CustomKafkaFilter {
    @Autowired
    private ConsumerFactory consumerFactory;
    @Bean("concurrentKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //指定容器并发性(配置文件配置也可)
        factory.setConcurrency(3);
        //开启批量消费 (配置文件配置也可)
        factory.setBatchListener(true);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 设置记录筛选策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String msg = consumerRecord.value().toString();
                 //这里写筛选规则
                return true;// 返回true消息将会被丢弃
            }
        });
        return factory;
    }
}

注意: @Bean(“concurrentKafkaListenerContainerFactory”)一定要指定名称,否则spring会认为没有这个bean自动创建一个,会出现重复执行问题。

  1. 复杂操作2 封装 kafka生产者

工具类KafkaAnalyzeProducer:


import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * kafka生产者
 */
@Slf4j
@Component
public class KafkaAnalyzeProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据到kafka
     *
     * @param topic        topic名称
     * @param message      发送信息字符串
     * @param sendCallBack 发送回调
     */
    public void send(String topic, String message, SendCallBack sendCallBack) {

        ListenableFuture listenableFuture = kafkaTemplate.send(topic, message);
        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @SneakyThrows
            @Override
            public void onSuccess(Object result) {
                sendCallBack.sendSuccessCallBack(topic, message);
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @SneakyThrows
            @Override
            public void onFailure(Throwable ex) {
                sendCallBack.sendFailCallBack(topic, message, ex);
            }
        };

        listenableFuture.addCallback(successCallback, failureCallback);
    }

    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendSynchronize(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
            }

            @Override
            public void onSuccess(Object o) {
                log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
            }
        });
    }

    public void sendMessage(String topic, String message) {
        this.kafkaTemplate.send(topic, message);
    }
}

SendCallBack:

import java.text.ParseException;

public interface SendCallBack {

    /**
     * 生产成功回调
     * @param topic topic
     * @param msg 信息字符串
     */
    void sendSuccessCallBack(String topic,String msg) throws ParseException;

    /**
     * 生产失败回调
     * @param topic topic
     * @param msg 信息字符串
     * @param ex 异常
     */
    void sendFailCallBack(String topic,String msg,Throwable ex) throws ParseException;

}

使用方式:

    @Autowired
    private KafkaAnalyzeProducer kafkaAnalyzeProducer;

// 下面的放方法里面就好了
kafkaAnalyzeProducer.send(topic,JSON.toJSONString(itemMap,SerializerFeature.WriteMapNullValue),new SendCallBack() {
        @Override
        public void sendSuccessCallBack (String topic, String msg){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.info("----sendSuccessCallBack事件kafka记录解析完成放入topic:{},发送成功:{}", topic, jsonObject.getOrDefault("id", "").toString());
        }

        @Override
        public void sendFailCallBack (String topic, String msg, Throwable ex){
            JSONObject jsonObject = JSONObject.parseObject(msg);

            log.error("----sendFailCallBack事件kafka记录解析完成放入topic:{},发送失败{}", topic, jsonObject.getOrDefault("id", "").toString(), ex);
        }
    });

ok基本上就分享这些就够用了。

Original: https://www.cnblogs.com/daohangtaiqian/p/16537782.html
Author: 道行太浅
Title: 5.2.SpringBoot整合Kafka(开整)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/594396/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球