Beim Erstellen eines TCP-Clients mit der TCP-Funktion von Spring Integration gab es (manchmal) Fälle, in denen ich direkten Kontakt mit Socket herstellen wollte. Notieren Sie sich daher, wie Sie direkten Kontakt mit Socket herstellen können.
Der Fall, in dem ich Socket direkt berühren wollte, ist ... FIN für normale Trennung und RST für abnormale Trennung (wenn ein Fehler auftritt)! Es war eine Zeit, in der es notwendig war, die Verbindungsanforderungen zu erfüllen. In der TCP-Funktion von Spring Integration kann angegeben werden, ob FIN oder RST als Trennmethode in der Einstellung von Connection Factory verwendet werden soll, es war jedoch nicht möglich, abhängig von den Bedingungen (in dem untersuchten Bereich ...) zu wechseln. ..
Spring Integration [TcpSocketSupport
](https: // docs.) Zur Unterstützung der Anwendung einer beliebigen Verarbeitung auf Sockets, die auf der Framework-Seite generiert wurden (Sockets, die SocketChannels im Fall von NIO zugeordnet sind). spring.io/spring-integration/docs/5.1.2.RELEASE/reference/htmlsingle/#_the_literal_tcpsocketsupport_literal_strategy_interface) wird bereitgestellt. Standardmäßig wird die Klasse "DefaultTcpSocketSupport" angewendet, aber der Prozess, der den Status von Socket ändert, wird nicht ausgeführt.
Die TCP-Funktion von Spring Integration unterstützt einen Mechanismus zur Behandlung von Ereignissen (Verbindung, Trennung, Fehlererkennung usw.), die bei der Verarbeitung in Spring Integration auftreten, und der diesmal eingeführte Code behandelt diese Ereignisse. Ich habe versucht, den Code zu trennen, nachdem eine Ausnahme mit RST festgestellt wurde. Es ist zu beachten, dass der hier eingeführte Code nicht der Code ist, der auf die tatsächliche Anwendung angewendet wird (oder geplant ist), sodass er nicht so verwendet werden kann, wie er ist. (Es ist nur eine Probe)
package com.example.demo;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
@SpringBootApplication
public class SprIntDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SprIntDemoApplication.class, args);
}
// TCP Server
@Bean
public IntegrationFlow integrationInboundFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.get()))
.transform(Transformers.objectToString()) // byte[] -> String
.transform(m -> m) //Beantworten Sie die empfangene Nachricht so wie sie ist
.get();
}
// TCP Client
@Bean
public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.tcpSocketSupport(socketManager()) //Wenden Sie die Supportklasse an, um den generierten Socket zu verwalten
.get();
factory.setApplicationEventPublisher(publisher);
return flow -> flow.handle(Tcp.outboundGateway(factory))
.transform(Transformers.objectToString()); // byte[] -> String
}
@Bean
public SocketManager socketManager() {
return new SocketManager();
}
@Bean
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
static class SocketManager extends DefaultTcpSocketSupport {
private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();
@Override
public void postProcessSocket(Socket socket) {
super.postProcessSocket(socket);
sockets.put(socket.getLocalPort(), socket); //Speichern Sie es intern, damit Sie auf Socket zugreifen können, wenn ein Fehler erkannt wird.
}
@EventListener
public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
try {
int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
Socket socket = sockets.get(localPort);
if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
sockets.get(localPort).setSoLinger(true, 0); //Auf RST einstellen
}
} catch (SocketException e) {
// ignore
}
}
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); //Reinigen Sie Steckdosen, die nach dem Trennen nicht mehr benötigt werden
}
}
}
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {
@Autowired
MessagingTemplate template;
@Test
public void contextLoads() {
Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
MessageBuilder.withPayload("hello!").build());
System.out.println("reply: " + reply);
}
}
Übrigens ... Wenn Sie den obigen Code so ausführen, wie er ist, tritt während der Verarbeitung kein Fehler (Ausnahme) auf. Um einen Fehler zu generieren, müssen Sie den Socket während der Verarbeitung auf der Serverseite trennen, aber dieses Buch Ich werde diesen Teil im Eintrag weglassen.
Wenn ich das Dokument lese, habe ich das Gefühl, dass ich es über die Rolle von "TcpSocketSupport" hinaus verwende, aber es war hilfreich, dass der Mechanismus, mit dem ich "Socket" berühren kann, unterstützt wurde.
Recommended Posts