Cet article résume les fonctionnalités et les opérations sur les données de PySpark
.
Un concept important de ʻApache Hive` dans le fonctionnement de PySpark
Pour plus d'informations sur le partitionnement et le compartimentage, voir ici.
Si vos données sont lentes, il est judicieux d'utiliser Ganglia pour voir comment vos ressources de calcul sont utilisées.
En particulier, le volume de communication réseau (= volume de transfert de données) est faible et le traitement prend souvent du temps. Dans ce cas, vous pourrez peut-être le résoudre en prenant les mesures suivantes.
df = df.cache ()
Les variables suivantes sont supposées avoir été générées.
spark
: spark contextpath
: quelque chose de chemin de fichierMise en garde
> df.show ()
peut ne pas toujours être correct dans le but de saisir l'image.import
Les éléments suivants sont principalement importés lors de l'utilisation de «spark».
# from pyspark.sql.functions import *Dans certains cas,
#J'aime spécifier l'espace de noms de la fonction avec F car c'est plus facile à comprendre.
#Cependant, F viole PEP8. .. ..
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window
JST
car l'heure de l'instance est ʻUTC`.spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")
initialize spark
Ce n'est pas nécessaire sur JupyterHub
de ʻEMR`, mais lors de l'exécution avec un script python,
L'initialisation de l'instance de «spark» est requise.
# spark initialization
spark = SparkSession.builder.appName("{your app name here}").getOrCreate()
df = spark.read.parquet(path)
*
# dt=2020-01-01/Lisez tous les fichiers ci-dessous
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-01/*.parquet")
# dt=2020-01-01/De dt=2020-01-31/Lisez tous les fichiers ci-dessous
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-*/*.parquet")
#lit les fichiers dans les chemins inclus dans la liste des chemins
df = spark.read.parquet(*paths)
#Ajouter une partition à la colonne et lire
df = spark.read.option("basePath", parent_path).parquet(*paths)
En sauvegardant le résultat de l'évaluation du retard dans la mémoire, un traitement à grande vitesse devient possible. Il est préférable de mettre en cache () les données fréquemment utilisées et de les utiliser surtout après le traitement.
#Cache en mémoire
df = df.cache()
#Ou
#Cache en mémoire par défaut, la destination du cache peut être changée en stockage, etc. avec un argument facultatif
df = df.persist()
df.printSchema()
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
#csv (l'en-tête n'est pas donné dans ce cas)
df.write.csv(path)
# parquet
df.write.parquet(path)
#Dans le cas de csv, il n'est donné que si le paramètre de sortie de l'en-tête est défini.
df.write.mode("overwrite").option("header", "True").csv(path)
# or
df.write.mode("overwrite").csv(path, header=True)
#Dans le cas du parquet, il est édité par défaut même si l'en-tête n'est pas spécifié.
df.write.parquet(path)
# gzip with csv
df.write.csv(path, compression="gzip")
#accrocheur avec du parquet (devrait-il être compressé par défaut?)
df.write.option("compression", "snappy").parquet(path)
Dans le cas de l'exemple suivant, il sera affiché dans le dossier / dt = {dt_col} / count = {count_col} / {file} .parquet
.
df.repartition("dt", "count").write.partitionBy("dt", "count").parqeut(path)
Si vous effectuez une fusion après plusieurs processus, la vitesse de traitement ralentira, donc si possible, il est préférable de sortir le fichier normalement, puis de fusionner à nouveau la lecture.
#Peut être lent après plusieurs processus
df.coalesce(1).write.csv(path, header=True)
#Recommandé si possible (sortie → lecture → sortie)
df.write.parquet(path)
alt_df = spark.read.parquet(path)
alt_df.coalesce(1).write.csv(path, header=True)
df.repartition(20).write.parquet(path)
# write.mode()Arguments pouvant être utilisés dans'overwrite', 'append', 'ignore', 'error', 'errorifexists'
#J'utilise souvent l'écrasement
#Normalement, une erreur se produit si le fichier existe dans le dossier de destination de sortie.
df.write.parquet(path)
#Si vous voulez écraser
df.write.mode("overwrite").parquet(path)
#Si vous souhaitez ajouter au dossier actuel
df.write.mode("append").parquet(path)
Il s'agit d'une méthode de création d'un bloc de données par programme, et non à partir de la lecture d'un fichier.
#Créer un bloc de données à une seule colonne
id_list = ["A001", "A002", "B001"]
df = spark.createDataFrame(id_list, StringType()).toDF("id")
#L'élément à l'intérieur est tuple,Spécifiez enfin le nom de la colonne
df = spark.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])
> df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
| a| null| null|
| a|code1| null|
| a|code2|name2|
+---+-----+-----+
# =======================
#Lors de la création à l'aide de rdd une fois
rdd = sc.parallelize(
[
(0, "A", 223, "201603", "PORT"),
(0, "A", 22, "201602", "PORT"),
(0, "A", 422, "201601", "DOCK"),
(1, "B", 3213, "201602", "DOCK"),
(1, "B", 3213, "201601", "PORT"),
(2, "C", 2321, "201601", "DOCK")
]
)
df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])
> df.show()
+---+----+----+------+----+
| id|type|cost| date|ship|
+---+----+----+------+----+
| 0| A| 223|201603|PORT|
| 0| A| 22|201602|PORT|
| 0| A| 422|201601|DOCK|
| 1| B|3213|201602|DOCK|
| 1| B|3213|201601|PORT|
| 2| C|2321|201601|DOCK|
+---+----+----+------+----+
withColumn ()
)Dans PySpark, l'analyse est souvent effectuée en utilisant le "processus d'ajout d'une nouvelle colonne".
# new_col_Créez une nouvelle colonne appelée nom et donnez-lui une valeur littérale (= constante) de 1.
df = df.withColumn("new_col_name", F.lit(1))
#Donnez le chemin du fichier lu
df = df.withColumn("file_path", F.input_file_name())
#Obtenez le nom du fichier à partir du chemin du fichier lu
df = df.withColumn("file_name", F.split(col("file_path"), "/").getItem({int:Dernière valeur d'index}))
#Spécifié par chaîne de caractères
df = df.withColumn("total_count", F.col("total_count").cast("double"))
#Spécifié par les types PySpark
df = df.withColumn("value", F.lit("1").cast(StringType()))
#Si vous souhaitez ajouter une colonne de valeurs selon les conditions
# F.when(condtion, value).otherwise(else_value)
df = df.withColumn("is_even", F.when(F.col("number") % 2 == 0, 1).otherwise(0))
#En cas de conditions multiples
df = df.withColumn("search_result", F.when( (F.col("id") % 2 == 0) & (F.col("room") % 2 == 0), 1).otherwise(0))
df = df.withColumn("is_touched", F.col("value").isNotNull())
df = df.withColumn("replaced_id", F.regexp_replace(F.col("id"), "A", "C"))
# date time -> epoch time
df = df.withColumn("epochtime", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ssZ"))
# epoch time -> date time
# 1555259647 -> 2019-04-14 16:34:07
df = df.withColumn("datetime", F.to_timestamp(df["epochtime"]))
# datetime -> string
# 2019-04-14 16:34:07 -> 2019-04-14
string_format = "yyyy-MM-dd"
df = df.withColumn("dt", F.date_format(F.col("datetime"), string_format))
# epoch time:Une chaîne de nombres d'environ 10 chiffres. Nombre de secondes depuis le 1er janvier 1970
df = df.withColumn("hour", F.hour(F.col("epochtime")))
df = df.withColumn("hour", F.hour(F.col("timestamp")))
#Tronquer datetime à la largeur de temps spécifiée
df = df.withColumn("hour", F.date_trunc("hour", "datetime"))
df = df.withColumn("week", F.date_trunc("week", "datetime"))
Il existe de nombreuses autres fonctions disponibles dans Takutan withColumn
.
Veuillez également consulter le site de référence.
La méthode pour joindre deux DataFrames horizontalement / verticalement est join () / union ()
.
#Spécifiez les colonnes à rejoindre sur
df = left_df.join(right_df, on="id")
# data-Pour différentes colonnes pour chaque cadre
df = left_df.join(right_df, left_df.id_1 == right_df.id_2)
#Vous pouvez également spécifier la méthode de combinaison
# how:= inner, left, right, left_semi, left_anti, cross, outer, full, left_outer, right_outer
df = left_df.join(right_df, on="id", how="inner")
df = left_df.join(right_df, on=["id", "dt"])
df = left_df.join(F.broadcast(right_df), on="id")
df = upper_df.union(bottom_df)
Il est souvent utilisé lors de la lecture de csv sans nom de colonne.
#S'il n'y a pas de nom de colonne`_c0`De`_c{n}`Reçoit le nom de la colonne
df = df.withColumnRenamed("_c0", "id")
df = df.select("id")
df = df.select("id").distinct()
# count()Souvent utilisé en combinaison avec
#Exemple: numéro unique d'un certain identifiant
print(df.select("id").distinct().count())
df = df.drop("id")
# simple
df = df.dropna()
# using subset
df = df.na.drop(subset=["lat", "lon"])
#Cas simple
df = df.select("id").select(F.collect_list("id"))
id_list = df.first()[0]
> id_list => ["A001", "A002", "B001"]
#Peut également être utilisé en combinaison avec groupBy
df = df.groupBy("id").agg(F.collect_set("code"), F.collect_list("name"))
>
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
| a| [code1, code2]| [name2]|
+---+-----------------+------------------+
#Obtenez directement la valeur de la trame de données
df = df.groupBy().avg()
avg_attribute = df.collect()[0]
> print(avg_attribute["avg({col_name})"])
{averaged_value}
filter
Vous pouvez utiliser F.col ()
pour appliquer le filtrage à des colonnes spécifiques
# using spark.function
df = df.filter(F.col("id") == "A001")
# pandas-like
df = df.filter(df['id'] == "A001")
df = df.filter(df.id == "A001")
Cependant, si possible, vous devez créer un cadre de données Spark à partir de date_list et le rejoindre.
df = df.filter(F.col("dt").isin(date_list))
orderBy
Le tri ne convient pas au traitement distribué, il vaut donc mieux ne pas en faire autant.
#Colonne unique uniquement
df = df.orderBy("count", ascending=False)
#Tri multi-conditions
df = df.orderBy(F.col("id").asc(), F.col("cound").desc())
groupBy (aggregate)
# count()
df = df.groupBy("id").count()
# multiple
# alias()Le nom de la colonne est modifié par la fonction
#Exemple: agrégation d'utilisateurs
df = df.groupBy("id").agg(
F.count(F.lit(1)).alias("count"),
F.mean(F.col("diff")).alias("diff_mean"),
F.stddev(F.col("diff")).alias("diff_stddev"),
F.min(F.col("diff")).alias("diff_min"),
F.max(F.col("diff")).alias("diff_max")
)
> df.show()
(réduction)
# =======================
#Exemple: agrégation par date et heure de l'utilisateur
df = df.groupBy("id", "dt").agg(
F.count(F.lit(1)).alias("count")
)
> df.show()
+---+-----------+------+
| id| dt| count|
+---+-----------+------+
| a| 2020/01/01| 7|
| a| 2020/01/02| 5|
| a| 2020/01/03| 4|
+---+-----------+------+
# ===========================
#Exemple: agrégation par date / heure / emplacement utilisateur
df = df.groupBy("id", "dt", "location_id").agg(
F.count(F.lit(1)).alias("count")
)
> df.show()
+---+-----------+------------+------+
| id| dt| location_id| count|
+---+-----------+------------+------+
| a| 2020/01/01| A| 2|
| a| 2020/01/01| B| 3|
| a| 2020/01/01| C| 2|
: : : : :
+---+-----------+------------+------+
#Exemple: nombre d'utilisateurs uniques par date
df = df.groupBy("dt").agg(countDistinct("id").alias("id_count"))
> df.show()
+-----------+---------+
| dt| id_count|
+-----------+---------+
| 2020/01/01| 7|
| 2020/01/02| 5|
| 2020/01/03| 4|
+-----------+---------+
# ===============================
#Exemple: nombre de jours pendant lesquels chaque utilisateur a été en contact au moins une fois
df = df.groupBy("id").agg(countDistinct("dt").alias("dt_count"))
> df.show()
+---+---------+
| id| dt_count|
+---+---------+
| a| 10|
| b| 15|
| c| 4|
+---+---------+
group_columns = ["id", "dt"]
df = ad_touched_visit_df.groupBy(*group_columns).count()
w = Window().orderBy(F.col("id"))
df = df.withColumn("row_num", F.row_number().over(w))
#Ajouter les données de la ligne précédente en tant que colonne
w = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn("prev_timestamp", F.lag(df["timestamp"]).over(w))
Il est fortement déconseillé car incompatible avec les environnements distribués. Il est préférable de ne l'utiliser que lorsque cela doit être fait.
for row in df.rdd.collect():
do_some_with(row['id'])
Recommended Posts