Exploring RxJava in Android: Use Cases of Single, Observable, Map, and Flowable in Network Calls

Manoj Kumar
8 min read · Nov 4, 2023

RxJava is a popular reactive programming library in the Android ecosystem. It simplifies asynchronous programming and provides powerful tools for handling data streams. In this article, we’ll dive into how RxJava’s Single, Observable, map, and Flowable can be effectively used in Android app development, focusing on network calls. We'll also discuss the integration of Hilt for dependency injection and MVVM architecture for clean app structure.

Prerequisites

Before we start, ensure that you have set up Hilt for dependency injection in your Android project and organized it using the MVVM architectural pattern. Hilt simplifies the injection of dependencies, making your code cleaner and more maintainable.
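
As a rough sketch of that setup, a Hilt module can provide the Retrofit service that the later examples inject. This is only one possible wiring: it assumes RxJava 3 together with Retrofit's adapter-rxjava3 and converter-gson artifacts, NetworkModule and the base URL are placeholders, and UserService is the interface defined in Use Case 1.

```kotlin
import dagger.Module
import dagger.Provides
import dagger.hilt.InstallIn
import dagger.hilt.components.SingletonComponent
import javax.inject.Singleton
import retrofit2.Retrofit
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory

// Hypothetical module name; installed for the whole application.
@Module
@InstallIn(SingletonComponent::class)
object NetworkModule {

    @Provides
    @Singleton
    fun provideRetrofit(): Retrofit =
        Retrofit.Builder()
            .baseUrl("https://api.example.com/") // placeholder URL (assumption)
            .addConverterFactory(GsonConverterFactory.create())
            // Lets Retrofit interfaces return Single, Observable, and Flowable.
            .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
            .build()

    // UserService is the Retrofit interface shown in Use Case 1.
    @Provides
    @Singleton
    fun provideUserService(retrofit: Retrofit): UserService =
        retrofit.create(UserService::class.java)
}
```

With a module like this in place, any class with an @Inject constructor can receive UserService without building it by hand.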

Use Case 1: Single for a Single Network Request

Single is an RxJava type used when you expect a single result or an error. It's ideal for making network requests where you anticipate one response. Let's create a simple example:

Fetching User Profile Data

Let’s say you have an Android app where you want to fetch and display the user’s profile data after they log in. The user profile data includes details such as their name, email, profile picture, and some other information. You want to use Hilt for dependency injection, MVVM for the architecture, and Single from RxJava to handle the network call.

Here’s how you can structure the different components of this use case:

ViewModel: Create a ViewModel to manage the user profile data.

@HiltViewModel
class UserProfileViewModel @Inject constructor(
    private val userRepository: UserRepository
) : ViewModel() {
    private val disposables = CompositeDisposable()
    private val userProfileData = MutableLiveData<UserProfile>()

    fun getUserProfileData(): LiveData<UserProfile> {
        // Keep the subscription so it can be cancelled when the ViewModel is cleared.
        disposables.add(
            userRepository.getUserProfile()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { userProfile -> userProfileData.value = userProfile },
                    { error -> /* Handle error */ }
                )
        )
        return userProfileData
    }

    override fun onCleared() {
        super.onCleared()
        disposables.clear()
    }
}

Repository: Create a repository that interacts with the network layer and returns a Single for the user profile data.

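class UserRepository @Inject constructor(private val userService: UserService) {
    fun getUserProfile(): Single<UserProfile> {
        return userService.getUserProfile()
            // toUserProfile() is assumed to be a mapper from the response model to the domain model.
            .map { response -> response.toUserProfile() }
            // UserProfileFetchError is assumed to be an app-specific exception defined elsewhere.
            .onErrorResumeNext { error -> Single.error(UserProfileFetchError(error)) }
    }
}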

Service: Define a service that makes the network call using Retrofit.

interface UserService {
    @GET("user/profile")
    fun getUserProfile(): Single<UserProfileResponse>
}

Model: Create models for the data received from the network.

data class UserProfileResponse(
    val name: String,
    val email: String,
    val profilePictureUrl: String
    // Other fields
)

data class UserProfile(
    val name: String,
    val email: String,
    val profilePictureUrl: String
    // Other fields
)

View: In your activity or fragment, observe the LiveData from the ViewModel to update the UI when the data is available.

@AndroidEntryPoint
class UserProfileActivity : AppCompatActivity() {
    // A @HiltViewModel is obtained through the viewModels() delegate, not through field injection.
    private val viewModel: UserProfileViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_user_profile)

        // nameTextView and emailTextView are assumed to be view references (for example, via view binding).
        viewModel.getUserProfileData().observe(this) { userProfile ->
            // Update UI with user profile data
            nameTextView.text = userProfile.name
            emailTextView.text = userProfile.email
            // Set profile picture, and so on
        }
    }
}

In the above use case, we used Single from RxJava to handle the network call for fetching the user profile data. The repository encapsulates the network call, and the ViewModel manages the interaction with the repository and updates the UI when the data is available. Hilt is used for dependency injection, and MVVM architecture provides a clear separation of concerns.
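
To see the Single behaviour without an Android project, here is a minimal sketch that can run on the JVM. It assumes RxJava 3 (the io.reactivex.rxjava3 packages); fakeGetUserProfile, the sample values, and the bodies of toUserProfile and UserProfileFetchError are hypothetical stand-ins for the Retrofit call and helpers used above.

```kotlin
import io.reactivex.rxjava3.core.Single

data class UserProfileResponse(val name: String, val email: String, val profilePictureUrl: String)
data class UserProfile(val name: String, val email: String, val profilePictureUrl: String)

// Hypothetical wrapper exception, mirroring the one used in the repository.
class UserProfileFetchError(cause: Throwable) :
    RuntimeException("Could not fetch user profile", cause)

// Hypothetical mapper from the network model to the domain model.
fun UserProfileResponse.toUserProfile() = UserProfile(name, email, profilePictureUrl)

// Stand-in for the Retrofit call: emits exactly one response or one error.
fun fakeGetUserProfile(fail: Boolean): Single<UserProfileResponse> =
    if (fail) {
        Single.error<UserProfileResponse>(IllegalStateException("network down"))
    } else {
        Single.just(UserProfileResponse("Ada", "ada@example.com", "https://example.com/ada.png"))
    }

// Same shape as the repository method: map the success, wrap the failure.
fun loadProfile(fail: Boolean): Single<UserProfile> =
    fakeGetUserProfile(fail)
        .map { response -> response.toUserProfile() }
        .onErrorResumeNext { error -> Single.error<UserProfile>(UserProfileFetchError(error)) }

fun main() {
    for (fail in listOf(false, true)) {
        loadProfile(fail).subscribe(
            { profile -> println("Success: ${profile.name}") },
            { error -> println("Failure: ${error.message}") }
        )
    }
}
```

Exactly one of the two callbacks runs for each subscription, which is the contract that makes Single a natural fit for one-shot requests.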

Use Case 2: Observable for Multiple Network Requests

Observable is used when you expect multiple emissions and want to receive all items emitted by the source. This is suitable for handling multiple network requests simultaneously. Let's create an example where we fetch data from several different endpoints in parallel:

Use Case: Suppose you have a ViewModel in an Android app that needs to fetch data from five different API endpoints concurrently and then combine the results into a single response.

Without zip Operator:

You can use the Observable class to create five parallel requests without using the zip operator. In this case, you'd typically create five Observable instances, each representing a network request, and then merge them together. Because merge only has overloads for up to four sources, use mergeArray (or merge with a list) for five. Here's how you might implement it:

@HiltViewModel
class MyViewModel @Inject constructor(private val myApiService: MyApiService) : ViewModel() {

    fun fetchData() {
        // Give each request its own subscribeOn so the calls can run concurrently.
        val observable1 = myApiService.fetchData1().subscribeOn(Schedulers.io())
        val observable2 = myApiService.fetchData2().subscribeOn(Schedulers.io())
        val observable3 = myApiService.fetchData3().subscribeOn(Schedulers.io())
        val observable4 = myApiService.fetchData4().subscribeOn(Schedulers.io())
        val observable5 = myApiService.fetchData5().subscribeOn(Schedulers.io())

        Observable.mergeArray(observable1, observable2, observable3, observable4, observable5)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                { data -> /* Process individual data responses */ },
                { error -> /* Handle error */ }
            )
    }
}

The above code creates five observables for parallel requests and merges them together. The responses are processed individually, in whatever order they arrive.

Using zip Operator:

Alternatively, you can use the zip operator to combine the results of the five parallel requests into a single response. This can be useful when you need to wait for all requests to complete before processing the combined result:

@HiltViewModel
class MyViewModel @Inject constructor(private val myApiService: MyApiService) : ViewModel() {

    fun fetchData() {
        // Give each request its own subscribeOn so the calls can run concurrently.
        val observable1 = myApiService.fetchData1().subscribeOn(Schedulers.io())
        val observable2 = myApiService.fetchData2().subscribeOn(Schedulers.io())
        val observable3 = myApiService.fetchData3().subscribeOn(Schedulers.io())
        val observable4 = myApiService.fetchData4().subscribeOn(Schedulers.io())
        val observable5 = myApiService.fetchData5().subscribeOn(Schedulers.io())

        Observable.zip(
            observable1, observable2, observable3, observable4, observable5,
            { result1, result2, result3, result4, result5 ->
                /* Combine the results into a single response here */
            }
        )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                { combinedResult -> /* Process the combined result */ },
                { error -> /* Handle error */ }
            )
    }
}

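In the above code, the zip operator waits until each of the five observables has emitted an item and then combines those items into one result. Because every network call here emits a single response, the combined result arrives once all five requests have succeeded; if any request fails, the error is delivered instead. You can build the combined result in the lambda passed to zip. This can be helpful when you need to synchronize the responses from multiple sources.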
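
The difference between the two operators is easier to see side by side. The following is a small sketch, assuming RxJava 3; the three sources are hypothetical stand-ins for network responses and each emits a single item.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical stand-ins for three network responses, each emitting one item.
fun users(): Observable<String> = Observable.just("users")
fun posts(): Observable<String> = Observable.just("posts")
fun comments(): Observable<String> = Observable.just("comments")

// merge: every response is passed downstream as its own item.
fun mergedResponses(): List<String> =
    Observable.merge(users(), posts(), comments())
        .toList()
        .blockingGet()

// zip: one combined item, emitted once every source has produced a value.
fun zippedResponses(): List<String> =
    Observable.zip(users(), posts(), comments()) { u, p, c -> "$u+$p+$c" }
        .toList()
        .blockingGet()

fun main() {
    println("merge delivered ${mergedResponses().size} items")
    println("zip delivered ${zippedResponses().size} item: ${zippedResponses()}")
}
```

Use merge when each response can be handled on its own, and zip when the screen only makes sense once all responses are available.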

Use Case 3: map for Data Transformation

The map operator is used to transform emitted items into a different format. This is helpful when you need to map the network response into a more usable domain model. Let's see how it can be used:

Data Transformation and Presentation

You often have data that needs to be transformed before it is presented to the user. This data transformation can involve converting raw data from network requests or databases into a format suitable for UI rendering. The map operator in RxJava is often used for this purpose.

Here’s how it typically works:

  1. Data Source: Your app fetches data from various sources, such as network requests, databases, or shared preferences. These data sources are exposed as observables using RxJava.
  2. ViewModel: In the MVVM architecture, the ViewModel is responsible for managing and processing the data. It receives data from the data sources as observables.
  3. Use of map Operator: Within the ViewModel, you can use the map operator to transform the raw data. For example, you might map a list of raw objects into a list of UI models that can be directly bound to the View.

@HiltViewModel
class MyViewModel @Inject constructor(private val repository: MyRepository) : ViewModel() {
    // repository.getData() is assumed to return Observable<List<RawObject>>.
    // toLiveData() is the Publisher extension from lifecycle-reactivestreams-ktx.
    val data: LiveData<List<UiModel>> = repository.getData()
        .toFlowable(BackpressureStrategy.BUFFER)
        .flatMap { rawList ->
            // flatMap on a Flowable must return a Publisher, so use Flowable here.
            Flowable.fromIterable(rawList)
                .map { rawObject ->
                    // Transform rawObject into a UiModel
                    UiModel(rawObject)
                }
                .toList()
                .toFlowable()
        }
        .toLiveData()
}

In the above code:

  • repository.getData() fetches raw data from a data source (e.g., a network request).
  • flatMap turns each emitted list of raw data into an inner stream of its individual items.
  • map is used to transform each raw object into a UiModel.
  • The final list of UiModels is sent to the UI layer as LiveData for UI rendering.

UI Layer: The UI layer (Activity/Fragment) observes the LiveData provided by the ViewModel and updates the UI based on the transformed data.

Using the map operator in this scenario allows you to perform data transformation and maintain a clear separation between the data processing in the ViewModel and the UI presentation in the View. Hilt helps in providing dependencies, ensuring that ViewModel and other components can be injected and tested easily.
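
When the source already emits whole lists, a single map call combined with Kotlin's own List.map is often enough. The sketch below illustrates this under a few assumptions: it uses RxJava 3, and RawUser, UserUiModel, and toUiModel are hypothetical names invented for the example.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical raw model (as it might arrive from the network) and UI model.
data class RawUser(val firstName: String, val lastName: String, val ageYears: Int)
data class UserUiModel(val displayName: String, val ageLabel: String)

// Pure mapping function: easy to unit test on its own.
fun RawUser.toUiModel() = UserUiModel("$firstName $lastName", "$ageYears years old")

// The RxJava map transforms each emitted list; Kotlin's List.map transforms each element.
fun toUiModels(source: Observable<List<RawUser>>): Observable<List<UserUiModel>> =
    source.map { rawUsers -> rawUsers.map { it.toUiModel() } }

fun main() {
    val source = Observable.just(
        listOf(RawUser("Ada", "Lovelace", 36), RawUser("Alan", "Turing", 41))
    )
    toUiModels(source).subscribe { uiModels -> uiModels.forEach(::println) }
}
```

Keeping the mapping in a plain function such as toUiModel means the transformation can be tested without any RxJava or Android machinery.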

Use Case 4: Flowable for Real-time Data Streaming

Flowable is a powerful RxJava type for handling asynchronous and potentially long-running data streams that may produce data at a high rate, which could lead to backpressure. Here are some common use cases where Flowable can be beneficial for network calls in Android, along with an explanation of backpressure:

1. Real-time Data Streaming:

Imagine you are building a stock trading app that receives real-time stock price updates. These updates can arrive rapidly, and you need to handle and process them as they come in. In this case, Flowable is a great choice to manage the continuous stream of stock price updates.

@HiltViewModel
class StockViewModel @Inject constructor() : ViewModel() {

    private val disposable = CompositeDisposable()
    val stockPriceLiveData = MutableLiveData<Double>()

    fun startStockUpdates() {
        val stockUpdatesFlowable = StockManager.getStockUpdates()

        disposable.add(
            stockUpdatesFlowable
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { price -> stockPriceLiveData.value = price },
                    { error -> /* Handle errors */ }
                )
        )
    }

    override fun onCleared() {
        super.onCleared()
        disposable.dispose()
    }
}

In the above code example, the ViewModel subscribes to a Flowable of stock prices and publishes each update to LiveData so that the UI can display it. StockManager.getStockUpdates() stands in for your real price source.

2. Continuous Sensor Data:

If your app collects data from sensors, such as GPS or accelerometer data, you may receive a constant flow of sensor readings. Using Flowable, you can efficiently handle and process these readings as they arrive.
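
Sensor APIs are usually callback-based, so they first need to be wrapped in a Flowable. One possible sketch, assuming RxJava 3, is shown below; ReadingListener and FakeSensor are hypothetical stand-ins for a real sensor API, and BackpressureStrategy.LATEST is just one reasonable choice when only the newest reading matters.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable

// Hypothetical callback-based sensor API.
fun interface ReadingListener {
    fun onReading(value: Float)
}

class FakeSensor {
    private var listener: ReadingListener? = null
    fun register(listener: ReadingListener) { this.listener = listener }
    fun unregister() { listener = null }
    fun push(value: Float) { listener?.onReading(value) }
}

// Bridges the callback API into a Flowable that keeps only the latest
// reading whenever the subscriber falls behind.
fun sensorReadings(sensor: FakeSensor): Flowable<Float> =
    Flowable.create<Float>({ emitter ->
        sensor.register { value -> emitter.onNext(value) }
        // Unregister the listener when the subscriber disposes.
        emitter.setCancellable { sensor.unregister() }
    }, BackpressureStrategy.LATEST)

fun main() {
    val sensor = FakeSensor()
    val subscription = sensorReadings(sensor)
        .subscribe { reading -> println("Reading: $reading") }
    sensor.push(9.8f)
    sensor.push(9.7f)
    subscription.dispose()
    sensor.push(9.6f) // not delivered: disposing unregistered the listener
}
```

Registering the listener inside Flowable.create and releasing it in setCancellable ties the sensor's lifetime to the subscription, which helps avoid leaked listeners.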

3. Large Data Pagination:

When dealing with paginated data from a REST API, you may have a large dataset that’s too big to fetch all at once. A Flowable can be used to fetch and display data in smaller chunks as the user scrolls through a list. This approach is especially useful when working with infinite scrolling or paginating data from a database.
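
A rough sketch of this idea, assuming RxJava 3, is to treat scroll events as a stream of page numbers and turn each one into a page request. Everything named here (fakeBackendItems, fetchPage, pagedItems) is hypothetical, and the in-memory list stands in for a real paged endpoint.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.PublishProcessor

// Hypothetical backend holding 7 items, served 3 per page.
val fakeBackendItems: List<String> = (1..7).map { "item-$it" }

// Stand-in for a paged network call; returns an empty list past the last page.
fun fetchPage(page: Int, pageSize: Int = 3): Single<List<String>> =
    Single.fromCallable { fakeBackendItems.drop((page - 1) * pageSize).take(pageSize) }

// Turns each requested page number into one call, strictly in order.
fun pagedItems(pageRequests: Flowable<Int>): Flowable<List<String>> =
    pageRequests
        .onBackpressureDrop() // drop requests that pile up faster than pages load
        .concatMapSingle { page -> fetchPage(page) }
        .takeWhile { items -> items.isNotEmpty() } // complete after the last page

fun main() {
    val pageRequests = PublishProcessor.create<Int>()
    pagedItems(pageRequests).subscribe { items -> println("Loaded $items") }
    // In an app these calls would be triggered by the scroll position.
    (1..4).forEach { page -> pageRequests.onNext(page) }
}
```

concatMapSingle keeps the pages in request order, while takeWhile ends the stream as soon as the backend returns an empty page.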

Backpressure:

Backpressure is a mechanism to handle situations where the rate at which data is emitted (produced) by a source is faster than the rate at which it can be processed (consumed) by the subscriber. If the overflow is left unhandled, it can lead to issues like memory overload and application crashes.

To address backpressure in Flowable, RxJava provides various backpressure strategies:

  1. Buffer: When a downstream consumer cannot keep up, Flowable buffers items until the consumer is ready to handle them. This strategy can lead to increased memory usage if the buffer size is not limited.
  2. Drop: When the downstream consumer cannot keep up, the Flowable discards items that can't be processed. This strategy can lead to data loss but is memory-efficient.
  3. Latest: The Flowable emits only the most recent item when the downstream consumer can't keep up. This strategy ensures that the consumer gets the latest data but may miss some intermediate data.
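  4. Error: The Flowable signals a MissingBackpressureException when the downstream consumer can't keep up. It’s useful when you want overflow to fail fast so that you can handle it explicitly and potentially apply flow control.
  5. Missing: Items are emitted with no buffering or dropping (BackpressureStrategy.MISSING). Overflow is left to downstream operators, and the stream can fail with a MissingBackpressureException if none of them handles it. Note that there is no default strategy: Flowable.create requires you to pick one.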

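To handle backpressure on an existing Flowable, you can use operators like onBackpressureBuffer, onBackpressureDrop, or onBackpressureLatest. The Error and Missing behaviors are selected through BackpressureStrategy when you create a Flowable with Flowable.create or convert an Observable with toFlowable.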

For network calls with Flowable, you may not always need to explicitly handle backpressure. If your use case involves processing data emitted at a high rate, consider using one of the backpressure strategies based on your specific requirements to ensure that your app remains stable and efficient.
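
The effect of the three operators can be compared with a small experiment. The sketch below assumes RxJava 3 and simulates a slow consumer with RxJava's TestSubscriber: it subscribes with no demand, lets the source emit ten items, and only then requests three. The helper name receivedAfterLateRequest is made up for this example.

```kotlin
import io.reactivex.rxjava3.core.Flowable

// Simulates a consumer that is not ready when the source emits:
// it subscribes with zero demand and asks for three items afterwards.
fun receivedAfterLateRequest(source: Flowable<Int>): List<Int> {
    val subscriber = source.test(0L) // the source emits 1..10 immediately
    subscriber.request(3L)           // the consumer becomes ready for 3 items
    return subscriber.values().toList()
}

fun main() {
    val fastSource = Flowable.range(1, 10)

    // Buffer keeps everything, so the first three items are still available.
    println("buffer: " + receivedAfterLateRequest(fastSource.onBackpressureBuffer())) // [1, 2, 3]

    // Drop discards items that arrive while there is no demand.
    println("drop:   " + receivedAfterLateRequest(fastSource.onBackpressureDrop()))   // []

    // Latest keeps only the most recent item.
    println("latest: " + receivedAfterLateRequest(fastSource.onBackpressureLatest())) // [10]
}
```

For something like a price ticker, the latest strategy is often the natural choice, whereas buffering suits streams where every item must eventually be processed.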

Conclusion

RxJava is a powerful tool for handling asynchronous operations, especially in Android app development. In this article, we explored the use cases of Single, Observable, map, and Flowable in network calls, integrated Hilt for dependency injection, and organized the code using the MVVM architecture. Understanding when and how to use these RxJava constructs can greatly improve your app's efficiency, maintainability, and responsiveness.
