Threads, die im InputStream von Java Socket auf read () warten, warten im Blockierungsmodus auf E / A. Ich habe versucht herauszufinden, wie dies freigegeben werden kann, dh das read () wait zu lösen und mit dem Prozess fortzufahren, indem eine Ausnahme ausgelöst wird.
Unten ist ein experimentelles Memo.
Erstellen Sie zunächst einen Blackhole-ähnlichen TCP-Server, der die vom Client-Socket empfangenen Daten weiter liest und keine Daten zurückgibt. Beispiel: https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java
Erstellen Sie als Nächstes eine Aufgabe, die im Blockierungsmodus eine Verbindung zum oben genannten TCP-Server herstellt und auf read () wartet, indem Sie die Schnittstelle "Runnable" implementieren. Fügen Sie zu diesem Zeitpunkt eine Methode hinzu, um den Thread von einem anderen Thread (= extern) als dem Thread, der die Aufgabe ausführt, zu unterbrechen oder den Socket zu schließen, der auf read () wartet. Beispiel:
class BlockingIOTask implements Runnable {
final int no;
final int remotePort;
volatile Socket clientSocket = null;
volatile Thread currentThread = null;
volatile boolean isInterrupted = false;
volatile boolean done = false;
public BlockingIOTask(final int no, final int remotePort) {
this.no = no;
this.remotePort = remotePort;
}
/**
*Von außen lesen()Eine Methode zum Unterbrechen eines wartenden Threads
*/
public void interruptThread() {
if (Objects.nonNull(currentThread)) {
currentThread.interrupt();
}
}
/**
*Von außen lesen()Schließen Sie die wartende Steckdose()Methode zu
*/
public void closeSocket() {
if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
@Override
public void run() {
//Speichern Sie die Thread-Referenz, die diese Aufgabe ausführt
currentThread = Thread.currentThread();
InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
clientSocket = new Socket();
try {
clientSocket.connect(connectTo);
//Senden Sie vorerst entsprechende Daten.
OutputStream out = clientSocket.getOutputStream();
out.write(new byte[] { 0x00, 0x01, 0x02 });
out.write(new byte[] { 0x03, 0x04, 0x05 });
out.flush();
InputStream in = clientSocket.getInputStream();
// read()Fang an zu warten
in.read();
} catch (IOException e) {
//Behauptung für TestNG zum späteren Einbetten in Testcode.
assertTrue(e instanceof SocketException);
assertEquals(e.getMessage(), "Socket closed");
} finally {
if (clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
this.isInterrupted = currentThread.isInterrupted();
this.done = true;
}
}
Überprüfen Sie das tatsächliche Verhalten, indem Sie es in den Testcode einbetten (mit TestNG). In den Kommentaren im Code finden Sie eine detaillierte Erklärung.
public class TestGracefulShutdownBlockingIOTaskDemo {
BlackholeTcpServer blackholeTcpServer = null;
int blackholeTcpServerPort = 0;
@BeforeClass
public void beforeClass() throws IOException {
//Starten Sie vor dem Ausführen dieser Testklasse den Black-Hole-TCP-Server und
//Notieren Sie sich die zufällig zugewiesene Abhörportnummer.
BlackholeTcpServer server = new BlackholeTcpServer();
this.blackholeTcpServerPort = server.start();
}
@AfterClass
public void afterClass() {
if (Objects.nonNull(this.blackholeTcpServer)) {
this.blackholeTcpServer.stop();
}
}
@Test
void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {
//Starten Sie BlockingIOTask für 4 Threads
final int NUM = 4;
ExecutorService es = Executors.newFixedThreadPool(NUM);
BlockingIOTask tasks[] = new BlockingIOTask[NUM];
for (int i = 0; i < NUM; i++) {
tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
es.submit(tasks[i]);
}
// ExecutorService.shutdown()Senden Sie zunächst eine neue Aufgabe()Verbieten(=Den Eingang geschlossen)。
es.shutdown();
//Warten Sie 50 ms und prüfen Sie, ob die Aufgabe abgeschlossen ist.
//Natürlich lesen()Die Aufgabe ist nicht beendet, da das Warten blockiert ist.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// ExecutorService.shutdownNow()Anrufen. Unterbrechen Sie den Thread, der die Aufgabe ausführt.
List<Runnable> l = es.shutdownNow();
assertEquals(l.size(), 0); //Da keine neue Aufgabe gesendet wurde, beträgt die Anzahl der Aufgaben, die auf die Ausführung warten, 0.
// ExecutorService.awaitTermination()Gibt false zurück
// read()Es ist ersichtlich, dass die Warteblockierung durch den Thread-Interrupt nicht aufgehoben wird.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Versuchen Sie es mit Thread.interrupt()Wird manuell aufgerufen.
for (int i = 0; i < NUM; i++) {
tasks[i].interruptThread();
}
//Immerhin Thread-Interrupt einlesen()Warteblockierung wird nicht freigegeben.
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// Socket.close()Versuchen Sie von außen anzurufen.
for (int i = 0; i < NUM; i++) {
tasks[i].closeSocket();
}
// ExecutorService.awaitTermination()Gibt also true zurück
//Sie können sehen, dass alle Aufgaben abgeschlossen wurden.
assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));
//Überprüfen Sie tatsächlich den Flaggenstatus der Aufgabe. Erstens sind die erledigten Flags alle wahr.
assertTrue(tasks[0].done);
assertTrue(tasks[1].done);
assertTrue(tasks[2].done);
assertTrue(tasks[3].done);
//Weiterer Thread.isInterrupted()Alle Flags, die das gespeichert haben, sind auch wahr.
//Interrupt wurde akzeptiert, aber gelesen()Es stellt sich heraus, dass das Warten nicht freigegeben wurde.
assertTrue(tasks[0].isInterrupted);
assertTrue(tasks[1].isInterrupted);
assertTrue(tasks[2].isInterrupted);
assertTrue(tasks[3].isInterrupted);
}
}
Dies ist ein einfaches Experiment-Memo.
Recommended Posts