参考
SpringBoot整合kafka集群
SpringBoot整合kafka(实现producer和consumer)
1. 在pom.xml中引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.7.RELEASE</version>
</dependency>
完整的 pom.xml 文件如下:
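<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
    </parent>
    <groupId>com.roncoo.eshop</groupId>
    <artifactId>cache</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cache</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <!-- NOTE: the XML tags were lost in extraction; the three property names below are assumed, confirm against the original article -->
        <spring.version>5.2.10.RELEASE</spring.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>net.sf.ehcache</groupId>
            <artifactId>ehcache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>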
2. 配置application.yml
# Spring Boot Kafka settings shared by the producer and the consumer of this demo.
spring:
  kafka:
    # Comma-separated host:port pairs of the brokers; no URL scheme is expected here.
    bootstrap-servers: 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093
    producer:
      retries: 3
      # Wait for the full set of in-sync replicas to acknowledge each record.
      acks: all
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: eshop-cache-group
      auto-offset-reset: earliest
      # Offsets are committed from the listener code (see listener.ack-mode), not by the client.
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # NOTE(review): 20000 is far above Kafka's default of 500 records per poll; confirm a full
      # batch can be processed within max.poll.interval.ms, otherwise the group will rebalance.
      max-poll-records: 20000
    listener:
      concurrency: 3
      # Required so that @KafkaListener methods can receive an Acknowledgment argument.
      ack-mode: MANUAL
采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化
3. 编写bean,service,controller
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Message envelope published to and consumed from the {@code eshop-cache} topic.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString} and both
 * constructors. The all-args constructor takes {@code (serviceId, id)} in field order.
 *
 * @author john
 * @date 2020/11/5 - 14:11
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage implements Serializable {

    /**
     * Pinned explicitly so that producer and consumer builds stay compatible when this class is
     * written with Java native serialization.
     */
    private static final long serialVersionUID = 1L;

    /** Name of the service the message refers to, e.g. {@code "productInfoService"}. */
    private String serviceId;

    /**
     * Identifier carried by the message. Renamed from {@code Id} to follow lowerCamelCase; the
     * generated accessors are still {@code getId()} / {@code setId(Long)}.
     */
    private Long id;
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.roncoo.eshop.cache.model.KafkaMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
// 生产者向kafka发送消息
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/send_product")
public String sendproductmsg() throws JsonProcessingException {
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setId(1L);
kafkaMessage.setServiceId("productInfoService");
ObjectMapper mapper = new ObjectMapper();
String kafkaJsonMessage = mapper.writeValueAsString(kafkaMessage);
kafkaTemplate.send("eshop-cache", kafkaJsonMessage); //使用kafka模板发送信息
String res = "消息:【" + kafkaJsonMessage + "】发送成功 SUCCESS !";
log.info(res);
return res;
}
@GetMapping("/send_shop")
public String sendshopmsg() throws JsonProcessingException {
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setId(1L);
kafkaMessage.setServiceId("shopInfoService");
ObjectMapper mapper = new ObjectMapper();
String kafkaJsonMessage = mapper.writeValueAsString(kafkaMessage);
kafkaTemplate.send("eshop-cache", kafkaJsonMessage); //使用kafka模板发送信息
String res = "消息:【" + kafkaJsonMessage + "】发送成功 SUCCESS !";
log.info(res);
return res;
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Product information.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString} and both
 * constructors. The all-args constructor takes {@code (id, name, price)} in field order.
 *
 * @author Administrator
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductInfo implements Serializable {

    /** Pinned explicitly so serialized instances stay readable after unrelated recompiles. */
    private static final long serialVersionUID = 1L;

    /** Product identifier. */
    private Long id;

    /** Product name. */
    private String name;

    /**
     * Product price.
     * NOTE(review): if this is a monetary amount, {@code BigDecimal} would avoid rounding errors;
     * the type is kept as-is to preserve the class's interface.
     */
    private Double price;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Shop information.
 *
 * <p>A plain Lombok data holder: accessors, {@code equals}/{@code hashCode}/{@code toString},
 * a no-args constructor and an all-args constructor taking
 * {@code (id, name, level, goodCommentRate)} are all generated.
 *
 * @author john
 * @date 2020/11/5 - 13:30
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ShopInfo {

    /** Shop identifier. */
    private Long id;

    /** Shop name. */
    private String name;

    /** Shop level. */
    private Integer level;

    /** Rate of good comments; presumably a ratio or percentage (scale not shown in this file). */
    private Double goodCommentRate;
}
4. 编写消息消费者
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.roncoo.eshop.cache.model.KafkaMessage;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.model.ShopInfo;
import com.roncoo.eshop.cache.service.CacheService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
 * Consumes JSON-encoded {@link KafkaMessage} records from the {@code eshop-cache} topic and
 * commits offsets manually once a record has been handled.
 */
@Component
@Slf4j
public class ConsumerListener {

    /** Shared mapper; ObjectMapper is thread-safe once configured, so it is not rebuilt per record. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private CacheService cacheService;

    // @KafkaListener offers many more options (its source is worth reading); for example, specific
    // partitions can be consumed with
    // topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})}
    /**
     * Handles one record from the topic.
     *
     * @param message the record value, expected to be the JSON form of a {@link KafkaMessage}
     * @param ack     handle used to commit the offset manually
     * @throws JsonProcessingException if {@code message} is not valid JSON for {@link KafkaMessage};
     *                                 in that case the record is not acknowledged
     */
    @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
    public void listen(String message, Acknowledgment ack) throws JsonProcessingException {
        log.info("消息内容{}", message);

        // Parse the payload and dispatch on the originating service.
        KafkaMessage kafkaMessage = MAPPER.readValue(message, KafkaMessage.class);
        String serviceId = kafkaMessage.getServiceId();
        if ("productInfoService".equals(serviceId)) {
            // Product change handling goes here:
            // processProductInfoChangeMessage(kafkaMessage);
        } else if ("shopInfoService".equals(serviceId)) {
            // Shop change handling goes here:
            // processShopInfoChangeMessage(kafkaMessage);
        } else {
            // Still acknowledged below so the partition is not blocked, but no longer dropped silently.
            log.warn("未知的serviceId:{},消息已忽略", serviceId);
        }

        // Commit the offset manually.
        ack.acknowledge();
        log.info("消费结束");
    }
}
5. 测试
6. 采用自定义序列化和反序列化器进行实体类的序列化和反序列化(不推荐)
和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口 Serializer。假设每个字段按照下图所示的方式自定义序列化:
1. 创建KafkaMessage序列化器
import com.roncoo.eshop.cache.model.KafkaMessage;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;
/**
* @author john
* @date 2020/11/5 - 17:00
*/
public class KafkaMessageSerializable implements Serializer {
@Override
public void configure(Map map, boolean b) {
}
@Override
public byte[] serialize(String topic, KafkaMessage kafkaMessage) {
byte[] dataArray = null;
ByteArrayOutputStream outputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
outputStream = new ByteArrayOutputStream();
objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(kafkaMessage);
dataArray = outputStream.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return dataArray;
}
@Override
public void close() {
}
}
2. 创建KafkaMessage反序列化器
import com.roncoo.eshop.cache.model.KafkaMessage;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
/**
* @author john
* @date 2020/11/5 - 17:02
*/
public class KafkaMessageDeserializer implements Deserializer {
@Override
public void configure(Map map, boolean b) {
}
@Override
public KafkaMessage deserialize(String topic, byte[] bytes) {
KafkaMessage kafkaMessage = null;
ByteArrayInputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
inputStream = new ByteArrayInputStream(bytes);
objectInputStream = new ObjectInputStream(inputStream);
kafkaMessage = (KafkaMessage) objectInputStream.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return kafkaMessage;
}
@Override
public void close() {
}
}
3. 修改application.yml中生产者的value-serializer和消费者的value-deserializer配置
# Only the two changed entries are shown; every "# ..." stands for settings that stay as before.
spring:
  # ...
  kafka:
    # ...
    producer:
      # ...
      value-serializer: com.roncoo.eshop.cache.serializable.KafkaMessageSerializable
    consumer:
      # ...
      value-deserializer: com.roncoo.eshop.cache.serializable.KafkaMessageDeserializer
4. 修改生产者控制器部分代码
import com.fasterxml.jackson.core.JsonProcessingException;
import com.roncoo.eshop.cache.model.KafkaMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/send_product")
public String sendproductmsg() throws JsonProcessingException {
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setId(1L);
kafkaMessage.setServiceId("productInfoService");
kafkaTemplate.send("eshop-cache", kafkaMessage); //使用kafka模板发送信息
String res = "消息:【" + kafkaMessage.toString() + "】发送成功 SUCCESS !";
log.info(res);
return res;
}
@GetMapping("/send_shop")
public String sendshopmsg() throws JsonProcessingException {
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setId(1L);
kafkaMessage.setServiceId("shopInfoService");
kafkaTemplate.send("eshop-cache", kafkaMessage); //使用kafka模板发送信息
String res = "消息:【" + kafkaMessage.toString() + "】发送成功 SUCCESS !";
log.info(res);
return res;
}
}
5. 修改消费者部分代码
/**
 * Consumer for the custom-deserializer variant: receives already-decoded {@link KafkaMessage}
 * objects from the {@code eshop-cache} topic and commits offsets manually.
 */
@Component
@Slf4j
public class ConsumerListener {

    @Autowired
    private CacheService cacheService;

    // @KafkaListener offers many more options (its source is worth reading); for example, specific
    // partitions can be consumed with
    // topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})}
    /**
     * Handles one record from the topic.
     *
     * @param kafkaMessage the decoded record value
     * @param ack          handle used to commit the offset manually
     * @throws JsonProcessingException never thrown by this variant; kept so the signature is unchanged
     */
    @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
    public void listen(KafkaMessage kafkaMessage, Acknowledgment ack) throws JsonProcessingException {
        log.info("消息内容{}", kafkaMessage.toString());

        // Dispatch on the originating service.
        String serviceId = kafkaMessage.getServiceId();
        if ("productInfoService".equals(serviceId)) {
            // Product change handling goes here:
            // processProductInfoChangeMessage(kafkaMessage);
        } else if ("shopInfoService".equals(serviceId)) {
            // Shop change handling goes here:
            // processShopInfoChangeMessage(kafkaMessage);
        } else {
            // Still acknowledged below so the partition is not blocked, but no longer dropped silently.
            log.warn("未知的serviceId:{},消息已忽略", serviceId);
        }

        // Commit the offset manually.
        ack.acknowledge();
        log.info("消费结束");
    }
}
测试
Original: https://www.cnblogs.com/ifme/p/13932842.html
Author: if年少有为
Title: SpringBoot2整合kafka集群
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/534039/
转载文章受原作者版权保护。转载请注明原作者出处!