Previous article: A memorandum on the procedure for running a machine learning pipeline with Cloud Dataflow.
Last time I ran machine learning with scikit-learn and pandas, which come pre-installed on the Cloud Dataflow worker nodes, but in a real machine learning pipeline you will want to install preprocessing libraries such as OpenCV as well as various other machine learning libraries for the analysis itself. This time, as an example of the procedure for installing an arbitrary library on Cloud Dataflow, I would like to describe how to install and run XGBoost, which is also popular on Kaggle.
As described in the official documentation, there are three main ways to install a library of your choice on Cloud Dataflow (Python): listing PyPI packages in a requirements.txt file, staging local (non-PyPI) packages with the extra_package option, and supplying a custom setup.py for packages with non-Python dependencies. Libraries like XGBoost, which need to be built against system packages, fall into the third case. Below we will see how to create a custom setup.py for XGBoost.
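For reference, the three approaches map onto pipeline options of the Dataflow (Apache Beam) Python SDK roughly as follows. This is a minimal sketch, not from the original article: the file paths are placeholders, and on newer SDKs the options module lives under apache_beam.options.pipeline_options instead.

```python
# A minimal sketch (not from the article): the three dependency options of the
# Dataflow (Apache Beam) Python SDK. File paths here are placeholders.
from apache_beam.utils.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--requirements_file=requirements.txt',           # 1) packages installable from PyPI
    '--extra_package=/path/to/local_package.tar.gz',  # 2) local / non-PyPI Python packages
    '--setup_file=/path/to/setup.py',                 # 3) non-Python dependencies (this article's case)
])
```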
As you can see in the example code in the official repository, setup.py is written following the usual setuptools conventions. Python package dependencies are listed in install_requires, but if you want to run your own shell commands during installation, you create a command class that inherits from setuptools' Command class and describe the command execution procedure there. The following is an XGBoost installation setup.py written based on the official example code above. It basically follows XGBoost's standard installation procedure, but because each command is executed with subprocess.Popen, the working directory is passed explicitly for the commands that would otherwise involve changing directories.
setup.py
```python
from distutils.command.build import build as _build
import subprocess

import setuptools


class build(_build):
    """Build command that also triggers the custom commands below."""
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Each entry is (command as an argument list, working directory for the command).
CUSTOM_COMMANDS = [
    (["sudo", "apt-get", "update"], "."),
    (["sudo", "apt-get", "install", "git", "build-essential", "libatlas-base-dev", "-y"], "."),  # "python-dev", "gfortran"
    (["git", "clone", "--recursive", "https://github.com/dmlc/xgboost"], "."),
    (["sudo", "make"], "xgboost"),
    (["sudo", "python", "setup.py", "install"], "xgboost/python-package"),
    (["sudo", "pip", "install", "xgboost"], "."),
]


class CustomCommands(setuptools.Command):
    """Runs the arbitrary shell commands defined in CUSTOM_COMMANDS."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print 'Running command: %s' % command_list[0]
        p = subprocess.Popen(command_list[0],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             cwd=command_list[1])
        stdout_data, _ = p.communicate()
        print 'Command output: %s' % stdout_data
        if p.returncode != 0:
            raise RuntimeError('Command %s failed: exit code: %s' % (command_list[0], p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = []

setuptools.setup(
    name='xgboost-install-example',
    version='0.0.1',
    description='xgboost workflow packages.',
    packages=setuptools.find_packages(),
    # install_requires=REQUIRED_PACKAGES,
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    }
)
```
After creating setup.py, specify it with the pipeline's setup_file option. The following is an example of setting the option in Python code. (In my environment it did not seem to work unless the full path was specified.)
xgboost_pipeline.py
```python
setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py"  # set full path
```
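For context, here is a minimal sketch of how the setup_file option might sit alongside the other Dataflow options. This is not from the original article: the project, bucket, and job names are placeholders, and the runner name and options module path can differ between SDK versions (newer Beam releases use apache_beam.options.pipeline_options).

```python
import apache_beam as beam
from apache_beam.utils.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions)

options = PipelineOptions()
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.project = 'your-gcp-project'                    # placeholder
gcloud_options.job_name = 'xgboost-pipeline'                   # placeholder
gcloud_options.staging_location = 'gs://your-bucket/staging'   # placeholder
gcloud_options.temp_location = 'gs://your-bucket/temp'         # placeholder
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
setup_options.setup_file = '/full/path/to/setup.py'  # full path, as noted above

pipeline = beam.Pipeline(options=options)
```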
Run the pipeline with this option specified, and if the installation succeeds, XGBoost code can be executed inside the pipeline. In the code from the previous article, replacing the part that used sklearn's regressor with the following XGBoost code worked.
xgboost_pipeline.py (inside the learn/predict function)
```python
# ~ omitted ~

year_max = data["year"].max()
train = data[data["year"] < year_max]
test = data[data["year"] == year_max]

dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)

evals_result = {}
watchlist = [(dtrain, 'train'), (dtest, 'test')]
best_params = {'objective': 'reg:linear',
               'eval_metric': 'rmse',
               'learning_rate': 0.01,
               'max_depth': 3,
               'colsample_bytree': 0.65,
               'subsample': 0.55,
               'min_child_weight': 7.0,
               'reg_alpha': 0.6,
               'reg_lambda': 0.7,
               'gamma': 1.2}

model = xgb.train(best_params, dtrain,
                  num_boost_round=1000,
                  evals=watchlist,
                  evals_result=evals_result,
                  verbose_eval=True)

test.loc[:, "predict"] = model.predict(dtest)
```
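The surrounding Beam wiring is omitted here, but as a rough, hypothetical sketch, training code like the above can be called from a transform as follows. The helper name and signature are assumptions, not the article's code; the key point is that importing xgboost inside the function makes the import happen on the Dataflow workers, where setup.py has installed the library.

```python
# A minimal, hypothetical sketch (not from the article): calling training code like
# the above from a Beam transform.
import apache_beam as beam

def learn_predict(data):   # hypothetical helper; 'data' is assumed to be a pandas DataFrame
    import xgboost as xgb  # imported here so it resolves on the Dataflow workers
    # ... DMatrix construction, xgb.train, prediction as shown above ...
    return data

# In the pipeline definition, for example:
#   predictions = inputs | beam.Map(learn_predict)
```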
If you create a custom setup.py, you can run all kinds of libraries on Dataflow. Dataflow should be useful not only for large-scale runs with your favorite machine learning library, but also for easily running preprocessing that needs a lot of compute resources, such as processing image data with an image processing library. There are still some restrictions, such as only working with Python 2 and not supporting streaming mode, but these should gradually be addressed.
Although the processing was distributed this time, XGBoost's training itself ran on a single node; a version in which the learning algorithm itself is distributed across multiple nodes ([XGBoost4J](http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html)) is also being developed. It currently seems to run on Spark and Flink as execution engines, but the development roadmap also includes support for Cloud Dataflow (Apache Beam), so I'm looking forward to future developments.