[JAVA] Datenverarbeitung mit Apache Flink

Einführung

Hallo! Dies ist @RyosukeKawamura von LOB. ** Dieser Artikel ist ** LOB Adventskalender 2018 ** Artikel am 19. Tag **.

Dies ist der zweite Beitrag. Im Moment implementiere ich die Datenverarbeitung mit Apache Flink in meinem Unternehmen, daher möchte ich diesmal darüber schreiben.

In Bezug auf Flink gibt es nicht viele konkrete Beispiele im Netz, und selbst wenn es solche gibt, werden nur Treffer wie "Ich habe es versucht" oder "Erklärung der Flink-API" getroffen. Was soll ich also tun? Es war schwer zu verstehen, aber es gab einen Kollegen, der sehr vertraut damit war, und während ich verschiedene Dinge lernte, sagte ich schließlich: "Das ist wirklich praktisch, wenn Sie es voll ausnutzen ...!" Nachdem ich es verstanden habe, möchte ich es zusammenfassen, um mein Wissen zu organisieren.

Was ist Apache Flink?

Originaldokument ist ziemlich umfangreich. Wenn Sie es sorgfältig lesen, können Sie es verstehen (sollten). Kurz gesagt, es ist eine "verteilte Stream-Verarbeitungsplattform". Ich weiß es nicht, weil es viele ähnliche Dinge gibt (Storm, Spark usw.), aber es scheint, dass Flink sich aus der Stream-Verarbeitung entwickelt hat und in andere Bereiche wie Batches übergegangen ist.

Gute Punkte von Flink

Ich habe die Essenz noch nicht angesprochen, aber ich denke, es geht um die folgenden drei Punkte.

(1) Sowohl Stream als auch Batch werden unterstützt und können auf ähnliche Weise implementiert werden.

Ich bin ziemlich froh, dass es fast genauso implementiert werden kann, wenn Stream-Daten verarbeitet werden, die von Zeit zu Zeit gesendet werden, wenn Dateien verknüpft werden und wenn Tabellen gelesen und verarbeitet werden. Derzeit arbeiten wir an der Stapelverarbeitung, bei der tsv-Dateien verknüpft werden. Dies kann sich jedoch ändern, sodass Protokolle über Kafka usw. anstelle von Dateien gesendet werden. Dies ist ein ziemlich schöner Punkt, da ich denke, dass große Konfigurationsänderungen im Verlauf des Projekts häufig auftreten werden. Es gibt jedoch APIs, die nur für die Streaming-Verarbeitung verwendet werden können. Daher implementieren wir dieses Mal ** die Behandlung endlicher Daten, die mit Dateien verknüpft sind, aber die Verarbeitung als Streaming-Verarbeitung ** (Einzelheiten finden Sie unter "Verstopft"). Siehe unten).

(2) Die reichhaltige API erleichtert das Erreichen der juckenden Stelle beim Zählen

Das war das Überraschendste. Es ist ein Mechanismus, mit dem Sie Aggregation usw. durchführen können, indem Sie Crisp und Methoden so verbinden, als würden Sie ein Array verarbeiten. Menschen, die an funktionale Sprachen gewöhnt sind, werden es besonders leicht finden. (Ich habe keine Erfahrung ...) Dieses Mal ist es in Java implementiert, aber da Flink auch in Scala verwendet werden kann, scheint es in einem besseren Zustand zu sein. Beispielcode sieht folgendermaßen aus.

(3) Wenn ein Fehler auftritt, wird er mit Bedacht wiederhergestellt und die Verarbeitung kann von dort aus fortgesetzt werden (eine solche Implementierung ist möglich).

Es tut mir leid, dass ich dies noch nicht implementiert habe, daher verstehe ich die Details nicht. .. .. Abhängig von der Implementierung können Sie so etwas wie einen Checkpoint einrichten und von dort aus neu beginnen. Ich bin froh, dass es einfach erscheint, die Gleichheit zu garantieren. Die Konsole, die standardmäßig mit dem Flink-Cluster geliefert wird, ist auch sehr praktisch und kann den Auftragsstatus sehr schnell überprüfen.

image.png

Wo es stecken bleibt

Behandeln Sie endliche Daten wie CSV als Streaming-Verarbeitung

Obwohl wir mit der Verarbeitung unter Verwendung einer endlichen Datei als Eingabe fortfuhren, mussten wir die Anforderungen von "Ich möchte die API der Streaming-Verarbeitung verwenden" und "Ich kann eventuell zur Streaming-Verarbeitung wechseln" erfüllen. ** Datei lesen-> Einmal als Tabelle lesen-> Daten aus Tabelle laden-> Ergebnis als Datenstrom behandeln ** Ich habe es gelöst, indem ich etwas Kniffliges getan habe. Wenn Sie genau hinschauen, finden Sie es in der offiziellen Dokumentation, daher sollten Sie es richtig lesen ... Die Implementierung sieht so aus.

csvToStream.java


//Lesen Sie die Umgebung, die für die Stream- und Tabellenverarbeitung erforderlich ist
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv); 

//Datei lesen
TableSource source =  new CsvTableSource.Builder() 
    .path("/path/to/user_data.tsv")
    .ignoreFirstLine() //Ich bin froh, dass es in Ordnung ist, den Header zu überspringen, indem Sie diese Methode schreiben
    .ignoreParseErrors() //Unzulässige Daten können jetzt ignoriert werden
    .fieldDelimiter("\t") //Tabulator begrenzt
    .field("cookie", Types.STRING) 
    .field("date", Types.TIMESTAMP)
    .build(); 

//Als Tabellendatenquelle registriert
tEnv.registerTableSource("users", source);

//Daten als Tabelle abrufen
Table table = tEnv
    .scan("users"); //Hier.filter()Und.select()Und繋ぐとSQLチックな処理もできる

//In Streaming konvertieren
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) //Wenn Sie eine Entität definieren, können Sie sie als Stream dieses Typs lesen.

ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); //Ich lasse hier weg(Es scheint eine Form der Implementierung zu sein, die häufig FlatMap ähnelt und an addSink übergeben wird)
sEnv.execute(); //Wenn Sie hier vergessen, wird nichts funktionieren

Auf den ersten Blick ist es schwer vorstellbar, wie es überhaupt funktioniert

Das ist alles. Lol Kuerzlich

python


ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);

Mit anderen Worten, hogeFlatMap () verarbeitet paralleles Streaming für die Anzahl der parallelNum, und was in hogeFlatMap () ausgegeben wird, wird an addSink übergeben. Es ist bequem, die Prozesse so verbinden zu können, als wären Sie mit einer Shell versehen. Ich habe es zuerst nicht verstanden. Da die flatMap jedes Mal aufgerufen wird, können Sie beim Herstellen einer Verbindung keine Verbindung zum Socket herstellen und sterben (Sie müssen sie an den Konstruktor übergeben). Daher ist es schwierig zu verstehen, was zu welchem Zeitpunkt funktioniert. Der Punkt, den ich zuerst schwer hatte, war, dass es schwer zu sehen war, selbst wenn ich trat.

abschließend

Es ist schwer zu verstehen, bis ich mir angewöhnt habe (obwohl ich es noch nicht verstanden habe), aber Flink hat einen Geruch, der ein starker Verbündeter der Datenpipeline zu sein scheint, wenn ich ihn beherrschen kann. Ich habe die Möglichkeit, mit großen Ad-Tech-Daten in Kontakt zu treten, und hoffe, dass ich mehr lernen und diese beherrschen kann.

Wir haben viele Möglichkeiten, neue Datenverarbeitungstechnologien einzusetzen, nicht nur Flink. Insbesondere können wir mit Quantität und Qualität von Daten umgehen, die andere Unternehmen nicht verarbeiten können. Wenn Sie also interessiert sind oder dies gemeinsam tun möchten, lassen Sie es uns gemeinsam tun!

Wir suchen Freunde, um "die Reihenfolge der Verteilung zu ändern und eine Werbeplattform zu erstellen"! Wir sind in einem täglichen Strategietreffen, um eine gute Dateninfrastruktur zu schaffen! !! https://lob-inc.com/recruit/

Recommended Posts

Datenverarbeitung mit Apache Flink
Datenverarbeitung mit der Stream-API von Java 8
Excel-Operation mit Apache POI
Signieren Sie XML mit Apache Santuario
Ich habe versucht, Apache Wicket zu verwenden
[Swift] Asynchrone Verarbeitung mit PromiseKit
[Verarbeitung] Versuchen Sie es mit GT Force.
CSV-Ausgabeverarbeitung mit Super-CSV
Ausgabe nach Excel mit Apache POI!
Entwicklung von Flink mit der DataStream-API
[Kotlin] Ein Beispiel für die Verarbeitung mit Enum
Löschen von Dateien mit rekursiver Verarbeitung [Java]
Führen Sie die Parallelverarbeitung mit dem CyclicBarrier von Java durch
Ich habe die Leistung des speicherinternen Datengitters Apache Ignite mit Yardstick gemessen.