This article is based on the PySpark 3.0.1 documentation. Knowing how the most commonly called functions behave should make it quicker to settle on an implementation approach.
Introduction
JupyterLab PySpark Kernel
First, create an environment for running PySpark using Dataproc on GCP.
gcloud dataproc clusters create <cluster name> --enable-component-gateway --region <region> --zone <zone> \
--master-machine-type n1-standard-2 --master-boot-disk-size 500 --num-workers 2 \
--worker-machine-type n1-standard-2 --worker-boot-disk-size 500 --image-version 1.4-debian10 --optional-components ANACONDA,JUPYTER \
--scopes https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/devstorage.full_control --project <project id>
We set --enable-component-gateway so that JupyterLab can be selected from the cluster's web UI. For the data, we use the Titanic data from Kaggle (https://www.kaggle.com/c/titanic).
import library
import pandas as pd
import numpy as np
import sys
import time
from google.cloud import storage as gcs
from io import BytesIO
from fs_gcsfs import GCSFS
from fs import open_fs
import gcsfs
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row ,functions as F
from pyspark.sql import SQLContext
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sys.path
>>
['/opt/conda/anaconda/lib/python36.zip',
'/opt/conda/anaconda/lib/python3.6',
'/opt/conda/anaconda/lib/python3.6/lib-dynload',
'',
'/opt/conda/anaconda/lib/python3.6/site-packages',
'/usr/lib/spark/python',
'/opt/conda/anaconda/lib/python3.6/site-packages/IPython/extensions',
'/root/.ipython']
# Use a name other than "gcsfs" so the imported gcsfs module is not shadowed
gcs_fs = GCSFS(bucket_name = "<bucket_name>")
gcs_fs.tree()
>>
|-- .ipynb_checkpoints
| -- error (resource '/.ipynb_checkpoints' not found)
|-- google-cloud-dataproc-metainfo
| -- error (resource '/google-cloud-dataproc-metainfo' not found)
|-- notebooks
| -- jupyter
| -- Untitled.ipynb
-- titanic_data.csv
load CSV data (as Pandas DataFrame) from Cloud Storage
bucket_name = "<bucket_name>"
file_name = "titanic_data.csv"
project_id = "<project_id>"
# with Pandas
titanic_data = pd.read_csv("<Cloud Storage Path>")
# with gcsfs
fs = gcsfs.GCSFileSystem(project = project_id)
with fs.open('{}/titanic_data.csv'.format(bucket_name)) as file:
    titanic_data = pd.read_csv(file)
# with gcs
client = gcs.Client()
bucket = client.get_bucket(bucket_name)
blob = gcs.Blob(file_name, bucket)
content = blob.download_as_string()
titanic_data = pd.read_csv(BytesIO(content))
write PySpark DataFrame (as Pandas DataFrame) to Cloud Storage
titanic_data.toPandas().to_csv('gs://pyspark_output/output.csv', header = True)
Spark DataFrame
# spark = SparkSession.builder.appName("titanic_data_").getOrCreate()
schema = StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Survived", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", DoubleType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", DoubleType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True),
])
titanic_data = spark.read.format("com.databricks.spark.csv").options(header = "true").load("gs://{}/titanic_data.csv".format(bucket_name), schema = schema)
# titanic_data = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/titanic_data.csv".format(bucket_name), schema = schema)
titanic_data.show(5)
>>
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male| 22| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female| 38| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female| 26| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female| 35| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male| 35| 0| 0| 373450| 8.05| null| S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows
Remove the rows where Embarked is missing.
titanic_data = titanic_data.filter(F.col('Embarked').isNotNull())
titanic_data.filter(F.col('Embarked').isNull()).count()
>> 0
titanic_data
>> DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, Embarked_labeled: string, Age_cat: string]
titanic_data.count()
>> 889
titanic_data.printSchema()
>>
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
|-- Embarked_labeled: string (nullable = true)
|-- Age_cat: string (nullable = true)
titanic_data.dtypes
>>
[('PassengerId', 'int'),
('Survived', 'int'),
('Pclass', 'int'),
('Name', 'string'),
('Sex', 'string'),
('Age', 'double'),
('SibSp', 'int'),
('Parch', 'int'),
('Ticket', 'string'),
('Fare', 'double'),
('Cabin', 'string'),
('Embarked', 'string'),
('Embarked_labeled', 'string'),
('Age_cat', 'string')]
DataFrame ⇔ RDD
# DataFrame -> RDD
titanic_data.rdd
# DataFrame <- RDD
spark.createDataFrame(titanic_data.rdd)
SparkSQL
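# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
titanic_data.createOrReplaceTempView('titanic_data_table')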
spark.sql("select PassengerID, Embarked from titanic_data_table where Embarked = 'S' ").toPandas().iloc[0:5]
>>
PassengerID Embarked
0 1 S
1 3 S
2 4 S
3 5 S
4 7 S
spark.sql("select age, \
case when age <= 12 then 'C' \
when age between 13 and 19 then 'T' \
when age between 20 and 25 then '1' \
when age between 26 and 34 then '2' \
when age between 35 and 49 then '3' \
when age >= 50 then '4' end as age_cat \
from titanic_data_table").toPandas().iloc[0:5]
>>
age age_cat
0 22 1
1 38 3
2 26 2
3 35 3
4 35 3
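Because Age is a double column and the CASE expression has no ELSE branch, it is worth checking what happens to values that fall between the integer ranges. The following is a minimal plain-Python sketch that mirrors the query's branches; the function name `age_category` is my own and does not appear in the original, and it only illustrates the logic rather than how Spark evaluates it.

```python
from typing import Optional


def age_category(age: Optional[float]) -> Optional[str]:
    """Plain-Python mirror of the CASE expression in the query above."""
    if age is None:
        return None  # a NULL age matches no branch
    if age <= 12:
        return 'C'
    if 13 <= age <= 19:
        return 'T'
    if 20 <= age <= 25:
        return '1'
    if 26 <= age <= 34:
        return '2'
    if 35 <= age <= 49:
        return '3'
    if age >= 50:
        return '4'
    return None  # no ELSE branch, so SQL would return NULL here


# Ages from the output above, plus a fractional age and a missing one
for age in (22.0, 38.0, 26.0, 35.0, 34.5, None):
    print(age, age_category(age))
```

A fractional age such as 34.5 matches none of the `between` ranges, so it would end up with a NULL category; adding an `else` branch or using half-open ranges would avoid that.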
Functions
Max / Min / Avg (or Mean) / Std / Sum / Count
titanic_data.agg({'Fare': 'max'}).collect()
>> [Row(max(Fare)=512.3292)]
Join
join(other, on=None, how=None): Joins with another DataFrame, using the given join expression.
on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how – str, default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
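To make the difference between some of the `how` values concrete, here is a small plain-Python sketch of four of them (inner, left, left_semi, left_anti). The sample rows, the `join_rows` helper, and the assumption that keys on the right side are unique are all mine; this only illustrates the semantics and is not how Spark implements joins.

```python
# Hypothetical sample data: (PassengerId, Embarked) on the left,
# PassengerId -> random_number on the right (keys assumed unique)
left = [(1, 'S'), (2, 'C'), (3, 'S')]
right = {2: 57, 3: 14, 4: 99}


def join_rows(left_rows, right_map, how="inner"):
    """Illustrate inner / left / left_semi / left_anti on a single key."""
    if how not in ("inner", "left", "left_semi", "left_anti"):
        raise ValueError("unsupported join type in this sketch: " + how)
    out = []
    for key, value in left_rows:
        matched = key in right_map
        if how == "inner" and matched:
            out.append((key, value, right_map[key]))
        elif how == "left":
            # unmatched left rows are kept, with None on the right side
            out.append((key, value, right_map.get(key)))
        elif how == "left_semi" and matched:
            # only left columns, only rows that have a match
            out.append((key, value))
        elif how == "left_anti" and not matched:
            # only left columns, only rows without a match
            out.append((key, value))
    return out


for how in ("inner", "left", "left_semi", "left_anti"):
    print(how, join_rows(left, right, how))
```

The semi and anti variants never bring in columns from the right side, which makes them convenient as existence filters.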
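# pandas copy of the Spark DataFrame (assumption: the original does not show how titanic_data_ was created)
titanic_data_ = titanic_data.toPandas()
random_number_pddf = pd.DataFrame(np.random.randint(100, size = 889)[:, np.newaxis], columns = ['random_number'])
data_for_join_ = pd.concat([titanic_data_.PassengerId, random_number_pddf], axis = 1)
data_for_join = spark.createDataFrame(data_for_join_)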
# how = 'inner'
data_join = titanic_data.join(data_for_join, titanic_data.PassengerId == data_for_join.PassengerId, how = 'inner').drop(data_for_join.PassengerId)
data_join.printSchema()
>>
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
|-- Embarked_labeled: string (nullable = true)
|-- Age_cat: string (nullable = true)
|-- random_number: double (nullable = true)
groupBy
time_ = time.time()
titanic_data.groupby('Embarked', 'Survived').count().sort('count', ascending = False).show(5)
time_ = time.time() - time_
>>
+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
| S| 0| 427|
| S| 1| 217|
| C| 1| 93|
| C| 0| 75|
| Q| 0| 47|
+--------+--------+-----+
only showing top 5 rows
np.round(time_, 3)
>> 2.426
time_ = time.time()
spark.sql("select Embarked, Survived, \
count(*) as per_Embarked_cat \
from titanic_data_table \
group by Embarked, Survived order by per_Embarked_cat desc").show(5)
time_ = time.time() - time_
>>
+--------+--------+----------------+
|Embarked|Survived|per_Embarked_cat|
+--------+--------+----------------+
| S| 0| 427|
| S| 1| 217|
| C| 1| 93|
| C| 0| 75|
| Q| 0| 47|
+--------+--------+----------------+
only showing top 5 rows
np.round(time_, 3)
>> 1.794
Drop
titanic_data.select('Pclass', 'Age', 'Name').drop('Name').show(5)
>>
+------+----+
|Pclass| Age|
+------+----+
| 3|22.0|
| 1|38.0|
| 3|26.0|
| 1|35.0|
| 3|35.0|
+------+----+
only showing top 5 rows
Duplicate
titanic_data.select('Pclass', 'Age', 'Name').show(10)
>>
+------+----+--------------------+
|Pclass| Age| Name|
+------+----+--------------------+
| 3|22.0|Braund, Mr. Owen ...|
| 1|38.0|Cumings, Mrs. Joh...|
| 3|26.0|Heikkinen, Miss. ...|
| 1|35.0|Futrelle, Mrs. Ja...|
| 3|35.0|Allen, Mr. Willia...|
| 3|null| Moran, Mr. James|
| 1|54.0|McCarthy, Mr. Tim...|
| 3| 2.0|Palsson, Master. ...|
| 3|27.0|Johnson, Mrs. Osc...|
| 2|14.0|Nasser, Mrs. Nich...|
+------+----+--------------------+
only showing top 10 rows
titanic_data.select('Name').distinct().count()
>> 889
titanic_data.select('Name').count()
>> 889
titanic_data.dropDuplicates(['Name']).select('Pclass', 'Age', 'Name').show(10)
>>
+------+----+--------------------+
|Pclass| Age| Name|
+------+----+--------------------+
| 2|40.0|Watt, Mrs. James...|
| 1|36.0|Young, Miss. Mari...|
| 1|null|Parr, Mr. William...|
| 3|19.0|Soholt, Mr. Peter...|
| 3|31.0|Goldsmith, Mrs. F...|
| 3|42.0| Dimic, Mr. Jovan|
| 1|48.0|Harper, Mr. Henry...|
| 1|38.0|Reuchlin, Jonkhee...|
| 2|18.0|Fahlstrom, Mr. Ar...|
| 2|42.0|Hosono, Mr. Masabumi|
+------+----+--------------------+
only showing top 10 rows
titanic_data.dropDuplicates(['Name']).select('Pclass', 'Age', 'Name').count()
>> 889
Explode / Split / Regexp Replace
explode: Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.
split: Splits str around matches of the given pattern.
regexp_replace: Replace all substrings of the specified string value that match regexp with rep.
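The three functions are typically chained, so a plain-Python sketch of the same pipeline may help. It uses the standard `re` module instead of Spark; the helper names and the bracketed second sample string are hypothetical and only serve to show what the bracket-stripping pattern does.

```python
import re

# The first name is taken from the data; the second is a made-up bracketed string
names = ["Braund, Mr. Owen Harris", "[Doe, Ms. Jane]"]


def strip_brackets(s):
    # Counterpart of regexp_replace(col, r"(^\[)|(\]$)", ""):
    # removes a leading "[" and a trailing "]" if present
    return re.sub(r"(^\[)|(\]$)", "", s)


def split_name(s):
    # Counterpart of split(col, ", "): the pattern is a regular expression
    return re.split(", ", s)


# Counterpart of explode: one output row per element of each array
exploded = [part for name in names for part in split_name(strip_brackets(name))]
print(exploded)
```

Each input row yields as many output rows as its array has elements, which is why the exploded result has more rows than the original.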
titanic_data.select('name').show(5)
>>
+--------------------+
| name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows
titanic_data.withColumn('Name', explode(split(regexp_replace(F.col('name'), r"(^\[)|(\]$)", ""), ", "))).select('name').show(5)
>>
+--------------------+
| name|
+--------------------+
| Braund|
| Mr. Owen Harris|
| Cumings|
|Mrs. John Bradley...|
| Heikkinen|
+--------------------+
only showing top 5 rows
Window-function
The following covers window functions.
from pyspark.sql.window import Window
from pyspark.sql.functions import *
Rank
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('rank', rank().over(window_)).select('Embarked', 'Fare', 'rank').show(5)
>>
+--------+------+----+
|Embarked| Fare|rank|
+--------+------+----+
| Q| 6.75| 1|
| Q| 6.75| 1|
| Q|6.8583| 3|
| Q| 6.95| 4|
| Q|7.6292| 5|
+--------+------+----+
only showing top 5 rows
Percent Rank
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('percent_rank', percent_rank().over(window_)).select('Embarked', 'Fare', F.round('percent_rank', 2).alias('percent_rank')).show(5)
>>
+--------+------+------------+
|Embarked| Fare|percent_rank|
+--------+------+------------+
| Q| 6.75| 0.0|
| Q| 6.75| 0.0|
| Q|6.8583| 0.03|
| Q| 6.95| 0.04|
| Q|7.6292| 0.05|
+--------+------+------------+
only showing top 5 rows
Dense Rank
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('dense_rank', dense_rank().over(window_)).select('Embarked', 'Fare', 'dense_rank').show(5)
>>
+--------+------+----------+
|Embarked| Fare|dense_rank|
+--------+------+----------+
| Q| 6.75| 1|
| Q| 6.75| 1|
| Q|6.8583| 2|
| Q| 6.95| 3|
| Q|7.6292| 4|
+--------+------+----------+
only showing top 5 rows
Row Number
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('row_number', row_number().over(window_)).select('Embarked', 'Fare', 'row_number').show(5)
>>
+--------+------+----------+
|Embarked| Fare|row_number|
+--------+------+----------+
| Q| 6.75| 1|
| Q| 6.75| 2|
| Q|6.8583| 3|
| Q| 6.95| 4|
| Q|7.6292| 5|
+--------+------+----------+
only showing top 5 rows
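rank, dense_rank and row_number differ only in how they handle ties. The sketch below re-implements them in plain Python on the five fares shown in the outputs above; it assumes the values are already sorted within a single partition, and the function bodies are my own illustration rather than Spark's implementation.

```python
# First five fares of partition Q, already sorted ascending
fares = [6.75, 6.75, 6.8583, 6.95, 7.6292]


def row_number(values):
    # Consecutive numbers, ties broken by position
    return list(range(1, len(values) + 1))


def rank(values):
    # Ties share the position of their first occurrence, leaving gaps afterwards
    return [values.index(v) + 1 for v in values]


def dense_rank(values):
    # Ties share a rank and no gaps are left
    distinct = sorted(set(values))
    return [distinct.index(v) + 1 for v in values]


print(row_number(fares))
print(rank(fares))
print(dense_rank(fares))
```

The three lists agree with the rank, dense_rank and row_number columns printed above for the same five rows.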
Cume Dist
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('cumulative_dist', cume_dist().over(window_)).select('Embarked', 'Fare', F.round('cumulative_dist', 2).alias('cumulative_dist')).show(10)
>>
+--------+------+---------------+
|Embarked| Fare|cumulative_dist|
+--------+------+---------------+
| Q| 6.75| 0.03|
| Q| 6.75| 0.03|
| Q|6.8583| 0.04|
| Q| 6.95| 0.05|
| Q|7.6292| 0.06|
| Q| 7.725| 0.08|
| Q|7.7292| 0.09|
| Q|7.7333| 0.14|
| Q|7.7333| 0.14|
| Q|7.7333| 0.14|
+--------+------+---------------+
only showing top 10 rows
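percent_rank and cume_dist are both relative positions, but they are computed differently: percent_rank is (rank - 1) / (n - 1), while cume_dist is the share of rows whose value is less than or equal to the current one. Here is a plain-Python sketch under the assumption of one sorted partition; since it uses only five sample fares, the numbers differ from the output above, where the partition is larger.

```python
# Five sample fares treated as one complete, sorted partition (illustration only)
fares = [6.75, 6.75, 6.8583, 6.95, 7.6292]


def percent_rank(values):
    # (rank - 1) / (n - 1); a single-row partition gets 0.0
    n = len(values)
    if n == 1:
        return [0.0]
    return [values.index(v) / (n - 1) for v in values]


def cume_dist(values):
    # Fraction of rows with a value <= the current row's value
    n = len(values)
    return [sum(1 for w in values if w <= v) / n for v in values]


print(percent_rank(fares))
print(cume_dist(fares))
```

Note that percent_rank starts at 0 for the lowest value, whereas cume_dist is never 0 and always reaches 1 at the highest value.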
Lead
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('lead', lead('Fare', 2).over(window_)).select('Embarked', 'Fare', 'lead').show(5)
>>
+--------+------+------+
|Embarked| Fare| lead|
+--------+------+------+
| Q| 6.75|6.8583|
| Q| 6.75| 6.95|
| Q|6.8583|7.6292|
| Q| 6.95| 7.725|
| Q|7.6292|7.7292|
+--------+------+------+
only showing top 5 rows
Lag
window_ = Window.partitionBy('Embarked').orderBy('Fare')
titanic_data.withColumn('lag', lag('Fare', 2).over(window_)).select('Embarked', 'Fare', 'lag').show(5)
>>
+--------+------+------+
|Embarked| Fare| lag|
+--------+------+------+
| Q| 6.75| null|
| Q| 6.75| null|
| Q|6.8583| 6.75|
| Q| 6.95| 6.75|
| Q|7.6292|6.8583|
+--------+------+------+
only showing top 5 rows
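lead and lag simply look a fixed number of rows ahead or behind within the ordered partition. A plain-Python sketch with the same offset of 2, using the five fares from the outputs above as if they were the whole partition (an assumption made to keep the example short):

```python
# Five sample fares treated as one complete, sorted partition
fares = [6.75, 6.75, 6.8583, 6.95, 7.6292]


def lead(values, offset=1, default=None):
    # Value `offset` rows after the current row, or `default` past the end
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


def lag(values, offset=1, default=None):
    # Value `offset` rows before the current row, or `default` before the start
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


print(lead(fares, 2))
print(lag(fares, 2))
```

The lag result matches the lag column above. The last two lead values are None here only because the sample stops after five rows.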
Aggregate
window_ = Window.partitionBy('Embarked').orderBy('Fare')
window_agg = Window.partitionBy('Embarked')
titanic_data.withColumn('row', row_number().over(window_))\
.withColumn('avg', avg(F.col('Fare')).over(window_agg))\
.withColumn('max', max(F.col('Fare')).over(window_agg))\
.select('Embarked', 'row', 'avg', 'max').show(5)
>>
+--------+---+------------------+----+
|Embarked|row| avg| max|
+--------+---+------------------+----+
| Q| 1|13.276029870129872|90.0|
| Q| 2|13.276029870129872|90.0|
| Q| 3|13.276029870129872|90.0|
| Q| 4|13.276029870129872|90.0|
| Q| 5|13.276029870129872|90.0|
+--------+---+------------------+----+
only showing top 5 rows
titanic_data.withColumn('row', row_number().over(window_))\
.withColumn('avg', avg(F.col('Fare')).over(window_agg).alias('avg'))\
.withColumn('max', max(F.col('Fare')).over(window_agg)).where(F.col('row') == 1)\
.select('Embarked', F.round(F.col('avg'), 2).alias('Fare_avg'), F.round(F.col('max'), 2).alias('Fare_max'))\
.sort('Fare_avg', ascending = False).show()
>>
+--------+--------+--------+
|Embarked|Fare_avg|Fare_max|
+--------+--------+--------+
| C| 59.95| 512.33|
| S| 27.08| 263.0|
| Q| 13.28| 90.0|
+--------+--------+--------+
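An aggregate over a window that has only partitionBy attaches the same per-partition value to every row, which is why keeping the row numbered 1 yields one summary row per group. The sketch below shows that idea in plain Python with a small hypothetical sample; `with_partition_stats` is a name I made up for the illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (Embarked, Fare)
rows = [("Q", 6.75), ("Q", 90.0), ("S", 7.25), ("S", 263.0)]


def with_partition_stats(rows):
    """Attach the partition's average and maximum fare to every row."""
    groups = defaultdict(list)
    for key, fare in rows:
        groups[key].append(fare)
    stats = {k: (sum(v) / len(v), max(v)) for k, v in groups.items()}
    return [(key, fare, *stats[key]) for key, fare in rows]


result = with_partition_stats(rows)
# Every row of a partition carries identical stats, so deduplicating
# (or keeping row number 1) gives one summary row per partition
summary = sorted({(key, avg, mx) for key, _, avg, mx in result})
print(result)
print(summary)
```

When only the summary is needed, a groupBy with agg would give the same figures; the window version is useful when the per-row detail has to be kept next to the group statistics.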
UserDefinedFunction
UserDefinedFunction【PySpark】
# from pyspark.sql.functions import UserDefinedFunction
# from pyspark.sql import SQLContext, Row
# from pyspark.sql.types import *
def LabelEncoder(x):
    # Map the port of embarkation to an integer label
    if x == 'S':
        x_ = 0
    elif x == 'C':
        x_ = 1
    elif x == 'Q':
        x_ = 2
    else:
        x_ = None  # unknown or missing value
    return x_
udf_label_Encoder = UserDefinedFunction(LabelEncoder)
titanic_data.filter('Age > 12').withColumn('Embarked_label', udf_label_Encoder(F.col('Embarked'))).select('PassengerId', 'Embarked_label').show(5)
# titanic_data.filter('Age > 12').withColumn('Embarked_label', udf_label_Encoder('Embarked')).select('PassengerId', 'Embarked_label').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
| 1| 0|
| 2| 1|
| 3| 0|
| 4| 0|
| 5| 0|
+-----------+--------------+
only showing top 5 rows
titanic_data.select('PassengerId', udf_label_Encoder('Embarked').alias('Embarked_label')).filter('PassengerId >= 2').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
| 2| 1|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 2|
+-----------+--------------+
only showing top 5 rows
# from pyspark.sql.functions import when
titanic_data.withColumn('Embarked', when(F.col('Embarked') == 'S', '0').when(F.col('Embarked') == 'C', '1').otherwise('2'))\
.withColumnRenamed('Embarked', 'Embarked_label').select('PassengerId', 'Embarked_label').filter('PassengerId >= 2').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
| 2| 1|
| 3| 0|
| 4| 0|
| 5| 0|
| 6| 2|
+-----------+--------------+
only showing top 5 rows
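Both the UDF and the when chain encode the same fixed mapping, so a dictionary lookup is another way to write the Python side. This is a minimal sketch; `EMBARKED_LABELS` and `label_encode` are names I introduced, and I have only checked the function as plain Python, not inside a Spark job.

```python
from typing import Optional

# Same mapping as in LabelEncoder
EMBARKED_LABELS = {'S': 0, 'C': 1, 'Q': 2}


def label_encode(port: Optional[str]) -> Optional[int]:
    """Return the integer label for a port, or None for unknown or missing values."""
    return EMBARKED_LABELS.get(port)


for port in ('S', 'C', 'Q', None, 'X'):
    print(port, label_encode(port))
```

Presumably it could be wrapped with UserDefinedFunction in the same way as LabelEncoder. Where performance matters, the when / otherwise version is usually preferable because it stays inside Spark's built-in expressions rather than calling into Python for every row.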
UserDefinedFunction【SparkSQL】
By changing the time zone, you can run PySpark functions on the DataFrame newly created by SparkSQL.
spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")
titanic_data.createOrReplaceTempView('titanic_data_table')
spark.udf.register('LabelEncoder_', LabelEncoder)
spark.sql('''select PassengerId, LabelEncoder_(Embarked) as Embarked_label from titanic_data_table''').show(5)
>>
+-----------+--------------+
|PassengerId|Embarked_label|
+-----------+--------------+
| 1| 0|
| 2| 1|
| 3| 0|
| 4| 0|
| 5| 0|
+-----------+--------------+
only showing top 5 rows
PySpark offers a multitude of functions. I have tried to summarize some of them here.