RabbitMQ Tutorial 2 (Work Queue)

This is a translation of RabbitMQ Tutorial 2: https://www.rabbitmq.com/tutorials/tutorial-two-python.html. We welcome pointers to any translation errors.

Prerequisites

This tutorial assumes that RabbitMQ is installed and running on the standard port (5672) on the local host. If you want to use a different host, port, or credentials, you need to adjust your connection settings.

If a problem occurs

If you encounter any issues while working through this tutorial, you can contact us through the mailing list.

Work queue

(using the pika 0.9.8 Python client)

In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a work queue that will be used to distribute time-consuming tasks among multiple workers.

The main idea behind work queues (also known as task queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers, the tasks will be shared between them.

This concept is especially useful in web applications, where it is impossible to handle complex tasks during a short HTTP request.

Preparation

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or PDF files to be rendered, so let's fake it by just pretending we're busy, using the time.sleep() function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by "Hello..." will take three seconds.
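The dot-per-second convention is easy to check in isolation. Here is a standalone sketch (the fake_task helper is invented for this illustration; it is not part of the tutorial scripts):

```python
import time

def fake_task(body):
    """Pretend to be busy: sleep one second per dot in the message."""
    seconds = body.count('.')
    time.sleep(seconds)
    return seconds

print(fake_task("Hello..."))  # three dots -> 3 seconds of fake work, prints 3
```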

Slightly modify the send.py code from the previous example so that any message is sent from the command line. This program schedules tasks in the work queue, so let's name it new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

The previous receive.py script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it worker.py:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

Round robin dispatch

One of the advantages of using a task queue is the ability to easily parallelize work. If we are building up a backlog of work, we can just add more workers and scale out easily.

First, let's try to run two worker.py scripts at the same time. They will both get messages from the queue, but how exactly? Let's see.

We'll need three consoles open. Two will run the worker.py script; these consoles will be our two consumers, "C1" and "C2".

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

In the third console we'll publish new tasks. Once you've started the consumers, you can publish a few messages:

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

Let's see if it is delivered to workers:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

By default, RabbitMQ sends each message to the next consumer in sequence. On average, every consumer will get the same number of messages. This way of distributing messages is called round robin. Try this out with three or more workers.
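The split above can be reproduced with a toy model of round-robin dispatch (pure Python, no broker; itertools.cycle stands in for RabbitMQ's dispatcher here):

```python
from itertools import cycle

messages = ["First message.", "Second message..", "Third message...",
            "Fourth message....", "Fifth message....."]
consumers = {"C1": [], "C2": []}

# Round robin: each message goes to the next consumer in sequence.
for name, msg in zip(cycle(consumers), messages):
    consumers[name].append(msg)

print(consumers["C1"])  # odd-numbered messages
print(consumers["C2"])  # even-numbered messages
```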

Message acknowledgment

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task, does it only partly, and dies. With our current code, once RabbitMQ delivers a message to the consumer, it immediately removes it from memory. In this case, if you kill a worker, we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

To make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies without sending an ack, RabbitMQ will understand that the message wasn't processed fully and will redeliver it to another consumer. That way you can be sure that no message is lost, even if workers occasionally die.

There are no message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In the previous example we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and have the worker send a proper acknowledgment once the task is complete.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Using this code, we can be sure that even if you kill a worker with CTRL+C while it is processing a message, nothing will be lost. Soon after the worker dies, all unacknowledged messages will be redelivered.
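The requeue behavior can be sketched with a plain list (a toy model only, not the real AMQP bookkeeping): a message leaves the queue for good only once it is acked, so whatever is still unacked when the connection dies goes back on the queue.

```python
queue = ["First message.", "Second message.."]
unacked = []

# A worker takes a message but is killed before sending basic_ack.
in_flight = queue.pop(0)
unacked.append(in_flight)

# When RabbitMQ notices the dead connection, it requeues the unacked messages.
queue = unacked + queue
unacked = []

print(queue)  # nothing was lost; the first message is back at the front
```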

Forgotten acknowledgment

    Forgetting the basic_ack is a common mistake. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will use more and more memory, as it won't be able to release any unacked messages.

To debug this kind of mistake, you can use rabbitmqctl to print the messages_unacknowledged field:

    $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello    0       0
    ...done.

Message durability

You've learned how to make sure that tasks aren't lost even if the consumer dies. But our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes, it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and the messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as *durable*:

channel.queue_declare(queue='hello', durable=True)

This command is correct by itself, but it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround: let's declare a queue with a different name, for example task_queue:

channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change should be applied to both the producer and consumer code.

Now we can be sure that the task_queue queue won't be lost even if RabbitMQ restarts. Next, we need to mark our messages as persistent, by supplying a delivery_mode property with a value of 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

Notes on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message; it may be just saved to cache and not really written to disk. The persistence guarantees aren't strong, but they're more than enough for our simple task queue. If you need a stronger guarantee, you can use publisher confirms.

Fair dispatch

You might have noticed that the dispatching still doesn't work exactly as we want. For example, in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

To avoid this, we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. In other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, RabbitMQ will dispatch it to the next worker that is not still busy.

channel.basic_qos(prefetch_count=1)

Notes on queue size

If all the workers are busy, your queue can fill up. In that case you'll need to add more workers, or use some other strategy.
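The effect of prefetch_count=1 can be approximated as "give the next message to the least loaded worker" (a simplifying assumption for illustration; the busy bookkeeping below is invented, and real RabbitMQ reacts to acks rather than measuring load):

```python
# One "second" of work per dot, alternating heavy and light tasks.
messages = ["heavy.....", "light.", "heavy.....", "light."]
busy = {"C1": 0, "C2": 0}  # outstanding work per consumer

for msg in messages:
    # With prefetch_count=1 a worker only gets a new message after acking
    # the previous one, so work flows to whoever is free first.
    target = min(busy, key=busy.get)
    busy[target] += msg.count('.')

print(busy)  # the load ends up balanced instead of one worker taking all heavy tasks
```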

Putting it all together

Final code for the new_task.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

And our worker.py:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

You can set up a work queue using message acknowledgments and prefetch_count. The durability option ensures that the task survives even if RabbitMQ is restarted.

Now you can move on to Tutorial 3 and learn how to deliver the same message to many consumers.
