140_SpringBoot案例-fanout发布订阅模式

整体核心

140_SpringBoot案例-fanout发布订阅模式

目标

:::info
使用springboot完成rabbitmq的消费模式-Fanout
:::

140_SpringBoot案例-fanout发布订阅模式

实现步骤

:::info
1:创建生产者工程:springboot-rabbitmq-fanout-producer
2:创建消费者工程:springboot-rabbitmq-fanout-consumer
3:引入spring-boot-rabbitmq的依赖
4:进行消息的分发和测试
5:查看和观察web控制台的状况
:::

生产者

创建生产者工程:springboot-rabbitmq-fanout-producer

140_SpringBoot案例-fanout发布订阅模式

在pom.xml中引入依赖


    org.springframework.boot
    spring-boot-starter-amqp

    org.springframework.boot
    spring-boot-starter-web

在application.yml进行配置

服务端口
server:
  port: 8080
配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 47.104.141.27
    port: 5672

定义订单的生产者

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * @author: 学相伴-飞哥
 * @description: OrderService
 * @Date : 2021/3/4
 */
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定义交换机
    private String exchangeName = "fanout_order_exchange";
    // 2: 路由key
    private String routeKey = "";
    public void makeOrder(Long userId, Long productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根据商品id productId 去查询商品的库存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判断库存是否充足
        // if(num >  numstore ){ return  "商品库存不足..."; }
        // 4: 下单逻辑
        // orderService.saveOrder(order);
        // 5: 下单成功要扣减库存
        // 6: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

绑定关系

:::info
绑定关系方式有两种:一种是配置类,一种是注解@RabbitListener(bindings =@QueueBinding
推荐使用配置类
生产者和消费者中只需要一边添加配置类就可以了,建议消费者中添加,因为通常消费者先启动
:::

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class FanoutRabbitConfig {

    // 声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutOrderExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }
    // 声明队列
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("weixin.fanout.queue", true);
    }
    //绑定  将队列和交换机绑定
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding weixinBinding() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
    }
}

进行测试

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer;
import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqFanoutProducerApplicationTests {
    @Autowired
    OrderService orderService;
    @Test
    public void contextLoads() throws Exception {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrder(userId, productId, num);
        }
    }
}

消费者

创建消费者工程:springboot-rabbitmq-fanout-consumer

140_SpringBoot案例-fanout发布订阅模式

引入依赖pom.xml


    org.springframework.boot
    spring-boot-starter-amqp

    org.springframework.boot
    spring-boot-starter-web

在application.yml进行配置

服务端口
server:
  port: 8081
配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 47.104.141.27
    port: 5672

消费者 – 邮件服务

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("email-------------->" + message);
    }
}

消费者 – 短信服务

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class SMSService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("sms-------------->" + message);
    }
}

消费者 – 微信服务

package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class WeixinService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("weixin-------------->" + message);
    }
}

启动服务SpringbootRabbitmqFanoutConsumerApplication,查看效果

140_SpringBoot案例-fanout发布订阅模式

Original: https://www.cnblogs.com/wl3pb/p/16577897.html
Author: 清风(学习-踏实)
Title: 140_SpringBoot案例-fanout发布订阅模式

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/581235/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 面试官:线程池如何按照core、max、queue的执行循序去执行?(内附详细解析)

    前言 这是一个真实的面试题。 前几天一个朋友在群里分享了他刚刚面试候选者时问的问题: “线程池如何按照core、max、queue的执行循序去执行?”。 我…

    Java 2023年5月30日
    089
  • Linux服务器启动jstatd服务

    Linux,jstatd,Linux服务器启动jstatd服务 Linux服务器启动jstatd服务 1.查找jdk所在目录 2.在jdk的bin目录下创建文件jstatd.all…

    Java 2023年6月8日
    075
  • Markdown基础语法(上)

    前言 按照官方文档,和根据自己所用和所理解所写 一.标题语法 一级标题最大,六级标题最小 一级标题 ## 二级标题 ### 三级标题 #### 四级标题 ##### 五级标题 ##…

    Java 2023年6月13日
    072
  • JVM快速扫盲篇

    JVM虚拟机基础 JVM虚拟机结构 jvm的整体结构大致如下: 类加载器:类加载器用来加载Java类到JVM虚拟机中,源代码程序.java文件在经过编译器编译之后就被转换成字节代码…

    Java 2023年6月5日
    085
  • HM2022ssm-mp4.2【DML增删改编程控制(2)】

    逻辑删除 3.1 需求 这是一个员工和其所签的合同表,关系是一个员工可以签多个合同,是一个一(员工)对多(合同)的表 员工ID为1的张业绩,总共签了三个合同,如果此时他离职了,我们…

    Java 2023年6月5日
    088
  • 干掉 PowerDesigner,这款数据库设计神器真的绝了!!!

    最近在造轮子,从 0 到 1 的那种,就差前台的界面了,大家可以耐心耐心耐心期待一下。其中需要设计一些数据库表,可以通过 Navicat 这种图形化管理工具直接开搞,也可以通过一些…

    Java 2023年6月9日
    078
  • Android 11 使用 BroadcastReceiver 监听短消息

    使用装有MIUI系统的小米手机,静态注册的广播接收器监听短消息。 在 AndroidManifest.xml中声明权限 <uses-permission android:na…

    Java 2023年6月7日
    071
  • java并发实战:连接池实现

    池化技术简介 在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这…

    Java 2023年5月29日
    078
  • python的三层架构

    项目目录规范 Foo/ |– core/ # 存放业务逻辑相关代码 | |– core.py | |– api/ # 存放接口文件,接口主要用于为业务逻辑提供数据操作。 | …

    Java 2023年6月7日
    0101
  • Day13 note

    super注意点: 1、super调用父类的构造方法,必须在构造方法的第一行 2、super必须只能出现在子类的方法或者构造方法中 3、super和this不能同时调用构造方法对比…

    Java 2023年6月5日
    079
  • 通俗讲解分布式锁:场景和使用方法

    对于锁大家肯定不会陌生,比如 synchronized 关键字 和 ReentrantLock 可重入锁,一般我们用其在多线程环境中控制对资源的并发访问。但是随着业务的发展,分布式…

    Java 2023年5月29日
    055
  • 七、Java数组

    Java数组 什么是数组 数组是相同类型数据的有序集合。 数组描述的是相同类型的若干个数据,按照一定的先后次序排列组合而成。 其中,每一个数据称作为一个数组元素,每个数组元素可以通…

    Java 2023年6月7日
    067
  • springcloud各种网址

    1.springcloud中文文档: https://springcloud.cc/spring-cloud-dalston.html 2.疑难杂症解决网址 springcloud…

    Java 2023年5月30日
    079
  • Kafka和RabbitMQ有哪些区别,各自适合什么场景?

    经常有人问我 有个 xx 需求,我应该用 Kafka 还是 RabbitMQ ? 这个问题很常见,而且很多人对二者的选择也把握不好。 所以我决定写篇文章来详细说一下:Kafka 和…

    Java 2023年6月7日
    068
  • 【校招VIP】[Java][一本][6分]按照真实的技能点进行业务描述

    关注【校招VIP】公众号 ,回复【简历 】,添加校招顾问微信,即可获取简历指导! 本份简历是一位21届一本java同学的简历,简历评分6分。 一、学员简历 二、指导意见 简历版式问…

    Java 2023年6月5日
    082
  • 线程池底层原理

    线程池底层原理 ThreadPoolExecutor 参数 corePoolSize(必填) : 核心线程数 maximumPoolSize(必填):最大线程数 keepAlive…

    Java 2023年6月5日
    080
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球