Module deepposekit.annotate.KMeansSampler

Source code
# -*- coding: utf-8 -*-
# Copyright 2018-2019 Jacob M. Graving <jgraving@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#    http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import MiniBatchKMeans
from sklearn.utils.validation import check_is_fitted
from deepposekit.annotate.utils.image import check_image_array


class KMeansSampler(MiniBatchKMeans):
    def __init__(
        self,
        n_clusters=10,
        init="k-means++",
        max_iter=100,
        batch_size=100,
        verbose=0,
        compute_labels=True,
        random_state=None,
        tol=0.0,
        max_no_improvement=10,
        init_size=None,
        n_init=3,
        reassignment_ratio=0.01,
    ):

        super(KMeansSampler, self).__init__(
            n_clusters=n_clusters,
            init=init,
            max_iter=max_iter,
            batch_size=batch_size,
            verbose=verbose,
            compute_labels=compute_labels,
            random_state=random_state,
            tol=tol,
            max_no_improvement=max_no_improvement,
            init_size=init_size,
            n_init=n_init,
            reassignment_ratio=reassignment_ratio,
        )
        self._fit = super(KMeansSampler, self).fit
        self._partial_fit = super(KMeansSampler, self).partial_fit
        self._predict = super(KMeansSampler, self).predict

    def sample_idx(self, X, n_samples_per_label=100):
        """Sample indices evenly from each cluster for X.

        Returns the sampled indices into X and the cluster label
        for each sampled index. Clusters with fewer than
        n_samples_per_label members contribute all of their samples.
        """
        labels = self.predict(X)

        X_new = []
        y_new = []
        index = np.arange(X.shape[0])
        for idx in np.unique(labels):
            label_idx = index[labels == idx]
            if label_idx.shape[0] > 0:
                if label_idx.shape[0] < n_samples_per_label:
                    n_samples = label_idx.shape[0]
                else:
                    n_samples = n_samples_per_label
                sample_idx = np.random.choice(label_idx, n_samples, replace=False)
                X_new.append(sample_idx)
                y_new.append(np.ones_like(sample_idx, dtype=np.int32) * idx)
        X_new = np.concatenate(X_new)
        y_new = np.concatenate(y_new)

        return X_new, y_new

    def sample_data(self, X, n_samples_per_label=100):
        """Sample evenly from each cluster for X.
        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        n_samples_per_label : int
            Number of samples per cluster label.
            If X does not contain enough samples in
            a cluster, all samples for that cluster
            are used without replacement.
        Returns
        -------
        X_new : array-like, shape = [n_samples, rows, cols, channels]
            The sampled data
        y_new : array-like, shape = [n_samples,]
            The cluster labels for each sample

        """
        # sample_idx calls predict internally, so a separate
        # predict call here would be redundant
        idx, y_new = self.sample_idx(X, n_samples_per_label)
        X_new = X[idx]
        return X_new, y_new

    def fit(self, X, y=None):
        """Compute the centroids on X by chunking it into mini-batches.
        Parameters
        ----------
        X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
            Training instances to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)

        return self._fit(X, y)

    def partial_fit(self, X, y=None):
        """Update k means estimate on a single mini-batch X.
        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)

        return self._partial_fit(X, y)

    def predict(self, X):
        """Predict the closest cluster each sample in X belongs to.
        In the vector quantization literature, `cluster_centers_` is called
        the code book and each value returned by `predict` is the index of
        the closest code in the code book.
        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
            New data to predict.
        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the cluster each sample belongs to.
        """
        check_is_fitted(self, "cluster_centers_")
        X = check_image_array(self, X)

        return self._predict(X)

    def plot_centers(self, n_rows=2, figsize=(20, 20)):
        """Plot the mean-subtracted cluster centers as a tiled image grid.

        n_rows must evenly divide n_clusters.
        """

        check_is_fitted(self, "cluster_centers_")

        n_cols = self.n_clusters // n_rows

        mean = self.cluster_centers_.mean(0)
        centers = self.cluster_centers_ - mean[None, ...]
        centers = centers.reshape(n_rows, n_cols, self.rows, self.cols, self.channels)
        centers = centers.swapaxes(1, 2).reshape(
            n_rows * self.rows, n_cols * self.cols, self.channels
        )
        if self.channels == 1:
            centers = centers[..., 0]
        fig = plt.figure(figsize=figsize)
        plt.imshow(centers, cmap="seismic", vmin=-255, vmax=255)

        return fig

Classes

class KMeansSampler (n_clusters=10, init='k-means++', max_iter=100, batch_size=100, verbose=0, compute_labels=True, random_state=None, tol=0.0, max_no_improvement=10, init_size=None, n_init=3, reassignment_ratio=0.01)

Mini-Batch K-Means clustering

Read more in the scikit-learn User Guide section on Mini-Batch K-Means.

Parameters

n_clusters : int, optional, default: 10
The number of clusters to form as well as the number of centroids to generate.
init : {'k-means++', 'random' or an ndarray}, default: 'k-means++'

Method for initialization, defaults to 'k-means++':

'k-means++' : selects initial cluster centers for k-means clustering in a smart way to speed up convergence. See section Notes in k_init for more details.

'random': choose k observations (rows) at random from data for the initial centroids.

If an ndarray is passed, it should be of shape (n_clusters, n_features) and gives the initial centers.

max_iter : int, optional
Maximum number of iterations over the complete dataset before stopping independently of any early stopping criterion heuristics.
batch_size : int, optional, default: 100
Size of the mini batches.
verbose : boolean, optional
Verbosity mode.
compute_labels : boolean, default=True
Compute label assignment and inertia for the complete dataset once the minibatch optimization has converged in fit.
random_state : int, RandomState instance or None (default)
Determines random number generation for centroid initialization and random reassignment. Use an int to make the randomness deterministic. See the scikit-learn Glossary entry for random_state.
tol : float, default: 0.0

Control early stopping based on the relative center changes as measured by a smoothed, variance-normalized estimate of the mean squared position changes of the centers. This early stopping heuristic is closer to the one used for the batch variant of the algorithm but induces a slight computational and memory overhead over the inertia heuristic.

To disable convergence detection based on normalized center change, set tol to 0.0 (default).

max_no_improvement : int, default: 10

Control early stopping based on the consecutive number of mini batches that do not yield an improvement on the smoothed inertia.

To disable convergence detection based on inertia, set max_no_improvement to None.

init_size : int, optional, default: 3 * batch_size
Number of samples to randomly sample for speeding up the initialization (sometimes at the expense of accuracy): the algorithm is initialized by running a batch KMeans on a random subset of the data. This needs to be larger than n_clusters.
n_init : int, default=3
Number of random initializations that are tried. In contrast to KMeans, the algorithm is only run once, using the best of the n_init initializations as measured by inertia.
reassignment_ratio : float, default: 0.01
Control the fraction of the maximum number of counts for a center to be reassigned. A higher value means that low-count centers are more easily reassigned, which means that the model will take longer to converge, but should converge to a better clustering.

Attributes

cluster_centers_ : array, [n_clusters, n_features]
Coordinates of cluster centers

labels_ :
Labels of each point (if compute_labels is set to True).

inertia_ : float
The value of the inertia criterion associated with the chosen partition (if compute_labels is set to True). The inertia is defined as the sum of squared distances of samples to their closest cluster center.

Examples

>>> from sklearn.cluster import MiniBatchKMeans
>>> import numpy as np
>>> X = np.array([[1, 2], [1, 4], [1, 0],
...               [4, 2], [4, 0], [4, 4],
...               [4, 5], [0, 1], [2, 2],
...               [3, 2], [5, 5], [1, -1]])
>>> # manually fit on batches
>>> kmeans = MiniBatchKMeans(n_clusters=2,
...                          random_state=0,
...                          batch_size=6)
>>> kmeans = kmeans.partial_fit(X[0:6,:])
>>> kmeans = kmeans.partial_fit(X[6:12,:])
>>> kmeans.cluster_centers_
array([[1, 1],
       [3, 4]])
>>> kmeans.predict([[0, 0], [4, 4]])
array([0, 1], dtype=int32)
>>> # fit on the whole data
>>> kmeans = MiniBatchKMeans(n_clusters=2,
...                          random_state=0,
...                          batch_size=6,
...                          max_iter=10).fit(X)
>>> kmeans.cluster_centers_
array([[3.95918367, 2.40816327],
       [1.12195122, 1.3902439 ]])
>>> kmeans.predict([[0, 0], [4, 4]])
array([1, 0], dtype=int32)

See also

KMeans : The classic implementation of the clustering method based on Lloyd's algorithm. It consumes the whole set of input data at each iteration.

Notes

See https://www.eecs.tufts.edu/~dsculley/papers/fastkmeans.pdf
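
In DeepPoseKit, a typical use of this class is clustering video frames so that annotation samples cover the range of postures. A minimal usage sketch, assuming images is a uint8 array of shape [n_samples, rows, cols, channels]; the dummy data and the import path are illustrative:

import numpy as np
from deepposekit.annotate import KMeansSampler  # assumed import path

# stand-in for real frames: 500 grayscale 32x32 images
images = np.random.randint(0, 255, (500, 32, 32, 1), dtype=np.uint8)

sampler = KMeansSampler(n_clusters=10, batch_size=100, random_state=0)
sampler.fit(images)

# draw up to 20 frames per cluster for a balanced annotation set
X_sampled, y_sampled = sampler.sample_data(images, n_samples_per_label=20)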

Source code
class KMeansSampler(MiniBatchKMeans):
    def __init__(
        self,
        n_clusters=10,
        init="k-means++",
        max_iter=100,
        batch_size=100,
        verbose=0,
        compute_labels=True,
        random_state=None,
        tol=0.0,
        max_no_improvement=10,
        init_size=None,
        n_init=3,
        reassignment_ratio=0.01,
    ):

        super(KMeansSampler, self).__init__(
            n_clusters=n_clusters,
            init=init,
            max_iter=max_iter,
            batch_size=batch_size,
            verbose=verbose,
            compute_labels=compute_labels,
            random_state=random_state,
            tol=tol,
            max_no_improvement=max_no_improvement,
            init_size=init_size,
            n_init=n_init,
            reassignment_ratio=reassignment_ratio,
        )
        self._fit = super(KMeansSampler, self).fit
        self._partial_fit = super(KMeansSampler, self).partial_fit
        self._predict = super(KMeansSampler, self).predict

    def sample_idx(self, X, n_samples_per_label=100):
        """Sample indices evenly from each cluster for X.

        Returns the sampled indices into X and the cluster label
        for each sampled index. Clusters with fewer than
        n_samples_per_label members contribute all of their samples.
        """
        labels = self.predict(X)

        X_new = []
        y_new = []
        index = np.arange(X.shape[0])
        for idx in np.unique(labels):
            label_idx = index[labels == idx]
            if label_idx.shape[0] > 0:
                if label_idx.shape[0] < n_samples_per_label:
                    n_samples = label_idx.shape[0]
                else:
                    n_samples = n_samples_per_label
                sample_idx = np.random.choice(label_idx, n_samples, replace=False)
                X_new.append(sample_idx)
                y_new.append(np.ones_like(sample_idx, dtype=np.int32) * idx)
        X_new = np.concatenate(X_new)
        y_new = np.concatenate(y_new)

        return X_new, y_new

    def sample_data(self, X, n_samples_per_label=100):
        """Sample evenly from each cluster for X.
        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        n_samples_per_label : int
            Number of samples per cluster label.
            If X does not contain enough samples in
            a cluster, all samples for that cluster
            are used without replacement.
        Returns
        -------
        X_new : array-like, shape = [n_samples, rows, cols, channels]
            The sampled data
        y_new : array-like, shape = [n_samples,]
            The cluster labels for each sample

        """
        # sample_idx calls predict internally, so a separate
        # predict call here would be redundant
        idx, y_new = self.sample_idx(X, n_samples_per_label)
        X_new = X[idx]
        return X_new, y_new

    def fit(self, X, y=None):
        """Compute the centroids on X by chunking it into mini-batches.
        Parameters
        ----------
        X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
            Training instances to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)

        return self._fit(X, y)

    def partial_fit(self, X, y=None):
        """Update k means estimate on a single mini-batch X.
        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)

        return self._partial_fit(X, y)

    def predict(self, X):
        """Predict the closest cluster each sample in X belongs to.
        In the vector quantization literature, `cluster_centers_` is called
        the code book and each value returned by `predict` is the index of
        the closest code in the code book.
        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
            New data to predict.
        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the cluster each sample belongs to.
        """
        check_is_fitted(self, "cluster_centers_")
        X = check_image_array(self, X)

        return self._predict(X)

    def plot_centers(self, n_rows=2, figsize=(20, 20)):
        """Plot the mean-subtracted cluster centers as a tiled image grid.

        n_rows must evenly divide n_clusters.
        """

        check_is_fitted(self, "cluster_centers_")

        n_cols = self.n_clusters // n_rows

        mean = self.cluster_centers_.mean(0)
        centers = self.cluster_centers_ - mean[None, ...]
        centers = centers.reshape(n_rows, n_cols, self.rows, self.cols, self.channels)
        centers = centers.swapaxes(1, 2).reshape(
            n_rows * self.rows, n_cols * self.cols, self.channels
        )
        if self.channels == 1:
            centers = centers[..., 0]
        fig = plt.figure(figsize=figsize)
        plt.imshow(centers, cmap="seismic", vmin=-255, vmax=255)

        return fig

Ancestors

  • sklearn.cluster.k_means_.MiniBatchKMeans
  • sklearn.cluster.k_means_.KMeans
  • sklearn.base.BaseEstimator
  • sklearn.base.ClusterMixin
  • sklearn.base.TransformerMixin

Methods

def fit(self, X, y=None)

Compute the centroids on X by chunking it into mini-batches.

Parameters

X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
Training instances to cluster.
y : Ignored
Source code
def fit(self, X, y=None):
    """Compute the centroids on X by chunking it into mini-batches.
    Parameters
    ----------
    X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
        Training instances to cluster.
    y : Ignored
    """
    X = check_image_array(self, X)

    return self._fit(X, y)

def partial_fit(self, X, y=None)

Update the k-means estimate on a single mini-batch X.

Parameters

X : array-like, shape = [n_samples, rows, cols, channels]
Coordinates of the data points to cluster.
y : Ignored
Source code
def partial_fit(self, X, y=None):
    """Update k means estimate on a single mini-batch X.
    Parameters
    ----------
    X : array-like, shape = [n_samples, rows, cols, channels]
        Coordinates of the data points to cluster.
    y : Ignored
    """
    X = check_image_array(self, X)

    return self._partial_fit(X, y)
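
When the full image set does not fit in memory, the estimator can be updated incrementally with partial_fit. A rough sketch continuing from the example above (the chunk size is illustrative):

chunk_size = 100
for start in range(0, images.shape[0], chunk_size):
    sampler.partial_fit(images[start:start + chunk_size])
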
def plot_centers(self, n_rows=2, figsize=(20, 20))

Plot the mean-subtracted cluster centers as a tiled image grid. n_rows must evenly divide n_clusters.

Source code
def plot_centers(self, n_rows=2, figsize=(20, 20)):
    """Plot the mean-subtracted cluster centers as a tiled image grid.

    n_rows must evenly divide n_clusters.
    """

    check_is_fitted(self, "cluster_centers_")

    n_cols = self.n_clusters // n_rows

    mean = self.cluster_centers_.mean(0)
    centers = self.cluster_centers_ - mean[None, ...]
    centers = centers.reshape(n_rows, n_cols, self.rows, self.cols, self.channels)
    centers = centers.swapaxes(1, 2).reshape(
        n_rows * self.rows, n_cols * self.cols, self.channels
    )
    if self.channels == 1:
        centers = centers[..., 0]
    fig = plt.figure(figsize=figsize)
    plt.imshow(centers, cmap="seismic", vmin=-255, vmax=255)

    return fig
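
A quick way to inspect what the clusters have latched onto, continuing from the example above (the output filename is illustrative):

fig = sampler.plot_centers(n_rows=2, figsize=(20, 20))
fig.savefig("cluster_centers.png")
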
def predict(self, X)

Predict the closest cluster each sample in X belongs to. In the vector quantization literature, cluster_centers_ is called the code book and each value returned by predict is the index of the closest code in the code book.

Parameters

X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
New data to predict.

Returns

labels : array, shape [n_samples,]
Index of the cluster each sample belongs to.
Source code
def predict(self, X):
    """Predict the closest cluster each sample in X belongs to.
    In the vector quantization literature, `cluster_centers_` is called
    the code book and each value returned by `predict` is the index of
    the closest code in the code book.
    Parameters
    ----------
    X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
        New data to predict.
    Returns
    -------
    labels : array, shape [n_samples,]
        Index of the cluster each sample belongs to.
    """
    check_is_fitted(self, "cluster_centers_")
    X = check_image_array(self, X)

    return self._predict(X)
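
The predicted labels can also be used directly, for example to check how unevenly the raw frames fall across clusters before sampling. Continuing from the example above:

import numpy as np

labels = sampler.predict(images)
print(np.bincount(labels, minlength=sampler.n_clusters))  # frames per cluster
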
def sample_data(self, X, n_samples_per_label=100)

Sample evenly from each cluster for X.

Parameters

X : array-like, shape = [n_samples, rows, cols, channels]
Coordinates of the data points to cluster.
n_samples_per_label : int
Number of samples per cluster label. If X does not contain enough samples in a cluster, all samples for that cluster are used without replacement.

Returns

X_new : array-like, shape = [n_samples, rows, cols, channels]
The sampled data
y_new : array-like, shape = [n_samples,]
The cluster labels for each sample
Source code
def sample_data(self, X, n_samples_per_label=100):
    """Sample evenly from each cluster for X.
    Parameters
    ----------
    X : array-like, shape = [n_samples, rows, cols, channels]
        Coordinates of the data points to cluster.
    n_samples_per_label : int
        Number of samples per cluster label.
        If X does not contain enough samples in
        a cluster, all samples for that cluster
        are used without replacement.
    Returns
    -------
    X_new : array-like, shape = [n_samples, rows, cols, channels]
        The sampled data
    y_new : array-like, shape = [n_samples,]
        The cluster labels for each sample

    """
    # sample_idx calls predict internally, so a separate
    # predict call here would be redundant
    idx, y_new = self.sample_idx(X, n_samples_per_label)
    X_new = X[idx]
    return X_new, y_new

def sample_idx(self, X, n_samples_per_label=100)

Sample indices evenly from each cluster for X. Returns the sampled indices into X and the cluster label for each sampled index.

Source code
def sample_idx(self, X, n_samples_per_label=100):
    """Sample indices evenly from each cluster for X.

    Returns the sampled indices into X and the cluster label
    for each sampled index. Clusters with fewer than
    n_samples_per_label members contribute all of their samples.
    """
    labels = self.predict(X)

    X_new = []
    y_new = []
    index = np.arange(X.shape[0])
    for idx in np.unique(labels):
        label_idx = index[labels == idx]
        if label_idx.shape[0] > 0:
            if label_idx.shape[0] < n_samples_per_label:
                n_samples = label_idx.shape[0]
            else:
                n_samples = n_samples_per_label
            sample_idx = np.random.choice(label_idx, n_samples, replace=False)
            X_new.append(sample_idx)
            y_new.append(np.ones_like(sample_idx, dtype=np.int32) * idx)
    X_new = np.concatenate(X_new)
    y_new = np.concatenate(y_new)

    return X_new, y_new
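
sample_idx is useful when the indices themselves matter, for example to map a balanced sample back to frame numbers in the source video. Continuing from the example above:

idx, y = sampler.sample_idx(images, n_samples_per_label=20)
balanced_frames = images[idx]  # this indexing is exactly what sample_data does internally
print(idx[:5], y[:5])  # original frame indices and their cluster labels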