J'ai implémenté le modèle d'espace d'état auto-organisé dans l'article suivant en Python. En supposant un bruit non gaussien pour le bruit du système / le bruit d'observation du filtre à particules, vous optimiserez vous-même les hyper paramètres.
La recherche de super paramètres est assez gênante, donc celui qui s'optimise est assez pratique. (Ensuite, le problème est de rechercher des paramètres ultra-super ...)
Référence: Filtrage de la trajectoire de mouvement à l'aide d'un modèle d'espace d'état auto-organisé
N'est-ce pas mal de faire ça ici? N'hésitez pas à commenter si vous avez des questions. Aussi, je ne suis pas sûr, alors je vous invite à me le dire.
** Fixé le 14 décembre 2016 **
Dans les deux cas, la dispersion du bruit est augmentée en cours de route.
――Le filtre à particules normal semble bien suivre dans la zone où la dispersion est faible, mais il décolle dans la zone où la dispersion est grande. ――Le type auto-organisateur semble être plus tolérant aux erreurs car la dispersion des particules est légèrement plus importante dans la région où la dispersion est importante.
** Attention ** Les fonctions utilitaires sont les mêmes que ci-dessous. Implémentation Python du filtre de particules --Qiita
particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implémentation Python du filtre à particules auto-organisé
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
def _rand_cauchy(gma):
nr, nc = gma.shape
uni = xp.random.rand(nr, nc)
return xp.arctan(uni/gma) /xp.pi + 1./2.
def _rand_normal(sgm, shape):
return xp.random.normal(0, sgm, shape)
def _logpdf_cauchy(gma, x):
y = xp.log(gma/xp.pi) -xp.log(x**2. + gma**2.)
return xp.sum(y, axis=0)
def _normalize(w):
return w / xp.sum(w)
class ParticleFilterSelfOrganized(object):
_sgm = 0.001
def __init__(self, f, h, pars_init, pars_hpt_init, pars_hpo_init):
self._f = f
self._h = h
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
self._pars_hpt = pars_hpt_init
self._pars_hpo = pars_hpo_init
def update(self, y):
self._update_pars()
self._update_weights(y)
self._resample()
def _update_pars(self):
self._pars = self._f(self._pars) + _rand_cauchy(xp.exp(self._pars_hpt/2.))
self._pars_hpt = self._pars_hpt + _rand_normal(self._sgm, self._pars_hpt.shape)
self._pars_hpo = self._pars_hpo + _rand_normal(self._sgm, self._pars_hpo.shape)
def _update_weights(self, y):
Y = y.reshape(y.size,1) * xp.ones(self._num)
pars_hpo = xp.exp(self._pars_hpo/2.)
loglh = _logpdf_cauchy( pars_hpo, xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
wcum = xp.r_[0, xp.cumsum(self._w)]
num = self._num
idxs = resample.resample(num, wcum)
# start = 0
# idxs = num*[0]
# for i, n in enumerate( sorted(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j+1]:
# idxs[i] = start = j
# break
self._pars = self._pars[:,idxs]
self._pars_hpt = self._pars_hpt[:,idxs]
self._pars_hpo = self._pars_hpo[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
return self._pars
test_particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle_selforganized.exemple d'utilisation py
"""
import particle_selforganized as par
import utils
import numpy as xp
# import cupy as xp
#réglages des paramètres--------------------
num = 5000 #Nombre de particules
v_noise = 0.05
#Particules initiales
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
pars_hpt_init = xp.random.normal(0, 0.01, pars_init.shape)
pars_hpo_init = xp.random.normal(0, 0.01, (2,num))
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Génération de modèle d'état(Modèle de différence de plancher secondaire)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f_trans = lambda X: A.dot(X)
#Génération de modèle d'observation(Modèle d'observation directe)
B = xp.kron(xp.eye(2), xp.array([1, 0]))
f_obs = lambda X: B.dot(X)
#Tracé initial
lines = utils.init_plot_particle(dataset, pars_init)
#Génération de filtre à particules
pf = par.ParticleFilterSelfOrganized(
f_trans, f_obs, pars_init, pars_hpt_init, pars_hpo_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y)
state = pf.estimate()
x_est[i,:] = f_obs(state)
#Graphique de données
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
Recommended Posts