-
-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
1,411 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
import unittest | ||
from unittest.mock import patch | ||
|
||
import numpy as np | ||
|
||
from Orange.classification import ColumnLearner, ColumnClassifier | ||
from Orange.data import DiscreteVariable, ContinuousVariable, Domain, Table | ||
|
||
|
||
class ColumnTest(unittest.TestCase):
    """Tests for ColumnLearner/ColumnClassifier, which predict the class
    directly from a single (discrete or numeric) column."""

    @classmethod
    def setUpClass(cls):
        # d2's values equal the class values in the same order; d3 has them
        # reversed and m2 holds a strict subset -- these exercise the
        # value-mapping code paths. c1/c2/c3 exercise numeric columns.
        cls.domain = Domain([DiscreteVariable("d1", values=["a", "b"]),
                             DiscreteVariable("d2", values=["c", "d"]),
                             DiscreteVariable("d3", values=["d", "c"]),
                             ContinuousVariable("c1"),
                             ContinuousVariable("c2")
                             ],
                            DiscreteVariable("cls", values=["c", "d"]),
                            [DiscreteVariable("m1", values=["a", "b"]),
                             DiscreteVariable("m2", values=["d"]),
                             ContinuousVariable("c3")]
                            )
        cls.data = Table.from_numpy(
            cls.domain,
            np.array([[0, 0, 0, 1, 0.5],
                      [0, 1, 1, 0.25, -3],
                      [1, 0, np.nan, np.nan, np.nan]]),
            np.array([0, 1, 1]),
            np.array([[0, 0, 2],
                      [1, 0, 8],
                      [np.nan, np.nan, 5]])
        )

    @patch("Orange.classification.column.ColumnModel")
    def test_fit_storage(self, clsfr):
        """fit_storage forwards the column and optional coefficients to
        ColumnModel."""
        learner = ColumnLearner(self.domain.class_var, self.domain["d2"])
        self.assertEqual(learner.name, "column 'd2'")
        learner.fit_storage(self.data)
        clsfr.assert_called_with(self.domain.class_var, self.domain["d2"], None, None)

        learner = ColumnLearner(self.domain.class_var, self.domain["c3"])
        learner.fit_storage(self.data)
        clsfr.assert_called_with(self.domain.class_var, self.domain["c3"], None, None)

        learner = ColumnLearner(self.domain.class_var, self.domain["c3"], 42, 3.5)
        self.assertEqual(learner.name, "column 'c3'")
        learner.fit_storage(self.data)
        clsfr.assert_called_with(self.domain.class_var, self.domain["c3"], 42, 3.5)

    def test_classifier_init_checks(self):
        """Constructor accepts compatible columns and rejects the rest."""
        # Fixed: the original *assigned* cls.name = ... instead of asserting
        # it, so these three checks never verified anything.
        cls = ColumnClassifier(self.domain.class_var, self.domain["d2"])
        self.assertEqual(cls.name, "column 'd2'")

        cls = ColumnClassifier(self.domain.class_var, self.domain["d3"])
        self.assertEqual(cls.name, "column 'd3'")

        cls = ColumnClassifier(self.domain.class_var, self.domain["c3"])
        self.assertEqual(cls.name, "column 'c3'")

        # d1's values {a, b} are not a subset of the class values {c, d}.
        self.assertRaises(
            ValueError,
            ColumnClassifier,
            self.domain.class_var, self.domain["d1"])

        # Numeric column with a non-binary class is not allowed.
        self.assertRaises(
            ValueError,
            ColumnClassifier,
            DiscreteVariable("x", values=("a", "b", "c")), self.domain["c3"])

    def test_check_prob_range(self):
        """Values interpreted as probabilities must lie in [0, 1]; nans OK."""
        self.assertTrue(
            ColumnClassifier.valid_prob_range(np.array([0, 0.5, 1]))
        )
        self.assertTrue(
            ColumnClassifier.valid_prob_range(np.array([0, 0.5, np.nan]))
        )
        self.assertFalse(
            ColumnClassifier.valid_prob_range(np.array([0, 0.5, 1.5]))
        )
        self.assertFalse(
            ColumnClassifier.valid_prob_range(np.array([0, 0.5, -1]))
        )

    def test_check_value_sets(self):
        """Column values must form a subset of the class variable's values."""
        d1, d2, d3, *_ = self.domain.attributes
        c = self.domain.class_var
        m2: DiscreteVariable = self.domain["m2"]
        self.assertFalse(ColumnClassifier.valid_value_sets(c, d1))
        self.assertTrue(ColumnClassifier.valid_value_sets(c, d2))
        self.assertTrue(ColumnClassifier.valid_value_sets(c, d3))
        self.assertTrue(ColumnClassifier.valid_value_sets(c, m2))
        # The relation is not symmetric: m2's single value can't cover c.
        self.assertFalse(ColumnClassifier.valid_value_sets(m2, c))

    def test_predict_discrete(self):
        # Just copy
        model = ColumnClassifier(self.domain.class_var, self.domain["d2"])
        self.assertEqual(model.name, "column 'd2'")
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [0, 1, 0])
        np.testing.assert_equal(probs, [[1, 0], [0, 1], [1, 0]])

        # Values are not in the same order -> map
        model = ColumnClassifier(self.domain.class_var, self.domain["d3"])
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [1, 0, np.nan])
        np.testing.assert_equal(probs, [[0, 1], [1, 0], [0.5, 0.5]])

        # Not in the same order, and one is missing -> map
        model = ColumnClassifier(self.domain.class_var, self.domain["m2"])
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [1, 1, np.nan])
        np.testing.assert_equal(probs, [[0, 1], [0, 1], [0.5, 0.5]])

        # Non-binary class
        domain = Domain(
            self.domain.attributes,
            DiscreteVariable("cls", values=["a", "c", "b", "d", "e"]))
        data = Table.from_numpy(domain, self.data.X, self.data.Y)
        model = ColumnClassifier(domain.class_var, domain["d3"])
        classes, probs = model(data, model.ValueProbs)
        np.testing.assert_equal(classes, [3, 1, np.nan])
        np.testing.assert_almost_equal(
            probs,
            np.array([[0, 0, 0, 1, 0],
                      [0, 1, 0, 0, 0],
                      [0.2, 0.2, 0.2, 0.2, 0.2]]))

    def test_predict_as_direct_probs(self):
        """Without coefficients, a numeric column is read as p(class=1)."""
        model = ColumnClassifier(self.domain.class_var, self.domain["c1"])
        self.assertEqual(model.name, "column 'c1'")
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [1, 0, np.nan])
        np.testing.assert_equal(probs, [[0, 1], [0.75, 0.25], [0.5, 0.5]])

        # c2 and c3 contain values outside [0, 1] -> must raise
        model = ColumnClassifier(self.domain.class_var, self.domain["c2"])
        self.assertRaises(ValueError, model, self.data)

        model = ColumnClassifier(self.domain.class_var, self.domain["c3"])
        self.assertRaises(ValueError, model, self.data)

    def test_predict_with_logistic(self):
        """With intercept/coefficient, the column is pushed through the
        logistic function; out-of-range raw values are then allowed."""
        model = ColumnClassifier(
            self.domain.class_var, self.domain["c1"], 0.5, 3)
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [1, 0, np.nan])
        np.testing.assert_almost_equal(
            probs[:, 1], [1 / (1 + np.exp(-3 * (1 - 0.5))),
                          1 / (1 + np.exp(-3 * (0.25 - 0.5))),
                          0.5])
        np.testing.assert_equal(probs[:, 0], 1 - probs[:, 1])

        model = ColumnClassifier(
            self.domain.class_var, self.domain["c2"], 0.5, 3)
        classes, probs = model(self.data, model.ValueProbs)
        np.testing.assert_equal(classes, [0, 0, np.nan])
        np.testing.assert_almost_equal(
            probs[:, 1], [1 / (1 + np.exp(-3 * (0.5 - 0.5))),
                          1 / (1 + np.exp(-3 * (-3 - 0.5))),
                          0.5])
        np.testing.assert_equal(probs[:, 0], 1 - probs[:, 1])
|
||
# Allow running this test module directly (e.g. `python test_column.py`).
if __name__ == "__main__":
    unittest.main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
from typing import Optional | ||
|
||
import numpy as np | ||
|
||
from Orange.data import Variable, DiscreteVariable, Domain, Table | ||
from Orange.classification import LogisticRegressionLearner | ||
from Orange.regression import LinearRegressionLearner | ||
from Orange.modelling import Model, Learner | ||
|
||
__all__ = ["ColumnLearner", "ColumnModel"] | ||
|
||
|
||
def _check_column_combinations(
        class_var: Variable,
        column: Variable,
        fit_regression: bool):
    """Raise ValueError if `column` cannot serve as a direct predictor
    for `class_var`, or if coefficient fitting was requested for a
    column type that does not support it.
    """
    if class_var.is_continuous:
        # Regression target: the predicting column must be numeric as well.
        if not column.is_continuous:
            raise ValueError(
                "Regression can only be used with numeric variables")
        return

    assert isinstance(class_var, DiscreteVariable)  # remove type warnings
    if not column.is_continuous:
        assert isinstance(column, DiscreteVariable)
        # Discrete predictor: its value set must be covered by the class.
        if not valid_value_sets(class_var, column):
            raise ValueError(
                "Column contains values that are not in class variable")
        if fit_regression:
            raise ValueError(
                "Intercept and coefficient are only allowed for continuous "
                "variables")
    elif len(class_var.values) != 2:
        # A numeric column is read as p(class=1), which needs a binary class.
        raise ValueError(
            "Numeric columns can only be used with binary class variables")
|
||
|
||
def valid_prob_range(values: np.ndarray):
    """Return whether all non-missing entries of `values` lie in [0, 1].

    NaN entries are ignored (nanmin/nanmax skip them).
    """
    lowest, highest = np.nanmin(values), np.nanmax(values)
    return lowest >= 0 and highest <= 1
|
||
|
||
def valid_value_sets(class_var: DiscreteVariable, | ||
column_var: DiscreteVariable): | ||
return set(column_var.values) <= set(class_var.values) | ||
|
||
|
||
class ColumnLearner(Learner):
    """Learner that builds a model predicting the class from one column.

    With `fit_regression` set, a logistic (discrete class) or linear
    (continuous class) model over that single column is fitted and its
    intercept/coefficient are baked into the resulting ColumnModel.

    NOTE(review): the accompanying tests construct ColumnLearner with four
    positional arguments (apparently intercept and coefficient), which does
    not match this three-parameter signature — confirm which version is
    current.
    """
    def __init__(self,
                 class_var: Variable,
                 column: Variable,
                 fit_regression: bool = False):
        super().__init__()
        # Fail early on incompatible class/column combinations.
        _check_column_combinations(class_var, column, fit_regression)
        self.class_var = class_var
        self.column = column
        self.fit_regression = fit_regression
        self.name = f"column '{column.name}'"

    def __fit_coefficients(self, data: Table):
        # Use learners from Orange rather than directly calling
        # scikit-learn, so that we make sure we use the same parameters
        # and get the same result as we would if we used the widgets.
        data1 = data.transform(Domain([self.column], self.class_var))
        if self.class_var.is_discrete:
            model = LogisticRegressionLearner()(data1)
            return model.intercept[0], model.coefficients[0][0]
        else:
            model = LinearRegressionLearner()(data1)
            return model.intercept, model.coefficients[0]

    def fit_storage(self, data: Table):
        """Return a ColumnModel; fit intercept/coefficient if requested.

        Raises ValueError when the data's class variable differs from the
        one this learner was constructed for.
        """
        if data.domain.class_var != self.class_var:
            raise ValueError("Class variable does not match the data")
        if not self.fit_regression:
            return ColumnModel(self.class_var, self.column)

        intercept, coefficient = self.__fit_coefficients(data)
        return ColumnModel(self.class_var, self.column, intercept, coefficient)
|
||
|
||
class ColumnModel(Model):
    """Model that reads its predictions directly from a single column.

    Discrete column: the value is copied (remapped to class indices when
    the orders differ). Continuous column: the value is taken as
    p(class=1), either directly (must be within [0, 1]) or through a
    logistic transform when `intercept` and `coefficient` are given.
    """
    def __init__(self,
                 class_var: Variable,
                 column: Variable,
                 intercept: Optional[float] = None,
                 coefficient: Optional[float] = None):
        super().__init__(Domain([column], class_var))

        _check_column_combinations(class_var, column, intercept is not None)
        # Coefficients only make sense as a pair; reject half-specified ones.
        if (intercept is not None) is not (coefficient is not None):
            raise ValueError(
                "Intercept and coefficient must both be provided or absent")

        self.class_var = class_var
        self.column = column
        self.intercept = intercept
        self.coefficient = coefficient
        # If the column's values are not a prefix of the class's values in
        # the same order, build a lookup from column codes to class codes;
        # otherwise no remapping is needed.
        if (column.is_discrete and
                class_var.values[:len(column.values)] != column.values):
            self.value_mapping = np.array([class_var.to_val(x)
                                           for x in column.values])
        else:
            self.value_mapping = None

        pars = f" ({intercept}, {coefficient})" if intercept is not None else ""
        self.name = f"column '{column.name}'{pars}"

    def predict_storage(self, data: Table):
        """Predict from the stored column; dispatch on class variable type."""
        vals = data.get_column(self.column)
        if self.class_var.is_discrete:
            return self._predict_discrete(vals)
        else:
            return self._predict_continuous(vals)

    def _predict_discrete(self, vals):
        """Return (values, probabilities) for a discrete class.

        Rows with a missing column value keep NaN probabilities here;
        presumably the Model base class turns those into uniform
        distributions downstream — TODO confirm.
        """
        assert isinstance(self.class_var, DiscreteVariable)
        nclasses = len(self.class_var.values)
        proba = np.full((len(vals), nclasses), np.nan)
        rows = np.isfinite(vals)  # mask of rows with a known column value
        if self.column.is_discrete:
            mapped = vals[rows].astype(int)
            if self.value_mapping is not None:
                # Translate column codes to class codes; keep `vals` in sync
                # since it doubles as the returned predictions.
                mapped = self.value_mapping[mapped]
                vals = vals.copy()
                vals[rows] = mapped
            # One-hot: probability 1 for the observed value, 0 elsewhere.
            proba[rows] = 0
            proba[rows, mapped] = 1
        else:
            if self.coefficient is None:
                # Raw values are used as probabilities, so they must be valid.
                if not valid_prob_range(vals):
                    raise ValueError("Column values must be in [0, 1] range "
                                     "unless logistic function is applied")
                proba[rows, 1] = vals[rows]
            else:
                # Logistic transform: 1 / (1 + e^-(intercept + coef * x)).
                proba[rows, 1] = (
                    1 /
                    (1 + np.exp(-self.intercept - self.coefficient * vals[rows])
                     ))

            proba[rows, 0] = 1 - proba[rows, 1]
            # Predicted class: 1 where p(class=1) > 0.5; NaN where unknown.
            vals = (proba[:, 1] > 0.5).astype(float)
            vals[~rows] = np.nan
        return vals, proba

    def _predict_continuous(self, vals):
        """Return the column as-is, or linearly transformed when fitted."""
        if self.coefficient is None:
            return vals
        else:
            return vals * self.coefficient + self.intercept

    def __str__(self):
        pars = f" ({self.intercept}, {self.coefficient})" \
            if self.intercept is not None else ""
        return f'ColumnModel {self.column.name}{pars}'
Oops, something went wrong.