Référence inversée Luigi

Ceci est un mémo de la série 2.0 de Luigi, qui est un gestionnaire de tâches de Python.

De base

Type de tâche de base

standard_task.py


import luigi

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return MyDependentTask(self.date)

    def run(self):
        with self.output().open('w') as output:
            with self.input().open('r') as input:
                for line in input:
                    ret = do_something(line)
                    output.write(ret)
                    output.write('\n')

    def output(self):
        return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))


class MyDependentTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write("line1\n")
            output.write("line2\n")

    def output(self):
        return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))


if __name__ == '__main__':
    luigi.run()

Je veux sortir un fichier binaire

Utilisez luigi.format.Nop. Par exemple, si vous voulez décaper.

import pickle

import luigi


class SomeTask(luigi.Task):
    def requires(self):
        return xxxx

    def run(self):
        some_obj = hoge()
        with self.output().open('w') as output:
            output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))

    def output(self):
        return luigi.LocalTarget(
            format=luigi.format.Nop,
            path='xxxxxxxx')


class NextTask(luigi.Task):
    def requires(self):
        return SomeTask()

    def run(self):
        with self.input().open('r') as infile:
            ret = pickle.load(infile)

Entrez un fichier gzippé

Renvoie Target avec luigi.format.GzipFormat transmis dans la sortie de la tâche dépendante.

Gzip le fichier de sortie

Passez luigi.format.GzipFormat au format de Target comme vous l'avez fait lors de la saisie

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('aaaa')

    def output(self):
        return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())

Je veux sortir Pandas DataFrame

Spécifiez luigi.format.Nop comme format de sortie et pickle et écrivez le DataFrame. N'utilisez pas to_csv etc. car le type sera perdu.


def run(self):
    result_df = do_something()
    with self.output().open('w') as output:
        output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))

Le côté recevant comme entrée


def run(self):
    with self.input().open('r') as infile:
        input_df: pd.DataFrame = pickle.load(infile)
        do_something(input_df)    

Tâches qui exécutent simplement des tâches

luigi.WrapperTask n'implémente pas run ou ʻoutput`.

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [FugaTask(), BarTask(), BuzTask(), FooTask()]

Exécuter des tâches dépendantes en parallèle

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [MyDepTask1(), MyDepTask2(), MyDepTask3()]

class MyDepTask1(luigi.Task):
    priority = 100

    #Ce qui suit est omis

Ensuite, ajoutez --workers 2 ou quelque chose dans la commande de démarrage. Regardez la propriété de chaque tâche et exécutez-la avec la priorité la plus élevée.

Exécuter les tâches dépendantes dans l'ordre

Si vous ne définissez pas de dépendances de type luigi mais que vous souhaitez les traiter en série

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        yield MyDepTask1()
        yield MyDepTask2()
        yield MyDepTask3()

Ne pas enchaîner les tâches dépendantes

Si vous luigi.task.externalize l'objet de tâche, il ne fonctionnera pas, il vérifiera seulement si la sortie est générée.

class MyTask(luigi.Task):
    def requires(self):
        return externalize(MyDependencyTask())

    def run(self):
        print('Someone has finished MyDependencyTask')

Réessayer manuellement un travail de mousse

Si la tâche est EN ATTENTE dans Visualiser ou ne peut pas être vue (libérée du planificateur), exécutez à nouveau la commande. Seules les tâches pour lesquelles aucune sortie n'a été générée sur l'arborescence de dépendances seront exécutées.

Paramètre pour réessayer automatiquement les travaux de mousse

Les paramètres par défaut ne réessaient pas, spécifiez donc les 4 éléments suivants dans le fichier de paramètres. ** Remarque: dans la version 2.5, les éléments de configuration autour de la nouvelle tentative ont changé **

luigi.cfg


[core]
worker-keep-alive: true
max-reschedules: 20

[scheduler]
disable-num-failures: 10
retry-delay: 300

Arrêter de réessayer si les tentatives échouent toujours

Désactive la tâche si elle est moussée le nombre de fois disable-num-failures dans le temps spécifié par disable-window-seconds.

luigi.cfg


disable-num-failures: 20
disable-window-seconds: 3600

Continuer d'attendre la sortie de la tâche externe

Si vous définissez retry-external-tasks: true dans luigi.cfg, ExternalTask sera également réessayé. La spécification de «retry-delay» concerne chaque planificateur et ne peut pas être spécifiée pour chaque tâche.

Collecter le temps de traitement des tâches

luigi.Task.event_handler Vous pouvez créer des crochets avec le décorateur. Si vous collectez le temps écoulé de la tâche dans le gestionnaire pour PROCESSING_TIME, vous n'avez besoin de l'implémenter qu'à un seul endroit.

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
    logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
    #Jetez pour collecter des métriques quelque part
    # ...

AWS

http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html

Entrez le fichier que la tâche externe a placé dans S3

Utilisez luigi.s3.S3PathTask

class MyTask(luigi.Task):
    def requires(self):
        return luigi.s3.S3PathTask('s3://xxxxxxx')

Si gzippé

class MyTask(luigi.Task):
    def requires(self):
        return GzipS3FileTask('s3://hoge/fuga.gz')

    def run(self):
        input = self.input().open('r') #Peut être lu avec

class GzipS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())

Sortir le résultat vers S3

Écrire avec la sortie définie sur luigi.s3.S3Target.

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('Hey')

    def output(self):
        return luigi.s3.S3Target('s3://hoge/fuga.txt')

Utiliser la connexion STS (Security Token Service) pour accéder à S3

Passez le client pour la connexion STS au client de S3Target

class MyS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        #Passer la clé obtenue à partir du rôle assumé
        client = luigi.s3.S3Client(
            aws_access_key_id=xxxxx,
            aws_secret_access_key=yyyyy,
            security_token=zzzz)
        return luigi.s3.S3Target('s3://xxxx', client=client)

Envoyer une notification d'erreur à SNS

Effectuez les réglages comme celui-ci

luigi.cfg


[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError

[email]
type: sns
force-send: true #Vrai quand vous voulez sauter même pendant l'exécution manuelle

Le cas échéant, passez ʻAWS_DEFAULT_REGION` etc. à la commande de démarrage. Vous n'avez pas besoin de spécifier les informations d'identification lors de l'utilisation du rôle IAM de l'instance EC2.

AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke

GCP

Les informations d'identification GCP sont transmises dans la variable d'environnement «GOOGLE_APPLICATION_CREDENTIALS».

Entrez un fichier GCS

Utilisez luigi.contrib.gcs.GCSTarget

Puisque GCSTarget est fait sans supposer d'erreur même si l'accès au réseau se produit lors de la création d'une instance, il est préférable de réessayer lorsque 503 revient.

Sortie vers GCS

Écrivez dans luigi.contrib.gcs.GCSTarget

import luigi
from luigi.contrib.gcs import GCSTarget

class MyTask(luigi.Task):
   def requires(self):
       return GCSPathTask(path='gs://hoge/fuga.txt')

    def run(self):
        with self.input().open('r') as input:
            #Faire quelque chose
        
        with self.output().open('w') as output:
            #Ecrire quelque chose en sortie

    def output(self):
        return GCSTarget('gs://hoge/fuga_result.txt')

class GCSPathTask(luigi.ExternalTask):
    path = luigi.Parameter()
    
    def output(self):
        return GCSTarget(self.path)

Exécutez une tâche de chargement BigQuery

luigi.contrib.bigquery est difficile à utiliser, il est donc préférable de l'écrire à l'avance. En particulier, BigQuery Target ne peut pas réexécuter une tâche sans supprimer la table.

Recommended Posts

Référence inversée Luigi
Pytest à traction inversée
Référence d'inversion de bibliothèque de date / heure Python
Conseils de configuration du serveur Pull inversé
Mémo inversé de l'écran de gestion Django