【nacos】5.3 nacos 更新mqtt配置,自动加载连接EMQX

上一节问题:

  1. 问题

1.1 如果修改的是一些需要预加载的配置呢?如下场景是否不用重启服务就能立即生效?

  • 修改连接Mqtt服务器,并订阅
  • 修改连接TCP服务器
  • 修改TCP客户端端口
  • 修改mysql等数据库地址,端口,账号,密码

仅使用 @RefreshScope + @Value 时,以上4种场景均不能生效(需要重启spring boot项目,mqtt服务器才能重新连接)

1.2 如何才能生效?

当我们修改nacos值时,发现控制台打印信息如下

(此处原为控制台输出截图:【nacos】5.3 nacos 更新mqtt配置,自动加载连接EMQX)

查看 RefreshEventListener.class 代码,发现当nacos配置变化时handle方法会被调用,并打印发生变化的key

/**
 * Handles a {@link RefreshEvent} (quoted from Spring Cloud's RefreshEventListener).
 * Does nothing unless the {@code ready} flag is set; otherwise triggers a refresh
 * and logs the set of keys returned by it.
 *
 * @param event the refresh event; only its description is used, for debug logging
 */
public void handle(RefreshEvent event) {
        // Ignore events received while the ready flag is still false.
        if (this.ready.get()) {
            log.debug("Event received " + event.getEventDesc());
            // Perform the refresh; the result is the set of keys logged below as changed.
            Set keys = this.refresh.refresh();
            log.info("Refresh keys changed: " + keys);
        }

    }

所以,只需要参考 RefreshEventListener.class新建一个监听类 MqttConfigListener.java即可

  2. 实战

案例:修改nacos配置,实时重连emqx服务器(mqtt)

说明:关键看

  • MqttConfigListener.handle(RefreshEvent event)
  • emqClient.init();//重新连接EMQX

  • MqttConfigListener.java

package com.hzd.listener;

import com.hzd.mqtt.client.EmqClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.cloud.endpoint.event.RefreshEvent;
import org.springframework.cloud.endpoint.event.RefreshEventListener;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Coding by 李炯 on 2022/11/11 11:24
 */
@Component
@Slf4j
public class MqttConfigListener implements SmartApplicationListener {

    @Autowired
    private EmqClient emqClient;
    private ContextRefresher refresh;
    private AtomicBoolean ready = new AtomicBoolean(false);

    public MqttConfigListener(ContextRefresher refresh) {
        this.refresh = refresh;
    }

    public boolean supportsEventType(Class eventType) {
        return ApplicationReadyEvent.class.isAssignableFrom(eventType) || RefreshEvent.class.isAssignableFrom(eventType);
    }

    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            this.handle((ApplicationReadyEvent)event);
        } else if (event instanceof RefreshEvent) {
            this.handle((RefreshEvent)event);
        }

    }

    public void handle(ApplicationReadyEvent event) {
        this.ready.compareAndSet(false, true);
    }

    public void handle(RefreshEvent event) {
        if (this.ready.get()) {
            Set keys = this.refresh.refresh();
            log.info("Refresh keys changed>>>>>>>>: " + keys.toString());
            if (keys.toString().contains("mqtt")){
                emqClient.disConnect();//断开之前的连接
                emqClient.init();//重新连接EMQX
            }
        }

    }
}

  • EmqClient.java

package com.hzd.mqtt.client;

import com.hzd.mqtt.enums.QosEnum;
import com.hzd.mqtt.properties.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * Created by
 */
@Component
@Slf4j
public class EmqClient {

    private IMqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttCallback mqttCallback;

    @Autowired
    private MqttProperties properties;

    @PostConstruct
    public void init() {
        MqttClientPersistence mempersitence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
        } catch (MqttException e) {
            log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
        }
        //连接服务端
        connect(properties.getUsername(), properties.getPassword());
        //订阅一个主题
        String topic_hzd = "/hzd/pub/#";
        String topic_device_connect = "/device/connect/#";
        subscribe(topic_hzd, QosEnum.QoS2);
        subscribe(topic_device_connect, QosEnum.QoS2);
        log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);

    }

    /**
     * 连接broker
     *
     * @param username
     * @param password
     */
    public void connect(String username, String password) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);

        mqttClient.setCallback(mqttCallback);

        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
        }
    }

    /**
     * 断开连接
     */
    @PreDestroy
    public void disConnect() {
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            log.error("断开连接产生异常,异常信息{}", e.getMessage());
        }
    }

    /**
     * 重连
     */
    public void reConnect() {
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            log.error("重连失败,失败原因{}", e.getMessage());
        }
    }

    /**
     * 发布消息
     *
     * @param topic
     * @param msg
     * @param qos
     * @param retain
     */
    public void publish(String topic, String msg, QosEnum qos, boolean retain) {

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        try {
            mqttClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
        }

    }

    /**
     * 订阅
     *
     * @param topicFilter
     * @param qos
     */
    public void subscribe(String topicFilter, QosEnum qos) {
        try {
            mqttClient.subscribe(topicFilter, qos.value());
        } catch (MqttException e) {
            log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
        }

    }

    /**
     * 取消订阅
     *
     * @param topicFilter
     */
    public void unSubscribe(String topicFilter) {
        try {
            mqttClient.unsubscribe(topicFilter);
        } catch (MqttException e) {
            log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
        }
    }

}

Original: https://blog.csdn.net/ladymorgana/article/details/127810860
Author: ladymorgana
Title: 【nacos】5.3 nacos 更新mqtt配置,自动加载连接EMQX

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/655950/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球