Il s'agit d'un test pour exécuter Spark sur iPython Notebook (Jupyter) et exécuter MLlib. J'ai essayé le clustering (KMeans), la classification: Classification (SVM, régression logistique, Random Forest) avec des données d'iris.
environnement
Veuillez noter que cet article décrit ce qui a été fait dans l'environnement ci-dessus, de sorte que les paramètres peuvent différer dans d'autres environnements.
http://spark.apache.org/downloads.html De spark-1.5.0-bin-hadoop2.6.tgz Télécharger. (Au 14 septembre 2015)
Répondez au fichier binaire téléchargé et placez-le à l'endroit approprié. Ici, il est placé dans / usr / local / bin /
.
tar zxvf spark-1.5.0-bin-hadoop2.6.tar
mv spark-1.5.0-bin-hadoop2.6 /usr/local/bin/
Définissez la variable d'environnement SPARK_HOME
sur .bashrc. Veuillez ajouter ce qui suit.
(Pour la première fois, rechargez avec source ~ / .bashrc
et lisez les variables d'environnement.)
.bashrc
export SPARK_HOME=/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Lancez iPython Notebook lorsque vous êtes prêt.
ipython notebook
Exécutez le code suivant dans iPython Notebook.
import os, sys
from datetime import datetime as dt
print "loading PySpark setting..."
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
Si vous voyez quelque chose comme ça, vous réussissez: rire:
loading PySpark setting...
/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.5.0
/_/
Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.
Utilisez le jeu de données iris familier. Je vais l'utiliser car il est plus facile à obtenir que Scikit-learn. Pour le moment, essayez d'utiliser une combinaison de ('longueur sépale', 'longueur pétale'). Quoi qu'il en soit, traçons le diagramme de dispersion.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import datasets
plt.style.use('ggplot')
# http://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html
iris = datasets.load_iris()
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)
plt.show()
Trois types d'Iris-Setosa, Iris-Versicolour et Iris-Virginica sont affichés dans différentes couleurs.
À partir de ces données, nous allons essayer KMeans, l'une des méthodes de clustering pour l'apprentissage non supervisé. Puisqu'il existe à l'origine trois types d'iris, définissons k = 3 et voyons s'il peut être jugé correctement.
# http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering
from pyspark.mllib.clustering import KMeans
k = 3
d_start = dt.now()
#Convertir les données pour que Spark puisse lire
data = sc.parallelize(iris.data[:,[0,2]])
#Apprenez avec KMeans
model = KMeans.train(data, k, initializationMode="random", seed=None)
#Affichage des résultats
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
diff = dt.now() - d_start
print("{}: [end] {}".format(dt.now().strftime('%H:%M:%S'), diff ))
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
plt.show()
D'une manière ou d'une autre, le Centre calculé par K Moyennes peut être tracé près du centre de chaque type: wink: (Le résultat de KMeans dépend de la valeur initiale, donc cela peut être plus étrange.)
Seules quelques parties utilisent Spark, mais si vous définissez correctement le traitement distribué, le traitement sera distribué. (Cette fois, il s'agit d'un essai avec Standalone, donc il n'est pas distribué)
Le fait est que le ndarray de numpy est converti en RDD de Spark par sc.parallelize () et passé à la fonction d'entraînement train () de la classe KMeans de Spark.
data = sc.parallelize(iris.data[:,[0,2]])
model = KMeans.train(data, k, initializationMode="random", seed=None)
Utilisez model.clusterCenters
pour obtenir les points centraux de chaque cluster en k groupes. Dans le code ci-dessous, nous obtenons k = 3 centres et traçons la marque ▲.
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
Enfin, visualisons comment chaque point des coordonnées a été regroupé.
# ------- Create Color Map ------- #
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
n = 100
xx = np.linspace(xmin, xmax, n)
yy = np.linspace(ymin, ymax, n)
X, Y = np.meshgrid(xx, yy)
# 2015.9.15 Ajout: Traitement distribué de prédire
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Exécution de la prédiction du point classé à partir des données apprises
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Supprimer
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
for i in range(k):
plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^", zorder=100)
plt.pcolor(X, Y, Z, alpha=0.3)
De même, cette fois, il s'agit d'une machine à vecteurs de support. Puisqu'il s'agit d'une classification binaire, nous limiterons les données d'iris à deux types.
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
#Puisque SVM est une classification binaire, réduisez à deux types
idx = np.r_[ np.where(iris.target == 1)[0], np.where(iris.target == 2)[0]]
#Convertir les données pour que Spark puisse lire
dat = np.column_stack([iris.target[idx]-1, iris.data[idx,0],iris.data[idx,2]])
data = sc.parallelize(dat)
def parsePoint(vec):
return LabeledPoint(vec[0], vec[1:])
parsedData = data.map(parsePoint)
#Apprenez l'exécution avec SVM
model = SVMWithSGD.train(parsedData, iterations=5000)
# ------- Predict Data ------- #
# 2015.9.15 Ajout: Traitement distribué de prédire
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Exécution de la prédiction du point classé à partir des données apprises
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Supprimer
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 2
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Points de tracé
for i, color in enumerate('rb'):
idx = np.where(iris.target == i+1)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Remplir le dessin
plt.pcolor(X, Y, Z, alpha=0.3)
Régression logistique. C'est aussi une classification binaire.
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
#Exécution d'apprentissage avec régression logistique
model = LogisticRegressionWithLBFGS.train(parsedData)
# ------- Predict Data ------- #
# 2015.9.15 Ajout: Traitement distribué de prédire
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data)) #Exécution de la prédiction du point classé à partir des données apprises
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Supprimer
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Points de tracé
for i, color in enumerate('rb'):
idx = np.where(iris.target == i+1)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Remplir le dessin
plt.pcolor(X, Y, Z, alpha=0.3)
Enfin, il y a une forêt aléatoire. La classification multi-valeurs étant possible ici, nous classerons également par 3 types d'iris.
from pyspark.mllib.tree import RandomForest, RandomForestModel
#Convertir les données pour que Spark puisse lire
dat = np.column_stack([iris.target[:], iris.data[:,0],iris.data[:,2]])
data = sc.parallelize(dat)
parsedData = data.map(parsePoint)
#Divisé en données d'entraînement et données de test
(trainingData, testData) = parsedData.randomSplit([0.7, 0.3])
#Exécutez l'apprentissage dans une forêt aléatoire
model = RandomForest.trainClassifier(trainingData, numClasses=3,
categoricalFeaturesInfo={},
numTrees=5, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
L'arborescence du résultat de la classification peut également être affichée avec toDebugString ()
.
out
Test Error = 0.0588235294118
Learned classification forest model:
TreeEnsembleModel classifier with 5 trees
Tree 0:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.8)
If (feature 1 <= 5.0)
If (feature 0 <= 6.3)
Predict: 2.0
Else (feature 0 > 6.3)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Tree 1:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.7)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 0 <= 6.5)
Predict: 2.0
Else (feature 0 > 6.5)
If (feature 1 <= 5.0)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Tree 2:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
If (feature 1 <= 4.7)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 0 <= 5.9)
Predict: 1.0
Else (feature 0 > 5.9)
Predict: 2.0
Else (feature 1 > 4.8)
Predict: 2.0
Tree 3:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.8)
Predict: 1.0
Else (feature 1 > 4.8)
Predict: 2.0
Tree 4:
If (feature 1 <= 1.9)
Predict: 0.0
Else (feature 1 > 1.9)
If (feature 1 <= 4.7)
If (feature 0 <= 4.9)
Predict: 2.0
Else (feature 0 > 4.9)
Predict: 1.0
Else (feature 1 > 4.7)
If (feature 1 <= 5.0)
If (feature 0 <= 6.0)
Predict: 2.0
Else (feature 0 > 6.0)
Predict: 1.0
Else (feature 1 > 5.0)
Predict: 2.0
Dessinez le résultat.
# ------- Predict Data ------- #
Z = np.zeros_like(X)
for i in range(n):
for j in range(n):
Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Points de tracé
for i, color in enumerate('rgb'):
idx = np.where(iris.target == i)[0]
plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Remplir le dessin
plt.pcolor(X, Y, Z, alpha=0.3)
Ensuite, nous traiterons des recommandations. "[Apprentissage automatique] Exécutez Spark MLlib avec Python et faites des recommandations" http://qiita.com/kenmatsu4/items/42fa2f17865f7914688d
Une erreur se produit dans la parallélisation de predire () de RandomForest, et nous recherchons une solution.
-> Apparemment, c'est la cause ... "Spark / RDD ne peut pas être imbriqué!" Je me demande si cela peut être résolu ...
How-to: Use IPython Notebook with Apache Spark http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/
Spark 1.5.0 Machine Learning Library (MLlib) Guide http://spark.apache.org/docs/latest/mllib-guide.html