Implémentation Python du filtre à particules

introduction

C'est courant, mais je l'ai créé donc je vais le laisser. J'ajouterai un commentaire quand j'en aurai envie.

――Je me promène dans des endroits étranges jusqu'à ce que je le verrouille pour la première fois. «J'ai mis exprès quelques valeurs aberrantes, mais il semble qu'elles filtrent correctement.

sample.gif

N'est-ce pas mal de faire ça ici? N'hésitez pas à commenter si vous avez des questions. Aussi, je ne suis pas sûr, alors je vous invite à me le dire.

** Fixé le 14 décembre 2016 **

Aperçu de la source

Filtre à particule

Filtre de Kalman (bonus)

Code source

Corps de filtre à particules

particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Implémentation Python du filtre à particules
"""

import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp


class GaussianNoiseModel(object):
    """Distribution gaussienne multidimensionnelle
    """

    def __init__(self, Cov):
        """constructeur

        @param ndarray(n,n) Cov :Matrice co-distribuée distribuée
        """
        self._Cov = Cov

    def generate(self, num):
        """Génération de bruit

        @param  int num :Nombre de particules
        @return ndarray(n,num) :Matrice de bruit
        """
        n, _ = self._Cov.shape
        return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T

    def logpdf(self, X):
        """Fonction de densité de probabilité logistique

        @param  ndarray(n,num) X
        @param  int num :Nombre de particules
        @return ndarray(num) :Densité de probabilité du journal
        """
        Cov = self._Cov
        k, _ = Cov.shape
        det_Cov = xp.linalg.det(Cov)
        Cov_inv = xp.linalg.inv(Cov)
        coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
        return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.


class CauchyNoiseModel(object):
    """Distribution de Cauchy indépendante multidimensionnelle

Distribution multidimensionnelle de Cauchy en supposant l'indépendance de chaque variable
    """

    def __init__(self, gma):
        """constructeur

        @param ndarray(n) gma :Échelle de la population
        """
        self._gma = gma

    def generate(self, num):
        gma = self._gma
        uni = xp.random.rand(gma.size, num)
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        return xp.arctan(uni / Gma) /xp.pi + 1./2.

    def logpdf(self, X):
        _, num = X.shape
        gma = self._gma
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)


def _normalize(w):
    """Normalisation du poids

    @param  ndarray(num) w :Poids de chaque particule
    @return ndarray(num) :Poids normalisé pour chaque particule
    """
    return w / xp.sum(w)


class ParticleFilter(object):
    """Particle Filter (Filtre à particule)
    """

    def __init__(self, f, g, h, t_noise, o_noise, pars_init):
        """constructeur

        @param ndarray(nx,num) function( ndarray(nx,num) ) f :Fonction de transition d'état
        @param ndarray(nx,num) function( ndarray(nu,num) ) g :Fonction de propagation d'entrée
        @param ndarray(ny,num) function( ndarray(nx,num) ) h :Fonction d'observation
        @param NoiseModel t_noise :Modèle de bruit du système
        @param NoiseModel o_noise :Modèle de bruit d'observation
        @param ndarray(nx,num)  pars_init :Valeur initiale des particules
        """
        self._f = f
        self._g = g
        self._h = h

        self._t_noise = t_noise
        self._o_noise = o_noise

        _, num = pars_init.shape
        self._num = num
        self._w = _normalize(xp.ones(num))
        self._pars = pars_init

    def update(self, y, u):
        """Mise à jour du filtre

        @param ndarray(ny) y :Vecteur d'observation en n
        @param ndarray(nu) u :Vecteur d'entrée à n
        """
        self._update_pars(u)
        self._update_weights(y)
        self._resample()

    def _update_pars(self, u):
        """Mettre à jour les particules en fonction du modèle de transition d'état

        -Équation d'état
            x_n = f(x_n-1) + g(u_n-1) + w
        -Vecteur d'état x_n
        -Vecteur d'entrée u_n
        -Fonction de transition d'état f(x)
        -Fonction de propagation d'entrée g(u)
        -Bruit du système w

        @param  ndarray(nu) u : n-Vecteur d'entrée à 1(u_n-1)
        """
        U = u.reshape(u.size,1) * xp.ones(self._num)
        self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)

    def _update_weights(self, y):
        """Calculer la vraisemblance selon le modèle d'observation

        -Équation d'observation
            y_n = h(x_n) + v
        -Vecteur d'état x_n
        -Vecteur d'observation y_n
        -Fonction d'observation h(x)
        -Bruit d'observation v

        @param  ndarray(ny) y :Vecteur d'observation en n(y_n)
        """
        Y = y.reshape(y.size,1) * xp.ones(self._num)
        loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
        self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )

    def _resample(self):
        """Rééchantillonnage
        """
        wcum = xp.cumsum(self._w)
        num = self._num

        # idxs = [ i for n in xp.sort(xp.random.rand(num))
        #     for i in range(num) if n <= wcum[i] ]

        # start = 0
        # idxs = [ 0 for i in xrange(num) ]
        # for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
        #     for j in range(start, num):
        #         if n <= wcum[j]:
        #             idxs[i] = start = j
        #             break
        idxs = resample.resample(num, wcum)

        self._pars = self._pars[:,idxs]
        self._w = _normalize(self._w[idxs])

    def estimate(self):
        """Estimation d'état

        @return ndarray(nx) :Estimation vectorielle d'état
        """
        return xp.sum(self._pars * self._w, axis=1)

    def particles(self):
        """particule

        @return ndarray(nx,num)
        """
        return self._pars

Rééchantillonnage avec Cython

J'ai imaginé un peu ici. Si vous faites ce qui suit, ce sera une double boucle, mais vous pouvez traiter presque une seule boucle. Puisque le nombre de boucles est proportionnel à la première puissance du nombre de particules, et non au carré, il y a une grande différence de vitesse à mesure que le nombre de particules augmente.

-Générer un nombre aléatoire uniforme $ n_i $ de $ [0, 1] $ et trier par ordre croissant

resample.pyx


# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False

import numpy as xp
cimport numpy as xp
cimport cython

ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t

def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
    cdef int start, i, j, length
    cdef double n

    start = 0
    cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(xp.int)
    cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
    length = rand.size

    for i in xrange(length):
        for j in xrange(start, num):
            if rand[i] <= wcum[j]:
                idxs[i] = start = j
                break

    return idxs

resample_setup.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
#filename_setup.py
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy

setup(
    name = 'resample',
    ext_modules = [Extension('resample', ['resample.pyx'])],
    cmdclass = { 'build_ext': build_pyx },
    include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx #compiler
$ python resample_setup.py build_ext --inplace #Construire

Cython Cela n'a pas été beaucoup plus rapide, probablement parce que je ne l'ai pas bien compris. (Merci pour votre commentaire)

Exemple de programme d'utilisation

Lorsque le bruit du système et le bruit d'observation sont une distribution de Cauchy indépendante multidimensionnelle. La résistance aux valeurs aberrantes est ajoutée. Référence: Filtrage de la trajectoire de mouvement à l'aide d'un modèle d'espace d'état auto-organisé

py.test_particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""particle.exemple d'utilisation py
"""

import particle as par
import utils
import numpy as xp
# import cupy as xp


#réglages des paramètres--------------------
num = 3000  #Nombre de particules
v_sys = 0.01  #Co-dispersion du bruit du système
v_obs = 0.1  #Co-dispersion du bruit observé
v_noise = 0.05

#Particules initiales
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#Génération de modèle d'état(Modèle de différence de plancher secondaire)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X)  #Définition de la fonction de transition d'état
B = xp.zeros(1)
g = lambda U: B.dot(U)

C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)

#Génération de modèle d'observation(Modèle d'observation directe)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X)  #Définition de la fonction d'observation

E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)

#Tracé initial
lines = utils.init_plot_particle(dataset, pars_init)

#Génération de filtre à particules
pf = par.ParticleFilter(
    f, g, h, sysn_model, obsn_model, pars_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    pf.update(y, xp.zeros(1))
    state = pf.estimate()
    x_est[i,:] = h(state)

    #Graphique de données
    pars = pf.particles()
    utils.plot_each_particle(lines, x_est, pars, i)

# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")

Terrain ou système utilitaire

utiils.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-


import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp


Dataset = namedtuple("Dataset", ["t", "x", "y"])


def load_data(dirname):
    t = xp.loadtxt(dirname + "/t.txt")
    x = xp.loadtxt(dirname + "/x.txt")
    y = xp.loadtxt(dirname + "/y.txt")
    dataset = Dataset(t=t, x=x, y=y)
    return dataset


def save_data(dirname, dataset):
    xp.savetxt(dirname + "/t.txt", dataset.t)
    xp.savetxt(dirname + "/x.txt", dataset.x)
    xp.savetxt(dirname + "/y.txt", dataset.y)


def generate_data(dirname):
    # truth
    t = xp.arange(0, 6*xp.pi, 0.05)
    x = xp.c_[t*xp.cos(t), t*xp.sin(t)]

    # noise
    nr, nc = x.shape
    idx = xp.random.randint(nr/2, nr, 5)
    n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
    n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
    n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))

    # observation
    y = x + n
    x_est = xp.zeros(x.shape)
    Dataset = namedtuple("Dataset", ["t", "x", "y"])
    dataset = Dataset(t=t, x=x, y=y)

    save_data(dirname, dataset)
    return dataset


def init_plot_particle(dataset, pars):
    fig, ax = plt.subplots(1, 1)
    ax.hold(True)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["particles", "truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_particle(lines, x_est, pars, i):
    lines[0][0].set_data(pars[0,:], pars[2,:])
    lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_particle(dataset, pars_init, pf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_particle(dataset, pars_init, x_est)

    def draw(i):
        pf.update(dataset.y[i])
        state = pf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Graphique de données
        pars = pf.particles()
        plot_each_particle(lines, x_est, pars, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def init_plot_kalman(dataset):
    fig, ax = plt.subplots(1, 1)
    ax.hold(True)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_kalman(lines, x_est, i):
    lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_kalman(dataset, kf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_kalman(dataset, x_est)

    def draw(i):
        kf.update(dataset.y[i])
        state = pf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Graphique de données
        plot_each_kalman(lines, x_est, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def rand_uniform(mins, maxs, num):
    """Pour la génération de particules initiales

    @param  ndarray(nx) mins :Chaque valeur minimale de la particule
    @param  ndarray(nx) maxs :Chaque valeur maximale de la particule
    @param  int num :Nombre de particules
    @return ndarray(nx,num)  :Particules initiales
    """
    nx = mins.size
    return (mins.reshape(nx, 1) * xp.ones(num)
        + (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))

prime

Corps de filtre Kalman

kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Implémentation Python du filtre Kalman
"""

import numpy as xp
# import cupy as xp


class KalmanFilter(object):

    def __init__(self, F, H, Q, R, x_init, P_init):
        self._x = x_init.reshape(-1,1)
        self._P = P_init

        self._update_x    = lambda x     : F.dot(x)
        self._update_P    = lambda P     : Q + F.dot(P).dot(F.T)
        self._error       = lambda x,y   : y - H.dot(x)
        self._cov_P       = lambda P     : R + H.dot(P).dot(H.T)
        self._kalman_gain = lambda P,S   : P.dot(H.T).dot(xp.linalg.inv(S))
        self._estimate_x  = lambda x,K,e : x + K.dot(e)
        self._estimate_P  = lambda P,K   : (xp.eye(*P.shape)-K.dot(H)).dot(P)

    def update(self, y):
        #Prévoir
        x_predicted = self._update_x(self._x)
        P_predicted = self._update_P(self._P)

        #Calcul des paramètres
        e = self._error(x_predicted, y.reshape(-1,1))
        S = self._cov_P(P_predicted)
        K = self._kalman_gain(P_predicted, S)

        #mise à jour
        self._x = self._estimate_x(x_predicted, K, e)
        self._P = self._estimate_P(P_predicted, K)

    def estimate(self):
        return self._x

Exemple de programme d'utilisation

test_kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""kalman.exemple d'utilisation py
"""

import kalman
import utils
import numpy as xp
# import cupy as xp


#réglages des paramètres--------------------
var_Q = 0.01
var_R = 1

x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#Génération de modèle d'état(Modèle de différence de plancher secondaire)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)

#Génération de modèle d'observation(Modèle d'observation directe)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)

#Tracé initial
lines = utils.init_plot_kalman(dataset)

kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    kf.update(y)
    state = kf.estimate()
    x_est[i,:] = H.dot(state.reshape(-1,1)).T

    #Graphique de données
    utils.plot_each_kalman(lines, x_est, i)

# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")

Recommended Posts

Implémentation Python du filtre à particules
Implémentation Python du filtre à particules auto-organisateur
Implémentation du tri rapide en Python
Implémentation du filtre à particules par Python et application au modèle d'espace d'états
Mise en place d'un filtre à particules simple
Implémentation des notifications de bureau à l'aide de Python
Implémentation Python de l'arborescence de segments non récursive
Implémentation de Light CNN (Python Keras)
Implémentation du tri original en Python
Implémentation de la méthode Dyxtra par python
Les bases de Python ①
Bases de python ①
Implémentation Python du modèle Markov caché continu
Introduction de Python
Pourquoi l'implémentation Python d'ISUCON 5 a utilisé Bottle
Implémentation de l'arbre TRIE avec Python et LOUDS
[Entretien de codage] Implémentation de la machine cryptographique Enigma (Python)
Explication de la distance d'édition et de l'implémentation en Python
Liste des modules python
Implémentation ValueObject en Python
Unification de l'environnement Python
Copie des préférences python
Principes de base du grattage Python
[python] comportement d'argmax
Utilisation des locaux Python ()
le zen de Python
Implémentation de la séquence de Fibonacci
# 4 [python] Bases des fonctions
Connaissance de base de Python
Anecdotes sobres de python3
Résumé des arguments Python
Bases de python: sortie
Installation de matplotlib (Python 3.3.2)
Application de Python 3 vars
Implémentation SVM en python
Divers traitements de Python
[Python] Implémentation du clustering à l'aide d'un modèle gaussien mixte
Exemple d'implémentation d'un système de traitement LISP simple (version Python)
Implémentation d'estimation la plus probable du modèle de sujet en python
Implémentation python de la classe de régression linéaire bayésienne
Implémentation d'estimation bayésienne de variante du modèle de sujet en python
Un mémorandum sur la mise en œuvre des recommandations en Python
Implémentation Python du mode de fusion CSS3 et discussion sur l'espace colorimétrique
[Python] Utilisation correcte de la carte
Ecrire des filtres Pandec en Python
Vers la retraite de Python2
Implémentation de TF-IDF à l'aide de gensim
Implémentation de MathJax sur Sphinx
Résumé des opérations de liste Python3
Python - Démarrage rapide de la journalisation
Recommandation de la bibliothèque binpacking de python
Comment écrire un exemple d'implémentation E14 Python en temps réel hors ligne
[python] Valeur de l'objet fonction (?)
[Line / Python] Mémo d'implémentation Beacon
Mise à jour automatique du module Python
Python - Vérifiez le type de valeurs
Analyse statique des programmes Python
À propos de divers encodages de Python 3