I tried using MQTT from Python and Java, so I'm writing up what I did. This article is the third of the following three. If you haven't read the first one yet, please start there.
The installation steps were already covered two articles back, but I'll repeat them here for anyone who hasn't installed it yet.
# Install Mosquitto (broker)
$ sudo apt-get install mosquitto
# Install the Mosquitto clients
$ sudo apt-get install mosquitto-clients
To install the Java client library, download org.eclipse.paho.client.mqttv3_1.2.3.jar from this link.
As in the Python article, I want to call this from a separately running system such as ROS or OpenRTM.
In Python, calling `loop_start()` when starting the Subscriber seems to start the processing in another thread, but in Java this does not seem to happen (I'm not sure about this point; if anyone knows an easy way to run it in another thread, please let me know), so I also wrote a class that runs the Subscriber in a separate thread.
The code in this article therefore consists of the following three parts.
- Publisher (called from another system)
- Subscriber (run in a separate thread)
- Thread (calls the Subscriber)
I relied heavily on this article as a reference.
Publisher
MqttPublisher.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublisher {
String broker = "";
String topic = "";
/**
* Constructor.
* @param brokerHostName host name of the MQTT broker
* @param publishTopic topic to publish to
*/
public MqttPublisher(String brokerHostName,String publishTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = publishTopic;
}
/**
* Publishes the given message to the topic.
* @param publishMessage message body to publish
*/
public void publish(String publishMessage) {
final int qos = 2;
final String clientId = "Publisher";
try {
MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
mqttClient.connect(connOpts);
MqttMessage message = new MqttMessage(publishMessage.getBytes());
message.setQos(qos);
// System.out.println("publish message");
// System.out.println("Topic : "+topic+", Message : "+message);
mqttClient.publish(topic, message);
mqttClient.disconnect();
mqttClient.close();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
public static void main(String[] args) {
MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
publisher.publish("test");
}
}
As shown in the main function, specify the host name and topic name when creating an instance, then publish a message with the `publish` method.
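One detail of the `publish` method is that `publishMessage.getBytes()` uses the JVM's default charset, so non-ASCII messages could be decoded differently if the publisher and subscriber run with different defaults. The following is a minimal sketch, assuming you want UTF-8 on both sides. `PayloadCodec`, `encode`, and `decode` are hypothetical names used only for illustration; they are not part of the article's code or of the Paho library.

```java
import java.nio.charset.StandardCharsets;

class PayloadCodec {
    // Convert a message string into the byte[] payload that would be handed to MqttMessage.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Convert a received payload (e.g. the result of message.getPayload()) back into a string.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "caf\u00e9" contains one non-ASCII character, written as an escape.
        byte[] payload = encode("caf\u00e9");
        System.out.println(payload.length + " bytes, decoded: " + decode(payload));
    }
}
```

With this approach, the publisher would pass `PayloadCodec.encode(publishMessage)` to the `MqttMessage` constructor, and the subscriber would call `PayloadCodec.decode(message.getPayload())`.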
Subscriber
MqttSubscriber.java
package mqtt;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber implements MqttCallback {
Timestamp recieveTime;
Timestamp lastTime;
String broker = "";
String topic = "";
/**
* Constructor.
* @param brokerHostName host name of the MQTT broker
* @param subscribeTopic topic to subscribe to
*/
public MqttSubscriber(String brokerHostName,String subscribeTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = subscribeTopic;
}
/**
*Called when losing connection with MQTT broker.
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
System.exit(1);
}
/**
*Called when a message is received.
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
// System.out.println("Message arrived");
// System.out.println("Topic:"+ topic);
// System.out.println("Message: " + new String(message.getPayload()));
recieveTime = new Timestamp(System.currentTimeMillis());
MqttThread.recieveData = new String(message.getPayload());
}
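/**
* Reports whether a new message has arrived since the previous call.
* @return true if a message was received after the last call to isNew()
*/
public boolean isNew() {
//messageArrived creates a new Timestamp object for every message,
//so comparing the references is enough to detect a new one.
boolean flag = (recieveTime != lastTime);
lastTime = recieveTime;
return flag;
}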
public static void main(String[] args) throws InterruptedException {
try {
MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
/**
* Receives messages.
* Stays connected until a line is entered on standard input.
*
* @throws MqttException
* @throws InterruptedException
*/
public void subscribe() throws MqttException, InterruptedException {
//Subscribe settings
final int qos = 2;
final String clientId = "Subscribe";
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
client.setCallback(this);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
System.out.println("Connecting to broker:"+broker);
client.connect(connOpts);
client.subscribe(topic, qos);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try{
//Keep waiting until you receive standard input
br.readLine();
}catch(IOException e){
System.exit(1);
}
client.disconnect();
client.close();
System.out.println("Disconnected");
}
/**
* Required by MqttCallback. Called when delivery of a published message completes,
* so it is not expected to be called when only subscribing.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
//Nothing to do for a subscriber
}
}
As with the Publisher, specify the host name and topic name when creating an instance, then start subscribing with the `subscribe` method.
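The `subscribe` method keeps the connection open by blocking on standard input, which can be awkward when the Subscriber runs inside another system that has no console. The following is a minimal sketch of one alternative, using only the standard library: the worker blocks on a `CountDownLatch` until another thread asks it to stop. `StoppableWorker`, `requestStop`, `isFinished`, and `runAndStop` are hypothetical names, and the comments only mark where the connect/subscribe and disconnect/close calls from the article's `subscribe` method would go.

```java
import java.util.concurrent.CountDownLatch;

class StoppableWorker implements Runnable {
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private volatile boolean finished = false;

    @Override
    public void run() {
        // (Assumption: connecting to the broker and subscribing would happen here.)
        try {
            // Block this thread until requestStop() is called from elsewhere.
            stopSignal.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and continue to the cleanup below.
            Thread.currentThread().interrupt();
        }
        // (Assumption: disconnecting and closing the client would happen here.)
        finished = true;
    }

    // Releases the blocking wait in run(); safe to call from any thread.
    void requestStop() {
        stopSignal.countDown();
    }

    boolean isFinished() {
        return finished;
    }

    // Starts a worker, stops it, and reports whether it shut down within two seconds.
    static boolean runAndStop() {
        StoppableWorker worker = new StoppableWorker();
        Thread thread = new Thread(worker);
        thread.start();
        worker.requestStop();
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return worker.isFinished() && !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runAndStop());
    }
}
```

The calling system keeps a reference to the worker and calls `requestStop()` when it shuts down, instead of relying on someone pressing Enter.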
A class that extends `Thread` and runs the Subscribe processing in its own thread.
MqttThread.java
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
public class MqttThread extends Thread{
String broker = "";
String topic = "";
static MqttSubscriber subscriber;
public static String recieveData = "";
//constructor
public MqttThread(String brokerHostName,String subscribeTopic) {
broker = brokerHostName;
topic = subscribeTopic;
subscriber = new MqttSubscriber(broker, topic);
}
public void run() {
try {
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
} catch (InterruptedException e) {
//TODO auto-generated catch block
e.printStackTrace();
}
}
public boolean isNew() {
boolean flag = false;
flag = subscriber.isNew();
return flag;
}
}
Subscribe processing runs in another thread and, as in Python, the callback function (the `messageArrived` function) is called whenever a message is received.
To publish, call the publish function each time you want to send a message.
package main;
import mqtt.MqttThread;
import mqtt.MqttPublisher;
public class testMQTTClient {
public static void main(String[] args) {
MqttThread mthread = new MqttThread("hostname","Topic name");
mthread.start();
//Publish only once
MqttPublisher publisher = new MqttPublisher("hostname","Topic name");
publisher.publish("Message content");
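//Print the received data if a new message has arrived.
//Note: the message may not have arrived yet at this point; in a real system, call isNew() repeatedly (for example from the caller's main loop).
if(mthread.isNew()){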
System.out.println(mthread.recieveData);
}
}
}
Specify the host name and topic name when calling the constructor (when creating an instance).
Calling `mthread.start()` runs the subscribe processing in a separate thread.
Calling `publisher.publish("Message content")` publishes the message once.
You can access the received data through `mthread.recieveData` (spelled this way in the code). As in Python, use it together with the `isNew()` function.
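Because the received data is written from the MQTT callback and read from the caller's thread, it may help to see the same "latest value plus isNew flag" idea with explicit synchronization. The following is a minimal sketch using only the standard library. `LatestMessage` is a hypothetical class, not part of the article's code or of the Paho library, and it assumes that only the most recent message matters.

```java
class LatestMessage {
    private String data = "";
    private boolean fresh = false;

    // Called from the receiving side (for example inside messageArrived).
    synchronized void set(String newData) {
        data = newData;
        fresh = true;
    }

    // Returns true once per stored message, then false until the next set().
    synchronized boolean isNew() {
        boolean wasFresh = fresh;
        fresh = false;
        return wasFresh;
    }

    // Returns the most recently stored message body.
    synchronized String get() {
        return data;
    }

    public static void main(String[] args) throws InterruptedException {
        LatestMessage latest = new LatestMessage();
        // A plain thread stands in for the MQTT callback thread here.
        Thread receiver = new Thread(() -> latest.set("hello"));
        receiver.start();
        receiver.join();
        if (latest.isNew()) {
            System.out.println(latest.get());
        }
    }
}
```

The calling system would poll `isNew()` in its own loop and read `get()` only when it returns true, which mirrors how `isNew()` and the received data are combined in this article.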
Referring to "Launch Pub / Sub" in the previous article, you can start the Publisher and Subscriber and check that they work.
This time I introduced an implementation example in Java. I tried to make it usable when called from another system, rather than only running Pub / Sub on its own.