Add DAG for exporting retool data #550

Draft
wants to merge 16 commits into master
5 changes: 5 additions & 0 deletions airflow_variables_dev.json
@@ -311,10 +311,12 @@
}
}
},
"retool_api_key": "test-api-key",
"sandbox_dataset": "crypto_stellar_internal_sandbox",
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/6190849",
"sentry_environment": "development",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -329,6 +331,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -349,9 +352,11 @@
"create_sandbox": 2400,
"current_state": 720,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"export_retool_data": 720,
"fee_stats": 840,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
5 changes: 5 additions & 0 deletions airflow_variables_prod.json
@@ -309,10 +309,12 @@
}
}
},
"retool_api_key": "test-api-key",
"sandbox_dataset": "crypto_stellar_internal_sandbox",
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/5806618",
"sentry_environment": "production",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -327,6 +329,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -347,9 +350,11 @@
"create_sandbox": 1020,
"current_state": 1200,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"export_retool_data": 720,
"fee_stats": 360,
"history_assets": 360,
"liquidity_pool_trade_volume": 1200,
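
For context, these new variables are consumed by the DAG code further down in this PR, either at parse time with Variable.get or at render time through Jinja templates. A minimal illustrative sketch (not part of the PR; the values shown are the placeholders added above):

from airflow.models.variable import Variable

# table_ids is stored as JSON, so it is deserialized before the lookup.
table_ids = Variable.get("table_ids", deserialize_json=True)
retool_table = table_ids["retool_entity_data"]  # "retool_entity_data"

# Per-task SLAs in seconds for the two new tasks.
task_sla = Variable.get("task_sla", deserialize_json=True)
task_sla["export_retool_data"]               # 720
task_sla["del_ins_retool_entity_data_task"]  # 720

# The API key is not read with Variable.get here; the DAG passes the Jinja
# template "{{ var.value.retool_api_key }}" so it is resolved when the export
# task renders its environment variables.
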
120 changes: 120 additions & 0 deletions dags/external_data_dag.py
@@ -0,0 +1,120 @@
"""
The external_data_dag DAG exports data from external sources.
It is scheduled to export information to BigQuery at regular intervals.
"""

from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_del_ins_operator import create_del_ins_task
from stellar_etl_airflow.build_internal_export_task import (
build_export_task,
get_airflow_metadata,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True)
EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project")
EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset")
RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"]
RETOOL_EXPORT_TASK_ID = "export_retool_data"

# Initialize the DAG
dag = DAG(
"external_data_dag",
default_args=get_default_dag_args(),
start_date=datetime(2024, 12, 5, 14, 30),
description="This DAG exports data from external sources such as retool.",
schedule_interval="0 22 * * *",
params={
"alias": "external",
},
render_template_as_native_obj=True,
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
user_defined_filters={
"fromjson": lambda s: loads(s),
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
"literal_eval": lambda e: literal_eval(e),
},
)


retool_export_task = build_export_task(
dag,
RETOOL_EXPORT_TASK_ID,
command="export-retool",
cmd_args=[
"--start-time",
"{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
"--end-time",
"{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
],
use_gcs=True,
env_vars={"RETOOL_API_KEY": "{{ var.value.retool_api_key }}"},
)


def get_insert_to_bq_task(
table_name: str,
project: str,
dataset: str,
export_task_id: str,
source_object_suffix: str,
partition: bool,
cluster: bool,
table_id: str,
):
metadata = get_airflow_metadata()
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]
task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": project,
"dataset": dataset,
"table_name": table_name,
"export_task_id": export_task_id,
"source_object_suffix": source_object_suffix,
"partition": partition,
"cluster": cluster,
"batch_id": metadata["batch_id"],
"batch_date": metadata["batch_date"],
"source_objects": source_objects,
"table_id": table_id,
}
insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)
return insert_to_bq_task


retool_insert_to_bq_task = get_insert_to_bq_task(
table_name=RETOOL_TABLE_NAME,
project=EXTERNAL_DATA_PROJECT_NAME,
dataset=EXTERNAL_DATA_DATASET_NAME,
export_task_id=RETOOL_EXPORT_TASK_ID,
source_object_suffix="",
partition=False,
cluster=False,
table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}",
)

retool_export_task >> retool_insert_to_bq_task
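
A hypothetical second external source could be wired up with the same two helpers. The sketch below is illustrative only and assumes an export-foo subcommand and a foo_entity_data table, neither of which exists in this PR:

# Hypothetical example, reusing build_export_task and get_insert_to_bq_task from above.
FOO_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES.get("foo_entity_data", "foo_entity_data")
FOO_EXPORT_TASK_ID = "export_foo_data"

foo_export_task = build_export_task(
    dag,
    FOO_EXPORT_TASK_ID,
    command="export-foo",  # assumed stellar-etl-internal subcommand, for illustration only
    cmd_args=[
        "--start-time",
        "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
        "--end-time",
        "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
    ],
    use_gcs=True,
)

foo_insert_to_bq_task = get_insert_to_bq_task(
    table_name=FOO_TABLE_NAME,
    project=EXTERNAL_DATA_PROJECT_NAME,
    dataset=EXTERNAL_DATA_DATASET_NAME,
    export_task_id=FOO_EXPORT_TASK_ID,
    source_object_suffix="",
    partition=False,
    cluster=False,
    table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{FOO_TABLE_NAME}",
)

foo_export_task >> foo_insert_to_bq_task

Note that build_export_task looks up the task name in the task_sla variable, so a matching entry (like the ones added in the variables files above) would also be needed for this sketch to run.
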
108 changes: 108 additions & 0 deletions dags/stellar_etl_airflow/build_internal_export_task.py
@@ -0,0 +1,108 @@
"""
This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions.
"""

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.default import alert_after_max_retries


def get_airflow_metadata():
return {
"batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
"batch_id": macros.get_batch_id(),
"run_id": "{{ run_id }}",
}


def build_export_task(
    dag,
    task_name,
    command,
    cmd_args=None,
    env_vars=None,
    use_gcs=False,
    resource_cfg="default",
):
    # Use None defaults to avoid sharing mutable default arguments between calls,
    # and copy caller-provided values so they are not mutated in place.
    cmd_args = list(cmd_args) if cmd_args else []
    env_vars = dict(env_vars) if env_vars else {}

    namespace = conf.get("kubernetes", "NAMESPACE")

if namespace == "default":
config_file_location = Variable.get("kube_config_location")
in_cluster = False
else:
config_file_location = None
in_cluster = True

requests = {
"cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
"memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
}
container_resources = k8s.V1ResourceRequirements(requests=requests)

image = "{{ var.value.stellar_etl_internal_image_name }}"

output_filepath = ""
if use_gcs:
metadata = get_airflow_metadata()
batch_insert_ts = metadata["batch_insert_ts"]
batch_date = metadata["batch_date"]
batch_id = metadata["batch_id"]
run_id = metadata["run_id"]

output_filepath = os.path.join(
Variable.get("gcs_exported_object_prefix"),
run_id,
f"{task_name}-exported-entity.txt",
)

cmd_args = cmd_args + [
"--cloud-storage-bucket",
Variable.get("gcs_exported_data_bucket_name"),
"--cloud-provider",
"gcp",
"--output",
output_filepath,
"-u",
f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'",
]
etl_cmd_string = " ".join(cmd_args)
arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
env_vars.update(
{
"EXECUTION_DATE": "{{ ds }}",
"AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}",
}
)

return KubernetesPodOperator(
task_id=task_name,
name=task_name,
namespace=Variable.get("k8s_namespace"),
service_account_name=Variable.get("k8s_service_account"),
env_vars=env_vars,
image=image,
cmds=["bash", "-c"],
arguments=[arguments],
do_xcom_push=True,
dag=dag,
is_delete_operator_pod=True,
startup_timeout_seconds=720,
in_cluster=in_cluster,
config_file=config_file_location,
container_resources=container_resources,
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
),
trigger_rule="all_done",
)
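
For reference, this is roughly what the assembled arguments string looks like for the Retool export task after the templates render; the bucket, prefix, run_id, and batch values below are placeholders rather than real deployment settings:

# Illustrative only: an approximation of the single string handed to `bash -c`
# for export_retool_data (placeholders shown in angle brackets).
rendered_arguments = (
    " export-retool"
    " --start-time <data_interval_start> --end-time <data_interval_end>"
    " --cloud-storage-bucket <gcs_exported_data_bucket_name> --cloud-provider gcp"
    " --output <gcs_exported_object_prefix>/<run_id>/export_retool_data-exported-entity.txt"
    " -u 'batch_id=<batch_id>,batch_run_date=<batch_date>,batch_insert_ts=<batch_insert_ts>'"
    " 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out"
    ' && echo "{\\"output\\": \\"<output_filepath>\\"}" >> /airflow/xcom/return.json'
)
# The trailing echo writes {"output": "<output_filepath>"} to /airflow/xcom/return.json;
# with do_xcom_push=True that file becomes the task's XCom value, which the downstream
# del_ins_retool_entity_data_task pulls to build its source_objects path.
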