Collaborative filtering with PySpark

Hello everyone, this is @best_not_best. This time I will introduce a technique related to the work I am in charge of.

Overview

Collaborative filtering is used to calculate a score that predicts how likely a user is to purchase a product. Since the computation is heavy and processing large-scale data takes time, the work is distributed with PySpark.

Environment

Add the following to spark-env.sh so that Python 3.x can be called on the Spark side.

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3

Procedure

  1. Get training data
  2. Process the training data (if necessary)
  3. Create a model
  4. Predict the score

Get training data

Acquire training data from a database or similar source and create a CSV file with the three columns "user", "item", and "rating". For example, when using product purchase data for the past month, the columns will be "user ID", "product ID", and "whether the product was purchased (not purchased: 0 / purchased: 1)". Below is an example of such a CSV file.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

As the name "rating" suggests, the original usage is "how highly the user rated the product", but as mentioned above it can also be implemented with data such as "whether the product was purchased" or "whether the page was accessed". In the former case the model predicts "how likely the user is to buy the product"; in the latter, "how likely the user is to visit the page".
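
As a minimal sketch of this step, the snippet below writes such a CSV from a purchase log. The table and column names (action_log, user_id, item_id, purchased, action_date) are hypothetical, and sqlite3 is only a stand-in for whatever database driver you actually use:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""sketch: export a user/item/rating CSV from a (hypothetical) purchase log table."""

import csv
import sqlite3  # stand-in for the real database driver

def export_training_csv(db_path: str, csv_path: str) -> None:
    """write a 3-column CSV: user, item, rating (purchased: 1 / not purchased: 0)."""
    con = sqlite3.connect(db_path)
    cur = con.cursor()
    # one row per (user, item); rating is 1 if the item was purchased at least once
    cur.execute(
        'SELECT user_id, item_id, MAX(purchased) '
        'FROM action_log '
        "WHERE action_date >= date('now', '-1 month') "
        'GROUP BY user_id, item_id'
    )
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['user', 'item', 'rating'])
        writer.writerows(cur.fetchall())
    con.close()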

Process training data

Since user IDs and product IDs can only be handled up to the maximum value of int32 (2,147,483,647), IDs exceeding that value are renumbered. Also, since only integer values can be handled, renumbering is performed in the same way if the IDs contain character strings. If the IDs are integer values that do not exceed the int32 maximum, skip this step.
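
For reference, the renumbering below relies on RDD.zipWithIndex(), which pairs each distinct ID with a consecutive integer index. A minimal standalone sketch (the sample IDs are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('zip_with_index_demo').getOrCreate()

# distinct() removes duplicate IDs, zipWithIndex() attaches a sequential integer
rdd = spark.sparkContext.parallelize(['1xxxxxxxx2', '1xxxxxxxx9', '1xxxxxxxx8', '1xxxxxxxx2'])
print(rdd.distinct().zipWithIndex().collect())
# e.g. [('1xxxxxxxx2', 0), ('1xxxxxxxx9', 1), ('1xxxxxxxx8', 2)]

The full processing script is below.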

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""processing training data."""

from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

class ProcessTrainingData(object):
    """get training data from Redshift, and add sequence number to data."""

    def __get_action_log(
        self,
        sqlContext: SQLContext,
        unprocessed_data_file_path: str
    ) -> DataFrame:
        """get data."""
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(unprocessed_data_file_path)

        return df

    def run(
        self,
        unprocessed_data_file_path: str,
        training_data_dir_path: str
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('process_training_data')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # get data
        df = self.__get_action_log(sqlContext, unprocessed_data_file_path)

        # make sequence number of users
        unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        )

        # make sequence number of items
        unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        )

        # add sequence number of users, sequence number of items to data
        df = df.join(
            unique_users_df,
            df['user'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        df = df.join(
            unique_items_df,
            df['item'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # save
        saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
        df.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        return True

This script reads the training data CSV, adds user ID and product ID columns renumbered with zipWithIndex(), and saves the result as a separate file. Execute it as follows.

ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)

The parameters are as follows.

As mentioned above, set unprocessed_data_file_path to a CSV file with the following columns.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

When executed, a CSV file with the following columns will be output to training_data_dir_path.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419
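
For example, a call with hypothetical paths (both values are illustrative assumptions):

ptd = ProcessTrainingData()
ptd.run(
    '/data/cf/action_log.csv',  # unprocessed_data_file_path (hypothetical)
    '/data/cf/training/'        # training_data_dir_path (hypothetical)
)

Note that training_data_dir_path is concatenated directly with the output file name, so keep the trailing slash.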

Create a model

Create and save a model of collaborative filtering.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""create collaborative filtering model."""

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreateCfModel(object):
    """create collaborative filtering model."""

    def run(
        self,
        processed_training_data_file_path: str,
        model_dir_path: str,
        rank: int,
        max_iter: int,
        implicit_prefs: bool,
        alpha: float,
        num_user_blocks: int,
        num_item_blocks: int
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_cf_model')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # create model
        als = ALS(
            rank=int(rank),
            maxIter=int(max_iter),
            implicitPrefs=bool(implicit_prefs),
            alpha=float(alpha),
            numUserBlocks=int(num_user_blocks),
            numItemBlocks=int(num_item_blocks),
            userCol='unique_user_id',
            itemCol='unique_item_id'
        )

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)

        # fitting
        model = als.fit(df)

        # save
        saved_data_dir_path = model_dir_path + 'als_model'
        model.write().overwrite().save(saved_data_dir_path)

        return True

If you did not need to renumber in the previous step, match the # create model and # load training data sections to the column names in your CSV file and modify them as follows.

# create model
als = ALS(
    rank=int(rank),
    maxIter=int(max_iter),
    implicitPrefs=bool(implicit_prefs),
    alpha=float(alpha),
    numUserBlocks=int(num_user_blocks),
    numItemBlocks=int(num_item_blocks),
    userCol='user',
    itemCol='item'
)

# load training data
custom_schema = StructType([
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
])

Execute as follows.

ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path,
    model_dir_path,
    rank,
    max_iter,
    implicit_prefs,
    alpha,
    num_user_blocks,
    num_item_blocks
)

The parameters are as follows. rank, max_iter, implicit_prefs, alpha, num_user_blocks, and num_item_blocks are the parameters of PySpark's ALS (http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS).

If you renumbered in the previous step, set processed_training_data_file_path to a CSV file with the following columns.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

If you did not renumber, set a CSV file with the following columns.

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0
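
For example, a hypothetical invocation (the hyperparameter values are illustrative assumptions close to the ALS defaults, not values from the original; tune them for your data):

ccm = CreateCfModel()
ccm.run(
    '/data/cf/training/cf_training_data.csv',  # processed_training_data_file_path (hypothetical)
    '/data/cf/model/',                         # model_dir_path (hypothetical)
    10,    # rank
    10,    # max_iter
    True,  # implicit_prefs (treat purchase flags as implicit feedback)
    1.0,   # alpha
    10,    # num_user_blocks
    10     # num_item_blocks
)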

Predict the score

Load the saved model and predict scores for combinations of user ID and product ID. Prepare list data of user IDs and of product IDs separately from the training data. For the prediction results, both "all results" and "the top N scores for each user" are saved as CSV files. In this implementation, user IDs and product IDs that do not exist in the original training data (i.e. have no action log) are not predicted.
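
The user list and item list are headerless, single-column CSV files (see the parameter description later). As one possible way to prepare them, the following sketch extracts the distinct IDs from the raw action-log CSV; the paths are hypothetical, and in practice the lists may come straight from the database:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('make_id_lists').getOrCreate()

df = spark.read.format('csv').options(header='true').load('/data/cf/action_log.csv')

# headerless one-column CSV of distinct user IDs
df.select('user').distinct().write\
    .format('csv').mode('overwrite').options(header='false')\
    .save('/data/cf/users/')

# headerless one-column CSV of distinct item IDs
df.select('item').distinct().write\
    .format('csv').mode('overwrite').options(header='false')\
    .save('/data/cf/items/')

The full prediction script is below.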

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""predict score from collaborative filtering model."""

from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreatePredictedScore(object):
    """predict score from collaborative filtering model."""

    def run(
        self,
        model_file_path: str,
        predict_data_dir_path: str,
        user_data_file_path: str,
        item_data_file_path: str,
        processed_training_data_file_path: str,
        data_limit: int=-1
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_predicted_score')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # load user data
        users_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(user_data_file_path)
        users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
        users_id_df = sqlContext.createDataFrame(users_id_rdd)

        # load item data
        items_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(item_data_file_path)
        items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
        items_id_df = sqlContext.createDataFrame(items_id_rdd)

        # cross join user_id and item_id
        joined_df = users_id_df.join(items_id_df)
        joined_df.cache()

        # delete unnecessary variables
        del(users_df)
        del(users_id_rdd)
        del(users_id_df)
        del(items_df)
        del(items_id_rdd)
        del(items_id_df)

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        training_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # users
        unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        ).dropDuplicates()
        unique_users_df.cache()
        # items
        unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        ).dropDuplicates()
        unique_items_df.cache()

        # delete unnecessary variables
        del(training_df)
        del(unique_users_rdd)
        del(unique_items_rdd)

        # add unique user id
        joined_df = joined_df.join(
            unique_users_df,
            joined_df['user_id'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])

        # add unique item id
        joined_df = joined_df.join(
            unique_items_df,
            joined_df['item_id'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # load model
        model = ALSModel.load(model_file_path)

        # predict score
        predictions = model.transform(joined_df)
        all_predict_data = predictions\
            .select('user_id', 'item_id', 'prediction')\
            .filter('prediction > 0')

        # save
        # all score
        saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
        all_predict_data.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        # limited score
        data_limit = int(data_limit)
        if data_limit > 0:
            all_predict_data.registerTempTable('predictions')
            sql = 'SELECT user_id, item_id, prediction ' \
                + 'FROM ( ' \
                + '  SELECT user_id, item_id, prediction, dense_rank() ' \
                + '  OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
                + '  FROM predictions ' \
                + ') tmp WHERE rank <= %d' % (data_limit)
            limited_predict_data = sqlContext.sql(sql)

            saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
            limited_predict_data.write\
                .format('csv')\
                .mode('overwrite')\
                .options(header='true')\
                .save(saved_data_file_path)

        return True

The combinations to be predicted are created by the following procedure.

  1. Create all combinations of user ID and product ID from the two list files
  2. Read the training data, attach the renumbered IDs to the combinations from step 1, and drop any user IDs and product IDs that could not be numbered (i.e. that do not appear in the training data)

If you did not need to renumber earlier, modify the run method as follows:

def run(
    self,
    model_file_path: str,
    predict_data_dir_path: str,
    processed_training_data_file_path: str,
    data_limit: int=-1
) -> bool:
    """execute."""
    # make spark context
    spark = SparkSession\
        .builder\
        .appName('create_predicted_score')\
        .config('spark.sql.crossJoin.enabled', 'true')\
        .config('spark.debug.maxToStringFields', 500)\
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

    # load training data
    custom_schema = StructType([
        StructField('user', IntegerType(), True),
        StructField('item', IntegerType(), True),
        StructField('rating', FloatType(), True),
    ])
    training_df = sqlContext\
        .read\
        .format('csv')\
        .options(header='true')\
        .load(processed_training_data_file_path, schema=custom_schema)

    # load user data
    users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
    users_id_df = sqlContext.createDataFrame(users_id_rdd)

    # load item data
    items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
    items_id_df = sqlContext.createDataFrame(items_id_rdd)

    # cross join user_id and item_id
    joined_df = users_id_df.join(items_id_df)
    joined_df.cache()

    # delete unnecessary variables
    del(training_df)
    del(users_id_rdd)
    del(users_id_df)
    del(items_id_rdd)
    del(items_id_df)

    # load model
    model = ALSModel.load(model_file_path)
    # (the rest is the same as the original run method above)

Execute as follows.

cps = CreatePredictedScore()
cps.run(
    model_file_path,
    predict_data_dir_path,
    user_data_file_path,
    item_data_file_path,
    processed_training_data_file_path,
    data_limit
)

The parameters are as follows. data_limit specifies the N of the top N scores; if 0 or less is specified, the top-N data is not created.

Set user_data_file_path to a CSV file with the user ID in the first column; a file without a header is used. Set item_data_file_path to a CSV file with the product ID in the first column, likewise without a header. For processed_training_data_file_path, set a CSV file with the following columns.

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

A CSV file with the following columns is output to predict_data_dir_path.

user_id item_id prediction
1xxxxxxxx3 3xxxxxxxx4 0.15594198
1xxxxxxxx3 3xxxxxxxx0 0.19135818
1xxxxxxxx3 3xxxxxxxx8 0.048197098

prediction is the predicted value.
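
For example, a hypothetical invocation (all paths and the value of data_limit are illustrative assumptions):

cps = CreatePredictedScore()
cps.run(
    '/data/cf/model/als_model',                # model_file_path (hypothetical)
    '/data/cf/predict/',                       # predict_data_dir_path (hypothetical)
    '/data/cf/users/',                         # user_data_file_path (hypothetical)
    '/data/cf/items/',                         # item_data_file_path (hypothetical)
    '/data/cf/training/cf_training_data.csv',  # processed_training_data_file_path (hypothetical)
    10                                         # data_limit: keep the top 10 scores per user
)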

Summary

I implemented collaborative filtering with ALS in PySpark. Apologies that the code is hard to read since the methods are not split up. Next time, I will explain how to evaluate the model.
