diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 83e50b63..11d8772e 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -315,6 +315,7 @@
   "schema_filepath": "/home/airflow/gcs/dags/schemas/",
   "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849",
   "sentry_environment": "development",
+  "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
   "table_ids": {
     "accounts": "accounts",
     "assets": "history_assets",
@@ -329,6 +330,7 @@
     "liquidity_pools": "liquidity_pools",
     "offers": "offers",
     "operations": "history_operations",
+    "retool_entity_data": "retool_entity_data",
     "signers": "account_signers",
     "trades": "history_trades",
     "transactions": "history_transactions",
@@ -349,9 +351,11 @@
     "create_sandbox": 2400,
     "current_state": 720,
     "default": 60,
+    "del_ins_retool_entity_data_task": 720,
     "elementary_dbt_data_quality": 1620,
     "elementary_generate_report": 1200,
     "enriched_history_operations": 780,
+    "export_retool_data": 720,
     "fee_stats": 840,
     "history_assets": 720,
     "liquidity_pool_trade_volume": 1140,
diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json
index fa03f153..d9c3bef6 100644
--- a/airflow_variables_prod.json
+++ b/airflow_variables_prod.json
@@ -313,6 +313,7 @@
   "schema_filepath": "/home/airflow/gcs/dags/schemas/",
   "sentry_dsn": "https://94027cdcc4c9470f9dafa2c0b456c2c9@o14203.ingest.us.sentry.io/5806618",
   "sentry_environment": "production",
+  "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
   "table_ids": {
     "accounts": "accounts",
     "assets": "history_assets",
@@ -327,6 +328,7 @@
     "liquidity_pools": "liquidity_pools",
     "offers": "offers",
     "operations": "history_operations",
+    "retool_entity_data": "retool_entity_data",
     "signers": "account_signers",
     "trades": "history_trades",
     "transactions": "history_transactions",
@@ -347,9 +349,11 @@
     "create_sandbox": 1020,
     "current_state": 1200,
     "default": 60,
+    "del_ins_retool_entity_data_task": 720,
     "elementary_dbt_data_quality": 2100,
     "elementary_generate_report": 1200,
     "enriched_history_operations": 1800,
+    "export_retool_data": 720,
     "fee_stats": 360,
     "history_assets": 360,
     "liquidity_pool_trade_volume": 1200,
diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py
new file mode 100644
index 00000000..f977be93
--- /dev/null
+++ b/dags/external_data_dag.py
@@ -0,0 +1,84 @@
+"""
+The external_data_dag DAG exports data from external sources.
+It is scheduled to export information to BigQuery at regular intervals.
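+Data is exported by the stellar-etl-internal image running in a Kubernetes
+pod, staged in a GCS bucket, and then loaded into the retool_entity_data
+table in BigQuery.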
+""" + +from ast import literal_eval +from datetime import datetime +from json import loads + +from airflow import DAG +from airflow.configuration import conf +from airflow.models.variable import Variable +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.build_del_ins_operator import create_export_del_insert_operator +from stellar_etl_airflow.build_internal_export_task import build_export_task +from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.utils import access_secret + +init_sentry() + +EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True) +EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project") +EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset") +RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"] +RETOOL_EXPORT_TASK_ID = "export_retool_data" + +# Initialize the DAG +dag = DAG( + "external_data_dag", + default_args=get_default_dag_args(), + start_date=datetime(2024, 12, 16, 0, 0), + description="This DAG exports data from external sources such as retool.", + schedule_interval="0 22 * * *", + params={ + "alias": "external", + }, + render_template_as_native_obj=True, + user_defined_macros={ + "subtract_data_interval": macros.subtract_data_interval, + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + "literal_eval": lambda e: literal_eval(e), + }, +) + + +retool_export_task = build_export_task( + dag, + RETOOL_EXPORT_TASK_ID, + command="export-retool", + cmd_args=[ + "--start-time", + "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}", + "--end-time", + "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}", + ], + use_gcs=True, + env_vars={ + "RETOOL_API_KEY": access_secret("retool-api-key", "default"), + }, +) + + +retool_insert_to_bq_task = create_export_del_insert_operator( + dag, + table_name=RETOOL_TABLE_NAME, + project=EXTERNAL_DATA_PROJECT_NAME, + dataset=EXTERNAL_DATA_DATASET_NAME, + export_task_id=RETOOL_EXPORT_TASK_ID, + source_object_suffix="", + partition=False, + cluster=False, + table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}", +) + +retool_export_task >> retool_insert_to_bq_task diff --git a/dags/stellar_etl_airflow/build_del_ins_operator.py b/dags/stellar_etl_airflow/build_del_ins_operator.py index 0322f04b..efe347da 100644 --- a/dags/stellar_etl_airflow/build_del_ins_operator.py +++ b/dags/stellar_etl_airflow/build_del_ins_operator.py @@ -1,4 +1,8 @@ from airflow.operators.python import PythonOperator +from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( + build_del_ins_from_gcs_to_bq_task, +) +from stellar_etl_airflow.build_internal_export_task import get_airflow_metadata from stellar_etl_airflow.default import alert_after_max_retries @@ -74,3 +78,41 @@ def create_del_ins_task(dag, task_vars, del_ins_callable): on_failure_callback=alert_after_max_retries, dag=dag, ) + + +def create_export_del_insert_operator( + dag, + table_name: str, + project: str, + dataset: str, + export_task_id: str, + source_object_suffix: str, + partition: bool, + cluster: bool, + table_id: str, +): + metadata = get_airflow_metadata() + source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + 
+    source_objects = [
+        "{{ task_instance.xcom_pull(task_ids='"
+        + export_task_id
+        + '\')["output"] }}'
+        + source_object_suffix
+    ]
+    task_vars = {
+        "task_id": f"del_ins_{table_name}_task",
+        "project": project,
+        "dataset": dataset,
+        "table_name": table_name,
+        "export_task_id": export_task_id,
+        "source_object_suffix": source_object_suffix,
+        "partition": partition,
+        "cluster": cluster,
+        "batch_id": metadata["batch_id"],
+        "batch_date": metadata["batch_date"],
+        "source_objects": source_objects,
+        "table_id": table_id,
+    }
+    insert_to_bq_task = create_del_ins_task(
+        dag, task_vars, build_del_ins_from_gcs_to_bq_task
+    )
+    return insert_to_bq_task
diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
index 6bc76e26..94b662f6 100644
--- a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
+++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
@@ -1,22 +1,12 @@
-import base64
 import logging
 from datetime import timedelta
 
 from airflow.configuration import conf
 from airflow.models import Variable
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
-from kubernetes import client, config
 from kubernetes.client import models as k8s
 from stellar_etl_airflow.default import alert_after_max_retries
-
-
-def access_secret(secret_name, namespace):
-    config.load_kube_config()
-    v1 = client.CoreV1Api()
-    secret_data = v1.read_namespaced_secret(secret_name, namespace)
-    secret = secret_data.data
-    secret = base64.b64decode(secret["token"]).decode("utf-8")
-    return secret
+from stellar_etl_airflow.utils import access_secret
 
 
 def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
diff --git a/dags/stellar_etl_airflow/build_internal_export_task.py b/dags/stellar_etl_airflow/build_internal_export_task.py
new file mode 100644
index 00000000..3f318d6e
--- /dev/null
+++ b/dags/stellar_etl_airflow/build_internal_export_task.py
@@ -0,0 +1,129 @@
+"""
+This module contains functions for creating Airflow tasks that run stellar-etl-internal export commands.
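+
+A typical call (a sketch; argument values here are illustrative) looks like:
+
+    export_task = build_export_task(
+        dag,
+        "export_retool_data",
+        command="export-retool",
+        cmd_args=["--start-time", "...", "--end-time", "..."],
+        use_gcs=True,
+    )
+
+With use_gcs=True the task uploads its output to GCS and pushes the
+output path to XCom for downstream load tasks.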
+""" + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.configuration import conf +from airflow.models.variable import Variable +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.default import alert_after_max_retries + + +def get_airflow_metadata(): + return { + "batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}", + "batch_id": macros.get_batch_id(), + "run_id": "{{ run_id }}", + } + + +def build_export_task( + dag, + task_name, + command, + cmd_args=[], + env_vars={}, + use_gcs=False, + resource_cfg="default", +): + namespace = conf.get("kubernetes", "NAMESPACE") + + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + + requests = { + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + container_resources = k8s.V1ResourceRequirements(requests=requests) + + image = "{{ var.value.stellar_etl_internal_image_name }}" + + output_filepath = "" + if use_gcs: + metadata = get_airflow_metadata() + batch_insert_ts = metadata["batch_insert_ts"] + batch_date = metadata["batch_date"] + batch_id = metadata["batch_id"] + run_id = metadata["run_id"] + + output_filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), + run_id, + f"{task_name}-exported-entity.txt", + ) + + cmd_args = cmd_args + [ + "--cloud-storage-bucket", + Variable.get("gcs_exported_data_bucket_name"), + "--cloud-provider", + "gcp", + "--output", + output_filepath, + "-u", + f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'", + ] + etl_cmd_string = " ".join(cmd_args) + arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json""" + env_vars.update( + { + "EXECUTION_DATE": "{{ ds }}", + "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + } + ) + + return KubernetesPodOperator( + task_id=task_name, + name=task_name, + namespace=Variable.get("k8s_namespace"), + service_account_name=Variable.get("k8s_service_account"), + env_vars=env_vars, + image=image, + cmds=["bash", "-c"], + arguments=[arguments], + do_xcom_push=True, + dag=dag, + is_delete_operator_pod=True, + startup_timeout_seconds=720, + in_cluster=in_cluster, + config_file=config_file_location, + container_resources=container_resources, + on_failure_callback=alert_after_max_retries, + image_pull_policy="IfNotPresent", + image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[task_name] + ), + trigger_rule="all_done", + ) diff --git a/dags/stellar_etl_airflow/utils.py b/dags/stellar_etl_airflow/utils.py index a260fd9b..134bbd7c 100644 --- a/dags/stellar_etl_airflow/utils.py +++ b/dags/stellar_etl_airflow/utils.py @@ -1,3 +1,4 @@ +import base64 import logging import re import time @@ -5,6 +6,7 @@ from airflow.configuration import conf from airflow.models import Variable from airflow.utils.state import TaskInstanceState +from kubernetes import client, config base_log_folder = 
conf.get("logging", "base_log_folder") @@ -100,3 +102,12 @@ def skip_retry_dbt_errors(context) -> None: return else: return + + +def access_secret(secret_name, namespace): + config.load_kube_config() + v1 = client.CoreV1Api() + secret_data = v1.read_namespaced_secret(secret_name, namespace) + secret = secret_data.data + secret = base64.b64decode(secret["token"]).decode("utf-8") + return secret diff --git a/schemas/retool_entity_data_schema.json b/schemas/retool_entity_data_schema.json new file mode 100644 index 00000000..2be79a99 --- /dev/null +++ b/schemas/retool_entity_data_schema.json @@ -0,0 +1,163 @@ +[ + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_id", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "created_at", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "updated_at", + "type": "TIMESTAMP" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "id", + "type": "INTEGER" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "account_sponsor", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "REPEATED", + "name": "app_geographies", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "custodial", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "description", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "fee_sponsor", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "home_domain", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "home_domains_id", + "type": "INTEGER" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "live", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "name", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "non_custodial", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "notes", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "REPEATED", + "name": "ramps", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "sdp_enabled", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "soroban_enabled", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "status", + "type": "STRING" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "verified", + "type": "BOOLEAN" + }, + { + "description": "", + "fields": [], + "mode": "", + "name": "website_url", + "type": "STRING" + } +]