[JAVA] Spring WebFlux Reactive Programming Implementation Sample (Basic)

Introduction

In the recent development of Web services, in order to improve the reusability of back-end data, there are an increasing number of cases where it is used by various clients by converting it into a microservice and providing it as a Web API. We will show you how to organize these backend APIs and external service APIs so that they are easy to handle on the frontend, and to reactively implement the intermediate layer (so-called BFF. Backends For Frontends) that controls them with WebFlux of Spring Boot 2.

Reactive ... what's good?

I won't go into details about "reactive programming" here as I'm familiar with other articles, but it makes it easy to implement ** non-blocking processing **.

In the previous Spring MVC, one thread is assigned to one request and processing is performed, so in the BFF (API aggregator layer) explained this time, an external API is called and waiting for a response ** "Wasteful waiting to do nothing Time "** will continue to block threads. Therefore, you need to spawn a thread each time you receive another request during processing.

On the other hand, WebFlux does not block the thread of the waiting time during API call (= non-blocking) and can use it for other processing, so you can handle requests efficiently with a small number of threads. ** **

However, reactive implementations are quirky and can be difficult to get used to. In this article, I will introduce some implementation patterns that are likely to be used as samples, so I hope it will help those who are thinking about starting from now on.

What to make with a sample

In this sample, we will create a BFF that collects and returns the backend Web API. Therefore, I prepared a simple backend that returns mock data quickly, so I will use it.

Please clone from the above, maven install, and then start the application. If you access http: // localhost: 8081 / categories and JSON is returned, it is successful.

Models and APIs handled in the sample

First, I will briefly touch on the back-end model handled by BFF. The back end provides the resources of a simple blog application with Web API, and has the following model configuration.

Blog model composition

ブログアプリER図.png

** It's a bit brute force [^ 1] **, but since APIs are prepared for each of these entities, let's see a reactive implementation for combining and returning to the front end.

[^ 1]: These entities, which would otherwise be relevant as domains, should also be provided by the backend as a single article API.

Development environment

Creating a project

For Intellij, follow Spring initializr,

It is OK if you select and create. (Lombok is not required, but it is recommended as it makes it easier to define and generate POJOs. Don't forget to install the lombok plugin)

Click here for the generated maven build file.

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>reactor</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactor</name>
    <description>Demo project for WebFlux</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

This BFF sample made this time is also on GitHub in a state where it can be executed, so please refer to it as well.

Now, let's get into the implementation description.

Reactive API call

In the past, it was common to use RestTemplate to call the Web API in Spring MVC. WebFlux supports non-blocking API calls by using WebClient.

The following is an implementation sample of WebClient. (Simplified, I use WebClient directly with @Service instead of @Repository)

java:com.example.reactor.service.CategoryService.java



@Service
public class CategoryService {

    private static final String URL_BACKENDS_ROOT = "http://localhost:8081";

    private final WebClient webClient = WebClient.create();

    public Flux<Category> findAll() {
        return webClient.get()
                .uri(URL_BACKENDS_ROOT + "/categories")
                .retrieve()
                .bodyToFlux(Category.class);
    }

    public Mono<Category> read(String categoryId) {
        return webClient.get()
                .uri(URL_BACKENDS_ROOT + "/categories/{categoryId}", categoryId)
                .retrieve()
                .bodyToMono(Category.class);
    }
}

It should be noted that the processing result of WebClient of findAll and read is wrapped with Flux <~> and Mono <~>.

――Represents multiple return values such as Flux <~>List <~>. [^ 2] --Mono <~>… Represents one return value.

[^ 2]: Flux <~> not only represents multiple return values, it also supports event-stream.

Let's see how the controller handles these Flux <~> and Mono <~> by pattern.

One-shot API call

First, let's look at an implementation that calls one API.

Processing image

activity_1.png

Implementation sample

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private CategoryService categoryService;

    @GetMapping("/categories")
    public Flux<Category> findCategories() {
        return categoryService.findAll();
    }
}

At first glance, it is a normal RestController, but since the return value of the controller is Flux <~>, it is searching the category list without blocking.

Multiple sequential API calls

Next, let's look at an implementation that calls the two APIs in sequence.

Processing image activity_1-1.png It is a flow to get a user from a user ID and get a list of article headers for that user.

Implementation sample

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private UserService userService;

    @Autowired
    private HeaderService headerService;

    @GetMapping("/headers/find-by-user/{userId}")
    public Mono<HeadersByUser> findHeadersByUserId(@NotNull @PathVariable final String userId) {
        return userService.read(userId)
                .flatMap(user -> headerService.findByUserId(user.getUserId())
                        .collectList()
                        .map(headers -> HeadersByUser.builder()
                                .user(user)
                                .headers(headers)
                                .build()));
    }
}

ʻUserService # read ()andHeaderService # findByUserId ()` are implemented to hit the API with WebClient like the previous CategoryService.

The point here is that when searching the article header list after acquiring the user, the sequential processing is connected with flatMap ().

return userService.read(userId)
    .flatMap(user -> /*Next reactive process(Flux/Mono) */ )

In this way, if the next process is a reactive process that also returns Flux / Mono, connect it withflatMap ().

On the contrary, instead of Flux / Mono, if you want to perform normal post-processing (refilling to the response model in the example), connect withmap ().

return headerService.findByUserId(user.getUserId())
    .collectList()
    .map(headers -> /*Next normal processing(Non-Flux/Mono) */ )

In the sample, in order to return the acquired user and article header together in one object, convert Flux <Header> to Mono <List <Header >> with collectList () and the following HeadersByUser is set in the model.

java:com.example.reactor.resource.HeadersByUser.java


@Data
@Builder
public class HeadersByUser implements Serializable {
    private User user;
    private List<Header> headers;
}

By connecting Mono <HeadersByUser> to the ** return value of the controller in this way, sequential processing will be executed for the first time **.

By the way, As mentioned above, the implementation of POJO is very simple because getters / setters are automatically generated by lombok's @ Data annotation. @Builder is easy to handle with a lambda expression because you can write the initial setting of the property at the time of instantiation in the method chain.

Instantiate while initializing the value in one line using lombok Builder


HeadersByUser model =
    HeadersByUser.builder().user(user).headers(headers).build();

There are other useful functions as well. For details, refer to this article.

Multiple parallel API calls

Now let's look at an implementation that calls two APIs in parallel and waits.

Processing image activity_2.png Get category information and article header list in parallel from category ID.

Implementation sample

java:com.example.reactor.controller.BlogController.java



@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private CategoryService categoryService;

    @Autowired
    private HeaderService headerService;

    @GetMapping("/headers/find-by-category/{categoryId}")
    public Mono<HeadersWithCategory> findHeadersByCategoryId(@NotNull @PathVariable final String categoryId) {
        return Mono.zip(
                    categoryService.read(categoryId), // T1
                    headerService.findByCategoryId(categoryId).collectList() // T2
                )
                .map(tuple2 -> {
                    final Category category = tuple2.getT1();
                    final List<Header> headers = tuple2.getT2();
                    return HeadersWithCategory.builder()
                            .category(category)
                            .headers(headers)
                            .build();
                });
    }
}

CategoryService # read () and HeaderService # findByCategoryId () are implementations that use WebClient as usual and return Mono <Category> and Flux <Header>.

When processing in parallel,

Mono.zip(Mono processing 1,Mono processing 2,...)

Is used. And each reactive processing result can be obtained from the Tuple2 object.

    .map(tuple2 -> {
        final Category category = tuple2.getT1();
        final List<Header> headers = tuple2.getT2();
        ...

If the number of parallel processes increases to 3 or 4, the corresponding Tuples will also be Tuple3, Tuple4, etc., but the usage is the same. You can get the processing result by using getT1 () ... T5 () corresponding to the order specified in Mono.zip ().

In the sample, the model HeadersWithCategory that summarizes the acquired categories and the article header list is defined and returned together at the end of parallel processing.

java:com.example.reactor.resource.HeadersWithCategory.java



@Data
@Builder
public class HeadersWithCategory implements Serializable {
    private Category category;
    private List<Header> headers;
}

Sequential and parallel combination API calls

Finally, let's take a look at the implementation of API calls that combine sequential and parallel.

activity_1-2.png It is a flow to get the article header from the article ID, and then get the article content and comment list.

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private HeaderService headerService;

    @Autowired
    private BodyService bodyService;

    @Autowired
    private CommentService commentService;

    @GetMapping("/entries/{entryId}")
    public Mono<Entry> getEntry(@NotNull @PathVariable Long entryId) {
        return headerService.read(entryId)
                .flatMap(header -> Mono.zip(
                            bodyService.read(header.getEntryId()), // T1
                            commentService.findByEntryId(header.getEntryId()).collectList() // T2
                        )
                        .map(tuple2 -> {
                            final Body body = tuple2.getT1();
                            final List<Comment> comments = tuple2.getT2();
                            return Entry.builder()
                                    .header(header)
                                    .body(body)
                                    .comments(comments)
                                    .build();
                        })
                );
    }
}

There is no particular supplement. It is a straightforward combination of the conventional sequential and parallel processing.

The definition of the ʻEntry` model of the response is as follows.

java:com.example.reactor.resource.Entry.java


@Data
@Builder
public class Entry implements Serializable {
    public Header header;
    public Body body;
    public List<Comment> comments;
}

Check the operation of the sample you made

This BFF sample can be found at API Aggregator Sample (reactor): GitHub. After cloning and maven install, you can check the operation with the following endpoint by starting the application. (Of course, please also start backends)

sample end point
One-shot API call http://localhost:8080/blog/categories
Multiple sequential API calls http://localhost:8080/blog/headers/find-by-user/qiitaro
Multiple parallel API calls http://localhost:8080/blog/headers/find-by-category/java
Sequential and parallel combination API calls http://localhost:8080/blog/entries/1

Points to note

Finally, I will summarize the points to be aware of when implementing.

Do not use block ()

With block (), you get a familiar synchronous implementation like this: However, this implementation will block the asynchronous process and wait until the process returns.

    @GetMapping("/headers/find-by-user/{userId}")
    public Mono<HeadersByUser> findHeadersByUserId(@NotNull @PathVariable final String userId) {
        
        User user = userService.read(userId).block();
        
        List<Header> headers =
                headerService.findByUserId(user.getUserId())
                        .collectList()
                        .block();
        
        HeadersByUser response =
                HeadersByUser.builder()
                        .user(user)
                        .headers(headers)
                        .build();
        
        return Mono.just(response);
    }

Don't use WebFlux as it will give up the biggest advantage.

By the way, if the server is Netty, processing using block () is not supported, so an error will occur in the first place, Please note that it is available for Tomcat. (If you configure it according to the procedure in this article, it will be Netty)

Reactive processing is connected to the end (return value of Controller)

I intended to write by connecting the processes sequentially or in parallel, but when I try to move it, it may not work at all. In such a case, check whether the reactive process that returns Flux / Mono is properly connected to the subsequent process.

Even if there is no return value, it is necessary to return the type Mono <Void> to the return value of Controller and connect it.


This is the end of this basic edition. BFF as an API Aggregator can be created to some extent by the method so far, but it will be necessary to consider a little more for actual operation such as error handling, retry, session handling, etc.

I will explain about that in the next and subsequent entries. Thank you very much.

Recommended Posts

Spring WebFlux Reactive Programming Implementation Sample (Basic)
Basic Programming Resources