diff --git a/policyengine/economic_impact/inequality_impact/__init__.py b/policyengine/economic_impact/inequality_impact/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/policyengine/economic_impact/inequality_impact/inequality_impact.py b/policyengine/economic_impact/inequality_impact/inequality_impact.py index cd1a593..d47ab34 100644 --- a/policyengine/economic_impact/inequality_impact/inequality_impact.py +++ b/policyengine/economic_impact/inequality_impact/inequality_impact.py @@ -1,4 +1,5 @@ -from typing import Union +from policyengine_core.reforms import Reform +from typing import Dict, Union class BaseMetricCalculator: """ @@ -8,8 +9,7 @@ class BaseMetricCalculator: baseline (object): Object representing baseline data. reformed (object): Object representing reformed data. """ - - def __init__(self, baseline: object, reformed: object) -> None: + def __init__(self, baseline: object, reformed: object, default_period: int = 2024) -> None: """ Initialize with baseline and reformed data objects. @@ -19,59 +19,74 @@ def __init__(self, baseline: object, reformed: object) -> None: """ self.baseline = baseline self.reformed = reformed + self.default_period = default_period + self.baseline.default_calculation_period = default_period + self.reformed.default_calculation_period = default_period - def calculate_baseline(self, variable: str, period: int = 2024, map_to: str = "person") -> Union[float, int]: + def calculate_baseline(self, variable: str, period: int = None) -> Union[float, int]: """ Calculate baseline metric value. Args: variable (str): Variable to calculate. period (int): Year or period for calculation (default: 2024). - map_to (str): Mapping type (default: "person"). Returns: Union[float, int]: Calculated metric value. """ - return self.baseline.calculate(variable, period=period, map_to=map_to) + if period is None: + period = self.default_period + return self.baseline.calculate(variable, period=period) - def calculate_reformed(self, variable: str, period: int = 2024, map_to: str = "person") -> Union[float, int]: + def calculate_reformed(self, variable: str, period: int = None) -> Union[float, int]: """ Calculate reformed metric value. Args: variable (str): Variable to calculate. period (int): Year or period for calculation (default: 2024). - map_to (str): Mapping type (default: "person"). Returns: Union[float, int]: Calculated metric value. """ - return self.reformed.calculate(variable, period=period, map_to=map_to) + if period is None: + period = self.default_period + return self.reformed.calculate(variable, period=period) class GiniCalculator(BaseMetricCalculator): """ Calculate Gini coefficient metrics based on baseline and reformed data. Inherits from BaseMetricCalculator. """ - def calculate(self) -> dict: - """ - Calculate Gini coefficient metrics. + baseline_personal_hh_equiv_income = self.calculate_baseline("equiv_household_net_income") + baseline_household_count_people = self.calculate_baseline("household_count_people") + baseline_personal_hh_equiv_income.weights *= baseline_household_count_people - Returns: - dict: Dictionary containing "baseline", "reform", and "change" values. - """ - baseline_person = self.calculate_baseline("household_net_income") - reformed_person = self.calculate_reformed("household_net_income") + reformed_personal_hh_equiv_income = self.calculate_reformed("equiv_household_net_income") + reformed_household_count_people = self.calculate_reformed("household_count_people") + reformed_personal_hh_equiv_income.weights *= reformed_household_count_people + + try: + baseline_value = baseline_personal_hh_equiv_income.gini() + except: + print("WARNING: Baseline Gini index calculations resulted in an error: returning 0.4, but this is inaccurate.") + baseline_value = 0.4 - baseline_value = baseline_person.gini() - reformed_value = reformed_person.gini() + try: + reformed_value = reformed_personal_hh_equiv_income.gini() + except: + print("WARNING: Reformed Gini index calculations resulted in an error: returning 0.4, but this is inaccurate.") + reformed_value = 0.4 + change_value = reformed_value - baseline_value + change_perc = (change_value / baseline_value) * 100 return { - "baseline": baseline_value, - "reform": reformed_value, - "change": change_value + "baseline": round(baseline_value,2), + "reform": round(reformed_value,2), + "change": round(change_value,2), + "change_percentage": round(change_perc,2) } class Top1PctShareCalculator(BaseMetricCalculator): @@ -79,25 +94,36 @@ class Top1PctShareCalculator(BaseMetricCalculator): Calculate top 1% income share metrics based on baseline and reformed data. Inherits from BaseMetricCalculator. """ - def calculate(self) -> dict: - """ - Calculate top 1% income share metrics. + baseline_personal_hh_equiv_income = self.calculate_baseline("equiv_household_net_income") + baseline_household_count_people = self.calculate_baseline("household_count_people") + baseline_personal_hh_equiv_income.weights *= baseline_household_count_people + in_top_1_pct = baseline_personal_hh_equiv_income.percentile_rank() == 100 + baseline_personal_hh_equiv_income.weights /= baseline_household_count_people + baseline_top_1_pct_share = ( + baseline_personal_hh_equiv_income[in_top_1_pct].sum() + / baseline_personal_hh_equiv_income.sum() + ) - Returns: - dict: Dictionary containing "baseline", "reform", and "change" values. - """ - baseline_person = self.calculate_baseline("household_net_income") - reformed_person = self.calculate_reformed("household_net_income") + reformed_personal_hh_equiv_income = self.calculate_reformed("equiv_household_net_income") + reformed_household_count_people = self.calculate_reformed("household_count_people") + reformed_personal_hh_equiv_income.weights *= reformed_household_count_people + in_top_1_pct = reformed_personal_hh_equiv_income.percentile_rank() == 100 + reformed_personal_hh_equiv_income.weights /= reformed_household_count_people + reformed_top_1_pct_share = ( + reformed_personal_hh_equiv_income[in_top_1_pct].sum() + / reformed_personal_hh_equiv_income.sum() + ) - baseline_value = baseline_person.top_1_pct_share() - reformed_value = reformed_person.top_1_pct_share() - change_value = reformed_value - baseline_value + change_value = reformed_top_1_pct_share - baseline_top_1_pct_share + change_perc = (change_value / baseline_top_1_pct_share) * 100 + return { - "baseline": baseline_value, - "reform": reformed_value, - "change": change_value + "baseline": round(baseline_top_1_pct_share,2), + "reform": round(reformed_top_1_pct_share,2), + "change": round(change_value,2), + "change_percentage": round(change_perc,2) } class Top10PctShareCalculator(BaseMetricCalculator): @@ -105,23 +131,34 @@ class Top10PctShareCalculator(BaseMetricCalculator): Calculate top 10% income share metrics based on baseline and reformed data. Inherits from BaseMetricCalculator. """ - def calculate(self) -> dict: - """ - Calculate top 10% income share metrics. - - Returns: - dict: Dictionary containing "baseline", "reform", and "change" values. - """ - baseline_person = self.calculate_baseline("household_net_income") - reformed_person = self.calculate_reformed("household_net_income") - baseline_value = baseline_person.top_10_pct_share() - reformed_value = reformed_person.top_10_pct_share() - change_value = reformed_value - baseline_value + baseline_personal_hh_equiv_income = self.calculate_baseline("equiv_household_net_income") + baseline_household_count_people = self.calculate_baseline("household_count_people") + baseline_personal_hh_equiv_income.weights *= baseline_household_count_people + in_top_10_pct = baseline_personal_hh_equiv_income.decile_rank() == 10 + baseline_personal_hh_equiv_income.weights /= baseline_household_count_people + baseline_top_10_pct_share = ( + baseline_personal_hh_equiv_income[in_top_10_pct].sum() + / baseline_personal_hh_equiv_income.sum() + ) + reformed_personal_hh_equiv_income = self.calculate_reformed("equiv_household_net_income") + reformed_household_count_people = self.calculate_reformed("household_count_people") + reformed_personal_hh_equiv_income.weights *= reformed_household_count_people + in_top_10_pct = reformed_personal_hh_equiv_income.decile_rank() == 10 + reformed_personal_hh_equiv_income.weights /= reformed_household_count_people + reformed_top_10_pct_share = ( + reformed_personal_hh_equiv_income[in_top_10_pct].sum() + / reformed_personal_hh_equiv_income.sum() + ) + + change_value = reformed_top_10_pct_share - baseline_top_10_pct_share + change_perc = (change_value / baseline_top_10_pct_share) * 100 + return { - "baseline": baseline_value, - "reform": reformed_value, - "change": change_value + "baseline": round(baseline_top_10_pct_share,2), + "reform": round(reformed_top_10_pct_share,2), + "change": round(change_value,2), + "change_percentage": round(change_perc,2) } diff --git a/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.py b/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.py new file mode 100644 index 0000000..32a0aec --- /dev/null +++ b/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.py @@ -0,0 +1,39 @@ +import pytest +import yaml +import os +from policyengine import EconomicImpact + +def assert_dict_approx_equal(actual, expected, tolerance=1e-4): + for key in expected: + assert abs(actual[key] - expected[key]) < tolerance, f"Key {key}: expected {expected[key]}, got {actual[key]}" + +# Specify the path to your YAML file +yaml_file_path = "/home/tahseer/Desktop/NewFolder/policyengine.py/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.yaml" + +# Check if the file exists +if not os.path.exists(yaml_file_path): + raise FileNotFoundError(f"The YAML file does not exist at: {yaml_file_path}") + +with open(yaml_file_path, 'r') as file: + test_cases = yaml.safe_load(file) + +@pytest.mark.parametrize("test_case", test_cases) +def test_economic_impact(test_case): + test_name = list(test_case.keys())[0] + test_data = test_case[test_name] + + economic_impact = EconomicImpact(test_data['reform'], test_data['country']) + + if 'gini' in test_name: + result = economic_impact.calculate("inequality/gini") + elif 'top_1_pct' in test_name: + result = economic_impact.calculate("inequality/top_1_pct_share") + elif 'top_10_pct' in test_name: + result = economic_impact.calculate("inequality/top_10_pct_share") + else: + pytest.fail(f"Unknown test case: {test_name}") + + assert_dict_approx_equal(result, test_data['expected']) + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.yaml b/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.yaml new file mode 100644 index 0000000..2ca66a7 --- /dev/null +++ b/policyengine/tests/economic_impact/inequality_impact.py/inequality_impact.yaml @@ -0,0 +1,33 @@ +# economic_impact_tests.yaml +- test_gini_calculator: + reform: + gov.hmrc.income_tax.rates.uk[0].rate: + "2024-01-01.2100-12-31": 0.55 + country: uk + expected: + baseline: 0.42 + reform: 0.40 + change: -0.01 + change_percentage: -2.80 + +- test_top_1_pct_share_calculator: + reform: + gov.hmrc.income_tax.rates.uk[0].rate: + "2024-01-01.2100-12-31": 0.55 + country: uk + expected: + baseline: 0.12 + reform: 0.14 + change: 0.02 + change_percentage: 12.76 + +- test_top_10_pct_share_calculator: + reform: + gov.hmrc.income_tax.rates.uk[0].rate: + "2024-01-01.2100-12-31": 0.55 + country: uk + expected: + baseline: 0.35 + reform: 0.36 + change: 0.01 + change_percentage: 3.11 \ No newline at end of file