As practice in doing data processing and machine learning with PySpark, I tried the Kaggle Titanic competition. The running theme is: how do you do in PySpark the things you usually do with Pandas and Scikit-learn?
I pulled the jupyter/pyspark-notebook:latest image with Docker and used it as-is (Python 3.8.6, Spark 3.0.1); I'll skip the details of the environment setup here.
#Session start
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("titanic").getOrCreate()
#read csv data
train = spark.read.csv('./data/train.csv', header=True, inferSchema=True)
test = spark.read.csv('./data/test.csv', header=True, inferSchema=True)
#Export DataFrame to Parquet format
train.write.parquet('./data/train_parquet')
test.write.parquet('./data/test_parquet')
#Data reload
# Parquet stores the schema itself, so the CSV options (header, inferSchema) are not needed here
train = spark.read.parquet('./data/train_parquet')
test = spark.read.parquet('./data/test_parquet')
PySpark has no equivalent of Pandas' .shape. Get the number of rows with .count(), and since .columns returns the column names as a list, count them with len().
#Confirm shape
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print('train:',train_shape)
print('test:',test_shape)
>>>
train: (891, 12)
test: (418, 11)
Check the data type of each column. Note that the types are Spark SQL (JVM/Scala) types, not Python types.
#Check the schema
train.printSchema()
>>>
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
The equivalent of Pandas' .head(), which displays a few rows to get an overview. In PySpark you can either display the DataFrame itself with .show() or display it as a list of Row objects with .head(). I find the latter easier to read when there are many columns or long text fields.
# Display the first 5 rows
train.show(5)
>>>
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows
# Display the first 5 rows as Row objects
train.head(5)
>>>
[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]
In Pandas I often use .describe(). The same works in PySpark, but you need .show() to display the result. Also, by default Pandas shows only numeric columns, while PySpark includes the string columns as well.
#Display statistic list
train.describe().show()
>>>
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary| PassengerId| Survived| Pclass| Name| Sex| Age| SibSp| Parch| Ticket| Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
| count| 889| 889| 889| 889| 889| 712| 889| 889| 889| 889| 202| 889|
| mean| 446.0|0.38245219347581555|2.3115860517435323| null| null|29.64209269662921|0.5241844769403825|0.38245219347581555| 260763.9104704097|32.09668087739029| null| null|
| stddev|256.99817277718313|0.48625968831477334|0.8346997785705753| null| null|14.49293290032352| 1.103704875596923| 0.8067607445174785|472255.95121695305|49.69750431670795| null| null|
| min| 1| 0| 1|"Andersson, Mr. A...|female| 0.42| 0| 0| 110152| 0.0| A10| C|
| max| 891| 1| 3|van Melkebeke, Mr...| male| 80.0| 8| 6| WE/P 5735| 512.3292| T| S|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
The quickest check is the count row of .describe(). You can also filter each column with .isNull() or .isNotNull() and count the results (a one-pass version over all columns is sketched after the examples below).
# Check missing values
# train.describe().head() gives a similar result
train.describe().collect()[0]
>>>
Row(summary='count', PassengerId='891', Survived='891', Pclass='891', Name='891', Sex='891', Age='714', SibSp='891', Parch='891', Ticket='891', Fare='891', Cabin='204', Embarked='889')
# Get the number of missing values in the 'Age' column
train.filter(train['Age'].isNull()).count()
>>>
177
# Get the number of non-missing values in the 'Age' column
train.filter(train['Age'].isNotNull()).count()
>>>
714
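As a side note, if you want the null count for every column in one pass instead of filtering column by column, a minimal sketch like the following should work (using pyspark.sql.functions; the output columns simply reuse the original column names):
# Count nulls per column in a single aggregation by summing isNull() flags
from pyspark.sql import functions as F
train.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in train.columns]).show()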
Use .dtypes to get a list of (column name, data type) tuples, one per column.
train.dtypes
>>>
[('PassengerId', 'int'),
('Survived', 'int'),
('Pclass', 'int'),
('Name', 'string'),
('Sex', 'string'),
('Age', 'double'),
('SibSp', 'int'),
('Parch', 'int'),
('Ticket', 'string'),
('Fare', 'double'),
('Cabin', 'string'),
('Embarked', 'string')]
This time I decided to treat columns of type 'double' as numeric variables and columns of type 'string' or 'int' as categorical variables, and split them with list comprehensions as follows.
#Separate columns for categorical variables and columns for numeric variables
num_cols = [dtype[0] for dtype in train.dtypes if dtype[1] == 'double']
cat_cols = [dtype[0] for dtype in train.dtypes if dtype[1] != 'double']
# Remove only the target variable 'Survived'
cat_cols.remove('Survived')
print('Categorical variables:{}'.format(cat_cols))
print('Numeric variable:{}'.format(num_cols))
>>>
Categorical variables:['PassengerId', 'Pclass', 'Name', 'Sex', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked']
Numeric variable:['Age', 'Fare']
The equivalent of Pandas' .nunique(). It can be a useful reference when selecting features or deciding how to encode categorical variables. There is no direct equivalent in PySpark, so one way is to loop over the columns as below (distributed processing and Python for loops are not a great match, though; a single-pass alternative with countDistinct is sketched after the output).
#Find out the unique number of categorical variables
nunique = {}
for col in cat_cols:
    value = train.select(col).distinct().count()
    nunique[col] = value
print(nunique)
>>>
{'PassengerId': 891,
'Pclass': 3,
'Name': 891,
'Sex': 2,
'SibSp': 7,
'Parch': 7,
'Ticket': 681,
'Cabin': 148,
'Embarked': 4}
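As an aside, the same information can be computed in one aggregation with countDistinct instead of a Python loop. Note that countDistinct ignores nulls, so columns containing nulls (e.g. 'Cabin', 'Embarked') will come out one lower than with .distinct().count(). A minimal sketch:
# One aggregation instead of one .distinct().count() job per column
from pyspark.sql import functions as F
train.agg(*[F.countDistinct(F.col(c)).alias(c) for c in cat_cols]).show()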
This is useful, for example, when you want to check whether the classes are imbalanced in a classification task (a sketch for computing the actual ratios follows the output below).
# Check the distribution of 'Sex'
train.groupBy('Sex').count().show()
>>>
+------+-----+
| Sex|count|
+------+-----+
|female| 314|
| male| 577|
+------+-----+
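If you want the actual ratios rather than raw counts, one way (a sketch, not part of the original run) is to divide each group count by the total number of rows:
# Add a ratio column to the group counts
from pyspark.sql import functions as F
total = train.count()
train.groupBy('Sex').count().withColumn('ratio', F.col('count') / total).show()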
Pivot tables are implemented with .groupBy() and .pivot(). In this example you can see that the survival rate differs greatly by gender (famously so).
#Make a pivot table
train.groupBy('Survived').pivot("Sex").count().show()
>>>
+--------+------+----+
|Survived|female|male|
+--------+------+----+
| 1| 233| 109|
| 0| 81| 468|
+--------+------+----+
As shown below, you can also restrict the pivot values and aggregate a numeric variable.
# Mean 'Age' for each combination of 'Survived' and 'Embarked'
# 'Embarked' limited to 'C', 'Q', 'S'
train.groupBy('Survived').pivot('Embarked', ['C', 'Q', 'S']).mean('Age').show()
>>>
+--------+------------------+------+------------------+
|Survived| C| Q| S|
+--------+------------------+------+------------------+
| 1| 28.97367088607595| 22.5| 28.11318407960199|
| 0|33.666666666666664|30.325|30.203966005665723|
+--------+------------------+------+------------------+
Use the DataFrame's .corr() method. You can also get a full correlation matrix with Correlation in Spark ML's stat module (a sketch follows the output below).
# Correlation coefficient between 'Age' and 'Fare'
train.corr('Age', 'Fare')
>>>
0.135515853527051
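Here is a minimal sketch of the Correlation route mentioned above. It assumes the numeric columns are first assembled into a vector column (the name 'corr_features' is arbitrary), with handleInvalid='skip' dropping rows where 'Age' is null:
# Pearson correlation matrix with pyspark.ml.stat.Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
corr_assembler = VectorAssembler(inputCols=['Age', 'Fare'], outputCol='corr_features', handleInvalid='skip')
corr_df = corr_assembler.transform(train)
corr_matrix = Correlation.corr(corr_df, 'corr_features').head()[0]
print(corr_matrix.toArray())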
The equivalent of Scikit-learn's train_test_split. In PySpark it is implemented as a DataFrame method.
# Split the train data 7:3
df_train, df_valid = train.randomSplit([0.7,0.3], seed=2020)
print('df_train: {} rows'.format(df_train.count()))
print('df_valid: {} rows'.format(df_valid.count()))
>>>
df_train: 651 rows
df_valid: 240 rows
In PySpark you don't necessarily have to drop unused columns at this point, because the columns that will actually be used are assembled into a single column later, right before they go into the model; still, it can be done as follows.
#Drop unnecessary columns
df_train = df_train.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_valid = df_valid.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_train.columns
>>>
['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
Rows containing missing values can be deleted with dropna(). There are three arguments: how chooses between 'any' and 'all' (default 'any'); thresh, when set, keeps only rows with at least that many non-null values and overrides how (int, default None); subset selects the target columns (default None, i.e. all columns). A short thresh sketch follows the output below.
# Drop rows where 'Embarked' is null
df_train = df_train.dropna(how='any', subset=['Embarked'])
df_valid = df_valid.dropna(how='any', subset=['Embarked'])
df_train.describe().show()
>>>
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
|summary| Survived| Pclass| Sex| Age| SibSp| Parch| Fare|Embarked|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
| count| 889| 889| 889| 712| 889| 889| 889| 889|
| mean|0.38245219347581555|2.3115860517435323| null|29.64209269662921|0.5241844769403825|0.38245219347581555|32.09668087739029| null|
| stddev|0.48625968831477334|0.8346997785705753| null|14.49293290032352| 1.103704875596923| 0.8067607445174785|49.69750431670795| null|
| min| 0| 1|female| 0.42| 0| 0| 0.0| C|
| max| 1| 3| male| 80.0| 8| 6| 512.3292| S|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
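For reference, a small thresh sketch (not used in this post): when thresh is set, a row is kept only if it has at least that many non-null values, and how is ignored.
# Keep only rows with at least 7 non-null values out of the 8 columns
df_train.dropna(thresh=7).count()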
If you want to fill missing values with the mean or median instead, you can use Spark ML's Imputer. You can see that a new column named 'Age_imputed' has been added on the far right.
# Impute the 'Age' column with the median
from pyspark.ml.feature import Imputer
imputer = Imputer(
    strategy='median',
    inputCol='Age',
    outputCol='Age_imputed'
)
model = imputer.fit(df_train)
df_train = model.transform(df_train)
df_valid = model.transform(df_valid)
#Verification
df_train.describe().show()
>>>
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
|summary| Survived| Pclass| Sex| Age| SibSp| Parch| Fare|Embarked| Age_imputed|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
| count| 649| 649| 649| 521| 649| 649| 649| 649| 649|
| mean| 0.386748844375963| 2.295839753466872| null|29.335259117082533|0.5469953775038521|0.3913713405238829|31.948169645608576| null|29.071910631741137|
| stddev|0.48738094472424587|0.8418076223501735| null| 14.67636802626401| 1.1130653931477|0.7940671982196961| 46.4778648584037| null| 13.15793243066804|
| min| 0| 1|female| 0.42| 0| 0| 0.0| C| 0.42|
| max| 1| 3| male| 80.0| 8| 6| 512.3292| S| 80.0|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
Spark ML also implements StandardScaler, like Scikit-learn. In Spark ML, however, it operates on a single (vector) column, so to standardize several columns at once you first have to combine them into one column with VectorAssembler and then apply the scaler.
# Combine 'Age_imputed' and 'Fare' into a single 'num_cols' column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=['Age_imputed', 'Fare'],
    outputCol='num_cols'
)
df_train = assembler.transform(df_train)
df_valid = assembler.transform(df_valid)
#Standardization
from pyspark.ml.feature import StandardScaler
sc = StandardScaler(inputCol='num_cols', outputCol='num_cols_scaled')
sc_model = sc.fit(df_train)
df_train = sc_model.transform(df_train)
df_valid = sc_model.transform(df_valid)
#Verification
df_train.select(['Age_imputed', 'Fare', 'num_cols', 'num_cols_scaled']).show(5)
>>>
+-----------+-------+--------------+--------------------+
|Age_imputed| Fare| num_cols| num_cols_scaled|
+-----------+-------+--------------+--------------------+
| 22.0| 7.25| [22.0,7.25]|[1.67199521018387...|
| 38.0|71.2833|[38.0,71.2833]|[2.88799172668123...|
| 26.0| 7.925| [26.0,7.925]|[1.97599433930821...|
| 35.0| 53.1| [35.0,53.1]|[2.65999237983797...|
| 35.0| 8.05| [35.0,8.05]|[2.65999237983797...|
+-----------+-------+--------------+--------------------+
only showing top 5 rows
One-Hot encoding is also implemented in Spark ML. Unlike StandardScaler it can process multiple columns at once, but it only accepts numeric input. Columns containing strings therefore have to be converted to numeric indices with StringIndexer first (i.e. label encoded) before being passed to OneHotEncoder.
# Convert 'Sex' and 'Embarked' to numeric indices
from pyspark.ml.feature import StringIndexer
idx_input = ['Sex', 'Embarked']
idx_output = [col + '_indexed' for col in idx_input]
indexer = StringIndexer(
    inputCols=idx_input,
    outputCols=idx_output
)
idx_model = indexer.fit(df_train)
df_train = idx_model.transform(df_train)
df_valid = idx_model.transform(df_valid)
#Verification
df_train.select(['Sex', 'Sex_indexed', 'Embarked', 'Embarked_indexed']).show(5)
>>>
+------+-----------+--------+----------------+
| Sex|Sex_indexed|Embarked|Embarked_indexed|
+------+-----------+--------+----------------+
| male| 0.0| S| 0.0|
|female| 1.0| C| 1.0|
|female| 1.0| S| 0.0|
|female| 1.0| S| 0.0|
| male| 0.0| S| 0.0|
+------+-----------+--------+----------------+
only showing top 5 rows
# One-Hot encoding
from pyspark.ml.feature import OneHotEncoder
ohe_input = ['Pclass', 'Sex_indexed', 'SibSp', 'Parch', 'Embarked_indexed']
ohe_output = [col + '_encoded' for col in ohe_input]
ohe = OneHotEncoder(
    inputCols=ohe_input,
    outputCols=ohe_output,
    dropLast=True
)
ohe_model = ohe.fit(df_train)
df_train = ohe_model.transform(df_train)
df_valid = ohe_model.transform(df_valid)
# Check 'Embarked' and its encoded columns
df_train['Embarked', 'Embarked_indexed', 'Embarked_indexed_encoded'].show(10)
>>>
+--------+----------------+------------------------+
|Embarked|Embarked_indexed|Embarked_indexed_encoded|
+--------+----------------+------------------------+
| S| 0.0| (2,[0],[1.0])|
| C| 1.0| (2,[1],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| Q| 2.0| (2,[],[])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| C| 1.0| (2,[1],[1.0])|
| S| 0.0| (2,[0],[1.0])|
+--------+----------------+------------------------+
As you can see, the One-Hot encoded columns come out as sparse vectors.
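To make the sparse notation concrete: (2,[0],[1.0]) means a vector of length 2 with the value 1.0 at index 0. A minimal sketch for expanding one of these by hand:
# (2,[0],[1.0]) expands to [1.0, 0.0]
from pyspark.ml.linalg import SparseVector
print(SparseVector(2, [0], [1.0]).toArray())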
One difference from Scikit-learn is that PySpark requires the features used to train a model to be assembled into a single column. The common convention seems to be to use VectorAssembler (the same class used for standardization above) and to name the resulting column 'features'.
# Assemble the features into one column
assembler2 = VectorAssembler(
    inputCols=['Pclass_encoded',
               'Sex_indexed_encoded',
               'SibSp_encoded',
               'Parch_encoded',
               'Embarked_indexed_encoded',
               'num_cols_scaled'],
    outputCol='features'
)
df_train = assembler2.transform(df_train)
df_valid = assembler2.transform(df_valid)
df_train.select(['Survived', 'features']).show(5)
>>>
+--------+--------------------+
|Survived| features|
+--------+--------------------+
| 0|(22,[3,5,12,18,20...|
| 1|(22,[1,5,12,19,20...|
| 1|(22,[4,12,18,20,2...|
| 1|(22,[1,5,12,18,20...|
| 0|(22,[3,4,12,18,20...|
+--------+--------------------+
only showing top 5 rows
By the way, the One-Hot encoded categorical variables were originally sparse vectors and the numeric variables were dense (ordinary) vectors, but it seems they are automatically combined into a single sparse vector here.
# Check the data structure
df_train.select(['features']).head(2)
>>>
[Row(features=SparseVector(22, {3: 1.0, 5: 1.0, 12: 1.0, 18: 1.0, 20: 1.672, 21: 0.156})),
Row(features=SparseVector(22, {1: 1.0, 5: 1.0, 12: 1.0, 19: 1.0, 20: 2.888, 21: 1.5337}))]
This time we will simply use logistic regression. Most of the other standard algorithms, such as decision trees and linear SVC, are also implemented in Spark ML (a decision-tree sketch follows the code below).
#Logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Survived')
lr_model = lr.fit(df_train)
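As mentioned, other standard classifiers follow the same fit/transform pattern; here is a sketch with a decision tree (DecisionTreeClassifier swapped in, everything else unchanged and not tuned at all):
# Same interface: point the estimator at the label column and the assembled 'features' column
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')
dt_model = dt.fit(df_train)
dt_pred = dt_model.transform(df_valid)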
For data that has the correct labels, you can call the model's .evaluate() to get a Summary object with the results, and inspect the contents through its .predictions attribute.
# Get the inference results
train_result = lr_model.evaluate(df_train)
valid_result = lr_model.evaluate(df_valid)
#Verification
valid_result.predictions.select(['Survived', 'rawPrediction', 'probability', 'prediction']).show(5)
>>>
+--------+--------------------+--------------------+----------+
|Survived| rawPrediction| probability|prediction|
+--------+--------------------+--------------------+----------+
| 1|[-0.0107443246616...|[0.49731394467449...| 1.0|
| 0|[2.10818940159344...|[0.89169660088758...| 0.0|
| 0|[2.71630875457920...|[0.93798215479564...| 0.0|
| 1|[-0.2429596953280...|[0.43955710975950...| 1.0|
| 0|[1.81502375560081...|[0.85996794511927...| 0.0|
+--------+--------------------+--------------------+----------+
only showing top 5 rows
Model evaluation uses the classes in the evaluation module. To compute an AUC score for a binary classification task like this one, use BinaryClassificationEvaluator. (Note that the hard 'prediction' column is passed as rawPredictionCol here; pointing it at the model's 'rawPrediction' or 'probability' column instead would generally give a different AUC.)
#Model Evaluation (AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluater = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='Survived',
    metricName='areaUnderROC'  # the default metric is AUC
)
auc_train = evaluater.evaluate(train_result.predictions)
auc_valid = evaluater.evaluate(valid_result.predictions)
#Verification
print('AUC score')
print('train:', auc_train)
print('valid:', auc_valid)
>>>
AUC score
train: 0.7889497287232977
valid: 0.8065704293474217
If you want 'f1', 'recall', 'precision', etc., you need MulticlassClassificationEvaluator instead.
#Model evaluation (F1)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluater = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='Survived',
    metricName='f1'
)
f1_train = evaluater.evaluate(train_result.predictions)
f1_valid = evaluater.evaluate(valid_result.predictions)
print('f1 score')
print('train:', f1_train)
print('valid:', f1_valid)
>>>
f1 score
train: 0.8032854491383006
valid: 0.8270897150503879
For the unseen test data it is not an option to simply delete rows with missing values, so 'Embarked' is filled here with its mode, 'S'.
# Fill null 'Embarked' values with the mode
train = train.fillna({'Embarked': 'S'})
test = test.fillna({'Embarked': 'S'})
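The mode 'S' is hard-coded here; if you prefer to compute it from the training data, a sketch like the following works, and the resulting value can be passed to fillna() instead of the literal 'S':
# Most frequent non-null 'Embarked' value in the training data
mode_embarked = (train.filter(train['Embarked'].isNotNull())
                 .groupBy('Embarked').count()
                 .orderBy('count', ascending=False)
                 .first()['Embarked'])
print(mode_embarked)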
Finally, the steps used up through model evaluation are collected into a Pipeline. It works basically the same way as Scikit-learn's Pipeline.
#Pipeline settings
from pyspark.ml import Pipeline, PipelineModel
stages = [
    imputer,     # impute missing 'Age' values with the median
    assembler,   # assemble 'Age_imputed' and 'Fare' into one vector
    sc,          # standardize the assembled numeric columns
    indexer,     # index 'Sex' and 'Embarked' as numbers
    ohe,         # One-Hot encoding
    assembler2,  # assemble the features to be used
    lr           # logistic regression
]
pipeline = Pipeline(stages=stages)
#Model learning
pipeline_model = pipeline.fit(train)
#Save model
model_path = './model/lr_base'
pipeline_model.save(model_path)
#Model reload
pipeline_model = PipelineModel.load(model_path)
#inference
train_result = pipeline_model.transform(train)
test_result = pipeline_model.transform(test)
# Format and display the prediction results for submission
submission = test_result.withColumn('Survived', test_result['prediction'].cast('int'))
submission = submission.select('PassengerId', 'Survived')
submission.show()
>>>
+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
| 892| 0|
| 893| 0|
| 894| 0|
| 895| 0|
| 896| 1|
| 897| 0|
| 898| 1|
| 899| 0|
| 900| 1|
| 901| 0|
| 902| 0|
| 903| 0|
| 904| 1|
| 905| 0|
| 906| 1|
| 907| 1|
| 908| 0|
| 909| 0|
| 910| 1|
| 911| 1|
+-----------+--------+
only showing top 20 rows
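To actually produce a submission file for Kaggle (not shown above), one option is to let Spark write the CSV; note that Spark writes a directory containing a part-*.csv file rather than a single named file, so converting the small result to Pandas first is another common route. The output paths below are arbitrary:
# Option 1: Spark writes a directory with a single part file inside
submission.coalesce(1).write.csv('./data/submission', header=True, mode='overwrite')
# Option 2: convert to Pandas and write one file (the result is tiny)
submission.toPandas().to_csv('./data/submission.csv', index=False)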
Using the Titanic competition as a theme, I implemented a basic machine learning workflow in PySpark. Spark ML is clearly influenced by Scikit-learn and the two have a lot in common, but it has its quirks: standardization (used as-is) only operates on a single vector column, and label encoding has to be applied before One-Hot encoding. Going forward, I would like to try not only Spark ML but also external Spark packages.