This is a summary article about Luigi's wrapper library gokart.
The motivation for its development and its basic usage are explained very carefully on M3's blog, and that is the place to read for the basics, but I wanted a reverse-lookup style reference, so I wrote this article.
In addition, I will not explain much about the functions of Luigi itself.
Luigi is an OSS pipeline framework developed by Spotify. It is implemented in Python; you create a task by inheriting luigi.Task and defining:
- requires(): the dependent Task(s)
- run(): the process to be executed
- output(): the output destination
You can easily create a workflow just by writing these three methods, as in the sketch below.
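For reference, a minimal sketch of a plain Luigi task (the task and file names here are made up for illustration):

import luigi


class Preprocess(luigi.Task):
    def requires(self):
        return Download()  # hypothetical upstream Task

    def output(self):
        return luigi.LocalTarget('preprocessed.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('preprocessed')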
The origin of the name
Incidentally, Luigi is apparently named after Mario's pipe-running friend in Super Mario.
Gokart is a wrapper library that makes Luigi easier to use.
The name probably comes from Mario (Kart).
The functions of gokart == 0.3.6 are summarized below.
When creating a task, inherit gokart.TaskOnKart instead of luigi.Task.
import pandas as pd
from sklearn.datasets import load_iris

import gokart


class TaskA(gokart.TaskOnKart):
    def run(self):
        data = pd.DataFrame(load_iris()['data'])
        self.dump(data)


class TaskB(gokart.TaskOnKart):
    def requires(self):
        return TaskA()

    # output is optional
    def output(self):
        return self.make_target('data.pkl')

    def run(self):
        df = self.load()
        self.dump(df)
The basic usage is the same as Luigi, but you only need to call self.dump(object_to_save), so it is considerably simpler than writing the same processing with Luigi alone. In addition, def output(self) can be omitted, in which case the output is saved in pickle format.
Execute as follows.
gokart.run(['TaskB', '--local-scheduler'])
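If the tasks live in a script, a common pattern is to call gokart.run() with no arguments so it parses the command line (a sketch, assuming gokart.run() falls back to sys.argv like luigi.run() does):

# main.py (sketch) -- assumes TaskA and TaskB above are defined in this module
if __name__ == '__main__':
    gokart.run()  # e.g. `python main.py TaskB --local-scheduler`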
When executed, the object is saved under resources as shown below.
resources
├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
└── log
├── module_versions
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
├── processing_time
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
├── task_log
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
└── task_params
└── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
The specified file name is saved with a hash value appended. Since the hash value is determined by the Task's parameters, the Task will be re-executed when its parameters change, unlike with Luigi alone. This is another of the benefits of gokart. (As described later, it is also possible to save without appending the hash value.)
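For example, a minimal sketch (TaskC and its multiplier parameter are made up for illustration):

import luigi

import gokart


class TaskC(gokart.TaskOnKart):
    # the hash appended to the output file name is derived from this parameter,
    # so a run with multiplier=3 will not reuse the cache from multiplier=2
    multiplier = luigi.IntParameter(default=2)

    def run(self):
        self.dump([i * self.multiplier for i in range(5)])


gokart.run(['TaskC', '--local-scheduler', '--multiplier', '3'])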
Under log, the following are saved:
- the version of each module used
- the processing time
- the log output by the logger
- the applied parameters
By default, outputs are saved under resources, but the destination directory can be changed by writing the following in the configuration file:
[TaskOnKart]
workspace_directory=./output
load
Of course, you can load a saved DataFrame with self.load(), but if you want to load a set of DataFrames such as [df1, df2, df3, ...], you can use load_data_frame, which loads the multiple DataFrames concatenated vertically. You can also optionally pass a set of column names as required_columns, which raises an exception if any of those columns is missing from the loaded DataFrame.
import pandas as pd

import gokart


class TaskA(gokart.TaskOnKart):
    def run(self):
        df1 = pd.DataFrame([1, 2], columns=['target'])
        df2 = pd.DataFrame([3, 4], columns=['target'])
        df3 = pd.DataFrame([5, 6], columns=['target'])
        self.dump([df1, df2, df3])


class TaskB(gokart.TaskOnKart):
    def requires(self):
        return TaskA()

    def run(self):
        # loaded after being concatenated
        df = self.load_data_frame(required_columns={'target'})
        self.dump(df)
If there are multiple dependent Tasks, you can define them in dictionary form and load each one by its key, as shown below. Luigi alone can also load multiple Tasks, but it does not handle the dictionary form well, so using the dictionary format can improve the readability of your code.
from sklearn.linear_model import LogisticRegression

import gokart


class TrainModel(gokart.TaskOnKart):
    def requires(self):
        # LoadData and LoadTarget are Tasks defined elsewhere
        return {'data': LoadData(), 'target': LoadTarget()}

    def run(self):
        data = self.load('data')
        target = self.load('target')
        model = LogisticRegression()
        model.fit(data, target)
        self.dump(model)
You can use self.load_generator to load the outputs of the dependent Tasks one at a time and process them sequentially.
from sklearn.datasets import load_iris
from sklearn.datasets import load_wine

import gokart


class LoadWineData(gokart.TaskOnKart):
    def run(self):
        data = load_wine()['data']
        self.dump(data)


class LoadIrisData(gokart.TaskOnKart):
    def run(self):
        data = load_iris()['data']
        self.dump(data)


class LoadGenerator(gokart.TaskOnKart):
    def requires(self):
        return [LoadWineData(), LoadIrisData()]

    def run(self):
        for data in self.load_generator():
            print(f'data_shape={data.shape}')
        # data_shape=(178, 13)
        # data_shape=(150, 4)
output
If use_unique_id=False is specified, the hash value will not be appended to the file name.
def output(self):
    return self.make_target('data.pkl', use_unique_id=False)
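Note that the Task is then judged complete as long as data.pkl exists, so changing parameters will no longer trigger a re-run by itself.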
For formats such as gensim and TensorFlow, where a model is saved across multiple files, you can use make_model_target as shown below to compress and save them all at once.
def output(self):
    return self.make_model_target(
        'model.zip',
        save_function=gensim.models.Word2Vec.save,
        load_function=gensim.models.Word2Vec.load)
By passing save and restore functions as parameters in this way, the model and the load_function are compressed and saved together as a set (in zip format, in this case), and the calling Task can restore the model with self.load() without any special handling.
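For example, a downstream Task can restore the model transparently (a sketch; TrainWord2Vec is a hypothetical Task whose output() is the make_model_target above):

class UseWord2Vec(gokart.TaskOnKart):
    def requires(self):
        return TrainWord2Vec()  # hypothetical Task whose output() is the make_model_target above

    def run(self):
        # self.load() unpacks the zip and applies load_function internally
        model = self.load()
        self.dump(model.wv.most_similar('wine'))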
If you use make_large_data_frame_target as shown below, the DataFrame is split into chunks of records according to the size specified by max_byte, compressed into a single file, and saved.
def output(self):
    return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)
Incidentally, make_large_data_frame_target uses the make_model_target described above internally.
If you want to save a DataFrame in a format other than pickle, just give the output file the extension of that format, and the internal FileProcessor will convert it to the target format when saving. The currently supported formats are:
- pickle
- npz
- gz
- txt
- csv
- tsv
- json
- xml
import pandas as pd
from sklearn.datasets import load_wine

import gokart


class LoadWineData(gokart.TaskOnKart):
    def run(self):
        data = load_wine()['data']
        self.dump(data)


class ToCSV(gokart.TaskOnKart):
    def requires(self):
        return LoadWineData()

    def output(self):
        # specify the desired format with the file extension
        return self.make_target('./wine.csv')

    def run(self):
        df = pd.DataFrame(self.load())
        self.dump(df)
If the workspace_directory path in the configuration file starts with gs://, all outputs are uploaded to GCS; if it starts with s3://, they are uploaded to S3.
[TaskOnKart]
# with a gs:// or s3:// prefix, all outputs are saved in the cloud
workspace_directory=gs://resources/
This is very convenient because, unlike with Luigi alone, you can switch destinations without modifying any code.
By writing parameter=${ENVIRONMENT_VARIABLE} in the configuration file, you can set a Task's Parameter from an environment variable. This is very useful when you want to separate test from production, or to change whether to save to the cloud depending on the execution environment.
[TaskOnKart]
workspace_directory=${WORK_SPACE}
[feature.LoadTrainTask]
is_test=${IS_TEST}
.zshrc:
export IS_TEST=False
datetime=`date "+%m%d%H%Y"`
export WORK_SPACE="gs://data/output/${datetime}"
Personally, I like to run things a little locally for verification before running them at full scale on GCE, so being able to share a common luigi.cfg like this is very convenient.
If you use gokart.TaskInstanceParameter (or gokart.ListTaskInstanceParameter), a Task can take another Task as a Parameter. This lets you create a Task that does not depend on a specific Task, so the same Task can be reused, increasing the flexibility of the code you can write. (A sketch of the List variant follows the example below.)
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

import gokart


class LoadWineData(gokart.TaskOnKart):
    def run(self):
        data = load_wine()['data']
        self.dump(data)


class LoadWineTarget(gokart.TaskOnKart):
    def run(self):
        target = load_wine()['target']
        self.dump(target)


class Trainer(gokart.TaskOnKart):
    # takes Tasks as parameters
    data_task = gokart.TaskInstanceParameter(description='data for train')
    target_task = gokart.TaskInstanceParameter(description='target for train')

    def requires(self):
        return {'data': self.data_task, 'target': self.target_task}

    def run(self):
        data = self.load('data')
        target = self.load('target')
        model = LogisticRegression()
        model.fit(data, target)
        self.dump(model)


class ExecuteTrain(gokart.TaskOnKart):
    def requires(self):
        # inject the Tasks
        return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())

    def run(self):
        trained_model = self.load()
        self.dump(trained_model)
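gokart.ListTaskInstanceParameter works the same way but takes a list of Tasks. A minimal sketch (the CombineData task is made up, and it assumes self.load() returns a list when requires() returns a list):

class CombineData(gokart.TaskOnKart):
    # takes a list of Tasks as a single parameter
    data_tasks = gokart.ListTaskInstanceParameter(description='tasks whose outputs are combined')

    def requires(self):
        return self.data_tasks

    def run(self):
        data_list = self.load()  # a list of the dependent Tasks' outputs
        self.dump(data_list)


# inject a list of Tasks
CombineData(data_tasks=[LoadWineData(), LoadWineTarget()])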
You can send notifications to Slack by writing the following in the configuration file. From a security standpoint, it is better to define the token as an environment variable rather than hard-coding it.
[SlackConfig]
token=${SLACK_TOKEN}
channel=channel_name
to_user=hase_hiro
I would appreciate it if you could point out any differences in behavior.