Dieses Tutorial zeigt Ihnen kurz, wie Sie in wenigen Minuten eine ** Apache Flink-Anwendung ** von Grund auf neu erstellen.
Apache Flink funktioniert unter Linux, Max OS X und Windows und ist kompatibel. Zum Entwickeln von Flink-Anwendungen entweder Java Version 8.0 oder höher oder Maven auf Ihrem Computer /install.html?spm=a2c65.11461447.0.0.29c056bf7J56YG) Sie müssen eine der Umgebungen ausführen. Wenn Sie die Java-Umgebung verwenden, werden durch Ausführen des Befehls "$ java Cversion" die von Ihnen verwendeten Versionsinformationen wie unten gezeigt ausgegeben.
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
Wenn Sie die Maven-Umgebung verwenden, werden beim Ausführen des Befehls $ mvn -version Versionsinformationen ausgegeben, die den folgenden ähnlich sind:
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
Verwenden Sie außerdem IntelliJ IDEA als IDE für Flink-Anwendungen (die Community-freie Version ist für dieses Tutorial ausreichend). Es wird empfohlen. Eclipse funktioniert ebenfalls für diesen Zweck, aber Eclipse hatte in der Vergangenheit Probleme mit Scala- und Java-Hybridprojekten, so Eclipse Es wird nicht empfohlen, auszuwählen.
Sie können die Schritte in diesem Abschnitt ausführen, um ein Flink-Projekt zu erstellen und in IntelliJ IDEA zu importieren. Verwenden Sie den Flink Maven-Archetyp, um die Projektstruktur und einige anfängliche Standardabhängigkeiten zu erstellen. Führen Sie in Ihrem Arbeitsverzeichnis den Befehl mvn archetype: generate
aus, um das Projekt zu erstellen.
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false
Die obige Gruppen-ID, Artefakt-ID und das obige Paket können in einem beliebigen Pfad bearbeitet werden. Mit den oben genannten Parametern erstellt Maven automatisch eine Projektstruktur, die der folgenden ähnelt:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
Die Datei pom.xml enthält bereits die erforderlichen Flink-Abhängigkeiten, und einige Beispielprogrammierframeworks finden Sie in src / main / java
.
Führen Sie nun die folgenden Schritte aus, um Ihr eigenes Flink-Programm zu erstellen. Starten Sie dazu IntelliJ IDEA, wählen Sie Projekt importieren und wählen Sie pom.xml im Stammverzeichnis von my-link-project. Importieren Sie dann das Projekt wie angewiesen.
Erstellen Sie eine SocketWindowWordCount.java
-Datei unter src / main / java / myflink
.
package myflink;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
}
}
Im Moment ist dieses Programm nur ein grundlegendes Framework, daher werden wir den Code Schritt für Schritt ausfüllen. Bitte beachten Sie, dass die Importanweisung automatisch von der IDE hinzugefügt wird. Beachten Sie daher, dass sie unten nicht geschrieben wird. Am Ende dieses Abschnitts sehen Sie den vollständigen Code. Wenn Sie die folgenden Schritte überspringen möchten, fügen Sie den endgültigen vollständigen Code direkt in den Editor ein.
Der erste Schritt im Flink-Programm besteht darin, eine "StreamExecutionEnvironment" zu erstellen. Dies ist eine Eintragsklasse, die zum Festlegen von Parametern, Erstellen von Datenquellen, Übermitteln von Aufgaben usw. verwendet werden kann. Fügen wir es nun der Hauptfunktion hinzu.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Erstellen Sie als Nächstes eine Datenquelle, die Daten vom Socket am lokalen Port 9000 liest.
DataStream text = env.socketTextStream("localhost", 9000, "\n");
Dadurch wird ein Zeichenfolgentyp "DataStream" erstellt. DataStream
ist Flinks Kern-API für die Stream-Verarbeitung. Es definiert viele allgemeine Vorgänge (Filtern, Transformieren, Aggregieren, Fenster, Assoziationen usw.). In diesem Beispiel interessiert uns, wie oft jedes Wort in einem bestimmten Zeitfenster erscheint, beispielsweise in einem 5-Sekunden-Fenster. Zu diesem Zweck werden Zeichenfolgendaten zuerst in Wörter und deren Vorkommen analysiert (dargestellt durch "Tuple2 <Zeichenfolge, Ganzzahl>"), wobei das erste Feld das Wort und das zweite Feld das Wortvorkommen ist Es wird die Anzahl der Male sein. Der Anfangswert der Anzahl der Vorkommen wird auf 1 gesetzt. Da eine Datenreihe mehrere Wörter enthalten kann, wird "flatmap" zum Parsen implementiert.
DataStream> wordCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
Gruppieren Sie dann die Datenströme basierend auf dem Wortfeld (dh dem Indexfeld 0). Hier verwenden wir die Methode "keyBy (int index)", um den Datenstrom "Tuple2 <String, Integer>" mit Wortschlüssel abzurufen. Geben Sie dann ein beliebiges Fenster im Stream an und berechnen Sie das Ergebnis anhand der Daten im Fenster. In diesem Beispiel werden Wortvorkommen alle 5 Sekunden aggregiert und jedes Fenster zählt von Grund auf neu.
DataStream> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
Das zweite .timeWindow ()
gibt ein Tumble-Fenster für 5 Sekunden an. Der dritte Aufruf gibt die Gesamtaggregationsfunktion für jeden Schlüssel und jedes Fenster an. In diesem Beispiel wird dies durch das Vorkommensfeld (dh das Indexfeld 1) hinzugefügt. Der resultierende Datenstrom gibt alle 5 Sekunden die Anzahl der Vorkommen jedes Wortes aus.
Schließlich gibt es den Datenstrom an die Konsole aus und startet die Ausführung.
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
Ein letzter Aufruf von "env.execute" ist erforderlich, um den eigentlichen Flink-Job zu starten. Alle Operator-Operationen (Quellenerstellung, Aggregation, Drucken usw.) erstellen nur ein Diagramm der internen Operator-Operationen. Nur wenn execute ()
aufgerufen wird, werden sie zur Ausführung an den Cluster oder den lokalen Computer gesendet.
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// Create the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
DataStream text = env.socketTextStream("localhost", 9000, "\n");
// Parse the data, group by word, and perform the window and aggregation operations.
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
Um das Beispielprogramm auszuführen, starten Sie NetCat auf dem Terminal und rufen Sie den Eingabestream ab.
nc -lk 9000
Unter Windows können Sie Ncat über [NMAP] installieren und ausführen (https://nmap.org/ncat/?spm=a2c65.11461447.0.0.29c056bf7J56YG).
ncat -lk 9000
Führen Sie dann die Hauptmethode von SocketWindowWordCount
direkt aus.
Geben Sie einfach ein Wort in die NetCat-Konsole ein und die Ausgabekonsole "SocketWindowWordCount" zeigt Statistiken darüber an, wie oft jedes Wort erscheint. Wenn Sie eine Anzahl von 1 oder mehr sehen möchten, geben Sie dasselbe Wort innerhalb von 5 Sekunden wiederholt ein.
Recommended Posts