Added incremental learning method to SGDClassifier; resolves #50
kszucs committed Jul 14, 2015
1 parent 84459ca commit 4eccd1e
Showing 7 changed files with 168 additions and 31 deletions.
32 changes: 10 additions & 22 deletions splearn/linear_model/base.py
@@ -4,11 +4,14 @@
import scipy.sparse as sp
from sklearn.base import copy
from sklearn.linear_model.base import LinearRegression
+from splearn.base import SparkBroadcasterMixin

from ..utils.validation import check_rdd


-class SparkLinearModelMixin(object):
+class SparkLinearModelMixin(SparkBroadcasterMixin):
+
+    __transient__ = ['coef_', 'intercept_']

def __add__(self, other):
"""Add method for Linear models with coef and intercept attributes.
@@ -62,7 +65,7 @@ def __div__(self, other):

__truediv__ = __div__

-def _spark_fit(self, cls, Z, *args, **kwargs):
+def _average_fit(self, cls, Z, *args, **kwargs):
"""Wraps a Scikit-learn Linear model's fit method to use with RDD
input.
@@ -80,28 +83,11 @@ def _spark_fit(self, cls, Z, *args, **kwargs):
mapper = lambda X_y: super(cls, self).fit(
X_y[0], X_y[1], *args, **kwargs
)
-models = Z.map(mapper)
+models = Z[:, ['X', 'y']].map(mapper)
avg = models.sum() / models.count()
self.__dict__.update(avg.__dict__)
return self

-def _spark_predict(self, cls, X, *args, **kwargs):
-    """Wraps a Scikit-learn Linear model's predict method to use with RDD
-    input.
-    Parameters
-    ----------
-    cls : class object
-        The sklearn linear model's class to wrap.
-    Z : ArrayRDD
-        The distributed data to predict in a DictRDD.
-    Returns
-    -------
-    self: the wrapped class
-    """
-    return X.map(lambda X: super(cls, self).predict(X, *args, **kwargs))


class SparkLinearRegression(LinearRegression, SparkLinearModelMixin):

@@ -148,7 +134,7 @@ def fit(self, Z):
self : returns an instance of self.
"""
check_rdd(Z, {'X': (sp.spmatrix, np.ndarray)})
-return self._spark_fit(SparkLinearRegression, Z)
+return self._average_fit(SparkLinearRegression, Z)

def predict(self, X):
"""Distributed method to predict class labels for samples in X.
@@ -164,4 +150,6 @@
Predicted class label per sample.
"""
check_rdd(X, (sp.spmatrix, np.ndarray))
-return self._spark_predict(SparkLinearRegression, X)
+mapper = self.broadcast(
+    super(SparkLinearRegression, self).predict, X.context)
+return X.transform(mapper, column='X')
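Note: the renamed _average_fit trains one standalone scikit-learn model per RDD block and averages the results, which is what the __add__/__div__ overloads above exist for (models.sum() / models.count() sums the coefficients and divides by the block count). A minimal local sketch of the same idea, with a plain list of (X, y) blocks standing in for the RDD and average_fit as a hypothetical helper; it assumes every block contains samples of every class:

    import numpy as np
    from sklearn.linear_model import SGDClassifier

    def average_fit(blocks, **kwargs):
        # One independent model per (X, y) block -- the distributed map step.
        models = [SGDClassifier(**kwargs).fit(X, y) for X, y in blocks]
        # Average the learned parameters -- the sum()/count() reduce step.
        avg = models[0]
        avg.coef_ = np.mean([m.coef_ for m in models], axis=0)
        avg.intercept_ = np.mean([m.intercept_ for m in models], axis=0)
        return avg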
6 changes: 4 additions & 2 deletions splearn/linear_model/logistic.py
@@ -134,7 +134,7 @@ def fit(self, Z, classes=None):
# possible improvement: partial_fit within partitions, then average
# in a final reduce
self._classes_ = np.unique(classes)
-return self._spark_fit(SparkLogisticRegression, Z)
+return self._average_fit(SparkLogisticRegression, Z)

def predict(self, X):
"""Distributed method to predict class labels for samples in X.
@@ -150,4 +150,6 @@
Predicted class label per sample.
"""
check_rdd(X, (sp.spmatrix, np.ndarray))
-return self._spark_predict(SparkLogisticRegression, X)
+mapper = self.broadcast(
+    super(SparkLogisticRegression, self).predict, X.context)
+return X.transform(mapper, column='X')
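The predict path changes the same way here, in base.py above, and in stochastic_gradient.py below: rather than mapping a bound method over the RDD (which pickles the whole estimator into every task closure), the fitted method is broadcast once per worker, with the attributes listed in __transient__ riding along. In plain PySpark the pattern looks roughly like this sketch, assuming a SparkContext sc and a fitted local estimator local_model:

    # Hedged sketch of broadcast-based prediction; splearn's
    # SparkBroadcasterMixin.broadcast automates this wrapping.
    bc_model = sc.broadcast(local_model)        # serialized once per worker

    def predict_block(X_block):
        return bc_model.value.predict(X_block)  # runs worker-side

    # y_blocks = X_blocks.map(predict_block), which is what
    # X.transform(mapper, column='X') amounts to for the 'X' column.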
21 changes: 17 additions & 4 deletions splearn/linear_model/stochastic_gradient.py
@@ -138,9 +138,13 @@ class SparkSGDClassifier(SGDClassifier, SparkLinearModelMixin):
"""

-def __init__(self, *args, **kwargs):
+def __init__(self, learning_method='average', *args, **kwargs):
super(SparkSGDClassifier, self).__init__(*args, **kwargs)
-self.average = True  # force averaging
+# self.average = True  # force averaging
+if learning_method not in ['average', 'incremental']:
+    raise ValueError("learning_method must be either 'average' or "
+                     "'incremental', given {0}".format(learning_method))
+self.learning_method = learning_method

# workaround to keep the classes parameter unchanged
@property
@@ -169,7 +173,14 @@ def fit(self, Z, classes=None):
"""
check_rdd(Z, {'X': (sp.spmatrix, np.ndarray)})
self._classes_ = np.unique(classes)
-return self._spark_fit(SparkSGDClassifier, Z)
+
+if self.learning_method == 'average':
+    self._average_fit(SparkSGDClassifier, Z)
+else:
+    for X, y in Z[:, ['X', 'y']]:
+        self.partial_fit(X, y)
+
+return self

def predict(self, X):
"""Distributed method to predict class labels for samples in X.
@@ -185,4 +196,6 @@
Predicted class label per sample.
"""
check_rdd(X, (sp.spmatrix, np.ndarray))
-return self._spark_predict(SparkSGDClassifier, X)
+mapper = self.broadcast(
+    super(SparkSGDClassifier, self).predict, X.context)
+return X.transform(mapper, column='X')
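With this change SparkSGDClassifier exposes two fitting strategies: 'average' (the previous behaviour) fits an independent model per block and averages the coefficients, while 'incremental' streams the blocks to the driver and feeds them through partial_fit sequentially, which is what #50 asked for. A usage sketch, assuming X, y, Z as built by make_classification in the tests below:

    import numpy as np
    from splearn.linear_model import SparkSGDClassifier

    clf = SparkSGDClassifier(average=True, learning_method='incremental')
    clf.fit(Z, classes=np.unique(y))  # sequential partial_fit, block by block

    y_pred = clf.predict(Z[:, 'X'])   # distributed predict over the 'X' column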
23 changes: 21 additions & 2 deletions splearn/linear_model/tests/test_stochastic_gradient.py
@@ -2,16 +2,35 @@
from sklearn.linear_model import SGDClassifier
from splearn.linear_model import SparkSGDClassifier
from splearn.utils.testing import SplearnTestCase, assert_true
+from splearn.utils.testing import assert_array_almost_equal
+from splearn.utils.validation import check_rdd_dtype


class TestSGDClassifier(SplearnTestCase):

-def test_same_prediction(self):
+def test_average_prediction(self):
X, y, Z = self.make_classification(2, 80000)

local = SGDClassifier(average=True)
-dist = SparkSGDClassifier(average=True)
+dist = SparkSGDClassifier(average=True, learning_method='average')

local.fit(X, y)
dist.fit(Z, classes=np.unique(y))

+y_local = local.predict(X)
+y_dist = dist.predict(Z[:, 'X'])
+
+mismatch = y_local.shape[0] - np.count_nonzero(y_dist.toarray() == y_local)
+mismatch_percent = float(mismatch) * 100 / y_local.shape[0]
+
+assert_true(mismatch_percent <= 1)
+assert_true(check_rdd_dtype(y_dist, (np.ndarray,)))
+
+def test_incremental_prediction(self):
+    X, y, Z = self.make_classification(2, 80000)
+
+    local = SGDClassifier(average=True)
+    dist = SparkSGDClassifier(average=True, learning_method='incremental')
+
+    local.fit(X, y)
+    dist.fit(Z, classes=np.unique(y))
111 changes: 111 additions & 0 deletions splearn/random_projection.py
@@ -0,0 +1,111 @@
import warnings

import numpy as np
import scipy.sparse as sp
from numpy.testing import assert_equal
from sklearn.random_projection import (BaseRandomProjection,
GaussianRandomProjection,
SparseRandomProjection,
johnson_lindenstrauss_min_dim)
from sklearn.utils import DataDimensionalityWarning

from .base import SparkBroadcasterMixin
from .rdd import DictRDD
from .utils.validation import check_rdd


class SparkBaseRandomProjection(BaseRandomProjection, SparkBroadcasterMixin):

__transient__ = ['components_']

def fit(self, Z):
"""Generate a sparse random projection matrix
Parameters
----------
Z : ArrayRDD or DictRDD of shape [n_samples, n_features]
Training set: only the shape is used to find optimal random
matrix dimensions based on the theory referenced in the
aforementioned papers. Any 'y' column in a DictRDD is not
used: placeholder to allow for usage in a Pipeline.
Returns
-------
self
"""
X = Z[:, 'X'] if isinstance(Z, DictRDD) else Z
check_rdd(X, (np.ndarray, sp.spmatrix))

n_samples, n_features = X.shape

if self.n_components == 'auto':
self.n_components_ = johnson_lindenstrauss_min_dim(
n_samples=n_samples, eps=self.eps)

if self.n_components_ <= 0:
raise ValueError(
'eps=%f and n_samples=%d lead to a target dimension of '
'%d which is invalid' % (
self.eps, n_samples, self.n_components_))

elif self.n_components_ > n_features:
raise ValueError(
'eps=%f and n_samples=%d lead to a target dimension of '
'%d which is larger than the original space with '
'n_features=%d' % (self.eps, n_samples, self.n_components_,
n_features))
else:
if self.n_components <= 0:
raise ValueError("n_components must be greater than 0, got %s"
% self.n_components_)

elif self.n_components > n_features:
warnings.warn(
"The number of components is higher than the number of"
" features: n_features < n_components (%s < %s)."
"The dimensionality of the problem will not be reduced."
% (n_features, self.n_components),
DataDimensionalityWarning)

self.n_components_ = self.n_components

# Generate a projection matrix of size [n_components, n_features]
self.components_ = self._make_random_matrix(self.n_components_,
n_features)

# Check contract
assert_equal(
self.components_.shape,
(self.n_components_, n_features),
err_msg=('An error has occurred: the self.components_ matrix '
'does not have the proper shape.'))

return self

def transform(self, Z):
"""Project the data by using matrix product with the random matrix
Parameters
----------
Z : ArrayRDD or DictRDD of shape [n_samples, n_features]
The input data to project into a smaller dimensional space;
for a DictRDD only the 'X' column is projected.
Returns
-------
X_new : numpy array or scipy sparse of shape [n_samples, n_components]
Projected array.
"""
X = Z[:, 'X'] if isinstance(Z, DictRDD) else Z
check_rdd(X, (np.ndarray, sp.spmatrix))

dtype = np.ndarray if self.dense_output else None
mapper = self.broadcast(
super(SparkBaseRandomProjection, self).transform, Z.context)
return Z.transform(mapper, column='X', dtype=dtype)


class SparkGaussianRandomProjection(GaussianRandomProjection,
SparkBaseRandomProjection):
pass


class SparkSparseRandomProjection(SparseRandomProjection,
SparkBaseRandomProjection):
pass
3 changes: 3 additions & 0 deletions splearn/rdd.py
@@ -227,6 +227,9 @@ def __len__(self):
"""Returns the number of elements in all blocks."""
return self._rdd.map(len).sum()

+def __iter__(self):
+    return self._rdd.toLocalIterator()
+
@property
def context(self):
return self._rdd.ctx
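This three-line addition is what the incremental path above relies on: iterating a BlockRDD now streams blocks to the driver one at a time via toLocalIterator instead of collecting everything at once. The driver-side loop in SparkSGDClassifier.fit then amounts to roughly this sketch, with model standing in for the estimator (the real code handles the classes argument through the property workaround shown earlier):

    # What `for X, y in Z[:, ['X', 'y']]` does after this change:
    for X, y in Z[:, ['X', 'y']]:  # each block is an (X_block, y_block) pair
        model.partial_fit(X, y)    # one block in driver memory at a time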
3 changes: 2 additions & 1 deletion splearn/svm/classes.py
@@ -116,7 +116,8 @@ def fit(self, Z, classes=None):
"""
check_rdd(Z, {'X': (sp.spmatrix, np.ndarray)})
self._classes_ = np.unique(classes)
-return self._spark_fit(SparkLinearSVC, Z)
+
+return self._average_fit(SparkLinearSVC, Z)

def predict(self, X):
"""Distributed method to predict class labels for samples in X.
