From dad18f5fec5f6f2dc7da3f032666a59a3a3173c5 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Tue, 21 Jan 2025 15:17:28 +0100 Subject: [PATCH 1/7] Revert "Conserver le mode iframe durant la navigation (#1215)" This reverts commit c23b8ca38dc582d55bd35445e8d5c4b22d9dba9a. --- core/context_processors.py | 2 +- qfdmd/views.py | 9 --------- .../to_compile/js/controllers/assistant/analytics.ts | 5 ----- templates/qfdmd/base.html | 10 ---------- 4 files changed, 1 insertion(+), 25 deletions(-) diff --git a/core/context_processors.py b/core/context_processors.py index ae3ec21cb..aea8943fe 100644 --- a/core/context_processors.py +++ b/core/context_processors.py @@ -22,8 +22,8 @@ def content(request): def assistant(request) -> dict: return { "assistant": { + "is_iframe": "iframe" in request.GET, "is_home": request.path == reverse("qfdmd:home"), - "is_iframe": request.session.get("iframe"), "POSTHOG_KEY": settings.ASSISTANT["POSTHOG_KEY"], "MATOMO_ID": settings.ASSISTANT["MATOMO_ID"], }, diff --git a/qfdmd/views.py b/qfdmd/views.py index 49c58d7b2..92b5eb832 100644 --- a/qfdmd/views.py +++ b/qfdmd/views.py @@ -72,15 +72,6 @@ def get_context_data(self, **kwargs: Any) -> dict[str, Any]: ) return context - def setup(self, request, *args, **kwargs): - if "iframe" not in request.session: - request.session["iframe"] = "iframe" in request.GET - if not request.user.is_authenticated: - request.session.set_expiry( - 0 - ) # set to 0 expires when the users closes its browser - super().setup(request, *args, **kwargs) - class HomeView(BaseView, ListView): template_name = "qfdmd/home.html" diff --git a/static/to_compile/js/controllers/assistant/analytics.ts b/static/to_compile/js/controllers/assistant/analytics.ts index 109b2cb1f..09bc9d2c4 100644 --- a/static/to_compile/js/controllers/assistant/analytics.ts +++ b/static/to_compile/js/controllers/assistant/analytics.ts @@ -118,11 +118,6 @@ export default class extends Controller { conversionScore, }, }) - - const 
posthogBannerConversionScore = document.querySelector("#posthog-banner-conversion-score") - if (posthogBannerConversionScore) { - posthogBannerConversionScore.textContent = conversionScore.toString() - } }, 1000) } diff --git a/templates/qfdmd/base.html b/templates/qfdmd/base.html index 11a5c505d..365a8bcd0 100644 --- a/templates/qfdmd/base.html +++ b/templates/qfdmd/base.html @@ -72,15 +72,5 @@ {% endif %} data-analytics-action-value="{% block analytics_action %}{% endblock %}" > - {% if assistant.is_iframe and request.user.is_authenticated %} -
- Le mode iframe de l'assistant est actif -
- {% endif %} - {% if "posthog" in request.GET %} -
- Identifiant PostHog : {{ assistant.POSTHOG_KEY }} | Score de conversion : -
- {% endif %} From 7619c223d5ea9d17631fe45746a1472082b2a078 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Tue, 21 Jan 2025 23:59:18 +0100 Subject: [PATCH 2/7] Simplification des dags source et ajout du contexte --- dags/sources/config/airflow_params.py | 2 + .../airflow_logic/compute_link_tables_task.py | 57 ++++ .../airflow_logic/db_data_prepare_task.py | 40 +-- dags/sources/tasks/airflow_logic/operators.py | 54 +--- .../propose_acteur_services_task.py | 29 -- .../propose_acteur_to_delete_task.py | 6 +- .../airflow_logic/propose_labels_task.py | 31 --- .../propose_services_sous_categories_task.py | 31 --- .../airflow_logic/propose_services_task.py | 35 --- .../business_logic/compute_link_tables.py | 73 +++++ .../tasks/business_logic/db_data_prepare.py | 146 +++------- .../business_logic/propose_acteur_changes.py | 1 - .../business_logic/propose_acteur_services.py | 20 -- .../propose_acteur_to_delete.py | 10 +- .../tasks/business_logic/propose_labels.py | 27 -- .../tasks/business_logic/propose_services.py | 42 --- .../propose_services_sous_categories.py | 43 --- .../business_logic/source_data_normalize.py | 1 + dags/sources/tasks/transform/transform_df.py | 20 ++ .../business_logic/db_normalize_suggestion.py | 2 +- dags_unit_tests/conftest.py | 2 +- .../test_propose_acteur_services.py | 59 ---- .../business_logic/test_propose_labels.py | 29 -- .../business_logic/test_propose_services.py | 211 -------------- .../test_propose_services_sous_categories.py | 65 ----- .../test_test_propose_acteur_to_delete.py | 6 +- dags_unit_tests/utils/test_db_data_prepare.py | 260 +++++------------- ...0003_rename_context_suggestion_contexte.py | 18 ++ data/models.py | 2 +- 29 files changed, 308 insertions(+), 1014 deletions(-) create mode 100644 dags/sources/tasks/airflow_logic/compute_link_tables_task.py delete mode 100644 dags/sources/tasks/airflow_logic/propose_acteur_services_task.py delete mode 100644 dags/sources/tasks/airflow_logic/propose_labels_task.py delete mode 
100644 dags/sources/tasks/airflow_logic/propose_services_sous_categories_task.py delete mode 100644 dags/sources/tasks/airflow_logic/propose_services_task.py create mode 100644 dags/sources/tasks/business_logic/compute_link_tables.py delete mode 100644 dags/sources/tasks/business_logic/propose_acteur_services.py delete mode 100644 dags/sources/tasks/business_logic/propose_labels.py delete mode 100644 dags/sources/tasks/business_logic/propose_services.py delete mode 100644 dags/sources/tasks/business_logic/propose_services_sous_categories.py delete mode 100644 dags_unit_tests/sources/tasks/business_logic/test_propose_acteur_services.py delete mode 100644 dags_unit_tests/sources/tasks/business_logic/test_propose_labels.py delete mode 100644 dags_unit_tests/sources/tasks/business_logic/test_propose_services.py delete mode 100644 dags_unit_tests/sources/tasks/business_logic/test_propose_services_sous_categories.py create mode 100644 data/migrations/0003_rename_context_suggestion_contexte.py diff --git a/dags/sources/config/airflow_params.py b/dags/sources/config/airflow_params.py index 0ae7754b6..5609b3288 100644 --- a/dags/sources/config/airflow_params.py +++ b/dags/sources/config/airflow_params.py @@ -24,6 +24,7 @@ clean_identifiant_externe, clean_identifiant_unique, clean_label_codes, + clean_proposition_services, clean_siret_and_siren, clean_telephone, compute_location, @@ -67,6 +68,7 @@ "get_latlng_from_geopoint": get_latlng_from_geopoint, "strip_lower_string": strip_lower_string, "compute_location": compute_location, + "clean_proposition_services": clean_proposition_services, } diff --git a/dags/sources/tasks/airflow_logic/compute_link_tables_task.py b/dags/sources/tasks/airflow_logic/compute_link_tables_task.py new file mode 100644 index 000000000..885fbf73b --- /dev/null +++ b/dags/sources/tasks/airflow_logic/compute_link_tables_task.py @@ -0,0 +1,57 @@ +import logging + +from airflow import DAG +from airflow.operators.python import PythonOperator +from 
sources.tasks.business_logic.compute_link_tables import compute_link_tables +from sources.tasks.business_logic.read_mapping_from_postgres import ( + read_mapping_from_postgres, +) +from utils import logging_utils as log + +logger = logging.getLogger(__name__) + + +def compute_link_tables_task(dag: DAG) -> PythonOperator: + return PythonOperator( + task_id="compute_link_tables", + python_callable=compute_link_tables_wrapper, + dag=dag, + ) + + +def compute_link_tables_wrapper(**kwargs): + df_acteur = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"] + acteurservice_id_by_code = read_mapping_from_postgres( + table_name="qfdmo_acteurservice" + ) + labelqualite_id_by_code = read_mapping_from_postgres( + table_name="qfdmo_labelqualite" + ) + actions_id_by_code = read_mapping_from_postgres(table_name="qfdmo_action") + souscats_id_by_code = read_mapping_from_postgres( + table_name="qfdmo_souscategorieobjet" + ) + source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source") + acteurtype_id_by_code = read_mapping_from_postgres(table_name="qfdmo_acteurtype") + + log.preview("df_acteur", df_acteur) + log.preview("acteurservice_id_by_code", acteurservice_id_by_code) + log.preview("labelqualite_id_by_code", labelqualite_id_by_code) + log.preview("actions_id_by_code", actions_id_by_code) + log.preview("souscats_id_by_code", souscats_id_by_code) + log.preview("source_id_by_code", source_id_by_code) + log.preview("acteurtype_id_by_code", acteurtype_id_by_code) + + df_acteur = compute_link_tables( + df_acteur=df_acteur, + acteurservice_id_by_code=acteurservice_id_by_code, + labelqualite_id_by_code=labelqualite_id_by_code, + actions_id_by_code=actions_id_by_code, + souscats_id_by_code=souscats_id_by_code, + source_id_by_code=source_id_by_code, + acteurtype_id_by_code=acteurtype_id_by_code, + ) + + log.preview("df_acteur après traitement", df_acteur) + + return df_acteur diff --git a/dags/sources/tasks/airflow_logic/db_data_prepare_task.py 
b/dags/sources/tasks/airflow_logic/db_data_prepare_task.py index 8bb5093c4..4edb37488 100644 --- a/dags/sources/tasks/airflow_logic/db_data_prepare_task.py +++ b/dags/sources/tasks/airflow_logic/db_data_prepare_task.py @@ -3,9 +3,6 @@ from airflow import DAG from airflow.operators.python import PythonOperator from sources.tasks.business_logic.db_data_prepare import db_data_prepare -from sources.tasks.business_logic.read_mapping_from_postgres import ( - read_mapping_from_postgres, -) from utils import logging_utils as log logger = logging.getLogger(__name__) @@ -20,36 +17,13 @@ def db_data_prepare_task(dag: DAG) -> PythonOperator: def db_data_prepare_wrapper(**kwargs): - df_acteur_to_delete = kwargs["ti"].xcom_pull(task_ids="propose_acteur_to_delete")[ - "df_acteur_to_delete" - ] - df_actors = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"] - df_ps = kwargs["ti"].xcom_pull(task_ids="propose_services")["df"] - df_pssc = kwargs["ti"].xcom_pull(task_ids="propose_services_sous_categories") - df_labels = kwargs["ti"].xcom_pull(task_ids="propose_labels") - df_acteur_services = kwargs["ti"].xcom_pull(task_ids="propose_acteur_services") - df_acteurs_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") - source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source") - acteurtype_id_by_code = read_mapping_from_postgres(table_name="qfdmo_acteurtype") - - log.preview("df_acteur_to_delete", df_acteur_to_delete) - log.preview("df_actors", df_actors) - log.preview("df_ps", df_ps) - log.preview("df_pssc", df_pssc) - log.preview("df_labels", df_labels) - log.preview("df_acteur_services", df_acteur_services) - log.preview("df_acteurs_from_db", df_acteurs_from_db) - log.preview("source_id_by_code", source_id_by_code) - log.preview("acteurtype_id_by_code", acteurtype_id_by_code) + df_acteur = kwargs["ti"].xcom_pull(task_ids="compute_link_tables") + df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") + + log.preview("df_actors", 
df_acteur) + log.preview("df_acteur_from_db", df_acteur_from_db) return db_data_prepare( - df_acteur_to_delete=df_acteur_to_delete, - df_acteur=df_actors, - df_ps=df_ps, - df_pssc=df_pssc, - df_labels=df_labels, - df_acteur_services=df_acteur_services, - df_acteurs_from_db=df_acteurs_from_db, - source_id_by_code=source_id_by_code, - acteurtype_id_by_code=acteurtype_id_by_code, + df_acteur=df_acteur, + df_acteur_from_db=df_acteur_from_db, ) diff --git a/dags/sources/tasks/airflow_logic/operators.py b/dags/sources/tasks/airflow_logic/operators.py index efa5fba9f..23e07a7a7 100755 --- a/dags/sources/tasks/airflow_logic/operators.py +++ b/dags/sources/tasks/airflow_logic/operators.py @@ -2,31 +2,17 @@ from airflow import DAG from airflow.models.baseoperator import chain +from sources.tasks.airflow_logic.compute_link_tables_task import ( + compute_link_tables_task, +) from sources.tasks.airflow_logic.db_data_prepare_task import db_data_prepare_task from sources.tasks.airflow_logic.db_read_acteur_task import db_read_acteur_task -from sources.tasks.airflow_logic.db_read_propositions_max_id_task import ( - db_read_propositions_max_id_task, -) from sources.tasks.airflow_logic.db_write_type_action_suggestions_task import ( db_write_type_action_suggestions_task, ) from sources.tasks.airflow_logic.propose_acteur_changes_task import ( propose_acteur_changes_task, ) -from sources.tasks.airflow_logic.propose_acteur_services_task import ( - propose_acteur_services_task, -) -from sources.tasks.airflow_logic.propose_acteur_to_delete_task import ( - propose_acteur_to_delete_task, -) -from sources.tasks.airflow_logic.propose_labels_task import propose_labels_task -from sources.tasks.airflow_logic.propose_services_sous_categories_task import ( - propose_services_sous_categories_task, -) -from sources.tasks.airflow_logic.propose_services_task import propose_services_task -from sources.tasks.airflow_logic.read_mapping_from_postgres_task import ( - read_mapping_from_postgres_task, -) from 
sources.tasks.airflow_logic.source_config_validate_task import ( source_config_validate_task, ) @@ -51,47 +37,15 @@ def eo_task_chain(dag: DAG) -> None: - read_tasks = [ - read_mapping_from_postgres_task( - dag=dag, table_name="qfdmo_acteurtype", task_id="db_read_acteurtype" - ), - read_mapping_from_postgres_task( - dag=dag, table_name="qfdmo_source", task_id="db_read_source" - ), - read_mapping_from_postgres_task( - dag=dag, table_name="qfdmo_action", task_id="db_read_action" - ), - read_mapping_from_postgres_task( - dag=dag, table_name="qfdmo_acteurservice", task_id="db_read_acteurservice" - ), - read_mapping_from_postgres_task( - dag=dag, table_name="qfdmo_labelqualite", task_id="db_read_labelqualite" - ), - read_mapping_from_postgres_task( - dag=dag, - table_name="qfdmo_souscategorieobjet", - task_id="db_read_souscategorieobjet", - ), - db_read_propositions_max_id_task(dag), - ] - - create_tasks = [ - propose_services_task(dag), - propose_labels_task(dag), - propose_acteur_services_task(dag), - ] chain( source_config_validate_task(dag), source_data_download_task(dag), source_data_normalize_task(dag), source_data_validate_task(dag), - read_tasks, db_read_acteur_task(dag), propose_acteur_changes_task(dag), - propose_acteur_to_delete_task(dag), - create_tasks, - propose_services_sous_categories_task(dag), + compute_link_tables_task(dag), db_data_prepare_task(dag), db_write_type_action_suggestions_task(dag), ) diff --git a/dags/sources/tasks/airflow_logic/propose_acteur_services_task.py b/dags/sources/tasks/airflow_logic/propose_acteur_services_task.py deleted file mode 100644 index 69d423513..000000000 --- a/dags/sources/tasks/airflow_logic/propose_acteur_services_task.py +++ /dev/null @@ -1,29 +0,0 @@ -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.propose_acteur_services import propose_acteur_services -from utils import logging_utils as log - -logger = logging.getLogger(__name__) 
- - -def propose_acteur_services_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="propose_acteur_services", - python_callable=propose_acteur_services_wrapper, - dag=dag, - ) - - -def propose_acteur_services_wrapper(**kwargs): - acteurservice_id_by_code = kwargs["ti"].xcom_pull(task_ids="db_read_acteurservice") - df_acteur = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"] - - log.preview(df_acteur, "df_actors") - log.preview(acteurservice_id_by_code, "acteurservice_id_by_code") - - return propose_acteur_services( - df_acteur=df_acteur, - acteurservice_id_by_code=acteurservice_id_by_code, - ) diff --git a/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py b/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py index 39b8de741..6582d530f 100644 --- a/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py +++ b/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py @@ -22,12 +22,12 @@ def propose_acteur_to_delete_wrapper(**kwargs): df_acteurs_for_source = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")[ "df" ] - df_acteurs_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") + df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") log.preview(df_acteurs_for_source, "df_acteurs_for_source") - log.preview(df_acteurs_from_db, "df_acteurs_from_db") + log.preview(df_acteur_from_db, "df_acteur_from_db") return propose_acteur_to_delete( df_acteurs_for_source=df_acteurs_for_source, - df_acteurs_from_db=df_acteurs_from_db, + df_acteur_from_db=df_acteur_from_db, ) diff --git a/dags/sources/tasks/airflow_logic/propose_labels_task.py b/dags/sources/tasks/airflow_logic/propose_labels_task.py deleted file mode 100644 index 2fe9268c5..000000000 --- a/dags/sources/tasks/airflow_logic/propose_labels_task.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from 
sources.tasks.business_logic.propose_labels import propose_labels -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_labels_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="propose_labels", - python_callable=propose_labels_wrapper, - dag=dag, - ) - - -def propose_labels_wrapper(**kwargs): - labelqualite_id_by_code = kwargs["ti"].xcom_pull(task_ids="db_read_labelqualite") - acteurtype_id_by_code = kwargs["ti"].xcom_pull(task_ids="db_read_acteurtype") - df_actors = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"] - - log.preview(df_actors, "df_actors") - log.preview(labelqualite_id_by_code, "labelqualite_id_by_code") - log.preview(acteurtype_id_by_code, "acteurtype_id_by_code") - - return propose_labels( - df_acteur=df_actors, - labelqualite_id_by_code=labelqualite_id_by_code, - ) diff --git a/dags/sources/tasks/airflow_logic/propose_services_sous_categories_task.py b/dags/sources/tasks/airflow_logic/propose_services_sous_categories_task.py deleted file mode 100644 index 5c7e89354..000000000 --- a/dags/sources/tasks/airflow_logic/propose_services_sous_categories_task.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.propose_services_sous_categories import ( - propose_services_sous_categories, -) -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_services_sous_categories_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="propose_services_sous_categories", - python_callable=propose_services_sous_categories_wrapper, - dag=dag, - ) - - -def propose_services_sous_categories_wrapper(**kwargs): - df_ps = kwargs["ti"].xcom_pull(task_ids="propose_services")["df"] - souscats_id_by_code = kwargs["ti"].xcom_pull(task_ids="db_read_souscategorieobjet") - - log.preview("df_ps", df_ps) - log.preview("souscats_id_by_code", 
souscats_id_by_code) - - return propose_services_sous_categories( - df_ps=df_ps, - souscats_id_by_code=souscats_id_by_code, - ) diff --git a/dags/sources/tasks/airflow_logic/propose_services_task.py b/dags/sources/tasks/airflow_logic/propose_services_task.py deleted file mode 100644 index 3e03af045..000000000 --- a/dags/sources/tasks/airflow_logic/propose_services_task.py +++ /dev/null @@ -1,35 +0,0 @@ -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.propose_services import propose_services -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_services_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="propose_services", - python_callable=propose_services_wrapper, - dag=dag, - ) - - -def propose_services_wrapper(**kwargs) -> dict: - df = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"] - data_dict = kwargs["ti"].xcom_pull(task_ids="db_read_propositions_max_id") - displayedpropositionservice_max_id = data_dict["displayedpropositionservice_max_id"] - actions_id_by_code = kwargs["ti"].xcom_pull(task_ids="db_read_action") - - log.preview("df depuis propose_acteur_changes", df) - log.preview( - "displayedpropositionservice_max_id", displayedpropositionservice_max_id - ) - log.preview("actions_id_by_code", actions_id_by_code) - - return propose_services( - df=df, - displayedpropositionservice_max_id=displayedpropositionservice_max_id, - actions_id_by_code=actions_id_by_code, - ) diff --git a/dags/sources/tasks/business_logic/compute_link_tables.py b/dags/sources/tasks/business_logic/compute_link_tables.py new file mode 100644 index 000000000..ad853fb67 --- /dev/null +++ b/dags/sources/tasks/business_logic/compute_link_tables.py @@ -0,0 +1,73 @@ +import pandas as pd + + +def compute_link_tables( + df_acteur: pd.DataFrame, + acteurservice_id_by_code: dict, + labelqualite_id_by_code: dict, + actions_id_by_code: dict, 
+ souscats_id_by_code: dict, + source_id_by_code: dict, + acteurtype_id_by_code: dict, +) -> pd.DataFrame: + + # FIXME: ajout du contrôle de la présence des colonnes dans l'action validate data + # (après la normalisation) + + # Compute qfdmo_acteur_acteurservice + df_acteur["acteur_services"] = df_acteur.apply( + lambda row: [ + { + "acteurservice_id": acteurservice_id_by_code[acteurservice_code], + "acteur_id": row["identifiant_unique"], + } + for acteurservice_code in row["acteurservice_codes"] + ], + axis=1, + ) + + # Compute qfdmo_acteur_labelqualite + df_acteur["labels"] = df_acteur.apply( + lambda row: [ + { + "acteur_id": row["identifiant_unique"], + "labelqualite_id": labelqualite_id_by_code[label_ou_bonus], + } + for label_ou_bonus in row["label_codes"] + ], + axis=1, + ) + + df_acteur["proposition_services"] = df_acteur.apply( + lambda row: [ + { + "acteur_id": row["identifiant_unique"], + "action_id": actions_id_by_code[action_code], + "action": action_code, + "sous_categories": sous_categories, + "pds_sous_categories": [ + { + "souscategorie": sous_categorie, + "souscategorieobjet_id": souscats_id_by_code[sous_categorie], + } + for sous_categorie in sous_categories + ], + } + for proposition_services_code in row["proposition_services_codes"] + for action_code, sous_categories in [ + ( + proposition_services_code["action"], + proposition_services_code["sous_categories"], + ) + ] + ], + axis=1, + ) + + # Convertir les codes des sources et des acteur_types en identifiants + df_acteur["source_id"] = df_acteur["source_code"].map(source_id_by_code) + df_acteur["acteur_type_id"] = df_acteur["acteur_type_code"].map( + acteurtype_id_by_code + ) + + return df_acteur diff --git a/dags/sources/tasks/business_logic/db_data_prepare.py b/dags/sources/tasks/business_logic/db_data_prepare.py index c2e5e216e..208e01f93 100644 --- a/dags/sources/tasks/business_logic/db_data_prepare.py +++ b/dags/sources/tasks/business_logic/db_data_prepare.py @@ -2,137 +2,69 @@ import 
logging import pandas as pd +from sources.config import shared_constants as constants from utils import logging_utils as log logger = logging.getLogger(__name__) def db_data_prepare( - df_acteur_to_delete: pd.DataFrame, df_acteur: pd.DataFrame, - df_ps: pd.DataFrame, - df_pssc: pd.DataFrame, - df_labels: pd.DataFrame, - df_acteur_services: pd.DataFrame, - df_acteurs_from_db: pd.DataFrame, - source_id_by_code: dict, - acteurtype_id_by_code: dict, + df_acteur_from_db: pd.DataFrame, ): - - update_actors_columns = ["identifiant_unique", "statut", "cree_le"] - df_acteur_to_delete["suggestion"] = df_acteur_to_delete[ - update_actors_columns - ].apply(lambda row: json.dumps(row.to_dict(), default=str), axis=1) - - if df_acteur.empty: - raise ValueError("df_acteur est vide") - if df_acteur_services.empty: - raise ValueError("df_acteur_services est vide") - if df_ps.empty: - raise ValueError("df_ps est vide") - if df_pssc.empty: - raise ValueError("df_pssc est vide") - - # Convertir les codes des sources et des acteur_types en identifiants - df_acteur["source_id"] = df_acteur["source_code"].map(source_id_by_code) - df_acteur["acteur_type_id"] = df_acteur["acteur_type_code"].map( - acteurtype_id_by_code - ) - - # FIXME: A bouger dans un tache compute_ps qui remplacera propose_services et - # propose_services_sous_categories - aggregated_pdsc = ( - df_pssc.groupby("propositionservice_id") - .apply(lambda x: x.to_dict("records") if not x.empty else []) - .reset_index(name="pds_sous_categories") - ) - - df_pds_joined = pd.merge( - df_ps, - aggregated_pdsc, - how="left", - left_on="id", - right_on="propositionservice_id", - ) - df_pds_joined["propositionservice_id"] = df_pds_joined[ - "propositionservice_id" - ].astype(str) - df_pds_joined["pds_sous_categories"] = df_pds_joined["pds_sous_categories"].apply( - lambda x: x if isinstance(x, list) else [] - ) - df_pds_joined.drop("id", axis=1, inplace=True) - - aggregated_pds = ( - df_pds_joined.groupby("acteur_id") - .apply(lambda 
x: x.to_dict("records") if not x.empty else []) - .reset_index(name="proposition_services") + # transformer df_acteur_from_db en df [['identifiant_unique', 'contexte']] + # dans contexte, on veut la ligne en json + df_acteur_from_db["contexte"] = df_acteur_from_db.apply( + lambda row: json.dumps(row.to_dict(), default=str), axis=1 ) - aggregated_labels = df_labels.groupby("acteur_id").apply( - lambda x: x.to_dict("records") if not x.empty else [] - ) - aggregated_labels = ( - pd.DataFrame(columns=["acteur_id", "labels"]) - if aggregated_labels.empty - else aggregated_labels.reset_index(name="labels") - ) + df_acteur_from_db_actifs = df_acteur_from_db[ + df_acteur_from_db["statut"] == constants.ACTEUR_ACTIF + ] - aggregated_acteur_services = df_acteur_services.groupby("acteur_id").apply( - lambda x: x.to_dict("records") if not x.empty else [] - ) - aggregated_acteur_services = ( - pd.DataFrame(columns=["acteur_id", "acteur_services"]) - if aggregated_acteur_services.empty - else aggregated_acteur_services.reset_index(name="acteur_services") - ) + df_acteur_to_delete = df_acteur_from_db_actifs[ + ~df_acteur_from_db_actifs["identifiant_unique"].isin( + df_acteur["identifiant_unique"] + ) + ][["identifiant_unique"]] - df_joined_with_pds = pd.merge( - df_acteur, - aggregated_pds, - how="left", - left_on="identifiant_unique", - right_on="acteur_id", - ) + df_acteur_to_delete["statut"] = "SUPPRIME" - df_joined_with_labels = pd.merge( - df_joined_with_pds, - aggregated_labels, - how="left", - left_on="acteur_id", - right_on="acteur_id", - ) + df_acteur_to_delete["suggestion"] = df_acteur_to_delete[ + ["identifiant_unique", "statut"] + ].apply(lambda row: json.dumps(row.to_dict(), default=str), axis=1) - df_joined = pd.merge( - df_joined_with_labels, - aggregated_acteur_services, - how="left", - left_on="acteur_id", - right_on="acteur_id", + df_acteur_to_delete = df_acteur_to_delete.merge( + df_acteur_from_db[["identifiant_unique", "contexte"]], + on="identifiant_unique", + 
how="inner", ) - df_joined["proposition_services"] = df_joined["proposition_services"].apply( - lambda x: x if isinstance(x, list) else [] - ) + # FIXME : à faire avant dans la validation des données + if df_acteur.empty: + raise ValueError("df_acteur est vide") - df_joined.loc[ - df_joined["proposition_services"].apply(lambda x: x == []), "statut" + # FIXME : à faire avant dans la normalisation des données + # Inactivate acteur if propositions_services is empty + df_acteur.loc[ + df_acteur["proposition_services"].apply(lambda x: x == []), "statut" ] = "INACTIF" - df_joined.drop("acteur_id", axis=1, inplace=True) - - df_joined = df_joined.where(pd.notna(df_joined), None) - - df_joined["suggestion"] = df_joined.apply( + df_acteur["suggestion"] = df_acteur.apply( lambda row: json.dumps(row.to_dict(), default=str), axis=1 ) - df_joined.drop_duplicates("identifiant_unique", keep="first", inplace=True) - df_acteur_to_create = df_joined[ - ~df_joined["identifiant_unique"].isin(df_acteurs_from_db["identifiant_unique"]) + df_acteur_to_create = df_acteur[ + ~df_acteur["identifiant_unique"].isin(df_acteur_from_db["identifiant_unique"]) ] - df_acteur_to_update = df_joined[ - df_joined["identifiant_unique"].isin(df_acteurs_from_db["identifiant_unique"]) + df_acteur_to_update = df_acteur[ + df_acteur["identifiant_unique"].isin(df_acteur_from_db["identifiant_unique"]) ] + df_acteur_to_update = df_acteur_to_update.merge( + df_acteur_from_db[["identifiant_unique", "contexte"]], + on="identifiant_unique", + how="left", + ) log.preview("df_acteur_to_create", df_acteur_to_create) log.preview("df_acteur_to_update", df_acteur_to_update) diff --git a/dags/sources/tasks/business_logic/propose_acteur_changes.py b/dags/sources/tasks/business_logic/propose_acteur_changes.py index 4e459d51d..0938cb113 100644 --- a/dags/sources/tasks/business_logic/propose_acteur_changes.py +++ b/dags/sources/tasks/business_logic/propose_acteur_changes.py @@ -36,7 +36,6 @@ def propose_acteur_changes( } 
df_acteur = df_acteur.drop_duplicates(subset="identifiant_unique", keep="first") - df_acteur["event"] = "CREATE" return { "df": df_acteur, "metadata": metadata, diff --git a/dags/sources/tasks/business_logic/propose_acteur_services.py b/dags/sources/tasks/business_logic/propose_acteur_services.py deleted file mode 100644 index 92292396c..000000000 --- a/dags/sources/tasks/business_logic/propose_acteur_services.py +++ /dev/null @@ -1,20 +0,0 @@ -import pandas as pd - - -def propose_acteur_services(df_acteur: pd.DataFrame, acteurservice_id_by_code: dict): - - acteur_acteurservice_list = [] - for _, acteur in df_acteur.iterrows(): - for acteurservice_code in acteur["acteurservice_codes"]: - acteur_acteurservice_list.append( - { - "acteur_id": acteur["identifiant_unique"], - "acteurservice_id": acteurservice_id_by_code[acteurservice_code], - } - ) - - df_acteur_services = pd.DataFrame( - acteur_acteurservice_list, - columns=["acteur_id", "acteurservice_id"], - ) - return df_acteur_services diff --git a/dags/sources/tasks/business_logic/propose_acteur_to_delete.py b/dags/sources/tasks/business_logic/propose_acteur_to_delete.py index c8e800f99..6008918b7 100644 --- a/dags/sources/tasks/business_logic/propose_acteur_to_delete.py +++ b/dags/sources/tasks/business_logic/propose_acteur_to_delete.py @@ -8,15 +8,15 @@ def propose_acteur_to_delete( df_acteurs_for_source: pd.DataFrame, - df_acteurs_from_db: pd.DataFrame, + df_acteur_from_db: pd.DataFrame, ): - df_acteurs_from_db_actifs = df_acteurs_from_db[ - df_acteurs_from_db["statut"] == constants.ACTEUR_ACTIF + df_acteur_from_db_actifs = df_acteur_from_db[ + df_acteur_from_db["statut"] == constants.ACTEUR_ACTIF ] - df_acteur_to_delete = df_acteurs_from_db_actifs[ - ~df_acteurs_from_db_actifs["identifiant_unique"].isin( + df_acteur_to_delete = df_acteur_from_db_actifs[ + ~df_acteur_from_db_actifs["identifiant_unique"].isin( df_acteurs_for_source["identifiant_unique"] ) ][["identifiant_unique", "cree_le", "modifie_le"]] diff 
--git a/dags/sources/tasks/business_logic/propose_labels.py b/dags/sources/tasks/business_logic/propose_labels.py deleted file mode 100644 index 351bdd09e..000000000 --- a/dags/sources/tasks/business_logic/propose_labels.py +++ /dev/null @@ -1,27 +0,0 @@ -import pandas as pd - -LABEL_TO_IGNORE = ["non applicable", "na", "n/a", "null", "aucun", "non"] -ACTEUR_TYPE_ESS = "ess" - - -def propose_labels( - df_acteur: pd.DataFrame, - labelqualite_id_by_code: dict, -) -> pd.DataFrame: - - rows_list = [] - for _, row in df_acteur.iterrows(): - - for label_ou_bonus in row["label_codes"]: - rows_list.append( - { - "acteur_id": row["identifiant_unique"], - "labelqualite_id": labelqualite_id_by_code[label_ou_bonus], - } - ) - - df_labels = pd.DataFrame(rows_list, columns=["acteur_id", "labelqualite_id"]) - df_labels.drop_duplicates( - ["acteur_id", "labelqualite_id"], keep="first", inplace=True - ) - return df_labels diff --git a/dags/sources/tasks/business_logic/propose_services.py b/dags/sources/tasks/business_logic/propose_services.py deleted file mode 100644 index 1a1b95b71..000000000 --- a/dags/sources/tasks/business_logic/propose_services.py +++ /dev/null @@ -1,42 +0,0 @@ -import logging - -import pandas as pd -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_services( - df: pd.DataFrame, - displayedpropositionservice_max_id: int, - actions_id_by_code: dict, -) -> dict: - merged_count = 0 - rows_list = [] - - for _, row in df.iterrows(): - for action_code in row["action_codes"]: - rows_list.append( - { - "action_id": actions_id_by_code[action_code], - "acteur_id": row["identifiant_unique"], - "action": action_code, - "sous_categories": row["souscategorie_codes"], - } - ) - - df_pds = pd.DataFrame(rows_list) - if df_pds.empty: - raise ValueError("df_pds est vide") - if indexes := range( - displayedpropositionservice_max_id, - displayedpropositionservice_max_id + len(df_pds), - ): - df_pds["id"] = indexes - metadata = { - 
"number_of_merged_actors": merged_count, - "number_of_propositionservices": len(df_pds), - } - log.preview("df_pds retournée par la tâche", df_pds) - - return {"df": df_pds, "metadata": metadata} diff --git a/dags/sources/tasks/business_logic/propose_services_sous_categories.py b/dags/sources/tasks/business_logic/propose_services_sous_categories.py deleted file mode 100644 index 3571e998c..000000000 --- a/dags/sources/tasks/business_logic/propose_services_sous_categories.py +++ /dev/null @@ -1,43 +0,0 @@ -import logging - -import pandas as pd -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_services_sous_categories( - df_ps: pd.DataFrame, - souscats_id_by_code: dict, -): - rows_list = [] - - log.preview("df_ps", df_ps) - logger.info(df_ps.head(1).to_dict(orient="records")) - log.preview("souscats_id_by_code", souscats_id_by_code) - - for _, row in df_ps.iterrows(): - for sscat_code in set(row["sous_categories"]): - rows_list.append( - { - "propositionservice_id": row["id"], - "souscategorieobjet_id": souscats_id_by_code[sscat_code], - "souscategorie": sscat_code, - } - ) - - df_souscats = pd.DataFrame( - rows_list, - columns=["propositionservice_id", "souscategorieobjet_id", "souscategorie"], - ) - log.preview("df_souscats créée par le mapping souscats", df_souscats) - logger.info(f"# entrées df_souscats avant nettoyage: {len(df_souscats)}") - df_souscats.drop_duplicates( - ["propositionservice_id", "souscategorieobjet_id"], keep="first", inplace=True - ) - logger.info(f"# entrées df_souscats après suppression doublons: {len(df_souscats)}") - df_souscats = df_souscats[df_souscats["souscategorieobjet_id"].notna()] - logger.info(f"# entrées df_souscats après nettoyage des vides: {len(df_souscats)}") - if df_souscats.empty: - raise ValueError("df_souscats est vide") - return df_souscats diff --git a/dags/sources/tasks/business_logic/source_data_normalize.py b/dags/sources/tasks/business_logic/source_data_normalize.py 
index 40026ec3d..046c6ffd1 100755 --- a/dags/sources/tasks/business_logic/source_data_normalize.py +++ b/dags/sources/tasks/business_logic/source_data_normalize.py @@ -134,6 +134,7 @@ def _remove_undesired_lines(df: pd.DataFrame, dag_config: DAGConfig) -> pd.DataF ) log.preview("Doublons sur identifiant_unique", dups) if dag_config.ignore_duplicates: + # FIXME: dedupliquer en mergeant proposition_services_codes ? # TODO: Attention aux lignes dupliquées à cause de de service en ligne # + physique df = df.drop_duplicates(subset=["identifiant_unique"], keep="first") diff --git a/dags/sources/tasks/transform/transform_df.py b/dags/sources/tasks/transform/transform_df.py index 4c8a8c42b..c7faa3006 100644 --- a/dags/sources/tasks/transform/transform_df.py +++ b/dags/sources/tasks/transform/transform_df.py @@ -230,6 +230,26 @@ def compute_location(row: pd.Series, _): return row[["location"]] +def clean_proposition_services(row, _): + + # formater les propositions de service selon les colonnes + # action_codes and souscategorie_codes + # + # [{'action': 'CODE_ACTION','sous_categories': ['CODE_SSCAT']}] ou [] + if row["souscategorie_codes"]: + row["proposition_services_codes"] = [ + { + "action": action, + "sous_categories": row["souscategorie_codes"], + } + for action in row["action_codes"] + ] + else: + row["proposition_services_codes"] = [] + + return row[["proposition_services_codes"]] + + ### Fonctions de résolution de l'adresse au format BAN et avec vérification via l'API # adresse.data.gouv.fr en option # TODO : A déplacer ? 
diff --git a/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py b/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py index 5551da313..7630792ae 100644 --- a/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py +++ b/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py @@ -91,8 +91,8 @@ def normalize_acteur_update_for_db(df_actors, dag_run_id, engine, type_action): def process_many2many_df(df, column_name, df_columns=["acteur_id", "labelqualite_id"]): try: - # Attempt to process the 'labels' column if it exists and is not empty normalized_df = df[column_name].dropna().apply(pd.json_normalize) + normalized_df = normalized_df[normalized_df.apply(lambda x: not x.empty)] if normalized_df.empty: return pd.DataFrame( columns=df_columns diff --git a/dags_unit_tests/conftest.py b/dags_unit_tests/conftest.py index 0d3eea91e..99a1e6214 100755 --- a/dags_unit_tests/conftest.py +++ b/dags_unit_tests/conftest.py @@ -100,7 +100,7 @@ def df_empty_acteurs_from_db(): @pytest.fixture -def df_acteurs_from_db(): +def df_acteur_from_db(): return pd.DataFrame( { "identifiant_unique": ["id1", "id2"], diff --git a/dags_unit_tests/sources/tasks/business_logic/test_propose_acteur_services.py b/dags_unit_tests/sources/tasks/business_logic/test_propose_acteur_services.py deleted file mode 100644 index afd0b8c86..000000000 --- a/dags_unit_tests/sources/tasks/business_logic/test_propose_acteur_services.py +++ /dev/null @@ -1,59 +0,0 @@ -import pandas as pd -import pytest -from sources.tasks.business_logic.propose_acteur_services import propose_acteur_services - - -@pytest.fixture -def acteurservice_id_by_code(): - return {"service_de_reparation": 10, "structure_de_collecte": 20} - - -class TestCreateActeurServices: - # TODO : refacto avec parametize - def test_create_acteur_services_empty(self, acteurservice_id_by_code): - - df_result = propose_acteur_services( - df_acteur=pd.DataFrame( - { - "identifiant_unique": [1, 2], - 
"acteurservice_codes": [[], []], - } - ), - acteurservice_id_by_code=acteurservice_id_by_code, - ) - - assert df_result.empty - assert df_result.columns.tolist() == [ - "acteur_id", - "acteurservice_id", - ] - - def test_create_acteur_services_full(self, acteurservice_id_by_code): - - df_result = propose_acteur_services( - df_acteur=pd.DataFrame( - { - "identifiant_unique": [1, 2, 3], - "acteurservice_codes": [ - ["service_de_reparation", "structure_de_collecte"], - ["service_de_reparation"], - ["structure_de_collecte"], - ], - } - ), - acteurservice_id_by_code=acteurservice_id_by_code, - ) - - assert df_result.columns.tolist() == [ - "acteur_id", - "acteurservice_id", - ] - assert sorted( - df_result.loc[df_result["acteur_id"] == 1, "acteurservice_id"].tolist() - ) == [10, 20] - assert sorted( - df_result.loc[df_result["acteur_id"] == 2, "acteurservice_id"].tolist() - ) == [10] - assert sorted( - df_result.loc[df_result["acteur_id"] == 3, "acteurservice_id"].tolist() - ) == [20] diff --git a/dags_unit_tests/sources/tasks/business_logic/test_propose_labels.py b/dags_unit_tests/sources/tasks/business_logic/test_propose_labels.py deleted file mode 100644 index b2bde946c..000000000 --- a/dags_unit_tests/sources/tasks/business_logic/test_propose_labels.py +++ /dev/null @@ -1,29 +0,0 @@ -import pandas as pd -from sources.tasks.business_logic.propose_labels import propose_labels - - -class TestCeateLabels: - - def test_create_reparacteur_labels( - self, - labelqualite_id_by_code, - ): - df = propose_labels( - df_acteur=pd.DataFrame( - { - "identifiant_unique": [1, 2], - "label_codes": [["label_bonus"], []], - "acteur_type_id": [202, 202], - } - ), - labelqualite_id_by_code=labelqualite_id_by_code, - ) - - expected_dataframe_with_reparacteur_label = pd.DataFrame( - { - "acteur_id": [1], - "labelqualite_id": [2], - } - ) - - pd.testing.assert_frame_equal(df, expected_dataframe_with_reparacteur_label) diff --git 
a/dags_unit_tests/sources/tasks/business_logic/test_propose_services.py b/dags_unit_tests/sources/tasks/business_logic/test_propose_services.py deleted file mode 100644 index ea9a1f62c..000000000 --- a/dags_unit_tests/sources/tasks/business_logic/test_propose_services.py +++ /dev/null @@ -1,211 +0,0 @@ -import pandas as pd -import pytest -from sources.tasks.business_logic.propose_services import propose_services - - -@pytest.fixture -def actions_id_by_code(): - return {"reparer": 1, "donner": 2, "trier": 3} - - -class TestCreatePropositionService: - - def test_create_proposition_services_empty( - self, - actions_id_by_code, - ): - df_create_actors = pd.DataFrame( - { - "identifiant_unique": [1], - "souscategorie_codes": [["smartphone, tablette et console"]], - "action_codes": [[]], - } - ) - - with pytest.raises(ValueError): - propose_services( - df=df_create_actors, - displayedpropositionservice_max_id=1, - actions_id_by_code=actions_id_by_code, - ) - - @pytest.mark.parametrize( - "df_create_actors, expected_df, expected_metadata", - [ - # Service Réparation - ( - pd.DataFrame( - { - "identifiant_unique": [1], - "souscategorie_codes": [["smartphone, tablette et console"]], - "action_codes": [["reparer"]], - } - ), - pd.DataFrame( - { - "action_id": [1], - "acteur_id": [1], - "action": ["reparer"], - "sous_categories": [["smartphone, tablette et console"]], - "id": [1], - }, - ), - {"number_of_merged_actors": 0, "number_of_propositionservices": 1}, - ), - # Service Réemploi - ( - pd.DataFrame( - { - "identifiant_unique": [1], - "souscategorie_codes": [["smartphone, tablette et console"]], - "action_codes": [["donner"]], - } - ), - pd.DataFrame( - { - "action_id": [2], - "acteur_id": [1], - "action": ["donner"], - "sous_categories": [["smartphone, tablette et console"]], - "id": [1], - }, - ), - {"number_of_merged_actors": 0, "number_of_propositionservices": 1}, - ), - # Point Collecte (tri) - ( - pd.DataFrame( - { - "identifiant_unique": [1], - 
"souscategorie_codes": [["smartphone, tablette et console"]], - "action_codes": [["trier"]], - } - ), - pd.DataFrame( - { - "action_id": [3], - "acteur_id": [1], - "action": ["trier"], - "sous_categories": [["smartphone, tablette et console"]], - "id": [1], - }, - ), - {"number_of_merged_actors": 0, "number_of_propositionservices": 1}, - ), - # All services - ( - pd.DataFrame( - { - "identifiant_unique": [1], - "souscategorie_codes": [["smartphone, tablette et console"]], - "action_codes": [["reparer", "donner", "trier"]], - } - ), - pd.DataFrame( - { - "action_id": [1, 2, 3], - "acteur_id": [1, 1, 1], - "action": ["reparer", "donner", "trier"], - "sous_categories": [ - ["smartphone, tablette et console"], - ["smartphone, tablette et console"], - ["smartphone, tablette et console"], - ], - "id": [1, 2, 3], - }, - ), - {"number_of_merged_actors": 0, "number_of_propositionservices": 3}, - ), - ], - ) - def test_create_proposition_services_services( - self, - df_create_actors, - expected_df, - expected_metadata, - actions_id_by_code, - ): - result = propose_services( - df=df_create_actors, - displayedpropositionservice_max_id=1, - actions_id_by_code=actions_id_by_code, - ) - - assert result["df"].equals(expected_df) - assert result["metadata"] == expected_metadata - - def test_create_proposition_multiple_actor( - self, - actions_id_by_code, - ): - df_create_actors = pd.DataFrame( - { - "identifiant_unique": [1, 2], - "souscategorie_codes": [ - "smartphone, tablette et console", - "smartphone, tablette et console", - ], - "action_codes": [["trier"], ["trier"]], - } - ) - - expected_df = pd.DataFrame( - { - "action_id": [3, 3], - "acteur_id": [1, 2], - "action": ["trier", "trier"], - "sous_categories": [ - "smartphone, tablette et console", - "smartphone, tablette et console", - ], - "id": [1, 2], - } - ) - expected_metadata = { - "number_of_merged_actors": 0, - "number_of_propositionservices": 2, - } - - result = propose_services( - df=df_create_actors, - 
displayedpropositionservice_max_id=1, - actions_id_by_code=actions_id_by_code, - ) - - assert result["df"].equals(expected_df) - assert result["metadata"] == expected_metadata - - def test_create_proposition_multiple_product( - self, - actions_id_by_code, - ): - df_create_actors = pd.DataFrame( - { - "identifiant_unique": [1], - "action_codes": [["reparer"]], - "souscategorie_codes": [["smartphone, tablette et console", "ecran"]], - } - ) - - df_expected = pd.DataFrame( - { - "action_id": [1], - "acteur_id": [1], - "action": ["reparer"], - "sous_categories": [["smartphone, tablette et console", "ecran"]], - "id": [1], - } - ) - expected_metadata = { - "number_of_merged_actors": 0, - "number_of_propositionservices": 1, - } - - result = propose_services( - df=df_create_actors, - displayedpropositionservice_max_id=1, - actions_id_by_code=actions_id_by_code, - ) - - assert result["df"].equals(df_expected) - assert result["metadata"] == expected_metadata diff --git a/dags_unit_tests/sources/tasks/business_logic/test_propose_services_sous_categories.py b/dags_unit_tests/sources/tasks/business_logic/test_propose_services_sous_categories.py deleted file mode 100644 index 9479d2f8b..000000000 --- a/dags_unit_tests/sources/tasks/business_logic/test_propose_services_sous_categories.py +++ /dev/null @@ -1,65 +0,0 @@ -import pandas as pd -import pytest -from sources.tasks.business_logic.propose_services_sous_categories import ( - propose_services_sous_categories, -) - - -class TestProposeServicesSousCategories: - def test_create_proposition_services_sous_categories( - self, souscategorieobjet_code_by_id - ): - df_result = propose_services_sous_categories( - df_ps=pd.DataFrame( - { - "action_id": [1, 3, 1, 3], - "acteur_id": [1, 1, 2, 2], - "action": ["reparer", "trier", "reparer", "trier"], - "sous_categories": [ - ["smartphone, tablette et console"], - ["smartphone, tablette et console"], - ["ecran"], - ["ecran"], - ], - "id": [1, 2, 3, 4], - } - ), - 
souscats_id_by_code=souscategorieobjet_code_by_id, - ) - - pd.testing.assert_frame_equal( - df_result, - pd.DataFrame( - { - "propositionservice_id": [1, 2, 3, 4], - "souscategorieobjet_id": [102, 102, 101, 101], - "souscategorie": [ - "smartphone, tablette et console", - "smartphone, tablette et console", - "ecran", - "ecran", - ], - } - ), - ) - - def test_create_proposition_services_sous_categories_empty_products( - self, souscategorieobjet_code_by_id - ): - with pytest.raises(ValueError): - propose_services_sous_categories( - df_ps=pd.DataFrame( - { - "action_id": [1, 1], - "acteur_id": [1, 2], - "action": ["reparer", "reparer"], - "acteur_service": [ - "Service de réparation", - "Service de réparation", - ], - "sous_categories": ["", []], - "id": [1, 2], - } - ), - souscats_id_by_code=souscategorieobjet_code_by_id, - ) diff --git a/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py b/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py index dee0070e3..12ba35369 100644 --- a/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py +++ b/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py @@ -10,7 +10,7 @@ class TestActeurToDelete: @pytest.mark.parametrize( ( - "df_acteurs_from_db1, df_acteurs_for_source, df_expected_acteur_to_delete," + "df_acteur_from_db1, df_acteurs_for_source, df_expected_acteur_to_delete," " expected_metadata" ), [ @@ -115,14 +115,14 @@ class TestActeurToDelete: ) def test_propose_acteur_to_delete( self, - df_acteurs_from_db1, + df_acteur_from_db1, df_acteurs_for_source, df_expected_acteur_to_delete, expected_metadata, ): result = propose_acteur_to_delete( df_acteurs_for_source=df_acteurs_for_source, - df_acteurs_from_db=df_acteurs_from_db1, + df_acteur_from_db=df_acteur_from_db1, ) df_returned_acteur_to_delete = result["df_acteur_to_delete"] diff --git a/dags_unit_tests/utils/test_db_data_prepare.py 
b/dags_unit_tests/utils/test_db_data_prepare.py index dde80bb35..0764a7422 100644 --- a/dags_unit_tests/utils/test_db_data_prepare.py +++ b/dags_unit_tests/utils/test_db_data_prepare.py @@ -1,211 +1,97 @@ -from datetime import datetime - import pandas as pd import pytest -from sources.tasks.business_logic.db_data_prepare import db_data_prepare - -class TestDBDataPrepare: +from dags.sources.tasks.business_logic.db_data_prepare import db_data_prepare - @pytest.mark.parametrize( - "propose_labels, expected_labels", - [ - ( - pd.DataFrame(columns=["acteur_id", "labelqualite_id"]), - [None, None], - ), - ( - pd.DataFrame( - { - "acteur_id": [1, 2, 2], - "labelqualite_id": [1, 1, 2], - } - ), - [ - [ - { - "acteur_id": 1, - "labelqualite_id": 1, - } - ], - [ - { - "acteur_id": 2, - "labelqualite_id": 1, - }, - { - "acteur_id": 2, - "labelqualite_id": 2, - }, - ], - ], - ), - ], - ) - def test_db_data_prepare_labels( - self, - df_proposition_services, - df_proposition_services_sous_categories, - propose_labels, - expected_labels, - df_acteurs_from_db, - source_id_by_code, - acteurtype_id_by_code, - ): - df_result = db_data_prepare( - df_acteur_to_delete=pd.DataFrame( - { - "identifiant_unique": [3], - "statut": ["ACTIF"], - "cree_le": [datetime(2024, 1, 1)], - } - ), - df_acteur=pd.DataFrame( +@pytest.mark.parametrize( + "df_acteur,df_acteur_from_db, expected_output", + [ + ( + pd.DataFrame( { - "identifiant_unique": [1, 2], - "source_code": ["source1", "source2"], - "acteur_type_code": ["commerce", "commerce"], + "identifiant_unique": ["2", "3"], + "statut": ["ACTIF", "ACTIF"], + "cree_le": ["2021-01-01", "2021-01-01"], + "proposition_services": [[{"prop2": "val2"}], [{"prop3": "val3"}]], } ), - df_ps=df_proposition_services, - df_pssc=df_proposition_services_sous_categories, - df_labels=propose_labels, - df_acteur_services=pd.DataFrame( + pd.DataFrame( { - "acteur_id": [1, 2], - "acteurservice_id": [10, 10], - "acteurservice": [ - "Service de réparation", - "Service de 
réparation", - ], + "identifiant_unique": ["1", "2"], + "statut": ["ACTIF", "ACTIF"], + "cree_le": ["2021-01-01", "2021-01-01"], } ), - df_acteurs_from_db=df_acteurs_from_db, - source_id_by_code=source_id_by_code, - acteurtype_id_by_code=acteurtype_id_by_code, - ) - - assert "labels" in df_result["df_acteur_to_create"].columns - assert list(df_result["df_acteur_to_create"]["labels"]) == expected_labels - - @pytest.mark.parametrize( - "propose_acteur_services, expected_acteur_services", - [ - ( - pd.DataFrame( + { + "df_acteur_to_create": pd.DataFrame( { - "acteur_id": [1, 2, 2], - "acteurservice_id": [10, 10, 20], - "acteurservice": [ - "Service de réparation", - "Service de réparation", - "Collecte par une structure spécialisée", + "identifiant_unique": ["3"], + "statut": ["ACTIF"], + "cree_le": ["2021-01-01"], + "proposition_services": [[{"prop3": "val3"}]], + "suggestion": [ + '{"identifiant_unique": "3", "statut": "ACTIF",' + ' "cree_le": "2021-01-01",' + ' "proposition_services": [{"prop3": "val3"}]}' ], } ), - [ - [ - { - "acteur_id": 1, - "acteurservice": "Service de réparation", - "acteurservice_id": 10, - } - ], - [ - { - "acteur_id": 2, - "acteurservice": "Service de réparation", - "acteurservice_id": 10, - }, - { - "acteur_id": 2, - "acteurservice": "Collecte par une structure spécialisée", - "acteurservice_id": 20, - }, - ], - ], - ), - ], - ) - def test_db_data_prepare_acteur_services( - self, - df_proposition_services, - df_proposition_services_sous_categories, - propose_acteur_services, - expected_acteur_services, - df_acteurs_from_db, - source_id_by_code, - acteurtype_id_by_code, - ): - df_result = db_data_prepare( - df_acteur_to_delete=pd.DataFrame( - { - "identifiant_unique": [3], - "statut": ["ACTIF"], - "cree_le": [datetime(2024, 1, 1)], - } - ), - df_acteur=pd.DataFrame( - { - "identifiant_unique": [1, 2], - "source_code": ["source1", "source2"], - "acteur_type_code": ["commerce", "commerce"], - } - ), - df_ps=df_proposition_services, - 
df_pssc=df_proposition_services_sous_categories, - df_labels=pd.DataFrame(columns=["acteur_id", "labelqualite_id"]), - df_acteur_services=propose_acteur_services, - df_acteurs_from_db=df_acteurs_from_db, - source_id_by_code=source_id_by_code, - acteurtype_id_by_code=acteurtype_id_by_code, - ) - - assert "acteur_services" in df_result["df_acteur_to_create"].columns - assert ( - list(df_result["df_acteur_to_create"]["acteur_services"]) - == expected_acteur_services - ) - - def test_db_data_prepare_acteur_services_empty( - self, - df_proposition_services, - df_proposition_services_sous_categories, - df_acteurs_from_db, - source_id_by_code, - acteurtype_id_by_code, - ): - - with pytest.raises(ValueError) as erreur: - db_data_prepare( - df_acteur_to_delete=pd.DataFrame( + "df_acteur_to_update": pd.DataFrame( { - "identifiant_unique": [3], + "identifiant_unique": ["2"], "statut": ["ACTIF"], - "cree_le": [datetime(2024, 1, 1)], + "cree_le": ["2021-01-01"], + "proposition_services": [[{"prop2": "val2"}]], + "suggestion": [ + '{"identifiant_unique": "2", "statut": "ACTIF",' + ' "cree_le": "2021-01-01",' + ' "proposition_services": [{"prop2": "val2"}]}' + ], + "contexte": [ + '{"identifiant_unique": "2", "statut": "ACTIF",' + ' "cree_le": "2021-01-01"}' + ], } ), - df_acteur=pd.DataFrame( + "df_acteur_to_delete": pd.DataFrame( { - "identifiant_unique": [1, 2], - "source_code": ["source1", "source2"], - "acteur_type_code": ["commerce", "commerce"], + "identifiant_unique": ["1"], + "statut": ["SUPPRIME"], + "suggestion": [ + '{"identifiant_unique": "1", "statut": "SUPPRIME"}' + ], + "contexte": [ + '{"identifiant_unique": "1", "statut": "ACTIF",' + ' "cree_le": "2021-01-01"}' + ], } ), - df_ps=df_proposition_services, - df_pssc=df_proposition_services_sous_categories, - df_labels=pd.DataFrame(columns=["acteur_id", "labelqualite_id"]), - df_acteur_services=pd.DataFrame( - columns=["acteur_id", "acteurservice_id", "acteurservice"] - ), - df_acteurs_from_db=df_acteurs_from_db, - 
source_id_by_code=source_id_by_code, - acteurtype_id_by_code=acteurtype_id_by_code, - ) - assert str(erreur.value) == "df_acteur_services est vide" + }, + ) + ], +) +def test_db_data_prepare(df_acteur, df_acteur_from_db, expected_output): + result = db_data_prepare(df_acteur, df_acteur_from_db) + pd.testing.assert_frame_equal( + result["df_acteur_to_delete"].reset_index(drop=True), + expected_output["df_acteur_to_delete"].reset_index(drop=True), + ) + pd.testing.assert_frame_equal( + result["df_acteur_to_create"].reset_index(drop=True), + expected_output["df_acteur_to_create"].reset_index(drop=True), + ) + pd.testing.assert_frame_equal( + result["df_acteur_to_update"].reset_index(drop=True), + expected_output["df_acteur_to_update"].reset_index(drop=True), + ) -class TestActeurToCreateToDeleteToUpdate: - # FIXME : tests à écrire - pass + +def test_db_data_prepare_raise(): + with pytest.raises(ValueError) as error: + db_data_prepare( + pd.DataFrame(columns=["identifiant_unique", "statut", "cree_le"]), + pd.DataFrame(columns=["identifiant_unique", "statut", "cree_le"]), + ) + assert str(error.value) == "df_acteur est vide" diff --git a/data/migrations/0003_rename_context_suggestion_contexte.py b/data/migrations/0003_rename_context_suggestion_contexte.py new file mode 100644 index 000000000..719c60c22 --- /dev/null +++ b/data/migrations/0003_rename_context_suggestion_contexte.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.4 on 2025-01-21 22:36 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("data", "0002_tables_suggestion"), + ] + + operations = [ + migrations.RenameField( + model_name="suggestion", + old_name="context", + new_name="contexte", + ), + ] diff --git a/data/models.py b/data/models.py index c8f0730ce..dd7c6a0f4 100644 --- a/data/models.py +++ b/data/models.py @@ -94,7 +94,7 @@ class Suggestion(models.Model): choices=SuggestionStatut.choices, default=SuggestionStatut.AVALIDER, ) - context = 
models.JSONField( + contexte = models.JSONField( null=True, blank=True, verbose_name="Contexte de la suggestion : données initiales", From 4307cc6af74038a8ea24fd1cc73e33819d023c41 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 22 Jan 2025 08:35:23 +0100 Subject: [PATCH 3/7] Ajout de la config pour toutes les sources --- core/context_processors.py | 2 +- dags/sources/dags/source_aliapur.py | 5 +++++ dags/sources/dags/source_citeo.py | 5 +++++ dags/sources/dags/source_corepile.py | 5 +++++ dags/sources/dags/source_cyclevia.py | 5 +++++ dags/sources/dags/source_ecodds.py | 5 +++++ dags/sources/dags/source_ecologic.py | 5 +++++ dags/sources/dags/source_ecomaison.py | 5 +++++ dags/sources/dags/source_ecopae.py | 5 +++++ dags/sources/dags/source_ecosystem.py | 5 +++++ dags/sources/dags/source_ocab.py | 5 +++++ dags/sources/dags/source_ocad3e.py | 5 +++++ dags/sources/dags/source_pharmacies.py | 5 +++++ dags/sources/dags/source_pyreo.py | 5 +++++ dags/sources/dags/source_refashion.py | 5 +++++ dags/sources/dags/source_screlec.py | 5 +++++ dags/sources/dags/source_sinoe.py | 5 +++++ dags/sources/dags/source_soren.py | 5 +++++ dags/sources/dags/source_valdelia.py | 5 +++++ 19 files changed, 91 insertions(+), 1 deletion(-) diff --git a/core/context_processors.py b/core/context_processors.py index aea8943fe..ae3ec21cb 100644 --- a/core/context_processors.py +++ b/core/context_processors.py @@ -22,8 +22,8 @@ def content(request): def assistant(request) -> dict: return { "assistant": { - "is_iframe": "iframe" in request.GET, "is_home": request.path == reverse("qfdmd:home"), + "is_iframe": request.session.get("iframe"), "POSTHOG_KEY": settings.ASSISTANT["POSTHOG_KEY"], "MATOMO_ID": settings.ASSISTANT["MATOMO_ID"], }, diff --git a/dags/sources/dags/source_aliapur.py b/dags/sources/dags/source_aliapur.py index 89b4d216c..e42d7b260 100755 --- a/dags/sources/dags/source_aliapur.py +++ b/dags/sources/dags/source_aliapur.py @@ -115,6 +115,11 @@ "transformation": 
"clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_citeo.py b/dags/sources/dags/source_citeo.py index 17608fa84..b83eb2d1f 100755 --- a/dags/sources/dags/source_citeo.py +++ b/dags/sources/dags/source_citeo.py @@ -105,6 +105,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_corepile.py b/dags/sources/dags/source_corepile.py index 21e5feef3..55fa38728 100755 --- a/dags/sources/dags/source_corepile.py +++ b/dags/sources/dags/source_corepile.py @@ -109,6 +109,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_cyclevia.py b/dags/sources/dags/source_cyclevia.py index aadde838d..d3cfa142c 100755 --- a/dags/sources/dags/source_cyclevia.py +++ b/dags/sources/dags/source_cyclevia.py @@ -138,6 +138,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ecodds.py b/dags/sources/dags/source_ecodds.py index d02c0ec80..0cf3686b7 100755 --- a/dags/sources/dags/source_ecodds.py +++ b/dags/sources/dags/source_ecodds.py @@ -115,6 +115,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ecologic.py b/dags/sources/dags/source_ecologic.py index b4a0d9642..b4280e5ee 100755 --- a/dags/sources/dags/source_ecologic.py +++ b/dags/sources/dags/source_ecologic.py @@ -105,6 +105,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ecomaison.py b/dags/sources/dags/source_ecomaison.py index be1afecb3..966ab09e6 100755 --- a/dags/sources/dags/source_ecomaison.py +++ b/dags/sources/dags/source_ecomaison.py @@ -124,6 +124,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ecopae.py b/dags/sources/dags/source_ecopae.py index 95914f36f..8efcfc310 100755 --- a/dags/sources/dags/source_ecopae.py +++ b/dags/sources/dags/source_ecopae.py @@ -114,6 +114,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ecosystem.py b/dags/sources/dags/source_ecosystem.py index fa438259b..1ec510ef1 100755 --- a/dags/sources/dags/source_ecosystem.py +++ b/dags/sources/dags/source_ecosystem.py @@ -105,6 +105,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ocab.py b/dags/sources/dags/source_ocab.py index e23f970fd..bb75e9577 100755 --- a/dags/sources/dags/source_ocab.py +++ b/dags/sources/dags/source_ocab.py @@ -109,6 +109,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_ocad3e.py b/dags/sources/dags/source_ocad3e.py index 367891773..982bb0df6 100755 --- a/dags/sources/dags/source_ocad3e.py +++ b/dags/sources/dags/source_ocad3e.py @@ -120,6 +120,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_pharmacies.py b/dags/sources/dags/source_pharmacies.py index 6ca5ced72..87e272f0c 100755 --- a/dags/sources/dags/source_pharmacies.py +++ b/dags/sources/dags/source_pharmacies.py @@ -93,6 +93,11 @@ "transformation": "clean_identifiant_unique", "destination": ["identifiant_unique"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "Téléphone"}, {"remove": "Région"}, diff --git a/dags/sources/dags/source_pyreo.py b/dags/sources/dags/source_pyreo.py index f87eff16a..ef96a4cc8 100755 --- a/dags/sources/dags/source_pyreo.py +++ b/dags/sources/dags/source_pyreo.py @@ -139,6 +139,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_refashion.py b/dags/sources/dags/source_refashion.py index ce5ddc54b..e1333bbf7 100755 --- a/dags/sources/dags/source_refashion.py +++ b/dags/sources/dags/source_refashion.py @@ -139,6 +139,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_screlec.py b/dags/sources/dags/source_screlec.py index 4b7ddf7d0..5469cbe8b 100644 --- a/dags/sources/dags/source_screlec.py +++ b/dags/sources/dags/source_screlec.py @@ -130,6 +130,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_sinoe.py b/dags/sources/dags/source_sinoe.py index b3a117c25..1ac2a0dd4 100755 --- a/dags/sources/dags/source_sinoe.py +++ b/dags/sources/dags/source_sinoe.py @@ -107,6 +107,11 @@ "transformation": "clean_identifiant_unique", "destination": ["identifiant_unique"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_geopoint"}, {"remove": "_i"}, diff --git a/dags/sources/dags/source_soren.py b/dags/sources/dags/source_soren.py index 9c4998e87..9d303b336 100755 --- a/dags/sources/dags/source_soren.py +++ b/dags/sources/dags/source_soren.py @@ -121,6 +121,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, diff --git a/dags/sources/dags/source_valdelia.py b/dags/sources/dags/source_valdelia.py index afd37f36e..ab14754b5 100755 --- a/dags/sources/dags/source_valdelia.py +++ b/dags/sources/dags/source_valdelia.py @@ -130,6 +130,11 @@ "transformation": "clean_action_codes", "destination": ["action_codes"], }, + { + "origin": ["action_codes", "souscategorie_codes"], + "transformation": "clean_proposition_services", + "destination": ["proposition_services_codes"], + }, # 5. 
Supression des colonnes {"remove": "_i"}, {"remove": "_id"}, From 1c296d63d06ee142edf334ce50ccaa7b22477cc4 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 22 Jan 2025 09:54:14 +0100 Subject: [PATCH 4/7] rename actors to acteur --- .../tasks/airflow_logic/db_data_prepare_task.py | 2 +- .../db_write_validsuggestions_task.py | 2 +- .../business_logic/db_normalize_suggestion.py | 12 ++++++------ .../business_logic/db_write_validsuggestions.py | 16 ++++++++-------- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dags/sources/tasks/airflow_logic/db_data_prepare_task.py b/dags/sources/tasks/airflow_logic/db_data_prepare_task.py index 4edb37488..8f9f8a4f7 100644 --- a/dags/sources/tasks/airflow_logic/db_data_prepare_task.py +++ b/dags/sources/tasks/airflow_logic/db_data_prepare_task.py @@ -20,7 +20,7 @@ def db_data_prepare_wrapper(**kwargs): df_acteur = kwargs["ti"].xcom_pull(task_ids="compute_link_tables") df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") - log.preview("df_actors", df_acteur) + log.preview("df_acteur", df_acteur) log.preview("df_acteur_from_db", df_acteur_from_db) return db_data_prepare( diff --git a/dags/suggestions/tasks/airflow_logic/db_write_validsuggestions_task.py b/dags/suggestions/tasks/airflow_logic/db_write_validsuggestions_task.py index 9a010d83c..6b3fa78e5 100644 --- a/dags/suggestions/tasks/airflow_logic/db_write_validsuggestions_task.py +++ b/dags/suggestions/tasks/airflow_logic/db_write_validsuggestions_task.py @@ -17,7 +17,7 @@ def db_write_validsuggestions_task(dag: DAG) -> PythonOperator: def db_write_validsuggestions_wrapper(**kwargs): data_acteurs_normalized = kwargs["ti"].xcom_pull(task_ids="db_normalize_suggestion") - log.preview("data_acteurs_normalized acteur", data_acteurs_normalized["actors"]) + log.preview("data_acteurs_normalized acteur", data_acteurs_normalized["acteur"]) log.preview( "data_acteurs_normalized change_type", data_acteurs_normalized["change_type"] ) diff --git 
a/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py b/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py index 7630792ae..34b896811 100644 --- a/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py +++ b/dags/suggestions/tasks/business_logic/db_normalize_suggestion.py @@ -43,7 +43,7 @@ def db_normalize_suggestion(): df_acteur = pd.concat(normalized_dfs.tolist(), ignore_index=True) log.preview("df_acteur_to_delete", df_acteur) return { - "actors": df_acteur, + "acteur": df_acteur, "dag_run_id": suggestion_cohorte_id, "change_type": type_action, } @@ -51,16 +51,16 @@ def db_normalize_suggestion(): raise ValueError("No suggestion found") -def normalize_acteur_update_for_db(df_actors, dag_run_id, engine, type_action): - df_labels = process_many2many_df(df_actors, "labels") +def normalize_acteur_update_for_db(df_acteur, dag_run_id, engine, type_action): + df_labels = process_many2many_df(df_acteur, "labels") df_acteur_services = process_many2many_df( - df_actors, "acteur_services", df_columns=["acteur_id", "acteurservice_id"] + df_acteur, "acteur_services", df_columns=["acteur_id", "acteurservice_id"] ) max_id_pds = pd.read_sql_query( "SELECT max(id) FROM qfdmo_propositionservice", engine )["max"][0] - normalized_pds_dfs = df_actors["proposition_services"].apply(pd.json_normalize) + normalized_pds_dfs = df_acteur["proposition_services"].apply(pd.json_normalize) df_pds = pd.concat(normalized_pds_dfs.tolist(), ignore_index=True) ids_range = range(max_id_pds + 1, max_id_pds + 1 + len(df_pds)) @@ -77,7 +77,7 @@ def normalize_acteur_update_for_db(df_actors, dag_run_id, engine, type_action): df_pdssc = pd.concat(normalized_pdssc_dfs.tolist(), ignore_index=True) return { - "actors": df_actors, + "acteur": df_acteur, "pds": df_pds[["id", "action_id", "acteur_id"]], "pds_sous_categories": df_pdssc[ ["propositionservice_id", "souscategorieobjet_id"] diff --git a/dags/suggestions/tasks/business_logic/db_write_validsuggestions.py 
b/dags/suggestions/tasks/business_logic/db_write_validsuggestions.py index 05c3543f6..7e3862c4d 100644 --- a/dags/suggestions/tasks/business_logic/db_write_validsuggestions.py +++ b/dags/suggestions/tasks/business_logic/db_write_validsuggestions.py @@ -11,13 +11,13 @@ def db_write_validsuggestions(data_acteurs_normalized: dict): # If data_set is empty, nothing to do dag_run_id = data_acteurs_normalized["dag_run_id"] engine = PostgresConnectionManager().engine - if "actors" not in data_acteurs_normalized: + if "acteur" not in data_acteurs_normalized: with engine.begin() as connection: update_suggestion_status( connection, dag_run_id, constants.SUGGESTION_ENCOURS ) return - df_actors = data_acteurs_normalized["actors"] + df_acteur = data_acteurs_normalized["acteur"] df_labels = data_acteurs_normalized.get("labels") df_acteur_services = data_acteurs_normalized.get("acteur_services") df_pds = data_acteurs_normalized.get("pds") @@ -31,10 +31,10 @@ def db_write_validsuggestions(data_acteurs_normalized: dict): constants.SUGGESTION_SOURCE_MODIFICATION, ]: db_write_acteurupdate( - connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc + connection, df_acteur, df_labels, df_acteur_services, df_pds, df_pdssc ) elif change_type == constants.SUGGESTION_SOURCE_SUPRESSION: - db_write_acteurdelete(connection, df_actors) + db_write_acteurdelete(connection, df_acteur) else: raise ValueError("Invalid change_type") @@ -42,11 +42,11 @@ def db_write_validsuggestions(data_acteurs_normalized: dict): def db_write_acteurupdate( - connection, df_actors, df_labels, df_acteur_services, df_pds, df_pdssc + connection, df_acteur, df_labels, df_acteur_services, df_pds, df_pdssc ): logger.warning("Création ou mise à jour des acteurs") - df_actors[["identifiant_unique"]].to_sql( + df_acteur[["identifiant_unique"]].to_sql( "temp_actors", connection, if_exists="replace" ) @@ -88,10 +88,10 @@ def db_write_acteurupdate( # Filtrer les colonnes qui existent dans le DataFrame 
colonnes_existantes = [ - col for col in colonnes_souhaitees if col in df_actors.columns + col for col in colonnes_souhaitees if col in df_acteur.columns ] - df_actors[colonnes_existantes].to_sql( + df_acteur[colonnes_existantes].to_sql( "qfdmo_acteur", connection, if_exists="append", From 089d20049059a070f50a86c5156c9ceb22bb7a09 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 22 Jan 2025 14:07:56 +0100 Subject: [PATCH 5/7] check mandatory fields --- .../source_data_validate_task.py | 5 +--- .../tasks/business_logic/db_data_prepare.py | 4 ---- .../business_logic/source_config_validate.py | 16 ++++++++++--- .../business_logic/source_data_validate.py | 17 ++++++++----- dags/sources/tasks/transform/transform_df.py | 10 ++++++++ dags_unit_tests/conftest.py | 20 ++++++++++++++-- .../business_logic}/test_db_data_prepare.py | 9 ------- .../test_source_config_validate.py | 24 +++++++++++++------ 8 files changed, 70 insertions(+), 35 deletions(-) rename dags_unit_tests/{utils => sources/tasks/business_logic}/test_db_data_prepare.py (91%) diff --git a/dags/sources/tasks/airflow_logic/source_data_validate_task.py b/dags/sources/tasks/airflow_logic/source_data_validate_task.py index ae2b1f823..25dc8e15a 100644 --- a/dags/sources/tasks/airflow_logic/source_data_validate_task.py +++ b/dags/sources/tasks/airflow_logic/source_data_validate_task.py @@ -21,12 +21,9 @@ def source_data_validate_task(dag: DAG) -> PythonOperator: def source_data_validate_wrapper(**kwargs) -> None: df = kwargs["ti"].xcom_pull(task_ids="source_data_normalize") - params = kwargs["params"] - log.preview("df depuis source_data_normalize", df) - log.preview("paramètres du DAG", params) + log.preview("df before validation", df) return source_data_validate( df=df, - params=params, ) diff --git a/dags/sources/tasks/business_logic/db_data_prepare.py b/dags/sources/tasks/business_logic/db_data_prepare.py index 208e01f93..8b4cc266e 100644 --- a/dags/sources/tasks/business_logic/db_data_prepare.py +++ 
b/dags/sources/tasks/business_logic/db_data_prepare.py @@ -40,10 +40,6 @@ def db_data_prepare( how="inner", ) - # FIXME : à faire avant dans la validation des données - if df_acteur.empty: - raise ValueError("df_acteur est vide") - # FIXME : à faire avant dans la normalisation des données # Inactivate acteur if propositions_services is empty df_acteur.loc[ diff --git a/dags/sources/tasks/business_logic/source_config_validate.py b/dags/sources/tasks/business_logic/source_config_validate.py index db16e26cb..56549364d 100755 --- a/dags/sources/tasks/business_logic/source_config_validate.py +++ b/dags/sources/tasks/business_logic/source_config_validate.py @@ -7,6 +7,7 @@ NormalizationColumnTransform, NormalizationDFTransform, ) +from sources.tasks.transform.transform_df import MANDATORY_COLUMNS_AFTER_NORMALISATION from utils import logging_utils as log logger = logging.getLogger(__name__) @@ -20,9 +21,18 @@ def source_config_validate( pour éviter d'engendrer des coûts d'infra (et de fournisseurs API) si on peut déjà déterminer que le DAG ne pourra pas fonctionner """ - # TODO: la validation de la structure même des paramètres devrait - # se faire avec un schéma de validation (dataclass, pydantic, etc.) 
- # potentiellement appliqué directement dans le fichier DAG + + # check that all mandatory columns are present in dag_config.get_expected_columns + expected_columns = dag_config.get_expected_columns() + if not all( + col in expected_columns for col in MANDATORY_COLUMNS_AFTER_NORMALISATION + ): + raise ValueError( + f""" +Mandatory columns are missing in dag_config, +Mandatory columns are: {MANDATORY_COLUMNS_AFTER_NORMALISATION} +Expected columns from dag_config: {expected_columns}""" + ) # Validation des sous-catégories produit qui doivent être mappées # et toutes correspondre à des codes valides dans notre DB diff --git a/dags/sources/tasks/business_logic/source_data_validate.py b/dags/sources/tasks/business_logic/source_data_validate.py index 7c02694c3..78ee4defc 100755 --- a/dags/sources/tasks/business_logic/source_data_validate.py +++ b/dags/sources/tasks/business_logic/source_data_validate.py @@ -2,9 +2,11 @@ from itertools import chain import pandas as pd +from sources.tasks.airflow_logic.config_management import DAGConfig from sources.tasks.business_logic.read_mapping_from_postgres import ( read_mapping_from_postgres, ) +from sources.tasks.transform.transform_df import MANDATORY_COLUMNS_AFTER_NORMALISATION from utils import db_tasks from utils import logging_utils as log @@ -13,18 +15,21 @@ def source_data_validate( df: pd.DataFrame, - params: dict, ) -> None: """Etape de validation des données source où on applique des règles métier scrictes. Par exemple, si un SIRET est malformé c'est qu'on pas bien fait notre travail à l'étape de normalisation""" - # Il nous faut au moins 1 acteur sinon on à un problème avec le source - # TODO: règles d'anomalies plus avancées (ex: entre 80% et 100% vs. 
existant) + # We check that at least 1 actor exists, else we raise an error because we have + # an issue with source's data if df.empty: - raise ValueError("Aucune donnée reçue par source_data_normalize") + raise ValueError("No data in dataframe from normalisation") log.preview("df avant validation", df) + for col in MANDATORY_COLUMNS_AFTER_NORMALISATION: + if col not in df.columns: + raise ValueError(f"A mandatory column is missing: {col}") + # ------------------------------------ # identifiant_externe # Pas de doublons sur identifiant_externe (false=garde first+last) @@ -63,14 +68,14 @@ def source_data_validate( # product_mapping # - les valeur du mapping des produit peuvent-être des listes vides quand aucun # produit n'est à associer - product_mapping = params.get("product_mapping", {}) souscats_codes_to_ids = read_mapping_from_postgres( table_name="qfdmo_souscategorieobjet" ) codes_db = set(souscats_codes_to_ids.keys()) codes_mapping = set( chain.from_iterable( - x if isinstance(x, list) else [x] for x in product_mapping.values() + x if isinstance(x, list) else [x] + for x in DAGConfig.product_mapping.values() ) ) codes_invalid = codes_mapping - codes_db diff --git a/dags/sources/tasks/transform/transform_df.py b/dags/sources/tasks/transform/transform_df.py index c7faa3006..bd29c1539 100644 --- a/dags/sources/tasks/transform/transform_df.py +++ b/dags/sources/tasks/transform/transform_df.py @@ -21,6 +21,16 @@ ACTEUR_TYPE_ESS = "ess" LABEL_ESS = "ess" LABEL_TO_IGNORE = ["non applicable", "na", "n/a", "null", "aucun", "non"] +MANDATORY_COLUMNS_AFTER_NORMALISATION = [ + "identifiant_unique", + "identifiant_externe", + "nom", + "acteurservice_codes", + "label_codes", + "proposition_services_codes", + "source_code", + "acteur_type_code", +] def merge_duplicates( diff --git a/dags_unit_tests/conftest.py b/dags_unit_tests/conftest.py index 99a1e6214..3db412abc 100755 --- a/dags_unit_tests/conftest.py +++ b/dags_unit_tests/conftest.py @@ -4,6 +4,7 @@ import pandas as 
pd import pytest +from faker import Faker from dags.sources.tasks.airflow_logic.config_management import DAGConfig @@ -130,8 +131,23 @@ def source_id_by_code(): def dag_config(): return DAGConfig.model_validate( { - "normalization_rules": [], + "normalization_rules": [ + { + "column": col, + "value": Faker().word(), + } + for col in [ + "identifiant_unique", + "identifiant_externe", + "nom", + "acteurservice_codes", + "label_codes", + "proposition_services_codes", + "source_code", + "acteur_type_code", + ] + ], "endpoint": "https://example.com/api", - "product_mapping": {}, + "product_mapping": {"product1": "code1"}, } ) diff --git a/dags_unit_tests/utils/test_db_data_prepare.py b/dags_unit_tests/sources/tasks/business_logic/test_db_data_prepare.py similarity index 91% rename from dags_unit_tests/utils/test_db_data_prepare.py rename to dags_unit_tests/sources/tasks/business_logic/test_db_data_prepare.py index 0764a7422..ade3ca42e 100644 --- a/dags_unit_tests/utils/test_db_data_prepare.py +++ b/dags_unit_tests/sources/tasks/business_logic/test_db_data_prepare.py @@ -86,12 +86,3 @@ def test_db_data_prepare(df_acteur, df_acteur_from_db, expected_output): result["df_acteur_to_update"].reset_index(drop=True), expected_output["df_acteur_to_update"].reset_index(drop=True), ) - - -def test_db_data_prepare_raise(): - with pytest.raises(ValueError) as error: - db_data_prepare( - pd.DataFrame(columns=["identifiant_unique", "statut", "cree_le"]), - pd.DataFrame(columns=["identifiant_unique", "statut", "cree_le"]), - ) - assert str(error.value) == "df_acteur est vide" diff --git a/dags_unit_tests/sources/tasks/business_logic/test_source_config_validate.py b/dags_unit_tests/sources/tasks/business_logic/test_source_config_validate.py index bad96e089..863d4f601 100644 --- a/dags_unit_tests/sources/tasks/business_logic/test_source_config_validate.py +++ b/dags_unit_tests/sources/tasks/business_logic/test_source_config_validate.py @@ -31,23 +31,33 @@ def 
test_source_config_validate_valid(codes_sc_db, dag_config): ) +def test_mandatory_columns(codes_sc_db, dag_config): + dag_config.normalization_rules = [] + with pytest.raises(ValueError) as error: + source_config_validate(dag_config=dag_config, codes_sc_db=codes_sc_db) + assert "Mandatory columns are missing in dag_config" in str(error) + + def test_product_mapping_no_code(codes_sc_db, dag_config): dag_config.product_mapping["product3"] = ["code4"] - with pytest.raises(ValueError): + with pytest.raises(ValueError) as error: source_config_validate(dag_config=dag_config, codes_sc_db=codes_sc_db) + assert "Codes product_mapping invalides:" in str(error) -def test_normalization_rules_not_list(codes_sc_db, dag_config): - dag_config.normalization_rules = "not_a_list" - with pytest.raises(ValueError): +def test_product_mapping_missing(codes_sc_db, dag_config): + dag_config.product_mapping = {} + with pytest.raises(ValueError) as error: source_config_validate(dag_config=dag_config, codes_sc_db=codes_sc_db) + assert "product_mapping manquant pour la source" in str(error) def test_normalization_rules_invalid_function(codes_sc_db, dag_config): - dag_config.normalization_rules = [ + dag_config.normalization_rules.append( NormalizationColumnTransform( origin="src", transformation="invalid_function", destination="dest" ) - ] - with pytest.raises(ValueError): + ) + with pytest.raises(ValueError) as error: source_config_validate(dag_config=dag_config, codes_sc_db=codes_sc_db) + assert "La fonction de transformation invalid_function n'existe pas" in str(error) From 1eaaffc3f24287318eab40ed17ed94f3a586fda6 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 22 Jan 2025 16:03:54 +0100 Subject: [PATCH 6/7] use dag_config --- .../tasks/airflow_logic/source_data_validate_task.py | 7 ++++--- dags/sources/tasks/business_logic/source_data_validate.py | 6 ++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dags/sources/tasks/airflow_logic/source_data_validate_task.py 
b/dags/sources/tasks/airflow_logic/source_data_validate_task.py index 25dc8e15a..3036916c1 100644 --- a/dags/sources/tasks/airflow_logic/source_data_validate_task.py +++ b/dags/sources/tasks/airflow_logic/source_data_validate_task.py @@ -3,6 +3,7 @@ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule +from sources.tasks.airflow_logic.config_management import DAGConfig from sources.tasks.business_logic.source_data_validate import source_data_validate from utils import logging_utils as log @@ -21,9 +22,9 @@ def source_data_validate_task(dag: DAG) -> PythonOperator: def source_data_validate_wrapper(**kwargs) -> None: df = kwargs["ti"].xcom_pull(task_ids="source_data_normalize") + dag_config = DAGConfig.from_airflow_params(kwargs["params"]) log.preview("df before validation", df) + log.preview("DAG parameters", dag_config) - return source_data_validate( - df=df, - ) + return source_data_validate(df=df, dag_config=dag_config) diff --git a/dags/sources/tasks/business_logic/source_data_validate.py b/dags/sources/tasks/business_logic/source_data_validate.py index 78ee4defc..318e66a09 100755 --- a/dags/sources/tasks/business_logic/source_data_validate.py +++ b/dags/sources/tasks/business_logic/source_data_validate.py @@ -13,9 +13,7 @@ logger = logging.getLogger(__name__) -def source_data_validate( - df: pd.DataFrame, -) -> None: +def source_data_validate(df: pd.DataFrame, dag_config: DAGConfig) -> None: """Etape de validation des données source où on applique des règles métier scrictes. 
Par exemple, si un SIRET est malformé c'est qu'on pas bien fait notre travail à l'étape de normalisation""" @@ -75,7 +73,7 @@ def source_data_validate( codes_mapping = set( chain.from_iterable( x if isinstance(x, list) else [x] - for x in DAGConfig.product_mapping.values() + for x in dag_config.product_mapping.values() ) ) codes_invalid = codes_mapping - codes_db From 7c3cefbf7b20af9fee559ad6ea8ddb82341a482c Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Wed, 22 Jan 2025 16:47:49 +0100 Subject: [PATCH 7/7] test compute_link_tables --- .../airflow_logic/compute_link_tables_task.py | 2 +- .../db_read_propositions_max_id_task.py | 26 --- .../propose_acteur_to_delete_task.py | 33 --- .../read_mapping_from_postgres_task.py | 38 --- .../propose_acteur_to_delete.py | 34 --- .../business_logic/source_data_validate.py | 2 +- .../read_mapping_from_postgres.py | 0 .../test_compute_link_tables.py | 221 ++++++++++++++++++ .../test_test_propose_acteur_to_delete.py | 134 ----------- 9 files changed, 223 insertions(+), 267 deletions(-) delete mode 100644 dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py delete mode 100644 dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py delete mode 100644 dags/sources/tasks/airflow_logic/read_mapping_from_postgres_task.py delete mode 100644 dags/sources/tasks/business_logic/propose_acteur_to_delete.py rename dags/sources/tasks/{business_logic => transform}/read_mapping_from_postgres.py (100%) create mode 100755 dags_unit_tests/sources/tasks/business_logic/test_compute_link_tables.py delete mode 100644 dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py diff --git a/dags/sources/tasks/airflow_logic/compute_link_tables_task.py b/dags/sources/tasks/airflow_logic/compute_link_tables_task.py index 885fbf73b..611a389e7 100644 --- a/dags/sources/tasks/airflow_logic/compute_link_tables_task.py +++ b/dags/sources/tasks/airflow_logic/compute_link_tables_task.py @@ -3,7 +3,7 @@ from 
airflow import DAG from airflow.operators.python import PythonOperator from sources.tasks.business_logic.compute_link_tables import compute_link_tables -from sources.tasks.business_logic.read_mapping_from_postgres import ( +from sources.tasks.transform.read_mapping_from_postgres import ( read_mapping_from_postgres, ) from utils import logging_utils as log diff --git a/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py b/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py deleted file mode 100644 index 77c9d7885..000000000 --- a/dags/sources/tasks/airflow_logic/db_read_propositions_max_id_task.py +++ /dev/null @@ -1,26 +0,0 @@ -from airflow import DAG -from airflow.operators.python import PythonOperator -from shared.tasks.database_logic.db_manager import PostgresConnectionManager -from sqlalchemy import text - - -# TODO : supprimer cette tache après avoir trouvé une solution pour le max_id -def db_read_propositions_max_id_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="db_read_propositions_max_id", - python_callable=db_read_propositions_max_id, - dag=dag, - ) - - -def db_read_propositions_max_id(): - engine = PostgresConnectionManager().engine - - # TODO : check if we need to manage the max id here - displayedpropositionservice_max_id = engine.execute( - text("SELECT max(id) FROM qfdmo_displayedpropositionservice") - ).scalar() - - return { - "displayedpropositionservice_max_id": displayedpropositionservice_max_id, - } diff --git a/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py b/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py deleted file mode 100644 index 6582d530f..000000000 --- a/dags/sources/tasks/airflow_logic/propose_acteur_to_delete_task.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging - -from airflow import DAG -from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.propose_acteur_to_delete import ( - propose_acteur_to_delete, -) 
-from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def propose_acteur_to_delete_task(dag: DAG) -> PythonOperator: - return PythonOperator( - task_id="propose_acteur_to_delete", - python_callable=propose_acteur_to_delete_wrapper, - dag=dag, - ) - - -def propose_acteur_to_delete_wrapper(**kwargs): - df_acteurs_for_source = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")[ - "df" - ] - df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur") - - log.preview(df_acteurs_for_source, "df_acteurs_for_source") - log.preview(df_acteur_from_db, "df_acteur_from_db") - - return propose_acteur_to_delete( - df_acteurs_for_source=df_acteurs_for_source, - df_acteur_from_db=df_acteur_from_db, - ) diff --git a/dags/sources/tasks/airflow_logic/read_mapping_from_postgres_task.py b/dags/sources/tasks/airflow_logic/read_mapping_from_postgres_task.py deleted file mode 100644 index fb6cab556..000000000 --- a/dags/sources/tasks/airflow_logic/read_mapping_from_postgres_task.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -from datetime import timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from sources.tasks.business_logic.read_mapping_from_postgres import ( - read_mapping_from_postgres, -) -from utils import logging_utils as log - -logger = logging.getLogger(__name__) - - -def read_mapping_from_postgres_task( - *, - dag: DAG, - table_name: str, - task_id: str, - retries: int = 0, - retry_delay: timedelta = timedelta(minutes=2), -) -> PythonOperator: - - return PythonOperator( - task_id=task_id, - python_callable=read_mapping_from_postgres_wrapper, - op_kwargs={"table_name": table_name}, - dag=dag, - retries=retries, - retry_delay=retry_delay, - ) - - -def read_mapping_from_postgres_wrapper(**kwargs): - table_name = kwargs["table_name"] - - log.preview("table_name", table_name) - - return read_mapping_from_postgres(table_name=table_name) diff --git 
a/dags/sources/tasks/business_logic/propose_acteur_to_delete.py b/dags/sources/tasks/business_logic/propose_acteur_to_delete.py deleted file mode 100644 index 6008918b7..000000000 --- a/dags/sources/tasks/business_logic/propose_acteur_to_delete.py +++ /dev/null @@ -1,34 +0,0 @@ -import logging - -import pandas as pd -from sources.config import shared_constants as constants - -logger = logging.getLogger(__name__) - - -def propose_acteur_to_delete( - df_acteurs_for_source: pd.DataFrame, - df_acteur_from_db: pd.DataFrame, -): - - df_acteur_from_db_actifs = df_acteur_from_db[ - df_acteur_from_db["statut"] == constants.ACTEUR_ACTIF - ] - - df_acteur_to_delete = df_acteur_from_db_actifs[ - ~df_acteur_from_db_actifs["identifiant_unique"].isin( - df_acteurs_for_source["identifiant_unique"] - ) - ][["identifiant_unique", "cree_le", "modifie_le"]] - - df_acteur_to_delete["statut"] = "SUPPRIME" - df_acteur_to_delete["event"] = "UPDATE_ACTOR" - - # FIXME: ajouter le contexte de la suppression - # ajouter une colonne context avec le contenu de df_acteurs_for_db en json pour - # chaque colonne en jonction sur identifiant_unique - - return { - "metadata": {"number_of_removed_actors": len(df_acteur_to_delete)}, - "df_acteur_to_delete": df_acteur_to_delete, - } diff --git a/dags/sources/tasks/business_logic/source_data_validate.py b/dags/sources/tasks/business_logic/source_data_validate.py index 318e66a09..5e11fb445 100755 --- a/dags/sources/tasks/business_logic/source_data_validate.py +++ b/dags/sources/tasks/business_logic/source_data_validate.py @@ -3,7 +3,7 @@ import pandas as pd from sources.tasks.airflow_logic.config_management import DAGConfig -from sources.tasks.business_logic.read_mapping_from_postgres import ( +from sources.tasks.transform.read_mapping_from_postgres import ( read_mapping_from_postgres, ) from sources.tasks.transform.transform_df import MANDATORY_COLUMNS_AFTER_NORMALISATION diff --git a/dags/sources/tasks/business_logic/read_mapping_from_postgres.py 
b/dags/sources/tasks/transform/read_mapping_from_postgres.py similarity index 100% rename from dags/sources/tasks/business_logic/read_mapping_from_postgres.py rename to dags/sources/tasks/transform/read_mapping_from_postgres.py diff --git a/dags_unit_tests/sources/tasks/business_logic/test_compute_link_tables.py b/dags_unit_tests/sources/tasks/business_logic/test_compute_link_tables.py new file mode 100755 index 000000000..46864e9df --- /dev/null +++ b/dags_unit_tests/sources/tasks/business_logic/test_compute_link_tables.py @@ -0,0 +1,221 @@ +import pandas as pd +import pytest +from sources.tasks.business_logic.compute_link_tables import compute_link_tables + + +@pytest.fixture +def df_acteur(): + return pd.DataFrame( + { + "identifiant_unique": [1, 2, 3], + "acteurservice_codes": [[], [], []], + "label_codes": [[], [], []], + "proposition_services_codes": [[], [], []], + "source_code": ["source1", "source2", "source1"], + "acteur_type_code": ["commerce", "ess", "pav_prive"], + } + ) + + +def test_compute_link_tables_success( + df_acteur, + acteurservice_id_by_code, + labelqualite_id_by_code, + actions_id_by_code, + souscategorieobjet_code_by_id, + source_id_by_code, + acteurtype_id_by_code, +): + + result = compute_link_tables( + df_acteur=df_acteur, + acteurservice_id_by_code=acteurservice_id_by_code, + labelqualite_id_by_code=labelqualite_id_by_code, + actions_id_by_code=actions_id_by_code, + souscats_id_by_code=souscategorieobjet_code_by_id, + source_id_by_code=source_id_by_code, + acteurtype_id_by_code=acteurtype_id_by_code, + ) + expected_result = pd.DataFrame( + { + "identifiant_unique": [1, 2, 3], + "acteurservice_codes": [[], [], []], + "label_codes": [[], [], []], + "proposition_services_codes": [[], [], []], + "source_code": ["source1", "source2", "source1"], + "acteur_type_code": ["commerce", "ess", "pav_prive"], + "acteur_services": [[], [], []], + "labels": [[], [], []], + "proposition_services": [[], [], []], + "source_id": [1, 2, 1], + 
"acteur_type_id": [202, 201, 204], + } + ) + pd.testing.assert_frame_equal( + result.reset_index(drop=True), expected_result.reset_index(drop=True) + ) + + +def test_compute_link_tables_acteur_services( + df_acteur, + acteurservice_id_by_code, + labelqualite_id_by_code, + actions_id_by_code, + souscategorieobjet_code_by_id, + source_id_by_code, + acteurtype_id_by_code, +): + df_acteur["acteurservice_codes"] = [ + ["service_de_reparation"], + ["structure_de_collecte"], + ["service_de_reparation", "structure_de_collecte"], + ] + result = compute_link_tables( + df_acteur=df_acteur, + acteurservice_id_by_code=acteurservice_id_by_code, + labelqualite_id_by_code=labelqualite_id_by_code, + actions_id_by_code=actions_id_by_code, + souscats_id_by_code=souscategorieobjet_code_by_id, + source_id_by_code=source_id_by_code, + acteurtype_id_by_code=acteurtype_id_by_code, + ) + expected_acteur_services = pd.Series( + [ + [{"acteurservice_id": 10, "acteur_id": 1}], + [{"acteurservice_id": 20, "acteur_id": 2}], + [ + {"acteurservice_id": 10, "acteur_id": 3}, + {"acteurservice_id": 20, "acteur_id": 3}, + ], + ] + ) + assert result["acteur_services"].equals(expected_acteur_services) + + +def test_compute_link_tables_labels( + df_acteur, + acteurservice_id_by_code, + labelqualite_id_by_code, + actions_id_by_code, + souscategorieobjet_code_by_id, + source_id_by_code, + acteurtype_id_by_code, +): + df_acteur["label_codes"] = [["ess"], ["label_bonus"], ["reparacteur", "ess"]] + result = compute_link_tables( + df_acteur=df_acteur, + acteurservice_id_by_code=acteurservice_id_by_code, + labelqualite_id_by_code=labelqualite_id_by_code, + actions_id_by_code=actions_id_by_code, + souscats_id_by_code=souscategorieobjet_code_by_id, + source_id_by_code=source_id_by_code, + acteurtype_id_by_code=acteurtype_id_by_code, + ) + expected_labels = pd.Series( + [ + [{"acteur_id": 1, "labelqualite_id": 1}], + [{"acteur_id": 2, "labelqualite_id": 2}], + [ + {"acteur_id": 3, "labelqualite_id": 3}, + 
{"acteur_id": 3, "labelqualite_id": 1}, + ], + ] + ) + assert result["labels"].equals(expected_labels) + + +def test_compute_link_tables_propositionservices( + df_acteur, + acteurservice_id_by_code, + labelqualite_id_by_code, + actions_id_by_code, + souscategorieobjet_code_by_id, + source_id_by_code, + acteurtype_id_by_code, +): + df_acteur["proposition_services_codes"] = [ + [{"action": "reparer", "sous_categories": ["smartphone, tablette et console"]}], + [{"action": "donner", "sous_categories": ["ecran"]}], + [ + { + "action": "reparer", + "sous_categories": ["smartphone, tablette et console"], + }, + { + "action": "donner", + "sous_categories": ["ecran", "smartphone, tablette et console"], + }, + ], + ] + result = compute_link_tables( + df_acteur=df_acteur, + acteurservice_id_by_code=acteurservice_id_by_code, + labelqualite_id_by_code=labelqualite_id_by_code, + actions_id_by_code=actions_id_by_code, + souscats_id_by_code=souscategorieobjet_code_by_id, + source_id_by_code=source_id_by_code, + acteurtype_id_by_code=acteurtype_id_by_code, + ) + expected_proposition_services = pd.Series( + [ + [ + { + "acteur_id": 1, + "action_id": 1, + "action": "reparer", + "sous_categories": ["smartphone, tablette et console"], + "pds_sous_categories": [ + { + "souscategorie": "smartphone, tablette et console", + "souscategorieobjet_id": 102, + } + ], + } + ], + [ + { + "acteur_id": 2, + "action_id": 2, + "action": "donner", + "sous_categories": ["ecran"], + "pds_sous_categories": [ + { + "souscategorie": "ecran", + "souscategorieobjet_id": 101, + } + ], + } + ], + [ + { + "acteur_id": 3, + "action_id": 1, + "action": "reparer", + "sous_categories": ["smartphone, tablette et console"], + "pds_sous_categories": [ + { + "souscategorie": "smartphone, tablette et console", + "souscategorieobjet_id": 102, + } + ], + }, + { + "acteur_id": 3, + "action_id": 2, + "action": "donner", + "sous_categories": ["ecran", "smartphone, tablette et console"], + "pds_sous_categories": [ + { + 
"souscategorie": "ecran", + "souscategorieobjet_id": 101, + }, + { + "souscategorie": "smartphone, tablette et console", + "souscategorieobjet_id": 102, + }, + ], + }, + ], + ] + ) + assert result["proposition_services"].equals(expected_proposition_services) diff --git a/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py b/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py deleted file mode 100644 index 12ba35369..000000000 --- a/dags_unit_tests/sources/tasks/business_logic/test_test_propose_acteur_to_delete.py +++ /dev/null @@ -1,134 +0,0 @@ -from datetime import datetime - -import pandas as pd -import pytest -from sources.tasks.business_logic.propose_acteur_to_delete import ( - propose_acteur_to_delete, -) - - -class TestActeurToDelete: - @pytest.mark.parametrize( - ( - "df_acteur_from_db1, df_acteurs_for_source, df_expected_acteur_to_delete," - " expected_metadata" - ), - [ - # No deletion - ( - pd.DataFrame( - { - "identifiant_unique": ["id1", "id2"], - "statut": ["ACTIF", "ACTIF"], - "cree_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - } - ), - pd.DataFrame( - { - "identifiant_unique": ["id1", "id2"], - "statut": ["ACTIF", "ACTIF"], - "cree_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - } - ), - pd.DataFrame( - columns=[ - "identifiant_unique", - "cree_le", - "modifie_le", - "statut", - "event", - ] - ), - {"number_of_removed_actors": 0}, - ), - # No Deletion - ( - pd.DataFrame( - { - "identifiant_unique": ["id1", "id2", "id3", "id4"], - "statut": ["ACTIF", "ACTIF", "INACTIF", "SUPPRIME"], - "cree_le": [ - datetime(2024, 1, 1), - datetime(2024, 1, 1), - datetime(2024, 1, 1), - datetime(2024, 1, 1), - ], - "modifie_le": [ - datetime(2024, 1, 1), - datetime(2024, 1, 1), - datetime(2024, 1, 1), - datetime(2024, 1, 1), - ], - } - ), - pd.DataFrame( - { - 
"identifiant_unique": ["id1", "id2"], - "statut": ["ACTIF", "ACTIF"], - "cree_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - } - ), - pd.DataFrame( - columns=[ - "identifiant_unique", - "cree_le", - "modifie_le", - "statut", - "event", - ] - ), - {"number_of_removed_actors": 0}, - ), - # Deletion - ( - pd.DataFrame( - { - "identifiant_unique": ["id1", "id2"], - "statut": ["ACTIF", "ACTIF"], - "cree_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1), datetime(2024, 1, 1)], - } - ), - pd.DataFrame( - { - "identifiant_unique": ["id1"], - "statut": ["ACTIF"], - "cree_le": [datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1)], - } - ), - pd.DataFrame( - { - "identifiant_unique": ["id2"], - "cree_le": [datetime(2024, 1, 1)], - "modifie_le": [datetime(2024, 1, 1)], - "statut": ["SUPPRIME"], - "event": ["UPDATE_ACTOR"], - } - ), - {"number_of_removed_actors": 1}, - ), - ], - ) - def test_propose_acteur_to_delete( - self, - df_acteur_from_db1, - df_acteurs_for_source, - df_expected_acteur_to_delete, - expected_metadata, - ): - result = propose_acteur_to_delete( - df_acteurs_for_source=df_acteurs_for_source, - df_acteur_from_db=df_acteur_from_db1, - ) - df_returned_acteur_to_delete = result["df_acteur_to_delete"] - - pd.testing.assert_frame_equal( - df_returned_acteur_to_delete.reset_index(drop=True), - df_expected_acteur_to_delete.reset_index(drop=True), - check_dtype=False, - ) - assert result["metadata"] == expected_metadata