[Java] [MQTT/Java] Implemented MQTT Pub/Sub class in Java


Introduction

I wrote this series after trying out MQTT with Python and Java. This article is number 3 in the list below. If you haven't already, please take a look at article 1 first.

  1. [MQTT] Command-based introduction to MQTT (two articles ago)
  2. [Python] Implemented MQTT Pub/Sub class in Python (previous article)
  3. [Java] Implemented MQTT Pub/Sub class in Java (this article)
  4. [ROS] Implemented MQTT communication node (next time)

Install library, broker

This part is repeated from the article two posts back, for those who have not installed the broker and library yet.

Installing libraries and brokers
### 1. For Windows

Please download the installer for your environment from "Binary Installation" ⇒ "Windows" on the following site.

https://mosquitto.org/download/

![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/426354/e6ec1314-653b-3dd2-31d7-dc1619f14f1f.png)

### 2. For Linux

Execute the following two commands.

```shell
# Install Mosquitto (broker)
$ sudo apt-get install mosquitto
# Install the Mosquitto client
$ sudo apt-get install mosquitto-clients
```

## Client library installation

The Java client library is installed as follows. Please download org.eclipse.paho.client.mqttv3_1.2.3.jar from this [link](https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/) and add it to your project's classpath.
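After adding the jar, it can be handy to confirm that the JVM actually sees it. The following is a minimal sketch, not part of the original programs: `ClasspathCheck` and `isOnClasspath` are hypothetical names, and the only thing taken from this article is the class name `org.eclipse.paho.client.mqttv3.MqttClient`, which the code below imports.

```java
// Sketch: check whether a class is visible on the classpath.
// ClasspathCheck and isOnClasspath are hypothetical names used only for illustration.
class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the imports used later in this article
        String pahoClient = "org.eclipse.paho.client.mqttv3.MqttClient";
        System.out.println(pahoClient + " found: " + isOnClasspath(pahoClient));
    }
}
```

If this prints `false`, the jar is probably not on the classpath of the run configuration you used.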

Publisher, Subscriber, and separate-thread processing code

As in the Python article, I want to call these classes from other systems such as ROS and OpenRTM. In Python, starting the Subscriber with loop_start() appeared to start processing on another thread, but that does not seem to happen in Java (I am not sure about this point, so please let me know if you know). For that reason I also created a class that runs the Subscriber in a separate thread.

Therefore, the program of this article consists of the following programs.

- Publisher (called from another system)
- Subscriber (run in a separate thread)
- Thread (calls the Subscriber)
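To make the split above concrete without needing a broker, here is a rough sketch that uses only the standard library. A `BlockingQueue` stands in for the MQTT broker, and `ThreadSplitSketch`, `ListenerThread`, and `roundTrip` are hypothetical names; the real classes follow in the next sections.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for the Publisher / Subscriber / Thread split, with a queue instead of a broker.
class ThreadSplitSketch {

    /** Plays the role of the Subscriber thread: blocks until a message arrives. */
    static class ListenerThread extends Thread {
        private final BlockingQueue<String> inbox;
        volatile String lastReceived; // null until the first message arrives

        ListenerThread(BlockingQueue<String> inbox) {
            this.inbox = inbox;
            setDaemon(true); // do not keep the JVM alive just for the listener
        }

        @Override
        public void run() {
            try {
                while (true) {
                    lastReceived = inbox.take(); // blocks while waiting for a message
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }
    }

    /** Starts a listener, "publishes" one message, and returns what the listener saw. */
    static String roundTrip(String message) {
        BlockingQueue<String> fakeBroker = new LinkedBlockingQueue<>();
        ListenerThread listener = new ListenerThread(fakeBroker);
        listener.start();
        try {
            fakeBroker.put(message); // the calling thread stays free to publish
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (listener.lastReceived == null && System.nanoTime() < deadline) {
                Thread.sleep(10); // give the listener thread time to pick it up
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            listener.interrupt(); // stop the listener
        }
        return listener.lastReceived;
    }

    public static void main(String[] args) {
        System.out.println("received: " + roundTrip("test"));
    }
}
```

The point of the design is that the blocking wait lives in its own thread, so the caller can keep publishing or doing other work.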

This article was very helpful.

Publisher

MqttPublisher.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublisher {
String broker = "";
String topic = "";
/**
* Constructor
* @param brokerHostName
* @param publishTopic
*/
public MqttPublisher(String brokerHostName,String publishTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = publishTopic;
}

/**
* Publish the argument.
* @param publishMessage
*/
public void publish(String publishMessage) {
        final int qos = 2;
        final String clientId = "Publisher";
        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            mqttClient.connect(connOpts);

            MqttMessage message = new MqttMessage(publishMessage.getBytes());
            message.setQos(qos);
// System.out.println("publish message");
// System.out.println("Topic :"+topic+", Message :"+message);
            mqttClient.publish(topic, message);

            mqttClient.disconnect();
            mqttClient.close();
        } catch(MqttException me) {
            System.out.println("reason: "+ me.getReasonCode());
            System.out.println("message: "+ me.getMessage());
            System.out.println("localize: "+ me.getLocalizedMessage());
            System.out.println("cause: "+ me.getCause());
            System.out.println("exception: "+ me);
        }
    }
    public static void main(String[] args) {
    MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
    publisher.publish("test");
    }
}

As shown in the main function, specify the host name and topic name when creating the instance, then publish a message with the publish method.
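One detail worth noting: `publishMessage.getBytes()` and `new String(message.getPayload())` both use the platform default charset, so a Publisher and Subscriber running on machines with different defaults could disagree on non-ASCII text. A small sketch of pinning the charset to UTF-8 on both sides follows; `PayloadEncoding`, `encode`, and `decode` are hypothetical helper names, not part of the classes in this article.

```java
import java.nio.charset.StandardCharsets;

// Sketch: convert message text to and from payload bytes with an explicit charset.
class PayloadEncoding {

    /** Encodes message text as UTF-8 bytes, independent of the platform default. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 payload bytes back into text. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("caf\u00e9"); // contains one non-ASCII character
        System.out.println("bytes: " + payload.length);
        System.out.println("round trip ok: " + "caf\u00e9".equals(decode(payload)));
    }
}
```

With helpers like these, the Publisher would pass `encode(publishMessage)` to the `MqttMessage` constructor and the Subscriber would call `decode(message.getPayload())`.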

Subscriber

MqttSubscriber.java


package mqtt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber implements MqttCallback {
Timestamp recieveTime;
Timestamp lastTime;
String broker = "";
String topic = "";
/**
* Constructor
* @param brokerHostName
* @param subscribeTopic
*/
public MqttSubscriber(String brokerHostName,String subscribeTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = subscribeTopic;
}
    /**
     * Called when the connection with the MQTT broker is lost.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost");
        System.exit(1);
    }

    /**
     * Called when a message is received.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
//        System.out.println("Message arrived");
//        System.out.println("Topic:"+ topic);
//        System.out.println("Message: " + new String(message.getPayload()));
        
        recieveTime = new Timestamp(System.currentTimeMillis());
        MqttThread.recieveData = new String(message.getPayload());
    }
    
    /**
     * Determines whether a new message has been received since the last call.
     * @return the isNew flag
     */
    public boolean isNew() {
     boolean flag = false;
     if(recieveTime==lastTime) flag = false;
     else flag = true;
     lastTime=recieveTime;
     return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }

    /**
     * Receives messages.
     * Stays connected until something is entered on standard input.
     * 
     * @throws MqttException
     * @throws InterruptedException 
     */
    public void subscribe() throws MqttException, InterruptedException {
        // Subscribe settings
        final int qos             = 2;
        final String clientId     = "Subscribe";

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);

        System.out.println("Connecting to broker:"+broker);
        client.connect(connOpts);

        client.subscribe(topic, qos);

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        try{
            // Keep waiting until a line is received on standard input
            br.readLine();
        }catch(IOException e){
            System.exit(1);
        }
        client.disconnect();
        client.close();
        System.out.println("Disconnected");
    }
    
    /**
     * Required by MqttCallback; it does not seem to be called when only subscribing.
     */
 @Override
 public void deliveryComplete(IMqttDeliveryToken arg0) {
  // TODO Auto-generated method stub
  
 }
}

As with the publisher, specify the host name and topic name when creating the instance, then start subscribing with the subscribe method.
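The Subscriber above detects a new message by comparing two `Timestamp` references in `isNew()` and keeps the payload in a static field. As an alternative sketch only (this is not how the classes in this article work), the "is there something new" check and the read can be combined into one atomic step with `AtomicReference`. `LatestMessage`, `store`, and `takeIfNew` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch: hold the most recent unread message and hand it out at most once.
class LatestMessage {
    // null means "nothing new since the last read"
    private final AtomicReference<String> pending = new AtomicReference<>();

    /** Would be called from the callback thread when a message arrives. */
    void store(String payload) {
        pending.set(payload);
    }

    /** Returns the unread message and clears it, or null if nothing new has arrived. */
    String takeIfNew() {
        return pending.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestMessage latest = new LatestMessage();
        System.out.println("before store: " + latest.takeIfNew());
        latest.store("test");
        System.out.println("after store: " + latest.takeIfNew());
        System.out.println("second read: " + latest.takeIfNew());
    }
}
```

Because `getAndSet` is atomic, a message stored by the callback thread is never reported as new twice, and the caller does not need a separate flag.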

Thread processing

This class extends Thread and runs the Subscribe processing in a separate thread.

MqttThread.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttThread extends Thread{
 String broker = "";
 String topic  = "";
 static MqttSubscriber subscriber;
 public static String recieveData = "";
 
 // Constructor
 public MqttThread(String brokerHostName,String subscribeTopic) {
  broker = brokerHostName;
  topic  = subscribeTopic;
  subscriber = new MqttSubscriber(broker, topic);
 }
 
 public void run() {
  try {
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } 
 }
 
 public boolean isNew() {
  boolean flag = false;
  flag = subscriber.isNew();
  return flag;
 }
}

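Usage example

Subscribe processing runs in a separate thread. As in the Python version, incoming messages are handled inside the callback function that is invoked when a message arrives (the messageArrived function). For publishing, call the publish method each time you want to send a message.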

package main;

import mqtt.MqttThread;
import mqtt.MqttPublisher;

public class testMQTTClient {
 
 public static void main(String[] args) {
            MqttThread mthread = new MqttThread("hostname", "topic name");
            mthread.start();
            // Publish only once
            MqttPublisher publisher = new MqttPublisher("hostname", "topic name");
            publisher.publish("message content");
            if (mthread.isNew()) {
                // recieveData is a static field, so access it through the class
                System.out.println(MqttThread.recieveData);
            }
    }
}

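Specify the host name and topic name when calling the constructor (when creating the instance).

mthread.start() runs the subscribe processing in a separate thread. publisher.publish("message") publishes once. The received data can be accessed through MqttThread.recieveData (the field name is spelled this way in the code). As with the Python version, use it together with the isNew() function.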
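Since the message is delivered asynchronously, a single isNew() check made right after publish may run before the message has arrived. One way to handle this is to poll isNew() with a timeout. The following is a minimal sketch under that assumption; `WaitForMessage` and `waitUntil` are hypothetical names and the timeout values are arbitrary.

```java
import java.util.function.BooleanSupplier;

// Sketch: poll a condition until it becomes true or a timeout expires.
class WaitForMessage {

    /**
     * Checks the condition every pollMillis milliseconds.
     * Returns true as soon as it holds, or false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true about 100 ms after start
        boolean arrived = waitUntil(() -> System.currentTimeMillis() - start >= 100, 1000, 10);
        System.out.println("condition met: " + arrived);
    }
}
```

In the usage example above, this would be called as `WaitForMessage.waitUntil(mthread::isNew, 3000, 50)` before reading `MqttThread.recieveData`.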

Referring to the section on starting Pub/Sub in the article two posts back, you should be able to start the Publisher and Subscriber and verify that they work.

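Conclusion

This time I introduced an implementation example in Java. Rather than only running Pub/Sub on their own, I tried to make the classes usable when they are driven from another system.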