Article précédent Un mémorandum de la procédure d'exécution du pipeline d'apprentissage automatique avec Cloud Dataflow.
La dernière fois, j'ai fait du machine learning à l'aide de scikit-learn et de pandas préinstallés sur le nœud utilisé dans Cloud Dataflow, mais dans le pipeline de machine learning, j'aime mettre une bibliothèque pour le prétraitement telle qu'OpenCV. Vous souhaiterez installer et analyser diverses bibliothèques d'apprentissage automatique. Cette fois, à titre d'exemple de la procédure d'installation d'une bibliothèque facultative dans Cloud Dataflow, j'aimerais écrire comment installer et exécuter XGBoost, qui est également populaire dans Kaggle.
Comme vous pouvez le voir dans le Document officiel, il existe trois méthodes principales pour installer votre bibliothèque préférée dans Cloud Dataflow (Python). est.
Les bibliothèques comme XBGoost nécessitent une troisième étape. Ci-dessous, nous verrons comment créer un setup.py personnalisé pour XB Goost.
Comme vous pouvez le voir dans l 'exemple de code du référentiel officiel, écrivez en fonction des outils de configuration. Je vais. S'il existe une bibliothèque dépendante de Python, décrivez-la dans install_requires, mais si vous souhaitez exécuter votre propre commande shell pour l'installation, créez une classe de commande qui hérite de la classe Command de setuptools et décrivez la procédure d'exécution de la commande. Voici la commande d'installation de XGBoost écrite sur la base de l'exemple de code officiel ci-dessus. Fondamentalement, la procédure d'installation de XGBoost est suivie, mais pour exécuter avec subprocess.Popen, le chemin d'exécution est spécifié pour la commande qui implique le déplacement du répertoire.
setup.py
from distutils.command.build import build as _build
import subprocess
import setuptools
class build(_build):
sub_commands = _build.sub_commands + [('CustomCommands', None)]
CUSTOM_COMMANDS = [(["sudo","apt-get","update"],"."),
(["sudo","apt-get","install","git","build-essential","libatlas-base-dev","-y"],"."), #"python-dev","gfortran"
(["git","clone","--recursive","https://github.com/dmlc/xgboost"],"."),
(["sudo","make"],"xgboost"),
(["sudo","python","setup.py","install"],"xgboost/python-package"),
(["sudo","pip","install","xgboost"],".")]
class CustomCommands(setuptools.Command):
def initialize_options(self):
pass
def finalize_options(self):
pass
def RunCustomCommand(self, command_list):
print 'Running command: %s' % command_list[0]
p = subprocess.Popen(command_list[0], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=command_list[1])
stdout_data, _ = p.communicate()
print 'Command output: %s' % stdout_data
if p.returncode != 0:
raise RuntimeError('Command %s failed: exit code: %s' % (command_list[0], p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
self.RunCustomCommand(command)
REQUIRED_PACKAGES = []
setuptools.setup(
name='xgboost-install-examble',
version='0.0.1',
description='xgboost workflow packages.',
packages=setuptools.find_packages(),
#install_requires=REQUIRED_PACKAGES,
cmdclass={
'build': build,
'CustomCommands': CustomCommands,
}
)
Après avoir créé setup.py, spécifiez-le avec l'option setup_file de pipeline. Voici un exemple de spécification d'options dans le code Python. (Dans mon environnement, cela ne semblait pas fonctionner à moins que je ne le spécifie avec le chemin complet)
xgboost_pipeline.py
setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py" # set fullpath
Exécutez Pipeline avec l'option, et si l'installation réussit, le code xgboost s'exécutera dans le pipeline. Le code suivant que j'ai remplacé la partie qui utilisait le régresseur de sklearn par xgboost dans le code de article précédent fonctionnait.
xgboost_pipeline.py(learn_à l'intérieur de prévoir)
~ Omis ~
year_max = data["year"].max()
train = data[data["year"] < year_max]
test = data[data["year"] == year_max]
dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)
evals_result = {}
watchlist = [(dtrain, 'train'),(dtest, 'test')]
best_params = {'objective': 'reg:linear',
'eval_metric': 'rmse',
'learning_rate': 0.01,
'max_depth': 3,
'colsample_bytree': 0.65,
'subsample': 0.55,
'min_child_weight': 7.0,
'reg_alpha': 0.6,'reg_lambda': 0.7,'gamma': 1.2}
model = xgb.train(best_params, dtrain,
num_boost_round=1000,
evals=watchlist,
evals_result=evals_result,
verbose_eval=True)
test.loc[:, "predict"] = model.predict(dtest)
Si vous créez un setup.py personnalisé, vous pourrez exécuter diverses bibliothèques sur Dataflow. Dataflow ne sera pas utilisé uniquement pour une exécution en masse à l'aide de votre bibliothèque d'apprentissage automatique préférée, mais également pour une exécution facile du prétraitement qui nécessite une grande quantité de ressources informatiques telles que le traitement des données d'image à l'aide d'une bibliothèque de traitement d'image. Je pense. Il y a encore quelques restrictions, comme travailler uniquement avec Python2 et ne pas fonctionner en mode streaming, mais il semble qu'il sera progressivement pris en charge.
Bien que le boost XG qui a été exécuté cette fois soit distribué, l'apprentissage lui-même était qu'il fonctionne sur un seul nœud, mais la version que l'algorithme qui distribue et apprend à plusieurs nœuds fonctionne (XGBoost4J 2016/03/14 / xgboost4j-portable-shared-xgboost-in-spark-flink-and-dataflow.html)) est également en cours de développement. Actuellement, il semble fonctionner sur Spark, Flink en tant que moteur d'exécution, mais Development Roadmap inclut également la prise en charge de Cloud Dataflow (Apache Beam). J'ai hâte à l'avenir.
Recommended Posts