MLlib de Spark migre vers ML. À partir de Spark 2.0, l'API MLlib basée sur RDD sera uniquement destinée à la maintenance, et l'API basée sur DataFrame sera la norme à l'avenir. Ici, nous utiliserons l'API ML dans PySpark pour effectuer l'analyse des composants principaux.
Implémentons un exemple dans Spark qui analyse les tendances du papier à partir de l'évaluation de l'accomplissement de l'actualité, des affaires et des sports des 10 journaux liés ci-dessous.
http://ifs.nog.cc/gucchi24.hp.infoseek.co.jp/SHUSEIEX.htm
Concernant 10 journaux, le contenu de l'article a été étudié sur une échelle de 10 points pour l'actualité, les affaires et le sport. L'échelle est très bonne de 0 à 10, mais pas bonne, mais 0.
news.csv
no,nouvelles,Entreprise,Des sports
1,8,9,4
2,2,5,7
3,8,5,6
4,3,5,4
5,7,4,9
6,4,3,4
7,3,6,8
8,6,8,2
9,5,4,5
10,6,7,6
Auparavant, il était standard de commencer par SparkContext
, mais dans la version 2.0, nous utiliseronsSparkSession
. SQLContext
, HiveContext
ont été intégrés dans SparkSession
.
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
Lisez le CSV avec spark.read.csv
et stockez-le dans le DataFrame.
ex1.py
df = spark.read.csv(filename, header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Données brutes====")
df.show(truncate=False)
Si le japonais est inclus, le tableau se réduira. La largeur des caractères n'est pas prise en compte.
$ export PYTHONIOENCODING=utf8
$ spark-submit ex1.py
====Données brutes====
+---+----+----+----+
|no |nouvelles|Entreprise|Des sports|
+---+----+----+----+
|1 |8 |9 |4 |
|2 |2 |5 |7 |
|3 |8 |5 |6 |
|4 |3 |5 |4 |
|5 |7 |4 |9 |
|6 |4 |3 |4 |
|7 |3 |6 |8 |
|8 |6 |8 |2 |
|9 |5 |4 |5 |
|10 |6 |7 |6 |
+---+----+----+----+
PCA () nécessite une variable sous forme vectorielle. Utilisez VectorAssembler
pour faire de [News, Business, Sports] un vecteur et le stocker dans la colonne Variant
. .transform (df)
crée un nouveau DataFrame.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
feature_vectors = assembler.transform(df)
feature_vectors.show(truncate=False)
Un vecteur a été ajouté à la colonne «variant».
+---+----+----+----+-------------+
| no|nouvelles|Entreprise|Des sports|variable|
+---+----+----+----+-------------+
| 1| 8| 9| 4|[8.0,9.0,4.0]|
| 2| 2| 5| 7|[2.0,5.0,7.0]|
| 3| 8| 5| 6|[8.0,5.0,6.0]|
| 4| 3| 5| 4|[3.0,5.0,4.0]|
| 5| 7| 4| 9|[7.0,4.0,9.0]|
| 6| 4| 3| 4|[4.0,3.0,4.0]|
| 7| 3| 6| 8|[3.0,6.0,8.0]|
| 8| 6| 8| 2|[6.0,8.0,2.0]|
| 9| 5| 4| 5|[5.0,4.0,5.0]|
| 10| 6| 7| 6|[6.0,7.0,6.0]|
+---+----+----+----+-------------+
Après la destination du lien, les données sont standardisées avant le calcul. Dans l'analyse en composantes principales, il est généralement préférable de normaliser avant de calculer. ML a Standard Scaler
, alors utilisez ceci.
L'entrée est un vecteur de la colonne «variant» et la sortie est la «variable normalisée». Cette API crée d'abord un modèle à partir des données d'entrée avec .fit
, puis redonne les données d'entrée avec .transform
et les transforme réellement.
from pyspark.ml.feature import StandardScaler
# step1
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
# step2
std_feature_vectors = scalerModel.transform(feature_vectors)
#Afficher uniquement les variables normalisées
print("====Données standardisées====")
std_feature_vectors.select("Variable normalisée").show(truncate=False)
Il est légèrement différent de la Table liée. StandardScaler
utilise une distribution non biaisée (n-1), mais la destination du lien utilise une distribution d'échantillon (n). Veuillez consulter d'autres articles pour plus de détails.
====Données standardisées====
+---------------------------------------------------------------+
|Variable normalisée|
+---------------------------------------------------------------+
|[1.3023647131866891,1.7919573407620815,-0.7071067811865476] |
|[-1.4884168150705013,-0.3162277660168382,0.7071067811865476] |
|[1.3023647131866891,-0.3162277660168382,0.23570226039551587] |
|[-1.0232865603609695,-0.3162277660168382,-0.7071067811865476] |
|[0.8372344584771575,-0.8432740427115681,1.649915822768611] |
|[-0.5581563056514377,-1.370320319406298,-0.7071067811865476] |
|[-1.0232865603609695,0.21081851067789167,1.1785113019775793] |
|[0.3721042037676257,1.2649110640673515,-1.649915822768611] |
|[-0.09302605094190601,-0.8432740427115681,-0.23570226039551587]|
|[0.3721042037676257,0.7378647873726216,0.23570226039551587] |
+---------------------------------------------------------------+
PCA
Enfin, vous pouvez appeler l'API d'analyse des composants principaux. L'entrée est une "variable standardisée" et la sortie est un "score de la composante principale". Comme pour le Standard Scaler
ci-dessus, créez d'abord un modèle, puis effectuez les calculs réels. Le vecteur propre et le taux de cotisation peuvent être obtenus à partir du modèle construit. k = 3 est une instruction pour calculer jusqu'à la troisième composante principale. À l'origine, k est fixé à une valeur élevée et calculé une fois, puis k est sélectionné de sorte que le total cumulé du taux de cotisation supérieur soit d'environ 80% et calculé à nouveau.
from pyspark.ml.feature import PCA
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")
pcaModel = pca.fit(std_feature_vectors)
print("====Vecteur unique====")
print(pcaModel.pc)
print("====Taux de cotisation====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Score du composant principal")
print("====Score du composant principal====")
pca_score.show(truncate=False)
En ce qui concerne le résultat du vecteur propre, la première colonne (la colonne verticale la plus à gauche) est le vecteur propre du premier composant principal, la deuxième colonne est le deuxième composant principal et la troisième colonne est le troisième composant principal.
Le taux de cotisation était de 52% pour la première composante principale, 30% pour la deuxième composante principale et 17,6% pour la troisième composante principale. Puisque le total cumulatif du premier et du deuxième est de 82%, le troisième composant principal peut être supprimé. Dans ce cas, définissez k = 2.
La valeur propre n'est ** pas obtenue **, mais le taux de cotisation est "la somme de la valeur propre / valeur propre", donc dans la plupart des cas, le taux de cotisation devrait être suffisant.
Les scores des composants principaux de chaque journal sont le premier composant principal dans la première colonne, le deuxième composant principal dans la deuxième colonne et le troisième composant principal dans la troisième colonne. Le code de la partition du premier composant principal est à l'envers comme destination du lien. Le simple fait que le vecteur soit orienté à 180 degrés en face n'affecte pas le résultat de l'analyse. Les valeurs sont légèrement différentes en raison de la différence dans la distribution non biaisée et la distribution de l'échantillon.
====Vecteur unique====
DenseMatrix([[-0.53130806, 0.68925233, -0.49258803],
[-0.67331251, 0.00933405, 0.73929908],
[ 0.51416145, 0.72446125, 0.45912296]])
====Taux de cotisation====
[0.52355344314,0.300887148322,0.175559408538]
====Score du composant principal====
+---------------------------------------------------------------+
|Score du composant principal|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
Nous avons présenté un exemple d'analyse des composants principaux à l'aide de DataFrame, PCA et Standard Scaler parmi les API ML de Spark.
PCA
est une variable au format vectoriel, et la sortie est le score de la composante principale.StandardScaler
pour standardiser l'entréePour une analyse des tendances des journaux ... voir l'article lié (^^;
pca.py
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Données brutes====")
df.show(truncate=False)
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
feature_vectors = assembler.transform(df)
feature_vectors.show()
scaler = StandardScaler(inputCol="variable", outputCol="標準化variable", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
std_feature_vectors = scalerModel.transform(feature_vectors)
print("====Données standardisées====")
std_feature_vectors.select("Variable normalisée").show(truncate=False)
# build PCA model
pca = PCA(k=3, inputCol="Variable normalisée", outputCol="Score du composant principal")
pcaModel = pca.fit(std_feature_vectors)
print("====Vecteur unique====")
print(pcaModel.pc)
print("====Taux de cotisation====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Score du composant principal")
print("====Score du composant principal====")
pca_score.show(truncate=False)