Spark ML has a mechanism called Pipeline that chains a series of DataFrame transformations into a single unit. Using it makes the code easier to write and can also improve memory efficiency inside Spark.
The general flow is as follows: declare each stage, bundle the stages into a Pipeline, call fit() on the input DataFrame to obtain a PipelineModel, and then call transform() to apply it.
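As a quick, self-contained illustration of that flow (separate from the PCA example that follows), here is a minimal sketch; the tiny DataFrame and the Tokenizer/HashingTF stages are just stand-ins for any Transformer or Estimator you might chain.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF

spark = SparkSession.builder.getOrCreate()

# A tiny DataFrame just to illustrate the flow
docs = spark.createDataFrame([(0, "spark ml pipeline"), (1, "fit then transform")], ["id", "text"])

# Two stages: split text into words, then hash the words into a feature vector
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features", numFeatures=16)

# Bundle the stages, fit once, transform once
pipeline = Pipeline(stages=[tokenizer, hashing_tf])
pipeline_model = pipeline.fit(docs)
pipeline_model.transform(docs).show(truncate=False)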
Let's rewrite the principal component analysis with Spark ML from the previous article using a Pipeline.
Each processing step registered in a pipeline is called a stage. The principal component analysis example consisted of the following three stages: VectorAssembler (assemble the input columns into a single vector), StandardScaler (standardize the vector), and PCA (compute the principal component scores).
Declare the pipeline using these three stages. df is a DataFrame holding the input data; see the previous article for the details of df.
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Each stage of the Pipeline
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="standardized variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="standardized variable", outputCol="principal component score")

# Pipeline declaration
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])
Feed the data into the constructed pipeline to fit a model, then apply the model to the data.
model = pipeline.fit(df)
result = model.transform(df)
result.select("principal component score").show(truncate=False)
The results are the same as those obtained by running each step individually in the previous article.
+---------------------------------------------------------------+
|principal component score|
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
The fitted stage objects can be referenced through model.stages[]. Let's look at the PCA model, which is the third stage (model.stages[2]).
print("====Eigenvector====")
print(model.stages[2].pc)
print("====Contribution rate====")
print(model.stages[2].explainedVariance)
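Incidentally, explainedVariance comes back as a DenseVector of per-component contribution rates, so if you also want the cumulative contribution rate, one way (just a sketch, not part of the code above) is to accumulate it with NumPy:

import numpy as np

# explainedVariance is a DenseVector of per-component contribution rates
ratios = model.stages[2].explainedVariance.toArray()
print("====Cumulative contribution rate====")
print(np.cumsum(ratios))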
By using a Pipeline, the intermediate variables disappear and the code becomes much tidier. You can still refer to the individual model of each stage, so there seems to be no reason not to use Pipeline.
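By the way, although I didn't touch on it above, a fitted PipelineModel can also be saved and reloaded as a single unit, stages included. The path "pca_pipeline_model" below is just an example; a minimal sketch:

from pyspark.ml import PipelineModel

# Persist the fitted pipeline (all stages together) to a directory
model.write().overwrite().save("pca_pipeline_model")

# Reload it later and apply it to data with the same schema
restored = PipelineModel.load("pca_pipeline_model")
restored.transform(df).select("principal component score").show(truncate=False)

For reference, the full code from this article follows.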
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Initialize SparkSession
spark = (SparkSession
         .builder
         .appName("news")
         .enableHiveSupport()
         .getOrCreate())

# Read the raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("====Raw data====")
df.show(truncate=False)

# Prepare the pipeline stages
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="variable")
scaler = StandardScaler(inputCol="variable", outputCol="standardized variable", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="standardized variable", outputCol="principal component score")
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

# Fit the pipeline to build a model from the input data
model = pipeline.fit(df)

# Apply the model
result = model.transform(df)
result.show(truncate=False)

# Each fitted stage can be referenced via model.stages
print("====Eigenvector====")
print(model.stages[2].pc)
print("====Contribution rate====")
print(model.stages[2].explainedVariance)