From 30ee4940fc8955b7554f9d0bc08f0108f29ce603 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 6 Dec 2024 17:03:12 -0600 Subject: [PATCH 01/15] Add DAG for exporting retool data --- airflow_variables_dev.json | 2 + dags/external_data_dag.py | 100 +++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 dags/external_data_dag.py diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 83e50b63..c6b1eb5c 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -315,6 +315,7 @@ "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849", "sentry_environment": "development", + "stellar_etl_internal_image_name": "stellar/stellar-etl:204d343fa", "table_ids": { "accounts": "accounts", "assets": "history_assets", @@ -352,6 +353,7 @@ "elementary_dbt_data_quality": 1620, "elementary_generate_report": 1200, "enriched_history_operations": 780, + "export_retool_data": 720, "fee_stats": 840, "history_assets": 720, "liquidity_pool_trade_volume": 1140, diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py new file mode 100644 index 00000000..bea102f1 --- /dev/null +++ b/dags/external_data_dag.py @@ -0,0 +1,100 @@ +""" +The external_data_dag DAG exports data from external sources. +It is scheduled to export information to BigQuery at regular intervals. +""" + +from ast import literal_eval +from datetime import datetime +from json import loads + +from airflow import DAG +from airflow.models.variable import Variable +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.default import ( + alert_after_max_retries, + get_default_dag_args, + init_sentry, +) + +init_sentry() + +# Initialize the DAG +dag = DAG( + "external_data_dag", + default_args=get_default_dag_args(), + start_date=datetime(2024, 12, 5, 14, 30), + description="This DAG exports data from external sources such as retool.", + schedule_interval="*/10 * * * *", + render_template_as_native_obj=True, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + "literal_eval": lambda e: literal_eval(e), + }, +) + + +def stellar_etl_internal_task( + dag, task_name, command, cmd_args=[], resource_cfg="default" +): + namespace = conf.get("kubernetes", "NAMESPACE") + + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + + requests = { + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + container_resources = k8s.V1ResourceRequirements(requests=requests) + + image = "{{ var.value.stellar_etl_internal_image_name }}" + + logging.info(f"sh commands to run in pod: {args}") + + return KubernetesPodOperator( + task_id=task_name, + name=task_name, + namespace=Variable.get("k8s_namespace"), + service_account_name=Variable.get("k8s_service_account"), + env_vars={ + "EXECUTION_DATE": "{{ ds }}", + "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + }, + image=image, + cmds=[command], + arguments=cmd_args, + dag=dag, + is_delete_operator_pod=True, + startup_timeout_seconds=720, + in_cluster=in_cluster, + config_file=config_file_location, + 
container_resources=container_resources,
+        on_failure_callback=alert_after_max_retries,
+        image_pull_policy="IfNotPresent",
+        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[f"task_name"]
+        ),
+        trigger_rule="all_done",
+    )
+
+
+retool_export_task = stellar_etl_internal_task(
+    dag,
+    "export_retool_data",
+    "export-retool",
+    cmd_args=[
+        "--start-time",
+        "2024-01-01T16:30:00+00:00",
+        "--end-time",
+        "2025-01-01T16:30:00+00:00",
+        "--testnet",
+    ],
+)

From ef7e871566f736e47aaa62479d9ba3f5b2d8b9b0 Mon Sep 17 00:00:00 2001
From: Amisha Singla
Date: Fri, 6 Dec 2024 17:13:03 -0600
Subject: [PATCH 02/15] update image

update image

update image

update image

update image

---
 airflow_variables_dev.json | 2 +-
 dags/external_data_dag.py  | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index c6b1eb5c..32dd10b6 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -315,7 +315,7 @@
   "schema_filepath": "/home/airflow/gcs/dags/schemas/",
   "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849",
   "sentry_environment": "development",
-  "stellar_etl_internal_image_name": "stellar/stellar-etl:204d343fa",
+  "stellar_etl_internal_image_name": " amishastellar/stellar-etl-internal:ab0a13897",
   "table_ids": {
     "accounts": "accounts",
     "assets": "history_assets",
diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py
index bea102f1..96266482 100644
--- a/dags/external_data_dag.py
+++ b/dags/external_data_dag.py
@@ -4,10 +4,11 @@
 """

 from ast import literal_eval
-from datetime import datetime
+from datetime import datetime, timedelta
 from json import loads

 from airflow import DAG
+from airflow.configuration import conf
 from airflow.models.variable import Variable
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from kubernetes.client import models as k8s
@@ -56,8 +57,6 @@ def stellar_etl_internal_task(

     image = "{{ var.value.stellar_etl_internal_image_name }}"

-    logging.info(f"sh commands to run in pod: {args}")
-
     return KubernetesPodOperator(
         task_id=task_name,
         name=task_name,
@@ -80,7 +79,7 @@ def stellar_etl_internal_task(
         image_pull_policy="IfNotPresent",
         image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
         sla=timedelta(
-            seconds=Variable.get("task_sla", deserialize_json=True)[f"task_name"]
+            seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
         ),
         trigger_rule="all_done",
     )

From 47ca1d2775f12b1c80f90c1f25e47af3c012f836 Mon Sep 17 00:00:00 2001
From: Amisha Singla
Date: Tue, 10 Dec 2024 17:08:35 -0600
Subject: [PATCH 03/15] Pass api key

---
 airflow_variables_dev.json | 3 ++-
 dags/external_data_dag.py  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 32dd10b6..170de89e 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -311,11 +311,12 @@
       }
     }
   },
+  "retool_api_key": "test-api-key",
   "sandbox_dataset": "crypto_stellar_internal_sandbox",
   "schema_filepath": "/home/airflow/gcs/dags/schemas/",
   "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849",
   "sentry_environment": "development",
-  "stellar_etl_internal_image_name": " amishastellar/stellar-etl-internal:ab0a13897",
+  "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:ab0a13897",
   "table_ids": {
"accounts": "accounts", "assets": "history_assets", diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 96266482..233cc061 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -65,6 +65,7 @@ def stellar_etl_internal_task( env_vars={ "EXECUTION_DATE": "{{ ds }}", "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", }, image=image, cmds=[command], @@ -94,6 +95,5 @@ def stellar_etl_internal_task( "2024-01-01T16:30:00+00:00", "--end-time", "2025-01-01T16:30:00+00:00", - "--testnet", ], ) From 3e56ccb8f725a39fa32baf4632ad50be902875f3 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Tue, 10 Dec 2024 17:27:37 -0600 Subject: [PATCH 04/15] send cloud flags --- dags/external_data_dag.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 233cc061..1d695ce6 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -95,5 +95,9 @@ def stellar_etl_internal_task( "2024-01-01T16:30:00+00:00", "--end-time", "2025-01-01T16:30:00+00:00", + "--cloud-storage-bucket", + Variable.get("gcs_exported_data_bucket_name"), + "--cloud-provider", + "gcp", ], ) From 1102681a6b066225a89f597e6a914babf6c114d4 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 16:33:46 -0600 Subject: [PATCH 05/15] Add task to write to bq Add alias --- airflow_variables_dev.json | 1 + dags/external_data_dag.py | 55 +++++++++ schemas/retool_entity_data_schema.json | 163 +++++++++++++++++++++++++ 3 files changed, 219 insertions(+) create mode 100644 schemas/retool_entity_data_schema.json diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 170de89e..19a4da25 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -351,6 +351,7 @@ "create_sandbox": 2400, "current_state": 720, "default": 60, + "del_ins_retool_entity_data_task": 720, "elementary_dbt_data_quality": 1620, "elementary_generate_report": 1200, "enriched_history_operations": 780, diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 1d695ce6..6df6615c 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -3,6 +3,7 @@ It is scheduled to export information to BigQuery at regular intervals. 
""" +import os from ast import literal_eval from datetime import datetime, timedelta from json import loads @@ -13,6 +14,13 @@ from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from kubernetes.client import models as k8s from stellar_etl_airflow import macros +from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( + build_del_ins_from_gcs_to_bq_task, +) +from stellar_etl_airflow.build_del_ins_operator import ( + create_del_ins_task, + initialize_task_vars, +) from stellar_etl_airflow.default import ( alert_after_max_retries, get_default_dag_args, @@ -28,6 +36,9 @@ start_date=datetime(2024, 12, 5, 14, 30), description="This DAG exports data from external sources such as retool.", schedule_interval="*/10 * * * *", + params={ + "alias": "external", + }, render_template_as_native_obj=True, user_defined_filters={ "fromjson": lambda s: loads(s), @@ -86,6 +97,12 @@ def stellar_etl_internal_task( ) +run_id = "{{ run_id }}" +filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), run_id, "retool-exported-entity.txt" +) + + retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -99,5 +116,43 @@ def stellar_etl_internal_task( Variable.get("gcs_exported_data_bucket_name"), "--cloud-provider", "gcp", + "--output", + filepath, ], ) + +table_name = "retool_entity_data" +table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" +public_project = "test-hubble-319619" +public_dataset = "test_crypto_stellar_internal" +batch_id = macros.get_batch_id() +batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +export_task_id = "export_retool_data" +source_object_suffix = "/*-retool-exported-entity.txt" +source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["output"] }}' + + source_object_suffix +] + +task_vars = { + "task_id": f"del_ins_{table_name}_task", + "project": public_project, + "dataset": public_dataset, + "table_name": table_name, + "export_task_id": "export_retool_data", + "source_object_suffix": source_object_suffix, + "partition": False, + "cluster": False, + "batch_id": batch_id, + "batch_date": batch_date, + "source_objects": source_objects, + "table_id": table_id, +} + +retool_insert_to_bq_task = create_del_ins_task( + dag, task_vars, build_del_ins_from_gcs_to_bq_task +) + +retool_export_task >> retool_insert_to_bq_task diff --git a/schemas/retool_entity_data_schema.json b/schemas/retool_entity_data_schema.json new file mode 100644 index 00000000..2be79a99 --- /dev/null +++ b/schemas/retool_entity_data_schema.json @@ -0,0 +1,163 @@ +[ + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_id", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "created_at", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "updated_at", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "id", + "type": "INTEGER" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "account_sponsor", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "REPEATED", + "name": "app_geographies", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + 
"name": "custodial", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "description", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "fee_sponsor", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "home_domain", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "home_domains_id", + "type": "INTEGER" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "live", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "name", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "non_custodial", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "notes", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "REPEATED", + "name": "ramps", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "sdp_enabled", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "soroban_enabled", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "status", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "verified", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "website_url", + "type": "STRING" + } +] From 718238e8b0bdf8c1578b9a8bdf751f04b74b2dca Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 16:53:38 -0600 Subject: [PATCH 06/15] rename --- dags/external_data_dag.py | 50 ++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 6df6615c..2d5300ec 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -97,9 +97,11 @@ def stellar_etl_internal_task( ) -run_id = "{{ run_id }}" -filepath = os.path.join( - Variable.get("gcs_exported_object_prefix"), run_id, "retool-exported-entity.txt" +retool_run_id = "{{ run_id }}" +retool_filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), + retool_run_id, + "retool-exported-entity.txt", ) @@ -117,38 +119,38 @@ def stellar_etl_internal_task( "--cloud-provider", "gcp", "--output", - filepath, + retool_filepath, ], ) -table_name = "retool_entity_data" -table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -public_project = "test-hubble-319619" -public_dataset = "test_crypto_stellar_internal" -batch_id = macros.get_batch_id() -batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" -export_task_id = "export_retool_data" -source_object_suffix = "/*-retool-exported-entity.txt" -source_objects = [ +retool_table_name = "retool_entity_data" +retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" +retool_public_project = "test-hubble-319619" +retool_public_dataset = "test_crypto_stellar_internal" +retool_batch_id = macros.get_batch_id() +retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +retool_export_task_id = "export_retool_data" +retool_source_object_suffix = "/*-retool-exported-entity.txt" +retool_source_objects = [ "{{ task_instance.xcom_pull(task_ids='" - + export_task_id + + retool_export_task_id + '\')["output"] }}' - + source_object_suffix + + retool_source_object_suffix ] task_vars = { - "task_id": 
f"del_ins_{table_name}_task", - "project": public_project, - "dataset": public_dataset, - "table_name": table_name, + "task_id": f"del_ins_{retool_table_name}_task", + "project": retool_public_project, + "dataset": retool_public_dataset, + "table_name": retool_table_name, "export_task_id": "export_retool_data", - "source_object_suffix": source_object_suffix, + "source_object_suffix": retool_source_object_suffix, "partition": False, "cluster": False, - "batch_id": batch_id, - "batch_date": batch_date, - "source_objects": source_objects, - "table_id": table_id, + "batch_id": retool_batch_id, + "batch_date": retool_batch_date, + "source_objects": retool_source_objects, + "table_id": retool_table_id, } retool_insert_to_bq_task = create_del_ins_task( From c6326608ce842c86252f1a2ec082b3b050d3ce9e Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 17:02:12 -0600 Subject: [PATCH 07/15] Add batch_run_date update xcom value update set xcom input simplify rename --- dags/external_data_dag.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 2d5300ec..ba05b957 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -40,6 +40,9 @@ "alias": "external", }, render_template_as_native_obj=True, + user_defined_macros={ + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, user_defined_filters={ "fromjson": lambda s: loads(s), "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), @@ -49,7 +52,7 @@ def stellar_etl_internal_task( - dag, task_name, command, cmd_args=[], resource_cfg="default" + dag, task_name, command, cmd_args=[], resource_cfg="default", output_file="" ): namespace = conf.get("kubernetes", "NAMESPACE") @@ -68,6 +71,9 @@ def stellar_etl_internal_task( image = "{{ var.value.stellar_etl_internal_image_name }}" + etl_cmd_string = " ".join(cmd_args) + arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json""" + return KubernetesPodOperator( task_id=task_name, name=task_name, @@ -79,8 +85,9 @@ def stellar_etl_internal_task( "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", }, image=image, - cmds=[command], - arguments=cmd_args, + cmds=["bash", "-c"], + arguments=[arguments], + do_xcom_push=True, dag=dag, is_delete_operator_pod=True, startup_timeout_seconds=720, @@ -104,7 +111,6 @@ def stellar_etl_internal_task( "retool-exported-entity.txt", ) - retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -121,6 +127,7 @@ def stellar_etl_internal_task( "--output", retool_filepath, ], + output_file=retool_filepath, ) retool_table_name = "retool_entity_data" @@ -138,7 +145,7 @@ def stellar_etl_internal_task( + retool_source_object_suffix ] -task_vars = { +retool_task_vars = { "task_id": f"del_ins_{retool_table_name}_task", "project": retool_public_project, "dataset": retool_public_dataset, @@ -154,7 +161,7 @@ def stellar_etl_internal_task( } retool_insert_to_bq_task = create_del_ins_task( - dag, task_vars, build_del_ins_from_gcs_to_bq_task + dag, retool_task_vars, build_del_ins_from_gcs_to_bq_task ) retool_export_task >> retool_insert_to_bq_task From fe6647d83f5fb3343ac01203ce7938789464282a Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 18:14:55 -0600 Subject: [PATCH 08/15] update suffix --- dags/external_data_dag.py | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index ba05b957..947fcc1b 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -137,7 +137,7 @@ def stellar_etl_internal_task( retool_batch_id = macros.get_batch_id() retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" retool_export_task_id = "export_retool_data" -retool_source_object_suffix = "/*-retool-exported-entity.txt" +retool_source_object_suffix = "" retool_source_objects = [ "{{ task_instance.xcom_pull(task_ids='" + retool_export_task_id From 6378f51f5f6a451fd6147aeb5d4cb7a1573c8784 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 19:08:25 -0600 Subject: [PATCH 09/15] update image --- airflow_variables_dev.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 19a4da25..10461424 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -316,7 +316,7 @@ "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849", "sentry_environment": "development", - "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:ab0a13897", + "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:40f406c53", "table_ids": { "accounts": "accounts", "assets": "history_assets", From a2ec9465d26137420ef9a7fb0140a42a5b49855a Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 19:55:13 -0600 Subject: [PATCH 10/15] Add metadata fields --- airflow_variables_dev.json | 2 +- dags/external_data_dag.py | 34 +++++++++++++++++++--------------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 10461424..22f97fb1 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -316,7 +316,7 @@ "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849", "sentry_environment": "development", - "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:40f406c53", + "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7", "table_ids": { "accounts": "accounts", "assets": "history_assets", diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 947fcc1b..5026961d 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -111,6 +111,23 @@ def stellar_etl_internal_task( "retool-exported-entity.txt", ) +retool_table_name = "retool_entity_data" +retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" +retool_public_project = "test-hubble-319619" +retool_public_dataset = "test_crypto_stellar_internal" +retool_batch_id = macros.get_batch_id() +retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +retool_export_task_id = "export_retool_data" +retool_source_object_suffix = "" +retool_source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + retool_export_task_id + + '\')["output"] }}' + + retool_source_object_suffix +] +batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + + retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -126,25 +143,12 @@ def stellar_etl_internal_task( "gcp", "--output", retool_filepath, + "-u", + 
f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={batch_insert_ts}'", ], output_file=retool_filepath, ) -retool_table_name = "retool_entity_data" -retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -retool_public_project = "test-hubble-319619" -retool_public_dataset = "test_crypto_stellar_internal" -retool_batch_id = macros.get_batch_id() -retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" -retool_export_task_id = "export_retool_data" -retool_source_object_suffix = "" -retool_source_objects = [ - "{{ task_instance.xcom_pull(task_ids='" - + retool_export_task_id - + '\')["output"] }}' - + retool_source_object_suffix -] - retool_task_vars = { "task_id": f"del_ins_{retool_table_name}_task", "project": retool_public_project, From e15108fa1adeaac1da3612ba99b563ebafa982e0 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 20:19:27 -0600 Subject: [PATCH 11/15] Change schedule and update start/end time --- dags/external_data_dag.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 5026961d..d663a103 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -35,7 +35,7 @@ default_args=get_default_dag_args(), start_date=datetime(2024, 12, 5, 14, 30), description="This DAG exports data from external sources such as retool.", - schedule_interval="*/10 * * * *", + schedule_interval="0 22 * * *", params={ "alias": "external", }, @@ -125,8 +125,10 @@ def stellar_etl_internal_task( + '\')["output"] }}' + retool_source_object_suffix ] -batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") +retool_batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") +retool_start_time = "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}" +retool_end_time = "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}" retool_export_task = stellar_etl_internal_task( dag, @@ -134,9 +136,9 @@ def stellar_etl_internal_task( "export-retool", cmd_args=[ "--start-time", - "2024-01-01T16:30:00+00:00", + retool_start_time, "--end-time", - "2025-01-01T16:30:00+00:00", + retool_end_time, "--cloud-storage-bucket", Variable.get("gcs_exported_data_bucket_name"), "--cloud-provider", @@ -144,7 +146,7 @@ def stellar_etl_internal_task( "--output", retool_filepath, "-u", - f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={batch_insert_ts}'", + f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={retool_batch_insert_ts}'", ], output_file=retool_filepath, ) From 52ca776fe4d0fffc139bea53350df15cc27651ec Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 20:30:31 -0600 Subject: [PATCH 12/15] Add macro --- dags/external_data_dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index d663a103..9393788a 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -41,6 +41,7 @@ }, render_template_as_native_obj=True, user_defined_macros={ + "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, user_defined_filters={ From 3d3fc6a514bc16a542b5589f9b3eeafd8196995b Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 13 Dec 2024 15:26:10 -0600 Subject: [PATCH 13/15] Refactor into methods --- dags/external_data_dag.py | 176 ++++++------------ 
.../build_internal_export_task.py | 107 +++++++++++ 2 files changed, 165 insertions(+), 118 deletions(-) create mode 100644 dags/stellar_etl_airflow/build_internal_export_task.py diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 9393788a..f8e11b3d 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -3,9 +3,8 @@ It is scheduled to export information to BigQuery at regular intervals. """ -import os from ast import literal_eval -from datetime import datetime, timedelta +from datetime import datetime from json import loads from airflow import DAG @@ -17,15 +16,12 @@ from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( build_del_ins_from_gcs_to_bq_task, ) -from stellar_etl_airflow.build_del_ins_operator import ( - create_del_ins_task, - initialize_task_vars, -) -from stellar_etl_airflow.default import ( - alert_after_max_retries, - get_default_dag_args, - init_sentry, +from stellar_etl_airflow.build_del_ins_operator import create_del_ins_task +from stellar_etl_airflow.build_internal_export_task import ( + build_export_task, + get_airflow_metadata, ) +from stellar_etl_airflow.default import get_default_dag_args, init_sentry init_sentry() @@ -52,123 +48,67 @@ ) -def stellar_etl_internal_task( - dag, task_name, command, cmd_args=[], resource_cfg="default", output_file="" -): - namespace = conf.get("kubernetes", "NAMESPACE") - - if namespace == "default": - config_file_location = Variable.get("kube_config_location") - in_cluster = False - else: - config_file_location = None - in_cluster = True - - requests = { - "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", - "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", - } - container_resources = k8s.V1ResourceRequirements(requests=requests) - - image = "{{ var.value.stellar_etl_internal_image_name }}" - - etl_cmd_string = " ".join(cmd_args) - arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json""" - - return KubernetesPodOperator( - task_id=task_name, - name=task_name, - namespace=Variable.get("k8s_namespace"), - service_account_name=Variable.get("k8s_service_account"), - env_vars={ - "EXECUTION_DATE": "{{ ds }}", - "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", - "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", - }, - image=image, - cmds=["bash", "-c"], - arguments=[arguments], - do_xcom_push=True, - dag=dag, - is_delete_operator_pod=True, - startup_timeout_seconds=720, - in_cluster=in_cluster, - config_file=config_file_location, - container_resources=container_resources, - on_failure_callback=alert_after_max_retries, - image_pull_policy="IfNotPresent", - image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], - sla=timedelta( - seconds=Variable.get("task_sla", deserialize_json=True)[task_name] - ), - trigger_rule="all_done", - ) - - -retool_run_id = "{{ run_id }}" -retool_filepath = os.path.join( - Variable.get("gcs_exported_object_prefix"), - retool_run_id, - "retool-exported-entity.txt", -) - -retool_table_name = "retool_entity_data" -retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -retool_public_project = "test-hubble-319619" -retool_public_dataset = "test_crypto_stellar_internal" -retool_batch_id = macros.get_batch_id() -retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" 
-retool_export_task_id = "export_retool_data" -retool_source_object_suffix = "" -retool_source_objects = [ - "{{ task_instance.xcom_pull(task_ids='" - + retool_export_task_id - + '\')["output"] }}' - + retool_source_object_suffix -] -retool_batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") - -retool_start_time = "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}" -retool_end_time = "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}" - -retool_export_task = stellar_etl_internal_task( +retool_export_task = build_export_task( dag, "export_retool_data", - "export-retool", + command="export-retool", cmd_args=[ "--start-time", - retool_start_time, + "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}", "--end-time", - retool_end_time, - "--cloud-storage-bucket", - Variable.get("gcs_exported_data_bucket_name"), - "--cloud-provider", - "gcp", - "--output", - retool_filepath, - "-u", - f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={retool_batch_insert_ts}'", + "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}", ], - output_file=retool_filepath, + use_gcs=True, + env_vars={"RETOOL_API_KEY": "{{ var.value.retool_api_key }}"}, ) -retool_task_vars = { - "task_id": f"del_ins_{retool_table_name}_task", - "project": retool_public_project, - "dataset": retool_public_dataset, - "table_name": retool_table_name, - "export_task_id": "export_retool_data", - "source_object_suffix": retool_source_object_suffix, - "partition": False, - "cluster": False, - "batch_id": retool_batch_id, - "batch_date": retool_batch_date, - "source_objects": retool_source_objects, - "table_id": retool_table_id, -} -retool_insert_to_bq_task = create_del_ins_task( - dag, retool_task_vars, build_del_ins_from_gcs_to_bq_task +def get_insert_to_bq_task( + table_name: str, + project: str, + dataset: str, + export_task_id: str, + source_object_suffix: str, + partition: bool, + cluster: bool, + table_id: str, +): + metadata = get_airflow_metadata() + source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["output"] }}' + + source_object_suffix + ] + task_vars = { + "task_id": f"del_ins_{table_name}_task", + "project": project, + "dataset": dataset, + "table_name": table_name, + "export_task_id": export_task_id, + "source_object_suffix": source_object_suffix, + "partition": partition, + "cluster": cluster, + "batch_id": metadata["batch_id"], + "batch_date": metadata["batch_date"], + "source_objects": source_objects, + "table_id": table_id, + } + insert_to_bq_task = create_del_ins_task( + dag, task_vars, build_del_ins_from_gcs_to_bq_task + ) + return insert_to_bq_task + + +retool_insert_to_bq_task = get_insert_to_bq_task( + table_name="retool_entity_data", + project="test-hubble-319619", + dataset="test_crypto_stellar_internal", + export_task_id="export_retool_data", + source_object_suffix="", + partition=False, + cluster=False, + table_id="test-hubble-319619.test_crypto_stellar_internal.retool_entity_data", ) retool_export_task >> retool_insert_to_bq_task diff --git a/dags/stellar_etl_airflow/build_internal_export_task.py b/dags/stellar_etl_airflow/build_internal_export_task.py new file mode 100644 index 00000000..f22c104e --- /dev/null +++ b/dags/stellar_etl_airflow/build_internal_export_task.py @@ -0,0 +1,107 @@ +""" +This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions. 
+""" + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.configuration import conf +from airflow.models.variable import Variable +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.default import alert_after_max_retries + + +def get_airflow_metadata(): + return { + "batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}", + "batch_id": macros.get_batch_id(), + "run_id": "{{ run_id }}", + } + + +def build_export_task( + dag, + task_name, + command, + cmd_args=[], + env_vars={}, + use_gcs=False, + resource_cfg="default", +): + namespace = conf.get("kubernetes", "NAMESPACE") + + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + + requests = { + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + container_resources = k8s.V1ResourceRequirements(requests=requests) + + image = "{{ var.value.stellar_etl_internal_image_name }}" + + output_filepath = "" + if use_gcs: + metadata = get_airflow_metadata() + batch_insert_ts = metadata["batch_insert_ts"] + batch_date = metadata["batch_date"] + batch_id = metadata["batch_id"] + run_id = metadata["run_id"] + + output_filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), + run_id, + f"{task_name}-exported-entity.txt", + ) + + cmd_args = cmd_args + [ + "--cloud-storage-bucket", + Variable.get("gcs_exported_data_bucket_name"), + "--cloud-provider", + "gcp", + "--output", + output_filepath, + "-u", + f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'", + ] + etl_cmd_string = " ".join(cmd_args) + arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json""" + + return KubernetesPodOperator( + task_id=task_name, + name=task_name, + namespace=Variable.get("k8s_namespace"), + service_account_name=Variable.get("k8s_service_account"), + env_vars=env_vars.update( + { + "EXECUTION_DATE": "{{ ds }}", + "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + } + ), + image=image, + cmds=["bash", "-c"], + arguments=[arguments], + do_xcom_push=True, + dag=dag, + is_delete_operator_pod=True, + startup_timeout_seconds=720, + in_cluster=in_cluster, + config_file=config_file_location, + container_resources=container_resources, + on_failure_callback=alert_after_max_retries, + image_pull_policy="IfNotPresent", + image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[task_name] + ), + trigger_rule="all_done", + ) From 7ee9e0d09dfa4a13725fd709bc4fc11c8f4bb6dc Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 13 Dec 2024 15:41:10 -0600 Subject: [PATCH 14/15] moar refactor update image --- airflow_variables_dev.json | 3 ++- airflow_variables_prod.json | 5 +++++ dags/external_data_dag.py | 18 ++++++++++++------ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 22f97fb1..714c8bff 100644 --- a/airflow_variables_dev.json 
+++ b/airflow_variables_dev.json @@ -316,7 +316,7 @@ "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849", "sentry_environment": "development", - "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7", + "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70", "table_ids": { "accounts": "accounts", "assets": "history_assets", @@ -331,6 +331,7 @@ "liquidity_pools": "liquidity_pools", "offers": "offers", "operations": "history_operations", + "retool_entity_data": "retool_entity_data", "signers": "account_signers", "trades": "history_trades", "transactions": "history_transactions", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index fa03f153..46669b27 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -309,10 +309,12 @@ } } }, + "retool_api_key": "test-api-key", "sandbox_dataset": "crypto_stellar_internal_sandbox", "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://94027cdcc4c9470f9dafa2c0b456c2c9@o14203.ingest.us.sentry.io/5806618", "sentry_environment": "production", + "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7", "table_ids": { "accounts": "accounts", "assets": "history_assets", @@ -327,6 +329,7 @@ "liquidity_pools": "liquidity_pools", "offers": "offers", "operations": "history_operations", + "retool_entity_data": "retool_entity_data", "signers": "account_signers", "trades": "history_trades", "transactions": "history_transactions", @@ -347,9 +350,11 @@ "create_sandbox": 1020, "current_state": 1200, "default": 60, + "del_ins_retool_entity_data_task": 720, "elementary_dbt_data_quality": 2100, "elementary_generate_report": 1200, "enriched_history_operations": 1800, + "export_retool_data": 720, "fee_stats": 360, "history_assets": 360, "liquidity_pool_trade_volume": 1200, diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index f8e11b3d..16720102 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -25,6 +25,12 @@ init_sentry() +EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True) +EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project") +EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset") +RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"] +RETOOL_EXPORT_TASK_ID = "export_retool_data" + # Initialize the DAG dag = DAG( "external_data_dag", @@ -50,7 +56,7 @@ retool_export_task = build_export_task( dag, - "export_retool_data", + RETOOL_EXPORT_TASK_ID, command="export-retool", cmd_args=[ "--start-time", @@ -101,14 +107,14 @@ def get_insert_to_bq_task( retool_insert_to_bq_task = get_insert_to_bq_task( - table_name="retool_entity_data", - project="test-hubble-319619", - dataset="test_crypto_stellar_internal", - export_task_id="export_retool_data", + table_name=RETOOL_TABLE_NAME, + project=EXTERNAL_DATA_PROJECT_NAME, + dataset=EXTERNAL_DATA_DATASET_NAME, + export_task_id=RETOOL_EXPORT_TASK_ID, source_object_suffix="", partition=False, cluster=False, - table_id="test-hubble-319619.test_crypto_stellar_internal.retool_entity_data", + table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}", ) retool_export_task >> retool_insert_to_bq_task From 06884eda32af760a7f6d5bf58ce1d19a7cc77b22 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 13 Dec 2024 16:45:52 -0600 Subject: [PATCH 15/15] update env_vars early on --- 
.../build_internal_export_task.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/dags/stellar_etl_airflow/build_internal_export_task.py b/dags/stellar_etl_airflow/build_internal_export_task.py index f22c104e..3f318d6e 100644 --- a/dags/stellar_etl_airflow/build_internal_export_task.py +++ b/dags/stellar_etl_airflow/build_internal_export_task.py @@ -75,18 +75,19 @@ def build_export_task( ] etl_cmd_string = " ".join(cmd_args) arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json""" + env_vars.update( + { + "EXECUTION_DATE": "{{ ds }}", + "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + } + ) return KubernetesPodOperator( task_id=task_name, name=task_name, namespace=Variable.get("k8s_namespace"), service_account_name=Variable.get("k8s_service_account"), - env_vars=env_vars.update( - { - "EXECUTION_DATE": "{{ ds }}", - "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", - } - ), + env_vars=env_vars, image=image, cmds=["bash", "-c"], arguments=[arguments],
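
Note on the XCom handoff built up in PATCHES 05-08 and 13: the pod's bash wrapper appends {"output": "<filepath>"} to /airflow/xcom/return.json, which KubernetesPodOperator (with do_xcom_push=True) reads back as the task's XCom value, and the del-ins task then renders that value into its GCS source objects. A minimal sketch of how the rendered template resolves, assuming a hypothetical run_id of "manual_1" and "exported" as the gcs_exported_object_prefix (neither value appears in the patches):

    # What the export pod pushes to XCom via /airflow/xcom/return.json,
    # given output_filepath = os.path.join(prefix, run_id, f"{task_name}-exported-entity.txt"):
    xcom_value = {"output": "exported/manual_1/export_retool_data-exported-entity.txt"}

    # What the Jinja expression built in the DAG,
    #   "{{ task_instance.xcom_pull(task_ids='export_retool_data')[\"output\"] }}",
    # resolves to once render_template_as_native_obj hands back the dict:
    source_objects = [xcom_value["output"] + ""]  # source_object_suffix is "" after PATCH 08
    assert source_objects == ["exported/manual_1/export_retool_data-exported-entity.txt"]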
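
Note on PATCH 15: PATCH 13 passed env_vars=env_vars.update({...}) directly to KubernetesPodOperator. Since dict.update mutates in place and returns None, the operator received env_vars=None, and the pod would have lost all of its environment variables, including RETOOL_API_KEY. A minimal sketch of the failure mode (the dict contents here are illustrative, not taken from the DAG):

    env_vars = {"RETOOL_API_KEY": "example-key"}

    # Broken (PATCH 13 shape): update() returns None, so the operator gets env_vars=None
    broken = env_vars.update({"EXECUTION_DATE": "2024-12-13"})
    assert broken is None

    # Fixed (PATCH 15 shape): mutate first, then pass the dict itself.
    # Note the first update() call above did mutate env_vars despite returning None.
    env_vars.update({"AIRFLOW_START_TIMESTAMP": "2024-12-13T00:00:00Z"})
    assert "RETOOL_API_KEY" in env_vars and "EXECUTION_DATE" in env_vars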