Efficient Synchronization of Shared State with Coroutines

Synchronizing a complex state in Kotlin can be achieved using a combination of coroutines and shared flow properties. This approach leverages the concurrency strengths of coroutines and the excellent state management capabilities provided by the Kotlin Flow API.

Kotlin coroutines provide a way of writing asynchronous code in a sequential manner. When dealing with a complex state, one of the main challenges is the synchronization of access to shared mutable state. With coroutines, you can achieve concurrency control since coroutines can be executed concurrently using a multi-threaded dispatcher.

When dealing with state management, using StateFlow and SharedFlow is an efficient way to handle the state mutations and dissemination of these state updates. Using these constructs you can create a shared, mutable state that can be safely observed from multiple points in your code, including from different coroutine contexts.

Observing Changes to the Shared Counter

Let’s consider a function which is designed to execute a specific block of code concurrently across multiple coroutines. There are two primary challenges that need to be managed effectively. First, it’s critical to ensure that concurrent modifications to the shared state are handled in a thread-safe manner. Second, one must ensure that all observers consistently receive the latest changes.

/**
 * This function launches a given suspending [action] [n] times in parallel,
 * and then waits for all the coroutines to complete.
 *
 * @param n number of coroutines to launch
 * @param k times an action is repeated by each coroutine
 * @param action the action to repeat
 */
suspend fun parallelRun(n: Int = 3, k: Int = 5, action: suspend () -> Unit) {
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

To better illustrate our goal, imagine there’s a counter that is being incremented concurrently by several coroutines. At the same time, there are three other coroutines dedicated solely to monitoring changes to this counter, reporting the previous and the updated value each time the counter increments.

As the counter is being updated, our listeners report on the changes:

Coroutine 1 receives state update - old value: 0, new value: 1
Coroutine 2 receives state update - old value: 0, new value: 1
Coroutine 3 receives state update - old value: 0, new value: 1
Coroutine 1 receives state update - old value: 1, new value: 2
Coroutine 1 receives state update - old value: 2, new value: 3
Coroutine 2 receives state update - old value: 1, new value: 2
// etc

Core Building Blocks

Starting with the obvious, the counter needs no explanation 🙂

private var value = 0

Next, let’s represent the change. Although we could utilize a Pair for this purpose, we’re opting to define a dedicated class for convenience and clarity.

data class StateUpdate(val oldValue: Int, val newValue: Int)

Now comes the tricky part, we want to ensure that the changes can be distributed to the subscribed listeners. This is where SharedFlow becomes a crucial component.

class MyState {
    private val _state = MutableSharedFlow<StateUpdate>(
        // No replay needed, we want to track changes 
        // from the moment we subscribe
        replay = 0,
        // Some buffer capacity to avoid dropping updates
        extraBufferCapacity = 64
    )
    val state: SharedFlow<StateUpdate> get() = _state

    // The counter
    private var value = 0

    // There's more
}

SharedFlow serves the purpose of capturing every emitted value regardless of the number of collectors and their timings. The main difference between StateFlow and SharedFlow is that the former always has a value, while the latter can have multiple values (buffers), ensuring that all the updates are delivered. In our example, we cache up to 64 values of the observed counter.

Incrementing the Counter

It’s essential to ensure that the counter is incremented in a thread-safe fashion.

class MyState {
    private val _state = MutableSharedFlow<StateUpdate>(
        // No replay needed, we want to track changes 
        // from the moment we subscribe
        replay = 0,        
        // Some buffer capacity to avoid dropping updates
        extraBufferCapacity = 64
    )
    val state: SharedFlow<StateUpdate> get() = _state
    private var value = 0

    private val mutex = Mutex()

    suspend fun increment() {
        mutex.withLock {
            val oldValue = value
            val newValue = oldValue + 1
            _state.emit(StateUpdate(oldValue, newValue))
            value = newValue
        }
    }
}

We’ve added a Mutex instance to synchronize access to the _state variable, and we’re using the withLock extension function instead of the synchronized block. The withLock function acquires the lock when entered, and releases it when leaving. This prevents potential race conditions when increment() is invoked simultaneously from multiple threads, making it thread-safe. With each increment, we now emit both the old and the new value.

Observing the Changes

Subscribing to the state changes is straightforward. The code below starts three coroutines, each of which listens for state changes. As we saw in the previous section, the state variable of type SharedFlow emits an update. Calling myState.state.collect is the way to start collecting the emitted values. In the section parallelRun we trigger a concurrent increment of our shared state, the counter.

fun main() = runBlocking {
    val myState = MyState()

    // Start n coroutines that observe state updates
    repeat(3) {
        launch {
            myState.state.collect { (oldValue, newValue) ->
                println("Coroutine ${it + 1} receives state update - old value: $oldValue, new value: $newValue")
            }
        }
    }
    // Periodically increment the state
    parallelRun {
        delay(100)
        myState.increment()
    }
}

Why Not Volatile?

You might wonder why to bother with all this complexity. Couldn’t we somehow make use of a much simpler means, namely the @Volatile annotation?

Well, the @Volatile annotation is used in Kotlin (and Java; as a keyword) to indicate that a variable should always be read from / written to the main memory, not the CPU cache. Flagging a variable as @Volatile ensures that changes to the variable are always visible to other threads. It’s often used for variables accessed by multiple threads. But it doesn’t provide any atomic operations: for instance, count++ or count + 1 is a combination of two operations and isn’t atomic.

StateFlow is a type-safe, lifecycle-safe API from kotlinx.coroutines. While @Volatile makes sure every read / write is from / to the main memory (but not atomic), StateFlow is a flow that represents a read-only state with a single-updatable data value that emits updates to the value to its collectors. It goes beyond @Volatile by not only providing thread safety, but also allowing Flow-like transformations (like map, filter, transform, etc). Its main advantage is that it abstracts and handles concurrency and thread safety, making it easier to write safe concurrent code. The flow is always active and in a specific state, hence the name.

Summary

In our example, we wanted to track a history of changes. This is why we leveraged another construct from the coroutines library, the SharedFlow. It’s a hot flow because its active instance exists independently of the presence of collectors. It’s called shared because it should be shared among multiple collectors and they can start / stop collecting at any point. Whereas a StateFlow always has a specific value, and starting to collect the flow will first return the current value, a SharedFlow doesn’t have a specified current value. Collectors that start to collect the flow will just get values that are sent to the flow from the point when they start collecting.

In summary, while @Volatile annotation provides a simple form of memory safety for concurrent code, StateFlow and SharedFlow not only handles synchronization and safety in multi-threaded code more securely and conveniently, but they also allow for far richer transformations and operations on data.

Thanks for reading. Feel free to check out the source code from GitHub and experiment.

Related Post

Leave a Reply

Your email address will not be published. Required fields are marked *

×