Fill missing values (null) in a DataFrame with the values before and after them in PySpark

Overview

In PySpark (Spark SQL), pandas' ffill (forward fill) and [bfill](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (backward fill) do not exist by default. Therefore, if similar processing is needed, you have to devise it yourself. (Personal memo)
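For reference, this is the pandas behavior we want to reproduce on the Spark side (a minimal sketch for comparison only; the column name is arbitrary):

# pandas reference behavior (sketch)
import pandas as pd

s = pd.DataFrame({"value": [0, None, None, 3, None]})
print(s["value"].ffill())  # forward fill -> 0.0, 0.0, 0.0, 3.0, 3.0
print(s["value"].bfill())  # backward fill -> 0.0, 3.0, 3.0, 3.0, NaN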

References (answer)

It seems the approach described in the link above is the way to go.

Practical example

The reference above essentially solves the problem, but I'll try it out for myself.

Verification environment:

Preparation

# Import the required libraries
import sys
from typing import (
    Union,
    List,
)

import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    first,
    last,
)
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

numpy and pandas are included mainly for creating the test data.

Implement ffill and bfill as functions

def ffill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    forward fill
    
    Args:
        target:Column for which null value is forward filled
        partition:Column for grouping records (List for multiple records)
        sort_key:Column for determining the order
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(-sys.maxsize, 0)

    filled_column = last(col(target), ignorenulls=True).over(window)

    return filled_column


def bfill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    backward fill
    
    Args:
        target:Column for which null value is back filled
        partition:Column for grouping records (List for multiple records)
        sort_key:Column for determining the order
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(0, sys.maxsize)

    filled_column = first(col(target), ignorenulls=True).over(window)

    return filled_column

The docstrings give an overview; briefly, ffill takes the last non-null value in the window from the start of the partition up to the current row (last with ignorenulls=True), while bfill takes the first non-null value in the window from the current row to the end of the partition (first with ignorenulls=True).
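As an aside, the window bounds written here with -sys.maxsize / sys.maxsize can also be expressed with the named constants Window.unboundedPreceding, Window.currentRow and Window.unboundedFollowing; a minimal sketch for the forward-fill case, reusing the column names from the examples below:

# Equivalent window definition for forward fill using the named bounds (sketch)
window = Window.partitionBy("id") \
    .orderBy("timestamp") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled_column = last(col("value"), ignorenulls=True).over(window)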

Example 1

# Prepare the test data
test = pd.DataFrame({
    "id": ['A']*10 + ['B']*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
    .replace(np.nan, None)  # For a numeric column, NaN (not null) would be stored, so replace it with null

df_test.show()
# +---+-------------------+-----+
# | id|          timestamp|value|
# +---+-------------------+-----+
# |  A|2020-08-12 15:30:00|  0.0|
# |  A|2020-08-12 15:30:01| null|
# |  A|2020-08-12 15:30:02| null|
# |  A|2020-08-12 15:30:03|  3.0|
# |  A|2020-08-12 15:30:04| null|
# |  A|2020-08-12 15:30:05|  5.0|
# |  A|2020-08-12 15:30:06|  3.0|
# |  A|2020-08-12 15:30:07| null|
# |  A|2020-08-12 15:30:08| null|
# |  A|2020-08-12 15:30:09|  2.0|
# |  B|2020-08-12 15:30:10| null|
# |  B|2020-08-12 15:30:11|  4.0|
# |  B|2020-08-12 15:30:12|  2.0|
# |  B|2020-08-12 15:30:13| null|
# |  B|2020-08-12 15:30:14| null|
# |  B|2020-08-12 15:30:15|  9.0|
# |  B|2020-08-12 15:30:16|  2.0|
# |  B|2020-08-12 15:30:17|  8.0|
# |  B|2020-08-12 15:30:18| null|
# |  B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+
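As an aside, the NaN-to-null replacement can be avoided entirely by building the DataFrame without going through pandas. A minimal sketch for the id=A rows only, using an explicit schema (my own alternative, not part of the reference):

# Alternative: create the DataFrame directly so missing values are real nulls (sketch)
from datetime import datetime, timedelta

rows = [
    ("A", datetime(2020, 8, 12, 15, 30, 0) + timedelta(seconds=i), v)
    for i, v in enumerate([0.0, None, None, 3.0, None, 5.0, 3.0, None, None, 2.0])
]
df_alt = spark.createDataFrame(rows, schema="id string, timestamp timestamp, value double")
df_alt.show()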

# Fill the nulls using the functions defined above
df_test \
    .withColumn(
    "ffill",
    ffill(target="value", partition="id", sort_key="timestamp")
) \
    .withColumn(
    "bfill",
    bfill(target="value", partition="id", sort_key="timestamp")
) \
    .show()
# +---+-------------------+-----+------------+-------------+
# | id|          timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# |  B|2020-08-12 15:30:10| null|        null|          4.0|
# |  B|2020-08-12 15:30:11|  4.0|         4.0|          4.0|
# |  B|2020-08-12 15:30:12|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:13| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:14| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:15|  9.0|         9.0|          9.0|
# |  B|2020-08-12 15:30:16|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:17|  8.0|         8.0|          8.0|
# |  B|2020-08-12 15:30:18| null|         8.0|         null|
# |  B|2020-08-12 15:30:19| null|         8.0|         null|
# |  A|2020-08-12 15:30:00|  0.0|         0.0|          0.0|
# |  A|2020-08-12 15:30:01| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:02| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:03|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:04| null|         3.0|          5.0|
# |  A|2020-08-12 15:30:05|  5.0|         5.0|          5.0|
# |  A|2020-08-12 15:30:06|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:07| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:08| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:09|  2.0|         2.0|          2.0|
# +---+-------------------+-----+------------+-------------+

- It's a little hard to read, but the rows are partitioned by `id` (A and B).
- Within each `id` partition, rows are sorted by `timestamp`, and the nulls in `value` are filled by `ffill` and `bfill`.
- Compare `value` with the `forward fill` and `backward fill` columns (a sketch for overwriting `value` in place follows below).
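If you prefer to fill the original column in place rather than adding new columns, the same function can simply overwrite it; a minimal sketch (my own usage note, not from the reference):

# Overwrite value with the forward-filled result (sketch)
df_filled = df_test.withColumn(
    "value",
    ffill(target="value", partition="id", sort_key="timestamp")
)
df_filled.show()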

Example 2

- In the previous example, partitionBy used a single column, so this time let's specify multiple columns.
- The column being filled was numeric before; here we check that a string column works as well.

# Prepare the test data (2)
test2 = pd.DataFrame({
    "key1": ['A']*10 + ['B']*10,
    "key2": [1, 2]*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)

df_test2.show()
# +----+----+-------------------+------+
# |key1|key2|          timestamp| value|
# +----+----+-------------------+------+
# |   A|   1|2020-08-12 15:30:00|   foo|
# |   A|   2|2020-08-12 15:30:01|  null|
# |   A|   1|2020-08-12 15:30:02|  null|
# |   A|   2|2020-08-12 15:30:03|   bar|
# |   A|   1|2020-08-12 15:30:04|  null|
# |   A|   2|2020-08-12 15:30:05|  hoge|
# |   A|   1|2020-08-12 15:30:06|foofoo|
# |   A|   2|2020-08-12 15:30:07|  null|
# |   A|   1|2020-08-12 15:30:08|  null|
# |   A|   2|2020-08-12 15:30:09|foobar|
# |   B|   1|2020-08-12 15:30:10|  null|
# |   B|   2|2020-08-12 15:30:11|   aaa|
# |   B|   1|2020-08-12 15:30:12|   bbb|
# |   B|   2|2020-08-12 15:30:13|  null|
# |   B|   1|2020-08-12 15:30:14|  null|
# |   B|   2|2020-08-12 15:30:15|   ccc|
# |   B|   1|2020-08-12 15:30:16|   xxx|
# |   B|   2|2020-08-12 15:30:17|   zzz|
# |   B|   1|2020-08-12 15:30:18|  null|
# |   B|   2|2020-08-12 15:30:19|  null|
# +----+----+-------------------+------+

# Fill the nulls using the functions defined above
df_test2 \
    .withColumn(
    "forward fill",
    ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .withColumn(
    "backward fill",
    bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2|          timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# |   B|   1|2020-08-12 15:30:10|  null|        null|          bbb|
# |   B|   1|2020-08-12 15:30:12|   bbb|         bbb|          bbb|
# |   B|   1|2020-08-12 15:30:14|  null|         bbb|          xxx|
# |   B|   1|2020-08-12 15:30:16|   xxx|         xxx|          xxx|
# |   B|   1|2020-08-12 15:30:18|  null|         xxx|         null|
# |   A|   2|2020-08-12 15:30:01|  null|        null|          bar|
# |   A|   2|2020-08-12 15:30:03|   bar|         bar|          bar|
# |   A|   2|2020-08-12 15:30:05|  hoge|        hoge|         hoge|
# |   A|   2|2020-08-12 15:30:07|  null|        hoge|       foobar|
# |   A|   2|2020-08-12 15:30:09|foobar|      foobar|       foobar|
# |   A|   1|2020-08-12 15:30:00|   foo|         foo|          foo|
# |   A|   1|2020-08-12 15:30:02|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:04|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:06|foofoo|      foofoo|       foofoo|
# |   A|   1|2020-08-12 15:30:08|  null|      foofoo|         null|
# |   B|   2|2020-08-12 15:30:11|   aaa|         aaa|          aaa|
# |   B|   2|2020-08-12 15:30:13|  null|         aaa|          ccc|
# |   B|   2|2020-08-12 15:30:15|   ccc|         ccc|          ccc|
# |   B|   2|2020-08-12 15:30:17|   zzz|         zzz|          zzz|
# |   B|   2|2020-08-12 15:30:19|  null|         zzz|         null|
# +----+----+-------------------+------+------------+-------------+

- In this example, the rows are partitioned by the combination of `key1` and `key2`.
- Here too, each partition is sorted by `timestamp`, and comparing `value` with `forward fill` and `backward fill` shows that the nulls are filled with the preceding and following values (a sketch for filling every null follows below).
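Note that ffill leaves nulls before the first non-null value of each partition, and bfill leaves nulls after the last one (visible in both outputs above). If every null must be filled, one option is to combine the two with coalesce; a minimal sketch (my own extension, not part of the reference):

# Forward fill first, then fall back to backward fill for the leading nulls (sketch)
from pyspark.sql.functions import coalesce

df_test2.withColumn(
    "filled",
    coalesce(
        ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"),
        bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"),
    )
).show()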

Supplement

Regarding the return values of the functions defined above:

display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

- Internally, the functions just use pyspark.sql to build the appropriate SQL window expression from the arguments.
- The actual values are computed only when the Column is applied to a DataFrame, which plays the role of `<table>` in SQL's `FROM <table>`.
- Use the returned Column in combination with the DataFrame's `select` and `withColumn` methods, as in the sketch below.
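For example, the returned Column can be passed to select just as it was to withColumn above; a minimal sketch:

# Using the returned Column inside select (sketch)
df_test.select(
    "id",
    "timestamp",
    "value",
    ffill(target="value", partition="id", sort_key="timestamp").alias("forward fill"),
).show()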
