From a9db461378c33ed9e8fc25042d4f818ffdd01c50 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Tue, 21 Jan 2025 17:03:54 +0100 Subject: [PATCH] =?UTF-8?q?taches=20de=20d=C3=A9duplication=20en=20ligne?= =?UTF-8?q?=20et=20renommage=20des=20taches=20(#1244)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit taches de déduplicationen ligne et renommage des taches Co-authored-by: Fabien Le Frapper --- .../dags/create_final_actors.py | 50 +++++++++---------- .../tasks/airflow_logic/__init__.py | 8 +-- ...ask.py => compute_acteur_services_task.py} | 12 ++--- ..._acteur_task.py => compute_acteur_task.py} | 14 +++--- ..._labels_task.py => compute_labels_task.py} | 12 ++--- .../tasks/airflow_logic/db_data_write_task.py | 4 +- .../deduplicate_acteur_services_task.py | 22 ++++---- .../deduplicate_acteur_sources_task.py | 2 +- .../airflow_logic/deduplicate_labels_task.py | 10 ++-- ...> deduplicate_propositionservices_task.py} | 14 +++--- .../tasks/business_logic/__init__.py | 8 +-- ...orrections_acteur.py => compute_acteur.py} | 2 +- ...services.py => compute_acteur_services.py} | 2 +- .../{merge_labels.py => compute_labels.py} | 2 +- .../deduplicate_acteur_services.py | 6 +-- .../business_logic/deduplicate_labels.py | 4 +- ....py => deduplicate_propositionservices.py} | 2 +- .../test_apply_correction_acteur.py | 12 ++--- .../test_merge_acteur_services.py | 8 +-- .../business_logic/test_merge_labels.py | 6 +-- 20 files changed, 93 insertions(+), 107 deletions(-) rename dags/compute_acteurs/tasks/airflow_logic/{merge_acteur_services_task.py => compute_acteur_services_task.py} (74%) rename dags/compute_acteurs/tasks/airflow_logic/{apply_corrections_acteur_task.py => compute_acteur_task.py} (53%) rename dags/compute_acteurs/tasks/airflow_logic/{merge_labels_task.py => compute_labels_task.py} (75%) rename dags/compute_acteurs/tasks/airflow_logic/{compute_parent_ps_task.py => deduplicate_propositionservices_task.py} (69%) rename dags/compute_acteurs/tasks/business_logic/{apply_corrections_acteur.py => compute_acteur.py} (95%) rename dags/compute_acteurs/tasks/business_logic/{merge_acteur_services.py => compute_acteur_services.py} (94%) rename dags/compute_acteurs/tasks/business_logic/{merge_labels.py => compute_labels.py} (95%) rename dags/compute_acteurs/tasks/business_logic/{compute_parent_ps.py => deduplicate_propositionservices.py} (98%) diff --git a/dags/compute_acteurs/dags/create_final_actors.py b/dags/compute_acteurs/dags/create_final_actors.py index 4c3838e8d..ee131ac85 100755 --- a/dags/compute_acteurs/dags/create_final_actors.py +++ b/dags/compute_acteurs/dags/create_final_actors.py @@ -3,15 +3,15 @@ from airflow import DAG from airflow.operators.python import PythonOperator from compute_acteurs.tasks.airflow_logic import ( - apply_corrections_acteur_task, - compute_parent_ps_task, + compute_acteur_services_task, + compute_acteur_task, + compute_labels_task, compute_ps_task, db_data_write_task, - deduplicate_acteur_serivces_task, + deduplicate_acteur_services_task, deduplicate_acteur_sources_task, deduplicate_labels_task, - merge_acteur_services_task, - merge_labels_task, + deduplicate_propositionservices_task, ) from utils.db_tasks import read_data_from_postgres @@ -133,18 +133,20 @@ ) -apply_corrections_acteur_task_instance = apply_corrections_acteur_task(dag) +compute_acteur_task_instance = compute_acteur_task(dag) compute_ps_task_instance = compute_ps_task(dag) -compute_parent_ps_task_instance = compute_parent_ps_task(dag) -deduplicate_acteur_serivces_task_instance = deduplicate_acteur_serivces_task(dag) +deduplicate_propositionservices_task_instance = deduplicate_propositionservices_task( + dag +) +deduplicate_acteur_services_task_instance = deduplicate_acteur_services_task(dag) deduplicate_acteur_sources_task_instance = deduplicate_acteur_sources_task(dag) deduplicate_labels_task_instance = deduplicate_labels_task(dag) -merge_acteur_services_task_instance = merge_acteur_services_task(dag) -merge_labels_task_instance = merge_labels_task(dag) +compute_acteur_services_task_instance = compute_acteur_services_task(dag) +compute_labels_task_instance = compute_labels_task(dag) db_data_write_task_instance = db_data_write_task(dag) -load_acteur_task >> apply_corrections_acteur_task_instance +load_acteur_task >> compute_acteur_task_instance [ load_propositionservice_task, load_revisionpropositionservice_task, @@ -155,26 +157,20 @@ load_revisionacteur_task, load_acteur_labels_task, load_revisionacteur_labels_task, -] >> merge_labels_task_instance +] >> compute_labels_task_instance [ load_revisionacteur_task, load_acteur_acteur_services_task, load_revisionacteur_acteur_services_task, -] >> merge_acteur_services_task_instance -apply_corrections_acteur_task_instance >> compute_parent_ps_task_instance -apply_corrections_acteur_task_instance >> deduplicate_acteur_sources_task_instance -(compute_ps_task_instance >> compute_parent_ps_task_instance) +] >> compute_acteur_services_task_instance +compute_acteur_task_instance >> deduplicate_propositionservices_task_instance +compute_ps_task_instance >> deduplicate_propositionservices_task_instance +(compute_labels_task_instance >> compute_acteur_task_instance) +(compute_acteur_services_task_instance >> compute_acteur_task_instance) ( - merge_labels_task_instance - >> apply_corrections_acteur_task_instance + deduplicate_propositionservices_task_instance + >> deduplicate_acteur_sources_task_instance + >> deduplicate_acteur_services_task_instance >> deduplicate_labels_task_instance + >> db_data_write_task_instance ) -( - merge_acteur_services_task_instance - >> apply_corrections_acteur_task_instance - >> deduplicate_acteur_serivces_task_instance -) -deduplicate_acteur_sources_task_instance >> db_data_write_task_instance -compute_parent_ps_task_instance >> db_data_write_task_instance -deduplicate_labels_task_instance >> db_data_write_task_instance -deduplicate_acteur_serivces_task_instance >> db_data_write_task_instance diff --git a/dags/compute_acteurs/tasks/airflow_logic/__init__.py b/dags/compute_acteurs/tasks/airflow_logic/__init__.py index b829e87a9..70b58def2 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/__init__.py +++ b/dags/compute_acteurs/tasks/airflow_logic/__init__.py @@ -1,9 +1,9 @@ -from .apply_corrections_acteur_task import * # noqa -from .compute_parent_ps_task import * # noqa +from .compute_acteur_services_task import * # noqa +from .compute_acteur_task import * # noqa +from .compute_labels_task import * # noqa from .compute_ps_task import * # noqa from .db_data_write_task import * # noqa from .deduplicate_acteur_services_task import * # noqa from .deduplicate_acteur_sources_task import * # noqa from .deduplicate_labels_task import * # noqa -from .merge_acteur_services_task import * # noqa -from .merge_labels_task import * # noqa +from .deduplicate_propositionservices_task import * # noqa diff --git a/dags/compute_acteurs/tasks/airflow_logic/merge_acteur_services_task.py b/dags/compute_acteurs/tasks/airflow_logic/compute_acteur_services_task.py similarity index 74% rename from dags/compute_acteurs/tasks/airflow_logic/merge_acteur_services_task.py rename to dags/compute_acteurs/tasks/airflow_logic/compute_acteur_services_task.py index a5a55c230..2bbd0b5ef 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/merge_acteur_services_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/compute_acteur_services_task.py @@ -2,21 +2,21 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from compute_acteurs.tasks.business_logic import merge_acteur_services +from compute_acteurs.tasks.business_logic import compute_acteur_services from utils import logging_utils as log logger = logging.getLogger(__name__) -def merge_acteur_services_task(dag: DAG) -> PythonOperator: +def compute_acteur_services_task(dag: DAG) -> PythonOperator: return PythonOperator( - task_id="merge_acteur_services", - python_callable=merge_acteur_services_wrapper, + task_id="compute_acteur_services", + python_callable=compute_acteur_services_wrapper, dag=dag, ) -def merge_acteur_services_wrapper(**kwargs): +def compute_acteur_services_wrapper(**kwargs): df_acteur_acteur_services = kwargs["ti"].xcom_pull( task_ids="load_acteur_acteur_services" ) @@ -29,7 +29,7 @@ def merge_acteur_services_wrapper(**kwargs): log.preview("df_revisionacteur_acteur_services", df_revisionacteur_acteur_services) log.preview("df_revisionacteur", df_revisionacteur) - return merge_acteur_services( + return compute_acteur_services( df_acteur_acteur_services=df_acteur_acteur_services, df_revisionacteur_acteur_services=df_revisionacteur_acteur_services, df_revisionacteur=df_revisionacteur, diff --git a/dags/compute_acteurs/tasks/airflow_logic/apply_corrections_acteur_task.py b/dags/compute_acteurs/tasks/airflow_logic/compute_acteur_task.py similarity index 53% rename from dags/compute_acteurs/tasks/airflow_logic/apply_corrections_acteur_task.py rename to dags/compute_acteurs/tasks/airflow_logic/compute_acteur_task.py index 0b1399eac..6b3576ce5 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/apply_corrections_acteur_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/compute_acteur_task.py @@ -2,27 +2,25 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from compute_acteurs.tasks.business_logic import apply_corrections_acteur +from compute_acteurs.tasks.business_logic import compute_acteur from utils import logging_utils as log logger = logging.getLogger(__name__) -def apply_corrections_acteur_task(dag: DAG) -> PythonOperator: +def compute_acteur_task(dag: DAG) -> PythonOperator: return PythonOperator( - task_id="apply_corrections_acteur", - python_callable=apply_corrections_acteur_wrapper, + task_id="compute_acteur", + python_callable=compute_acteur_wrapper, dag=dag, ) -def apply_corrections_acteur_wrapper(**kwargs): +def compute_acteur_wrapper(**kwargs): df_acteur = kwargs["ti"].xcom_pull(task_ids="load_acteur") df_revisionacteur = kwargs["ti"].xcom_pull(task_ids="load_revisionacteur") log.preview("df_acteur", df_acteur) log.preview("df_revisionacteur", df_revisionacteur) - return apply_corrections_acteur( - df_acteur=df_acteur, df_revisionacteur=df_revisionacteur - ) + return compute_acteur(df_acteur=df_acteur, df_revisionacteur=df_revisionacteur) diff --git a/dags/compute_acteurs/tasks/airflow_logic/merge_labels_task.py b/dags/compute_acteurs/tasks/airflow_logic/compute_labels_task.py similarity index 75% rename from dags/compute_acteurs/tasks/airflow_logic/merge_labels_task.py rename to dags/compute_acteurs/tasks/airflow_logic/compute_labels_task.py index ce95cbc14..eb436a62f 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/merge_labels_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/compute_labels_task.py @@ -2,21 +2,21 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from compute_acteurs.tasks.business_logic import merge_labels +from compute_acteurs.tasks.business_logic import compute_labels from utils import logging_utils as log logger = logging.getLogger(__name__) -def merge_labels_task(dag: DAG) -> PythonOperator: +def compute_labels_task(dag: DAG) -> PythonOperator: return PythonOperator( - task_id="merge_labels", - python_callable=merge_labels_wrapper, + task_id="compute_labels", + python_callable=compute_labels_wrapper, dag=dag, ) -def merge_labels_wrapper(**kwargs): +def compute_labels_wrapper(**kwargs): df_acteur_labels = kwargs["ti"].xcom_pull(task_ids="load_acteur_labels") df_revisionacteur_labels = kwargs["ti"].xcom_pull( task_ids="load_revisionacteur_labels" @@ -27,7 +27,7 @@ def merge_labels_wrapper(**kwargs): log.preview("df_revisionacteur_labels", df_revisionacteur_labels) log.preview("df_revisionacteur", df_revisionacteur) - return merge_labels( + return compute_labels( df_acteur_labels=df_acteur_labels, df_revisionacteur_labels=df_revisionacteur_labels, df_revisionacteur=df_revisionacteur, diff --git a/dags/compute_acteurs/tasks/airflow_logic/db_data_write_task.py b/dags/compute_acteurs/tasks/airflow_logic/db_data_write_task.py index 1881e44ef..c65029d67 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/db_data_write_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/db_data_write_task.py @@ -17,12 +17,12 @@ def db_data_write_task(dag: DAG) -> PythonOperator: def db_data_write_wrapper(**kwargs): - df_acteur_merged = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[ + df_acteur_merged = kwargs["ti"].xcom_pull(task_ids="compute_acteur")[ "df_acteur_merged" ] df_labels_updated = kwargs["ti"].xcom_pull(task_ids="deduplicate_labels") df_acteur_services_updated = kwargs["ti"].xcom_pull( - task_ids="deduplicate_acteur_serivces" + task_ids="deduplicate_acteur_services" ) df_acteur_sources_updated = kwargs["ti"].xcom_pull( task_ids="deduplicate_acteur_sources" diff --git a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_services_task.py b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_services_task.py index 1d1d9fa5c..ec3cfa44d 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_services_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_services_task.py @@ -2,31 +2,29 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from compute_acteurs.tasks.business_logic import deduplicate_acteur_serivces +from compute_acteurs.tasks.business_logic import deduplicate_acteur_services from utils import logging_utils as log logger = logging.getLogger(__name__) -def deduplicate_acteur_serivces_task(dag: DAG) -> PythonOperator: +def deduplicate_acteur_services_task(dag: DAG) -> PythonOperator: return PythonOperator( - task_id="deduplicate_acteur_serivces", - python_callable=deduplicate_acteur_serivces_wrapper, + task_id="deduplicate_acteur_services", + python_callable=deduplicate_acteur_services_wrapper, dag=dag, ) -def deduplicate_acteur_serivces_wrapper(**kwargs): +def deduplicate_acteur_services_wrapper(**kwargs): - df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[ - "df_children" - ] - df_merge_acteur_services = kwargs["ti"].xcom_pull(task_ids="merge_acteur_services") + df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"] + df_acteur_services = kwargs["ti"].xcom_pull(task_ids="compute_acteur_services") log.preview("df_children", df_children) - log.preview("df_merged_relationship", df_merge_acteur_services) + log.preview("df_merged_relationship", df_acteur_services) - return deduplicate_acteur_serivces( + return deduplicate_acteur_services( df_children=df_children, - df_merge_acteur_services=df_merge_acteur_services, + df_acteur_services=df_acteur_services, ) diff --git a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_sources_task.py b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_sources_task.py index 1837bc220..d12bcddda 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_sources_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_acteur_sources_task.py @@ -18,7 +18,7 @@ def deduplicate_acteur_sources_task(dag: DAG) -> PythonOperator: def deduplicate_acteur_sources_wrapper(**kwargs): - data_actors = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur") + data_actors = kwargs["ti"].xcom_pull(task_ids="compute_acteur") df_children = data_actors["df_children"] df_acteur_merged = data_actors["df_acteur_merged"] diff --git a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_labels_task.py b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_labels_task.py index b2a88d590..341dcb6ab 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/deduplicate_labels_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_labels_task.py @@ -18,15 +18,13 @@ def deduplicate_labels_task(dag: DAG) -> PythonOperator: def deduplicate_labels_wrapper(**kwargs): - df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[ - "df_children" - ] - df_merge_labels = kwargs["ti"].xcom_pull(task_ids="merge_labels") + df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"] + df_labels = kwargs["ti"].xcom_pull(task_ids="compute_labels") log.preview("df_children", df_children) - log.preview("df_merged_relationship", df_merge_labels) + log.preview("df_merged_relationship", df_labels) return deduplicate_labels( df_children=df_children, - df_merge_labels=df_merge_labels, + df_labels=df_labels, ) diff --git a/dags/compute_acteurs/tasks/airflow_logic/compute_parent_ps_task.py b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_propositionservices_task.py similarity index 69% rename from dags/compute_acteurs/tasks/airflow_logic/compute_parent_ps_task.py rename to dags/compute_acteurs/tasks/airflow_logic/deduplicate_propositionservices_task.py index 0ce4359c2..6c9e85170 100644 --- a/dags/compute_acteurs/tasks/airflow_logic/compute_parent_ps_task.py +++ b/dags/compute_acteurs/tasks/airflow_logic/deduplicate_propositionservices_task.py @@ -2,24 +2,22 @@ from airflow import DAG from airflow.operators.python import PythonOperator -from compute_acteurs.tasks.business_logic import compute_parent_ps +from compute_acteurs.tasks.business_logic import deduplicate_propositionservices from utils import logging_utils as log logger = logging.getLogger(__name__) -def compute_parent_ps_task(dag: DAG) -> PythonOperator: +def deduplicate_propositionservices_task(dag: DAG) -> PythonOperator: return PythonOperator( task_id="deduplicate_propositionservices", - python_callable=compute_parent_ps_wrapper, + python_callable=deduplicate_propositionservices_wrapper, dag=dag, ) -def compute_parent_ps_wrapper(**kwargs): - df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[ - "df_children" - ] +def deduplicate_propositionservices_wrapper(**kwargs): + df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"] dfs_ps = kwargs["ti"].xcom_pull(task_ids="compute_ps") df_propositionservice_merged = dfs_ps["df_propositionservice_merged"] df_propositionservice_sous_categories_merged = dfs_ps[ @@ -33,7 +31,7 @@ def compute_parent_ps_wrapper(**kwargs): df_propositionservice_sous_categories_merged, ) - return compute_parent_ps( + return deduplicate_propositionservices( df_children=df_children, df_ps=df_propositionservice_merged, df_ps_sscat=df_propositionservice_sous_categories_merged, diff --git a/dags/compute_acteurs/tasks/business_logic/__init__.py b/dags/compute_acteurs/tasks/business_logic/__init__.py index c5f9d4393..f88fca2f0 100644 --- a/dags/compute_acteurs/tasks/business_logic/__init__.py +++ b/dags/compute_acteurs/tasks/business_logic/__init__.py @@ -1,9 +1,9 @@ -from .apply_corrections_acteur import * # noqa -from .compute_parent_ps import * # noqa +from .compute_acteur import * # noqa +from .compute_acteur_services import * # noqa +from .compute_labels import * # noqa from .compute_ps import * # noqa from .db_data_write import * # noqa from .deduplicate_acteur_services import * # noqa from .deduplicate_acteur_sources import * # noqa from .deduplicate_labels import * # noqa -from .merge_acteur_services import * # noqa -from .merge_labels import * # noqa +from .deduplicate_propositionservices import * # noqa diff --git a/dags/compute_acteurs/tasks/business_logic/apply_corrections_acteur.py b/dags/compute_acteurs/tasks/business_logic/compute_acteur.py similarity index 95% rename from dags/compute_acteurs/tasks/business_logic/apply_corrections_acteur.py rename to dags/compute_acteurs/tasks/business_logic/compute_acteur.py index 515bfea71..177f7c12e 100644 --- a/dags/compute_acteurs/tasks/business_logic/apply_corrections_acteur.py +++ b/dags/compute_acteurs/tasks/business_logic/compute_acteur.py @@ -2,7 +2,7 @@ import shortuuid -def apply_corrections_acteur(df_acteur: pd.DataFrame, df_revisionacteur: pd.DataFrame): +def compute_acteur(df_acteur: pd.DataFrame, df_revisionacteur: pd.DataFrame): revisionacteur_parent_ids = df_revisionacteur["parent_id"].unique() diff --git a/dags/compute_acteurs/tasks/business_logic/merge_acteur_services.py b/dags/compute_acteurs/tasks/business_logic/compute_acteur_services.py similarity index 94% rename from dags/compute_acteurs/tasks/business_logic/merge_acteur_services.py rename to dags/compute_acteurs/tasks/business_logic/compute_acteur_services.py index 19abd82bf..a364a685b 100644 --- a/dags/compute_acteurs/tasks/business_logic/merge_acteur_services.py +++ b/dags/compute_acteurs/tasks/business_logic/compute_acteur_services.py @@ -4,7 +4,7 @@ ) -def merge_acteur_services( +def compute_acteur_services( df_acteur_acteur_services: pd.DataFrame, df_revisionacteur_acteur_services: pd.DataFrame, df_revisionacteur: pd.DataFrame, diff --git a/dags/compute_acteurs/tasks/business_logic/merge_labels.py b/dags/compute_acteurs/tasks/business_logic/compute_labels.py similarity index 95% rename from dags/compute_acteurs/tasks/business_logic/merge_labels.py rename to dags/compute_acteurs/tasks/business_logic/compute_labels.py index 3ea7fad7f..85562d01e 100644 --- a/dags/compute_acteurs/tasks/business_logic/merge_labels.py +++ b/dags/compute_acteurs/tasks/business_logic/compute_labels.py @@ -4,7 +4,7 @@ ) -def merge_labels( +def compute_labels( df_acteur_labels: pd.DataFrame, df_revisionacteur_labels: pd.DataFrame, df_revisionacteur: pd.DataFrame, diff --git a/dags/compute_acteurs/tasks/business_logic/deduplicate_acteur_services.py b/dags/compute_acteurs/tasks/business_logic/deduplicate_acteur_services.py index 51f33ed71..9b4b7ba3b 100644 --- a/dags/compute_acteurs/tasks/business_logic/deduplicate_acteur_services.py +++ b/dags/compute_acteurs/tasks/business_logic/deduplicate_acteur_services.py @@ -4,12 +4,12 @@ ) -def deduplicate_acteur_serivces( +def deduplicate_acteur_services( df_children: pd.DataFrame, - df_merge_acteur_services: pd.DataFrame, + df_acteur_services: pd.DataFrame, ): return deduplicate_acteurs_many2many_relationship( df_children, - df_merge_acteur_services, + df_acteur_services, "acteurservice_id", ) diff --git a/dags/compute_acteurs/tasks/business_logic/deduplicate_labels.py b/dags/compute_acteurs/tasks/business_logic/deduplicate_labels.py index 01e0c0348..22d7d64d3 100644 --- a/dags/compute_acteurs/tasks/business_logic/deduplicate_labels.py +++ b/dags/compute_acteurs/tasks/business_logic/deduplicate_labels.py @@ -6,10 +6,10 @@ def deduplicate_labels( df_children: pd.DataFrame, - df_merge_labels: pd.DataFrame, + df_labels: pd.DataFrame, ): return deduplicate_acteurs_many2many_relationship( df_children, - df_merge_labels, + df_labels, "labelqualite_id", ) diff --git a/dags/compute_acteurs/tasks/business_logic/compute_parent_ps.py b/dags/compute_acteurs/tasks/business_logic/deduplicate_propositionservices.py similarity index 98% rename from dags/compute_acteurs/tasks/business_logic/compute_parent_ps.py rename to dags/compute_acteurs/tasks/business_logic/deduplicate_propositionservices.py index c5d843097..f846bc458 100644 --- a/dags/compute_acteurs/tasks/business_logic/compute_parent_ps.py +++ b/dags/compute_acteurs/tasks/business_logic/deduplicate_propositionservices.py @@ -1,7 +1,7 @@ import pandas as pd -def compute_parent_ps( +def deduplicate_propositionservices( df_children: pd.DataFrame, df_ps: pd.DataFrame, df_ps_sscat: pd.DataFrame, diff --git a/dags_unit_tests/compute_acteurs/business_logic/test_apply_correction_acteur.py b/dags_unit_tests/compute_acteurs/business_logic/test_apply_correction_acteur.py index 1afecfd47..46a8666da 100755 --- a/dags_unit_tests/compute_acteurs/business_logic/test_apply_correction_acteur.py +++ b/dags_unit_tests/compute_acteurs/business_logic/test_apply_correction_acteur.py @@ -1,9 +1,7 @@ import numpy as np import pandas as pd import pytest -from compute_acteurs.tasks.business_logic.apply_corrections_acteur import ( - apply_corrections_acteur, -) +from compute_acteurs.tasks.business_logic.compute_acteur import compute_acteur class TestApplyCorrections: @@ -32,10 +30,10 @@ def df_load_revisionacteur(self): } ) - def test_apply_corrections_acteur(self, df_load_acteur, df_load_revisionacteur): + def test_compute_acteur(self, df_load_acteur, df_load_revisionacteur): # Call the function with the mocked ti - result = apply_corrections_acteur( + result = compute_acteur( df_acteur=df_load_acteur, df_revisionacteur=df_load_revisionacteur ) @@ -88,11 +86,11 @@ def df_load_revisionacteur_with_children(self): } ) - def test_apply_corrections_acteur_children( + def test_compute_acteur_children( self, df_load_acteur_with_children, df_load_revisionacteur_with_children ): # Call the function with the mocked ti - result = apply_corrections_acteur( + result = compute_acteur( df_acteur=df_load_acteur_with_children, df_revisionacteur=df_load_revisionacteur_with_children, ) diff --git a/dags_unit_tests/compute_acteurs/business_logic/test_merge_acteur_services.py b/dags_unit_tests/compute_acteurs/business_logic/test_merge_acteur_services.py index 27c972764..926b7e4dd 100755 --- a/dags_unit_tests/compute_acteurs/business_logic/test_merge_acteur_services.py +++ b/dags_unit_tests/compute_acteurs/business_logic/test_merge_acteur_services.py @@ -1,7 +1,7 @@ import pandas as pd import pytest -from compute_acteurs.tasks.business_logic.merge_acteur_services import ( - merge_acteur_services, +from compute_acteurs.tasks.business_logic.compute_acteur_services import ( + compute_acteur_services, ) @@ -91,7 +91,7 @@ class TestMergeActeurServices: ), ], ) - def test_merge_acteur_services( + def test_compute_acteur_services( self, load_acteur_acteur_services, load_revisionacteur_acteur_services, @@ -99,7 +99,7 @@ def test_merge_acteur_services( expected, ): - result = merge_acteur_services( + result = compute_acteur_services( df_acteur_acteur_services=load_acteur_acteur_services, df_revisionacteur_acteur_services=load_revisionacteur_acteur_services, df_revisionacteur=load_revisionacteur, diff --git a/dags_unit_tests/compute_acteurs/business_logic/test_merge_labels.py b/dags_unit_tests/compute_acteurs/business_logic/test_merge_labels.py index c4ad41844..a39823d87 100755 --- a/dags_unit_tests/compute_acteurs/business_logic/test_merge_labels.py +++ b/dags_unit_tests/compute_acteurs/business_logic/test_merge_labels.py @@ -1,6 +1,6 @@ import pandas as pd import pytest -from compute_acteurs.tasks.business_logic.merge_labels import merge_labels +from compute_acteurs.tasks.business_logic.compute_labels import compute_labels class TestMergeLabels: @@ -87,7 +87,7 @@ class TestMergeLabels: ), ], ) - def test_merge_labels( + def test_compute_labels( self, load_acteur_labels, load_revisionacteur_labels, @@ -95,7 +95,7 @@ def test_merge_labels( expected, ): - result = merge_labels( + result = compute_labels( df_acteur_labels=load_acteur_labels, df_revisionacteur_labels=load_revisionacteur_labels, df_revisionacteur=load_revisionacteur,