When building an AI model on time-series data, a common processing flow is to compute features with a sliding window and then train the model with various algorithms. Feature extraction from time-series data becomes difficult when the data is too large to fit in memory (so-called big data), because convenient packages such as pandas and numpy can no longer be used as they are. How can this be achieved?
In this article, I will introduce an approach that uses PySpark. Since it would be quite long as a single article, I will split it into two parts.
This is the basic edition, so I will cover the general "method of calculating features using a sliding window with PySpark".
I used Azure Synapse Analytics as the PySpark execution environment.
The main package versions are as follows. (Default setting as of August 20, 2020)
Apache Spark 2.4
Python version 3.6.1
I will omit how to build the verification environment here. If there is demand, I would like to write a separate article on building a Spark execution environment with Azure Synapse Analytics, so please let me know in the comments.
First, define some sample data as a PySpark DataFrame.
# Sample time-series data: one record per second, four measurement columns
df = sqlContext.createDataFrame([
(1, 2.65,2.42,6.90,4.93),
(2, 2.57,8.50,2.40,5.37),
(3, 2.13,3.76,7.52,7.67),
(4, 3.09,7.28,3.59,6.34),
(5, 5.75,4.69,5.26,3.11),
(6, 6.91,4.04,2.03,6.28),
(7, 5.44,3.22,2.87,7.14),
(8, 4.86,7.47,3.68,0.32),
(9, 9.70,7.43,4.43,7.74),
(10,6.30,7.72,7.78,7.91),
],
["time", "data1", "data2", "data3", "data4"])
df.show()
# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# | 1| 2.65| 2.42| 6.9| 4.93|
# | 2| 2.57| 8.5| 2.4| 5.37|
# | 3| 2.13| 3.76| 7.52| 7.67|
# | 4| 3.09| 7.28| 3.59| 6.34|
# | 5| 5.75| 4.69| 5.26| 3.11|
# | 6| 6.91| 4.04| 2.03| 6.28|
# | 7| 5.44| 3.22| 2.87| 7.14|
# | 8| 4.86| 7.47| 3.68| 0.32|
# | 9| 9.7| 7.43| 4.43| 7.74|
# | 10| 6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+
Imagine that data like the following has been obtained.
Column name | Meaning
---|---
time | Recording time (seconds)
data1~data4 | Measurement data
A sliding window in PySpark is defined with the `Window` class from `pyspark.sql.window`. Here, a sliding window with a width of 5 rows is defined; because this data has exactly one record per second, that corresponds to a window of 5 seconds.
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# sliding-window settings
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)
The sort key is specified with `orderBy("column name")`. Specifying a sort key is very important because Spark does not guarantee processing order. In this example we want the records arranged in ascending order of `time` (the recording time) and processed from the first record onward, so we specify `orderBy(F.col("time"))`. By default the sort order is `ASC` (ascending). If you want to process in `DESC` (descending) order, write it as follows.
sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)
Adding `.desc()` to `F.col("time")` makes the ordering descending.
Next, the window width is defined with `rowsBetween(Window.currentRow, window_size-1)`. The first argument defines the start position; here `Window.currentRow`, i.e. the current row, is specified. The second argument defines the end position; here `window_size-1`, i.e. 4 rows (4 seconds) ahead of the current row, is specified. This defines one window as the 5 rows (5 seconds) from the current row up to 4 rows ahead.
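For reference, both boundary arguments can also be negative, which makes the window look backward instead of forward. The following is just a sketch (the variable name is illustrative and it is not used in the rest of this article) of a window covering the current row and the 4 rows before it.
# Sketch: a backward-looking window over the current row and the 4 preceding rows
trailing_window = Window.orderBy(F.col("time")).rowsBetween(-(window_size - 1), Window.currentRow)
Similarly, if the timestamps were irregular rather than exactly one record per second, the window could be bounded by the time value itself instead of by row count, e.g. `Window.orderBy(F.col("time")).rangeBetween(Window.currentRow, window_size - 1)`.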
Now let's extract features using the sliding window definition set up earlier. We compute the `max` (maximum), `min` (minimum), and `avg` (mean) of `data1` over the window width.
df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
.withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
.withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
.select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
.show()
# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1| feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# | 1| 2.65| 5.75| 2.13|3.2379999999999995|
# | 2| 2.57| 6.91| 2.13| 4.09|
# | 3| 2.13| 6.91| 2.13| 4.664|
# | 4| 3.09| 6.91| 3.09| 5.21|
# | 5| 5.75| 9.7| 4.86| 6.531999999999999|
# | 6| 6.91| 9.7| 4.86| 6.642|
# | 7| 5.44| 9.7| 4.86| 6.575|
# | 8| 4.86| 9.7| 4.86| 6.953333333333333|
# | 9| 9.7| 9.7| 6.3| 8.0|
# | 10| 6.3| 6.3| 6.3| 6.3|
# +----+-----+--------------+--------------+------------------+
`withColumn("column name", processing)` adds the result of the specified processing as a new column of the DataFrame under the given column name. Looking at the code that calculates `max`, `withColumn('feat_max_data1', F.max('data1').over(sliding_window))` takes the `max` of `data1` under the window condition `over(sliding_window)` and adds the result as the `feat_max_data1` column. Applying a window definition in PySpark is expressed with `over()`. As a sanity check, for `time = 1` the window covers rows 1 through 5, so `feat_avg_data1 = (2.65 + 2.57 + 2.13 + 3.09 + 5.75) / 5 ≈ 3.238`, which matches the output above.
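The same pattern works with other aggregate functions in `pyspark.sql.functions`. As a sketch (assuming the same `df` and `sliding_window` as above; the `feat_` column names are illustrative), the standard deviation and the sum over the window can be added in exactly the same way.
df.withColumn('feat_std_data1', F.stddev('data1').over(sliding_window))\
.withColumn('feat_sum_data1', F.sum('data1').over(sliding_window))\
.select('time', 'data1', 'feat_std_data1', 'feat_sum_data1')\
.show()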
Because PySpark defines each processing step one at a time, you need to enumerate one processing expression per feature when extracting multiple features from a single column, as in this example.
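For instance, computing `max`, `min`, and `avg` for all four measurement columns already takes twelve chained `withColumn` calls. One way to enumerate them without writing every line by hand is a small loop; this is only a sketch that assumes the `df` and `sliding_window` defined above, with illustrative column names.
# Sketch: chain one withColumn call per (column, aggregation) pair
feat_df = df
for col_name in ["data1", "data2", "data3", "data4"]:
    for agg_name, agg_func in [("max", F.max), ("min", F.min), ("avg", F.avg)]:
        feat_df = feat_df.withColumn(
            "feat_{}_{}".format(agg_name, col_name),
            agg_func(col_name).over(sliding_window)
        )
feat_df.show()
Even written as a loop, each `withColumn` still adds one transformation at a time, which contributes to the processing cost discussed below.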
So far, we have covered the basics of the "method of calculating features using a sliding window with PySpark". The processing method introduced here is the standard one, and it is sufficient when the amount of data is small and only a few features need to be extracted. However, when the data is large, many columns must be processed, or many features must be extracted, this approach becomes inefficient and the processing cost rises sharply. That cost can be reduced dramatically by reworking how the processing is written. Next time, in the advanced edition, I will introduce those techniques for more efficient processing.
Thank you for reading.