RabbitMQ Tutorial 3 (Publish / Subscribe)

This is a translation of RabbitMQ Tutorial 3 (https://www.rabbitmq.com/tutorials/tutorial-three-python.html). Please feel free to point out any translation errors.

Prerequisites

This tutorial assumes that RabbitMQ is installed and running on the standard port (5672) on the local host. If you want to use a different host, port, or credentials, you need to adjust your connection settings.
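
For example, with the pika client a connection with non-default settings might look roughly like this (the host, port, and credentials below are placeholders for your own values):

import pika

# Placeholder host, port and credentials -- adjust these to your own setup.
credentials = pika.PlainCredentials('myuser', 'mypassword')
parameters = pika.ConnectionParameters(host='rabbit.example.com',
                                       port=5672,
                                       credentials=credentials)
connection = pika.BlockingConnection(parameters)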

If a problem occurs

If you encounter any issues while working through this tutorial, you can contact us through the mailing list.

Publish / Subscribe

(Using pika 0.9.8 Python client)

In the previous tutorial, you created a work queue. The assumption of the work queue is that each task is delivered to exactly one worker. In this part, we'll do something completely different: deliver the message to multiple consumers. This pattern is known as "publish / subscribe".

To illustrate the pattern, we'll build a simple logging system. It consists of two programs: the first emits log messages and the second receives and prints them.

In our logging system, every running copy of the receiver program will get the messages. That way we can run one receiver that writes the logs to disk, and at the same time run another receiver that prints the logs to the screen.

Basically, published log messages will be broadcast to all receivers.

Exchanges

In the previous part of the tutorial, you sent and received messages to and from the queue. Here we introduce Rabbit's full messaging model.

Let's look back at what we explained in the previous tutorial:

The core idea of the messaging model in RabbitMQ is that producers don't send messages directly to the queue. In fact, often producers don't even know at all whether a message will be delivered to a queue.

Instead, producers can only send messages to an "exchange". An exchange is a very simple thing: on one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? Those rules are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one, fanout. Let's create an exchange of that type and call it "logs":

channel.exchange_declare(exchange='logs',
                         type='fanout')

The fanout exchange is very simple. As you can imagine from the name, it broadcasts every message it receives to every queue it knows. And that's exactly what our logger needs.

List of exchanges

Use rabbitmqctl to list the exchanges on the server.

    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
    logs      fanout
    amq.direct      direct
    amq.topic       topic
    amq.fanout      fanout
    amq.headers     headers
    ...done.

There are some "amq.*" exchanges in this list, as well as the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.

Anonymous exchange

In the previous parts of the tutorial we knew nothing about exchanges, but were still able to send messages to queues. That was possible because we were using the default exchange, which is identified by the empty string ("").

Recall how you used to publish your message:

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)

The exchange parameter is the name of the exchange. The empty string denotes the default or anonymous exchange: if it exists, the message is routed to the queue whose name is given by routing_key.

Instead, you can publish to a named exchange:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

Temporary queues

Previously we were using queues with specified names (remember hello and task_queue?). Being able to name a queue was crucial for us: we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also only interested in the messages currently flowing, not in old ones. To solve that we need two things.

First, whenever we connect to Rabbit, we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better, let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare:

result = channel.queue_declare()

At this point result.method.queue contains a random queue name, for example "amq.gen-JzTY20BRgKO-HjmUJj0wLg".

Second, the queue should be deleted once the consumer is disconnected. There is an exclusive flag for that:

result = channel.queue_declare(exclusive=True)

Bindings

We've now created a fanout exchange and a queue. Next, we need to tell the exchange to send messages to our queue. That relationship between an exchange and a queue is called a binding.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

The "logs" exchange will now add the message to our queue.

List of bindings

As you can imagine, you can list existing bindings using rabbitmqctl list_bindings.
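
For example, on the machine where RabbitMQ is running:

    $ sudo rabbitmqctl list_bindings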

Putting it all together

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now publish messages to the "logs" exchange instead of the anonymous one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here is the code for the emit_log.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

As you can see, after establishing the connection we declare the exchange. This step is necessary because publishing to a non-existent exchange is forbidden.

If no queue is bound to the exchange, the message will be lost, but that's okay. If no consumer has received it, the message can be safely discarded.

Code for receive_logs.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

That's it. If you want to save the log to a file, open a console and type:

$ python receive_logs.py > logs_from_rabbit.log

If you want to see the on-screen logs, start a new terminal and run:

$ python receive_logs.py

And of course, to send the log, type:

$ python emit_log.py
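
If everything is wired up, emit_log.py should print something like this (the default message comes from the script above):

$ python emit_log.py
 [x] Sent 'info: Hello World!'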

You can use rabbitmqctl list_bindings to verify that the code actually creates the bindings and queues we want. With two receive_logs.py programs running, you should see something like:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

The interpretation of the result is straightforward: data from the "logs" exchange goes to two queues with server-assigned names. And that's exactly what we intended.

Let's move on to Tutorial 4 to learn how to receive a subset of messages.
