Notes and Tips on Vertical Joining of PySpark DataFrame

Introduction

As a reminder, I would like to summarize the unexpectedly unknown points regarding the vertical connection of PySpark's DataFrame.

The content of the article is based on Spark 2.4.

Vertical connection of PySpark

About the difference between vertically connected methods

There are three types of vertically connected methods of DataFrame.

Difference between union and union All

There is actually no inductive difference between the two methods uniont and unionAll, both of which simply vertically join two DataFrames.

If you imagine SQL, it is easy to misunderstand that duplication control is performed in union, but in both cases duplication control is not performed. Therefore, if duplicate control is required, it is necessary to use the dinstinct method after vertical coupling.

In v2.0 and above, the use of union is recommended.

union and unionAll


df1 = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6), (7, 8, 9)], ['x', 'y', 'z'])

df_union = df1.union(df2)
df_unionAll = df1.unionAll(df2)

print('df1')
df1.show()
print('df2')
df2.show()

# df1
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# +---+---+---+
#
# df2
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+

print('union')
df_union.show()
print('unionAll')
df_unionAll.show()

# union
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+
#
# unionAll
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+

Difference between union and unionByName

The difference between union and unionByName is that it refers to the column name of DataFrame at the time of vertical join.

union joins the first columns of two DataFrames, joins the second columns, and so on, taking into account the arrangement of the columns in the DataFrame. In other words, in the case of union, the column name is not seen at the time of joining. Even if the DataFrames have the same columns, if the order is different, they will be combined based on the column name of the DataFrame that called the method, and those in the same column will not be combined. On the other hand, unionByName refers to the column name of each DataFrame and joins the same column names.

Therefore, if the schemas of the two DataFrames you want to combine are unified, it is safe to use unionByName.

union and unionByName


df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['z', 'x', 'y'])

df_union = df1.union(df2)
df_unionByName = df1.unionByName(df2)

print('df1')
df1.show()
print('df2')
df2.show()

# df1
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# +---+---+---+
#
# df2
# +---+---+---+
# |  z|  x|  y|
# +---+---+---+
# |  4|  5|  6|
# +---+---+---+

print('union')
df_union.show()
print('unionByName')
df_unionByName.show()

# union
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# +---+---+---+
#
# unionByName
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  5|  6|  4|
# +---+---+---+

About Cast at the time of joining

When vertically joining DataFrames, if the types of columns to be joined are different, it may be implicitly cast. (Some patterns cause an error without being done)

Even in the implicit Cast, the pattern in which a numeric type such as Integer is converted to a String is particularly troublesome, and it should be noted that this can occur sufficiently due to differences in the handling of flag values.

Here, an example of union is shown, but the same is true for unionByName (experientially recognized).

Cast at the time of vertical connection


from pyspark.sql.functions import col
from pyspark.sql.types import *

df = spark.createDataFrame([(1, 'x', True)], ['long', 'str', 'bool']).withColumn('int', col('long').cast('int'))

df.show()
df.printSchema()

# +------+---+----+---+
# |bigint|str|bool|int|
# +------+---+----+---+
# |     1|  x|true|  1|
# +------+---+----+---+

# root
#  |-- bigint: long (nullable = true)
#  |-- str: string (nullable = true)
#  |-- bool: boolean (nullable = true)
#  |-- int: integer (nullable = true)

df.select('int').union(df.select('str')).printSchema()

# root
#  |-- int: string (nullable = true)

df.select('int').union(df.select('long')).printSchema()

# root
# |-- int: long (nullable = true)

#This will result in an error
# df.select('bool').union(df.select('str'))

About vertical connection of multiple DataFrames

Vertically joined methods can only support joining two DataFrames. If there are 3 or more DataFrames you want to combine, you can do the following.

Vertical combination of multiple DataFrames


from functools import reduce
from pyspark.sql import DataFrame

df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['x', 'y', 'z'])
df3 = spark.createDataFrame([(7, 8, 9)], ['x', 'y', 'z'])

df = reduce(DataFrame.unionByName, [df1, df2, df3])
df.show()

# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+

Recommended Posts

Notes and Tips on Vertical Joining of PySpark DataFrame
Notes on tf.function and Tracing
Notes on * args and ** kargs
Notes on pyenv and Atom
Introduction and tips of mlflow.Tracking
Notes on Python and dictionary types
Notes on using post-receive and post-merge
Notes on standard input / output of Go
Notes on building Python and pyenv on Mac
Fill the missing value (null) of DataFrame with the values before and after with pyspark
Notes on installing Python3 and using pip on Windows7
Basic operation of Python Pandas Series and Dataframe (1)
Notes on exchanging and multiple assignment of Python variable values ​​learned in quiz format