How to extract features from time-series data with PySpark (Basics)

Introduction

When building an AI model from time-series data, a common processing flow is to calculate features over a sliding window and then train various algorithms on them. Extracting features from time-series data becomes harder when the data is too large to fit in memory (so-called big data), because convenient packages such as pandas and numpy can no longer be used. How do you handle this? In this article, I will introduce one approach, using PySpark as an example. Since there is a lot to cover, I will split it into two parts.

This is the basics part, so I will introduce the general method of calculating features using a sliding window with PySpark.

Verification environment

I used Azure Synapse Analytics as the PySpark execution environment. The main package versions are as follows (default settings as of August 20, 2020).

Apache Spark 2.4
Python version 3.6.1

How to calculate features using a sliding window in PySpark

I will omit the steps for creating the verification environment. If there is interest, I would like to write a separate article on how to build a Spark execution environment with Azure Synapse Analytics, so please let me know in the comments.

1. Data preparation

Define the appropriate data as a PySpark data frame.

df = sqlContext.createDataFrame([
    (1, 2.65,2.42,6.90,4.93),
    (2, 2.57,8.50,2.40,5.37),
    (3, 2.13,3.76,7.52,7.67),
    (4, 3.09,7.28,3.59,6.34),
    (5, 5.75,4.69,5.26,3.11),
    (6, 6.91,4.04,2.03,6.28),
    (7, 5.44,3.22,2.87,7.14),
    (8, 4.86,7.47,3.68,0.32),
    (9, 9.70,7.43,4.43,7.74),
    (10,6.30,7.72,7.78,7.91),
    ], 
    ["time", "data1", "data2", "data3", "data4"])

df.show()

# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# |   1| 2.65| 2.42|  6.9| 4.93|
# |   2| 2.57|  8.5|  2.4| 5.37|
# |   3| 2.13| 3.76| 7.52| 7.67|
# |   4| 3.09| 7.28| 3.59| 6.34|
# |   5| 5.75| 4.69| 5.26| 3.11|
# |   6| 6.91| 4.04| 2.03| 6.28|
# |   7| 5.44| 3.22| 2.87| 7.14|
# |   8| 4.86| 7.47| 3.68| 0.32|
# |   9|  9.7| 7.43| 4.43| 7.74|
# |  10|  6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+

Imagine that this data was obtained from measurements. The columns have the following meanings.

Column name      Meaning
time             Recording time (seconds)
data1 to data4   Measurement data

2. Sliding window definition

A sliding window in PySpark is defined with the Window class from pyspark.sql.window. Here, a sliding window with a width of 5 seconds is defined.

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# sliding-window settings
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)

The sort key is specified with orderBy("column name"). Specifying the sort key is very important, because Spark does not guarantee processing order. In this example, the records are sorted in ascending order of time, the column that holds the recording time, and processed from the first record onward, so the specification is orderBy(F.col("time")). The default is ASC (ascending order). To process in DESC (descending order), write the following.

sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)

Adding .desc() to F.col("time") sorts the records in descending order.

Next, the window width is defined with rowsBetween(Window.currentRow, window_size-1). The first argument is the start position; Window.currentRow specifies the current row. The second argument is the end position; window_size-1 specifies 4 rows ahead of the current row. Because this data has one row per second, 4 rows ahead is also 4 seconds ahead. The 5 rows (5 seconds) from the current row through 4 rows ahead are thus defined as one window.
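
To make the frame definition concrete, here is a minimal pure-Python sketch of which rows each window covers. It does not use Spark at all; rows_in_frame is a hypothetical helper written only for illustration, and it assumes the rows are already sorted by time.

```python
# Pure-Python illustration of rowsBetween(Window.currentRow, window_size - 1).
# `rows_in_frame` is a hypothetical helper for illustration, not a PySpark API.

def rows_in_frame(sorted_times, index, window_size):
    """Return the time values covered by the frame that starts at `index`."""
    # The frame runs from the current row to (window_size - 1) rows ahead.
    # Near the end of the data, slicing simply returns the rows that remain.
    return sorted_times[index:index + window_size]

times = list(range(1, 11))  # time = 1..10, already in ascending order
window_size = 5

frames = [rows_in_frame(times, i, window_size) for i in range(len(times))]
for t, frame in zip(times, frames):
    print(t, frame)
```

The frames for time 7 through 10 contain fewer than 5 rows, since there are no more rows ahead of them.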

3. Feature calculation

Features are extracted using the sliding window defined above. Here we get the max (maximum), min (minimum), and avg (mean) of data1 within the window.

df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
  .withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
  .withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
  .select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
  .show()

# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1|    feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# |   1| 2.65|          5.75|          2.13|3.2379999999999995|
# |   2| 2.57|          6.91|          2.13|              4.09|
# |   3| 2.13|          6.91|          2.13|             4.664|
# |   4| 3.09|          6.91|          3.09|              5.21|
# |   5| 5.75|           9.7|          4.86| 6.531999999999999|
# |   6| 6.91|           9.7|          4.86|             6.642|
# |   7| 5.44|           9.7|          4.86|             6.575|
# |   8| 4.86|           9.7|          4.86| 6.953333333333333|
# |   9|  9.7|           9.7|           6.3|               8.0|
# |  10|  6.3|           6.3|           6.3|               6.3|
# +----+-----+--------------+--------------+------------------+

withColumn("column name", processing) adds the result of the processing as a new column of the data frame under the given column name. In the code that calculates the maximum, withColumn('feat_max_data1', F.max('data1').over(sliding_window)) takes the max of data1 under the condition over(sliding_window) and adds the result as the column feat_max_data1. In PySpark, the sliding window is applied with over(). Because PySpark defines each processing step individually, acquiring multiple features from one column, as in this example, requires writing out one expression per feature.

Summary

So far, I have introduced the basics of how to calculate features using a sliding window with PySpark. This is a general-purpose method, and I think it is sufficient when the amount of data or the number of features to extract is small. However, when the amount of data, the number of columns to process, or the number of features to extract is large, this method becomes inefficient and processing costs rise. It is possible to reduce that cost dramatically by restructuring the processing. Next time, in the advanced part, I will introduce techniques for more efficient processing.

Thank you for reading.
