J'ai essayé d'utiliser MQTT avec Python / Java, donc je le poste. Cet article correspond aux trois suivants. Si vous ne l'avez pas déjà fait, voyez d'abord 1.
C'est dans l'article 2ème fois avant, donc je l'écrirai pour ceux qui ne l'ont pas encore installé.
# Mosquitto(Broker)Installer
$ sudo apt-get install mosquitto
#Installer le client Mosquitto
$ sudo apt-get install mosquitto-clients
La méthode d'installation de la bibliothèque cliente Java est la suivante. À partir de ce lien org.eclipse.paho.client.mqttv3_1.2.3.jar Veuillez télécharger.
Comme l'article Python, il fonctionne séparément, comme ROS et OpenRTM. Je veux l'appeler du système.
En Python, lorsque vous démarrez Subscriber, si vous faites loop_start ()
, le traitement d'un autre thread semble démarrer, mais en Java il semble que ce ne soit pas le cas (c'est un point inconnu. Quelqu'un peut facilement traiter dans un autre thread. S'il vous plaît laissez-moi savoir si vous le pouvez.) Je crée également une classe qui traite dans un autre thread.
Par conséquent, le programme de cet article comprend les programmes suivants.
--Editeur (appel d'un autre système) --Abonné (appelé en tant que thread séparé)
J'ai beaucoup fait référence à cet article.
Publisher
MqttPublisher.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublisher {
String broker = "";
String topic = "";
/**
*constructeur
* @param brokerHostName
* @param publishTopic
*/
public MqttPublisher(String brokerHostName,String publishTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = publishTopic;
}
/**
*Publiez l'argument.
* @param publishMessage
*/
public void publish(String publishMessage) {
final int qos = 2;
final String clientId = "Publisher";
try {
MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
mqttClient.connect(connOpts);
MqttMessage message = new MqttMessage(publishMessage.getBytes());
message.setQos(qos);
// System.out.println("publish message");
// System.out.println("Topic : "+topic+", Message : "+message);
mqttClient.publish(topic, message);
mqttClient.disconnect();
mqttClient.close();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
public static void main(String[] args) {
MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
publisher.publish("test");
}
}
Comme indiqué dans la fonction principale, spécifiez le nom d'hôte et le nom du sujet lors de la création d'une instance et publiez le message avec la méthode publish
.
Subscriber
MqttSubscriber.java
package mqtt;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber implements MqttCallback {
Timestamp recieveTime;
Timestamp lastTime;
String broker = "";
String topic = "";
/**
*constructeur
* @param brokerHostName
* @param subscribeTopic
*/
public MqttSubscriber(String brokerHostName,String subscribeTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = subscribeTopic;
}
/**
*Appelé en cas de perte de connexion avec le courtier MQTT.
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
System.exit(1);
}
/**
*Appelé lorsqu'un message est reçu.
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
// System.out.println("Message arrived");
// System.out.println("Topic:"+ topic);
// System.out.println("Message: " + new String(message.getPayload()));
recieveTime = new Timestamp(System.currentTimeMillis());
MqttThread.recieveData = new String(message.getPayload());
}
/**
*Jugez si vous êtes abonné ou non.
* @le retour est un nouveau drapeau
*/
public boolean isNew() {
boolean flag = false;
if(recieveTime==lastTime) flag = false;
else flag = true;
lastTime=recieveTime;
return flag;
}
public static void main(String[] args) throws InterruptedException {
try {
MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
/**
*Recevoir un message.
*Restez connecté jusqu'à ce qu'il y ait une entrée standard.
*
* @throws MqttException
* @throws InterruptedException
*/
public void subscribe() throws MqttException, InterruptedException {
//Paramètres d'abonnement
final int qos = 2;
final String clientId = "Subscribe";
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
client.setCallback(this);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
System.out.println("Connecting to broker:"+broker);
client.connect(connOpts);
client.subscribe(topic, qos);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try{
//Attendez de recevoir une entrée standard
br.readLine();
}catch(IOException e){
System.exit(1);
}
client.disconnect();
client.close();
System.out.println("Disconnected");
}
/**
*Requis pour MqttCallback, peu susceptible d'être appelé par abonnement.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
//TODO talon de méthode généré automatiquement
}
}
Comme pour l'éditeur, spécifiez le nom d'hôte et le nom de la rubrique lors de la création d'une instance, et commencez à vous abonner avec la méthode subscribe
.
Une classe qui hérite de Thread
et effectue le traitement d'abonnement dans le traitement des threads.
MqttThread.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
public class MqttThread extends Thread{
String broker = "";
String topic = "";
static MqttSubscriber subscriber;
public static String recieveData = "";
//constructeur
public MqttThread(String brokerHostName,String subscribeTopic) {
broker = brokerHostName;
topic = subscribeTopic;
subscriber = new MqttSubscriber(broker, topic);
}
public void run() {
try {
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
} catch (InterruptedException e) {
//Bloc de capture généré automatiquement TODO
e.printStackTrace();
}
}
public boolean isNew() {
boolean flag = false;
flag = subscriber.isNew();
return flag;
}
}
Le processus d'abonnement est effectué dans un autre thread, la fonction de rappel (fonction messageArrived
) qui est appelée au moment de l'abonnement comme en Python.
Le processus de publication appelle la fonction à chaque fois pour publier.
package main;
import mqtt.MqttThread;
import mqtt.MqttPublisher;
public class testMQTTClient {
public static void main(String[] args) {
MqttThread mthread = new MqttThread("nom d'hôte","Nom du sujet");
mthread.start();
//Publier une seule fois
MqttPublisher publisher = new MqttPublisher("nom d'hôte","Nom du sujet");
publisher.publish("Contenu du message");
if(mthread.isNew()){
System.out.println(mthread.recieveData);
}
}
}
Spécifiez le nom d'hôte et le nom de la rubrique lors de l'appel du constructeur (lors de la création d'une instance).
Avec mthread.start ()
, le processus d'abonnement s'exécute dans un thread séparé.
Il ne sera publié qu'une seule fois avec publisher.publish (" message ")
.
Vous pouvez accéder au contenu des données souscrit avec mthread.receiveData
. Utilisez-le comme un ensemble avec la fonction ʻisNew () `comme en Python.
Lancez Pub / Sub à partir de l'article précédent Pour référence, vous pouvez démarrer Publisher et Subsrriber et vérifier l'opération.
Cette fois, j'ai présenté un exemple d'implémentation en Java. J'ai essayé de le rendre utilisable lors du déplacement d'un autre système, et non du seul fonctionnement de Pub / Sub.
Recommended Posts