From e67d3a73bcec8898addf15da324da54b2e74629d Mon Sep 17 00:00:00 2001
From: Mathew Shen
Date: Sat, 10 Aug 2024 04:16:02 +0800
Subject: [PATCH] feat(ds): add init_error for worker init error matrix (#114)

* feat(ds): add init_error for worker init error matrix

* add initial_error in m_step

* add: tests

* fix: mypy error

* fix: test cases

* docs: add examples and notes in docs

* add strategy for initial error matrix apply

* fix: make m_step function as static method

* tests: add more tests for assign strategy

* add more docs & examples

* fix: tests mypy error

* fix: format
---
 .../aggregation/classification/dawid_skene.py | 172 ++++++++-
 tests/aggregation/test_ds_aggregation.py      | 326 +++++++++++++++++-
 tests/conftest.py                             |  17 +
 3 files changed, 504 insertions(+), 11 deletions(-)

diff --git a/crowdkit/aggregation/classification/dawid_skene.py b/crowdkit/aggregation/classification/dawid_skene.py
index aaf4307e..1fbe7e2c 100644
--- a/crowdkit/aggregation/classification/dawid_skene.py
+++ b/crowdkit/aggregation/classification/dawid_skene.py
@@ -1,6 +1,6 @@
 __all__ = ["DawidSkene", "OneCoinDawidSkene"]
 
-from typing import Any, List, Optional, cast
+from typing import Any, List, Literal, Optional, cast
 
 import attr
 import numpy as np
@@ -55,6 +55,47 @@ class DawidSkene(BaseClassificationAggregator):
     >>> true_labels = gt[:1000]  # use the first 1000 true labels
     >>> ds = DawidSkene(100)
     >>> result = ds.fit_predict(df, true_labels)
+
+    We can also provide the workers' initial error matrices, which come from historical performance data.
+    There are two strategies for initializing the workers' error matrices: `assign` and `addition`.
+    Here we create simple error matrices for two workers:
+
+    ```
+                  0  1
+    worker label
+    w851   0      9  1
+           1      1  9
+    w6991  0      9  1
+           1      1  9
+    ```
+
+    Note:
+        1. Make sure the error matrix is indexed by `worker` and `label`,
+        with a column for every `label_id` that appears in `data`.
+        You can use `pandas.MultiIndex` to create such an index; see the example below.
+
+        2. When using the `addition` strategy, the error matrix should contain the historical **count** (not probability) that `worker` produces `observed_label`,
+        given that the task's true label is `true_label`.
+
+    When we use the `addition` strategy, partial error matrices are acceptable;
+    they will be added to the workers' prior error matrices estimated from the given data.
+
+    Examples:
+        >>> import pandas as pd
+        >>> from crowdkit.aggregation import DawidSkene
+        >>> from crowdkit.datasets import load_dataset
+        >>> df, gt = load_dataset('relevance-2')
+        >>> error_matrix_index = pd.MultiIndex.from_arrays([['w851', 'w851', 'w6991', 'w6991'], [0, 1, 0, 1]], names=['worker', 'label'])
+        >>> initial_error = pd.DataFrame(
+        ...     data=[[9, 1], [1, 9], [9, 1], [1, 9]],
+        ...     index=error_matrix_index,
+        ...     columns=[0, 1],
+        ... )
+        >>> ds = DawidSkene(100, initial_error_strategy='addition')
+        >>> result = ds.fit_predict(df, initial_error=initial_error)
+
+    We can also use the `assign` strategy to initialize the workers' error matrices.
+    In that case, however, `initial_error` **must** contain the error matrices of all the workers in the data.
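+
+    For example, we can reuse `initial_error` from above after normalizing its counts into
+    per-column probabilities (an illustrative sketch only: with `assign`, `initial_error`
+    has to cover every worker that appears in `df`, not just the two workers shown here):
+
+    >>> initial_error = initial_error / initial_error.groupby('worker', sort=False).sum()
+    >>> ds = DawidSkene(100, initial_error_strategy='assign')
+    >>> result = ds.fit_predict(df, initial_error=initial_error)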
     """
 
     n_iter: int = attr.ib(default=100)
@@ -64,6 +105,44 @@ class DawidSkene(BaseClassificationAggregator):
     """The tolerance stopping criterion for iterative methods with a variable number of steps.
     The algorithm converges when the loss change is less than the `tol` parameter."""
 
+    initial_error_strategy: Optional[Literal["assign", "addition"]] = attr.ib(
+        default=None
+    )
+    """The strategy for initializing the workers' error matrices.
+    The `assign` strategy assigns the initial error matrix to the workers' error matrices;
+    the `addition` strategy adds the initial error matrix to the workers' prior error matrices estimated
+    from the given data. If `None`, the initial error matrix is not used.
+
+    Note:
+        - `addition` strategy
+            - The initial error matrix can be partial; not all workers' error matrices need to be provided.
+            - The initial error matrix values should be the historical **count** that
+              `worker` produces `observed_label`, given that the task's true label is `true_label`.
+              For example (a count-valued error matrix):
+
+                              0  1
+                worker label
+                w851   0      9  1
+                       1      1  9
+                w6991  0      9  1
+                       1      1  9
+
+        - `assign` strategy
+            - The initial error matrix must contain the error matrices of all the workers in the data.
+            - The initial error matrix values can be either the probability or the count that
+              `worker` produces `observed_label`, given that the task's true label is `true_label`.
+              When a probability error matrix is given, the values should sum up to 1 for each worker in each `true_label` column.
+              For example (a probability-valued error matrix):
+
+                              0    1
+                worker label
+                w851   0      0.9  0.1
+                       1      0.1  0.9
+                w6991  0      0.9  0.1
+                       1      0.1  0.9
+    ...
+    """
+
     probas_: Optional[pd.DataFrame] = attr.ib(init=False)
     """The probability distributions of task labels.
     The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the true label of `task` is `label`."""
@@ -83,15 +162,22 @@ class DawidSkene(BaseClassificationAggregator):
     """ A list of loss values during training."""
 
     @staticmethod
-    def _m_step(data: pd.DataFrame, probas: pd.DataFrame) -> pd.DataFrame:
+    def _m_step(
+        data: pd.DataFrame,
+        probas: pd.DataFrame,
+        initial_error: Optional[pd.DataFrame] = None,
+        initial_error_strategy: Optional[Literal["assign", "addition"]] = None,
+    ) -> pd.DataFrame:
         """Performs M-step of the Dawid-Skene algorithm.
 
         Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
         """
         joined = data.join(probas, on="task")
         joined.drop(columns=["task"], inplace=True)
         errors = joined.groupby(["worker", "label"], sort=False).sum()
+        # Apply the initial error matrix
+        errors = initial_error_apply(errors, initial_error, initial_error_strategy)
+        # Normalize the error matrix
         errors.clip(lower=_EPS, inplace=True)
         errors /= errors.groupby("worker", sort=False).sum()
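+        # After normalization, each worker's columns sum to 1, so
+        # errors.loc[(worker, observed_label), true_label] estimates P(observed_label | true_label) for that worker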
@@ -160,7 +246,10 @@ def _evidence_lower_bound(
         return float(joint_expectation + entropy)
 
     def fit(
-        self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
+        self,
+        data: pd.DataFrame,
+        true_labels: Optional["pd.Series[Any]"] = None,
+        initial_error: Optional[pd.DataFrame] = None,
     ) -> "DawidSkene":
         """Fits the model to the training data with the EM algorithm.
 
         Args:
             data (DataFrame): The training dataset of workers' labeling results
                 which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
             true_labels (Series): The ground truth labels of tasks.
                 The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                 When provided, the model will correct the probability distributions of task labels by the true labels during the iterative process.
-
+            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
+                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
+                for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
+                historical **count** of `worker` producing `observed_label`, given that the task's true label is `true_label`.
+                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
+                Check the examples in the class docstring for more details.
         Returns:
             DawidSkene: self.
         """
@@ -192,7 +286,7 @@ def fit(
         if true_labels is not None:
             probas = self._correct_probas_with_golden(probas, true_labels)
         priors = probas.mean()
-        errors = self._m_step(data, probas)
+        errors = self._m_step(data, probas, initial_error, self.initial_error_strategy)
 
         loss = -np.inf
         self.loss_history_ = []
@@ -225,7 +319,10 @@ def fit(
         return self
 
     def fit_predict_proba(
-        self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
+        self,
+        data: pd.DataFrame,
+        true_labels: Optional["pd.Series[Any]"] = None,
+        initial_error: Optional[pd.DataFrame] = None,
     ) -> pd.DataFrame:
         """Fits the model to the training data and returns probability distributions of labels for each task.
 
         Args:
             data (DataFrame): The training dataset of workers' labeling results
                 which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
             true_labels (Series): The ground truth labels of tasks.
                 The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                 When provided, the model will correct the probability distributions of task labels by the true labels during the iterative process.
+            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
+                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
+                for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
+                historical **count** of `worker` producing `observed_label`, given that the task's true label is `true_label`.
+                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
+                Check the examples in the class docstring for more details.
 
         Returns:
             DataFrame: Probability distributions of task labels.
             Each probability is in the range from 0 to 1, all task probabilities must sum up to 1.
         """
-        self.fit(data, true_labels)
+        self.fit(data, true_labels, initial_error)
         assert self.probas_ is not None, "no probas_"
         return self.probas_
 
     def fit_predict(
-        self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
+        self,
+        data: pd.DataFrame,
+        true_labels: Optional["pd.Series[Any]"] = None,
+        initial_error: Optional[pd.DataFrame] = None,
    ) -> "pd.Series[Any]":
        """Fits the model to the training data and returns the aggregated results.

        Args:
            data (DataFrame): The training dataset of workers' labeling results
                which is represented as the `pandas.DataFrame` data containing `task`, `worker`, and `label` columns.
            true_labels (Series): The ground truth labels of tasks.
                The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
                When provided, the model will correct the probability distributions of task labels by the true labels during the iterative process.
+            initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
+                The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
+                for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
+                historical **count** of `worker` producing `observed_label`, given that the task's true label is `true_label`.
+                When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities too.
+                Check the examples in the class docstring for more details.
         Returns:
             Series: Task labels.
             The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
         """
-        self.fit(data, true_labels)
+        self.fit(data, true_labels, initial_error)
         assert self.labels_ is not None, "no labels_"
         return self.labels_
@@ -292,6 +404,46 @@ def _correct_probas_with_golden(
         return corrected_probas
 
 
+def initial_error_apply(
+    errors: pd.DataFrame,
+    initial_error: Optional[pd.DataFrame],
+    initial_error_strategy: Optional[Literal["assign", "addition"]],
+) -> pd.DataFrame:
+    if initial_error_strategy is None or initial_error is None:
+        return errors
+    # check the index names of initial_error
+    if initial_error.index.names != errors.index.names:
+        raise ValueError(
+            f"The index of initial_error must be {errors.index.names}, "
+            f"but got {initial_error.index.names}"
+        )
+    if initial_error_strategy == "assign":
+        # check the completeness of initial_error: all the workers in data should be in initial_error
+        mask = errors.index.isin(initial_error.index)
+        if not mask.all():
+            not_found_workers = errors.index[~mask].get_level_values("worker").unique()
+            raise ValueError(
+                f"All the workers in data should be in initial_error: "
+                f"cannot find {len(not_found_workers)} workers' error matrices in initial_error"
+            )
+        # if the values in initial_error are probabilities, check the sum of each worker's error matrix
+        if (initial_error <= 1.0).all().all() and not np.allclose(
+            initial_error.groupby("worker", sort=False).sum(), 1.0
+        ):
+            raise ValueError(
+                "The sum of each worker's error matrix in initial_error should be 1.0"
+            )
+        errors = initial_error
+    elif initial_error_strategy == "addition":
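+        # add the historical counts to the counts estimated from the given data;
+        # workers missing from initial_error keep their estimated counts (fill_value=0.0)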
+        errors = errors.add(initial_error, axis="index", fill_value=0.0)
+    else:
+        raise ValueError(
+            f"Invalid initial_error_strategy: {initial_error_strategy}, "
+            f"should be 'assign' or 'addition'"
+        )
+    return errors
+
+
 @attr.s
 class OneCoinDawidSkene(DawidSkene):
     r"""The **one-coin Dawid-Skene** aggregation model works exactly the same as the original Dawid-Skene model
diff --git a/tests/aggregation/test_ds_aggregation.py b/tests/aggregation/test_ds_aggregation.py
index e6fdf5b9..8fac92da 100644
--- a/tests/aggregation/test_ds_aggregation.py
+++ b/tests/aggregation/test_ds_aggregation.py
@@ -3,7 +3,7 @@
 Testing all boundary conditions and asserts
 """
 
-from typing import Any, List, cast
+from typing import Any, List, Literal, Optional, cast
 
 import numpy as np
 import pandas as pd
@@ -13,6 +13,330 @@
 from crowdkit.aggregation import DawidSkene, OneCoinDawidSkene
 
 
+class TestWorkerInitError:
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy",
+        [
+            (10, 0, "addition"),
+            (10, 0, "assign"),
+            (10, 0, None),
+        ],
+    )
+    def test_without_initial_error_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Optional[Literal["assign", "addition"]],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+    ) -> None:
+        """
+        Basic parameter compatibility test: `initial_error=None`
+        """
+        np.random.seed(42)
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        assert_series_equal(
+            ds.fit(toy_answers_df, initial_error=None).labels_.sort_index(),  # type: ignore
+            toy_ground_truth_df.sort_index(),
+        )
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy",
+        [
+            (10, 0, "addition"),
+            (10, 0, "assign"),
+            (100500, 1e-5, "addition"),
+            (100500, 1e-5, "assign"),
+        ],
+    )
+    def test_zero_error_addition_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Basic parameter compatibility test: an all-zeros initial error matrix
+        """
+        np.random.seed(42)
+        initial_error_df = toy_worker_init_error_zero_df
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        if strategy == "addition":
+            assert_series_equal(
+                ds.fit(toy_answers_df, initial_error=initial_error_df).labels_.sort_index(),  # type: ignore
+                toy_ground_truth_df.sort_index(),
+            )
+        else:
+            with pytest.raises(
+                ValueError,
+                match="The sum of each worker's error matrix in initial_error should be 1.0",
+            ):
+                ds.fit(toy_answers_df, initial_error=initial_error_df)
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy",
+        [
+            (10, 0, "addition"),
+            (10, 0, "assign"),
+            (100500, 1e-5, "addition"),
+            (100500, 1e-5, "assign"),
+        ],
+    )
+    def test_zero_partial_error_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Basic parameter compatibility test: when initial_error doesn't contain all workers
+        """
+        np.random.seed(42)
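+        # keep only the first three (worker, label) rows so that some workers are missing from initial_error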
+        initial_error_df = toy_worker_init_error_zero_df[:3]
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        if strategy == "addition":
+            assert_series_equal(
+                ds.fit(toy_answers_df, initial_error=initial_error_df).labels_.sort_index(),  # type: ignore
+                toy_ground_truth_df.sort_index(),
+            )
+        else:
+            with pytest.raises(
+                ValueError,
+            ):
+                ds.fit(toy_answers_df, initial_error=initial_error_df)
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy", [(10, 0, "addition"), (100500, 1e-5, "addition")]
+    )
+    def test_addition_consistency_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Behavior test: when a worker's init error matrix is similar to the error matrix estimated on these tasks,
+        the aggregation result should be the same as before (without init error)
+        """
+        np.random.seed(42)
+        # According to the ground truth data, w2's answer is always right,
+        # so if we set w2's initial error matrix to be almost always right, we should get the same results
+        init_error_df = toy_worker_init_error_zero_df
+        init_error_df.loc[("w2", "yes"), "no"] = 1
+        init_error_df.loc[("w2", "yes"), "yes"] = 99
+        init_error_df.loc[("w2", "no"), "yes"] = 1
+        init_error_df.loc[("w2", "no"), "no"] = 99
+
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        assert_series_equal(
+            ds.fit(toy_answers_df, initial_error=init_error_df).labels_.sort_index(),  # type: ignore
+            toy_ground_truth_df.sort_index(),
+        )
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy", [(10, 0, "assign"), (100500, 1e-5, "assign")]
+    )
+    def test_assign_consistency_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Behavior test: when a worker's init error matrix is similar to the error matrix estimated on these tasks,
+        the aggregation result should be the same as before (without init error)
+        """
+        # step1: get the original estimated error matrix
+        np.random.seed(42)
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=None)
+        assert_series_equal(
+            ds.fit(toy_answers_df, initial_error=None).labels_.sort_index(),  # type: ignore
+            toy_ground_truth_df.sort_index(),
+        )
+        original_error_df = ds.errors_
+
+        # step2: use the original_error_df as initial_error to fit the model
+        init_error_df = original_error_df
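+        # ds.errors_ is already normalized (each worker's columns sum to 1),
+        # so it passes the probability check of the assign strategy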
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        # step3: check the result, which should be the same as the original one
+        assert_series_equal(
+            ds.fit(toy_answers_df, initial_error=init_error_df).labels_.sort_index(),  # type: ignore
+            toy_ground_truth_df.sort_index(),
+        )
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy", [(10, 0, "addition"), (100500, 1e-5, "addition")]
+    )
+    def test_addition_desired_label_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Behavior test: dedicated init error matrices should lead to the desired results
+        """
+        np.random.seed(42)
+        # workers' annotations on t2: w1: yes, w2: yes, w3: yes, w4: no, w5: no
+        # ground truth: t2: yes
+
+        # When we set the workers' init error matrices as follows, we should get the desired result.
+        # In this case, we want t2's label to be no rather than yes
+        init_error_df = toy_worker_init_error_zero_df
+        item_indexes = [
+            [("w1", "yes"), "no"],
+            [("w2", "yes"), "no"],
+            [("w3", "yes"), "no"],
+            [("w4", "no"), "no"],
+            [("w5", "no"), "no"],
+        ]
+        for loc in item_indexes:
+            init_error_df.loc[loc[0], loc[1]] = 99  # type: ignore
+
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        ds = ds.fit(toy_answers_df, initial_error=init_error_df)  # type: ignore
+        assert ds.labels_["t2"] == "no"  # type: ignore
+
+    @pytest.mark.parametrize(
+        "n_iter, tol, strategy", [(10, 0, "assign"), (100500, 1e-5, "assign")]
+    )
+    def test_assign_desired_label_on_toy_ysda(
+        self,
+        n_iter: int,
+        tol: float,
+        strategy: Literal["assign", "addition"],
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Behavior test: dedicated init error matrices should lead to the desired results
+        """
+        np.random.seed(42)
+        # workers' annotations on t2: w1: yes, w2: yes, w3: yes, w4: no, w5: no
+        # ground truth: t2: yes
+
+        # When we set the workers' init error matrices as follows, we should get the desired result.
+        # In this case, we want t2's label to be no rather than yes
+        # init all probability with 0.5
+        init_error_df = toy_worker_init_error_zero_df
+        init_error_df.loc[:, :] = 0.5
+        # set dedicated probability
+        item_indexes = [
+            [("w1", "yes"), ("w1", "no")],
+            [("w2", "yes"), ("w2", "no")],
+            [("w3", "yes"), ("w3", "no")],
+            [("w4", "no"), ("w4", "yes")],
+            [("w5", "no"), ("w5", "yes")],
+        ]
+        for loc in item_indexes:
+            init_error_df.loc[loc[0], "no"] = 0.99
+            init_error_df.loc[loc[1], "no"] = 0.01
+
+        ds = DawidSkene(n_iter=n_iter, tol=tol, initial_error_strategy=strategy)
+        ds = ds.fit(toy_answers_df, initial_error=init_error_df)  # type: ignore
+        assert ds.labels_["t2"] == "no"  # type: ignore
+
+    def test_addition_inner_state_on_toy_ysda(
+        self,
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Inner state test.
+
+        Without init error, w2's error matrix (before normalization) in iter 0 is as follows:
+
+                no   yes
+        label
+        yes     0.9  2.1
+        no      1.4  0.6
+
+        After fitting with init error, w2's error matrix (before normalization) in iter 0 should be:
+
+                no   yes
+        label
+        yes     2    3
+        no      2    1
+
+        After normalizing the error matrix:
+
+                no   yes
+        label
+        yes     0.5  0.75
+        no      0.5  0.25
+        """
+        np.random.seed(42)
+        init_error_df = toy_worker_init_error_zero_df
+        init_error_df.loc[("w2", "yes"), "no"] = 1.1  # 1.1 + 0.9 = 2
+        init_error_df.loc[("w2", "yes"), "yes"] = 0.9  # 0.9 + 2.1 = 3
+        init_error_df.loc[("w2", "no"), "yes"] = 0.4  # 0.4 + 0.6 = 1
+        init_error_df.loc[("w2", "no"), "no"] = 0.6  # 0.6 + 1.4 = 2
+
+        # fit with init error
+        with_init_errors = DawidSkene(
+            n_iter=0,
+            tol=0.0,
+            initial_error_strategy="addition",
+        ).fit(
+            toy_answers_df, initial_error=init_error_df  # type: ignore
+        )
+
+        # check w2's error matrix
+        item_probs = [
+            ("yes", "no", 0.5),
+            ("yes", "yes", 0.75),
+            ("no", "yes", 0.25),
+            ("no", "no", 0.5),
+        ]
+        for observed, label, prob in item_probs:
+            assert np.isclose(with_init_errors.errors_.loc["w2"].loc[observed, label], prob)  # type: ignore
+
+    def test_assign_inner_state_on_toy_ysda(
+        self,
+        toy_answers_df: pd.DataFrame,
+        toy_ground_truth_df: "pd.Series[Any]",
+        toy_worker_init_error_zero_df: pd.DataFrame,
+    ) -> None:
+        """
+        Inner state test.
+        """
+        np.random.seed(42)
+        # generate a random init error matrix
+        init_error_df = toy_worker_init_error_zero_df
+        init_error_df.loc[:, :] = np.random.randint(1, 100, size=init_error_df.shape)
+        init_error_df = (
+            init_error_df / init_error_df.groupby("worker", sort=False).sum()
+        )
+        # fit with init error
+        ds = DawidSkene(
+            n_iter=0,
+            tol=0.0,
+            initial_error_strategy="assign",
+        ).fit(
+            toy_answers_df, initial_error=init_error_df  # type: ignore
+        )
+
+        # check the error matrices
+        assert_frame_equal(init_error_df, ds.errors_)  # type: ignore
+
+
 @pytest.mark.parametrize("n_iter, tol", [(10, 0), (100500, 1e-5)])
 def test_aggregate_ds_gold_on_toy_ysda(
     n_iter: int,
diff --git a/tests/conftest.py b/tests/conftest.py
index 898c3bfe..6fc7bbf5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -67,6 +67,23 @@ def toy_gold_df() -> "pd.Series[Any]":
     )
 
 
+@pytest.fixture
+def toy_worker_init_error_zero_df() -> pd.DataFrame:
+    """
+    The initial error matrix for the toy dataset, filled with zeros.
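+    It is indexed by `worker` and `label` and has one column per label,
+    which is the index layout that `initial_error` expects.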
+    """
+    worker_labels = [["w1", "w2", "w3", "w4", "w5"], ["yes", "no"]]
+    worker_label_index = pd.MultiIndex.from_product(
+        worker_labels, names=["worker", "label"]
+    )
+    init_error_df = pd.DataFrame(
+        np.zeros((len(worker_label_index), 2)),
+        index=worker_label_index,
+        columns=["no", "yes"],
+    )
+    return init_error_df
+
+
 # Simple dataset that imitates real toloka answers