Cet article résume les transformations fournies par le SDK Python d'Apache Beam. Connaître l'ensemble des transformations directement disponibles permet, je pense, de planifier plus rapidement une implémentation.
Appliquez un traitement arbitraire, défini par un DoFn, à chaque élément de la PCollection.
test_par_do.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class ComputeWordLength(beam.DoFn):
    """DoFn that emits the length of every element it receives."""

    def __init__(self):
        super().__init__()

    def process(self, element):
        """Emit ``len(element)`` as the single output for ``element``."""
        word_length = len(element)
        yield word_length
class TestParDo(TestCase):
    """Demonstrates ``beam.ParDo`` driven by a custom ``DoFn``."""

    def test_par_do(self):
        """Each name is turned into its length by ``ComputeWordLength``."""
        names = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        want = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            created = pipeline | beam.Create(names)
            lengths = created | beam.ParDo(ComputeWordLength())
            assert_that(lengths, equal_to(want))
Filtrez les éléments de la PCollection.
test_filter.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFilter(TestCase):
    """Demonstrates ``beam.Filter`` with a predicate."""

    def test_filter(self):
        """Only the elements starting with 'A' survive the filter."""
        letters = ['A', 'B', 'C']
        want = ['A']
        with TestPipeline() as pipeline:
            kept = (
                pipeline
                | beam.Create(letters)
                | beam.Filter(lambda text: text.startswith('A'))
            )
            assert_that(kept, equal_to(want))
Appliquez une fonction à chaque élément de la PCollection.
test_map.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMap(TestCase):
    """Demonstrates ``beam.Map``: exactly one output per input element."""

    def test_map(self):
        """``len`` is applied to every name."""
        names = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        lengths = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            mapped = pipeline | beam.Create(names) | beam.Map(len)
            assert_that(mapped, equal_to(lengths))
Appliquez à chaque élément de la PCollection une fonction qui renvoie un itérable, puis aplatissez tous les résultats en une seule PCollection.
test_flat_map.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatMap(TestCase):
    """Demonstrates ``beam.FlatMap``: one input may produce many outputs."""

    def test_flat_map(self):
        """Each list of names expands into the lengths of those names."""
        groups = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]
        lengths = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            flattened = (
                pipeline
                | beam.Create(groups)
                | beam.FlatMap(lambda group: list(map(len, group)))
            )
            assert_that(flattened, equal_to(lengths))
Convertissez chaque élément de la PCollection en chaîne de caractères.
test_to_string.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToString(TestCase):
    """Demonstrates the ``beam.ToString`` family of transforms."""

    def test_to_string_kvs(self):
        """Turn each (key, value) pair into a comma-delimited string."""
        pairs = [('A', 'B'), ('C', 'D')]
        with TestPipeline() as pipeline:
            rendered = (pipeline
                        | beam.Create(pairs)
                        | beam.ToString.Kvs())
            assert_that(rendered, equal_to(['A,B', 'C,D']))

    def test_to_string_element(self):
        """Turn each element into its ``str()`` representation."""
        elements = ['A', ['A', 'B'], ['C', 'D', 'E']]
        with TestPipeline() as pipeline:
            rendered = (pipeline
                        | beam.Create(elements)
                        | beam.ToString.Element())
            assert_that(
                rendered, equal_to(["A", "['A', 'B']", "['C', 'D', 'E']"]))

    def test_to_string_iterables(self):
        """Join the items of each iterable into a comma-delimited string."""
        iterables = [['A', 'B'], ['C', 'D', 'E']]
        with TestPipeline() as pipeline:
            rendered = (pipeline
                        | beam.Create(iterables)
                        | beam.ToString.Iterables())
            assert_that(rendered, equal_to(['A,B', 'C,D,E']))
Extrayez la clé de chaque élément de la PCollection (paire clé / valeur).
test_keys.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKeys(TestCase):
    """Demonstrates ``beam.Keys`` on (key, value) pairs."""

    def test_keys(self):
        """Only the day numbers (the keys) are kept."""
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        # [(0, 'Sunday'), (1, 'Monday'), ..., (6, 'Saturday')]
        numbered_days = list(enumerate(days))
        with TestPipeline() as pipeline:
            keys = pipeline | beam.Create(numbered_days) | beam.Keys()
            assert_that(keys, equal_to(list(range(7))))
Extrayez la valeur de chaque élément de la PCollection (paire clé / valeur).
test_values.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestValues(TestCase):
    """Demonstrates ``beam.Values`` on (key, value) pairs."""

    def test_values(self):
        """Only the day names (the values) are kept."""
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        # [(0, 'Sunday'), (1, 'Monday'), ..., (6, 'Saturday')]
        numbered_days = list(enumerate(days))
        with TestPipeline() as pipeline:
            values = pipeline | beam.Create(numbered_days) | beam.Values()
            assert_that(values, equal_to(days))
Échangez la clé et la valeur de chaque élément (paire clé / valeur) de la PCollection.
test_kv_swap.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKvSwap(TestCase):
    """Demonstrates ``beam.KvSwap`` on (key, value) pairs."""

    def test_kv_swap(self):
        """Every (number, day) pair becomes (day, number)."""
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        # [(0, 'Sunday'), (1, 'Monday'), ..., (6, 'Saturday')]
        numbered_days = list(enumerate(days))
        swapped_expected = [
            ('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
            ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3),
        ]
        with TestPipeline() as pipeline:
            swapped = pipeline | beam.Create(numbered_days) | beam.KvSwap()
            assert_that(swapped, equal_to(swapped_expected))
Regroupez par clé les valeurs des éléments (paires clé / valeur) d'une PCollection.
test_group_by_key.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestGroupByKey(TestCase):
    """Demonstrates ``beam.GroupByKey`` on a single keyed PCollection."""

    def test_group_by_key(self):
        """Values sharing a key are collected into one group per key.

        GroupByKey does not guarantee the order of the values inside a
        group, so each group is sorted before comparison to keep the test
        deterministic.
        """
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]
        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      # Normalize the grouped iterable to a sorted list.
                      | 'SortValues' >> beam.Map(
                          lambda kv: (kv[0], sorted(kv[1]))))
            assert_that(actual, equal_to(expected))
Regroupez par clé les valeurs des éléments (paires clé / valeur) de plusieurs PCollections.
test_co_group_by_key.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCoGroupByKey(TestCase):
    """Demonstrates ``beam.CoGroupByKey`` joining two keyed PCollections."""

    def test_co_group_by_key(self):
        """Values from both inputs are grouped per key, one list per input.

        A key missing from one input (here 'julia' has no phone number)
        gets an empty list for that input. The order of values inside each
        list is not guaranteed, so every list is sorted before comparison.
        """
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], [])),
        ]
        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]
        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)
            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey()
                      # Normalize each per-input iterable to a sorted list.
                      | 'SortValues' >> beam.Map(
                          lambda kv: (kv[0],
                                      tuple(sorted(vs) for vs in kv[1]))))
            assert_that(actual, equal_to(expected))
Combine tous les éléments d'une PCollection.
test_combine_globally.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCombineGlobally(TestCase):
    """Demonstrates ``beam.CombineGlobally`` with a plain callable."""

    def test_combine_globally(self):
        """Summing 1..10 across the whole PCollection yields a single 55."""
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            total = (pipeline
                     | beam.Create(numbers)
                     | beam.CombineGlobally(sum))
            assert_that(total, equal_to([55]))
Stocke tous les éléments de la PCollection dans une seule liste.
test_to_list.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToList(TestCase):
    """Demonstrates ``beam.combiners.ToList``."""

    def test_to_list(self):
        """All elements are gathered into one single list.

        ToList does not guarantee the order of the elements inside the
        resulting list, so it is sorted before comparison to keep the test
        deterministic.
        """
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      | 'SortList' >> beam.Map(sorted))
            assert_that(actual, equal_to(expected))
Stocke tous les éléments (paires clé / valeur) de la PCollection dans un seul dictionnaire.
test_to_dict.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToDict(TestCase):
    """Demonstrates ``beam.combiners.ToDict``."""

    def test_to_dict(self):
        """All (key, value) pairs are folded into a single dict.

        When a key appears more than once, only one of its values is kept,
        and which one is not deterministic. The matcher therefore accepts
        any of the candidate values for the duplicated key 'A' instead of
        pinning one (which made the previous version flaky).
        """
        inputs = [('A', 1), ('A', 2), ('B', 1)]
        # Either value of the duplicated key 'A' may be the one retained.
        acceptable = [{'A': 1, 'B': 1}, {'A': 2, 'B': 1}]

        def matches_acceptable_dict(actual):
            from apache_beam.testing.util import BeamAssertException
            results = list(actual)
            if len(results) != 1 or results[0] not in acceptable:
                raise BeamAssertException(
                    'Expected a single dict among {!r}, got {!r}'.format(
                        acceptable, results))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())
            assert_that(actual, matches_acceptable_dict)
Comptez le nombre d'éléments dans la PCollection.
test_count.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCount(TestCase):
    """Demonstrates ``beam.combiners.Count.Globally``."""

    def test_count(self):
        """Ten input elements are counted as a single value, 10."""
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            how_many = (pipeline
                        | beam.Create(numbers)
                        | beam.combiners.Count.Globally())
            assert_that(how_many, equal_to([10]))
Éliminez les doublons des éléments de la PCollection.
test_distinct.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestDistinct(TestCase):
    """Demonstrates ``beam.Distinct``."""

    def test_distinct(self):
        """The duplicated 1 is emitted only once."""
        with_duplicates = [1, 1, 2, 3]
        with TestPipeline() as pipeline:
            unique = (pipeline
                      | beam.Create(with_duplicates)
                      | beam.Distinct())
            assert_that(unique, equal_to([1, 2, 3]))
Calcule la moyenne de tous les éléments de la PCollection.
test_mean.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMean(TestCase):
    """Demonstrates ``beam.combiners.Mean.Globally``."""

    def test_mean(self):
        """The mean of 1..10 is the single value 5.5."""
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            average = (pipeline
                       | beam.Create(numbers)
                       | beam.combiners.Mean.Globally())
            assert_that(average, equal_to([5.5]))
Extrayez au hasard un nombre fixe d'éléments de la PCollection.
test_sample.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestSample(TestCase):
    """Demonstrates ``beam.combiners.Sample.FixedSizeGlobally``."""

    def test_sample(self):
        """A single list of ``sample_size`` distinct input elements is emitted.

        The sampled values change on every run, so comparing against a
        fixed list (as the previous version did) fails almost always.
        Instead, the matcher checks the shape of the result.
        """
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        sample_size = 3

        def is_valid_sample(actual):
            from apache_beam.testing.util import BeamAssertException
            results = list(actual)
            if len(results) != 1:
                raise BeamAssertException(
                    'Expected exactly one sample, got {!r}'.format(results))
            sample = list(results[0])
            # Per the Beam API docs, FixedSizeGlobally samples without
            # replacement, and the inputs are distinct, so the sample must
            # hold sample_size distinct values taken from inputs.
            if (len(sample) != sample_size
                    or len(set(sample)) != sample_size
                    or not set(sample).issubset(inputs)):
                raise BeamAssertException(
                    'Invalid sample {!r} drawn from {!r}'.format(
                        sample, inputs))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(sample_size))
            assert_that(actual, is_valid_sample)
Extrayez certains des éléments les plus grands (ou les plus petits) de la PCollection.
test_top.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestTop(TestCase):
    """Demonstrates ``beam.combiners.Top.Largest`` and ``Top.Smallest``."""

    def test_top_largest(self):
        """The three largest values come back as one list, largest first."""
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            top = (pipeline
                   | beam.Create(numbers)
                   | beam.combiners.Top.Largest(3))
            assert_that(top, equal_to([[10, 9, 8]]))

    def test_top_smallest(self):
        """The three smallest values come back as one list, smallest first."""
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            bottom = (pipeline
                      | beam.Create(numbers)
                      | beam.combiners.Top.Smallest(3))
            assert_that(bottom, equal_to([[1, 2, 3]]))
Combinez plusieurs PCollections en une seule PCollection.
test_flatten.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatten(TestCase):
    """Demonstrates ``beam.Flatten`` merging several PCollections."""

    def test_flatten(self):
        """PCollections holding 1..5 and 6..10 merge into one holding 1..10."""
        first_half = [1, 2, 3, 4, 5]
        second_half = [6, 7, 8, 9, 10]
        with TestPipeline() as pipeline:
            left = pipeline | 'create pcol1' >> beam.Create(first_half)
            right = pipeline | 'create pcol2' >> beam.Create(second_half)
            merged = (left, right) | beam.Flatten()
            assert_that(merged, equal_to(first_half + second_half))
Redistribuez les éléments de la PCollection entre les workers.
test_reshuffle.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestReshuffle(TestCase):
    """Demonstrates ``beam.Reshuffle``."""

    def test_reshuffle(self):
        """Reshuffling leaves the set of element values unchanged."""
        letters = ['A', 'B', 'C']
        with TestPipeline() as pipeline:
            reshuffled = (pipeline
                          | beam.Create(letters)
                          | beam.Reshuffle())
            assert_that(reshuffled, equal_to(['A', 'B', 'C']))
Le SDK Python d'Apache Beam fournit de nombreuses transformations (bien qu'en nombre inférieur au SDK Java). Je mettrai cet article à jour au fur et à mesure que de nouvelles fonctionnalités seront disponibles.
J'espère que cet article vous servira de référence lorsque vous aurez besoin de vous rappeler les transformations d'Apache Beam.
Recommended Posts