[MQTT / Java] Implemented a class that does MQTT Pub / Sub in Java

Introduction

I've tried using MQTT in Python / Java, so I'm posting it. This article corresponds to the following three. If you haven't done so already, please see 1 first.

  1. [MQTT] Introducing MQTT on a command basis (two times before)
  2. [Python] Implemented a class that does MQTT Pub / Sub in Python (previous)
  3. [Java] Implemented a class that does MQTT Pub / Sub in Java (this article)
  4. [ROS] Implemented a node for MQTT communication (next time)

Library, broker installation

This is in the article 2nd time before, so I will write it for those who have not installed it yet.

Library, broker installation
### 1. For windows Please download the installer according to your environment from "Binary Installation" ⇒ "Windows" on the following site. https://mosquitto.org/download/ ![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/426354/e6ec1314-653b-3dd2-31d7-dc1619f14f1f.png) ### 2. For Linux Execute the following two commands.
# Mosquitto(Broker)Install
$ sudo apt-get install mosquitto

#Install Mosquitto client
$ sudo apt-get install mosquitto-clients

Client library installation

The installation method of Java client library is as follows. From this link org.eclipse.paho.client.mqttv3_1.2.3.jar Please download.

Separate thread processing code for Publisher and Subscriber

Similar to the Python article, it works separately, such as ROS and OpenRTM. I want to call from the system. In Python, when you start Subscriber, if you do loop_start (), the processing of another thread seems to start, but in Java it seems that it is not (this is an unknown point. Someone can easily process in another thread. Please let me know if you can.) I also make a class that processes in another thread.

Therefore, the program in this article consists of the following programs.

--Publisher (call from another system) --Subscriber (called as a separate thread) --Thread (call Subscriber)

I referred to this article very much.

Publisher

MqttPublisher.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublisher {
	String broker = "";
	String topic  = "";
	
	/**
	 *constructor
	 * @param brokerHostName
	 * @param publishTopic
	 */
	public MqttPublisher(String brokerHostName,String publishTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = publishTopic;
	}

	/**
	 *Publish the argument.
	 * @param publishMessage
	 */
	public void publish(String publishMessage) {
        final int qos             = 2;
        final String clientId     = "Publisher";
        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            mqttClient.connect(connOpts);

            MqttMessage message = new MqttMessage(publishMessage.getBytes());
            message.setQos(qos);
//            System.out.println("publish message");
//            System.out.println("Topic : "+topic+", Message : "+message);
            mqttClient.publish(topic, message);

            mqttClient.disconnect();
            mqttClient.close();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }
	
    public static void main(String[] args) {
    	MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
    	publisher.publish("test");
    }   
}

As shown in the main function, specify the host name and topic name when creating an instance and publish the message with the publish method.

Subscriber

MqttSubscriber.java


package mqtt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber  implements MqttCallback {
	Timestamp recieveTime;
	Timestamp lastTime;
	String broker = "";
	String topic  = "";
	
	/**
	 *constructor
	 * @param brokerHostName
	 * @param subscribeTopic
	 */
	public MqttSubscriber(String brokerHostName,String subscribeTopic) {
		broker = "tcp://"+brokerHostName+":1883";
		topic  = subscribeTopic;
	}
	
    /**
     *Called when losing connection with MQTT broker.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost");
        System.exit(1);
    }

    /**
     *Called when a message is received.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
//        System.out.println("Message arrived");
//        System.out.println("Topic:"+ topic);
//        System.out.println("Message: " + new String(message.getPayload()));
        
        recieveTime = new Timestamp(System.currentTimeMillis());
        MqttThread.recieveData = new String(message.getPayload());
    }
    
    /**
     *Determine if you have subscribed.
     * @return is New flag
     */
    public boolean isNew() {
    	boolean flag = false;
    	if(recieveTime==lastTime) flag = false;
    	else flag = true;
    	lastTime=recieveTime;
    	return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        }
    }

    /**
     *Receive a message.
     *Stay connected until there is a standard input.
     * 
     * @throws MqttException
     * @throws InterruptedException 
     */
    public void subscribe() throws MqttException, InterruptedException {
        //Subscribe settings
        final int qos             = 2;
        final String clientId     = "Subscribe";

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);

        System.out.println("Connecting to broker:"+broker);
        client.connect(connOpts);

        client.subscribe(topic, qos);

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        try{
            //Keep waiting until you receive standard input
            br.readLine();
        }catch(IOException e){
            System.exit(1);
        }
        client.disconnect();
        client.close();
        System.out.println("Disconnected");
    }
    
    /**
     *Required for MqttCallback, not likely to be called by subscribe.
     */
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {
		//TODO auto-generated method stub
		
	}
}

As with publisher, specify the host name and topic name when creating an instance and start subscribe with the subscribe method.

Thread processing

A class that inherits Thread and performs Subscribe processing by thread processing.

MqttThread.java


package mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttThread extends Thread{
	String broker = "";
	String topic  = "";
	static MqttSubscriber subscriber;
	public static String recieveData = "";
	
	//constructor
	public MqttThread(String brokerHostName,String subscribeTopic) {
		broker = brokerHostName;
		topic  = subscribeTopic;
		subscriber = new MqttSubscriber(broker, topic);
	}
	
	public void run() {
		try {
            subscriber.subscribe();
        } catch(MqttException me) {
            System.out.println("reason: "   + me.getReasonCode());
            System.out.println("message: "  + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: "    + me.getCause());
            System.out.println("exception: "+ me);
        } catch (InterruptedException e) {
			//TODO auto-generated catch block
			e.printStackTrace();
		}	
	}
	
	public boolean isNew() {
		boolean flag = false;
		flag = subscriber.isNew();
		return flag;
	}
}

Example of use

Subscribe processing is performed in another thread, the callback function (messageArrived function) that is called at the time of subscribe as in Python. The Publish process calls a function each time to publish.

package main;

import mqtt.MqttThread;
import mqtt.MqttPublisher;

public class testMQTTClient {
	
	public static void main(String[] args) {
            MqttThread mthread = new MqttThread("hostname","Topic name");
            mthread.start();
            //Publish only once
            MqttPublisher publisher = new MqttPublisher("hostname","Topic name");
            publisher.publish("Message content");
            if(mthread.isNew()){
                System.out.println(mthread.recieveData);
            }
    }
}

Specify the host name and topic name when calling the constructor (when creating an instance).

With mthread.start (), the subscribe process runs in a separate thread. It will be published only once with publisher.publish ("message "). You can access the subscribed data content with mthread.receiveData. Use it in combination with the ʻisNew ()` function as in Python.

Launch Pub / Sub from the previous article For reference, you can start Publisher and Subsrriber and check the operation.

in conclusion

This time, I introduced an implementation example in Java. I tried to make it usable when actually running from another system, not the operation of Pub / Sub alone.

Recommended Posts

[MQTT / Java] Implemented a class that does MQTT Pub / Sub in Java
[Java] Implement a function that uses a class implemented in the Builder pattern
Write a class that can be ordered in Java
What is a class in Java language (3 /?)
What is a class in Java language (1 /?)
What is a class in Java language (2 /?)
Why does Java call a file a class?
A bat file that uses Java in windows
A quick review of Java learned in class
General-purpose StringConverter class that utilizes generics in Java8
Write a class in Kotlin and call it in Java
A quick review of Java learned in class part3
A library that realizes multi-line strings in Java multiline-string
Find a subset in Java
I can't create a Java class with a specific name in IntelliJ
Handling when calling a key that does not exist in hash
How to implement a job that uses Java API in JobScheduler
I wrote a Stalin sort that feels like a mess in Java
Creating a matrix class in Java Part 2-About matrices (linear algebra)-
Java version notation that changes in Java 10
3 Implement a simple interpreter in Java
I created a PDF in Java.
StringBuffer and StringBuilder Class in Java
A simple sample callback in Java
Get stuck in a Java primer
How to test a private method in Java and partially mock that method
A story about a Spring Boot project written in Java that supports Kotlin
Autowired fields in a class that inherits TextWebSocketHandler in Spring Boot become NULL
Sample program that returns the hash value of a file in Java
I made a class that can use JUMAN and KNP from Java
About returning a reference in a Java Getter
When seeking multiple in a Java array
[Creating] A memorandum about coding in Java
Java creates a table in a Word document
Java creates a pie chart in Excel
Create a TODO app in Java 7 Create Header
Try making a calculator app in Java
Implement something like a stack in Java
Split a string with ". (Dot)" in Java
Implemented basic search / sort algorithm in Java
[Java] Difference between equals and == in a character string that is a reference type
Java String class methods that are a little useful to know but don't know
[Java] Where is the implementation class of annotation that exists in Bean Validation?
[Java] Implemented a strict line feed stream reader that readsLine only with CrLf.
Cause of is not visible when calling a method of another class in java