I recently had an opportunity to do change-point detection at work. I didn't have much time, so I built it by referring to the ARIMA-model-based algorithm that yokkuns had published. However, the ARIMA model caused me various troubles, so I tried rewriting it with a Kalman filter.
The main reference was the well-known book "Anomaly Detection by Data Mining".
The calculation is broadly divided into a learning step and a score-calculation step.
The learning step runs before new data arrives: the Kalman filter is updated with the data observed so far (one-step prediction, filtering, and fixed-interval smoothing).
The score-calculation step runs when a new data point arrives: the prediction error of the new point is converted into an outlier score via its log loss under the predictive distribution.
The outline of the change-point score calculation is: smooth the outlier scores, feed the smoothed series into a second Kalman filter, and apply the same scoring procedure again to obtain the change-point score.
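As a rough sketch of the scoring part (the function and variable names here are illustrative, not the ones used in the code below): the outlier score of a new point is the negative log-density of its prediction error under a normal distribution fitted to the recent residuals, and the change-point score comes from running the same procedure again on the smoothed outlier scores.

# Minimal sketch of the log-loss score (illustrative only; the real version is
# conversion_score in KF_AnomalyDetection.py below).
from math import log
import numpy as np
from scipy.stats import norm

def log_loss_score(residuals, prediction_error):
    # fit a normal distribution to the recent residuals
    m, s = np.mean(residuals), max(np.std(residuals), 1.0)  # floor the std (see the notes at the end)
    px = max(norm.pdf(prediction_error, m, s), 1e-308)      # avoid log(0) for extreme values
    return -log(px)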
That is all there is to it. The full code is below.
KF.py
# coding: utf8
from numpy.linalg import inv as inverse
from scipy import linalg
import numpy as np
from math import log
class KalmanFiltering:
limy = 1e20 #Boundary of numerical values to be regarded as missing
GSIG2 = 1
L = 1
R = np.identity(L)
NSUM = 0.0
SIG2 = 0.0
LDET = 0.0
def __init__(self, k, p, q, term=10, w=10):
        self.k = k  # order of differencing (trend)
        self.p = p  # seasonal period
        self.q = q  # number of AR components
self.m, self.F, self.G, \
self.H, self.Q = self.FGHset(0,k,p,q,w)
self.term = term
self.strg_trm = term
self.resid = np.zeros(self.term)
self.pred = 0.0
        # matrices for storing predicted values
        self.XPS = np.zeros((term, self.m), dtype=float)
        self.VPS = np.array([np.eye(self.m, dtype=float)] * term)
        # matrices for storing filtered values
        self.XFS = np.zeros((term, self.m), dtype=float)
        self.VFS = np.array([np.eye(self.m, dtype=float)] * term)
        # matrices for storing smoothed values
        self.XSS = np.zeros((term, self.m), dtype=float)
        self.VSS = np.array([np.eye(self.m, dtype=float)] * term)
def forward_backward(self, new_data, smoothing=0):
self.NSUM += 1
if self.NSUM < self.strg_trm:
self.term = int(self.NSUM)
else:
self.term = self.strg_trm
# forward
self.forward(new_data)
# smoothing
self.SMO()
if smoothing==1:
return np.mean( self.XSS[:self.term,0] )
return self.predict()[0]
def forward(self, y):
XF = self.XFS[self.term-1]
VF = self.VFS[self.term-1]
# 1span predicting
XP, VP = self.forward_predicting(VF, XF)
XF, VF = self.filtering(y, XP, VP)
self.storage_params(XP, XF, VP, VF)
# sig2 = self.SIG2 / self.NSUM
# FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
# return {'LLF':FF, 'Ovar':sig2}
    def storage_params(self, XP, XF, VP, VF):
        if self.NSUM > self.term:
            # drop the oldest entry so the window keeps the latest `term` steps
            self.XPS[:self.term-1] = self.XPS[1:self.term]
            self.XFS[:self.term-1] = self.XFS[1:self.term]
            self.VPS[:self.term-1] = self.VPS[1:self.term]
            self.VFS[:self.term-1] = self.VFS[1:self.term]
        self.normal_storage(XP, XF, VP, VF)
def normal_storage(self, XP, XF, VP, VF):
self.XPS[self.term-1] = XP
self.XFS[self.term-1] = XF
self.VPS[self.term-1] = VP
self.VFS[self.term-1] = VF
def forward_predicting(self, VF, XF):
"""1span predicting"""
        XP = np.ndarray.flatten(np.dot(self.F, XF.T))  # from the second step onward this comes out as a column vector, so always flatten it to a row vector
VP = self.F.dot(VF).dot(self.F.T) + self.G.dot(self.Q).dot(self.G.T)
return XP, VP
def filtering(self, y, XP, VP):
if y < self.limy:
B = np.dot( np.dot(self.H, VP), self.H.T) + self.R #H is mathematically a horizontal vector
B1 = inverse(B)
K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K becomes a vertical vector(matrix)
e = np.array(y).T - np.dot(self.H, XP.T)
XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Horizontal vector
VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)
self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Make matrix so that it can be calculated even in one dimension
self.LDET += log(linalg.det(B))
else:
XF = XP; VF = VP
return XF, VF
    def SMO(self):
        """fixed-interval smoothing"""
        XS1 = self.XFS[self.term-1]
        VS1 = self.VFS[self.term-1]
        self.XSS[self.term-1] = XS1
        self.VSS[self.term-1] = VS1
        # run backward from the newest filtered state over the stored window
        for n1 in range(self.term - 1):
            n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
            VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
            A = np.dot(np.dot(VF, self.F.T), VPI)
            XS2 = XF + np.dot(A, (XS1 - XP))
            VS2 = VF + np.dot(np.dot(A, (VS1 - VP)), A.T)
            XS1 = XS2; VS1 = VS2
            self.XSS[n-1] = XS1
            self.VSS[n-1] = VS1
    # log-likelihood function (used when optimizing TAU2)
    def LogL(self, parm, *args):
        # NOTE: this relies on forward() returning {'LLF': ...}, which is currently commented out
        y = args[0]
        LLF = self.forward(y)
        LL = LLF['LLF']
        return -LL  # optimize() minimizes, so return the negative log-likelihood
    def predict(self, forward_time=1):
        """predict mean values forward_time steps ahead"""
        y = np.zeros(forward_time, dtype=float)
        XFp = self.XFS[self.term-1]  # most recent filtered state
        #VFp=VF[XF.shape[0]-1,:]
        for n in range(forward_time):
            XP = np.ndarray.flatten(np.dot(self.F, XFp.T))
            #VP = np.dot( np.dot(F, VF), F.T ) + np.dot( np.dot(G, Q), G.T )
            y[n] = np.dot(self.H, XP)  # expected value, so no noise term is added
            XFp = XP
        return y
def FGHset(self, al, k, p, q, w=10):
"""Matrix setting of state-space representation of seasonally adjusted model
al: α vector of AR model
k,p,q: Difference, seasonal cycle, number of AR parameters (k when predicting)>=2)
w:Dispersion of system error (For change point detection, it should be set small and fixed)
"""
m = k + p + q -1
if q>0: G = np.zeros((m,3), dtype=np.float) #When the state model includes trend, season, and AR
elif p>0: G = np.zeros((m,2), dtype=np.float) #When it does not contain AR component(q=0)
else: m=k; G = np.zeros((m,1), dtype=np.float)
F = np.zeros((m,m), dtype=np.float)
H = np.zeros((1,m), dtype=np.float)
ns = 0; ls =0
#Construction of block matrix of trend model
if k>0:
ns +=1
G[0,0] = 1; H[0,0] = 1
if k==1: F[0,0] = 1
if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
ls += k
#Construction of seasonally adjusted component block matrix
if p>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
        for i in range(p-1): F[ls, ls+i] = -1
        for i in range(p-2): F[ls+i+1, ls+i] = 1
        ls += p-1
#Construction of AR component block matrix
if q>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
        for i in range(q): F[ls, ls+i-1] = al[i]
        if q > 1:
            for i in range(q-1): F[ls+i, ls+i-1] = 1
#Calculation of the frame of the variance-covariance matrix Q of the system model
        Q = np.eye(ns, dtype=float) * w
return m, F, G, H, Q
KF_AnomalyDetection.py
# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF
class KFAnomalyDetection:
    def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
        # score/state storage, kept per instance
        self.datalist = []
        self.outlier_score_list = []
        self.change_score_list = []
        self.outlier_score_smooth = []
        self.change_score_smooth = []
        self.outlier_resid = None
        self.change_resid = None
        self.outlier_pred = None
        self.change_pred = None
        self.kf_outlier_score = KF.KalmanFiltering(k, p, q, term=term, w=w)
        self.kf_first_smooth_score = KF.KalmanFiltering(k, p, q, term=smooth, w=w)
        self.kf_change_score = KF.KalmanFiltering(k, p, q, term=term, w=w)
        self.kf_second_smooth_score = KF.KalmanFiltering(k, p, q, term=smooth, w=w)
        self.term = term
def forward_step(self, new_data):
        # add new_data to datalist, keeping at most self.term points
        if len(self.datalist) >= self.term:
            self.datalist.pop(0)
        self.datalist.append(new_data)
# compute score
if self.outlier_pred is None:
self.first_step(new_data)
else:
self.calculate_score_step(new_data)
self.learn_step(new_data)
def conversion_score(self, train, var):
"""convert score to log loss"""
m = np.mean(train)
s = np.std(train)
try:
if s < 1: s=1
px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
res = -log(px)
return res
except:
return 0
def first_step(self, new_data):
# learn outlier model
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, new_data)
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
        # learn change model
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def learn_step(self, data):
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, data)
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
def learn_KF(self, func, data):
"""leaning KF from new data"""
pred = func.forward_backward(data)
resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
return resid, pred
def calculate_score_step(self, new_data):
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
        score = self.conversion_score(resid, abs(float(pred) - float(new_data)))
        print('got score', score)
        storage_score_list.append(score)
        print('smoothing score')
        storage_smooth_list.append(func.forward_backward(score, smoothing=1))
if __name__=='__main__':
fname = 'test'
term = 3 # time window of training
smooth = 1
kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
datalist = []
of = open('score_out.txt','w')
dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
for data in dlist:
kfad.forward_step(data)
of.write( str(kfad.change_score_smooth[-1])+'\n' )
of.close()
rng = range( len(dlist.tolist()) )
plt.plot(rng,dlist,label=u"data")
plt.show()
plt.plot(rng,kfad.change_score_smooth,label=u"score")
plt.show()
Apologies for the messy code, as usual. I wrote the Kalman filter code over a year ago, and looking at it now I can't help thinking, "who wrote this garbage?" Well, I'll take it as a positive sign that I've grown over the past year.
When calculating the score, a small standard deviation causes many points to be judged as anomalies, so the standard deviation is floored at 1. (I'm not sure this is appropriate for count data, but I use it as if it were.) Also, if an extremely abnormal value comes in, the probability evaluates to zero and the log cannot be computed, so the density is replaced with the smallest representable float. Using a heavier-tailed distribution might be a cleaner way to handle this.
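For reference, the heavy-tailed idea is easy to try. Below is only a sketch of what swapping the normal density in conversion_score for a Student-t density could look like (the degrees of freedom df=4 is an arbitrary illustrative choice, not something I have tuned):

from math import log
import numpy as np
from scipy.stats import t

def conversion_score_t(train, var, df=4):
    """same log-loss score as conversion_score, but with a Student-t density"""
    m = np.mean(train)
    s = max(np.std(train), 1.0)          # keep the same floor on the standard deviation
    px = t.pdf(var, df, loc=m, scale=s)  # heavier tails than the normal, so extreme values keep nonzero density
    return -log(px) if px > 0.0 else -log(1e-308)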
The results of running the code above are shown below.
The change points are picked up fairly well, though with data this clear-cut that is only to be expected.
As for the parameters, the term (time window) is essentially the only one that needs tuning. A smoothing window of 1 is fine, which means the smoothing part of the code above is mostly wasted computation... orz. The system-noise variance w also rarely needs changing; it does not have to track large shifts, so something around 10 to 20 should be fine depending on the scale of the data.
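In other words, as a starting point, the instantiation in the __main__ block above could just as well be something like the following (the concrete values simply follow the guideline in this paragraph and are not tuned for any particular data):

# term is essentially the only knob worth tuning; smooth=1 is enough,
# and w in the 10-20 range is fine for data on this scale.
kfad = KFAnomalyDetection(term=10, smooth=1, k=2, p=0, q=0, w=20)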
I expected the Kalman filter to have clearer advantages over the ARIMA model, but the results were not as good as I had hoped. I will keep looking for things to improve.
If you find any mistakes, I would appreciate it if you could point them out.