Hallo! Dies ist @RyosukeKawamura von LOB. ** Dieser Artikel ist ** LOB Adventskalender 2018 ** Artikel am 19. Tag **.
Dies ist der zweite Beitrag. Im Moment implementiere ich die Datenverarbeitung mit Apache Flink in meinem Unternehmen, daher möchte ich diesmal darüber schreiben.
In Bezug auf Flink gibt es nicht viele konkrete Beispiele im Netz, und selbst wenn es solche gibt, werden nur Treffer wie "Ich habe es versucht" oder "Erklärung der Flink-API" getroffen. Was soll ich also tun? Es war schwer zu verstehen, aber es gab einen Kollegen, der sehr vertraut damit war, und während ich verschiedene Dinge lernte, sagte ich schließlich: "Das ist wirklich praktisch, wenn Sie es voll ausnutzen ...!" Nachdem ich es verstanden habe, möchte ich es zusammenfassen, um mein Wissen zu organisieren.
Originaldokument ist ziemlich umfangreich. Wenn Sie es sorgfältig lesen, können Sie es verstehen (sollten). Kurz gesagt, es ist eine "verteilte Stream-Verarbeitungsplattform". Ich weiß es nicht, weil es viele ähnliche Dinge gibt (Storm, Spark usw.), aber es scheint, dass Flink sich aus der Stream-Verarbeitung entwickelt hat und in andere Bereiche wie Batches übergegangen ist.
Ich habe die Essenz noch nicht angesprochen, aber ich denke, es geht um die folgenden drei Punkte.
Ich bin ziemlich froh, dass es fast genauso implementiert werden kann, wenn Stream-Daten verarbeitet werden, die von Zeit zu Zeit gesendet werden, wenn Dateien verknüpft werden und wenn Tabellen gelesen und verarbeitet werden. Derzeit arbeiten wir an der Stapelverarbeitung, bei der tsv-Dateien verknüpft werden. Dies kann sich jedoch ändern, sodass Protokolle über Kafka usw. anstelle von Dateien gesendet werden. Dies ist ein ziemlich schöner Punkt, da ich denke, dass große Konfigurationsänderungen im Verlauf des Projekts häufig auftreten werden. Es gibt jedoch APIs, die nur für die Streaming-Verarbeitung verwendet werden können. Daher implementieren wir dieses Mal ** die Behandlung endlicher Daten, die mit Dateien verknüpft sind, aber die Verarbeitung als Streaming-Verarbeitung ** (Einzelheiten finden Sie unter "Verstopft"). Siehe unten).
Das war das Überraschendste. Es ist ein Mechanismus, mit dem Sie Aggregation usw. durchführen können, indem Sie Crisp und Methoden so verbinden, als würden Sie ein Array verarbeiten. Menschen, die an funktionale Sprachen gewöhnt sind, werden es besonders leicht finden. (Ich habe keine Erfahrung ...) Dieses Mal ist es in Java implementiert, aber da Flink auch in Scala verwendet werden kann, scheint es in einem besseren Zustand zu sein. Beispielcode sieht folgendermaßen aus.
Es tut mir leid, dass ich dies noch nicht implementiert habe, daher verstehe ich die Details nicht. .. .. Abhängig von der Implementierung können Sie so etwas wie einen Checkpoint einrichten und von dort aus neu beginnen. Ich bin froh, dass es einfach erscheint, die Gleichheit zu garantieren. Die Konsole, die standardmäßig mit dem Flink-Cluster geliefert wird, ist auch sehr praktisch und kann den Auftragsstatus sehr schnell überprüfen.
Obwohl wir mit der Verarbeitung unter Verwendung einer endlichen Datei als Eingabe fortfuhren, mussten wir die Anforderungen von "Ich möchte die API der Streaming-Verarbeitung verwenden" und "Ich kann eventuell zur Streaming-Verarbeitung wechseln" erfüllen. ** Datei lesen-> Einmal als Tabelle lesen-> Daten aus Tabelle laden-> Ergebnis als Datenstrom behandeln ** Ich habe es gelöst, indem ich etwas Kniffliges getan habe. Wenn Sie genau hinschauen, finden Sie es in der offiziellen Dokumentation, daher sollten Sie es richtig lesen ... Die Implementierung sieht so aus.
csvToStream.java
//Lesen Sie die Umgebung, die für die Stream- und Tabellenverarbeitung erforderlich ist
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
//Datei lesen
TableSource source = new CsvTableSource.Builder()
.path("/path/to/user_data.tsv")
.ignoreFirstLine() //Ich bin froh, dass es in Ordnung ist, den Header zu überspringen, indem Sie diese Methode schreiben
.ignoreParseErrors() //Unzulässige Daten können jetzt ignoriert werden
.fieldDelimiter("\t") //Tabulator begrenzt
.field("cookie", Types.STRING)
.field("date", Types.TIMESTAMP)
.build();
//Als Tabellendatenquelle registriert
tEnv.registerTableSource("users", source);
//Daten als Tabelle abrufen
Table table = tEnv
.scan("users"); //Hier.filter()Und.select()Und繋ぐとSQLチックな処理もできる
//In Streaming konvertieren
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) //Wenn Sie eine Entität definieren, können Sie sie als Stream dieses Typs lesen.
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); //Ich lasse hier weg(Es scheint eine Form der Implementierung zu sein, die häufig FlatMap ähnelt und an addSink übergeben wird)
sEnv.execute(); //Wenn Sie hier vergessen, wird nichts funktionieren
Das ist alles. Lol Kuerzlich
python
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);
Mit anderen Worten, hogeFlatMap () verarbeitet paralleles Streaming für die Anzahl der parallelNum, und was in hogeFlatMap () ausgegeben wird, wird an addSink übergeben. Es ist bequem, die Prozesse so verbinden zu können, als wären Sie mit einer Shell versehen. Ich habe es zuerst nicht verstanden. Da die flatMap jedes Mal aufgerufen wird, können Sie beim Herstellen einer Verbindung keine Verbindung zum Socket herstellen und sterben (Sie müssen sie an den Konstruktor übergeben). Daher ist es schwierig zu verstehen, was zu welchem Zeitpunkt funktioniert. Der Punkt, den ich zuerst schwer hatte, war, dass es schwer zu sehen war, selbst wenn ich trat.
Es ist schwer zu verstehen, bis ich mir angewöhnt habe (obwohl ich es noch nicht verstanden habe), aber Flink hat einen Geruch, der ein starker Verbündeter der Datenpipeline zu sein scheint, wenn ich ihn beherrschen kann. Ich habe die Möglichkeit, mit großen Ad-Tech-Daten in Kontakt zu treten, und hoffe, dass ich mehr lernen und diese beherrschen kann.
Wir haben viele Möglichkeiten, neue Datenverarbeitungstechnologien einzusetzen, nicht nur Flink. Insbesondere können wir mit Quantität und Qualität von Daten umgehen, die andere Unternehmen nicht verarbeiten können. Wenn Sie also interessiert sind oder dies gemeinsam tun möchten, lassen Sie es uns gemeinsam tun!
Wir suchen Freunde, um "die Reihenfolge der Verteilung zu ändern und eine Werbeplattform zu erstellen"! Wir sind in einem täglichen Strategietreffen, um eine gute Dateninfrastruktur zu schaffen! !! https://lob-inc.com/recruit/
Recommended Posts