[JAVA] Versuchen Sie, über MQ zur asynchronen Verarbeitung zu wechseln, ohne den Code zu ändern

Was du machen willst

Diesmal ist eine solche Verarbeitung beispielsweise verstreut, wenn dies alles über MQ erfolgt.

@Autowired
TooHeavyProcessor processor;

public void doSomething() {
  // do something..

  // too heavy process in async.
  async(() -> processor.process(...));
}

Vorbereitung

Rabbit MQ

Installing on Homebrew

$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq 

Spring Boot

Start with Spring Initializr

Maven

Abhängigkeit hinzugefügt.

pom.xml


<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-collections4</artifactId>
  <version>4.1</version>
</dependency>

Compiler-Option hinzugefügt, da Parametername erforderlich ist.

pom.xml


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgument>-parameters</compilerArgument>
    <fork>true</fork>
  </configuration>
</plugin>

Grobe Veränderungen

Inhalte verarbeiten

Vorher ändern

[Heavy process]@any

Nach der veränderung

[Heavy process]@any -> (queue) -> [Heavy process]@consumer

Jede Klasse

Schnittstelle

TooHeavyProcessor.java


public interface TooHeavyProcessor {

  void action(User user, Item item, Date date);

  void process(String key, Date date);
}

Implementierungsklasse

Standardimplementierung

Diese Klasse läuft auf der Producer-Seite von MQ, daher möchte ich sie auf die Consumer-Seite bringen.

DefaultTooHeavyProcessor.java


@Slf4j(topic = "Ergebnis")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

  @Override
  public void action(User user, Item item, Date date) {
    //Protokollierung zur Überprüfung des Ausführungsergebnisses
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    //Protokollierung zur Überprüfung des Ausführungsergebnisses
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }

}

Ich möchte den Anruf selbst senden. Also habe ich mir SpEL (Spring Expression Language) ausgedacht.

processor.action(user, item, date);

Dies

producer


String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);

ELExpression message = new ELExpression(el, params);

//An MQ senden
amqpMessageTemplate.convertAndSend("el.exchange", null, message);

consumer


public void receive(ELMessage message) {
  //Spel-Nachricht analysieren
}

Es sollte möglich sein, es auf der Verbraucherseite auszuführen.

Implementierung der MQ-Übertragungsklasse

Anstatt "DefaultTooHeavyProcessor" auszuführen, wenn "TooHeavyProcessor" aufgerufen wird, erstellen Sie eine Implementierungsklasse, die SpEL erstellt und an MQ sendet.

Da beim Aufruf dynamisches Verhalten erforderlich ist, implementieren Sie es mit "Proxy". Die Methode zum Erstellen von "Proxy" ist wie folgt.

Proxy


import org.springframework.util.ClassUtils;

private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;

@Autowired
private AmqpTemplate messageTemplate;

Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
  new Class[] { this.type }, (bean, method, args) -> {
    //SpEL-Generierung
    //Dienstprogrammmethode zum Generieren von ELExpression aus beanName, method und args
    ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
    //An MQ senden
    this.messageTemplate.convertAndSend("el.exchange", null, el);
    return null;
  });

Ich möchte dies bei Bean registrieren, also generiere es bei FactoryBean

MessagingTooHeavyProcessorProxyFactoryBean.java


@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

  private final String beanName = "processor";
  private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

  private TooHeavyProcessor proxy;

  @Autowired
  private AmqpTemplate messageTemplate;

  @Override
  public void afterPropertiesSet() {
    this.proxy = (TooHeavyProcessor) /*Proxy-Erstellung*/;
  }

  @Override
  public TooHeavyProcessor getObject() {
    return this.proxy;
  }

  @Override
  public Class<?> getObjectType() {
    return this.type;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }
}

MQ-Empfängerklasse

ELReceiver


@Component
public class ELReceiver {

  /**Klasse, die SpEL analysiert und ausführt*/
  @Autowired
  private ELExecutor elExecutor;

  //Definieren Sie Exchange und Queue und erstellen Sie, wenn nicht auf der Rabbit-Seite
  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "el.queue", durable = "true"),
    exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
  public void receive(ELExpression el) {
    //SpEL analysieren und ausführen
    this.elExecutor.execute(el);
  }
}

Die Analyse und Ausführung von SpEL ist im Official Document ausführlich beschrieben, daher werde ich es weglassen.

Nur wie man "StandardEvaluationContext" macht.

ELExecutor.java


@Autowired
private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
  StandardEvaluationContext context = new StandardEvaluationContext();

  //Variablen registrieren
  args.forEach((key, value) -> context.setVariable(key, value));
  //Registrieren Sie BeanResolver, damit Bean aus dem Bean-Namen aufgelöst werden kann
  context.setBeanResolver(
    (_self, beanName) -> this.appContext.getBean(beanName));

  return context;
}

Zu diesem Zeitpunkt wird der Bean-Name in "MessagingTooHeavyProcessorProxyFactoryBean" als "Prozessor" angegeben. Passen Sie also den Bean-Namen richtig an. Ich möchte auch den Bean-Namen im Profil ändern, also geben Sie das Profil an.

MessagingTooHeavyProcessorProxyFactoryBean.java


// "messaging"Bean-Registrierung nur, wenn ein Profil angegeben ist
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean

DefaultTooHeavyProcessor.java


// "default"Bean-Registrierung nur, wenn ein Profil angegeben ist
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor

Vorbereitung zur Funktionsprüfung

Web API

Halten Sie einen Web-API-Mund für den Auslöser bereit.

ELDemoController.java


//Nur auf der Herstellerseite erforderlich
@Profile("producer")
@RestController
public class ELDemoController {

  @Autowired
  private TooHeavyProcessor processor;

  @GetMapping("/action")
  public ActionResult action() {
    User user = this.randomUser();
    Item item = this.randomItem();
    Date date = this.randomDate();

    this.processor.action(user, item, this.randomDate());

    return new ActionResult(user, item, date);
  }

  @GetMapping("/process")
  public ProcessResult process() {
    String key = RandomStringUtils.randomAlphanumeric(10);
    Date date = this.randomDate();

    this.processor.process(key, date);

    return new ProcessResult(key, date);
  }

  //Private Methodenimplementierung
}

Protokollierung

MessagingTooHeavyProcessorProxyFactoryBean.java


@Slf4j(topic = "Absender")
public class MessagingTooHeavyProcessorProxyFactoryBean {
  // ... //
  @Override
  public void afterPropertiesSet() {
    // ... //
    log.info("{}", StringUtils.toString(el));
    this.messageTemplate.convertAndSend("el.exchange", null, el);
  }
  // ... //
}

ELReceiver.java


//Nur auf der Verbraucherseite erforderlich
@Profile("consumer")
@Slf4j(topic = "Empfänger")
public class ELReceiver {
  // ... //
  public void receive(ELExpression el) {
    log.info("{}", el);
    this.elExecutor.execute(el);
  }
  // ... //
}

DefaultTooHeavyProcessor.java


@Slf4j(topic = "Ergebnis")
public class DefaultTooHeavyProcessor {
  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  @Override
  public void action(User user, Item item, Date date) {
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}

Einstellungen für Anwendungseigenschaften

application.yaml


spring:
  jackson:
    #JSON-Datums- und Uhrzeitformat für einfache Anzeige
    date-format: yyyy-MM-dd HH:mm:ss.SSS

---
spring:
  profiles: producer
server:
    port: 8080

---
spring:
  profiles: consumer
server:
    port: 8181

Funktionsprüfung

Normaler Start

Bestätigen Sie den Vorgang, der gerade die erste schwere Verarbeitung ausführt.

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "39iYWmzDpn",
  "date": "2017-07-03 00:05:31.211"
}

Das folgende Protokoll wird in der gestarteten Anwendung angezeigt.

[Ergebnis] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

Ändern Sie als Nächstes das Profil und lassen Sie es auf der Verbraucherseite von MQ ausführen.

Starten Sie die Verarbeitung über MQ

Starten Sie den Rabbit MQ-Server

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Bestätigung des Verwaltungsbildschirms

Greifen Sie auf http: // localhost: 15672 / zu

スクリーンショット 2017-02-05 17.20.28.png

Exchange- und Warteschlangenbestätigung

Es gibt noch nichts.

スクリーンショット 2017-02-05 17.22.51.png

スクリーンショット 2017-02-05 17.24.13.png

Starten Sie die Anwendung auf der Empfängerseite

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"

Exchange- und Warteschlangenbestätigung

Im Fall von Spring-Boot scheint die Bean-Registrierung von Rabbit Admin automatisch zu erfolgen.

Exchange スクリーンショット 2017-02-05 17.38.00.png

Queue スクリーンショット 2017-02-05 17.37.04.png

Starten Sie die Anwendung auf der Verbraucherseite

Beginnen Sie als Nächstes mit dem "Messaging" -Profil anstelle des "Standard" -Profils.

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"

Bestätigung

$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "ar9VXblj5S",
  "date": "2017-08-27 11:55:29.319"
}

Absenderprotokoll


[Absender] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}

Protokoll empfangen


[Empfänger] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
[Ergebnis] process for ar9VXblj5S at 2017-08-27 11:55:29.319

Zusammenfassung

SpEL bequem. Die Anwendung ist auf GitHub aufgeführt.

Recommended Posts

Versuchen Sie, über MQ zur asynchronen Verarbeitung zu wechseln, ohne den Code zu ändern
Versuchen Sie, die .erb-Datei in .slim zu ändern
[Java] Versuchen Sie, das Fizz Buzz-Problem mithilfe der rekursiven Verarbeitung zu lösen
Ich habe versucht, Processing auf VS Code zu migrieren
[Verarbeitung × Java] Verwendung der Schleife
[Verarbeitung × Java] Verwendung der Klasse
[Verarbeitung × Java] Verwendung der Funktion
So implementieren Sie die asynchrone Verarbeitung in Outsystems
Warum war das Lesen des Codes schmerzhaft?