Skip to content

Commit

Permalink
taches de déduplication en ligne et renommage des taches (#1244)
Browse files Browse the repository at this point in the history
taches de déduplicationen ligne et renommage des taches

Co-authored-by: Fabien Le Frapper <[email protected]>
  • Loading branch information
kolok and fabienheureux authored Jan 21, 2025
1 parent 52751db commit a9db461
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 107 deletions.
50 changes: 23 additions & 27 deletions dags/compute_acteurs/dags/create_final_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.airflow_logic import (
apply_corrections_acteur_task,
compute_parent_ps_task,
compute_acteur_services_task,
compute_acteur_task,
compute_labels_task,
compute_ps_task,
db_data_write_task,
deduplicate_acteur_serivces_task,
deduplicate_acteur_services_task,
deduplicate_acteur_sources_task,
deduplicate_labels_task,
merge_acteur_services_task,
merge_labels_task,
deduplicate_propositionservices_task,
)
from utils.db_tasks import read_data_from_postgres

Expand Down Expand Up @@ -133,18 +133,20 @@
)


apply_corrections_acteur_task_instance = apply_corrections_acteur_task(dag)
compute_acteur_task_instance = compute_acteur_task(dag)
compute_ps_task_instance = compute_ps_task(dag)
compute_parent_ps_task_instance = compute_parent_ps_task(dag)
deduplicate_acteur_serivces_task_instance = deduplicate_acteur_serivces_task(dag)
deduplicate_propositionservices_task_instance = deduplicate_propositionservices_task(
dag
)
deduplicate_acteur_services_task_instance = deduplicate_acteur_services_task(dag)
deduplicate_acteur_sources_task_instance = deduplicate_acteur_sources_task(dag)
deduplicate_labels_task_instance = deduplicate_labels_task(dag)
merge_acteur_services_task_instance = merge_acteur_services_task(dag)
merge_labels_task_instance = merge_labels_task(dag)
compute_acteur_services_task_instance = compute_acteur_services_task(dag)
compute_labels_task_instance = compute_labels_task(dag)
db_data_write_task_instance = db_data_write_task(dag)


load_acteur_task >> apply_corrections_acteur_task_instance
load_acteur_task >> compute_acteur_task_instance
[
load_propositionservice_task,
load_revisionpropositionservice_task,
Expand All @@ -155,26 +157,20 @@
load_revisionacteur_task,
load_acteur_labels_task,
load_revisionacteur_labels_task,
] >> merge_labels_task_instance
] >> compute_labels_task_instance
[
load_revisionacteur_task,
load_acteur_acteur_services_task,
load_revisionacteur_acteur_services_task,
] >> merge_acteur_services_task_instance
apply_corrections_acteur_task_instance >> compute_parent_ps_task_instance
apply_corrections_acteur_task_instance >> deduplicate_acteur_sources_task_instance
(compute_ps_task_instance >> compute_parent_ps_task_instance)
] >> compute_acteur_services_task_instance
compute_acteur_task_instance >> deduplicate_propositionservices_task_instance
compute_ps_task_instance >> deduplicate_propositionservices_task_instance
(compute_labels_task_instance >> compute_acteur_task_instance)
(compute_acteur_services_task_instance >> compute_acteur_task_instance)
(
merge_labels_task_instance
>> apply_corrections_acteur_task_instance
deduplicate_propositionservices_task_instance
>> deduplicate_acteur_sources_task_instance
>> deduplicate_acteur_services_task_instance
>> deduplicate_labels_task_instance
>> db_data_write_task_instance
)
(
merge_acteur_services_task_instance
>> apply_corrections_acteur_task_instance
>> deduplicate_acteur_serivces_task_instance
)
deduplicate_acteur_sources_task_instance >> db_data_write_task_instance
compute_parent_ps_task_instance >> db_data_write_task_instance
deduplicate_labels_task_instance >> db_data_write_task_instance
deduplicate_acteur_serivces_task_instance >> db_data_write_task_instance
8 changes: 4 additions & 4 deletions dags/compute_acteurs/tasks/airflow_logic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .apply_corrections_acteur_task import * # noqa
from .compute_parent_ps_task import * # noqa
from .compute_acteur_services_task import * # noqa
from .compute_acteur_task import * # noqa
from .compute_labels_task import * # noqa
from .compute_ps_task import * # noqa
from .db_data_write_task import * # noqa
from .deduplicate_acteur_services_task import * # noqa
from .deduplicate_acteur_sources_task import * # noqa
from .deduplicate_labels_task import * # noqa
from .merge_acteur_services_task import * # noqa
from .merge_labels_task import * # noqa
from .deduplicate_propositionservices_task import * # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import merge_acteur_services
from compute_acteurs.tasks.business_logic import compute_acteur_services
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def merge_acteur_services_task(dag: DAG) -> PythonOperator:
def compute_acteur_services_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="merge_acteur_services",
python_callable=merge_acteur_services_wrapper,
task_id="compute_acteur_services",
python_callable=compute_acteur_services_wrapper,
dag=dag,
)


def merge_acteur_services_wrapper(**kwargs):
def compute_acteur_services_wrapper(**kwargs):
df_acteur_acteur_services = kwargs["ti"].xcom_pull(
task_ids="load_acteur_acteur_services"
)
Expand All @@ -29,7 +29,7 @@ def merge_acteur_services_wrapper(**kwargs):
log.preview("df_revisionacteur_acteur_services", df_revisionacteur_acteur_services)
log.preview("df_revisionacteur", df_revisionacteur)

return merge_acteur_services(
return compute_acteur_services(
df_acteur_acteur_services=df_acteur_acteur_services,
df_revisionacteur_acteur_services=df_revisionacteur_acteur_services,
df_revisionacteur=df_revisionacteur,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import apply_corrections_acteur
from compute_acteurs.tasks.business_logic import compute_acteur
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def apply_corrections_acteur_task(dag: DAG) -> PythonOperator:
def compute_acteur_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="apply_corrections_acteur",
python_callable=apply_corrections_acteur_wrapper,
task_id="compute_acteur",
python_callable=compute_acteur_wrapper,
dag=dag,
)


def apply_corrections_acteur_wrapper(**kwargs):
def compute_acteur_wrapper(**kwargs):
df_acteur = kwargs["ti"].xcom_pull(task_ids="load_acteur")
df_revisionacteur = kwargs["ti"].xcom_pull(task_ids="load_revisionacteur")

log.preview("df_acteur", df_acteur)
log.preview("df_revisionacteur", df_revisionacteur)

return apply_corrections_acteur(
df_acteur=df_acteur, df_revisionacteur=df_revisionacteur
)
return compute_acteur(df_acteur=df_acteur, df_revisionacteur=df_revisionacteur)
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import merge_labels
from compute_acteurs.tasks.business_logic import compute_labels
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def merge_labels_task(dag: DAG) -> PythonOperator:
def compute_labels_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="merge_labels",
python_callable=merge_labels_wrapper,
task_id="compute_labels",
python_callable=compute_labels_wrapper,
dag=dag,
)


def merge_labels_wrapper(**kwargs):
def compute_labels_wrapper(**kwargs):
df_acteur_labels = kwargs["ti"].xcom_pull(task_ids="load_acteur_labels")
df_revisionacteur_labels = kwargs["ti"].xcom_pull(
task_ids="load_revisionacteur_labels"
Expand All @@ -27,7 +27,7 @@ def merge_labels_wrapper(**kwargs):
log.preview("df_revisionacteur_labels", df_revisionacteur_labels)
log.preview("df_revisionacteur", df_revisionacteur)

return merge_labels(
return compute_labels(
df_acteur_labels=df_acteur_labels,
df_revisionacteur_labels=df_revisionacteur_labels,
df_revisionacteur=df_revisionacteur,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def db_data_write_task(dag: DAG) -> PythonOperator:


def db_data_write_wrapper(**kwargs):
df_acteur_merged = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
df_acteur_merged = kwargs["ti"].xcom_pull(task_ids="compute_acteur")[
"df_acteur_merged"
]
df_labels_updated = kwargs["ti"].xcom_pull(task_ids="deduplicate_labels")
df_acteur_services_updated = kwargs["ti"].xcom_pull(
task_ids="deduplicate_acteur_serivces"
task_ids="deduplicate_acteur_services"
)
df_acteur_sources_updated = kwargs["ti"].xcom_pull(
task_ids="deduplicate_acteur_sources"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,29 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import deduplicate_acteur_serivces
from compute_acteurs.tasks.business_logic import deduplicate_acteur_services
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def deduplicate_acteur_serivces_task(dag: DAG) -> PythonOperator:
def deduplicate_acteur_services_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="deduplicate_acteur_serivces",
python_callable=deduplicate_acteur_serivces_wrapper,
task_id="deduplicate_acteur_services",
python_callable=deduplicate_acteur_services_wrapper,
dag=dag,
)


def deduplicate_acteur_serivces_wrapper(**kwargs):
def deduplicate_acteur_services_wrapper(**kwargs):

df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_children"
]
df_merge_acteur_services = kwargs["ti"].xcom_pull(task_ids="merge_acteur_services")
df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"]
df_acteur_services = kwargs["ti"].xcom_pull(task_ids="compute_acteur_services")

log.preview("df_children", df_children)
log.preview("df_merged_relationship", df_merge_acteur_services)
log.preview("df_merged_relationship", df_acteur_services)

return deduplicate_acteur_serivces(
return deduplicate_acteur_services(
df_children=df_children,
df_merge_acteur_services=df_merge_acteur_services,
df_acteur_services=df_acteur_services,
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def deduplicate_acteur_sources_task(dag: DAG) -> PythonOperator:

def deduplicate_acteur_sources_wrapper(**kwargs):

data_actors = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")
data_actors = kwargs["ti"].xcom_pull(task_ids="compute_acteur")
df_children = data_actors["df_children"]
df_acteur_merged = data_actors["df_acteur_merged"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ def deduplicate_labels_task(dag: DAG) -> PythonOperator:

def deduplicate_labels_wrapper(**kwargs):

df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_children"
]
df_merge_labels = kwargs["ti"].xcom_pull(task_ids="merge_labels")
df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"]
df_labels = kwargs["ti"].xcom_pull(task_ids="compute_labels")

log.preview("df_children", df_children)
log.preview("df_merged_relationship", df_merge_labels)
log.preview("df_merged_relationship", df_labels)

return deduplicate_labels(
df_children=df_children,
df_merge_labels=df_merge_labels,
df_labels=df_labels,
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@

from airflow import DAG
from airflow.operators.python import PythonOperator
from compute_acteurs.tasks.business_logic import compute_parent_ps
from compute_acteurs.tasks.business_logic import deduplicate_propositionservices
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def compute_parent_ps_task(dag: DAG) -> PythonOperator:
def deduplicate_propositionservices_task(dag: DAG) -> PythonOperator:
return PythonOperator(
task_id="deduplicate_propositionservices",
python_callable=compute_parent_ps_wrapper,
python_callable=deduplicate_propositionservices_wrapper,
dag=dag,
)


def compute_parent_ps_wrapper(**kwargs):
df_children = kwargs["ti"].xcom_pull(task_ids="apply_corrections_acteur")[
"df_children"
]
def deduplicate_propositionservices_wrapper(**kwargs):
df_children = kwargs["ti"].xcom_pull(task_ids="compute_acteur")["df_children"]
dfs_ps = kwargs["ti"].xcom_pull(task_ids="compute_ps")
df_propositionservice_merged = dfs_ps["df_propositionservice_merged"]
df_propositionservice_sous_categories_merged = dfs_ps[
Expand All @@ -33,7 +31,7 @@ def compute_parent_ps_wrapper(**kwargs):
df_propositionservice_sous_categories_merged,
)

return compute_parent_ps(
return deduplicate_propositionservices(
df_children=df_children,
df_ps=df_propositionservice_merged,
df_ps_sscat=df_propositionservice_sous_categories_merged,
Expand Down
8 changes: 4 additions & 4 deletions dags/compute_acteurs/tasks/business_logic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .apply_corrections_acteur import * # noqa
from .compute_parent_ps import * # noqa
from .compute_acteur import * # noqa
from .compute_acteur_services import * # noqa
from .compute_labels import * # noqa
from .compute_ps import * # noqa
from .db_data_write import * # noqa
from .deduplicate_acteur_services import * # noqa
from .deduplicate_acteur_sources import * # noqa
from .deduplicate_labels import * # noqa
from .merge_acteur_services import * # noqa
from .merge_labels import * # noqa
from .deduplicate_propositionservices import * # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shortuuid


def apply_corrections_acteur(df_acteur: pd.DataFrame, df_revisionacteur: pd.DataFrame):
def compute_acteur(df_acteur: pd.DataFrame, df_revisionacteur: pd.DataFrame):

revisionacteur_parent_ids = df_revisionacteur["parent_id"].unique()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
)


def merge_acteur_services(
def compute_acteur_services(
df_acteur_acteur_services: pd.DataFrame,
df_revisionacteur_acteur_services: pd.DataFrame,
df_revisionacteur: pd.DataFrame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
)


def merge_labels(
def compute_labels(
df_acteur_labels: pd.DataFrame,
df_revisionacteur_labels: pd.DataFrame,
df_revisionacteur: pd.DataFrame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
)


def deduplicate_acteur_serivces(
def deduplicate_acteur_services(
df_children: pd.DataFrame,
df_merge_acteur_services: pd.DataFrame,
df_acteur_services: pd.DataFrame,
):
return deduplicate_acteurs_many2many_relationship(
df_children,
df_merge_acteur_services,
df_acteur_services,
"acteurservice_id",
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

def deduplicate_labels(
df_children: pd.DataFrame,
df_merge_labels: pd.DataFrame,
df_labels: pd.DataFrame,
):
return deduplicate_acteurs_many2many_relationship(
df_children,
df_merge_labels,
df_labels,
"labelqualite_id",
)
Loading

0 comments on commit a9db461

Please sign in to comment.