I can read it! RxJava

Overview

The first time you look at source code written in RxJava, the processing flow is hard to follow, and you often cannot get to the part of the logic you actually wanted to understand. (At least, that was true for me...)

This article walks through the main classes and methods of RxJava, following the processing flow while actually running code, with the goal of getting readers to the point of "I can read it! RxJava".

Target

- People wondering what is so good about RxJava
- People who have seen processing written in RxJava but could not understand it
- People who are stuck because a library they want to use is reactive
- People who googled RxJava, found the figure below, and thought **"I don't understand what this figure means!!"**

[Figure: observable.png]

This article is for you.

Execution environment

- Java 8 or higher (the samples that call List.of need Java 9 or higher)

If you use Gradle, just add the following.

build.gradle


repositories {
    mavenCentral()
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    // "compile" was removed in Gradle 7; "implementation" is its replacement
    implementation 'io.reactivex.rxjava2:rxjava:2.2.0-SNAPSHOT'
}

If you use another build tool, see the following. https://github.com/ReactiveX/RxJava

This article also assumes that you know lambda expressions. If you are unsure about lambdas, the following article is a good place to learn. https://qiita.com/sanotyan1202/items/64593e8e981e8d6439d3

Sample code

The sample code is available below. It runs as is, so please give it a try.

https://github.com/youyanntoto/RxJavaSample

The rest of this article mainly explains that sample code.

Main part

What the sample code does

Main.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
        RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
        RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

        System.out.println(tokyoWeather);
        System.out.println(yokohamaWeather);
        System.out.println(nagoyaWeather);

        System.out.println("*** main end ***");
    }

RestUtil.java


    public static Weather getWeather(Place place) {
        Weather weather;

        //Get the weather appropriately for each location
        switch (place) {
            case TOKYO:
                weather = Weather.SUNNY;
                break;
            case YOKOHAMA:
                weather = Weather.RAINY;
                break;
            case NAGOYA:
                weather = Weather.WINDY;
                break;
            default:
                weather = Weather.SUNNY;
                break;
        }

        try {
            //Communication processing time 500~Make it take 999ms
            Thread.sleep(new Random().nextInt(500) + 500);
        } catch (InterruptedException e) {
            // nop
        }

        return weather;
    }

Look at the Main class. It performs REST communication (RestUtil#getWeather(RestUtil.Place)) to get the weather (RestUtil.Weather) for the specified area.

As you can see in the RestUtil class, the communication itself is a dummy.

Each call to RestUtil.getWeather takes 500-999 ms, so running Main.java takes roughly 1,500-3,000 ms in total (500-999 ms x 3).
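The estimate above can be checked with a small timing sketch. The class below is a stand-in written for this illustration, not the article's RestUtil: the names SequentialTimingSketch and measureSequentialMillis are hypothetical, and only the 500-999 ms delay is reproduced.

```java
import java.util.Random;

// Stand-in for the article's RestUtil: only the 500-999 ms delay is reproduced.
class SequentialTimingSketch {

    enum Place { TOKYO, YOKOHAMA, NAGOYA }

    // Hypothetical dummy call: sleeps 500-999 ms and returns a label for the place.
    static String getWeather(Place place) {
        try {
            Thread.sleep(new Random().nextInt(500) + 500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return place + " weather";
    }

    // Calls the dummy once per place, one after another, and returns the elapsed milliseconds.
    static long measureSequentialMillis() {
        long start = System.nanoTime();
        for (Place place : Place.values()) {
            getWeather(place);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("elapsed: " + measureSequentialMillis() + " ms");
    }
}
```

Because the three calls run one after another on the same thread, the delays simply add up.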

Now let's make the above code reactive.

Part 1. "Completable" to signal completion

Sample code

CompletableSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Completable.create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

            System.out.println(tokyoWeather);
            System.out.println(yokohamaWeather);
            System.out.println(nagoyaWeather);
            emitter.onComplete();

        }).subscribe(() -> {
            System.out.println("Complete!!");

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        });

        System.out.println("*** main end ***");
    }

Commentary

First, the lambda passed to Completable.create is executed. The processing itself is the same as in Main.java at the beginning.

Next, calling emitter.onComplete() causes the first argument of subscribe to be executed.

The second argument of subscribe is executed when emitter.onError(Throwable) is called, or when an exception is thrown inside the lambda passed to Completable.create. The throwable passed to it is the error object that was raised.

In summary, processing flows as follows: the lambda passed to Completable.create runs, then either the first argument of subscribe (onComplete) or the second argument (onError) runs.

Part 2. "Single" passing one value

Sample code

SingleSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Single.<List<RestUtil.Weather>>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onSuccess(List.of(tokyoWeather, yokohamaWeather, nagoyaWeather));

        }).subscribe(weathers -> {
            System.out.println("Complete!!");
            for (RestUtil.Weather weather : weathers) {
                System.out.println(weather);
            }
            
        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();
            
        });

        System.out.println("*** main end ***");
    }

Commentary

As with the Completable in Part 1, processing flows as follows: the lambda passed to Single.create runs, then either the first argument of subscribe (onSuccess) or the second argument (onError) runs.

The difference is where the weather is printed.

A Single can pass **only one** value to the subsequent processing, so the weather for each area is collected into a List and passed to the first argument of subscribe by calling emitter.onSuccess(T). The type of that argument is specified by the generic type parameter given at Single#create.

If you have worked with Promise in JavaScript, the idea is similar.
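For readers who know the JDK better than JavaScript, CompletableFuture is probably the closest built-in counterpart to a Promise. The sketch below is only a comparison aid and does not use RxJava: the class name PromiseLikeSketch and the weather strings are made up for illustration. It delivers exactly one value to a success handler, with a separate error handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PromiseLikeSketch {

    // Produces exactly one value (a list of weather names), similar in spirit
    // to calling emitter.onSuccess(list) once.
    static CompletableFuture<List<String>> fetchWeathers() {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("SUNNY", "RAINY", "WINDY"));
    }

    // Registers a success handler and an error handler, then waits for the result.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        fetchWeathers()
                .thenAccept(weathers -> {
                    log.add("Complete!!");
                    log.addAll(weathers);
                })
                .exceptionally(throwable -> {
                    log.add("Error!!");
                    return null;
                })
                .join(); // block until one of the handlers has run
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

One difference to keep in mind when reading RxJava code: supplyAsync starts its work immediately, whereas the lambda passed to Single.create does not run until subscribe is called.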

Part 3. (Extra edition) "Stream"

Sample code

StreamSample.java


/**
 * sample for Stream
 */
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
        RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
        RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

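        // List.stream() yields each weather in turn;
        // Stream.of(list) would yield the whole list as a single element
        List.of(tokyoWeather, yokohamaWeather, nagoyaWeather).stream()
                .forEach(weather -> {
                    System.out.println(weather);
                });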

        System.out.println("*** main end ***");
    }

Commentary

Before moving on to Observable, let's learn a little about Stream. If you already understand Stream, feel free to skip this part.

The processing flow is: first get the weather for each area and build a list, then turn the list into a stream and print the weather for each area.

This form, in which values flow through one after another, is close to the Observable explained next, so make sure you understand this flow.
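One detail is worth knowing when streaming a list: Stream.of(list) treats the whole list as a single element, while list.stream() yields the elements one by one. The small sketch below (class and method names are hypothetical) shows the difference by counting elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class StreamOfVsListStream {

    // Stream.of(list) produces a Stream<List<String>> holding ONE element: the list itself.
    static long countWithStreamOf(List<String> weathers) {
        return Stream.of(weathers).count();
    }

    // list.stream() produces a Stream<String> with one element per list entry.
    static long countWithListStream(List<String> weathers) {
        return weathers.stream().count();
    }

    public static void main(String[] args) {
        List<String> weathers = Arrays.asList("SUNNY", "RAINY", "WINDY");
        System.out.println(countWithStreamOf(weathers));   // 1
        System.out.println(countWithListStream(weathers)); // 3
        weathers.stream().forEach(System.out::println);    // one weather per line
    }
}
```

To have each value flow through forEach separately, list.stream() (or Stream.of with the individual values) is the form to use.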

Part 4. "Observable" that passes multiple values

Sample code

ObservableSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

Commentary

Unlike the Completable and Single seen so far, subscribe takes three arguments.

The processing flow is as follows. The lambda passed to Observable.create is executed first. After that, the first argument of subscribe is executed each time emitter.onNext(T) is called. Finally, when emitter.onComplete() is called, the third argument of subscribe is executed. As before, the second argument is called when an error occurs.

In summary, processing flows as follows: the lambda passed to Observable.create runs, the first argument of subscribe (onNext) runs once per emitted value, and the stream then ends with either the third argument (onComplete) or, if an error occurs, the second argument (onError).

Because an Observable can pass multiple values, each value can be printed as soon as its communication finishes.
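To make the callback flow concrete, here is a toy model written with the JDK only. It is emphatically not RxJava's implementation: ToyObservableSketch, Emitter, and Source are hypothetical names for this illustration. It only mimics the contract described above: onNext may fire many times, and the stream ends with exactly one of onError or onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ToyObservableSketch {

    // Hypothetical emitter handed to the source body.
    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
    }

    // Hypothetical source body; like the lambda given to create, it may throw.
    interface Source<T> {
        void subscribe(Emitter<T> emitter) throws Exception;
    }

    // Runs the source and routes its signals to the three handlers.
    static <T> void subscribe(Source<T> source,
                              Consumer<T> nextHandler,
                              Consumer<Throwable> errorHandler,
                              Runnable completeHandler) {
        boolean[] done = {false}; // set once the stream has terminated
        Emitter<T> emitter = new Emitter<T>() {
            @Override public void onNext(T value) {
                if (!done[0]) nextHandler.accept(value);
            }
            @Override public void onComplete() {
                if (!done[0]) { done[0] = true; completeHandler.run(); }
            }
        };
        try {
            source.subscribe(emitter);
        } catch (Exception e) {
            // An exception thrown by the source becomes the error signal.
            if (!done[0]) { done[0] = true; errorHandler.accept(e); }
        }
    }

    // Emits two values and completes, or fails after the first value.
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        ToyObservableSketch.<String>subscribe(
                emitter -> {
                    emitter.onNext("SUNNY");
                    if (fail) throw new IllegalStateException("boom");
                    emitter.onNext("RAINY");
                    emitter.onComplete();
                },
                value -> log.add("Next: " + value),
                error -> log.add("Error: " + error.getMessage()),
                () -> log.add("Complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [Next: SUNNY, Next: RAINY, Complete]
        System.out.println(run(true));  // [Next: SUNNY, Error: boom]
    }
}
```

In the failing run, the completion handler never fires, which matches the rule that an error and a completion are mutually exclusive.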

Part 5. "subscribeOn", "observeOn" to specify the thread

Sample code

ThreadSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(weather -> {
                    System.out.println("Next!!");
                    System.out.println(weather);

                }, throwable -> {
                    System.out.println("Error!!");
                    throwable.printStackTrace();

                }, () -> {
                    System.out.println("Complete!!");

                });

        System.out.println("*** main end ***");
    }

Commentary

"subscribeOn" and "observeOn" have been added to the source code explained in Part 4.

RxJava has several advantages, and the biggest of these is that this kind of thread specification is easy to write.

"subscribeOn" specifies the thread that runs the processing in Observable.create, and "observeOn" specifies the thread that runs the processing in subscribe.

In the sample above, the communication runs on an IO thread, and the subsequent processing (which uses the communication results) runs on another thread.

A common use case is Android development, where it is standard practice to perform communication on a separate thread and then handle Views on the main thread in the subsequent processing.
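The division of labour between the two threads can be pictured with plain JDK executors. The sketch below is an analogy only and says nothing about how RxJava schedulers are implemented: the class name ThreadHandOffSketch and the thread names "io-like" and "consumer-like" are made up for this illustration. The producing step runs on one thread and the consuming step on another.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHandOffSketch {

    // Returns {name of the producing thread, name of the consuming thread}.
    static String[] run() {
        // Plays the role of the thread chosen by subscribeOn in the sample.
        ExecutorService ioLike =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        // Plays the role of the thread chosen by observeOn in the sample.
        ExecutorService consumerLike =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer-like"));
        String[] names = new String[2];
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        names[0] = Thread.currentThread().getName();
                        return "SUNNY"; // pretend this came from the network
                    }, ioLike)
                    .thenAcceptAsync(
                            weather -> names[1] = Thread.currentThread().getName(),
                            consumerLike)
                    .join(); // wait so the caller sees both names
        } finally {
            ioLike.shutdown();
            consumerLike.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("produced on: " + names[0]);
        System.out.println("consumed on: " + names[1]);
    }
}
```

One practical note, stated from my understanding of RxJava 2 rather than from the article: the built-in schedulers use daemon threads, so a main method that returns right after subscribe can exit before any "Next!!" is printed. Waiting at the end of main, or using blockingSubscribe while experimenting, avoids that.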

Part 6. Convert the value "map"

Sample code

ObservableMapSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).map(weather -> {
            switch (weather) {
                case SUNNY:
                    return "happy day!!";
                case RAINY:
                    return "sad day...";
                case WINDY:
                default:
                    return "normal day";
            }

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

Commentary

A new method, map, has been added to the source code of Part 4. If you actually run it, you can see that the value passed to the first argument of subscribe has been **converted**.

As you may have guessed, the conversion is done by the function passed to map.

map receives each value as it flows through and returns the converted value, which lets you change the value sent on to the subsequent processing.
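Since Part 3 used Stream as a stepping stone, the same conversion can be sketched with Stream.map using the JDK only. The enum and class names below are stand-ins for the article's RestUtil.Weather, not the real classes. The element type changes from Weather to String as the values pass through map.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamMapSketch {

    // Stand-in for the article's RestUtil.Weather.
    enum Weather { SUNNY, RAINY, WINDY }

    // The same conversion as the lambda passed to map in the sample.
    static String toMood(Weather weather) {
        switch (weather) {
            case SUNNY:
                return "happy day!!";
            case RAINY:
                return "sad day...";
            case WINDY:
            default:
                return "normal day";
        }
    }

    // Each Weather flowing through the stream is replaced by its String form.
    static List<String> moods() {
        return Stream.of(Weather.SUNNY, Weather.RAINY, Weather.WINDY)
                .map(StreamMapSketch::toMood)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        moods().forEach(System.out::println);
    }
}
```

Everything downstream of map sees only the converted String values and never the original Weather.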

Part 7. Connect "flatMap"

Sample code

ObservableFlatMapSample.java


    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Place>create(emitter -> {
            emitter.onNext(RestUtil.Place.TOKYO);
            emitter.onNext(RestUtil.Place.YOKOHAMA);
            emitter.onNext(RestUtil.Place.NAGOYA);
            emitter.onComplete();

        }).flatMap(place -> {
            return Observable.create(emitter -> {
                // The place is no longer always Tokyo, so use a neutral variable name
                RestUtil.Weather weather = RestUtil.getWeather(place);
                emitter.onNext(weather);
                emitter.onComplete();
            });

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

Commentary

A new method, flatMap, has been added to the source code of Part 4. The values emitted in the first Observable.create have changed from weather to place, and another Observable is created inside flatMap.

The processing flow is as follows: the first Observable runs, and each time emitter.onNext(T) is called, the second Observable returned from flatMap is processed. In other words, flatMap connects the first Observable to the second.

In this example only the second Observable performs communication, so the benefit is not obvious. In practice, flatMap is used when separate time-consuming operations are chained, for example database processing in the first Observable and communication in the second.
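The "connect one source to another" idea also exists on Stream, so it can be sketched with the JDK only. The enums and the lookup below are stand-ins for the article's RestUtil, with the delay left out, and the class name is hypothetical. Each place is turned into its own small stream, and flatMap merges those inner streams into one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamFlatMapSketch {

    // Stand-ins for the article's RestUtil.Place and RestUtil.Weather.
    enum Place { TOKYO, YOKOHAMA, NAGOYA }
    enum Weather { SUNNY, RAINY, WINDY }

    // Same mapping as the dummy getWeather in the article, without the sleep.
    static Weather getWeather(Place place) {
        switch (place) {
            case YOKOHAMA:
                return Weather.RAINY;
            case NAGOYA:
                return Weather.WINDY;
            case TOKYO:
            default:
                return Weather.SUNNY;
        }
    }

    // The "second source": one inner stream per place.
    static Stream<Weather> weatherStream(Place place) {
        return Stream.of(getWeather(place));
    }

    // flatMap replaces each place with the contents of its inner stream.
    static List<Weather> weathers() {
        return Arrays.stream(Place.values())
                .flatMap(StreamFlatMapSketch::weatherStream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(weathers());
    }
}
```

The analogy has a limit: Stream.flatMap keeps the original order, while Observable.flatMap merges the inner Observables and may interleave their values when they run asynchronously. As far as I know, concatMap is the RxJava operator to reach for when order matters.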

In closing

That concludes "I can read it! RxJava".

The world of RxJava is wide and still growing, so this article is far from covering everything. Even so, by combining and applying the classes and methods introduced here, you should be able to understand the basics (at least well enough to read the code).

I hope this helps anyone who is about to jump into the world of RxJava.
