Managing threads in RxJava

Dawid Wiącek
Published in TUI-MobilityHub
Feb 26, 2021

Time flies and everything around us keeps changing: we’ve started using coroutines instead of RxJava in our project. Of course, replacing one tool with another takes time, and we still have plenty of places where RxJava is needed. I suspect others are in the same position, so I hope they’ll find this article interesting and useful.

How annoying is it for users when your application freezes or closes for no apparent reason? If you want to keep your Android application responsive, avoid performing any operation on the main thread that may end up blocking it. Network operations and database calls are the most common examples of operations that you should NOT call on the main thread, and RxJava gives you the tools to move them elsewhere properly.

RxJava is single-threaded by default, which means that an Observable (a source that emits data) and the chain of operators applied to it notify their observers on the same thread on which subscribe() is called, unless the source or one of the operators introduces another thread.

Let’s analyse the following block of code:

articlesRepository.getArticles()
.doOnNext { doOnNext(it) }
.filter { filterArticles(it) }
.map { mapArticles(it) }
.subscribe { handleResponse(it) }

where articlesRepository is an object implementing the following interface:

interface ArticlesRepository {
fun getArticles(): Observable<List<Article>>
}

If you run this code, it will be executed by the calling thread, which will usually be the main thread (assuming getArticles() does its work synchronously when subscribed to). For the purpose of this post, ‘calling thread’ means the thread you are on at the moment you subscribe to your RxJava stream. But how can we move this work to another thread so that the current one is not blocked?
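To see the default behaviour for yourself, here is a minimal sketch for a plain JVM project, assuming RxJava 3 is on the classpath (for RxJava 2, use the io.reactivex package instead of io.reactivex.rxjava3). Observable.just stands in for articlesRepository.getArticles(), and the function name and the record helper are hypothetical: they store the name of the thread that runs each step instead of printing it.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: runs the chain without any Scheduler and returns
// step name -> name of the thread that executed that step.
fun threadsWithoutSchedulers(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.just(listOf("article1", "article2"))  // stand-in for getArticles()
        .doOnNext { record("doOnNext") }
        .filter { record("filter"); it.isNotEmpty() }
        .map { record("map"); it.size }
        .subscribe { record("handleResponse") }      // no Scheduler anywhere in the chain
    return seen
}

fun main() {
    println("calling thread: ${Thread.currentThread().name}")
    threadsWithoutSchedulers().forEach { (step, thread) -> println("$step -> $thread") }
}
```

Every step reports the calling thread, because nothing in the chain switches threads.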

Schedulers

First, you need to understand that the Scheduler class is the key to managing threads in RxJava. There are many standard schedulers, which you can get from the Schedulers factory (e.g. Schedulers.io(), Schedulers.computation(), Schedulers.trampoline()) or, if you need an Android-specific one, from AndroidSchedulers in RxAndroid (e.g. AndroidSchedulers.mainThread()).
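A quick way to get a feel for these schedulers is to look at the names of the threads they use. A minimal sketch, assuming RxJava 3 on a plain JVM (AndroidSchedulers is left out because it needs an Android Looper); threadNameOf is a hypothetical helper:

```kotlin
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs one task on the given Scheduler and returns
// the name of the thread that executed it.
fun threadNameOf(scheduler: Scheduler): String {
    val name = CompletableFuture<String>()
    scheduler.scheduleDirect { name.complete(Thread.currentThread().name) }
    return name.get(5, TimeUnit.SECONDS)
}

fun main() {
    println(threadNameOf(Schedulers.io()))          // RxCachedThreadScheduler-<n>
    println(threadNameOf(Schedulers.computation())) // RxComputationThreadPool-<n>
    println(threadNameOf(Schedulers.single()))      // RxSingleScheduler-<n>
    println(threadNameOf(Schedulers.newThread()))   // RxNewThreadScheduler-<n>
    println(threadNameOf(Schedulers.trampoline()))  // the calling thread itself
}
```

These name prefixes make it easy to recognise the pools later when you print Thread.currentThread().name inside a chain.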

observeOn()

The observeOn() operator takes a Scheduler instance and switches the thread used by all operators further downstream.

Most often you will use the observeOn(Scheduler) variant, but it is good to know that you can also pass delayError and bufferSize parameters to it (check the documentation to learn more).
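As an illustration of delayError, here is a small sketch, assuming RxJava 3: with delayError = true, observeOn first delivers every item it has already received and only then the upstream error, whereas with the default false the error may overtake items still waiting in its buffer. The bufferSize of 16 and the function name are arbitrary choices for this example.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.observers.TestObserver
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical example: three items followed by an error, observed on another thread.
fun observeWithDelayedError(): TestObserver<Int> =
    Observable.just(1, 2, 3)
        .concatWith(Observable.error<Int>(IllegalStateException("network failure")))
        // delayError = true: buffered items are delivered before the error
        // bufferSize = 16: capacity hint for observeOn's internal queue
        .observeOn(Schedulers.computation(), true, 16)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)

fun main() {
    val observer = observeWithDelayedError()
    println(observer.values())  // [1, 2, 3]
    println(observer.errors())
}
```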

Let’s add it to our code:

articlesRepository.getArticles() // calling thread
.doOnNext { doOnNext(it) } // calling thread
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.subscribe { handleResponse(it) } // computation

After adding observeOn(Schedulers.computation()), the operators below it in our stream run on a thread from the computation thread pool.

We can add multiple observeOn operators:

articlesRepository.getArticles() // calling thread
.doOnNext { doOnNext(it) } // calling thread
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

The main thing to remember about the observeOn operator is that it always acts on all operators downstream of it, until another observeOn switches the thread again.
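The same chain can be traced on a plain JVM. In this sketch (assuming RxJava 3), Schedulers.single() is a stand-in for AndroidSchedulers.mainThread(), which needs Android, Observable.just replaces getArticles(), and a doOnNext right before test() marks where handleResponse would run. The function and helper names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns step name -> thread name for a chain with two observeOn calls.
fun threadsWithObserveOn(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.just(listOf("article1", "article2"))  // stand-in for getArticles()
        .doOnNext { record("doOnNext") }               // calling thread
        .observeOn(Schedulers.computation())           // change
        .filter { record("filter"); it.isNotEmpty() }  // computation
        .map { record("map"); it.size }                // computation
        .observeOn(Schedulers.single())                // change (stand-in for mainThread())
        .doOnNext { record("handleResponse") }         // single
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    return seen
}

fun main() {
    threadsWithObserveOn().forEach { (step, thread) -> println("$step -> $thread") }
}
```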

subscribeOn()

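The subscribeOn operator changes the thread on which the subscription to the source Observable happens. In practice this means that the source does its work and emits its items on that thread (unless the source itself switches threads), and the operators below it keep running on it until the next observeOn. Let’s add it to our code: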

articlesRepository.getArticles() // io
.subscribeOn(Schedulers.io())
.doOnNext { doOnNext(it) } // io
.filter { filterArticles(it) } // io
.map { mapArticles(it) } // io
.subscribe { handleResponse(it) } // io

Now everything is done by a thread from the IO thread pool. Let’s combine it with the observeOn operators we added previously:

articlesRepository.getArticles() // io
.subscribeOn(Schedulers.io())
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

As you can see, the work is now done by a thread from the IO thread pool only until the first observeOn operator, which switches to a different thread.
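A sketch of the combined chain, assuming RxJava 3 on a plain JVM. Here Observable.fromCallable plays the role of a repository that does its work when subscribed to (an assumption about getArticles()), and Schedulers.single() again stands in for AndroidSchedulers.mainThread(). Names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns step name -> thread name for subscribeOn + two observeOn calls.
fun threadsWithSubscribeOnAndObserveOn(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.fromCallable { record("getArticles"); listOf("article1", "article2") }
        .subscribeOn(Schedulers.io())                  // source subscribed on io
        .doOnNext { record("doOnNext") }               // io
        .observeOn(Schedulers.computation())           // change
        .filter { record("filter"); it.isNotEmpty() }  // computation
        .map { record("map"); it.size }                // computation
        .observeOn(Schedulers.single())                // change (stand-in for mainThread())
        .doOnNext { record("handleResponse") }         // single
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    return seen
}

fun main() {
    threadsWithSubscribeOnAndObserveOn().forEach { (step, thread) -> println("$step -> $thread") }
}
```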

Position in the stream

An important thing about the subscribeOn operator is that its position in the stream does not matter in most cases (see the section about doOnSubscribe below). When we move subscribeOn to a different position in our code, the work is done on the same threads as before:

articlesRepository.getArticles() // io
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribeOn(Schedulers.io())
.subscribe { handleResponse(it) } // main thread
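This can be checked by building the same chain twice, once with subscribeOn right after the source and once at the very end, and comparing the thread pools. A sketch assuming RxJava 3, with fromCallable as a stand-in for getArticles() and Schedulers.single() for AndroidSchedulers.mainThread(); threadPools is a hypothetical helper.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns step name -> thread pool name for the chosen subscribeOn position.
fun threadPools(subscribeOnAtTheEnd: Boolean): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    val source = Observable.fromCallable { record("getArticles"); listOf("article1", "article2") }
    val start = if (subscribeOnAtTheEnd) source else source.subscribeOn(Schedulers.io())
    val chain = start
        .doOnNext { record("doOnNext") }
        .observeOn(Schedulers.computation())
        .map { record("map"); it.size }
        .observeOn(Schedulers.single())  // stand-in for mainThread()
    val end = if (subscribeOnAtTheEnd) chain.subscribeOn(Schedulers.io()) else chain

    end.doOnNext { record("handleResponse") }
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    // Drop the thread number: "RxCachedThreadScheduler-1" -> "RxCachedThreadScheduler"
    return seen.mapValues { it.value.substringBeforeLast('-') }
}

fun main() {
    println(threadPools(subscribeOnAtTheEnd = false))
    println(threadPools(subscribeOnAtTheEnd = true))
}
```

Both calls report the same pool for every step.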

doOnSubscribe

An exception to this rule is doOnSubscribe (if you know of more exceptions like this, please let me know). Let’s see what happens when we add it to our code:

articlesRepository.getArticles() // io
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.doOnSubscribe { doOnSubscribe1() } // io
.subscribeOn(Schedulers.io())
.doOnSubscribe { doOnSubscribe2() } // calling thread
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

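You were probably expecting doOnSubscribe1() and doOnSubscribe2() to be called on a thread from the computation thread pool, but as you can see, the first one is called on a thread from the IO thread pool and the second one on the calling thread. The reason is that doOnSubscribe runs when the subscription signal reaches it: subscribeOn subscribes to the operators above it on its Scheduler but signals the operators below it immediately on the calling thread, and observeOn moves only onNext, onError and onComplete, not the subscription. Based on this we can say that: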

  • the observeOn operator doesn’t have any influence on doOnSubscribe
  • if doOnSubscribe is placed above the subscribeOn operator, it is called on the thread provided by subscribeOn
  • if doOnSubscribe is placed below the subscribeOn operator, it is called on the calling thread (the thread on which you subscribe to your RxJava stream, most commonly the main thread)
  • if you don’t use the subscribeOn operator at all, doOnSubscribe is called on the calling thread
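The rules above can be checked with a small sketch, assuming RxJava 3; Observable.fromCallable stands in for getArticles(), Schedulers.single() for AndroidSchedulers.mainThread(), and the names are hypothetical. It records the thread of each doOnSubscribe callback:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns callback name -> thread that ran it.
fun doOnSubscribeThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.fromCallable { listOf("article1", "article2") }  // stand-in for getArticles()
        .observeOn(Schedulers.computation())
        .map { it.size }
        .doOnSubscribe { record("doOnSubscribe1") }  // above subscribeOn -> io
        .subscribeOn(Schedulers.io())
        .doOnSubscribe { record("doOnSubscribe2") }  // below subscribeOn -> calling thread
        .observeOn(Schedulers.single())              // does not move doOnSubscribe
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    return seen
}

fun main() {
    println("calling thread: ${Thread.currentThread().name}")
    doOnSubscribeThreads().forEach { (step, thread) -> println("$step -> $thread") }
}
```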

Multiple instances

Another feature of the subscribeOn operator is that if there are multiple instances of it in a stream, usually only the first one (the one closest to the source) has a “practical effect” on it. To prove it, let’s add a few more instances to our stream:

articlesRepository.getArticles() // io
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribeOn(Schedulers.single())
.subscribe { handleResponse(it) } // main thread

As you can see, there is no change in behaviour: only the first subscribeOn operator determines the thread on which the source runs. However, this doesn’t mean that the other instances are totally ignored, because each of them still moves the subscription onto its own Scheduler on the way up the chain (if you want to learn more about this, please read this blog post by Dávid Karnok).
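Both halves of this statement can be observed in one sketch, assuming RxJava 3 (fromCallable stands in for getArticles(); names are hypothetical). It records where the source runs and where a doOnSubscribe placed between two subscribeOn calls runs:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns step name -> thread that ran it.
fun manySubscribeOns(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.fromCallable { record("getArticles"); listOf("article1") }
        .subscribeOn(Schedulers.io())                     // closest to the source: decides where it runs
        .doOnSubscribe { record("betweenSubscribeOns") }  // runs during the computation hop
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.newThread())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    return seen
}

fun main() {
    manySubscribeOns().forEach { (step, thread) -> println("$step -> $thread") }
}
```

The source runs on an IO thread, while the doOnSubscribe callback runs on a computation thread, which shows that the outer subscribeOn calls still hopped threads during subscription.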

Emitting items

Another thing worth mentioning about subscribeOn is how it works with factory methods such as Observable.just, Observable.fromArray or Observable.fromIterable. The items you pass to them are computed eagerly, on the thread that executes the line creating the Observable (usually the calling thread), so subscribeOn can change the thread on which these items are emitted but not the thread on which they were created. Let’s show it on our example:

Observable.just(createArticles()) // calling thread
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

Despite the subscribeOn(Schedulers.io()) call, whatever we do inside createArticles() runs on the thread that builds the chain (here the calling thread), not on a thread from the IO thread pool, because createArticles() is evaluated before just() is even called. If you want to move this work to another thread, you have to postpone it until subscription, for example with the defer operator:

Observable.defer {
Observable.just(createArticles()) // io
}
.doOnNext { doOnNext(it) } // io
.observeOn(Schedulers.computation()) // change
.filter { filterArticles(it) } // computation
.map { mapArticles(it) } // computation
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

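Now the work is done on the thread provided by the subscribeOn operator. The same eager evaluation applies to arguments passed to fromArray or fromIterable, whereas code inside lambdas that RxJava invokes at subscription time (for example in defer, fromCallable or create) runs on the subscribeOn thread. Be careful with eagerly evaluated arguments, or you may end up running blocking operations on the main thread.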
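A sketch comparing these factory methods, assuming RxJava 3. The local createArticles(tag) is a hypothetical stand-in for the article’s createArticles() that records which thread built the list:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns factory name -> thread on which the list was created.
fun createArticlesThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()

    // Hypothetical stand-in for createArticles(): records which thread built the list.
    fun createArticles(tag: String): List<String> {
        seen[tag] = Thread.currentThread().name
        return listOf("article1", "article2")
    }

    // The argument is evaluated right here, while the chain is being built.
    Observable.just(createArticles("just"))
        .subscribeOn(Schedulers.io())
        .test().awaitDone(5, TimeUnit.SECONDS)

    // The lambdas below are invoked at subscription time, i.e. on the subscribeOn thread.
    Observable.defer<List<String>> { Observable.just(createArticles("defer")) }
        .subscribeOn(Schedulers.io())
        .test().awaitDone(5, TimeUnit.SECONDS)

    Observable.fromCallable { createArticles("fromCallable") }
        .subscribeOn(Schedulers.io())
        .test().awaitDone(5, TimeUnit.SECONDS)

    Observable.create<List<String>> { emitter ->
        emitter.onNext(createArticles("create"))
        emitter.onComplete()
    }
        .subscribeOn(Schedulers.io())
        .test().awaitDone(5, TimeUnit.SECONDS)

    return seen
}

fun main() {
    createArticlesThreads().forEach { (factory, thread) -> println("$factory -> $thread") }
}
```

Only the just variant builds the list on the calling thread; defer, fromCallable and create build it on an IO thread.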

Subjects

It is less obvious how the subscribeOn operator works with Subjects. Take the following example:

val subject = PublishSubject.create<List<Article>>()

val observer = subject
.doOnNext { doOnNext(it) } // ??? thread
.subscribeOn(Schedulers.io())
.test()

subject.onNext(createArticles())

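On which thread do you think doOnNext() will be executed? You are probably thinking of a thread from the IO thread pool, but the correct answer is: it will be called by the thread on which subject.onNext(createArticles()) is called. A Subject pushes each item on the thread of whoever calls onNext(), so subscribeOn only moves the act of subscribing to it. (Note also that subscribeOn makes the subscription asynchronous, so a PublishSubject can drop an item emitted before the subscription is in place.) If you want to move doOnNext() to another thread, you have to use the observeOn() operator before doOnNext() in the stream: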
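A sketch of both variants, assuming RxJava 3 on a plain JVM. The items are pushed from a thread deliberately named "emitter" (an arbitrary name for this example), and because subscribeOn makes the subscription itself asynchronous, the code waits until the subject has an observer; otherwise PublishSubject would simply drop the item. subjectThreads is a hypothetical helper.

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Hypothetical helper: returns variant -> thread that ran doOnNext for one item.
fun subjectThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()

    val withSubscribeOn = PublishSubject.create<List<String>>()
    val first = withSubscribeOn
        .doOnNext { seen["subscribeOn"] = Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .test()

    val withObserveOn = PublishSubject.create<List<String>>()
    val second = withObserveOn
        .observeOn(Schedulers.io())
        .doOnNext { seen["observeOn"] = Thread.currentThread().name }
        .test()

    // subscribeOn subscribes asynchronously: wait (at most 5 s) until the subject sees the observer.
    val deadline = System.currentTimeMillis() + 5_000
    while (!withSubscribeOn.hasObservers() && System.currentTimeMillis() < deadline) Thread.sleep(1)

    thread(name = "emitter") {
        withSubscribeOn.onNext(listOf("article1"))
        withObserveOn.onNext(listOf("article1"))
    }.join()

    first.awaitCount(1)
    second.awaitCount(1)
    return seen
}

fun main() {
    println(subjectThreads())
}
```

With subscribeOn, doOnNext runs on the "emitter" thread; with observeOn placed before it, doOnNext runs on an IO thread.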

val subject = PublishSubject.create<List<Article>>()

val observer = subject
.observeOn(Schedulers.io())
.doOnNext { doOnNext(it) } // io
.test()

subject.onNext(createArticles())

The situation is even more complicated if we replace PublishSubject with BehaviorSubject or ReplaySubject. When you analyse how these two subjects work, you will notice that doOnNext() can be invoked in two scenarios: when someone calls subject.onNext(), or when an observer subscribes and receives the cached item(s). Combined with subscribeOn, the two scenarios run on different threads: cached items are replayed on the thread provided by subscribeOn, while new items are delivered on the thread that calls onNext().
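A sketch of the two scenarios with BehaviorSubject, assuming RxJava 3; the "emitter" thread name and the helper name are arbitrary choices for this example:

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread

// Hypothetical helper: returns the doOnNext thread for the replayed item and for a new item.
fun behaviorSubjectThreads(): List<String> {
    val threads = CopyOnWriteArrayList<String>()
    val subject = BehaviorSubject.createDefault(listOf("cached article"))

    val observer = subject
        .doOnNext { threads.add(Thread.currentThread().name) }
        .subscribeOn(Schedulers.io())
        .test()
    observer.awaitCount(1)  // the cached item is replayed during subscription, on io

    thread(name = "emitter") { subject.onNext(listOf("new article")) }.join()
    observer.awaitCount(2)  // the new item is delivered by the thread calling onNext()
    return threads
}

fun main() {
    println(behaviorSubjectThreads())
}
```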

Operators with default Scheduler

Some operators accept a Scheduler as a parameter, and if you don’t provide one, they use their default Scheduler. An example of such an operator is delay. Let’s use it in our code:

articlesRepository.getArticles() // io
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // change
.delay(100, TimeUnit.MILLISECONDS)
.subscribe { handleResponse(it) } // computation

We will see that, even though we used observeOn(AndroidSchedulers.mainThread()) to switch to the main thread, handleResponse is called on a thread from the computation thread pool. The reason is that we did not provide a Scheduler to delay, so it used its default one, which is Schedulers.computation(). A few more operators work like delay with respect to threading (for example interval, timer, debounce and timeout), so be careful whenever an operator lets you pass a Scheduler as a parameter: either pass the one you need or call observeOn after that operator.
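A sketch of the problem and of the two possible fixes, assuming RxJava 3, with Schedulers.single() as a stand-in for AndroidSchedulers.mainThread(); delayThreads is a hypothetical helper:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns variant -> thread that receives the delayed item.
fun delayThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()

    // Problem: delay without a Scheduler after observeOn moves the item to computation.
    Observable.just("article")
        .observeOn(Schedulers.single())     // stand-in for mainThread()
        .delay(100, TimeUnit.MILLISECONDS)  // default Scheduler: Schedulers.computation()
        .doOnNext { seen["defaultScheduler"] = Thread.currentThread().name }
        .test().awaitDone(5, TimeUnit.SECONDS)

    // Fix 1: pass the Scheduler explicitly.
    Observable.just("article")
        .delay(100, TimeUnit.MILLISECONDS, Schedulers.single())
        .doOnNext { seen["explicitScheduler"] = Thread.currentThread().name }
        .test().awaitDone(5, TimeUnit.SECONDS)

    // Fix 2: switch threads after delay.
    Observable.just("article")
        .delay(100, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.single())
        .doOnNext { seen["observeOnAfterDelay"] = Thread.currentThread().name }
        .test().awaitDone(5, TimeUnit.SECONDS)

    return seen
}

fun main() {
    delayThreads().forEach { (variant, thread) -> println("$variant -> $thread") }
}
```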

The end? Not exactly

This post gives only a brief explanation of how you can manage threads in RxJava, and it surely does not cover all possible scenarios. Here is a simple example of how things can start to get confusing:

articlesRepository.getArticles() // io
.doOnNext { doOnNext(it) }
.flatMap {
articlesRepository.getArticles() // computation
.subscribeOn(Schedulers.computation())
}
.map { mapArticles(it) } // ??? thread
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // change
.subscribe { handleResponse(it) } // main thread

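Can you tell on which thread mapArticles() will be called? It will usually be called on a thread from the computation thread pool, because flatMap forwards each item on the thread of the inner Observable that emitted it (if several sources emit at the same time, flatMap may deliver an item on one of the other threads instead). If you are not sure about threading in your chain, you should always check it, e.g. using this code: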

println("CurrentThread: ${Thread.currentThread().name}")
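The same check can be automated by recording Thread.currentThread().name instead of printing it. A sketch, assuming RxJava 3 on a plain JVM, with fromCallable standing in for getArticles() and Schedulers.single() for AndroidSchedulers.mainThread(); flatMapThreads and record are hypothetical names:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns step name -> thread name for the flatMap example.
fun flatMapThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()
    fun record(step: String) { seen[step] = Thread.currentThread().name }

    Observable.fromCallable { record("outer getArticles"); listOf("article1") }
        .flatMap { articles ->
            Observable.fromCallable { record("inner getArticles"); articles }
                .subscribeOn(Schedulers.computation())
        }
        .map { record("mapArticles"); it.size }  // follows the inner Observable's thread
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())           // stand-in for mainThread()
        .doOnNext { record("handleResponse") }
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
    return seen
}

fun main() {
    flatMapThreads().forEach { (step, thread) -> println("CurrentThread for $step: $thread") }
}
```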

I hope this article helps you understand a little better how to manage threads in Rx chains.
