Run XGBoost with Cloud Dataflow (Python)

A follow-up to the previous article: a memo on the procedure for running a machine learning pipeline with Cloud Dataflow.

What I want to do

Last time I did machine learning with scikit-learn and pandas, which come pre-installed on the nodes Cloud Dataflow uses, but in a real machine learning pipeline you will want to install preprocessing libraries such as OpenCV as well as various other machine learning libraries for your analysis. This time, as an example of the procedure for installing an arbitrary library on Cloud Dataflow, I will describe how to install and run XGBoost, which is also popular on Kaggle.

How to install libraries on Dataflow

As described in the official documentation, there are three main ways to install a library of your choice on Cloud Dataflow (Python):

1. Packages published on PyPI: list them in a requirements.txt file and pass it with the requirements_file option.
2. Local or non-PyPI packages: pass the package files with the extra_packages option.
3. Non-Python dependencies or packages that need a build step: write a custom setup.py and pass it with the setup_file option.

A library like XGBoost requires the third approach; a quick sketch of how each option is set follows below. After that, we will look at how to create a custom setup.py for XGBoost.
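
For reference, here is a minimal sketch of how each of the three approaches might be set as a pipeline option. This assumes options is a PipelineOptions object (as in the snippet later in this article); the file paths are placeholders.

setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)

# 1. PyPI packages listed in a requirements file
setup_options.requirements_file = "requirements.txt"

# 2. Local or non-PyPI packages passed as package files
setup_options.extra_packages = ["./my_local_package-0.0.1.tar.gz"]

# 3. Non-Python dependencies or packages that need a build step -> custom setup.py
setup_options.setup_file = "./setup.py"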

How to create a custom setup.py

As you can see in the example code in the official repository, you write the file following the setuptools conventions. Python package dependencies are listed in install_requires, but if you want to run your own shell commands during installation, you create a command class that inherits from setuptools' Command class and describe the command execution steps there. The following is the XGBoost installation written based on the official example code above. It basically follows the normal XGBoost installation procedure, but because each command is executed with subprocess.Popen, the working directory is passed explicitly for the commands that would otherwise require changing directories.

setup.py


from distutils.command.build import build as _build
import subprocess
import setuptools


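# Extend the standard build command so that the custom install commands below run as part of the package build on each Dataflow worker.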
class build(_build):
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

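# Each entry is (command argument list, working directory); the commands run in order on each worker node.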
CUSTOM_COMMANDS = [(["sudo","apt-get","update"],"."),
                   (["sudo","apt-get","install","git","build-essential","libatlas-base-dev","-y"],"."), #"python-dev","gfortran"
                   (["git","clone","--recursive","https://github.com/dmlc/xgboost"],"."),
                   (["sudo","make"],"xgboost"),
                   (["sudo","python","setup.py","install"],"xgboost/python-package"),
                   (["sudo","pip","install","xgboost"],".")]


class CustomCommands(setuptools.Command):

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list[0])
        p = subprocess.Popen(command_list[0],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             cwd=command_list[1])
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list[0], p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = []

setuptools.setup(
    name='xgboost-install-example',
    version='0.0.1',
    description='xgboost workflow packages.',
    packages=setuptools.find_packages(),
    #install_requires=REQUIRED_PACKAGES,
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    }
)

After creating setup.py, specify it with the pipeline's setup_file option. The following is an example of setting the option in Python code. (In my environment it did not seem to work unless I specified the full path.)

xgboost_pipeline.py


setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py" # set fullpath
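
For context, a rough sketch of what the surrounding option setup might look like. The project ID, bucket paths, and job name are placeholders, and the runner name may differ depending on your SDK version; GoogleCloudOptions and StandardOptions live in the same pipeline_options module as SetupOptions.

options = beam.utils.pipeline_options.PipelineOptions()

# Google Cloud settings (placeholders)
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.project = "your-project-id"
gcloud_options.job_name = "xgboost-sample"
gcloud_options.staging_location = "gs://your-bucket/staging"
gcloud_options.temp_location = "gs://your-bucket/temp"

# Run on the Dataflow service (runner name depends on SDK version)
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = "DataflowRunner"

# Custom setup.py specified with a full path
setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py"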

Run the pipeline with this option specified, and if the installation succeeds, xgboost code will run inside the pipeline. In the code from the previous article, replacing the part that used sklearn's regressor with the following xgboost code worked.

xgboost_pipeline.py (inside learn_predict)


~ Omitted ~

year_max = data["year"].max()
train = data[data["year"] <  year_max]
test  = data[data["year"] == year_max]

dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest  = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)

evals_result = {}
watchlist = [(dtrain, 'train'),(dtest, 'test')]
best_params = {'objective': 'reg:linear',
               'eval_metric': 'rmse',
               'learning_rate': 0.01,
               'max_depth': 3,
               'colsample_bytree': 0.65,
               'subsample': 0.55,
               'min_child_weight': 7.0,
               'reg_alpha': 0.6,'reg_lambda': 0.7,'gamma': 1.2}

model = xgb.train(best_params, dtrain,
                  num_boost_round=1000,
                  evals=watchlist,
                  evals_result=evals_result,
                  verbose_eval=True)

test.loc[:, "predict"] = model.predict(dtest)
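
As a quick sanity check (this is not part of the original pipeline), you could also compute the hold-out RMSE yourself from the prediction column, since rmse is the eval_metric used above:

import numpy as np

rmse = np.sqrt(((test["predict"] - test[target_attr]) ** 2).mean())
print('test RMSE: %s' % rmse)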

In conclusion

If you create a custom setup.py, you can run all sorts of libraries on Dataflow. I think Dataflow will be useful not only for large-scale runs with your favorite machine learning library, but also as an easy way to run preprocessing that needs a lot of compute resources, such as processing image data with an image-processing library. There are still some restrictions, such as working only with Python 2 and not supporting streaming mode, but these will likely be addressed over time.

* Dataflow support for distributed XGBoost

The XGBoost run this time trained on a single node, even though XGBoost itself supports distributed execution; a version whose algorithm distributes training across multiple nodes ([XGBoost4J](http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html)) is also being developed. It currently seems to run on Spark and Flink as execution engines, but the development roadmap also includes support for Cloud Dataflow (Apache Beam), so I am looking forward to what comes next.
