Anomaly detection of time series data by LSTM is performed.
Implemented using Keras while referring to the contents of ReNom Tutorial in Reference [1]. ..
The general flow is as follows.
Step1 Create a model that predicts future values from past data using normal data.
Step2 Predict with test data using the created model and sample the error vector. Then, the normal distribution is fitted to the error vector.
Step3 Calculate the error vector of the part where the abnormality seems to have occurred, and if there is an error vector at the tail of the normal distribution estimated in Step 2, it is judged that the abnormality has occurred.
%matplotlib inline
import keras
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
Use the electrocardiogram data called qtdb / sel102 ECG dataset [2].
The data can be obtained at:
http://www.cs.ucr.edu/~eamonn/discords/qtdbsel102.txt
df = pd.read_csv('qtdbsel102.txt', header=None, delimiter='\t')
print(df.shape)
(45000, 3)
This time, we will focus on the data in the third column to detect anomalies.
ecg = df.iloc[:,2].values
ecg = ecg.reshape(len(ecg), -1)
print("length of ECG data:", len(ecg))
length of ECG data: 45000
As data preprocessing, set the mean to 0 and the variance to 1.
scaler = StandardScaler()
std_ecg = scaler.fit_transform(ecg)
Next, the data is illustrated to confirm the tendency.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(45000), std_ecg[:45000], color='b')
plt.legend()
plt.show()
Although it is periodic as a whole, it can be seen that the shape is broken in one place before 5000 steps.
So, let's expand the graph from 0 steps to 5000 steps.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(5000), std_ecg[:5000], color='b')
plt.ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
plt.fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend()
plt.show()
It can be seen that the periodicity is broken at the part painted in green.
The specific goals of this time are as follows.
-** The green part is judged as abnormal and detected. ** **
In order to learn only with normal data, data after 5000 steps is taken out.
normal_cycle = std_ecg[5000:]
Prepare the following generators to divide the possessed data into training data, verification data, and test data.
Here, the following 3 data are predicted from the 10 data.
Note) delay and step are not used this time. It means "predict the data after delay" and "use the data every step", respectively.
def generator(data, lookback, delay, pred_length, min_index, max_index, shuffle=False,
batch_size=100, step=1):
if max_index is None:
max_index = len(data) - delay - pred_length - 1
i = min_index + lookback
while 1:
if shuffle:
rows = np.random.randint(min_index + lookback, max_index,
size=batch_size)
else:
if i + batch_size >= max_index:
i = min_index + lookback
rows = np.arange(i, min(i + batch_size, max_index))
i += len(rows)
samples = np.zeros((len(rows),
lookback//step,
data.shape[-1]))
targets = np.zeros((len(rows), pred_length))
for j, row in enumerate(rows):
indices = range(rows[j] - lookback, rows[j], step)
samples[j] = data[indices]
targets[j] = data[rows[j] + delay : rows[j] + delay + pred_length].flatten()
yield samples, targets
lookback = 10
pred_length = 3
step = 1
delay = 1
batch_size = 100
#Training generator
train_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=20000,
shuffle=True,
step=step,
batch_size=batch_size)
val_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=20001,
max_index=30000,
step=step,
batch_size=batch_size)
#Val to examine the entire validation dataset_Number of time increments to extract from gen
val_steps = (30001 - 20001 -lookback) // batch_size
#Others are test data
Here, a model in which two LSTMs are stacked is used.
LSTM is a neural network suitable for time series data.
Only the final 5 steps will be posted.
import keras
from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop
model = Sequential()
model.add(layers.LSTM(35, return_sequences = True, input_shape=(None,normal_cycle.shape[-1])))
model.add(layers.LSTM(35))
model.add(layers.Dense(pred_length))
model.compile(optimizer=RMSprop(), loss="mse")
history = model.fit_generator(train_gen,
steps_per_epoch=200,
epochs=60,
validation_data=val_gen,
validation_steps=val_steps)
Epoch 56/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0037 - val_loss: 0.0043
Epoch 57/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0039 - val_loss: 0.0045
Epoch 58/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0047
Epoch 59/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0043
Epoch 60/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0038
The transition of the loss value in the training data and the verification data is shown below.
loss = history.history["loss"]
val_loss = history.history["val_loss"]
epochs = range(len(loss))
plt.figure(figsize=(10,5))
plt.plot(epochs, loss, "b", label="Training loss")
plt.plot(epochs, val_loss, "r", label="Validatioin loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.title("Training and validation loss")
plt.legend(fontsize=20)
plt.show()
Prepare the predicted value and the target value in the test data.
test_gen_pred = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_steps = (len(normal_cycle) - 30001 - lookback) // batch_size
test_pred = model.predict_generator(test_gen_pred, steps=test_steps)
test_gen_target = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_target = np.zeros((test_steps * batch_size , pred_length))
for i in range(test_steps):
test_target[i*batch_size:(i+1)*batch_size] = next(test_gen_target)[1]
The mse is evaluated below. It can be confirmed that the model is appropriate.
from sklearn.metrics import mean_squared_error
print(mean_squared_error(test_pred, test_target))
0.0032039127006786993
Just in case, check the graph as well.
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[0:2000,0])), test_pred[0:2000,0], "r", label="Prediction")
plt.plot(range(len(test_target[0:2000,0])), test_target[0:2000,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[:,0])), test_pred[:,0], "r", label="Prediction")
plt.plot(range(len(test_target[:,0])), test_target[:,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
Assume that the error $ e $ between the predicted value and the target value follows a normal distribution. However, since we predicted three data, $ e $ is a three-dimensional vector, and we need to consider a three-dimensional normal distribution.
\begin{align*}
p(e) = N(e| \mu, \Sigma)
\end{align*}
Where $ \ mu $ is the mean vector and $ \ Sigma $ is the covariance matrix. In general, a multidimensional normal distribution is determined by the mean vector and the variance-covariance matrix.
Parameter estimation is performed from the sampled data. Currently, the error in the test data can be acquired for 10,000 steps.
\begin{align*}
\{ e_1, e_2, ..., e_{10000} \}
\end{align*}
In addition, each element is
\begin{align*}
e_i = (e_i ^{(1)}, e_i ^{(2)}, e_i ^{(3)} )
\end{align*}
Is.
Here, the maximum likelihood method is adopted as the method of parameter estimation. The maximum likelihood estimator of the normal distribution is as follows.
\begin{align*}
& \hat{\mu} = \frac{1}{N}\sum_{n=1}^{N}e_n \\
& \hat{\Sigma} = \frac{1}{N}\sum_{n=1}^{N}(e_n-\hat{\mu})(e_n-\hat{\mu})^{\top}
\end{align*}
Note that the estimator of the variance-covariance matrix by the most likely method is a sample variance-covariance matrix rather than an invariant-variance-covariance matrix.
With this data, we will calculate each estimator.
error = test_pred - test_target
mean = np.mean(error, axis=0)
print(mean)
cov = np.cov(error, rowvar=False, bias=True)
print(cov)
[0.00562191 0.00312449 0.00928306]
[[0.00149367 0.0016201 0.0013881 ]
[0.0016201 0.00302257 0.00310156]
[0.0013881 0.00310156 0.00496796]]
The reason why rowbar = False is to make the direction of the sum vertical. Also, the reason that bias = True is to find the sample covariance matrix. By default, bias = False. In this case, the invariant covariance matrix can be obtained.
The Mahalanobis distance is defined below using the previous estimates.
\begin{align*}
a(e) = (e-\hat{\mu})^\top \hat{\Sigma}^{-1}(e-\hat{\mu})
\end{align*}
It is difficult to understand if it is multidimensional, but if it is one-dimensional, it will be as follows. However, $ {\ hat {\ sigma} ^ 2} $ is the sample variance.
\begin{align*}
a(e) = \frac{(e-\hat{\mu})^2}{\hat{\sigma}^2}
\end{align*}
In other words, $ a (e) $ represents how far the sample data is from the mean value, and in the normal distribution, if this value is large, the probability of occurrence is low, in other words, an abnormal event has occurred. be able to. Thereby, an abnormal value can be detected.
def Mahalanobis_dist(x, mean, cov):
d = np.dot(x-mean, np.linalg.inv(cov))
d = np.dot(d, (x-mean).T)
return d
detection_gen_pred = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_steps = (5000 -lookback) // batch_size
detection_pred = model.predict_generator(detection_gen_pred, steps=detection_steps)
detection_gen_target = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_target = np.zeros((detection_steps * batch_size , pred_length))
for i in range(detection_steps):
detection_target[i*batch_size:(i+1)*batch_size] = next(detection_gen_target)[1]
Prepare the error vector and find the Mahalanobis distance.
error_detection = detection_pred - detection_target
m_dist = []
for e in error_detection:
m_dist.append(Mahalanobis_dist(e, mean, cov))
The result is as follows.
fig, axes = plt.subplots(nrows=2, figsize=(15,10))
axes[0].plot(std_ecg[:5000],color='b',label='original data')
axes[0].set_xlabel('time')
axes[0].set_ylabel('ECG\'s value' )
axes[0].set_xlim(-250, 5250)
axes[0].set_ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
axes[0].fill_between(x, y1, y2, facecolor='g', alpha=.3)
axes[1].plot(m_dist, color='r',label='Mahalanobis Distance')
axes[1].set_xlabel('time')
axes[1].set_ylabel('Mahalanobis Distance')
axes[1].set_xlim(-250, 5250)
axes[1].set_ylim(0, 1000)
y1 = [0]*len(x)
y2 = [1000]*len(x)
axes[1].fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend(fontsize=15)
plt.show()
It can be seen that the Mahalanobis distance increases in the green areas. In other words, it means that the abnormality could be detected (sensually).
In the above, the abnormal value is detected by qualitative judgment, but it is necessary to set some threshold value in order to automatically detect the abnormal value.
The following facts are known about the Mahalanobis distance.
See reference [2] for more information.
Believe in this fact and use it to set the threshold. In this case, $ M = 3 $, $ N = 10000 $, so it's safe to think of it as $ N \ gg M $.
Hereafter, it is assumed that $ a (e) $ follows a chi-square distribution with three degrees of freedom. First, check the chi-square distribution with 3 degrees of freedom.
from scipy import stats
x = np.linspace(0, 50, 2000)
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(x, stats.chi2.pdf(x, 3), "b", label="chi2, 3")
plt.legend(fontsize=15)
plt.show()
Then, the upper 1% point is calculated.
a_99 = stats.chi2.ppf(0.99, 3)
print(a_99)
11.344866730144373
This is adopted as a threshold value, and when this value is exceeded, it is judged as an abnormal value.
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 300)
plt.plot(np.linspace(-250, 5250), [a_99] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
It's kind of subtle. It sounds like that, but it's inflexible.
So I decided to try another method. The next method is to find the optimum threshold value according to the data.
It is assumed that the data held can be labeled as normal or abnormal, or has already been done.
The following content is based on reference [3].
For preparation, we will label with this data. Here, data from 0 to 5000 is targeted, and 4200 to 4400 is set as an abnormal value.
m_dist_th = np.zeros(4910)
m_dist_th[10:4910] = m_dist
TF_label = np.zeros(4910)
TF_label[4200:4400] = 1
By setting the threshold value, the following values are obtained. However, the right side represents the number of data satisfying the conditions.
\begin{align*}
& TP = (\, Label:positive, \quad Prediction:positive \, )\\
& FP = (\, Label:negative, \quad Prediction:positive \, )\\
& FN = (\, Label:positive, \quad Prediction:negative \, )\\
& TN = (\, Label:negative, \quad Prediction:negative \, )
\end{align*}
In addition, the following indicators will be introduced.
\begin{align*}
& precision = \frac{TP}{TP + FP} \\
& recall = \frac{TP}{TP + FN} \\
& F_{\beta} = (1+\beta^2) \frac{precision \cdot recal}{(\beta^2 \cdot precision) + recall}
\end{align*}
Find the threshold that maximizes $ F_ {\ beta} $. First, define a function that returns $ F_ {\ beta} $.
def f_score(th, TF_label, m_dist_th, beta):
TF_pred = np.where(m_dist_th > th, 1, 0)
TF_pred = 2 * TF_pred
PN = TF_label + TF_pred
TN = np.count_nonzero(PN == 0)
FN = np.count_nonzero(PN == 1)
FP = np.count_nonzero(PN == 2)
TP = np.count_nonzero(PN == 3)
precision = TP/(TP + FP)
recall = TP/(TP+FN)
return (1+beta**2)*precision*recall/(beta**2*precision + recall)
Hereafter, $ \ beta = 0.1 $. Illustration of $ F_ {0.1} $ for thresholds 0-1000.
th_line = np.linspace(0, 1000, 5000)
dx = 0.2
f_score_graph = []
for i in range(5000):
f_score_graph.append(f_score(dx * i, TF_label, m_dist_th, 0.1))
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(th_line, f_score_graph, color='g', label = "f_score")
plt.legend(fontsize=15)
plt.show()
Find the threshold that maximizes $ F_ {0.1} $.
from scipy import optimize
def minus_f_score(th, TF_label, m_dist_th, beta):
return -f_score(th, TF_label, m_dist_th, beta)
ave_m_dist_th = np.average(m_dist_th)
opt_th = optimize.fmin_powell(minus_f_score, ave_m_dist_th, (TF_label, m_dist_th, 0.1))
print(opt_th)
Optimization terminated successfully.
Current function value: -0.875333
Iterations: 2
Function evaluations: 86
309.19475611029304
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist_th, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 1000)
plt.plot(np.linspace(-250, 5250), [opt_th] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
Good feeling.
[1] ReNom tutorial, anomaly detection of time series data by LSTM, https://www.renom.jp/ja/notebooks/tutorial/time_series/lstm-anomalydetection/notebook.html.
[2] Takeshi Ide, Introductory Anomaly Detection by Machine Learning -Practical Guide by R-, Corona Publishing Co., Ltd., 2015.
[3] P.Malhotra, L.Vig, G.Shroff and P.Agarwal, Long short term memory networks for anomaly detection in time series, Proceedings. Presses universitaires de Louvain, 2015.
Recommended Posts