Apache Beam Cheat Sheet [Python]

Introduction

This article summarizes the transforms provided by the Apache Beam Python SDK. Knowing which transforms are readily available should help you plan an implementation more quickly.

Element-wise transforms

ParDo - Run a DoFn

Takes each element of the PCollection and applies some processing (a DoFn) to it.

test_par_do.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):
    """DoFn that emits the length of each input string."""

    def process(self, element):
        # A DoFn may yield zero or more outputs per element; here, exactly one.
        yield len(element)


class TestParDo(TestCase):

    def test_par_do(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))

Filter - Filter elements

Keeps only the elements of the PCollection that satisfy a predicate.

test_filter.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):

    def test_filter(self):
        expected = ['A']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Filter(lambda element: element.startswith('A')))

            assert_that(actual, equal_to(expected))

Map - Apply a function to each element

Applies a function to each element of the PCollection, producing exactly one output per input.

test_map.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):

    def test_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(lambda element: len(element)))

            assert_that(actual, equal_to(expected))

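FlatMap - Apply a function to each element and flatten the results

Applies a function that returns an iterable to each element of the PCollection and emits every item of that iterable as a separate element.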

test_flat_map.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):

    def test_flat_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.FlatMap(lambda element: [len(e) for e in element]))

            assert_that(actual, equal_to(expected))
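
Filter, Map and FlatMap can each be read as a special case of ParDo: for every input, the processing step emits zero or one output, exactly one output, or any number of outputs. The sketch below models that relationship in plain Python, without Beam. The names `par_do`, `filter_`, `map_` and `flat_map` are hypothetical helpers used only for illustration and are not part of the SDK.

```python
def par_do(elements, process):
    """Model of ParDo: call process on each element and concatenate its outputs."""
    return [output for element in elements for output in process(element)]


def filter_(elements, predicate):
    # Filter: emit the element itself, or nothing.
    return par_do(elements, lambda e: [e] if predicate(e) else [])


def map_(elements, fn):
    # Map: emit exactly one output per input.
    return par_do(elements, lambda e: [fn(e)])


def flat_map(elements, fn):
    # FlatMap: fn returns an iterable; emit each of its items separately.
    return par_do(elements, fn)


names = ['Alice', 'Bob', 'Cameron']
starts_with_a = filter_(names, lambda name: name.startswith('A'))
lengths = map_(names, len)
letters = flat_map(['ab', 'c'], list)
```

In Beam itself the output is an unordered PCollection, so the list order produced by this model should not be relied on.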

ToString - Convert elements to strings

Converts each element of the PCollection to a string.

test_to_string.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):

    def test_to_string_kvs(self):
        """Join each key/value pair into a delimited string."""
        expected = ['A,B', 'C,D']

        inputs = [('A', 'B'), ('C', 'D')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Kvs())

            assert_that(actual, equal_to(expected))

    def test_to_string_element(self):
        """Convert each element to a string."""
        expected = ["A", "['A', 'B']", "['C', 'D', 'E']"]

        inputs = ['A', ['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Element())

            assert_that(actual, equal_to(expected))

    def test_to_string_iterables(self):
        """Join the items of each iterable into a delimited string."""
        expected = ['A,B', 'C,D,E']

        inputs = [['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Iterables())

            assert_that(actual, equal_to(expected))

Keys - Extract the key from each element

Extracts the key from each element (key/value pair) of the PCollection.

test_keys.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):

    def test_keys(self):
        expected = [0, 1, 2, 3, 4, 5, 6]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Keys())

            assert_that(actual, equal_to(expected))

Values - Extract the value from each element

Extracts the value from each element (key/value pair) of the PCollection.

test_values.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):

    def test_values(self):
        expected = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Values())

            assert_that(actual, equal_to(expected))

KvSwap - Swap the key and value of each element

Swaps the key and the value of each element (key/value pair) of the PCollection.

test_kv_swap.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):

    def test_kv_swap(self):
        expected = [('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
                    ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3)]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.KvSwap())

            assert_that(actual, equal_to(expected))

Aggregation

GroupByKey - Group elements by key

Groups the elements (key/value pairs) of a PCollection by key.

test_group_by_key.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):

    def test_group_by_key(self):
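        # Values within a group arrive in no guaranteed order, so they are
        # sorted below before comparing.
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]

        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      | beam.Map(lambda kv: (kv[0], sorted(kv[1]))))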

            assert_that(actual, equal_to(expected))

CoGroupByKey - Group elements by key (across multiple PCollections)

Groups the elements (key/value pairs) of multiple PCollections by key.

test_co_group_by_key.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):

    def test_co_group_by_key(self):
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], []))
        ]

        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey())

            assert_that(actual, equal_to(expected))
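
The shape of the result can be easier to see in a plain-Python model: for every key, CoGroupByKey yields one list of values per input collection, and a key that is missing from an input gets an empty list for it. The sketch below reproduces that shape without Beam. `co_group_by_key` is a hypothetical helper, and the addresses are placeholder data.

```python
from collections import defaultdict


def co_group_by_key(*collections):
    """Model of CoGroupByKey: for every key, one list of values per input."""
    # Each new key starts with one empty list per input collection.
    grouped = defaultdict(lambda: tuple([] for _ in collections))
    for index, pairs in enumerate(collections):
        for key, value in pairs:
            grouped[key][index].append(value)
    return dict(grouped)


emails = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
phones = [('amy', '111-222-3333'), ('amy', '333-444-5555')]
joined = co_group_by_key(emails, phones)
```

Here `joined['julia']` holds one email and an empty phone list, which matches the `('julia', ([...], []))` entry in the test above.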

CombineGlobally - Combine all elements

Combines all the elements of a PCollection into a single value.

combine_globally.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):

    def test_combine_globally(self):
        expected = [55]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.CombineGlobally(sum))

            assert_that(actual, equal_to(expected))
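
A plain function such as sum is enough when it can be applied to any grouping of partial results. When the combination needs intermediate state (a mean needs both a running sum and a count), Beam offers the CombineFn interface with four methods: create_accumulator, add_input, merge_accumulators and extract_output. The sketch below mimics those four methods in a plain Python class and simulates two bundles by hand. `MeanFn`, `run_bundle` and the bundle split are hypothetical; in a real pipeline the class would subclass beam.CombineFn and the runner would decide how the inputs are split.

```python
class MeanFn:
    """Plain-Python stand-in with the same four methods as a Beam CombineFn."""

    def create_accumulator(self):
        return (0, 0)  # (running sum, element count)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return (total + element, count + 1)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def run_bundle(fn, bundle):
    """Fold one bundle of inputs into a single accumulator."""
    accumulator = fn.create_accumulator()
    for element in bundle:
        accumulator = fn.add_input(accumulator, element)
    return accumulator


fn = MeanFn()
# Assumed split into two bundles; a real runner chooses the split.
partials = [run_bundle(fn, [1, 2, 3, 4]), run_bundle(fn, [5, 6, 7, 8, 9, 10])]
mean = fn.extract_output(fn.merge_accumulators(partials))
```

Keeping the sum and the count separate until extract_output is what makes the result independent of how the inputs were split.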

ToList - Collect elements into a list

Collects all the elements of the PCollection into a single list.

test_to_list.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

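        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      # The order inside the list is not guaranteed; sort it
                      # so the comparison is deterministic.
                      | beam.Map(sorted))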

            assert_that(actual, equal_to(expected))

ToDict - Collect elements into a dictionary

Collects all the elements (key/value pairs) of the PCollection into a single dictionary.

test_to_dict.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):

    def test_to_dict(self):
        inputs = [('A', 1), ('A', 2), ('B', 1)]

        def check_result(actual):
            # When a key is duplicated, which of its values is kept is not
            # guaranteed, so accept either one.
            [result] = actual
            assert result in ({'A': 1, 'B': 1}, {'A': 2, 'B': 1}), result

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())

            assert_that(actual, check_result)
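
A duplicated key can end up with either value because a dictionary keeps the last value written for a key, and the order in which elements reach the combiner is not fixed. The plain-Python sketch below models this by building a dictionary from the same pairs in two arrival orders. It illustrates the effect only and is not a description of Beam's implementation.

```python
pairs = [('A', 1), ('A', 2), ('B', 1)]

# Same elements, two possible arrival orders.
in_order = dict(pairs)
reversed_order = dict(reversed(pairs))
```

With these inputs, key 'A' maps to 2 in the first dictionary and to 1 in the second, while 'B' is unaffected.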

Count - Count the number of elements

Counts the number of elements in the PCollection.

test_count.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):

    def test_count(self):
        expected = [10]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Count.Globally())

            assert_that(actual, equal_to(expected))

Distinct - Remove duplicate elements

Removes duplicate elements from the PCollection.

test_distinct.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):

    def test_distinct(self):
        expected = [1, 2, 3]

        inputs = [1, 1, 2, 3]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Distinct())

            assert_that(actual, equal_to(expected))

Mean - Compute the mean of the elements

Computes the mean of all the elements of the PCollection.

test_mean.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):

    def test_mean(self):
        expected = [5.5]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Mean.Globally())

            assert_that(actual, equal_to(expected))

Sample - Randomly sample elements

Randomly picks a fixed number of elements from the PCollection.

test_sample.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestSample(TestCase):

    def test_sample(self):
        expected = [3]  # The sampled values differ on every run, so check only the sample size

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(3)
                      | beam.Map(len))

            assert_that(actual, equal_to(expected))
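
Because the sampled values change from run to run, a test can only assert properties that every valid sample must have. The plain-Python sketch below lists three such properties. `random.sample` stands in for the transform's output here, which is an assumption made purely for illustration.

```python
import random

inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# random.sample stands in for the output of the sampling transform.
sample = random.sample(inputs, 3)

is_right_size = len(sample) == 3
is_subset = set(sample) <= set(inputs)
# Holds here because the inputs themselves contain no duplicates.
has_no_repeats = len(set(sample)) == len(sample)
```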

Top - Extract the largest (or smallest) elements

Extracts a fixed number of the largest (or smallest) elements of the PCollection.

test_top.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):

    def test_top_largest(self):
        expected = [[10, 9, 8]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Largest(3))

            assert_that(actual, equal_to(expected))

    def test_top_smallest(self):
        expected = [[1, 2, 3]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Smallest(3))

            assert_that(actual, equal_to(expected))
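
For comparison, Python's standard library offers the same kind of selection on an in-memory list through heapq. The sketch below is a plain-Python analogy for the expected results in the tests above, not a description of how Beam computes them.

```python
import heapq

inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

largest = heapq.nlargest(3, inputs)    # ordered from largest to smallest
smallest = heapq.nsmallest(3, inputs)  # ordered from smallest to largest
```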

Other transforms

Flatten - Merge PCollections

Merges multiple PCollections into a single PCollection.

test_flatten.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):

    def test_flatten(self):
        expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        inputs1 = [1, 2, 3, 4, 5]
        inputs2 = [6, 7, 8, 9, 10]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = (pcol1, pcol2) | beam.Flatten()

            assert_that(actual, equal_to(expected))

Reshuffle - Redistribute elements

Redistributes the elements of the PCollection across workers.

test_reshuffle.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):

    def test_reshuffle(self):
        expected = ['A', 'B', 'C']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Reshuffle())

            assert_that(actual, equal_to(expected))

Summary

The Apache Beam Python SDK provides a wide range of transforms (though fewer than the Java SDK). I plan to update this article as new ones are added.

I hope it serves as a handy reference whenever you need to recall an Apache Beam transform.
