LSTM détecte les anomalies dans les données chronologiques.
Implémenté en utilisant Keras en se référant au contenu du Tutoriel ReNom dans Reference [1]. ..
Le flux général est le suivant.
Step1 Créez un modèle qui prédit les valeurs futures à partir de données passées à l'aide de données normales.
Step2 Prédire avec des données de test en utilisant le modèle créé et échantillonner le vecteur d'erreur. Ensuite, une distribution normale est ajustée au vecteur d'erreur.
Step3 Calculez le vecteur d'erreur de la pièce où l'anomalie semble s'être produite, et s'il y a un vecteur d'erreur à la queue de la distribution normale estimée à l'étape 2, on juge que l'anomalie s'est produite.
%matplotlib inline
import keras
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
Utilisez les données ECG appelées ensemble de données ECG qtdb / sel102 [2].
Les données peuvent être obtenues à:
http://www.cs.ucr.edu/~eamonn/discords/qtdbsel102.txt
df = pd.read_csv('qtdbsel102.txt', header=None, delimiter='\t')
print(df.shape)
(45000, 3)
Cette fois, nous nous concentrerons sur les données de la troisième colonne pour détecter les anomalies.
ecg = df.iloc[:,2].values
ecg = ecg.reshape(len(ecg), -1)
print("length of ECG data:", len(ecg))
length of ECG data: 45000
Lors du prétraitement des données, définissez la moyenne sur 0 et la variance sur 1.
scaler = StandardScaler()
std_ecg = scaler.fit_transform(ecg)
Ensuite, les données sont illustrées pour confirmer la tendance.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(45000), std_ecg[:45000], color='b')
plt.legend()
plt.show()
Bien qu'elle soit périodique dans son ensemble, on peut voir que la forme est cassée en un seul endroit avant 5000 étapes.
Alors, développons le graphique de 0 pas à 5000 pas.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(5000), std_ecg[:5000], color='b')
plt.ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
plt.fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend()
plt.show()
On constate que la périodicité est rompue au niveau de la partie peinte en vert.
Les objectifs spécifiques de cette période sont les suivants.
Afin d'apprendre uniquement avec des données normales, les données après 5000 étapes sont supprimées.
normal_cycle = std_ecg[5000:]
Préparez les générateurs suivants pour diviser les données possédées en données d'entraînement, données de vérification et données de test.
Ici, les 3 données suivantes sont prédites à partir des 10 données.
Remarque) le délai et le pas ne sont pas utilisés cette fois. Cela signifie "prédire les données après un délai" et "utiliser les données à chaque étape", respectivement.
def generator(data, lookback, delay, pred_length, min_index, max_index, shuffle=False,
batch_size=100, step=1):
if max_index is None:
max_index = len(data) - delay - pred_length - 1
i = min_index + lookback
while 1:
if shuffle:
rows = np.random.randint(min_index + lookback, max_index,
size=batch_size)
else:
if i + batch_size >= max_index:
i = min_index + lookback
rows = np.arange(i, min(i + batch_size, max_index))
i += len(rows)
samples = np.zeros((len(rows),
lookback//step,
data.shape[-1]))
targets = np.zeros((len(rows), pred_length))
for j, row in enumerate(rows):
indices = range(rows[j] - lookback, rows[j], step)
samples[j] = data[indices]
targets[j] = data[rows[j] + delay : rows[j] + delay + pred_length].flatten()
yield samples, targets
lookback = 10
pred_length = 3
step = 1
delay = 1
batch_size = 100
#Générateur de formation
train_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=20000,
shuffle=True,
step=step,
batch_size=batch_size)
val_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=20001,
max_index=30000,
step=step,
batch_size=batch_size)
#Val pour examiner l'ensemble des données de validation_Nombre d'incréments de temps extraits de gen
val_steps = (30001 - 20001 -lookback) // batch_size
#D'autres sont des données de test
Ici, un modèle dans lequel deux LSTM sont empilés est utilisé.
LSTM est un réseau neuronal adapté aux données de séries chronologiques.
Seules les 5 dernières étapes seront affichées.
import keras
from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop
model = Sequential()
model.add(layers.LSTM(35, return_sequences = True, input_shape=(None,normal_cycle.shape[-1])))
model.add(layers.LSTM(35))
model.add(layers.Dense(pred_length))
model.compile(optimizer=RMSprop(), loss="mse")
history = model.fit_generator(train_gen,
steps_per_epoch=200,
epochs=60,
validation_data=val_gen,
validation_steps=val_steps)
Epoch 56/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0037 - val_loss: 0.0043
Epoch 57/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0039 - val_loss: 0.0045
Epoch 58/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0047
Epoch 59/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0043
Epoch 60/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0038
La transition de la valeur de perte dans les données d'apprentissage et les données de vérification est indiquée ci-dessous.
loss = history.history["loss"]
val_loss = history.history["val_loss"]
epochs = range(len(loss))
plt.figure(figsize=(10,5))
plt.plot(epochs, loss, "b", label="Training loss")
plt.plot(epochs, val_loss, "r", label="Validatioin loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.title("Training and validation loss")
plt.legend(fontsize=20)
plt.show()
Préparez la valeur prédite et la valeur cible dans les données de test.
test_gen_pred = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_steps = (len(normal_cycle) - 30001 - lookback) // batch_size
test_pred = model.predict_generator(test_gen_pred, steps=test_steps)
test_gen_target = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_target = np.zeros((test_steps * batch_size , pred_length))
for i in range(test_steps):
test_target[i*batch_size:(i+1)*batch_size] = next(test_gen_target)[1]
Le mse est évalué ci-dessous. Il peut être confirmé que le modèle est approprié.
from sklearn.metrics import mean_squared_error
print(mean_squared_error(test_pred, test_target))
0.0032039127006786993
Au cas où, vérifiez également le graphique.
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[0:2000,0])), test_pred[0:2000,0], "r", label="Prediction")
plt.plot(range(len(test_target[0:2000,0])), test_target[0:2000,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[:,0])), test_pred[:,0], "r", label="Prediction")
plt.plot(range(len(test_target[:,0])), test_target[:,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
Supposons que l'erreur $ e $ entre la valeur prédite et la valeur cible suit une distribution normale. Cependant, puisque nous avons prédit trois données, $ e $ est un vecteur tridimensionnel, et nous devons considérer une distribution normale tridimensionnelle.
\begin{align*}
p(e) = N(e| \mu, \Sigma)
\end{align*}
Où $ \ mu $ est le vecteur moyen et $ \ Sigma $ est la matrice distribuée co-distribuée. En général, une distribution normale multidimensionnelle est déterminée par le vecteur moyen et la matrice de variance-covariance.
L'estimation des paramètres est effectuée à partir des données échantillonnées. Actuellement, l'erreur dans les données de test peut être acquise pendant 10 000 étapes.
\begin{align*}
\{ e_1, e_2, ..., e_{10000} \}
\end{align*}
De plus, chaque élément est
\begin{align*}
e_i = (e_i ^{(1)}, e_i ^{(2)}, e_i ^{(3)} )
\end{align*}
Est.
Ici, la méthode la plus probable est adoptée comme méthode d'estimation des paramètres. Les estimations les plus probables de la distribution normale sont les suivantes.
\begin{align*}
& \hat{\mu} = \frac{1}{N}\sum_{n=1}^{N}e_n \\
& \hat{\Sigma} = \frac{1}{N}\sum_{n=1}^{N}(e_n-\hat{\mu})(e_n-\hat{\mu})^{\top}
\end{align*}
Notez que l'estimation de la matrice de variance-co-distribution par la méthode la plus probable est une matrice de co-distribution de dispersion d'échantillon plutôt qu'une matrice de variance-co-distribution invariante.
Avec ces données, chaque montant estimé est calculé.
error = test_pred - test_target
mean = np.mean(error, axis=0)
print(mean)
cov = np.cov(error, rowvar=False, bias=True)
print(cov)
[0.00562191 0.00312449 0.00928306]
[[0.00149367 0.0016201 0.0013881 ]
[0.0016201 0.00302257 0.00310156]
[0.0013881 0.00310156 0.00496796]]
La raison pour laquelle rowbar = False est de rendre la direction de la somme verticale. De plus, biais = True permet de trouver la matrice de covariance de l'échantillon. Par défaut, biais = Faux. Dans ce cas, la matrice de co-distribution de distribution invariante peut être obtenue.
La distance de Maharanobis est définie ci-dessous en utilisant les estimations précédentes.
\begin{align*}
a(e) = (e-\hat{\mu})^\top \hat{\Sigma}^{-1}(e-\hat{\mu})
\end{align*}
Il est difficile de comprendre s'il est multidimensionnel, mais s'il est unidimensionnel, ce sera comme suit. Cependant, $ {\ hat {\ sigma} ^ 2} $ est l'exemple de distribution.
\begin{align*}
a(e) = \frac{(e-\hat{\mu})^2}{\hat{\sigma}^2}
\end{align*}
En d'autres termes, $ a (e) $ représente la distance entre les données de l'échantillon et la valeur moyenne, et dans la distribution normale, si cette valeur est grande, un événement qui est peu susceptible de se produire, en d'autres termes, un événement anormal s'est produit. être capable de. Ainsi, une valeur anormale peut être détectée.
def Mahalanobis_dist(x, mean, cov):
d = np.dot(x-mean, np.linalg.inv(cov))
d = np.dot(d, (x-mean).T)
return d
detection_gen_pred = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_steps = (5000 -lookback) // batch_size
detection_pred = model.predict_generator(detection_gen_pred, steps=detection_steps)
detection_gen_target = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_target = np.zeros((detection_steps * batch_size , pred_length))
for i in range(detection_steps):
detection_target[i*batch_size:(i+1)*batch_size] = next(detection_gen_target)[1]
Préparez un vecteur d'erreur et trouvez la distance Maharanobis.
error_detection = detection_pred - detection_target
m_dist = []
for e in error_detection:
m_dist.append(Mahalanobis_dist(e, mean, cov))
Le résultat est le suivant.
fig, axes = plt.subplots(nrows=2, figsize=(15,10))
axes[0].plot(std_ecg[:5000],color='b',label='original data')
axes[0].set_xlabel('time')
axes[0].set_ylabel('ECG\'s value' )
axes[0].set_xlim(-250, 5250)
axes[0].set_ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
axes[0].fill_between(x, y1, y2, facecolor='g', alpha=.3)
axes[1].plot(m_dist, color='r',label='Mahalanobis Distance')
axes[1].set_xlabel('time')
axes[1].set_ylabel('Mahalanobis Distance')
axes[1].set_xlim(-250, 5250)
axes[1].set_ylim(0, 1000)
y1 = [0]*len(x)
y2 = [1000]*len(x)
axes[1].fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend(fontsize=15)
plt.show()
On constate que la distance Maharanobis augmente dans les espaces verts. En d'autres termes, l'anomalie pourrait être détectée (sensuellement).
Dans ce qui précède, la valeur anormale est détectée par un jugement qualitatif, mais il est nécessaire de fixer un certain seuil afin de détecter automatiquement la valeur anormale.
Les faits suivants sont connus concernant la distance Maharanobis.
Voir la référence [2] pour plus d'informations.
Croyez en ce fait et utilisez-le pour définir le seuil. Dans ce cas, $ M = 3 $, $ N = 10000 $, il est donc prudent de le considérer comme $ N \ gg M $.
Par la suite, on suppose que $ a (e) $ suit une distribution du chi carré avec 3 degrés de liberté. Tout d'abord, vérifiez la distribution du chi carré avec 3 degrés de liberté.
from scipy import stats
x = np.linspace(0, 50, 2000)
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(x, stats.chi2.pdf(x, 3), "b", label="chi2, 3")
plt.legend(fontsize=15)
plt.show()
Ensuite, le point 1% supérieur est calculé.
a_99 = stats.chi2.ppf(0.99, 3)
print(a_99)
11.344866730144373
Ceci est adopté comme valeur de seuil, et lorsque cette valeur est dépassée, elle est jugée comme une valeur anormale.
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 300)
plt.plot(np.linspace(-250, 5250), [a_99] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
C'est plutôt subtil. Cela ressemble à ça, mais cela manque de flexibilité.
J'ai donc décidé d'essayer une autre méthode. La méthode suivante consiste à trouver le seuil optimal en fonction des données.
On suppose que les données conservées peuvent être étiquetées comme normales ou anormales, ou que cela a déjà été fait.
Le contenu suivant est basé sur la référence [3].
Pour la préparation, nous allons étiqueter avec ces données. Ici, les données de 0 à 5000 sont ciblées, et 4200 à 4400 est défini comme une valeur anormale.
m_dist_th = np.zeros(4910)
m_dist_th[10:4910] = m_dist
TF_label = np.zeros(4910)
TF_label[4200:4400] = 1
En définissant le seuil, les valeurs suivantes sont obtenues. Cependant, le côté droit représente le nombre de données satisfaisant les conditions.
\begin{align*}
& TP = (\, Label:positive, \quad Prediction:positive \, )\\
& FP = (\, Label:negative, \quad Prediction:positive \, )\\
& FN = (\, Label:positive, \quad Prediction:negative \, )\\
& TN = (\, Label:negative, \quad Prediction:negative \, )
\end{align*}
De plus, les indicateurs suivants seront introduits.
\begin{align*}
& precision = \frac{TP}{TP + FP} \\
& recall = \frac{TP}{TP + FN} \\
& F_{\beta} = (1+\beta^2) \frac{precision \cdot recal}{(\beta^2 \cdot precision) + recall}
\end{align*}
Trouvez le seuil qui maximise $ F_ {\ beta} $. Tout d'abord, définissez une fonction qui renvoie $ F_ {\ beta} $.
def f_score(th, TF_label, m_dist_th, beta):
TF_pred = np.where(m_dist_th > th, 1, 0)
TF_pred = 2 * TF_pred
PN = TF_label + TF_pred
TN = np.count_nonzero(PN == 0)
FN = np.count_nonzero(PN == 1)
FP = np.count_nonzero(PN == 2)
TP = np.count_nonzero(PN == 3)
precision = TP/(TP + FP)
recall = TP/(TP+FN)
return (1+beta**2)*precision*recall/(beta**2*precision + recall)
Par la suite, $ \ beta = 0,1 $. Illustration de $ F_ {0,1} $ pour les seuils 0-1000.
th_line = np.linspace(0, 1000, 5000)
dx = 0.2
f_score_graph = []
for i in range(5000):
f_score_graph.append(f_score(dx * i, TF_label, m_dist_th, 0.1))
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(th_line, f_score_graph, color='g', label = "f_score")
plt.legend(fontsize=15)
plt.show()
Trouvez le seuil auquel $ F_ {0,1} $ est le maximum.
from scipy import optimize
def minus_f_score(th, TF_label, m_dist_th, beta):
return -f_score(th, TF_label, m_dist_th, beta)
ave_m_dist_th = np.average(m_dist_th)
opt_th = optimize.fmin_powell(minus_f_score, ave_m_dist_th, (TF_label, m_dist_th, 0.1))
print(opt_th)
Optimization terminated successfully.
Current function value: -0.875333
Iterations: 2
Function evaluations: 86
309.19475611029304
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist_th, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 1000)
plt.plot(np.linspace(-250, 5250), [opt_th] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
Bon sentiment.
[1] Tutoriel ReNom, Détection d'anomalies des données de séries temporelles par LSTM, https://www.renom.jp/ja/notebooks/tutorial/time_series/lstm-anomalydetection/notebook.html.
[2] Takeshi Ide, Introduction à la détection des anomalies par apprentissage automatique - Guide pratique de R-, Corona, 2015.
[3] P.Malhotra, L.Vig, G.Shroff and P.Agarwal, Long short term memory networks for anomaly detection in time series, Proceedings. Presses universitaires de Louvain, 2015.
Recommended Posts