Ceci est un mémo de la série 2.0 de Luigi, qui est un gestionnaire de tâches de Python.
standard_task.py
import luigi
class MyTask(luigi.Task):
date = luigi.DateParameter()
def requires(self):
return MyDependentTask(self.date)
def run(self):
with self.output().open('w') as output:
with self.input().open('r') as input:
for line in input:
ret = do_something(line)
output.write(ret)
output.write('\n')
def output(self):
return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))
class MyDependentTask(luigi.Task):
date = luigi.DateParameter()
def run(self):
with self.output().open('w') as output:
output.write("line1\n")
output.write("line2\n")
def output(self):
return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))
if __name__ == '__main__':
luigi.run()
Utilisez luigi.format.Nop
.
Par exemple, si vous voulez décaper.
import pickle
import luigi
class SomeTask(luigi.Task):
def requires(self):
return xxxx
def run(self):
some_obj = hoge()
with self.output().open('w') as output:
output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))
def output(self):
return luigi.LocalTarget(
format=luigi.format.Nop,
path='xxxxxxxx')
class NextTask(luigi.Task):
def requires(self):
return SomeTask()
def run(self):
with self.input().open('r') as infile:
ret = pickle.load(infile)
Renvoie Target
avec luigi.format.GzipFormat
transmis dans la sortie de la tâche dépendante.
Passez luigi.format.GzipFormat
au format de Target comme vous l'avez fait lors de la saisie
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('aaaa')
def output(self):
return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())
Spécifiez luigi.format.Nop
comme format de sortie et pickle et écrivez le DataFrame. N'utilisez pas to_csv
etc. car le type sera perdu.
def run(self):
result_df = do_something()
with self.output().open('w') as output:
output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))
Le côté recevant comme entrée
def run(self):
with self.input().open('r') as infile:
input_df: pd.DataFrame = pickle.load(infile)
do_something(input_df)
luigi.WrapperTask
n'implémente pas run
ou ʻoutput`.
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [FugaTask(), BarTask(), BuzTask(), FooTask()]
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [MyDepTask1(), MyDepTask2(), MyDepTask3()]
class MyDepTask1(luigi.Task):
priority = 100
#Ce qui suit est omis
Ensuite, ajoutez --workers 2
ou quelque chose dans la commande de démarrage.
Regardez la propriété de chaque tâche et exécutez-la avec la priorité la plus élevée.
Si vous ne définissez pas de dépendances de type luigi mais que vous souhaitez les traiter en série
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
yield MyDepTask1()
yield MyDepTask2()
yield MyDepTask3()
Si vous luigi.task.externalize
l'objet de tâche, il ne fonctionnera pas, il vérifiera seulement si la sortie est générée.
class MyTask(luigi.Task):
def requires(self):
return externalize(MyDependencyTask())
def run(self):
print('Someone has finished MyDependencyTask')
Si la tâche est EN ATTENTE dans Visualiser ou ne peut pas être vue (libérée du planificateur), exécutez à nouveau la commande. Seules les tâches pour lesquelles aucune sortie n'a été générée sur l'arborescence de dépendances seront exécutées.
Les paramètres par défaut ne réessaient pas, spécifiez donc les 4 éléments suivants dans le fichier de paramètres. ** Remarque: dans la version 2.5, les éléments de configuration autour de la nouvelle tentative ont changé **
luigi.cfg
[core]
worker-keep-alive: true
max-reschedules: 20
[scheduler]
disable-num-failures: 10
retry-delay: 300
Désactive la tâche si elle est moussée le nombre de fois disable-num-failures
dans le temps spécifié par disable-window-seconds
.
luigi.cfg
disable-num-failures: 20
disable-window-seconds: 3600
Si vous définissez retry-external-tasks: true
dans luigi.cfg
, ExternalTask sera également réessayé. La spécification de «retry-delay» concerne chaque planificateur et ne peut pas être spécifiée pour chaque tâche.
luigi.Task.event_handler Vous pouvez créer des crochets avec le décorateur. Si vous collectez le temps écoulé de la tâche dans le gestionnaire pour PROCESSING_TIME
, vous n'avez besoin de l'implémenter qu'à un seul endroit.
@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
#Jetez pour collecter des métriques quelque part
# ...
AWS
http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html
Utilisez luigi.s3.S3PathTask
class MyTask(luigi.Task):
def requires(self):
return luigi.s3.S3PathTask('s3://xxxxxxx')
Si gzippé
class MyTask(luigi.Task):
def requires(self):
return GzipS3FileTask('s3://hoge/fuga.gz')
def run(self):
input = self.input().open('r') #Peut être lu avec
class GzipS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())
Écrire avec la sortie définie sur luigi.s3.S3Target
.
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('Hey')
def output(self):
return luigi.s3.S3Target('s3://hoge/fuga.txt')
Passez le client pour la connexion STS au client de S3Target
class MyS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
#Passer la clé obtenue à partir du rôle assumé
client = luigi.s3.S3Client(
aws_access_key_id=xxxxx,
aws_secret_access_key=yyyyy,
security_token=zzzz)
return luigi.s3.S3Target('s3://xxxx', client=client)
Effectuez les réglages comme celui-ci
luigi.cfg
[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError
[email]
type: sns
force-send: true #Vrai quand vous voulez sauter même pendant l'exécution manuelle
Le cas échéant, passez ʻAWS_DEFAULT_REGION` etc. à la commande de démarrage. Vous n'avez pas besoin de spécifier les informations d'identification lors de l'utilisation du rôle IAM de l'instance EC2.
AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke
GCP
Les informations d'identification GCP sont transmises dans la variable d'environnement «GOOGLE_APPLICATION_CREDENTIALS».
Utilisez luigi.contrib.gcs.GCSTarget
Puisque GCSTarget
est fait sans supposer d'erreur même si l'accès au réseau se produit lors de la création d'une instance, il est préférable de réessayer lorsque 503 revient.
Écrivez dans luigi.contrib.gcs.GCSTarget
import luigi
from luigi.contrib.gcs import GCSTarget
class MyTask(luigi.Task):
def requires(self):
return GCSPathTask(path='gs://hoge/fuga.txt')
def run(self):
with self.input().open('r') as input:
#Faire quelque chose
with self.output().open('w') as output:
#Ecrire quelque chose en sortie
def output(self):
return GCSTarget('gs://hoge/fuga_result.txt')
class GCSPathTask(luigi.ExternalTask):
path = luigi.Parameter()
def output(self):
return GCSTarget(self.path)
luigi.contrib.bigquery
est difficile à utiliser, il est donc préférable de l'écrire à l'avance.
En particulier, BigQuery Target ne peut pas réexécuter une tâche sans supprimer la table.