[Machine learning] Start Spark with iPython Notebook and try MLlib

This is a trial of running Spark from iPython Notebook (Jupyter) and using MLlib. With the iris dataset, I tried clustering (KMeans) and classification (SVM, logistic regression, Random Forest).

Environment

Note that this article describes what worked in my environment (Python 2.7.10, Spark 1.5.0); the setup may differ in other environments.

0. Download & deploy the Spark binaries

Download spark-1.5.0-bin-hadoop2.6.tgz from http://spark.apache.org/downloads.html (current as of September 14, 2015).

Extract the downloaded archive and place it somewhere appropriate. Here, it goes in /usr/local/bin/.

tar zxvf spark-1.5.0-bin-hadoop2.6.tgz 
mv spark-1.5.0-bin-hadoop2.6 /usr/local/bin/

Set the SPARK_HOME environment variable in .bashrc by adding the line below. (The first time, reload it with source ~/.bashrc so the variable is picked up.)

.bashrc


export SPARK_HOME=/usr/local/bin/spark-1.5.0-bin-hadoop2.6

Launch iPython Notebook when you're ready.

ipython notebook

1. Launch Spark with iPython Notebook

Execute the following code in iPython Notebook.

import os, sys
from datetime import datetime as dt

print "loading PySpark setting..."
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
print spark_home

# Put PySpark and its bundled Py4J on the module search path, then run
# Spark's interactive shell setup, which creates sc and sqlContext.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

If you see output like the following, the launch succeeded :laughing:

loading PySpark setting...
/usr/local/bin/spark-1.5.0-bin-hadoop2.6
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.
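
As a quick sanity check, here is a minimal sketch (my addition) that confirms the sc handle actually works:

# A minimal sketch: run a tiny job to confirm the SparkContext is alive
print sc.parallelize(range(100)).sum()   # should print 4950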

2. Clustering with MLlib (KMeans)

2-1. Data preparation

We use the familiar iris dataset, loading it from scikit-learn since that is the easiest way to get it. For now, take the combination of ('sepal length', 'petal length'). First, let's draw a scatter plot.

%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import datasets
plt.style.use('ggplot')

# http://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html
iris = datasets.load_iris()
plt.figure(figsize=(9,7))

for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)

plt.show()

The three species, Iris-Setosa, Iris-Versicolour, and Iris-Virginica, are shown in different colors.

[Figure: iris1-compressor.png, scatter plot of the three iris species]

2-2. Clustering with KMeans

On this data we run KMeans, an unsupervised clustering method. Since the iris data originally contains three species, set k = 3 and see whether the clusters match them.

# http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering

from pyspark.mllib.clustering import KMeans

k = 3
d_start = dt.now()

# Convert the data to an RDD that Spark can process
data = sc.parallelize(iris.data[:,[0,2]])

# Train the KMeans model
model = KMeans.train(data, k, initializationMode="random", seed=None)

# Show the results
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
diff = dt.now() - d_start
print("{}: [end] {}".format(dt.now().strftime('%H:%M:%S'), diff))

# ---------- Draw Graph ---------- # 
plt.figure(figsize=(9,7))
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7)

for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")

plt.show()

The centers computed by KMeans land nicely near the middle of each species :wink: (KMeans results depend on the initial values, so your run may look stranger; a sketch for handling this follows after the figure.)

[Figure: iris2-compressor.png, KMeans cluster centers (▲) over the iris scatter plot]
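
Because KMeans is sensitive to initialization, one minimal sketch (my addition, using only the train() and computeCost() calls shown above) is to fix the seed for reproducibility, or to keep the best of several random restarts:

# A minimal sketch: keep the model with the lowest cost over several
# random restarts; passing a fixed seed instead makes a run reproducible.
models = [KMeans.train(data, k, initializationMode="random", seed=s) for s in range(5)]
model = min(models, key=lambda m: m.computeCost(data))
print("Best cost: " + str(model.computeCost(data)))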

Only a few parts of this code actually use Spark, but once distributed processing is configured properly, the work gets distributed. (This trial runs standalone, so nothing is actually distributed.)
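
For reference, a minimal sketch of what pointing the context at a standalone cluster might look like; the master URL spark://master-host:7077 and the app name are hypothetical placeholders:

# A minimal sketch (hypothetical master URL): build a SparkContext against a
# standalone cluster so RDD operations actually run on the workers.
# (Stop any existing context with sc.stop() before creating a new one.)
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("mllib-iris").setMaster("spark://master-host:7077")
sc = SparkContext(conf=conf)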

The key point is that sc.parallelize() converts the numpy ndarray into a Spark RDD, which is then passed to the train() function of Spark's KMeans class.

data = sc.parallelize(iris.data[:,[0,2]])
model = KMeans.train(data, k, initializationMode="random", seed=None)

model.clusterCenters returns the center point of each of the k clusters. The code below takes the k = 3 centers and plots them as ▲ marks.

for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^")

Finally, let's visualize which cluster each point in the plane is assigned to.

# ------- Create Color Map ------- #
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
n = 100
xx = np.linspace(xmin, xmax, n)
yy = np.linspace(ymin, ymax, n)
X, Y = np.meshgrid(xx, yy)

# 2015.9.15 update: run predict as a distributed operation
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # predict which cluster each grid point belongs to
Z = np.array(res.collect()).reshape(X.shape)

# 2015.9.15 removed: the old non-distributed version
#Z = np.zeros_like(X)
#for i in range(n):
#    for j in range(n):
#        Z[i,j] = model.predict([xx[j],yy[i]])
        
# ---------- Draw Graph ---------- # 
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)

for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)

for i in range(k):
    plt.scatter(model.clusterCenters[i][0], model.clusterCenters[i][1], s=200, c="purple", alpha=.7, marker="^", zorder=100)


plt.pcolor(X, Y, Z, alpha=0.3)

[Figure: pcolor-compressor.png, cluster regions shaded with pcolor, data points and centers overlaid]

3. Classification with MLlib (SVM)

Next, a support vector machine. SVM here is a binary classifier, so we narrow the iris data down to two species.

from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

# SVM is a binary classifier, so keep only two of the three species
idx = np.r_[ np.where(iris.target == 1)[0],  np.where(iris.target == 2)[0]]

# Convert the data into LabeledPoint records that Spark can read
dat = np.column_stack([iris.target[idx]-1, iris.data[idx,0],iris.data[idx,2]])
data = sc.parallelize(dat)
def parsePoint(vec):
    return LabeledPoint(vec[0], vec[1:])
parsedData = data.map(parsePoint)

# Train the SVM
model = SVMWithSGD.train(parsedData, iterations=5000)

# ------- Predict Data ------- #
# 2015.9.15 update: run predict as a distributed operation
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # predict the class of each grid point
Z = np.array(res.collect()).reshape(X.shape)

# 2015.9.15 removed: the old non-distributed version
#Z = np.zeros_like(X)
#for i in range(n):
#    for j in range(n):
#        Z[i,j] = model.predict([xx[j],yy[i]])
   
        
# ---------- Draw Graph ---------- # 
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 2
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)

# Plot the data points
for i, color in enumerate('rb'):
    idx = np.where(iris.target == i+1)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)

# Shade the decision regions
plt.pcolor(X, Y, Z, alpha=0.3)

[Figure: SVM1-compressor.png, SVM decision regions with the two species overlaid]
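
The article doesn't score this model, so here is a minimal sketch (my addition, in the style of the MLlib guide's examples) of computing the training error:

# A minimal sketch: training error of the SVM (there is no held-out split
# here, so this measures fit on the training data, not generalization).
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))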

4. Classification with MLlib (Logistic Regression)

Now logistic regression, again as a binary classification (reusing parsedData from the SVM section).

from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Train the logistic regression model
model = LogisticRegressionWithLBFGS.train(parsedData)

# ------- Predict Data ------- #
# 2015.9.15 update: run predict as a distributed operation
f_XY = np.column_stack([X.flatten(), Y.flatten()])
sc_XY = sc.parallelize(f_XY)
res = sc_XY.map(lambda data: model.predict(data))  # predict the class of each grid point
Z = np.array(res.collect()).reshape(X.shape)

# 2015.9.15 removed: the old non-distributed version
#Z = np.zeros_like(X)
#for i in range(n):
#    for j in range(n):
#        Z[i,j] = model.predict([xx[j],yy[i]])
        
# ---------- Draw Graph ---------- # 
plt.figure(figsize=(9,7))
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)

# Plot the data points
for i, color in enumerate('rb'):
    idx = np.where(iris.target == i+1)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)

# Shade the decision regions
plt.pcolor(X, Y, Z, alpha=0.3)

[Figure: Logistic1-compressor.png, logistic regression decision regions with the two species overlaid]
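
One extra worth knowing, sketched from the MLlib API (the query point [6.0, 4.5] is an arbitrary example): LogisticRegressionModel can return class-1 probabilities instead of hard 0/1 labels if the decision threshold is cleared:

# A minimal sketch: get probabilities instead of 0/1 labels.
model.clearThreshold()
print(model.predict([6.0, 4.5]))  # P(class 1) for a hypothetical point
model.setThreshold(0.5)           # restore hard 0/1 predictions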

5. Classification with MLlib (Random Forest)

Finally, Random Forest. Since it supports multiclass classification, this time we classify all three iris species.

from pyspark.mllib.tree import RandomForest, RandomForestModel

# Convert all three species into LabeledPoint records that Spark can read
dat = np.column_stack([iris.target[:], iris.data[:,0],iris.data[:,2]])
data = sc.parallelize(dat)
parsedData = data.map(parsePoint)

# Split into training data and test data
(trainingData, testData) = parsedData.randomSplit([0.7, 0.3])

# Train the random forest
model = RandomForest.trainClassifier(trainingData, numClasses=3,
                                     categoricalFeaturesInfo={},
                                     numTrees=5, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())

The tree structure of the trained forest can also be displayed with toDebugString().

out


Test Error = 0.0588235294118
Learned classification forest model:
TreeEnsembleModel classifier with 5 trees

  Tree 0:
    If (feature 1 <= 1.9)
     Predict: 0.0
    Else (feature 1 > 1.9)
     If (feature 1 <= 4.8)
      If (feature 0 <= 4.9)
       Predict: 2.0
      Else (feature 0 > 4.9)
       Predict: 1.0
     Else (feature 1 > 4.8)
      If (feature 1 <= 5.0)
       If (feature 0 <= 6.3)
        Predict: 2.0
       Else (feature 0 > 6.3)
        Predict: 1.0
      Else (feature 1 > 5.0)
       Predict: 2.0
  Tree 1:
    If (feature 1 <= 1.9)
     Predict: 0.0
    Else (feature 1 > 1.9)
     If (feature 1 <= 4.7)
      If (feature 0 <= 4.9)
       Predict: 2.0
      Else (feature 0 > 4.9)
       Predict: 1.0
     Else (feature 1 > 4.7)
      If (feature 0 <= 6.5)
       Predict: 2.0
      Else (feature 0 > 6.5)
       If (feature 1 <= 5.0)
        Predict: 1.0
       Else (feature 1 > 5.0)
        Predict: 2.0
  Tree 2:
    If (feature 1 <= 1.9)
     Predict: 0.0
    Else (feature 1 > 1.9)
     If (feature 1 <= 4.8)
      If (feature 1 <= 4.7)
       Predict: 1.0
      Else (feature 1 > 4.7)
       If (feature 0 <= 5.9)
        Predict: 1.0
       Else (feature 0 > 5.9)
        Predict: 2.0
     Else (feature 1 > 4.8)
      Predict: 2.0
  Tree 3:
    If (feature 1 <= 1.9)
     Predict: 0.0
    Else (feature 1 > 1.9)
     If (feature 1 <= 4.8)
      Predict: 1.0
     Else (feature 1 > 4.8)
      Predict: 2.0
  Tree 4:
    If (feature 1 <= 1.9)
     Predict: 0.0
    Else (feature 1 > 1.9)
     If (feature 1 <= 4.7)
      If (feature 0 <= 4.9)
       Predict: 2.0
      Else (feature 0 > 4.9)
       Predict: 1.0
     Else (feature 1 > 4.7)
      If (feature 1 <= 5.0)
       If (feature 0 <= 6.0)
        Predict: 2.0
       Else (feature 0 > 6.0)
        Predict: 1.0
      Else (feature 1 > 5.0)
       Predict: 2.0

Draw the result.

# ------- Predict Data ------- #
# (For RandomForest the distributed predict fails -- see "To be fixed" below --
# so the grid is predicted with plain loops here.)
Z = np.zeros_like(X)
for i in range(n):
    for j in range(n):
        Z[i,j] = model.predict([xx[j],yy[i]])
        
# ---------- Draw Graph ---------- # 
plt.figure(figsize=(9,7))
xmin = 4.0
xmax = 8.5
ymin = 0
ymax = 8
plt.xlim(xmin, xmax)
plt.ylim(ymin, ymax)

# Plot the data points
for i, color in enumerate('rgb'):
    idx = np.where(iris.target == i)[0]
    plt.scatter(iris.data[idx,0],iris.data[idx,2], c=color, s=30, alpha=.7, zorder=100)

# Shade the decision regions
plt.pcolor(X, Y, Z, alpha=0.3)

[Figure: randomforest-compressor.png, Random Forest decision regions with all three species overlaid]
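
As an aside, the trained ensemble can be persisted with the MLlib model API; a minimal sketch, where /tmp/iris-rf-model is a placeholder path:

# A minimal sketch (hypothetical path): save the trained forest, then load it back.
model.save(sc, "/tmp/iris-rf-model")
sameModel = RandomForestModel.load(sc, "/tmp/iris-rf-model")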

Next, I'll cover recommendations: "[Machine learning] Run Spark MLlib with Python and make recommendations"  http://qiita.com/kenmatsu4/items/42fa2f17865f7914688d

To be fixed

Parallelizing RandomForest's predict() raises an error, and I'm looking for a solution.

→ Apparently this is the cause: "Spark RDDs cannot be nested!" I wonder whether it can be worked around... A possible sketch follows below.
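
A workaround that seems plausible (a sketch, relying on the fact that the tree model's predict() also accepts an RDD, just like the test-error computation above): call predict() on the whole grid RDD at the driver instead of inside map():

# A sketch of a possible workaround: the JVM-backed tree model cannot be
# called from worker-side functions, but predict() accepts an RDD directly.
sc_XY = sc.parallelize(np.column_stack([X.flatten(), Y.flatten()]))
res = model.predict(sc_XY)                     # RDD in, RDD of predictions out
Z = np.array(res.collect()).reshape(X.shape)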

References

How-to: Use IPython Notebook with Apache Spark  http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/

Spark 1.5.0 Machine Learning Library (MLlib) Guide  http://spark.apache.org/docs/latest/mllib-guide.html
