Geoff Ruddock

k-means clustering

from typing import Optional, Union, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline
%reload_ext autoreload
%autoreload 2

from IPython.core.interactiveshell import InteractiveShell
from IPython.core.display import display, HTML

InteractiveShell.ast_node_interactivity = 'all'  # display all output cells
display(HTML("<style>.container { width:100% !important; }</style>"))  # make full width
pd.set_option('float_format', lambda x: '%.3f' % x)
pd.set_option('display.max_rows', 200)
np.set_printoptions(suppress=True, linewidth=180)

Helper functions

def plot_kmeans(data: np.ndarray, clusters: np.ndarray, assignments: Optional[np.ndarray]=None) -> None:
    """ Visualize data points, cluster centroids, and cluster assignments at a particular iteration of k-means. """
    fig, ax = plt.subplots(figsize=(5, 5), dpi=100)
    # Format grid
    ax.axvline(0, c='black', zorder=2)
    ax.axhline(0, c='black', zorder=2)
    ax.set_xlim(-4, 4)
    ax.set_ylim(-4, 4)
    # Plot data points
    if assignments is None:
        color = 'grey'
        color = np.asarray(['indianred', 'cornflowerblue'])[assignments]
    ax.scatter(*data.T, s=256, c=color, edgecolors='black', zorder=3)
    # Plot clusters
    ax.scatter(*clusters.T, s=256, c=['darkred', 'darkblue'], marker='s', edgecolors='black', zorder=3)
    for i, (x, y) in enumerate(data):
        ax.annotate(i+1, (x, y), c='white', size=10, ha='center', va='center')

Step by step

Let’s run k-means on a simple toy dataset and see what happens at each step.

data = np.asarray([
    [2, 2],
    [-1, 1],
    [3, 1],
    [0, -1],
    [-2, -2]

centroids = np.asarray([
    [-3, -1],
    [2, 1]
plot_kmeans(data, centroids)


Show the cluster assignment

We assign each observation to the cluster whose centroid is closest: $\pi(i) = \text{argmin}_{j} \ | x_i - c_j |$ using the Manhattam $\ell_1$ norm. This results in data points 2, 4 and 5 being assigned to Cluster A, and data points 1 and 3 being assigned to cluster B.

def assign_data_points(data, centroids, norm=1):
    distances = np.array([np.linalg.norm(data - centroid, ord=norm, axis=1) for centroid in centroids])
    return np.argmin(distances, axis=0)

assignments = assign_data_points(data, centroids, norm=1)

plot_kmeans(data, centroids, assignments)


Show the location of the new center

We compute each cluster centroid as the average of each feature value for that cluster’s members: $c_j = \frac{1}{n_j} \sum_i^{n_j} x_i$

def adjust_centroids(data, assignments, k):
    return np.array([np.mean(data[assignments == i], axis=0) for i in range(k)])

centroids = adjust_centroids(data, assignments, k=2)

plot_kmeans(data, centroids, assignments)


Will it terminate in one step?

The k-means algorithm terminates when the assignments made in an iteration do not change. In our case, the algorithm will not terminate, because in the next iteration data point #2 will be reassigned from Cluster B to cluster A. This occurs because we are using the $\ell_1$ norm, so the distance $d(x_2, A) = 2.5$ is smaller than $d(x_2, B) = 2.66$.

assignments = assign_data_points(data, centroids, norm=1)

plot_kmeans(data, centroids, assignments)


Rolling our own estimator

Base class

First we’ll define a base class which handles some of the boilerplate that is not specific to k-means: initializing clusters, looping, etc.

class Clustering(object):
    def __init__(self, k: int):
        """ Abtract base class for a clustering algorithm. """ = None
        self.k = k
        self.centroids = None
        self.assignments = None

    def init_centroids(self, method: Union[str, np.ndarray]):
        """ Initialize centroids according to a common method or by passing coordinates manually.

            method: the two supported methods are 'forgy' and 'random_partition'

        n_rows =[0]
        if isinstance(method, np.ndarray):
            self.centroids = method
        elif method == 'forgy':
            self.centroids =[np.random.randint(0,  n_rows + 1, size=self.k)]
        elif method == 'random_partition':
            self.assignments = np.random.randint(0, self.k+1, size=n_rows)
            self.centroids = self.adjust_centroids()
            raise NotImplementedError

    def assign_data_points(self):
        raise NotImplementedError

    def adjust_centroids(self):
        raise NotImplementedError

    def fit_predict(self, data: np.array, init_method='forgy', max_iters=200, *args, **kwargs) -> Tuple[np.array, np.array]:
        """ Iteratively assign data points to cluster and recompute cluster centers until converged.

            assignments: index of cluster to which each data point is assigned
            centroids: final centroids after convergence


        # initialize centroids according to specified method = data

        # loop until centroids do not change
        iter_idx = 0
        while iter_idx < max_iters:

            # assign each row to nearest cluster
            self.assignments = self.assign_data_points()

            # adjust cluster centers
            new_centroids = self.adjust_centroids()

            if (new_centroids == self.centroids).all():
                self.centroids = new_centroids
                iter_idx += 1
                print('.', end='')

        if iter_idx == max_iters:
            print('REACHED MAX ITERATIONS')

        return self.assignments, self.centroids


To implement k-means, we can inherit from our base Clustering class and implement the logic for assigning data points and adjusting centroids.

class KMeans(Clustering):

    def assign_data_points(self):
        distances = np.array([np.linalg.norm( - centroid, ord=1, axis=1) for centroid in self.centroids])
        assignments = np.argmin(distances, axis=0)
        return assignments

    def adjust_centroids(self):
        return np.array([np.mean([self.assignments == i], axis=0) for i in range(self.k)])


class KMedoids(Clustering):

    def assign_data_points(self):
        """ Assign data points to closest centroid, using either L1 or L2 norm """
        if self.norm == 'L1':
            ord = 1
            ord = None

        distances = np.array([np.linalg.norm( - centroid, ord=ord, axis=1) for centroid in self.centroids])
        assignments = np.argmin(distances, axis=0)
        return assignments

    def adjust_centroids(self) -> np.ndarray:
        """ Adjust centroids to be least dissimilar data point within, using either L1 or L2 norm """
        if self.norm == 'L1':
            metric = 'cityblock'
        elif self.norm == 'L2':
            metric = 'euclidean'
            raise NotImplementedError

        centroids = np.zeros(shape=(self.k,[1]))

        for j in range(self.k):
            cluster_members = np.unique([self.assignments == j], axis=0)
            avg_distances = np.mean(distance.cdist(cluster_members, cluster_members, metric=metric), axis=1)
            centroids[j] = cluster_members[avg_distances.argmin()]

        return centroids

comments powered by Disqus