The second in the Spark series. Now we will use MLlib to implement recommendations using collaborative filtering.
First shot [Machine learning] Start Spark with iPython Notebook and try MLlib http://qiita.com/kenmatsu4/items/00ad151e857d546a97c3
environment
Please note that this article describes what was done in the above environment, so the settings may differ in other environments. Also, it is basically supposed to run Spark on iPython Notebook. Please refer to the above for how to do this.
First, start Spark.
#Start Spark
import os, sys
import pandas as pd
import numpy as np
from datetime import datetime as dt
print "loading PySpark setting..."
spark_home = os.environ.get('SPARK_HOME', None)
print spark_home
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
This is the sample data included with Spark, and there is movie review information data called MovieLens, so use that. Read the data in SPARK_HOME as shown below.
#Example reading data
df = pd.read_csv(os.path.join(spark_home, 'data/mllib/als/sample_movielens_ratings.txt'),
delimiter='::', names=('uid', 'iid', 'rating','time'),engine='python')
pv_rating = df.pivot(index='uid', columns='iid', values='rating').fillna(0)
print pv_rating
The data looks like this. uid is the user ID, iid is the item (movie) ID, and the data inside is the movie rating for each user.
uid\iid | 0 | 1 | 2 | 3 | 4 | ... | 95 | 96 | 97 | 98 | 99 |
---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0 | 3 | 1 | 0 | ... | 2 | 1 | 0 | 1 | 1 |
1 | 0 | 0 | 2 | 1 | 2 | ... | 0 | 1 | 1 | 0 | 0 |
2 | 0 | 0 | 0 | 0 | 3 | ... | 0 | 0 | 0 | 0 | 0 |
3 | 1 | 1 | 1 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 |
... | |||||||||||
27 | 1 | 0 | 0 | 0 | 0 | ... | 1 | 0 | 0 | 1 | 0 |
28 | 3 | 1 | 4 | 1 | 0 | ... | 2 | 0 | 0 | 1 | 1 |
29 | 0 | 0 | 0 | 1 | 1 | ... | 0 | 0 | 1 | 0 | 1 |
First, let's visualize what kind of data it is.
#Rating visualization
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.cm as cm
n_y, n_x = pv_rating.shape
X, Y = np.meshgrid(range(n_x+1), range(n_y+1))
Z = pv_rating.as_matrix()
Z.astype(np.float32)
Z = Z[::-1,:]
print Z.shape
fig = plt.figure(figsize=(19,5))
ax = plt.subplot(111)
plt.ylim(0,30)
plt.xlim(0,99)
ax.set_xticks([])
ax.set_yticks([])
cax = ax.pcolor(X, Y, Z, cmap=cm.get_cmap('ocean_r'), alpha=0.6)
cbar = fig.colorbar(cax, ticks=range(6))
#cbar.ax.set_yticklabels(['5', '4', '3', '2', '1', '0'])# vertically oriented colorbar
plt.show()
The vertical axis is the user and the horizontal axis is the movie. The white areas are where there is no rating. Where there is a color, there is rating information, and the numbers are color-coded as shown by the bar on the right.
The main subject is from here. Recommendations are made using the method called ALS (Alternating Least Squares) in MLlib that comes with Spark. This is a technique called collaborative filtering, which makes inferences using information from one user and another user who has similar tastes (here, movie rating). One of the features is that the content of the movie is ignored in a sense and inferred from the user's behavior.
We will start learning from the data.
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#Preparation of training data
sc_rating = sc.parallelize(df.as_matrix())
ratings = sc_rating.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# ALS(Alternating Least Squares)Produce recommendations with
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
#Give a rating to the part where there was no data from the learned data
n_y, n_x = pv_rating.shape
X, Y = np.meshgrid(range(n_x+1), range(n_y+1))
f_XY = np.c_[Y.flatten(), X.flatten()]
predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
def selector(x, y):
if x is None:
return y
elif x != 0:
return x
else:
return y
#Keep where there is already a rating and substitute the value calculated by ALS where there is no rating
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).rightOuterJoin(predictions_all).map(lambda r: ((r[0][0], r[0][1]), selector(r[1][0], r[1][1])) )
result = np.array(ratesAndPreds.sortBy(lambda x: (x[0], x[1])).collect())
Z = result[:,1]
Z = Z.astype(np.float32).reshape(pv_rating.shape)[::-1,:]
Visualize the inferred results. In the previous graph, there were many white parts (parts without rating), but they have almost disappeared! The place where the numerical value is entered in this blank place is the recommendation information. You can set a certain threshold value and say "Recommend if it is higher than that": smile:
fig = plt.figure(figsize=(19,5))
ax = plt.subplot(111)
plt.ylim(0,29)
plt.xlim(0,99)
ax.set_xticks([])
ax.set_yticks([])
cax = ax.pcolor(X, Y, Z, cmap=cm.get_cmap('ocean_r'), alpha=0.6)
cbar = fig.colorbar(cax, ticks=range(6))
plt.show()
Finally, let's look at the accuracy of this learning. The average square error is used to measure how far the predicted value is from the place where the rating is originally.
#Accuracy calculation
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
out
#Average squared error
Mean Squared Error = 0.0558736464582
In addition, the results are as follows when viewed numerically. Where there is a rating, the number is close, and where there is None, the predicted rating is included.
out
((uid, iid), (rating, predict))
[((29, 17), (3.0, 2.9547048179008057)),
((23, 51), (None, 1.421916504776083)),
((11, 7), (None, 1.9669319580489901)),
((28, 10), (None, 0.06769150007295854)),
((9, 93), (None, 2.349846935916598)),
((23, 91), (None, 2.597452490149535)),
((17, 13), (2.0, 2.0700773308441507)),
((16, 38), (1.0, 0.8512992797830536)),
((22, 12), (None, 3.331810711043588)),
((12, 50), (4.0, 4.095528922729588)),
((11, 15), (None, 1.1874705514088135)),
((22, 52), (None, 3.4707062021048283)),
((0, 14), (None, 0.503229802782621)),
((8, 94), (None, 1.0007500227764983)),
((29, 89), (None, 0.4272431835442813)),
((5, 1), (1.0, 1.2148556310982808)),
((4, 42), (None, 1.030942641195369)),
((25, 13), (None, 1.5033919417064503)),
((3, 55), (None, 2.50649511105159))]
Spark 1.5.0 Machine Learning Library (MLlib) Guide http://spark.apache.org/docs/latest/mllib-guide.html
MLlib - Collaborative Filtering http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
Movie Recommendation with MLlib https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html
Code for this article (GitHub) https://github.com/matsuken92/Qiita_Contents/blob/master/MLlib_recommendation/Spark_MLlib-recommendation.ipynb
Recommended Posts