C'est courant, mais je l'ai créé donc je vais le laisser. J'ajouterai un commentaire quand j'en aurai envie.
――Je me promène dans des endroits étranges jusqu'à ce que je le verrouille pour la première fois. «J'ai mis exprès quelques valeurs aberrantes, mais il semble qu'elles filtrent correctement.
N'est-ce pas mal de faire ça ici? N'hésitez pas à commenter si vous avez des questions. Aussi, je ne suis pas sûr, alors je vous invite à me le dire.
** Fixé le 14 décembre 2016 **
, puis re-répartissez-la en une fois avec
pars [:, idxs]`.particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implémentation Python du filtre à particules
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
class GaussianNoiseModel(object):
"""Distribution gaussienne multidimensionnelle
"""
def __init__(self, Cov):
"""constructeur
@param ndarray(n,n) Cov :Matrice co-distribuée distribuée
"""
self._Cov = Cov
def generate(self, num):
"""Génération de bruit
@param int num :Nombre de particules
@return ndarray(n,num) :Matrice de bruit
"""
n, _ = self._Cov.shape
return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T
def logpdf(self, X):
"""Fonction de densité de probabilité logistique
@param ndarray(n,num) X
@param int num :Nombre de particules
@return ndarray(num) :Densité de probabilité du journal
"""
Cov = self._Cov
k, _ = Cov.shape
det_Cov = xp.linalg.det(Cov)
Cov_inv = xp.linalg.inv(Cov)
coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.
class CauchyNoiseModel(object):
"""Distribution de Cauchy indépendante multidimensionnelle
Distribution multidimensionnelle de Cauchy en supposant l'indépendance de chaque variable
"""
def __init__(self, gma):
"""constructeur
@param ndarray(n) gma :Échelle de la population
"""
self._gma = gma
def generate(self, num):
gma = self._gma
uni = xp.random.rand(gma.size, num)
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.arctan(uni / Gma) /xp.pi + 1./2.
def logpdf(self, X):
_, num = X.shape
gma = self._gma
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)
def _normalize(w):
"""Normalisation du poids
@param ndarray(num) w :Poids de chaque particule
@return ndarray(num) :Poids normalisé pour chaque particule
"""
return w / xp.sum(w)
class ParticleFilter(object):
"""Particle Filter (Filtre à particule)
"""
def __init__(self, f, g, h, t_noise, o_noise, pars_init):
"""constructeur
@param ndarray(nx,num) function( ndarray(nx,num) ) f :Fonction de transition d'état
@param ndarray(nx,num) function( ndarray(nu,num) ) g :Fonction de propagation d'entrée
@param ndarray(ny,num) function( ndarray(nx,num) ) h :Fonction d'observation
@param NoiseModel t_noise :Modèle de bruit du système
@param NoiseModel o_noise :Modèle de bruit d'observation
@param ndarray(nx,num) pars_init :Valeur initiale des particules
"""
self._f = f
self._g = g
self._h = h
self._t_noise = t_noise
self._o_noise = o_noise
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
def update(self, y, u):
"""Mise à jour du filtre
@param ndarray(ny) y :Vecteur d'observation en n
@param ndarray(nu) u :Vecteur d'entrée à n
"""
self._update_pars(u)
self._update_weights(y)
self._resample()
def _update_pars(self, u):
"""Mettre à jour les particules en fonction du modèle de transition d'état
-Équation d'état
x_n = f(x_n-1) + g(u_n-1) + w
-Vecteur d'état x_n
-Vecteur d'entrée u_n
-Fonction de transition d'état f(x)
-Fonction de propagation d'entrée g(u)
-Bruit du système w
@param ndarray(nu) u : n-Vecteur d'entrée à 1(u_n-1)
"""
U = u.reshape(u.size,1) * xp.ones(self._num)
self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)
def _update_weights(self, y):
"""Calculer la vraisemblance selon le modèle d'observation
-Équation d'observation
y_n = h(x_n) + v
-Vecteur d'état x_n
-Vecteur d'observation y_n
-Fonction d'observation h(x)
-Bruit d'observation v
@param ndarray(ny) y :Vecteur d'observation en n(y_n)
"""
Y = y.reshape(y.size,1) * xp.ones(self._num)
loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
"""Rééchantillonnage
"""
wcum = xp.cumsum(self._w)
num = self._num
# idxs = [ i for n in xp.sort(xp.random.rand(num))
# for i in range(num) if n <= wcum[i] ]
# start = 0
# idxs = [ 0 for i in xrange(num) ]
# for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j]:
# idxs[i] = start = j
# break
idxs = resample.resample(num, wcum)
self._pars = self._pars[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
"""Estimation d'état
@return ndarray(nx) :Estimation vectorielle d'état
"""
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
"""particule
@return ndarray(nx,num)
"""
return self._pars
J'ai imaginé un peu ici. Si vous faites ce qui suit, ce sera une double boucle, mais vous pouvez traiter presque une seule boucle. Puisque le nombre de boucles est proportionnel à la première puissance du nombre de particules, et non au carré, il y a une grande différence de vitesse à mesure que le nombre de particules augmente.
-Générer un nombre aléatoire uniforme $ n_i $ de $ [0, 1] $ et trier par ordre croissant
resample.pyx
# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False
import numpy as xp
cimport numpy as xp
cimport cython
ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t
def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
cdef int start, i, j, length
cdef double n
start = 0
cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(xp.int)
cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
length = rand.size
for i in xrange(length):
for j in xrange(start, num):
if rand[i] <= wcum[j]:
idxs[i] = start = j
break
return idxs
resample_setup.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#filename_setup.py
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy
setup(
name = 'resample',
ext_modules = [Extension('resample', ['resample.pyx'])],
cmdclass = { 'build_ext': build_pyx },
include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx #compiler
$ python resample_setup.py build_ext --inplace #Construire
Cython Cela n'a pas été beaucoup plus rapide, probablement parce que je ne l'ai pas bien compris. (Merci pour votre commentaire)
Lorsque le bruit du système et le bruit d'observation sont une distribution de Cauchy indépendante multidimensionnelle. La résistance aux valeurs aberrantes est ajoutée. Référence: Filtrage de la trajectoire de mouvement à l'aide d'un modèle d'espace d'état auto-organisé
py.test_particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle.exemple d'utilisation py
"""
import particle as par
import utils
import numpy as xp
# import cupy as xp
#réglages des paramètres--------------------
num = 3000 #Nombre de particules
v_sys = 0.01 #Co-dispersion du bruit du système
v_obs = 0.1 #Co-dispersion du bruit observé
v_noise = 0.05
#Particules initiales
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Génération de modèle d'état(Modèle de différence de plancher secondaire)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X) #Définition de la fonction de transition d'état
B = xp.zeros(1)
g = lambda U: B.dot(U)
C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)
#Génération de modèle d'observation(Modèle d'observation directe)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X) #Définition de la fonction d'observation
E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)
#Tracé initial
lines = utils.init_plot_particle(dataset, pars_init)
#Génération de filtre à particules
pf = par.ParticleFilter(
f, g, h, sysn_model, obsn_model, pars_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y, xp.zeros(1))
state = pf.estimate()
x_est[i,:] = h(state)
#Graphique de données
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
utiils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp
Dataset = namedtuple("Dataset", ["t", "x", "y"])
def load_data(dirname):
t = xp.loadtxt(dirname + "/t.txt")
x = xp.loadtxt(dirname + "/x.txt")
y = xp.loadtxt(dirname + "/y.txt")
dataset = Dataset(t=t, x=x, y=y)
return dataset
def save_data(dirname, dataset):
xp.savetxt(dirname + "/t.txt", dataset.t)
xp.savetxt(dirname + "/x.txt", dataset.x)
xp.savetxt(dirname + "/y.txt", dataset.y)
def generate_data(dirname):
# truth
t = xp.arange(0, 6*xp.pi, 0.05)
x = xp.c_[t*xp.cos(t), t*xp.sin(t)]
# noise
nr, nc = x.shape
idx = xp.random.randint(nr/2, nr, 5)
n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))
# observation
y = x + n
x_est = xp.zeros(x.shape)
Dataset = namedtuple("Dataset", ["t", "x", "y"])
dataset = Dataset(t=t, x=x, y=y)
save_data(dirname, dataset)
return dataset
def init_plot_particle(dataset, pars):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["particles", "truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_particle(lines, x_est, pars, i):
lines[0][0].set_data(pars[0,:], pars[2,:])
lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_particle(dataset, pars_init, pf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_particle(dataset, pars_init, x_est)
def draw(i):
pf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Graphique de données
pars = pf.particles()
plot_each_particle(lines, x_est, pars, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def init_plot_kalman(dataset):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_kalman(lines, x_est, i):
lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_kalman(dataset, kf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_kalman(dataset, x_est)
def draw(i):
kf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Graphique de données
plot_each_kalman(lines, x_est, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def rand_uniform(mins, maxs, num):
"""Pour la génération de particules initiales
@param ndarray(nx) mins :Chaque valeur minimale de la particule
@param ndarray(nx) maxs :Chaque valeur maximale de la particule
@param int num :Nombre de particules
@return ndarray(nx,num) :Particules initiales
"""
nx = mins.size
return (mins.reshape(nx, 1) * xp.ones(num)
+ (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))
kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implémentation Python du filtre Kalman
"""
import numpy as xp
# import cupy as xp
class KalmanFilter(object):
def __init__(self, F, H, Q, R, x_init, P_init):
self._x = x_init.reshape(-1,1)
self._P = P_init
self._update_x = lambda x : F.dot(x)
self._update_P = lambda P : Q + F.dot(P).dot(F.T)
self._error = lambda x,y : y - H.dot(x)
self._cov_P = lambda P : R + H.dot(P).dot(H.T)
self._kalman_gain = lambda P,S : P.dot(H.T).dot(xp.linalg.inv(S))
self._estimate_x = lambda x,K,e : x + K.dot(e)
self._estimate_P = lambda P,K : (xp.eye(*P.shape)-K.dot(H)).dot(P)
def update(self, y):
#Prévoir
x_predicted = self._update_x(self._x)
P_predicted = self._update_P(self._P)
#Calcul des paramètres
e = self._error(x_predicted, y.reshape(-1,1))
S = self._cov_P(P_predicted)
K = self._kalman_gain(P_predicted, S)
#mise à jour
self._x = self._estimate_x(x_predicted, K, e)
self._P = self._estimate_P(P_predicted, K)
def estimate(self):
return self._x
test_kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""kalman.exemple d'utilisation py
"""
import kalman
import utils
import numpy as xp
# import cupy as xp
#réglages des paramètres--------------------
var_Q = 0.01
var_R = 1
x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Génération de modèle d'état(Modèle de différence de plancher secondaire)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)
#Génération de modèle d'observation(Modèle d'observation directe)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)
#Tracé initial
lines = utils.init_plot_kalman(dataset)
kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
kf.update(y)
state = kf.estimate()
x_est[i,:] = H.dot(state.reshape(-1,1)).T
#Graphique de données
utils.plot_each_kalman(lines, x_est, i)
# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")
Recommended Posts