[JAVA] Pourquoi la suspension synchronisée ne fonctionne pas dans Kotlin

Qu'est-ce qui ne va pas

Tout d'abord, veuillez consulter le code ci-dessous.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something critical
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

J'ai créé deux coroutines dans la fonction principale et j'ai appelé la fonction doSomething de manière asynchrone. Le résultat de l'exécution est le suivant.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 enter critical section.
#1 exit critical section.
#0 exit critical section.

L'étiquette est sortie au début afin que vous puissiez voir quel collout a été exécuté. Vous pouvez voir que deux sections critiques s'exécutent en même temps car aucun contrôle exclusif n'est appliqué.

Essayez le contrôle exclusif en ajoutant l'annotation synchronisée à la fonction de suspension.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Je le ferai.

#0 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 thread name: DefaultDispatcher-worker-1
#1 enter critical section.
#0 exit critical section.
#1 exit critical section.

Malgré l'annotation Sychronisée, il y a deux threads dans la section critique en même temps.

Est-il ignoré même si la fonction de suspension est annotée avec Syncrhonized? Je pense, et essayez d'appliquer un contrôle exclusif en utilisant la fonction syncrhonisée à l'intérieur de la fonction de suspension.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val LOCK = Object()

suspend fun doSomething(i: Int) {
    synchronized(LOCK) {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Cette fois, le message "Le point de suspension du" délai "est à l'intérieur d'une section critique" est émis et une erreur de compilation se produit.

Solution

Je voudrais d'abord présenter une solution, puis expliquer pourquoi cela n'a pas fonctionné.

Utilisez Mutex lorsque vous souhaitez appliquer un contrôle exclusif dans la fonction de suspension.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

suspend fun doSomething(i: Int) {
    mutex.withLock {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Résultat de l'exécution.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#1 enter critical section.
#1 exit critical section.
#0 enter critical section.
#0 exit critical section.

Il n'y a qu'un seul thread dans la section critique.

Comment la fonction de suspension est compilée

Pourquoi le contrôle exclusif n'a-t-il pas bien fonctionné avec la fonction de suspension avec l'annotation synchronisée?

Apparemment, cela a à voir avec la façon dont la fonction de suspension est compilée en code octet. À première vue, la fonction de suspension ressemble à une fonction très spéciale, mais lorsqu'elle est compilée, elle est bien sûr convertie en fonction normale.

Vous trouverez ci-dessous le pseudo code qui obtient le jeton du serveur API et lance un message. Jetons un coup d'œil à la façon dont la fonction de suspension est compilée à l'aide de ce code.

class Item()
class Post()

suspend fun requestToken(): String {
    // get token from api
    return "token"
}

suspend fun createPost(token: String, item: Item): Post {
    // create post
    return Post()
}

fun processPost(post: Post) {
    // do post
}

suspend fun postItem(item: Item) {
    val token = requestToken()
    val post = createPost(token, item)
    processPost(post)
}

Tout d'abord, si vous essayez d'écrire ce code sans utiliser la fonction de suspension, vous obtiendrez ce qui suit.

class Item()
class Post()

fun requestToken(callback: (String) -> Unit) {
    // get token from api
    //En réalité, cela ressemble probablement à ceci.
    // service.getToken(username, password, object : ResponseCallback {
    //      override fun onSuccess(response: Response) {
    //          val token = getToken(response)
    //          callback(token)
    //      }
    // })
    callback("token")
}

fun createPost(token: String, item: Item, callback: (Post) -> Unit) {
    // create post
    callback(Post())
}

fun processPost(post: Post) {
    // do post
}

fun postItem(item: Item) {
    requestToken { token ->
        createPost(token, item) { post ->
            processPost(post)
        }
    }
}

L'accès au réseau et le traitement intensif sont effectués en arrière-plan et le rappel est appelé lorsque le résultat est renvoyé. Il semble que cela s'appelle le style de passage continu (CPS). C'est juste un rappel.

La fonction de suspension est également convertie en CPS et compilée comme décrit ci-dessus au moment de la compilation. Cependant, il est un peu différent du code ci-dessus.

Voyons d'abord comment la signature de la fonction de suspension est transformée.

// kotlin
suspend fun createPost(token: String, item: Item): Post { ... }

// Java/JVM
Object createPost(String token, Item item, Continuation<Post> cont) { ... }

Toutes les fonctions de suspension acceptent désormais les nouveaux arguments de type Coutinueation comme décrit ci-dessus. Cette Continuation \ <Post > équivaut à un rappel. La continuation est une face intérieure définie dans la bibliothèque de Kotlin. Jetons un coup d'œil au contenu.

interface Continuation<in T> {
    val context: CoroutineContext
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

Il a un CoroutineContext (qui contient des informations telles que le thread sur lequel il sera exécuté) et deux rappels. Jusqu'à présent, la même chose s'est produite en gros, comme si vous l'aviez corrigé manuellement avec le style de rappel ci-dessus.

Cependant, observer comment la partie de traitement de la fonction postItem est transformée fait une légère différence.

Ci-dessous, examinons étape par étape comment le code à l'intérieur de la fonction postItem est transformé. Tout d'abord, tout le code interne de la fonction postItem devient une énorme instruction switch, étiquetant tous les points de suspension. La fonction postItem renvoie chaque fois qu'elle atteint le point de suspension et est appelée à nouveau au moment opportun. L'image est la suivante.

//image
suspend fun postItem(item: Item, label: Int) {
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

Cependant, dans ce cas, des variables telles que token et post ne peuvent pas être utilisées lors de l'appel suivant, un objet est donc créé pour enregistrer l'état d'exécution dans la fonction. Cet objet est une implémentation de Continuement. Cela semble être appelé une machine à états.

//image
suspend fun postItem(item: Item) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

Comme mentionné précédemment, toutes les fonctions de suspension sont converties en fonctions régulières qui prennent Continuation comme argument. Cette machine d'état est passée à la fonction de suspension qui est appelée en interne.

//image
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

L'étiquette de la mesure dans laquelle il a été exécuté, l'état interne, etc. doit être sauvegardé dans cette machine à états.

//image
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1 // <-Étiquette suivante à exécuter
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Puisque ContinueImpl hérite de Continuation, il implémente la reprise. Dans la fonction de reprise de ContineuImpl, postItem est à nouveau appelé avec la machine à états elle-même comme argument. Dans la fonction requestToken, le jeton est stocké dans cette machine à états et la fonction de reprise est appelée lorsque tout le traitement est terminé.

//image
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { 
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

S'il est laissé tel quel, l'état interne sera toujours écrasé, donc corrigez-le. Remplacez ContineuImpl par le propre type de cette fonction de suspension appelé ThisSM. Crée une instance uniquement si la continuation passée à l'argument n'est pas de type ThisSM. Cela permet à la fonction postItem d'hériter de son état précédent lorsqu'elle est appelée via la reprise de la machine à états générée en interne. Et dans le cas 1, il reçoit des éléments, des jetons, etc. de la machine d'état.

//image
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM { 
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Ce n'était pas dans l'explication (lien ci-dessous), mais je pense que c'est probablement une implémentation qui appelle le CV de continuation qui a été passé en premier lorsque tout le traitement est terminé. (Je vous serais reconnaissant si vous pouviez signaler si vous avez fait une erreur)

//image
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM { 
        val initialCont = cont
        fun resume() {
            postIem(null, this)
        }
     }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            processPost(post)
            sm.initialCont.reusme()
    }
}

Vous avez maintenant converti la fonction de suspension en une fonction normale.

Pourquoi ça n'a pas marché

Une fois que vous avez une idée approximative de la compilation de la fonction de suspension, vous pouvez comprendre pourquoi le premier code n'a pas fonctionné.

Je republierai le premier code.

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

Lorsque ce code sera compilé, il sera traduit comme suit:

//image
@Synchronized
fun doSomething(i: Int, cont: Continuation) {
    val sm = cont as? ThisSM ?: ThisSM { ... }
    switch (sm.label) {
        case 0:
            println("#$i enter critical section.")

            sm.label = 1
            delay(1000, sm)
        case 1:
            println("#$i exit critical section.")
    }
}

Après l'appel du délai, l'objet est déverrouillé lorsque la fonction doSomething est renvoyée une fois.

Par conséquent, une fois la fonction de délai appelée et renvoyée, et jusqu'à ce que la fonction doSomething soit appelée, la fonction doSomething appelée par une autre coroutine peut acquérir le verrou.

Il semblait que plusieurs threads exécutaient la section critique à cause de cela.

Lien de référence

Recommended Posts

Pourquoi la suspension synchronisée ne fonctionne pas dans Kotlin
Grand décimal à Kotlin