I wrote some code to measure content similarity in pyspark. Since there were few Japanese articles on the topic, I'm leaving a note here. I used docker because setting up spark from scratch was a hassle.
For installing docker I referred to http://qiita.com/hshimo/items/e24b1fbfbf775ec7c941.
Basically, you just download the dmg file from Get started with Docker for Mac and install it.
Next, get the docker image for spark + jupyter. I used https://github.com/busbud/jupyter-docker-stacks/tree/master/all-spark-notebook. As noted in the README,
$ docker run -d -p 8888:8888 -e GRANT_SUDO=yes jupyter/all-spark-notebook
type the above command in a terminal. This brings up a jupyter notebook that can use pyspark at localhost:8888. (-e GRANT_SUDO=yes is an option that gives the notebook user sudo privileges inside the container.)
When you run the docker ps command,
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
91fc42290759 jupyter/all-spark-notebook "tini -- start-not..." 3 hours ago Up 3 hours 0.0.0.0:8888->8888/tcp nifty_bartik
you can see that the container you started is running.
The full code is available at https://github.com/kenchin110100/machine_learning/blob/master/samplePysparkSimilarity.ipynb.
First, import the required libraries and initialize pyspark.
# coding: utf-8
"""
Sample for similarity with pyspark
"""
import numpy as np
import pyspark
from pyspark.sql import SQLContext, functions, Row
from pyspark.mllib.linalg import DenseVector

# SparkContext gives access to RDDs, SQLContext to DataFrames
sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)
sc is the SparkContext instance needed to work with pyspark RDDs, and sqlContext is the instance needed to work with DataFrames.
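For illustration (a toy snippet, not part of the original notebook), here is how each one is used:
rdd = sc.parallelize([('x', 1), ('y', 2)])             # RDD built through the SparkContext
df = sqlContext.createDataFrame(rdd, ['key', 'val'])   # DataFrame built through the SQLContext
df.show()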
Next, I created sample data to measure the similarity.
# Create sample data
samples = [['aaa', 'a', 30, 1, 2, 3, 4, 5] + np.random.randn(5).tolist(),
           ['aaa', 'b', 30, 2, 1, 3, 4, 1] + np.random.randn(5).tolist(),
           ['bbb', 'a', 30, 4, 5, 3, 2, 4] + np.random.randn(5).tolist(),
           ['bbb', 'b', 30, 1, 2, 4, 3, 1] + np.random.randn(5).tolist(),
           ['ccc', 'a', 30, 4, 5, 2, 1, 2] + np.random.randn(5).tolist(),
           ['ccc', 'b', 30, 1, 2, 5, 4, 1] + np.random.randn(5).tolist()]
# Create column names
colnames = [
    'mc', 'mtc', 'area_cd',
    'label1', 'label2', 'label3', 'label4', 'label5',
    'label6', 'label7', 'label8', 'label9', 'label10'
]
colnames1 = [col + '_1' for col in colnames]
colnames2 = [col + '_2' for col in colnames]
#Convert the created sample data to pyspark DataFrame type
df1 = sqlContext.createDataFrame(sc.parallelize(samples), colnames1)
df2 = sqlContext.createDataFrame(sc.parallelize(samples), colnames2)
Here, the combination of mc and mtc is treated as a unique key, and label1 through label10 are the features.
The same sample data is stored in two DataFrames because the pairs of rows to compare will be created with a join. samples is first converted to an RDD with sc.parallelize(samples), and then to a DataFrame with createDataFrame.
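As a quick sanity check (a hypothetical snippet, not in the original notebook), you can confirm that the two DataFrames received the suffixed column names:
print(df1.columns)  # ['mc_1', 'mtc_1', 'area_cd_1', 'label1_1', ..., 'label10_1']
print(df2.columns)  # ['mc_2', 'mtc_2', 'area_cd_2', 'label1_2', ..., 'label10_2']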
Next, use a DataFrame join to enumerate the pairs whose similarity will be measured.
joined_df = df1.join(df2, df1.area_cd_1 == df2.area_cd_2) \
    .filter(functions.concat(df1.mc_1, df1.mtc_1) != functions.concat(df2.mc_2, df2.mtc_2))
A DataFrame join can be written as
df1.join(df2, <condition>, 'left' or 'inner' or ...)
In this code, the filter applied after the join prevents each record from being compared with itself.
functions.concat(df1.mc_1, df1.mtc_1)
concatenates the two key columns, mc and mtc, so that they can be treated as a single unique key.
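Since all six sample rows share area_cd = 30, the join yields 6 x 6 = 36 pairs and the filter then drops the 6 self-pairs. A quick check (hypothetical snippet) would be:
print(joined_df.count())  # 36 - 6 = 30 pairs remain after the filter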
We will calculate the similarity using the DataFrame created so far.
First, define the function.
def match_sim(row1, row2):
    """Ratio of matching categorical features between two Rows."""
    keys = row1.asDict().keys()
    total = len(keys)
    count = 0
    for key in keys:
        if row1[key] == row2[key]:
            count += 1
    return float(count) / total

def cosine_sim(vec1, vec2):
    """Cosine similarity between two DenseVectors."""
    dot = abs(vec1.dot(vec2))
    n1 = vec1.norm(None)   # 2-norm of vec1
    n2 = vec2.norm(None)   # 2-norm of vec2
    return float(dot / (n1 * n2))
match_sim measures the similarity of the categorical variables. It takes two pyspark Row objects, counts a match as 1 and a mismatch as 0, and returns that count divided by the number of features compared.
cosine_sim computes the cosine similarity between two pyspark.mllib DenseVector objects.
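As a small standalone check (toy values, not from the original notebook), the two functions can be called directly:
r1 = Row(label1=1, label2=2, label3=3)
r2 = Row(label1=1, label2=0, label3=3)
print(match_sim(r1, r2))   # 2 of 3 keys match -> 0.666...
v1 = DenseVector([1.0, 0.0, 1.0])
v2 = DenseVector([1.0, 1.0, 0.0])
print(cosine_sim(v1, v2))  # 1 / (sqrt(2) * sqrt(2)) = 0.5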
Use this function to calculate the similarity for each row.
joined_rdd = joined_df.rdd.map(lambda x: (
    Row(mc_1=x.mc_1, mtc_1=x.mtc_1, mc_2=x.mc_2, mtc_2=x.mtc_2),
    Row(label1=x.label1_1, label2=x.label2_1, label3=x.label3_1, label4=x.label4_1, label5=x.label5_1),
    DenseVector([x.label6_1, x.label7_1, x.label8_1, x.label9_1, x.label10_1]),
    Row(label1=x.label1_2, label2=x.label2_2, label3=x.label3_2, label4=x.label4_2, label5=x.label5_2),
    DenseVector([x.label6_2, x.label7_2, x.label8_2, x.label9_2, x.label10_2])
    )) \
    .map(lambda x: (x[0], match_sim(x[1], x[3]), cosine_sim(x[2], x[4]))) \
    .map(lambda x: (x[0].mc_1, x[0].mtc_1, x[0].mc_2, x[0].mtc_2, x[1], x[2]))
First, the DataFrame joined_df created earlier is converted to an RDD and mapped (line 1), storing five objects for each combination: Row(mc_1=x.mc_1, ...) holds the unique keys of the pair (line 2), Row(label1=x.label1_1, ...) holds the categorical variables (line 3), DenseVector([x.label6_1, ...]) holds the continuous variables (line 4), and lines 5 and 6 hold the categorical and continuous variables of the other row in the pair.
A second map is then applied to this RDD of five-element tuples (line 8): x[0] is kept as is, match_sim is computed from x[1] and x[3], and cosine_sim is computed from x[2] and x[4]. Finally, the result is flattened so that it can be converted back to a DataFrame (line 9).
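To see the shape of the result at this point, one (hypothetical) check is to pull out a single element:
print(joined_rdd.first())  # a tuple: (mc_1, mtc_1, mc_2, mtc_2, match_sim, cosine_sim)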
The similarity table created in this way is as follows.
sqlContext.createDataFrame(joined_rdd, ['tar_mc', 'tar_mtc', 'res_mc', 'res_mtc', 'match_sim', 'cosine_sim']).show()
+------+-------+------+-------+---------+--------------------+
|tar_mc|tar_mtc|res_mc|res_mtc|match_sim| cosine_sim|
+------+-------+------+-------+---------+--------------------+
| aaa| a| aaa| b| 0.4| 0.2979433262317515|
| aaa| a| bbb| a| 0.2| 0.2161103600613806|
| aaa| a| bbb| b| 0.4| 0.6933162039799152|
| aaa| a| ccc| a| 0.0| 0.34941331375143353|
| aaa| a| ccc| b| 0.6| 0.5354750033557132|
| aaa| b| aaa| a| 0.4| 0.19428899651078324|
| aaa| b| bbb| a| 0.2| 0.10702152405150611|
| aaa| b| bbb| b| 0.2| 0.4033681950723296|
| aaa| b| ccc| a| 0.0| 0.20097172584128625|
| aaa| b| ccc| b| 0.4| 0.6861144738544892|
| bbb| a| aaa| a| 0.2| 0.3590385377694502|
| bbb| a| aaa| b| 0.2| 0.27266040008605663|
| bbb| a| bbb| b| 0.0| 1.1313716028957246|
| bbb| a| ccc| a| 0.4|0.009321106239696326|
| bbb| a| ccc| b| 0.0| 1.0017633803368193|
| bbb| b| aaa| a| 0.4| 0.2176828683879606|
| bbb| b| aaa| b| 0.2| 0.194213765887726|
| bbb| b| bbb| a| 0.0| 0.21381230488831227|
| bbb| b| ccc| a| 0.0| 0.21074015342537053|
| bbb| b| ccc| b| 0.6| 0.34536679942567616|
+------+-------+------+-------+---------+--------------------+
only showing top 20 rows
For each pair of keys, the categorical match similarity and the cosine similarity have been computed.
This time, I used the docker jupyter-spark image to build a sample that measures the similarity between records. There are probably more concise ways to write this (for example, using MLlib's columnSimilarities), but for various reasons I took this roundabout approach. I'm still a beginner with both docker and spark, so I'd like to get more practice from here on.
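For reference, here is a minimal sketch of the columnSimilarities route mentioned above. It assumes the samples list and sc from earlier, and since RowMatrix.columnSimilarities computes cosine similarities between columns, the feature matrix is transposed first so that each original row becomes a column:
from pyspark.mllib.linalg.distributed import RowMatrix

# Continuous features (label6 .. label10) for each of the six sample rows
features = np.array([row[8:13] for row in samples])
# Transpose so that each original sample becomes a column of the matrix
mat = RowMatrix(sc.parallelize(features.T.tolist()))
# CoordinateMatrix whose entries (i, j, value) are pairwise cosine similarities
sims = mat.columnSimilarities()
for entry in sims.entries.take(5):
    print(entry.i, entry.j, entry.value)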