Note: This is best viewed on NBViewer. It is part of a series on Dirichlet Processes and Nonparametric Bayes.
In an earlier notebook, I showed how we can fit the parameters of a Bayesian mixture model using a Gibbs sampler. The sampler defines a Markov chain that, in steady state, samples from the posterior distribution of the mixture model. To move the chain forward by one step we:
It turns out that we can derive a Gibbs sampler that just samples the assignments instead of the mixture weights and cluster means. This is known as a collapsed Gibbs sampler. If we integrate out the cluster means
By the conditional independence, we can factorize this marginal distribution
The two terms have intuitive explanations.
The second term is the predictive likelihood that point
Thus,
At each step of the collapsed sampler, we sample each
where the normalizing constant is
Let's write code for this Gibbs sampler!
%matplotlib inlineimport pandas as pdimport numpy as npimport matplotlib.pyplot as pltfrom collections import namedtuple, Counterfrom scipy import statsfrom numpy import random
np.random.seed(12345)
First, load the same dataset we used previously:
data = pd.Series.from_csv("clusters.csv")_=data.hist(bins=20)
Again, we want to define a state object and a function for updating the sufficient statistics of the state.
SuffStat = namedtuple('SuffStat', 'theta N')def initial_state(num_clusters=3, alpha=1.0): cluster_ids = range(num_clusters) state = { 'cluster_ids_': cluster_ids, 'data_': data, 'num_clusters_': num_clusters, 'cluster_variance_': .01, 'alpha_': alpha, 'hyperparameters_': { "mean": 0, "variance": 1, }, 'suffstats': {cid: None for cid in cluster_ids}, 'assignment': [random.choice(cluster_ids) for _ in data], 'pi': {cid: alpha / num_clusters for cid in cluster_ids}, } update_suffstats(state) return statedef update_suffstats(state): for cluster_id, N in Counter(state['assignment']).iteritems(): points_in_cluster = [x for x, cid in zip(state['data_'], state['assignment']) if cid == cluster_id ] mean = np.array(points_in_cluster).mean() state['suffstats'][cluster_id] = SuffStat(mean, N)
Next we define functions to compute the two terms of our marginal distribution over cluster assignments (as we derived above).
def log_predictive_likelihood(data_id, cluster_id, state): """Predictive likelihood of the data at data_id is generated by cluster_id given the currenbt state. From Section 2.4 of http://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf """ ss = state['suffstats'][cluster_id] hp_mean = state['hyperparameters_']['mean'] hp_var = state['hyperparameters_']['variance'] param_var = state['cluster_variance_'] x = state['data_'][data_id] return _log_predictive_likelihood(ss, hp_mean, hp_var, param_var, x)def _log_predictive_likelihood(ss, hp_mean, hp_var, param_var, x): posterior_sigma2 = 1 / (ss.N * 1. / param_var + 1. / hp_var) predictive_mu = posterior_sigma2 * (hp_mean * 1. / hp_var + ss.N * ss.theta * 1. / param_var) predictive_sigma2 = param_var + posterior_sigma2 predictive_sd = np.sqrt(predictive_sigma2) return stats.norm(predictive_mu, predictive_sd).logpdf(x)def log_cluster_assign_score(cluster_id, state): """Log-likelihood that a new point generated will be assigned to cluster_id given the current state. """ current_cluster_size = state['suffstats'][cluster_id].N num_clusters = state['num_clusters_'] alpha = state['alpha_'] return np.log(current_cluster_size + alpha * 1. / num_clusters)
Given these two functions, we can compute the posterior probability distribution for assignment of a given datapoint. This is the core of our collapsed Gibbs sampler.
To simplify the computation of things like
def cluster_assignment_distribution(data_id, state): """Compute the marginal distribution of cluster assignment for each cluster. """ scores = {} for cid in state['suffstats'].keys(): scores[cid] = log_predictive_likelihood(data_id, cid, state) scores[cid] += log_cluster_assign_score(cid, state) scores = {cid: np.exp(score) for cid, score in scores.iteritems()} normalization = 1.0/sum(scores.values()) scores = {cid: score*normalization for cid, score in scores.iteritems()} return scoresdef add_datapoint_to_suffstats(x, ss): """Add datapoint to sufficient stats for normal component """ return SuffStat((ss.theta*(ss.N)+x)/(ss.N+1), ss.N+1)def remove_datapoint_from_suffstats(x, ss): """Remove datapoint from sufficient stats for normal component """ return SuffStat((ss.theta*(ss.N)-x*1.0)/(ss.N-1), ss.N-1)
Finally, we're ready to create a function that takes a Gibbs step on the state. For each datapoint, it
def gibbs_step(state): pairs = zip(state['data_'], state['assignment']) for data_id, (datapoint, cid) in enumerate(pairs): state['suffstats'][cid] = remove_datapoint_from_suffstats(datapoint, state['suffstats'][cid]) scores = cluster_assignment_distribution(data_id, state).items() labels, scores = zip(*scores) cid = random.choice(labels, p=scores) state['assignment'][data_id] = cid state['suffstats'][cid] = add_datapoint_to_suffstats(state['data_'][data_id], state['suffstats'][cid])
Here's our old function to plot the assignments.
def plot_clusters(state): gby = pd.DataFrame({ 'data': state['data_'], 'assignment': state['assignment']} ).groupby(by='assignment')['data'] hist_data = [gby.get_group(cid).tolist() for cid in gby.groups.keys()] plt.hist(hist_data, bins=20, histtype='stepfilled', alpha=.5 )
Randomly assign the datapoints to a cluster to start.
state = initial_state()plot_clusters(state)
Look what happens to the assignments after just one Gibbs step!
gibbs_step(state)plot_clusters(state)
Two:
gibbs_step(state)plot_clusters(state)
After just two steps, our assignments look really good. We can run it a few more times and see the assignments again.
for _ in range(20): gibbs_step(state)plot_clusters(state)
It turns out, the collapsed Gibbs sampler for mixture models is almost identical in the context of a nonparametric model. This model uses a Dirichlet process prior instead of a Dirichlet distribution prior. It doesn't require us to specify how many clusters we are looking for in our data.
The cluster assignment score changes slightly. It is proportional to
def log_cluster_assign_score_dp(cluster_id, state): """Log-likelihood that a new point generated will be assigned to cluster_id given the current state. """ if cluster_id == "new": return np.log(state["alpha_"]) else: return np.log(state['suffstats'][cluster_id].N)
The predictive likelihood remains the same for known clusters. However, we need to know the likelihood of assigning a datapoint to a new cluster. In this case, we fall back on the hyperparameters to get:
def log_predictive_likelihood_dp(data_id, cluster_id, state): """Predictive likelihood of the data at data_id is generated by cluster_id given the currenbt state. From Section 2.4 of http://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf """ if cluster_id == "new": ss = SuffStat(0, 0) else: ss = state['suffstats'][cluster_id] hp_mean = state['hyperparameters_']['mean'] hp_var = state['hyperparameters_']['variance'] param_var = state['cluster_variance_'] x = state['data_'][data_id] return _log_predictive_likelihood(ss, hp_mean, hp_var, param_var, x)
Given this, we can define the marginal distribution over cluster assignment. The only change is that the "new
" state enters in the distribution.
def cluster_assignment_distribution_dp(data_id, state): """Compute the marginal distribution of cluster assignment for each cluster. """ scores = {} cluster_ids = state['suffstats'].keys() + ['new'] for cid in cluster_ids: scores[cid] = log_predictive_likelihood_dp(data_id, cid, state) scores[cid] += log_cluster_assign_score_dp(cid, state) scores = {cid: np.exp(score) for cid, score in scores.iteritems()} normalization = 1.0/sum(scores.values()) scores = {cid: score*normalization for cid, score in scores.iteritems()} return scores
We also need to be able to create a new cluster when "new
" is drawn, and destroy a cluster if its emptied.
def create_cluster(state): state["num_clusters_"] += 1 cluster_id = max(state['suffstats'].keys()) + 1 state['suffstats'][cluster_id] = SuffStat(0, 0) state['cluster_ids_'].append(cluster_id) return cluster_iddef destroy_cluster(state, cluster_id): state["num_clusters_"] = 1 del state['suffstats'][cluster_id] state['cluster_ids_'].remove(cluster_id) def prune_clusters(state): for cid in state['cluster_ids_']: if state['suffstats'][cid].N == 0: destroy_cluster(state, cid)
Finally, we can define the gibbs_step_dp
function. It's nearly identical to the earlier gibbs_step
function except
cluster_assignment_distribution_dp
new
".For clarity, I split out the code for sampling assignment to its own function.
def sample_assignment(data_id, state): """Sample new assignment from marginal distribution. If cluster is "`new`", create a new cluster. """ scores = cluster_assignment_distribution_dp(data_id, state).items() labels, scores = zip(*scores) cid = random.choice(labels, p=scores) if cid == "new": return create_cluster(state) else: return int(cid)def gibbs_step_dp(state): """Collapsed Gibbs sampler for Dirichlet Process Mixture Model """ pairs = zip(state['data_'], state['assignment']) for data_id, (datapoint, cid) in enumerate(pairs): state['suffstats'][cid] = remove_datapoint_from_suffstats(datapoint, state['suffstats'][cid]) prune_clusters(state) cid = sample_assignment(data_id, state) state['assignment'][data_id] = cid state['suffstats'][cid] = add_datapoint_to_suffstats(state['data_'][data_id], state['suffstats'][cid])
This time, we will start by randomly assigning our data to two clusters.
state = initial_state(num_clusters=2, alpha=0.1)plot_clusters(state)
Here's what happens when we run our Gibbs sampler once.
gibbs_step_dp(state)plot_clusters(state)
We went from 2 to 4 clusters!
After 100 iterations:
for _ in range(99): gibbs_step_dp(state)plot_clusters(state)
After 100 iterations, our assignment looks correct! We went back to 3 clusters.
We can sample the mixture weights, if we need them, using the "Conditional Distribution of Mixture Weights" derived here.
ss = state['suffstats']alpha = [ss[cid].N + state['alpha_'] / state['num_clusters_'] for cid in state['cluster_ids_']]stats.dirichlet(alpha).rvs(size=1).flatten()
array([ 0.21330625, 0.29838101, 0.48831275])
We can also sample the cluster means using the method we derived earlier:
for cluster_id in state['cluster_ids_']: cluster_var = state['cluster_variance_'] hp_mean = state['hyperparameters_']['mean'] hp_var = state['hyperparameters_']['variance'] ss = state['suffstats'][cluster_id] numerator = hp_mean / hp_var + ss.theta * ss.N / cluster_var denominator = (1.0 / hp_var + ss.N / cluster_var) posterior_mu = numerator / denominator posterior_var = 1.0 / denominator mean = stats.norm(posterior_mu, np.sqrt(posterior_var)).rvs() print "cluster_id:", cluster_id, "mean", mean
cluster_id: 1 mean -0.0176257860235cluster_id: 2 mean -0.400581819532cluster_id: 3 mean 0.600302879661
Much thanks to Erik Sudderth's excellent introduction to nonparametric Bayes in Chapter 2 of his dissertation. Algorithms 2.2 and 2.3 in that piece are the clearest formulation of collapsed Gibbs sampling for mixture models that I have come across.
联系客服