Diesmal ist eine solche Verarbeitung beispielsweise verstreut, wenn dies alles über MQ erfolgt.
@Autowired
TooHeavyProcessor processor;
public void doSomething() {
// do something..
// too heavy process in async.
async(() -> processor.process(...));
}
Rabbit MQ
$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq
Spring Boot
Maven
Abhängigkeit hinzugefügt.
pom.xml
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
Compiler-Option hinzugefügt, da Parametername erforderlich ist.
pom.xml
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgument>-parameters</compilerArgument>
<fork>true</fork>
</configuration>
</plugin>
[Heavy process]@any
[Heavy process]@any -> (queue) -> [Heavy process]@consumer
TooHeavyProcessor.java
public interface TooHeavyProcessor {
void action(User user, Item item, Date date);
void process(String key, Date date);
}
Diese Klasse läuft auf der Producer-Seite von MQ, daher möchte ich sie auf die Consumer-Seite bringen.
DefaultTooHeavyProcessor.java
@Slf4j(topic = "Ergebnis")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {
@Override
public void action(User user, Item item, Date date) {
//Protokollierung zur Überprüfung des Ausführungsergebnisses
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
//Protokollierung zur Überprüfung des Ausführungsergebnisses
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
Ich möchte den Anruf selbst senden. Also habe ich mir SpEL (Spring Expression Language) ausgedacht.
processor.action(user, item, date);
Dies
producer
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);
ELExpression message = new ELExpression(el, params);
//An MQ senden
amqpMessageTemplate.convertAndSend("el.exchange", null, message);
consumer
public void receive(ELMessage message) {
//Spel-Nachricht analysieren
}
Es sollte möglich sein, es auf der Verbraucherseite auszuführen.
Anstatt "DefaultTooHeavyProcessor" auszuführen, wenn "TooHeavyProcessor" aufgerufen wird, erstellen Sie eine Implementierungsklasse, die SpEL erstellt und an MQ sendet.
Da beim Aufruf dynamisches Verhalten erforderlich ist, implementieren Sie es mit "Proxy". Die Methode zum Erstellen von "Proxy" ist wie folgt.
Proxy
import org.springframework.util.ClassUtils;
private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;
@Autowired
private AmqpTemplate messageTemplate;
Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
new Class[] { this.type }, (bean, method, args) -> {
//SpEL-Generierung
//Dienstprogrammmethode zum Generieren von ELExpression aus beanName, method und args
ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
//An MQ senden
this.messageTemplate.convertAndSend("el.exchange", null, el);
return null;
});
Ich möchte dies bei Bean
registrieren, also generiere es bei FactoryBean
MessagingTooHeavyProcessorProxyFactoryBean.java
@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {
private final String beanName = "processor";
private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;
private TooHeavyProcessor proxy;
@Autowired
private AmqpTemplate messageTemplate;
@Override
public void afterPropertiesSet() {
this.proxy = (TooHeavyProcessor) /*Proxy-Erstellung*/;
}
@Override
public TooHeavyProcessor getObject() {
return this.proxy;
}
@Override
public Class<?> getObjectType() {
return this.type;
}
@Override
public boolean isSingleton() {
return true;
}
}
ELReceiver
@Component
public class ELReceiver {
/**Klasse, die SpEL analysiert und ausführt*/
@Autowired
private ELExecutor elExecutor;
//Definieren Sie Exchange und Queue und erstellen Sie, wenn nicht auf der Rabbit-Seite
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "el.queue", durable = "true"),
exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
public void receive(ELExpression el) {
//SpEL analysieren und ausführen
this.elExecutor.execute(el);
}
}
Die Analyse und Ausführung von SpEL ist im Official Document ausführlich beschrieben, daher werde ich es weglassen.
Nur wie man "StandardEvaluationContext" macht.
ELExecutor.java
@Autowired
private ApplicationContext appContext;
private StandardEvaluationContext createContext(Map<String, Object> args) {
StandardEvaluationContext context = new StandardEvaluationContext();
//Variablen registrieren
args.forEach((key, value) -> context.setVariable(key, value));
//Registrieren Sie BeanResolver, damit Bean aus dem Bean-Namen aufgelöst werden kann
context.setBeanResolver(
(_self, beanName) -> this.appContext.getBean(beanName));
return context;
}
Zu diesem Zeitpunkt wird der Bean-Name in "MessagingTooHeavyProcessorProxyFactoryBean" als "Prozessor" angegeben. Passen Sie also den Bean-Namen richtig an. Ich möchte auch den Bean-Namen im Profil ändern, also geben Sie das Profil an.
MessagingTooHeavyProcessorProxyFactoryBean.java
// "messaging"Bean-Registrierung nur, wenn ein Profil angegeben ist
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean
DefaultTooHeavyProcessor.java
// "default"Bean-Registrierung nur, wenn ein Profil angegeben ist
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor
Web API
Halten Sie einen Web-API-Mund für den Auslöser bereit.
ELDemoController.java
//Nur auf der Herstellerseite erforderlich
@Profile("producer")
@RestController
public class ELDemoController {
@Autowired
private TooHeavyProcessor processor;
@GetMapping("/action")
public ActionResult action() {
User user = this.randomUser();
Item item = this.randomItem();
Date date = this.randomDate();
this.processor.action(user, item, this.randomDate());
return new ActionResult(user, item, date);
}
@GetMapping("/process")
public ProcessResult process() {
String key = RandomStringUtils.randomAlphanumeric(10);
Date date = this.randomDate();
this.processor.process(key, date);
return new ProcessResult(key, date);
}
//Private Methodenimplementierung
}
MessagingTooHeavyProcessorProxyFactoryBean.java
@Slf4j(topic = "Absender")
public class MessagingTooHeavyProcessorProxyFactoryBean {
// ... //
@Override
public void afterPropertiesSet() {
// ... //
log.info("{}", StringUtils.toString(el));
this.messageTemplate.convertAndSend("el.exchange", null, el);
}
// ... //
}
ELReceiver.java
//Nur auf der Verbraucherseite erforderlich
@Profile("consumer")
@Slf4j(topic = "Empfänger")
public class ELReceiver {
// ... //
public void receive(ELExpression el) {
log.info("{}", el);
this.elExecutor.execute(el);
}
// ... //
}
DefaultTooHeavyProcessor.java
@Slf4j(topic = "Ergebnis")
public class DefaultTooHeavyProcessor {
private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void action(User user, Item item, Date date) {
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
application.yaml
spring:
jackson:
#JSON-Datums- und Uhrzeitformat für einfache Anzeige
date-format: yyyy-MM-dd HH:mm:ss.SSS
---
spring:
profiles: producer
server:
port: 8080
---
spring:
profiles: consumer
server:
port: 8181
Bestätigen Sie den Vorgang, der gerade die erste schwere Verarbeitung ausführt.
$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "39iYWmzDpn",
"date": "2017-07-03 00:05:31.211"
}
Das folgende Protokoll wird in der gestarteten Anwendung angezeigt.
[Ergebnis] process for 39iYWmzDpn at 2017-07-03 00:05:31.211
Ändern Sie als Nächstes das Profil und lassen Sie es auf der Verbraucherseite von MQ ausführen.
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
Greifen Sie auf http: // localhost: 15672 / zu
Es gibt noch nichts.
$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"
Im Fall von Spring-Boot scheint die Bean-Registrierung von Rabbit Admin
automatisch zu erfolgen.
Exchange
Queue
Beginnen Sie als Nächstes mit dem "Messaging" -Profil anstelle des "Standard" -Profils.
$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "ar9VXblj5S",
"date": "2017-08-27 11:55:29.319"
}
Absenderprotokoll
[Absender] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
Protokoll empfangen
[Empfänger] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
[Ergebnis] process for ar9VXblj5S at 2017-08-27 11:55:29.319
SpEL bequem. Die Anwendung ist auf GitHub aufgeführt.
Recommended Posts