Ich habe versucht, MQTT mit Python / Java zu verwenden, also poste ich es. Dieser Artikel entspricht den folgenden drei. Wenn Sie dies noch nicht getan haben, lesen Sie bitte zuerst 1.
Dies ist in dem Artikel 2. Mal zuvor, also werde ich es für diejenigen schreiben, die es noch nicht installiert haben.
# Mosquitto(Broker)Installieren
$ sudo apt-get install mosquitto
#Installieren Sie den Mosquitto-Client
$ sudo apt-get install mosquitto-clients
Die Installationsmethode der Java-Clientbibliothek lautet wie folgt. Von diesem Link org.eclipse.paho.client.mqttv3_1.2.3.jar Bitte herunterladen.
Wie der Python-Artikel funktioniert er separat, z. B. ROS und OpenRTM. Ich möchte es vom System aus aufrufen.
Wenn Sie in Python Subscriber starten und loop_start ()
ausführen, scheint die Verarbeitung eines anderen Threads zu beginnen, in Java scheint dies jedoch nicht der Fall zu sein (dies ist ein unbekannter Punkt. Jemand kann problemlos in einem anderen Thread verarbeiten. Bitte lassen Sie mich wissen, wenn Sie können.) Ich mache auch eine Klasse, die in einem anderen Thread verarbeitet.
Daher besteht das Programm in diesem Artikel aus den folgenden Programmen.
--Publisher (Anruf von einem anderen System)
Ich habe sehr viel auf diesen Artikel verwiesen.
Publisher
MqttPublisher.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublisher {
String broker = "";
String topic = "";
/**
*Konstrukteur
* @param brokerHostName
* @param publishTopic
*/
public MqttPublisher(String brokerHostName,String publishTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = publishTopic;
}
/**
*Veröffentlichen Sie das Argument.
* @param publishMessage
*/
public void publish(String publishMessage) {
final int qos = 2;
final String clientId = "Publisher";
try {
MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
mqttClient.connect(connOpts);
MqttMessage message = new MqttMessage(publishMessage.getBytes());
message.setQos(qos);
// System.out.println("publish message");
// System.out.println("Topic : "+topic+", Message : "+message);
mqttClient.publish(topic, message);
mqttClient.disconnect();
mqttClient.close();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
public static void main(String[] args) {
MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
publisher.publish("test");
}
}
Geben Sie wie in der Hauptfunktion beim Erstellen einer Instanz den Hostnamen und den Themennamen an und veröffentlichen Sie die Nachricht mit der Methode "Publish".
Subscriber
MqttSubscriber.java
package mqtt;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber implements MqttCallback {
Timestamp recieveTime;
Timestamp lastTime;
String broker = "";
String topic = "";
/**
*Konstrukteur
* @param brokerHostName
* @param subscribeTopic
*/
public MqttSubscriber(String brokerHostName,String subscribeTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = subscribeTopic;
}
/**
*Wird aufgerufen, wenn die Verbindung zum MQTT-Broker unterbrochen wird.
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
System.exit(1);
}
/**
*Wird aufgerufen, wenn eine Nachricht empfangen wird.
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
// System.out.println("Message arrived");
// System.out.println("Topic:"+ topic);
// System.out.println("Message: " + new String(message.getPayload()));
recieveTime = new Timestamp(System.currentTimeMillis());
MqttThread.recieveData = new String(message.getPayload());
}
/**
*Stellen Sie fest, ob Sie abonniert haben.
* @return is New flag
*/
public boolean isNew() {
boolean flag = false;
if(recieveTime==lastTime) flag = false;
else flag = true;
lastTime=recieveTime;
return flag;
}
public static void main(String[] args) throws InterruptedException {
try {
MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
/**
*Eine Nachricht erhalten.
*Bleiben Sie in Verbindung, bis ein Standardeingang vorhanden ist.
*
* @throws MqttException
* @throws InterruptedException
*/
public void subscribe() throws MqttException, InterruptedException {
//Einstellungen abonnieren
final int qos = 2;
final String clientId = "Subscribe";
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
client.setCallback(this);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
System.out.println("Connecting to broker:"+broker);
client.connect(connOpts);
client.subscribe(topic, qos);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try{
//Warten Sie, bis Sie die Standardeingabe erhalten
br.readLine();
}catch(IOException e){
System.exit(1);
}
client.disconnect();
client.close();
System.out.println("Disconnected");
}
/**
*Erforderlich für MqttCallback, wird wahrscheinlich nicht von subscribe aufgerufen.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
//TODO automatisch generierter Methodenstub
}
}
Geben Sie wie beim Herausgeber beim Erstellen einer Instanz den Hostnamen und den Themennamen an und starten Sie das Abonnieren mit der Methode "subscribe".
Eine Klasse, die "Thread" erbt und die Abonnementverarbeitung in der Threadverarbeitung ausführt.
MqttThread.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
public class MqttThread extends Thread{
String broker = "";
String topic = "";
static MqttSubscriber subscriber;
public static String recieveData = "";
//Konstrukteur
public MqttThread(String brokerHostName,String subscribeTopic) {
broker = brokerHostName;
topic = subscribeTopic;
subscriber = new MqttSubscriber(broker, topic);
}
public void run() {
try {
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
} catch (InterruptedException e) {
//TODO automatisch generierter Fangblock
e.printStackTrace();
}
}
public boolean isNew() {
boolean flag = false;
flag = subscriber.isNew();
return flag;
}
}
Der Vorgang des Abonnierens wird in einem anderen Thread ausgeführt, der Rückruffunktion (messageArrived
-Funktion), die zum Zeitpunkt des Abonnierens wie in Python aufgerufen wird.
Der Veröffentlichungsprozess ruft die Funktion jedes Mal zum Veröffentlichen auf.
package main;
import mqtt.MqttThread;
import mqtt.MqttPublisher;
public class testMQTTClient {
public static void main(String[] args) {
MqttThread mthread = new MqttThread("Hostname","Themenname");
mthread.start();
//Nur einmal veröffentlichen
MqttPublisher publisher = new MqttPublisher("Hostname","Themenname");
publisher.publish("Nachrichteninhalt");
if(mthread.isNew()){
System.out.println(mthread.recieveData);
}
}
}
Geben Sie beim Aufrufen des Konstruktors (beim Erstellen einer Instanz) den Hostnamen und den Themennamen an.
Mit mthread.start ()
wird der Abonnementprozess in einem separaten Thread ausgeführt.
Es wird nur einmal mit "publisher.publish (" message ")" veröffentlicht.
Sie können mit mthread.receiveData
auf den abonnierten Dateninhalt zugreifen. Verwenden Sie es als Set mit der Funktion isNew ()
wie in Python.
Pub / Sub aus dem vorherigen Artikel starten Als Referenz können Sie Publisher und Subsrriber starten und den Vorgang überprüfen.
Dieses Mal habe ich ein Implementierungsbeispiel in Java vorgestellt. Ich habe versucht, es nutzbar zu machen, wenn ich tatsächlich von einem anderen System wechsle, nicht nur von Pub / Sub.
Recommended Posts