Les threads en attente de read () sur Java Socket InputStream attendent les E / S en mode de blocage. J'ai essayé de savoir comment libérer ceci, c'est-à-dire libérer l'attente read () et poursuivre le processus en lançant une exception.
--Conclusion: L'interruption du thread en attente de read () à l'aide de Thread.interrupt () n'avait aucun effet et pouvait être libérée en appelant Socket.close ().
Voici un mémo expérimental.
Tout d'abord, créez un serveur TCP de type trou noir qui continue de lire les données reçues du socket client et ne renvoie aucune donnée. Exemple: https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java
Ensuite, créez une tâche qui se connecte au serveur TCP ci-dessus en mode bloquant et attend read () en implémentant l'interface Runnable
. À ce moment, ajoutez une méthode pour interrompre le thread à partir d'un thread (= externe) autre que le thread exécutant la tâche, ou pour fermer le Socket en attente de read ().
Exemple:
class BlockingIOTask implements Runnable {
final int no;
final int remotePort;
volatile Socket clientSocket = null;
volatile Thread currentThread = null;
volatile boolean isInterrupted = false;
volatile boolean done = false;
public BlockingIOTask(final int no, final int remotePort) {
this.no = no;
this.remotePort = remotePort;
}
/**
*Lire de l'extérieur()Une méthode pour interrompre un thread en attente
*/
public void interruptThread() {
if (Objects.nonNull(currentThread)) {
currentThread.interrupt();
}
}
/**
*Lire de l'extérieur()Fermez le socket en attente()Méthode pour
*/
public void closeSocket() {
if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
@Override
public void run() {
//Enregistrez la référence de thread exécutant cette tâche
currentThread = Thread.currentThread();
InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
clientSocket = new Socket();
try {
clientSocket.connect(connectTo);
//Pour le moment, envoyez les données appropriées.
OutputStream out = clientSocket.getOutputStream();
out.write(new byte[] { 0x00, 0x01, 0x02 });
out.write(new byte[] { 0x03, 0x04, 0x05 });
out.flush();
InputStream in = clientSocket.getInputStream();
// read()Attendez
in.read();
} catch (IOException e) {
//Assertion pour TestNG pour une intégration ultérieure dans le code de test.
assertTrue(e instanceof SocketException);
assertEquals(e.getMessage(), "Socket closed");
} finally {
if (clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
this.isInterrupted = currentThread.isInterrupted();
this.done = true;
}
}
Vérifiez le comportement réel en l'incorporant dans le code de test (à l'aide de TestNG). Voir les commentaires dans le code pour une explication détaillée.
public class TestGracefulShutdownBlockingIOTaskDemo {
BlackholeTcpServer blackholeTcpServer = null;
int blackholeTcpServerPort = 0;
@BeforeClass
public void beforeClass() throws IOException {
//Avant d'exécuter cette classe de test, démarrez le serveur TCP du trou noir et
//Notez le numéro de port d'écoute attribué au hasard.
BlackholeTcpServer server = new BlackholeTcpServer();
this.blackholeTcpServerPort = server.start();
}
@AfterClass
public void afterClass() {
if (Objects.nonNull(this.blackholeTcpServer)) {
this.blackholeTcpServer.stop();
}
}
@Test
void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {
//Démarrer BlockingIOTask pour 4 threads
final int NUM = 4;
ExecutorService es = Executors.newFixedThreadPool(NUM);
BlockingIOTask tasks[] = new BlockingIOTask[NUM];
for (int i = 0; i < NUM; i++) {
tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
es.submit(tasks[i]);
}
// ExecutorService.shutdown()Commencez par soumettre une nouvelle tâche()Interdire(=Fermé l'entrée)。
es.shutdown();
//Attendez 50 ms et vérifiez si la tâche est terminée.
//Bien sûr, lisez()La tâche n'est pas terminée car elle est bloquée en attente.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// ExecutorService.shutdownNow()Appeler. Interrompez le thread qui exécute la tâche.
List<Runnable> l = es.shutdownNow();
assertEquals(l.size(), 0); //Puisqu'aucune nouvelle tâche n'a été soumise, le nombre de tâches en attente d'exécution sera de 0.
// ExecutorService.awaitTermination()Renvoie false
// read()On voit que le blocage en attente n'est pas annulé par l'interruption de thread.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Essayez le fil.interrupt()Est appelé manuellement.
for (int i = 0; i < NUM; i++) {
tasks[i].interruptThread();
}
//Après tout, lire dans l'interruption de fil()Le blocage d'attente n'est pas libéré.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// Socket.close()Essayez d'appeler de l'extérieur.
for (int i = 0; i < NUM; i++) {
tasks[i].closeSocket();
}
// ExecutorService.awaitTermination()Renvoie vrai, donc
//Vous pouvez voir que toutes les tâches sont terminées.
assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Vérifiez en fait l'état du drapeau de la tâche. Premièrement, les drapeaux terminés sont tous vrais.
assertTrue(tasks[0].done);
assertTrue(tasks[1].done);
assertTrue(tasks[2].done);
assertTrue(tasks[3].done);
//Fil supplémentaire.isInterrupted()Tous les drapeaux qui ont sauvé le sont également vrais,
//l'interruption a été acceptée, mais lu()Il s'avère que l'attente n'a pas été relâchée.
assertTrue(tasks[0].isInterrupted);
assertTrue(tasks[1].isInterrupted);
assertTrue(tasks[2].isInterrupted);
assertTrue(tasks[3].isInterrupted);
}
}
Ceci est un simple mémo d'expérience.