In the previous Handling rabbimq with python, I exchanged simple messages with RabbitMq, but since RabbitMq has various functions, this time it is various. Features and usage are summarized.
With the standard pika settings, if the queue you want to send and receive does not exist in RabbitMq, it will be created automatically, but you may want to make an error without creating the queue automatically. In that case, giving True to the passive argument of the `queue_declare ()`
function of the channel will result in an error if there is no queue.
For example, when you want to manage all queues on the producer side and allow pure connections on the consumer side.
I'm just adding `passive = True`
to the producer `channel.queue_declare ()`
created as the previous example. If the queue exists, you can connect as it is, but if the queue does not exist, `pika.exceptions.ChannelClosedByBroker`
will come up as an exception.
client_main.py
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
try:
channel.queue_declare(queue='hello', passive=True)
except pika.exceptions.ChannelClosedByBroker as ex:
print(ex)
exit(1)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
Now that the source is ready, let's run it. An exception has occurred. If you look at the exception that occurred, you can see that there is no hello queue.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
(404, "NOT_FOUND - no queue 'hello' in vhost '/'")
The standard pika setting accepts connections unconditionally, but exclusive control can be applied so that other connections are not accepted. In that case, if True is given to the exclusive argument of the `queue_declare ()`
function of the channel, an error will occur when another connection is connected, so it can be used for checking the exclusion. If you close the connection, you can connect another connection.
For example, when you do not want to accept messages from other consumers until you prepare the message on the consumer side and send it to RabbitMq.
channel.queue_declare()Exclusive to=I'm just adding True. This time, after connecting once, I make a new connection and connect without closing the connection. For connections that come later, pika.exceptions.ChannelClosedByBroker comes up as an exception.
#### **` client_main.py`**
```py
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello', exclusive=True)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
try:
channel.queue_declare(queue='hello')
except pika.exceptions.ChannelClosedByBroker as ex:
print('other connection access fail')
exit(1)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()
channel.close()Under connection.close()If you enter, it will end normally.
## Execution of exclusive connection
Now that the source is ready, let's run it. An exception occurred when connecting to the second queue. If you view the exception that occurred, you can see that hello has an access lock.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py other connection access fail (405, "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration.")
# Channel limit setting
The standard pika settings allow you to create channels unconditionally, but you can set the upper limit of the channel. In that case, you can set the upper limit by giving an upper limit to the channel_max argument of the `` `pika.ConnectionParameters ()` `` function of pika. An error will occur if you try to create a channel that exceeds the limit.
## Situation example
Since the server of RabbitMq is not abundant, when adjusting the number of channels generated, etc.
## Producer with channel caps enabled
#### **` pika.ConnectionParameters()Channel_max=I'm just adding 2. This time, the upper limit is set to 2, so 3 channels are created meaninglessly. As a result, pika could not be created for the third channel.exceptions.ConnectionClosedByBroker comes up as an exception.`**
client_main.py
import pika
pika_param = pika.ConnectionParameters('localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel(1)
channel = connection.channel(2)
try:
channel = connection.channel(3)
except pika.exceptions.ConnectionClosedByBroker as ex:
print('channel crate error')
print(ex)
Now that the source is ready, let's run it. An exception occurred when creating the third channel. If you display the exception that occurred, you can see that the channel limit is 2.
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
channel crate error
(530, 'NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)')
You can set the number of retries when the pika connection fails. In that case, you can set the number by giving the number of retries to the connection_attempts argument of the `pika.ConnectionParameters ()`
function of pika.
When the network to the RabbitMq server is unstable, etc.
pika.ConnectionParameters()Connect to_attempts=I'm just adding 2. This time, RabbitMq is dropped by making a log of pika so that you can see that it is retrying. Top logger.The xxx system is a setting for outputting the log in pika.
#### **` client_main.py`**
```py
import pika
import datetime
import logging
logger = logging.getLogger('pika')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
pika_param = pika.ConnectionParameters('localhost', connection_attempts=2)
try:
print('start connect {}'.format(datetime.datetime.now()))
connection = pika.BlockingConnection(pika_param)
except pika.exceptions.AMQPConnectionError as ex:
print('connect error {}'.format(datetime.datetime.now()))
print(ex)
Now that the source is ready, let's run it. It is difficult to understand because a large amount of logs are output, but similar errors have occurred 4 times (2 errors x set value retry (2 times)).
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
start connect 2020-03-01 21:46:18.549268
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38520, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38524)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38533, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38535)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
AMQP connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error').
AMQPConnectionWorkflow - reporting failure: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError:
ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061,
'Unknown error')
Error in _create_connection().
Traceback (most recent call last):
File "C:\Users\minkl\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pika\adapters\blocking_connection.py", line 450, in _create_connection
raise self._reap_last_connection_workflow_error(error)
pika.exceptions.AMQPConnectionError
connect error 2020-03-01 21:46:28.665843
I tried using the functions of RabbitMq that seems to be used often. I think it has all the features I need, but I felt confused if I didn't understand and use which settings were queues, connections, or channels.
Recommended Posts