Notes on the Python job manager Luigi (2.0.x series)
standard_task.py
import luigi


def do_something(line):
    # Placeholder for your own per-line processing (hypothetical helper)
    return line.rstrip('\n').upper()


class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # MyTask consumes the file written by MyDependentTask for the same date
        return MyDependentTask(self.date)

    def run(self):
        with self.output().open('w') as output:
            with self.input().open('r') as input:
                for line in input:
                    ret = do_something(line)
                    output.write(ret)
                    output.write('\n')

    def output(self):
        return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))


class MyDependentTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write("line1\n")
            output.write("line2\n")

    def output(self):
        return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))


if __name__ == '__main__':
    luigi.run()
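Luigi decides what to run by checking whether each task's output already exists. The stdlib-only toy model below imitates that rule: dependencies are built first, and a task whose output file is already present is skipped. ToyTask, build, Dependent and Main are hypothetical names for illustration, not Luigi API, and the real scheduler does much more (multiple workers, a central scheduler, and so on).

```python
import os
import tempfile


class ToyTask:
    """Toy stand-in for luigi.Task: complete() is True once output() exists."""

    def requires(self):
        return []

    def output(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def complete(self):
        return os.path.exists(self.output())


def build(task, ran):
    """Depth-first: build dependencies first, then run the task if incomplete."""
    for dep in task.requires():
        build(dep, ran)
    if not task.complete():
        task.run()
        ran.append(type(task).__name__)


class Dependent(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def output(self):
        return os.path.join(self.workdir, 'out1.txt')

    def run(self):
        with open(self.output(), 'w') as f:
            f.write('line1\nline2\n')


class Main(ToyTask):
    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return [Dependent(self.workdir)]

    def output(self):
        return os.path.join(self.workdir, 'out2.txt')

    def run(self):
        with open(self.requires()[0].output()) as src, open(self.output(), 'w') as dst:
            for line in src:
                dst.write(line.upper())


workdir = tempfile.mkdtemp()
first_run, second_run = [], []
build(Main(workdir), first_run)
build(Main(workdir), second_run)
print(first_run)   # ['Dependent', 'Main']
print(second_run)  # []: both outputs already exist, so nothing runs again
```

The same rule is why deleting an output file is the usual way to force a task to run again.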
To write binary data, use luigi.format.Nop.
For example, to write a pickled object:
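import pickle

import luigi


class SomeTask(luigi.Task):
    def requires(self):
        return xxxx

    def run(self):
        some_obj = hoge()  # hoge(): placeholder for whatever builds the object
        # With format=luigi.format.Nop no text encoding layer is added,
        # so the bytes returned by pickle.dumps can be written directly
        with self.output().open('w') as output:
            output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))

    def output(self):
        return luigi.LocalTarget(
            format=luigi.format.Nop,
            path='xxxxxxxx')


class NextTask(luigi.Task):
    def requires(self):
        return SomeTask()

    def run(self):
        # The input target also uses Nop, so pickle.load receives a binary stream
        with self.input().open('r') as infile:
            ret = pickle.load(infile)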
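The reason for Nop: pickle.dumps returns bytes, while under Python 3 a target opened with the default text format expects str. The sketch below uses in-memory io streams as stand-ins for the two kinds of target (an assumption for illustration; it does not touch Luigi itself), and the sample object is made up.

```python
import io
import pickle

# Made-up object standing in for whatever the task produces
some_obj = {'date': '2016-01-01', 'values': [1, 2, 3]}
payload = pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL)
print(type(payload))  # <class 'bytes'>

# A text stream (stand-in for a text-format target) rejects bytes
text_stream = io.StringIO()
refused = False
try:
    text_stream.write(payload)
except TypeError as exc:
    refused = True
    print('text stream refused bytes:', exc)

# A binary stream (stand-in for a Nop-format target) accepts them unchanged
binary_stream = io.BytesIO()
binary_stream.write(payload)
binary_stream.seek(0)
restored = pickle.load(binary_stream)
print(restored == some_obj)  # True
```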
To read a gzipped file, have the dependency Task's output() return a Target created with format=luigi.format.GzipFormat().
To write gzip, pass luigi.format.GzipFormat() as the Target's format, just as on the input side:
class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('aaaa')

    def output(self):
        return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())
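GzipFormat puts a compression layer between your write() calls and the file. As an illustration only (the standard library's gzip module, not Luigi), the same round trip looks like this; it is also a handy way to inspect a .gz file a task produced, assuming it is standard gzip.

```python
import gzip
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'out.gz')

# Write text through a gzip layer ('wt' = text on top of compressed bytes)
with gzip.open(path, 'wt', encoding='utf-8') as f:
    f.write('aaaa\n')

# On disk the file is compressed: it starts with the gzip magic bytes 1f 8b
with open(path, 'rb') as f:
    magic = f.read(2)
print(magic == b'\x1f\x8b')  # True

# Reading back through the same layer returns the original text
with gzip.open(path, 'rt', encoding='utf-8') as f:
    content = f.read()
print(repr(content))  # 'aaaa\n'
```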
To hand a pandas DataFrame to the next task, specify luigi.format.Nop as the output format and write the pickled DataFrame. Don't use to_csv and the like, because the column types will be lost.
def run(self):
    result_df = do_something()
    with self.output().open('w') as output:
        output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))
On the receiving side:
import pickle
import pandas as pd

def run(self):
    with self.input().open('r') as infile:
        input_df: pd.DataFrame = pickle.load(infile)
        do_something(input_df)
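Why pickle rather than CSV: CSV stores only text, so types have to be re-inferred when reading. pandas' read_csv re-infers numeric columns but not everything (dates, for instance, come back as plain strings unless you ask for parsing). The stdlib-only sketch below shows the underlying effect with the csv module; the sample rows are made up.

```python
import csv
import io
import pickle

rows = [{'id': 1, 'price': 9.5, 'flag': True}]

# Round trip through CSV text: every value comes back as a str
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['id', 'price', 'flag'])
writer.writeheader()
writer.writerows(rows)
buf.seek(0)
from_csv = list(csv.DictReader(buf))
print(from_csv[0]['id'], type(from_csv[0]['id']))  # 1 <class 'str'>

# Round trip through pickle: the original Python types survive
from_pickle = pickle.loads(pickle.dumps(rows, protocol=pickle.HIGHEST_PROTOCOL))
print(from_pickle == rows)  # True
```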
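A luigi.WrapperTask implements neither run nor output: it only declares requirements, and it counts as complete once all of them are complete. It is handy for launching several tasks together: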
class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [FugaTask(), BarTask(), BuzTask(), FooTask()]
To run the dependencies in parallel in priority order, give each task a priority:
class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [MyDepTask1(), MyDepTask2(), MyDepTask3()]


class MyDepTask1(luigi.Task):
    priority = 100
    # (the rest is omitted)
Then add something like --workers 2 to the startup command.
Luigi looks at the priority of each task and runs tasks in order, starting from the highest priority.
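As a toy model of "highest priority first" (not Luigi's scheduler code, which also waits for dependencies and has its own tie-breaking), a max-priority queue picks the next task like this. MyDepTask1's priority of 100 comes from the example; the values for the other two tasks are hypothetical.

```python
import heapq

# Hypothetical pending tasks and their priority values
pending = {'MyDepTask1': 100, 'MyDepTask2': 0, 'MyDepTask3': 50}

# heapq is a min-heap, so push negated priorities to pop the highest first
queue = [(-priority, name) for name, priority in pending.items()]
heapq.heapify(queue)

order = []
while queue:
    _, name = heapq.heappop(queue)
    order.append(name)
print(order)  # ['MyDepTask1', 'MyDepTask3', 'MyDepTask2']
```

With two workers, the two tasks at the front of this order would be the first ones handed out.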
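If you don't want to declare Luigi-style dependencies but still want the tasks to run one after another, yield them from run() as dynamic dependencies. Each yield holds run() back until that task is complete, whereas yielding from requires() merely declares ordinary dependencies that may run in parallel.
class MyInvokerTask(luigi.Task):
    def run(self):
        # Each yield waits for the task to finish before moving on
        yield MyDepTask1()
        yield MyDepTask2()
        yield MyDepTask3()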
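Dynamic dependencies rely on run() being a generator: when Luigi's worker reaches a yielded task that is not yet complete, it stops, gets that task scheduled, and later drives run() again, passing straight through dependencies that are already complete. The in-process toy below simplifies this to executing each dependency inline and resuming the same generator; run_with_dynamic_deps is a hypothetical helper, and plain strings stand in for the task objects.

```python
def run_with_dynamic_deps(task_run, execute):
    """Drive a generator-based run(): finish each yielded dependency before resuming."""
    gen = task_run()
    try:
        dep = next(gen)
        while True:
            execute(dep)          # wait until this dependency has completed
            dep = gen.send(None)  # then resume run() to get the next one
    except StopIteration:
        pass


log = []


def invoker_run():
    # Stand-ins for yielding MyDepTask1(), MyDepTask2(), MyDepTask3()
    yield 'MyDepTask1'
    yield 'MyDepTask2'
    yield 'MyDepTask3'
    log.append('invoker done')


run_with_dynamic_deps(invoker_run, log.append)
print(log)  # ['MyDepTask1', 'MyDepTask2', 'MyDepTask3', 'invoker done']
```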
If you wrap a task object with luigi.task.externalize, it will not be run; Luigi only checks whether its output has been generated.
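from luigi.task import externalize

class MyTask(luigi.Task):
    def requires(self):
        # Only checks that MyDependencyTask's output exists; never runs it
        return externalize(MyDependencyTask())

    def run(self):
        print('Someone has finished MyDependencyTask')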
If a task is PENDING in the Visualiser, or no longer shown there because the scheduler has released it, you can simply run the same command again. Only tasks in the dependency tree whose output has not yet been generated will be executed.
By default, failed tasks are not retried, so set the following four options in the configuration file. **Note: the retry-related settings changed in version 2.5.**
luigi.cfg
[core]
worker-keep-alive: true
max-reschedules: 20
[scheduler]
disable-num-failures: 10
retry-delay: 300
A task is disabled once it has failed disable-num-failures times within the period given by disable-window-seconds.
luigi.cfg
[scheduler]
disable-num-failures: 20
disable-window-seconds: 3600
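A toy model of the disable window, for intuition only: it keeps the timestamps of recent failures and reports "disabled" once disable-num-failures of them fall inside disable-window-seconds. FailureWindow is a hypothetical class, and Luigi's exact boundary handling may differ. With the values above, a task that fails once every retry-delay (300 s in the earlier config) never reaches 20 failures within an hour, while a rapid burst does.

```python
from collections import deque


class FailureWindow:
    """Toy model: report 'disabled' after max_failures failures within window_seconds."""

    def __init__(self, max_failures, window_seconds):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.failure_times = deque()

    def record_failure(self, now):
        """Record a failure at `now` (seconds); return True if the task is now disabled."""
        self.failure_times.append(now)
        # Drop failures that are older than the window
        while now - self.failure_times[0] > self.window_seconds:
            self.failure_times.popleft()
        return len(self.failure_times) >= self.max_failures


# disable-num-failures: 20, disable-window-seconds: 3600
slow_task = FailureWindow(max_failures=20, window_seconds=3600)
# Failing every 300 s leaves at most 13 failures inside any one hour
slow = [slow_task.record_failure(t) for t in range(0, 300 * 30, 300)]
print(any(slow))  # False

burst_task = FailureWindow(max_failures=20, window_seconds=3600)
# 20 failures one minute apart all land inside the window
fast = [burst_task.record_failure(t) for t in range(0, 60 * 20, 60)]
print(fast[-1])  # True
```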
If you set retry-external-tasks: true in luigi.cfg, ExternalTasks are retried as well. Note that retry-delay is set for the scheduler as a whole; it cannot be specified per task.
With the luigi.Task.event_handler decorator you can register hooks. If you collect task elapsed times in a handler for PROCESSING_TIME, you only need to implement that in one place.
import logging
import luigi

logger = logging.getLogger(__name__)

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
    logger.debug('Task {} processed in {:.1f} sec'.format(task, duration))
    # Send the value to whatever collects your metrics
    # ...
AWS
http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html
To depend on a file that already exists on S3, use luigi.s3.S3PathTask:
import luigi.s3

class MyTask(luigi.Task):
    def requires(self):
        return luigi.s3.S3PathTask('s3://xxxxxxx')
If the file is gzipped:
class MyTask(luigi.Task):
    def requires(self):
        return GzipS3FileTask('s3://hoge/fuga.gz')

    def run(self):
        # The input target uses GzipFormat, so this reads the decompressed contents
        input = self.input().open('r')


class GzipS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())
To write to S3, return a luigi.s3.S3Target from output():
class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('Hey')

    def output(self):
        return luigi.s3.S3Target('s3://hoge/fuga.txt')
To connect with STS (assumed-role) credentials, build an S3Client with them and pass it as the client argument of S3Target:
class MyS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        # Pass the keys obtained by assuming the role
        client = luigi.s3.S3Client(
            aws_access_key_id=xxxxx,
            aws_secret_access_key=yyyyy,
            security_token=zzzz)
        return luigi.s3.S3Target('s3://xxxx', client=client)
To send error notifications through Amazon SNS, use settings like these:
luigi.cfg
[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError
[email]
type: sns
# true: send notifications even when the job is run manually from a terminal
force-send: true
As needed, pass AWS_DEFAULT_REGION and similar variables in the startup command. No credentials need to be specified when running under an EC2 instance's IAM role.
AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke
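Keep comments in luigi.cfg on their own lines. Assuming the file is parsed with Python 3's configparser at its default settings (check how your Luigi version reads it), a trailing # comment is not stripped and becomes part of the value, so force-send would no longer parse as a boolean:

```python
import configparser

# Same shape as the [email] section, with the comment on the value line
cfg_text = """
[email]
type: sns
force-send: true #send even when run manually
"""

plain = configparser.ConfigParser()
plain.read_string(cfg_text)
raw_value = plain.get('email', 'force-send')
print(repr(raw_value))  # 'true #send even when run manually'

try:
    plain.getboolean('email', 'force-send')
    parses_as_bool = True
except ValueError:
    parses_as_bool = False
print(parses_as_bool)  # False

# Inline comments are stripped only when explicitly enabled
stripping = configparser.ConfigParser(inline_comment_prefixes=('#',))
stripping.read_string(cfg_text)
print(stripping.getboolean('email', 'force-send'))  # True
```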
GCP
GCP credentials are passed via the environment variable GOOGLE_APPLICATION_CREDENTIALS.
Use luigi.contrib.gcs.GCSTarget
GCSTarget makes network calls when an instance is created, yet it does not expect those calls to fail, so it is better to retry when a 503 is returned.
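One way to do that retry is a small wrapper with exponential backoff around whatever creates the GCSTarget. This is a sketch: HttpError here is a hypothetical stand-in for the exception your GCS client actually raises (with google-api-python-client that is typically googleapiclient.errors.HttpError, whose status is in resp.status; verify against your version), and the delays are arbitrary.

```python
import time


class HttpError(Exception):
    """Hypothetical stand-in for the HTTP error raised by your client."""

    def __init__(self, status):
        super().__init__('HTTP {}'.format(status))
        self.status = status


def retry_on_503(func, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying with exponential backoff while it fails with HTTP 503."""
    for attempt in range(attempts):
        try:
            return func()
        except HttpError as exc:
            # Give up on other errors, or when the last attempt has failed
            if exc.status != 503 or attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Simulated flaky constructor: fails twice with 503, then succeeds
calls = {'n': 0}


def make_target():
    calls['n'] += 1
    if calls['n'] < 3:
        raise HttpError(503)
    return 'target'


waits = []
result = retry_on_503(make_target, sleep=waits.append)
print(result, waits)  # target [1.0, 2.0]
```

In a task you would then write something like retry_on_503(lambda: GCSTarget(self.path)), after adapting the except clause to the real exception type.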
Reading from and writing to luigi.contrib.gcs.GCSTarget:
import luigi
from luigi.contrib.gcs import GCSTarget


class MyTask(luigi.Task):
    def requires(self):
        return GCSPathTask(path='gs://hoge/fuga.txt')

    def run(self):
        with self.input().open('r') as input:
            # Do something
            data = input.read()
        with self.output().open('w') as output:
            # Write something in output
            output.write(data)

    def output(self):
        return GCSTarget('gs://hoge/fuga_result.txt')


class GCSPathTask(luigi.ExternalTask):
    path = luigi.Parameter()

    def output(self):
        return GCSTarget(self.path)
luigi.contrib.bigquery is awkward to use, so it's better to write your own.
In particular, with BigQueryTarget you cannot re-run a task without first deleting the table, because the target counts as complete as long as the table exists.