Code pour le modèle d'espace d'état auto-organisé (inachevé)

Motivation

J'avais besoin d'un modèle de prédiction de séries chronologiques très précis pour la détection des points de changement, alors j'ai essayé de créer un code. Écrivez d'abord le résultat. Je voulais dire: "J'ai pu obtenir une bonne précision! (Ne le faites pas!", Mais j'ai un code à moitié fini avec un comportement étrange ... orz À l'avenir, nous la reverrons dès qu'elle pourra être corrigée. ..

Les références

"Avant-plan de l'analyse des séries temporelles financières par modèle d'espace d'états auto-organisé avec recherche de distribution initiale: application au modèle de fluctuation de volatilité stochastique avec distribution t" résume les problèmes du filtre de Monte Carlo et recherche plus avant la distribution initiale optimale La méthode a également été proposée et c'était un matériau très intéressant. J'avais l'intention de créer un produit qui incluait l'optimisation de la distribution initiale en référence à cela, mais je me suis retrouvé avec un produit qui était loin de là. ..

Problèmes avec le filtre Monte Carlo

Le filtre Monte Carlo est un algorithme d'estimation d'état très flexible qui vous permet de construire des modèles non-gaussiens non stationnaires. Cependant, les problèmes suivants sont soulevés.

Concernant l'estimation des paramètres dans le filtre de Monte Carlo, (1) le montant estimé de la fonction de vraisemblance pour le paramètre inclut l'erreur par la méthode de Monte Carlo, et (2) la différenciation de la fonction de vraisemblance est difficile à calculer (auto-organisation avec recherche de distribution initiale). Avant-plan de l'analyse des séries temporelles financières par modèle d'espace d'état P.4)

Pour résoudre ce problème, une méthode utilisant une méthode d'optimisation appelée méthode Nelder-Mead (méthode d'optimisation non linéaire de type recherche linéaire) a été proposée. Cependant, cette méthode laisse le problème que si les conditions de convergence ne sont pas assouplies de manière significative, la convergence ne se produira pas et la dispersion des estimations de paramètres augmentera. D'autre part, le modèle d'espace d'états auto-organisé est une méthode de transport de paramètres dans l'espace d'états pour effectuer une estimation bayésienne. Dans «Avant-première de l'analyse des séries temporelles financières par modèle d'espace d'états auto-organisé avec recherche de distribution initiale», une méthode de construction de la distribution initiale (méthode d'estimation la plus probable) utilisant la méthode NM est proposée. Pour le filtre Monte Carlo, veuillez vous référer à Implémentation d'un filtre à particules simple.

modèle

J'ai créé le code pour le modèle suivant. x_t=Fx_{t-1}+Gv_t y_t=Hx_{t}+w_t, w_t~C(0,σ_t^2) v_{t,t_t}~C(0,τ_t^2) v_{t,{τ_t^2}}~C(0,ν^2) v_{t,{σ_t^2}}~C(0,ξ^2) x_t=[t_t,log_{10}τ_t^2,log_{10}σ_t^2]^T v_{t}=[v_{t,t_t},v_{t,{τ_t^2}},v_{t,{σ_t^2}}]^T

Comme décrit dans "Modèle statistique de type découverte de connaissances et d'auto-organisation", par exemple, lors de l'estimation de $ σ ^ 2 $, il est nécessaire de garantir une valeur positive, donc estimez $ log_ {10} σ ^ 2 $ Aller.

Ensuite, j'écrirai le code ci-dessous.

Code de référence

python


# coding: utf-8

from math import log, pow, sqrt
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt


class ParticleFilter:
    nu = 0.01 #Paramètre ultra-super d'échelle de bruit du système
    xi = 0.03 #Échelle de bruit d'observation ultra-super paramètre
    log_likelihood = 0.0 #Probabilité du journal
    TIME = 1
    PR=8 # unmber of processing
    
    def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
        self.PARTICLES_NUM = PARTICLES_NUM #Nombre de particules
        self.TEETH_OF_COMB = np.arange(0, 1, float(1.0)/self.PARTICLES_NUM)
        self.weights = np.zeros((ydim, self.PARTICLES_NUM))
        self.particles = np.zeros((k*ydim+pdim ,self.PARTICLES_NUM))
        self.predicted_particles = np.zeros((k*ydim+pdim , self.PARTICLES_NUM))
        np.random.seed(555)
        self.predicted_value = []
        self.filtered_value = []
        self.LSM = np.zeros(ydim) #Erreur carrée

        self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
        self.k = k
        self.ydim = ydim
        self.pdim = pdim
    
    def init_praticles_distribution(self, P, r):
        """initialize particles
        x_0|0
        tau_0|0
        sigma_0|0
        """
        data_particles = multivariate_normal([1]*self.ydim*self.k,
                            np.eye(self.ydim*self.k)*10, self.PARTICLES_NUM).T
        param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
        for i in xrange(self.pdim):
            param_particles[i,:] = uniform(P-r, P+r, self.PARTICLES_NUM)
        self.particles = np.vstack((data_particles, param_particles))
        
    def get_system_noise(self):
        """v_t vector"""
        data_noise = cauchy.rvs(loc=[0]*self.PARTICLES_NUM, scale=np.power(10,self.particles[self.ydim])) #size=self.PARTICLES_NUM
        data_noise[data_noise==float("-inf")] = -1e308
        data_noise[data_noise==float("inf")] = 1e308
        parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM) # noise of hyper parameter in system model
        parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM) # noise of hyper parameter in observation model

        #print np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        
    def calc_pred_particles(self):
        """calculate system function
        x_t|t-1 = F*x_t-1 + Gv_t
        """
        return self.F.dot(self.particles) + self.G.dot(self.get_system_noise()) # linear non-Gaussian  
        
    def calc_particles_weight(self,y):
        """calculate fitness probabilities between observation value and predicted value
        w_t
        """
        locs = self.calc_pred_particles()
        self.predicted_particles = locs
        scale=np.power(10,locs[-1])
        scale[scale==0] = 1e-308
        
        self.weights = cauchy.pdf( np.array([y]*self.PARTICLES_NUM) - self.H.dot(locs), loc=[0]*self.PARTICLES_NUM,
                                scale=scale ).flatten()
        
    def calc_likelihood(self):
        """calculate likelihood at that point
        p(y_t|y_1:t-1)
        """
        res = np.sum(self.weights)/self.PARTICLES_NUM
        #print res
        self.log_likelihood += log(res)
      
    def normalize_weights(self):
        """wtilda_t"""
        self.weights = self.weights/np.sum(self.weights)
      
    def resample(self,y):
        """x_t|t"""
        self.normalize_weights()

        self.memorize_predicted_value()

        # accumulate weight
        cum = np.cumsum(self.weights)
        
        # create roulette pointer 
        base = uniform(0,float(1.0)/self.PARTICLES_NUM)
        pointers = self.TEETH_OF_COMB + base

        # select particles
        selected_idx = [np.where(cum>=p)[0][0] for p in pointers]
        """
        pool = Pool(processes=self.PR)
        selected_idx = pool.map(get_slected_particles_idx, ((cum,p) for p in pointers))
        pool.close()
        pool.join()     
        """

        self.particles = self.predicted_particles[:,selected_idx]
        self.memorize_filtered_value(selected_idx, y)
    
    def memorize_predicted_value(self):
        predicted_value = np.sum(self.predicted_particles*self.weights, axis=1)[0]
        self.predicted_value.append(predicted_value)

    def memorize_filtered_value(self, selected_idx, y):
        filtered_value = np.sum(self.particles*self.weights[selected_idx] , axis=1) \
                            /np.sum(self.weights[selected_idx])
        self.filtered_value.append(filtered_value[0])
        self.calculate_LSM(y,filtered_value[:self.ydim])

    def calculate_LSM(self,y,filterd_value):
        self.LSM += pow(y-filterd_value,2)

    def forward(self,y):
        """compute system model and observation model"""
        print 'calculating time at %d' % self.TIME
        self.calc_pred_particles()
        self.calc_particles_weight(y)
        self.calc_likelihood()
        self.resample(y)
        self.TIME += 1

    def FGHset(self, k, vn_y, n_ss_parameters):
        """
        vn_y:input dimension
        n_ss_parameters:number of hyper parameter 
        k:difference
        """
        G_upper_block = np.zeros((k*vn_y, vn_y+n_ss_parameters))
        G_lower_block = np.zeros((n_ss_parameters, vn_y+n_ss_parameters))
        G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        G = np.vstack( (G_upper_block, G_lower_block) )
        
        H = np.hstack( (np.eye(vn_y), 
                            np.zeros((vn_y, vn_y*(k-1)+n_ss_parameters))
                        ) )
              
        F_upper_block = np.zeros((k*vn_y, k*vn_y+n_ss_parameters))
        F_lower_block = np.zeros((n_ss_parameters, k*vn_y+n_ss_parameters))
        F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        if k==1:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        elif k==2:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)*2
            F_upper_block[:vn_y, vn_y:k*vn_y] = np.eye(vn_y)*-1
            F_upper_block[vn_y:k*vn_y, :vn_y] = np.eye(vn_y)
        F = np.vstack((F_upper_block, F_lower_block))
        
        return F, G, H

def get_slected_particles_idx((cum,p)):
    """multiprocessing function"""
    try:
        return np.where(cum>=p)[0][0]
    
    except Exception, e:
        import sys
        import traceback
        sys.stderr.write(traceback.format_exc())    

if __name__=='__main__':
    n_particle = 10000
    pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
    pf.init_praticles_distribution(0, 8) # P, r
    
    data = np.hstack((norm.rvs(0,1,size=20),norm.rvs(10,1,size=60),norm.rvs(-30,0.5,size=20)))
    
    for d in data:
        pf.forward(d)
    print 'log likelihood:', pf.log_likelihood
    print 'LSM:', pf.LSM
    
    rng = range(100)
    plt.plot(rng,data,label=u"training data")
    plt.plot(rng,pf.predicted_value,label=u"predicted data")
    plt.plot(rng,pf.filtered_value,label=u"filtered data")
    plt.xlabel('TIME',fontsize=18)
    plt.ylabel('Value',fontsize=18)    
    plt.legend() 
    plt.show()

Résultat expérimental

C'est un code défectueux qui n'a même pas besoin d'inclure un diagramme des résultats ...

commentaire

C'est dommage que le temps soit écoulé à mi-chemin cette fois. Corrigez-le afin d'obtenir des résultats décents à l'avenir et augmentez-le à nouveau. (Je suis désolé de publier un élément à moitié terminé qui n'est pas utile, mais cette fois je l'ai téléchargé comme mémo pour moi-même.)

Nous vous prions de nous excuser pour la gêne occasionnée, mais si vous faites une erreur, veuillez nous en informer.

Recommended Posts

Code pour le modèle d'espace d'état auto-organisé (inachevé)
Code pour le modèle d'espace d'états auto-organisé (version modifiée)
Techniques de test de code?
Mémo de code personnel Python
Code de test pour évaluer les décorateurs
[Python] Exemple de code pour la grammaire Python