[JAVA] Try changing to asynchronous processing via MQ without changing the code

What I want to do

Suppose calls like the one below are scattered throughout the code, and you want all of them to be processed via MQ.

@Autowired
TooHeavyProcessor processor;

public void doSomething() {
  // do something..

  // too heavy process in async.
  async(() -> processor.process(...));
}

Preparation

RabbitMQ

Install with Homebrew

$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq 

Spring Boot

Start with Spring Initializr

Maven

Add the following dependencies.

pom.xml


<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-collections4</artifactId>
  <version>4.1</version>
</dependency>

Add the -parameters compiler option, because the method parameter names are needed at runtime (they are used as the variable names in the generated SpEL expression).

pom.xml


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgument>-parameters</compilerArgument>
    <fork>true</fork>
  </configuration>
</plugin>
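
As a rough illustration of why the flag matters, the following sketch reads parameter names through reflection. ParameterNamesDemo and its nested Processor interface are hypothetical names used only here, and the article does not show how its own utility obtains the names, so treat this as an assumption about the mechanism rather than the actual implementation.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class ParameterNamesDemo {

  // Hypothetical stand-in for an interface such as TooHeavyProcessor.
  interface Processor {
    void process(String key, Date date);
  }

  // Returns the parameter names of a method as reflection reports them.
  static List<String> parameterNames(Method method) {
    List<String> names = new ArrayList<>();
    for (Parameter p : method.getParameters()) {
      // With -parameters the declared names are returned;
      // without it the JDK synthesizes arg0, arg1, ...
      names.add(p.getName());
    }
    return names;
  }

  public static void main(String[] args) throws Exception {
    Method m = Processor.class.getMethod("process", String.class, Date.class);
    // isNamePresent() tells whether the real names were compiled into the class file.
    System.out.println("names present: " + m.getParameters()[0].isNamePresent());
    System.out.println(parameterNames(m));
  }
}
```

Compiled without the flag, the names come back as arg0 and arg1, which would make the generated expression much harder to read.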

Overview of the change

Processing flow

Before the change

[Heavy process]@any

After the change

[Heavy process]@any -> (queue) -> [Heavy process]@consumer
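
To make the flow concrete, here is a minimal sketch that uses an in-memory BlockingQueue as a stand-in for RabbitMQ. QueueFlowSketch, heavyProcess and consume are hypothetical names, and the real setup uses separate producer and consumer applications rather than threads in one JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueFlowSketch {

  // Stand-in for the heavy process that should run on the consumer side.
  static String heavyProcess(String key) {
    return "processed " + key;
  }

  // Consumer side: takes `count` messages from the queue and processes each one.
  static List<String> consume(BlockingQueue<String> queue, int count) throws InterruptedException {
    List<String> results = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      results.add(heavyProcess(queue.take())); // blocks until a message arrives
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    Future<List<String>> results = consumer.submit(() -> consume(queue, 2));

    // Producer side: only enqueues and returns immediately.
    queue.put("a");
    queue.put("b");

    System.out.println(results.get()); // prints [processed a, processed b]
    consumer.shutdown();
  }
}
```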

The classes

Interface

TooHeavyProcessor.java


public interface TooHeavyProcessor {

  void action(User user, Item item, Date date);

  void process(String key, Date date);
}

Implementation class

Default implementation

This class currently runs on the Producer side of MQ; I want its processing to run on the Consumer side instead.

DefaultTooHeavyProcessor.java


@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

  @Override
  public void action(User user, Item item, Date date) {
    //Logging to check the execution result
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    //Logging to check the execution result
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }

}

I want to turn the method call itself into a message. For that, I came up with using SpEL (Spring Expression Language).

processor.action(user, item, date);

This call is turned into the following.

Producer


// The call expressed as SpEL: @processor is a bean reference, #user etc. are variables
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.put("user", user);
params.put("item", item);
params.put("date", date);

ELExpression message = new ELExpression(el, params);

// Send to MQ
amqpMessageTemplate.convertAndSend("el.exchange", null, message);

Consumer


public void receive(ELExpression message) {
  // Parse and execute the SpEL message
}

With this approach, the call can be executed on the Consumer side.
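
The ELExpression class itself is not shown in this article. Judging from the producer code and the logged output further down, it holds an expression string and a map of arguments, so a minimal sketch could look like the following. The field names and the Serializable choice are assumptions made here for illustration.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical shape of the message; the real class in the article may differ.
public class ELExpression implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String expression;        // e.g. "@processor.process(#key, #date)"
  private final Map<String, Object> args; // variable name -> value

  public ELExpression(String expression, Map<String, Object> args) {
    this.expression = expression;
    // Defensive copy that preserves the insertion order of the arguments.
    this.args = new LinkedHashMap<>(args);
  }

  public String getExpression() {
    return expression;
  }

  public Map<String, Object> getArgs() {
    return Collections.unmodifiableMap(args);
  }

  @Override
  public String toString() {
    return "ELExpression{expression=" + expression + ", args=" + args + "}";
  }

  public static void main(String[] unused) {
    Map<String, Object> params = new LinkedHashMap<>();
    params.put("key", "abc");
    System.out.println(new ELExpression("@processor.process(#key)", params));
  }
}
```

If the message is converted to JSON on the way to the broker, the converter would typically also need a default constructor or creator annotations; that part is left out of this sketch.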

MQ send class implementation

Instead of executing DefaultTooHeavyProcessor when TooHeavyProcessor is called, create an implementation that builds the SpEL expression and sends it to MQ.

Since the behavior has to be decided dynamically at call time, implement it with a Proxy. The proxy is created as follows.

Proxy


import java.lang.reflect.Proxy;
import org.springframework.util.ClassUtils;

private String beanName = "processor";
private Class<?> type = TooHeavyProcessor.class;

@Autowired
private AmqpTemplate messageTemplate;

Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
  new Class<?>[] { this.type }, (bean, method, args) -> {
    // Generate the SpEL expression.
    // SpelUtils.createELExpression is a utility method that builds an ELExpression from beanName, method and args
    ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
    // Send to MQ
    this.messageTemplate.convertAndSend("el.exchange", null, el);
    // The interface methods return void, so there is nothing to return
    return null;
  });
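
The utility that builds the expression is not shown in the article. One plausible way to derive the expression text and the argument map from the reflected method is sketched below. SpelCallSketch, buildExpression, buildArgs and recordingProxy are hypothetical names, and the proxy here records the result in a list instead of sending it to MQ.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class SpelCallSketch {

  // Hypothetical stand-in for TooHeavyProcessor.
  interface Processor {
    void process(String key, long timestamp);
  }

  // Builds text of the form "@beanName.method(#p1, #p2)" from the reflected method.
  static String buildExpression(String beanName, Method method) {
    StringJoiner params = new StringJoiner(", ", "(", ")");
    for (Parameter p : method.getParameters()) {
      params.add("#" + p.getName()); // real names require the -parameters flag
    }
    return "@" + beanName + "." + method.getName() + params;
  }

  // Maps each parameter name to the argument passed in the call.
  static Map<String, Object> buildArgs(Method method, Object[] args) {
    Map<String, Object> variables = new LinkedHashMap<>();
    Parameter[] parameters = method.getParameters();
    for (int i = 0; i < parameters.length; i++) {
      variables.put(parameters[i].getName(), args[i]);
    }
    return variables;
  }

  // Creates a proxy that records each call as text instead of executing it.
  static Processor recordingProxy(String beanName, List<String> sent) {
    InvocationHandler handler = (proxy, method, args) -> {
      sent.add(buildExpression(beanName, method) + " " + buildArgs(method, args));
      return null; // acceptable because the interface method returns void
    };
    return (Processor) Proxy.newProxyInstance(
        Processor.class.getClassLoader(), new Class<?>[] { Processor.class }, handler);
  }

  public static void main(String[] unused) {
    List<String> sent = new ArrayList<>();
    recordingProxy("processor", sent).process("abc", 1L);
    System.out.println(sent.get(0));
  }
}
```

Note that returning null only works for void methods. A handler like this would fail on calls such as hashCode() on the proxy, so a fuller implementation would need to handle the java.lang.Object methods separately.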

I want to register this proxy as a bean, so I create it with a FactoryBean.

MessagingTooHeavyProcessorProxyFactoryBean.java


@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

  private final String beanName = "processor";
  private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

  private TooHeavyProcessor proxy;

  @Autowired
  private AmqpTemplate messageTemplate;

  @Override
  public void afterPropertiesSet() {
    this.proxy = (TooHeavyProcessor) /*proxy creation*/;
  }

  @Override
  public TooHeavyProcessor getObject() {
    return this.proxy;
  }

  @Override
  public Class<?> getObjectType() {
    return this.type;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }
}

MQ Receiver class

ELReceiver


@Component
public class ELReceiver {

  /**A class that parses and executes SpEL*/
  @Autowired
  private ELExecutor elExecutor;

  // Declare the exchange and queue; they are created if they do not exist on the RabbitMQ side
  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "el.queue", durable = "true"),
    exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
  public void receive(ELExpression el) {
    // Parse and execute the SpEL expression
    this.elExecutor.execute(el);
  }
}

Parsing and evaluating SpEL is described in detail in the official documentation, so I omit it here and show only how to build the StandardEvaluationContext.

ELExecutor.java


@Autowired
private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
  StandardEvaluationContext context = new StandardEvaluationContext();

  //Register variable
  args.forEach((key, value) -> context.setVariable(key, value));
  //Register BeanResolver so that Bean can be resolved from Bean name
  context.setBeanResolver(
    (_self, beanName) -> this.appContext.getBean(beanName));

  return context;
}

The SpEL expression refers to the bean by the name processor, which is the name specified in MessagingTooHeavyProcessorProxyFactoryBean, so the bean names must match. I also want to switch which class is registered under that name by profile, so specify a profile on each class.

MessagingTooHeavyProcessorProxyFactoryBean.java


// Registered as a bean only when the "messaging" profile is active
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean

DefaultTooHeavyProcessor.java


// Registered as a bean only when the "default" profile is active
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor

Preparation for operation check

Web API

Prepare a Web API endpoint to trigger the processing.

ELDemoController.java


//Only required on the Producer side
@Profile("producer")
@RestController
public class ELDemoController {

  @Autowired
  private TooHeavyProcessor processor;

  @GetMapping("/action")
  public ActionResult action() {
    User user = this.randomUser();
    Item item = this.randomItem();
    Date date = this.randomDate();

    // Pass the same date that is returned in the response
    this.processor.action(user, item, date);

    return new ActionResult(user, item, date);
  }

  @GetMapping("/process")
  public ProcessResult process() {
    String key = RandomStringUtils.randomAlphanumeric(10);
    Date date = this.randomDate();

    this.processor.process(key, date);

    return new ProcessResult(key, date);
  }

  // private helper methods (randomUser, randomItem, randomDate) omitted
}

Logging

MessagingTooHeavyProcessorProxyFactoryBean.java


@Slf4j(topic = "Sender")
public class MessagingTooHeavyProcessorProxyFactoryBean {
  // ... //
  @Override
  public void afterPropertiesSet() {
    // ... //
    log.info("{}", StringUtils.toString(el));
    this.messageTemplate.convertAndSend("el.exchange", null, el);
  }
  // ... //
}

ELReceiver.java


//Only required on the Consumer side
@Profile("consumer")
@Slf4j(topic = "Receiver")
public class ELReceiver {
  // ... //
  public void receive(ELExpression el) {
    log.info("{}", el);
    this.elExecutor.execute(el);
  }
  // ... //
}

DefaultTooHeavyProcessor.java


@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor {
  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  @Override
  public void action(User user, Item item, Date date) {
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}

Application property settings

application.yaml


spring:
  jackson:
    #JSON date and time format for easy viewing
    date-format: yyyy-MM-dd HH:mm:ss.SSS

---
spring:
  profiles: producer
server:
    port: 8080

---
spring:
  profiles: consumer
server:
    port: 8181

Operation check

Normal startup

First, confirm the behavior when the heavy processing is simply executed in place, without MQ.

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "39iYWmzDpn",
  "date": "2017-07-03 00:05:31.211"
}

The following log is displayed in the started application.

[result] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

Next, change the profile so that the processing is executed on the Consumer side of MQ.

Starting up for processing via MQ

Start RabbitMQ server

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Management screen confirmation

Access http://localhost:15672/

[Screenshot: RabbitMQ management screen]

Exchange and Queue confirmation

Nothing has been created yet.

[Screenshots: Exchange and Queue lists]

Start the application on the Consumer (receiver) side

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"

Exchange and Queue confirmation

With Spring Boot, a RabbitAdmin bean seems to be registered automatically, so the exchange and queue declared on the listener are created at startup.

Exchange: [screenshot]

Queue: [screenshot]

Start the application on the Producer side

Next, start the producer with the messaging profile instead of the default profile.

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"

Verification

$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "ar9VXblj5S",
  "date": "2017-08-27 11:55:29.319"
}

Sender log


[Sender] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}

Receiver log


[Receiver] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
[result] process for ar9VXblj5S at 2017-08-27 11:55:29.319

Summary

SpEL is convenient. The application is posted on GitHub.
