feat(ds): add init_error for worker init error matrix (#114)
* feat(ds): add init_error for worker init error matrix

* add initial_error in m_step

* add: tests

* fix: mypy error

* fix: test cases

* docs: add examples and notes about  in docs

* add strategy for initial error matrix apply

* fix: make m_step function as static method

* tests: add more tests for assign strategy

* add more docs & examples

* fix: tests mypy error

* fix: format
shenxiangzhuang authored Aug 9, 2024
1 parent bfd6dfe commit e67d3a7
Showing 3 changed files with 504 additions and 11 deletions.
172 changes: 162 additions & 10 deletions crowdkit/aggregation/classification/dawid_skene.py
@@ -1,6 +1,6 @@
__all__ = ["DawidSkene", "OneCoinDawidSkene"]

from typing import Any, List, Optional, cast
from typing import Any, List, Literal, Optional, cast

import attr
import numpy as np
@@ -55,6 +55,47 @@ class DawidSkene(BaseClassificationAggregator):
>>> true_labels = gt[:1000] # use the first 1000 true labels
>>> ds = DawidSkene(100)
>>> result = ds.fit_predict(df, true_labels)
We can also provide the workers' initial error matrices, which come from historical performance data.
There are two strategies to initialize the workers' error matrices: `assign` and `addition`.
Here we create simple error matrices for two workers:
```
0 1
worker label
w851 0 9 1
1 1 9
w6991 0 9 1
1 1 9
```
Note:
1. Make sure the error matrix is indexed by `worker` and `label`,
with a column for every `label_id` that appears in `data`.
You can use `pandas.MultiIndex` to create such an index; see the example below.
2. When using the `addition` strategy, the error matrix should contain the historical **count** (not probability) that `worker` produces `observed_label`,
given that the task true label is `true_label`.
With the `addition` strategy, partial error matrices are acceptable;
they will be added to the workers' prior error matrices estimated from the given data.
Examples:
>>> import pandas as pd
>>> from crowdkit.aggregation import DawidSkene
>>> from crowdkit.datasets import load_dataset
>>> df, gt = load_dataset('relevance-2')
>>> error_matrix_index = pd.MultiIndex.from_arrays([['w851', 'w851', 'w6991', 'w6991'], [0, 1, 0, 1]], names=['worker', 'label'])
>>> initial_error = pd.DataFrame(
... data=[[9, 1], [1, 9], [9, 1], [1, 9]],
... index=error_matrix_index,
... columns=[0, 1],
... )
>>> ds = DawidSkene(100, initial_error_strategy='addition')
>>> result = ds.fit_predict(df, initial_error=initial_error)
We can also use the `assign` strategy to initialize the workers' error matrices, as sketched below.
In this case, however, `initial_error` **must** contain error matrices for all the workers in the data.
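A minimal sketch of the `assign` strategy (assuming `initial_error` covers every worker that appears in `data`; otherwise `fit` raises a `ValueError`):
>>> ds = DawidSkene(100, initial_error_strategy='assign')
>>> result = ds.fit_predict(df, initial_error=initial_error)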
"""

n_iter: int = attr.ib(default=100)
@@ -64,6 +105,44 @@ class DawidSkene(BaseClassificationAggregator):
"""The tolerance stopping criterion for iterative methods with a variable number of steps.
The algorithm converges when the loss change is less than the `tol` parameter."""

initial_error_strategy: Optional[Literal["assign", "addition"]] = attr.ib(
default=None
)
"""The strategy for initializing the workers' error matrices.
The `assign` strategy assigns the initial error matrix as the workers' error matrices;
the `addition` strategy adds the initial error matrix to the workers' prior error matrices estimated
from the given data. If `None`, the initial error matrix is not used.
Note:
- `addition` strategy
- The initial error matrix can be partial; not all workers' error matrices need to be provided.
- The initial error matrix values should be the historical **count** that
`worker` produces `observed_label`, given that the task true label is `true_label`.
For example (count-valued error matrix):
0 1
worker label
w851 0 9 1
1 1 9
w6991 0 9 1
1 1 9
- `assign` strategy
- The initial error matrix must contain error matrices for all the workers in the data.
- The initial error matrix values can be either probabilities or counts of
`worker` producing `observed_label`, given that the task true label is `true_label`.
When a probability error matrix is given, the values in each `true_label` column should sum up to 1 for each worker.
For example (probability-valued error matrix):
0 1
worker label
w851 0 0.9 0.1
1 0.1 0.9
w6991 0 0.9 0.1
1 0.1 0.9
...
"""

probas_: Optional[pd.DataFrame] = attr.ib(init=False)
"""The probability distributions of task labels.
The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
@@ -83,15 +162,22 @@ class DawidSkene(BaseClassificationAggregator):
""" A list of loss values during training."""

@staticmethod
def _m_step(data: pd.DataFrame, probas: pd.DataFrame) -> pd.DataFrame:
def _m_step(
data: pd.DataFrame,
probas: pd.DataFrame,
initial_error: Optional[pd.DataFrame] = None,
initial_error_strategy: Optional[Literal["assign", "addition"]] = None,
) -> pd.DataFrame:
"""Performs M-step of the Dawid-Skene algorithm.
Estimates the workers' error probability matrix using the specified workers' responses and the true task label probabilities.
"""
joined = data.join(probas, on="task")
joined.drop(columns=["task"], inplace=True)

errors = joined.groupby(["worker", "label"], sort=False).sum()
# Apply the initial error matrix
errors = initial_error_apply(errors, initial_error, initial_error_strategy)
# Normalize the error matrix
errors.clip(lower=_EPS, inplace=True)
errors /= errors.groupby("worker", sort=False).sum()

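As a standalone sketch of what the normalization above does (made-up counts for a single hypothetical worker `w1`; this block is illustrative and not part of the diff):
```
import pandas as pd

# Hypothetical expected counts for worker w1 over true labels 0 and 1.
index = pd.MultiIndex.from_arrays(
    [["w1", "w1"], [0, 1]], names=["worker", "label"]
)
errors = pd.DataFrame([[9.0, 1.0], [1.0, 9.0]], index=index, columns=[0, 1])

# Mirror of `errors /= errors.groupby("worker", sort=False).sum()` above:
# each true-label column is normalized to sum to 1 per worker.
errors /= errors.groupby("worker", sort=False).sum()
print(errors.loc[("w1", 0), 0])  # 0.9: w1 answers 0 with probability 0.9 when the true label is 0
```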
@@ -160,7 +246,10 @@ def _evidence_lower_bound(
return float(joint_expectation + entropy)

def fit(
self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
self,
data: pd.DataFrame,
true_labels: Optional["pd.Series[Any]"] = None,
initial_error: Optional[pd.DataFrame] = None,
) -> "DawidSkene":
"""Fits the model to the training data with the EM algorithm.
@@ -171,7 +260,12 @@ def fit(
The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
When provided, the model will correct the probability distributions of task labels by the true labels
during the iterative process.
initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities as well.
Check the examples in the class docstring for more details.
Returns:
DawidSkene: self.
"""
@@ -192,7 +286,7 @@ def fit(
if true_labels is not None:
probas = self._correct_probas_with_golden(probas, true_labels)
priors = probas.mean()
errors = self._m_step(data, probas)
errors = self._m_step(data, probas, initial_error, self.initial_error_strategy)
loss = -np.inf
self.loss_history_ = []

@@ -225,7 +319,10 @@ def fit(
return self

def fit_predict_proba(
self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
self,
data: pd.DataFrame,
true_labels: Optional["pd.Series[Any]"] = None,
initial_error: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
"""Fits the model to the training data and returns probability distributions of labels for each task.
@@ -236,19 +333,28 @@ def fit_predict_proba(
The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
When provided, the model will correct the probability distributions of task labels by the true labels
during the iterative process.
initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities as well.
Check the examples in the class docstring for more details.
Returns:
DataFrame: Probability distributions of task labels.
The `pandas.DataFrame` data is indexed by `task` so that `result.loc[task, label]` is the probability that the `task` true label is equal to `label`.
Each probability is in the range from 0 to 1, and the probabilities for each task must sum up to 1.
"""

self.fit(data, true_labels)
self.fit(data, true_labels, initial_error)
assert self.probas_ is not None, "no probas_"
return self.probas_

def fit_predict(
self, data: pd.DataFrame, true_labels: Optional["pd.Series[Any]"] = None
self,
data: pd.DataFrame,
true_labels: Optional["pd.Series[Any]"] = None,
initial_error: Optional[pd.DataFrame] = None,
) -> "pd.Series[Any]":
"""Fits the model to the training data and returns the aggregated results.
@@ -259,12 +365,18 @@ def fit_predict(
The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the task ground truth label.
When provided, the model will correct the probability distributions of task labels by the true labels
during the iterative process.
initial_error (DataFrame): The workers' initial error matrices, which come from historical performance data.
The `pandas.DataFrame` data is indexed by `worker` and `label` with a column
for every `label_id` found in `data`, so that `initial_error.loc[worker, observed_label, true_label]` is the
historical **count** of `worker` producing `observed_label`, given that the task true label is `true_label`.
When the `initial_error_strategy` is `assign`, the values in the error matrix can be probabilities as well.
Check the examples in the class docstring for more details.
Returns:
Series: Task labels. The `pandas.Series` data is indexed by `task` so that `labels.loc[task]` is the most likely true label of tasks.
"""

self.fit(data, true_labels)
self.fit(data, true_labels, initial_error)
assert self.labels_ is not None, "no labels_"
return self.labels_

@@ -292,6 +404,46 @@ def _correct_probas_with_golden(
return corrected_probas


def initial_error_apply(
errors: pd.DataFrame,
initial_error: Optional[pd.DataFrame],
initial_error_strategy: Optional[Literal["assign", "addition"]],
) -> pd.DataFrame:
if initial_error_strategy is None or initial_error is None:
return errors
# check the index names of initial_error
if initial_error.index.names != errors.index.names:
raise ValueError(
f"The index of initial_error must be: {errors.index.names},"
f"but got: {initial_error.index.names}"
)
if initial_error_strategy == "assign":
# check the completeness of initial_error: all the workers in data should be in initial_error
mask = errors.index.isin(initial_error.index)
if not mask.all():
not_found_workers = errors.index[~mask].get_level_values("worker").unique()
raise ValueError(
f"All the workers in data should be in initial_error: "
f"Can not find {len(not_found_workers)} workers' error matrix in initial_error"
)
# if the values in initial_error are probabilities, check the sum of each worker's error matrix
if (initial_error <= 1.0).all().all() and not np.allclose(
initial_error.groupby("worker", sort=False).sum(), 1.0
):
raise ValueError(
"The sum of each worker's error matrix in initial_error should be 1.0"
)
errors = initial_error
elif initial_error_strategy == "addition":
errors = errors.add(initial_error, axis="index", fill_value=0.0)
else:
raise ValueError(
f"Invalid initial_error_strategy: {initial_error_strategy},"
f"should be 'assign' or 'addition'"
)
return errors
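A quick, self-contained look at the `addition` branch above (hypothetical counts; worker `w2` has no history, so its rows pass through unchanged thanks to `fill_value=0.0`):
```
import pandas as pd

idx = pd.MultiIndex.from_arrays(
    [["w1", "w1", "w2", "w2"], [0, 1, 0, 1]], names=["worker", "label"]
)
errors = pd.DataFrame(
    [[4.0, 1.0], [1.0, 4.0], [3.0, 2.0], [2.0, 3.0]], index=idx, columns=[0, 1]
)
# Partial history: prior counts exist only for w1.
hist_idx = pd.MultiIndex.from_arrays(
    [["w1", "w1"], [0, 1]], names=["worker", "label"]
)
initial_error = pd.DataFrame([[9.0, 1.0], [1.0, 9.0]], index=hist_idx, columns=[0, 1])

merged = errors.add(initial_error, axis="index", fill_value=0.0)
# merged.loc["w1"] -> [[13, 2], [2, 13]] (history counts added)
# merged.loc["w2"] -> [[3, 2], [2, 3]]   (unchanged)
```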


@attr.s
class OneCoinDawidSkene(DawidSkene):
r"""The **one-coin Dawid-Skene** aggregation model works exactly the same as the original Dawid-Skene model
