Threads waiting for read () on a Java Socket InputStream are waiting for I / O in blocking mode. I tried to find out how to release this, that is, to release the read () wait and proceed with the process by throwing an exception.
--Conclusion: Interrupting the thread waiting for read () using Thread.interrupt () had no effect and could be released by calling Socket.close (). --There are other ways to free the read () wait, see the Socket.getInputStream () JavaDoc for more information:
Below is an experimental memo.
First, create a black hole-like TCP server that keeps reading the data received from the client socket and does not return any data. Example: https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java
Next, create a task that connects to the above TCP server in blocking mode and waits for read () by implementing the Runnable
interface. At that time, add a method to interrupt the thread from a thread (= external) other than the thread executing the task, or to close the Socket waiting for read ().
Example:
class BlockingIOTask implements Runnable {
final int no;
final int remotePort;
volatile Socket clientSocket = null;
volatile Thread currentThread = null;
volatile boolean isInterrupted = false;
volatile boolean done = false;
public BlockingIOTask(final int no, final int remotePort) {
this.no = no;
this.remotePort = remotePort;
}
/**
*Read from the outside()A method for interrupting a waiting thread
*/
public void interruptThread() {
if (Objects.nonNull(currentThread)) {
currentThread.interrupt();
}
}
/**
*Read from the outside()Close the waiting Socket()Method to
*/
public void closeSocket() {
if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
@Override
public void run() {
//Save the thread reference running this task
currentThread = Thread.currentThread();
InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
clientSocket = new Socket();
try {
clientSocket.connect(connectTo);
//For the time being, send appropriate data.
OutputStream out = clientSocket.getOutputStream();
out.write(new byte[] { 0x00, 0x01, 0x02 });
out.write(new byte[] { 0x03, 0x04, 0x05 });
out.flush();
InputStream in = clientSocket.getInputStream();
// read()Start waiting
in.read();
} catch (IOException e) {
//Assertion for TestNG for later embedding in test code.
assertTrue(e instanceof SocketException);
assertEquals(e.getMessage(), "Socket closed");
} finally {
if (clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
this.isInterrupted = currentThread.isInterrupted();
this.done = true;
}
}
Check the actual behavior by embedding it in the test code (using TestNG). See the comments in the code for a detailed explanation.
public class TestGracefulShutdownBlockingIOTaskDemo {
BlackholeTcpServer blackholeTcpServer = null;
int blackholeTcpServerPort = 0;
@BeforeClass
public void beforeClass() throws IOException {
//Before executing this test class, start the black hole TCP server and
//Make a note of the randomly assigned listening port number.
BlackholeTcpServer server = new BlackholeTcpServer();
this.blackholeTcpServerPort = server.start();
}
@AfterClass
public void afterClass() {
if (Objects.nonNull(this.blackholeTcpServer)) {
this.blackholeTcpServer.stop();
}
}
@Test
void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {
//Start BlockingIOTask for 4 threads
final int NUM = 4;
ExecutorService es = Executors.newFixedThreadPool(NUM);
BlockingIOTask tasks[] = new BlockingIOTask[NUM];
for (int i = 0; i < NUM; i++) {
tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
es.submit(tasks[i]);
}
// ExecutorService.shutdown()First, submit a new task()Prohibit(=Closed the entrance)。
es.shutdown();
//Wait 50ms and check if the task is finished.
//Of course, read()The task is not finished because it is blocked waiting.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// ExecutorService.shutdownNow()To call. Interrupt the thread that is executing the task.
List<Runnable> l = es.shutdownNow();
assertEquals(l.size(), 0); //Since no new task has been submitted, the number of tasks waiting to be executed will be 0.
// ExecutorService.awaitTermination()Is returning false
// read()It can be seen that the waiting blocking is not canceled by the thread interrupt.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Try Thread.interrupt()Is called manually.
for (int i = 0; i < NUM; i++) {
tasks[i].interruptThread();
}
//After all, read with thread interrupt()Wait blocking is not released.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// Socket.close()Try to call from the outside.
for (int i = 0; i < NUM; i++) {
tasks[i].closeSocket();
}
// ExecutorService.awaitTermination()Returns true, so
//You can see that all the tasks have been completed.
assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Actually check the flag status of the task. First, the done flags are all true.
assertTrue(tasks[0].done);
assertTrue(tasks[1].done);
assertTrue(tasks[2].done);
assertTrue(tasks[3].done);
//Further Thread.isInterrupted()All the flags that saved the are also true,
//interrupt was accepted, but read()It turns out that the wait was not released.
assertTrue(tasks[0].isInterrupted);
assertTrue(tasks[1].isInterrupted);
assertTrue(tasks[2].isInterrupted);
assertTrue(tasks[3].isInterrupted);
}
}
This is a simple experiment memo.