J'avais besoin d'un modèle de prédiction de séries chronologiques très précis pour la détection des points de changement, alors j'ai essayé de créer un code. Écrivez d'abord le résultat. Je voulais dire: "J'ai pu obtenir une bonne précision! (Ne le faites pas!", Mais j'ai un code à moitié fini avec un comportement étrange ... orz À l'avenir, nous la reverrons dès qu'elle pourra être corrigée. ..
"Avant-plan de l'analyse des séries temporelles financières par modèle d'espace d'états auto-organisé avec recherche de distribution initiale: application au modèle de fluctuation de volatilité stochastique avec distribution t" résume les problèmes du filtre de Monte Carlo et recherche plus avant la distribution initiale optimale La méthode a également été proposée et c'était un matériau très intéressant. J'avais l'intention de créer un produit qui incluait l'optimisation de la distribution initiale en référence à cela, mais je me suis retrouvé avec un produit qui était loin de là. ..
Le filtre Monte Carlo est un algorithme d'estimation d'état très flexible qui vous permet de construire des modèles non-gaussiens non stationnaires. Cependant, les problèmes suivants sont soulevés.
Concernant l'estimation des paramètres dans le filtre de Monte Carlo, (1) le montant estimé de la fonction de vraisemblance pour le paramètre inclut l'erreur par la méthode de Monte Carlo, et (2) la différenciation de la fonction de vraisemblance est difficile à calculer (auto-organisation avec recherche de distribution initiale). Avant-plan de l'analyse des séries temporelles financières par modèle d'espace d'état P.4)
Pour résoudre ce problème, une méthode utilisant une méthode d'optimisation appelée méthode Nelder-Mead (méthode d'optimisation non linéaire de type recherche linéaire) a été proposée. Cependant, cette méthode laisse le problème que si les conditions de convergence ne sont pas assouplies de manière significative, la convergence ne se produira pas et la dispersion des estimations de paramètres augmentera. D'autre part, le modèle d'espace d'états auto-organisé est une méthode de transport de paramètres dans l'espace d'états pour effectuer une estimation bayésienne. Dans «Avant-première de l'analyse des séries temporelles financières par modèle d'espace d'états auto-organisé avec recherche de distribution initiale», une méthode de construction de la distribution initiale (méthode d'estimation la plus probable) utilisant la méthode NM est proposée. Pour le filtre Monte Carlo, veuillez vous référer à Implémentation d'un filtre à particules simple.
J'ai créé le code pour le modèle suivant.
Comme décrit dans "Modèle statistique de type découverte de connaissances et d'auto-organisation", par exemple, lors de l'estimation de $ σ ^ 2 $, il est nécessaire de garantir une valeur positive, donc estimez $ log_ {10} σ ^ 2 $ Aller.
Ensuite, j'écrirai le code ci-dessous.
python
# coding: utf-8
from math import log, pow, sqrt
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt
class ParticleFilter:
nu = 0.01 #Paramètre ultra-super d'échelle de bruit du système
xi = 0.03 #Échelle de bruit d'observation ultra-super paramètre
log_likelihood = 0.0 #Probabilité du journal
TIME = 1
PR=8 # unmber of processing
def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
self.PARTICLES_NUM = PARTICLES_NUM #Nombre de particules
self.TEETH_OF_COMB = np.arange(0, 1, float(1.0)/self.PARTICLES_NUM)
self.weights = np.zeros((ydim, self.PARTICLES_NUM))
self.particles = np.zeros((k*ydim+pdim ,self.PARTICLES_NUM))
self.predicted_particles = np.zeros((k*ydim+pdim , self.PARTICLES_NUM))
np.random.seed(555)
self.predicted_value = []
self.filtered_value = []
self.LSM = np.zeros(ydim) #Erreur carrée
self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
self.k = k
self.ydim = ydim
self.pdim = pdim
def init_praticles_distribution(self, P, r):
"""initialize particles
x_0|0
tau_0|0
sigma_0|0
"""
data_particles = multivariate_normal([1]*self.ydim*self.k,
np.eye(self.ydim*self.k)*10, self.PARTICLES_NUM).T
param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
for i in xrange(self.pdim):
param_particles[i,:] = uniform(P-r, P+r, self.PARTICLES_NUM)
self.particles = np.vstack((data_particles, param_particles))
def get_system_noise(self):
"""v_t vector"""
data_noise = cauchy.rvs(loc=[0]*self.PARTICLES_NUM, scale=np.power(10,self.particles[self.ydim])) #size=self.PARTICLES_NUM
data_noise[data_noise==float("-inf")] = -1e308
data_noise[data_noise==float("inf")] = 1e308
parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM) # noise of hyper parameter in system model
parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM) # noise of hyper parameter in observation model
#print np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
def calc_pred_particles(self):
"""calculate system function
x_t|t-1 = F*x_t-1 + Gv_t
"""
return self.F.dot(self.particles) + self.G.dot(self.get_system_noise()) # linear non-Gaussian
def calc_particles_weight(self,y):
"""calculate fitness probabilities between observation value and predicted value
w_t
"""
locs = self.calc_pred_particles()
self.predicted_particles = locs
scale=np.power(10,locs[-1])
scale[scale==0] = 1e-308
self.weights = cauchy.pdf( np.array([y]*self.PARTICLES_NUM) - self.H.dot(locs), loc=[0]*self.PARTICLES_NUM,
scale=scale ).flatten()
def calc_likelihood(self):
"""calculate likelihood at that point
p(y_t|y_1:t-1)
"""
res = np.sum(self.weights)/self.PARTICLES_NUM
#print res
self.log_likelihood += log(res)
def normalize_weights(self):
"""wtilda_t"""
self.weights = self.weights/np.sum(self.weights)
def resample(self,y):
"""x_t|t"""
self.normalize_weights()
self.memorize_predicted_value()
# accumulate weight
cum = np.cumsum(self.weights)
# create roulette pointer
base = uniform(0,float(1.0)/self.PARTICLES_NUM)
pointers = self.TEETH_OF_COMB + base
# select particles
selected_idx = [np.where(cum>=p)[0][0] for p in pointers]
"""
pool = Pool(processes=self.PR)
selected_idx = pool.map(get_slected_particles_idx, ((cum,p) for p in pointers))
pool.close()
pool.join()
"""
self.particles = self.predicted_particles[:,selected_idx]
self.memorize_filtered_value(selected_idx, y)
def memorize_predicted_value(self):
predicted_value = np.sum(self.predicted_particles*self.weights, axis=1)[0]
self.predicted_value.append(predicted_value)
def memorize_filtered_value(self, selected_idx, y):
filtered_value = np.sum(self.particles*self.weights[selected_idx] , axis=1) \
/np.sum(self.weights[selected_idx])
self.filtered_value.append(filtered_value[0])
self.calculate_LSM(y,filtered_value[:self.ydim])
def calculate_LSM(self,y,filterd_value):
self.LSM += pow(y-filterd_value,2)
def forward(self,y):
"""compute system model and observation model"""
print 'calculating time at %d' % self.TIME
self.calc_pred_particles()
self.calc_particles_weight(y)
self.calc_likelihood()
self.resample(y)
self.TIME += 1
def FGHset(self, k, vn_y, n_ss_parameters):
"""
vn_y:input dimension
n_ss_parameters:number of hyper parameter
k:difference
"""
G_upper_block = np.zeros((k*vn_y, vn_y+n_ss_parameters))
G_lower_block = np.zeros((n_ss_parameters, vn_y+n_ss_parameters))
G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
G = np.vstack( (G_upper_block, G_lower_block) )
H = np.hstack( (np.eye(vn_y),
np.zeros((vn_y, vn_y*(k-1)+n_ss_parameters))
) )
F_upper_block = np.zeros((k*vn_y, k*vn_y+n_ss_parameters))
F_lower_block = np.zeros((n_ss_parameters, k*vn_y+n_ss_parameters))
F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
if k==1:
F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
elif k==2:
F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)*2
F_upper_block[:vn_y, vn_y:k*vn_y] = np.eye(vn_y)*-1
F_upper_block[vn_y:k*vn_y, :vn_y] = np.eye(vn_y)
F = np.vstack((F_upper_block, F_lower_block))
return F, G, H
def get_slected_particles_idx((cum,p)):
"""multiprocessing function"""
try:
return np.where(cum>=p)[0][0]
except Exception, e:
import sys
import traceback
sys.stderr.write(traceback.format_exc())
if __name__=='__main__':
n_particle = 10000
pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
pf.init_praticles_distribution(0, 8) # P, r
data = np.hstack((norm.rvs(0,1,size=20),norm.rvs(10,1,size=60),norm.rvs(-30,0.5,size=20)))
for d in data:
pf.forward(d)
print 'log likelihood:', pf.log_likelihood
print 'LSM:', pf.LSM
rng = range(100)
plt.plot(rng,data,label=u"training data")
plt.plot(rng,pf.predicted_value,label=u"predicted data")
plt.plot(rng,pf.filtered_value,label=u"filtered data")
plt.xlabel('TIME',fontsize=18)
plt.ylabel('Value',fontsize=18)
plt.legend()
plt.show()
C'est un code défectueux qui n'a même pas besoin d'inclure un diagramme des résultats ...
C'est dommage que le temps soit écoulé à mi-chemin cette fois. Corrigez-le afin d'obtenir des résultats décents à l'avenir et augmentez-le à nouveau. (Je suis désolé de publier un élément à moitié terminé qui n'est pas utile, mais cette fois je l'ai téléchargé comme mémo pour moi-même.)
Nous vous prions de nous excuser pour la gêne occasionnée, mais si vous faites une erreur, veuillez nous en informer.