J'ai écrit le code pour mesurer la similitude du contenu dans pyspark. Il y avait peu de documents japonais, je vais donc en prendre note. J'ai utilisé docker parce que c'était pénible d'introduire une étincelle à partir de zéro.
Pour présenter docker http://qiita.com/hshimo/items/e24b1fbfbf775ec7c941 Je l'ai mentionné.
Fondamentalement Get started with Docker for Mac Téléchargez et installez simplement plus de fichiers dmg.
Clonez l'image docker de spark + jupyter. Je l'ai cloné sur https://github.com/busbud/jupyter-docker-stacks/tree/master/all-spark-notebook. Comme indiqué dans le README
$ docker run -d -p 8888:8888 jupyter/all-spark-notebook -e GRANT_SUDO=yes
Tapez la commande ci-dessus sur le terminal. Cela fera apparaître un notebook jupyter qui peut utiliser pyspark sur localhost: 8888. (-E GRANT_SUDO = yes est une option qui vous permet d'utiliser jupyter sans privilèges d'utilisateur.)
Lorsque vous appuyez sur la commande docker ps
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
91fc42290759 jupyter/all-spark-notebook "tini -- start-not..." 3 hours ago Up 3 hours 0.0.0.0:8888->8888/tcp nifty_bartik
Vous pouvez voir que le docker spécifié est en cours d'exécution.
Le code créé est https://github.com/kenchin110100/machine_learning/blob/master/samplePysparkSimilarity.ipynb C'est dedans.
Tout d'abord, importez les bibliothèques requises et initialisez pyspark.
# coding: utf-8
"""
Exemple de similitude avec pyspark
"""
import numpy as np
from pyspark import SQLContext, sql
import pyspark
from pyspark.sql import functions, Row
from pyspark.mllib.linalg import DenseVector
sc = pyspark.SparkContext('local[*]')
sqlContext = sql.SQLContext(sc)
sc est une instance requise pour utiliser le type RDD de pyspark, sqlContext est une instance requise pour utiliser le type DataFrame.
Ensuite, j'ai créé des exemples de données pour mesurer la similitude.
#Créer des exemples de données
samples = [['aaa', 'a', 30, 1,2,3,4,5] + np.random.randn(5).tolist(),
['aaa', 'b', 30,2,1,3,4,1] + np.random.randn(5).tolist(),
['bbb', 'a', 30,4,5,3,2,4] + np.random.randn(5).tolist(),
['bbb', 'b', 30,1,2,4,3,1] + np.random.randn(5).tolist(),
['ccc', 'a', 30,4,5,2,1,2] + np.random.randn(5).tolist(),
['ccc', 'b', 30,1,2,5,4,1] + np.random.randn(5).tolist(),]
#Créer des noms de colonnes
colnames = [
'mc', 'mtc', 'area_cd',
'label1', 'label2', 'label3', 'label4', 'label5',
'label6', 'label7', 'label8', 'label9', 'label10'
]
colnames1 = [col + '_1' for col in colnames]
colnames2 = [col + '_2' for col in colnames]
#Convertir les exemples de données créés en type pyspark DataFrame
df1 = sqlContext.createDataFrame(sc.parallelize(samples), colnames1)
df2 = sqlContext.createDataFrame(sc.parallelize(samples), colnames2)
Nous considérons mc et mtc comme des clés uniques et label1 à label10 comme des identités.
Le même échantillon de données est stocké dans deux blocs de données Il s'agit de créer une combinaison de similitude avec jointure. Après avoir converti les échantillons en type RDD avec sc.parallelize (samples), il est converti en type DataFrame avec createDataFrame.
Utilisez ensuite une jointure de type DataFrame pour énumérer les combinaisons qui mesurent la similitude.
joined_df = df1.join(df2, df1.area_cd_1 == df2.area_cd_2).filter(functions.concat(df1.mc_1, df1.mtc_1) != functions.concat(df2.mc_2, df2.mtc_2))
Jointure de type DataFrame
df1.join(df2, <conditions>, 'left' or 'inner' or ...)
Vous pouvez le créer avec. Dans le code créé cette fois, en effectuant un filtre après la jointure, J'essaye de ne pas mesurer la similitude avec moi-même.
functions.concat(df1.mc_1, df1.mtc_1)
Ensuite, en combinant les deux clés mc et mtc que vous souhaitez rendre uniques, elles sont traitées comme une clé unique.
Nous calculerons la similitude en utilisant le DataFrame créé jusqu'à présent.
Tout d'abord, définissez la fonction.
def match_sim(row1 ,row2):
keys = row1.asDict().keys()
total = len(keys)
count = 0
for key in keys:
if row1[key] == row2[key]:
count += 1
return float(count)/total
def cosine_sim(vec1 ,vec2):
dot = abs(vec1.dot(vec2))
n1 = vec1.norm(None)
n2 = vec1.norm(None)
return float(dot/(n1*n2))
match_sim est une fonction pour mesurer la similitude des variables catégorielles. Passez le type Row de pyspark. Renvoie 1 s'ils correspondent, 0 s'ils ne correspondent pas et renvoie la valeur divisée par les nombres premiers comparés.
cosine_sim est une fonction pour calculer la similitude cosinus Passez le type DenseVector de pyspark.mllib.
Utilisez cette fonction pour calculer la similitude pour chaque ligne.
joined_rdd = joined_df.rdd.map(lambda x: (
Row(mc_1=x.mc_1, mtc_1=x.mtc_1, mc_2=x.mc_2, mtc_2=x.mtc_2),
Row(label1=x.label1_1, label2=x.label2_1, label3=x.label3_1, label4=x.label4_1, label5=x.label5_1),
DenseVector([x.label6_1,x.label7_1,x.label8_1,x.label9_1,x.label10_1]),
Row(label1=x.label1_2, label2=x.label2_2, label3=x.label3_2, label4=x.label4_2, label5=x.label5_2),
DenseVector([x.label6_2,x.label7_2,x.label8_2,x.label9_2,x.label10_2])
)) \
.map(lambda x: (x[0], match_sim(x[1], x[3]), cosine_sim(x[2], x[4]))) \
.map(lambda x: (x[0].mc_1, x[0].mtc_1, x[0].mc_2, x[0].mtc_2, x[1], x[2]))
Commencez par convertir le type DataFrame joint_df créé précédemment en type rdd et mappez-le (1ère ligne). Enregistrez 5 types de données pour chaque combinaison. Row (mc_1 = x.mc_1 ...) est une ligne (2e ligne) pour stocker une clé unique pour la similitude. Row (label1 = x.label1_1 ...) est la ligne de stockage des variables catégorielles (3e ligne) DenseVector (x.label6_1, ...) est un vecteur pour stocker des variables continues (4ème ligne) Les 5ème et 6ème lignes stockent les variables catégorielles et les variables continues de l'autre ligne pour la similitude.
Mappez davantage le RDD qui enregistre les 5 types de types de données ainsi créés (ligne 8). Avec x [0] tel quel, la similarité de correspondance est calculée pour x [1] et x [3], et la similitude cosinus est calculée pour x [2] et x [4]. Enfin, formatez-le pour qu'il puisse être à nouveau passé au type DataFrame (ligne 9).
La table de similitude ainsi créée est la suivante.
sqlContext.createDataFrame(joined_rdd, ['tar_mc', 'tar_mtc', 'res_mc', 'res_mtc', 'match_sim', 'cosine_sim']).show()
+------+-------+------+-------+---------+--------------------+
|tar_mc|tar_mtc|res_mc|res_mtc|match_sim| cosine_sim|
+------+-------+------+-------+---------+--------------------+
| aaa| a| aaa| b| 0.4| 0.2979433262317515|
| aaa| a| bbb| a| 0.2| 0.2161103600613806|
| aaa| a| bbb| b| 0.4| 0.6933162039799152|
| aaa| a| ccc| a| 0.0| 0.34941331375143353|
| aaa| a| ccc| b| 0.6| 0.5354750033557132|
| aaa| b| aaa| a| 0.4| 0.19428899651078324|
| aaa| b| bbb| a| 0.2| 0.10702152405150611|
| aaa| b| bbb| b| 0.2| 0.4033681950723296|
| aaa| b| ccc| a| 0.0| 0.20097172584128625|
| aaa| b| ccc| b| 0.4| 0.6861144738544892|
| bbb| a| aaa| a| 0.2| 0.3590385377694502|
| bbb| a| aaa| b| 0.2| 0.27266040008605663|
| bbb| a| bbb| b| 0.0| 1.1313716028957246|
| bbb| a| ccc| a| 0.4|0.009321106239696326|
| bbb| a| ccc| b| 0.0| 1.0017633803368193|
| bbb| b| aaa| a| 0.4| 0.2176828683879606|
| bbb| b| aaa| b| 0.2| 0.194213765887726|
| bbb| b| bbb| a| 0.0| 0.21381230488831227|
| bbb| b| ccc| a| 0.0| 0.21074015342537053|
| bbb| b| ccc| b| 0.6| 0.34536679942567616|
+------+-------+------+-------+---------+--------------------+
only showing top 20 rows
Pour chaque clé, la similarité et la similarité cosinus en fonction du degré de correspondance de la catégorie sont calculées.
Cette fois, j'ai utilisé l'image jupyter-spark de docker pour créer un échantillon afin de mesurer la similitude entre les données. Il devrait y avoir une manière plus concise de l'écrire (comme l'utilisation de ColumnSimilarity de MLlib), mais pour diverses raisons, j'ai écrit ce code détourné cette fois-ci. Je suis un débutant de docker et d'étincelle, donc j'aimerais m'entraîner un peu plus à partir de maintenant.
Recommended Posts