[JAVA] So erstellen Sie eine Apache Flink-Anwendung in 5 Minuten von Grund auf neu

Dieses Tutorial zeigt Ihnen kurz, wie Sie in wenigen Minuten eine ** Apache Flink-Anwendung ** von Grund auf neu erstellen.

Vorbereitung der Entwicklungsumgebung

Apache Flink funktioniert unter Linux, Max OS X und Windows und ist kompatibel. Zum Entwickeln von Flink-Anwendungen entweder Java Version 8.0 oder höher oder Maven auf Ihrem Computer /install.html?spm=a2c65.11461447.0.0.29c056bf7J56YG) Sie müssen eine der Umgebungen ausführen. Wenn Sie die Java-Umgebung verwenden, werden durch Ausführen des Befehls "$ java Cversion" die von Ihnen verwendeten Versionsinformationen wie unten gezeigt ausgegeben.

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

Wenn Sie die Maven-Umgebung verwenden, werden beim Ausführen des Befehls $ mvn -version Versionsinformationen ausgegeben, die den folgenden ähnlich sind:

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

Verwenden Sie außerdem IntelliJ IDEA als IDE für Flink-Anwendungen (die Community-freie Version ist für dieses Tutorial ausreichend). Es wird empfohlen. Eclipse funktioniert ebenfalls für diesen Zweck, aber Eclipse hatte in der Vergangenheit Probleme mit Scala- und Java-Hybridprojekten, so Eclipse Es wird nicht empfohlen, auszuwählen.

Erstellen Sie ein Maven-Projekt

Sie können die Schritte in diesem Abschnitt ausführen, um ein Flink-Projekt zu erstellen und in IntelliJ IDEA zu importieren. Verwenden Sie den Flink Maven-Archetyp, um die Projektstruktur und einige anfängliche Standardabhängigkeiten zu erstellen. Führen Sie in Ihrem Arbeitsverzeichnis den Befehl mvn archetype: generate aus, um das Projekt zu erstellen.

    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

Die obige Gruppen-ID, Artefakt-ID und das obige Paket können in einem beliebigen Pfad bearbeitet werden. Mit den oben genannten Parametern erstellt Maven automatisch eine Projektstruktur, die der folgenden ähnelt:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

Die Datei pom.xml enthält bereits die erforderlichen Flink-Abhängigkeiten, und einige Beispielprogrammierframeworks finden Sie in src / main / java.

Kompilieren Sie das Flink-Programm

Führen Sie nun die folgenden Schritte aus, um Ihr eigenes Flink-Programm zu erstellen. Starten Sie dazu IntelliJ IDEA, wählen Sie Projekt importieren und wählen Sie pom.xml im Stammverzeichnis von my-link-project. Importieren Sie dann das Projekt wie angewiesen.

Erstellen Sie eine SocketWindowWordCount.java-Datei unter src / main / java / myflink.

package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}

Im Moment ist dieses Programm nur ein grundlegendes Framework, daher werden wir den Code Schritt für Schritt ausfüllen. Bitte beachten Sie, dass die Importanweisung automatisch von der IDE hinzugefügt wird. Beachten Sie daher, dass sie unten nicht geschrieben wird. Am Ende dieses Abschnitts sehen Sie den vollständigen Code. Wenn Sie die folgenden Schritte überspringen möchten, fügen Sie den endgültigen vollständigen Code direkt in den Editor ein.

Der erste Schritt im Flink-Programm besteht darin, eine "StreamExecutionEnvironment" zu erstellen. Dies ist eine Eintragsklasse, die zum Festlegen von Parametern, Erstellen von Datenquellen, Übermitteln von Aufgaben usw. verwendet werden kann. Fügen wir es nun der Hauptfunktion hinzu.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Erstellen Sie als Nächstes eine Datenquelle, die Daten vom Socket am lokalen Port 9000 liest.

DataStream text = env.socketTextStream("localhost", 9000, "\n");

Dadurch wird ein Zeichenfolgentyp "DataStream" erstellt. DataStream ist Flinks Kern-API für die Stream-Verarbeitung. Es definiert viele allgemeine Vorgänge (Filtern, Transformieren, Aggregieren, Fenster, Assoziationen usw.). In diesem Beispiel interessiert uns, wie oft jedes Wort in einem bestimmten Zeitfenster erscheint, beispielsweise in einem 5-Sekunden-Fenster. Zu diesem Zweck werden Zeichenfolgendaten zuerst in Wörter und deren Vorkommen analysiert (dargestellt durch "Tuple2 <Zeichenfolge, Ganzzahl>"), wobei das erste Feld das Wort und das zweite Feld das Wortvorkommen ist Es wird die Anzahl der Male sein. Der Anfangswert der Anzahl der Vorkommen wird auf 1 gesetzt. Da eine Datenreihe mehrere Wörter enthalten kann, wird "flatmap" zum Parsen implementiert.

DataStream> wordCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

Gruppieren Sie dann die Datenströme basierend auf dem Wortfeld (dh dem Indexfeld 0). Hier verwenden wir die Methode "keyBy (int index)", um den Datenstrom "Tuple2 <String, Integer>" mit Wortschlüssel abzurufen. Geben Sie dann ein beliebiges Fenster im Stream an und berechnen Sie das Ergebnis anhand der Daten im Fenster. In diesem Beispiel werden Wortvorkommen alle 5 Sekunden aggregiert und jedes Fenster zählt von Grund auf neu.

DataStream> windowCounts = wordCounts
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

Das zweite .timeWindow () gibt ein Tumble-Fenster für 5 Sekunden an. Der dritte Aufruf gibt die Gesamtaggregationsfunktion für jeden Schlüssel und jedes Fenster an. In diesem Beispiel wird dies durch das Vorkommensfeld (dh das Indexfeld 1) hinzugefügt. Der resultierende Datenstrom gibt alle 5 Sekunden die Anzahl der Vorkommen jedes Wortes aus.

Schließlich gibt es den Datenstrom an die Konsole aus und startet die Ausführung.

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

Ein letzter Aufruf von "env.execute" ist erforderlich, um den eigentlichen Flink-Job zu starten. Alle Operator-Operationen (Quellenerstellung, Aggregation, Drucken usw.) erstellen nur ein Diagramm der internen Operator-Operationen. Nur wenn execute () aufgerufen wird, werden sie zur Ausführung an den Cluster oder den lokalen Computer gesendet.

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
        DataStream text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, group by word, and perform the window and aggregation operations.
        DataStream> windowCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

Programmausführung

Um das Beispielprogramm auszuführen, starten Sie NetCat auf dem Terminal und rufen Sie den Eingabestream ab.

nc -lk 9000

Unter Windows können Sie Ncat über [NMAP] installieren und ausführen (https://nmap.org/ncat/?spm=a2c65.11461447.0.0.29c056bf7J56YG).

ncat -lk 9000

Führen Sie dann die Hauptmethode von SocketWindowWordCount direkt aus.

Geben Sie einfach ein Wort in die NetCat-Konsole ein und die Ausgabekonsole "SocketWindowWordCount" zeigt Statistiken darüber an, wie oft jedes Wort erscheint. Wenn Sie eine Anzahl von 1 oder mehr sehen möchten, geben Sie dasselbe Wort innerhalb von 5 Sekunden wiederholt ein.

image.png

Recommended Posts

So erstellen Sie eine Apache Flink-Anwendung in 5 Minuten von Grund auf neu
So erstellen Sie ein ausführbares JAR in Maven
Ich habe versucht, innerhalb von 3 Monaten einen Antrag von unerfahren zu stellen
Was ist in "Java 8 bis Java 11" passiert und wie wird eine Umgebung erstellt?
Schritte zum Veröffentlichen einer Anwendung auf Heroku
Verstehe in 5 Minuten !! Wie man Docker benutzt
So erhalten Sie eine Klasse von Element in Java
So lösen Sie Ausdrucksprobleme in Java
Ich habe versucht, eine Anwendung in 2 Sprachen zu entwickeln
[Rails] So erstellen Sie eine Umgebung mit Docker
So erstellen Sie die einfachste Blockchain in Ruby
So starten Sie einen Index aus einer beliebigen Zahl in der iterativen Ruby-Verarbeitung
So machen Sie ein Bild mit Processing teilweise transparent
So implementieren Sie die Gastanmeldung in 5 Minuten im Rails-Portfolio
So installieren Sie Docker in der lokalen Umgebung einer vorhandenen Rails-App [Rails 6 / MySQL 8]
Verstehen Sie, wie Sie den JSON-Decoder von Swift in 3 Minuten verwenden
Wie man android-midi-lib baut
[Schienen] So zeigen Sie Bilder in der Ansicht an
So übergeben Sie ein Objekt in MyBatis an Mapper, ohne ein Argument durchzugehen
So erstellen Sie einen Anwendungsserver auf einer EC2-Instanz von AWS
So ändern Sie eine Zeichenfolge in einem Array in eine Zahl in Ruby
So rufen Sie den Hashwert in einem Array in Ruby ab
So stellen Sie eine Java-Anwendung in Alibaba Cloud EDAS in Eclipse bereit
So installieren Sie die Webanwendung für jede Sprache in Nginx
Überwachen von Anwendungsinformationen in Echtzeit mit JConsole
[Integrationstestcode] So wählen Sie ein Element aus date_select aus
Verwendung von Apache POI
Umgang mit Instanzen
So verhindern Sie, dass vergangene Daten in Rails-Formularen eingegeben werden
[Ruby] Wie man gerade oder ungerade Zahlen in einem Array zählt
So geben Sie den Wert aus, wenn sich ein Array im Array befindet
So veröffentlichen Sie eine Anwendung mithilfe der AWS (3) EC2-Instanzumgebungskonstruktion
Wie installiere ich die in Ubuntu verwendete Sprache und wie erstelle ich die Umgebung?
Abrufen und Hinzufügen von Daten aus dem Firebase Firestore in Ruby
So ermitteln Sie die Länge einer Audiodatei mit Java
Wie man Lombok im Frühling benutzt
So finden Sie May'n in XPath
So blenden Sie die Bildlaufleiste in WebView aus
So führen Sie JUnit in Eclipse aus
Wie man in Ruby auf unbestimmte Zeit iteriert
Wie man die Programmierung in 3 Monaten beherrscht
So erhalten Sie Parameter in Spark
So fügen Sie eine externe Bibliothek ein
Verwendung von InjectorHolder in OpenAM
So installieren Sie jQuery in Rails 6
So benennen Sie Variablen in Java
So setzen Sie Lombok in Eclipse
So erstellen Sie CloudStack mit Docker
So verketten Sie Zeichenfolgen mit Java
So installieren Sie Swiper in Rails
So wechseln Sie von HTML zu Haml