Skip to content

Commit

Permalink
Simplification des dags source et ajout du contexte (#1248)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok authored Jan 23, 2025
1 parent 882e025 commit e39678f
Show file tree
Hide file tree
Showing 61 changed files with 711 additions and 1,361 deletions.
2 changes: 2 additions & 0 deletions dags/sources/config/airflow_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
clean_identifiant_externe,
clean_identifiant_unique,
clean_label_codes,
clean_proposition_services,
clean_siret_and_siren,
clean_telephone,
compute_location,
Expand Down Expand Up @@ -67,6 +68,7 @@
"get_latlng_from_geopoint": get_latlng_from_geopoint,
"strip_lower_string": strip_lower_string,
"compute_location": compute_location,
"clean_proposition_services": clean_proposition_services,
}


Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_aliapur.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_citeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_corepile.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_cyclevia.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ecodds.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ecologic.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ecomaison.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ecopae.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ecosystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ocab.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_ocad3e.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_pharmacies.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
"transformation": "clean_identifiant_unique",
"destination": ["identifiant_unique"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "Téléphone"},
{"remove": "Région"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_pyreo.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_refashion.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_screlec.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_sinoe.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
"transformation": "clean_identifiant_unique",
"destination": ["identifiant_unique"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_geopoint"},
{"remove": "_i"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_soren.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
5 changes: 5 additions & 0 deletions dags/sources/dags/source_valdelia.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
"transformation": "clean_action_codes",
"destination": ["action_codes"],
},
{
"origin": ["action_codes", "souscategorie_codes"],
"transformation": "clean_proposition_services",
"destination": ["proposition_services_codes"],
},
# 5. Suppression des colonnes
{"remove": "_i"},
{"remove": "_id"},
Expand Down
57 changes: 57 additions & 0 deletions dags/sources/tasks/airflow_logic/compute_link_tables_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from sources.tasks.business_logic.compute_link_tables import compute_link_tables
from sources.tasks.transform.read_mapping_from_postgres import (
read_mapping_from_postgres,
)
from utils import logging_utils as log

logger = logging.getLogger(__name__)


def compute_link_tables_task(dag: DAG) -> PythonOperator:
    """Build the ``compute_link_tables`` task and attach it to *dag*.

    Args:
        dag: The DAG the operator is registered on.

    Returns:
        A ``PythonOperator`` whose callable is ``compute_link_tables_wrapper``.
    """
    operator = PythonOperator(
        dag=dag,
        task_id="compute_link_tables",
        python_callable=compute_link_tables_wrapper,
    )
    return operator


def compute_link_tables_wrapper(**kwargs):
    """Airflow callable that feeds ``compute_link_tables`` with its inputs.

    Pulls the ``"df"`` entry from the XCom of the ``propose_acteur_changes``
    task, reads one mapping per reference table through
    ``read_mapping_from_postgres``, previews every input in the logs, then
    delegates to ``compute_link_tables``.

    Args:
        **kwargs: Airflow task context; only ``kwargs["ti"]`` is used.

    Returns:
        The value returned by ``compute_link_tables``.
    """
    task_instance = kwargs["ti"]
    df_acteur = task_instance.xcom_pull(task_ids="propose_acteur_changes")["df"]

    # Keyword argument of compute_link_tables -> table the mapping is read from.
    # Insertion order fixes the order of the reads and of the log previews.
    table_by_mapping_name = {
        "acteurservice_id_by_code": "qfdmo_acteurservice",
        "labelqualite_id_by_code": "qfdmo_labelqualite",
        "actions_id_by_code": "qfdmo_action",
        "souscats_id_by_code": "qfdmo_souscategorieobjet",
        "source_id_by_code": "qfdmo_source",
        "acteurtype_id_by_code": "qfdmo_acteurtype",
    }
    mappings = {
        mapping_name: read_mapping_from_postgres(table_name=table_name)
        for mapping_name, table_name in table_by_mapping_name.items()
    }

    log.preview("df_acteur", df_acteur)
    for mapping_name, mapping in mappings.items():
        log.preview(mapping_name, mapping)

    df_acteur = compute_link_tables(df_acteur=df_acteur, **mappings)

    log.preview("df_acteur après traitement", df_acteur)

    return df_acteur
40 changes: 7 additions & 33 deletions dags/sources/tasks/airflow_logic/db_data_prepare_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from sources.tasks.business_logic.db_data_prepare import db_data_prepare
from sources.tasks.business_logic.read_mapping_from_postgres import (
read_mapping_from_postgres,
)
from utils import logging_utils as log

logger = logging.getLogger(__name__)
Expand All @@ -20,36 +17,13 @@ def db_data_prepare_task(dag: DAG) -> PythonOperator:


def db_data_prepare_wrapper(**kwargs):
df_acteur_to_delete = kwargs["ti"].xcom_pull(task_ids="propose_acteur_to_delete")[
"df_acteur_to_delete"
]
df_actors = kwargs["ti"].xcom_pull(task_ids="propose_acteur_changes")["df"]
df_ps = kwargs["ti"].xcom_pull(task_ids="propose_services")["df"]
df_pssc = kwargs["ti"].xcom_pull(task_ids="propose_services_sous_categories")
df_labels = kwargs["ti"].xcom_pull(task_ids="propose_labels")
df_acteur_services = kwargs["ti"].xcom_pull(task_ids="propose_acteur_services")
df_acteurs_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur")
source_id_by_code = read_mapping_from_postgres(table_name="qfdmo_source")
acteurtype_id_by_code = read_mapping_from_postgres(table_name="qfdmo_acteurtype")

log.preview("df_acteur_to_delete", df_acteur_to_delete)
log.preview("df_actors", df_actors)
log.preview("df_ps", df_ps)
log.preview("df_pssc", df_pssc)
log.preview("df_labels", df_labels)
log.preview("df_acteur_services", df_acteur_services)
log.preview("df_acteurs_from_db", df_acteurs_from_db)
log.preview("source_id_by_code", source_id_by_code)
log.preview("acteurtype_id_by_code", acteurtype_id_by_code)
df_acteur = kwargs["ti"].xcom_pull(task_ids="compute_link_tables")
df_acteur_from_db = kwargs["ti"].xcom_pull(task_ids="db_read_acteur")

log.preview("df_acteur", df_acteur)
log.preview("df_acteur_from_db", df_acteur_from_db)

return db_data_prepare(
df_acteur_to_delete=df_acteur_to_delete,
df_acteur=df_actors,
df_ps=df_ps,
df_pssc=df_pssc,
df_labels=df_labels,
df_acteur_services=df_acteur_services,
df_acteurs_from_db=df_acteurs_from_db,
source_id_by_code=source_id_by_code,
acteurtype_id_by_code=acteurtype_id_by_code,
df_acteur=df_acteur,
df_acteur_from_db=df_acteur_from_db,
)

This file was deleted.

Loading

0 comments on commit e39678f

Please sign in to comment.