J'ai résumé comment utiliser Rabbit Mq dans Dernière fois et Dernière fois. Maintenant que nous avons élargi un peu plus la portée et examiné comment utiliser Rabbit Mq du point de vue des relations producteur-consommateur, nous allons résumer les avantages avec un exemple simple.
Les messages envoyés par le producteur sont stockés dans une file d'attente, et le consommateur les reçoit dans cet ordre et exécute le processus.
Lors de l'exécution d'un travail qui ne peut pas être exécuté en même temps sur un site Web utilisé par plusieurs personnes.
Le processus de mise en file d'attente se termine sans attendre le processus du consommateur, il n'envoie donc le message que deux fois.
client_main.py
import pika
import datetime
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
print('Send Message 1 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 1')
print('Send Message 1 End. {}'.format(datetime.datetime.now()))
print('Send Message 2 Start. {}'.format(datetime.datetime.now()))
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World 2')
print('Send Message 2 End. {}'.format(datetime.datetime.now()))
Lorsque vous exécutez l'exemple, l'heure d'envoi et de réception s'affiche et prend fin. Si vous regardez les files d'attente sur l'écran de gestion, vous pouvez voir qu'il y en a deux dans Message.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 1 Start. 2020-03-08 18:53:45.658027
Send Message 1 End. 2020-03-08 18:53:45.659027
Send Message 2 Start. 2020-03-08 18:53:45.659027
Send Message 2 End. 2020-03-08 18:53:45.660026
Le récepteur prépare simplement une fonction qui prend le processus de la file d'attente et l'exécute.
host_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("{} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
Lorsque l'échantillon est exécuté, le traitement est effectué dans le même ordre que lorsqu'il a été envoyé correctement. Vous pouvez voir que Message est 0 lorsque vous regardez les files d'attente sur l'écran de gestion car vous avez traité le message.
PS C:\Users\xxxx\program\python\pika> python .\host_main.py
b'Hello World 1' Received. 2020-03-08 19:02:11.756469
b'Hello World 2' Received. 2020-03-08 19:02:11.757469
Les producteurs et les consommateurs peuvent fonctionner indépendamment, vous pouvez donc facilement augmenter le nombre de producteurs et de consommateurs. De plus, la distribution des messages est distribuée par défaut, donc l'ajout de plusieurs consommateurs le distribuera naturellement.
Étant donné que le nombre d'utilisateurs du système augmente et que le traitement en retour est insuffisant, nous augmenterons le nombre de consommateurs pour y faire face.
Je veux augmenter le nombre de consommateurs pendant que le producteur est en marche, alors j'envoie un message environ 20 fois et j'augmente le nombre de consommateurs entre-temps.
import time
import datetime
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
for i in range(20):
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World {}'.format(i))
print('Send Message {} Exec. {}'.format(i, datetime.datetime.now()))
time.sleep(2)
connection.close()
Le récepteur a préparé deux fonctions qui prennent le processus de la file d'attente et l'exécutent. J'ai mis un numéro dans le message à afficher afin qu'il puisse être identifié comme chaque source.
host1_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host1 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
host2_main.py
import datetime
import pika
pika_param = pika.ConnectionParameters(host='localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("Host 2 {} Received. {}".format(body, datetime.datetime.now()))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
queue='hello', on_message_callback=callback)
channel.start_consuming()
Lors de l'exécution, exécutez dans l'ordre suivant afin que le côté émetteur augmente le côté réception pendant le démarrage.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 19:48:12.834261
Send Message 1 Exec. 2020-03-08 19:48:14.835843
Send Message 2 Exec. 2020-03-08 19:48:16.838815
Send Message 3 Exec. 2020-03-08 19:48:18.839815
Send Message 4 Exec. 2020-03-08 19:48:20.840815
Send Message 5 Exec. 2020-03-08 19:48:22.841815
Send Message 6 Exec. 2020-03-08 19:48:24.842788
Send Message 7 Exec. 2020-03-08 19:48:26.843861
Send Message 8 Exec. 2020-03-08 19:48:28.845190
Send Message 9 Exec. 2020-03-08 19:48:30.845934
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 0' Received. 2020-03-08 19:48:12.836260
Host1 b'Hello World 1' Received. 2020-03-08 19:48:14.839838
Host1 b'Hello World 2' Received. 2020-03-08 19:48:16.841816
Host1 b'Hello World 3' Received. 2020-03-08 19:48:18.840818
Host1 b'Hello World 4' Received. 2020-03-08 19:48:20.842817
Host1 b'Hello World 6' Received. 2020-03-08 19:48:24.844791
Host1 b'Hello World 8' Received. 2020-03-08 19:48:28.847190
PS C:\Users\xxxx\program\python\pika> python .\host2_main.py
Host 2 b'Hello World 5' Received. 2020-03-08 19:48:22.843819
Host 2 b'Hello World 7' Received. 2020-03-08 19:48:26.845863
Host 2 b'Hello World 9' Received. 2020-03-08 19:48:30.847937
En regardant le résultat de l'exécution, vous pouvez voir que le message arrive également du côté du consommateur qui a augmenté au moment où le nombre de consommateurs a été augmenté. De plus, le traitement distribué des messages fonctionne normalement et les messages sont traités en alternance.
Identique à l'exemple simple d'ajout d'un producteur / consommateur.
Identique à l'exemple simple d'ajout d'un producteur / consommateur.
Les clients de RabbitMq ont des bibliothèques dans différentes langues, vous pouvez donc échanger des messages sans être conscient des différences entre les langues. De plus, non seulement les producteurs et les consommateurs, mais aussi les producteurs et les consommateurs fonctionnent de manière indépendante, il n'est donc pas nécessaire d'unifier les langues.
Lorsque vous souhaitez fournir un service qui s'exécute dans différentes langues en tant que service unique et que vous souhaitez créer un service intégré en utilisant le moins possible l'ancien service.
Utilisez le producteur python ci-dessus.
Utilisez
host1_main.py` '' écrit ci-dessus en python. Utilisez la source suivante comme Kotlin comme une autre langue.
host_kotlin_main.kt
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
fun main(argv: Array<String>) {
val factory = ConnectionFactory()
factory.host = "localhost"
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare("hello", false, false, false, null)
val callback = object : DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: ByteArray) {
val message = String(body, charset("UTF-8"))
println("Host Kotlin '$message' Received")
channel.basicAck(envelope.deliveryTag, false)
}
}
channel.basicConsume("hello", false, callback)
}
En courant, j'ai démarré le consommateur puis le producteur.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
Send Message 0 Exec. 2020-03-08 21:21:06.572582
Send Message 1 Exec. 2020-03-08 21:21:08.573535
Send Message 2 Exec. 2020-03-08 21:21:10.574442
Send Message 3 Exec. 2020-03-08 21:21:12.575198
PS C:\Users\xxxx\program\python\pika> python .\host1_main.py
Host1 b'Hello World 1' Received. 2020-03-08 21:21:08.575536
Host1 b'Hello World 3' Received. 2020-03-08 21:21:12.577199
"C:\Program Files\Java\bin\java.exe" .... Host_kotlin_mainKt
Host Kotlin 'Hello World 0' Received
Host Kotlin 'Hello World 2' Received
Si vous regardez le résultat de l'exécution, vous pouvez voir que cela fonctionne normalement, que le consommateur soit python ou Kotlin (Java).
J'ai essayé de résumer l'utilisation pratique de Rabbit Mq. La plupart des RabbitMq étaient en file d'attente ou AMQP, et il n'y avait presque pas de RabbitMq unique. Personnellement, je l'ai trouvé facile à utiliser car il se disperse naturellement et il est très facile d'ajouter des producteurs et des consommateurs. Cette fois, je me suis concentré sur la relation entre producteurs et consommateurs, mais quand je me suis concentré sur la file d'attente, il y avait encore de nombreuses fonctions. Je pense qu'il faudra beaucoup de temps pour tout comprendre.
Recommended Posts