[MQTT / Java] Implementierte eine Klasse, die Pub / Sub von MQTT in Java ausführt

Einführung

Ich habe versucht, MQTT mit Python / Java zu verwenden, also poste ich es. Dieser Artikel entspricht den folgenden drei. Wenn Sie dies noch nicht getan haben, lesen Sie bitte zuerst 1.

  1. [MQTT] Einführung von MQTT auf Befehlsbasis (zweimal zuvor)
  2. [Python] Implementierte eine Klasse, die MQTT Pub / Sub in Python ausführt (vorherige)
  3. [Java] Implementierte eine Klasse, die Pub / Sub von MQTT in Java ausführt (dieser Artikel)
  4. [ROS] Implementierte einen Knoten für die MQTT-Kommunikation (nächstes Mal)

Bibliothek, Broker-Installation

Dies ist in dem Artikel 2. Mal zuvor, also werde ich es für diejenigen schreiben, die es noch nicht installiert haben.

Bibliothek, Broker-Installation
### 1. Für Windows Bitte laden Sie das Installationsprogramm entsprechend Ihrer Umgebung von "Binäre Installation" ⇒ "Windows" auf der folgenden Site herunter. https://mosquitto.org/download/ ![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/426354/e6ec1314-653b-3dd2-31d7-dc1619f14f1f.png) ### 2. Für Linux Führen Sie die folgenden zwei Befehle aus.
# Mosquitto(Broker)Installieren
$ sudo apt-get install mosquitto

#Installieren Sie den Mosquitto-Client
$ sudo apt-get install mosquitto-clients

Installation der Client-Bibliothek

Die Installationsmethode der Java-Clientbibliothek lautet wie folgt. Von diesem Link org.eclipse.paho.client.mqttv3_1.2.3.jar Bitte herunterladen.

Separater Thread-Verarbeitungscode für Publisher und Subscriber

Wie der Python-Artikel funktioniert er separat, z. B. ROS und OpenRTM. Ich möchte es vom System aus aufrufen. Wenn Sie in Python Subscriber starten und loop_start () ausführen, scheint die Verarbeitung eines anderen Threads zu beginnen, in Java scheint dies jedoch nicht der Fall zu sein (dies ist ein unbekannter Punkt. Jemand kann problemlos in einem anderen Thread verarbeiten. Bitte lassen Sie mich wissen, wenn Sie können.) Ich mache auch eine Klasse, die in einem anderen Thread verarbeitet.

Daher besteht das Programm in diesem Artikel aus den folgenden Programmen.

--Publisher (Anruf von einem anderen System)

Ich habe sehr viel auf diesen Artikel verwiesen.

Publisher

MqttPublisher.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublisher {
	String broker = "";
	String topic  = "";
	
	/**
	 *Konstrukteur
	 * @param brokerHostName
	 * @param publishTopic
	 */
	public MqttPublisher(String brokerHostName,String publishTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = publishTopic;
	}

	/**
	 *Veröffentlichen Sie das Argument.
	 * @param publishMessage
	 */
	public void publish(String publishMessage) {
        final int qos             = 2;
        final String clientId     = "Publisher";
        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            mqttClient.connect(connOpts);

            MqttMessage message = new MqttMessage(publishMessage.getBytes());
            message.setQos(qos);
//            System.out.println("publish message");
//            System.out.println("Topic : "+topic+", Message : "+message);
            mqttClient.publish(topic, message);

            mqttClient.disconnect();
            mqttClient.close();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }
	
    public static void main(String[] args) {
    	MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
    	publisher.publish("test");
    }   
}

Geben Sie wie in der Hauptfunktion beim Erstellen einer Instanz den Hostnamen und den Themennamen an und veröffentlichen Sie die Nachricht mit der Methode "Publish".

Subscriber

MqttSubscriber.java


package mqtt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber  implements MqttCallback {
	Timestamp recieveTime;
	Timestamp lastTime;
	String broker = "";
	String topic  = "";
	
	/**
	 *Konstrukteur
	 * @param brokerHostName
	 * @param subscribeTopic
	 */
	public MqttSubscriber(String brokerHostName,String subscribeTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = subscribeTopic;
	}
	
    /**
     *Wird aufgerufen, wenn die Verbindung zum MQTT-Broker unterbrochen wird.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost");
        System.exit(1);
    }

    /**
     *Wird aufgerufen, wenn eine Nachricht empfangen wird.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
//        System.out.println("Message arrived");
//        System.out.println("Topic:"+ topic);
//        System.out.println("Message: " + new String(message.getPayload()));
        
        recieveTime = new Timestamp(System.currentTimeMillis());
        MqttThread.recieveData = new String(message.getPayload());
    }
    
    /**
     *Stellen Sie fest, ob Sie abonniert haben.
     * @return is New flag
     */
    public boolean isNew() {
    	boolean flag = false;
    	if(recieveTime==lastTime) flag = false;
    	else flag = true;
    	lastTime=recieveTime;
    	return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }

    /**
     *Eine Nachricht erhalten.
     *Bleiben Sie in Verbindung, bis ein Standardeingang vorhanden ist.
     * 
     * @throws MqttException
     * @throws InterruptedException 
     */
    public void subscribe() throws MqttException, InterruptedException {
        //Einstellungen abonnieren
        final int qos             = 2;
        final String clientId     = "Subscribe";

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);

        System.out.println("Connecting to broker:"+broker);
        client.connect(connOpts);

        client.subscribe(topic, qos);

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        try{
            //Warten Sie, bis Sie die Standardeingabe erhalten
            br.readLine();
        }catch(IOException e){
            System.exit(1);
        }
        client.disconnect();
        client.close();
        System.out.println("Disconnected");
    }
    
    /**
     *Erforderlich für MqttCallback, wird wahrscheinlich nicht von subscribe aufgerufen.
     */
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {
		//TODO automatisch generierter Methodenstub
		
	}
}

Geben Sie wie beim Herausgeber beim Erstellen einer Instanz den Hostnamen und den Themennamen an und starten Sie das Abonnieren mit der Methode "subscribe".

Thread-Verarbeitung

Eine Klasse, die "Thread" erbt und die Abonnementverarbeitung in der Threadverarbeitung ausführt.

MqttThread.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttThread extends Thread{
	String broker = "";
	String topic  = "";
	static MqttSubscriber subscriber;
	public static String recieveData = "";
	
	//Konstrukteur
	public MqttThread(String brokerHostName,String subscribeTopic) {
		broker = brokerHostName;
		topic  = subscribeTopic;
		subscriber = new MqttSubscriber(broker, topic);
	}
	
	public void run() {
		try {
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        } catch (InterruptedException e) {
			//TODO automatisch generierter Fangblock
			e.printStackTrace();
		}	
	}
	
	public boolean isNew() {
		boolean flag = false;
		flag = subscriber.isNew();
		return flag;
	}
}

Anwendungsbeispiel

Der Vorgang des Abonnierens wird in einem anderen Thread ausgeführt, der Rückruffunktion (messageArrived-Funktion), die zum Zeitpunkt des Abonnierens wie in Python aufgerufen wird. Der Veröffentlichungsprozess ruft die Funktion jedes Mal zum Veröffentlichen auf.

package main;

import mqtt.MqttThread;
import mqtt.MqttPublisher;

public class testMQTTClient {
	
	public static void main(String[] args) {
            MqttThread mthread = new MqttThread("Hostname","Themenname");
            mthread.start();
            //Nur einmal veröffentlichen
            MqttPublisher publisher = new MqttPublisher("Hostname","Themenname");
            publisher.publish("Nachrichteninhalt");
            if(mthread.isNew()){
                System.out.println(mthread.recieveData);
            }
    }
}

Geben Sie beim Aufrufen des Konstruktors (beim Erstellen einer Instanz) den Hostnamen und den Themennamen an.

Mit mthread.start () wird der Abonnementprozess in einem separaten Thread ausgeführt. Es wird nur einmal mit "publisher.publish (" message ")" veröffentlicht. Sie können mit mthread.receiveData auf den abonnierten Dateninhalt zugreifen. Verwenden Sie es als Set mit der Funktion isNew () wie in Python.

Pub / Sub aus dem vorherigen Artikel starten Als Referenz können Sie Publisher und Subsrriber starten und den Vorgang überprüfen.

abschließend

Dieses Mal habe ich ein Implementierungsbeispiel in Java vorgestellt. Ich habe versucht, es nutzbar zu machen, wenn ich tatsächlich von einem anderen System wechsle, nicht nur von Pub / Sub.

Recommended Posts

[MQTT / Java] Implementierte eine Klasse, die Pub / Sub von MQTT in Java ausführt
[Java] Implementieren Sie eine Funktion, die eine im Builder-Muster implementierte Klasse verwendet
Schreiben einer Klasse, die in Java bestellt werden kann Ein kleines Standard-Memo
Was ist eine Klasse in der Java-Sprache (3 /?)
Was ist eine Klasse in der Java-Sprache (1 /?)
Was ist eine Klasse in der Java-Sprache (2 /?)
Warum nennt Java eine Datei eine Klasse?
Eine Bat-Datei, die Java in Windows verwendet
Ein kurzer Überblick über Java, das im Unterricht gelernt wurde
Allzweck-StringConverter-Klasse, die Generika in Java8 verwendet
Schreiben Sie eine Klasse in Kotlin und nennen Sie sie in Java
Ein kurzer Überblick über Java, das in Klasse 3 gelernt wurde
Eine Bibliothek, die mehrzeilige Zeichenfolgen in mehrzeiligen Java-Zeichenfolgen realisiert
Suchen Sie eine Teilmenge in Java
Ich kann in IntelliJ keine Java-Klasse mit einem bestimmten Namen erstellen
So implementieren Sie einen Job, der die Java-API in JobScheduler verwendet
Ich habe eine Sterling-Sorte geschrieben, die sich wie in Java anfühlt
Erstellen einer Klasse für eine Matrix in Java Teil 2 - Über eine Matrix (lineare Algebra) -
Java-Versionsnotation, die sich in Java 10 ändert
3 Implementieren Sie einen einfachen Interpreter in Java
Ich habe ein PDF mit Java erstellt.
StringBuffer- und StringBuilder-Klasse in Java
Ein einfaches Beispiel für Rückrufe in Java
Bleiben Sie in einem Java Primer stecken
So testen Sie eine private Methode und verspotten sie teilweise in Java
Eine Geschichte über ein in Java geschriebenes Spring Boot-Projekt, das Kotlin unterstützt
Autowired-Felder in einer Klasse, die TextWebSocketHandler in Spring Boot erbt, werden NULL
Beispielprogramm, das den Hashwert einer Datei in Java zurückgibt
Ich habe eine Klasse erstellt, die JUMAN und KNP aus Java verwenden kann
Informationen zum Zurückgeben einer Referenz in einem Java Getter
Bei der Suche nach mehreren in einem Java-Array
[Erstellen] Ein Memorandum über das Codieren in Java
Java erstellt eine Tabelle in einem Word-Dokument
Java erstellt ein Kreisdiagramm in Excel
Erstellen Sie eine TODO-App in Java 7 Create Header
Lassen Sie uns eine Taschenrechner-App mit Java erstellen
Implementieren Sie so etwas wie einen Stack in Java
Teilen Sie eine Zeichenfolge in Java mit ". (Dot)"
Implementierung eines grundlegenden Such- / Sortieralgorithmus in Java
[Java] Unterschied zwischen gleich und == in einer Zeichenfolge, die ein Referenztyp ist
Java String-Klassenmethoden, die ein wenig nützlich sind, aber nicht bekannt sind
[Java] Wo befindet sich die Implementierungsklasse der Annotation, die in BeanValidation vorhanden ist?
[Java] Implementierte einen strengen Zeilenvorschub-Stream-Reader, der Line nur mit CrLf liest.
Ursache ist nicht sichtbar, wenn Methoden anderer Klassen in Java aufgerufen werden