[JAVA] Beispiel für die Implementierung der reaktiven WebFlux-Programmierung (Basis)

Einführung

In der jüngsten Entwicklung von Webdiensten, um die Wiederverwendbarkeit von Back-End-Daten zu verbessern, werden diese zunehmend von verschiedenen Clients verwendet, indem sie in einen Mikrodienst konvertiert und als Web-API bereitgestellt werden. Wir zeigen Ihnen, wie Sie diese Backend-APIs und externen Service-APIs so organisieren, dass sie im Frontend einfach zu handhaben sind, und die mittlere Ebene (sogenannte BFF. Backends für Frontends), die sie mit WebFlux von Spring Boot 2 steuert, reaktiv implementieren.

Reaktiv ... was ist gut?

Die Details der "reaktiven Programmierung" werden hier nicht detailliert beschrieben, wie sie in anderen Artikeln beschrieben werden, aber es kann ** leicht eine nicht blockierende Verarbeitung implementieren **.

In der vorherigen Spring MVC wurde einer Anforderung zur Verarbeitung ein Thread zugewiesen. In der diesmal erläuterten BFF (API Aggregator Layer) wird eine externe API aufgerufen und wartet auf eine Antwort. ** "Warten auf nichts" Die Zeit "** blockiert weiterhin Threads. Daher müssen Sie jedes Mal einen Thread erzeugen, wenn Sie während der Verarbeitung eine weitere Anforderung erhalten.

Andererseits blockiert WebFlux den Thread der Wartezeit während des API-Aufrufs nicht (= nicht blockierend) und kann einen anderen Prozess ausführen, sodass Sie Anforderungen mit einer kleinen Anzahl von Threads effizient bearbeiten können. ** ** **

Die reaktive Implementierung ist jedoch eigenartig und es kann schwierig sein, sich daran zu gewöhnen. In diesem Artikel werde ich einige Implementierungsmuster vorstellen, die wahrscheinlich als Beispiele verwendet werden. Ich hoffe, dass dies denjenigen helfen wird, die von nun an daran denken, anzufangen.

Was mit einer Probe zu machen

In diesem Beispiel erstellen wir eine BFF, die die Backend-Web-API sammelt und zurückgibt. Aus diesem Grund habe ich ein einfaches Backend vorbereitet, das Scheindaten schnell zurückgibt, sodass ich es verwenden werde.

Klonen Sie von oben, installieren Sie maven und starten Sie die Anwendung. Wenn Sie auf "http: // localhost: 8081 / category" zugreifen und JSON zurückgegeben wird, ist dies erfolgreich.

Im Beispiel behandelte Modelle und APIs

Zunächst werde ich kurz auf die von BFF behandelten Back-End-Modelle eingehen. Das Back-End bietet Ressourcen für eine einfache Blog-App über die Web-API und verfügt über die folgende Modellkonfiguration.

Zusammensetzung des Blogmodells

ブログアプリER図.png

** Es ist ein bisschen Brute Force [^ 1] **, aber da APIs für jede dieser Entitäten bereitgestellt werden, schauen wir uns eine reaktive Implementierung zum Kombinieren und Zurückkehren zum Frontend an.

[^ 1]: Diese Entitäten, die ansonsten als Domänen relevant wären, sollten vom Backend auch als einzelne Artikel-API bereitgestellt werden.

Entwicklungsumgebung

Ein Projekt erstellen

Folgen Sie für Intellij Spring initializr,

Es ist in Ordnung, wenn Sie auswählen und erstellen. (Lombok ist nicht erforderlich, wird jedoch empfohlen, da es das Definieren und Generieren von POJO erleichtert. Vergessen Sie nicht, das Lombok-Plugin zu installieren.)

Klicken Sie hier für die generierte Maven-Build-Datei.

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>reactor</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactor</name>
    <description>Demo project for WebFlux</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Dieses diesmal erstellte BFF-Beispiel befindet sich ebenfalls in einem Zustand, in dem es ausgeführt werden kann. Bitte beziehen Sie sich auch darauf.

Kommen wir nun zur Implementierungsbeschreibung.

Reaktiver API-Aufruf

Bisher war es üblich, "RestTemplate" zum Aufrufen der Web-API in Spring MVC zu verwenden. WebFlux unterstützt nicht blockierende API-Aufrufe mithilfe von "WebClient".

Das Folgende ist ein Implementierungsbeispiel für WebClient. (Vereinfacht gesagt verwende ich WebClient direkt mit @Service anstelle von @Repository.)

java:com.example.reactor.service.CategoryService.java



@Service
public class CategoryService {

    private static final String URL_BACKENDS_ROOT = "http://localhost:8081";

    private final WebClient webClient = WebClient.create();

    public Flux<Category> findAll() {
        return webClient.get()
                .uri(URL_BACKENDS_ROOT + "/categories")
                .retrieve()
                .bodyToFlux(Category.class);
    }

    public Mono<Category> read(String categoryId) {
        return webClient.get()
                .uri(URL_BACKENDS_ROOT + "/categories/{categoryId}", categoryId)
                .retrieve()
                .bodyToMono(Category.class);
    }
}

Es ist zu beachten, dass das Verarbeitungsergebnis von WebClient von "findAll" und "read" mit "Flux <~>" und "Mono <~>" umbrochen wird.

――Stellt mehrere Rückgabewerte dar, z. B. Flux <~>List <~>. [^ 2] --Mono <~> … Repräsentiert einen Rückgabewert.

[^ 2]: Flux <~> repräsentiert nicht nur mehrere Rückgabewerte, sondern unterstützt auch Event-Stream.

Mal sehen, wie der Controller diese "Flux <~>" und "Mono <~>" nach Muster behandelt.

Einzelner API-Aufruf

Schauen wir uns zunächst eine Implementierung an, die eine API aufruft.

Bild verarbeiten

activity_1.png

Implementierungsbeispiel

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private CategoryService categoryService;

    @GetMapping("/categories")
    public Flux<Category> findCategories() {
        return categoryService.findAll();
    }
}

Auf den ersten Blick ist es ein normaler RestController, aber da der Rückgabewert des Controllers "Flux <~>" ist, durchsucht er die Kategorieliste ohne zu blockieren.

Mehrere aufeinanderfolgende API-Aufrufe

Schauen wir uns als nächstes eine Implementierung an, die die beiden APIs nacheinander aufruft.

Bild verarbeiten activity_1-1.png Es ist ein Ablauf, einen Benutzer von einer Benutzer-ID abzurufen und eine Liste der Artikelüberschriften für diesen Benutzer abzurufen.

Implementierungsbeispiel

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private UserService userService;

    @Autowired
    private HeaderService headerService;

    @GetMapping("/headers/find-by-user/{userId}")
    public Mono<HeadersByUser> findHeadersByUserId(@NotNull @PathVariable final String userId) {
        return userService.read(userId)
                .flatMap(user -> headerService.findByUserId(user.getUserId())
                        .collectList()
                        .map(headers -> HeadersByUser.builder()
                                .user(user)
                                .headers(headers)
                                .build()));
    }
}

UserService # read () und HeaderService # findByUserId () werden implementiert, indem die API mit WebClient wie im vorherigen CategoryService aufgerufen wird.

Der Punkt hier ist, dass beim Durchsuchen der Artikelkopfliste nach dem Erwerb eines Benutzers die sequentielle Verarbeitung mit "flatMap ()" verbunden ist.

return userService.read(userId)
    .flatMap(user -> /*Nächster reaktiver Prozess(Flux/Mono) */ )

Wenn der nächste Prozess ein reaktiver Prozess ist, der auch "Flux / Mono" zurückgibt, verbinden Sie ihn auf diese Weise mit "flatMap ()".

Im Gegenteil, wenn Sie anstelle von "Flux / Mono" eine normale Nachbearbeitung durchführen möchten (im Beispiel das Antwortmodell nachfüllen), verbinden Sie sich mit "map ()".

return headerService.findByUserId(user.getUserId())
    .collectList()
    .map(headers -> /*Nächste normale Verarbeitung(Nicht-Flussmittel/Mono) */ )

Um den erfassten Benutzer- und Artikelheader zusammen in einem Objekt zurückzugeben, konvertieren Sie im Beispiel "Flux

" mit "collectList ()" und dem folgenden "in" Mono <List
> " HeadersByUser` wird im Modell festgelegt.

java:com.example.reactor.resource.HeadersByUser.java


@Data
@Builder
public class HeadersByUser implements Serializable {
    private User user;
    private List<Header> headers;
}

Indem Sie auf diese Weise Mono <HeadersByUser> mit dem ** Rückgabewert des Controllers verbinden, wird zum ersten Mal eine sequentielle Verarbeitung ausgeführt **.

Apropos, Wie oben erwähnt, ist die Implementierung von POJO sehr einfach, da Getter / Setter automatisch durch die @ Data-Annotation von lombok generiert werden. @ Builder ist mit Lambda-Ausdrücken einfach zu handhaben, da Sie die anfänglichen Einstellungen von Eigenschaften zum Zeitpunkt der Instanziierung in die Methodenkette schreiben können.

Instanziieren Sie mit lomboks Builder, während Sie den Wert in einer Zeile initialisieren


HeadersByUser model =
    HeadersByUser.builder().user(user).headers(headers).build();

Es gibt auch andere nützliche Funktionen. Weitere Informationen finden Sie unter Dieser Artikel.

Mehrere parallele API-Aufrufe

Schauen wir uns nun eine Implementierung an, die zwei APIs parallel aufruft und wartet.

Bild verarbeiten activity_2.png Ruft parallel zur Kategorie-ID die Kategorieinformationen und die Artikelkopfliste ab.

Implementierungsbeispiel

java:com.example.reactor.controller.BlogController.java



@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private CategoryService categoryService;

    @Autowired
    private HeaderService headerService;

    @GetMapping("/headers/find-by-category/{categoryId}")
    public Mono<HeadersWithCategory> findHeadersByCategoryId(@NotNull @PathVariable final String categoryId) {
        return Mono.zip(
                    categoryService.read(categoryId), // T1
                    headerService.findByCategoryId(categoryId).collectList() // T2
                )
                .map(tuple2 -> {
                    final Category category = tuple2.getT1();
                    final List<Header> headers = tuple2.getT2();
                    return HeadersWithCategory.builder()
                            .category(category)
                            .headers(headers)
                            .build();
                });
    }
}

CategoryService # read () und HeaderService # findByCategoryId () sind Implementierungen, die WebClient wie gewohnt verwenden und Mono <Category> und Flux <Header> zurückgeben.

Bei paralleler Verarbeitung

Mono.zip(Monoverarbeitung 1,Monoverarbeitung 2,...)

Wird genutzt. Und jedes reaktive Verarbeitungsergebnis kann aus dem "Tuple2" -Objekt erhalten werden.

    .map(tuple2 -> {
        final Category category = tuple2.getT1();
        final List<Header> headers = tuple2.getT2();
        ...

Wenn die Anzahl der parallelen Prozesse auf drei oder vier steigt, sind die entsprechenden Tupel ebenfalls "Tuple3", "Tupel4" usw., aber die Verwendung ist dieselbe. Sie können das Verarbeitungsergebnis erhalten, indem Sie "getT1 () ... T5 ()" verwenden, das der in "Mono.zip ()" angegebenen Reihenfolge entspricht.

Im Beispiel wird das Modell "HeadersWithCategory", das die erfassten Kategorien und die Liste der Artikelüberschriften zusammenfasst, definiert und am Ende der Parallelverarbeitung zusammen zurückgegeben.

java:com.example.reactor.resource.HeadersWithCategory.java



@Data
@Builder
public class HeadersWithCategory implements Serializable {
    private Category category;
    private List<Header> headers;
}

Sequentieller und paralleler Kombinations-API-Aufruf

Lassen Sie uns abschließend einen Blick auf die Implementierung von API-Aufrufen werfen, die sequentiell und parallel kombiniert werden.

activity_1-2.png Es ist ein Ablauf, den Artikelkopf aus der Artikel-ID abzurufen und dann den Artikelinhalt und die Kommentarliste abzurufen.

java:com.example.reactor.controller.BlogController.java


@RestController
@RequestMapping("blog")
public class BlogController {

    @Autowired
    private HeaderService headerService;

    @Autowired
    private BodyService bodyService;

    @Autowired
    private CommentService commentService;

    @GetMapping("/entries/{entryId}")
    public Mono<Entry> getEntry(@NotNull @PathVariable Long entryId) {
        return headerService.read(entryId)
                .flatMap(header -> Mono.zip(
                            bodyService.read(header.getEntryId()), // T1
                            commentService.findByEntryId(header.getEntryId()).collectList() // T2
                        )
                        .map(tuple2 -> {
                            final Body body = tuple2.getT1();
                            final List<Comment> comments = tuple2.getT2();
                            return Entry.builder()
                                    .header(header)
                                    .body(body)
                                    .comments(comments)
                                    .build();
                        })
                );
    }
}

Es gibt keine besondere Ergänzung. Es ist eine einfache Kombination der herkömmlichen sequentiellen und parallelen Verarbeitung.

Die Definition des "Entry" -Modells der Antwort lautet wie folgt.

java:com.example.reactor.resource.Entry.java


@Data
@Builder
public class Entry implements Serializable {
    public Header header;
    public Body body;
    public List<Comment> comments;
}

Überprüfen Sie die Funktion der von Ihnen erstellten Probe

Diese BFF-Probe finden Sie unter API Aggregator Sample (Reaktor): GitHub. Nach dem Klonen und der Installation von Maven können Sie den Vorgang mit dem folgenden Endpunkt überprüfen, indem Sie die Anwendung starten. (Natürlich starten Sie auch Backends)

Stichprobe Endpunkt
Einzelner API-Aufruf http://localhost:8080/blog/categories
Mehrere aufeinanderfolgende API-Aufrufe http://localhost:8080/blog/headers/find-by-user/qiitaro
Mehrere parallele API-Aufrufe http://localhost:8080/blog/headers/find-by-category/java
Sequentieller und paralleler Kombinations-API-Aufruf http://localhost:8080/blog/entries/1

Zu beachtende Punkte

Abschließend fasse ich die Punkte zusammen, die bei der Implementierung zu beachten sind.

Verwenden Sie nicht block ()

Mit block () erhalten Sie eine vertraute synchrone Implementierung wie folgt: Diese Implementierung blockiert jedoch und wartet auf die Rückkehr der Verarbeitung zur asynchronen Verarbeitung.

    @GetMapping("/headers/find-by-user/{userId}")
    public Mono<HeadersByUser> findHeadersByUserId(@NotNull @PathVariable final String userId) {
        
        User user = userService.read(userId).block();
        
        List<Header> headers =
                headerService.findByUserId(user.getUserId())
                        .collectList()
                        .block();
        
        HeadersByUser response =
                HeadersByUser.builder()
                        .user(user)
                        .headers(headers)
                        .build();
        
        return Mono.just(response);
    }

Verwenden Sie WebFlux nicht, da dies den größten Vorteil aufgibt.

Wenn der Server Netty ist, wird die Verarbeitung mit block () übrigens nicht unterstützt, sodass zunächst ein Fehler auftritt. Bitte beachten Sie, dass es für Tomcat verfügbar ist. (Wenn Sie es gemäß dem in diesem Artikel beschriebenen Verfahren konfigurieren, ist es Netty.)

Die reaktive Verarbeitung ist mit dem Ende verbunden (Rückgabewert des Controllers)

Ich wollte schreiben, indem ich die Prozesse nacheinander oder parallel verbinde, aber wenn ich versuche, sie zu verschieben, funktioniert sie möglicherweise überhaupt nicht. Überprüfen Sie in einem solchen Fall, ob der reaktive Prozess, der "Flux / Mono" zurückgibt, ordnungsgemäß mit dem nachfolgenden Prozess verbunden ist.

Auch wenn es keinen Rückgabewert gibt, muss der Typ "Mono " auf den Rückgabewert des Controllers zurückgesetzt und verbunden werden.


Dies ist das Ende dieser Basisausgabe. BFF als API-Aggregator kann in gewissem Umfang mit der oben beschriebenen Methode erstellt werden, es ist jedoch erforderlich, für den tatsächlichen Betrieb etwas mehr zu berücksichtigen, z. B. Fehlerbehandlung, Wiederholungsversuch und Sitzungsbehandlung.

Ich werde das in den nächsten und nachfolgenden Einträgen erklären. Vielen Dank.

Recommended Posts