[JAVA] Comment créer une application Apache Flink à partir de zéro en 5 minutes

Ce tutoriel vous montrera brièvement comment créer une ** application Apache Flink ** à partir de zéro en quelques minutes.

Préparation de l'environnement de développement

Apache Flink fonctionne sur Linux, Max OS X et Windows et est compatible. Pour développer des applications Flink, soit Java version 8.0 ou ultérieure ou Maven sur votre ordinateur /install.html?spm=a2c65.11461447.0.0.29c056bf7J56YG) Vous devez exécuter l'un des environnements. Si vous utilisez l'environnement Java, l'exécution de la commande $ java Cversion affichera les informations de version que vous utilisez comme indiqué ci-dessous.

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

Si vous utilisez l'environnement Maven, l'exécution de la commande $ mvn -version produira des informations de version similaires à ce qui suit:

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

De plus, utilisez IntelliJ IDEA comme IDE pour les applications Flink (la version sans communauté est suffisante pour ce tutoriel). C'est recommandé. Eclipse fonctionne également dans ce but, mais Eclipse a eu des problèmes avec les projets hybrides Scala et Java dans le passé, donc Eclipse Il n'est pas recommandé de sélectionner.

Créer un projet Maven

Vous pouvez suivre les étapes de cette section pour créer un projet Flink et l'importer dans IntelliJ IDEA. Utilisez Flink Maven Archetype pour créer la structure du projet et certaines dépendances par défaut initiales. Dans votre répertoire de travail, exécutez la commande mvn archetype: generate pour créer le projet.

    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

Le groupId, artifactId et le package ci-dessus peuvent être modifiés vers n'importe quel chemin. En utilisant les paramètres ci-dessus, Maven créera automatiquement une structure de projet similaire à la suivante:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

Le fichier pom.xml contient déjà les dépendances Flink requises, et quelques exemples de frameworks de programmation sont fournis dans src / main / java.

Compilez le programme Flink

Maintenant, suivez les étapes ci-dessous pour créer votre propre programme Flink. Pour ce faire, lancez IntelliJ IDEA, sélectionnez Importer un projet et sélectionnez pom.xml sous le répertoire racine de my-link-project. Puis importez le projet comme indiqué.

Créez un fichier SocketWindowWordCount.java sous src / main / java / myflink.

package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}

Pour l'instant, ce programme n'est qu'un framework de base, nous allons donc remplir le code étape par étape. Veuillez noter que la déclaration d'importation sera ajoutée automatiquement par l'EDI, veuillez donc noter qu'elle ne sera pas écrite ci-dessous. À la fin de cette section, vous verrez le code complété. Si vous souhaitez ignorer les étapes ci-dessous, collez le code complet final directement dans l'éditeur.

La première étape du programme Flink est de créer un StreamExecutionEnvironment. Il s'agit d'une classe d'entrée qui peut être utilisée pour définir des paramètres, créer des sources de données, soumettre des tâches, etc. Maintenant, ajoutons-le à la fonction principale.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Ensuite, créez une source de données qui lit les données du socket sur le port local 9000.

DataStream text = env.socketTextStream("localhost", 9000, "\n");

Cela créera un type de chaîne «DataStream». DataStream est l'API principale de Flink pour le traitement des flux. Il définit de nombreuses opérations courantes (filtrage, transformation, agrégation, fenêtres, associations, etc.). Dans cet exemple, nous nous intéressons au nombre de fois où chaque mot apparaît dans une fenêtre temporelle particulière, par exemple une fenêtre de 5 secondes. À cette fin, les données de chaîne sont d'abord analysées en mots et leurs occurrences (représentées par «Tuple2 <String, Integer>»), le premier champ étant le mot et le second champ étant l'occurrence du mot. Ce sera le nombre de fois. La valeur initiale du nombre d'occurrences est définie sur 1. Puisqu'il peut y avoir plusieurs mots dans une ligne de données, flatmap est implémenté pour l'analyse.

DataStream> wordCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

Regroupez ensuite les flux de données en fonction du champ de mot (c'est-à-dire le champ d'index 0). Ici, nous utilisons la méthode keyBy (int index) pour obtenir le flux de données à clé de mot Tuple2 <String, Integer>. Spécifiez ensuite n'importe quelle fenêtre du flux et calculez le résultat en fonction des données de la fenêtre. Dans cet exemple, les occurrences de mots sont agrégées toutes les 5 secondes et chaque fenêtre compte à partir de zéro.

DataStream> windowCounts = wordCounts
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

Le second .timeWindow () spécifie une fenêtre de basculement pendant 5 secondes. Le troisième appel spécifie la fonction d'agrégation totale pour chaque touche et chaque fenêtre. Dans cet exemple, cela est ajouté par le champ occurrences (c'est-à-dire le champ d'index 1). Le flux de données résultant imprime le nombre d'occurrences de chaque mot toutes les 5 secondes.

Enfin, il sort le flux de données vers la console et démarre l'exécution.

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

Un dernier appel ʻenv.executeest requis pour démarrer le travail Flink réel. Toutes les opérations de l'Opérateur (création de source, agrégation, impression, etc.) ne construisent qu'un graphique des opérations internes de l'Opérateur. Ce n'est que lorsque ʻexecute ()est appelé qu'ils seront envoyés au cluster ou à l'ordinateur local pour exécution.

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
        DataStream text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, group by word, and perform the window and aggregation operations.
        DataStream> windowCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

Exécution du programme

Pour exécuter l'exemple de programme, démarrez NetCat sur le terminal et récupérez le flux d'entrée.

nc -lk 9000

Pour Windows, vous pouvez installer et exécuter Ncat via NMAP.

ncat -lk 9000

Puis exécutez directement la méthode principale de SocketWindowWordCount.

Tapez simplement un mot dans la console NetCat et la console de sortie SocketWindowWordCount affichera des statistiques sur la fréquence à laquelle chaque mot apparaît. Si vous voulez voir un nombre de 1 ou plus, entrez le même mot à plusieurs reprises dans les 5 secondes.

image.png

Recommended Posts

Comment créer une application Apache Flink à partir de zéro en 5 minutes
Comment créer un fichier exécutable dans Maven
J'ai essayé de faire une demande en 3 mois d'inexpérimenté
Que s'est-il passé dans «Java 8 to Java 11» et comment créer un environnement
Étapes pour publier une application sur Heroku
Comprendre en 5 minutes !! Comment utiliser Docker
Comment obtenir une classe depuis Element en Java
Comment résoudre les problèmes d'expression en Java
J'ai essayé de développer une application en 2 langues
[Rails] Comment créer un environnement avec Docker
Comment créer la blockchain la plus simple de Ruby
Comment démarrer un indice à partir d'un nombre arbitraire dans le traitement itératif Ruby
Comment rendre une image partiellement transparente avec le traitement
Comment implémenter la connexion invité en 5 minutes sur le portefeuille de rails
Comment installer Docker dans l'environnement local d'une application Rails existante [Rails 6 / MySQL 8]
Comprendre comment utiliser le décodeur JSON de Swift en 3 minutes
Comment construire android-midi-lib
[Rails] Comment afficher les images dans la vue
Comment passer un objet à Mapper dans MyBatis sans passer par un argument
Comment créer un serveur d'applications sur une instance EC2 d'AWS
Comment changer une chaîne dans un tableau en un nombre dans Ruby
Comment récupérer la valeur de hachage dans un tableau dans Ruby
Comment déployer une application Java sur Alibaba Cloud EDAS dans Eclipse
Comment installer une application Web pour chaque langue dans Nginx
Comment surveiller les informations d'application en temps réel à l'aide de JConsole
[Code de test d'intégration] Comment sélectionner un élément dans date_select
Comment utiliser Apache POI
Comment gérer les instances
Comment empêcher la saisie de dates passées dans les formulaires Rails
[Ruby] Comment compter les nombres pairs ou impairs dans un tableau
Comment afficher la valeur lorsqu'il y a un tableau dans le tableau
Comment publier une application à l'aide de la construction d'environnement d'instance AWS (3) EC2
Comment installer le langage utilisé dans Ubuntu et comment créer l'environnement
Comment obtenir et ajouter des données depuis Firebase Firestore dans Ruby
Comment obtenir la longueur d'un fichier audio avec Java
Comment utiliser Lombok au printemps
Comment trouver May'n dans XPath
Comment masquer la barre de défilement dans WebView
Comment exécuter JUnit dans Eclipse
Comment itérer indéfiniment en Ruby
Comment maîtriser la programmation en 3 mois
Comment obtenir des paramètres dans Spark
Comment insérer une bibliothèque externe
Comment utiliser InjectorHolder dans OpenAM
Comment installer jQuery dans Rails 6
Comment nommer des variables en Java
Comment définir Lombok dans Eclipse
Comment créer CloudStack à l'aide de Docker
Comment concaténer des chaînes avec Java
Comment installer Swiper in Rails
Comment passer du HTML à Haml