Spark's MLlib is migrating to the ML package. As of Spark 2.0, the RDD-based MLlib API is in maintenance mode, and the DataFrame-based API will become the standard going forward. Here we use the ML API from PySpark to perform principal component analysis.
Let's implement an example in Spark that analyzes the tendencies of ten newspapers from their ratings for news, business, and sports coverage, using the data from the page linked below.
http://ifs.nog.cc/gucchi24.hp.infoseek.co.jp/SHUSEIEX.htm
For each of the ten newspapers, the content of its articles was rated on a 10-point scale for news, business, and sports: 10 means very substantial coverage and 0 means poor coverage.
news.csv
no,news,business,Sports
1,8,9,4
2,2,5,7
3,8,5,6
4,3,5,4
5,7,4,9
6,4,3,4
7,3,6,8
8,6,8,2
9,5,4,5
10,6,7,6
Previously it was standard to start with SparkContext, but Spark 2.0 uses SparkSession. SQLContext and HiveContext have been integrated into SparkSession.
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
Read the CSV with spark.read.csv and store it in a DataFrame.
ex1.py
df = spark.read.csv("news.csv", header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Raw data====")
df.show(truncate=False)
If the data contains Japanese text, the table layout collapses because character width is not taken into account.
$ export PYTHONIOENCODING=utf8
$ spark-submit ex1.py
====Raw data====
+---+----+--------+------+
|no |news|business|Sports|
+---+----+--------+------+
|1  |8   |9       |4     |
|2  |2   |5       |7     |
|3  |8   |5       |6     |
|4  |3   |5       |4     |
|5  |7   |4       |9     |
|6  |4   |3       |4     |
|7  |3   |6       |8     |
|8  |6   |8       |2     |
|9  |5   |4       |5     |
|10 |6   |7       |6     |
+---+----+--------+------+
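As an aside (not from the linked article), if the schema is known in advance, the inferSchema pass can be skipped by declaring the schema explicitly; a minimal sketch for news.csv:
from pyspark.sql.types import StructType, StructField, IntegerType

# Hedged alternative: declare the four columns of news.csv instead of inferring them
schema = StructType([
    StructField("no", IntegerType(), True),
    StructField("news", IntegerType(), True),
    StructField("business", IntegerType(), True),
    StructField("Sports", IntegerType(), True),
])
df = spark.read.csv("news.csv", header=True, schema=schema, mode="DROPMALFORMED", encoding='UTF-8')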
PCA() requires the variates in vector form, so use VectorAssembler to assemble [news, business, Sports] into a single vector and store it in the variable column. .transform(df) creates a new DataFrame.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
feature_vectors = assembler.transform(df)
feature_vectors.show(truncate=False)
A vector has been added in the variable column.
+---+----+--------+------+-------------+
| no|news|business|Sports|     variable|
+---+----+--------+------+-------------+
|  1|   8|       9|     4|[8.0,9.0,4.0]|
|  2|   2|       5|     7|[2.0,5.0,7.0]|
|  3|   8|       5|     6|[8.0,5.0,6.0]|
|  4|   3|       5|     4|[3.0,5.0,4.0]|
|  5|   7|       4|     9|[7.0,4.0,9.0]|
|  6|   4|       3|     4|[4.0,3.0,4.0]|
|  7|   3|       6|     8|[3.0,6.0,8.0]|
|  8|   6|       8|     2|[6.0,8.0,2.0]|
|  9|   5|       4|     5|[5.0,4.0,5.0]|
| 10|   6|       7|     6|[6.0,7.0,6.0]|
+---+----+--------+------+-------------+
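As a quick sanity check (not in the original article), one row of the new column can be pulled out to confirm that it really holds an ML DenseVector:
# select().first() returns a Row; its first field is the vector built by VectorAssembler
first_vec = feature_vectors.select("variable").first()[0]
print(type(first_vec))      # <class 'pyspark.ml.linalg.DenseVector'>
print(first_vec.toArray())  # [8. 9. 4.]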
Following the linked page, the data is standardized here as well before the calculation; principal component analysis generally gives better results on standardized data. ML provides StandardScaler, so we use it. The input is the vector in the variable column and the output is the standardized variate. With this API, you first build a model from the input data with .fit, then pass the input data to .transform again to actually transform it.
from pyspark.ml.feature import StandardScaler
# step1
scaler = StandardScaler(inputCol="variable", outputCol="Standardized variate", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
# step2
std_feature_vectors = scalerModel.transform(feature_vectors)
# Display only the standardized variate column
print("====Standardized data====")
std_feature_vectors.select("Standardized variate").show(truncate=False)
The values differ slightly from the linked table: StandardScaler uses the unbiased variance (dividing by n-1), while the linked page uses the sample variance (dividing by n). Please refer to other articles for details; a small NumPy sketch of the difference follows the output below.
====Standardized data====
+---------------------------------------------------------------+
|Standardized variate                                           |
+---------------------------------------------------------------+
|[1.3023647131866891,1.7919573407620815,-0.7071067811865476] |
|[-1.4884168150705013,-0.3162277660168382,0.7071067811865476] |
|[1.3023647131866891,-0.3162277660168382,0.23570226039551587] |
|[-1.0232865603609695,-0.3162277660168382,-0.7071067811865476] |
|[0.8372344584771575,-0.8432740427115681,1.649915822768611] |
|[-0.5581563056514377,-1.370320319406298,-0.7071067811865476] |
|[-1.0232865603609695,0.21081851067789167,1.1785113019775793] |
|[0.3721042037676257,1.2649110640673515,-1.649915822768611] |
|[-0.09302605094190601,-0.8432740427115681,-0.23570226039551587]|
|[0.3721042037676257,0.7378647873726216,0.23570226039551587] |
+---------------------------------------------------------------+
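To make the n versus n-1 point concrete, here is a small NumPy sketch, independent of Spark, applied to the news column of news.csv:
import numpy as np

# The ten "news" ratings from news.csv
news = np.array([8, 2, 8, 3, 7, 4, 3, 6, 5, 6], dtype=float)

# StandardScaler divides by the unbiased standard deviation (ddof=1) ...
unbiased = (news - news.mean()) / news.std(ddof=1)
# ... while the linked page is said to divide by the population standard deviation (ddof=0)
population = (news - news.mean()) / news.std(ddof=0)

print(unbiased[0])    # 1.3023... (matches the first standardized value above)
print(population[0])  # 1.3728... (the value the n-denominator convention gives)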
PCA
Finally we can call the principal component analysis API. The input is the standardized variate and the output is the principal component score. As with StandardScaler above, first build a model and then perform the actual calculation. The eigenvectors and contribution rates can be obtained from the fitted model. k=3 tells it to compute up to the third principal component. Normally, k is first set to a large value and the PCA is run once; k is then chosen so that the cumulative contribution rate from the top reaches around 80%, and the PCA is run again (see the sketch after the results below).
from pyspark.ml.feature import PCA
pca = PCA(k=3, inputCol="Standardized variate", outputCol="Main component score")
pcaModel = pca.fit(std_feature_vectors)
print("====Eigenvector====")
print(pcaModel.pc)
print("====Contribution rate====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Main component score")
print("====Main component score====")
pca_score.show(truncate=False)
In the eigenvector result, the first column (the leftmost vertical column) is the eigenvector of the first principal component, the second column that of the second principal component, and the third column that of the third.
The contribution rates were 52% for the first principal component, 30% for the second, and 17.6% for the third. Since the first and second together account for 82%, the third principal component could be dropped; in that case, set k=2.
Eigenvalues are **not available**, but the contribution rate is each eigenvalue divided by the sum of the eigenvalues, so in most cases the contribution rate is sufficient.
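If rough eigenvalues are needed anyway, they can be backed out of the contribution rates: after standardization each variable has unit variance, so the total variance (the sum of the eigenvalues) equals the number of variables, three here. A hedged sketch:
# Recover approximate eigenvalues from the contribution rates.
# Assumes standardized input, so total variance = number of variables.
ratios = pcaModel.explainedVariance.toArray()  # e.g. [0.524, 0.301, 0.176]
eigenvalues = ratios * len(ratios)             # multiply by 3
print(eigenvalues)                             # roughly [1.57, 0.90, 0.53]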
For the principal component scores of each newspaper, the first column is the first principal component, the second column the second, and the third column the third. The sign of the first principal component score is flipped compared with the linked page; a vector pointing 180 degrees the other way does not affect the analysis result. The values also differ slightly because of the unbiased-versus-sample-variance difference.
====Eigenvector====
DenseMatrix([[-0.53130806, 0.68925233, -0.49258803],
             [-0.67331251, 0.00933405, 0.73929908],
             [ 0.51416145, 0.72446125, 0.45912296]])
====Contribution rate====
[0.52355344314,0.300887148322,0.175559408538]
====Main component score====
+---------------------------------------------------------------+
|Main component score                                           |
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
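As a rough sketch of the k-selection workflow mentioned above (a judgment call, not from the linked article): fit once with k equal to the number of variables, pick the smallest k whose cumulative contribution rate reaches about 80%, and fit again.
import numpy as np

# Cumulative contribution rate from the model fitted with k=3 above
ratios = pcaModel.explainedVariance.toArray()
cumulative = np.cumsum(ratios)                 # e.g. [0.52, 0.82, 1.00]

# Smallest k whose cumulative contribution reaches 80% (k=2 for this data)
k = int(np.searchsorted(cumulative, 0.8)) + 1

pca2 = PCA(k=k, inputCol="Standardized variate", outputCol="Main component score")
pca2Model = pca2.fit(std_feature_vectors)
pca2Model.transform(std_feature_vectors).select("Main component score").show(truncate=False)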
We have walked through an example of principal component analysis using the DataFrame, PCA, and StandardScaler parts of Spark's ML API: the input to PCA is a variate in vector form and the output is the principal component scores, with StandardScaler used to standardize the input beforehand. For an analysis of the newspaper trends themselves ... see the linked article (^^;
pca.py
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Raw data====")
df.show(truncate=False)
# Assemble the three rating columns into a single vector column
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
feature_vectors = assembler.transform(df)
feature_vectors.show()
# Standardize to zero mean and unit variance (StandardScaler uses the unbiased, n-1, standard deviation)
scaler = StandardScaler(inputCol="variable", outputCol="Standardized variate", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
std_feature_vectors = scalerModel.transform(feature_vectors)
print("====Standardized data====")
std_feature_vectors.select("Standardized variate").show(truncate=False)
# build PCA model
pca = PCA(k=3, inputCol="Standardized variate", outputCol="Main component score")
pcaModel = pca.fit(std_feature_vectors)
print("====Eigenvector====")
print(pcaModel.pc)
print("====Contribution rate====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("Main component score")
print("====Main component score====")
pca_score.show(truncate=False)
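The full script can be run the same way as the earlier snippet, assuming news.csv is in the working directory (export PYTHONIOENCODING again if the console needs UTF-8):
$ export PYTHONIOENCODING=utf8
$ spark-submit pca.py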