Dans pyspark (Spark SQL), ffill (forward fill) et [bfill] dans pandas
](Https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (remplissage vers l'arrière) n'existe pas par défaut.
Par conséquent, si un traitement précis est nécessaire, il est nécessaire de le concevoir vous-même. (Mémo personnel)
Il semble que vous devriez le faire comme le lien ci-dessus
Les références ont presque résolu le problème, mais je vais essayer.
Environnement de vérification:
#Importation des bibliothèques requises
import sys
from typing import (
Union,
List,
)
import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col,
first,
last,
)
from pyspark.sql.window import Window
#Générer une session Spark
spark = SparkSession.builder.getOrCreate()
numpy
, pandas
sont principalement inclus pour la création de données de test
def ffill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
forward fill
Args:
target:Colonne pour laquelle la valeur nulle est remplie en avant
partition:Colonne pour regrouper les enregistrements (liste pour plusieurs)
sort_key:Colonne de détermination de la commande
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(-sys.maxsize, 0)
filled_column = last(col(target), ignorenulls=True).over(window)
return filled_column
def bfill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
backward fill
Args:
target:Colonne pour laquelle la valeur nulle est remplie à nouveau
partition:Colonne pour regrouper les enregistrements (liste pour plusieurs)
sort_key:Colonne de détermination de la commande
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(0, sys.maxsize)
filled_column = first(col(target), ignorenulls=True).over(window)
return filled_column
J'ai écrit un aperçu avec docstring, mais c'est un bref supplément
window
:
--Séparez le DataFrame avec la colonne spécifiée par partition
, et triez chaque bloc par la colonne spécifiée par sort_key
.
--sys.maxsize
signifie pratiquement infinirowsBetween (start, end)
est la cible du traitement En vous basant sur l'enregistrement, créez une plage comprenant les enregistrements de «start» avant «end»sys.maxsize
est utilisé, dans le cas de ffill," du premier enregistrement à l'enregistrement à traiter dans la plage séparée par partitionBy ", et dans le cas de bfill," de l'enregistrement à traiter dans la plage séparée par partitionBy ". Se réfère à la plage "jusqu'au dernier enregistrement"filled_column
window
ci-dessus à tous les enregistrements
--Remplir / bfill avec last
/ first
, le dernier / premier null de la plage de fenêtres Renvoie une valeur qui n'est pas#Préparation des données de test
test = pd.DataFrame({
"id": ['A']*10 + ['B']*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
.replace(np.nan, None) #S'il s'agit d'un type numérique`NaN`Sera stocké, remplacez-le par null
df_test.show()
# +---+-------------------+-----+
# | id| timestamp|value|
# +---+-------------------+-----+
# | A|2020-08-12 15:30:00| 0.0|
# | A|2020-08-12 15:30:01| null|
# | A|2020-08-12 15:30:02| null|
# | A|2020-08-12 15:30:03| 3.0|
# | A|2020-08-12 15:30:04| null|
# | A|2020-08-12 15:30:05| 5.0|
# | A|2020-08-12 15:30:06| 3.0|
# | A|2020-08-12 15:30:07| null|
# | A|2020-08-12 15:30:08| null|
# | A|2020-08-12 15:30:09| 2.0|
# | B|2020-08-12 15:30:10| null|
# | B|2020-08-12 15:30:11| 4.0|
# | B|2020-08-12 15:30:12| 2.0|
# | B|2020-08-12 15:30:13| null|
# | B|2020-08-12 15:30:14| null|
# | B|2020-08-12 15:30:15| 9.0|
# | B|2020-08-12 15:30:16| 2.0|
# | B|2020-08-12 15:30:17| 8.0|
# | B|2020-08-12 15:30:18| null|
# | B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+
#Pratiquez la complétion nulle en utilisant la fonction créée précédemment
df_test \
.withColumn(
"ffill",
ffill(target="value", partition="id", sort_key="timestamp")
) \
.withColumn(
"bfill",
bfill(target="value", partition="id", sort_key="timestamp")
) \
.show()
# +---+-------------------+-----+------------+-------------+
# | id| timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# | B|2020-08-12 15:30:10| null| null| 4.0|
# | B|2020-08-12 15:30:11| 4.0| 4.0| 4.0|
# | B|2020-08-12 15:30:12| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:13| null| 2.0| 9.0|
# | B|2020-08-12 15:30:14| null| 2.0| 9.0|
# | B|2020-08-12 15:30:15| 9.0| 9.0| 9.0|
# | B|2020-08-12 15:30:16| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:17| 8.0| 8.0| 8.0|
# | B|2020-08-12 15:30:18| null| 8.0| null|
# | B|2020-08-12 15:30:19| null| 8.0| null|
# | A|2020-08-12 15:30:00| 0.0| 0.0| 0.0|
# | A|2020-08-12 15:30:01| null| 0.0| 3.0|
# | A|2020-08-12 15:30:02| null| 0.0| 3.0|
# | A|2020-08-12 15:30:03| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:04| null| 3.0| 5.0|
# | A|2020-08-12 15:30:05| 5.0| 5.0| 5.0|
# | A|2020-08-12 15:30:06| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:07| null| 3.0| 2.0|
# | A|2020-08-12 15:30:08| null| 3.0| 2.0|
# | A|2020-08-12 15:30:09| 2.0| 2.0| 2.0|
# +---+-------------------+-----+------------+-------------+
――C'est un peu difficile à voir, mais il est séparé par ʻid: A, B --Trier par
horodatage dans chaque délimiteur ʻid
et remplir et remplir la valeur manquante de valeur
--Comparer valeur
avec remplissage avant
et remplissage arrière
#Préparation des données de test (2)
test2 = pd.DataFrame({
"key1": ['A']*10 + ['B']*10,
"key2": [1, 2]*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)
df_test2.show()
# +----+----+-------------------+------+
# |key1|key2| timestamp| value|
# +----+----+-------------------+------+
# | A| 1|2020-08-12 15:30:00| foo|
# | A| 2|2020-08-12 15:30:01| null|
# | A| 1|2020-08-12 15:30:02| null|
# | A| 2|2020-08-12 15:30:03| bar|
# | A| 1|2020-08-12 15:30:04| null|
# | A| 2|2020-08-12 15:30:05| hoge|
# | A| 1|2020-08-12 15:30:06|foofoo|
# | A| 2|2020-08-12 15:30:07| null|
# | A| 1|2020-08-12 15:30:08| null|
# | A| 2|2020-08-12 15:30:09|foobar|
# | B| 1|2020-08-12 15:30:10| null|
# | B| 2|2020-08-12 15:30:11| aaa|
# | B| 1|2020-08-12 15:30:12| bbb|
# | B| 2|2020-08-12 15:30:13| null|
# | B| 1|2020-08-12 15:30:14| null|
# | B| 2|2020-08-12 15:30:15| ccc|
# | B| 1|2020-08-12 15:30:16| xxx|
# | B| 2|2020-08-12 15:30:17| zzz|
# | B| 1|2020-08-12 15:30:18| null|
# | B| 2|2020-08-12 15:30:19| null|
# +----+----+-------------------+------+
#Pratiquez la complétion nulle en utilisant la fonction créée précédemment
df_test2 \
.withColumn(
"forward fill",
ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.withColumn(
"backward fill",
bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2| timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# | B| 1|2020-08-12 15:30:10| null| null| bbb|
# | B| 1|2020-08-12 15:30:12| bbb| bbb| bbb|
# | B| 1|2020-08-12 15:30:14| null| bbb| xxx|
# | B| 1|2020-08-12 15:30:16| xxx| xxx| xxx|
# | B| 1|2020-08-12 15:30:18| null| xxx| null|
# | A| 2|2020-08-12 15:30:01| null| null| bar|
# | A| 2|2020-08-12 15:30:03| bar| bar| bar|
# | A| 2|2020-08-12 15:30:05| hoge| hoge| hoge|
# | A| 2|2020-08-12 15:30:07| null| hoge| foobar|
# | A| 2|2020-08-12 15:30:09|foobar| foobar| foobar|
# | A| 1|2020-08-12 15:30:00| foo| foo| foo|
# | A| 1|2020-08-12 15:30:02| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:04| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:06|foofoo| foofoo| foofoo|
# | A| 1|2020-08-12 15:30:08| null| foofoo| null|
# | B| 2|2020-08-12 15:30:11| aaa| aaa| aaa|
# | B| 2|2020-08-12 15:30:13| null| aaa| ccc|
# | B| 2|2020-08-12 15:30:15| ccc| ccc| ccc|
# | B| 2|2020-08-12 15:30:17| zzz| zzz| zzz|
# | B| 2|2020-08-12 15:30:19| null| zzz| null|
# +----+----+-------------------+------+------------+-------------+
Concernant la valeur de retour de la fonction créée dans l'exemple ci-dessus
display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
pyspark.sql
pour générer une instruction SQL appropriée basée sur les arguments.
--Donner un DataFrame comme correspondant à la <table> '' du
FROM