Convolutional neural networks (CNNs) are generally used for image processing, but this time I tried applying one to a one-dimensional vector, such as sensor data. The key point is that reshape converts the vector into the same three-dimensional structure as an image (channel, Y, X), where the channel axis corresponds to RGB in an image. I think convolution is effective for anomaly detection because it can extract features even with few learnable parameters.
I use numpy to create one cycle of a sine wave and add pseudo noise with np.random.rand() to create 100 waveforms with some variation. I used 99 of them as training waveforms and the remaining one as the validation waveform.
data = []
for i in range(100):
    # One cycle of a sine wave, each point scaled by a random factor in [1, 2)
    data.append([np.sin(np.pi * n / 50) * (1 + np.random.rand()) for n in range(100)])
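The same data set can also be built without Python loops. The following is a minimal sketch, not the code used in this post: the seeded generator `rng` and the names `clean`, `noise`, `train` and `valid` are my own assumptions, introduced only to make the example reproducible and readable.

```python
import numpy as np

# Assumption: a seeded Generator is used only so the sketch is reproducible.
rng = np.random.default_rng(0)

n = np.arange(100)                     # 100 sample points covering one cycle
clean = np.sin(np.pi * n / 50)         # same as sin(2*pi*n/100)

# One independent factor in [1, 2) per sample point, as in the loop version.
noise = 1.0 + rng.random((100, 100))
data = clean * noise                   # shape (100, 100): 100 noisy waveforms

# 99 waveforms for training, the last one for validation.
train, valid = data[:99], data[99:]
print(data.shape, train.shape, valid.shape)
```

Because the noise is multiplicative, every point keeps the sign of the clean sine wave and its magnitude is at most doubled.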
Create the model using Chainer's Convolution2D. The last layer is structured to restore the convolved data to the original input, so the network performs feature extraction as an autoencoder.
~~The activation function is set to tanh because with ReLU the intermediate convolution data would have no negative side when visualized, and I thought that would look bad.~~ (Addition) The data for visualization is now taken out before it passes through the activation function. I thought this was more correct.
Even when I used ReLU, the learning result for the sine wave data was not affected.
class MyChain(chainer.Chain):
    def __init__(self, n_out):
        super(MyChain, self).__init__()
        with self.init_scope():
            # Three convolutions along the waveform axis, 2 output channels each
            self.l1 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            self.l2 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            self.l3 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            # The last layer restores a vector of the original length
            self.l4 = L.Linear(None, n_out)
    def __call__(self, x, y):
        # Loss: mean squared error between the restored output and the target
        return F.mean_squared_error(self.fwd(x), y)
    def fwd(self, x):
        h1 = F.tanh(F.max_pooling_2d(self.l1(x), 2))
        h2 = F.tanh(F.max_pooling_2d(self.l2(h1), 2))
        h3 = F.tanh(F.max_pooling_2d(self.l3(h2), 2))
        h3 = h3.reshape(h3.shape[0], -1)
        return self.l4(h3)
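To see how much the three convolution and pooling stages compress the waveform, the widths can be worked out by hand. The sketch below is only an estimate under stated assumptions: `conv_out` and `pool_out` are hypothetical helpers of mine, and the `cover_all=True` rounding is how I understand the default of Chainer's max_pooling_2d, so please check it against your Chainer version.

```python
def conv_out(size, ksize, stride=1, pad=0):
    """Output length of a convolution along one axis."""
    return (size + 2 * pad - ksize) // stride + 1


def pool_out(size, ksize, stride=None, pad=0, cover_all=True):
    """Output length of pooling along one axis.

    Assumption: with cover_all=True the size is rounded up so that every
    input element falls inside some pooling window.
    """
    stride = ksize if stride is None else stride
    if cover_all:
        return (size + 2 * pad - ksize + stride - 1) // stride + 1
    return (size + 2 * pad - ksize) // stride + 1


width = 100                      # length of one input waveform
widths = []
for _ in range(3):               # l1, l2, l3, each followed by pooling
    width = conv_out(width, ksize=4)
    widths.append(width)
    width = pool_out(width, ksize=2)
    widths.append(width)

n_channels = 2                   # output channels of each convolution
n_features = n_channels * width  # values passed to the Linear layer
print(widths, n_features)
```

Under these assumptions the width goes 100, 97, 49, 46, 23, 20, 10, so the final Linear layer restores 100 points from only 2 x 10 = 20 values. The height axis stays at 1 throughout, because pooling a height of 1 with these rules gives 1 again.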
Convolution2D cannot read one-dimensional vector data directly, so the training data is converted with reshape into the four-dimensional shape (batch, channel, height, width). The teacher (target) data is the original vector data.
TrainData = np.array(data, dtype=np.float32).reshape(100, 1, 1, 100)
x = chainer.Variable(TrainData[:99])
for epoch in range(201):
    model.zerograds()                     # reset gradients before each update
    loss = model(x, x.reshape(99, 100))   # the target is the input itself
    loss.backward()
    optimizer.update()
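The two reshapes used here can be checked with numpy alone. This is a small sketch with placeholder values instead of the real waveforms; the names `train_data`, `x` and `t` are mine and are not part of the original code.

```python
import numpy as np

# Placeholder for the 100 waveforms (the values do not matter here).
data = np.arange(100 * 100, dtype=np.float32).reshape(100, 100)

# (batch, channel, height, width): one channel, height 1, width 100
train_data = data.reshape(100, 1, 1, 100)

x = train_data[:99]          # input to the convolution layers
t = x.reshape(99, 100)       # target: the same waveforms as flat vectors

print(x.shape, t.shape)
```

Reshaping only changes how the values are indexed, so `x[i, 0, 0, :]` and `t[i]` hold the same waveform.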
First, the result of restoring the input waveform. It is restored normally.
For reference, let's look at the waveforms partway through the convolution layers. The data lengths are matched only approximately for plotting.
Honestly, I don't really understand it. Layer3 can be said to be a waveform with the features narrowed down, but what are the features of a sine wave in the first place? Is it capturing the shape of the peak? I tried training several times, and the waveforms were different each time. I find that interesting.
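The length matching for these plots is done by CreatePlotData in the full listing: it pads with leading zeros and repeats each value so that the shorter layer outputs roughly line up with the 100-point input. A minimal numpy sketch of the same idea follows. The helper name `stretch` is hypothetical, and the widths 49, 23 and 10 are my assumption about the sizes of the three layer outputs.

```python
import numpy as np

def stretch(channel, factor):
    """Prepend `factor` zeros, then repeat each value `factor` times."""
    channel = np.asarray(channel, dtype=np.float32)
    padding = np.zeros(factor, dtype=np.float32)
    return np.concatenate([padding, np.repeat(channel, factor)])

# Assumed widths of the layer outputs, with dummy values.
layer1 = stretch(np.ones(49), 2)
layer2 = stretch(np.ones(23), 4)
layer3 = stretch(np.ones(10), 8)
print(len(layer1), len(layer2), len(layer3))
```

With these widths the stretched lengths are 100, 96 and 88, which is why the match with the input is only approximate.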
I have previously seen anomaly detection with an autoencoder, a method that expresses the degree of anomaly as the difference between the input and the restored output. I created anomalous data in the same way and tested it.
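One simple way to turn that difference into a number is the mean squared error between the input and the restored output. The sketch below is an illustration only: `anomaly_score` is a hypothetical helper, and the arrays `good` and `bad` are stand-ins for restored outputs, not results from the model in this post.

```python
import numpy as np

def anomaly_score(x, x_hat):
    """Mean squared difference between an input and its restored output."""
    x = np.asarray(x, dtype=np.float64)
    x_hat = np.asarray(x_hat, dtype=np.float64)
    return float(np.mean((x - x_hat) ** 2))

x = np.sin(np.pi * np.arange(100) / 50)

# Stand-ins for restored outputs; in practice these would come from model.fwd.
good = x + 0.01            # restoration that follows the input closely
bad = np.roll(x, 10)       # restoration that does not follow the input

print(anomaly_score(x, good), anomaly_score(x, bad))
```

A threshold on this score would then separate normal from anomalous inputs; how to choose that threshold is not covered in this post.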
The first is a phase shift: I shifted the input by 5 sample points (5/100 of a cycle).
The Predict waveform stays close to the original phase, and its line is jagged. It seems this could easily be detected as an anomaly.
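The shifted input is built in the full listing by moving the first 5 points to the end. As a side note, np.roll with a negative shift gives the same result. The sketch below compares the two on a clean sine wave; the variable names are mine.

```python
import numpy as np

x = np.sin(np.pi * np.arange(100) / 50).astype(np.float32)

# Loop version, as in the post: drop the first 5 points, then append them.
shifted_loop = [x[i + 5] for i in range(len(x) - 5)]
for i in range(5):
    shifted_loop.append(x[i])

# Equivalent one-liner: a negative shift rotates the array to the left.
shifted_roll = np.roll(x, -5)

print(np.array_equal(np.array(shifted_loop), shifted_roll))
```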
Next is the case where a single point protrudes like a spike.
This is also jagged. If the output is this jagged, features such as the differences could be used for anomaly detection.
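As a rough illustration of using differences as a feature, the jaggedness of a waveform can be measured from its second differences. This is only a sketch of one possible measure: `roughness` is a hypothetical helper, and it is applied here to a spiked input rather than to an actual Predict waveform.

```python
import numpy as np

def roughness(y):
    """Mean absolute second difference: larger means more jagged."""
    y = np.asarray(y, dtype=np.float64)
    return float(np.mean(np.abs(np.diff(y, n=2))))

x = np.sin(np.pi * np.arange(100) / 50)

# Add a spike of height 3 at one point, as in the post (index chosen by hand).
spiked = x.copy()
spiked[40] += 3.0

print(roughness(x), roughness(spiked))
```

A smooth sine wave has very small second differences, so even a single spike raises the value clearly.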
This is my first post. I regularly read and learn from the posts on this site, so I decided to post something myself to give back. I hope it is of some help.
Environment: Python 3.6.1, Anaconda 4.4.0 (64-bit), Chainer 2.0.2
import chainer
import chainer.functions as F
import chainer.links as L
import chainer.optimizers
import numpy as np
import matplotlib.pyplot as plt
class MyChain(chainer.Chain):
    def __init__(self, n_out):
        super(MyChain, self).__init__()
        with self.init_scope():
            self.l1 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            self.l2 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            self.l3 = L.Convolution2D(None, 2, ksize=(1, 4), stride=(1, 1))
            self.l4 = L.Linear(None, n_out)
    def __call__(self, x, y):
        return F.mean_squared_error(self.fwd(x), y)
    def fwd(self, x):
        h1 = F.tanh(F.max_pooling_2d(self.l1(x), 2))
        h2 = F.tanh(F.max_pooling_2d(self.l2(h1), 2))
        h3 = F.tanh(F.max_pooling_2d(self.l3(h2), 2))
        h3 = h3.reshape(h3.shape[0], -1)
        return self.l4(h3)
    # The methods below return each layer's output before the activation
    # function; they are used only for visualization.
    def Layaer1(self, x):
        return F.max_pooling_2d(self.l1(x), 2)
    def Layaer2(self, x):
        h1 = F.tanh(F.max_pooling_2d(self.l1(x), 2))
        return F.max_pooling_2d(self.l2(h1), 2)
    def Layaer3(self, x):
        h1 = F.tanh(F.max_pooling_2d(self.l1(x), 2))
        h2 = F.tanh(F.max_pooling_2d(self.l2(h1), 2))
        return F.max_pooling_2d(self.l3(h2), 2)
def CreatePlotData(arr, n1):
    # Stretch the two channels of a layer output for plotting:
    # n1 leading zeros, then every value repeated n1 times.
    Buf1, Buf2 = [], []
    for j in range(n1):
        Buf1.append(0)
        Buf2.append(0)
    for i in range(arr.shape[1]):
        for j in range(n1):
            Buf1.append(arr[0][i].real)
            Buf2.append(arr[1][i].real)
    return np.array(Buf1, dtype=np.float32), np.array(Buf2, dtype=np.float32)
data = []
for i in range(100):
    data.append([np.sin(np.pi * n / 50) * (1 + np.random.rand()) for n in range(100)])
model = MyChain(100)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)
TrainData = np.array(data,dtype=np.float32).reshape(100,1,1,100)
x=chainer.Variable(TrainData[:99])
ValidationData=TrainData[99].reshape(1,1,1,100)
PlotInput = ValidationData.reshape(100)
for epoch in range(201):
    model.zerograds()
    loss = model(x, x.reshape(99, 100))
    loss.backward()
    optimizer.update()
    # Every 20 epochs, save the intermediate waveforms and the restored output
    if epoch % 20 == 0:
        Layer1Arr = np.array(model.Layaer1(ValidationData).data).reshape(2, -1)
        Layer1Arr1, Layer1Arr2 = CreatePlotData(Layer1Arr, 2)
        Layer2Arr = np.array(model.Layaer2(ValidationData).data).reshape(2, -1)
        Layer2Arr1, Layer2Arr2 = CreatePlotData(Layer2Arr, 4)
        Layer3Arr = np.array(model.Layaer3(ValidationData).data).reshape(2, -1)
        Layer3Arr1, Layer3Arr2 = CreatePlotData(Layer3Arr, 8)
        plt.plot(PlotInput, label='Input')
        plt.plot(Layer1Arr1, label='Layer1-1')
        plt.plot(Layer1Arr2, label='Layer1-2')
        plt.plot(Layer2Arr1, label='Layer2-1')
        plt.plot(Layer2Arr2, label='Layer2-2')
        plt.plot(Layer3Arr1, label='Layer3-1')
        plt.plot(Layer3Arr2, label='Layer3-2')
        plt.legend()
        plt.savefig(str(epoch) + '-Epoch Convolution Graph.png')
        plt.close()
        predict = model.fwd(ValidationData)
        predict = np.array(predict.data).reshape(100)
        plt.plot(predict, label='Predict')
        plt.plot(PlotInput, label='Input')
        plt.legend()
        plt.savefig(str(epoch) + '-Epoch Validation Graph.png')
        plt.close()
# Anomalous input 1: shift the phase by 5 sample points
ErrorPlot = [PlotInput[i + 5] for i in range(len(PlotInput) - 5)]
for i in range(5):
    ErrorPlot.append(PlotInput[i])
predict = model.fwd(chainer.Variable(np.array(ErrorPlot,dtype=np.float32)).reshape(1,1,1,100))
predict=np.array(predict.data).reshape(100)
plt.plot(predict,label='Predict')
plt.plot(ErrorPlot,label='Error Input')
plt.legend()
plt.savefig('Shift Error Input Graph.png')
plt.close()
Rnd = np.random.randint(0,99)
ErrorPlot2=np.array(PlotInput)
ErrorPlot2[Rnd]=ErrorPlot2[Rnd]+3
predict = model.fwd(chainer.Variable(np.array(ErrorPlot2,dtype=np.float32)).reshape(1,1,1,100))
predict=np.array(predict.data).reshape(100)
plt.plot(predict,label='Predict')
plt.plot(ErrorPlot2,label='Error Input')
plt.legend()
plt.savefig('Spike Error Input Graph.png')
plt.close()