diff --git a/CHANGELOG.md b/CHANGELOG.md index 47f33f4e..97c4914f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ **Under development** +- feat: functionality to make use of INSEE population projection data +- update: don't remove households with people not living/studying in Île-de-France anymore to be more consistent with other use cases +- fix bug where always one household_id existed twice - Fix read order when exploring files using `glob` - Modes are only written now to `trips.csv` if `mode_choice` is activated - Update to `eqasim-java` commit `7cbe85b` diff --git a/data/census/cleaned.py b/data/census/cleaned.py index bc76889e..79e6c1cd 100644 --- a/data/census/cleaned.py +++ b/data/census/cleaned.py @@ -26,7 +26,7 @@ def execute(context): # Fill up undefined household ids (those where NUMMI == Z) f = np.isnan(df["household_id"]) - df.loc[f, "household_id"] = np.arange(np.count_nonzero(f)) + df["household_id"].max() + df.loc[f, "household_id"] = np.arange(np.count_nonzero(f)) + df["household_id"].max() + 1 df["household_id"] = df["household_id"].astype(int) # Put person IDs @@ -48,17 +48,6 @@ def execute(context): df.loc[f_undefined, "iris_id"] = "undefined" df["iris_id"] = df["iris_id"].astype("category") - # Verify with requested codes - df_codes = context.stage("data.spatial.codes") - - excess_communes = set(df["commune_id"].unique()) - set(df_codes["commune_id"].unique()) - if not excess_communes == {"undefined"}: - raise RuntimeError("Found additional communes: %s" % excess_communes) - - excess_iris = set(df["iris_id"].unique()) - set(df_codes["iris_id"].unique()) - if not excess_iris == {"undefined"}: - raise RuntimeError("Found additional IRIS: %s" % excess_iris) - # Age df["age"] = df["AGED"].apply(lambda x: "0" if x == "000" else x.lstrip("0")).astype(int) @@ -104,10 +93,6 @@ def execute(context): # Socioprofessional category df["socioprofessional_class"] = df["CS1"].astype(int) - # Place of work or education - df["work_outside_region"] = df["ILT"].isin(("4", "5", "6")) - df["education_outside_region"] = df["ILETUD"].isin(("4", "5", "6")) - # Consumption units df = pd.merge(df, hts.calculate_consumption_units(df), on = "household_id") @@ -117,6 +102,5 @@ def execute(context): "age", "sex", "couple", "commute_mode", "employed", "studies", "number_of_vehicles", "household_size", - "work_outside_region", "education_outside_region", "consumption_units", "socioprofessional_class" ]] diff --git a/data/census/filtered.py b/data/census/filtered.py index 72c33e1b..ecd3bdcd 100644 --- a/data/census/filtered.py +++ b/data/census/filtered.py @@ -14,38 +14,18 @@ def configure(context): def execute(context): df = context.stage("data.census.cleaned") - # We remove people who study or work in another region - f = df["work_outside_region"] | df["education_outside_region"] - remove_ids = df[f]["household_id"].unique() - - initial_households = len(df["household_id"].unique()) - removed_households = len(remove_ids) - - initial_persons = len(df["person_id"].unique()) - removed_persons = np.count_nonzero(df["household_id"].isin(remove_ids)) - - # TODO: This filtering is not really compatible with defining multiple regions - # or departments. This used to be a filter to avoid people going outside of - # Île-de-France, but we should consider removing this filter altogether, or - # find some smarter way (e.g. using OD matrices and filter out people in - # each municipality by the share of outside workers). + # Filter requested codes df_codes = context.stage("data.spatial.codes") - if len(df_codes["region_id"].unique()) > 1: - raise RuntimeError(""" - Multiple regions are defined, so the filtering for people going outside - of Île-de-France does not make sense in that case. Consider adjusting the - data.census.filtered stage! - """) + requested_departements = df_codes["departement_id"].unique() + df = df[df["departement_id"].isin(requested_departements)] - print( - "Removing %d/%d (%.2f%%) households (with %d/%d persons, %.2f%%) because at least one person is working outside of Île-de-France" % ( - removed_households, initial_households, 100 * removed_households / initial_households, - removed_persons, initial_persons, 100 * removed_persons / initial_persons - )) + excess_communes = set(df["commune_id"].unique()) - set(df_codes["commune_id"].unique()) + if not excess_communes == {"undefined"}: + raise RuntimeError("Found additional communes: %s" % excess_communes) - context.set_info("filtered_households_share", removed_households / initial_households) - context.set_info("filtered_persons_share", removed_persons / initial_persons) + excess_iris = set(df["iris_id"].unique()) - set(df_codes["iris_id"].unique()) + if not excess_iris == {"undefined"}: + raise RuntimeError("Found additional IRIS: %s" % excess_iris) - df = df[~df["household_id"].isin(remove_ids)] return df diff --git a/data/census/projection.py b/data/census/projection.py new file mode 100644 index 00000000..dc9a8f9f --- /dev/null +++ b/data/census/projection.py @@ -0,0 +1,75 @@ +import pandas as pd +import os + +""" +This stage loads and cleans projection data about the French population. +""" + +def configure(context): + context.config("data_path") + context.config("projection_path", "projection_2021") + context.config("projection_scenario", "00_central") + context.config("projection_year", None) + +def execute(context): + source_path = "{}/{}/{}.xlsx".format( + context.config("data_path"), + context.config("projection_path"), + context.config("projection_scenario")) + + projection_year = int(context.config("projection_year")) + + df_all = pd.read_excel( + source_path, sheet_name = "population", skiprows = 1).iloc[:107] + + df_male = pd.read_excel( + source_path, sheet_name = "populationH", skiprows = 1).iloc[:107] + + df_female = pd.read_excel( + source_path, sheet_name = "populationF", skiprows = 1).iloc[:107] + + df_male["sex"] = "male" + df_female["sex"] = "female" + + assert df_all["Âge au 1er janvier"].iloc[-1] == "Total" + assert df_male["Âge au 1er janvier"].iloc[-1] == "Total des hommes" + assert df_female["Âge au 1er janvier"].iloc[-1] == "Total des femmes" + + df_sex = pd.concat([ + df_male.iloc[-1:], + df_female.iloc[-1:] + ]).drop(columns = ["Âge au 1er janvier"])[["sex", projection_year]] + df_sex.columns = ["sex", "projection"] + + df_age = df_all[["Âge au 1er janvier", projection_year]].iloc[:-1] + df_age.columns = ["age", "projection"] + + df_male = df_male[["Âge au 1er janvier", "sex", projection_year]].iloc[:-1] + df_female = df_female[["Âge au 1er janvier", "sex", projection_year]].iloc[:-1] + + df_male.columns = ["age", "sex", "projection"] + df_female.columns = ["age","sex", "projection"] + + df_cross = pd.concat([df_male, df_female]) + df_cross["sex"] = df_cross["sex"].astype("category") + + df_total = df_all.iloc[-1:].drop(columns = ["Âge au 1er janvier"])[[projection_year]] + df_total.columns = ["projection"] + + return { + "total": df_total, "sex": df_sex, "age": df_age, "cross": df_cross + } + +def validate(context): + if context.config("projection_year") is not None: + source_path = "{}/{}/{}.xlsx".format( + context.config("data_path"), + context.config("projection_path"), + context.config("projection_scenario")) + + if not os.path.exists(source_path): + raise RuntimeError("Projection data is not available") + + return os.path.getsize(source_path) + + return 0 diff --git a/data/census/raw.py b/data/census/raw.py index 724f889f..73eebd4a 100644 --- a/data/census/raw.py +++ b/data/census/raw.py @@ -13,6 +13,8 @@ def configure(context): context.config("census_path", "rp_2019/RP2019_INDCVI_csv.zip") context.config("census_csv", "FD_INDCVI_2019.csv") + context.config("projection_year", None) + COLUMNS_DTYPES = { "CANTVILLE":"str", "NUMMI":"str", @@ -20,9 +22,7 @@ def configure(context): "COUPLE":"str", "CS1":"str", "DEPT":"str", - "ETUD":"str", - "ILETUD":"str", - "ILT":"str", + "ETUD":"str", "IPONDI":"str", "IRIS":"str", "REGION":"str", @@ -39,6 +39,9 @@ def execute(context): requested_departements = df_codes["departement_id"].unique() + # only pre-filter if we don't need to reweight the census later + prefilter_departments = context.config("projection_year") is None + with context.progress(label = "Reading census ...") as progress: with zipfile.ZipFile( "{}/{}".format(context.config("data_path"), context.config("census_path"))) as archive: @@ -50,8 +53,9 @@ def execute(context): for df_chunk in csv: progress.update(len(df_chunk)) - - df_chunk = df_chunk[df_chunk["DEPT"].isin(requested_departements)] + + if prefilter_departments: + df_chunk = df_chunk[df_chunk["DEPT"].isin(requested_departements)] if len(df_chunk) > 0: df_records.append(df_chunk) diff --git a/docs/population.md b/docs/population.md index f93d5dd7..fc231e07 100644 --- a/docs/population.md +++ b/docs/population.md @@ -290,3 +290,26 @@ config: ``` Running the pipeline again will add the `mode` colum to the `trips.csv` file and its spatial equivalent. + +### Population projections + +The pipeline allows to make use of population projections from INSEE up to 2021. The same methodology can also be used to scale down the population. The process takes into account the marginal distribution of sex, age, their combination, and the total number of persons. The census data for the base year (see above) is reweighted according to those marginals using *Iterative Proportional Updating*. + +- To make use of the scaling, [download the projection data from INSEE](https://www.insee.fr/fr/statistiques/5894093?sommaire=5760764). There are various scenarios in Excel format that you can choose from. The default is the *Scénario centrale*, the central scenario. +- Put the downloaded file into `data/projection_2021`, so you will have the file `data/projection_2021/00_central.xlsx` + +Then, activate the projection procedure by defining the projection year in the configuration: + +```yaml +config: + # [...] + projection_year: 2030 +``` + +You may choose any year (past or future) that is contained in the projection scenario Excel file. In case you want to use a different scenario, download the corresponding file, put it into the folder mentioned above, and choose the scenario name via configuration: + +```yaml +config: + # [...] + projection_scenario: 00_central +``` diff --git a/synthesis/population/projection/ipu.py b/synthesis/population/projection/ipu.py new file mode 100644 index 00000000..9007225d --- /dev/null +++ b/synthesis/population/projection/ipu.py @@ -0,0 +1,131 @@ +import pandas as pd +import numpy as np + +""" +This stage reweights the census data set according to the projection data for a different year. +""" + +def configure(context): + context.stage("data.census.cleaned") + context.stage("data.census.projection") + +def execute(context): + df_census = context.stage("data.census.cleaned") + projection = context.stage("data.census.projection") + + # Prepare indexing + df_households = df_census[["household_id", "household_size", "weight"]].drop_duplicates("household_id") + df_households["household_index"] = np.arange(len(df_households)) + df_census = pd.merge(df_census, df_households[["household_id", "household_index"]]) + + # Obtain weights and sizes as arrays + household_weights = df_households["weight"].values + household_sizes = df_households["household_size"].values + + # Obtain the attribute levels and membership of attributes for all households + attributes = [] + + attribute_membership = [] + attribute_counts = [] + attribute_targets = [] + + # Proccesing age ... + df_marginal = projection["age"] + for index, row in context.progress(df_marginal.iterrows(), label = "Processing attribute: age", total = len(df_marginal)): + f = df_census["age"] == row["age"] + + if row["age"] == 0: + continue # we skip incompatible values for peopel of zero age + + if np.count_nonzero(f) == 0: + print("Did not find age:", row["age"]) + + else: + df_counts = df_census.loc[f, "household_index"].value_counts() + + attribute_targets.append(row["projection"]) + attribute_membership.append(df_counts.index.values) + attribute_counts.append(df_counts.values) + attributes.append("age={}".format(row["age"])) + + # Processing sex ... + df_marginal = projection["sex"] + for index, row in context.progress(df_marginal.iterrows(), label = "Processing attribute: sex", total = len(df_marginal)): + f = df_census["sex"] == row["sex"] + + if np.count_nonzero(f) == 0: + print("Did not find sex:", row["sex"]) + + else: + df_counts = df_census.loc[f, "household_index"].value_counts() + + attribute_targets.append(row["projection"]) + attribute_membership.append(df_counts.index.values) + attribute_counts.append(df_counts.values) + attributes.append("sex={}".format(row["sex"])) + + # Processing age x sex ... + df_marginal = projection["cross"] + for index, row in context.progress(df_marginal.iterrows(), label = "Processing attributes: sex x age", total = len(df_marginal)): + f = (df_census["sex"] == row["sex"]) & (df_census["age"] == row["age"]) + + if row["age"] == 0: + continue + + if np.count_nonzero(f) == 0: + print("Did not find values:", row["sex"], row["age"]) + + else: + df_counts = df_census.loc[f, "household_index"].value_counts() + + attribute_targets.append(row["projection"]) + attribute_membership.append(df_counts.index.values) + attribute_counts.append(df_counts.values) + attributes.append("sex={},age={}".format(row["sex"], row["age"])) + + # Processing total ... + attribute_targets.append(projection["total"]["projection"].values[0]) + attribute_membership.append(np.arange(len(household_sizes))) + attribute_counts.append(household_sizes) + attributes.append("total") + + # Perform IPU to obtain update weights + update = np.ones((len(df_households),)) + + minimum_factors = [] + maximum_factors = [] + + for iteration in context.progress(range(100), label = "Performing IPU"): + factors = [] + for k in np.arange(len(attributes)): + selection = attribute_membership[k] + + target = attribute_targets[k] + current = np.sum(update[selection] * household_weights[selection] * attribute_counts[k]) + + factor = target / current + factors.append(factor) + + update[selection] *= factor + + minimum_factors.append(np.min(factors)) + maximum_factors.append(np.max(factors)) + + if np.max(factors) - np.min(factors) < 1e-3: + break + + criterion = np.max(factors) - np.min(factors) + + # Check that the applied factors in the last iteration are sufficiently small + assert criterion > 0.01 + + # For a sanity check, we check for the obtained distribution in 2019, but this + # may evolve in the future. + assert np.quantile(update, 0.1) > 0.35 + assert np.quantile(update, 0.8) < 2.0 + assert np.quantile(update, 0.9) < 2.5 + + # Update the weights + df_households["weight"] *= update + + return df_households[["household_id", "weight"]] diff --git a/synthesis/population/projection/reweighted.py b/synthesis/population/projection/reweighted.py new file mode 100644 index 00000000..9863e6a3 --- /dev/null +++ b/synthesis/population/projection/reweighted.py @@ -0,0 +1,24 @@ +import pandas as pd +import numpy as np + +""" +This stage reweights the census data set according to the projection data for a different year. +""" + +def configure(context): + context.stage("data.census.filtered") + context.stage("synthesis.population.projection.ipu") + +def execute(context): + df_census = context.stage("data.census.filtered") + df_weights = context.stage("synthesis.population.projection.ipu") + + initial_size = len(df_census) + + df_census = df_census.drop(columns = "weight") + df_census = pd.merge(df_census, df_weights, on = "household_id") + + final_size = len(df_census) + assert initial_size == final_size + + return df_census diff --git a/synthesis/population/sampled.py b/synthesis/population/sampled.py index 32f61e96..c4a33592 100644 --- a/synthesis/population/sampled.py +++ b/synthesis/population/sampled.py @@ -9,13 +9,16 @@ """ def configure(context): - context.stage("data.census.filtered") + if context.config("projection_year", None) is None: + context.stage("data.census.filtered", alias = "source") + else: + context.stage("synthesis.population.projection.reweighted", alias = "source") context.config("random_seed") context.config("sampling_rate") def execute(context): - df_census = context.stage("data.census.filtered").sort_values(by = "household_id").copy() + df_census = context.stage("source").sort_values(by = "household_id").copy() sampling_rate = context.config("sampling_rate") random = np.random.RandomState(context.config("random_seed")) diff --git a/tests/test_determinism.py b/tests/test_determinism.py index 75b9b94c..ba02a407 100644 --- a/tests/test_determinism.py +++ b/tests/test_determinism.py @@ -64,17 +64,17 @@ def _test_determinism(index, data_path, tmpdir): synpp.run(stages, config, working_directory = cache_path) REFERENCE_CSV_HASHES = { - "ile_de_france_activities.csv": "dcf8e08e9f238c90bff0298048251dac", - "ile_de_france_households.csv": "fa08f930689b27f9772c79d35075960d", - "ile_de_france_persons.csv": "ed87e2b6dfd2a9914d5fc7b2bf6d52d3", - "ile_de_france_trips.csv": "63425b21b452b65418db6f6d987a0162", + "ile_de_france_activities.csv": "e520003e1876a9542ff1a955a6efcfdc", + "ile_de_france_households.csv": "709ce7ded8a2487e6691d4fb3374754b", + "ile_de_france_persons.csv": "ddbe9b418c915b14e888b54efbdf9b1e", + "ile_de_france_trips.csv": "6c5f3427e41e683da768eeb53796a806", } REFERENCE_GPKG_HASHES = { - "ile_de_france_activities.gpkg": "f9e519cb5665c314431bcd16bbb8b1b8", - "ile_de_france_commutes.gpkg": "2e752795b7cd8e0cd4c8d32e736e455e", - "ile_de_france_homes.gpkg": "6f028d84944df9c4ae9342a47a932074", - "ile_de_france_trips.gpkg": "c5fdcff9416563823dd824c2a8ea85bd", + "ile_de_france_activities.gpkg": "4ef01c82b09e81e54cc6b7d59145123e", + "ile_de_france_commutes.gpkg": "a03554ee745e9a47a6b1ea4126a13c2a", + "ile_de_france_homes.gpkg": "3533f4756e3ee126618bb17f059033bd", + "ile_de_france_trips.gpkg": "982b83f27e231766d04b3f9351d84daa", } generated_csv_hashes = { @@ -123,7 +123,7 @@ def _test_determinism_matsim(index, data_path, tmpdir): REFERENCE_HASHES = { #"ile_de_france_population.xml.gz": "e1407f918cb92166ebf46ad769d8d085", #"ile_de_france_network.xml.gz": "5f10ec295b49d2bb768451c812955794", - "ile_de_france_households.xml.gz": "cdbd6ed5b175328861f237dc58dee1ff", + "ile_de_france_households.xml.gz": "64a0c9fab72aad51bc6adb926a1c9d44", #"ile_de_france_facilities.xml.gz": "5ad41afff9ae5c470082510b943e6778", "ile_de_france_config.xml": "f374807f12a5151fe1efb6e9904e1a56" }