Hello. I implemented the EM algorithm for Gaussian mixture models in Python, with both a univariate and a bivariate example. Since the initial values are generated at random, running the script repeatedly shows how differently the convergence can progress.
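For reference, the functions e_step and m_step below implement the standard EM updates for a K-component Gaussian mixture. In the E step the responsibilities are

$$\gamma_{nk} = \frac{\pi_k \,\mathcal{N}(x_n \mid \mu_k, \Sigma_k)}{\sum_{j=1}^{K} \pi_j \,\mathcal{N}(x_n \mid \mu_j, \Sigma_j)},$$

and the M step re-estimates the parameters with $N_k = \sum_n \gamma_{nk}$:

$$\pi_k = \frac{N_k}{N}, \qquad \mu_k = \frac{1}{N_k}\sum_n \gamma_{nk}\, x_n, \qquad \Sigma_k = \frac{1}{N_k}\sum_n \gamma_{nk}\,(x_n-\mu_k)(x_n-\mu_k)^{\top}.$$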
$ ./em.py --univariate
nstep= 98 log(likelihood) = -404.36
$ ./em.py --bivariate
nstep= 39 log(likelihood) = -1534.51
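Note: command-line parsing uses the third-party docopt package, so install it first if needed:
$ pip install docopt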
em.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
#Gaussian mixture par = (pi, mean, var): (mixing coefficients, means, variances)
def gaussians(x, par):
return [gaussian(x-mu, var) * pi for pi,mu,var in zip(*par)]
#Gaussian distribution
def gaussian(x, var):
nvar = n_variate(x)
if not nvar:
qf, detvar, nvar = x**2/var, var, 1
else:
qf, detvar = np.dot(np.linalg.solve(var, x), x), np.linalg.det(var)
return np.exp(-qf/2) / np.sqrt(detvar*(2*np.pi)**nvar)
#Log likelihood
def loglikelihood(data, par):
gam = [gaussians(x, par) for x in data]
ll = sum([np.log(sum(g)) for g in gam])
return ll, gam
#E step
def e_step(data, pars):
ll, gam = loglikelihood(data, pars)
    gammas = transpose([normalize(g) for g in gam]) # gammas[k][n]: responsibility of component k for data point n
return gammas, ll
#M step pars= (pis, means, vars)
def m_step(data, gammas):
    ws = [sum(g) for g in gammas] # effective number of points per component
pis = normalize(ws)
means = [np.dot(g, data)/w for g, w in zip(gammas, ws)]
vars = [make_var(g, data, mu)/w for g, w, mu in zip(gammas, ws, means)]
return pis, means, vars
#Covariance
def make_var(gammas, data, mean):
return np.sum([g * make_cov(x-mean) for g, x in zip(gammas, data)], axis=0)
def make_cov(x):
nvar = n_variate(x)
if not nvar:
return x**2
    return np.outer(x, x) # outer product x x^T (avoids the deprecated np.matrix)
#Number of variates (dimension of a data point)
def n_variate(x):
if isinstance(x, (list, np.ndarray)):
return len(x)
return 0 # univariate
#Normalization
def normalize(lst):
s = sum(lst)
return [x/s for x in lst]
#Transpose
def transpose(a):
    return list(zip(*a)) # list() so the result can be iterated more than once under Python 3
def flatten(lst):
    if isinstance(lst[0], np.ndarray):
        lst = [list(a) for a in lst]
    return sum(lst, []) # concatenate per-component samples into one flat list of points
#ellipse
def ellipse(cov, mstd=1.0):
vals, vecs = eigsorted(cov)
radii = mstd * np.sqrt(vals)
tilt = np.degrees(np.arctan2(*vecs[:,0][::-1]))
return radii, tilt
def eigsorted(cov):
vals, vecs = np.linalg.eigh(cov)
order = vals.argsort()[::-1]
return vals[order], vecs[:,order]
# color map
def cm(x, a=0.85):
if x > a:
return (1, 0, (1-x)/(1-a), 1)
return (x/a, 1-x/a, 1, 1)
#Result plot
def plot_em(data, ls, par):
nvar = n_variate(data[0])
col = [cm((l-ls[0])/(ls[-1]-ls[0])) for l in ls]
ax1 = plt.subplot2grid((4, 1), (0, 0), rowspan=3) # subplot(211)
ax2 = plt.subplot2grid((4, 1), (3, 0)) # subplot(212)
if not nvar:
subplot_hist(data, par, col, ax1)
elif nvar == 2:
subplot_bivariate(data, ls, par, col, ax1)
subplot_loglikelihood(ls, col, ax2)
plt.show()
return 0
#Histogram display (univariate)
def subplot_hist(data, pars, col, ax):
xs = np.linspace(min(data), max(data), 200)
    ax.hist(data, bins=20, density=True, alpha=0.1) # 'density' replaces the removed 'normed' keyword
for par, c in zip(pars, col):
        norm = [pi*np.exp(-(xs-m)**2/(2*var))/np.sqrt(2*np.pi*var) for pi,m,var in zip(*par)] # normal pdf (mlab.normpdf was removed from matplotlib)
ax.plot(xs, sum(norm), c=c, lw=1, alpha=0.8)
ax.set_xlim(min(data), max(data))
ax.set_xlabel("x")
ax.set_ylabel("Probability")
ax.grid()
return 0
#Display of bivariate Gaussian distribution
def subplot_bivariate(data, ls, par, cols, ax):
x, y = zip(*data)
ax.plot(x, y, 'g.', alpha=0.5)
ax.grid()
ax.set(aspect=1) # 'equal'
nstep = 4
mstd = 4.0
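    # Note: Ellipse's width/height are the full axis lengths, so with
    # mstd = 4.0 the drawn ellipses are the 2-sigma contours of each component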
for i in range(nstep):
j = ((len(ls)-1)*i)//(nstep-1)
(pi, mean, cov), col = par[j], cols[j]
for m, c in zip(mean, cov):
radii, tilt = ellipse(c, mstd)
ax.add_artist(Ellipse(xy=m, width=radii[0], height=radii[1], angle=tilt, ec=col, fc='none', alpha=0.5))
return 0
#Transition of log-likelihood
def subplot_loglikelihood(ls, col, ax):
ax.scatter(range(len(ls)), ls, c=col, edgecolor='none')
ax.set_xlim(-1, len(ls))
ax.set_xlabel("steps")
ax.set_ylabel("loglikelihood")
ax.grid()
return 0
#Gaussian mixture data (K: number of mixture components)
def make_data(typ_nvariate):
if typ_nvariate == 'univariate': #Univariate
par = [(2.0, 0.2, 100), (4.0, 0.4, 600), (6.0, 0.4, 300)]
data = flatten([np.random.normal(mu,sig,n) for mu,sig,n in par])
K = len(par)
means = [np.random.choice(data) for _ in range(K)]
vars = [np.var(data)]*K
elif typ_nvariate == 'bivariate': #Bivariate
nvar, ndat, sig = 2, 250, 0.4
centers = [[1, 1], [-1, -1], [1, -1]]
K = len(centers)
data = flatten([np.random.randn(ndat,nvar)*sig + np.array(c) for c in centers])
means = np.random.rand(K, nvar)
vars = [np.identity(nvar)]*K
pis = [1.0/K]*K
return data, (pis, means, vars)
#EM algorithm (gammas: 'burden rates', or 'responsibilities')
def em(typ_nvariate='univariate'):
delta_ls, max_step = 1e-5, 400
lls, pars = [], [] #Save the calculation result of each step
data, par = make_data(typ_nvariate)
for _ in range(max_step):
gammas, ll = e_step(data, par)
par = m_step(data, gammas)
pars.append(par)
lls.append(ll)
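        # EM increases the log-likelihood monotonically, so after a few
        # warm-up steps stop once the per-step improvement drops below delta_ls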
if len(lls) > 8 and lls[-1] - lls[-2] < delta_ls:
break
#Result output
print('nstep=%3d' % len(lls), " log(likelihood) =", lls[-1])
plot_em(data, lls[1:], pars[1:])
return 0
def main():
"""{f}: EM algorithm for a Gaussian mixture problem.
usage: {f} [-h] [--univariate | --bivariate]
options:
-h, --help show this help message and exit
--univariate calculates a univariate problem (default)
--bivariate calculates a bivariate problem
"""
import docopt, textwrap
args = docopt.docopt(textwrap.dedent(main.__doc__.format(f=__file__)))
typ_nvariate = ["univariate", "bivariate"][args["--bivariate"]]
em(typ_nvariate)
if __name__ == '__main__':
main()
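As a quick sanity check, the bivariate fit can be compared against scikit-learn's GaussianMixture on the same kind of data. This is a minimal sketch (not part of em.py; it assumes scikit-learn is installed, and "check_em.py" is just a placeholder name):
check_em.py
#!/usr/bin/env python
import numpy as np
from sklearn.mixture import GaussianMixture

#Same three-cluster bivariate data as make_data('bivariate')
centers = [[1, 1], [-1, -1], [1, -1]]
X = np.concatenate([np.random.randn(250, 2)*0.4 + np.array(c) for c in centers])

gm = GaussianMixture(n_components=3).fit(X)
#score() returns the average log-likelihood per sample, so multiply by len(X)
print("log(likelihood) =", gm.score(X)*len(X))
The total log-likelihood printed here should land in the same range as the --bivariate run above, up to run-to-run randomness.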