I implemented Matrix Factorization, which is the basis of many recommendation models, in Python.
A recommender system predicts users' preferences for items and presents the most suitable items to each specific user. Examples deployed as services include Amazon's "Recommended products for you" and YouTube's "Recommended videos".
Matrix Factorization is an algorithm often used in recommendation.
Based on the ratings that users have already given to items, it predicts the ratings of items that have not yet been rated and recommends the items that are likely to receive a high rating.
First, there is a matrix (A) of m users by n items, in which each value is the rating that a certain user gave to a certain item. This matrix (A) is decomposed into a user matrix (U) consisting of users and latent variables (P) and an item matrix (I) consisting of items and latent variables (Q).
The inner product of a user's row in the user matrix and an item's row in the item matrix is used as the predicted rating. The parameters are updated repeatedly until the error between the predicted value and the actual value falls below a threshold or the number of iterations exceeds a fixed limit.
We used the MovieLens 100K dataset (https://grouplens.org/datasets/movielens/100k/).
import numpy as np
import random
import pandas as pd
import math
class MFx(object):
    """Matrix factorization recommender trained with stochastic gradient descent.

    A rating is predicted as the inner product of a K-dimensional user factor
    vector and a K-dimensional item factor vector.
    """

    def __init__(self, K=20, alpha=1e-6, beta=0.0):
        """Store the hyper-parameters.

        Args:
            K: Number of latent factors per user and per item.
            alpha: Step size used by the SGD updates.
            beta: Weight of the L2 penalty on the factor vectors.
        """
        self.K = K
        self.alpha = alpha
        self.beta = beta

    def fit(self, X, n_user, n_item, n_iter=100):
        """Train the factors on the observed ratings.

        Args:
            X: 2-D array-like whose rows are (user index, item index, rating).
            n_user: Number of rows of the user factor matrix.
            n_item: Number of rows of the item factor matrix.
            n_iter: Number of passes over the samples.

        After training, ``self.loss`` holds one ``(iteration, error)`` pair
        per pass, where the error is the value returned by :meth:`mse`.
        """
        # np.array copies, so shuffling self.samples never reorders the caller's data.
        self.R = np.array(X)
        self.samples = np.array(X)
        # Set initial value for latent factor variable
        self.user_factors = np.random.rand(n_user, self.K)
        self.item_factors = np.random.rand(n_item, self.K)
        # Stochastic gradient descent
        self.loss = []
        for i in range(n_iter):
            self.sgd()
            self.loss.append((i, self.mse()))

    def sgd(self):
        """Run one shuffled pass of SGD updates over ``self.samples``."""
        np.random.shuffle(self.samples)
        for row in self.samples:
            # Cast so that float-typed sample arrays can still index the factors.
            user, item, rating = int(row[0]), int(row[1]), row[2]
            err = rating - self.predict_pair(user, item)
            # Snapshot the user vector: both updates must use the values from
            # before this step, not the freshly updated user factors.
            user_vec = self.user_factors[user].copy()
            item_vec = self.item_factors[item]
            self.user_factors[user] += self.alpha * (err * item_vec - self.beta * user_vec)
            self.item_factors[item] += self.alpha * (err * user_vec - self.beta * item_vec)

    def mse(self):
        """Return the training error over ``self.R``.

        NOTE: despite the name, this is the *root* mean squared error (the
        square root is applied); the name is kept for existing callers.
        """
        predicted = np.asarray(self.predict(self.R), dtype=float)
        actual = np.asarray(self.R)[:, 2]
        return np.sqrt(np.mean((actual - predicted) ** 2))

    def predict_pair(self, user, item):
        """Predict the rating of ``item`` by ``user`` as the inner product of their factors."""
        return np.inner(self.user_factors[int(user)], self.item_factors[int(item)])

    def predict(self, X):
        """Return a list with one predicted rating per row of ``X``.

        Only the first two entries of each row (user index, item index) are read.
        """
        return [self.predict_pair(row[0], row[1]) for row in X]

    def get_full_matrix(self):
        """Return the predicted rating for every (user, item) pair as a 2-D array."""
        return np.inner(self.user_factors, self.item_factors)
#Data read
def load_ml100k(path='data/ml-100k/u.data'):
    """Load a tab-separated ratings file as a (user, item, rate) DataFrame.

    Args:
        path: Location of the ratings file; defaults to the path the script
            has always read from.

    Returns:
        A DataFrame built from the first three columns of the file, named
        ``user``, ``item`` and ``rate``, with 1 subtracted from ``user`` and
        ``item`` so they can be used as zero-based indices.
    """
    raw = pd.read_csv(path, sep='\t', header=None)
    # Copy the slice so the in-place edits below act on an independent frame.
    samples = raw.iloc[:, :3].copy()
    samples.columns = ['user', 'item', 'rate']
    samples['user'] = samples['user'] - 1
    samples['item'] = samples['item'] - 1
    return samples
# Convert the ratings table to an array of (user, item, rate) rows
df = np.array(load_ml100k())
# Indices are zero-based, so each count is the largest index plus one
n_user = df[:, 0].max() + 1
n_item = df[:, 1].max() + 1
# Largest rating value present in the data
n_rate = df[:, 2].max()
# Shuffle the rows in place. np.random.shuffle is required here:
# random.shuffle swaps elements through row *views* of a 2-D array, which
# overwrites rows with duplicates instead of permuting them.
np.random.shuffle(df)
# 80% of the total size is for training and the rest is for testing
train_size = int(df.shape[0] * 0.8)
train_df = df[:train_size]
test_df = df[train_size:]
# Train the matrix factorization model on the training split
MF = MFx(K=20, alpha=0.01, beta=0.5)
MF.fit(train_df, n_user, n_item, n_iter=10)
The recommendation accuracy was evaluated in the following two ways: (1) evaluation of the predicted values, and (2) ranking evaluation.
Evaluation of the predicted values: compute the squared error between each predicted value and the actual value, take the average, and then take its square root (RMSE). The lower the value, the more accurate the recommendation.
# Predicted rating for every (user, item) row of the test split
pre = MF.predict(test_df)
# Append the predictions to the test rows as an extra trailing column
ret1 = np.hstack((test_df, np.asarray(pre).reshape(len(pre), 1)))
# Report the root of the mean squared difference between column 2 (actual)
# and column 3 (predicted)
print("pred")
print(np.sqrt(np.mean(np.square(ret1[:, 2] - ret1[:, 3]))))
NDCG: Create one ranking based on the actual ratings and another based on the predicted ratings. Compare the two rankings and measure how close the predicted ranking is to the correct ranking. NDCG values range from 0 to 1, and the closer the value is to 1, the better the generated ranking.
def get_dcg(f):
    """Return the discounted cumulative gain of a ranked list of rows.

    Args:
        f: Ranked sequence of rows whose first element is the gain, or a
            single flat row (e.g. ``[gain, other]``), which is treated as a
            ranking of length one.

    Returns:
        The gain at rank 1 plus ``gain / log2(rank)`` for every later rank;
        ``0.0`` when ``f`` is empty.
    """
    if len(f) == 0:
        return 0.0
    # A scalar first element means the caller passed one flat row rather
    # than a list of rows (ints included, not only floats).
    if isinstance(f[0], (int, float)):
        f = [f]
    dcg = 0.0
    for rank, row in enumerate(f, start=1):
        # Rank 1 is undiscounted (log2(1) would be a division by zero).
        dcg += row[0] if rank == 1 else row[0] / math.log2(rank)
    return dcg
def get_ndcgs(pre):
    """Return the NDCG averaged over users.

    Args:
        pre: 2-D array-like with four columns, in order: user, item,
            actual rating, predicted rating.

    Returns:
        The mean over users of DCG(actual ratings ranked by prediction)
        divided by DCG(actual ratings ranked by themselves). Returns ``0.0``
        when ``pre`` has no rows; a user whose ideal DCG is 0 contributes 0
        instead of raising ZeroDivisionError.
    """
    columns = ['user', 'item', 'real', 'pred']
    y = pd.DataFrame(data=pre, columns=columns, dtype='float')
    if y.empty:
        return 0.0
    # Rows ordered best-first per user: once by actual rating (ideal
    # ranking) and once by predicted rating (ranking under evaluation).
    y_real = y.sort_values(['user', 'real'], ascending=[True, False])
    y_pred = y.sort_values(['user', 'pred'], ascending=[True, False])
    # groupby keeps the within-user row order produced by the sorts above.
    ideal_dcg = {
        user: get_dcg(rows[['real']].values.tolist())
        for user, rows in y_real.groupby('user')
    }
    ndcg = 0.0
    for user, rows in y_pred.groupby('user'):
        best = ideal_dcg[user]
        if best == 0:
            continue
        ndcg += get_dcg(rows[['real']].values.tolist()) / best
    return ndcg / len(ideal_dcg)
# Report the per-user averaged NDCG computed from the user, item, actual
# and predicted columns of the test results
print("NDCG")
ndcg_score = get_ndcgs(ret1[:, :4])
print(ndcg_score)
Recommended Posts