1、JMSFactory配置
<!-- Spring bean definitions for the ActiveMQ JMS connection factories. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <context:annotation-config />

    <!-- Pooling wrapper around "jmsFactory"; declared primary, and stop() is invoked when the bean is destroyed. -->
    <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop" primary="true">
        <property name="connectionFactory" ref="jmsFactory" />
    </bean>

    <!-- Plain ActiveMQ connection factory; the broker URL falls back to tcp://localhost:61616
         when the jms.server.brokerURL property is not set. -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.server.brokerURL:tcp://localhost:61616}" />
    </bean>
</beans>
2、启动类注入:
public class MqReceiverApplication {
public static void main(String[] args) throws IOException {
System.setProperty("java.util.logging.config.file", ".");
SpringApplication.run(MqReceiverApplication.class, args);
}
@Configuration
@ConditionalOnMissingBean(name = "jmsFactory")
@ImportResource(locations = { "classpath:com/mq/reciver/jms.xml" })
static class JmsConfig {
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
3、接收类:
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
/**
 * Base class for JMS receivers.
 *
 * <p>After dependency injection, an enabled receiver creates a
 * {@link DefaultMessageListenerContainer}, registers itself as both the message listener and the
 * destination resolver, and starts the container. The container is shut down when a
 * {@link ContextClosedEvent} is received.
 *
 * <p>Subclasses supply the destination name, the messaging domain (topic or queue), whether the
 * receiver is enabled, and the {@code onMessage} implementation.
 */
public abstract class AbstractJmsReceiver
        implements InitializingBean, ApplicationListener<ContextClosedEvent>, DestinationResolver, MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(AbstractJmsReceiver.class);

    /** Connection factory injected from the bean named {@code jmsFactory}. */
    @Autowired(required = true)
    @Qualifier("jmsFactory")
    private ConnectionFactory connectionFactory;

    /** Listener container; stays {@code null} when the receiver is not enabled. */
    private DefaultMessageListenerContainer listenerContainer;

    /**
     * Shuts the listener container down when the application context is closed.
     *
     * @param event the context-closed event (its content is not inspected)
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        if (listenerContainer != null && listenerContainer.isActive()) {
            listenerContainer.shutdown();
        }
    }

    /**
     * Creates, configures and starts the listener container if {@link #isEnable()} returns
     * {@code true}; does nothing otherwise.
     *
     * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (!this.isEnable()) {
            return;
        }
        logger.debug("开始监听任务状态消息队列【{}】", this.getDestinationName());
        listenerContainer = new DefaultMessageListenerContainer();
        // Point-to-point (queue) unless the subclass asks for publish/subscribe.
        listenerContainer.setPubSubDomain(this.getPubSubDomain());
        listenerContainer.setMessageListener(this);
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestinationName(this.getDestinationName());
        listenerContainer.setConcurrency("1");
        listenerContainer.setDestinationResolver(this);
        listenerContainer.initialize();
        listenerContainer.start();
    }

    /**
     * Resolves a destination name to a topic or a queue created from the given session.
     *
     * @param session         the JMS session used to create the destination
     * @param destinationName the name of the destination
     * @param pubSubDomain    {@code true} to create a topic, {@code false} to create a queue
     * @return the created destination
     * @throws JMSException if the session fails to create the destination
     */
    @Override
    public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
            throws JMSException {
        return (pubSubDomain ? session.createTopic(destinationName) : session.createQueue(destinationName));
    }

    /**
     * @return the name of the destination this receiver listens on
     */
    public abstract String getDestinationName();

    /**
     * @return {@code true} for a topic (publish/subscribe), {@code false} for a queue
     */
    public abstract boolean getPubSubDomain();

    /**
     * @return {@code true} if the listener container should be created and started
     */
    public abstract boolean isEnable();
}
package com.mq.reciver.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Receiver that subscribes to the topic named by the {@code topic.name} property
 * (defaulting to {@code topic}) and reads the change-notification properties of each message.
 */
@Component
public class TopicReceiver extends AbstractJmsReceiver {

    // The parent's logger is private, so this class needs its own.
    private static final Logger logger = LoggerFactory.getLogger(TopicReceiver.class);

    @Value("${topic.name:topic}")
    private String topicName;

    /**
     * Reads the string properties {@code objId}, {@code type}, {@code extraData},
     * {@code operation} and {@code operatorId} from the message and logs them at debug level.
     * A failure to read a property is logged as a warning instead of being propagated.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            String moId = message.getStringProperty("objId");
            String type = message.getStringProperty("type");
            String extraDataStr = message.getStringProperty("extraData");
            String operation = message.getStringProperty("operation");
            String operatorId = message.getStringProperty("operatorId");
            logger.debug("{} {} {} {} {}", operatorId, operation, type, moId, extraDataStr);
            if ("Delete".equals(operation)) {
                // TODO: handle deletion (no handling implemented yet)
            } else if ("Add".equals(operation)) {
                // TODO: handle addition (no handling implemented yet)
            } else if ("Modify".equals(operation)) {
                // TODO: handle modification (no handling implemented yet)
            }
        } catch (JMSException e) {
            // Log through SLF4J (same message as QueueReceiver) rather than printing the stack trace.
            logger.warn("[{}]处理接收到的消息发生错误", this.getClass().getSimpleName(), e);
        }
    }

    /**
     * @return the configured topic name
     */
    @Override
    public String getDestinationName() {
        return topicName;
    }

    /**
     * @return always {@code true}: this receiver listens on a topic
     */
    @Override
    public boolean getPubSubDomain() {
        return true;
    }

    /**
     * @return {@code true} when the configured topic name is neither {@code null} nor empty
     */
    @Override
    public boolean isEnable() {
        return StringUtils.hasLength(topicName);
    }
}
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Handles JMS message notifications about status changes.
 *
 * <p>Listens on the queue {@code Queue.J.test} when the {@code agent.enableReceiver} property is
 * {@code true} (it defaults to {@code false}) and hands every received message to the
 * registered processors.
 */
@Component
public class QueueReceiver extends AbstractJmsReceiver {

    // The parent's logger is private, so this class needs its own.
    private static final Logger logger = LoggerFactory.getLogger(QueueReceiver.class);

    @Value("${agent.enableReceiver:false}")
    private boolean enable = false;

    /** Processors invoked for every received message, in registration order. */
    private final List<AbstractMqProcessor> workProcessors = new ArrayList<>();

    /**
     * Registers a processor. The call is ignored while the receiver is disabled.
     *
     * @param processor the processor to invoke for received messages
     */
    public void addWorkProcessor(AbstractMqProcessor processor) {
        if (enable) {
            this.workProcessors.add(processor);
        }
    }

    /**
     * Invokes every registered processor whose definition name is not the empty string.
     * Any failure is logged as a warning and stops processing of the current message.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (logger.isDebugEnabled()) {
                // Copy the enumeration into a list so the names themselves are logged,
                // not the enumeration object's identity.
                java.util.Enumeration<?> propertyNames = message.getPropertyNames();
                logger.debug("接收到消息,属性名:{}", java.util.Collections.list(propertyNames));
            }
            for (AbstractMqProcessor processor : workProcessors) {
                if (!processor.getDefName().equals("")) {
                    logger.debug("接收到新任务【{}】消息", processor.getDefName());
                    processor.process();
                }
            }
        } catch (Throwable e) {
            // Best effort, as in the original: nothing escapes the listener. This single catch
            // also covers the JMSException that getPropertyNames() may throw.
            logger.warn("[{}]处理接收到的消息发生错误", this.getClass().getSimpleName(), e);
        }
    }

    /**
     * @return the fixed queue name {@code Queue.J.test}
     */
    @Override
    public String getDestinationName() {
        return "Queue.J.test";
    }

    /**
     * @return always {@code false}: this receiver listens on a queue
     */
    @Override
    public boolean getPubSubDomain() {
        return false;
    }

    /**
     * @return the value of the {@code agent.enableReceiver} property
     */
    @Override
    public boolean isEnable() {
        return enable;
    }
}
Original: https://www.cnblogs.com/liangblog/p/15334371.html
Author: 凉城
Title: 创建Springboot工程接收activemq消息
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/541730/
转载文章受原作者版权保护。转载请注明原作者出处!