Bonjour à tous. @best_not_best. Cette fois, je présenterai la technologie qui convient au travail dont je suis en charge.
Le co-filtrage est utilisé pour calculer le score d'un utilisateur pour acheter un produit. Étant donné que la quantité de calcul est importante et que le traitement des données à grande échelle prend du temps, PySpark est utilisé pour le traitement distribué.
Ajoutez ce qui suit à spark-env.sh pour pouvoir appeler Python 3.x du côté Spark.
export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3
Acquérir des données d'entraînement à partir d'une base de données, etc., et créer un fichier CSV avec 3 colonnes «utilisateur», «élément» et «note». Par exemple, lors de l'utilisation de "données d'achat de produit pour le mois dernier", les données seront "ID utilisateur", "ID produit" et "si le produit a été acheté ou non (non acheté: 0 / acheté: 1)". Voici un exemple de fichier CSV.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
En tant que nom de «note», «combien l'utilisateur a donné une note au produit» est l'utilisation d'origine, mais comme mentionné ci-dessus, «si le produit a été acheté» ou «a accédé à la page». Il est également possible de mettre en œuvre avec des données telles que "si oui ou non". Dans le premier cas, il s'agit d'un modèle qui prédit "combien l'utilisateur achète le produit", et dans le second cas, "combien l'utilisateur visite la page".
Étant donné que l'ID utilisateur et l'ID de produit ne peuvent gérer que la valeur maximale de ʻint32 (2 147 483 647), s'il y a un ID qui dépasse cela, l'ID sera de nouveau renuméroté. De plus, comme il ne peut gérer que des valeurs entières, il sera renuméroté de la même manière s'il contient une chaîne de caractères. Si l'ID est une valeur entière et ne dépasse pas la valeur maximale de ʻint32
, ignorez cette étape.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""processing training data."""
from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
class ProcessTrainingData(object):
"""get training data from Redshift, and add sequence number to data."""
def __get_action_log(
self,
sqlContext: SQLContext,
unprocessed_data_file_path: str
) -> DataFrame:
"""get data."""
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(unprocessed_data_file_path)
return df
def run(
self,
unprocessed_data_file_path: str,
training_data_dir_path: str
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('process_training_data')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# get data
df = self.__get_action_log(sqlContext, unprocessed_data_file_path)
# make sequence number of users
unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
)
# make sequence number of items
unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
)
# add sequence number of users, sequence number of items to data
df = df.join(
unique_users_df,
df['user'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
df = df.join(
unique_items_df,
df['item'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# save
saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
df.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
Lisez le CSV des données d'entraînement, ajoutez les colonnes d'ID utilisateur et d'ID produit renumérotées avec zipWithIndex ()
, et enregistrez-le dans un fichier séparé.
Exécutez comme suit.
ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)
Les paramètres sont les suivants.
Comme mentionné ci-dessus, définissez ʻunprocessed_data_file_path` sur un fichier CSV avec les colonnes suivantes.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Une fois exécuté, un fichier CSV avec les colonnes suivantes sera généré dans training_data_dir_path
.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
Créez et enregistrez un modèle pour le co-filtrage.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""create collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreateCfModel(object):
"""create collaborative filtering model."""
def run(
self,
processed_training_data_file_path: str,
model_dir_path: str,
rank: int,
max_iter: int,
implicit_prefs: str,
alpha: float,
num_user_blocks: int,
num_item_blocks: int
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_cf_model')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='unique_user_id',
itemCol='unique_item_id'
)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# fitting
model = als.fit(df)
# save
saved_data_dir_path = model_dir_path + 'als_model'
model.write().overwrite().save(saved_data_dir_path)
return True
Si vous n'avez pas eu besoin de renuméroter dans la section précédente, faites correspondre les noms «# create model» et «# load training data» aux noms des colonnes du fichier CSV et modifiez comme suit.
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='user',
itemCol='item'
)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
Exécutez comme suit.
ccm = CreateCfModel()
ccm.run(
processed_training_data_file_path,
model_dir_path,
rank,
max_iter,
implicit_prefs,
alpha,
num_user_blocks,
num_item_blocks
)
Les paramètres sont les suivants. rank
, max_iter
, ʻimplicit_prefs, ʻalpha
, num_user_blocks
, num_item_blocks
sont des [paramètres PySpark ALS](http://spark.apache.org/docs/latest/api/python/ Ce sera pyspark.ml.html # pyspark.ml.recommendation.ALS).
Lorsqu'il est numéroté dans la section précédente, définissez le fichier CSV avec les colonnes suivantes dans processing_training_data_file_path
.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
S'il n'est pas numéroté, définissez un fichier CSV avec les colonnes suivantes.
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
Chargez le modèle enregistré et prédisez le score à partir de la combinaison de l'ID utilisateur et de l'ID produit. Préparez les données de liste pour chaque ID utilisateur et ID produit séparément des données d'entraînement. En ce qui concerne le résultat de la prédiction, «tous les résultats» et «les résultats des N meilleurs scores pour chaque utilisateur» sont enregistrés sous forme de fichier CSV. Cette fois, l'ID utilisateur et l'ID produit qui n'existent pas dans les données d'entraînement d'origine (= le journal des actions n'existe pas) ne sont pas prédits.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""predict score from collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreatePredictedScore(object):
"""predict score from collaborative filtering model."""
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
user_data_file_path: str,
item_data_file_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load user data
users_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(user_data_file_path)
users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(item_data_file_path)
items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(users_df)
del(users_id_rdd)
del(users_id_df)
del(items_df)
del(items_id_rdd)
del(items_id_df)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# users
unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
).dropDuplicates()
unique_users_df.cache()
# items
unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
).dropDuplicates()
unique_items_df.cache()
# delete unnecessary variables
del(training_df)
del(unique_users_rdd)
del(unique_items_rdd)
# add unique user id
joined_df = joined_df.join(
unique_users_df,
joined_df['user_id'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
# add unique item id
joined_df = joined_df.join(
unique_items_df,
joined_df['item_id'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# load model
model = ALSModel.load(model_file_path)
# predict score
predictions = model.transform(joined_df)
all_predict_data = predictions\
.select('user_id', 'item_id', 'prediction')\
.filter('prediction > 0')
# save
# all score
saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
all_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
# limited score
data_limit = int(data_limit)
if data_limit > 0:
all_predict_data.registerTempTable('predictions')
sql = 'SELECT user_id, item_id, prediction ' \
+ 'FROM ( ' \
+ ' SELECT user_id, item_id, prediction, dense_rank() ' \
+ ' OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
+ ' FROM predictions ' \
+ ') tmp WHERE rank <= %d' % (data_limit)
limited_predict_data = sqlContext.sql(sql)
saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
limited_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
La combinaison de cibles de prédiction est créée par la procédure suivante.
Si vous n'avez pas besoin de renuméroter dans la première section, modifiez la méthode run
comme suit:
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# load user data
users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(training_df)
del(users_id_rdd)
del(users_id_df)
del(items_id_rdd)
del(items_id_df)
# load model
model = ALSModel.load(model_file_path)
(comme ci-dessous)
Exécutez comme suit.
cps = CreatePredictedScore()
cps.run(
model_file_path,
predict_data_dir_path,
user_data_file_path,
item_data_file_path,
processed_training_data_file_path,
data_limit
)
Les paramètres sont les suivants. Dans data_limit
, N des N meilleurs scores est spécifié. Si 0 ou moins est spécifié, les N premières données ne seront pas créées.
ʻUser_data_file_path est défini sur le fichier CSV avec l'ID utilisateur dans la première colonne. J'utilise un fichier sans en-têtes. Pour ʻitem_data_file_path
, définissez le fichier CSV avec l'ID de produit dans la première colonne. J'utilise également un fichier sans en-têtes.
Définissez un fichier CSV avec les colonnes suivantes dans processing_training_data_file_path
.
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
Un fichier CSV avec les colonnes suivantes est généré dans predict_data_dir_path
.
user_id | user_id | prediction |
---|---|---|
1xxxxxxxx3 | 3xxxxxxxx4 | 0.15594198 |
1xxxxxxxx3 | 3xxxxxxxx0 | 0.19135818 |
1xxxxxxxx3 | 3xxxxxxxx8 | 0.048197098 |
«prediction» est la valeur prédite.
Implémentation du co-filtrage à l'aide d'ALS dans PySpark. Je suis désolé que ce soit difficile à lire parce que je n'ai pas divisé la méthode. La prochaine fois, j'expliquerai comment évaluer le modèle.
Recommended Posts