In PySpark (Spark SQL), pandas' ffill (forward fill) and [bfill](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (backward fill) do not exist by default.
So if you need similar processing, you have to build it yourself. (Personal memo)
The approach in the referenced links seems to be the way to go; they already solve most of the problem, but I'll give it a try myself.
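For reference, this is the pandas behaviour we want to reproduce (a minimal illustrative sketch I added, not code from the original references):

```python
import pandas as pd

# Sketch: what ffill / bfill do in pandas on a small example Series
s = pd.Series([1.0, None, None, 4.0, None])
print(s.ffill())  # 1.0, 1.0, 1.0, 4.0, 4.0 -> nulls take the previous non-null value
print(s.bfill())  # 1.0, 4.0, 4.0, 4.0, NaN -> nulls take the next non-null value
```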
Verification environment:
```python
# Import the required libraries
import sys
from typing import (
    Union,
    List,
)

import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    first,
    last,
)
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.getOrCreate()
```
`numpy` and `pandas` are included mainly for creating the test data.
```python
def ffill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    forward fill

    Args:
        target: column whose null values are forward filled
        partition: column(s) used to group records (pass a list for multiple columns)
        sort_key: column that determines the order
    """
    window = Window.partitionBy(partition) \
        .orderBy(sort_key) \
        .rowsBetween(-sys.maxsize, 0)
    filled_column = last(col(target), ignorenulls=True).over(window)
    return filled_column


def bfill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    backward fill

    Args:
        target: column whose null values are backward filled
        partition: column(s) used to group records (pass a list for multiple columns)
        sort_key: column that determines the order
    """
    window = Window.partitionBy(partition) \
        .orderBy(sort_key) \
        .rowsBetween(0, sys.maxsize)
    filled_column = first(col(target), ignorenulls=True).over(window)
    return filled_column
```
I gave an overview in the docstrings, but here is a brief supplement.

`window`:

- Splits the DataFrame by the column(s) given in `partition` and sorts each block by the column given in `sort_key`.
- `sys.maxsize` effectively means infinity here.
- `rowsBetween(start, end)` builds, for each record being processed, a range spanning from `start` rows before it to `end` rows after it.
- Because `sys.maxsize` is used, the range is "from the first record of the `partitionBy` block up to the current record" for ffill, and "from the current record to the last record of the `partitionBy` block" for bfill.

`filled_column`:

- Applies the window-function processing defined by `window` above to every record.
- `last` / `first` with `ignorenulls=True` return the last / first non-null value within the window range, which is exactly ffill / bfill.
- Records that are not null simply return their own value.
- Records that are null receive the closest non-null value within the range built by `window`.
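As a side note (my addition, not part of the functions above): the same unbounded ranges can also be written with the constants that `Window` itself provides, which avoids `sys.maxsize`. A minimal sketch for the ffill case, assuming columns named `id`, `timestamp` and `value`:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, last

# Equivalent ffill window: everything from the start of the partition up to the current row
w_ffill = Window.partitionBy("id") \
    .orderBy("timestamp") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

ffill_col = last(col("value"), ignorenulls=True).over(w_ffill)
```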
```python
# Prepare test data
test = pd.DataFrame({
    "id": ['A']*10 + ['B']*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00", periods=20, freq='1s'),
    "value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
    .replace(np.nan, None)  # numeric columns store NaN rather than null, so replace NaN with null

df_test.show()
# +---+-------------------+-----+
# | id| timestamp|value|
# +---+-------------------+-----+
# | A|2020-08-12 15:30:00| 0.0|
# | A|2020-08-12 15:30:01| null|
# | A|2020-08-12 15:30:02| null|
# | A|2020-08-12 15:30:03| 3.0|
# | A|2020-08-12 15:30:04| null|
# | A|2020-08-12 15:30:05| 5.0|
# | A|2020-08-12 15:30:06| 3.0|
# | A|2020-08-12 15:30:07| null|
# | A|2020-08-12 15:30:08| null|
# | A|2020-08-12 15:30:09| 2.0|
# | B|2020-08-12 15:30:10| null|
# | B|2020-08-12 15:30:11| 4.0|
# | B|2020-08-12 15:30:12| 2.0|
# | B|2020-08-12 15:30:13| null|
# | B|2020-08-12 15:30:14| null|
# | B|2020-08-12 15:30:15| 9.0|
# | B|2020-08-12 15:30:16| 2.0|
# | B|2020-08-12 15:30:17| 8.0|
# | B|2020-08-12 15:30:18| null|
# | B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+
```
```python
# Fill the nulls using the functions defined above
df_test \
    .withColumn(
        "forward fill",
        ffill(target="value", partition="id", sort_key="timestamp")
    ) \
    .withColumn(
        "backward fill",
        bfill(target="value", partition="id", sort_key="timestamp")
    ) \
    .show()
# +---+-------------------+-----+------------+-------------+
# | id| timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# | B|2020-08-12 15:30:10| null| null| 4.0|
# | B|2020-08-12 15:30:11| 4.0| 4.0| 4.0|
# | B|2020-08-12 15:30:12| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:13| null| 2.0| 9.0|
# | B|2020-08-12 15:30:14| null| 2.0| 9.0|
# | B|2020-08-12 15:30:15| 9.0| 9.0| 9.0|
# | B|2020-08-12 15:30:16| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:17| 8.0| 8.0| 8.0|
# | B|2020-08-12 15:30:18| null| 8.0| null|
# | B|2020-08-12 15:30:19| null| 8.0| null|
# | A|2020-08-12 15:30:00| 0.0| 0.0| 0.0|
# | A|2020-08-12 15:30:01| null| 0.0| 3.0|
# | A|2020-08-12 15:30:02| null| 0.0| 3.0|
# | A|2020-08-12 15:30:03| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:04| null| 3.0| 5.0|
# | A|2020-08-12 15:30:05| 5.0| 5.0| 5.0|
# | A|2020-08-12 15:30:06| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:07| null| 3.0| 2.0|
# | A|2020-08-12 15:30:08| null| 3.0| 2.0|
# | A|2020-08-12 15:30:09| 2.0| 2.0| 2.0|
# +---+-------------------+-----+------------+-------------+
```
- It is a little hard to see, but the output is split by `id` (A and B).
- Within each `id` block the records are sorted by `timestamp`, and the missing values of `value` are forward filled and backward filled.
- Compare `value` with `forward fill` and `backward fill` (a pandas equivalent is sketched below for reference).
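For reference, a sketch of the equivalent fill done directly in pandas on the `test` DataFrame created above (my addition; `GroupBy.ffill()` / `GroupBy.bfill()` fill within each `id` group, so the values should match the Spark output):

```python
# Sketch: per-id forward/backward fill in pandas for comparison with the Spark result
pdf = test.sort_values(["id", "timestamp"]).copy()
pdf["forward fill"] = pdf.groupby("id")["value"].ffill()   # fill forward within each id
pdf["backward fill"] = pdf.groupby("id")["value"].bfill()  # fill backward within each id
print(pdf)
```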
- In the previous example `partitionBy` used a single column, so next let's specify multiple columns.
- The column being filled was numeric before; this time let's also confirm that a string column works.
```python
# Prepare test data (2)
test2 = pd.DataFrame({
    "key1": ['A']*10 + ['B']*10,
    "key2": [1, 2]*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00", periods=20, freq='1s'),
    "value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)

df_test2.show()
# +----+----+-------------------+------+
# |key1|key2| timestamp| value|
# +----+----+-------------------+------+
# | A| 1|2020-08-12 15:30:00| foo|
# | A| 2|2020-08-12 15:30:01| null|
# | A| 1|2020-08-12 15:30:02| null|
# | A| 2|2020-08-12 15:30:03| bar|
# | A| 1|2020-08-12 15:30:04| null|
# | A| 2|2020-08-12 15:30:05| hoge|
# | A| 1|2020-08-12 15:30:06|foofoo|
# | A| 2|2020-08-12 15:30:07| null|
# | A| 1|2020-08-12 15:30:08| null|
# | A| 2|2020-08-12 15:30:09|foobar|
# | B| 1|2020-08-12 15:30:10| null|
# | B| 2|2020-08-12 15:30:11| aaa|
# | B| 1|2020-08-12 15:30:12| bbb|
# | B| 2|2020-08-12 15:30:13| null|
# | B| 1|2020-08-12 15:30:14| null|
# | B| 2|2020-08-12 15:30:15| ccc|
# | B| 1|2020-08-12 15:30:16| xxx|
# | B| 2|2020-08-12 15:30:17| zzz|
# | B| 1|2020-08-12 15:30:18| null|
# | B| 2|2020-08-12 15:30:19| null|
# +----+----+-------------------+------+
```
```python
# Fill the nulls using the functions defined above
df_test2 \
    .withColumn(
        "forward fill",
        ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
    ) \
    .withColumn(
        "backward fill",
        bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
    ) \
    .show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2| timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# | B| 1|2020-08-12 15:30:10| null| null| bbb|
# | B| 1|2020-08-12 15:30:12| bbb| bbb| bbb|
# | B| 1|2020-08-12 15:30:14| null| bbb| xxx|
# | B| 1|2020-08-12 15:30:16| xxx| xxx| xxx|
# | B| 1|2020-08-12 15:30:18| null| xxx| null|
# | A| 2|2020-08-12 15:30:01| null| null| bar|
# | A| 2|2020-08-12 15:30:03| bar| bar| bar|
# | A| 2|2020-08-12 15:30:05| hoge| hoge| hoge|
# | A| 2|2020-08-12 15:30:07| null| hoge| foobar|
# | A| 2|2020-08-12 15:30:09|foobar| foobar| foobar|
# | A| 1|2020-08-12 15:30:00| foo| foo| foo|
# | A| 1|2020-08-12 15:30:02| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:04| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:06|foofoo| foofoo| foofoo|
# | A| 1|2020-08-12 15:30:08| null| foofoo| null|
# | B| 2|2020-08-12 15:30:11| aaa| aaa| aaa|
# | B| 2|2020-08-12 15:30:13| null| aaa| ccc|
# | B| 2|2020-08-12 15:30:15| ccc| ccc| ccc|
# | B| 2|2020-08-12 15:30:17| zzz| zzz| zzz|
# | B| 2|2020-08-12 15:30:19| null| zzz| null|
# +----+----+-------------------+------+------------+-------------+
```
- In this example the records are split by the combination of `key1` and `key2`.
- Here as well, each block is sorted by `timestamp`; comparing `value` with `forward fill` and `backward fill`, you can see that the nulls are filled with the preceding / following values.
Regarding the return value of the functions defined above:
```python
display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
```
- Roughly speaking, the functions use the `pyspark.sql` machinery to generate an appropriate SQL expression from the arguments.
- When applied to a DataFrame (which corresponds to the `<table>` in SQL's `FROM <table>`), the expression is evaluated and actual values are returned.
- Use them in combination with the DataFrame `select` and `withColumn` methods.
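For example, a usage via `select` might look like the following sketch (my addition, reusing `df_test` and the functions defined above):

```python
# Select the original columns plus the filled columns in one pass
df_filled = df_test.select(
    "id",
    "timestamp",
    "value",
    ffill(target="value", partition="id", sort_key="timestamp").alias("ffill"),
    bfill(target="value", partition="id", sort_key="timestamp").alias("bfill"),
)
df_filled.show()
```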