Ce tutoriel vous montrera brièvement comment créer une ** application Apache Flink ** à partir de zéro en quelques minutes.
Apache Flink fonctionne sur Linux, Max OS X et Windows et est compatible. Pour développer des applications Flink, soit Java version 8.0 ou ultérieure ou Maven sur votre ordinateur /install.html?spm=a2c65.11461447.0.0.29c056bf7J56YG) Vous devez exécuter l'un des environnements. Si vous utilisez l'environnement Java, l'exécution de la commande $ java Cversion
affichera les informations de version que vous utilisez comme indiqué ci-dessous.
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
Si vous utilisez l'environnement Maven, l'exécution de la commande $ mvn -version produira des informations de version similaires à ce qui suit:
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
De plus, utilisez IntelliJ IDEA comme IDE pour les applications Flink (la version sans communauté est suffisante pour ce tutoriel). C'est recommandé. Eclipse fonctionne également dans ce but, mais Eclipse a eu des problèmes avec les projets hybrides Scala et Java dans le passé, donc Eclipse Il n'est pas recommandé de sélectionner.
Vous pouvez suivre les étapes de cette section pour créer un projet Flink et l'importer dans IntelliJ IDEA. Utilisez Flink Maven Archetype pour créer la structure du projet et certaines dépendances par défaut initiales. Dans votre répertoire de travail, exécutez la commande mvn archetype: generate
pour créer le projet.
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false
Le groupId, artifactId et le package ci-dessus peuvent être modifiés vers n'importe quel chemin. En utilisant les paramètres ci-dessus, Maven créera automatiquement une structure de projet similaire à la suivante:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
Le fichier pom.xml contient déjà les dépendances Flink requises, et quelques exemples de frameworks de programmation sont fournis dans src / main / java
.
Maintenant, suivez les étapes ci-dessous pour créer votre propre programme Flink. Pour ce faire, lancez IntelliJ IDEA, sélectionnez Importer un projet et sélectionnez pom.xml sous le répertoire racine de my-link-project. Puis importez le projet comme indiqué.
Créez un fichier SocketWindowWordCount.java
sous src / main / java / myflink
.
package myflink;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
}
}
Pour l'instant, ce programme n'est qu'un framework de base, nous allons donc remplir le code étape par étape. Veuillez noter que la déclaration d'importation sera ajoutée automatiquement par l'EDI, veuillez donc noter qu'elle ne sera pas écrite ci-dessous. À la fin de cette section, vous verrez le code complété. Si vous souhaitez ignorer les étapes ci-dessous, collez le code complet final directement dans l'éditeur.
La première étape du programme Flink est de créer un StreamExecutionEnvironment
. Il s'agit d'une classe d'entrée qui peut être utilisée pour définir des paramètres, créer des sources de données, soumettre des tâches, etc. Maintenant, ajoutons-le à la fonction principale.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ensuite, créez une source de données qui lit les données du socket sur le port local 9000.
DataStream text = env.socketTextStream("localhost", 9000, "\n");
Cela créera un type de chaîne «DataStream». DataStream
est l'API principale de Flink pour le traitement des flux. Il définit de nombreuses opérations courantes (filtrage, transformation, agrégation, fenêtres, associations, etc.). Dans cet exemple, nous nous intéressons au nombre de fois où chaque mot apparaît dans une fenêtre temporelle particulière, par exemple une fenêtre de 5 secondes. À cette fin, les données de chaîne sont d'abord analysées en mots et leurs occurrences (représentées par «Tuple2 <String, Integer>»), le premier champ étant le mot et le second champ étant l'occurrence du mot. Ce sera le nombre de fois. La valeur initiale du nombre d'occurrences est définie sur 1. Puisqu'il peut y avoir plusieurs mots dans une ligne de données, flatmap
est implémenté pour l'analyse.
DataStream> wordCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
Regroupez ensuite les flux de données en fonction du champ de mot (c'est-à-dire le champ d'index 0). Ici, nous utilisons la méthode keyBy (int index)
pour obtenir le flux de données à clé de mot Tuple2 <String, Integer>
. Spécifiez ensuite n'importe quelle fenêtre du flux et calculez le résultat en fonction des données de la fenêtre. Dans cet exemple, les occurrences de mots sont agrégées toutes les 5 secondes et chaque fenêtre compte à partir de zéro.
DataStream> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
Le second .timeWindow ()
spécifie une fenêtre de basculement pendant 5 secondes. Le troisième appel spécifie la fonction d'agrégation totale pour chaque touche et chaque fenêtre. Dans cet exemple, cela est ajouté par le champ occurrences (c'est-à-dire le champ d'index 1). Le flux de données résultant imprime le nombre d'occurrences de chaque mot toutes les 5 secondes.
Enfin, il sort le flux de données vers la console et démarre l'exécution.
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
Un dernier appel ʻenv.executeest requis pour démarrer le travail Flink réel. Toutes les opérations de l'Opérateur (création de source, agrégation, impression, etc.) ne construisent qu'un graphique des opérations internes de l'Opérateur. Ce n'est que lorsque ʻexecute ()
est appelé qu'ils seront envoyés au cluster ou à l'ordinateur local pour exécution.
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// Create the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
DataStream text = env.socketTextStream("localhost", 9000, "\n");
// Parse the data, group by word, and perform the window and aggregation operations.
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
Pour exécuter l'exemple de programme, démarrez NetCat sur le terminal et récupérez le flux d'entrée.
nc -lk 9000
Pour Windows, vous pouvez installer et exécuter Ncat via NMAP.
ncat -lk 9000
Puis exécutez directement la méthode principale de SocketWindowWordCount
.
Tapez simplement un mot dans la console NetCat et la console de sortie SocketWindowWordCount
affichera des statistiques sur la fréquence à laquelle chaque mot apparaît. Si vous voulez voir un nombre de 1 ou plus, entrez le même mot à plusieurs reprises dans les 5 secondes.
Recommended Posts