1、下载 因为rabbit是用erlang开发的,所以要先安装erlang环境
1.1 下载erlang https://www.erlang.org/downloads ,配置环境变量 ERLANG_HOME—>D:\Program Files\Erlang OTP ; path-> %ERLANG_HOME%\bin 。控制台erl 验证安装成功
1.2 下载 RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases
1.3 安装管理插件 进入安装目录,sbin目录下,执行:rabbitmq-plugins enable rabbitmq_management ,如果执行失败,说明erlang 和RabbitMQ 的版本不对应,对应关系可参考 https://www.rabbitmq.com/which-erlang.html
2、控制台配置
2.1 浏览器输入http://localhost:15672,账号密码默认是:guest/guest
2.2 新建exchange和queue
2.3 绑定exchange和queue
3 构建springboot项目
3.1 引入maven
<dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-amqpartifactId> dependency>
3.2 生产者
3.2.1 application.yml
spring:
rabbitmq:
test1:
host: localhost
username: test
password: test
port: 5672
virtual-host: host
listener:
direct:
acknowledge-mode: manual
simple:
concurrency: 20
publisher-confirm-type: CORRELATED
test2:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
listener:
direct:
acknowledge-mode: manual
simple:
concurrency: 20
publisher-confirm-type: CORRELATED
3.2.2 RabbitMqConfig
package com.example.producer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.web.client.RestTemplate;
/**
* @author xiufengd
* @date 2022/6/28 10:25
* 未来可期
*/
@Configuration
public class RabbitMqConfig {
// 注意这里使用了primary注解
// 声明连接工厂连接开发服务器
@Primary
@Bean(name = "test1ConnectionFactory")
public ConnectionFactory devConnectionFactory(@Value("${spring.rabbitmq.test1.host}") String host,
@Value("${spring.rabbitmq.test1.port}") int port,
@Value("${spring.rabbitmq.test1.username}") String username,
@Value("${spring.rabbitmq.test1.password}") String password,
@Value("${spring.rabbitmq.test1.virtual-host}") String virtualHost) {
// 使用@Value直接读取配置文件中的信息
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明连接工厂连接测试服务器
@Bean(name = "test2ConnectionFactory")
public ConnectionFactory testConnectionFactory(@Value("${spring.rabbitmq.test2.host}") String host,
@Value("${spring.rabbitmq.test2.port}") int port,
@Value("${spring.rabbitmq.test2.username}") String username,
@Value("${spring.rabbitmq.test2.password}") String password,
@Value("${spring.rabbitmq.test2.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明开发服务器rabbitTemplate
@Bean(name = "test1RabbitTemplate")
public RabbitTemplate test1RabbitTemplate(@Qualifier("test1ConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
// 声明测试服务器连接 rabbitTemplate
@Bean(name = "test2RabbitTemplate")
public RabbitTemplate test2RabbitTemplate(@Qualifier("test2ConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
/**
* 声明dev containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "test1ContainerFactory")
public SimpleRabbitListenerContainerFactory devSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("test1ConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
/**
* 声明 test containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "test2ContainerFactory")
public SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("test2ConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
3.2.3 ProducerApplicationTests
package com.example.producer;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class ProducerApplicationTests{
// 指定dev服务器rabbitTemplate
@Resource(name = "test1RabbitTemplate")
AmqpTemplate test1AmqpTemplate;
// 指定test服务器rabbitTemplate
@Resource (name = "test2ConnectionFactory")
ConnectionFactory test2ConnectionFactory;
@Resource(name = "test2RabbitTemplate")
AmqpTemplate test2AmqpTemplate;
@Test
void contextLoads() throws InterruptedException {
Message message= MessageBuilder.withBody("消息中心11111111111111111111111".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setMessageId(UUID.randomUUID().toString()).build();
test1AmqpTemplate.convertAndSend("fanout","",message);
RabbitTemplate rabbitTemplate=new RabbitTemplate(test2ConnectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
if(ack){
System.out.println("发送消息到交换器成功,MessageId:" + correlationData.getId());
}else{
System.out.println("publishConfirm消息发送到交换器被退回,Id:" + correlationData.getId() + ";退回原因是:" + cause);
}
});
rabbitTemplate.setReturnsCallback(returnedMessage->{
System.out.println("ReturnsCallback------------------"+returnedMessage.getReplyText());
try {
System.out.println("ReturnsCallback------------------"+new String(returnedMessage.getMessage().getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
System.out.println("ReturnsCallback------------------"+returnedMessage.getExchange());
});
rabbitTemplate.convertAndSend("exchange1","queue2_Routing_key",message,new CorrelationData(UUID.randomUUID().toString()));
TimeUnit.SECONDS.sleep(10L);
}
}
3.3 消费者
3.3.1 application.yml
spring:
rabbitmq:
test1:
host: localhost
username: test
password: test
port: 5672
virtual-host: host
listener:
direct:
acknowledge-mode: manual
simple:
concurrency: 20
publisher-confirm-type: correlated
test2:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
listener:
direct:
acknowledge-mode: manual
simple:
concurrency: 20
publisher-confirm-type: correlated
server:
port: 8081
3.3.2 RabbitMqConfig
package com.example.consumer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.web.client.RestTemplate;
/**
* @author xiufengd
* @date 2022/6/28 10:25
* 未来可期
*/
@Configuration
public class RabbitMqConfig {
// 注意这里使用了primary注解
// 声明连接工厂连接开发服务器
@Primary
@Bean(name = "test1ConnectionFactory")
public ConnectionFactory devConnectionFactory(@Value("${spring.rabbitmq.test1.host}") String host,
@Value("${spring.rabbitmq.test1.port}") int port,
@Value("${spring.rabbitmq.test1.username}") String username,
@Value("${spring.rabbitmq.test1.password}") String password,
@Value("${spring.rabbitmq.test1.virtual-host}") String virtualHost) {
// 使用@Value直接读取配置文件中的信息
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明连接工厂连接测试服务器
@Bean(name = "test2ConnectionFactory")
public ConnectionFactory testConnectionFactory(@Value("${spring.rabbitmq.test2.host}") String host,
@Value("${spring.rabbitmq.test2.port}") int port,
@Value("${spring.rabbitmq.test2.username}") String username,
@Value("${spring.rabbitmq.test2.password}") String password,
@Value("${spring.rabbitmq.test2.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
// 声明开发服务器rabbitTemplate
@Bean(name = "test1RabbitTemplate")
public RabbitTemplate test1RabbitTemplate(@Qualifier("test1ConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
// 声明测试服务器连接 rabbitTemplate
@Bean(name = "test2RabbitTemplate")
public RabbitTemplate test2RabbitTemplate(@Qualifier("test2ConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
/**
* 声明dev containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "test1ContainerFactory")
public SimpleRabbitListenerContainerFactory devSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("test1ConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
/**
* 声明 test containerFactory
* @param rabbitListenerContainerFactoryConfigurer
* @param connectionFactory
* @return
*/
@Bean(name = "test2ContainerFactory")
public SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,
@Qualifier("test2ConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory=new SimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
return containerFactory;
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
3.3.3 Consumer1
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* @author xiufengd
* @date 2022/6/27 16:07
* 未来可期
*/
@Component
public class Consumer1 {
@RabbitHandler
@RabbitListener(queues = "fanoutqueue1",containerFactory = "test1ContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test1中的消息
public void receive(Message message, Channel channel) throws IOException {
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("fanoutqueue1------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitHandler
@RabbitListener(queues = "fanoutqueue2",containerFactory = "test1ContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test2中的消息
public void receiveQueue1(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("fanoutqueue2------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "fanoutqueue3",containerFactory = "test1ContainerFactory")// 指定队列名称和容器工厂,接收开发服务器队列q.test3中的消息
public void receiveQueue2(Object object) {
Message message=(Message)object;
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("fanoutqueue3------------------------"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@RabbitHandler
@RabbitListener(queues = "queue2",containerFactory = "test2ContainerFactory")
// 指定队列名称和容器工厂,接收测试服务器队列q.test1中的消息
public void receive23message(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
byte bytes[]=null;
if (message != null) {
bytes=message.getBody();
}
try {
String msg=new String(bytes,"UTF-8");
System.out.println("queue2------------------------"+msg);
channel.basicAck(deliveryTag,true);
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
channel.basicNack(deliveryTag,false,true);
e.printStackTrace();
}
}
}
4 入门结束,按以上可正常运行,如果有异常,可能是exchange和queue没配置对,具体详细说明,下期细说
Original: https://www.cnblogs.com/xiufengd/p/16425890.html
Author: 程序员丁先生
Title: rabbitmq初体验 rabbitmq入门
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/562491/
转载文章受原作者版权保护。转载请注明原作者出处!