[JAVA] Try changing to asynchronous processing via MQ without changing the code

What I want to do

Suppose heavy calls like the following are scattered throughout the code base, and you want all of them to be processed via MQ.

TooHeavyProcessor processor;

public void doSomething() {
  // do something..

  // too heavy process in async.
  async(() -> processor.process(...));
}
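The `async(...)` helper is not defined in the article. As a point of comparison, here is a minimal in-process sketch, assuming it simply hands the task to a thread pool; `AsyncSketch`, the pool size, and the counter task are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: the article does not show how async(...) is implemented.
class AsyncSketch {
  private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true); // do not keep the JVM alive for background work
    return t;
  });

  // Runs the task on the pool and returns a future the caller may ignore or join.
  static CompletableFuture<Void> async(Runnable task) {
    return CompletableFuture.runAsync(task, POOL);
  }

  public static void main(String[] args) {
    AtomicInteger processed = new AtomicInteger();
    CompletableFuture<Void> future = async(processed::incrementAndGet);
    future.join(); // only for the demo; a real caller would not block here
    System.out.println("processed = " + processed.get());
  }
}
```

With this style the heavy work still runs inside the same JVM. The rest of the article moves it to a separate consumer process through MQ instead.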


RabbitMQ

Installing with Homebrew

$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq 

Spring Boot

Start with Spring Initializr


Add the dependency.



Add a compiler option as well, because the method parameter names are needed at runtime (they become the SpEL variable names such as #key and #date).
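The build snippet itself is not reproduced here. As a rough illustration of why parameter names matter, the sketch below reads them through reflection. It assumes the option in question is javac's `-parameters` flag; `ParameterNamesSketch` and its nested `Processor` interface are hypothetical stand-ins.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

class ParameterNamesSketch {
  // Reduced stand-in for TooHeavyProcessor (hypothetical signature).
  interface Processor {
    void process(String key, long timestamp);
  }

  // Collects the parameter names exactly as reflection reports them.
  static List<String> parameterNames(Method method) {
    List<String> names = new ArrayList<>();
    for (Parameter parameter : method.getParameters()) {
      names.add(parameter.getName());
    }
    return names;
  }

  public static void main(String[] args) throws Exception {
    Method method = Processor.class.getMethod("process", String.class, long.class);
    // isNamePresent() is true only when the class was compiled with -parameters.
    boolean present = method.getParameters()[0].isNamePresent();
    System.out.println(parameterNames(method) + " namesPresent=" + present);
  }
}
```

Compiled without the flag, reflection reports synthetic names (`arg0`, `arg1`), so an expression built from them would not read as `#key`.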



Outline of the changes

Processing flow

Before the change

[Heavy process]@any

After the change

[Heavy process]@any -> (queue) -> [Heavy process]@consumer

Each class



public interface TooHeavyProcessor {

  void action(User user, Item item, Date date);

  void process(String key, Date date);
}

Implementation class

Default implementation

This class currently runs on the Producer side of MQ; I want to move its execution to the Consumer side.


@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  @Override
  public void action(User user, Item item, Date date) {
    //Logging to check the execution result
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
        FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    //Logging to check the execution result
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}


I want to turn the method call itself into a message. For that, I decided to use SpEL (Spring Expression Language).

//Original call
processor.action(user, item, date);

//The same call expressed as a SpEL expression plus its arguments
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.put("user", user);
params.put("item", item);
params.put("date", date);

ELExpression message = new ELExpression(el, params);

//Send to MQ
amqpMessageTemplate.convertAndSend("el.exchange", null, message);

//Consumer side
public void receive(ELExpression message) {
  //Parse the SpEL message and execute it
}

With this approach, the call can be executed on the Consumer side.
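The article does not list the `ELExpression` class. Judging from the logged JSON (an `expression` string and an `args` object), a minimal sketch of such a message holder could look like this; the class name `ELExpressionSketch` and the defensive copy are assumptions.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical shape of the message; field names follow the logged JSON keys.
class ELExpressionSketch {
  private final String expression;        // e.g. "@processor.process(#key, #date)"
  private final Map<String, Object> args; // variable name -> argument value

  ELExpressionSketch(String expression, Map<String, Object> args) {
    this.expression = expression;
    // Copy to keep insertion order and to guard against later changes by the caller.
    this.args = Collections.unmodifiableMap(new LinkedHashMap<>(args));
  }

  String getExpression() {
    return expression;
  }

  Map<String, Object> getArgs() {
    return args;
  }

  public static void main(String[] argv) {
    Map<String, Object> params = new LinkedHashMap<>();
    params.put("key", "39iYWmzDpn");
    params.put("date", "2017-07-03 00:05:31.211");
    ELExpressionSketch message =
        new ELExpressionSketch("@processor.process(#key, #date)", params);
    System.out.println(message.getExpression() + " " + message.getArgs());
  }
}
```

Whatever the real class looks like, it has to be convertible by the message converter configured for the AmqpTemplate, which is outside the scope of this sketch.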

MQ sender class implementation

Instead of running DefaultTooHeavyProcessor when TooHeavyProcessor is called, create an implementation that builds the SpEL expression and sends it to MQ.

Because the behavior has to be determined dynamically at call time, implement it with a Proxy. The proxy is created as follows.


import java.lang.reflect.Proxy;
import org.springframework.util.ClassUtils;

private String beanName = "processor";
private Class<?> type = TooHeavyProcessor.class;

private AmqpTemplate messageTemplate;

//The class loader argument is an assumption based on the ClassUtils import
Object proxy = Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
  new Class[] { this.type }, (bean, method, args) -> {
    //SpEL generation
    //Utility method to generate ELExpression from beanName, method and args
    ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
    //Send to MQ
    this.messageTemplate.convertAndSend("el.exchange", null, el);
    return null;
  });
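To try the proxy idea without Spring or a broker, the following JDK-only sketch may help. It assumes the expression is built as `@beanName.method(#param, ...)` from reflected parameter names, which is only a guess at what `SpelUtils.createELExpression` does; `ProxySketch`, `Message`, and the in-memory `outbox` list (standing in for the MQ send) are hypothetical.

```java
import java.lang.reflect.Parameter;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// JDK-only sketch; the outbox list stands in for AmqpTemplate.convertAndSend(...).
class ProxySketch {
  // Reduced stand-in for TooHeavyProcessor (hypothetical signature).
  interface Processor {
    void process(String key, long timestamp);
  }

  // Hypothetical message holder: expression text plus named arguments.
  static final class Message {
    final String expression;
    final Map<String, Object> args;

    Message(String expression, Map<String, Object> args) {
      this.expression = expression;
      this.args = args;
    }
  }

  static Processor createProxy(String beanName, List<Message> outbox) {
    return (Processor) Proxy.newProxyInstance(
        Processor.class.getClassLoader(),
        new Class<?>[] { Processor.class },
        (proxy, method, args) -> {
          Parameter[] parameters = method.getParameters();
          StringJoiner call = new StringJoiner(
              ", ", "@" + beanName + "." + method.getName() + "(", ")");
          Map<String, Object> variables = new LinkedHashMap<>();
          for (int i = 0; i < parameters.length; i++) {
            // Names are arg0, arg1, ... unless compiled with javac -parameters.
            String name = parameters[i].getName();
            call.add("#" + name);
            variables.put(name, args[i]);
          }
          outbox.add(new Message(call.toString(), variables));
          return null; // suitable for void methods only
        });
  }

  public static void main(String[] argv) {
    List<Message> outbox = new ArrayList<>();
    createProxy("processor", outbox).process("39iYWmzDpn", 1499007931211L);
    Message sent = outbox.get(0);
    System.out.println(sent.expression + " " + sent.args);
  }
}
```

Returning `null` from the handler only works for `void` methods; calls such as `hashCode()` on this proxy would fail, so a fuller version would need to handle the `Object` methods separately.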

I want to register this proxy as a bean, so I create it through a FactoryBean.


public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

  private final String beanName = "processor";
  private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

  private TooHeavyProcessor proxy;

  private AmqpTemplate messageTemplate;

  @Override
  public void afterPropertiesSet() {
    this.proxy = (TooHeavyProcessor) /*proxy creation*/;
  }

  @Override
  public TooHeavyProcessor getObject() {
    return this.proxy;
  }

  @Override
  public Class<?> getObjectType() {
    return this.type;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }
}

MQ Receiver class


public class ELReceiver {

  /**A class that parses and executes SpEL*/
  private ELExecutor elExecutor;

  //Declare the Exchange and Queue; they are created on the RabbitMQ side if they do not exist
  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "el.queue", durable = "true"),
    exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
  public void receive(ELExpression el) {
    //Analyze and execute SpEL
  }
}

Parsing and evaluating SpEL is covered in detail in the official documentation, so I omit it here.

I only show how to build the StandardEvaluationContext.


private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
  StandardEvaluationContext context = new StandardEvaluationContext();

  //Register variable
  args.forEach((key, value) -> context.setVariable(key, value));
  //Register BeanResolver so that Bean can be resolved from Bean name
  context.setBeanResolver(
    (_self, beanName) -> this.appContext.getBean(beanName));

  return context;
}
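To make the two roles of the context concrete (variables supply the `#name` values, the bean resolver supplies the `@name` target), here is a dependency-free stand-in. It is not SpEL: it is a small reflection-based substitute that only understands the single form `@bean.method(#a, #b)`, and every name in it (`MiniEvaluatorSketch`, `DemoProcessor`) is hypothetical.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// NOT SpEL: a tiny reflection-based stand-in that handles only "@bean.method(#a, #b)".
class MiniEvaluatorSketch {
  private static final Pattern CALL = Pattern.compile("@(\\w+)\\.(\\w+)\\((.*)\\)");

  // Plays the role of the BeanResolver: bean name -> bean instance.
  private final Map<String, Object> beans;

  MiniEvaluatorSketch(Map<String, Object> beans) {
    this.beans = beans;
  }

  Object evaluate(String expression, Map<String, Object> variables) {
    Matcher m = CALL.matcher(expression.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("unsupported expression: " + expression);
    }
    Object bean = beans.get(m.group(1));
    if (bean == null) {
      throw new IllegalArgumentException("no such bean: " + m.group(1));
    }
    // Resolve each "#name" token against the variables sent with the message.
    List<Object> args = new ArrayList<>();
    String argList = m.group(3).trim();
    if (!argList.isEmpty()) {
      for (String token : argList.split(",")) {
        args.add(variables.get(token.trim().substring(1)));
      }
    }
    // Pick the first public method with a matching name and argument count.
    for (Method method : bean.getClass().getMethods()) {
      if (method.getName().equals(m.group(2)) && method.getParameterCount() == args.size()) {
        try {
          return method.invoke(bean, args.toArray());
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    throw new IllegalArgumentException("no such method: " + m.group(2));
  }

  // Demo bean standing in for DefaultTooHeavyProcessor; it records instead of logging.
  public static class DemoProcessor {
    public final List<String> log = new ArrayList<>();

    public void process(String key, String date) {
      log.add("process for " + key + " at " + date);
    }
  }

  public static void main(String[] argv) {
    DemoProcessor processor = new DemoProcessor();
    Map<String, Object> beans = new LinkedHashMap<>();
    beans.put("processor", processor);
    Map<String, Object> vars = new LinkedHashMap<>();
    vars.put("key", "ar9VXblj5S");
    vars.put("date", "2017-08-27 11:55:29.319");
    new MiniEvaluatorSketch(beans).evaluate("@processor.process(#key, #date)", vars);
    System.out.println(processor.log);
  }
}
```

In the real consumer, SpEL's parser and the StandardEvaluationContext do this work, including type conversion and overload resolution that this stand-in ignores.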

Because MessagingTooHeavyProcessorProxyFactoryBean hard-codes the bean name as processor, the registered bean names must match it. I also want to switch which bean is registered under that name by profile, so each class specifies a profile.


//Registered as a bean only when the "messaging" profile is active
@Component("processor")
@Profile("messaging")
public class MessagingTooHeavyProcessorProxyFactoryBean


//Registered as a bean only when the "default" profile is active
@Component("processor")
@Profile("default")
public class DefaultTooHeavyProcessor

Preparation for operation check


Prepare a Web API endpoint to act as the trigger.


//Only required on the Producer side
public class ELDemoController {

  private TooHeavyProcessor processor;

  public ActionResult action() {
    User user = this.randomUser();
    Item item = this.randomItem();
    Date date = this.randomDate();

    //Pass the same date that is returned in the result
    this.processor.action(user, item, date);

    return new ActionResult(user, item, date);
  }

  public ProcessResult process() {
    String key = RandomStringUtils.randomAlphanumeric(10);
    Date date = this.randomDate();

    this.processor.process(key, date);

    return new ProcessResult(key, date);
  }

  //private method implementation
}



@Slf4j(topic = "Sender")
public class MessagingTooHeavyProcessorProxyFactoryBean {
  // ... //
  public void afterPropertiesSet() {
    // ... //
    log.info("{}", StringUtils.toString(el));
    this.messageTemplate.convertAndSend("el.exchange", null, el);
  }
  // ... //
}


//Only required on the Consumer side
@Slf4j(topic = "Receiver")
public class ELReceiver {
  // ... //
  public void receive(ELExpression el) {
    log.info("{}", el);
  }
  // ... //
}


@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor {
  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  public void action(User user, Item item, Date date) {
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
        FORMATTER.format(date));
  }

  public void process(String key, Date date) {
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}

Application property settings


spring:
  jackson:
    #JSON date and time format for easy viewing
    date-format: yyyy-MM-dd HH:mm:ss.SSS
---
spring:
  profiles: producer
server:
  port: 8080
---
spring:
  profiles: consumer
server:
  port: 8181

Operation check

Normal startup

First, confirm the behavior when the heavy processing is executed directly, as in the original code.

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "39iYWmzDpn",
  "date": "2017-07-03 00:05:31.211"
}

The following log is displayed in the started application.

[result] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

Next, switch the profile so that the processing is executed on the Consumer side of MQ.

Processing via MQ

Start RabbitMQ server

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Management screen confirmation

Access http://localhost:15672/

[Screenshot: RabbitMQ management screen]

Exchange and Queue confirmation

There is nothing yet.

[Screenshots: Exchange and Queue lists]

Start the application on the Consumer side

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"

Exchange and Queue confirmation

With Spring Boot, a RabbitAdmin bean appears to be registered automatically, so the Exchange and Queue now exist.

Exchange: [screenshot]

Queue: [screenshot]

Start the application on the Producer side

Next, start with the messaging profile instead of the default.

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"


$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "ar9VXblj5S",
  "date": "2017-08-27 11:55:29.319"
}

Sender log

[Sender] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}

Receiving log

[Receiver] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
[result] process for ar9VXblj5S at 2017-08-27 11:55:29.319


SpEL is convenient. The application is published on GitHub.

