Lors de la création d'un modèle d'IA avec des données de séries chronologiques, le flux de traitement consistant à effectuer un calcul de quantité d'entités à l'aide d'une fenêtre glissante, puis à s'entraîner avec divers algorithmes est souvent mis en œuvre. En ce qui concerne l'extraction de la quantité de caractéristiques de ces données de série chronologique, la difficulté de traitement est difficile car des packages pratiques tels que pandas
et numpy
ne peuvent pas être utilisés pour des données de grande capacité (dites big data) qui ne rentrent pas dans la mémoire. monter. Comment y parvenir?
Dans cet article, je vais vous présenter comment utiliser PySpark
comme exemple. Étant donné que le nombre de phrases sera important, je vais l'introduire en deux parties.
Cette fois, comme il s'agit d'une édition basique, j'introduirai une "méthode générale de calcul des caractéristiques à l'aide d'une fenêtre glissante avec PySpark".
J'ai utilisé Azure Synapse Analytics comme environnement d'exécution pour PySpark
.
Les principales versions du package sont les suivantes. (Paramètre par défaut au 20 août 2020)
Apache Spark 2.4
Python version 3.6.1
J'omettrai la méthode de création de l'environnement de vérification. Si vous avez une demande, j'aimerais écrire un article sur la création d'un environnement d'exécution Spark avec Azure Synapse Analytics. Nous vous serions reconnaissants si vous pouviez demander dans les commentaires.
Définissez les données appropriées en tant que bloc de données PySpark.
df = sqlContext.createDataFrame([
(1, 2.65,2.42,6.90,4.93),
(2, 2.57,8.50,2.40,5.37),
(3, 2.13,3.76,7.52,7.67),
(4, 3.09,7.28,3.59,6.34),
(5, 5.75,4.69,5.26,3.11),
(6, 6.91,4.04,2.03,6.28),
(7, 5.44,3.22,2.87,7.14),
(8, 4.86,7.47,3.68,0.32),
(9, 9.70,7.43,4.43,7.74),
(10,6.30,7.72,7.78,7.91),
],
["time", "data1", "data2", "data3", "data4"])
df.show()
# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# | 1| 2.65| 2.42| 6.9| 4.93|
# | 2| 2.57| 8.5| 2.4| 5.37|
# | 3| 2.13| 3.76| 7.52| 7.67|
# | 4| 3.09| 7.28| 3.59| 6.34|
# | 5| 5.75| 4.69| 5.26| 3.11|
# | 6| 6.91| 4.04| 2.03| 6.28|
# | 7| 5.44| 3.22| 2.87| 7.14|
# | 8| 4.86| 7.47| 3.68| 0.32|
# | 9| 9.7| 7.43| 4.43| 7.74|
# | 10| 6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+
Imaginez que les données suivantes ont été obtenues.
Nom de colonne | sens |
---|---|
time | Durée d'enregistrement (secondes) |
data1~data6 | Données de mesure |
La fenêtre glissante de «PySpark» est définie par «Windows» de «pyspark.sql.window». Ici, une fenêtre glissante avec une largeur de fenêtre de 5 secondes est définie.
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# sliding-paramètres de la fenêtre
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)
La clé de tri est spécifiée par ʻorderBy ("nom de colonne") . La spécification de la clé de tri est très importante car «Spark» ne garantit pas l'ordre de traitement. Dans cet exemple, l'ordre de «time» indiquant l'heure d'enregistrement, c'est-à-dire que «time» est organisé par ordre croissant et traité dans l'ordre à partir du premier enregistrement, spécifiez donc ʻorderBy (F.col ("time"))
Faire. D'ailleurs, par défaut, il est traité dans ʻASC (ordre croissant) . Si vous voulez traiter avec
DESC (ordre décroissant)`, écrivez comme suit.
sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)
Si vous ajoutez .desc ()
à F.col (" time ")
, il sera traité par ordre décroissant.
Ensuite, la largeur de la fenêtre est définie par rowsBetween (Window.currentRow, window_size-1)
. Le premier argument est la définition de la position de départ, où Window.currentRow
et la ligne actuelle sont spécifiés. Le deuxième argument est la définition de la position finale, où window_size-1
et 4 lignes en avant (4 secondes en avance) de la ligne actuelle sont spécifiés. Avec cela, les données pour 5 lignes (5 secondes) jusqu'à 4 lignes en avant, y compris la ligne actuelle, peuvent être définies comme une fenêtre.
Effectuez une extraction de quantité d'entités en utilisant la définition de la fenêtre glissante définie précédemment. Essayez d'obtenir le max (valeur maximale)
, min (valeur minimale)
et ʻavg (valeur moyenne) ʻdans la largeur de la fenêtre pour data1
.
df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
.withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
.withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
.select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
.show()
# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1| feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# | 1| 2.65| 5.75| 2.13|3.2379999999999995|
# | 2| 2.57| 6.91| 2.13| 4.09|
# | 3| 2.13| 6.91| 2.13| 4.664|
# | 4| 3.09| 6.91| 3.09| 5.21|
# | 5| 5.75| 9.7| 4.86| 6.531999999999999|
# | 6| 6.91| 9.7| 4.86| 6.642|
# | 7| 5.44| 9.7| 4.86| 6.575|
# | 8| 4.86| 9.7| 4.86| 6.953333333333333|
# | 9| 9.7| 9.7| 6.3| 8.0|
# | 10| 6.3| 6.3| 6.3| 6.3|
# +----+-----+--------------+--------------+------------------+
Le résultat du traitement du contenu spécifié par le nom de colonne spécifié est ajouté en tant que nouvelle colonne du bloc de données avec withColumn (" nom de colonne ", traitement du contenu)
. En regardant le code de traitement qui calcule max
, il est withColumn ('feat_max_data1', F.max ('data1'). Over (glissant_window))
, et il prend le max
de data1
. Sous la condition de ʻover (glissant_window) . Le résultat sera ajouté en tant que colonne
feat_max_data1. " La spécification de la fenêtre glissante dans
PySpark est définie par ʻover ()
.
Puisque PySpark
définit le traitement un par un, il est nécessaire d'énumérer plusieurs codes de traitement lors de l'acquisition de plusieurs quantités de caractéristiques à partir d'une colonne comme dans cet exemple.
Jusqu'à présent, nous avons présenté les bases de "Comment calculer des entités en utilisant une fenêtre glissante avec PySpark". La méthode de traitement introduite cette fois-ci est une méthode générale, et je pense qu'elle est suffisante lorsque la quantité de données est petite ou la quantité de caractéristiques à extraire est petite. Cependant, si la quantité de données est importante, le nombre de colonnes à traiter est important ou la quantité de caractéristiques à extraire est importante, cette méthode de traitement se traduira par une efficacité de traitement médiocre et des coûts de traitement élevés. Cependant, il est possible d'améliorer considérablement le coût de traitement en concevant le procédé de traitement. La prochaine fois, en tant que version avancée, je présenterai le type d'appareil pouvant être utilisé pour un traitement plus efficace.
Merci pour la lecture.
Recommended Posts