This is a test of running Spark MLlib from an iPython Notebook (Jupyter). I tried clustering (KMeans) and classification (SVM, logistic regression, Random Forest) on the iris dataset.
Environment
Note that this article describes what I did in the environment above, so details may differ in other environments.
Download spark-1.5.0-bin-hadoop2.6.tgz from http://spark.apache.org/downloads.html (as of September 14, 2015).
Extract the downloaded archive and move it to a suitable location. Here it goes under /usr/local/bin/.
tar zxvf spark-1.5.0-bin-hadoop2.6.tgz
mv spark-1.5.0-bin-hadoop2.6 /usr/local/bin/
Set the environment variable SPARK_HOME in .bashrc by adding the line below. (The first time, reload it with source ~/.bashrc so the variable is picked up.)
.bashrc
export SPARK_HOME=/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Launch iPython Notebook when you're ready.
ipython notebook
Execute the following code in iPython Notebook.
import os, sys
from datetime import datetime as dt
print "loading PySpark setting..."
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
If you see something like this, it worked:
loading PySpark setting...
/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.5.0
/_/
Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.
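As the banner says, a SparkContext is already available as sc. As a quick sanity check that jobs actually run, something simple like the following should print 45:

# Trivial job just to confirm the SparkContext works
print sc.parallelize(range(10)).sum()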
Use the familiar iris dataset; I load it from scikit-learn because that is the easiest way to obtain it. For now, let's try the combination of ('sepal length', 'petal length'). First, draw a scatter plot.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import datasets
plt.style.use('ggplot')
# http://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html
iris = datasets.load_iris()
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7)
plt.show()
The three species, Iris-setosa, Iris-versicolour, and Iris-virginica, are shown in different colors.
On this data, let's try KMeans, one of the unsupervised clustering methods. Since there are three species of iris, we set k = 3 and see whether they can be separated correctly.
# http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering
from pyspark.mllib.clustering import KMeans
k = 3
d_start = dt.now()
# Convert the data into an RDD so Spark can handle it
data = sc.parallelize(iris.data[:,[0,2]])
# Train KMeans
model = KMeans.train(data, k, initializationMode="random", seed=None)
# Show the results
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
diff = dt.now() - d_start
print("{}: [end] {}".format(dt.now().strftime('%H:%M:%S'), diff ))
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7)
for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
plt.show()
The centers calculated by KMeans are plotted close to the middle of each species. (The KMeans result depends on the initial values, so your result may look stranger.)
Only a small part of this code actually uses Spark, but once distributed processing is configured properly, the work is spread across the cluster. (This run uses a standalone setup, so nothing is actually distributed.)
The key point is that the numpy ndarray is converted into a Spark RDD by sc.parallelize() and passed to the train() function of Spark's KMeans class.
data = sc.parallelize(iris.data[:,[0,2]])
model = KMeans.train(data, k, initializationMode="random", seed=None)
model.clusterCenters returns the center point of each of the k clusters. The code below takes the k = 3 centers and plots them with a ▲ marker.
for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")
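The trained model can also be queried directly: KMeansModel.predict() returns the cluster index for a given point. A minimal example (the coordinates below are arbitrary, chosen only for illustration):

# Predict the cluster index of a single (sepal length, petal length) point;
# the values are arbitrary example coordinates
print(model.predict([5.0, 1.5]))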
Finally, let's visualize how each point of the coordinates is clustered.
# ------- Create Color Map ------- #
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
n = 100
xx = np.linspace(xmin, xmax, n)
yy = np.linspace(ymin, ymax, n)
X, Y = np.meshgrid(xx, yy)
# 2015.9.15 Addition: Distributed processing of predict
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # Predict which cluster each grid point belongs to, using the trained model
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Delete
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^", zorder=100)
plt.pcolor(X, Y, Z, alpha=0.3)
Next, in the same way, a support vector machine. Since this is binary classification, we narrow the iris data down to two species.
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
# SVM here is a binary classifier, so narrow the data down to two species
idx = np.r_[ np.where(iris.target == 1)[0], np.where(iris.target == 2)[0]]
# Convert the data into an RDD of LabeledPoints so Spark can handle it
dat = np.column_stack([iris.target[idx]-1, iris.data[idx,0], iris.data[idx,2]])
data = sc.parallelize(dat)

def parsePoint(vec):
    return LabeledPoint(vec[0], vec[1:])

parsedData = data.map(parsePoint)
# Train the SVM
model = SVMWithSGD.train(parsedData, iterations=5000)
# ------- Predict Data ------- #
# 2015.9.15 Addition: Distributed processing of predict
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # Predict which class each grid point belongs to, using the trained model
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Delete
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 2
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Plot points
for i, color in enumerate('rb'):
    idx = np.where(iris.target == i+1)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Fill drawing
plt.pcolor(X, Y, Z, alpha=0.3)
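To get a rough feel for how well the SVM fits, the training error can be computed the same way the MLlib guide does, reusing parsedData from above (a minimal sketch):

# Compare predictions against labels on the training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))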
Next is logistic regression. This is also binary classification.
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
# Train with logistic regression
model = LogisticRegressionWithLBFGS.train(parsedData)
# ------- Predict Data ------- #
# 2015.9.15 Addition: Distributed processing of predict
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # Predict which class each grid point belongs to, using the trained model
Z = np.array(res.collect()).reshape(X.shape)
# 2015.9.15 Delete
#Z = np.zeros_like(X)
#for i in range(n):
# for j in range(n):
# Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Plot points
for i, color in enumerate('rb'):
    idx = np.where(iris.target == i+1)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Fill drawing
plt.pcolor(X, Y, Z, alpha=0.3)
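By default the logistic regression model returns 0/1 labels. If you want the raw probability instead, the model's decision threshold can be cleared (a small aside; the feature values below are arbitrary illustrations):

# Clear the threshold so predict() returns a probability instead of a 0/1 label
model.clearThreshold()
print(model.predict([6.0, 4.5]))   # probability of belonging to class 1
model.setThreshold(0.5)            # restore 0/1 predictions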
Finally, Random Forest. Since it can handle multiclass classification, we classify all three iris species this time.
from pyspark.mllib.tree import RandomForest, RandomForestModel
# Convert the data into an RDD of LabeledPoints so Spark can handle it
dat = np.column_stack([iris.target[:], iris.data[:,0], iris.data[:,2]])
data = sc.parallelize(dat)
parsedData = data.map(parsePoint)
# Split into training and test data
(trainingData, testData) = parsedData.randomSplit([0.7, 0.3])
# Train the random forest
model = RandomForest.trainClassifier(trainingData, numClasses=3,
                                     categoricalFeaturesInfo={},
                                     numTrees=5, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
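The RandomForestModel class imported above is what you would use to load a trained model back from disk. Saving and reloading works roughly like this (the path is just a placeholder; adjust it to your environment):

# Persist the trained model and load it back later (the path is a placeholder)
model.save(sc, "/tmp/iris_rf_model")
sameModel = RandomForestModel.load(sc, "/tmp/iris_rf_model")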
The tree structure of the trained forest can also be displayed with toDebugString().
out
Test Error = 0.0588235294118
Learned classification forest model:
TreeEnsembleModel classifier with 5 trees
Tree 0:
  If (feature 1 <= 1.9)
    Predict: 0.0
  Else (feature 1 > 1.9)
    If (feature 1 <= 4.8)
      If (feature 0 <= 4.9)
        Predict: 2.0
      Else (feature 0 > 4.9)
        Predict: 1.0
    Else (feature 1 > 4.8)
      If (feature 1 <= 5.0)
        If (feature 0 <= 6.3)
          Predict: 2.0
        Else (feature 0 > 6.3)
          Predict: 1.0
      Else (feature 1 > 5.0)
        Predict: 2.0
Tree 1:
  If (feature 1 <= 1.9)
    Predict: 0.0
  Else (feature 1 > 1.9)
    If (feature 1 <= 4.7)
      If (feature 0 <= 4.9)
        Predict: 2.0
      Else (feature 0 > 4.9)
        Predict: 1.0
    Else (feature 1 > 4.7)
      If (feature 0 <= 6.5)
        Predict: 2.0
      Else (feature 0 > 6.5)
        If (feature 1 <= 5.0)
          Predict: 1.0
        Else (feature 1 > 5.0)
          Predict: 2.0
Tree 2:
  If (feature 1 <= 1.9)
    Predict: 0.0
  Else (feature 1 > 1.9)
    If (feature 1 <= 4.8)
      If (feature 1 <= 4.7)
        Predict: 1.0
      Else (feature 1 > 4.7)
        If (feature 0 <= 5.9)
          Predict: 1.0
        Else (feature 0 > 5.9)
          Predict: 2.0
    Else (feature 1 > 4.8)
      Predict: 2.0
Tree 3:
  If (feature 1 <= 1.9)
    Predict: 0.0
  Else (feature 1 > 1.9)
    If (feature 1 <= 4.8)
      Predict: 1.0
    Else (feature 1 > 4.8)
      Predict: 2.0
Tree 4:
  If (feature 1 <= 1.9)
    Predict: 0.0
  Else (feature 1 > 1.9)
    If (feature 1 <= 4.7)
      If (feature 0 <= 4.9)
        Predict: 2.0
      Else (feature 0 > 4.9)
        Predict: 1.0
    Else (feature 1 > 4.7)
      If (feature 1 <= 5.0)
        If (feature 0 <= 6.0)
          Predict: 2.0
        Else (feature 0 > 6.0)
          Predict: 1.0
      Else (feature 1 > 5.0)
        Predict: 2.0
Draw the result.
# ------- Predict Data ------- #
# For RandomForest, predict() cannot be called inside an RDD map (see the note at the end),
# so the grid is predicted point by point on the driver.
Z = np.zeros_like(X)
for i in range(n):
    for j in range(n):
        Z[i,j] = model.predict([xx[j],yy[i]])
# ---------- Draw Graph ---------- #
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)
#Plot points
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0], iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)
#Fill drawing
plt.pcolor(X, Y, Z, alpha=0.3)
Next up is recommendations: "[Machine learning] Run Spark MLlib with Python and make recommendations" http://qiita.com/kenmatsu4/items/42fa2f17865f7914688d
Parallelizing RandomForest's predict() raises an error, and I am still looking for a fix.
-> Apparently the cause is that Spark RDDs cannot be nested. I wonder whether it can be worked around ...
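One possible workaround that avoids the nesting, based on the pattern already used for the test set above, would be to hand the whole RDD of grid points to model.predict() instead of calling it inside map() (a sketch only, not verified here):

# Sketch: instead of sc_XY.map(lambda p: model.predict(p)), which nests the model's
# JVM-backed call inside a worker-side closure, pass the whole RDD to predict()
f_XY = np.column_stack([X.flatten(), Y.flatten()])
res = model.predict(sc.parallelize(f_XY))   # returns an RDD of predictions
Z = np.array(res.collect()).reshape(X.shape)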
How-to: Use IPython Notebook with Apache Spark http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/
Spark 1.5.0 Machine Learning Library (MLlib) Guide http://spark.apache.org/docs/latest/mllib-guide.html