This time I implemented the variational Gaussian mixture mentioned at the end of the previous article. Last time, the parameters of a Gaussian mixture model were estimated by maximum likelihood with the EM algorithm; this time they are estimated in a Bayesian way using the variational Bayesian method. In the previous implementation we had to fix the number of mixture components by hand, but with the variational Bayesian method the number of effective components is determined automatically. There are some other advantages to the Bayesian treatment as well.
In the maximum likelihood estimation of a Gaussian mixture model, the mixing coefficients $\pi_k$, the means $\mu_k$, and the covariances $\Sigma_k$ (or the precisions $\Lambda_k$) were obtained as point estimates; this time all of them are treated as random variables, and we estimate their probability distributions.
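Concretely, following PRML eqs. (10.39) and (10.40), conjugate priors are placed on the parameters: a Dirichlet prior on the mixing coefficients and a Gaussian-Wishart prior on each component's mean and precision, where $K$ is the number of mixture components. The hyperparameters $\alpha_0, {\bf m}_0, \beta_0, {\bf W}_0, \nu_0$ correspond to alpha0, m0, beta0, W0, nu0 in the code below.
\begin{align}
p({\bf\pi}) &= {\rm Dir}({\bf\pi}|\alpha_0)\\
p({\bf\mu},{\bf\Lambda}) &= \prod_{k=1}^{K}\mathcal{N}({\bf\mu}_k|{\bf m}_0,(\beta_0{\bf\Lambda}_k)^{-1})\mathcal{W}({\bf\Lambda}_k|{\bf W}_0,\nu_0)
\end{align}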
The figure below (taken from PRML Figure 10.5) is the graphical model of the model used this time. The joint distribution of all these random variables factorizes as
p({\bf X}, {\bf Z}, {\bf \pi}, {\bf \mu}, {\bf \Lambda}) = p({\bf X}|{\bf Z},{\bf\mu},{\bf\Lambda})p({\bf Z}|{\bf\pi})p({\bf\pi})p({\bf\mu}|{\bf\Lambda})p({\bf\Lambda})
The key point of this variational Bayesian method is that, after observing ${\bf X}$, **the posterior distribution $p({\bf Z}, {\bf\pi}, {\bf\mu}, {\bf\Lambda}|{\bf X})$ is variationally approximated by a probability distribution that factorizes into the latent variables ${\bf Z}$ and the parameters**, as follows.
\begin{align}
p({\bf Z}, {\bf \pi}, {\bf \mu}, {\bf \Lambda}|{\bf X}) &\approx q({\bf Z}, {\bf \pi}, {\bf \mu}, {\bf \Lambda})\\
&= q({\bf Z})q({\bf\pi},{\bf\mu},{\bf\Lambda})
\end{align}
Since the exact posterior distribution is intractable to compute, we resort to this variational approximation.
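With this factorization, each factor can be updated in turn using the general result of mean-field variational inference (PRML eq. (10.9)): the optimal factor for one group of unobserved variables (here ${\bf Z}$ or $({\bf\pi},{\bf\mu},{\bf\Lambda})$) is obtained by taking the expectation of the log joint distribution with respect to all the other factors,
\ln q^*_j({\bf Z}_j) = \mathbb{E}_{i\neq j}\left[\ln p({\bf X},{\bf Z})\right] + {\rm const}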
The general flow of the variational Bayesian method is as follows: initialize the parameters of the variational distributions, update $q({\bf Z})$ (the counterpart of the E step), update $q({\bf\pi},{\bf\mu},{\bf\Lambda})$ (the counterpart of the M step), and repeat the last two steps until the distributions converge.
In addition to the usual matplotlib and numpy, I also use scipy this time. The algorithm needs the digamma and gamma functions, so for the algorithm part a library other than numpy appears this time as well.
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
from scipy.special import digamma, gamma


class VariationalGaussianMixture(object):

    def __init__(self, n_component=10, alpha0=1.):
        # Number of components in the Gaussian mixture
        self.n_component = n_component
        # Parameter of the prior distribution over the mixing coefficients pi
        self.alpha0 = alpha0

    def init_params(self, X):
        self.sample_size, self.ndim = X.shape
        # Set the parameters of the prior distributions
        self.alpha0 = np.ones(self.n_component) * self.alpha0
        self.m0 = np.zeros(self.ndim)
        self.W0 = np.eye(self.ndim)
        self.nu0 = self.ndim
        self.beta0 = 1.
        self.component_size = self.sample_size / self.n_component + np.zeros(self.n_component)
        # Initialize the parameters of the variational distributions
        self.alpha = self.alpha0 + self.component_size
        self.beta = self.beta0 + self.component_size
        indices = np.random.choice(self.sample_size, self.n_component, replace=False)
        self.m = X[indices].T
        self.W = np.tile(self.W0, (self.n_component, 1, 1)).T
        self.nu = self.nu0 + self.component_size

    # Return the parameters of the variational distributions
    def get_params(self):
        return self.alpha, self.beta, self.m, self.W, self.nu

    # Variational Bayesian estimation
    def fit(self, X, iter_max=100):
        self.init_params(X)
        # Update until the variational distributions converge
        for i in range(iter_max):
            params = np.hstack([array.flatten() for array in self.get_params()])
            # Update the distribution q(Z)
            r = self.e_like_step(X)
            # Update the distribution q(pi, mu, Lambda)
            self.m_like_step(X, r)
            # Stop once the parameters have converged
            if np.allclose(params, np.hstack([array.ravel() for array in self.get_params()])):
                break
        else:
            print("parameters may not have converged")

    # Update the distribution q(Z)
    def e_like_step(self, X):
        d = X[:, :, None] - self.m
        gauss = np.exp(
            -0.5 * self.ndim / self.beta
            - 0.5 * self.nu * np.sum(
                np.einsum('ijk,njk->nik', self.W, d) * d,
                axis=1)
        )
        # PRML eq. (10.66): exponential of the expected log mixing coefficients
        pi = np.exp(digamma(self.alpha) - digamma(self.alpha.sum()))
        # PRML eqs. (10.65), (B.81): exponential of the expected log determinant of Lambda
        Lambda = np.exp(digamma(0.5 * (self.nu - np.arange(self.ndim)[:, None])).sum(axis=0) + self.ndim * np.log(2) + np.linalg.slogdet(self.W.T)[1])
        # PRML eq. (10.67): unnormalized responsibilities
        r = pi * np.sqrt(Lambda) * gauss
        # PRML eq. (10.49): normalize the responsibilities
        r /= np.sum(r, axis=-1, keepdims=True)
        # Guard against rows where all responsibilities underflowed to zero
        r[np.isnan(r)] = 1. / self.n_component
        return r

    # Update the distribution q(pi, mu, Lambda)
    def m_like_step(self, X, r):
        # PRML eq. (10.51)
        self.component_size = r.sum(axis=0)
        # PRML eq. (10.52)
        Xm = X.T.dot(r) / self.component_size
        d = X[:, :, None] - Xm
        # PRML eq. (10.53)
        S = np.einsum('nik,njk->ijk', d, r[:, None, :] * d) / self.component_size
        # PRML eq. (10.58): update the parameter of q(pi)
        self.alpha = self.alpha0 + self.component_size
        # PRML eq. (10.60)
        self.beta = self.beta0 + self.component_size
        # PRML eq. (10.61)
        self.m = (self.beta0 * self.m0[:, None] + self.component_size * Xm) / self.beta
        d = Xm - self.m0[:, None]
        # PRML eq. (10.62)
        self.W = np.linalg.inv(
            np.linalg.inv(self.W0)
            + (self.component_size * S).T
            + (self.beta0 * self.component_size * np.einsum('ik,jk->ijk', d, d) / (self.beta0 + self.component_size)).T).T
        # PRML eq. (10.63)
        self.nu = self.nu0 + self.component_size

    # Approximate p(test data|training data) with point estimates of pi, mu, Lambda
    def predict_proba(self, X):
        # PRML eq. (B.80): expected value of the precision matrix Lambda
        precisions = self.nu * self.W
        covs = np.linalg.inv(precisions.T).T
        d = X[:, :, None] - self.m
        exponents = np.sum(np.einsum('nik,ijk->njk', d, precisions) * d, axis=1)
        # PRML eq. (10.38)
        gausses = np.exp(-0.5 * exponents) / np.sqrt(np.linalg.det(covs.T).T * (2 * np.pi) ** self.ndim)
        # PRML eq. (10.69): expected value of the mixing coefficients pi
        gausses *= (self.alpha0 + self.component_size) / (self.n_component * self.alpha0 + self.sample_size)
        # Marginalize out the latent variable z
        return np.sum(gausses, axis=-1)

    # Clustering
    def classify(self, X):
        # Argmax of the distribution q(Z)
        return np.argmax(self.e_like_step(X), 1)

    # Compute Student's t distributions
    def student_t(self, X):
        nu = self.nu + 1 - self.ndim
        # PRML eq. (10.82)
        L = nu * self.beta * self.W / (1 + self.beta)
        d = X[:, :, None] - self.m
        # PRML eq. (B.72): squared Mahalanobis distance
        maha_sq = np.sum(np.einsum('nik,ijk->njk', d, L) * d, axis=1)
        # PRML eq. (B.68)
        return (
            gamma(0.5 * (nu + self.ndim))
            * np.sqrt(np.linalg.det(L.T))
            * (1 + maha_sq / nu) ** (-0.5 * (nu + self.ndim))
            / (gamma(0.5 * nu) * (nu * np.pi) ** (0.5 * self.ndim)))

    # Compute the predictive distribution p(test data|training data)
    def predict_dist(self, X):
        # PRML eq. (10.81)
        return (self.alpha * self.student_t(X)).sum(axis=-1) / self.alpha.sum()
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
from scipy.special import digamma, gamma


class VariationalGaussianMixture(object):

    def __init__(self, n_component=10, alpha0=1.):
        self.n_component = n_component
        self.alpha0 = alpha0

    def init_params(self, X):
        self.sample_size, self.ndim = X.shape
        self.alpha0 = np.ones(self.n_component) * self.alpha0
        self.m0 = np.zeros(self.ndim)
        self.W0 = np.eye(self.ndim)
        self.nu0 = self.ndim
        self.beta0 = 1.
        self.component_size = self.sample_size / self.n_component + np.zeros(self.n_component)
        self.alpha = self.alpha0 + self.component_size
        self.beta = self.beta0 + self.component_size
        indices = np.random.choice(self.sample_size, self.n_component, replace=False)
        self.m = X[indices].T
        self.W = np.tile(self.W0, (self.n_component, 1, 1)).T
        self.nu = self.nu0 + self.component_size

    def get_params(self):
        return self.alpha, self.beta, self.m, self.W, self.nu

    def fit(self, X, iter_max=100):
        self.init_params(X)
        for i in range(iter_max):
            params = np.hstack([array.flatten() for array in self.get_params()])
            r = self.e_like_step(X)
            self.m_like_step(X, r)
            if np.allclose(params, np.hstack([array.ravel() for array in self.get_params()])):
                break
        else:
            print("parameters may not have converged")

    def e_like_step(self, X):
        d = X[:, :, None] - self.m
        gauss = np.exp(
            -0.5 * self.ndim / self.beta
            - 0.5 * self.nu * np.sum(
                np.einsum('ijk,njk->nik', self.W, d) * d,
                axis=1)
        )
        pi = np.exp(digamma(self.alpha) - digamma(self.alpha.sum()))
        Lambda = np.exp(digamma(0.5 * (self.nu - np.arange(self.ndim)[:, None])).sum(axis=0) + self.ndim * np.log(2) + np.linalg.slogdet(self.W.T)[1])
        r = pi * np.sqrt(Lambda) * gauss
        r /= np.sum(r, axis=-1, keepdims=True)
        r[np.isnan(r)] = 1. / self.n_component
        return r

    def m_like_step(self, X, r):
        self.component_size = r.sum(axis=0)
        Xm = X.T.dot(r) / self.component_size
        d = X[:, :, None] - Xm
        S = np.einsum('nik,njk->ijk', d, r[:, None, :] * d) / self.component_size
        self.alpha = self.alpha0 + self.component_size
        self.beta = self.beta0 + self.component_size
        self.m = (self.beta0 * self.m0[:, None] + self.component_size * Xm) / self.beta
        d = Xm - self.m0[:, None]
        self.W = np.linalg.inv(
            np.linalg.inv(self.W0)
            + (self.component_size * S).T
            + (self.beta0 * self.component_size * np.einsum('ik,jk->ijk', d, d) / (self.beta0 + self.component_size)).T).T
        self.nu = self.nu0 + self.component_size

    def predict_proba(self, X):
        precisions = self.nu * self.W
        covs = np.linalg.inv(precisions.T).T
        d = X[:, :, None] - self.m
        exponents = np.sum(np.einsum('nik,ijk->njk', d, precisions) * d, axis=1)
        gausses = np.exp(-0.5 * exponents) / np.sqrt(np.linalg.det(covs.T).T * (2 * np.pi) ** self.ndim)
        gausses *= (self.alpha0 + self.component_size) / (self.n_component * self.alpha0 + self.sample_size)
        return np.sum(gausses, axis=-1)

    def classify(self, X):
        return np.argmax(self.e_like_step(X), 1)

    def student_t(self, X):
        nu = self.nu + 1 - self.ndim
        L = nu * self.beta * self.W / (1 + self.beta)
        d = X[:, :, None] - self.m
        maha_sq = np.sum(np.einsum('nik,ijk->njk', d, L) * d, axis=1)
        return (
            gamma(0.5 * (nu + self.ndim))
            * np.sqrt(np.linalg.det(L.T))
            * (1 + maha_sq / nu) ** (-0.5 * (nu + self.ndim))
            / (gamma(0.5 * nu) * (nu * np.pi) ** (0.5 * self.ndim)))

    def predict_dist(self, X):
        return (self.alpha * self.student_t(X)).sum(axis=-1) / self.alpha.sum()


def create_toy_data():
    x1 = np.random.normal(size=(100, 2))
    x1 += np.array([-5, -5])
    x2 = np.random.normal(size=(100, 2))
    x2 += np.array([5, -5])
    x3 = np.random.normal(size=(100, 2))
    x3 += np.array([0, 5])
    return np.vstack((x1, x2, x3))


def main():
    X = create_toy_data()
    model = VariationalGaussianMixture(n_component=10, alpha0=0.01)
    model.fit(X, iter_max=100)
    labels = model.classify(X)

    x_test, y_test = np.meshgrid(
        np.linspace(-10, 10, 100), np.linspace(-10, 10, 100))
    X_test = np.array([x_test, y_test]).reshape(2, -1).transpose()
    probs = model.predict_dist(X_test)
    plt.scatter(X[:, 0], X[:, 1], c=labels, cmap=cm.get_cmap())
    plt.contour(x_test, y_test, probs.reshape(100, 100))
    plt.xlim(-10, 10)
    plt.ylim(-10, 10)
    plt.show()


if __name__ == '__main__':
    main()
The video below shows clustering with the variational Bayesian method. The color of each point indicates which cluster it belongs to; as learning progresses, the number of effective mixture components decreases, and in the end only three remain. (The code above does not show this progress and only displays the final result.)
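As a quick check, the number of effective components can also be counted from the cluster assignments. This snippet is not part of the original script; it is a minimal sketch that assumes the model and X created in main() above.
labels = model.classify(X)
# count the components that are assigned at least one data point
print("effective components:", len(np.unique(labels)))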
Chapter 10 of PRML also introduces examples of applying variational Bayesian methods not only to Gaussian mixtures but also to linear regression and logistic regression, so I would like to implement those as well if I get the chance.