SpringBoot2整合kafka集群

参考

SpringBoot整合kafka集群
SpringBoot整合kafka(实现producer和consumer)

1. 在pom.xml中引入依赖


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.7.RELEASE</version>
</dependency>

完整的 pom.xml文件如下


<?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE: XML tags below were reconstructed from the tag-stripped article text; verify against the original repository. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.roncoo.eshop</groupId>
    <artifactId>cache</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cache</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <!-- 5.2.10.RELEASE is the Spring Framework version managed by Boot 2.3.5 -->
        <spring.version>5.2.10.RELEASE</spring.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>net.sf.ehcache</groupId>
            <artifactId>ehcache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. 配置application.yml

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093
    producer:
      retries: 3
      acks: all
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: eshop-cache-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 20000
    listener:
      concurrency: 3
      ack-mode: MANUAL

采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化

3. 编写bean,service,controller

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Message envelope published to the eshop-cache topic: names the service
 * whose data changed and the id of the changed entity.
 *
 * @author john
 * @date 2020/11/5 - 14:11
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage implements Serializable {

    // Explicit serialVersionUID: this class is Java-serialized by the custom
    // serializer in section 6, so the UID must stay stable across edits.
    private static final long serialVersionUID = 1L;

    // Logical name of the originating service, e.g. "productInfoService".
    private String serviceId;
    // Entity id. Renamed from "Id" to follow lowerCamelCase; Lombok still
    // generates the same getId()/setId() accessors, so callers are unaffected.
    private Long id;
}

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.roncoo.eshop.cache.model.KafkaMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Producer: sends JSON-encoded KafkaMessage records to Kafka.
@RestController
@Slf4j
public class ProducerController {

    private static final String TOPIC = "eshop-cache";

    // ObjectMapper is thread-safe and expensive to build; share one instance
    // instead of allocating a new one per request.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Typed template: both key and value are serialized as Strings per application.yml.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send_product")
    public String sendproductmsg() throws JsonProcessingException {
        return send("productInfoService", 1L);
    }

    @GetMapping("/send_shop")
    public String sendshopmsg() throws JsonProcessingException {
        return send("shopInfoService", 1L);
    }

    /**
     * Builds a KafkaMessage, serializes it to JSON and publishes it.
     *
     * @param serviceId logical name of the service whose data changed
     * @param id        id of the changed entity
     * @return the human-readable success message that is also logged
     * @throws JsonProcessingException if JSON serialization fails
     */
    private String send(String serviceId, Long id) throws JsonProcessingException {
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setId(id);
        kafkaMessage.setServiceId(serviceId);

        String kafkaJsonMessage = MAPPER.writeValueAsString(kafkaMessage);
        kafkaTemplate.send(TOPIC, kafkaJsonMessage); // publish via the Kafka template
        String res = "消息:【" + kafkaJsonMessage + "】发送成功 SUCCESS !";
        log.info(res);
        return res;
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Product information model (商品信息) cached by the eshop cache service.
 * Serializable so it can be stored or transported in binary form.
 *
 * @author Administrator
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductInfo implements Serializable {

    // Primary key of the product.
    private Long id;
    // Display name of the product.
    private String name;
    // NOTE(review): Double is lossy for monetary values; BigDecimal would be
    // safer — confirm how callers use this field before changing it.
    private Double price;
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Shop information model.
 *
 * Implements Serializable for consistency with ProductInfo and KafkaMessage,
 * so it can also be carried over Java-serialization-based channels.
 *
 * @author john
 * @date 2020/11/5 - 13:30
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ShopInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    // Primary key of the shop.
    private Long id;
    // Display name of the shop.
    private String name;
    // Shop rating level.
    private Integer level;
    // Fraction of positive reviews.
    private Double goodCommentRate;
}

4. 编写消息消费者

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.roncoo.eshop.cache.model.KafkaMessage;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.model.ShopInfo;
import com.roncoo.eshop.cache.service.CacheService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConsumerListener {

    // ObjectMapper is thread-safe; reuse one instance instead of creating a
    // new one for every consumed record.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private CacheService cacheService;

    // Tip: @KafkaListener also supports partition-level subscriptions, e.g.
    // topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})}
    /**
     * Consumes JSON messages from the eshop-cache topic and dispatches them
     * by serviceId. Offsets are committed manually (ack-mode: MANUAL).
     *
     * @throws JsonProcessingException if the payload is not valid JSON.
     *         NOTE(review): a parse failure leaves the offset unacknowledged,
     *         so a poison message will be redelivered indefinitely — consider
     *         a dead-letter/skip strategy.
     */
    @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
    public void listen(String message, Acknowledgment ack) throws JsonProcessingException {
        log.info("消息内容" + message);

        // Parse the JSON payload into the shared message envelope.
        KafkaMessage kafkaMessage = MAPPER.readValue(message, KafkaMessage.class);
        String serviceId = kafkaMessage.getServiceId();

        if ("productInfoService".equals(serviceId)) {
            // Handle product-info change messages.
            //processProductInfoChangeMessage(kafkaMessage);
        } else if ("shopInfoService".equals(serviceId)) {
            // Handle shop-info change messages.
           // processShopInfoChangeMessage(kafkaMessage);
        }

        // Manually commit the offset only after successful processing.
        ack.acknowledge();
        log.info("消费结束");
    }

}

5. 测试

SpringBoot2整合kafka集群

SpringBoot2整合kafka集群

参考代码

6. 采用自定义序列化和反序列化器进行实体类的序列化和反序列化(不推荐)

和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口 Serializer。假设每个字段按照下图所示的方式自定义序列化:

1. 创建KafkaMessage序列化器

import com.roncoo.eshop.cache.model.KafkaMessage;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

/**
 * Serializes a KafkaMessage with native Java serialization.
 *
 * Fixed to implement the generic {@code Serializer<KafkaMessage>}: with the
 * raw type, {@code serialize(String, KafkaMessage)} was an overload of the
 * interface's {@code serialize(String, Object)} rather than an override, so
 * the {@code @Override} annotation would not compile.
 *
 * @author john
 * @date 2020/11/5 - 17:00
 */
public class KafkaMessageSerializable implements Serializer<KafkaMessage> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    /**
     * @param topic        topic the record is being sent to (unused)
     * @param kafkaMessage payload to serialize; may be null
     * @return the Java-serialized bytes, or null for a null payload (Kafka convention)
     */
    @Override
    public byte[] serialize(String topic, KafkaMessage kafkaMessage) {
        if (kafkaMessage == null) {
            return null;
        }
        // try-with-resources closes both streams in the correct order
        // (the original finally block closed the underlying stream first).
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream)) {
            objectOutputStream.writeObject(kafkaMessage);
            objectOutputStream.flush(); // ensure all buffered object data reaches the byte array
            return outputStream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize KafkaMessage for topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

2.创建KafkaMessage反序列化器

import com.roncoo.eshop.cache.model.KafkaMessage;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;

/**
 * Deserializes a KafkaMessage from the Java-serialization byte stream
 * produced by KafkaMessageSerializable.
 *
 * NOTE(review): native Java deserialization of data received from an
 * external system is a known attack vector — prefer JSON, or install an
 * ObjectInputFilter before using this in production.
 *
 * @author john
 * @date 2020/11/5 - 17:02
 */
public class KafkaMessageDeserializer implements Deserializer<KafkaMessage> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    /**
     * @param topic topic the record came from (unused)
     * @param bytes serialized payload; may be null
     * @return the reconstructed KafkaMessage, or null for a null payload (Kafka convention)
     */
    @Override
    public KafkaMessage deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        // try-with-resources guarantees both streams are closed even on failure.
        try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
             ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
            return (KafkaMessage) objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize KafkaMessage from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

3. 修改application.yml中生产者配置的value-serializer和修改消费者配置的value-deserializer配置

spring:
  ...

  kafka:
    ...

    producer:
      ...

      value-serializer: com.roncoo.eshop.cache.serializable.KafkaMessageSerializable
    consumer:
      ...

      value-deserializer: com.roncoo.eshop.cache.serializable.KafkaMessageDeserializer

4. 修改生产者控制器部分代码

import com.fasterxml.jackson.core.JsonProcessingException;
import com.roncoo.eshop.cache.model.KafkaMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Producer variant for section 6: the value is the KafkaMessage object itself,
// serialized by the custom KafkaMessageSerializable configured in application.yml.
@RestController
@Slf4j
public class ProducerController {

    private static final String TOPIC = "eshop-cache";

    // Typed template: value type matches the custom value-serializer.
    @Autowired
    private KafkaTemplate<String, KafkaMessage> kafkaTemplate;

    @GetMapping("/send_product")
    public String sendproductmsg() {
        return send("productInfoService", 1L);
    }

    @GetMapping("/send_shop")
    public String sendshopmsg() {
        return send("shopInfoService", 1L);
    }

    /**
     * Builds a KafkaMessage and publishes it; serialization happens inside
     * the Kafka client, so no checked exception is thrown here (the original
     * "throws JsonProcessingException" clauses were dead).
     *
     * @param serviceId logical name of the service whose data changed
     * @param id        id of the changed entity
     * @return the human-readable success message that is also logged
     */
    private String send(String serviceId, Long id) {
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setId(id);
        kafkaMessage.setServiceId(serviceId);

        kafkaTemplate.send(TOPIC, kafkaMessage); // publish via the Kafka template
        String res = "消息:【" + kafkaMessage.toString() + "】发送成功 SUCCESS !";
        log.info(res);
        return res;
    }
}

5. 修改消费者部分代码

@Component
@Slf4j
public class ConsumerListener {

    @Autowired
    private CacheService cacheService;

    // Tip: @KafkaListener also supports partition-level subscriptions, e.g.
    // topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})}
    /**
     * Consumes already-deserialized KafkaMessage records (see the custom
     * value-deserializer in application.yml) and commits offsets manually.
     */
    @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
    public void listen(KafkaMessage kafkaMessage, Acknowledgment ack) throws JsonProcessingException {
        log.info("消息内容" + kafkaMessage.toString());

        dispatch(kafkaMessage);

        // Manually commit the offset (listener ack-mode: MANUAL).
        ack.acknowledge();
        log.info("消费结束");
    }

    /** Routes a message to the handler matching its serviceId. */
    private void dispatch(KafkaMessage kafkaMessage) {
        String serviceId = kafkaMessage.getServiceId();
        if ("productInfoService".equals(serviceId)) {
            // Product-info change message.
            //processProductInfoChangeMessage(kafkaMessage);
        } else if ("shopInfoService".equals(serviceId)) {
            // Shop-info change message.
            //processShopInfoChangeMessage(kafkaMessage);
        }
    }
}

测试

SpringBoot2整合kafka集群

SpringBoot2整合kafka集群

参考代码

Original: https://www.cnblogs.com/ifme/p/13932842.html
Author: if年少有为
Title: SpringBoot2整合kafka集群

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/534039/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球