Why Synchronized suspend doesn't work in Kotlin

What's wrong

First of all, please take a look at the code below.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something critical
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

I created two coroutines in the main function and called the doSomething function asynchronously. The execution result is as follows.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 enter critical section.
#1 exit critical section.
#0 exit critical section.

Each line starts with the coroutine's index so that you can tell which coroutine printed it. Both coroutines are inside the critical section at the same time because nothing enforces mutual exclusion.

Let's try to enforce mutual exclusion by adding the @Synchronized annotation to the suspend function.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Running it produces the following.

#0 thread name: DefaultDispatcher-worker-2
#0 enter critical section.
#1 thread name: DefaultDispatcher-worker-1
#1 enter critical section.
#0 exit critical section.
#1 exit critical section.

Both coroutines are inside the critical section at the same time, even though the function is annotated with @Synchronized.

Is @Synchronized simply ignored on a suspend function? To find out, let's enforce mutual exclusion with the synchronized function inside the suspend function instead.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val LOCK = Any()

suspend fun doSomething(i: Int) {
    synchronized(LOCK) {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

This time the code does not compile. The compiler reports the error "The 'delay' suspension point is inside a critical section".

Solution

I would like to present a solution first and then explain why it didn't work.

To enforce mutual exclusion inside a suspend function, use Mutex from kotlinx.coroutines.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

suspend fun doSomething(i: Int) {
    mutex.withLock {
        println("#$i enter critical section.")

        // do something
        delay(1000) // suspending while holding the Mutex is allowed

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Execution result.

#0 thread name: DefaultDispatcher-worker-1
#1 thread name: DefaultDispatcher-worker-2
#1 enter critical section.
#1 exit critical section.
#0 enter critical section.
#0 exit critical section.

Now only one coroutine is inside the critical section at a time.
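As a further check, here is a minimal sketch that applies the same Mutex.withLock pattern to a shared counter. The function name countWithMutex and the numbers are made up for this illustration; Mutex, withLock, launch, and joinAll are the kotlinx.coroutines APIs, the same library the examples above rely on.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper for this illustration: increments a shared counter
// from several coroutines, guarding every increment with a Mutex.
fun countWithMutex(coroutines: Int, incrementsEach: Int): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    val jobs = List(coroutines) {
        launch(Dispatchers.Default) {
            repeat(incrementsEach) {
                // Only one coroutine at a time runs this block.
                mutex.withLock { counter++ }
            }
        }
    }
    jobs.joinAll() // wait for every coroutine before reading the counter
    counter
}

fun main() {
    println(countWithMutex(coroutines = 10, incrementsEach = 1_000)) // prints 10000
}
```

One thing to keep in mind is that Mutex is not reentrant: a coroutine that tries to lock a Mutex it already holds will wait forever, so avoid nesting withLock on the same instance.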

How the suspend function is compiled

Why didn't the @Synchronized annotation provide mutual exclusion on the suspend function?

It has to do with how a suspend function is compiled into bytecode. A suspend function looks like a very special kind of function, but once compiled it is just a regular function.

Below is pseudocode that fetches a token from an API server and submits a post. Let's use it to take a quick look at how a suspend function is compiled.

class Item()
class Post()

suspend fun requestToken(): String {
    // get token from api
    return "token"
}

suspend fun createPost(token: String, item: Item): Post {
    // create post
    return Post()
}

fun processPost(post: Post) {
    // do post
}

suspend fun postItem(item: Item) {
    val token = requestToken()
    val post = createPost(token, item)
    processPost(post)
}

First, here is what the same code looks like when written without suspend functions.

class Item()
class Post()

fun requestToken(callback: (String) -> Unit) {
    // get token from api
    // In reality, it probably looks like this:
    // service.getToken(username, password, object : ResponseCallback {
    //      override fun onSuccess(response: Response) {
    //          val token = getToken(response)
    //          callback(token)
    //      }
    // })
    callback("token")
}

fun createPost(token: String, item: Item, callback: (Post) -> Unit) {
    // create post
    callback(Post())
}

fun processPost(post: Post) {
    // do post
}

fun postItem(item: Item) {
    requestToken { token ->
        createPost(token, item) { post ->
            processPost(post)
        }
    }
}

Network access and heavy processing are done in the background, and the callback is called when the result is available. This is called continuation-passing style (CPS). The continuation is just a callback.

The compiler also converts suspend functions to CPS, much like the code above. However, the result differs a little from that code.

First, let's see how the signature of the suspend function is transformed.

// kotlin
suspend fun createPost(token: String, item: Item): Post { ... }

// Java/JVM
Object createPost(String token, Item item, Continuation<Post> cont) { ... }

Every suspend function gains an extra parameter of type Continuation, as shown above. This Continuation<Post> plays the role of the callback. Continuation is an interface defined in the Kotlin standard library. Let's take a look at it.

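interface Continuation<in T> {
    val context: CoroutineContext
    fun resumeWith(result: Result<T>)
}

// Convenience extensions built on top of resumeWith
fun <T> Continuation<T>.resume(value: T)
fun <T> Continuation<T>.resumeWithException(exception: Throwable)

It has a CoroutineContext (which carries information such as which thread the coroutine will run on) and a callback, resumeWith, that receives either a value or an exception. The resume and resumeWithException helpers wrap it, one for each outcome. So far, this is basically what you get by manually converting to the callback style above.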
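To make the "Continuation is a callback" point concrete, here is a minimal sketch that uses only the Kotlin standard library. createPost is written by hand in CPS, and the helper describeOutcome and the empty-token failure rule are invented for this illustration. The Continuation(context) { ... } factory, resume, and resumeWithException are standard library functions.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

class Item
class Post

// Hand-written CPS version of createPost: instead of returning a Post,
// it reports the outcome through the continuation.
// The empty-token check is an assumption made up for this example.
fun createPost(token: String, item: Item, cont: Continuation<Post>) {
    if (token.isEmpty()) {
        cont.resumeWithException(IllegalArgumentException("empty token"))
    } else {
        cont.resume(Post())
    }
}

// Hypothetical helper: calls createPost and describes what the callback received.
fun describeOutcome(token: String): String {
    var outcome = "not called"
    val cont = Continuation<Post>(EmptyCoroutineContext) { result ->
        // Both resume and resumeWithException end up here as a Result.
        outcome = if (result.isSuccess) {
            "success"
        } else {
            "failure: ${result.exceptionOrNull()?.message}"
        }
    }
    createPost(token, Item(), cont)
    return outcome
}

fun main() {
    println(describeOutcome("token")) // prints "success"
    println(describeOutcome(""))      // prints "failure: empty token"
}
```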

The body of postItem, however, is transformed in a slightly different way.

Let's go step by step through how the code inside postItem is transformed. First, the whole body becomes one big switch statement in which every suspension point gets a label. postItem returns each time it reaches a suspension point and is called again later, when the awaited result is ready. Conceptually it looks like this.

// conceptual sketch
suspend fun postItem(item: Item, label: Int) {
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}
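The "returns at the suspension point and is called again later" behavior can be observed directly. The following is a minimal sketch using only the standard library coroutine primitives (suspendCoroutine and startCoroutine). The names park, twoSteps, runDemo, and the events list are made up for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

val events = mutableListOf<String>()

// Holds the continuation captured at the suspension point (illustration only).
var parked: Continuation<Unit>? = null

// A suspension point that never resumes by itself: it only stores the continuation.
suspend fun park(): Unit = suspendCoroutine { cont -> parked = cont }

suspend fun twoSteps() {
    events += "step 1"
    park()              // twoSteps returns to its caller here
    events += "step 2"  // runs only after the stored continuation is resumed
}

fun runDemo(): List<String> {
    events.clear()
    val block: suspend () -> Unit = { twoSteps() }
    // Start the coroutine on the current thread; no dispatcher is involved.
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { events += "done" })
    events += "control returned to caller"
    parked!!.resume(Unit) // re-enter twoSteps just after the suspension point
    return events.toList()
}

fun main() {
    runDemo().forEach(::println)
}
```

The recorded order is "step 1", "control returned to caller", "step 2", "done": the call that started twoSteps had already returned before the second half of the function ran.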

In the switch sketch of postItem, however, local variables such as token and post would not survive from one call to the next, so an object is created to hold the execution state of the function. This object is an implementation of Continuation, and it acts as a state machine.

// conceptual sketch
suspend fun postItem(item: Item) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

As mentioned earlier, every suspend function is converted to a regular function that takes a Continuation as an argument. The state machine is what gets passed to the suspend functions called inside postItem.

// conceptual sketch
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (label) {
        case 0:
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

The state machine also stores the label that records how far execution has progressed, along with the function's local state.

// conceptual sketch
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl { ... }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1 // <-Next label to run
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

Since ContinueImpl implements Continuation, it has a resume function. Inside resume, postItem is called again with the state machine itself as the argument. requestToken stores the token in the state machine and calls resume once its own processing is complete.

// conceptual sketch
fun postItem(item: Item, cont: Continuation) {
    val sm = object : ContinueImpl {
        fun resume() {
            postItem(null, this)
        }
    }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            createPost(token, item, sm)
        case 2:
            processPost(post)
    }
}

As written, though, a fresh state machine would be created on every call and the saved state would be lost, so let's fix that. Change ContinueImpl to a type specific to this suspend function, called ThisSM, and create a new instance only if the continuation passed as an argument is not already a ThisSM. This lets postItem pick up its previous state when it is called through resume on the state machine it created earlier. In the case 1 block, we then read item, token, and so on back from the state machine.

// conceptual sketch
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM {
        fun resume() {
            postItem(null, this)
        }
    }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            val post = sm.result as Post
            processPost(post)
    }
}

It wasn't in the explanation I followed (see the reference link below), but I believe that once all the processing is complete, the state machine calls resume on the continuation that was originally passed in. (Please let me know if I have this wrong.)

// conceptual sketch
fun postItem(item: Item, cont: Continuation) {
    val sm = cont as? ThisSM ?: object : ThisSM {
        val initialCont = cont
        fun resume() {
            postItem(null, this)
        }
    }
    switch (sm.label) {
        case 0:
            sm.item = item
            sm.label = 1
            requestToken(sm)
        case 1:
            val item = sm.item
            val token = sm.result as String
            sm.label = 2
            createPost(token, item, sm)
        case 2:
            val post = sm.result as Post
            processPost(post)
            sm.initialCont.resume()
    }
}

We have now converted the suspend function into a regular function.
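The sketches above are not real Kotlin, so here is a runnable approximation written by hand with only the standard library. It is a simplified model, not what the compiler actually emits: the real generated code uses a compiler-generated continuation class and a special COROUTINE_SUSPENDED return value, both omitted here. The names PostItemSM, initialCont, log, and runDemo are made up, and requestToken and createPost complete immediately to keep the example deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

class Item
class Post(val token: String)

val log = mutableListOf<String>()

// Hand-written CPS stand-ins for the two suspend functions. For simplicity
// they finish immediately and report their result through the continuation.
fun requestToken(cont: Continuation<String>) = cont.resume("token")

fun createPost(token: String, item: Item, cont: Continuation<Post>) =
    cont.resume(Post(token))

fun processPost(post: Post) {
    log += "processed post created with ${post.token}"
}

// The state machine: remembers the label, the saved locals, the last result,
// and the continuation of whoever called postItem.
class PostItemSM(val initialCont: Continuation<Unit>) : Continuation<Any?> {
    var label = 0
    var item: Item? = null
    var result: Any? = null

    override val context: CoroutineContext get() = initialCont.context

    override fun resumeWith(result: Result<Any?>) {
        this.result = result.getOrThrow()
        postItem(null, this) // re-enter postItem at the saved label
    }
}

fun postItem(item: Item?, cont: Continuation<Unit>) {
    // Reuse the state machine when re-entered through resumeWith; otherwise create one.
    val sm = cont as? PostItemSM ?: PostItemSM(cont)
    when (sm.label) {
        0 -> {
            sm.item = item
            sm.label = 1 // next label to run
            requestToken(sm)
        }
        1 -> {
            val savedItem = sm.item!!
            val token = sm.result as String
            sm.label = 2
            createPost(token, savedItem, sm)
        }
        2 -> {
            val post = sm.result as Post
            processPost(post)
            sm.initialCont.resume(Unit) // tell the original caller we are done
        }
    }
}

fun runDemo(): List<String> {
    log.clear()
    postItem(Item(), Continuation<Unit>(EmptyCoroutineContext) { log += "postItem completed" })
    return log.toList()
}

fun main() {
    runDemo().forEach(::println)
}
```

Running it prints "processed post created with token" followed by "postItem completed". Note that postItem is entered three times, once per label, and each entry is an ordinary function call that returns.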

Why it didn't work

Once you have a rough idea of how a suspend function is compiled, you can see why the first attempt didn't work.

Here is the @Synchronized version again.

@Synchronized
suspend fun doSomething(i: Int) {
    println("#$i enter critical section.")

    // do something
    delay(1000)

    println("#$i exit critical section.")
}

Conceptually, the compiler turns it into the following.

// conceptual sketch
@Synchronized
fun doSomething(i: Int, cont: Continuation) {
    val sm = cont as? ThisSM ?: ThisSM { ... }
    switch (sm.label) {
        case 0:
            println("#$i enter critical section.")

            sm.label = 1
            delay(1000, sm)
        case 1:
            println("#$i exit critical section.")
    }
}

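@Synchronized makes the generated method a synchronized method, so the monitor is held only while a single call to that method is running. When doSomething reaches delay, it returns to its caller, and returning releases the monitor even though the coroutine has not finished its work.

Therefore, between the moment doSomething returns at delay and the moment it is called again to resume, a doSomething call made by another coroutine can acquire the lock.

That is why both coroutines ended up inside the critical section at the same time.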
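The release of the monitor can be made visible with a small hand-written model. This is only a sketch under my own assumptions: doSomethingStep stands in for the compiled doSomething (one call per segment of the body), and StepState, Demo, and runDemo are hypothetical names. It runs on a single thread to keep the output deterministic, so what matters is the value reported by Thread.holdsLock between the calls.

```kotlin
// Illustration only: per-coroutine state, like the label in the state machine.
class StepState {
    var label = 0
}

object Demo {
    val trace = mutableListOf<String>()

    // Each call runs one segment of the body and then returns, just as the
    // compiled suspend function returns when it reaches a suspension point.
    @Synchronized
    fun doSomethingStep(i: Int, state: StepState) {
        when (state.label) {
            0 -> {
                trace += "#$i enter, monitor held: ${Thread.holdsLock(this)}"
                state.label = 1
                // The compiled function would start delay here and return.
            }
            else -> {
                trace += "#$i exit, monitor held: ${Thread.holdsLock(this)}"
            }
        }
    }
}

fun runDemo(): List<String> {
    Demo.trace.clear()
    val first = StepState()
    val second = StepState()
    Demo.doSomethingStep(0, first)  // #0 runs up to its suspension point
    Demo.trace += "#0 suspended, monitor held: ${Thread.holdsLock(Demo)}"
    Demo.doSomethingStep(1, second) // nothing stops #1 from entering
    Demo.doSomethingStep(0, first)  // #0 resumes
    Demo.doSomethingStep(1, second) // #1 resumes
    return Demo.trace.toList()
}

fun main() {
    runDemo().forEach(::println)
}
```

The second trace line reports "monitor held: false": while #0 is "suspended", nobody holds the lock, so #1 is free to enter before #0 exits.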

Reference link
