Simplifying APIs with coroutines and Flow
Learn how to create your own coroutine adapters and see how they work under the hood
If you’re a library author, you might want to make your Java-based or callback-based libraries easier to consume from Kotlin using coroutines and Flow. Alternatively, if you’re an API consumer, you may want to adapt a third-party API surface to coroutines to make it more Kotlin-friendly.
This article covers how to simplify APIs using coroutines and Flow, as well as how to build your own adapter using the suspendCancellableCoroutine and callbackFlow APIs. For the most curious, those APIs are dissected so you can see how they work under the hood.
Check existing coroutine adapters
Before writing your own wrappers for existing APIs, check if an adapter or extension function is available for your use case. There are existing libraries with coroutine adapters for common types.
Future types
For future types, there are integrations for Java 8’s CompletableFuture and Guava’s ListenableFuture. This is not an exhaustive list, so search online to check whether an adapter for your future type already exists.
// Awaits completion of CompletionStage without blocking a thread
suspend fun <T> CompletionStage<T>.await(): T
// Awaits completion of ListenableFuture without blocking a thread
suspend fun <T> ListenableFuture<T>.await(): T
With these functions, you can get rid of callbacks and just suspend the coroutine until the future result comes back.
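As a minimal sketch of what this looks like in practice, the snippet below awaits a CompletableFuture from a suspend function. It assumes the kotlinx.coroutines JDK 8 integration (the `kotlinx.coroutines.future.await` extension) is on the classpath; `loadGreeting` and `greetingLength` are hypothetical names made up for illustration.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical future-based API, standing in for a Java library call
fun loadGreeting(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "hello" }

// Suspends until the future completes instead of registering a callback
suspend fun greetingLength(): Int = loadGreeting().await().length

fun main() = runBlocking {
    println(greetingLength())
}
```

If the future completes exceptionally, `await()` throws the corresponding exception at the call site, so regular try/catch applies.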
Reactive Streams
For reactive streams, there are integrations for RxJava, the Java 9 Flow APIs, and Reactive Streams libraries.
// Transforms the given reactive Publisher into Flow.
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
These functions convert a reactive stream into Flow.
Android specific APIs
For Jetpack libraries or Android platform APIs, take a look at the Jetpack KTX libraries list. Currently, more than 20 libraries have a KTX version, creating sweet idiomatic versions of Java APIs, ranging from SharedPreferences to ViewModels, SQLite and even Play Core.
Callbacks
Callbacks are a very common solution for asynchronous communication. In fact, we use them for the Java programming language solution in the Running tasks in background thread guide. However, they come with some drawbacks: this design leads to nested callbacks, which end up producing incomprehensible code. Error handling is also more complicated, as there isn’t an easy way to propagate errors.
In Kotlin, you can simplify calling callbacks using coroutines, but for that, you’ll need to build your own adapter.
Build your own adapter
If you don’t find an adapter for your use case, it’s usually quite straightforward to write your own. For one-shot async calls, use the suspendCancellableCoroutine API. For streaming data, use the callbackFlow API.
As an exercise, the following examples will use the Fused Location Provider API from Google Play Services to get location data. The API surface is simple but it uses callbacks to perform async operations. With coroutines, we can get rid of those callbacks that can quickly make our code unreadable when the logic gets complicated.
In case you want to explore other solutions, you can get inspiration from the source code of all the functions linked above.
One-shot async calls
The Fused Location Provider API provides the getLastLocation method to obtain the last known location. The ideal API for coroutines is a suspend function that returns exactly that.
Note that this API returns a Task and there’s already an adapter available for it. However, for learning purposes, we’ll use it as an example.
We can have a better API by creating an extension function on FusedLocationProviderClient:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
As this is a one-shot async operation, we use the suspendCancellableCoroutine function: a low-level building block for creating suspending functions from the coroutines library.
suspendCancellableCoroutine executes the block of code passed to it as a parameter, then suspends the coroutine execution while waiting for the signal to continue. The coroutine will resume executing when the resume or resumeWithException method is called on the coroutine’s Continuation object. For more information about continuations, check out the suspend modifier under the hood article.
We use the callbacks that can be added to the getLastLocation method to resume the coroutine appropriately. See the implementation below:
// Extension function on FusedLocationProviderClient, returns last known location
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =
    // Suspend the current coroutine with a continuation that can be cancelled
    suspendCancellableCoroutine<Location> { continuation ->
        // Add listeners that will resume the execution of this coroutine
        lastLocation.addOnSuccessListener { location ->
            // Resume coroutine and return location
            continuation.resume(location)
        }.addOnFailureListener { e ->
            // Resume the coroutine by throwing an exception
            continuation.resumeWithException(e)
        }
        // End of the suspendCancellableCoroutine block. This suspends the
        // coroutine until one of the callbacks calls the continuation parameter.
    }
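The same pattern can be tried without Google Play Services. The following is a self-contained sketch under stated assumptions: `ResultCallback`, `LegacyClient` and `awaitGreeting` are hypothetical names standing in for a callback-based Java library, not part of any real API.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback interface of a legacy library
interface ResultCallback<T> {
    fun onSuccess(value: T)
    fun onError(error: Exception)
}

// Hypothetical client that reports its result on a background thread
class LegacyClient {
    fun fetchGreeting(name: String, callback: ResultCallback<String>) {
        thread {
            if (name.isBlank()) {
                callback.onError(IllegalArgumentException("name is blank"))
            } else {
                callback.onSuccess("Hello, $name")
            }
        }
    }
}

// Adapter: suspends the caller until one of the callbacks fires
suspend fun LegacyClient.awaitGreeting(name: String): String =
    suspendCancellableCoroutine { continuation ->
        fetchGreeting(name, object : ResultCallback<String> {
            override fun onSuccess(value: String) = continuation.resume(value)
            override fun onError(error: Exception) = continuation.resumeWithException(error)
        })
    }

fun main() = runBlocking {
    val client = LegacyClient()
    println(client.awaitGreeting("Kotlin"))
    // Failures surface as ordinary exceptions at the call site
    val failure = runCatching { client.awaitGreeting("") }
    println(failure.isFailure)
}
```

Callers no longer see the callback interface at all: success becomes a return value and failure becomes a thrown exception.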
Note: Although you will also find a non-cancellable version of this function in the coroutines library (i.e. suspendCoroutine), it is preferable to always choose suspendCancellableCoroutine to handle cancellation of the coroutine scope, or to propagate cancellation from the underlying API.
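One way to propagate cancellation to the underlying API is the `invokeOnCancellation` hook of `CancellableContinuation`. The sketch below assumes a hypothetical `PendingCall` handle with a `cancel()` method; a real library would expose its own cancellation mechanism, if it has one.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resume

// Hypothetical handle for an in-flight request that never completes in this demo
class PendingCall {
    val cancelled = AtomicBoolean(false)
    private var listener: ((String) -> Unit)? = null

    fun onResult(listener: (String) -> Unit) { this.listener = listener }
    fun cancel() { cancelled.set(true) }
}

suspend fun PendingCall.await(): String =
    suspendCancellableCoroutine { continuation ->
        onResult { value -> continuation.resume(value) }
        // Runs if the coroutine is cancelled while suspended here
        continuation.invokeOnCancellation { cancel() }
    }

// Starts a coroutine that awaits the call, cancels it, and reports
// whether the underlying call was told to cancel as well
fun cancelWhileWaiting(call: PendingCall): Boolean = runBlocking {
    val job = launch { call.await() }
    yield() // Let the launched coroutine start and suspend in await()
    job.cancelAndJoin()
    call.cancelled.get()
}

fun main() {
    println(cancelWhileWaiting(PendingCall()))
}
```

With suspendCoroutine there is no such hook, so the underlying request would keep running after the caller has gone away.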
suspendCancellableCoroutine under the hood
Internally, suspendCancellableCoroutine uses suspendCoroutineUninterceptedOrReturn to get the Continuation of the coroutine inside a suspend function. That Continuation object is intercepted by a CancellableContinuation that will control the lifecycle of that coroutine from that point (its implementation has the functionality of a Job with some restrictions).
After that, the lambda passed to suspendCancellableCoroutine will be executed and the coroutine will either resume immediately if the lambda returns a result or will be suspended until the CancellableContinuation is resumed manually from the lambda.
See my own comments in the following code snippet (following the original implementation) to understand what’s happening:
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    // Get the Continuation object of the coroutine that is running this suspend function
    suspendCoroutineUninterceptedOrReturn { uCont ->
        // Take over the control of the coroutine. The Continuation has been
        // intercepted and it follows the CancellableContinuationImpl lifecycle now
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), /* ... */)
        /* ... */
        // Call block of code with the cancellable continuation
        block(cancellable)
        // Either suspend the coroutine and wait for the Continuation to be resumed
        // manually in `block` or return a result if `block` has finished executing
        cancellable.getResult()
    }
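The two outcomes described above (return a result immediately, or suspend until resumed) can be observed with the standard library intrinsic alone. This is a simplified sketch, not the library's implementation: there is no interception or cancellation, and `pending`, `immediateValue`, `awaitSignal` and `traceExecution` are names invented for the illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Continuation captured by awaitSignal(), resumed manually later
var pending: Continuation<String>? = null

// The block returns a value, so the coroutine continues without suspending
suspend fun immediateValue(): String =
    suspendCoroutineUninterceptedOrReturn { _ -> "returned immediately" }

// The block stores the continuation and signals that the coroutine is suspended
suspend fun awaitSignal(): String =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        pending = continuation
        COROUTINE_SUSPENDED
    }

fun traceExecution(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        log += immediateValue()
        log += awaitSignal()
    }
    // Runs the coroutine on the current thread until its first real suspension
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })
    log += "suspended"
    // Resuming the stored continuation runs the rest of the coroutine body
    pending?.resume("resumed manually")
    return log
}

fun main() {
    println(traceExecution())
}
```

In suspendCancellableCoroutine, `getResult()` plays the role of the block's return value here: it yields either the result or the suspended marker.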
To learn more about how suspend functions work under the hood, check out the suspend modifier under the hood article.
Streaming data
If instead we wanted to receive periodic location updates (using the requestLocationUpdates function) whenever the user’s device moves in the real world, we’d need to create a stream of data using Flow. The ideal API would look like this:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
To convert streaming callback-based APIs to Flow, use the callbackFlow flow builder that creates a new flow. In the callbackFlow lambda, we’re in the context of a coroutine, so suspend functions can be called. Unlike the flow builder, callbackFlow allows values to be emitted from a different CoroutineContext or outside a coroutine, with the offer method.
Normally, flow adapters using callbackFlow follow these three generic steps:
- Create the callback that adds elements into the flow using offer.
- Register the callback.
- Wait for the consumer to cancel the coroutine and unregister the callback.
Applying this recipe to this use case, we get the following implementation:
// Send location updates to the consumer
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
    // A new Flow is created. This code executes in a coroutine!

    // 1. Create callback and add elements into the flow
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult?) {
            result ?: return // Ignore null responses
            for (location in result.locations) {
                try {
                    offer(location) // Send location to the flow
                } catch (t: Throwable) {
                    // Location couldn't be sent to the flow
                }
            }
        }
    }

    // 2. Register the callback to get location updates by calling requestLocationUpdates
    requestLocationUpdates(
        createLocationRequest(),
        callback,
        Looper.getMainLooper()
    ).addOnFailureListener { e ->
        close(e) // in case of error, close the Flow
    }

    // 3. Wait for the consumer to cancel the coroutine and unregister
    // the callback. This suspends the coroutine until the Flow is closed.
    awaitClose {
        // Clean up code goes here
        removeLocationUpdates(callback)
    }
}
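The three steps can also be exercised without a device. The sketch below uses a hypothetical `TickSource` listener API invented for the example. It calls `trySend`, which newer kotlinx.coroutines releases (1.5 and later) provide as the replacement for the now-deprecated `offer`; on older versions, `offer` would be used in its place.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical listener-based source, standing in for a streaming callback API
class TickSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun emit(value: Int) = listeners.toList().forEach { it(value) }
}

fun TickSource.ticks(): Flow<Int> = callbackFlow {
    // 1. Create the callback that adds elements into the flow
    val listener: (Int) -> Unit = { value -> trySend(value) }
    // 2. Register the callback
    register(listener)
    // 3. Suspend until the flow is closed or cancelled, then unregister
    awaitClose { unregister(listener) }
}

// Collects the first three values; the extra ones stay in the buffer and are dropped
fun collectThreeTicks(source: TickSource): List<Int> = runBlocking {
    val collected = async { source.ticks().take(3).toList() }
    // Wait until the flow has registered its listener before emitting
    while (source.listenerCount == 0) yield()
    (1..5).forEach { source.emit(it) }
    collected.await()
}

fun main() {
    val source = TickSource()
    println(collectThreeTicks(source)) // [1, 2, 3]
    println(source.listenerCount)
}
```

Once `take(3)` has what it needs, the collector cancels the flow, which is what triggers the cleanup in `awaitClose`.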
callbackFlow under the hood
Internally, callbackFlow uses a channel, which is conceptually very similar to a blocking queue. A channel is configured with a capacity: the number of elements that can be buffered. The channel created in callbackFlow has the default capacity of 64 elements. When adding a new element to an already full channel, send will suspend the producer until there’s space for the new element in the channel, whereas offer won’t add the element to the channel and will return false immediately.
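The non-suspending behaviour on a full channel is easy to see with a small buffer. This sketch uses a capacity of 2 instead of the default 64 to keep it short, and `trySend` (the replacement for `offer` in kotlinx.coroutines 1.5 and later), which reports failure through its result instead of returning false. `fillChannel` is a name made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to add three elements to a channel that can only buffer two
fun fillChannel(): List<Boolean> {
    val channel = Channel<Int>(capacity = 2)
    // No receiver is draining the channel, so the third attempt fails
    // immediately instead of suspending
    return (1..3).map { value -> channel.trySend(value).isSuccess }
}

fun main() {
    println(fillChannel()) // [true, true, false]
}
```

For location updates this means a slow consumer loses the newest values once the buffer is full, rather than blocking the callback thread.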
awaitClose under the hood
Interestingly, awaitClose uses suspendCancellableCoroutine under the hood. See my own comments in the following code snippet (following the original implementation) to understand what’s happening:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    /* ... */
    try {
        // Suspend the coroutine with a cancellable continuation
        suspendCancellableCoroutine<Unit> { cont ->
            // Suspend forever and resume the coroutine successfully only
            // when the Flow/Channel is closed
            invokeOnClose { cont.resume(Unit) }
        }
    } finally {
        // Always execute caller's clean up code
        block()
    }
}
Reusing the Flow
Flows are cold and lazy unless specified otherwise with intermediate operators such as shareIn. This means that the builder block will be executed each time a terminal operator is called on the flow. This might not be a huge problem in our case, as adding new location listeners is cheap; however, it might make a difference in other implementations. To reuse the same flow across multiple collectors, use the shareIn operator.
// Each call creates a new shared flow, so call this once and keep the result
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
    /* ... */
}.shareIn(
    // Make the flow follow the applicationScope
    applicationScope,
    // Emit the last emitted element to new collectors
    replay = 1,
    // Keep the producer active while there are active subscribers
    started = SharingStarted.WhileSubscribed()
)
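To make the difference between a cold and a shared flow concrete, the sketch below counts how many times a builder block runs for two collectors. It is an illustration under assumptions: `countBlockRuns` is a made-up name, a plain `flow` builder stands in for callbackFlow, and it uses SharingStarted.Lazily rather than WhileSubscribed so that the upstream is started exactly once and the result is deterministic.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Returns (block runs with a cold flow, block runs with a shared flow)
fun countBlockRuns(): Pair<Int, Int> = runBlocking {
    var blockRuns = 0
    val cold = flow {
        blockRuns++ // Stands in for registering a listener
        emit("location")
    }

    // Cold: every terminal operator executes the builder block again
    cold.first()
    cold.first()
    val coldRuns = blockRuns

    // Shared: the block runs once and later collectors get the replayed value
    blockRuns = 0
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = cold.shareIn(sharingScope, SharingStarted.Lazily, replay = 1)
    shared.first()
    shared.first()
    val sharedRuns = blockRuns
    sharingScope.cancel() // Stop sharing so runBlocking can finish

    coldRuns to sharedRuns
}

fun main() {
    println(countBlockRuns()) // (2, 1)
}
```

With WhileSubscribed, as in the snippet above, the upstream would additionally stop when the last collector leaves and restart when a new one arrives.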
To learn more about best practices for adding an applicationScope to your app, check out this article.
Consider creating coroutine adapters to make your own APIs or existing APIs concise, readable and idiomatic in Kotlin. First check if an adapter is already available and, if not, create your own using suspendCancellableCoroutine for one-shot calls and callbackFlow for streaming data.
To get hands-on with this topic, check out the Building a Kotlin extensions library codelab.