C'est tout pour le titre!
…… Parce que c'est quelque chose, avec quelques exemples.
Comme le sait tous ceux qui l'ont utilisé, le gestionnaire de flux de travail Luigi extrait les données générées par une tâche et les traite comme une classe héritée de Target
. Pour le moment, cela fonctionne si vous pouvez implémenter la méthode ʻexists () pour voir si elle existe déjà. Par exemple, s'il s'agit d'un fichier local,
LocalTarget` est utilisé, mais si vous l'écrivez selon le tutoriel pour le moment, ce sera comme suit.
from luigi import Task, ExternalTask, LocalTarget
import pandas as pd
class RawFile(ExternalTask):
def output(self):
return LocalTarget('path/to/file.csv')
class Aggregation(Task):
def requires(self):
yield RawFile()
def run(self):
df = pd.read_csv(self.input()[0], sep='\t', parse_dates=[7, 8], encoding='cp932')
...
Eh bien ... eh bien, voici une autre tâche qui dépend de RawFile
. Pensons à ça. Je commence progressivement à écrire ce genre de code ...
class Plot(Task):
def requires(self):
yield RawFile()
def run(self):
df = pd.read_csv(
Attends une minute. Vous souvenez-vous des bonnes options à transmettre à l'analyseur de fichiers CSV pour chaque projet dans lequel vous êtes impliqué? ** Je ne me souviens pas. ** Souhaitez-vous écrire une option à chaque fois? Je ne veux pas écrire ** absolument ** plusieurs fois. Luigi a extrait Target dans le chemin des données d'origine, mais c'est brutal. C'est vrai, ** il vous suffit d'abstraire la lecture **.
from luigi import Task, ExternalTask, LocalTarget
import pandas as pd
class RawFileTarget(LocalTarget):
path = 'path/to/file.csv'
def __init__(self):
super(RawFileTarget, self).__init__(path)
def load(self):
return pd.read_csv(self.fn, sep='\t', parse_dates=[7, 8], encoding='cp932')
class RawFile(ExternalTask):
def output(self):
return RawFileTarget()
Définissons Target
, qui définit load ()
.
De cette façon, toute tâche dépendante
class Aggregation(Task):
def requires(self):
yield RawFile()
def run(self):
df = self.input()[0].load()
...
class Plot(Task):
def requires(self):
yield RawFile()
def run(self):
df = self.input()[0].load()
...
C'est rafraîchissant. Plus tard, la personne en charge du traitement des données a reçu un fichier CSV avec des spécifications différentes de celui avec les mots "Désolé, celui qui m'a été donné auparavant, j'ai fait une erreur ...", mais j'ai mal compris quoi. Que le fichier passé soit ou non Excel, vous pouvez simplement réécrire RawFileTarget.load ()
. Heureux!
Comme le titre l'indique, je suis heureux d'écrire save ()
pour qu'il corresponde à load ()
, mais je vais omettre l'exemple de code.
Recommended Posts