Présentation pratique d'Apache Beam (Dataflow) [Python]

introduction

Cet article est basé sur le contenu de la documentation Apache Beam (https://beam.apache.org/documentation/).

Il implémente un programme qui peut être traité par lots avec le SDK Apache Beam Python, et résume la procédure et la méthode pour l'exécuter avec Cloud Dataflow. Il aborde également les concepts de base d'Apache Beam, les tests et la conception.

beam-logo-full-color-name-right-500.png

Premiers pas avec le SDK Apache Beam

Le SDK Apache Beam peut être sélectionné parmi ** Java **, ** Python **, ** Go ** et fournit les ** fonctions suivantes qui simplifient le mécanisme de traitement distribué **. Faire.

Environnement d'exécution Apache Beam

Les programmes créés par le SDK Apache Beam peuvent être exécutés sur des systèmes de traitement de données distribués tels que: Dans Apache Beam, cet environnement d'exécution est appelé ** Runner **.

Cette fois, nous l'exécuterons dans deux environnements d'exécution, DirectRunner et DataflowRunner.

Implémentation du pipeline

Un programme Apache Beam général (simple) est créé et fonctionne comme suit.

  1. Créez un ** objet Pipeline ** et définissez les options d'exécution.
  2. Utilisez ** Read Transform ** pour lire des données à partir d'un système de stockage externe ou en mémoire et ** créer une PCollection **.
  3. Appliquez ** Transform ** à PCollection. Transform peut transformer les éléments de PCollection avec différentes logiques.
  4. Appliquez ** Write Transform ** pour écrire la PCollection transformée par Transform vers une source externe.

Pour ce flux de processus, le pipeline serait le suivant:

image.png

Implémentons en fait un pipeline simple comme celui ci-dessus en Python. L'environnement d'exploitation est supposé être le suivant.

--Version Python: 2.7 ou supérieur pour 2 séries ou 3.5 ou supérieur pour 3 séries

Installation du SDK Apache Beam

Si vous n'avez pas besoin de packages supplémentaires, installez-les avec la commande suivante:

pip install apache-beam

Cette fois, nous supposons qu'il sera exécuté sur Dataflow (GCP), nous allons donc également installer des packages supplémentaires de GCP.

pip install apache-beam[gcp]

Code d'achèvement

Ceci est le code complété. Je vais expliquer chacun d'eux ci-dessous.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


class MyOptions(PipelineOptions):
    """Options personnalisées."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='./input.txt',
            help='Input path for the pipeline')

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Processus de conversion pour trouver le nombre de caractères."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()
    # options.view_as(StandardOptions).runner = 'DirectRunner'
    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)  # I/Appliquer O Transform pour charger les données dans le chemin facultatif
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())  #Appliquer la transformation
     | 'WriteToText' >> beam.io.WriteToText(options.output))  # I/Appliquer O Transform et écrire des données dans le chemin facultatif

    p.run()


if __name__ == '__main__':
    run()

Pipeline L'objet Pipeline ** encapsule toutes vos tâches de traitement de données **. Les programmes Apache Beam créent généralement d'abord un objet Pipeline pour créer une PCollection et appliquer une Transform.

Créer un pipeline

Pour utiliser le programme Apache Beam, vous devez d'abord créer une instance du pipeline Apache Beam SDK (généralement dans la fonction principale). Ensuite, lorsque vous créez le pipeline, vous définissez les options d'exécution.

Le code suivant est un exemple de création d'une instance de Pipeline.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


options = PipelineOptions()  #Options d'exécution
p = beam.Pipeline(options=options)

Paramètres de PipelineOptions

Vous pouvez utiliser PipelineOptions pour définir les exécuteurs qui exécutent le pipeline et ** les options spécifiques requises pour l'exécuteur sélectionné **. Par exemple, il peut contenir des informations telles que l'ID du projet et l'emplacement de stockage des fichiers.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'  #Désignation du coureur

p = beam.Pipeline(options=options)

Il existe deux options, l'une consiste à le définir par programme et l'autre à le transmettre à partir d'un argument de ligne de commande. Un exemple est décrit dans [Exécuter avec Cloud Dataflow](Exécuter avec # cloud-dataflow-) ci-dessous.

Ajouter des options personnalisées

Vous pouvez ajouter des ** options personnalisées ** en plus des PipelineOptions standard. L'exemple suivant ajoute une option pour spécifier les chemins d'entrée et de sortie. Les options personnalisées vous permettent également de spécifier une description ou une valeur par défaut qui sera affichée lorsque l'utilisateur passe --help à partir d'un argument de ligne de commande.

Vous pouvez créer des options personnalisées en ** héritant de PipelineOptions **.

class MyOptions(PipelineOptions):
    """Options personnalisées."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',  #Nom de l'option
            default='./input.txt',  #Valeur par défaut
            help='Input path for the pipeline')  #La description

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')

Transmettez les options que vous avez créées comme suit:

p = beam.Pipeline(options=MyOptions())

Pour définir une option personnalisée sur une valeur autre que la valeur par défaut, transmettez une valeur à partir de l'argument de ligne de commande comme suit:

--input=value --output==value

PCollection Une PCollection est un objet ** qui représente un ensemble de données à distribuer. Dans le pipeline Apache Beam, Transform utilise PCollection comme entrée et sortie. Par conséquent, si vous souhaitez traiter les données dans le pipeline, vous devez créer une PCollection.

Après avoir créé un objet Pipeline, vous devez d'abord créer au moins une PCollection.

Créer une PCollection

Utilisez I / O Transform pour lire des données à partir d'une source externe ou créer une PCollection à partir de la mémoire. Ce dernier est principalement utile pour les tests et le débogage.

Créer une PCollection à partir d'une source externe

Utilisez I / O Transform pour créer une PCollection à partir d'une source externe. Pour lire les données, appliquez la transformation de lecture fournie par chaque transformation d'E / S à l'objet Pipeline.

Voici comment appliquer une transformation en lecture à un pipeline pour créer une PCollection:

lines = p | 'ReadFromText' >> beam.io.ReadFromText('gs://some/input-data.txt')

Créer une PCollection à partir de la mémoire

Utilisez Créer une transformation pour créer une PCollection à partir de la mémoire.

lines = (p | 'ReadFromInMemory' >> beam.Create(['To be, or not to be: that is the question: ', 'Whether \'tis nobler in the mind to suffer ', 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ']))

Transform Transform fournit un ** cadre de traitement général **. La transformation est appliquée à chaque élément de la PCollection d'entrée.

Le SDK Apache Beam fournit une variété de transformations que vous pouvez appliquer à votre PCollection. Cela inclut les ** transformations Core ** génériques telles que ParDo et Combine, ainsi que les ** transformations composites ** qui combinent une ou plusieurs transformations Core. Diverses transformations sont fournies, veuillez donc vous référer à ici.

Appliquer la transformation

Chaque transformation dans le SDK Apache Beam fournit l'opérateur de canal |, vous pouvez donc appliquer la transformation en appliquant cette méthode à l'entrée PCollection.

[Output PCollection] = [Input PCollection] | [Transform]

Vous pouvez également chaîner des transformations pour créer un pipeline comme suit:

[Output PCollection] = ([Initial Input PCollection] 
                             | [First Transform]
                             | [Second Transform]
                             | [Third Transform])

Ce pipeline a le même flux que cet exemple d'implémentation, le pipeline aura donc cette forme.

image.png

Transform crée une nouvelle PCollection sans apporter de modifications à la PCollection d'entrée. ** Transform ne modifie pas la PCollection d'entrée. ** PCollection est invariant par définition. Par conséquent, vous pouvez appliquer plusieurs transformations à la même PCollection pour créer une branche la PCollection.

[Output PCollection] = [Initial Input PCollection]

[Output PCollection A] = [Output PCollection] | [Transform A]
[Output PCollection B] = [Output PCollection] | [Transform B]

La forme de ce pipeline ressemble à ceci:

image.png

I/O Transform Lorsque vous créez un pipeline, vous devez souvent lire des données à partir d'une source externe, telle qu'un fichier ou une base de données. De même, vous pouvez générer des données du pipeline vers un système de stockage externe.

Le SDK Apache Beam fournit une transformation d'E / S pour les types de stockage de données courants (https://beam.apache.org/documentation/io/built-in/). Si vous souhaitez lire ou écrire un stockage de données non pris en charge, vous devez implémenter votre propre transformation d'E / S.

Lire les données

Read Transform transforme les données lues à partir d'une source externe en PCollection. Vous pouvez utiliser la transformation en lecture à tout moment lors de la création du pipeline, mais cela se fait généralement en premier.

lines = pipeline | beam.io.ReadFromText('gs://some/input-data.txt')

Écrire des données

Write Transform écrit les données de PCollection dans une source de données externe. Pour imprimer les résultats d'un pipeline, utilisez la transformation d'écriture à la fin du pipeline dans la plupart des cas.

output | beam.io.WriteToText('gs://some/output-data')

Lecture à partir de plusieurs fichiers

De nombreuses transformations de lecture prennent en charge la lecture à partir de plusieurs fichiers d'entrée qui correspondent à l'opérateur glob. L'exemple suivant utilise l'opérateur glob (*) pour lire tous les fichiers d'entrée correspondants avec le préfixe «input-» et le suffixe «.csv» à l'emplacement spécifié.

lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')

Écriture dans plusieurs fichiers

Write Transform écrit par défaut dans plusieurs fichiers. Ce faisant, le nom de fichier est utilisé comme préfixe pour tous les fichiers de sortie.

L'exemple suivant écrit plusieurs fichiers dans un même emplacement. Chaque fichier est précédé de «nombres» et suffixé de «.csv».

output | 'WriteToText' >> beam.io.WriteToText('/path/to/numbers', file_name_suffix='.csv')

Exécution du pipeline

Exécutons maintenant le pipeline à l'aide du code terminé (#Completed Code). Exécutez localement et Cloud Dataflow comme environnement d'exécution.

Préparez un fichier texte contenant la chaîne de caractères suivante pour la saisie.

input.txt


good morning.
good afternoon.
good evening.

Exécutez localement

Pour exécuter le pipeline localement, définissez PipelineOptions sur DirectRunner en tant que runner, mais vous n'avez pas besoin de spécifier explicitement le runner, sauf si vous avez un paramètre spécifique.

Exécutez la commande suivante à partir de la ligne de commande. Réécrivez la destination d'entrée et les chemins de destination de sortie en fonction de l'environnement.

python pipeline.py --input=./input.txt --output=./output.txt

Cet exemple d'implémentation est un pipeline qui compte le nombre de caractères dans un mot, le résultat suivant est donc généré. De plus, par défaut, «beam.io.WriteToText» ajoute la chaîne «00000-of-00001» à la fin du nom de fichier pour distribuer et écrire dans plusieurs fichiers. Si vous voulez écrire dans un fichier, vous pouvez le faire en vidant l'argument shard_name_template.

output.txt-00000-of-00001


13
15
13

Exécuter dans Cloud Dataflow

Cloud Dataflow est un service entièrement géré fourni par GCP (Google Cloud Platfom) qui traite les données en mode flux ou en mode batch. .. Les utilisateurs peuvent traiter une énorme quantité de données en utilisant une capacité pratiquement illimitée sur une base de paiement à l'utilisation sans se soucier du fonctionnement des infrastructures telles que les serveurs.

L'exécution du pipeline dans Cloud Dataflow crée une tâche dans votre projet GCP qui utilise les ressources Compute Engine et Cloud Storage. Pour tirer parti de Cloud Dataflow, activez l '** API Dataflow ** dans GCP.

Une petite modification est nécessaire pour exécuter [Completed Code](#Completed Code) dans Cloud Dataflow. Modifiez-le comme suit.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions


GCP_PROJECT_ID = 'my-project-id'
GCS_BUCKET_NAME = 'gs://my-bucket-name'
JOB_NAME = 'compute-word-length'


class MyOptions(PipelineOptions):
    """Options personnalisées."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='{}/input.txt'.format(GCS_BUCKET_NAME),  #Entrée à GCS.Mettre txt
            help='Input for the pipeline')

        parser.add_argument(
            '--output',
            default='{}/output.txt'.format(GCS_BUCKET_NAME),  #Sortie vers GCS
            help='Output for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Processus de conversion pour trouver le nombre de caractères."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()

    #Option GCP
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = GCP_PROJECT_ID  #ID du projet
    google_cloud_options.job_name = JOB_NAME  #Nom de travail arbitraire
    google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME)  #Chemin GCS pour les fichiers intermédiaires
    google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME)  #Chemin GCS pour les fichiers temporaires

    #Options des travailleurs
    options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'  #Activer la mise à l'échelle automatique

    #Option standard
    options.view_as(StandardOptions).runner = 'DataflowRunner'  #Spécifier le runner Dataflow

    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())
     | 'WriteToText' >> beam.io.WriteToText(options.output, shard_name_template=""))

    p.run()
    # p.run().wait_until_finish()  #Bloquer jusqu'à ce que le pipeline soit terminé


if __name__ == '__main__':
    run()

Voir ici pour plus d'options Dataflow (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja#-cloud-dataflow--). L'option «streaming» doit être «true» pour effectuer le streaming.

Cela peut également être exécuté avec une commande similaire.

python pipeline.py --input=gs://my-project-id/input.txt --output=gs://my-project-id/output.txt

Les options définies dans le programme peuvent également être passées à partir des arguments de ligne de commande comme celui-ci.

python pipeline.py \
  --input=gs://my-project-id/input.txt \
  --output=gs://my-project-id/output.txt \
  --runner=DataflowRunner \
  --project=my-project-id \
  --temp_location=gs://my-project-id/tmp/
  ...

Vous pouvez surveiller le pipeline en accédant au service Dataflow à partir de GCP. L'interface utilisateur ressemble à ceci et le résultat est émis vers le chemin spécifié.

スクリーンショット 2020-01-03 15.36.16.png

Si vous souhaitez exécuter ce traitement par lots de Dataflow régulièrement, il est pratique d'utiliser le ** modèle Dataflow **. Pour plus d'informations, consultez ici.

Test de pipeline

Lors du test de pipelines, ** les tests unitaires locaux peuvent souvent économiser beaucoup de temps et d'efforts ** par rapport au débogage d'exécutions à distance telles que Dataflow.

Vous devez installer les éléments suivants pour résoudre la dépendance:

pip install nose

Exemple d'implémentation

Pour tester le pipeline, utilisez l'objet TestPipeline. Au lieu de lire l'entrée depuis une source externe, utilisez ʻapache_beam.Create pour créer une PCollection à partir de la mémoire. Comparez la sortie avec ʻassert_that.

test_pipeline.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from src.pipeline import ComputeWordLength


class PipelineTest(TestCase):

    def test_pipeline(self):
        expected = [
            13,
            15,
            13
        ]

        inputs = [
            'good morning.',
            'good afternoon.',
            'good evening.'
        ]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))

Conception de pipeline

Dans [ci-dessus](application de # transform-), nous avons brièvement expliqué la conception (flux de traitement) lors de la création d'un pipeline simple et d'un pipeline de branchement. Ici, nous présenterons d'autres conceptions de pipeline courantes.

Un pipeline avec une transformation qui produit plusieurs PCollections

image.png

Cela peut être réalisé à l'aide de la fonctionnalité de sorties supplémentaires d'Apache Beam (https://beam.apache.org/documentation/programming-guide/#additional-outputs).

class ExtractWord(beam.DoFn):

   def process(element):
        if element.startswith('A'):
            yield pvalue.TaggedOutput('a', element)  #Donnez un nom de tag (commençant'A'Si c'est un élément de'a')
        elif element.startswith('B'):
            yield pvalue.TaggedOutput('b', element)  #Donnez un nom de tag (commençant'B'Si c'est un élément de'b')


mixed_col = db_row_col | beam.ParDo(ExtractWord()).with_outputs()

mixed_col.a | beam.ParDo(...)  # .Accessible par nom de balise
mixed_col.b | beam.ParDo(...)

Un pipeline avec une transformation qui rejoint PCollections

image.png

Ceci peut être réalisé en utilisant «Flatten».

col_list = (a_col, b_col) | beam.Flatten()

Pipeline avec plusieurs sources d'entrée

image.png

Vous pouvez créer une PCollection à partir de chaque source d'entrée et la joindre avec CoGroupByKey etc.

user_address = p | beam.io.ReadFromText(...)
user_order = p | beam.io.ReadFromText(...)

joined_col = (user_address, user_order) | beam.CoGroupByKey()

joined_col | beam.ParDo(...)

Autres fonctions utiles

Vous souhaiterez peut-être également connaître les fonctionnalités suivantes afin de pouvoir gérer différents cas d'utilisation.

Composite transforms Les transformations composites sont une combinaison de plusieurs transformations (ParDo, Combine, GroupByKey ...). L'imbrication de plusieurs transformations rend votre code plus modulaire et plus facile à comprendre.

Exemple d'implémentation

Pour implémenter des transformations composites, vous devez étendre la classe Transform et remplacer la méthode d'expansion.

"""Un pipeline qui compte le nombre de mots dans une phrase."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ComputeWordCount(beam.PTransform):
    """Transformations composites comptant le nombre de mots."""

    def __init__(self):
        pass

    def expand(self, pcoll):
        return (pcoll
                | 'SplitWithHalfSpace' >> beam.Map(lambda element: element.split(' '))
                | 'ComputeArraySize' >> beam.Map(lambda element: len(element)))


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ['There is no time like the present.', 'Time is money.']

    (p
     | 'Create' >> beam.Create(inputs)
     | 'ComputeWordCount' >> ComputeWordCount()
     | 'WriteToText' >> beam.io.WriteToText('Chemin de destination de sortie'))

    p.run()

if __name__ == '__main__':
    run()
    

output


7
3

Side inputs Les entrées latérales sont une fonction qui vous permet de transmettre des entrées supplémentaires (entrées secondaires) à une transformation en plus des entrées normales (entrée principale) PCollection.

Exemple d'implémentation

"""Un pipeline qui ne produit que des chaînes avec plus de caractères que la moyenne."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import pvalue


class FilterMeanLengthFn(beam.DoFn):
    """Filtrer les chaînes avec un nombre de caractères supérieur à la moyenne."""

    def __init__(self):
        pass

    # mean_word_la longueur est une sous-entrée
    def process(self, element, mean_word_length):
        if len(element) >= mean_word_length:
            yield element


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ["good morning.", "good afternoon.", "good evening."]

    #Sous-entrée
    mean_word_length = word_lengths | 'ComputeMeanWordLength' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())

    #Entrée principale
    output = (p
              | 'Create' >> beam.Create(inputs)
              | 'FilterMeanLength' >> beam.ParDo(FilterMeanLengthFn(), pvalue.AsSingleton(mean_word_length))  #Insérez une sous-entrée dans le deuxième argument de ParDo
              | 'write to text' >> beam.io.WriteToText('Chemin de destination de sortie'))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run()

Le nombre de caractères «bonjour», «bon après-midi» et «bonsoir». Sont respectivement «13», «15» et «13», et la moyenne est d'environ 13,67, le résultat est donc le suivant.

output


good afternoon.

Que se passe-t-il dans le pipeline?

Il décrit un peu "ce qui se passe dans le pipeline".

Sérialiser et communiquer

L'une des opérations les plus coûteuses du traitement de pipeline distribué est la ** sérialisation et la communication d'éléments entre les machines **. Le runner Apache Beam sérialise les éléments de PCollection, par exemple parce qu'il communique entre les machines. Communiquez les éléments entre la transformation et la transformation à l'étape suivante en utilisant les techniques suivantes:

  1. Sérialisez l'élément et acheminez-le vers le collaborateur
  2. Sérialisez l'élément et redistribuez-le à plusieurs nœuds de calcul
  3. Lors de l'utilisation d'entrées latérales, l'élément doit être sérialisé et diffusé à tous les travailleurs
  4. Si la transformation et la transformation de l'étape suivante sont exécutées par le même travailleur, les éléments sont communiqués en utilisant en mémoire (le coût de communication peut être réduit en ne sérialisant pas).

Bundle et persistant

Apache Beam se concentre sur le problème Parallèlement embarrassant. Comme Apache Beam attache une grande importance au traitement des éléments en parallèle, il n'est pas bon pour exprimer des actions telles que ** attribuer des numéros de séquence à chaque élément de PCollection **. En effet, ces algorithmes sont beaucoup plus susceptibles d'avoir des problèmes d'évolutivité.

** Le traitement de tous les éléments en parallèle ** présente également quelques inconvénients. Par exemple, lors de l'écriture d'un élément dans la destination de sortie. Dans le traitement de sortie, il n'est pas possible de traiter par lots tous les éléments en parallèle.

Par conséquent, le runner Apache Beam ne traite pas tous les éléments en même temps, mais regroupe et traite les éléments de PCollection. Dans le cas du traitement en continu, il a tendance à être groupé et traité en petites unités, et dans le cas du traitement par lots, il a tendance à être groupé et traité en unités plus grandes.

Traitement parallèle

Traitement parallèle dans Transform

Lors de l'exécution d'un seul ParDo, le programme d'exécution Apache Beam peut diviser et regrouper les éléments de PCollection en deux.

image.png

Lorsque ParDo est exécuté, le worker traite les deux bundles en parallèle, comme illustré ci-dessous.

image.png

Puisqu'un seul élément ne peut pas être divisé, le parallélisme maximum d'une Transform dépend du nombre d'éléments dans la PCollection. Le nombre maximum de processus parallèles dans ce cas est de ** 9 ** comme le montre la figure.

Traitement parallèle entre les transformations

Les ParDos peuvent être des parallèles subordonnés. Par exemple, ParDo1 et ParDo2 sont parallèles dépendants si la sortie de ParDo1 doit être traitée par le même worker comme suit:

image.png

Worker1 exécute ParDo1 sur les éléments du Bundle A, qui devient le Bundle C. Ensuite, ParDo2 est exécuté sur les éléments du Bundle C. De même, Worker2 exécute ParDo1 sur les éléments du Bundle B, qui devient Bundle D. Ensuite, ParDo2 est exécuté sur les éléments du Bundle D.

image.png

En exécutant ParDo de cette manière, les exécuteurs Apache Beam peuvent éviter de redistribuer des éléments entre les nœuds de calcul. Et cela économise des coûts de communication. Cependant, ** le nombre maximal de processus parallèles dépendra désormais du nombre maximal de processus parallèles pour le premier ParDo dans le parallèle dépendant. ** **

Comportement en cas de panne

Comportement en cas d'échec de la transformation

Si le traitement des éléments du Bundle échoue, le Bundle entier échouera. Par conséquent, le processus doit être retenté (sinon tout le pipeline échouera).

Dans l'exemple suivant, Worker1 gère avec succès les cinq éléments du bundle A. Worker2 gère les quatre éléments du Bundle B, mais les deux premiers éléments du Bundle B sont traités avec succès et le troisième élément échoue.

Le programme d'exécution Apache Beam réessaye ensuite tous les éléments du Bundle B, et la deuxième fois, il se termine avec succès. Comme indiqué, ** les tentatives ne se produisent pas toujours dans le même Worker que la tentative de traitement d'origine. ** **

image.png

Comportement en cas d'échec entre les transformations

Si les éléments de ParDo2 ne peuvent pas être traités après le traitement de ParDo1, ces deux transformations échoueront en même temps.

Dans l'exemple suivant, Worker2 exécute avec succès ParDo1 sur tous les éléments du Bundle B. Cependant, ParDo2 échoue car il ne peut pas gérer les éléments de Bundle D.

Par conséquent, le programme d'exécution Apache Beam doit ignorer la sortie ParDo2 et exécuter à nouveau le processus. Dans ce cas, le Bundle ParDo1 doit également être détruit et tous les éléments du ** Bundle doivent être retentés. ** **

image.png

Résumé

J'ai essayé de résumer ce que j'avais appris en me basant sur le contenu de la documentation Apache Beam. Veuillez signaler toute erreur! : arc:

Recommended Posts

Présentation pratique d'Apache Beam (Dataflow) [Python]
Touchez Apache Beam avec Python
Apache Beam Cheet Sheet [Python]
Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~
Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine Edition ~
Introduction de Python
Introduction au langage Python
Introduction à OpenCV (python) - (2)
Cours de base Python (Introduction)
Guide du débutant Python (Introduction)
Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~
Introduction à Python Django (2) Win
Apache mod_auth_tkt et Python AuthTkt
Python3 + Django ~ Mac ~ avec Apache
Introduction d'activités appliquant Python
Introduction à la communication série [Python]
Apache sur macports, Python 3.3 + mod_wsgi3.4 sur non-macports
Modèles de conception en Python: introduction
[Introduction à Python] <liste> [modifier le 22/02/2020]
Introduction à Python (version Python APG4b)
Une introduction à la programmation Python
Introduction à Python pour, pendant
Apache Beam 2.0.x avec Google Cloud Dataflow commençant par IntelliJ et Gradle