Reintroduction to Operators with RxJava Part 1

This article is the 10th article of RxJava Advent Calendar 2016.

I rushed into the RxJava Advent Calendar with momentum, but I was wondering what to write, By the way, I thought I was a little confused and decided to study Operators again.

There are many of them, so I decided to divide them, so I plan to divide them into 5 times in total.

Now, let's take a look at the two things, Creating Observables and Transforming Observables, described in ReactiveX Introduction.

Creating Operators who mainly create new ʻObservable`s

Create An operator who makes ʻObservable` by scratch. On Android, it's the one you often see in asynchronous communication.

The usage is as follows.

public class Create {
    public static void main(String[] args) {
        Observable.create(e -> {
            Person person = new Person();
            person.age = 100;
            person.name = "nshiba";
            e.onNext(person);
            e.onComplete();
        }).subscribe(System.out::println);
    }

    private static class Person {
        int age;
        String name;

        @Override
        public String toString() {
            return name + ":" + String.valueOf(age);
        }
    }
}

output

nshiba:100

It's like passing a value with ʻonNext and calling ʻonComplete at the end.

Also, in case of an error, call ʻonError`.

Defer defer is the operator created when you subscribe to the ʻObservable` to be executed.

Normal create creates a ʻObservable to run on the fly, but defer delays the creation of the ʻObservable itself.

Observable observable = Observable.defer(() -> observer -> {
    observer.onNext("test");
    observer.onComplete();
});

//At this moment you can create a new Observable in defer
observable.subscribe(System.out::println);

Empty/Never/Throw I think these Operators will be used primarily for testing in limited applications.

Empty Create an ʻObservable that has no value but ends normally. That is, only ʻonComplete is called.

Never Create an ʻObservable` that has no value and never terminates.

Throw Creates an ʻObservable` that has no value but exits with the specified error.

From Converts various objects to Observable. I think I often convert lists, so I made a sample with fromArray.

int[] nums = new int[] {1, 2, 3, 4, 5};
Observable
        .fromArray(nums)
        .subscribe(ints -> {
            System.out.println("onNext");
            System.out.println(Arrays.toString(ints));
        },
        throwable -> {
            System.out.println("onError");
        },
        () -> {
            System.out.println("onComplete");
        });

output

onNext
[1, 2, 3, 4, 5]
onComplete

Interval Generates an ʻObservable` that outputs an integer value at specified regular intervals. You can also specify how much to delay first.

Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::print);

output

01234567789...

Just Generate ʻObservable with the object passed directly as an argument. Also, if you pass more than one, ʻonNext will be called accordingly, and if you pass more than one, no error will occur even if the types are not unified.

Observable.just(3, 1, 5, 4, "test")
        .subscribe(num -> {
            System.out.println("onNext: " + num);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

output

onNext: 3
onNext: 1
onNext: 5
onNext: 4
onNext: test
onComplete

Range Generates an ʻObservable` that outputs an integer in the specified range.

Observable.range(0, 10)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

output

onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onComplete

Repeat Generates ʻObservable` that repeats a specified number of times.

Observable.just(1, 2, 3, 4, 5)
        .repeat(3)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

output

onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onComplete

Start Create ʻObservablethat outputs the return value of the method that can return the calculated value. There is something similar toCreate, but this one has an arbitrary return value and does not call ʻonNext, ʻonComplete`.

Observable.fromCallable(() -> {
    String str = "java";
    str += ":" + "RxJava";
    return str;
}).subscribe(System.out::println);

output

java:RxJava

Timer Creates an ʻObservable` that outputs the value after a delay of the specified amount of time.

System.out.println(System.currentTimeMillis());
Observable.timer(3, TimeUnit.SECONDS)
        .subscribe(aLong -> {
            System.out.println(System.currentTimeMillis());
        });

output

1480975677330
1480975680651

Transforming Buffer An operator who creates a list by splitting a stream at specified intervals.

Observable.range(1, 5)
        .buffer(3)
        .subscribe(System.out::println);

output

[1, 2, 3]
[4, 5]

FlatMap An operator that processes what comes into the stream and then synthesizes it into a new ʻObservable`.

Observable.just(1, 2, 3)
        .flatMap(i -> Observable.range(i, i * 2))
        .subscribe(System.out::print);

output

122345345678

GroupBy An operator who divides a stream into groups according to conditions. If you return the same value for what you want to be in the same group, it will be in the same group.

Observable.range(1, 10)
        .groupBy(integer -> integer % 3)
        .subscribe(integerIntegerGroupedObservable -> {
            integerIntegerGroupedObservable.toList().subscribe(System.out::println);
        });

output

[3, 6, 9]
[1, 4, 7, 10]
[2, 5, 8]

Map An operator who can change the value flowing into the stream. The difference from FlatMap above is that FlatMap returns ʻObservable and Map` returns the value itself.

Observable.just(1,2,3)
        .map(i -> i * 10)
        .subscribe(System.out::println);

output

10
20
30

Scan Operator who accesses the list sequentially. Access two by two. At first, the first and second are passed to the arguments, and after that, the value passed to the return value in the previous process is passed to the first argument, and the next value is passed to the second argument. The value passed at the time of subscribe is the value passed to the first element + return value.

Observable.range(1, 5)
        .scan((sum, item) -> sum + item)
        .subscribe(System.out::println);

output

1
3
6
10
15

Window An operator that splits a stream at specified intervals and creates a new ʻObservablein the split stream. Similar toBuffer above, but BufferoutputsList , while Window outputs ʻObservable <Integer>.

Observable.range(1,5)
        .window(3)
        .subscribe(integerObservable -> {
            integerObservable.toList().subscribe(System.out::println);
        });

output

[1, 2, 3]
[4, 5]

Finally

I hope I can do other things like this. The source code is open to the public-> nshiba / rx-samples

Also, if you have any mistakes, I would be grateful if you could write them in the comments or issues on github.

Recommended Posts

Reintroduction to Operators with RxJava Part 1
to_ ○
Switch from JSP + JSTL to Thymeleaf
Reintroduction to Operators with RxJava Part 1
Association (1 to 1)! !!
Java to learn with ramen [Part 1]
SaveAsBinaryFile with Spark (Part 2)
Tutorial to create a blog with Rails for beginners Part 1
Tutorial to create a blog with Rails for beginners Part 0
Bean mapping with MapStruct Part 1
Java to play with Function
Bean mapping with MapStruct Part 3
Bean mapping with MapStruct Part 2
How to number (number) with html.erb
How to update with activerecord-import
Connect to MySQL 8 with Java
Grouping [RxJava] [1,1,2,2,3,3,1,1] like [[1,1], [2,2], [3,3], [1,1]] To do
Introduction to Spring Boot Part 1
Connect to oracle with eclipse!
Introduction to Java that can be understood even with Krillin (Part 1)