[JAVA] Traitement des données avec Apache Flink

introduction

salut! C'est @RyosukeKawamura de LOB. ** Cet article est ** LOB Advent Calendar 2018 ** Article sur le 19ème jour **.

Ceci est le deuxième article. En ce moment, j'implémente le traitement des données à l'aide d'Apache Flink dans mon entreprise, j'aimerais donc en parler cette fois.

En ce qui concerne Flink, il n'y a pas beaucoup d'exemples concrets sur le net, et même s'il y en a, seuls les hits tels que "J'ai essayé" ou "Explication de l'API de Flink" sont touchés. Alors que dois-je faire après tout? C'était difficile à comprendre, mais il y avait un collègue qui le connaissait très bien, et pendant que j'apprenais diverses choses, j'ai finalement dit: "C'est vraiment pratique si vous en faites pleinement usage ...!" Maintenant que je le comprends, je voudrais le résumer dans le but d'organiser mes connaissances.

Qu'est-ce qu'Apache Flink

Document original est assez substantiel. Si vous le lisez attentivement, vous pouvez le comprendre (devrait). En un mot, c'est une «plateforme de traitement de flux distribué». Je ne sais pas car il y a beaucoup de choses similaires (Storm, Spark, etc.), mais il semble que Flink a évolué à partir du traitement de flux et a exsudé vers d'autres domaines tels que les lots.

Bons points de Flink

Je n'ai pas encore touché à l'essentiel, mais je pense qu'il s'agit des trois points suivants.

(1) Le flux et le lot sont pris en charge et peuvent être implémentés de la même manière.

Je suis assez heureux qu'il puisse être implémenté presque de la même manière lors du traitement de données de flux envoyées de temps en temps, lors de la liaison de fichiers et lors de la lecture et du traitement de tables. Ce sur quoi nous travaillons actuellement est le traitement par lots dans lequel les fichiers tsv sont liés, mais cela peut changer afin que les journaux soient envoyés via Kafka, etc. au lieu de fichiers. C'est un point assez intéressant, car je pense que de grands changements de configuration se produiront fréquemment au fur et à mesure que le projet avance. Cependant, il existe des API qui ne peuvent être utilisées que dans le traitement du streaming, donc cette fois, nous implémentons ** la gestion des données finies liées aux fichiers mais les traitant comme un traitement en streaming ** (pour plus de détails, voir "Encrassé" Voir ci-dessous).

(2) L'API riche permet d'atteindre facilement l'endroit qui démange lors du comptage

C'était le plus surprenant. C'est un mécanisme qui vous permet d'effectuer une agrégation, etc. simplement en connectant crisp et des méthodes comme si vous traitez un tableau. Les personnes habituées aux langages fonctionnels trouveront cela particulièrement facile. (Je n'ai pas d'expérience ...) Cette fois, il est implémenté en Java, mais comme Flink peut également être utilisé dans Scala, il semble qu'il soit en meilleur état. Exemple de code ressemble à ceci.

(3) Lorsqu'un échec se produit, il récupère sagement et le traitement peut être repris à partir de là (une mise en œuvre comme celle-ci est possible)

Je suis désolé de ne pas avoir encore implémenté cela, donc je ne comprends pas les détails. .. .. Selon l'implémentation, il semble que vous puissiez configurer quelque chose comme un point de contrôle et recommencer à partir de là. Je suis content qu'il semble facile de garantir l'égalité. La console fournie par défaut avec le cluster Flink est également très pratique et très rapide pour vérifier l'état du travail.

image.png

Où ça se coince

Traitez les données finies telles que CSV comme un traitement en continu

Bien que nous procédions au traitement en utilisant un fichier fini comme entrée, nous devions répondre aux exigences de «Je veux utiliser l'API de traitement en continu» et «Je pourrais éventuellement passer au traitement en continu». ** Lire le fichier-> Lire une fois comme tableau-> Charger les données du tableau-> Traiter le résultat comme un flux de données ** Je l'ai résolu en faisant quelque chose d'un peu délicat. Si vous regardez de plus près, vous pouvez le trouver dans la documentation officielle, vous devriez donc le lire correctement ... L'implémentation ressemble à ceci.

csvToStream.java


//Lire l'environnement requis pour le traitement des flux et le traitement des tables
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv); 

//Lecture de fichiers
TableSource source =  new CsvTableSource.Builder() 
    .path("/path/to/user_data.tsv")
    .ignoreFirstLine() //Je suis heureux que vous puissiez ignorer l'en-tête simplement en écrivant cette méthode
    .ignoreParseErrors() //Les données illégales peuvent désormais être ignorées
    .fieldDelimiter("\t") //Onglet délimité
    .field("cookie", Types.STRING) 
    .field("date", Types.TIMESTAMP)
    .build(); 

//Enregistré en tant que source de données de table
tEnv.registerTableSource("users", source);

//Obtenir des données sous forme de tableau
Table table = tEnv
    .scan("users"); //ici.filter()Et.select()Et繋ぐとSQLチックな処理もできる

//Convertir en streaming
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) //Si vous définissez une entité, vous pouvez la lire comme un flux de ce type.

ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); //J'omets ici(Cela semble être une forme d'implémentation qui ressemble souvent à FlatMap et à le passer à addSink)
sEnv.execute(); //Si vous oubliez ici, rien ne fonctionnera

À première vue, il est difficile d'imaginer comment cela fonctionne en premier lieu

C'est tout. Lol Il y a un instant

python


ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);

En d'autres termes, hogeFlatMap () traite le streaming en parallèle pour le nombre de parallelNum, et ce qui se trouve dans hogeFlatMap () est passé à addSink. Il est confortable de pouvoir connecter les processus comme si vous étiez canalisé avec un shell. Je n'ai pas compris au début. Puisque le flatMap est appelé à chaque fois, si vous établissez une connexion, vous ne pourrez pas vous connecter au socket et vous mourrez (vous devez le passer au constructeur), il est donc difficile de comprendre ce qui fonctionne à quel moment. Le fait que j'ai eu du mal au début était qu'il était difficile de voir même si je marchais.

en conclusion

C'est difficile à comprendre jusqu'à ce que je prenne une habitude (même si je ne l'ai pas encore compris), mais Flink a un parfum qui semble être un allié fort du pipeline de données si je peux le maîtriser. J'ai la chance d'entrer en contact avec de grandes données de technologie publicitaire, j'espère donc pouvoir les étudier davantage et les maîtriser.

Nous avons de nombreuses opportunités d'utiliser les nouvelles technologies de traitement des données, pas seulement Flink. En particulier, nous pouvons gérer la quantité et la qualité des données que d'autres entreprises ne peuvent pas gérer, donc si vous êtes intéressé ou souhaitez le faire ensemble, faisons-le ensemble!

Nous recherchons des amis pour "changer l'ordre de distribution et créer une plateforme publicitaire"! Nous sommes en réunion de stratégie quotidienne pour créer une bonne infrastructure de données! !! https://lob-inc.com/recruit/

Recommended Posts

Traitement des données avec Apache Flink
Traitement des données à l'aide de l'API de flux de Java 8
Opération Excel avec Apache POI
Signer XML à l'aide d'Apache Santuario
J'ai essayé d'utiliser Apache Wicket
[Swift] Traitement asynchrone à l'aide de PromiseKit
[Traitement] Essayez d'utiliser GT Force.
Traitement de la sortie CSV avec Super-CSV
Sortie vers Excel en utilisant Apache POI!
Développement de Flink à l'aide de l'API DataStream
[Kotlin] Un exemple de traitement utilisant Enum
Suppression de fichiers à l’aide du traitement récursif [Java]
Effectuer un traitement parallèle à l'aide de CyclicBarrier de Java
J'ai mesuré les performances de la grille de données en mémoire Apache Ignite à l'aide de Yardstick.