创建Springboot工程接收acticemq消息

1、JMSFactory配置

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mongo="http://www.springframework.org/schema/data/mongo"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/context
          http://www.springframework.org/schema/context/spring-context.xsd
          http://www.springframework.org/schema/data/mongo
          http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
          http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <context:annotation-config />

    <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop" primary="true">
        <property name="connectionFactory" ref="jmsFactory" />
    bean>

    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL"
            value="${jms.server.brokerURL:tcp://localhost:61616}" />
    bean>
beans>

2、启动类注入:

public class MqReceiverApplication {

    public static void main(String[] args) throws IOException {
        System.setProperty("java.util.logging.config.file", ".");

        SpringApplication.run(MqReceiverApplication.class, args);
    }

    @Configuration
    @ConditionalOnMissingBean(name = "jmsFactory")
    @ImportResource(locations = { "classpath:com/mq/reciver/jms.xml" })
    static class JmsConfig {
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

3、接收类:

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;

public abstract class AbstractJmsReceiver
        implements InitializingBean, ApplicationListener, DestinationResolver, MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(AbstractJmsReceiver.class);

    @Autowired(required = true)
    @Qualifier("jmsFactory")
    private ConnectionFactory _connectionFactory;

    private DefaultMessageListenerContainer _containerActivity;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        if (_containerActivity != null && _containerActivity.isActive()) {
            _containerActivity.shutdown();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.isEnable()) {
            logger.debug("开始监听任务状态消息队列【{}】", this.getDestinationName());
            _containerActivity = new DefaultMessageListenerContainer();
            _containerActivity.setPubSubDomain(this.getPubSubDomain());// 默认点对点
            _containerActivity.setMessageListener(this);
            _containerActivity.setConnectionFactory(_connectionFactory);
            _containerActivity.setDestinationName(this.getDestinationName());
            _containerActivity.setConcurrency("1");
            _containerActivity.setDestinationResolver(this);
            _containerActivity.initialize();
            _containerActivity.start();
        }
    }

    @Override
    public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
            throws JMSException {
        return (pubSubDomain ? session.createTopic(destinationName) : session.createQueue(destinationName));
    }

    public abstract String getDestinationName();

    public abstract boolean getPubSubDomain();

    public abstract boolean isEnable();
}
package com.mq.reciver.activemq;

import javax.jms.JMSException;
import javax.jms.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class TopicReceiver extends AbstractJmsReceiver {

    @Value("${topic.name:topic}")
    private String topicName;

    @Override
    public void onMessage(Message message) {
        try {
            String moId = message.getStringProperty("objId");
            String type = message.getStringProperty("type");
            String extraDataStr = message.getStringProperty("extraData");
            String operation = message.getStringProperty("operation");
            String operatorId = message.getStringProperty("operatorId");
            logger.debug("{} {} {} {} {}", operatorId, operation, type, moId, extraDataStr);
            if ("Delete".equals(operation)) {

            } else if ("Add".equals(operation)) {

            } else if ("Modify".equals(operation)) {

            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String getDestinationName() {
        return topicName;
    }

    @Override
    public boolean getPubSubDomain() {
        return true;
    }

    @Override
    public boolean isEnable() {
        if (!StringUtils.isEmpty(topicName)) {
            return true;
        }
        return false;
    }

}
import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 处理状态变化的JMS Message通知,
 *
 */
@Component
public class QueueReceiver extends AbstractJmsReceiver {

    @Value("${agent.enableReceiver:false}")
    private boolean enable = false;

    private List workProcessors = new ArrayList<>();

    public void addWorkProcessor(AbstractMqProcessor processor) {
        if (enable) {
            this.workProcessors.add(processor);
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println(message.getPropertyNames());
            for (AbstractMqProcessor processor : workItemProcessors) {
                if (!processor.getDefName().equals("")) {
                    logger.debug("接收到新任务【{}】消息", processor.getDefName());
                    processor.process();
                }
            }
        } catch (JMSException e) {
            logger.warn("[{}]处理接收到的消息发生错误", this.getClass().getSimpleName(), e);
        } catch (Throwable e) {
            logger.warn("[{}]处理接收到的消息发生错误", this.getClass().getSimpleName(), e);
        }
    }

    @Override
    public String getDestinationName() {
        return "Queue.J.test";
    }

    @Override
    public boolean getPubSubDomain() {
        return false;
    }

    @Override
    public boolean isEnable() {
        return enable;
    }
}

Original: https://www.cnblogs.com/liangblog/p/15334371.html
Author: 凉城
Title: 创建Springboot工程接收acticemq消息

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/541730/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Spring的AOP底层解析

    AOP原理的前置知识 动态代理在Spring中的应用: 1.AOP 2.注解@Lazy (2)Spring中针对动态代理的封装 1.ProxyFactory (1)介绍 基于两种动…

    Java 2023年6月16日
    079
  • java日期工具类–获取两个日期之间的工作日天数(只去除周六日,未去除节假日)

    获取两个日期之间的工作日天数 (只去除周六日,未去除节假日) 其他参考资料: 两个日期之间的工作日计算工具类 https://www.cnblogs.com/zzlp/p/5166…

    Java 2023年5月29日
    064
  • 手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(九)-Spring AMQP 集成与配置

    手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(一) – 介绍手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级…

    Java 2023年6月8日
    064
  • JavaWeb序

    服务器端的编程基础 因JS的特效在本机不能显示,故直接进行javaweb的学习,抱歉… BS与CS的异同 BS:客户端服务器架构模式 CS:浏览器服务器架构模式 Tom…

    Java 2023年6月5日
    0103
  • docker 安装启动jenkins 以及问题剖析

    docker 安装启动jenkins 以及问题剖析 高考时有句”神话”,只要累不死,就往死里干。这句话依然适合现在的工作中的我们。开篇喜欢讲些小语句; 今天…

    Java 2023年6月16日
    090
  • 设计模式之代理模式

    代理模式属于结构型模式,代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用,即通过代理对象访问目标对象。这样做的好处是:可以在目标对象实现的基础上,增强额外的功能…

    Java 2023年6月5日
    083
  • 22.1.8 堆排序、桶排序

    22.1.8 堆排序、桶排序 1. 堆排序:时间复杂度:O(nlogn), 空间复杂度:O(1) (1)完全二叉树: 第i个节点的左孩子:2*i+1; 第i个节点的右孩子:2*i+…

    Java 2023年6月13日
    076
  • @FeignClient常用属性

    @FeignClient(name = "gateway-test", value = "gateway-test", url = &quo…

    Java 2023年6月5日
    088
  • Effective Java 3 读后感

    Effective Java 3 读后感 最近学习了一下Effectvie Java,这是一本非常适合有一定经验的Java后端人员阅读的书。书中总结许多编码经验对开发很有帮助,比如…

    Java 2023年6月16日
    074
  • github打不开怎么办

    打开以下三个网址,分别取各个网址如下图所示位置的IP地址 github网址查询: github域名查询: 修改hosts文件,路径为c:\windows\system32\driv…

    Java 2023年6月5日
    077
  • JAVA自已设计JSON解析器

    当然,有很多很好的JSON解析的JAR包,比如JSONOBJECT,GSON,甚至也有为我们测试人员而打造的JSONPATH,但我还是自已实现了一下(之前也实现过,现在属于重构)。…

    Java 2023年5月29日
    053
  • springboot + flowable工作流

    背景:项目涉及到审批,用工作流会合适一点。由于之前未接触过,因此选用在activiti基础上开发的flowable进行 需求:在springboot中引入flowable并封装操作…

    Java 2023年6月8日
    075
  • 如何防止订单重复支付?

    大家好,我是老三,想必大家对在线支付都不陌生,今天和大家聊聊如何防止订单重复支付。 看看订单支付流程 我们来看看,电商订单支付的简要流程: 从下单/计算开始: 下单/结算:这一步虽…

    Java 2023年6月5日
    099
  • Java邮件发送中的setRecipient方法使用

    一、方法 setRecipient(Message.RecipientType type, Address address),是用于设置邮件的接收者。 1、有两个参数,第一个参数是…

    Java 2023年6月7日
    069
  • 实体类null属性滤除

    背景:用一个实体类传输数据的过程中,经常会有部分属性不需要传值,但是还是传到前端,但是显示的值为null,影响美观 需求:用实体传输时,有值的属性传,没有值的属性进行滤除 实现: …

    Java 2023年6月8日
    053
  • 网络数据流分析工具TcpEngine V1.0.0教程-1

    概述 目前主流的网络数据分析工具主要有两类,一类是http 协议分析工具,如fiddler ,这类工具擅长对字符串类型协议分析;另一类是原始网络数据包的监听分析,如Wireshar…

    Java 2023年6月5日
    070
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球