Co-filtrage avec PySpark

Bonjour à tous. @best_not_best. Cette fois, je présenterai la technologie qui convient au travail dont je suis en charge.

Aperçu

Le co-filtrage est utilisé pour calculer le score d'un utilisateur pour acheter un produit. Étant donné que la quantité de calcul est importante et que le traitement des données à grande échelle prend du temps, PySpark est utilisé pour le traitement distribué.

environnement

Ajoutez ce qui suit à spark-env.sh pour pouvoir appeler Python 3.x du côté Spark.

export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3

procédure

  1. Obtenez des données d'entraînement
  2. Traitez les données d'entraînement (si nécessaire)
  3. Créez un modèle
  4. Prédisez le score

Obtenez des données d'entraînement

Acquérir des données d'entraînement à partir d'une base de données, etc., et créer un fichier CSV avec 3 colonnes «utilisateur», «élément» et «note». Par exemple, lors de l'utilisation de "données d'achat de produit pour le mois dernier", les données seront "ID utilisateur", "ID produit" et "si le produit a été acheté ou non (non acheté: 0 / acheté: 1)". Voici un exemple de fichier CSV.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

En tant que nom de «note», «combien l'utilisateur a donné une note au produit» est l'utilisation d'origine, mais comme mentionné ci-dessus, «si le produit a été acheté» ou «a accédé à la page». Il est également possible de mettre en œuvre avec des données telles que "si oui ou non". Dans le premier cas, il s'agit d'un modèle qui prédit "combien l'utilisateur achète le produit", et dans le second cas, "combien l'utilisateur visite la page".

Traiter les données de formation

Étant donné que l'ID utilisateur et l'ID de produit ne peuvent gérer que la valeur maximale de ʻint32 (2 147 483 647), s'il y a un ID qui dépasse cela, l'ID sera de nouveau renuméroté. De plus, comme il ne peut gérer que des valeurs entières, il sera renuméroté de la même manière s'il contient une chaîne de caractères. Si l'ID est une valeur entière et ne dépasse pas la valeur maximale de ʻint32, ignorez cette étape.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""processing training data."""

from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

class ProcessTrainingData(object):
    """get training data from Redshift, and add sequence number to data."""

    def __get_action_log(
        self,
        sqlContext: SQLContext,
        unprocessed_data_file_path: str
    ) -> DataFrame:
        """get data."""
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(unprocessed_data_file_path)

        return df

    def run(
        self,
        unprocessed_data_file_path: str,
        training_data_dir_path: str
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('process_training_data')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # get data
        df = self.__get_action_log(sqlContext, unprocessed_data_file_path)

        # make sequence number of users
        unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        )

        # make sequence number of items
        unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        )

        # add sequence number of users, sequence number of items to data
        df = df.join(
            unique_users_df,
            df['user'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        df = df.join(
            unique_items_df,
            df['item'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # save
        saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
        df.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        return True

Lisez le CSV des données d'entraînement, ajoutez les colonnes d'ID utilisateur et d'ID produit renumérotées avec zipWithIndex (), et enregistrez-le dans un fichier séparé. Exécutez comme suit.

ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)

Les paramètres sont les suivants.

Comme mentionné ci-dessus, définissez ʻunprocessed_data_file_path` sur un fichier CSV avec les colonnes suivantes.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

Une fois exécuté, un fichier CSV avec les colonnes suivantes sera généré dans training_data_dir_path.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

Créer un modèle

Créez et enregistrez un modèle pour le co-filtrage.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""create collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreateCfModel(object):
    """create collaborative filtering model."""

    def run(
        self,
        processed_training_data_file_path: str,
        model_dir_path: str,
        rank: int,
        max_iter: int,
        implicit_prefs: str,
        alpha: float,
        num_user_blocks: int,
        num_item_blocks: int
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_cf_model')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # create model
        als = ALS(
            rank=int(rank),
            maxIter=int(max_iter),
            implicitPrefs=bool(implicit_prefs),
            alpha=float(alpha),
            numUserBlocks=int(num_user_blocks),
            numItemBlocks=int(num_item_blocks),
            userCol='unique_user_id',
            itemCol='unique_item_id'
        )

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)

        # fitting
        model = als.fit(df)

        # save
        saved_data_dir_path = model_dir_path + 'als_model'
        model.write().overwrite().save(saved_data_dir_path)

        return True

Si vous n'avez pas eu besoin de renuméroter dans la section précédente, faites correspondre les noms «# create model» et «# load training data» aux noms des colonnes du fichier CSV et modifiez comme suit.

# create model
als = ALS(
    rank=int(rank),
    maxIter=int(max_iter),
    implicitPrefs=bool(implicit_prefs),
    alpha=float(alpha),
    numUserBlocks=int(num_user_blocks),
    numItemBlocks=int(num_item_blocks),
    userCol='user',
    itemCol='item'
)

# load training data
custom_schema = StructType([
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
])

Exécutez comme suit.

ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path,
    model_dir_path,
    rank,
    max_iter,
    implicit_prefs,
    alpha,
    num_user_blocks,
    num_item_blocks
)

Les paramètres sont les suivants. rank, max_iter, ʻimplicit_prefs, ʻalpha, num_user_blocks, num_item_blocks sont des [paramètres PySpark ALS](http://spark.apache.org/docs/latest/api/python/ Ce sera pyspark.ml.html # pyspark.ml.recommendation.ALS).

Lorsqu'il est numéroté dans la section précédente, définissez le fichier CSV avec les colonnes suivantes dans processing_training_data_file_path.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

S'il n'est pas numéroté, définissez un fichier CSV avec les colonnes suivantes.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

Prédire le score

Chargez le modèle enregistré et prédisez le score à partir de la combinaison de l'ID utilisateur et de l'ID produit. Préparez les données de liste pour chaque ID utilisateur et ID produit séparément des données d'entraînement. En ce qui concerne le résultat de la prédiction, «tous les résultats» et «les résultats des N meilleurs scores pour chaque utilisateur» sont enregistrés sous forme de fichier CSV. Cette fois, l'ID utilisateur et l'ID produit qui n'existent pas dans les données d'entraînement d'origine (= le journal des actions n'existe pas) ne sont pas prédits.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""predict score from collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreatePredictedScore(object):
    """predict score from collaborative filtering model."""

    def run(
        self,
        model_file_path: str,
        predict_data_dir_path: str,
        user_data_file_path: str,
        item_data_file_path: str,
        processed_training_data_file_path: str,
        data_limit: int=-1
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_predicted_score')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # load user data
        users_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(user_data_file_path)
        users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
        users_id_df = sqlContext.createDataFrame(users_id_rdd)

        # load item data
        items_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(item_data_file_path)
        items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
        items_id_df = sqlContext.createDataFrame(items_id_rdd)

        # cross join user_id and item_id
        joined_df = users_id_df.join(items_id_df)
        joined_df.cache()

        # delete unnecessary variables
        del(users_df)
        del(users_id_rdd)
        del(users_id_df)
        del(items_df)
        del(items_id_rdd)
        del(items_id_df)

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        training_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # users
        unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        ).dropDuplicates()
        unique_users_df.cache()
        # items
        unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        ).dropDuplicates()
        unique_items_df.cache()

        # delete unnecessary variables
        del(training_df)
        del(unique_users_rdd)
        del(unique_items_rdd)

        # add unique user id
        joined_df = joined_df.join(
            unique_users_df,
            joined_df['user_id'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])

        # add unique item id
        joined_df = joined_df.join(
            unique_items_df,
            joined_df['item_id'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # load model
        model = ALSModel.load(model_file_path)

        # predict score
        predictions = model.transform(joined_df)
        all_predict_data = predictions\
            .select('user_id', 'item_id', 'prediction')\
            .filter('prediction > 0')

        # save
        # all score
        saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
        all_predict_data.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        # limited score
        data_limit = int(data_limit)
        if data_limit > 0:
            all_predict_data.registerTempTable('predictions')
            sql = 'SELECT user_id, item_id, prediction ' \
                + 'FROM ( ' \
                + '  SELECT user_id, item_id, prediction, dense_rank() ' \
                + '  OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
                + '  FROM predictions ' \
                + ') tmp WHERE rank <= %d' % (data_limit)
            limited_predict_data = sqlContext.sql(sql)

            saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
            limited_predict_data.write\
                .format('csv')\
                .mode('overwrite')\
                .options(header='true')\
                .save(saved_data_file_path)

        return True

La combinaison de cibles de prédiction est créée par la procédure suivante.

  1. Créez toutes les combinaisons d'ID utilisateur et d'ID produit à partir de chaque donnée de liste
  2. Lisez les données d'entraînement, ajoutez l'ID renuméroté à la combinaison de 1. et supprimez l'ID utilisateur et l'ID de produit qui n'ont pas pu être numérotés.

Si vous n'avez pas besoin de renuméroter dans la première section, modifiez la méthode run comme suit:

def run(
    self,
    model_file_path: str,
    predict_data_dir_path: str,
    processed_training_data_file_path: str,
    data_limit: int=-1
) -> bool:
    """execute."""
    # make spark context
    spark = SparkSession\
        .builder\
        .appName('create_predicted_score')\
        .config('spark.sql.crossJoin.enabled', 'true')\
        .config('spark.debug.maxToStringFields', 500)\
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

    # load training data
    custom_schema = StructType([
        StructField('user', IntegerType(), True),
        StructField('item', IntegerType(), True),
        StructField('rating', FloatType(), True),
    ])
    training_df = sqlContext\
        .read\
        .format('csv')\
        .options(header='true')\
        .load(processed_training_data_file_path, schema=custom_schema)

    # load user data
    users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
    users_id_df = sqlContext.createDataFrame(users_id_rdd)

    # load item data
    items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
    items_id_df = sqlContext.createDataFrame(items_id_rdd)

    # cross join user_id and item_id
    joined_df = users_id_df.join(items_id_df)
    joined_df.cache()

    # delete unnecessary variables
    del(training_df)
    del(users_id_rdd)
    del(users_id_df)
    del(items_id_rdd)
    del(items_id_df)

    # load model
    model = ALSModel.load(model_file_path)
(comme ci-dessous)

Exécutez comme suit.

cps = CreatePredictedScore()
cps.run(
    model_file_path,
    predict_data_dir_path,
    user_data_file_path,
    item_data_file_path,
    processed_training_data_file_path,
    data_limit
)

Les paramètres sont les suivants. Dans data_limit, N des N meilleurs scores est spécifié. Si 0 ou moins est spécifié, les N premières données ne seront pas créées.

ʻUser_data_file_path est défini sur le fichier CSV avec l'ID utilisateur dans la première colonne. J'utilise un fichier sans en-têtes. Pour ʻitem_data_file_path, définissez le fichier CSV avec l'ID de produit dans la première colonne. J'utilise également un fichier sans en-têtes. Définissez un fichier CSV avec les colonnes suivantes dans processing_training_data_file_path.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

Un fichier CSV avec les colonnes suivantes est généré dans predict_data_dir_path.

user_id user_id prediction
1xxxxxxxx3 3xxxxxxxx4 0.15594198
1xxxxxxxx3 3xxxxxxxx0 0.19135818
1xxxxxxxx3 3xxxxxxxx8 0.048197098

«prediction» est la valeur prédite.

Résumé

Implémentation du co-filtrage à l'aide d'ALS dans PySpark. Je suis désolé que ce soit difficile à lire parce que je n'ai pas divisé la méthode. La prochaine fois, j'expliquerai comment évaluer le modèle.

Lien de référence

Recommended Posts

Co-filtrage avec PySpark
Filtrage coordonné avec analyse des composants principaux et clustering K-means
Apprenez le filtrage collaboratif avec les supports Coursera Machine Learning
J'ai essayé de mettre en œuvre le co-filtrage (recommandation) avec redis et python
Co-filtrage basé sur l'utilisateur avec python
La vie PySpark à partir de Docker
[Recommandation] Filtrage basé sur le contenu et filtrage coopératif
[Python] Utilisation d'OpenCV avec Python (filtrage d'image)
Mesurer la similitude du contenu avec Pyspark
Filtrage de texte avec des baies naïves de sklearn