Since the chances of touching Spark are increasing, I tried to implement a recommendation system using MLlib as an inventory of knowledge. Spark's Movie Recommendation, which is used in various MLlib tutorials such as SparkSamit2014, but edX Introduction to Big Data with Apache Spar seemed to be good in terms of content, so I implemented it while using it as a subject. This course is implemented in Spark 1.3.1, but it is a little too old, so the functions that can be used in 1.6.1 are changed by using them.
① Data preparation Divide the original data into training, evaluation, and test data (2) Display movies with a high average rating from movies with a rating of 500 or more ③ Implementation of collaborative filtering ④ Add yourself to the training data as userID "0" and evaluate your favorite movie ⑤ Have the algorithm recommend a movie based on your own evaluation
First, read the data, divide it, and look at it. The data to be used includes the movie dataset consisting of UserID :: MovieID :: Rating :: Timestamp and the Rating dataset consisting of MovieID :: Title :: Genres.
numPartitions = 2
ratingFileName = "ratings.txt"
rawRatings = sc.textFile(ratingFileName, numPartitions)
moviesFileName = "movies.txt"
rawMovies = sc.textFile(moviesFileName, numPartitions)
def get_ratings_tuple(entry):
items = entry.split('::')
return int(items[0]), int(items[1]), float(items[2])
def get_movie_tuple(entry):
items = entry.split('::')
return int(items[0]), items[1]
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()
ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()
print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)
There are 1000209 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]
We have over 500 reviews to find out what movies are popular, and select 20 of the most popular ones. Create an RDD that includes the MovieID, the number of ratings, and the average rating, and join it with the movieRDD to generate a tuple with (average rating, movie title, rating). After that, only the titles with 500 or more evaluations are selected and sorted by the average evaluation point is calculated. I am creating a function called sortFunction () to sort the rating points and movie titles in alphabetical order.
def getCountsAndAverages(IDandRatingsTuple):
aggr_result = (IDandRatingsTuple[0], (len(IDandRatingsTuple[1]), float(sum(IDandRatingsTuple[1])) / len(IDandRatingsTuple[1])))
return aggr_result
movieNameWithAvgRatingsRDD = (ratingsRDD
.map(lambda x:(x[1], x[2]))
.groupByKey()
.map(getCountsAndAverages)
.join(moviesRDD)
.map(lambda x:(x[1][0][1], x[1][1], x[0])))
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)
def sortFunction(tuple):
key = unicode('%.3f' % tuple[0])
value = tuple[1]
return (key + ' ' + value)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
.filter(lambda x: (x[2] > 500))
.sortBy(sortFunction, False))
print 'Movies with highest ratings:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(10):
print ratingsTuple
movieNameWithAvgRatingsRDD: [(3.49618320610687, u'Great Mouse Detective, The (1986)', 2048), (3.7871690427698574, u'Moonstruck (1987)', 3072), (2.7294117647058824, u'Waiting to Exhale (1995)', 4)]
Movies with highest ratings:
(average rating, movie name, number of reviews)
(5.0, u'Ulysses (Ulisse) (1954)', 3172)
(5.0, u'Song of Freedom (1936)', 3382)
(5.0, u'Smashing Time (1967)', 3233)
(5.0, u'Schlafes Bruder (Brother of Sleep) (1995)', 989)
(5.0, u'One Little Indian (1973)', 3607)
(5.0, u'Lured (1947)', 3656)
(5.0, u'Gate of Heavenly Peace, The (1995)', 787)
(5.0, u'Follow the Bitch (1998)', 1830)
(5.0, u'Bittersweet Motel (2000)', 3881)
(5.0, u'Baby, The (1973)', 3280)
Next, we will make recommendations using collaborative filtering. MLlib has a library that uses ALS, so I will use it as it is.
For the time being, divide the dataset into training, evaluation, and test data for model building.
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)
print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
validationRDD.count(),
testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)
validationForPredictRDD = validationRDD.map(lambda x: (x[0], x[1]))
print validationForPredictRDD.take(3)
actualReformattedRDD = validationRDD.map(lambda x: ((x[0], x[1]), x[2]))
print actualReformattedRDD.take(3)
Training: 600364, validation: 199815, test: 200030
[(1, 661, 3.0), (1, 914, 3.0), (1, 1197, 3.0)]
[(1, 3408, 4.0), (1, 2355, 5.0), (1, 938, 4.0)]
[(1, 1193, 5.0), (1, 1287, 5.0), (1, 2804, 5.0)]
[(1, 3408), (1, 2355), (1, 938)]
[((1, 3408), 4.0), ((1, 2355), 5.0), ((1, 938), 4.0)]
It's model building, but grid search looks for better parameters. Also, for model evaluation, Square root of mean square error (RMSE) among the indicators provided in MLlib Use # pyspark.mllib.evaluation.RegressionMetrics).
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics
seed = 5L
iterations = [5,7,10]
regularizationParameter = 0.1
ranks = [4, 8, 12]
RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.03
minRMSE = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
for iteration in iterations:
model = ALS.train(trainingRDD,
rank,
seed=seed,
iteration,
lambda_=regularizationParameter)
predictedRatingsRDD = model.predictAll(validationForPredictRDD)
predictedReformattedRDD = predictedRatingsRDD.map(lambda x: ((x[0], x[1]), x[2]))
predictionAndObservations = (predictedReformattedRDD
.join(actualReformattedRDD)
.map(lambda x: x[1]))
metrics = RegressionMetrics(predictionAndObservations)
RMSE = metrics.rootMeanSquaredError
RMSEs[err] = RMSE
err += 1
print 'For rank %s and itereation %s, the RMSE is %s' % (rank, iteration, RMSE)
if RMSE < minRMSE:
minRMSE = RMSE
bestIteretioin = iteretaion
bestRank = rank
print 'The best model was trained with rank %s and iteratin %s' % (bestRank, bestIteretion)
For rank 4 and itereation 5, the RMSE is 0.903719946201
For rank 4 and itereation 7, the RMSE is 0.893408395534
For rank 4 and itereation 10, the RMSE is 0.886260195446
For rank 8 and itereation 5, the RMSE is 0.89365207233
For rank 8 and itereation 7, the RMSE is 0.883901283207
For rank 8 and itereation 10, the RMSE is 0.876701840863
For rank 12 and itereation 5, the RMSE is 0.887127524585
For rank 12 and itereation 7, the RMSE is 0.87863327159
For rank 12 and itereation 10, the RMSE is 0.872532683651
The best model was trained with rank 12 and iteratin 10
Check with test data. Since there is no problem with RMSE, it seems that overfitting will not occur.
bestModel = ALS.train(trainingRDD,
bestRank,
seed=seed,
iterations=bestIteretion,
lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda x: (x[0], x[1]))
testReformattedRDD = testRDD.map(lambda x: ((x[0], x[1]), x[2]))
predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
predictedTestReformattedRDD = predictedTestRDD.map(lambda x: ((x[0], x[1]), x[2]))
predictionAndObservationsTest = (predictedTestReformattedRDD
.join(testReformattedRDD)
.map(lambda x: x[1]))
metrics = RegressionMetrics(predictionAndObservationsTest)
testRMSE = metrics.rootMeanSquaredError
print 'The model had a RMSE on the test set of %s' % testRMSE
The model had a RMSE on the test set of 0.87447554868
Finally, evaluate your favorite movie and add it to the data as userID "0". Ask them to recommend a movie based on that rating. Take your favorite movie from movieRDD, evaluate it, add it, and use it to train your model. After that, get the prediction for userID "0" and display the one with the highest evaluation score first.
myUserID = 0
myRatedMovies = [(myUserID, 1, 5), #Toy Story
(myUserID, 648, 3), # Mission Impossible
(myUserID, 1580, 4), # Men In Black
(myUserID, 1097, 3), # ET
(myUserID, 3247, 5)] #Sister Act
myRatingsRDD = sc.parallelize(myRatedMovies)
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)
myRatingsModel = ALS.train(trainingWithMyRatingsRDD,
bestRank,
seed=seed,
iterations=bestIteretion,
lambda_=regularizationParameter)
myUnratedMoviesRDD = (moviesRDD
.filter(lambda x: x[0] not in [x[1] for x in myRatedMovies])
.map(lambda x: (myUserID, x[0])))
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1], x[2]))
movieCountsRDD = (ratingsRDD
.map(lambda x:(x[1], x[2]))
.groupByKey()
.map(getCountsAndAverages)
.map(lambda x: (x[0], x[1][0])))
#Marge PredictedRDD and CountsRDD
predictedWithCountsRDD = (predictedRDD
.join(movieCountsRDD))
ratingsWithNamesRDD = (predictedWithCountsRDD
.filter(lambda x: x[1][1] > 75)
.join(moviesRDD)
.map(lambda x: (x[1][0][0], x[1][1], x[1][0][1])))
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(10, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
'\n'.join(map(str, predictedHighestRatedMovies)))
My highest rated movies as predicted (for movies with more than 75 reviews):
(4.74482593848827, u'Sound of Music, The (1965)', 882)
(4.580669496447569, u'Mary Poppins (1964)', 1011)
(4.486424714752521, u'Beauty and the Beast (1991)', 1060)
(4.478042748281928, u'Mulan (1998)', 490)
(4.477453571213953, u'Toy Story 2 (1999)', 1585)
(4.439390718632932, u'Fantasia 2000 (1999)', 453)
(4.405894101045507, u'FairyTale: A True Story (1997)', 87)
(4.4030583744108425, u"Singin' in the Rain (1952)", 751)
(4.390333274084924, u'Top Hat (1935)', 251)
(4.347757079374581, u'Gone with the Wind (1939)', 1156)
I'm evaluating solid movies, so solid movies are recommended lol
Recommended Posts