Source code for alex.ml.gmm.gmm

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This code is based on https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/mixture/gmm.py code
# which is distributed under the new BSD license.

import numpy as np
import cPickle as pickle

from sklearn.utils.extmath import logsumexp

EPS = np.finfo(float).eps


[docs]class GMM: """This is a GMM model of the input data. It is memory efficient so that it can process very large input array like objects. The mixtures are incrementally added by splitting the heaviest component in two components and perturbation of the original mean. """ def __init__(self, n_features=1, n_components=1, thresh=1e-3, min_covar=1e-3, n_iter=1): self.n_features = n_features self.n_components = n_components self.thresh = thresh self.min_covar = min_covar self.n_iter = n_iter self.weights = np.ones(self.n_components) / self.n_components self.means = np.zeros((self.n_components, self.n_features)) self.covars = np.ones((self.n_components, self.n_features)) def __str__(self): s = [] s.append("W: %s" % str(self.weights)) s.append("M: %s" % str(self.means)) s.append("C: %s" % str(self.covars)) return '\n'.join(s)
[docs] def log_multivariate_normal_density_diag(self, x, means=0.0, covars=1.0): """Compute Gaussian log-density at X for a diagonal model""" s, n_dim = x.shape lpr = - 0.5 * (n_dim * np.log(2 * np.pi) + np.sum( np.log(covars), 1)) - 0.5 * np.sum(((x - means) ** 2 / covars), 1) # print "m", means # print "c", covars # print "lm", - 0.5* np.sum(((x - means) ** 2) / covars) return lpr
[docs] def expectation(self, x): """ Evaluate one example """ lpr = np.log(self.weights) + self.log_multivariate_normal_density_diag( x, self.means, self.covars) # print "lpr", lpr log_prob = logsumexp(lpr) responsibilities = np.exp(lpr - log_prob) return log_prob, responsibilities
[docs] def score(self, x): """Get the log prob of the x variable being generated by the mixture.""" x = x.reshape((1, len(x))) lpr = np.log(self.weights) + self.log_multivariate_normal_density_diag( x, self.means, self.covars) log_prob = logsumexp(lpr) return log_prob
[docs] def mixup(self, n_new_mixies): """Add n new mixies to the mixture.""" for n in range(n_new_mixies): # the heaviest compnent c_to_split = np.argmax(self.weights) # print "weights", self.weights # print "cts", c_to_split c_weights = self.weights[c_to_split] c_means = self.means[c_to_split] c_covars = self.covars[c_to_split] # print c_weights # print c_means # print c_covars # add the first component new_weights = np.append(self.weights, c_weights / 2) new_means = np.append(self.means, np.random.multivariate_normal( c_means, np.diag(c_covars * (0.2 ** 2)))[np.newaxis, :], 0) new_covars = np.append(self.covars, c_covars[np.newaxis, :], 0) # add the second component new_weights = np.append(new_weights, c_weights / 2) new_means = np.append(new_means, np.random.multivariate_normal( c_means, np.diag(c_covars * (0.2 ** 2)))[np.newaxis, :], 0) new_covars = np.append(new_covars, c_covars[np.newaxis, :], 0) new_weights = np.delete(new_weights, c_to_split) new_means = np.delete(new_means, c_to_split, 0) new_covars = np.delete(new_covars, c_to_split, 0) self.weights, self.means, self.covars = new_weights, new_means, new_covars self.n_components += 1
[docs] def fit(self, X): # init the algorithm self.log_probs = [] for i in range(self.n_iter): log_prob = 0 n = 0 acc_weights = np.zeros(self.n_components) acc_means = np.zeros((self.n_components, self.n_features)) acc_covars = np.zeros((self.n_components, self.n_features)) for x in X: x_m = x.reshape((1, len(x))) #expectation log_prob_x, responsibilities = self.expectation(x_m) # print x # print "r", responsibilities # maximisation acc_weights += responsibilities # make a column matrix from it responsibilities = responsibilities[:, np.newaxis] # print responsibilities acc_means += np.dot(responsibilities, x_m) # print (x_m - self.means)**2 # print "r", responsibilities.T # print "d", x_m - self.means # print "d**2", (x_m - self.means)**2 # print "ac", responsibilities * (x_m - self.means)**2 acc_covars += responsibilities * (x_m - self.means) ** 2 log_prob += log_prob_x n += 1 # print "LP", log_prob_x # if n > 100000: # break self.log_probs.append(log_prob / n) # print "Iter:", i, "Log prob of data: ", log_prob/n if i > 2 and abs(self.log_probs[-1] - self.log_probs[-2]) < self.thresh: break new_weights = (acc_weights + EPS) / (n + self.n_components * EPS) new_means = (acc_means + EPS) / ( acc_weights[:, np.newaxis] + self.n_components * EPS) new_covars = (acc_covars + EPS) / (acc_weights[:, np.newaxis] + self.n_components * EPS) + self.min_covar self.weights, self.means, self.covars = new_weights, new_means, new_covars
[docs] def save_model(self, file_name): """Save the GMM model as a pickle.""" f = open(file_name, 'w+') d = [self.n_features, self.n_components, self.thresh, self.min_covar, self.n_iter, self.weights, self.means, self.covars, ] pickle.dump(d, f) f.close()
[docs] def load_model(self, file_name): """Load the model from a pickle.load""" f = open(file_name, 'r') d = pickle.load(f) self.n_features, self.n_components, self.thresh, self.min_covar, self.n_iter, self.weights, self.means, self.covars = d f.close()