This article summarizes the main features and data manipulation operations of PySpark.
Partitioning and Bucketing
Partitioning and bucketing are important concepts from `Apache Hive` that also matter when working with PySpark.
For more information on partitioning and bucketing, see https://data-flair.training/blogs/hive-partitioning-vs-bucketing/.
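As a rough sketch (the dt and id columns, the output path, and the table name here are hypothetical), the two concepts look like this on the write side in PySpark:
# Partitioning: one sub-folder is created per distinct value of dt
df.write.partitionBy("dt").parquet("s3://some-bucket/partitioned/")
# Bucketing: rows are hashed on id into a fixed number of buckets; this requires saving as a table
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_table")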
If processing is slow, you can use Ganglia to see how the compute resources are being used.
In particular, when network traffic (= data transfer volume) is low but processing still takes a long time, caching the DataFrame may solve it.
df = df.cache()
The following variables are assumed to have been created already.
spark: the Spark session / context
path: some file path
Caution:
The output of df.show() may not be exact, since it is only meant to convey the general picture.
Paths start with s3:// because the examples are assumed to run on AWS.
import
The following items are typically imported when using Spark.
# In some cases `from pyspark.sql.functions import *` is used,
# but I prefer to qualify functions with the F namespace because it is easier to read
# (note that the one-letter alias F technically violates PEP 8)
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window
JST
Set the session time zone to JST, because the time on the instance is `UTC`.
spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")
initialize spark
This is not necessary on JupyterHub on `EMR`, but when running as a Python script you need to initialize the `spark` instance yourself.
# spark initialization
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("{your app name here}").getOrCreate()
df = spark.read.parquet(path)
# You can also use wildcards (*)
# Read all the files under dt=2020-01-01/
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-01/*.parquet")
# Read all the files from dt=2020-01-01/ through dt=2020-01-31/
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-*/*.parquet")
# Read the files at the paths contained in the list paths
df = spark.read.parquet(*paths)
# Load the partition columns in addition to the data columns
df = spark.read.option("basePath", parent_path).parquet(*paths)
Caching the result of lazy evaluation in memory enables much faster processing.
It is a good idea to cache() DataFrames that are reused frequently, especially after heavy processing.
#On-memory cache
df = df.cache()
#Or
# Same as cache() by default; the storage level can be changed to disk etc. with an optional argument
df = df.persist()
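For example, a minimal sketch of changing the storage level, assuming you want to spill to disk as well as memory:
from pyspark import StorageLevel
# Keep the data in memory and spill to disk when it does not fit
df = df.persist(StorageLevel.MEMORY_AND_DISK)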
df.printSchema()
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
# csv (the header is not written in this case)
df.write.csv(path)
# parquet
df.write.parquet(path)
# For csv, the header is not written unless the header option is set
df.write.mode("overwrite").option("header", "True").csv(path)
# or
df.write.mode("overwrite").csv(path, header=True)
# For parquet, the column names are stored by default, so no header option is needed
df.write.parquet(path)
# gzip with csv
df.write.csv(path, compression="gzip")
# snappy with parquet (parquet should be snappy-compressed by default)
df.write.option("compression", "snappy").parquet(path)
In the following example, the output is written under folders of the form /dt={dt_col}/count={count_col}/{file}.parquet.
df.repartition("dt", "count").write.partitionBy("dt", "count").parquet(path)
Running coalesce() after many transformations slows processing down, so if possible it is better to write the files out normally, read them back, and then coalesce.
#May be slow after multiple processes
df.coalesce(1).write.csv(path, header=True)
#Recommended if possible (output → read → output)
df.write.parquet(path)
alt_df = spark.read.parquet(path)
alt_df.coalesce(1).write.csv(path, header=True)
# Split the output into 20 files
df.repartition(20).write.parquet(path)
# Arguments that can be passed to write.mode(): 'overwrite', 'append', 'ignore', 'error', 'errorifexists'
# I often use overwrite
# By default, an error occurs if files already exist in the output folder
df.write.parquet(path)
#If you want to overwrite
df.write.mode("overwrite").parquet(path)
#If you want to add to the current folder
df.write.mode("append").parquet(path)
This is a method of creating a data frame programmatically, not from reading a file.
#Create a single column data frame
id_list = ["A001", "A002", "B001"]
df = spark.createDataFrame(id_list, StringType()).toDF("id")
# Each element is a tuple; the column names are specified at the end
df = spark.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])
> df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
| a| null| null|
| a|code1| null|
| a|code2|name2|
+---+-----+-----+
# =======================
# When creating via an RDD first
rdd = sc.parallelize(
[
(0, "A", 223, "201603", "PORT"),
(0, "A", 22, "201602", "PORT"),
(0, "A", 422, "201601", "DOCK"),
(1, "B", 3213, "201602", "DOCK"),
(1, "B", 3213, "201601", "PORT"),
(2, "C", 2321, "201601", "DOCK")
]
)
df = spark.createDataFrame(rdd, ["id", "type", "cost", "date", "ship"])
> df.show()
+---+----+----+------+----+
| id|type|cost| date|ship|
+---+----+----+------+----+
| 0| A| 223|201603|PORT|
| 0| A| 22|201602|PORT|
| 0| A| 422|201601|DOCK|
| 1| B|3213|201602|DOCK|
| 1| B|3213|201601|PORT|
| 2| C|2321|201601|DOCK|
+---+----+----+------+----+
withColumn()
In PySpark, analysis often proceeds by repeatedly adding new columns with withColumn().
# Create a new column called new_col_name and give it a literal (= constant) value of 1
df = df.withColumn("new_col_name", F.lit(1))
# Add the path of the file each row was read from
df = df.withColumn("file_path", F.input_file_name())
# Extract the file name from the file path
df = df.withColumn("file_name", F.split(F.col("file_path"), "/").getItem({int: index of the last element}))
# Specify the type with a string
df = df.withColumn("total_count", F.col("total_count").cast("double"))
# Specify the type with a PySpark type
df = df.withColumn("value", F.lit("1").cast(StringType()))
# Add a column whose value depends on a condition
# F.when(condtion, value).otherwise(else_value)
df = df.withColumn("is_even", F.when(F.col("number") % 2 == 0, 1).otherwise(0))
#In case of multiple conditions
df = df.withColumn("search_result", F.when( (F.col("id") % 2 == 0) & (F.col("room") % 2 == 0), 1).otherwise(0))
df = df.withColumn("is_touched", F.col("value").isNotNull())
df = df.withColumn("replaced_id", F.regexp_replace(F.col("id"), "A", "C"))
# date time -> epoch time
df = df.withColumn("epochtime", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ssZ"))
# epoch time -> date time
# 1555259647 -> 2019-04-14 16:34:07
df = df.withColumn("datetime", F.to_timestamp(df["epochtime"]))
# datetime -> string
# 2019-04-14 16:34:07 -> 2019-04-14
string_format = "yyyy-MM-dd"
df = df.withColumn("dt", F.date_format(F.col("datetime"), string_format))
# epoch time: a roughly 10-digit number of seconds since January 1, 1970
# Convert the epoch seconds to a timestamp first, then extract the hour
df = df.withColumn("hour", F.hour(F.to_timestamp(F.col("epochtime"))))
df = df.withColumn("hour", F.hour(F.col("timestamp")))
#Truncate datetime to the specified time width
df = df.withColumn("hour", F.date_trunc("hour", "datetime"))
df = df.withColumn("week", F.date_trunc("week", "datetime"))
There are many other functions that can be used with withColumn().
Please also see the reference sites.
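For example, a couple of other commonly used built-ins (a small, non-exhaustive sketch; the first_name, last_name and value columns are hypothetical):
# Concatenate string columns with a separator
df = df.withColumn("full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name")))
# Round a numeric column to two decimal places
df = df.withColumn("value_rounded", F.round(F.col("value"), 2))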
To combine two DataFrames horizontally / vertically, use join() / union().
#Specify columns to join with on
df = left_df.join(right_df, on="id")
# When the join key has a different column name in each DataFrame
df = left_df.join(right_df, left_df.id_1 == right_df.id_2)
# You can also specify the join method
# how = inner, left, right, left_semi, left_anti, cross, outer, full, left_outer, right_outer
df = left_df.join(right_df, on="id", how="inner")
df = left_df.join(right_df, on=["id", "dt"])
# Broadcast the smaller DataFrame to every node to speed up the join
df = left_df.join(F.broadcast(right_df), on="id")
df = upper_df.union(bottom_df)
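Note that union() matches columns by position; when the column order might differ between the two DataFrames, unionByName() is a safer alternative (a sketch):
# Match columns by name instead of by position
df = upper_df.unionByName(bottom_df)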
withColumnRenamed() is often used when reading a csv that has no column names.
# If there is no header, the columns are named `_c0` through `_c{n}`
df = df.withColumnRenamed("_c0", "id")
df = df.select("id")
df = df.select("id").distinct()
# Often used in combination with count()
# Example: the number of unique values of id
print(df.select("id").distinct().count())
df = df.drop("id")
# simple
df = df.dropna()
# using subset
df = df.na.drop(subset=["lat", "lon"])
#Simple case
df = df.select("id").select(F.collect_list("id"))
id_list = df.first()[0]
> id_list => ["A001", "A002", "B001"]
#Can also be used in combination with groupBy
df = df.groupBy("id").agg(F.collect_set("code"), F.collect_list("name"))
> df.show()
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
| a| [code1, code2]| [name2]|
+---+-----------------+------------------+
#Get the value of the data frame directly
df = df.groupBy().avg()
avg_attribute = df.collect()[0]
> print(avg_attribute["avg({col_name})"])
{averaged_value}
filter
You can use F.col() to apply a filter to a specific column.
# using spark.function
df = df.filter(F.col("id") == "A001")
# pandas-like
df = df.filter(df['id'] == "A001")
df = df.filter(df.id == "A001")
# Keep rows whose dt is contained in date_list
df = df.filter(F.col("dt").isin(date_list))
However, if possible, it is better to create a Spark DataFrame from date_list and join it instead, as sketched below.
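A minimal sketch of that join-based alternative, assuming date_list holds strings in the same format as the dt column:
# Build a one-column DataFrame from the list and filter via a left semi join
date_df = spark.createDataFrame(date_list, StringType()).toDF("dt")
df = df.join(date_df, on="dt", how="left_semi")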
orderBy
Sorting is not well suited to distributed processing, so it is best not to do it more than necessary.
#Single column only
df = df.orderBy("count", ascending=False)
#Multi-condition sort
df = df.orderBy(F.col("id").asc(), F.col("count").desc())
groupBy (aggregate)
# count()
df = df.groupBy("id").count()
# multiple
# alias() changes the column name
# Example: per-user aggregation
df = df.groupBy("id").agg(
F.count(F.lit(1)).alias("count"),
F.mean(F.col("diff")).alias("diff_mean"),
F.stddev(F.col("diff")).alias("diff_stddev"),
F.min(F.col("diff")).alias("diff_min"),
F.max(F.col("diff")).alias("diff_max")
)
> df.show()
(abridgement)
# =======================
# Example: aggregation by user and date
df = df.groupBy("id", "dt").agg(
F.count(F.lit(1)).alias("count")
)
> df.show()
+---+-----------+------+
| id| dt| count|
+---+-----------+------+
| a| 2020/01/01| 7|
| a| 2020/01/02| 5|
| a| 2020/01/03| 4|
+---+-----------+------+
# ===========================
# Example: aggregation by user, date, and location
df = df.groupBy("id", "dt", "location_id").agg(
F.count(F.lit(1)).alias("count")
)
> df.show()
+---+-----------+------------+------+
| id| dt| location_id| count|
+---+-----------+------------+------+
| a| 2020/01/01| A| 2|
| a| 2020/01/01| B| 3|
| a| 2020/01/01| C| 2|
: : : : :
+---+-----------+------------+------+
# Example: the number of unique users per date
df = df.groupBy("dt").agg(F.countDistinct("id").alias("id_count"))
> df.show()
+-----------+---------+
| dt| id_count|
+-----------+---------+
| 2020/01/01| 7|
| 2020/01/02| 5|
| 2020/01/03| 4|
+-----------+---------+
# ===============================
# Example: the number of distinct days on which each user appears at least once
df = df.groupBy("id").agg(F.countDistinct("dt").alias("dt_count"))
> df.show()
+---+---------+
| id| dt_count|
+---+---------+
| a| 10|
| b| 15|
| c| 4|
+---+---------+
# You can also group by a list of columns by unpacking it with *
group_columns = ["id", "dt"]
df = ad_touched_visit_df.groupBy(*group_columns).count()
# Assign sequential row numbers over the whole DataFrame, ordered by id
w = Window.orderBy(F.col("id"))
df = df.withColumn("row_num", F.row_number().over(w))
#Add the data from the previous row as a column
w = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn("prev_timestamp", F.lag(df["timestamp"]).over(w))
Looping over rows is strongly discouraged because it does not fit the distributed processing model; use it only when a for loop is truly unavoidable.
for row in df.rdd.collect():
do_some_with(row['id'])
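If you really do have to iterate, toLocalIterator() is a sketch of a slightly gentler option, since it streams partitions to the driver instead of collecting everything at once:
# Rows are fetched partition by partition rather than all at once
for row in df.toLocalIterator():
    do_some_with(row["id"])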