Luigi:2.5.0 python:3.6
Luigi a un article qui évite le problème que le traitement parallèle n'est pas possible dans l'environnement Windows. Forcer luigi à effectuer un traitement parallèle dans un environnement Windows
Il y a un problème avec la gestion du générateur ~~, et les méthodes require et run de la tâche qui retourne la tâche dépendante sont appelées plusieurs fois par le planificateur. Cela signifie que le processus écrit en cours d'exécution ou requiert sera exécuté plusieurs fois selon la situation. Par conséquent, il est plus sûr de ne pas écrire des processus coûteux ou des processus qui affectent l'extérieur dans la méthode qui renvoie des tâches dépendantes.
Lors de la transition de la tâche de dépendance au traitement de la tâche de dépendance, l'objet générateur de la tâche de dépendance est écrasé, et lors du retour à la tâche de dépendance, un nouvel objet générateur est à nouveau acquis, donc le début de la méthode à chaque fois. Il sera redémarré à partir de. https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130
Sample:
from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe
class DependentTask(Task):
def output(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
return MockTarget('out.txt')
def run(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
with self.output().open('w') as fout:
fout.write('DependentTask is succeeded')
def on_success(self):
print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
def on_failure(self, exception):
print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))
class StartTask(Task):
def output(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
return MockTarget("StartTaskOut.txt")
def run(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
with self.output().open('w') as fout:
fout.write('StartTask is succeeded')
yield DependentTask()
def on_success(self):
print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
def on_failure(self, exception):
print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))
if __name__ == '__main__':
run(main_task_cls=StartTask)
Output:
running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success
~~ Il y a un bogue dans le processus de retry_count, qui spécifie le nombre de tentatives pour chaque tâche, et il est dit que le paramètre de limite du nombre de tentatives du planificateur sera utilisé pour la deuxième tentative et les suivantes. Actuellement, il ne fonctionne pas correctement, nous vous recommandons donc de ne pas l'utiliser. La cause est que la stratégie de nouvelle tentative (retry_policy_dict) n'est pas spécifiée dans le deuxième emplacement d'enregistrement de tâche et les suivants ci-dessous. https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984~~
~~ Des questions similaires ont été posées ci-dessous dans le passé. http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy~~
~~ Ceci envoie actuellement une pull request et est en attente d'examen. https://github.com/spotify/luigi/pull/2012~~
Il a été fusionné.
La variable d'environnement LUIGI_CONFIG_PATH doit également être spécifiée lors du démarrage de luigid. luigid fonctionne indépendamment, donc si vous souhaitez refléter le contenu de la section du planificateur, vous devez spécifier le fichier de configuration lorsque vous démarrez luigid.
Les tâches sont traitées dans l'ordre ʻoutput ()
=>
requires`` =>
run ()
`.
Lors du passage d'un paramètre à partir de la ligne de commande, le trait de soulignement dans le nom du paramètre est remplacé par un trait d'union.
Lorsque vous prenez un paramètre avec DictParameter, il devient un type unique appelé FrozenOrderedDict. Par conséquent, certaines méthodes qui pourraient être utilisées dans le type intégré ne peuvent pas être utilisées.
Les paramètres autres que le type intégré ne peuvent pas être acheminés entre des tâches parallèles. Si un objet est donné, il sera converti en une chaîne de nom de classe. S'il est en série, il peut être remis. Il s'agit d'une spécification selon laquelle les tâches parallèles fonctionnent dans plusieurs processus et les paramètres sont une fois convertis au format JSON.
Il y a deux conditions pour terminer une tâche.
--Créer un fichier sur la cible spécifiée par output = ouvrir / fermer la cible (tâche réussie) --Exception dans la tâche (échec de la tâche)
Si l'un des éléments ci-dessus n'est pas satisfait, la tâche ne se terminera pas pour toujours tant qu'elle ne sera pas tuée.
Si une exception se produit alors que la cible est ouverte en mode écriture, le fichier d'écriture temporaire restera en tant que garbage en raison du fichier occupé, donc n'écrivez pas un processus qui peut provoquer une exception après l'ouverture de la cible.
Recommended Posts