Synchronizing a complex state in Kotlin can be achieved using a combination of coroutines and shared flow properties. This approach leverages the concurrency strengths of coroutines and the excellent state management capabilities provided by the Kotlin Flow API.
Kotlin coroutines provide a way of writing asynchronous code in a sequential manner. When dealing with a complex state, one of the main challenges is the synchronization of access to shared mutable state. With coroutines, you can achieve concurrency control since coroutines can be executed concurrently using a multi-threaded dispatcher.
When dealing with state management, using StateFlow
and SharedFlow
is an efficient way to handle the state mutations and dissemination of these state updates. Using these constructs you can create a shared, mutable state that can be safely observed from multiple points in your code, including from different coroutine contexts.
Observing Changes to the Shared Counter
Let’s consider a function which is designed to execute a specific block of code concurrently across multiple coroutines. There are two primary challenges that need to be managed effectively. First, it’s critical to ensure that concurrent modifications to the shared state are handled in a thread-safe manner. Second, one must ensure that all observers consistently receive the latest changes.
/**
* This function launches a given suspending [action] [n] times in parallel,
* and then waits for all the coroutines to complete.
*
* @param n number of coroutines to launch
* @param k times an action is repeated by each coroutine
* @param action the action to repeat
*/
suspend fun parallelRun(n: Int = 3, k: Int = 5, action: suspend () -> Unit) {
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
To better illustrate our goal, imagine there’s a counter that is being incremented concurrently by several coroutines. At the same time, there are three other coroutines dedicated solely to monitoring changes to this counter, reporting the previous and the updated value each time the counter increments.
As the counter is being updated, our listeners report on the changes:
Coroutine 1 receives state update - old value: 0, new value: 1
Coroutine 2 receives state update - old value: 0, new value: 1
Coroutine 3 receives state update - old value: 0, new value: 1
Coroutine 1 receives state update - old value: 1, new value: 2
Coroutine 1 receives state update - old value: 2, new value: 3
Coroutine 2 receives state update - old value: 1, new value: 2
// etc
Core Building Blocks
Starting with the obvious, the counter needs no explanation 🙂
private var value = 0
Next, let’s represent the change. Although we could utilize a Pair
for this purpose, we’re opting to define a dedicated class for convenience and clarity.
data class StateUpdate(val oldValue: Int, val newValue: Int)
Now comes the tricky part, we want to ensure that the changes can be distributed to the subscribed listeners. This is where SharedFlow
becomes a crucial component.
class MyState {
private val _state = MutableSharedFlow<StateUpdate>(
// No replay needed, we want to track changes
// from the moment we subscribe
replay = 0,
// Some buffer capacity to avoid dropping updates
extraBufferCapacity = 64
)
val state: SharedFlow<StateUpdate> get() = _state
// The counter
private var value = 0
// There's more
}
SharedFlow
serves the purpose of capturing every emitted value regardless of the number of collectors and their timings. The main difference between StateFlow
and SharedFlow
is that the former always has a value, while the latter can have multiple values (buffers), ensuring that all the updates are delivered. In our example, we cache up to 64 values of the observed counter.
Incrementing the Counter
It’s essential to ensure that the counter is incremented in a thread-safe fashion.
class MyState {
private val _state = MutableSharedFlow<StateUpdate>(
// No replay needed, we want to track changes
// from the moment we subscribe
replay = 0,
// Some buffer capacity to avoid dropping updates
extraBufferCapacity = 64
)
val state: SharedFlow<StateUpdate> get() = _state
private var value = 0
private val mutex = Mutex()
suspend fun increment() {
mutex.withLock {
val oldValue = value
val newValue = oldValue + 1
_state.emit(StateUpdate(oldValue, newValue))
value = newValue
}
}
}
We’ve added a Mutex
instance to synchronize access to the _state
variable, and we’re using the withLock
extension function instead of the synchronized
block. The withLock
function acquires the lock when entered, and releases it when leaving. This prevents potential race conditions when increment()
is invoked simultaneously from multiple threads, making it thread-safe. With each increment, we now emit both the old and the new value.
Observing the Changes
Subscribing to the state changes is straightforward. The code below starts three coroutines, each of which listens for state changes. As we saw in the previous section, the state
variable of type SharedFlow
emits an update. Calling myState.state.collect
is the way to start collecting the emitted values. In the section parallelRun
we trigger a concurrent increment of our shared state, the counter.
fun main() = runBlocking {
val myState = MyState()
// Start n coroutines that observe state updates
repeat(3) {
launch {
myState.state.collect { (oldValue, newValue) ->
println("Coroutine ${it + 1} receives state update - old value: $oldValue, new value: $newValue")
}
}
}
// Periodically increment the state
parallelRun {
delay(100)
myState.increment()
}
}
Why Not Volatile?
You might wonder why to bother with all this complexity. Couldn’t we somehow make use of a much simpler means, namely the @Volatile
annotation?
Well, the @Volatile
annotation is used in Kotlin (and Java; as a keyword) to indicate that a variable should always be read from / written to the main memory, not the CPU cache. Flagging a variable as @Volatile
ensures that changes to the variable are always visible to other threads. It’s often used for variables accessed by multiple threads. But it doesn’t provide any atomic operations: for instance, count++
or count + 1
is a combination of two operations and isn’t atomic.
StateFlow
is a type-safe, lifecycle-safe API from kotlinx.coroutines
. While @Volatile
makes sure every read / write is from / to the main memory (but not atomic), StateFlow
is a flow that represents a read-only state with a single-updatable data value that emits updates to the value to its collectors. It goes beyond @Volatile
by not only providing thread safety, but also allowing Flow-like transformations (like map
, filter
, transform,
etc). Its main advantage is that it abstracts and handles concurrency and thread safety, making it easier to write safe concurrent code. The flow is always active and in a specific state, hence the name.
Summary
In our example, we wanted to track a history of changes. This is why we leveraged another construct from the coroutines library, the SharedFlow
. It’s a hot flow because its active instance exists independently of the presence of collectors. It’s called shared because it should be shared among multiple collectors and they can start / stop collecting at any point. Whereas a StateFlow
always has a specific value, and starting to collect the flow will first return the current value, a SharedFlow
doesn’t have a specified current value. Collectors that start to collect the flow will just get values that are sent to the flow from the point when they start collecting.
In summary, while @Volatile
annotation provides a simple form of memory safety for concurrent code, StateFlow
and SharedFlow
not only handles synchronization and safety in multi-threaded code more securely and conveniently, but they also allow for far richer transformations and operations on data.
Thanks for reading. Feel free to check out the source code from GitHub and experiment.