[JAVA] Essayez de passer au traitement asynchrone via MQ sans changer le code

Chose que tu veux faire

Cette fois, un tel traitement est dispersé, par exemple, lorsque tout cela est fait via MQ.

@Autowired
TooHeavyProcessor processor;

public void doSomething() {
  // do something..

  // too heavy process in async.
  async(() -> processor.process(...));
}

Préparation

Rabbit MQ

Installing on Homebrew

$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq 

Spring Boot

Start with Spring Initializr

Maven

Dépendance ajoutée.

pom.xml


<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-collections4</artifactId>
  <version>4.1</version>
</dependency>

Ajout d'une option de compilation car le nom du paramètre est requis.

pom.xml


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgument>-parameters</compilerArgument>
    <fork>true</fork>
  </configuration>
</plugin>

Changements brutaux

Traitement du contenu

Changer avant

[Heavy process]@any

Après le changement

[Heavy process]@any -> (queue) -> [Heavy process]@consumer

Chaque classe

interface

TooHeavyProcessor.java


public interface TooHeavyProcessor {

  void action(User user, Item item, Date date);

  void process(String key, Date date);
}

Classe d'implémentation

Implémentation par défaut

Cette classe fonctionne du côté producteur de MQ, donc je voudrais l'amener du côté consommateur.

DefaultTooHeavyProcessor.java


@Slf4j(topic = "résultat")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

  @Override
  public void action(User user, Item item, Date date) {
    //Journalisation pour vérifier le résultat de l'exécution
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    //Journalisation pour vérifier le résultat de l'exécution
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }

}

Je veux envoyer l'appel lui-même. Donc, je suis venu avec SpEL (Spring Expression Language).

processor.action(user, item, date);

cette

producer


String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);

ELExpression message = new ELExpression(el, params);

//Envoyer à MQ
amqpMessageTemplate.convertAndSend("el.exchange", null, message);

consumer


public void receive(ELMessage message) {
  //Analyser le message SpEL
}

Il devrait être possible de l'exécuter du côté du consommateur.

Implémentation de la classe de transmission MQ

Au lieu d'exécuter DefaultTooHeavyProcessor lorsque TooHeavyProcessor est appelé, créez une classe d'implémentation qui crée SpEL et l'envoie à MQ.

Puisque le comportement dynamique est requis lors de l'appel, implémentez-le avec Proxy. La méthode de création de «Proxy» est la suivante.

Proxy


import org.springframework.util.ClassUtils;

private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;

@Autowired
private AmqpTemplate messageTemplate;

Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
  new Class[] { this.type }, (bean, method, args) -> {
    //Génération SpEL
    //Méthode utilitaire pour générer ELExpression à partir de beanName, méthode et args
    ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
    //Envoyer à MQ
    this.messageTemplate.convertAndSend("el.exchange", null, el);
    return null;
  });

Je veux l'enregistrer avec Bean, alors générez-le avec FactoryBean

MessagingTooHeavyProcessorProxyFactoryBean.java


@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

  private final String beanName = "processor";
  private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

  private TooHeavyProcessor proxy;

  @Autowired
  private AmqpTemplate messageTemplate;

  @Override
  public void afterPropertiesSet() {
    this.proxy = (TooHeavyProcessor) /*création de proxy*/;
  }

  @Override
  public TooHeavyProcessor getObject() {
    return this.proxy;
  }

  @Override
  public Class<?> getObjectType() {
    return this.type;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }
}

Classe de récepteur MQ

ELReceiver


@Component
public class ELReceiver {

  /**Classe qui analyse et exécute SpEL*/
  @Autowired
  private ELExecutor elExecutor;

  //Définir Exchange et Queue et créer si ce n'est du côté de Lapin
  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "el.queue", durable = "true"),
    exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
  public void receive(ELExpression el) {
    //Analyser et exécuter SpEL
    this.elExecutor.execute(el);
  }
}

L'analyse et l'exécution de SpEL sont décrites en détail dans le Document officiel, donc je vais l'omettre.

Seulement comment faire StandardEvaluationContext.

ELExecutor.java


@Autowired
private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
  StandardEvaluationContext context = new StandardEvaluationContext();

  //Enregistrer les variables
  args.forEach((key, value) -> context.setVariable(key, value));
  //Enregistrez BeanResolver pour que Bean puisse être résolu à partir du nom du Bean
  context.setBeanResolver(
    (_self, beanName) -> this.appContext.getBean(beanName));

  return context;
}

À ce stade, le nom du bean est spécifié en tant que «processor» dans «MessagingTooHeavyProcessorProxyFactoryBean», donc faites correspondre correctement le nom du bean. Je veux également changer le nom du bean dans le profil, alors spécifiez le profil.

MessagingTooHeavyProcessorProxyFactoryBean.java


// "messaging"Enregistrement du bean uniquement lorsque le profil est spécifié
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean

DefaultTooHeavyProcessor.java


// "default"Enregistrement du bean uniquement lorsque le profil est spécifié
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor

Préparation au contrôle de fonctionnement

Web API

Préparez une bouche d'API Web pour le déclencheur.

ELDemoController.java


//Nécessaire uniquement du côté producteur
@Profile("producer")
@RestController
public class ELDemoController {

  @Autowired
  private TooHeavyProcessor processor;

  @GetMapping("/action")
  public ActionResult action() {
    User user = this.randomUser();
    Item item = this.randomItem();
    Date date = this.randomDate();

    this.processor.action(user, item, this.randomDate());

    return new ActionResult(user, item, date);
  }

  @GetMapping("/process")
  public ProcessResult process() {
    String key = RandomStringUtils.randomAlphanumeric(10);
    Date date = this.randomDate();

    this.processor.process(key, date);

    return new ProcessResult(key, date);
  }

  //Implémentation de méthode privée
}

Enregistrement

MessagingTooHeavyProcessorProxyFactoryBean.java


@Slf4j(topic = "Expéditeur")
public class MessagingTooHeavyProcessorProxyFactoryBean {
  // ... //
  @Override
  public void afterPropertiesSet() {
    // ... //
    log.info("{}", StringUtils.toString(el));
    this.messageTemplate.convertAndSend("el.exchange", null, el);
  }
  // ... //
}

ELReceiver.java


//Uniquement requis du côté consommateur
@Profile("consumer")
@Slf4j(topic = "Receveur")
public class ELReceiver {
  // ... //
  public void receive(ELExpression el) {
    log.info("{}", el);
    this.elExecutor.execute(el);
  }
  // ... //
}

DefaultTooHeavyProcessor.java


@Slf4j(topic = "résultat")
public class DefaultTooHeavyProcessor {
  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  @Override
  public void action(User user, Item item, Date date) {
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}

Paramètres des propriétés de l'application

application.yaml


spring:
  jackson:
    #Format de date et d'heure JSON pour une visualisation facile
    date-format: yyyy-MM-dd HH:mm:ss.SSS

---
spring:
  profiles: producer
server:
    port: 8080

---
spring:
  profiles: consumer
server:
    port: 8181

Contrôle de fonctionnement

Démarrage normal

Confirmez l'opération qui exécute simplement le premier traitement lourd tel quel.

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "39iYWmzDpn",
  "date": "2017-07-03 00:05:31.211"
}

Le journal suivant s'affiche dans l'application démarrée.

[résultat] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

Ensuite, modifiez le profil et laissez-le être exécuté du côté consommateur de MQ.

Commencer à traiter via MQ

Démarrer le serveur Rabbit MQ

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Confirmation de l'écran de gestion

Accédez à http: // localhost: 15672 /

スクリーンショット 2017-02-05 17.20.28.png

Confirmation d'échange et de file d'attente

Il n'y a encore rien.

スクリーンショット 2017-02-05 17.22.51.png

スクリーンショット 2017-02-05 17.24.13.png

Démarrer l'application côté récepteur

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"

Confirmation d'échange et de file d'attente

Dans le cas de Spring-Boot, il semble que l'enregistrement Bean de Rabbit Admin se fasse automatiquement.

Exchange スクリーンショット 2017-02-05 17.38.00.png

Queue スクリーンショット 2017-02-05 17.37.04.png

Lancer l'application côté consommateur

Ensuite, commencez par le profil «messagerie» au lieu de «par défaut».

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"

Vérification

$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "ar9VXblj5S",
  "date": "2017-08-27 11:55:29.319"
}

Journal de l'expéditeur


[Expéditeur] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}

Journal de réception


[Receveur] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
[résultat] process for ar9VXblj5S at 2017-08-27 11:55:29.319

Résumé

SpEL pratique. L'application est répertoriée sur GitHub.

Recommended Posts

Essayez de passer au traitement asynchrone via MQ sans changer le code
Essayez de changer le fichier .erb en .slim
[Java] Essayez de résoudre le problème de Fizz Buzz en utilisant un traitement récursif
J'ai essayé de migrer le traitement vers VS Code
[Traitement × Java] Comment utiliser la boucle
[Traitement × Java] Comment utiliser la classe
[Traitement × Java] Comment utiliser la fonction
Comment implémenter le traitement asynchrone dans Outsystems
Pourquoi le code était-il pénible à lire?