Logiciel open source qui implémente le protocole MQTT. http://mosquitto.org/
Un autre OSS qui implémente MQTT semble être Apache Apollo.
Protocole de modèle de publication / abonnement (publication-abonnement) https://sango.shiguredo.jp/mqtt
Un type de paradigme de messagerie asynchrone qui permet à un expéditeur de message (éditeur) d'envoyer un message sans supposer un destinataire spécifique (abonné). https://ja.wikipedia.org/wiki/%E5%87%BA%E7%89%88-%E8%B3%BC%E8%AA%AD%E5%9E%8B%E3%83%A2%E3%83%87%E3%83%AB
Dans la messagerie asynchrone, les messages sont lancés les uns après les autres sans attendre le résultat, de sorte que le moment du traitement des messages ne correspond pas. http://ledsun.hatenablog.com/entry/2013/07/18/181044
Le site suivant sera utile. http://dev.classmethod.jp/cloud/setting-up-mosquitto-logging-on-amazon-linux/
[root@localhost tmp]# wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
[root@localhost tmp]# yum install mosquitto mosquitto-clients
[root@localhost tmp]# ls -litr /etc/yum.repos.d/Mosquitto.repo
[root@localhost tmp]# less /etc/init.d/mosquitto
#! /bin/sh
~~~~~
### BEGIN INIT INFO
# Provides: mosquitto
# Required-Start: $network $remote_fs
# Required-Stop: $network $remote_fs
# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Short-Description: Mosquitto MQTT broker
# Description: Mosquitto MQTT broker
### END INIT INFO
[root@localhost tmp]# /sbin/chkconfig --add mosquitto
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto 0:off 1:off 2:off 3:on 4:off 5:on 6:off
[root@localhost tmp]# /sbin/chkconfig mosquitto on
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto 0:off 1:off 2:on 3:on 4:on 5:on 6:off
[root@localhost tmp]# /etc/init.d/mosquitto start
Starting mosquitto (via systemctl): [ OK ]
Installez monit (logiciel de surveillance) et configurez-le pour qu'il démarre automatiquement lorsque le moustique tombe en panne. https://easyengine.io/tutorials/monitoring/monit/
[root@localhost tmp]# cd ~
[root@localhost tmp]# wget http://mmonit.com/monit/dist/binary/5.14/monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# tar zxvf monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# cd monit-5.14/
[root@localhost tmp]# cp bin/monit /usr/bin/monit
[root@localhost tmp]# mkdir /etc/monit
[root@localhost tmp]# touch /etc/monit/monitrc
[root@localhost tmp]# chmod 0700 /etc/monit/monitrc
[root@localhost tmp]# ln -s /etc/monit/monitrc /etc/monitrc
[root@localhost tmp]# wget https://gist.githubusercontent.com/rahul286/9975061/raw/1aa107e62ecaaa2dacfdb61a12f13efb6f15005b/monit -P /etc/init.d/
[root@localhost tmp]# chmod u+x /etc/init.d/monit
[root@localhost tmp]# echo "START=yes" > /etc/default/monit
[root@localhost tmp]# monit -t
[root@localhost tmp]# /sbin/chkconfig --add monit
[root@localhost tmp]# /sbin/chkconfig monit on
[root@localhost tmp]# /sbin/chkconfig --list monit
[root@localhost tmp]# view /etc/monit.d/mosquitto.conf
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"
[root@localhost tmp]# sudo mkdir /var/log/mosquitto
[root@localhost tmp]# sudo chown mosquitto /var/log/mosquitto
[root@localhost tmp]# view /etc/mosquitto/mosquitto.conf
Total 0
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
log_dest syslog
log_dest file /var/log/mosquitto/mosquitto.log
#log_type debug
log_type error
log_type warning
log_type notice
log_type information
#log_type none
log_type subscribe
log_type unsubscribe
#log_type websockets
#log_type all
connection_messages true
log_timestamp true
include_dir /etc/mosquitto/conf.d
[root@localhost tmp]# /etc/init.d/mosquitto reload
Lancez deux terminaux et exécutez les commandes suivantes pour chacun.
À partir du sujet capteurs / température, publiez la valeur 32 pour vous-même avec qos1 et abonnez-vous.
#Souscrire
[root@localhost tmp]# mosquitto_sub -t sensors/temperature -q
#Publier
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -m 32 -q 1
#Si vous souhaitez envoyer un message contenant un code de saut de ligne-Vous pouvez publier un message contenant un code de saut de ligne en publiant le contenu du fichier à l'aide de l'option f.
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -f /var/tmp/test.txt
http://mosquitto.org/man/mosquitto_sub-1.html http://mosquitto.org/man/mosquitto_pub-1.html
Ce qui suit sera utile. https://librabuch.jp/2015/09/mosquiito_paho_python_mqtt/
Installez pip qui gère les packages python. C'est facile car vous pouvez installer python en utilisant pip avec une seule commande.
[root@localhost opt]# curl -kL https://bootstrap.pypa.io/get-pip.py | python curl -kL https://bootstrap.pypa.io/get-pip.py | python
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1487k 100 1487k 0 0 1631k 0 --:--:-- --:--:-- --:--:-- 1630k
Collecting pip
Downloading pip-8.1.1-py2.py3-none-any.whl (1.2MB)
100% |████████████████████████████████| 1.2MB 333kB/s
Collecting wheel
Downloading wheel-0.29.0-py2.py3-none-any.whl (66kB)
100% |████████████████████████████████| 71kB 1.9MB/s
Installing collected packages: pip, wheel
Successfully installed pip-8.1.1 wheel-0.29.0
paho est une bibliothèque Eclipse qui fournit la fonctionnalité MQTT.
https://eclipse.org/paho/ https://pypi.python.org/pypi/paho-mqtt/1.1
[root@localhost opt]# pip install paho-mqtt
Collecting paho-mqtt
Downloading paho-mqtt-1.1.tar.gz (41kB)
100% |████████████████████████████████| 51kB 3.4MB/s
Building wheels for collected packages: paho-mqtt
Running setup.py bdist_wheel for paho-mqtt ... done
Stored in directory: /root/.cache/pip/wheels/97/db/5f/1ddca8ee2f9b58f9bb68208323bd39bb0b177f32f434aa4b95
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.1
[root@localhost opt]# ls -litr /usr/lib/python2.7/site-packages/paho/mqtt
Total 196
135479923 -rw-r--r--.1 racine racine 8713 19 mars 23:15 publish.py
135479924 -rw-r--r--.1 racine racine 91388 19 mars 23:15 client.py
135479925 -rw-r--r--.1 racine racine 20 mars 19 23:15 __init__.py
135479926 -rw-r--r--.1 racine racine 170 19 mars 23:15 __init__.pyc
135479927 -rw-r--r--.1 racine racine 71288 19 mars 23:15 client.pyc
135479928 -rw-r--r--.1 racine racine 8332 19 mars 23:15 publish.pyc
Préparez également les éditeurs et les abonnés.
from time import sleep
import paho.mqtt.client as mqtt
HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
MESSAGE = 'test message'
PUBLISH_NUMBER = 5
SLEEP_TIME = 5
def publish_many_times(client, topic='topic/default', message='default', number=1, time=1, print_flag=False):
for i in range(number):
client.publish(topic, message)
if print_flag == True:
print (topic + ' ' + message)
sleep(time)
client.disconnect()
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
print "publish start " + str(type(client))
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
publish_many_times(client,TOPIC, MESSAGE, PUBLISH_NUMBER, SLEEP_TIME)
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
"""
Exécuter en essayant de se connecter
def on_connect(client, userdata, flags, respons_code):
* client
Instance de classe client
* userdata
Lors de la création d'une instance d'une nouvelle classe Client avec n'importe quel type de données>Peut être mis en place
* flags
Dictionnaire avec indicateurs de réponse
Valable pour les utilisateurs dont la session propre est définie sur 0.
Déterminez si la session existe toujours.
Si la session de nettoyage est 0, reconnectez-vous à l'utilisateur précédemment connecté.
0 :La session n'existe pas
1 :La session existe
* respons_code
Le code de réponse indique si la connexion a réussi.
0:Connexion réussie
1:La connexion a échoué- incorrect protocol version
2:La connexion a échoué- invalid client identifier
3:La connexion a échoué- server unavailable
4:La connexion a échoué- bad username or password
5:La connexion a échoué- not authorised
"""
def on_connect(client, userdata, flags, respons_code):
print('status {0}'.format(respons_code))
client.subscribe(client.topic)
"""
def on_message(client, userdata, message):
Exécuter lorsque le sujet est reçu
"""
def on_message(client, userdata, message):
print(message.topic + ' ' + str(message.payload))
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.topic = TOPIC
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
#boucle
client.loop_forever()
[root@localhost tmp]# python publisher.py
[root@localhost tmp]# python subscriber.py
status 0
test_topic/test1 test message
test_topic/test1 test message
test_topic/test1 test message
client.on_connect () et client.on_message () sont des fonctions de rappel Dans la boucle à l'intérieur de client.loop_forever (), il est appelé et exécuté par le gestionnaire.
Voir ici pour les rappels.
def _handle_connack(self):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
(flags, result) = struct.unpack("!BB", self._in_packet['packet'])
if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
# Downgrade to MQTT v3.1
self._protocol = MQTTv31
return self.reconnect()
if result == 0:
self._state = mqtt_cs_connected
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
self._callback_mutex.acquire()
if self.on_connect:
self._in_callback = True
if sys.version_info[0] < 3:
argcount = self.on_connect.func_code.co_argcount
else:
argcount = self.on_connect.__code__.co_argcount
if argcount == 3:
self.on_connect(self, self._userdata, result)
else:
flags_dict = dict()
flags_dict['session present'] = flags & 0x01
self.on_connect(self, self._userdata, flags_dict, result)
self._in_callback = False
self._callback_mutex.release()
if result == 0:
rc = 0
self._out_message_mutex.acquire()
for m in self._out_messages:
m.timestamp = time.time()
if m.state == mqtt_ms_queued:
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return MQTT_ERR_SUCCESS
if m.qos == 0:
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 1:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 2:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp
self._in_callback = True # Don't call loop_write after _send_pubrel()
rc = self._send_pubrel(m.mid, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return rc
elif result > 0 and result < 6:
return MQTT_ERR_CONN_REFUSED
else:
return MQTT_ERR_PROTOCOL
def _handle_on_message(self, message):
self._callback_mutex.acquire()
matched = False
for t in self.on_message_filtered:
if topic_matches_sub(t[0], message.topic):
self._in_callback = True
t[1](self,self._userdata,message)
self._in_callback = False
matched = True
if matched == False and self.on_message:
self._in_callback = True
self.on_message(self,self._userdata,message)
self._in_callback = False
self._callback_mutex.release()
Recommended Posts