Setup RabbitMQ
To set up RabbitMQ, follow this tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-java.html (I am currently developing in Scala, so I converted the tutorial's Java example code to Scala.)
If your project is a Maven project, add the following dependency to your pom.xml file.
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.0</version>
</dependency>
Then start the RabbitMQ service:
service rabbitmq-server start
RabbitMQ Sender
The following is a simple RabbitMQ sender:
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

object rabbitMQ_sender {
  def main(args: Array[String]): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()
    val queue_to_send = "test"
    // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
    channel.queueDeclare(queue_to_send, false, false, false, null)
    val message = "Hello World!"
    // Publish via the default exchange (""), which routes by queue name.
    // Encode explicitly as UTF-8 because the receiver decodes as UTF-8.
    channel.basicPublish("", queue_to_send, null, message.getBytes("UTF-8"))
    System.out.println(" [x] Sent '" + message + "'")
    channel.close()
    connection.close()
  }
}
In the figure below, P represents the RabbitMQ sender (P for producer). When you run the code above, the sender sends the message "Hello World!" to the queue (the red blocks in the figure). The receiver (C, for consumer) then receives the message from the queue.
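To make the producer, queue, and consumer roles concrete without a running broker, here is a minimal in-memory analogy. It is not RabbitMQ: a `LinkedBlockingQueue` from the Java standard library stands in for the broker's queue, and the names `QueueAnalogy`, `send`, and `receive` are made up for this illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

// In-memory stand-in for the broker's queue. This is only an analogy, not RabbitMQ.
object QueueAnalogy {
  // Producer side (P): append a message to the tail of the queue.
  def send(queue: LinkedBlockingQueue[String], message: String): Unit =
    queue.put(message)

  // Consumer side (C): take the oldest message, blocking until one is available.
  def receive(queue: LinkedBlockingQueue[String]): String =
    queue.take()

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    send(queue, "Hello World!")
    send(queue, "second message")
    println(receive(queue)) // the oldest message comes out first
    println(receive(queue))
  }
}
```

The point of the analogy is that the sender and the receiver never talk to each other directly; both only interact with the queue, and messages wait there until they are taken.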
RabbitMQ Receiver
The following is a simple RabbitMQ receiver:
import com.rabbitmq.client._

object rabbitMQ_receiver {
  def main(args: Array[String]): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()
    val QUEUE_NAME = "test"
    // Declare the same queue as the sender so it exists whichever side starts first
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
    val consumer: Consumer = new DefaultConsumer(channel) {
      // Called by the client library for each delivered message
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        val message: String = new String(body, "UTF-8")
        System.out.println(" [x] Received '" + message + "'")
      }
    }
    // autoAck = true: messages are acknowledged as soon as they are delivered
    channel.basicConsume(QUEUE_NAME, true, consumer)
  }
}
While the RabbitMQ receiver is running, it prints the output below every time it gets a message from a sender.
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
Test: Process handling using RabbitMQ
So now, how can I use this RabbitMQ sender and receiver in my own code?
Here is what I am going to do.
In process A, call the following function at the point where process A should send the AMQP message. In my case this is the end of the main() function, so that process B is notified when the job completes.
def send_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
  val factory: ConnectionFactory = new ConnectionFactory()
  factory.setHost(rabbitmq_host)
  val connection: Connection = factory.newConnection()
  val channel: Channel = connection.createChannel()
  val queue_to_send = queue_name
  channel.queueDeclare(queue_to_send, false, false, false, null)
  // Encode explicitly as UTF-8 because the receiver decodes as UTF-8
  channel.basicPublish("", queue_to_send, null, stop_message.getBytes("UTF-8"))
  System.out.println(" [x] Sent '" + stop_message + "'")
  channel.close()
  connection.close()
}
In process B, the "RabbitMQ receiver" code needs a small modification, shown below. With a while loop added to the receiver, it waits until it gets a specific message from the queue (in this case, the message from process A; the two messages must be identical). Define the function below and call it at the point where process B should pause until process A finishes its job.
Note that messages accumulate in the queue whose name is passed to channel.queueDeclare(QUEUE_NAME, false, false, false, null). You can also specify the location of the RabbitMQ server with factory.setHost().
def get_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
  val factory: ConnectionFactory = new ConnectionFactory()
  factory.setHost(rabbitmq_host)
  val connection: Connection = factory.newConnection()
  try {
    val channel: Channel = connection.createChannel()
    val QUEUE_NAME = queue_name
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
    // handleDelivery runs on a client library thread, so the flag must be thread-safe
    val enough = new java.util.concurrent.atomic.AtomicBoolean(false)
    val consumer: Consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        val message: String = new String(body, "UTF-8")
        System.out.println(" [x] Received '" + message + "'")
        if (message == stop_message) enough.set(true)
      }
    }
    // Register the consumer once; calling basicConsume inside the loop
    // would register a new consumer on every iteration
    channel.basicConsume(QUEUE_NAME, true, consumer)
    // Block here until the stop message has arrived
    while (!enough.get()) {
      Thread.sleep(100)
    }
    channel.close()
  } finally {
    connection.close()
  }
}
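The receiver function above waits with no upper bound, so process B would hang forever if process A crashed before sending its message. Below is a minimal sketch of the waiting logic alone, with a timeout, built on `CountDownLatch` from the Java standard library. The class `StopSignal` and its methods `offer` and `waitFor` are hypothetical names introduced here. The assumption is that `offer` would be called from `handleDelivery` with the decoded message, while process B calls `waitFor`.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper: lets one thread block until the expected stop message is seen.
class StopSignal(stopMessage: String) {
  private val latch = new CountDownLatch(1)

  // Intended to be called from handleDelivery with the decoded message body.
  // Returns true when the message equals the stop message.
  def offer(message: String): Boolean = {
    val matched = message == stopMessage
    if (matched) latch.countDown()
    matched
  }

  // Blocks until the stop message has been offered or the timeout elapses.
  // Returns true if the stop message arrived in time, false on timeout.
  def waitFor(timeoutMillis: Long): Boolean =
    latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
```

With this shape, process B could treat a `false` result from `waitFor` as "process A did not report completion" and decide whether to retry or abort, instead of blocking indefinitely.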
In addition, I defined the function below, which deletes a queue together with all messages in it. Once a message is sent to the RabbitMQ server and stored in a queue, it stays there until a consumer reads it (this is one of the benefits of an AMQP service). Therefore, before starting the two processes, run the following function to clear out old messages, so that process B does not mistakenly read an old message from process A and move ahead without waiting for process A to complete.
def clear_ampq_queues(rabbitmq_host: String, queue_name: String): Unit = {
  val factory: ConnectionFactory = new ConnectionFactory()
  factory.setHost(rabbitmq_host)
  val connection: Connection = factory.newConnection()
  val channel: Channel = connection.createChannel()
  // Deleting the queue also discards every message still stored in it
  channel.queueDelete(queue_name)
  channel.close()
  connection.close()
}
The code above only covers sequential processes. What I am trying to build, however, is multiple processes that run in containers in parallel, each sending a message to a controller (the receiver in the example above) after completing its job. So I am going to modify the code for parallel processing.
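As a starting point for the parallel case, here is a rough sketch of the bookkeeping a controller might need: it waits until every expected worker has reported completion. This is only one possible design, not the final implementation. The class `CompletionTracker`, its methods, and the convention that each worker sends its own ID as the message body are all assumptions made for this sketch. `onMessage` would be called from `handleDelivery` with the raw message body.

```scala
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}

// Hypothetical controller-side helper: tracks which workers have reported completion.
// Assumes each worker sends its own ID (UTF-8 encoded) as the message body.
class CompletionTracker(expectedWorkers: Set[String]) {
  // Thread-safe set of worker IDs that have not reported yet
  private val pending = ConcurrentHashMap.newKeySet[String]()
  expectedWorkers.foreach(id => pending.add(id))
  private val latch = new CountDownLatch(expectedWorkers.size)

  // Intended to be called from handleDelivery.
  // Returns true only the first time an expected worker reports;
  // duplicates and unknown IDs are ignored and return false.
  def onMessage(body: Array[Byte]): Boolean = {
    val workerId = new String(body, StandardCharsets.UTF_8)
    val firstReport = pending.remove(workerId)
    if (firstReport) latch.countDown()
    firstReport
  }

  // Number of workers that have not reported yet
  def remaining: Int = pending.size()

  // Blocks until all expected workers have reported or the timeout elapses.
  // Returns true if everyone reported in time.
  def awaitAll(timeoutMillis: Long): Boolean =
    latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
```

Tracking worker IDs rather than just counting messages means a duplicate or stale message from one worker cannot be mistaken for the completion of another.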