Tutoriel RabbitMQ 3 (Publier / S'abonner)

Tutoriel Rabbit MQ 3 https://www.rabbitmq.com/tutorials/tutorial-three-python.html C'est une traduction de. Nous sommes impatients de signaler toute erreur de traduction.

Conditions préalables

Ce didacticiel suppose que RabbitMQ est installé et en cours d'exécution sur le port standard (5672) sur l'hôte local. Si vous souhaitez utiliser un hôte, un port ou des informations d'identification différents, vous devez ajuster vos paramètres de connexion.

Si un problème survient

Si vous rencontrez des problèmes grâce à ce tutoriel, vous pouvez nous contacter via la liste de diffusion.

Publier / S'abonner

(Utilisation du client Python Pika 0.9.8)

Dans le didacticiel précédent, vous avez créé une file d'attente de travail. L'hypothèse de la file d'attente de travail est que chaque tâche est livrée à exactement un travailleur. Dans cette partie, nous ferons quelque chose de complètement différent: livrer le message à plusieurs consommateurs. Ce modèle est connu sous le nom de «publication / abonnement».

Construisez un système de journalisation simple pour illustrer le modèle. Il se compose de deux programmes, le premier est d'envoyer un message de journal et le second est de recevoir et d'imprimer.

Dans le système de journalisation, chaque copie en cours d'exécution du programme récepteur reçoit un message. Par conséquent, vous pouvez exécuter un récepteur pour écrire le journal sur le disque et en même temps exécuter l'autre récepteur pour voir le journal à l'écran.

Fondamentalement, les messages de journal publiés seront diffusés à tous les destinataires.

échange

Dans la partie précédente du didacticiel, nous avons envoyé et reçu des messages vers et depuis la file d'attente. Nous présentons ici le modèle de messagerie complet de Rabbit.

Revenons à ce que nous avons expliqué dans le tutoriel précédent:

L'idée centrale du modèle de messagerie dans RabbitMQ est que les producteurs n'envoient pas de messages directement à la file d'attente. En fait, souvent les producteurs ne savent même pas du tout si un message sera remis dans une file d'attente.

Au lieu de cela, les producteurs ne peuvent envoyer des messages qu'à un «échange». L'échange est très simple. Recevoir les messages des producteurs d'une part et les pousser dans la file d'attente d'autre part. L'échange doit savoir exactement quoi faire avec le message reçu. Dois-je l'ajouter à une file d'attente spécifique? Dois-je l'ajouter à de nombreuses files d'attente? Ou devrait-il être détruit? Ces lois sont définies par le type d'échange.

Il existe plusieurs types d'échange disponibles: sujets directs, en-têtes, ventilateurs. Concentrons-nous sur le dernier, c'est-à-dire en éventail. Créons maintenant un échange de ce type et appelons-le "logs":

channel.exchange_declare(exchange='logs',
                         type='fanout')

L'échange de fan-out est très simple. Comme vous pouvez l'imaginer d'après son nom, il diffuse chaque message qu'il reçoit dans toutes les files d'attente qu'il connaît. Et c'est exactement ce dont notre enregistreur a besoin.

Liste des échanges

Utilisez rabbitmqctl pour lister les échanges sur le serveur.

    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
    logs      fanout
    amq.direct      direct
    amq.topic       topic
    amq.fanout      fanout
    amq.headers     headers
    ...done.

Il y en a plusieurs dans cette liste"amq.*"Il y a un échange appelé et un échange par défaut (sans nom). Ceux-ci sont créés par défaut, mais vous devez rarement les utiliser pour le moment.

Échange anonyme

Dans la partie précédente du tutoriel, je ne savais rien de l'échange, mais j'ai pu envoyer un message à la file d'attente. Chaîne vide ("") A été utilisé car il utilisait l'échange par défaut identifié par.

Rappelez-vous comment vous aviez l'habitude de publier votre message:

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)

Le paramètre d'échange est le nom de l'échange. Une chaîne vide signifie un échange par défaut ou anonyme, s'il existe, le message est en cours d'acheminement_Il sera acheminé vers la file d'attente avec le nom spécifié par clé.

Au lieu de cela, vous pouvez publier sur un échange nommé:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

Repère temporaire

Auparavant, vous utilisiez une file d'attente avec le nom spécifié (rappelez-vous bonjour et task_queue?). Être capable de nommer la file d'attente est très important, il était nécessaire que les travailleurs pointent vers la même file d'attente. Il est important de nommer une file d'attente lorsque vous souhaitez partager la file d'attente entre producteurs et consommateurs.

Mais ce n'est pas le cas avec nos bûcherons. Je veux tout entendre, pas une partie du message du journal. Nous ne sommes également intéressés que par les messages qui circulent actuellement, pas par les anciens messages. Deux choses sont nécessaires pour résoudre ce problème.

Tout d'abord, chaque fois que vous vous connectez à Rabbit, vous avez besoin d'une nouvelle file d'attente vide. Pour ce faire, vous pouvez créer une file d'attente avec un nom aléatoire, ou mieux, le serveur choisira un nom de file d'attente aléatoire pour vous. Vous pouvez le faire en ** ne donnant pas de paramètres de file d'attente ** à queue_declare:

result = channel.queue_declare()

Où result.method.queue contient un nom de file d'attente aléatoire. Par exemple, "amq.gen-JzTY20BRgKO-HjmUJj0wLg".

Deuxièmement, la file d'attente doit être supprimée une fois que le consommateur est déconnecté. Il existe un drapeau exclusif pour cela:

result = channel.queue_declare(exclusive=True)

contraignant

Jusqu'à présent, vous avez créé un échange et une file d'attente en éventail. Ensuite, vous devez demander à l'échange d'envoyer un message à la file d'attente. La relation entre un échange et une file d'attente est appelée liaison.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

L'échange "logs" va maintenant ajouter le message à notre file d'attente.

Liste des liaisons

Comme vous pouvez l'imaginer, rabbitmqctl list_Vous pouvez utiliser des liaisons pour répertorier les liaisons existantes.

Tout résumé

Le programme producteur qui envoie le message du journal n'est pas très différent du didacticiel précédent. Le changement le plus important est de publier le message dans l'échange "logs" au lieu de l'échange anonyme. Vous devez donner routing_key lors de l'envoi, mais cette valeur sera ignorée car il s'agit d'un échange de fanout. code pour le script emit_log.py:

 1    #!/usr/bin/env python
 2    import pika
 3    import sys
 4
 5    connection = pika.BlockingConnection(pika.ConnectionParameters(
 6            host='localhost'))
 7    channel = connection.channel()
 8
 9    channel.exchange_declare(exchange='logs',
10                             type='fanout')
11
12    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13    channel.basic_publish(exchange='logs',
14                          routing_key='',
15                          body=message)
16    print " [x] Sent %r" % (message,)
17    connection.close()

Comme vous pouvez le voir, après avoir établi la connexion, déclarez l'échange. Cette étape est nécessaire car la publication sur des bourses inexistantes est interdite.

Si aucune file d'attente n'est liée à l'échange, le message sera perdu, mais ce n'est pas grave. Si aucun consommateur ne l'a reçu, le message peut être rejeté en toute sécurité.

Code pour receive_logs.py:

 1    #!/usr/bin/env python
 2    import pika
 3
 4    connection = pika.BlockingConnection(pika.ConnectionParameters(
 5            host='localhost'))
 6    channel = connection.channel()
 7
 8    channel.exchange_declare(exchange='logs',
 9                             type='fanout')
10
11    result = channel.queue_declare(exclusive=True)
12    queue_name = result.method.queue
13
14    channel.queue_bind(exchange='logs',
15                       queue=queue_name)
16
17    print ' [*] Waiting for logs. To exit press CTRL+C'
18
19    def callback(ch, method, properties, body):
20        print " [x] %r" % (body,)
21
22    channel.basic_consume(callback,
23                          queue=queue_name,
24                          no_ack=True)
25
26    channel.start_consuming()

C'est tout. Si vous souhaitez enregistrer le journal dans un fichier, ouvrez la console et tapez:

$ python receive_logs.py > logs_from_rabbit.log

Si vous souhaitez voir les journaux à l'écran, démarrez un nouveau terminal et exécutez:

$ python receive_logs.py

Et bien sûr, pour envoyer le journal, tapez:

$ python emit_log.py

Vous pouvez utiliser rabbitmqctl list_bindings pour vérifier que votre code crée réellement les liaisons et les files d'attente souhaitées. Lorsque j'exécute les deux programmes receive_logs.py, j'obtiens ce qui suit:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

L'interprétation du résultat est simple: les données issues de l'échange "logs" vont dans deux files d'attente avec les noms attribués par le serveur. Exactement comme prévu.

Passons au didacticiel 4 pour apprendre à recevoir un sous-ensemble de messages.

Recommended Posts

Tutoriel RabbitMQ 3 (Publier / S'abonner)
Tutoriel RabbitMQ 5 (sujet)
Tutoriel RabbitMQ 6 (RPC)
Tutoriel RabbitMQ 4 (Routage)
Didacticiel RabbitMQ 2 (file d'attente de travail)
Tutoriel RabbitMQ 1 ("Hello World!")