Study Note / A read() blocked on a Java Socket InputStream is not released by Thread.interrupt()

A thread blocked in read() on a Java Socket InputStream is waiting for I/O in blocking mode. I looked into how to release that wait from outside, that is, how to make read() throw an exception so that processing can continue.

- Conclusion: interrupting the thread blocked in read() with Thread.interrupt() had no effect; calling Socket.close() released it.
- There are other ways to release a blocked read(); see the Socket.getInputStream() JavaDoc for details.
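One mechanism outside this experiment is a read timeout. The sketch below is only an illustration: `ReadTimeoutSketch` and `readTimesOut` are names invented for this sketch, and the local server that never replies is a stand-in. With `Socket.setSoTimeout()` set, a blocked `read()` gives up with `SocketTimeoutException`, and the socket itself stays open.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class ReadTimeoutSketch {

    /** Returns true if read() gave up with SocketTimeoutException and the socket is still open. */
    static boolean readTimesOut(int timeoutMillis) throws IOException {
        // A local server that accepts the connection but never writes anything.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            // Limit how long read() may block on this socket.
            client.setSoTimeout(timeoutMillis);
            InputStream in = client.getInputStream();
            try {
                in.read();
                return false; // not expected: the peer never sends data
            } catch (SocketTimeoutException e) {
                // The timeout ends the wait without closing the socket.
                return !client.isClosed();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("timed out: " + readTimesOut(200));
    }
}
```

Unlike Socket.close(), a timeout is set by the reading side in advance; it does not let another thread release the wait on demand.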

The rest of this note records the experiment.

First, create a black-hole-like TCP server that keeps reading whatever the client socket sends and never sends anything back. Example: https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java
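A server of this kind can be sketched roughly as follows. `MiniBlackholeServer` and its `port()` method are hypothetical names made up for this sketch; it is not the linked BlackholeTcpServer and may differ from it in its details.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical stand-in, NOT the linked BlackholeTcpServer implementation. */
public class MiniBlackholeServer implements AutoCloseable {
    private final ServerSocket serverSocket;

    public MiniBlackholeServer() throws IOException {
        // Port 0 lets the OS pick a free port on the loopback interface.
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(this::acceptLoop, "blackhole-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    /** The port the OS assigned to this server. */
    public int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket client = serverSocket.accept();
                Thread reader = new Thread(() -> drain(client), "blackhole-reader");
                reader.setDaemon(true);
                reader.start();
            } catch (IOException e) {
                // accept() fails once the server socket is closed; stop accepting.
                return;
            }
        }
    }

    /** Reads and discards everything until the peer closes; never writes back. */
    private static void drain(Socket client) {
        try (Socket s = client; InputStream in = s.getInputStream()) {
            byte[] buf = new byte[1024];
            while (in.read(buf) != -1) {
                // Discard the received bytes.
            }
        } catch (IOException ignore) {
            // Connection broken; try-with-resources closes the socket.
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws Exception {
        try (MiniBlackholeServer server = new MiniBlackholeServer()) {
            System.out.println("listening on port " + server.port());
            Thread.sleep(1000);
        }
    }
}
```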

Next, implement a Runnable task that connects to the server above in blocking mode and then blocks in read(). Give it methods that let another thread (one other than the thread running the task) either interrupt the task's thread or close the Socket that read() is blocked on. Example:

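import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Objects;

class BlockingIOTask implements Runnable {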
    final int no;
    final int remotePort;
    volatile Socket clientSocket = null;
    volatile Thread currentThread = null;
    volatile boolean isInterrupted = false;
    volatile boolean done = false;

    public BlockingIOTask(final int no, final int remotePort) {
        this.no = no;
        this.remotePort = remotePort;
    }

    /**
     * Interrupts, from outside, the thread blocked in read().
     */
    public void interruptThread() {
        if (Objects.nonNull(currentThread)) {
            currentThread.interrupt();
        }
    }

    /**
     * Closes, from outside, the Socket that read() is blocked on.
     */
    public void closeSocket() {
        if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
            try {
                clientSocket.close();
            } catch (IOException ignore) {
            }
        }
    }

    @Override
    public void run() {
        // Save a reference to the thread running this task.
        currentThread = Thread.currentThread();
        InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
        clientSocket = new Socket();
        try {
            clientSocket.connect(connectTo);
            // Send some arbitrary data first.
            OutputStream out = clientSocket.getOutputStream();
            out.write(new byte[] { 0x00, 0x01, 0x02 });
            out.write(new byte[] { 0x03, 0x04, 0x05 });
            out.flush();
            InputStream in = clientSocket.getInputStream();
            // Block in read(); the server never sends anything back.
            in.read();
        } catch (IOException e) {
            // TestNG assertions, because this task is later embedded in test code.
            assertTrue(e instanceof SocketException);
            assertEquals(e.getMessage(), "Socket closed");
        } finally {
            if (clientSocket.isConnected()) {
                try {
                    clientSocket.close();
                } catch (IOException ignore) {
                }
            }
        }
        this.isInterrupted = currentThread.isInterrupted();
        this.done = true;
    }
}

Check the actual behavior by embedding it in the test code (using TestNG). See the comments in the code for a detailed explanation.

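import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

// Package name inferred from the path in the BlackholeTcpServer URL above.
import javasnack.tool.BlackholeTcpServer;

public class TestGracefulShutdownBlockingIOTaskDemo {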

    BlackholeTcpServer blackholeTcpServer = null;
    int blackholeTcpServerPort = 0;

    @BeforeClass
    public void beforeClass() throws IOException {
        // Before running this test class, start the black hole TCP server and
        // remember the randomly assigned listening port number.
        // Keep the instance in the field so that afterClass() can stop it.
        this.blackholeTcpServer = new BlackholeTcpServer();
        this.blackholeTcpServerPort = this.blackholeTcpServer.start();
    }

    @AfterClass
    public void afterClass() {
        if (Objects.nonNull(this.blackholeTcpServer)) {
            this.blackholeTcpServer.stop();
        }
    }

    @Test
    void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {

        // Start BlockingIOTask on 4 threads.
        final int NUM = 4;
        ExecutorService es = Executors.newFixedThreadPool(NUM);
        BlockingIOTask tasks[] = new BlockingIOTask[NUM];
        for (int i = 0; i < NUM; i++) {
            tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
            es.submit(tasks[i]);
        }

        // First call ExecutorService.shutdown() to stop new tasks from being submitted (close the entrance).
        es.shutdown();

        // Wait 50 ms and check whether the tasks have finished.
        // They have not, of course, because they are blocked in read().
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Call ExecutorService.shutdownNow(), which interrupts the threads running the tasks.
        List<Runnable> l = es.shutdownNow();
        assertEquals(l.size(), 0); // All four tasks are already running, so none are left waiting in the queue.

        // ExecutorService.awaitTermination() returns false, which shows that
        // the thread interrupt did not release the blocked read().
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Try calling Thread.interrupt() manually as well.
        for (int i = 0; i < NUM; i++) {
            tasks[i].interruptThread();
        }

        // Still, the thread interrupt does not release the blocked read().
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Now call Socket.close() from outside.
        for (int i = 0; i < NUM; i++) {
            tasks[i].closeSocket();
        }
        // ExecutorService.awaitTermination() returns true,
        // so all the tasks have finished.
        assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Check the tasks' flags directly. First, every done flag is true.
        assertTrue(tasks[0].done);
        assertTrue(tasks[1].done);
        assertTrue(tasks[2].done);
        assertTrue(tasks[3].done);
        // The flags that recorded Thread.isInterrupted() are all true as well:
        // the interrupt was delivered, but it did not release the blocked read().
        assertTrue(tasks[0].isInterrupted);
        assertTrue(tasks[1].isInterrupted);
        assertTrue(tasks[2].isInterrupted);
        assertTrue(tasks[3].isInterrupted);
    }
}
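
The same observation can be reproduced without TestNG or the external server. The following is a minimal sketch under stated assumptions: `InterruptVsCloseSketch`, `Result`, and `run()` are names invented here, the peer is a local ServerSocket that accepts and then stays silent, and the reader is an ordinary `Thread` created with `new Thread(...)`. The sleep and join durations are arbitrary.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

public class InterruptVsCloseSketch {

    /** Outcome of one run of the experiment. */
    static final class Result {
        boolean aliveAfterInterrupt; // still blocked after interrupt()?
        boolean finishedAfterClose;  // released after close()?
        boolean sawSocketException;  // did read() end with SocketException?
        boolean interruptFlagKept;   // interrupt status still set afterwards?
    }

    static Result run() throws IOException, InterruptedException {
        Result r = new Result();
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) { // accepted, but never written to

            Thread reader = new Thread(() -> {
                try {
                    client.getInputStream().read(); // blocks: the peer sends nothing
                } catch (SocketException e) {
                    r.sawSocketException = true;
                } catch (IOException e) {
                    // Not expected in this experiment.
                }
                r.interruptFlagKept = Thread.currentThread().isInterrupted();
            });
            reader.start();
            Thread.sleep(200); // give the reader time to block in read()

            // Step 1: interrupt the reader and wait briefly for it to finish.
            reader.interrupt();
            reader.join(300);
            r.aliveAfterInterrupt = reader.isAlive();

            // Step 2: close the socket from this thread.
            client.close();
            reader.join(2000);
            r.finishedAfterClose = !reader.isAlive();
        }
        return r;
    }

    public static void main(String[] args) throws Exception {
        Result r = run();
        System.out.println("alive after interrupt: " + r.aliveAfterInterrupt);
        System.out.println("finished after close:  " + r.finishedAfterClose);
        System.out.println("saw SocketException:   " + r.sawSocketException);
        System.out.println("interrupt flag kept:   " + r.interruptFlagKept);
    }
}
```

The Result fields are read only after join(), so the values written by the reader thread are visible to the calling thread once that thread has terminated.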

That concludes this simple experiment note.
