[MQTT / Java] Implémentation d'une classe qui fait Pub / Sub de MQTT en Java

introduction

J'ai essayé d'utiliser MQTT avec Python / Java, donc je le poste. Cet article correspond aux trois suivants. Si vous ne l'avez pas déjà fait, voyez d'abord 1.

  1. [MQTT] Présentation de MQTT sur une base de commande (deux fois avant)
  2. [Python] Implémentation d'une classe pour faire MQTT Pub / Sub en Python (précédent)
  3. [Java] Implémentation d'une classe qui fait Pub / Sub de MQTT en Java (cet article)
  4. [ROS] Implémentation d'un nœud pour la communication MQTT (la prochaine fois)

Bibliothèque, installation de courtier

C'est dans l'article 2ème fois avant, donc je l'écrirai pour ceux qui ne l'ont pas encore installé.

Bibliothèque, installation du courtier
### 1. Pour Windows Veuillez télécharger le programme d'installation en fonction de votre environnement à partir de «Installation binaire» ⇒ «Windows» sur le site suivant. https://mosquitto.org/download/ ![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/426354/e6ec1314-653b-3dd2-31d7-dc1619f14f1f.png) ### 2. Pour Linux Exécutez les deux commandes suivantes.
# Mosquitto(Broker)Installer
$ sudo apt-get install mosquitto

#Installer le client Mosquitto
$ sudo apt-get install mosquitto-clients

Installation de la bibliothèque cliente

La méthode d'installation de la bibliothèque cliente Java est la suivante. À partir de ce lien org.eclipse.paho.client.mqttv3_1.2.3.jar Veuillez télécharger.

Code de traitement de thread séparé pour l'éditeur et l'abonné

Comme l'article Python, il fonctionne séparément, comme ROS et OpenRTM. Je veux l'appeler du système. En Python, lorsque vous démarrez Subscriber, si vous faites loop_start (), le traitement d'un autre thread semble démarrer, mais en Java il semble que ce ne soit pas le cas (c'est un point inconnu. Quelqu'un peut facilement traiter dans un autre thread. S'il vous plaît laissez-moi savoir si vous le pouvez.) Je crée également une classe qui traite dans un autre thread.

Par conséquent, le programme de cet article comprend les programmes suivants.

--Editeur (appel d'un autre système) --Abonné (appelé en tant que thread séparé)

J'ai beaucoup fait référence à cet article.

Publisher

MqttPublisher.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublisher {
	String broker = "";
	String topic  = "";
	
	/**
	 *constructeur
	 * @param brokerHostName
	 * @param publishTopic
	 */
	public MqttPublisher(String brokerHostName,String publishTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = publishTopic;
	}

	/**
	 *Publiez l'argument.
	 * @param publishMessage
	 */
	public void publish(String publishMessage) {
        final int qos             = 2;
        final String clientId     = "Publisher";
        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            mqttClient.connect(connOpts);

            MqttMessage message = new MqttMessage(publishMessage.getBytes());
            message.setQos(qos);
//            System.out.println("publish message");
//            System.out.println("Topic : "+topic+", Message : "+message);
            mqttClient.publish(topic, message);

            mqttClient.disconnect();
            mqttClient.close();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }
	
    public static void main(String[] args) {
    	MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
    	publisher.publish("test");
    }   
}

Comme indiqué dans la fonction principale, spécifiez le nom d'hôte et le nom du sujet lors de la création d'une instance et publiez le message avec la méthode publish.

Subscriber

MqttSubscriber.java


package mqtt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber  implements MqttCallback {
	Timestamp recieveTime;
	Timestamp lastTime;
	String broker = "";
	String topic  = "";
	
	/**
	 *constructeur
	 * @param brokerHostName
	 * @param subscribeTopic
	 */
	public MqttSubscriber(String brokerHostName,String subscribeTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = subscribeTopic;
	}
	
    /**
     *Appelé en cas de perte de connexion avec le courtier MQTT.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost");
        System.exit(1);
    }

    /**
     *Appelé lorsqu'un message est reçu.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
//        System.out.println("Message arrived");
//        System.out.println("Topic:"+ topic);
//        System.out.println("Message: " + new String(message.getPayload()));
        
        recieveTime = new Timestamp(System.currentTimeMillis());
        MqttThread.recieveData = new String(message.getPayload());
    }
    
    /**
     *Jugez si vous êtes abonné ou non.
     * @le retour est un nouveau drapeau
     */
    public boolean isNew() {
    	boolean flag = false;
    	if(recieveTime==lastTime) flag = false;
    	else flag = true;
    	lastTime=recieveTime;
    	return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }

    /**
     *Recevoir un message.
     *Restez connecté jusqu'à ce qu'il y ait une entrée standard.
     * 
     * @throws MqttException
     * @throws InterruptedException 
     */
    public void subscribe() throws MqttException, InterruptedException {
        //Paramètres d'abonnement
        final int qos             = 2;
        final String clientId     = "Subscribe";

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);

        System.out.println("Connecting to broker:"+broker);
        client.connect(connOpts);

        client.subscribe(topic, qos);

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        try{
            //Attendez de recevoir une entrée standard
            br.readLine();
        }catch(IOException e){
            System.exit(1);
        }
        client.disconnect();
        client.close();
        System.out.println("Disconnected");
    }
    
    /**
     *Requis pour MqttCallback, peu susceptible d'être appelé par abonnement.
     */
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {
		//TODO talon de méthode généré automatiquement
		
	}
}

Comme pour l'éditeur, spécifiez le nom d'hôte et le nom de la rubrique lors de la création d'une instance, et commencez à vous abonner avec la méthode subscribe.

Traitement des fils

Une classe qui hérite de Thread et effectue le traitement d'abonnement dans le traitement des threads.

MqttThread.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttThread extends Thread{
	String broker = "";
	String topic  = "";
	static MqttSubscriber subscriber;
	public static String recieveData = "";
	
	//constructeur
	public MqttThread(String brokerHostName,String subscribeTopic) {
		broker = brokerHostName;
		topic  = subscribeTopic;
		subscriber = new MqttSubscriber(broker, topic);
	}
	
	public void run() {
		try {
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        } catch (InterruptedException e) {
			//Bloc de capture généré automatiquement TODO
			e.printStackTrace();
		}	
	}
	
	public boolean isNew() {
		boolean flag = false;
		flag = subscriber.isNew();
		return flag;
	}
}

Exemple d'utilisation

Le processus d'abonnement est effectué dans un autre thread, la fonction de rappel (fonction messageArrived) qui est appelée au moment de l'abonnement comme en Python. Le processus de publication appelle la fonction à chaque fois pour publier.

package main;

import mqtt.MqttThread;
import mqtt.MqttPublisher;

public class testMQTTClient {
	
	public static void main(String[] args) {
            MqttThread mthread = new MqttThread("nom d'hôte","Nom du sujet");
            mthread.start();
            //Publier une seule fois
            MqttPublisher publisher = new MqttPublisher("nom d'hôte","Nom du sujet");
            publisher.publish("Contenu du message");
            if(mthread.isNew()){
                System.out.println(mthread.recieveData);
            }
    }
}

Spécifiez le nom d'hôte et le nom de la rubrique lors de l'appel du constructeur (lors de la création d'une instance).

Avec mthread.start (), le processus d'abonnement s'exécute dans un thread séparé. Il ne sera publié qu'une seule fois avec publisher.publish (" message "). Vous pouvez accéder au contenu des données souscrit avec mthread.receiveData. Utilisez-le comme un ensemble avec la fonction ʻisNew () `comme en Python.

Lancez Pub / Sub à partir de l'article précédent Pour référence, vous pouvez démarrer Publisher et Subsrriber et vérifier l'opération.

en conclusion

Cette fois, j'ai présenté un exemple d'implémentation en Java. J'ai essayé de le rendre utilisable lors du déplacement d'un autre système, et non du seul fonctionnement de Pub / Sub.

Recommended Posts

[MQTT / Java] Implémentation d'une classe qui fait Pub / Sub de MQTT en Java
[Java] Implémenter une fonction qui utilise une classe implémentée dans le modèle Builder
Ecrire une classe qui peut être ordonnée en Java Un petit mémo standard
Qu'est-ce qu'une classe en langage Java (3 /?)
Qu'est-ce qu'une classe en langage Java (1 /?)
Qu'est-ce qu'une classe en langage Java (2 /?)
Pourquoi Java appelle-t-il un fichier une classe?
Un fichier bat qui utilise Java dans Windows
Un examen rapide de Java appris en classe
Classe StringConverter à usage général qui utilise des génériques dans Java8
Ecrire une classe en Kotlin et l'appeler en Java
Un examen rapide de Java appris en classe part3
Une bibliothèque qui réalise des chaînes multilignes en chaîne multiligne Java
Rechercher un sous-ensemble en Java
Je ne peux pas créer une classe Java avec un nom spécifique dans IntelliJ
Comment implémenter un travail qui utilise l'API Java dans JobScheduler
J'ai écrit une sorte de livre qui ressemble à Java
Création d'une classe pour une matrice en Java Partie 2-A propos d'une matrice (algèbre linéaire) -
Notation de version Java qui change dans Java 10
3 Implémentez un interpréteur simple en Java
J'ai créé un PDF avec Java.
Classe StringBuffer et StringBuilder en Java
Un exemple simple de rappels en Java
Restez coincé dans un Java Primer
Comment tester une méthode privée et la simuler partiellement en Java
Une histoire sur un projet Spring Boot écrit en Java qui prend en charge Kotlin
Les champs auto-câblés dans une classe qui hérite de TextWebSocketHandler dans Spring Boot deviennent NULL
Exemple de programme qui renvoie la valeur de hachage d'un fichier en Java
J'ai créé une classe qui peut utiliser JUMAN et KNP de Java
A propos du renvoi d'une référence dans un Java Getter
Lors de la recherche de plusieurs dans un tableau Java
[Création] Un mémorandum sur le codage en Java
Java crée un tableau dans un document Word
Java crée un graphique circulaire dans Excel
Créer une application TODO dans Java 7 Créer un en-tête
Créons une application de calcul avec Java
Implémenter quelque chose comme une pile en Java
Diviser une chaîne avec ". (Dot)" en Java
Implémentation d'un algorithme de recherche / tri de base en Java
[Java] Différence entre equals et == dans une chaîne de caractères qui est un type de référence
Méthodes de classe Java String qui sont un peu utiles à connaître mais ne savent pas
[Java] Où est la classe d'implémentation de l'annotation qui existe dans BeanValidation?
[Java] Implémentation d'un lecteur de flux de ligne stricte qui lit uniquement avec CrLf.
La cause n'est pas visible lors de l'appel de méthodes d'autres classes en java