[JAVA] Comment accéder directement à Socket avec la fonction TCP de Spring Integration

Lors de la création d'un client TCP à l'aide de la fonction TCP de Spring Integration, il y avait des cas (parfois) où je voulais entrer en contact direct avec Socket, alors notez comment établir un contact direct avec Socket.

Le cas où je voulais toucher directement Socket est ... FIN pour une déconnexion normale, et RST pour une déconnexion anormale (lorsqu'une erreur se produit)! C'était une époque où il était nécessaire de répondre à l'exigence de connexion. Dans la fonction TCP de Spring Integration, il est possible de spécifier s'il faut utiliser FIN ou RST comme méthode de déconnexion dans le paramètre Connection Factory, mais il n'a pas été possible de basculer en fonction des conditions (dans la plage examinée ...). ..

Utilisation de TcpSocketSupport

Spring Integration [TcpSocketSupport](https: // docs.) Pour prendre en charge l'application d'un traitement arbitraire aux Sockets générés côté framework (Sockets associés à SocketChannels dans le cas de NIO). spring.io/spring-integration/docs/5.1.2.RELEASE/reference/htmlsingle/#_the_literal_tcpsocketsupport_literal_strategy_interface) est fourni. Par défaut, la classe DefaultTcpSocketSupport est appliquée, mais le processus qui change l'état de Socket n'est pas exécuté.

Application d'une implémentation qui RST lorsqu'une exception se produit

La fonction TCP de Spring Integration prend en charge un mécanisme de gestion des événements (connexion, déconnexion, détection d'erreur, etc.) qui se produisent lors du traitement dans Spring Integration, et le code introduit cette fois gère ces événements. , J'ai essayé de faire en sorte que le code se déconnecte après avoir détecté une exception avec RST. Il convient de noter que le code introduit ici n'est pas le code appliqué (ou prévu d'être) à l'application réelle, il ne peut donc pas être utilisé tel quel. (Ce n'est qu'un échantillon)

package com.example.demo;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;

@SpringBootApplication
public class SprIntDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(SprIntDemoApplication.class, args);
  }

  // TCP Server
  @Bean
  public IntegrationFlow integrationInboundFlow() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .get()))
        .transform(Transformers.objectToString()) // byte[] -> String
        .transform(m -> m) //Répondre au message reçu tel quel
        .get();
  }

  // TCP Client
  @Bean
  public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
    AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .tcpSocketSupport(socketManager()) //Appliquer la classe de support pour gérer le socket généré
        .get();
    factory.setApplicationEventPublisher(publisher);
    return flow -> flow.handle(Tcp.outboundGateway(factory))
        .transform(Transformers.objectToString());  // byte[] -> String
  }

  @Bean
  public SocketManager socketManager() {
    return new SocketManager();
  }

  @Bean
  public MessagingTemplate messagingTemplate() {
    return new MessagingTemplate();
  }

  static class SocketManager extends DefaultTcpSocketSupport {
    private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();

    @Override
    public void postProcessSocket(Socket socket) {
      super.postProcessSocket(socket);
      sockets.put(socket.getLocalPort(), socket); //Enregistrez-le en interne afin de pouvoir accéder à Socket lorsqu'une erreur est détectée.
    }

    @EventListener
    public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
      try {
        int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
        Socket socket = sockets.get(localPort);
        if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
          sockets.get(localPort).setSoLinger(true, 0); //Configuré pour se déconnecter par RST
        }
      } catch (SocketException e) {
        // ignore
      }
    }

    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
      sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); //Nettoyez les prises qui ne sont plus nécessaires après la déconnexion
    }

  }

}
package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {

  @Autowired
  MessagingTemplate template;

  @Test
  public void contextLoads() {
    Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
        MessageBuilder.withPayload("hello!").build());
    System.out.println("reply: " + reply);
  }

}

Au fait ... Si vous exécutez le code ci-dessus tel quel, aucune erreur (exception) ne se produira pendant le traitement, donc pour générer une erreur, il est nécessaire de déconnecter le Socket pendant le traitement côté serveur, mais ce livre J'omettrai cette partie dans l'entrée.

Résumé

Quand j'ai lu le document, j'ai l'impression que je l'utilise au-delà du rôle de TcpSocketSupport, mais il était utile que le mécanisme qui me permet de toucher Socket soit supporté.

Document de référence

Recommended Posts

Comment accéder directement à Socket avec la fonction TCP de Spring Integration
Comment vérifier avant d'envoyer un message au serveur avec Spring Integration
J'ai examiné le flux de communication TCP avec Spring Integration (édition client)
J'ai examiné le flux de communication TCP avec Spring Integration (édition serveur)
Comment appliquer HandlerInterceptor à la passerelle entrante http de Spring Integration
Accédez au h2db intégré de Spring Boot avec jdbcTemplate
Comment démarrer par environnement avec Spring Boot de Maven
Implémentons une fonction pour limiter le nombre d'accès à l'API avec SpringBoot + Redis
Comment ajouter la fonction de suppression
Comment utiliser MinIO avec la même fonction que S3 Utiliser docker-compose
Comment réaliser un téléchargement de fichiers volumineux avec Rest Template of Spring
Comment déterminer le nombre de parallèles
Comment trier une liste de SelectItems
Comment implémenter TextInputLayout avec la fonction de validation
[Traitement × Java] Comment utiliser la fonction
Comment appliquer immédiatement les modifications de Thymeleaf au navigateur avec #Spring Boot + maven
Comment décorer CSS sur les boutons radio de rails6 form_with (helper)
[Spring Boot] J'ai étudié comment implémenter le post-traitement de la demande reçue.
Comment lire le corps de la requête plusieurs fois avec Spring Boot + Spring Security
Comment convertir un tableau de chaînes en un tableau d'objets avec l'API Stream
J'ai essayé de visualiser l'accès de Lambda → Athena avec AWS X-Ray
[swift5] Comment changer la couleur de TabBar ou la couleur de l'élément de TabBar avec le code
Comment mettre en œuvre la fonction d'authentification du courrier au moment de l'inscription de l'utilisateur
Comment trouver la cause de l'erreur Ruby
Imprimez des formulaires directement sur l'imprimante avec Jasper Reports
Personnalisez la répartition du contenu de Recyclerview
Comment utiliser MyBatis2 (iBatis) avec Spring Boot 1.4 (Spring 4)
[Rails6] Comment connecter la fonction d'affichage générée par Scaffold avec la fonction utilisateur générée par devise
Comment utiliser h2db intégré avec Spring Boot
Comment vérifier la dernière version de io.spring.platform pour l'écriture dans pom.xml de Spring (STS)
L'histoire de la montée de la série Spring Boot 1.5 à la série 2.1
Comment mettre en œuvre la fonction de chapelure avec Gretel
Essayez d'implémenter la fonction de connexion avec Spring Boot
Comment obtenir le jour d'aujourd'hui
[Pour les débutants] Comment implémenter la fonction de suppression
Sortie de la façon d'utiliser la méthode slice
Paramètres du gestionnaire de ressources lors de la livraison du SPA avec la fonction de ressource statique de Spring Boot
Comment afficher le résultat du remplissage du formulaire
Comment définir des variables d'environnement dans le fichier de propriétés de l'application Spring Boot
[Spring Boot] Comment se référer au fichier de propriétés
[Introduction à Spring Boot] Fonction d'authentification avec Spring Security
Comment créer un serveur Jenkins avec un conteneur Docker sur CentOS 7 de VirtualBox et accéder au serveur Jenkins à partir d'un PC local
Si vous utilisez SQLite avec VSCode, utilisez l'extension (comment voir le fichier binaire de sqlite3)
Comment utiliser git avec la puissance de jgit dans un environnement sans commandes git
Comment demander en passant un tableau à une requête avec le client HTTP de Ruby
[Docker] Comment voir le contenu des volumes. Démarrez un conteneur avec les privilèges root.
Comment définir une limite de relance pour sidekiq et notifier les files d'attente mortes avec Slack
[Explication approximative] Comment séparer le fonctionnement de l'environnement de production et de l'environnement de développement avec Rails
Résumé de l'utilisation du jeu de proxy dans IE lors de la connexion avec Java
[Java] Comment obtenir l'URL de la source de transition
Comment changer l'action avec plusieurs boutons d'envoi
Comment supprimer / mettre à jour le champ de liste de OneToMany
Comment écrire Scala du point de vue de Java
[Java] Comment omettre l'injection de constructeur de ressort avec Lombok
Comment créer une fonction de messagerie LINE avec Ruby