This time, such processing is scattered, for example, when all of this is done via MQ.
@Autowired
TooHeavyProcessor processor;
public void doSomething() {
// do something..
// too heavy process in async.
async(() -> processor.process(...));
}
Rabbit MQ
$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq
Spring Boot
Maven
Added dependency.
pom.xml
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
A compiler option is added because the parameter name is required.
pom.xml
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgument>-parameters</compilerArgument>
<fork>true</fork>
</configuration>
</plugin>
[Heavy process]@any
[Heavy process]@any -> (queue) -> [Heavy process]@consumer
TooHeavyProcessor.java
public interface TooHeavyProcessor {
void action(User user, Item item, Date date);
void process(String key, Date date);
}
This class is running on the Producer side of MQ, so I would like to bring it to the Consumer side.
DefaultTooHeavyProcessor.java
@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {
@Override
public void action(User user, Item item, Date date) {
//Logging to check the execution result
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
//Logging to check the execution result
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
I want to message the call itself. So, I came up with SpEL (Spring Expression Language).
processor.action(user, item, date);
this
producer
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);
ELExpression message = new ELExpression(el, params);
//Send to MQ
amqpMessageTemplate.convertAndSend("el.exchange", null, message);
consumer
public void receive(ELMessage message) {
//Parse SpEL message
}
If you make it like this, it should be possible to execute it on the Consumer side.
Instead of executing DefaultTooHeavyProcessor
when TooHeavyProcessor
is called, create an implementation class that creates SpEL and sends it to MQ.
Since dynamic behavior is required when called, implement it with Proxy
.
The method of creating Proxy
is as follows.
Proxy
import org.springframework.util.ClassUtils;
private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;
@Autowired
private AmqpTemplate messageTemplate;
Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
new Class[] { this.type }, (bean, method, args) -> {
//SpEL generation
//Utility method to generate ELExpression from beanName, method and args
ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
//Send to MQ
this.messageTemplate.convertAndSend("el.exchange", null, el);
return null;
});
I want to register this with Bean
, so generate it with FactoryBean
MessagingTooHeavyProcessorProxyFactoryBean.java
@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {
private final String beanName = "processor";
private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;
private TooHeavyProcessor proxy;
@Autowired
private AmqpTemplate messageTemplate;
@Override
public void afterPropertiesSet() {
this.proxy = (TooHeavyProcessor) /*proxy creation*/;
}
@Override
public TooHeavyProcessor getObject() {
return this.proxy;
}
@Override
public Class<?> getObjectType() {
return this.type;
}
@Override
public boolean isSingleton() {
return true;
}
}
ELReceiver
@Component
public class ELReceiver {
/**A class that parses and executes SpEL*/
@Autowired
private ELExecutor elExecutor;
//Define Exchange and Queue and create if not on Rabbit side
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "el.queue", durable = "true"),
exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
public void receive(ELExpression el) {
//Analyze and execute SpEL
this.elExecutor.execute(el);
}
}
The analysis and execution of SpEL is described in detail in the Official documentation, so I will omit it.
Only how to make StandardEvaluationContext
.
ELExecutor.java
@Autowired
private ApplicationContext appContext;
private StandardEvaluationContext createContext(Map<String, Object> args) {
StandardEvaluationContext context = new StandardEvaluationContext();
//Register variable
args.forEach((key, value) -> context.setVariable(key, value));
//Register BeanResolver so that Bean can be resolved from Bean name
context.setBeanResolver(
(_self, beanName) -> this.appContext.getBean(beanName));
return context;
}
At this time, since the bean name is specified as processor
in MessagingTooHeavyProcessorProxyFactoryBean
, match the bean name properly.
Also, I want to switch the bean name in the profile, so specify the profile.
MessagingTooHeavyProcessorProxyFactoryBean.java
// "messaging"Bean registration only when profile is specified
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean
DefaultTooHeavyProcessor.java
// "default"Bean registration only when profile is specified
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor
Web API
Prepare a Web API mouth for the trigger.
ELDemoController.java
//Only required on the Producer side
@Profile("producer")
@RestController
public class ELDemoController {
@Autowired
private TooHeavyProcessor processor;
@GetMapping("/action")
public ActionResult action() {
User user = this.randomUser();
Item item = this.randomItem();
Date date = this.randomDate();
this.processor.action(user, item, this.randomDate());
return new ActionResult(user, item, date);
}
@GetMapping("/process")
public ProcessResult process() {
String key = RandomStringUtils.randomAlphanumeric(10);
Date date = this.randomDate();
this.processor.process(key, date);
return new ProcessResult(key, date);
}
//private method implementation
}
MessagingTooHeavyProcessorProxyFactoryBean.java
@Slf4j(topic = "Sender")
public class MessagingTooHeavyProcessorProxyFactoryBean {
// ... //
@Override
public void afterPropertiesSet() {
// ... //
log.info("{}", StringUtils.toString(el));
this.messageTemplate.convertAndSend("el.exchange", null, el);
}
// ... //
}
ELReceiver.java
//Only required on the Consumer side
@Profile("consumer")
@Slf4j(topic = "Receiver")
public class ELReceiver {
// ... //
public void receive(ELExpression el) {
log.info("{}", el);
this.elExecutor.execute(el);
}
// ... //
}
DefaultTooHeavyProcessor.java
@Slf4j(topic = "result")
public class DefaultTooHeavyProcessor {
private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void action(User user, Item item, Date date) {
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
application.yaml
spring:
jackson:
#JSON date and time format for easy viewing
date-format: yyyy-MM-dd HH:mm:ss.SSS
---
spring:
profiles: producer
server:
port: 8080
---
spring:
profiles: consumer
server:
port: 8181
Confirm the operation that just executes the first heavy processing as it is.
$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "39iYWmzDpn",
"date": "2017-07-03 00:05:31.211"
}
The following log is displayed in the started application.
[result] process for 39iYWmzDpn at 2017-07-03 00:05:31.211
Next, change the profile and let it be executed on the Consumer side of MQ.
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
Access http: // localhost: 15672 /
There is nothing yet.
$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"
In case of Spring-Boot, Bean registration of Rabbit Admin
seems to be done automatically.
Exchange
Queue
Next, start with the messaging
profile instead of the default
.
$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "ar9VXblj5S",
"date": "2017-08-27 11:55:29.319"
}
Sender log
[Sender] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
Receiving log
[Receiver] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
[result] process for ar9VXblj5S at 2017-08-27 11:55:29.319
SpEL convenient. The application is posted on GitHub.
Recommended Posts