Cet article est une traduction japonaise de Building Data Pipelines with Python and Luigi. L'article original était bien fait, alors j'ai essayé de le traduire, même si je n'en étais pas sûr, pour ma propre compréhension. Si vous avez des erreurs, veuillez nous en informer dans les commentaires.
Pour les data scientists, les opérations quotidiennes sont souvent plus de recherche et développement que d'ingénierie. Néanmoins, le processus du prototype au produit nécessite pas mal d'efforts de reconfiguration, les décisions rapides et confuses étant la meilleure option suivante ^ 1. Cela retarde toujours l'innovation et, d'une manière générale, l'ensemble du projet.
Cet article décrit l'expérience de la création d'un pipeline de données: toutes les étapes courantes requises pour préparer les données pour un produit basé sur les données, telles que l'extraction, le nettoyage, la fusion et le prétraitement des données. L'accent est mis en particulier sur le ** transfert de données ** et sur la façon dont un gestionnaire de flux de travail comme Luigi peut être un sauveur sans vous déranger. Avec un minimum d'effort, la transition du prototype au produit se fera en douceur.
Un exemple de code est disponible sur GitHub Gist.
Dans les prototypes précédents, le pipeline de données ressemblait à peu près à ceci:
$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py
Dans la phase expérimentale préliminaire d'un projet de données, les éléments suivants sont assez courants: un pré-traitement est nécessaire, ce qui est susceptible de conduire à un piratage rapide ^ 2, il est donc en proie aux meilleures pratiques d'ingénierie. Et le nombre de scripts augmente et le pipeline de données pèle.
Cette approche n'a que l'avantage d'être rapide et pirate. L'inconvénient est que c'est ennuyeux: vous voudrez relancer le pipeline à chaque fois, et vous devrez appeler manuellement un tas de scripts l'un après l'autre. De plus, il y a beaucoup de malentendus lors du partage de ce prototype avec des collègues (comme "Pourquoi do_stuff_with_data
ne fonctionne-t-il pas?", "Avez-vous d'abord fait" clean_some_data`? ", Etc.).
La solution apparemment pirate semble tout pousser dans un seul script. Après une rapide refactorisation, le script do_everything.py
ressemblerait à ceci:
if __name__ == '__main__':
get_some_data()
clean_some_data()
join_other_data()
do_stuff_with_data()
Facile à faire:
$ python do_everything.py
(Remarque: vous pouvez tout rassembler dans un script bash qui appelle un tas de scripts en séquence, mais les inconvénients restent les mêmes)
Lorsque nous entrons dans le pipeline prêt pour le produit, nous devons réfléchir un peu aux aspects du code qui exécutent tous les exemples. En particulier, la gestion des erreurs doit être envisagée:
try:
get_some_data()
except GetSomeDataError as e:
#La gestion des erreurs
Mais lorsque toutes les tâches sont réunies, cela se transforme en un essai / sauf le sapin de Noël:
try:
get_some_data()
try:
clean_some_data()
try:
#Faites quelque chose ici...
except EvenMoreErrors:
# ...
except CleanSomeDataError as e:
#Gérer CleanSomeDataError
except GetSomeDataError as e:
#Gérer GetSomeDataError
Une autre considération importante est de savoir comment restaurer le pipeline. Par exemple, si les premières tâches sont terminées, mais qu'une erreur se produit en cours de route, comment pouvez-vous réexécuter le pipeline sans réexécuter la première étape réussie?
#Vérifiez si la tâche est déjà réussie
if not i_got_the_data_already():
#Sinon, fais-le
try:
get_some_date()
except GetSomeDataError as e:
#La gestion des erreurs
Luigi est un outil Python de gestion des flux de travail développé par Spotify pour aider à créer des pipelines de données complexes pour les travaux par lots. L'installation Luigi est:
pip install luigi
Les fonctionnalités utiles de Luigi sont:
--Gestion de la dépendance
Il existe deux concepts clés pour comprendre comment Luigi peut être appliqué au pipeline de données: les tâches et les cibles. Une tâche est un ensemble de tâches, représentées en héritant de la classe luigi.Task
et en remplaçant certaines méthodes de base. La sortie de la tâche est la cible, qui peut être le système de fichiers local, Amazon S3 ou la base de données.
Les dépendances peuvent être définies sur les entrées et les sorties. Par exemple, si la tâche B dépend de la tâche A, cela signifie que la sortie de la tâche A est l'entrée de la tâche B.
Jetons un coup d'œil à quelques tâches typiques:
# Filename: run_luigi.py
import luigi
class PrintNumbers(luigi.Task):
def requires(self):
return []
def output(self):
return luigi.LocalTarget("numbers_up_to_10.txt")
def run(self):
with self.output().open('w') as f:
for i in range(1, 11):
f.write("{}\n".format(i))
class SquaredNumbers(luigi.Task):
def requires(self):
return [PrintNumbers()]
def output(self):
return luigi.LocalTarget("squares.txt")
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))
if __name__ == '__main__':
luigi.run()
Ce code présente deux tâches: PrintNumbers
, qui écrit les nombres de 1 à 10 ligne par ligne dans un fichier appelé number_up_to_10.txt
, et squares
, qui lit ce fichier et les paires avec des nombres carrés ligne par ligne. C'est un "Nombres au carré" qui écrit dans un fichier appelé .txt`.
Pour effectuer cette tâche:
$ python run_luigi.py SquaredNumbers --local-scheduler
Luigi considère la vérification des dépendances entre les tâches et constate qu'il n'y a pas d'entrée SquaredNumbers
, alors lancez d'abord la tâche PrintNumbers
, puis exécutez SquaredNumbers
.
Le premier argument que vous passez à Luigi est le nom de la dernière tâche du pipeline que vous souhaitez effectuer. Le deuxième argument indique simplement à Luigi d'utiliser le planificateur local (nous en reparlerons plus tard).
Vous pouvez également utiliser la commande luigi
:
$ luigi -m run_luigi.py SquaredNumbers --local-scheduler
Pour créer une tâche Luigi, créez simplement une classe avec luigi.Task
comme parent et remplacez certaines méthodes. En particulier:
--requires ()
est une liste de tâches dépendantes
--ʻOutput () ʻest la cible de la tâche (par exemple, LocalTarget, S3Target, etc.)
--run ()
est la logique d'exécution
Est. Luigi vérifie les valeurs de retour de requires ()
et ʻoutput () `et construit un graphe de dépendances en conséquence.
Les noms de fichiers et les paramètres codés en dur sont généralement des anti-modèles. Une fois que vous avez compris la structure et la dynamique d'une tâche, vous devez paramétrer vos paramètres afin de pouvoir appeler dynamiquement le même script avec différents arguments.
C'est la classe luigi.Parameter ()
. Chaque tâche Luigi peut avoir plusieurs paramètres. Par exemple, disons dans l'exemple précédent que vous pouvez changer le nombre. Puisque nous utilisons des entiers comme paramètres pour la fonction range ()
, nous pouvons utiliser luigi.IntParameter
au lieu de la classe de paramètres par défaut. La tâche modifiée ressemble à ceci:
class PrintNumbers(luigi.Task):
n = luigi.IntParameter()
def requires(self):
return []
def output(self):
return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))
def run(self):
with self.output().open('w') as f:
for i in range(1, self.n+1):
f.write("{}\n".format(i))
class SquaredNumbers(luigi.Task):
n = luigi.IntParameter()
def requires(self):
return [PrintNumbers(n=self.n)]
def output(self):
return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))
Pour élever la tâche SquaredNumbers
à 20 et l'appeler:
$ python run_luigi.py SquaredNumbers --local-scheduler --n 20
Les paramètres peuvent également avoir des valeurs par défaut. Par exemple:
n = luigi.IntParameter(default=10)
Dans ce cas, 10 est utilisé sauf si l'argument --n
est spécifié.
Auparavant, j'ai utilisé l'option --local-scheduler
lors de l'exécution des tâches de Luigi sur le planificateur local. Ceci est utile pour le développement, mais pour les environnements de produit, vous devez utiliser un planificateur centralisé (voir la documentation sur scheduler).
Cela présente plusieurs avantages:
Pour exécuter le démon du planificateur Luigi au premier plan:
$ luigid
En arrière-plan:
$ luigid --background
Il utilise le port 8082 par défaut, vous pouvez donc voir la visualisation en accédant à http: // localhost: 8082 / avec votre navigateur.
Lorsque le planificateur global Luigi est en cours d'exécution, il peut être réexécuté sans options pour le planificateur local:
$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]
L'exemple de code se termine en millisecondes, mais si vous voulez basculer vers le navigateur et voir le graphique de dépendances lorsque la tâche est toujours en cours d'exécution, vous en avez probablement un grand nombre, disons 10 000 000 ou plus, dans l'option --n
. Vous devriez le donner.
Une capture d'écran du graphique de dépendance est:
Nous avons discuté de la définition d'un pipeline de données à l'aide de Luigi, un gestionnaire de flux de travail écrit en Python. Luigi fournit une belle abstraction du pipeline avec des tâches et des cibles, et prend également en compte les dépendances pour vous.
Du point de vue de la réutilisation du code et de la mentalité de passer du prototype au produit, j'ai des [packages Python] individuels (http://marcobonzanini.com/2015/07/01/how-to-develop) pour les tâches de logique métier. Je trouve utile de le définir comme -et-distribuer-python-packages /) (c'est-à-dire avec le fichier setup.py
). De cette façon, vous pouvez simplement déclarer ʻimport your_package` à partir du script de Luigi et l'appeler à partir de là.
Il est possible pour une tâche de produire plusieurs fichiers en sortie, mais si c'est le cas, vous devriez probablement vous demander si la tâche peut être divisée en unités plus petites (c'est-à-dire plusieurs tâches). Leurs résultats sont-ils logiquement les mêmes? Y a-t-il des dépendances? Si vous ne pouvez pas diviser une tâche, je pense qu'il est simple et pratique de créer simplement ʻoutput () un fichier journal qui combine le nom de la tâche elle-même, l'horodatage, etc. Le nom du fichier journal serait quelque chose comme
TaskName_timestamp_param1value_param2value_etc`.
Les gestionnaires de flux de travail comme Luigi gèrent les dépendances, réduisent la quantité de modèles de code pour la gestion des paramètres et des erreurs, gèrent la récupération des pannes et suivent des modèles clairs lors du développement de pipelines de données. De manière générale, c'est utile parce que c'est le cas.
Il est également important de prendre en compte les limites:
--Lugi est développé pour les travaux par lots, il est donc probablement inutile pour un traitement en temps quasi réel -Ne déclenche pas l'exécution. Vous devez exécuter le pipeline de données (par exemple via cronjob)
Recommended Posts