Module deepposekit.annotate.KMeansSampler
Expand source code
# -*- coding: utf-8 -*-
# Copyright 2018-2019 Jacob M. Graving <jgraving@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import MiniBatchKMeans
from sklearn.utils.validation import check_is_fitted
from deepposekit.annotate.utils.image import check_image_array
class KMeansSampler(MiniBatchKMeans):
    def __init__(
        self,
        n_clusters=10,
        init="k-means++",
        max_iter=100,
        batch_size=100,
        verbose=0,
        compute_labels=True,
        random_state=None,
        tol=0.0,
        max_no_improvement=10,
        init_size=None,
        n_init=3,
        reassignment_ratio=0.01,
    ):
        super(KMeansSampler, self).__init__(
            n_clusters=n_clusters,
            init=init,
            max_iter=max_iter,
            batch_size=batch_size,
            verbose=verbose,
            compute_labels=compute_labels,
            random_state=random_state,
            tol=tol,
            max_no_improvement=max_no_improvement,
            init_size=init_size,
            n_init=n_init,
            reassignment_ratio=reassignment_ratio,
        )
        self._fit = super(KMeansSampler, self).fit
        self._partial_fit = super(KMeansSampler, self).partial_fit
        self._predict = super(KMeansSampler, self).predict

    def sample_idx(self, X, n_samples_per_label=100):
        labels = self.predict(X)
        X_new = []
        y_new = []
        index = np.arange(X.shape[0])
        for idx in np.unique(labels):
            label_idx = index[labels == idx]
            if label_idx.shape[0] > 0:
                if label_idx.shape[0] < n_samples_per_label:
                    n_samples = label_idx.shape[0]
                else:
                    n_samples = n_samples_per_label
                sample_idx = np.random.choice(label_idx, n_samples, replace=False)
                X_new.append(sample_idx)
                y_new.append(np.ones_like(sample_idx, dtype=np.int32) * idx)
        X_new = np.concatenate(X_new)
        y_new = np.concatenate(y_new)
        return X_new, y_new

    def sample_data(self, X, n_samples_per_label=100):
        """Sample evenly from each cluster for X.

        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        n_samples_per_label : int
            Number of samples per cluster label.
            If X does not contain enough samples in
            a cluster, all samples for that cluster
            are used without replacement.

        Returns
        -------
        X_new : array-like, shape = [n_samples, rows, cols, channels]
            The sampled data
        y_new : array-like, shape = [n_samples,]
            The cluster labels each sample
        """
        labels = self.predict(X)
        X_new, y_new = self.sample_idx(X, n_samples_per_label)
        X_new = X[X_new]
        return X_new, y_new

    def fit(self, X, y=None):
        """Compute the centroids on X by chunking it into mini-batches.

        Parameters
        ----------
        X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
            Training instances to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)
        return self._fit(X, y)

    def partial_fit(self, X, y=None):
        """Update k means estimate on a single mini-batch X.

        Parameters
        ----------
        X : array-like, shape = [n_samples, rows, cols, channels]
            Coordinates of the data points to cluster.
        y : Ignored
        """
        X = check_image_array(self, X)
        return self._partial_fit(X, y)

    def predict(self, X):
        """Predict the closest cluster each sample in X belongs to.

        In the vector quantization literature, `cluster_centers_` is called
        the code book and each value returned by `predict` is the index of
        the closest code in the code book.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
            New data to predict.

        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the cluster each sample belongs to.
        """
        check_is_fitted(self, "cluster_centers_")
        X = check_image_array(self, X)
        return self._predict(X)

    def plot_centers(self, n_rows=2, figsize=(20, 20)):
        check_is_fitted(self, "cluster_centers_")
        n_cols = self.n_clusters // n_rows
        mean = self.cluster_centers_.mean(0)
        centers = self.cluster_centers_ - mean[None, ...]
        centers = centers.reshape(n_rows, n_cols, self.rows, self.cols, self.channels)
        centers = centers.swapaxes(1, 2).reshape(
            n_rows * self.rows, n_cols * self.cols, self.channels
        )
        if self.channels == 1:
            centers = centers[..., 0]
        fig = plt.figure(figsize=figsize)
        plt.imshow(centers, cmap="seismic", vmin=-255, vmax=255)
        return fig
Classes
class KMeansSampler (n_clusters=10, init='k-means++', max_iter=100, batch_size=100, verbose=0, compute_labels=True, random_state=None, tol=0.0, max_no_improvement=10, init_size=None, n_init=3, reassignment_ratio=0.01)
-
Mini-Batch K-Means clustering
Read more in the scikit-learn User Guide (mini_batch_kmeans).
Parameters
n_clusters : int, optional, default: 8
    The number of clusters to form as well as the number of centroids to generate.
init : {'k-means++', 'random' or an ndarray}, default: 'k-means++'
    Method for initialization, defaults to 'k-means++':
    'k-means++' : selects initial cluster centers for k-means clustering in a smart way to speed up convergence. See section Notes in k_init for more details.
    'random': choose k observations (rows) at random from data for the initial centroids.
    If an ndarray is passed, it should be of shape (n_clusters, n_features) and gives the initial centers.
max_iter : int, optional
    Maximum number of iterations over the complete dataset before stopping independently of any early stopping criterion heuristics.
batch_size : int, optional, default: 100
    Size of the mini batches.
verbose : boolean, optional
    Verbosity mode.
compute_labels : boolean, default=True
    Compute label assignment and inertia for the complete dataset once the minibatch optimization has converged in fit.
random_state : int, RandomState instance or None (default)
    Determines random number generation for centroid initialization and random reassignment. Use an int to make the randomness deterministic. See the scikit-learn Glossary entry for random_state.
tol : float, default: 0.0
    Control early stopping based on the relative center changes as measured by a smoothed, variance-normalized estimate of the mean center squared position changes. This early stopping heuristic is closer to the one used for the batch variant of the algorithm but induces a slight computational and memory overhead over the inertia heuristic.
    To disable convergence detection based on normalized center change, set tol to 0.0 (default).
max_no_improvement : int, default: 10
    Control early stopping based on the consecutive number of mini batches that do not yield an improvement on the smoothed inertia.
    To disable convergence detection based on inertia, set max_no_improvement to None.
init_size : int, optional, default: 3 * batch_size
    Number of samples to randomly sample for speeding up the initialization (sometimes at the expense of accuracy): the only algorithm is initialized by running a batch KMeans on a random subset of the data. This needs to be larger than n_clusters.
n_init : int, default=3
    Number of random initializations that are tried. In contrast to KMeans, the algorithm is only run once, using the best of the n_init initializations as measured by inertia.
reassignment_ratio : float, default: 0.01
    Control the fraction of the maximum number of counts for a center to be reassigned. A higher value means that low count centers are more easily reassigned, which means that the model will take longer to converge, but should converge in a better clustering.
Attributes
cluster_centers_ : array, [n_clusters, n_features]
    Coordinates of cluster centers
labels_ : Labels of each point (if compute_labels is set to True).
inertia_ : float
    The value of the inertia criterion associated with the chosen partition (if compute_labels is set to True). The inertia is defined as the sum of square distances of samples to their nearest neighbor.
Examples
>>> from sklearn.cluster import MiniBatchKMeans
>>> import numpy as np
>>> X = np.array([[1, 2], [1, 4], [1, 0],
...               [4, 2], [4, 0], [4, 4],
...               [4, 5], [0, 1], [2, 2],
...               [3, 2], [5, 5], [1, -1]])
>>> # manually fit on batches
>>> kmeans = MiniBatchKMeans(n_clusters=2,
...                          random_state=0,
...                          batch_size=6)
>>> kmeans = kmeans.partial_fit(X[0:6,:])
>>> kmeans = kmeans.partial_fit(X[6:12,:])
>>> kmeans.cluster_centers_
array([[1, 1],
       [3, 4]])
>>> kmeans.predict([[0, 0], [4, 4]])
array([0, 1], dtype=int32)
>>> # fit on the whole data
>>> kmeans = MiniBatchKMeans(n_clusters=2,
...                          random_state=0,
...                          batch_size=6,
...                          max_iter=10).fit(X)
>>> kmeans.cluster_centers_
array([[3.95918367, 2.40816327],
       [1.12195122, 1.3902439 ]])
>>> kmeans.predict([[0, 0], [4, 4]])
array([1, 0], dtype=int32)
See also
KMeans
The classic implementation of the clustering method based on the
Lloyd's algorithm. It consumes the whole set of input data at each
iteration.
Notes
See https://www.eecs.tufts.edu/~dsculley/papers/fastkmeans.pdf
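The inherited example above uses plain 2-D feature vectors. Below is a minimal sketch of how KMeansSampler itself is typically used on a stack of images; it is not taken from the DeepPoseKit docs, the random uint8 array is only a placeholder for real frames, and the import path is inferred from the module name shown at the top of this page.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred from this module

# Placeholder for real frames: shape (n_samples, rows, cols, channels)
images = np.random.randint(0, 255, size=(1000, 64, 64, 1), dtype=np.uint8)

kmeans = KMeansSampler(n_clusters=10, batch_size=100, random_state=0)
kmeans.fit(images)  # images are validated (and presumably flattened) internally by check_image_array

# Draw a cluster-balanced subset to annotate: at most 10 frames per cluster
sampled_images, cluster_labels = kmeans.sample_data(images, n_samples_per_label=10)
print(sampled_images.shape, cluster_labels.shape)

# Inspect the mean-subtracted cluster centers as a tiled image grid
fig = kmeans.plot_centers(n_rows=2)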
Ancestors
- sklearn.cluster.k_means_.MiniBatchKMeans
- sklearn.cluster.k_means_.KMeans
- sklearn.base.BaseEstimator
- sklearn.base.ClusterMixin
- sklearn.base.TransformerMixin
Methods
def fit(self, X, y=None)
-
Compute the centroids on X by chunking it into mini-batches.
Parameters
X : array-like or sparse matrix, shape = [n_samples, rows, cols, channels]
    Training instances to cluster.
y : Ignored
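A brief sketch of calling fit on a stack of images. The data is a placeholder, the import path is inferred from this module, and the flattening of each image into a feature vector is inferred from how plot_centers reshapes cluster_centers_.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(500, 32, 32, 1), dtype=np.uint8)  # placeholder frames
kmeans = KMeansSampler(n_clusters=5, batch_size=100, random_state=0)
kmeans.fit(X)
print(kmeans.cluster_centers_.shape)  # expected (5, 32 * 32 * 1): one flattened center per cluster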
def partial_fit(self, X, y=None)
-
Update k means estimate on a single mini-batch X.
Parameters
X : array-like, shape = [n_samples, rows, cols, channels]
    Coordinates of the data points to cluster.
y : Ignored
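A sketch of updating the estimate one mini-batch at a time, for instance when the full image array does not fit in memory. Placeholder data; import path inferred from this module.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(1000, 32, 32, 1), dtype=np.uint8)  # placeholder frames
kmeans = KMeansSampler(n_clusters=5, random_state=0)

for start in range(0, X.shape[0], 100):
    kmeans.partial_fit(X[start:start + 100])  # one mini-batch per call

labels = kmeans.predict(X)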
def plot_centers(self, n_rows=2, figsize=(20, 20))
-
Plot the cluster centers as a tiled image grid. The fitted cluster_centers_ are mean-subtracted, reshaped back into images of shape (rows, cols, channels), arranged into an n_rows x (n_clusters // n_rows) grid, and displayed with a diverging colormap. Returns the matplotlib figure.
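A sketch of inspecting the fitted centers. Placeholder data; import path inferred from this module. Note that n_clusters should be divisible by n_rows so the grid tiles evenly.
import numpy as np
import matplotlib.pyplot as plt
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(200, 32, 32, 1), dtype=np.uint8)  # placeholder frames
kmeans = KMeansSampler(n_clusters=4, random_state=0).fit(X)

fig = kmeans.plot_centers(n_rows=2, figsize=(10, 10))
fig.savefig("cluster_centers.png", dpi=150)  # keep a copy of the grid
plt.show()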
def predict(self, X)
-
Predict the closest cluster each sample in X belongs to.
In the vector quantization literature, cluster_centers_ is called the code book and each value returned by predict is the index of the closest code in the code book.
Parameters
X : {array-like, sparse matrix}, shape = [n_samples, rows, cols, channels]
    New data to predict.
Returns
labels : array, shape [n_samples,]
    Index of the cluster each sample belongs to.
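A sketch of predicting cluster membership for images and checking how the data spreads across clusters. Placeholder data; import path inferred from this module.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(300, 32, 32, 1), dtype=np.uint8)  # placeholder frames
kmeans = KMeansSampler(n_clusters=3, random_state=0).fit(X)

labels = kmeans.predict(X)               # one cluster index per image
print(np.bincount(labels, minlength=3))  # number of images assigned to each cluster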
def sample_data(self, X, n_samples_per_label=100)
-
Sample evenly from each cluster for X.
Parameters
X : array-like, shape = [n_samples, rows, cols, channels]
    Coordinates of the data points to cluster.
n_samples_per_label : int
    Number of samples per cluster label. If X does not contain enough samples in a cluster, all samples for that cluster are used without replacement.
Returns
X_new : array-like, shape = [n_samples, rows, cols, channels]
    The sampled data
y_new : array-like, shape = [n_samples,]
    The cluster labels for each sample
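A sketch of drawing a cluster-balanced subset of images, for example to pick diverse frames for annotation. Placeholder data; import path inferred from this module.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(1000, 32, 32, 1), dtype=np.uint8)  # placeholder frames
kmeans = KMeansSampler(n_clusters=10, random_state=0).fit(X)

X_new, y_new = kmeans.sample_data(X, n_samples_per_label=5)  # at most 5 images per cluster
print(X_new.shape)         # (<= 50, 32, 32, 1)
print(np.bincount(y_new))  # per-cluster counts, each <= 5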
def sample_idx(self, X, n_samples_per_label=100)
-
Sample indices evenly from each cluster for X. Predicts the cluster label of every sample, then draws up to n_samples_per_label indices per cluster at random without replacement. Returns the selected indices and their cluster labels.
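A sketch of using sample_idx when only the indices are needed, for example to select the same frames from other arrays aligned with X. Placeholder data; import path inferred from this module; frame_numbers is a hypothetical aligned array.
import numpy as np
from deepposekit.annotate.KMeansSampler import KMeansSampler  # import path inferred

X = np.random.randint(0, 255, size=(1000, 32, 32, 1), dtype=np.uint8)  # placeholder frames
frame_numbers = np.arange(X.shape[0])                                  # hypothetical aligned metadata

kmeans = KMeansSampler(n_clusters=10, random_state=0).fit(X)

idx, labels = kmeans.sample_idx(X, n_samples_per_label=5)
sampled_frames = frame_numbers[idx]  # reuse the same indices on any array aligned with X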