Think of RxJava as a library that makes asynchronous processing easier to write

This is an article for RxJava Advent Calendar 2017. Yesterday's entry was @toastkidjp's "Process Runtime Permission with RxPermissions". Day 3 is @guignol's "RxAndroid and RxSwing Scheduler", an interesting story about how Scheduler is implemented.

Nobody had signed up for day 2, so although the date has long passed, I am writing an article for it.

Overview

Knowing what something is useful for makes it easier to stay motivated while studying it. So I thought one way in would be to learn first what kind of library RxJava is, and only then start studying it. For now, let's think of it as a Java library that makes asynchronous processing easier to write.

Target

People who are interested in RxJava but haven't used it yet (especially Android app developers)

Not covered in this article

- Reactive programming

What to write in this article

Description of RxJava as a library that makes asynchronous processing easier to write

Notice

- Lambda expressions are used
- Sample code is written in Java

RxJava makes heavy use of functional interfaces, so you should use lambda expressions with it.


What makes RxJava different from traditional approaches?

- You can connect processing steps with a method chain, as with the Stream API
- Waiting for multiple asynchronous processes is easy to write
- Running part of the processing on another thread is easy to write; you can change the processing thread in small units
- Delayed execution is possible


Installation

RxJava is provided as a library and can be installed using Maven or Gradle.

Requirements

It must be the following version or later.

Name      Version
Java      1.6 or later
Android   2.3 or later

RxJava is often used in Android application development, but it is not limited to Android and can be used in ordinary Java and Kotlin applications.

Note that RxJava 2 defines its own functional interfaces, which are not compatible with the functional interfaces in the java.util.function package (Function, Consumer, etc.) added in Java 8.
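As a minimal sketch of what this incompatibility means in practice, assuming RxJava 2.x is on the classpath: a java.util.function.Function cannot be passed directly to an RxJava operator, but a method reference can adapt it. The class name and the JDK_LENGTH field are hypothetical names used only for this illustration.

```java
import io.reactivex.Single;

public class FunctionalInterfaceBridge {
    // A JDK function type (java.util.function), e.g. one that existing code already has.
    static final java.util.function.Function<String, Integer> JDK_LENGTH = String::length;

    static int lengthViaRx(String text) {
        // Single.map expects io.reactivex.functions.Function, so this does not compile:
        //     Single.just(text).map(JDK_LENGTH);
        // A method reference adapts one interface to the other.
        return Single.just(text)
                .map(JDK_LENGTH::apply)
                .blockingGet(); // blocking only so the sketch can return the value
    }

    public static void main(String[] args) {
        System.out.println(lengthViaRx("RxJava")); // prints 6
    }
}
```

When everything is written as lambda expressions, the difference rarely shows up; it matters mainly when you hold a function object typed against one of the two sets of interfaces.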

Add dependency

Just add one line to build.gradle. At the time of writing this article, 2.1.7 was the latest.

app/build.gradle


dependencies {
    implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
}

For Android app development, it is useful to add one more line for RxAndroid. In fact, there is little point in including RxJava without it. Its main purpose is to provide a Scheduler (which specifies the thread that executes the processing) for running work on Android's main thread. Its latest version is 2.0.1.

app/build.gradle


    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

If you want to use lambda expressions, you need to set up either Retrolambda or Android Gradle Plugin 3.0, but I will omit that here.

License

Both RxJava and RxAndroid are licensed under the Apache License 2.0. If you use them, you need to display the license in your app. For Android apps, com.google.gms:oss-licenses may help.

View open source licenses using com.google.gms:oss-licenses


Recipes

I had the impression that few introductory RxJava articles use Single (notifies one value or an error) and Completable (notifies completion or an error), so I will introduce examples that use both. I think the fine-grained asynchronization that RxJava excels at is achieved mainly with Single / Maybe / Completable, and I use them often in actual development.

Make the method that is the bottleneck asynchronous

ExecutorService

With the conventional ExecutorService, it can be written like this.

Make the method that is the bottleneck asynchronous


final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> bottleneckMethod());
executor.shutdown();

RxJava

In RxJava, this is written as follows.

Make the method that is the bottleneck asynchronous(RxJava)


Completable.fromAction(() -> bottleneckMethod())
    .subscribeOn(Schedulers.newThread())
    .subscribe();

Process flow

  1. Get a Completable instance by passing an Action (a functional interface that takes no arguments and returns no value) that runs bottleneckMethod(), the method you want to process asynchronously
  2. Specify the execution thread with subscribeOn
  3. Start the processing with subscribe. Until this method is called, none of the above is executed (delayed execution)

subscribeOn is a method that specifies the execution thread of the entire series of processes. In the above code, it is specified that a new thread is created and processing is executed there.
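To see which thread subscribeOn actually selects, the following sketch records the thread name inside the Action. It assumes RxJava 2.x on the classpath; the class and method names are hypothetical, and blockingAwait() is used only so that the demo can read the result before returning (in real code you would call subscribe() as above).

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.atomic.AtomicReference;

public class SubscribeOnThreadDemo {
    /** Runs an Action via subscribeOn and returns the name of the thread it ran on. */
    static String runAndReportThread() {
        AtomicReference<String> worker = new AtomicReference<>();
        Completable.fromAction(() -> worker.set(Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread())
                // blockingAwait() subscribes and then waits for completion.
                .blockingAwait();
        return worker.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("action: " + runAndReportThread());
    }
}
```

The action line should show a thread whose name starts with RxNewThreadScheduler, which is different from the calling thread.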

Delayed execution

A characteristic of RxJava is that no processing is executed until subscribe() is called. For example, take the following code.

Completable completable = Completable.fromAction(() -> bottleneckMethod())
                .subscribeOn(Schedulers.newThread());
System.out.println("start");
completable.subscribe();

The execution result is as follows.

Execution result


start
RxNewThreadScheduler-1 bottle neck
end.

The output of the println call made before subscribe() appears on standard output first.

Use the result of asynchronous processing

ExecutorService

Let's write it with the traditional API in the same way.

Use the result of asynchronous processing(ExecutorService)


final ExecutorService executor = Executors.newSingleThreadExecutor();
// submit() returns a Future that will hold the result of the background task
final Future<?> awaitable = executor.submit(() -> anyReturnBottleneckMethod());
executor.shutdown();

try {
    // Block the calling thread for up to 2 seconds while waiting for the result
    System.out.println(awaitable.get(2, TimeUnit.SECONDS));
} catch (InterruptedException | TimeoutException | ExecutionException e) {
    e.printStackTrace();
}

RxJava

The same process is written using RxJava as follows.

Use the result of asynchronous processing(RxJava)


Single.create(emitter -> emitter.onSuccess(anyReturnBottleneckMethod()))
    .subscribeOn(Schedulers.newThread())
    .subscribe(System.out::println, Throwable::printStackTrace);

Since we need to use the processing result, we will use Single to notify the value. It is possible to define the processing when an error occurs in the second argument of the subscribe method.
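The error path can be sketched as follows, assuming RxJava 2.x on the classpath. failingCall() is a hypothetical stand-in for a slow call that fails; an exception thrown inside the create lambda is delivered to the second argument of subscribe instead of crashing the worker thread. The CountDownLatch is there only so the demo can return the outcome.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class SingleErrorDemo {
    // Hypothetical stand-in for a slow call that fails.
    static String failingCall() throws IOException {
        throw new IOException("connection refused");
    }

    static String subscribeAndDescribe() {
        AtomicReference<String> outcome = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Single.<String>create(emitter -> emitter.onSuccess(failingCall()))
                .subscribeOn(Schedulers.newThread())
                .subscribe(
                        value -> { outcome.set("success: " + value); done.countDown(); },
                        error -> { outcome.set("error: " + error.getMessage()); done.countDown(); });

        try {
            done.await(); // wait only so the demo can return the outcome
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndDescribe()); // prints "error: connection refused"
    }
}
```

Compared with the ExecutorService version, there is no ExecutionException to unwrap: the original exception arrives directly in the error callback.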

By the way, you can also use fromCallable instead of create. Callable here is java.util.concurrent.Callable, a functional interface that, like the Supplier added in Java 8, supplies a value lazily. It has existed since Java 1.5, which is presumably why it was reused. On the other hand, Runnable is not reused; a separate functional interface called Action is defined instead, and I am not sure why.

fromCallable


Single.fromCallable(() -> anyReturnBottleneckMethod())
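One practical difference between Callable and Supplier can be shown with the standard library alone: Callable.call() declares throws Exception, while Supplier.get() declares no checked exceptions. RxJava 2's Action.run() likewise declares throws Exception, unlike Runnable.run(); whether that is the actual design reason is only a guess here. readConfig() and the class name are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class CallableVsSupplier {
    // Hypothetical slow call that declares a checked exception.
    static String readConfig() throws IOException {
        return "config";
    }

    static String viaCallable() {
        // Callable.call() declares "throws Exception", so the lambda body needs no try/catch.
        Callable<String> callable = () -> readConfig();
        try {
            return callable.call(); // the caller deals with the exception instead
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static String viaSupplier() {
        // Supplier.get() declares no checked exceptions, so the lambda itself must wrap them.
        Supplier<String> supplier = () -> {
            try {
                return readConfig();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        return supplier.get();
    }

    public static void main(String[] args) {
        System.out.println(viaCallable()); // prints "config"
        System.out.println(viaSupplier()); // prints "config"
    }
}
```

With fromCallable, RxJava plays the role of the caller in viaCallable(): an exception thrown by the lambda is passed to the error callback of subscribe.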

RxJava helps when you want to write code like this that waits for the result of asynchronous processing.

The important thing is to use the right tool for the problem you want to solve. ExecutorService is an API suited to code that executes large tasks in parallel. It is not that it cannot express a wait like the one in this example, but it is not good at it.
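Waiting for several asynchronous results at once, mentioned earlier as one of RxJava's strengths, can be sketched with Single.zip. This assumes RxJava 2.x on the classpath; fetchUserName() and fetchUnreadCount() are hypothetical stand-ins for two independent slow calls, and blockingGet() is used only so the demo can return the combined value.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class ZipTwoRequests {
    // Hypothetical slow calls standing in for two independent requests.
    static String fetchUserName() throws InterruptedException {
        Thread.sleep(200);
        return "alice";
    }

    static int fetchUnreadCount() throws InterruptedException {
        Thread.sleep(200);
        return 3;
    }

    static String loadSummary() {
        // Each source gets its own subscribeOn, so the two calls run on separate I/O threads.
        Single<String> name = Single.fromCallable(() -> fetchUserName())
                .subscribeOn(Schedulers.io());
        Single<Integer> unread = Single.fromCallable(() -> fetchUnreadCount())
                .subscribeOn(Schedulers.io());

        // zip subscribes to both sources and combines their values once both have arrived.
        return Single.zip(name, unread, (n, count) -> n + " has " + count + " unread")
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(loadSummary()); // prints "alice has 3 unread"
    }
}
```

If either source fails, zip signals that error instead of a value, so one error callback covers both requests.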

Update the UI with the results of network communication

For example, if you want to execute networkRequest() on an I/O thread and display the result in a TextView, you can write it as follows.

Update the UI with the results of network communication


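// Assumes networkRequest() returns a String; onSuccess passes the result downstream
Single.<String>create(emitter -> emitter.onSuccess(networkRequest()))
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(textView::setText, Throwable::printStackTrace);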

subscribeOn and observeOn

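Both are methods for specifying the thread on which processing is executed. subscribeOn specifies the thread on which the source (the processing at the top of the chain) runs, whereas observeOn changes the execution thread for the methods that follow it. For everything after observeOn, its specification takes precedence over the one in subscribeOn.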

In the previous code, the execution thread changes as follows.

Update the UI with the results of network communication


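// Assumes networkRequest() returns a String
Single.<String>create(emitter -> emitter.onSuccess(networkRequest())) // I/O thread
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(textView::setText, Throwable::printStackTrace);      // Main thread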
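The same thread switch can be observed off Android with a small sketch, assuming RxJava 2.x on the classpath. Schedulers.single() is used purely as a stand-in for AndroidSchedulers.mainThread(), and the class and method names are hypothetical; blockingGet() is there only so the demo can return the trace.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class ThreadSwitchDemo {
    /** Returns a trace naming the thread of the source and of the step after observeOn. */
    static String trace() {
        return Single.fromCallable(() -> "source on " + Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())
                // Stand-in for AndroidSchedulers.mainThread() in this plain-Java sketch.
                .observeOn(Schedulers.single())
                .map(s -> s + ", map on " + Thread.currentThread().getName())
                .blockingGet();
    }

    public static void main(String[] args) {
        // Expect a name starting with RxCachedThreadScheduler for the source
        // and one starting with RxSingleScheduler for the map step.
        System.out.println(trace());
    }
}
```

The source runs on an I/O scheduler thread, and everything after observeOn runs on the thread supplied by the second scheduler.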

You might wonder what is so good about this.

It matters on Android, which has restrictions such as "network communication must be done on a background thread" and "views must be updated on the main thread". If you violate them, a runtime exception is thrown and the app crashes. There are various mechanisms, such as Handler and Looper, for working within these restrictions. You can do the same with them, but with RxJava (and RxAndroid) you can write more straightforward code.


Things to consider before implementation

  1. Learning cost
  2. Number of methods

1. Learning cost

Knowledge of the Stream API plus a little more is enough to start writing code, so you do not need to study much at first. But it is simply a very large library, so understanding it fully takes considerable time. This is not an Android example, but I have heard of a company that weighed the learning cost of RxSwift plus the cost of introducing MVVM against the cost of introducing Clean Architecture, and chose the latter because the learning cost of Rx was the bottleneck. The Rx implementations in different languages seem to be similar, so developers who have used Rx in another language can probably get started smoothly. Conversely, it is hard going for people who are unfamiliar with Rx, and that also needs to be taken into account.

2. Number of methods

Adding RxJava 2 (as of the end of 2017) to an Android app increases the method count by just under 10,000. That is not a small number, and depending on the project it may push you over the 64K-method DEX limit and force you to enable MultiDex. That is in fact the case with the app I develop at work. Once introduced, it can be hard to remove (because it is too convenient), so you should consider alternatives as well. If you are using Kotlin, be sure to check out coroutines (though I have not yet).


In conclusion

I introduced RxJava as a "library that makes asynchronous processing easier to write". Because it lets you specify the execution thread in small units, RxJava has been widely adopted by Android app developers, whose framework restricts which thread may do what.

The important thing is to use the right tools for the problem you want to solve. ExecutorService is a tool for executing large processes in parallel, and RxJava is relatively suitable for asynchronization in small units as discussed here.

References

Books

"RxJava Reactive Programming" (Shoeisha): it has been a while since it was released, but at this point it is the most detailed and easy-to-understand book written in Japanese.
