As a note to myself, this article summarizes some surprisingly little-known points about vertically concatenating PySpark DataFrames.
The content of the article is based on Spark 2.4.
DataFrame provides three methods for vertical concatenation: union, unionAll, and unionByName.
There is actually no functional difference between union and unionAll; both simply concatenate two DataFrames vertically.
Coming from SQL, it is easy to assume that union removes duplicates, but neither method performs any deduplication. If deduplication is required, call the distinct method after the concatenation (see the sketch after the example below).
In Spark 2.0 and above, the use of union is recommended.
union and unionAll
df1 = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6), (7, 8, 9)], ['x', 'y', 'z'])
df_union = df1.union(df2)
df_unionAll = df1.unionAll(df2)
print('df1')
df1.show()
print('df2')
df2.show()
# df1
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# +---+---+---+
#
# df2
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+
print('union')
df_union.show()
print('unionAll')
df_unionAll.show()
# union
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+
#
# unionAll
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+
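As noted above, the duplicate row (4, 5, 6) remains in both results. Below is a minimal sketch of the deduplication mentioned earlier, simply chaining distinct after the union (the row order after distinct may differ, since it involves a shuffle).
union followed by distinct
df_dedup = df_union.distinct()
df_dedup.show()
# Expected contents (row order may vary):
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+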
The difference between union and unionByName is whether the column names of the DataFrames are referenced at the time of the vertical concatenation.
union concatenates by column position: the first column of one DataFrame is stacked on the first column of the other, the second on the second, and so on, following the column order of each DataFrame. In other words, union never looks at column names. Even if the two DataFrames have exactly the same set of columns, if the column order differs, the result simply takes the column names of the DataFrame that called the method, and values belonging to the same column end up in different columns. unionByName, on the other hand, matches columns by name and stacks values with the same column name together.
Therefore, as long as the two DataFrames share the same set of columns, it is safer to use unionByName (a workaround that uses union together with select is sketched after the example below).
union and unionByName
df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['z', 'x', 'y'])
df_union = df1.union(df2)
df_unionByName = df1.unionByName(df2)
print('df1')
df1.show()
print('df2')
df2.show()
# df1
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# +---+---+---+
#
# df2
# +---+---+---+
# |  z|  x|  y|
# +---+---+---+
# |  4|  5|  6|
# +---+---+---+
print('union')
df_union.show()
print('unionByName')
df_unionByName.show()
# union
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# +---+---+---+
#
# unionByName
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  5|  6|  4|
# +---+---+---+
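As a reference, the same result as unionByName can be obtained with union by explicitly aligning the column order with select beforehand. A minimal sketch using the df1 and df2 above:
union with aligned columns
# Reorder df2's columns to match df1 before the positional union
df_aligned = df1.union(df2.select(df1.columns))
df_aligned.show()
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  5|  6|  4|
# +---+---+---+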
When DataFrames are concatenated vertically, columns whose types differ may be implicitly cast. (Some type combinations are not cast and instead raise an error.)
Among the implicit casts, the pattern where a numeric type such as integer is converted to string is particularly troublesome. Be aware that this can easily happen in practice, for example when flag values are handled as numbers in one source and as strings in another (an explicit cast before the union, sketched after the example below, avoids relying on the implicit behavior).
The example below uses union, but in my experience the same applies to unionByName.
Cast at the time of vertical concatenation
from pyspark.sql.functions import col
from pyspark.sql.types import *
df = spark.createDataFrame([(1, 'x', True)], ['bigint', 'str', 'bool']).withColumn('int', col('bigint').cast('int'))
df.show()
df.printSchema()
# +------+---+----+---+
# |bigint|str|bool|int|
# +------+---+----+---+
# |     1|  x|true|  1|
# +------+---+----+---+
# root
# |-- bigint: long (nullable = true)
# |-- str: string (nullable = true)
# |-- bool: boolean (nullable = true)
# |-- int: integer (nullable = true)
df.select('int').union(df.select('str')).printSchema()
# root
# |-- int: string (nullable = true)
df.select('int').union(df.select('bigint')).printSchema()
# root
# |-- int: long (nullable = true)
# This will result in an error (boolean and string cannot be implicitly cast to a common type)
# df.select('bool').union(df.select('str'))
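To avoid relying on the implicit behavior, one option is to decide the target type yourself and cast explicitly before the union. A minimal sketch using the df above (the alias just keeps the column name 'int'):
Explicit cast before vertical concatenation
df_explicit = df.select(col('int').cast('string').alias('int')).union(df.select('str'))
df_explicit.printSchema()
# root
# |-- int: string (nullable = true)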
The vertical concatenation methods accept only two DataFrames at a time. If you want to combine three or more DataFrames, you can do the following.
Vertical concatenation of multiple DataFrames
from functools import reduce
from pyspark.sql import DataFrame
df1 = spark.createDataFrame([(1, 2, 3)], ['x', 'y', 'z'])
df2 = spark.createDataFrame([(4, 5, 6)], ['x', 'y', 'z'])
df3 = spark.createDataFrame([(7, 8, 9)], ['x', 'y', 'z'])
df = reduce(DataFrame.unionByName, [df1, df2, df3])
df.show()
# +---+---+---+
# |  x|  y|  z|
# +---+---+---+
# |  1|  2|  3|
# |  4|  5|  6|
# |  7|  8|  9|
# +---+---+---+
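Since reduce applies unionByName pairwise from the left, the result above is equivalent to chaining the calls by hand. A minimal sketch of that equivalence, together with the distinct pattern from earlier if duplicate removal is also needed:
# What reduce expands to:
df_chained = df1.unionByName(df2).unionByName(df3)
# Add distinct if duplicate rows should also be removed
df_combined = reduce(DataFrame.unionByName, [df1, df2, df3]).distinct()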