diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 6df6615c..2d5300ec 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -97,9 +97,11 @@ def stellar_etl_internal_task( ) -run_id = "{{ run_id }}" -filepath = os.path.join( - Variable.get("gcs_exported_object_prefix"), run_id, "retool-exported-entity.txt" +retool_run_id = "{{ run_id }}" +retool_filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), + retool_run_id, + "retool-exported-entity.txt", ) @@ -117,38 +119,38 @@ def stellar_etl_internal_task( "--cloud-provider", "gcp", "--output", - filepath, + retool_filepath, ], ) -table_name = "retool_entity_data" -table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -public_project = "test-hubble-319619" -public_dataset = "test_crypto_stellar_internal" -batch_id = macros.get_batch_id() -batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" -export_task_id = "export_retool_data" -source_object_suffix = "/*-retool-exported-entity.txt" -source_objects = [ +retool_table_name = "retool_entity_data" +retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" +retool_public_project = "test-hubble-319619" +retool_public_dataset = "test_crypto_stellar_internal" +retool_batch_id = macros.get_batch_id() +retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +retool_export_task_id = "export_retool_data" +retool_source_object_suffix = "/*-retool-exported-entity.txt" +retool_source_objects = [ "{{ task_instance.xcom_pull(task_ids='" - + export_task_id + + retool_export_task_id + '\')["output"] }}' - + source_object_suffix + + retool_source_object_suffix ] task_vars = { - "task_id": f"del_ins_{table_name}_task", - "project": public_project, - "dataset": public_dataset, - "table_name": table_name, + "task_id": f"del_ins_{retool_table_name}_task", + "project": retool_public_project, + "dataset": retool_public_dataset, + "table_name": retool_table_name, "export_task_id": "export_retool_data", - "source_object_suffix": source_object_suffix, + "source_object_suffix": retool_source_object_suffix, "partition": False, "cluster": False, - "batch_id": batch_id, - "batch_date": batch_date, - "source_objects": source_objects, - "table_id": table_id, + "batch_id": retool_batch_id, + "batch_date": retool_batch_date, + "source_objects": retool_source_objects, + "table_id": retool_table_id, } retool_insert_to_bq_task = create_del_ins_task(