AMQP is a protocol for messaging services. Someone thrusts a message into a Queue and someone retrieves the message from the Queue. I understand that.
The following page is very helpful for details. GREE engineer's page Easy to read and very nice.
However, the quality is incomprehensible without moving it, so I experimented. The tools used for the experiment are as follows
The environment construction procedure will be described later.
The screen is from RabbitMQ's Management Plugin.
According to the GREE engineer's page, Message
is received by ʻExchange, and ʻExchange
is passed to Queue
.
For the time being, I decided to make it from ʻExchange, and here is what I could see from the [sample code](http://kombu.readthedocs.org/en/latest/reference/kombu.html#exchange) of
kombu`.
from kombu import Connection,Exchange
exchange = Exchange('foo_exc', type='direct')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound = exchange(c.default_channel)
bound.declare()
ʻExchangeis completed. At the bottom,
foo_exc` created by the above process.
Continue to create Queue
based on Sample Code.
from kombu import Connection,Exchange,Queue
exchange = Exchange('foo_exc', type='direct')
queue = Queue('bar_queue', exchange=exchange, routing_key='hoge.fuga')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound = queue(c.default_channel)
bound.declare()
Queue
is created.
Finally create a Message
and plunge into ʻExchange`.
See Reference rather than sample code.
from kombu import Connection,Exchange
exchange = Exchange('foo_exc', type='direct')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound_exc = exchange(c.default_channel)
msg = bound_exc.Message("Hello, World")
bound_exc.publish(msg, routing_key='hoge.fuga')
It seems that Hello, World
is properly included. Around Get messages.
The Message
that cannot be taken is just garbage, so take it.
Sample code
from kombu import Connection,Exchange,Queue,Consumer
exchange = Exchange('foo_exc', type='direct')
queue = Queue('bar_queue', exchange=exchange, routing_key='hoge.fuga')
def callback(body, message):
print body
message.ack()
with Connection('amqp://guest:guest@localhost:5672//') as c:
with Consumer(c.default_channel, queues=[queue], callbacks=[callback]):
c.drain_events()
result
ubuntu@ubuntu:~$ python consume.py
Hello, World
Hello, World
is displayed.
If you don't call message.ack ()
, the message will not disappear from Queue
.
RabbitMQ
Built with Docker + docker-compose
docker-compose.yml
rabbit:
image: rabbitmq:3-management
hostname: rabbit001
ports:
- "15672:15672"
- "5672:5672"
kombu
Install with pip normally
pip install kombu
Recommended Posts