Comment extraire des fonctionnalités de données de séries chronologiques avec les bases de PySpark

introduction

Lors de la création d'un modèle d'IA avec des données de séries chronologiques, le flux de traitement consistant à effectuer un calcul de quantité d'entités à l'aide d'une fenêtre glissante, puis à s'entraîner avec divers algorithmes est souvent mis en œuvre. En ce qui concerne l'extraction de la quantité de caractéristiques de ces données de série chronologique, la difficulté de traitement est difficile car des packages pratiques tels que pandas et numpy ne peuvent pas être utilisés pour des données de grande capacité (dites big data) qui ne rentrent pas dans la mémoire. monter. Comment y parvenir? Dans cet article, je vais vous présenter comment utiliser PySpark comme exemple. Étant donné que le nombre de phrases sera important, je vais l'introduire en deux parties.

Cette fois, comme il s'agit d'une édition basique, j'introduirai une "méthode générale de calcul des caractéristiques à l'aide d'une fenêtre glissante avec PySpark".

Environnement de vérification

J'ai utilisé Azure Synapse Analytics comme environnement d'exécution pour PySpark. Les principales versions du package sont les suivantes. (Paramètre par défaut au 20 août 2020)

Apache Spark 2.4
Python version 3.6.1

Comment calculer des entités à l'aide d'une fenêtre glissante dans PySpark

J'omettrai la méthode de création de l'environnement de vérification. Si vous avez une demande, j'aimerais écrire un article sur la création d'un environnement d'exécution Spark avec Azure Synapse Analytics. Nous vous serions reconnaissants si vous pouviez demander dans les commentaires.

1. Préparation des données

Définissez les données appropriées en tant que bloc de données PySpark.

df = sqlContext.createDataFrame([
    (1, 2.65,2.42,6.90,4.93),
    (2, 2.57,8.50,2.40,5.37),
    (3, 2.13,3.76,7.52,7.67),
    (4, 3.09,7.28,3.59,6.34),
    (5, 5.75,4.69,5.26,3.11),
    (6, 6.91,4.04,2.03,6.28),
    (7, 5.44,3.22,2.87,7.14),
    (8, 4.86,7.47,3.68,0.32),
    (9, 9.70,7.43,4.43,7.74),
    (10,6.30,7.72,7.78,7.91),
    ], 
    ["time", "data1", "data2", "data3", "data4"])

df.show()

# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# |   1| 2.65| 2.42|  6.9| 4.93|
# |   2| 2.57|  8.5|  2.4| 5.37|
# |   3| 2.13| 3.76| 7.52| 7.67|
# |   4| 3.09| 7.28| 3.59| 6.34|
# |   5| 5.75| 4.69| 5.26| 3.11|
# |   6| 6.91| 4.04| 2.03| 6.28|
# |   7| 5.44| 3.22| 2.87| 7.14|
# |   8| 4.86| 7.47| 3.68| 0.32|
# |   9|  9.7| 7.43| 4.43| 7.74|
# |  10|  6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+

Imaginez que les données suivantes ont été obtenues.

Nom de colonne sens
time Durée d'enregistrement (secondes)
data1~data6 Données de mesure

2. Définition de la fenêtre coulissante

La fenêtre glissante de «PySpark» est définie par «Windows» de «pyspark.sql.window». Ici, une fenêtre glissante avec une largeur de fenêtre de 5 secondes est définie.

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# sliding-paramètres de la fenêtre
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)

La clé de tri est spécifiée par ʻorderBy ("nom de colonne") . La spécification de la clé de tri est très importante car «Spark» ne garantit pas l'ordre de traitement. Dans cet exemple, l'ordre de «time» indiquant l'heure d'enregistrement, c'est-à-dire que «time» est organisé par ordre croissant et traité dans l'ordre à partir du premier enregistrement, spécifiez donc ʻorderBy (F.col ("time")) Faire. D'ailleurs, par défaut, il est traité dans ʻASC (ordre croissant) . Si vous voulez traiter avec DESC (ordre décroissant)`, écrivez comme suit.

sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)

Si vous ajoutez .desc () à F.col (" time "), il sera traité par ordre décroissant.

Ensuite, la largeur de la fenêtre est définie par rowsBetween (Window.currentRow, window_size-1). Le premier argument est la définition de la position de départ, où Window.currentRow et la ligne actuelle sont spécifiés. Le deuxième argument est la définition de la position finale, où window_size-1 et 4 lignes en avant (4 secondes en avance) de la ligne actuelle sont spécifiés. Avec cela, les données pour 5 lignes (5 secondes) jusqu'à 4 lignes en avant, y compris la ligne actuelle, peuvent être définies comme une fenêtre.

3. Calcul des caractéristiques

Effectuez une extraction de quantité d'entités en utilisant la définition de la fenêtre glissante définie précédemment. Essayez d'obtenir le max (valeur maximale), min (valeur minimale) et ʻavg (valeur moyenne) ʻdans la largeur de la fenêtre pour data1.

df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
  .withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
  .withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
  .select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
  .show()

# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1|    feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# |   1| 2.65|          5.75|          2.13|3.2379999999999995|
# |   2| 2.57|          6.91|          2.13|              4.09|
# |   3| 2.13|          6.91|          2.13|             4.664|
# |   4| 3.09|          6.91|          3.09|              5.21|
# |   5| 5.75|           9.7|          4.86| 6.531999999999999|
# |   6| 6.91|           9.7|          4.86|             6.642|
# |   7| 5.44|           9.7|          4.86|             6.575|
# |   8| 4.86|           9.7|          4.86| 6.953333333333333|
# |   9|  9.7|           9.7|           6.3|               8.0|
# |  10|  6.3|           6.3|           6.3|               6.3|
# +----+-----+--------------+--------------+------------------+

Le résultat du traitement du contenu spécifié par le nom de colonne spécifié est ajouté en tant que nouvelle colonne du bloc de données avec withColumn (" nom de colonne ", traitement du contenu). En regardant le code de traitement qui calcule max, il est withColumn ('feat_max_data1', F.max ('data1'). Over (glissant_window)), et il prend le max de data1. Sous la condition de ʻover (glissant_window) . Le résultat sera ajouté en tant que colonne feat_max_data1. " La spécification de la fenêtre glissante dans PySpark est définie par ʻover (). Puisque PySpark définit le traitement un par un, il est nécessaire d'énumérer plusieurs codes de traitement lors de l'acquisition de plusieurs quantités de caractéristiques à partir d'une colonne comme dans cet exemple.

Résumé

Jusqu'à présent, nous avons présenté les bases de "Comment calculer des entités en utilisant une fenêtre glissante avec PySpark". La méthode de traitement introduite cette fois-ci est une méthode générale, et je pense qu'elle est suffisante lorsque la quantité de données est petite ou la quantité de caractéristiques à extraire est petite. Cependant, si la quantité de données est importante, le nombre de colonnes à traiter est important ou la quantité de caractéristiques à extraire est importante, cette méthode de traitement se traduira par une efficacité de traitement médiocre et des coûts de traitement élevés. Cependant, il est possible d'améliorer considérablement le coût de traitement en concevant le procédé de traitement. La prochaine fois, en tant que version avancée, je présenterai le type d'appareil pouvant être utilisé pour un traitement plus efficace.

Merci pour la lecture.

Recommended Posts

Comment extraire des fonctionnalités de données de séries chronologiques avec les bases de PySpark
Essayez d'extraire les caractéristiques des données de capteur avec CNN
Comment gérer les données de séries chronologiques (mise en œuvre)
Voir les détails des données de séries chronologiques dans Remotte
Comment lire les données de séries chronologiques dans PyTorch
J'ai essayé d'extraire des fonctionnalités avec SIFT d'OpenCV
Comment extraire des données qui ne manquent pas de valeur nan avec des pandas
Comment extraire des données qui ne manquent pas de valeur nan avec des pandas
Je voulais juste extraire les données de la date et de l'heure souhaitées avec Django
Comment gérer les données déséquilibrées
Comment augmenter les données avec PyTorch
Différenciation des données de séries chronologiques (discrètes)
Analyse des séries chronologiques 3 Prétraitement des données des séries chronologiques
Comment calculer la somme ou la moyenne des données csv de séries chronologiques en un instant
<Pandas> Comment gérer les données de séries chronologiques dans le tableau croisé dynamique
Prédiction des données de séries chronologiques par projection simplex
Prédire les données de séries chronologiques avec un réseau neuronal
Comment comparer des données de séries chronologiques - Dérivée DTW, DTW-
Jupyter Notebook Principes d'utilisation
Bases de PyTorch (1) -Comment utiliser Tensor-
Comment lire les données de problème avec Paiza
[Introduction au Data Scientist] Bases de Python ♬
Tracer CSV de données de séries temporelles avec une valeur unixtime en Python (matplotlib)
Comment créer des exemples de données CSV avec hypothèse
Convertissez les données avec la forme (nombre de données, 1) en (nombre de données,) avec numpy.
Comment implémenter le traitement du temps d'attente avec wxpython
Acquisition de données chronologiques (quotidiennes) des cours des actions
Comment récupérer des données de courses de chevaux avec Beautiful Soup
Comment spécifier des attributs avec Mock of Python
Comment implémenter "named_scope" de RubyOnRails avec Django
Lissage des séries temporelles et des données de forme d'onde 3 méthodes (lissage)
Comment mesurer le temps d'exécution avec Python Partie 1
Comment mesurer le temps d'exécution avec Python, partie 2
Résumé de la lecture des données numériques avec python [CSV, NetCDF, Fortran binary]
[Introduction à Python] Comment obtenir l'index des données avec l'instruction for
Comment utiliser xgboost: classification multi-classes avec des données d'iris
J'ai essayé d'implémenter "Bases de l'analyse des séries temporelles et du modèle d'espace d'état" (Hayamoto) avec pystan
Quantité d'entités pouvant être extraite des données de séries chronologiques
Comment récupérer des données d'image de Flickr avec Python
Détection d'anomalies des données de séries chronologiques par LSTM (Keras)
Comment convertir des données détenues horizontalement en données détenues verticalement avec des pandas
Comment mesurer le temps de lecture d'un fichier mp3 avec python
Comment obtenir plus de 1000 données avec SQLAlchemy + MySQLdb
Comment utiliser Python Kivy ① ~ Bases du langage Kv ~
Analyse des séries chronologiques 1 Principes de base
Comment extraire des valeurs Null et des valeurs non Null avec des pandas
Comment générer un CSV d'en-tête multiligne avec des pandas
Comment déduire l'estimation MAP de HMM avec PyStruct
Bases de PyTorch (2) -Comment créer un réseau de neurones-
Implémentation de la méthode de clustering k-shape pour les données de séries chronologiques [Apprentissage non supervisé avec python Chapitre 13]
Comment déduire une estimation MAP de HMM avec OpenGM
Comment apprendre le SVM structuré de ChainCRF avec PyStruct
[Bases de la science des données] Collecte de données depuis RSS avec python
Résumé de la façon de partager l'état avec plusieurs fonctions
Exemple d'agrégation d'une grande quantité de données de séries temporelles à l'aide de Python dans un petit environnement de mémoire à une vitesse raisonnable
[Introduction au modèle SIR] Prédire l'heure de fin de chaque pays avec l'ajustement des données COVID-19 ♬