[JAVA] Développement de Flink à l'aide de l'API DataStream

Dans cet article, nous passerons en revue les bases du traitement de flux distribué et explorerons le développement de Flink et DataStream ** API ** à titre d'exemple.

Concept de base du traitement de flux

La définition du traitement de flux peut être différente. Conceptuellement, le traitement par flux et le traitement par lots sont les deux faces d'une même pièce. Leur relation dépend du fait que les éléments ArrayList, Java sont directement considérés comme un ensemble de données limité et sont accessibles par des indices ou des itérateurs.

image.png

Figure 1. Le côté gauche est un trieur de pièces

Le classificateur de pièces peut être décrit comme un système de traitement de flux. Au préalable, tous les composants utilisés pour classer les pièces sont connectés en série. Les pièces entrent en permanence dans le système et sont envoyées dans une autre file d'attente pour une utilisation future. La même chose s'applique à la photo de droite.

Les systèmes de traitement de flux ont de nombreuses caractéristiques. Les systèmes de traitement de flux utilisent généralement un schéma de traitement basé sur les données pour prendre en charge le traitement d'un nombre infini d'ensembles de données. Définissez l'opérateur à l'avance et traitez les données. Pour représenter une logique de calcul complexe, les moteurs de traitement de flux distribués, y compris Flink, utilisent généralement des graphiques DAG pour représenter l'ensemble de la logique de calcul.

Chaque point du DAG représente l'opérateur, qui est l'unité logique de base. Organisez votre logique de calcul en graphiques orientés pour permettre aux données de circuler des bords vers votre système à partir de nœuds sources spéciaux. Les données sont transmises et traitées entre opérateurs via différentes méthodes de transmission de données telles que la transmission réseau et la transmission locale. Enfin, les résultats des données sont envoyés à un système externe ou à une base de données via d'autres nœuds de synchronisation spécialisés.

image.png

Figure 2. Graphique logique de calcul du DAG et modèle physique d'exécution réel.

Chaque opérateur du graphe logique possède plusieurs threads simultanés sur le graphe physique. Pour les moteurs de traitement de flux distribués, le modèle physique d'exécution réel est plus complexe car chaque opérateur peut avoir plusieurs instances. Comme le montre la figure 2, l'opérateur source A a deux instances et l'opérateur intermédiaire C a également deux instances.

Dans le modèle logique, A et B sont des nœuds en amont de C, et dans le modèle physique correspondant, il peut y avoir des échanges de données entre toutes les instances de C, A, B.

Lors de la distribution des instances d'opérateur à différents processus, les données sont transmises sur le réseau. Le transfert de données entre plusieurs instances dans le même processus n'a généralement pas besoin de passer par le réseau.

Tableau 1 Graphique de calcul du DAG construit à l'aide d'Apache Storm. La définition d'API d'Apache Storm est "orientée opération", elle est donc de bas niveau.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

Tableau 2 Graphique de calcul du DAG construit à l'aide d'Apache Flink. Les définitions d'API d'Apache Flink sont plus «orientées données», elles sont donc à un niveau supérieur.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile ("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");

Étant donné que le graphe DAG représente la logique de calcul du traitement de flux, la plupart des API sont conçues autour de la construction de ce graphe de logique de calcul. Le tableau 1 montre un exemple de WordCount d'Apache Storm, qui était populaire il y a quelques années.

Apache Storm ajoute les opérateurs Spout et Bolt au graphique pour spécifier la manière dont les opérateurs se connectent. De cette façon, après avoir construit le graphe entier, soumettez-le pour exécution dans un cluster distant ou local.

En revanche, l'API Apache Flink crée également des graphiques de logique de calcul, mais les définitions d'API de Flink sont plus orientées vers la logique de traitement des données. Flink extrait le flux de données dans un ensemble infini, définit un groupe d'opérations sur cet ensemble et crée automatiquement le graphique DAG correspondant au niveau de la couche inférieure.

En conséquence, l'API Flink est à un niveau supérieur. De nombreux chercheurs peuvent préférer la grande flexibilité de Storm dans leurs expériences, car elle facilite la sécurisation de la structure graphique attendue. Cependant, l'industrie dans son ensemble préfère les API avancées telles que l'API Flink, car elles sont plus faciles à utiliser.

Présentation de l'API Flink DataStream

Nous expliquerons en détail comment utiliser l'API Flink DataStream basée sur le concept de base du traitement de flux jusqu'à présent. Commençons par un exemple simple. Le tableau 3 est un exemple de streaming WordCount. Il ne comporte que 5 lignes de code, mais il fournit la structure de base pour développer des programmes basés sur l'API Flink DataStream.

Tableau 3 Exemples WordCount basés sur l'API Flink DataStream

// 1. Set the runtime environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Configure the data source to read data
DataStream<String> text = env.readTextFile ("input");
// 3. Perform a series of transformations
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
// 4. Configure the Sink to write data out
counts.writeAsText("output");
// 5. Submit for execution
env.execute("Streaming WordCount");

Pour implémenter Streaming WordCount, obtenez d'abord l'objet StreamExecutionEnvironment. C'est l'objet de contexte qui construit le graphique. Ajoutez un opérateur basé sur cet objet. Pour les niveaux de traitement de flux, créez une source de données pour accéder aux données. Cet exemple utilise une source de données intégrée pour lire les fichiers situés dans l'objet Environment.

Ensuite, récupérez l'objet DataStream, qui est un ensemble de données infini. Effectuez une série d'opérations sur cet ensemble de données. Par exemple, dans l'exemple WordCount, chaque enregistrement (c'est-à-dire une ligne du fichier) est d'abord séparé en mots et implémenté par l'opération FlatMap.

L'appel de FlatMap ajoute l'opérateur au graphique DAG sous-jacent. Ensuite, pour obtenir un flux de mots, regroupez les mots dans le flux (KeyBy) et calculez cumulativement les données pour chaque mot (somme (1)). Les données de mot calculées forment un nouveau flux et sont écrites dans le fichier de sortie.

Enfin, appelez la méthode env # execute pour démarrer l'exécution du programme. Assurez-vous qu'aucune des méthodes que vous avez appelées précédemment ne traite des données et créez un graphique DAG pour représenter votre logique de calcul.

Construisez le graphe entier et appelez d'abord explicitement la méthode Execute. Le cadre fournit des graphiques de calcul au cluster pour accéder aux données et exécuter la logique.

Les exemples de diffusion en continu basés sur WordCount montrent que la compilation d'un processeur de flux basé sur l'API Flink DataStream nécessite généralement trois étapes: l'accès, le traitement et l'écriture des données.

Enfin, appelez la méthode Execute.

image.png

Figure 3. Présentation du fonctionnement de Flink Data Stream.

Comme vous pouvez le voir dans l'exemple précédent, le cœur de l'API Flink DataStream est un objet DataStream qui représente des données en continu. L'ensemble du graphe de logique de calcul est construit sur la base de l'appel de différentes opérations sur l'objet DataStream pour créer un nouvel objet DataStream.

En général, il existe quatre types d'opérations sur DataStream. Le premier type est une opération d'enregistrement unique, qui filtre les enregistrements indésirables (opération de filtrage) et convertit chaque enregistrement (opération de mappage). Le deuxième type est une opération multi-enregistrements. Par exemple, pour compter le volume total de commandes en une heure, ajoutez tous les enregistrements de commande en une heure. Pour prendre en charge ce type d'opération, vous devez combiner les enregistrements requis via une fenêtre de traitement.

Le troisième type consiste à manipuler plusieurs flux et à les convertir en un seul flux. Par exemple, vous pouvez fusionner plusieurs flux avec des opérations telles que Union, Join et Connect. Ces opérations utilisent une logique différente pour fusionner les flux, mais produisent finalement un nouveau flux unifié, ce qui permet certaines opérations cross-stream.

Le quatrième type est une "opération de fractionnement", qui est prise en charge par DataStream et qui contraste avec l'opération de fusion. Ces opérations divisent le flux en plusieurs flux selon les règles, et chaque flux fractionné est un sous-ensemble du flux précédent.

image.png

Figure 4. Différents types de sous-types DataStream. Différents sous-types prennent en charge différents ensembles d'opérations.

Pour prendre en charge différentes opérations de flux, Flink introduit différents ensembles de types de flux pour indiquer le type d'ensemble de données de flux intermédiaire. La figure 4 montre le type complet de relation de transformation.

Pour les opérations d'enregistrement unique comme Map, le résultat est de type DataStream. L'opération Split produit un SplitStream. Basé sur SplitStream, utilisez la méthode Select pour filtrer les enregistrements souhaités afin d'obtenir le flux de base.

De même, l'opération Connect obtient un ConnectedStream dédié après avoir appelé StreamA.connect (StreamB). Les opérations prises en charge par ConnectedStream sont différentes des opérations prises en charge par DataStream commun.

Ceci est le résultat de la fusion de deux flux différents, vous permettant de spécifier une logique de traitement différente pour les enregistrements dans les deux flux, et les résultats traités forment un nouveau flux DataStream. Traitez différents enregistrements avec le même opérateur pour partager les informations d'état pendant le traitement. Certaines opérations de jointure de niveau supérieur doivent être implémentées via des opérations de connexion de niveau inférieur.

Vous pouvez également diviser le flux par heure ou par nombre en utilisant la fenêtre. Sélectionnez une logique de fractionnement spécifique. Lorsque tous les enregistrements du groupe arrivent, récupérez tous les enregistrements et effectuez des opérations de cheminement et de somme. Par conséquent, en traitant chaque groupe, vous obtenez un ensemble de données de sortie et toutes les données de sortie forment un nouveau flux de base.

Pour un DataStream commun, utilisez l'opération allWindow, qui représente un processus de fenêtrage unifié pour l'ensemble du flux. Par conséquent, il n'est pas possible d'effectuer des calculs simultanés à l'aide de plusieurs instances d'opérateur. Pour résoudre ce problème, regroupez d'abord les enregistrements par clé à l'aide de la méthode KeyBy. Après cela, des processus Windows séparés sont exécutés en parallèle pour les enregistrements correspondant à différentes clés.

L'opération KeyBy est l'une des opérations les plus importantes et les plus couramment utilisées. Il est expliqué en détail ci-dessous.

image.png

Figure 5. Comparaison du fonctionnement de la fenêtre du flux de base et de KeyedStream

Les opérations de fenêtre sur KeyedStream permettent un traitement simultané à l'aide de plusieurs instances. La figure 5 montre une comparaison des opérations allWindow sur un objet DataStream de base et des opérations Window sur un objet KeyedStream. Pour traiter les données dans plusieurs instances simultanées en même temps, utilisez l'opération KeyBy pour regrouper les données.

Les opérations KeyBy et Window regroupent les données, mais les opérations KeyBy divisent le flux horizontalement et les opérations Window divisent le flux verticalement.

Après avoir fractionné les données avec KeyBy, chaque instance d'opérateur suivante peut traiter les données correspondant à un ensemble de clés particulier. De plus, Flink permet aux opérateurs de maintenir un état particulier. L'état des opérateurs sur le KeyedStream est stocké de manière distribuée.

KeyBy est une méthode d'allocation de données déterministe (la section suivante présentera d'autres méthodes d'allocation). Si un travail qui a échoué est redémarré et que le parallélisme change, Flink réaffecte le groupe de clés, garantissant que le groupe qui gère une clé particulière doit contenir l'état de cette clé et est cohérent. Assurer le sexe.

Enfin, notez que l'opération KeyBy ne fonctionne que si le nombre de clés dépasse le nombre d'instances simultanées de l'opérateur. Toutes les données correspondant à la même clé sont envoyées à la même instance, donc si le nombre de clés est inférieur au nombre d'instances, certaines instances ne pourront pas recevoir les données et la puissance de calcul ne sera pas pleinement utilisée. Devenir.

Autres issues

Flink prend en charge des méthodes de regroupement physiques autres que KeyBy lors de l'échange de données entre opérateurs. Comme le montre la figure 1, les méthodes de regroupement physique dans Flink Data Stream comprennent:

--Global: l'opérateur en amont envoie tous les enregistrements à la première instance de l'opérateur en aval. --Diffusion: l'opérateur en amont envoie chaque enregistrement à toutes les instances de l'opérateur en aval. --Forward: l'opérateur en amont envoie l'enregistrement à toutes les instances de l'opérateur en aval. Chaque instance d'opérateur en amont envoie un enregistrement à l'instance correspondante de l'opérateur en aval. Cette méthode s'applique uniquement si le nombre d'instances de l'opérateur en amont est le même que le nombre d'instances de l'opérateur en aval.

image.png

Figure 6. Méthodes de regroupement physique autres que KeyBy

Outre les méthodes de regroupement, un autre concept important de l'API Flink DataStream est le type de système.

Comme le montre la figure 7, l'objet Flink DataStream a un paramètre de type de système fort. Vous devez spécifier le type d'élément pour chaque objet DataStream. Le mécanisme de sérialisation sous-jacent de Flink s'appuie sur ces informations pour optimiser la sérialisation. Plus précisément, la couche inférieure de Flink utilise un objet TypeInformation pour décrire le type. L'objet TypeInformation définit une chaîne d'informations liées au type utilisées par l'infrastructure de sérialisation.

image.png

Figure 7. Système de type d'API Flink DataStream

Flink a quelques types de base intégrés couramment utilisés. Pour ceux-ci, Flink fournit également ses informations de type et peut être utilisé directement sans aucune déclaration supplémentaire. Flink peut utiliser un mécanisme d'inférence de type pour identifier le type correspondant. Cependant, il existe des exceptions.

Par exemple, l'API Flink DataStream prend en charge Java et Scala. De nombreuses API Scala transmettent les informations de type via des paramètres implicites, donc si vous devez appeler l'API Scala via Java, vous devez transmettre les informations de type via des paramètres implicites. Un autre exemple est l'effacement basé sur Java des types génériques. Si le type de flux est un type générique, il peut ne pas être nécessaire de déduire le type d'informations après l'effacement. Dans ce cas, le type d'informations doit également être spécifié explicitement.

Dans Flink, les API Java utilisent généralement le type Tuple lors de la jonction de plusieurs champs, tandis que les API Scala utilisent souvent les types Row et Case Class. Par rapport au type Row, le type Taple a deux restrictions: le nombre de champs ne peut pas dépasser 25 et la valeur NULL ne peut pas être utilisée dans tous les champs.

Enfin, Flink vous permet de personnaliser de nouveaux types, TypeInformation et de sérialiser avec Kryo. Cependant, cela peut entraîner des problèmes de migration. Par conséquent, nous vous recommandons d'éviter les types personnalisés.

Exemple

Regardons un exemple un peu plus compliqué. Supposons que vous ayez une source de données dans votre système qui surveille vos commandes. Il utilise Tuple2 pour imprimer le type et le volume du produit commandé lors de la passation d'une nouvelle commande. Il compte ensuite le volume des transactions de tous types d'articles en temps réel.

Tableau 4 Un exemple de statistiques d'ordre en temps réel.

public class GroupedProcessingTimeWindowSample {
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
                String key = "Classification" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;

                System.out.println(String.format("Emits\t(%s, %d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

        keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "";
            }
        }).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                accumulator.put(value.f0, value.f1);
                return accumulator;
            }
        }).addSink(new SinkFunction<HashMap<String, Integer>>() {
            @Override
            public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
                  //Volume de commutation de produit individuel
                  System.out.println(value);
                  //Montant de la marchandise
                  System.out.println(value.values().stream().mapToInt(v -> v).sum());
            }
        });

        env.execute();
    }
}

Le tableau 4 montre la mise en œuvre de cet exemple. Ici, nous allons implémenter une source de données fictive qui hérite de RichParallelSourceFunction. RichParallelSourceFunction est une API SourceFunction avec plusieurs instances.

Implémentez deux méthodes, la méthode Run et la méthode Cancel. Flink appelle la méthode Run directement à la source au moment de l'exécution. Les données doivent être sorties en continu pour former le flux initial. Lors de l'implémentation de la méthode Run, générez aléatoirement des enregistrements pour le type d'élément et le volume de transaction et envoyez-les en utilisant la méthode ctx # collect. Si vous devez annuler la tâche source utilisée par la variable Volatile pour Flink pour marquer et contrôler son état d'exécution, utilisez la méthode Cancel.

Ensuite, commencez à créer le graphique avec la méthode Main. Tout d'abord, créez un objet StreamExecutionEnviroment. La méthode getExecutionEnvironment, qui est appelée pour créer l'objet, détermine automatiquement l'environnement afin que l'objet approprié soit créé. Par exemple, un clic droit dans l'EDI et l'exécution de la méthode crée un objet LocalStreamExecutionEnvironment.

Lorsqu'il est exécuté dans un environnement réel, il crée un objet RemoteStreamExecutionEnvironment. Créez une source pour obtenir le flux initial basé sur l'objet Environment. Ensuite, afin de compter le montant de la transaction pour chaque type d'élément, KeyBy est utilisé pour regrouper le flux d'entrée via le premier champ (type d'élément) de Tuple, et le deuxième champ (montant de la transaction) de l'enregistrement correspondant à chaque clé. ) Se résume.

Dans la couche inférieure, l'opérateur Sum utilise la méthode State pour contenir la valeur totale des volumes de transaction pour chaque clé (type d'élément). Lorsqu'un nouvel enregistrement arrive, l'opérateur Sum met à jour le volume total des transactions gérées et imprime l'enregistrement .NET.

Si vous ne voulez compter que des volumes de type, le programme se termine ici. Ajoutez l'opérateur Sink immédiatement après l'opérateur Sum pour imprimer un volume de transaction mis à jour en permanence pour chaque type d'élément. Cependant, pour compter le nombre de transactions de tous types, sortez tous les enregistrements du même nœud de calcul.

J'utilise KeyBy pour renvoyer la même clé pour tous les enregistrements, les regrouper et envoyer tous les enregistrements à la même instance.

Utilisez ensuite la méthode Fold pour maintenir le volume de chaque type d'élément dans l'opérateur. Notez que la méthode Fold est marquée obsolète, mais qu'elle ne peut pas aujourd'hui être remplacée par d'autres opérations dans l'API DataStream. Par conséquent, cette méthode reçoit une valeur initiale.

Ensuite, lorsque chaque enregistrement du flux suivant arrive, l'opérateur appelle la fonction FoldFunction passée pour mettre à jour la valeur initiale et envoie la valeur mise à jour.

Utilisez HashMap pour suivre le volume de transaction actuel pour chaque type d'élément. Mettez à jour HashMap lorsque de nouveaux arrivent. De cette manière, HashMap du dernier type d'élément et volume de transaction est reçu via Sink, et le volume total de transaction et le volume de transaction de chaque élément sont générés en fonction de cette valeur.

Cet exemple montre comment utiliser l'API DataStream. Vous pouvez écrire plus efficacement. Les tables et SQL supérieurs prennent également en charge un mécanisme de retrait qui gère mieux cette situation.

image.png

Figure 8 Diagramme schématique de l'API.

Enfin, jetons un œil aux principes de l'API DataStream. Lorsque vous appelez l'algorithme de carte DataStream #, Flink crée un objet Transformation dans la couche inférieure. Cet objet représente un nœud dans le graphique logique de calcul. Il enregistre une fonction définie par l'utilisateur (UDF), MapFunction.

Créez plus d'objets DataStream en utilisant plus de méthodes. Chaque objet a un objet Transformation, qui forme une structure graphique basée sur des dépendances de calcul.

Ceci est un graphique de calcul. Flink transforme ensuite la structure du graphe pour finalement générer le graphe de tâche nécessaire pour soumettre le travail.

Aperçu

Cet article présente l'API Flink DataStream, une API de niveau inférieur pour Flink. Dans le développement réel, vous devez utiliser vous-même certains concepts basés sur l'API tels que State et Time, ce qui est gênant. Dans les cours suivants, nous présenterons également des tables et des API SQL de niveau supérieur. À l'avenir, Table et SQL pourraient devenir le courant dominant de l'API de Flink.

Cependant, les API de niveau inférieur créent une expressivité plus puissante. L'API DataStream peut être requise pour des opérations à granularité fine.

Recommended Posts

Développement de Flink à l'aide de l'API DataStream
[Rails 6] Développement d'API à l'aide de GraphQL (Query)
Notes de développement MOD à l'aide de l'API Minecraft 14.4 Fabric # 1
Créez un environnement de "développement d'API + vérification d'API à l'aide de Swagger UI" avec Docker
Exemple d'utilisation de vue.config.js
Développement du niveau de facteur
Résumé de l'utilisation de FragmentArgs
Développement de DSL avec ANTLR 4.7.1
Résumé de l'utilisation de DBFlow
Exemple de paramètres utilisant where
Résumé de l'utilisation de ButterKnife
Exemple d'utilisation d'une classe abstraite
Traitement des données avec Apache Flink
Limitation de débit à l'aide de RateLimiter of Resilience4j
Exemple d'utilisation de l'API Bulk de Salesforce à partir d'un client Java avec PK-chunking
Résumé de la compréhension de Docker par les débutants ③ ~ Jusqu'à l'API de proxy à l'aide de nginx ~