[JAVA] Warum funktioniert synchronisiertes Suspend in Kotlin nicht?

Was ist los

Schauen Sie sich zunächst den folgenden Code an.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something critical
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Ich habe zwei Coroutinen in der Hauptfunktion erstellt und die Funktion doSomething asynchron aufgerufen. Das Ausführungsergebnis ist wie folgt.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 enter critical section.
#1 exit critical section.
#0 exit critical section.

Das Label wird am Anfang ausgegeben, damit Sie sehen können, welches Collout ausgeführt wurde. Sie können sehen, dass zwei kritische Abschnitte gleichzeitig ausgeführt werden, da kein exklusives Steuerelement angewendet wird.

Versuchen Sie die exklusive Steuerung, indem Sie der Suspended-Funktion die synchronisierte Annotation hinzufügen.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}
#0 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 thread name: DefaultDispatcher-worker-1
#1 enter critical section.
#0 exit critical section.
#1 exit critical section.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val LOCK = Object()

suspend fun doSomething(i: Int) {
    synchronized(LOCK) {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Trotz der synchronisierten Annotation befinden sich zwei Threads gleichzeitig im kritischen Abschnitt. Wird es ignoriert, auch wenn die Suspend-Funktion mit Syncrhonized kommentiert ist? Ich denke, und versuche, die exklusive Kontrolle mithilfe der synchronisierten Funktion innerhalb der Suspend-Funktion anzuwenden. Dieses Mal wird die Meldung "Der Verzögerungspunkt der Verzögerung befindet sich in einem kritischen Abschnitt" ausgegeben und ein Kompilierungsfehler tritt auf. Lösung Ich möchte zuerst die Lösung vorstellen und dann erklären, warum sie nicht funktioniert hat. Verwenden Sie Mutex, wenn Sie die exklusive Steuerung in der Suspend-Funktion anwenden möchten.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

suspend fun doSomething(i: Int) {
    mutex.withLock {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Ausführungsergebnis.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#1 enter critical section.
#1 exit critical section.
#0 enter critical section.
#0 exit critical section.

Es gibt nur einen Thread im kritischen Bereich.

Wie die Suspend-Funktion kompiliert wird

Warum funktionierte die exklusive Steuerung mit der Suspend-Funktion mit der synchronisierten Annotation nicht gut?

Anscheinend hat es damit zu tun, wie die Suspend-Funktion in Bytecode kompiliert wird. Auf den ersten Blick sieht die Suspend-Funktion wie eine ganz besondere Funktion aus, aber wenn sie kompiliert wird, wird sie natürlich in eine normale Funktion konvertiert.

Unten ist der Pseudocode, der das Token vom API-Server erhält und einen Beitrag auslöst. Lassen Sie uns einen kurzen Blick darauf werfen, wie die Suspend-Funktion mit diesem Code kompiliert wird.

class Item()
class Post()

suspend fun requestToken(): String {
    // get token from api
    return "token"
}

suspend fun createPost(token: String, item: Item): Post {
    // create post
    return Post()
}

fun processPost(post: Post) {
    // do post
}

suspend fun postItem(item: Item) {
    val token = requestToken()
    val post = createPost(token, item)
    processPost(post)
}

Wenn Sie versuchen, diesen Code ohne Verwendung der Suspend-Funktion zu schreiben, erhalten Sie zunächst Folgendes.

class Item()
class Post()

fun requestToken(callback: (String) -> Unit) {
    // get token from api
    //In Wirklichkeit sieht es wahrscheinlich so aus.
    // service.getToken(username, password, object : ResponseCallback {
    //      override fun onSuccess(response: Response) {
    //          val token = getToken(response)
    //          callback(token)
    //      }
    // })
    callback("token")
}

fun createPost(token: String, item: Item, callback: (Post) -> Unit) {
    // create post
    callback(Post())
}

fun processPost(post: Post) {
    // do post
}

fun postItem(item: Item) {
    requestToken { token ->
        createPost(token, item) { post ->
            processPost(post)
        }
    }
}

Der Netzwerkzugriff und die umfangreiche Verarbeitung erfolgen im Hintergrund, und der Rückruf wird aufgerufen, wenn das Ergebnis zurückgegeben wird. Es scheint, dass dies als Continuation-Passing-Stil (CPS) bezeichnet wird. Es ist nur ein Rückruf.

Die Suspend-Funktion wird ebenfalls in CPS konvertiert und zur Kompilierungszeit wie oben beschrieben kompiliert. Es unterscheidet sich jedoch ein wenig vom obigen Code.

Lassen Sie uns zunächst sehen, wie die Signatur der Suspend-Funktion transformiert wird.

// kotlin
suspend fun createPost(token: String, item: Item): Post { ... }

// Java/JVM
Object createPost(String token, Item item, Continuation<Post> cont) { ... }

Alle Suspend-Funktionen akzeptieren jetzt neue Argumente vom Typ Coutinueation, wie oben beschrieben. Diese Fortsetzung \ entspricht einem Rückruf. Fortsetzung ist eine Innenseite, die in Kotlins Bibliothek definiert ist. Werfen wir einen Blick auf den Inhalt.

interface Continuation<in T> {
    val context: CoroutineContext
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

Es hat einen CoroutineContext (der Informationen enthält, z. B. welchen Thread es ausgeführt wird) und zwei Rückrufe. Bisher ist im Grunde dasselbe passiert, als ob Sie es manuell auf den obigen Rückrufstil festgelegt hätten.

Die Beobachtung, wie der Verarbeitungsteil der postItem-Funktion transformiert wird, macht jedoch einen kleinen Unterschied.

Im Folgenden sehen wir uns Schritt für Schritt an, wie der Code in der postItem-Funktion transformiert wird. Erstens wird der gesamte interne Code der postItem-Funktion zu einer riesigen switch-Anweisung, die alle Suspend-Punkte kennzeichnet. Die postItem-Funktion kehrt jedes Mal zurück, wenn sie den Suspend-Punkt erreicht, und wird zum entsprechenden Zeitpunkt erneut aufgerufen. Das Bild ist wie folgt.

//Bild
suspend fun postItem(item: Item, label: Int) {
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

In diesem Fall können Variablen wie Token und Post jedoch nicht beim nächsten Aufruf verwendet werden. Daher wird ein Objekt erstellt, um den Ausführungsstatus innerhalb der Funktion zu speichern. Dieses Objekt ist eine Implementierung von Continuement. Es scheint eine Zustandsmaschine genannt zu werden.

//Bild
suspend fun postItem(item: Item) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

Wie bereits erwähnt, werden alle Suspend-Funktionen in reguläre Funktionen konvertiert, die Continuation als Argument verwenden. Diese Zustandsmaschine wird an die intern aufgerufene Suspend-Funktion übergeben.

//Bild
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Das Etikett, wie weit es ausgeführt wurde, der interne Status usw. sollten in dieser Statusmaschine gespeichert werden.

//Bild
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1 // <-Nächstes Label, das ausgeführt werden soll
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Da ContinueImpl von Continuation erbt, wird die Wiederaufnahme implementiert. Innerhalb der Resume-Funktion von ContineuImpl wird postItem erneut mit der Zustandsmaschine selbst als Argument aufgerufen. In der requestToken-Funktion wird das Token in dieser Zustandsmaschine gespeichert und die Wiederaufnahmefunktion wird aufgerufen, wenn die gesamte Verarbeitung abgeschlossen ist.

//Bild
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { 
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Wenn es so bleibt, wie es ist, wird der interne Status immer überschrieben. Korrigieren Sie ihn daher. Ändern Sie ContineuImpl in den eigenen Typ dieser Suspend-Funktion namens ThisSM. Erstellt eine Instanz nur, wenn die an das Argument übergebene Fortsetzung nicht vom Typ ThisSM ist. Dadurch kann die postItem-Funktion ihren vorherigen Status erben, wenn sie über den intern generierten Statusmaschinen-Lebenslauf aufgerufen wird. Und im Fall 1 Block empfängt es Gegenstände, Token usw. von der Zustandsmaschine.

//Bild
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM { 
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Es war nicht in der Erklärung (Link unten), aber ich denke, es ist wahrscheinlich eine Implementierung, die den Lebenslauf der Fortsetzung aufruft, der zuerst übergeben wurde, wenn die gesamte Verarbeitung abgeschlossen ist. (Ich würde es begrüßen, wenn Sie darauf hinweisen könnten, wenn Sie einen Fehler gemacht haben)

//Bild
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM { 
        val initialCont = cont
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            processPost(post)
            sm.initialCont.reusme()
    }
}

Sie haben jetzt die Suspend-Funktion in eine reguläre Funktion konvertiert.

Warum es nicht funktioniert hat

Wenn Sie eine ungefähre Vorstellung davon haben, wie die Suspend-Funktion kompiliert wird, können Sie verstehen, warum der erste Code nicht funktioniert hat.

Ich werde den ersten Code erneut veröffentlichen.

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

Wenn dieser Code kompiliert wird, wird er wie folgt übersetzt:

//Bild
@Synchronized
fun doSomething(i: Int, cont: Continuation) {
    val sm = cont as? ThisSM ?: ThisSM { ... }
    switch (sm.label) {
        case 0:
            println("#$i enter critical section.")

            sm.label = 1
            delay(1000, sm)
        case 1:
            println("#$i exit critical section.")
    }
}

Nach dem Aufruf von delay wird das Objekt entsperrt, wenn die Funktion doSomething einmal zurückkehrt.

Daher kann die von einer anderen Coroutine aufgerufene Funktion doSomething die Sperre erhalten, nachdem die Verzögerungsfunktion aufgerufen und zurückgegeben wurde und bis die Funktion doSomething aufgerufen wird.

Aus diesem Grund schien es, als würden mehrere Threads den kritischen Abschnitt ausführen.

Referenzlink

Recommended Posts

Warum funktioniert synchronisiertes Suspend in Kotlin nicht?
Große Dezimalstelle in Kotlin