EM algorithm calculation for mixed Gaussian distribution problem

Hello. I wrote the EM algorithm calculation for the mixed Gaussian distribution problem in Python. Examples of univariates and bivariates. Since the initial values etc. are randomly generated, you can see that the progress of convergence changes variously if you run it repeatedly.

$ ./em.py --univariate
nstep= 98  log(likelihood) = -404.36

$ ./em.py --bivariate
nstep= 39  log(likelihood) = -1534.51

figure_2.png figure_1.png

em.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division
from __future__ import print_function
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
from matplotlib.patches import Ellipse

#Mixed Gaussian distribution par= (pi, mean, var): (Mixing factor, mean, variance)
def gaussians(x, par):
    return [gaussian(x-mu, var) * pi for pi,mu,var in zip(*par)]

#Gaussian distribution
def gaussian(x, var):
    nvar = n_variate(x)
    if not nvar:
        qf, detvar, nvar = x**2/var, var, 1
    else:
        qf, detvar = np.dot(np.linalg.solve(var, x), x), np.linalg.det(var)
    return np.exp(-qf/2) / np.sqrt(detvar*(2*np.pi)**nvar)

#Log likelihood
def loglikelihood(data, par):
    gam = [gaussians(x, par) for x in data]
    ll = sum([np.log(sum(g)) for g in gam])
    return ll, gam

#E step
def e_step(data, pars):
    ll, gam = loglikelihood(data, pars)
    gammas = transpose(map(normalize, gam))
    return gammas, ll

#M step pars= (pis, means, vars)
def m_step(data, gammas):
    ws = map(sum, gammas)
    pis = normalize(ws)
    means = [np.dot(g, data)/w for g, w in zip(gammas, ws)]
    vars = [make_var(g, data, mu)/w for g, w, mu in zip(gammas, ws, means)]
    return pis, means, vars

#Covariance
def make_var(gammas, data, mean):
    return np.sum([g * make_cov(x-mean) for g, x in zip(gammas, data)], axis=0)

def make_cov(x):
    nvar = n_variate(x)
    if not nvar:
        return x**2
    m = np.matrix(x)
    return m.reshape(nvar, 1) * m.reshape(1, nvar)

#n-variable
def n_variate(x):
    if isinstance(x, (list, np.ndarray)):
        return len(x)
    return 0  # univariate

#Normalization
def normalize(lst):
    s = sum(lst)
    return [x/s for x in lst]

#Transpose
def transpose(a):
    return zip(*a)

def flatten(lst):
    if isinstance(lst[0], np.ndarray):
        lst = map(list, lst)
    return sum(lst, [])

#ellipse
def ellipse(cov, mstd=1.0):
    vals, vecs = eigsorted(cov)
    radii = mstd * np.sqrt(vals)
    tilt = np.degrees(np.arctan2(*vecs[:,0][::-1]))
    return radii, tilt

def eigsorted(cov):
    vals, vecs = np.linalg.eigh(cov)
    order = vals.argsort()[::-1]
    return vals[order], vecs[:,order]

# color map
def cm(x, a=0.85):
    if x > a:
        return (1, 0, (1-x)/(1-a), 1)
    return (x/a, 1-x/a, 1, 1)

#Result plot
def plot_em(data, ls, par):
    nvar = n_variate(data[0])
    col = [cm((l-ls[0])/(ls[-1]-ls[0])) for l in ls]
    ax1 = plt.subplot2grid((4, 1), (0, 0), rowspan=3)  # subplot(211)
    ax2 = plt.subplot2grid((4, 1), (3, 0))  # subplot(212)
    if not nvar:
        subplot_hist(data, par, col, ax1)
    elif nvar == 2:
        subplot_bivariate(data, ls, par, col, ax1)
    subplot_loglikelihood(ls, col, ax2)
    plt.show()
    return 0

#Histogram display (univariate)
def subplot_hist(data, pars, col, ax):
    xs = np.linspace(min(data), max(data), 200)
    ax.hist(data, bins=20, normed=True, alpha=0.1)
    for par, c in zip(pars, col):
        norm = [mlab.normpdf(xs, m, np.sqrt(var))*pi for pi,m,var in zip(*par)]
        ax.plot(xs, sum(norm), c=c, lw=1, alpha=0.8)
    ax.set_xlim(min(data), max(data))
    ax.set_xlabel("x")
    ax.set_ylabel("Probability")
    ax.grid()
    return 0

#Display of bivariate Gaussian distribution
def subplot_bivariate(data, ls, par, cols, ax):
    x, y = zip(*data)
    ax.plot(x, y, 'g.', alpha=0.5)
    ax.grid()
    ax.set(aspect=1) # 'equal'
    nstep = 4
    mstd = 4.0
    for i in range(nstep):
        j = ((len(ls)-1)*i)//(nstep-1)
        (pi, mean, cov), col = par[j], cols[j]
        for m, c in zip(mean, cov):
            radii, tilt = ellipse(c, mstd)
            ax.add_artist(Ellipse(xy=m, width=radii[0], height=radii[1], angle=tilt, ec=col, fc='none', alpha=0.5))
    return 0

#Transition of log-likelihood
def subplot_loglikelihood(ls, col, ax):
    ax.scatter(range(len(ls)), ls, c=col, edgecolor='none')
    ax.set_xlim(-1, len(ls))
    ax.set_xlabel("steps")
    ax.set_ylabel("loglikelihood")
    ax.grid()
    return 0

#Mixed Gaussian distribution data (K:Number of mixed Gaussian distributions)
def make_data(typ_nvariate):
    if typ_nvariate == 'univariate':  #Univariate
        par = [(2.0, 0.2, 100), (4.0, 0.4, 600), (6.0, 0.4, 300)]
        data = flatten([np.random.normal(mu,sig,n) for mu,sig,n in par])
        K = len(par)
        means = [np.random.choice(data) for _ in range(K)]
        vars = [np.var(data)]*K
    elif typ_nvariate == 'bivariate':  #Bivariate
        nvar, ndat, sig = 2, 250, 0.4
        centers = [[1, 1], [-1, -1], [1, -1]]
        K = len(centers)
        data = flatten([np.random.randn(ndat,nvar)*sig + np.array(c) for c in centers])
        means = np.random.rand(K, nvar)
        vars = [np.identity(nvar)]*K
    pis = [1.0/K]*K
    return data, (pis, means, vars)

#EM algorithm (gammas: 'burden rates', or 'responsibilities')
def em(typ_nvariate='univariate'):
    delta_ls, max_step = 1e-5, 400
    lls, pars = [], []  #Save the calculation result of each step
    data, par = make_data(typ_nvariate)
    for _ in range(max_step):
        gammas, ll = e_step(data, par)
        par = m_step(data, gammas)
        pars.append(par)
        lls.append(ll)
        if len(lls) > 8 and lls[-1] - lls[-2] < delta_ls:
            break
    #Result output
    print('nstep=%3d' % len(lls), " log(likelihood) =", lls[-1])
    plot_em(data, lls[1:], pars[1:])
    return 0

def main():
    """{f}: EM algorithm for a Gaussian mixture problem.

    usage: {f} [-h] [--univariate | --bivariate]
    
    options:
        -h, --help    show this help message and exit
        --univariate  calculates a univariate problem (default)
        --bivariate   calculates a bivariate problem
    """
    import docopt, textwrap
    args = docopt.docopt(textwrap.dedent(main.__doc__.format(f=__file__)))
    typ_nvariate = ["univariate", "bivariate"][args["--bivariate"]]
    em(typ_nvariate)

if __name__ == '__main__':
    main()

Recommended Posts

EM algorithm calculation for mixed Gaussian distribution problem
EM of mixed Gaussian distribution
Gaussian mixed model EM algorithm [statistical machine learning]
Mixed Gaussian distribution and logsumexp
Derivation of EM algorithm and calculation example for coin toss
PRML Chapter 9 Mixed Gaussian Distribution Python Implementation
Variational Bayesian estimation of mixed Gaussian distribution
(Machine learning) I tried to understand the EM algorithm in a mixed Gaussian distribution carefully with implementation.