
Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app

Best practices we found when using Flow in the ADS 2019 app

This article is about the best practices we found when using Flow in the Android Dev Summit (ADS) 2019 app, which has just been open sourced. Keep reading to find out how each layer of our app handles data streams.

The ADS app architecture follows the recommended app architecture guide, with the addition of a domain layer (of UseCases) which helps separate concerns, keeping classes small, focused, reusable, and testable:

[Figure: Architecture of the ADS 2019 app]

Like many Android apps, the ADS app lazily loads data from the network or a cache; we found this to be a perfect use case for Flow. For one-shot operations, suspend functions were a better fit. Two main commits refactor the app to use Coroutines: the first migrates one-shot operations, and the second migrates data streams.

1. Prefer exposing streams as Flows (not Channels)

There are two ways you can deal with streams of data in coroutines: the Flow API and the Channel API. Channels are a synchronisation primitive, whereas Flow is built to model streams of data: it’s a factory for subscriptions to streams of data. Channels can, however, be used to back a Flow, as we’ll see later.

Prefer exposing Flow since it gives you more flexibility, more explicit contracts, and more operators than Channel

Flows close the stream of data automatically. A terminal operator triggers the execution of the stream, and that execution completes successfully or exceptionally depending on the flow operations on the producer side. As a result, it is much harder to leak resources on the producer side. With Channels, leaking is easier: the producer might not clean up heavy resources if the Channel is not closed properly.
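The following sketch shows what automatic closing means in practice. It needs only kotlinx.coroutines. The `readings()` producer and the `closedResources` log are hypothetical stand-ins for a producer that holds a heavy resource. The terminal operator `first()` stops collection after one element, and the producer's `finally` block runs without any explicit close call.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical log of producer-side cleanup, so the effect is observable.
val closedResources = mutableListOf<String>()

// Hypothetical producer that "holds" a resource and releases it in finally.
fun readings(): Flow<Int> = flow {
    try {
        var value = 0
        while (true) emit(value++) // would run forever on its own
    } finally {
        closedResources += "sensor" // runs as soon as the collector stops
    }
}

// first() is a terminal operator: it takes one element, then aborts the flow.
fun firstReading(): Int = runBlocking { readings().first() }
```

No one calls `close()` here. Ending the collection is what ends the producer.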

The data layer of an app is responsible for providing data, usually by reading it from a database or fetching it from the Internet. For example, here’s a DataSource interface that exposes a stream of user event data:

interface UserEventDataSource {
  fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult>
}

2. How to use Flow in your Android app architecture

UseCase and Repository

The layers in between View/ViewModel and the DataSource (i.e. UseCase and Repository in our case) often need to combine data from multiple queries or transform the data before the ViewModel layer can use it. Just like Kotlin sequences, Flow supports a large set of operators to transform your data. There is a wealth of operators already available, or you can create your own transformation (e.g. using the transform operator). However, many Flow operators accept suspend lambdas, so there’s often no need to write a custom transformation for complex tasks: just call suspend functions from inside your Flow.
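The sketch below illustrates both options with kotlinx.coroutines. `lookUpTitle` is a hypothetical suspend function that stands in for a database or network call. `map` calls it directly from its suspend lambda. `transform` is the general-purpose operator for emitting any number of values per input.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical suspend lookup, standing in for a database or network call.
suspend fun lookUpTitle(sessionId: Int): String {
    delay(10) // simulate I/O
    return "Session $sessionId"
}

// map takes a suspend lambda, so it can call lookUpTitle directly.
fun titles(ids: Flow<Int>): Flow<String> = ids.map { id -> lookUpTitle(id) }

// transform can emit zero, one or many values for each upstream value.
fun titlesWithHeaders(ids: Flow<Int>): Flow<String> = ids.transform { id ->
    emit("--- $id ---")
    emit(lookUpTitle(id))
}

fun demoTitles(): List<String> = runBlocking { titles(flowOf(1, 2)).toList() }
fun demoHeaders(): List<String> = runBlocking { titlesWithHeaders(flowOf(1)).toList() }
```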

In our ADS example, we want to combine the UserEventResult with session data in the Repository layer. We use the map operator to apply a suspend lambda to each value of the Flow retrieved from DataSource:

/* Copyright 2019 Google LLC.   
   SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
    private val userEventDataSource: UserEventDataSource,
    private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {

    override fun getObservableUserEvent(
        userId: String?,
        eventId: SessionId
    ): Flow<Result<LoadUserSessionUseCaseResult>> {
        // Handles null userId

        // Observes the user events and merges them with session data
        return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
            // lambda of the map operator that can call suspend functions
            val event = sessionRepository.getSession(eventId)

            // Merges session with user data and emits the result
            val userSession = UserSession(
                event,
                userEventResult.userEvent ?: createDefaultUserEvent(event)
            )
            Result.Success(LoadUserSessionUseCaseResult(userSession))
        }
    }
}
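The repository above is only half of this layer. A UseCase typically wraps it and hands a Flow to the ViewModel. The ViewModel code below calls `loadUserSessionUseCase(sessionId)` like a function, which suggests an `operator fun invoke`. The following is a hypothetical, self-contained sketch of that shape. `SessionTitleRepository`, `LoadSessionTitleUseCase` and the fake repository are illustrative names, not classes from the ADS app.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical repository contract (not the ADS interface).
interface SessionTitleRepository {
    fun observeTitle(sessionId: String): Flow<String>
}

// Hypothetical UseCase: small and focused, it adapts repository data for the
// ViewModel. operator fun invoke lets callers write useCase(sessionId).
class LoadSessionTitleUseCase(private val repository: SessionTitleRepository) {
    operator fun invoke(sessionId: String): Flow<String> =
        repository.observeTitle(sessionId).map { title -> title.uppercase() }
}

// In-memory fake used to exercise the UseCase.
class FakeSessionTitleRepository : SessionTitleRepository {
    override fun observeTitle(sessionId: String): Flow<String> =
        flowOf("intro to $sessionId", "deep dive into $sessionId")
}

fun demoUseCase(): List<String> = runBlocking {
    val useCase = LoadSessionTitleUseCase(FakeSessionTitleRepository())
    useCase("flow").toList()
}
```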

ViewModel

When performing UI ↔ ViewModel communication with LiveData, the ViewModel layer should consume the stream of data coming from the data layer using a terminal operator (e.g. collect, first or toList).

/* Copyright 2019 Google LLC.   
   SPDX-License-Identifier: Apache-2.0 */
// Simplified version of the real code
class SessionDetailViewModel(
    private val loadUserSessionUseCase: LoadUserSessionUseCase,
    /* ... */
): ViewModel() {

    private fun listenForUserSessionChanges(sessionId: SessionId) {
        viewModelScope.launch {
            loadUserSessionUseCase(sessionId).collect { loadResult ->
                // Update multiple LiveDatas to notify the View
            }
        }
    }
}

See full code here.
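The three terminal operators mentioned above behave differently. The sketch below runs each one on the same hypothetical `loadStates()` flow. It assumes kotlinx.coroutines 1.6 or newer, where `collect { }` is a member of `Flow` and needs no extra import.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stream of loading states for a session detail screen.
fun loadStates(): Flow<String> = flowOf("Loading", "Loaded: Flow talk")

fun demoTerminalOperators(): Triple<List<String>, String, List<String>> = runBlocking {
    // collect: run an action for every element (like updating a LiveData)
    val seen = mutableListOf<String>()
    loadStates().collect { state -> seen += state }

    // first: take the first element, then cancel the rest of the flow
    val initial = loadStates().first()

    // toList: gather every element into a list once the flow completes
    val all = loadStates().toList()

    Triple(seen, initial, all)
}
```

Each call starts a fresh collection, because a Flow built with `flowOf` is cold.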

If you’re converting a Flow to a LiveData, you can use the Flow.asLiveData() extension function from the AndroidX lifecycle-livedata-ktx library. This is very convenient because it shares a single underlying subscription to the Flow and manages that subscription based on the observers’ lifecycles. Moreover, LiveData keeps the most recent value for late-coming observers and keeps the subscription active across configuration changes. The following simpler code shows how you can use the extension function:

class SimplifiedSessionDetailViewModel(
  private val loadUserSessionUseCase: LoadUserSessionUseCase,
  sessionId: SessionId,
  /* ... */
): ViewModel() {

  val sessions = loadUserSessionUseCase(sessionId).asLiveData()

}

Disclaimer: The code snippet above is not part of the app; it’s a simplified version of the code that showcases how you can use Flow.asLiveData().

3. When to use a BroadcastChannel or Flow as an implementation detail

Back to the DataSource implementation: how can we implement the getObservableUserEvent function we exposed above? The team considered two alternative implementations: the flow builder and the BroadcastChannel API. Each serves different use cases.

When to use Flow

Flow is a cold stream. A cold stream is a data source whose producer will execute for each listener that starts consuming events, resulting in a new stream of data being created on each subscription. Once the consumer stops listening or the producer block finishes, the stream of data will be closed automatically.
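A quick way to observe this "cold" behaviour is to count how often the producer block starts. In the sketch below, the `producerStarts` counter is purely illustrative. Collecting the same `coldNumbers` flow twice runs the producer twice, and each collector receives its own full stream.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the producer block has started (illustration only).
var producerStarts = 0

val coldNumbers: Flow<Int> = flow {
    producerStarts++ // runs once per collector, not once per flow
    emit(1)
    emit(2)
}

fun demoCold(): Pair<List<Int>, List<Int>> = runBlocking {
    // Two independent collections: each one triggers the producer again.
    coldNumbers.toList() to coldNumbers.toList()
}
```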

Flow is a great fit when the production of data needs to start/stop to match the observer

You can emit a limited or unlimited number of elements using the flow builder.

val oneElementFlow: Flow<Int> = flow {
  // producer block starts here, stream starts
  emit(1)
  // producer block finishes here, stream will be closed
}

val unlimitedElementFlow: Flow<Int> = flow {
  // producer block starts here, stream starts
  var result = 0
  while (true) {
    // Do calculations
    result++
    emit(result)
    delay(100)
  }
  // The loop never ends on its own: the stream is closed when the
  // consumer stops listening and the coroutine is cancelled
}

Flow tends to be used for expensive tasks because it provides automatic cleanup via coroutine cancellation. Note that this cancellation is cooperative: a flow that never suspends can never be cancelled. In our example, delay is a suspend function that checks for cancellation, so when the subscriber stops listening, the Flow stops and cleans up its resources.
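The sketch below reproduces this with an unlimited flow like the one above. The hypothetical `TickerProbe` class records how many values were emitted and whether the `finally` block ran. The collector is started with `launchIn` and then cancelled. The cancellation is delivered at the suspended `delay` call, which triggers the cleanup.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.runBlocking

// Hypothetical probe that makes emission and cleanup observable.
class TickerProbe {
    var emitted = 0
    var cleanedUp = false

    // Unlimited flow; delay() is where cancellation is detected.
    val ticks: Flow<Int> = flow {
        try {
            while (true) {
                emitted++
                emit(emitted)
                delay(100)
            }
        } finally {
            cleanedUp = true // release resources here
        }
    }
}

fun demoCancellation(): TickerProbe = runBlocking {
    val probe = TickerProbe()
    val job = probe.ticks.launchIn(this) // collect in a child coroutine
    delay(350)                           // let a few values through
    job.cancelAndJoin()                  // the subscriber stops listening
    probe
}
```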

When to use BroadcastChannel

A Channel is a concurrency primitive for communicating between coroutines. A BroadcastChannel is a channel variant with multicast capabilities: it can have many subscribers at the same time.

There are some cases where you might want to use an implementation of BroadcastChannel in your data source layer:

Use BroadcastChannel when the producer(s) and consumer(s) have different lifetimes or operate completely independently of each other

The BroadcastChannel API is the perfect fit when you want the producer to follow a different lifecycle and broadcast the current result to anyone who’s listening. In this way, the producer doesn’t need to start every time a new listener starts consuming events.

You can still expose a Flow to callers; they don’t need to know how it is implemented. You can use the extension function BroadcastChannel.asFlow() to expose a BroadcastChannel as a Flow.

However, when a collector of that Flow stops listening, the BroadcastChannel is not closed and its producer keeps running. When using BroadcastChannel, you have to take care of its lifecycle yourself. A BroadcastChannel doesn’t know whether there are listeners, and it keeps resources alive until it is cancelled or closed. Make sure to close the BroadcastChannel when it’s no longer needed. Also, remember that a closed channel cannot become active again; you’d need to create a new instance.
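The last point holds for channels in general. To keep the sketch small, the example below uses a plain `Channel` with an unlimited buffer rather than a BroadcastChannel. It also uses `trySend`, which is available in kotlinx.coroutines 1.5 and newer. Once the channel is closed, every further send fails. The only way to resume sending is to create a new instance.

```kotlin
import kotlinx.coroutines.channels.Channel

fun demoClosedChannel(): List<Boolean> {
    var channel = Channel<Int>(Channel.UNLIMITED)
    val acceptedWhileOpen = channel.trySend(1).isSuccess   // open: element is buffered
    channel.close()                                        // close it when no longer needed
    val rejectedAfterClose = channel.trySend(2).isClosed   // closed for good: send fails
    channel = Channel(Channel.UNLIMITED)                   // the only way back: a new instance
    val acceptedByNewInstance = channel.trySend(3).isSuccess
    return listOf(acceptedWhileOpen, rejectedAfterClose, acceptedByNewInstance)
}
```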

An example of how to use the BroadcastChannel API can be found in the next section.

Disclaimer

Parts of the Flow and Channel APIs are still experimental and likely to change. In some situations you would currently use Channels, but the future recommendation may change to use Flow. Specifically, the StateFlow and Flow’s share operator proposals may reduce the usage of Channel in the future.

4. Convert callback-based data stream APIs to Coroutines

Multiple libraries already support coroutines for data stream operations, including Room. For those that don’t, you can convert any callback-based API to Coroutines.

Flow implementation

If you want to convert a callback-based stream API to use Flow, you can use the channelFlow function (or callbackFlow, which shares the same implementation). channelFlow creates a Flow whose elements are sent to a Channel. This lets the Flow receive elements produced in a different context or concurrently.
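The sketch below shows the "different context or concurrently" part in isolation. The hypothetical `mergedReadings()` flow launches two child coroutines inside `channelFlow`. One of them runs on `Dispatchers.Default`, and both send into the same underlying channel. The interleaving of their elements is not deterministic, so the demo sorts the results.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical: two independent producers feeding one Flow concurrently.
fun mergedReadings(): Flow<Int> = channelFlow {
    launch(Dispatchers.Default) { for (i in 1..3) send(i) } // different context
    launch { for (i in 10..12) send(i) }                     // collector's context
    // The flow completes only after both child coroutines have finished.
}

fun demoChannelFlow(): List<Int> = runBlocking { mergedReadings().toList().sorted() }
```

A plain `flow { }` builder would not allow this. It forbids emitting from another coroutine or context.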

In the following sample, we want to emit the elements that we get from a callback into a Flow:

  1. Create a flow with the channelFlow builder that registers a callback with a third-party library.
  2. Emit all items received from the callback to the Flow.
  3. When the subscriber stops listening, unregister the subscription to the API using the suspend function awaitClose.

/* Copyright 2019 Google LLC.   
   SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {

    // 1) Create Flow with channelFlow
    return channelFlow<UserEventResult> {

        val eventDocument = firestore.collection(USERS_COLLECTION)
            .document(userId)
            .collection(EVENTS_COLLECTION)
            .document(eventId)
    
        // 1) Register callback to the API
        val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
            val userEvent = if (snapshot.exists()) {
                parseUserEvent(snapshot)
            } else { null }

            // 2) Send items to the Flow
            channel.offer(UserEventResult(userEvent))
        }

        // 3) Don't close the stream of data, keep it open until the consumer
        // stops listening or the API calls onCompleted or onError. 
        // When that happens, cancel the subscription to the 3P library
        awaitClose { subscription.remove() }
    }
}

See full code here.
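The same three steps can be tried without Firestore. In the sketch below, `FakeSnapshotSource` is a hypothetical callback-based API standing in for the third-party library. It uses `callbackFlow` and `trySend`, which replaced `offer` in kotlinx.coroutines 1.5. The demo checks two things: the values arrive in the Flow, and the listener is unregistered once the collector stops after two items.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API standing in for a third-party library.
class FakeSnapshotSource {
    private val listeners = mutableListOf<(String) -> Unit>()

    val listenerCount: Int
        get() = synchronized(listeners) { listeners.size }

    // Registers a listener and returns a handle that unregisters it.
    fun addListener(listener: (String) -> Unit): () -> Unit {
        synchronized(listeners) { listeners.add(listener) }
        return { synchronized(listeners) { listeners.remove(listener) } }
    }

    fun publish(value: String) {
        val current = synchronized(listeners) { listeners.toList() }
        current.forEach { it(value) }
    }
}

// 1) Create the Flow, 2) forward callback values, 3) unregister in awaitClose.
fun FakeSnapshotSource.snapshots(): Flow<String> = callbackFlow {
    val unregister = addListener { value -> trySend(value) }
    awaitClose { unregister() }
}

fun demoCallbackFlow(): Pair<List<String>, Int> = runBlocking {
    val source = FakeSnapshotSource()
    val received = async { source.snapshots().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until the Flow has subscribed
    source.publish("a")
    source.publish("b")
    received.await() to source.listenerCount
}
```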

BroadcastChannel implementation

For our stream of data that tracks user authentication with Firestore, we used the BroadcastChannel API as we want to register one Authentication listener that follows a different lifecycle and broadcasts the current result to anyone who’s listening.

To convert a callback API to BroadcastChannel, you need a bit more code than with Flow. You can create a class that keeps the BroadcastChannel instance in a property, and register the callback that sends elements to the BroadcastChannel as before. In the code below, the callback is registered the first time getBasicUserInfo() is called:

/* Copyright 2019 Google LLC.   
   SPDX-License-Identifier: Apache-2.0 */
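class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {

    private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()

    // Whether the listener is currently registered with FirebaseAuth
    private var isListening = false

    // A single AuthStateListener instance, so the same object is added and removed
    private val listener = FirebaseAuth.AuthStateListener { auth ->
        // Data processing logic

        // Send the current user for observers
        if (!channel.isClosedForSend) {
            channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
        } else {
            unregisterListener()
        }
    }

    @Synchronized
    override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
        if (!isListening) {
            firebase.addAuthStateListener(listener)
            isListening = true
        }
        return channel.asFlow()
    }

    // Stops listening once the channel has been closed
    @Synchronized
    private fun unregisterListener() {
        firebase.removeAuthStateListener(listener)
        isListening = false
    }
}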

See full code here.

5. Testing tips

To test Flow transformations (as we do in the UseCase and Repository layers), you can use the flow builder to return fake data. For example:

/* Copyright 2019 Google LLC.
   SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
  override fun getObservableUserEvents(userId: String) = flow {
    emit(UserEventsResult(userEvents))
  }
}

class DefaultSessionAndUserEventRepositoryTest {
  @Test
  fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
    // Prepare repo
    val userEvents = repository
          .getObservableUserEvents("user", true).first()
    // Assert user events
  }
}
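The following self-contained version of the same idea does not depend on the ADS classes. Every type in it is hypothetical: `UserEvent`, `SimpleUserEventDataSource`, its flow-builder fake, and `StarredSessionsRepository`, the transformation under test. It uses `runBlocking` from kotlinx.coroutines instead of `runBlockingTest`, so it needs no test artifact.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified types; not the ADS app's real classes.
data class UserEvent(val sessionId: String, val starred: Boolean)

interface SimpleUserEventDataSource {
    fun observeUserEvents(userId: String): Flow<List<UserEvent>>
}

// Fake built with the flow builder: emits canned data, no backend needed.
class FakeSimpleUserEventDataSource(
    private val events: List<UserEvent>
) : SimpleUserEventDataSource {
    override fun observeUserEvents(userId: String): Flow<List<UserEvent>> =
        flow { emit(events) }
}

// The transformation under test: keep only starred sessions.
class StarredSessionsRepository(private val dataSource: SimpleUserEventDataSource) {
    fun observeStarredSessionIds(userId: String): Flow<List<String>> =
        dataSource.observeUserEvents(userId).map { events ->
            events.filter { it.starred }.map { it.sessionId }
        }
}

fun starredIdsFromFake(): List<String> = runBlocking {
    val fake = FakeSimpleUserEventDataSource(
        listOf(UserEvent("s1", starred = true), UserEvent("s2", starred = false))
    )
    StarredSessionsRepository(fake).observeStarredSessionIds("user").first()
}
```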

To test implementations of Flow, use the take operator to get a fixed number of items from the Flow and the toList operator as the terminal operator to get the results in a list. See an example of this in the following test:

class AnotherStreamDataSourceImplTest {
  @Test
  fun `Test happy path`() = runBlockingTest {
    // Prepare subject
    val result = subject.flow.take(1).toList()
    // Assert expected result
  }
}

The take operator is a great fit here because it closes the Flow after you get the items. Not closing a started Flow (or BroadcastChannel) after each test leaks memory and makes the test suite flaky and inconsistent.
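The sketch below makes the effect of `take` visible. `StreamUnderTest` is a hypothetical subject with a never-ending `numbers` flow that records whether its producer was closed. After `take(3).toList()` returns, the three items are in the list and the producer's `finally` block has already run.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical subject: a never-ending stream, like a data source that keeps listening.
class StreamUnderTest {
    var closed = false

    val numbers: Flow<Int> = flow {
        try {
            var n = 0
            while (true) emit(n++)
        } finally {
            closed = true // take() aborts the producer, so this always runs
        }
    }
}

fun takeThree(subject: StreamUnderTest): List<Int> =
    runBlocking { subject.numbers.take(3).toList() }
```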

Note: If the implementation of the DataSource is done with a BroadcastChannel, the code above is not enough. You have to manage its lifecycle by making sure you start the BroadcastChannel before the test and close it after the test finishes. If not, you’ll leak memory. You can see a test like this in this other Flow sample.

Testing Coroutines best practices also apply here. If you create a new coroutine in code under test, you might want to execute it in your test thread for a deterministic execution of your test. Check out more about this in the Testing Coroutines ADS 2019 talk.
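A common way to achieve this is to inject the dispatcher into the class that launches the coroutine. kotlinx-coroutines-test provides dedicated test dispatchers for this, such as TestCoroutineDispatcher at the time of the talk. The sketch below uses `Dispatchers.Unconfined` as a simpler stand-in. `SessionPreloader` is a hypothetical class. With `Unconfined`, the launched coroutine starts immediately in the calling (test) thread, so its result is visible as soon as `preload()` returns.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical class under test: it starts its own coroutine.
// The dispatcher is a constructor parameter so tests can replace it.
class SessionPreloader(
    private val scope: CoroutineScope,
    private val dispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    var preloaded: List<String> = emptyList()
        private set

    fun preload() {
        scope.launch(dispatcher) {
            preloaded = flowOf("keynote", "flow-talk").toList()
        }
    }
}

fun preloadedImmediately(): List<String> = runBlocking {
    // Unconfined runs the coroutine right away in this thread,
    // up to its first real suspension point (there is none here).
    val preloader = SessionPreloader(this, Dispatchers.Unconfined)
    preloader.preload()
    preloader.preloaded
}
```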


Summary

  • Prefer exposing Flow to consumers rather than Channel because of all the explicit contracts and operators Flow provides.

  • With Flow, the producer block will get executed every time there’s a new listener and the lifecycle of the stream of data will be handled automatically.

  • With BroadcastChannel, you can share the producer but you have to manage its lifecycle yourself.

  • Consider converting callback-based APIs to coroutines for a better and more idiomatic integration of the APIs within your app.

  • Easily test implementations of Flow by using the take and toList operators.