Lors de la création d'un client TCP à l'aide de la fonction TCP de Spring Integration, il y avait des cas (parfois) où je voulais entrer en contact direct avec Socket, alors notez comment établir un contact direct avec Socket.
Le cas où je voulais toucher directement Socket est ... FIN pour une déconnexion normale, et RST pour une déconnexion anormale (lorsqu'une erreur se produit)! C'était une époque où il était nécessaire de répondre à l'exigence de connexion. Dans la fonction TCP de Spring Integration, il est possible de spécifier s'il faut utiliser FIN ou RST comme méthode de déconnexion dans le paramètre Connection Factory, mais il n'a pas été possible de basculer en fonction des conditions (dans la plage examinée ...). ..
Spring Integration [TcpSocketSupport
](https: // docs.) Pour prendre en charge l'application d'un traitement arbitraire aux Sockets générés côté framework (Sockets associés à SocketChannels dans le cas de NIO). spring.io/spring-integration/docs/5.1.2.RELEASE/reference/htmlsingle/#_the_literal_tcpsocketsupport_literal_strategy_interface) est fourni. Par défaut, la classe DefaultTcpSocketSupport
est appliquée, mais le processus qui change l'état de Socket n'est pas exécuté.
La fonction TCP de Spring Integration prend en charge un mécanisme de gestion des événements (connexion, déconnexion, détection d'erreur, etc.) qui se produisent lors du traitement dans Spring Integration, et le code introduit cette fois gère ces événements. , J'ai essayé de faire en sorte que le code se déconnecte après avoir détecté une exception avec RST. Il convient de noter que le code introduit ici n'est pas le code appliqué (ou prévu d'être) à l'application réelle, il ne peut donc pas être utilisé tel quel. (Ce n'est qu'un échantillon)
package com.example.demo;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
@SpringBootApplication
public class SprIntDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SprIntDemoApplication.class, args);
}
// TCP Server
@Bean
public IntegrationFlow integrationInboundFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.get()))
.transform(Transformers.objectToString()) // byte[] -> String
.transform(m -> m) //Répondre au message reçu tel quel
.get();
}
// TCP Client
@Bean
public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.tcpSocketSupport(socketManager()) //Appliquer la classe de support pour gérer le socket généré
.get();
factory.setApplicationEventPublisher(publisher);
return flow -> flow.handle(Tcp.outboundGateway(factory))
.transform(Transformers.objectToString()); // byte[] -> String
}
@Bean
public SocketManager socketManager() {
return new SocketManager();
}
@Bean
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
static class SocketManager extends DefaultTcpSocketSupport {
private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();
@Override
public void postProcessSocket(Socket socket) {
super.postProcessSocket(socket);
sockets.put(socket.getLocalPort(), socket); //Enregistrez-le en interne afin de pouvoir accéder à Socket lorsqu'une erreur est détectée.
}
@EventListener
public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
try {
int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
Socket socket = sockets.get(localPort);
if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
sockets.get(localPort).setSoLinger(true, 0); //Configuré pour se déconnecter par RST
}
} catch (SocketException e) {
// ignore
}
}
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); //Nettoyez les prises qui ne sont plus nécessaires après la déconnexion
}
}
}
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {
@Autowired
MessagingTemplate template;
@Test
public void contextLoads() {
Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
MessageBuilder.withPayload("hello!").build());
System.out.println("reply: " + reply);
}
}
Au fait ... Si vous exécutez le code ci-dessus tel quel, aucune erreur (exception) ne se produira pendant le traitement, donc pour générer une erreur, il est nécessaire de déconnecter le Socket pendant le traitement côté serveur, mais ce livre J'omettrai cette partie dans l'entrée.
Quand j'ai lu le document, j'ai l'impression que je l'utilise au-delà du rôle de TcpSocketSupport
, mais il était utile que le mécanisme qui me permet de toucher Socket
soit supporté.