Kotlin Channels: An Alternative to BlockingQueues

When writing concurrent programs in Kotlin, it’s essential to understand how to share data safely between concurrently running tasks. Two commonly compared approaches are BlockingQueue and Channel. Both hand data from producers to consumers, but they target different programming models: threads and coroutines, respectively.

What are BlockingQueues?

A BlockingQueue is a thread-safe queue from the Java concurrency library (java.util.concurrent). Its put() method blocks the calling thread while the queue is full, and its take() method blocks while the queue is empty. This blocking behavior is a straightforward way to coordinate producer and consumer threads.

import java.util.concurrent.BlockingQueue
import org.slf4j.LoggerFactory

class Producer(private val queue: BlockingQueue<Int>) : Runnable {
    private val logger = LoggerFactory.getLogger(this::class.java)
    override fun run() {
        for (i in 0..15) {
            queue.put(i)    // Blocks when the queue is full
            logger.info("Produced: $i")
        }
        queue.put(POISON_PILL)  // Signal the consumer to finish
        logger.info("Producer finished")
    }
}

class Consumer(private val queue: BlockingQueue<Int>) : Runnable {
    private val logger = LoggerFactory.getLogger(this::class.java)
    override fun run() {
        // Wait a bit before starting to consume
        Thread.sleep(2000)
        while (true) {
            val i = queue.take()    // Blocks when the queue is empty
            if (i == POISON_PILL) {
                logger.info("Consumer finished")
                return
            } else {
                logger.info("Consumed: $i")
            }
        }
    }
}

const val POISON_PILL = Int.MIN_VALUE

The code above uses the Java concurrency library. It runs on multiple OS-level threads, with the BlockingQueue providing the synchronization between them: put() blocks the calling thread while the queue is full, and take() blocks it while the queue is empty.
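To make the blocking behavior easier to see in isolation, here is a minimal sketch that contrasts put() and take() with the non-blocking and timed variants that BlockingQueue also offers, namely offer() and poll(). The function name boundedQueueDemo, the capacity of 2, and the 50 ms timeouts are illustrative assumptions, not part of the original program.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what each call returns on a full or empty queue.
fun boundedQueueDemo(): List<String> {
    val results = mutableListOf<String>()
    val queue = LinkedBlockingQueue<Int>(2) // bounded to 2 elements (illustrative)

    queue.put(1)
    queue.put(2) // the queue is now full; a third put() would block this thread

    // offer() never blocks: it returns false when the queue is full
    results += "offer on full queue: ${queue.offer(3)}"
    // the timed offer() waits up to the timeout, then gives up
    results += "timed offer on full queue: ${queue.offer(3, 50, TimeUnit.MILLISECONDS)}"

    results += "take: ${queue.take()}"
    results += "take: ${queue.take()}" // the queue is now empty; another take() would block

    // poll() never blocks: it returns null when the queue is empty
    results += "poll on empty queue: ${queue.poll()}"
    // the timed poll() waits up to the timeout, then returns null
    results += "timed poll on empty queue: ${queue.poll(50, TimeUnit.MILLISECONDS)}"
    return results
}

fun main() {
    boundedQueueDemo().forEach(::println)
}
```

In every variant the waiting is done by the calling thread itself, which is the cost that the coroutine-based approach later in this post avoids.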

To ensure that the program terminates, the consumer needs a signal that there are no more items to consume. Typically, this is done with a special value, a so-called poison pill, that causes the consumer to exit when it is received. The producer sends this value after sending all of its items.

Next, we run the program with a blocking queue bounded to ten elements, to demonstrate that the producer blocks once the queue fills up.

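import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import org.slf4j.LoggerFactory

fun main() {
    val logger = LoggerFactory.getLogger("Main")

    // Create a shared queue with max capacity of 10 elements
    val queue: BlockingQueue<Int> = LinkedBlockingQueue(10)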

    val producer = Thread(Producer(queue)).apply { name = "producer" }
    val consumer = Thread(Consumer(queue)).apply { name = "consumer" }

    // Start both threads
    producer.start()
    consumer.start()

    // Wait to finish
    producer.join()
    consumer.join()

    logger.info("Main finished")
}

When we run the program, we can see that it uses three threads: main, plus one each for the producer and the consumer. The producer pauses once the queue fills up and resumes only as the consumer makes room. Log lines from the two threads can interleave, so a “Produced” line may appear just before the “Consumed” line that freed its slot. The producer eventually puts all of the items into the queue, sends the terminal value, and finishes. The consumer gradually takes all of the queued elements and terminates once it swallows the poison pill.

[producer] INFO  Produced: 0
[producer] INFO  Produced: 1
[producer] INFO  Produced: 2
[producer] INFO  Produced: 3
[producer] INFO  Produced: 4
[producer] INFO  Produced: 5
[producer] INFO  Produced: 6
[producer] INFO  Produced: 7
[producer] INFO  Produced: 8
[producer] INFO  Produced: 9
[producer] INFO  Produced: 10
[consumer] INFO  Consumed: 0
[consumer] INFO  Consumed: 1
[producer] INFO  Produced: 11
[consumer] INFO  Consumed: 2
[consumer] INFO  Consumed: 3
[producer] INFO  Produced: 12
[producer] INFO  Produced: 13
[producer] INFO  Produced: 14
[consumer] INFO  Consumed: 4
[consumer] INFO  Consumed: 5
[consumer] INFO  Consumed: 6
[consumer] INFO  Consumed: 7
[consumer] INFO  Consumed: 8
[consumer] INFO  Consumed: 9
[producer] INFO  Produced: 15
[consumer] INFO  Consumed: 10
[consumer] INFO  Consumed: 11
[producer] INFO  Producer finished
[consumer] INFO  Consumed: 12
[consumer] INFO  Consumed: 13
[consumer] INFO  Consumed: 14
[consumer] INFO  Consumed: 15
[consumer] INFO  Consumer finished
[main] INFO  Main finished

With this approach, we used multiple OS-level threads to implement a simple producer-consumer app. It works, but every blocked producer or consumer ties up an OS thread while it waits, so it does not scale as well as what we can achieve with Kotlin coroutines.
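As a rough illustration of the scaling claim, the sketch below launches a large number of coroutines that all wait at the same time on a single thread. It assumes the kotlinx-coroutines-core dependency is on the classpath; the function name runManyCoroutines and the count of 10,000 are hypothetical choices made for this example.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts `count` coroutines and reports how many completed.
fun runManyCoroutines(count: Int): Int {
    val completed = AtomicInteger(0)
    runBlocking {
        repeat(count) {
            launch {
                delay(100) // suspends the coroutine without holding on to a thread
                completed.incrementAndGet()
            }
        }
    } // runBlocking returns only after all child coroutines have finished
    return completed.get()
}

fun main() {
    println("Completed: ${runManyCoroutines(10_000)}")
}
```

Doing the same with one dedicated thread per waiting task would require thousands of OS threads, each with its own stack, which is where the thread-based design starts to hurt.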

Enter Kotlin Channels

Kotlin offers a conceptually similar construct known as a Channel. Channels are part of Kotlin coroutines, a powerful feature for managing asynchronous computations. Just as a BlockingQueue lets threads exchange data, a Channel lets coroutines communicate with each other by passing data between them.

The key difference, however, is that Channel operations do not block the thread when they cannot proceed immediately. They are suspending functions, so the current coroutine is suspended while the underlying thread stays free to do other work.

Non-Blocking Operations

In channel terminology, the equivalent of put is send, and the equivalent of take is receive. These operations do not block the thread; instead, they suspend the coroutine, allowing other coroutines to continue running on the same thread while the operation waits to complete. This makes more efficient use of threads, which can be a scarce resource, especially on platforms such as Android. Here’s an implementation of our app using a Channel in Kotlin.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory

fun main() = runBlocking<Unit> {
    val logger = LoggerFactory.getLogger("BlockingConsumerProducer")

    val channel = Channel<Int>(capacity = 10)

    // Coroutine for sending
    launch {
        for (x in 0..15) {
            channel.send(x) // suspends when the channel is full
            logger.info("Sent: $x")
        }
        channel.close() // Close the channel to indicate no more elements will be sent
        logger.info("Producer finished")
    }

    // Coroutine for receiving
    launch {
        // Wait a bit before starting to consume
        delay(2000)
        for (y in channel) { // iterates until the channel is closed
            logger.info("Received: $y")
        }
        logger.info("Consumer finished")
    }
}

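If we inspect the output, we see a single thread (main) running two coroutines that suspend and resume according to the same rules we observed before (buffer full vs. empty). Closing the channel takes over the role of the poison pill: the for loop ends once the channel is closed and all buffered elements have been received. The @coroutine#N suffixes next to the thread name appear when coroutine debug mode is enabled, for example with the -Dkotlinx.coroutines.debug JVM option. Here’s the output: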

[main @coroutine#2] INFO  Sent: 0
[main @coroutine#2] INFO  Sent: 1
[main @coroutine#2] INFO  Sent: 2
[main @coroutine#2] INFO  Sent: 3
[main @coroutine#2] INFO  Sent: 4
[main @coroutine#2] INFO  Sent: 5
[main @coroutine#2] INFO  Sent: 6
[main @coroutine#2] INFO  Sent: 7
[main @coroutine#2] INFO  Sent: 8
[main @coroutine#2] INFO  Sent: 9
[main @coroutine#3] INFO  Received: 0
[main @coroutine#3] INFO  Received: 1
[main @coroutine#3] INFO  Received: 2
[main @coroutine#3] INFO  Received: 3
[main @coroutine#3] INFO  Received: 4
[main @coroutine#3] INFO  Received: 5
[main @coroutine#3] INFO  Received: 6
[main @coroutine#3] INFO  Received: 7
[main @coroutine#3] INFO  Received: 8
[main @coroutine#3] INFO  Received: 9
[main @coroutine#3] INFO  Received: 10
[main @coroutine#2] INFO  Sent: 10
[main @coroutine#2] INFO  Sent: 11
[main @coroutine#2] INFO  Sent: 12
[main @coroutine#2] INFO  Sent: 13
[main @coroutine#2] INFO  Sent: 14
[main @coroutine#2] INFO  Sent: 15
[main @coroutine#2] INFO  Producer finished
[main @coroutine#3] INFO  Received: 11
[main @coroutine#3] INFO  Received: 12
[main @coroutine#3] INFO  Received: 13
[main @coroutine#3] INFO  Received: 14
[main @coroutine#3] INFO  Received: 15
[main @coroutine#3] INFO  Consumer finished
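To see that a suspended send really does leave the thread free, consider the following minimal sketch. It runs three coroutines on the single runBlocking thread: a producer that suspends on a full channel, an unrelated coroutine, and a consumer. It assumes kotlinx-coroutines-core is available; the function name suspensionDemo, the capacity of 1, and the event strings are illustrative and not part of the original app.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which the coroutines make progress.
fun suspensionDemo(): List<String> {
    val events = mutableListOf<String>() // touched only from the runBlocking thread
    runBlocking {
        val channel = Channel<Int>(capacity = 1)

        // Producer: the first send fits in the buffer, the second one suspends
        launch {
            channel.send(1)
            events += "sent 1"
            channel.send(2) // buffer is full, so this coroutine suspends here
            events += "sent 2"
            channel.close()
        }

        // Unrelated work scheduled on the same thread
        launch {
            events += "other coroutine ran"
        }

        // Consumer: drains the channel until it is closed
        launch {
            for (x in channel) {
                events += "received $x"
            }
        }
    }
    return events
}

fun main() {
    suspensionDemo().forEach(::println)
}
```

The unrelated coroutine runs while the producer is still waiting in its second send. With a BlockingQueue and a single thread, the equivalent put() would have blocked that thread and nothing else could have run on it.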

Summary

Kotlin’s Channel and Java’s BlockingQueue both facilitate the producer-consumer pattern in concurrent programming. However, channels align with Kotlin’s non-blocking philosophy and are better suited for cooperative multitasking using coroutines.

By using the suspending send and receive functions instead of blocking calls, you can write code that is more responsive and scalable. When converting from traditional threaded models to coroutine-based concurrency, it’s essential to grasp the nuances of these constructs to avoid pitfalls. More on that in the posts to come.

Thanks for reading and stay tuned for a deeper dive into channels. Source code is available on GitHub.
