When creating a TCP client using the TCP function of Spring Integration, there were cases (sometimes) where I wanted to make direct contact with Socket, so make a note of how to make direct contact with Socket.
The case where I wanted to make direct contact with Socket is ... FIN for normal disconnection, and RST for abnormal disconnection (when some error occurs)! It was a time when it was necessary to meet the connection requirement. In the TCP function of Spring Integration, it is possible to specify whether to use FIN or RST as the disconnection method in the Connection Factory setting, but it was not possible to switch depending on the conditions (in the range examined ...). ..
Spring Integration TcpSocketSupport
To support applying arbitrary processing to the Socket generated on the framework side (Socket linked to SocketChannel in the case of NIO). spring.io/spring-integration/docs/5.1.2.RELEASE/reference/htmlsingle/#_the_literal_tcpsocketsupport_literal_strategy_interface) is provided. By default, the class DefaultTcpSocketSupport
is applied, but the process that changes the state of Socket is not executed.
The TCP function of Spring Integration supports a mechanism for handling events (connection, disconnection, error detection, etc.) that occur during processing within Spring Integration, and the code introduced here handles these events. , I tried to make the code to disconnect after detecting an exception with RST. It should be noted that the code introduced here is not the code applied (or planned to be) to the actual application, so it cannot be used as it is. (It is just a sample)
package com.example.demo;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
@SpringBootApplication
public class SprIntDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SprIntDemoApplication.class, args);
}
// TCP Server
@Bean
public IntegrationFlow integrationInboundFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.get()))
.transform(Transformers.objectToString()) // byte[] -> String
.transform(m -> m) //Reply the received message as it is
.get();
}
// TCP Client
@Bean
public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.tcpSocketSupport(socketManager()) //Apply support class to manage generated Socket
.get();
factory.setApplicationEventPublisher(publisher);
return flow -> flow.handle(Tcp.outboundGateway(factory))
.transform(Transformers.objectToString()); // byte[] -> String
}
@Bean
public SocketManager socketManager() {
return new SocketManager();
}
@Bean
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
static class SocketManager extends DefaultTcpSocketSupport {
private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();
@Override
public void postProcessSocket(Socket socket) {
super.postProcessSocket(socket);
sockets.put(socket.getLocalPort(), socket); //Save it internally so that you can access the Socket when an abnormality is detected.
}
@EventListener
public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
try {
int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
Socket socket = sockets.get(localPort);
if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
sockets.get(localPort).setSoLinger(true, 0); //Set to disconnect by RST
}
} catch (SocketException e) {
// ignore
}
}
@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); //Clean Sockets that are no longer needed after disconnection
}
}
}
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {
@Autowired
MessagingTemplate template;
@Test
public void contextLoads() {
Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
MessageBuilder.withPayload("hello!").build());
System.out.println("reply: " + reply);
}
}
By the way ... If you execute the above code as it is, no error (exception) will occur during processing, so in order to generate an error, it is necessary to disconnect the Socket during processing on the server side, but this book I will omit that part in the entry.
When I read the document, I feel that I am using it beyond the role of TcpSocketSupport
, but it was helpful that the mechanism that allows me to touch Socket
was supported.
Recommended Posts