Skip to content

Commit

Permalink
Add batch_run_date
Browse files Browse the repository at this point in the history
update xcom value

update

set xcom input

simplify

rename
  • Loading branch information
amishas157 committed Dec 12, 2024
1 parent 718238e commit c632660
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions dags/external_data_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
"alias": "external",
},
render_template_as_native_obj=True,
user_defined_macros={
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
user_defined_filters={
"fromjson": lambda s: loads(s),
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
Expand All @@ -49,7 +52,7 @@


def stellar_etl_internal_task(
dag, task_name, command, cmd_args=[], resource_cfg="default"
dag, task_name, command, cmd_args=[], resource_cfg="default", output_file=""
):
namespace = conf.get("kubernetes", "NAMESPACE")

Expand All @@ -68,6 +71,9 @@ def stellar_etl_internal_task(

image = "{{ var.value.stellar_etl_internal_image_name }}"

etl_cmd_string = " ".join(cmd_args)
arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json"""

return KubernetesPodOperator(
task_id=task_name,
name=task_name,
Expand All @@ -79,8 +85,9 @@ def stellar_etl_internal_task(
"RETOOL_API_KEY": "{{ var.value.retool_api_key }}",
},
image=image,
cmds=[command],
arguments=cmd_args,
cmds=["bash", "-c"],
arguments=[arguments],
do_xcom_push=True,
dag=dag,
is_delete_operator_pod=True,
startup_timeout_seconds=720,
Expand All @@ -104,7 +111,6 @@ def stellar_etl_internal_task(
"retool-exported-entity.txt",
)


retool_export_task = stellar_etl_internal_task(
dag,
"export_retool_data",
Expand All @@ -121,6 +127,7 @@ def stellar_etl_internal_task(
"--output",
retool_filepath,
],
output_file=retool_filepath,
)

retool_table_name = "retool_entity_data"
Expand All @@ -138,7 +145,7 @@ def stellar_etl_internal_task(
+ retool_source_object_suffix
]

task_vars = {
retool_task_vars = {
"task_id": f"del_ins_{retool_table_name}_task",
"project": retool_public_project,
"dataset": retool_public_dataset,
Expand All @@ -154,7 +161,7 @@ def stellar_etl_internal_task(
}

retool_insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
dag, retool_task_vars, build_del_ins_from_gcs_to_bq_task
)

retool_export_task >> retool_insert_to_bq_task

0 comments on commit c632660

Please sign in to comment.