From c6326608ce842c86252f1a2ec082b3b050d3ce9e Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 17:02:12 -0600 Subject: [PATCH] Add batch_run_date update xcom value update set xcom input simplify rename --- dags/external_data_dag.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 2d5300ec..ba05b957 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -40,6 +40,9 @@ "alias": "external", }, render_template_as_native_obj=True, + user_defined_macros={ + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, user_defined_filters={ "fromjson": lambda s: loads(s), "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), @@ -49,7 +52,7 @@ def stellar_etl_internal_task( - dag, task_name, command, cmd_args=[], resource_cfg="default" + dag, task_name, command, cmd_args=[], resource_cfg="default", output_file="" ): namespace = conf.get("kubernetes", "NAMESPACE") @@ -68,6 +71,9 @@ def stellar_etl_internal_task( image = "{{ var.value.stellar_etl_internal_image_name }}" + etl_cmd_string = " ".join(cmd_args) + arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json""" + return KubernetesPodOperator( task_id=task_name, name=task_name, @@ -79,8 +85,9 @@ def stellar_etl_internal_task( "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", }, image=image, - cmds=[command], - arguments=cmd_args, + cmds=["bash", "-c"], + arguments=[arguments], + do_xcom_push=True, dag=dag, is_delete_operator_pod=True, startup_timeout_seconds=720, @@ -104,7 +111,6 @@ def stellar_etl_internal_task( "retool-exported-entity.txt", ) - retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -121,6 +127,7 @@ def stellar_etl_internal_task( "--output", retool_filepath, ], + output_file=retool_filepath, ) retool_table_name = "retool_entity_data" @@ -138,7 +145,7 @@ def stellar_etl_internal_task( + retool_source_object_suffix ] -task_vars = { +retool_task_vars = { "task_id": f"del_ins_{retool_table_name}_task", "project": retool_public_project, "dataset": retool_public_dataset, @@ -154,7 +161,7 @@ def stellar_etl_internal_task( } retool_insert_to_bq_task = create_del_ins_task( - dag, task_vars, build_del_ins_from_gcs_to_bq_task + dag, retool_task_vars, build_del_ins_from_gcs_to_bq_task ) retool_export_task >> retool_insert_to_bq_task