Skip to content

Commit

Permalink
TST Parametrize, refactor and add new kmeans tests (scikit-learn#12432)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremiedbb authored and rth committed Oct 27, 2018
1 parent 5cef1df commit 2912e3a
Showing 1 changed file with 90 additions and 63 deletions.
153 changes: 90 additions & 63 deletions sklearn/cluster/tests/test_k_means.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,50 @@
X_csr = sp.csr_matrix(X)


def test_elkan_results():
@pytest.mark.parametrize("representation, algo",
                         [('dense', 'full'),
                          ('dense', 'elkan'),
                          ('sparse', 'full')])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_kmeans_results(representation, algo, dtype):
    # Checks that KMeans works as intended on a tiny hand-computable dataset,
    # for dense/sparse input, both algorithms and both float dtypes.
    # (Fixes comment typo "cheks".)
    array_constr = {'dense': np.array, 'sparse': sp.csr_matrix}[representation]
    X = array_constr([[0, 0], [0.5, 0], [0.5, 1], [1, 1]], dtype=dtype)
    sample_weight = [3, 1, 1, 3]  # will be rescaled to [1.5, 0.5, 0.5, 1.5]
    init_centers = np.array([[0, 0], [1, 1]], dtype=dtype)

    # Expected results computed by hand for this fixture.
    expected_labels = [0, 0, 1, 1]
    expected_inertia = 0.1875
    expected_centers = np.array([[0.125, 0], [0.875, 1]], dtype=dtype)
    expected_n_iter = 2

    kmeans = KMeans(n_clusters=2, n_init=1, init=init_centers, algorithm=algo)
    kmeans.fit(X, sample_weight=sample_weight)

    assert_array_equal(kmeans.labels_, expected_labels)
    assert_almost_equal(kmeans.inertia_, expected_inertia)
    assert_array_almost_equal(kmeans.cluster_centers_, expected_centers)
    assert kmeans.n_iter_ == expected_n_iter


@pytest.mark.parametrize('distribution', ['normal', 'blobs'])
def test_elkan_results(distribution):
    # Check that results are identical between lloyd and elkan algorithms.
    rnd = np.random.RandomState(0)
    # BUG FIX: compare strings with '==', not 'is'. Identity comparison of a
    # string literal relies on interner behavior (implementation detail) and
    # raises a SyntaxWarning on recent CPython versions.
    if distribution == 'normal':
        X = rnd.normal(size=(50, 10))
    else:
        X, _ = make_blobs(random_state=rnd)

    km_full = KMeans(algorithm='full', n_clusters=5, random_state=0, n_init=1)
    km_elkan = KMeans(algorithm='elkan', n_clusters=5,
                      random_state=0, n_init=1)

    km_full.fit(X)
    km_elkan.fit(X)
    assert_array_almost_equal(km_elkan.cluster_centers_,
                              km_full.cluster_centers_)
    assert_array_equal(km_elkan.labels_, km_full.labels_)


def test_labels_assignment_and_inertia():
Expand Down Expand Up @@ -292,6 +323,36 @@ def test_k_means_fortran_aligned_data():
assert_array_equal(km.labels_, labels)


@pytest.mark.parametrize('algo', ['full', 'elkan'])
@pytest.mark.parametrize('dtype', [np.float32, np.float64])
@pytest.mark.parametrize('constructor', [np.asarray, sp.csr_matrix])
@pytest.mark.parametrize('seed, max_iter, tol', [
    (0, 2, 1e-7),      # strict non-convergence
    (1, 2, 1e-1),      # loose non-convergence
    (3, 300, 1e-7),    # strict convergence
    (4, 300, 1e-1),    # loose convergence
])
def test_k_means_fit_predict(algo, dtype, constructor, seed, max_iter, tol):
    # check that fit.predict gives same result as fit_predict
    # There's a very small chance of failure with elkan on unstructured dataset
    # because predict method uses fast euclidean distances computation which
    # may cause small numerical instabilities.
    if algo == 'elkan' and constructor is sp.csr_matrix:
        # guard clause instead of wrapping the whole body: this combination
        # is deliberately not exercised (see note above)
        return

    rng = np.random.RandomState(seed)

    X = make_blobs(n_samples=1000, n_features=10, centers=10,
                   random_state=rng)[0].astype(dtype, copy=False)
    X = constructor(X)

    kmeans = KMeans(algorithm=algo, n_clusters=10, random_state=seed,
                    tol=tol, max_iter=max_iter, n_jobs=1)

    labels_fit_then_predict = kmeans.fit(X).predict(X)
    labels_fit_predict = kmeans.fit_predict(X)

    assert_array_equal(labels_fit_then_predict, labels_fit_predict)


def test_mb_kmeans_verbose():
mb_k_means = MiniBatchKMeans(init="k-means++", n_clusters=n_clusters,
random_state=42, verbose=1)
Expand Down Expand Up @@ -472,13 +533,9 @@ def test_minibatch_set_init_size():
_check_fitted_model(mb_k_means)


def test_k_means_invalid_init():
km = KMeans(init="invalid", n_init=1, n_clusters=n_clusters)
assert_raises(ValueError, km.fit, X)


def test_mini_match_k_means_invalid_init():
km = MiniBatchKMeans(init="invalid", n_init=1, n_clusters=n_clusters)
@pytest.mark.parametrize("Estimator", [KMeans, MiniBatchKMeans])
def test_k_means_invalid_init(Estimator):
    # An unrecognized ``init`` string must raise a ValueError at fit time,
    # for both the batch and the mini-batch estimator.
    estimator = Estimator(init="invalid", n_init=1, n_clusters=n_clusters)
    assert_raises(ValueError, estimator.fit, X)


Expand Down Expand Up @@ -513,24 +570,6 @@ def test_k_means_non_collapsed():
assert_true(np.linalg.norm(centers[1] - centers[2]) >= 0.1)


def test_predict():
km = KMeans(n_clusters=n_clusters, random_state=42)

km.fit(X)

# sanity check: predict centroid labels
pred = km.predict(km.cluster_centers_)
assert_array_equal(pred, np.arange(n_clusters))

# sanity check: re-predict labeling for training set samples
pred = km.predict(X)
assert_array_equal(pred, km.labels_)

# re-predict labels for training set using fit_predict
pred = km.fit_predict(X)
assert_array_equal(pred, km.labels_)


@pytest.mark.parametrize('algo', ['full', 'elkan'])
def test_score(algo):
# Check that fitting k-means with multiple inits gives better score
Expand All @@ -540,22 +579,27 @@ def test_score(algo):
km2 = KMeans(n_clusters=n_clusters, max_iter=10, random_state=42, n_init=1,
algorithm=algo)
s2 = km2.fit(X).score(X)
assert_greater(s2, s1)
assert s2 > s1


@pytest.mark.parametrize('Estimator', [KMeans, MiniBatchKMeans])
@pytest.mark.parametrize('data', [X, X_csr], ids=['dense', 'sparse'])
@pytest.mark.parametrize('init', ['random', 'k-means++', centers.copy()])
def test_predict(Estimator, data, init):
    # predict / fit_predict must agree with the labels found at fit time,
    # for both estimators, dense and sparse input, and all init strategies.
    est = Estimator(n_clusters=n_clusters, init=init,
                    n_init=10, random_state=0).fit(data)

    # sanity check: re-predict labeling for training set samples
    assert_array_equal(est.predict(data), est.labels_)

    # sanity check: predict centroid labels
    assert_array_equal(est.predict(est.cluster_centers_),
                       np.arange(n_clusters))

    # re-predict labels for training set using fit_predict
    assert_array_equal(est.fit_predict(data), est.labels_)


@pytest.mark.parametrize('init', ['random', 'k-means++', centers.copy()])
def test_predict_minibatch_dense_sparse(init):
Expand Down Expand Up @@ -684,7 +728,7 @@ def test_k_means_function():


def test_x_squared_norms_init_centroids():
"""Test that x_squared_norms can be None in _init_centroids"""
# Test that x_squared_norms can be None in _init_centroids
from sklearn.cluster.k_means_ import _init_centroids

X_norms = np.sum(X**2, axis=1)
Expand All @@ -696,7 +740,6 @@ def test_x_squared_norms_init_centroids():


def test_max_iter_error():
    # A non-positive ``max_iter`` must be rejected with a ValueError at fit
    # time, with an informative message.
    bad_km = KMeans(max_iter=-1)
    expected_message = 'Number of iterations should be'
    assert_raise_message(ValueError, expected_message, bad_km.fit, X)
Expand Down Expand Up @@ -759,31 +802,15 @@ def test_k_means_init_centers():
init_centers))


def test_sparse_k_means_init_centers():
from sklearn.datasets import load_iris

iris = load_iris()
X = iris.data

@pytest.mark.parametrize("data", [X, X_csr], ids=["dense", "sparse"])
def test_k_means_init_fitted_centers(data):
    # Refitting KMeans initialized from an already-fitted local optimum must
    # leave the solution unchanged, for both dense and sparse input.
    # BUG FIX: the parametrized ``data`` fixture was never used -- both fits
    # ran on the module-level dense ``X``, so the 'sparse' parametrization
    # tested nothing. Fit on ``data`` instead.

    # Get a local optimum
    centers = KMeans(n_clusters=3).fit(data).cluster_centers_

    # Fit starting from a local optimum shouldn't change the solution
    new_centers = KMeans(n_clusters=3, init=centers,
                         n_init=1).fit(data).cluster_centers_
    assert_array_almost_equal(centers, new_centers)


def test_sparse_validate_centers():
Expand Down

0 comments on commit 2912e3a

Please sign in to comment.