记录一下SpringBoot+MQTT的使用

Spring Boot 集成 MQTT:收发消息、保存消息等

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.3</version>
        </dependency>
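spring:
  mqtt:
    urls: tcp://127.0.0.1:1883
    clientId: WebKn9wu
    username: king
    password: 12341234
    # Handed unchanged to Paho's MqttConnectOptions.setConnectionTimeout, which is measured in seconds
    completionTimeout: 30000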
/**
 * MQTT connection settings, bound from the {@code spring.mqtt.*} properties.
 *
 * <p>{@code @Configuration} is itself meta-annotated with {@code @Component}, so the class
 * is still picked up by component scanning without a separate {@code @Component}.
 * Getters and setters are generated by Lombok.
 */
@Configuration
@Setter
@Getter
public class MqttConfig {

    /** User name sent to the broker ({@code spring.mqtt.username}). */
    @Value("${spring.mqtt.username}")
    private String username;

    /** Password sent to the broker ({@code spring.mqtt.password}). */
    @Value("${spring.mqtt.password}")
    private String password;

    /** Broker URI, e.g. {@code tcp://127.0.0.1:1883} ({@code spring.mqtt.urls}). */
    @Value("${spring.mqtt.urls}")
    private String urls;

    /** Client identifier presented to the broker ({@code spring.mqtt.clientId}). */
    @Value("${spring.mqtt.clientId}")
    private String clientId;

    /**
     * Whether reconnection is left to the MQTT library. Defaults to {@code true};
     * it is not bound to a property, so it only changes through the setter.
     */
    private boolean automaticReconnection = true;

    /**
     * Connection timeout ({@code spring.mqtt.completionTimeout}).
     * NOTE(review): the value is passed straight to Paho's
     * {@code MqttConnectOptions.setConnectionTimeout}, which expects seconds - confirm the
     * configured number is expressed in seconds.
     */
    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout;
}
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Spring-managed wrapper around an Eclipse Paho {@code MqttClient}.
 *
 * <p>Connects to the broker when the bean is initialised, offers publish / subscribe /
 * unsubscribe helpers, and disconnects and releases the client when the bean is destroyed.
 *
 * <p>NOTE(review): this bean and {@code PushCallback} inject each other through fields;
 * confirm the container in use permits that circular reference.
 */
@Component
public class MyMqttClient {

    private static final Logger logger = LoggerFactory.getLogger(MyMqttClient.class);
    @Autowired
    private MqttConfig config;
    @Autowired
    private PushCallback pushCallback;

    /** Persistence service; available to hand to a manually constructed callback (see connect()). */
    @Autowired
    private MqttLogService mqttLogService;

    private org.eclipse.paho.client.mqttv3.MqttClient client;
    /** In-memory message store handed to the Paho client; created in {@link #connect()}. */
    private MemoryPersistence memoryPersistence = null;
    /** Options used when connecting the MQTT client. */
    private MqttConnectOptions options;

    /**
     * @return the underlying Paho client, or {@code null} if it could not be created
     */
    public org.eclipse.paho.client.mqttv3.MqttClient getClient() {
        return client;
    }

    /**
     * Replaces the underlying Paho client.
     *
     * @param client the client to use from now on
     */
    public void setClient(org.eclipse.paho.client.mqttv3.MqttClient client) {
        this.client = client;
    }

    /**
     * Creates the Paho client from {@link MqttConfig}, registers the callback and connects.
     * Runs once after dependency injection. A connection failure is logged, not rethrown.
     */
    @PostConstruct
    public void connect() {
        try {
            // Keep in-flight messages in memory only.
            memoryPersistence = new MemoryPersistence();
            this.client = new org.eclipse.paho.client.mqttv3.MqttClient(config.getUrls(), config.getClientId(), memoryPersistence);
            options = new MqttConnectOptions();
            // true: the session is cleared on every (re)connect;
            // false would let the broker keep the client's session state.
            options.setCleanSession(true);
            options.setUserName(config.getUsername());
            options.setPassword(config.getPassword().toCharArray());
            // Paho measures this timeout in seconds.
            // NOTE(review): confirm spring.mqtt.completionTimeout is configured in seconds.
            options.setConnectionTimeout(config.getCompletionTimeout());
            // Keep-alive interval, in seconds.
            options.setKeepAliveInterval(30);
            // Follow the configured flag (default true) so the callbacks, which read the same
            // flag to decide whether to reconnect manually, stay consistent with the client.
            options.setAutomaticReconnect(config.isAutomaticReconnection());
            // options.setMaxInflight(config.getMaxInflight());
            client.setCallback(pushCallback);
            // When constructing the callback by hand, the business service can be passed in:
            // client.setCallback(new DefaultCallback(this, mqttLogService));
            client.connect(options);
        } catch (MqttException e) {
            // MqttException frequently has no cause, so never dereference getCause() here.
            logger.error("MQTT connect failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Publishes a message and blocks until delivery has completed.
     *
     * @param qos         quality of service (0, 1 or 2)
     * @param retained    whether the broker should retain the message
     * @param topic       topic to publish to
     * @param pushMessage message body; encoded as UTF-8
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        if (null == client || !client.isConnected()) {
            logger.warn("mqttClient is error");
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset: the platform default differs between machines.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            logger.info("topic not exist");
            // Nothing to publish to; continuing would dereference null.
            return;
        }
        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass and is handled here as well.
            logger.error("MQTT publish to topic {} failed: {}", topic, e.getMessage(), e);
        }
    }

    /**
     * Subscribes to a topic filter. Does nothing except log when the client is missing
     * or not connected.
     *
     * @param topic topic filter
     * @param qos   quality of service (0, 1 or 2)
     */
    public void subscribe(String topic, int qos) {
        if (null != client && client.isConnected()) {
            logger.info("&#x5F00;&#x59CB;&#x8BA2;&#x9605;&#x4E3B;&#x9898;" + topic);
            try {
                client.subscribe(topic, qos);
            } catch (MqttException e) {
                logger.error("MQTT subscribe to topic {} failed: {}", topic, e.getMessage(), e);
            }
        } else {
            logger.warn("mqttClient is error");
        }
    }

    /**
     * Cancels a subscription. Does nothing except log when the client is missing or
     * not connected.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void cleanTopic(String topic) {
        // Unsubscribing needs a live connection; the original check was inverted.
        if (null != client && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                logger.error("MQTT unsubscribe from topic {} failed: {}", topic, e.getMessage(), e);
            }
        } else {
            logger.warn("mqttClient is error");
        }
    }

    /**
     * Disconnects (when connected), closes the client and then closes the in-memory
     * persistence. Runs when the bean is destroyed.
     */
    @PreDestroy
    public void closeConnect() {
        // Shut the client down first, while its persistence is still open.
        if (null != client) {
            if (client.isConnected()) {
                try {
                    client.disconnect();
                } catch (MqttException e) {
                    logger.error("&#x5173;&#x95ED;&#x8FDE;&#x63A5;&#x5F02;&#x5E38;={}", e.getMessage(), e);
                }
            } else {
                logger.error("&#x5173;&#x95ED;&#x8FDE;&#x63A5;&#x5F02;&#x5E38;={}", "mqttClient is not connect");
            }
            // Always release the client; previously close() was skipped whenever the
            // client was not connected at shutdown.
            try {
                client.close();
            } catch (MqttException e) {
                logger.error("&#x5173;&#x95ED;&#x8FDE;&#x63A5;&#x5F02;&#x5E38;={}", e.getMessage(), e);
            }
        } else {
            logger.error("&#x5173;&#x95ED;&#x8FDE;&#x63A5;&#x5F02;&#x5E38;={}", "mqttClient is null");
        }

        // Then close the message store.
        if (null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                logger.error("memoryPersistence close failed: {}", e.getMessage(), e);
            }
        } else {
            logger.error("memoryPersistence is null");
        }
    }
}

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Paho callback registered by {@code MyMqttClient}: reacts to connection loss, incoming
 * messages, completed deliveries and completed connections.
 */
@Slf4j
@Component
public class PushCallback implements MqttCallbackExtended {
    /** Number of once-per-interval checks made after the connection is lost. */
    private static final int RECONNECT_CHECKS = 4;
    /** Pause between two checks, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MS = 1000L;

    @Autowired
    private MyMqttClient client;

    @Autowired
    private MqttConfig mqttConfig;

    // Service intended for persisting received data.
    @Autowired
    private MqttLogService mqttLogService;

    /**
     * Called when the connection to the broker is lost. Starts a short-lived thread that
     * checks the connection up to {@link #RECONNECT_CHECKS} times, one second apart, and,
     * only when automatic reconnection is disabled in {@link MqttConfig}, asks the client
     * to reconnect.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        new Thread(() -> {
            log.info("[MQTT] &#x8FDE;&#x63A5;&#x65AD;&#x5F00;&#xFF0C;1s&#x4E4B;&#x540E;&#x5C1D;&#x8BD5;&#x91CD;&#x8FDE;...", throwable);
            // Set once reconnect() has been issued without throwing, so it is not issued twice.
            boolean reconnecting = false;
            for (int attempt = 0; attempt < RECONNECT_CHECKS; attempt++) {
                try {
                    if (client.getClient().isConnected()) {
                        break;
                    }
                    Thread.sleep(RECONNECT_INTERVAL_MS);
                    boolean needReconnect = !mqttConfig.isAutomaticReconnection()
                            && !reconnecting
                            && !client.getClient().isConnected();
                    if (needReconnect) {
                        log.info("&#x5F00;&#x59CB;&#x91CD;&#x8FDE;...");
                        client.getClient().reconnect();
                        reconnecting = true;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("MQTT reconnect watcher interrupted, giving up");
                    return;
                } catch (Exception e) {
                    // Failed attempt: log it and try again on the next iteration.
                    log.info("mqtt&#x91CD;&#x8FDE;&#x5931;&#x8D25;,&#x7EE7;&#x7EED;&#x91CD;&#x8FDE;,reason:" + e.getMessage(), e);
                }
            }
        }, "mqtt-reconnect").start();
    }

    /**
     * Called when a message arrives on a subscribed topic. Currently only logs the topic
     * and the payload, decoded as UTF-8.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage the received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("=====&#x63A5;&#x6536;&#x6D88;&#x606F;&#x4E3B;&#x9898; : " + topic);
        // Explicit charset: the platform default differs between machines.
        String msg = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("=====&#x63A5;&#x6536;&#x6D88;&#x606F;&#x5185;&#x5BB9; : " + msg);
        // Persistence hook: call the storage method of MqttLogService here
        // (topic.split("/") yields the individual topic levels if they are needed).
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Called when a connection to the broker has been established; subscribes to the
     * application's topics.
     *
     * @param reconnect whether this connection is the result of a reconnect
     * @param serverUri URI of the broker the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        // NOTE(review): both filters are identical placeholders ("xxx"); replace them with
        // the real topic filters for the two data kinds.
        // Periodically auto-reported data.
        client.subscribe("$king/devices/xxx/#", 0);
        // Device information.
        client.subscribe("$king/devices/xxx/#", 0);
        // The label announces the client ID, so log the client ID (the original printed the
        // broker URLs here); the broker URI is appended for reference.
        log.info("mqtt&#x8FDE;&#x63A5;&#x6210;&#x529F;&#xFF0C;&#x5BA2;&#x6237;&#x7AEF;ID&#xFF1A;" + mqttConfig.getClientId()
                + ", serverURI=" + serverUri);
        // Hook: persist a "connected" log entry here.
    }

}

第二种 Callback

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Alternative Paho callback meant to be constructed by hand and given its collaborators
 * through the constructor, e.g. {@code new DefaultCallback(myMqttClient, mqttLogService)}.
 *
 * <p>NOTE(review): the class is also annotated {@code @Component} but none of its fields
 * are injected; an instance built through the no-argument constructor has no client and
 * cannot serve callbacks.
 */
@Slf4j
@Component
public class DefaultCallback implements MqttCallbackExtended {
    /** Number of once-per-interval checks made after the connection is lost. */
    private static final int RECONNECT_CHECKS = 4;
    /** Pause between two checks, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MS = 1000L;

    /** Service intended for persisting received data. */
    private MqttLogService mqttLogService;

    private MyMqttClient client;

    /** Optional settings; {@code null} when the two-argument constructor was used. */
    private MqttConfig mqttConfig;

    public DefaultCallback(){

    }

    /**
     * @param pushClient wrapper owning the Paho client
     * @param service    persistence service
     */
    public DefaultCallback(MyMqttClient pushClient, MqttLogService service) {
        this(pushClient, service, null);
    }

    /**
     * @param pushClient wrapper owning the Paho client
     * @param service    persistence service
     * @param mqttConfig MQTT settings; may be {@code null}, in which case automatic
     *                   reconnection is assumed to be on (the default in {@code MqttConfig})
     */
    public DefaultCallback(MyMqttClient pushClient, MqttLogService service, MqttConfig mqttConfig) {
        super();
        this.client = pushClient;
        this.mqttLogService = service;
        this.mqttConfig = mqttConfig;
    }

    /**
     * Called when the connection to the broker is lost. Starts a short-lived thread that
     * checks the connection up to {@link #RECONNECT_CHECKS} times, one second apart, and,
     * only when automatic reconnection is disabled, asks the client to reconnect.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        new Thread(() -> {
            log.info("[MQTT] &#x8FDE;&#x63A5;&#x65AD;&#x5F00;&#xFF0C;1s&#x4E4B;&#x540E;&#x5C1D;&#x8BD5;&#x91CD;&#x8FDE;...", throwable);
            MqttClient mqttClient = client.getClient();
            // MyMqttClient keeps its config private, so the flag comes from the config given
            // to this callback; without one, fall back to MqttConfig's default (true).
            boolean automaticReconnection = mqttConfig == null || mqttConfig.isAutomaticReconnection();
            // Set once reconnect() has been issued without throwing, so it is not issued twice.
            boolean reconnecting = false;
            for (int attempt = 0; attempt < RECONNECT_CHECKS; attempt++) {
                try {
                    if (mqttClient.isConnected()) {
                        break;
                    }
                    Thread.sleep(RECONNECT_INTERVAL_MS);
                    boolean needReconnect = !automaticReconnection && !reconnecting && !mqttClient.isConnected();
                    if (needReconnect) {
                        log.info("&#x5F00;&#x59CB;&#x91CD;&#x8FDE;...");
                        mqttClient.reconnect();
                        reconnecting = true;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("MQTT reconnect watcher interrupted, giving up");
                    return;
                } catch (Exception e) {
                    // Failed attempt: log it and try again on the next iteration.
                    log.info("mqtt&#x91CD;&#x8FDE;&#x5931;&#x8D25;,&#x7EE7;&#x7EED;&#x91CD;&#x8FDE;,reason:" + e.getMessage(), e);
                }
            }
        }, "mqtt-reconnect").start();
    }

    /**
     * Called when a message arrives on a subscribed topic. Currently only logs the topic
     * and the payload, decoded as UTF-8; persistence is still a commented-out hook.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage the received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("=====&#x63A5;&#x6536;&#x6D88;&#x606F;&#x4E3B;&#x9898; : " + topic);
        // Explicit charset: the platform default differs between machines.
        String msg = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("=====&#x63A5;&#x6536;&#x6D88;&#x606F;&#x5185;&#x5BB9; : " + msg);

        // Persistence hook (topic.split("/") yields the individual topic levels if needed):
        // mqttLogService.saveMqtt(mqttLog);
        log.info("-----&#x5BF9;&#x6570;&#x636E;&#x8FDB;&#x884C;&#x6301;&#x4E45;&#x5316;");
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("=====deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Called when a connection to the broker has been established; subscribes to the
     * application's topics.
     *
     * @param reconnect whether this connection is the result of a reconnect
     * @param serverUri URI of the broker the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        // NOTE(review): both filters are identical placeholders ("xxx"); replace them with
        // the real topic filters for the two data kinds.
        // Periodically auto-reported data.
        client.subscribe("$king/devices/xxx/#", 0);
        // Device information.
        client.subscribe("$king/devices/xxx/#", 0);
        // Read the client ID from the Paho client itself: MyMqttClient's config field is
        // private and cannot be accessed from this class (the original printed the broker
        // URLs under the "client ID" label). The broker URI is appended for reference.
        log.info("mqtt&#x8FDE;&#x63A5;&#x6210;&#x529F;&#xFF0C;&#x5BA2;&#x6237;&#x7AEF;ID&#xFF1A;" + client.getClient().getClientId()
                + ", serverURI=" + serverUri);
        // Hook: persist a "connected" log entry here.
    }
}

Original: https://www.cnblogs.com/KingPingyue/p/15954751.html
Author: 汤姆·希德勒斯顿
Title: 记录一下SpringBoot+MQTT的使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/592479/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球