From 540ef5711f9075e343924bf5081e24a5db096765 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 12:09:20 -0400 Subject: [PATCH 01/13] Generate avro files from bq tables --- dags/generate_avro_files_dag.py | 63 ++++++++++++++++++ dags/queries/generate_avro/accounts.sql | 17 +++++ dags/queries/generate_avro/contract_data.sql | 17 +++++ .../generate_avro/history_contract_events.sql | 17 +++++ .../queries/generate_avro/history_effects.sql | 19 ++++++ .../queries/generate_avro/history_ledgers.sql | 17 +++++ .../generate_avro/history_operations.sql | 20 ++++++ dags/queries/generate_avro/history_trades.sql | 18 ++++++ .../generate_avro/history_transactions.sql | 17 +++++ .../queries/generate_avro/liquidity_pools.sql | 17 +++++ dags/queries/generate_avro/offers.sql | 17 +++++ dags/queries/generate_avro/trust_lines.sql | 17 +++++ dags/queries/generate_avro/ttl.sql | 17 +++++ .../build_bq_generate_avro_job_task.py | 64 +++++++++++++++++++ 14 files changed, 337 insertions(+) create mode 100644 dags/generate_avro_files_dag.py create mode 100644 dags/queries/generate_avro/accounts.sql create mode 100644 dags/queries/generate_avro/contract_data.sql create mode 100644 dags/queries/generate_avro/history_contract_events.sql create mode 100644 dags/queries/generate_avro/history_effects.sql create mode 100644 dags/queries/generate_avro/history_ledgers.sql create mode 100644 dags/queries/generate_avro/history_operations.sql create mode 100644 dags/queries/generate_avro/history_trades.sql create mode 100644 dags/queries/generate_avro/history_transactions.sql create mode 100644 dags/queries/generate_avro/liquidity_pools.sql create mode 100644 dags/queries/generate_avro/offers.sql create mode 100644 dags/queries/generate_avro/trust_lines.sql create mode 100644 dags/queries/generate_avro/ttl.sql create mode 100644 dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py new file mode 100644 index 00000000..5bf83e2b --- /dev/null +++ b/dags/generate_avro_files_dag.py @@ -0,0 +1,63 @@ +from datetime import datetime + +from airflow import DAG +from kubernetes.client import models as k8s +from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps +from stellar_etl_airflow.build_bq_generate_avro_job_task import build_bq_generate_avro_job +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) + +init_sentry() + +dag = DAG( + "generate_avro_files", + default_args=get_default_dag_args(), + start_date=datetime(2024, 10, 1, 0, 0), + description="This DAG generates AVRO files from BQ tables", + schedule_interval="0 * * * *", # Runs every hour + user_defined_filters={ + "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + }, + max_active_runs=5, + catchup=True, + tags=["dbt-enriched-base-tables"], + sla_miss_callback=alert_sla_miss, +) + +# Wait on ingestion DAGs +wait_on_history_table = build_cross_deps( + dag, "wait_on_ledgers_txs", "history_table_export" +) +wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export") + +# Generate AVRO files +avro_tables = [ + "accounts", + "contract_data", + "history_contract_events", + "history_ledgers", + "history_trades", + "history_transactions", + "liquidity_pools", + "offers", + "trust_lines", + "ttl", + #"history_effects", + #"history_operations", +] + +for table in avro_tables: + task = build_bq_generate_avro_job( + dag=dag, + project=public_project, + dataset=public_dataset, + table=table, + gcs_bucket=gcs_bucket, + ) + + wait_on_history_table >> task + wait_on_state_table >> task + diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql new file mode 100644 index 00000000..ee25193c --- /dev/null +++ b/dags/queries/generate_avro/accounts.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date), + sequence_ledger as account_sequence_last_modified_ledger + from {project_id}.{dataset_id}.accounts + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) diff --git a/dags/queries/generate_avro/contract_data.sql b/dags/queries/generate_avro/contract_data.sql new file mode 100644 index 00000000..4a1eb01e --- /dev/null +++ b/dags/queries/generate_avro/contract_data.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.contract_data + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_contract_events.sql b/dags/queries/generate_avro/history_contract_events.sql new file mode 100644 index 00000000..68e9b3ba --- /dev/null +++ b/dags/queries/generate_avro/history_contract_events.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_contract_events + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql new file mode 100644 index 00000000..4d80ab45 --- /dev/null +++ b/dags/queries/generate_avro/history_effects.sql @@ -0,0 +1,19 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(details, batch_id, batch_insert_ts, batch_run_date), + details.* + except(predicate) + from {project_id}.{dataset_id}.history_effects + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_ledgers.sql b/dags/queries/generate_avro/history_ledgers.sql new file mode 100644 index 00000000..926e188a --- /dev/null +++ b/dags/queries/generate_avro/history_ledgers.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_ledgers + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql new file mode 100644 index 00000000..908c710c --- /dev/null +++ b/dags/queries/generate_avro/history_operations.sql @@ -0,0 +1,20 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(details, details_json, batch_id, batch_insert_ts, batch_run_date), + details.* + except(claimants, type), + details.type as details_type + from {project_id}.{dataset_id}.history_operations + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql new file mode 100644 index 00000000..52b84cc3 --- /dev/null +++ b/dags/queries/generate_avro/history_trades.sql @@ -0,0 +1,18 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date), + ledger_closed_at as closed_at + from {project_id}.{dataset_id}.history_trades + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql new file mode 100644 index 00000000..9fe1be63 --- /dev/null +++ b/dags/queries/generate_avro/history_transactions.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_transactions + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql new file mode 100644 index 00000000..1bb4b622 --- /dev/null +++ b/dags/queries/generate_avro/liquidity_pools.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.liquidity_pools + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql new file mode 100644 index 00000000..e3eb8b74 --- /dev/null +++ b/dags/queries/generate_avro/offers.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.offers + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql new file mode 100644 index 00000000..6917e93f --- /dev/null +++ b/dags/queries/generate_avro/trust_lines.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.trust_lines + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/queries/generate_avro/ttl.sql b/dags/queries/generate_avro/ttl.sql new file mode 100644 index 00000000..fd0ef89c --- /dev/null +++ b/dags/queries/generate_avro/ttl.sql @@ -0,0 +1,17 @@ +export data + options ( + uri = '{uri}', + format = 'avro', + overwrite=true + ) +as ( + select + * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.ttl + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc +) + diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py new file mode 100644 index 00000000..1a10a6c6 --- /dev/null +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -0,0 +1,64 @@ +import os +from datetime import timedelta + +from airflow.models import Variable +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from stellar_etl_airflow import macros +from stellar_etl_airflow.build_bq_insert_job_task import file_to_string +from stellar_etl_airflow.default import alert_after_max_retries + + +def get_query_filepath(query_name): + root = os.path.dirname(os.path.dirname(__file__)) + return os.path.join(root, f"queries/generate_avro/{query_name}.sql") + + +def build_bq_generate_avro_job( + dag, + project, + dataset, + table, + gcs_bucket, +): + query_path = get_query_filepath(table) + query = file_to_string(query_path) + batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" + prev_batch_run_date = ( + "{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}" + ) + next_batch_run_date = ( + "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}" + ) + batch_run_time = f"{batch_run_date.hour}:{batch_run_date.minute}:{batch_run_date.second}": + uri = f"gs://{gcs_bucket}/avro/{table}/{batch_run_date.year}/{batch_run_date.month}/{batch_run_date.day}/{batch_run_time}/*.avro" + sql_params = { + "project_id": project, + "dataset_id": dataset, + "prev_batch_run_date": prev_batch_run_date, + "next_batch_run_date": next_batch_run_date, + "uri": uri, + } + query = query.format(**sql_params) + configuration = { + "query": { + "query": query, + "useLegacySql": False, + } + } + + return BigQueryInsertJobOperator( + task_id=f"generate_avro_{table}", + execution_timeout=timedelta( + seconds=Variable.get("task_timeout", deserialize_json=True)[ + build_bq_insert_job.__name__ + ] + ), + on_failure_callback=alert_after_max_retries, + configuration=configuration, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_bq_insert_job.__name__ + ] + ), + ) + From 31b887dc33bf8c2eb57ac9975f31bf3c0885ade2 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 12:18:35 -0400 Subject: [PATCH 02/13] add batch_run_date --- dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index 1a10a6c6..7fb06258 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -34,6 +34,7 @@ def build_bq_generate_avro_job( sql_params = { "project_id": project, "dataset_id": dataset, + "batch_run_date": batch_run_date, "prev_batch_run_date": prev_batch_run_date, "next_batch_run_date": next_batch_run_date, "uri": uri, From 15ee527e0eea2331f38197b2e960539ed6764c1c Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 12:34:43 -0400 Subject: [PATCH 03/13] lint --- dags/generate_avro_files_dag.py | 9 +++++---- dags/queries/generate_avro/contract_data.sql | 1 - dags/queries/generate_avro/history_contract_events.sql | 1 - dags/queries/generate_avro/history_effects.sql | 1 - dags/queries/generate_avro/history_ledgers.sql | 1 - dags/queries/generate_avro/history_operations.sql | 1 - dags/queries/generate_avro/history_trades.sql | 1 - dags/queries/generate_avro/history_transactions.sql | 1 - dags/queries/generate_avro/liquidity_pools.sql | 1 - dags/queries/generate_avro/offers.sql | 1 - dags/queries/generate_avro/trust_lines.sql | 1 - dags/queries/generate_avro/ttl.sql | 1 - 12 files changed, 5 insertions(+), 15 deletions(-) diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 5bf83e2b..5ea5891b 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -3,7 +3,9 @@ from airflow import DAG from kubernetes.client import models as k8s from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps -from stellar_etl_airflow.build_bq_generate_avro_job_task import build_bq_generate_avro_job +from stellar_etl_airflow.build_bq_generate_avro_job_task import ( + build_bq_generate_avro_job +) from stellar_etl_airflow.default import ( alert_sla_miss, get_default_dag_args, @@ -45,8 +47,8 @@ "offers", "trust_lines", "ttl", - #"history_effects", - #"history_operations", + # "history_effects", + # "history_operations", ] for table in avro_tables: @@ -60,4 +62,3 @@ wait_on_history_table >> task wait_on_state_table >> task - diff --git a/dags/queries/generate_avro/contract_data.sql b/dags/queries/generate_avro/contract_data.sql index 4a1eb01e..92b872ab 100644 --- a/dags/queries/generate_avro/contract_data.sql +++ b/dags/queries/generate_avro/contract_data.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_contract_events.sql b/dags/queries/generate_avro/history_contract_events.sql index 68e9b3ba..0d0806a5 100644 --- a/dags/queries/generate_avro/history_contract_events.sql +++ b/dags/queries/generate_avro/history_contract_events.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql index 4d80ab45..54309e0b 100644 --- a/dags/queries/generate_avro/history_effects.sql +++ b/dags/queries/generate_avro/history_effects.sql @@ -16,4 +16,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_ledgers.sql b/dags/queries/generate_avro/history_ledgers.sql index 926e188a..ff78c883 100644 --- a/dags/queries/generate_avro/history_ledgers.sql +++ b/dags/queries/generate_avro/history_ledgers.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql index 908c710c..1a22ccb4 100644 --- a/dags/queries/generate_avro/history_operations.sql +++ b/dags/queries/generate_avro/history_operations.sql @@ -17,4 +17,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql index 52b84cc3..5ddb6c1f 100644 --- a/dags/queries/generate_avro/history_trades.sql +++ b/dags/queries/generate_avro/history_trades.sql @@ -15,4 +15,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql index 9fe1be63..6c134c88 100644 --- a/dags/queries/generate_avro/history_transactions.sql +++ b/dags/queries/generate_avro/history_transactions.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql index 1bb4b622..ac92c690 100644 --- a/dags/queries/generate_avro/liquidity_pools.sql +++ b/dags/queries/generate_avro/liquidity_pools.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql index e3eb8b74..bb3077d8 100644 --- a/dags/queries/generate_avro/offers.sql +++ b/dags/queries/generate_avro/offers.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql index 6917e93f..32917d22 100644 --- a/dags/queries/generate_avro/trust_lines.sql +++ b/dags/queries/generate_avro/trust_lines.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - diff --git a/dags/queries/generate_avro/ttl.sql b/dags/queries/generate_avro/ttl.sql index fd0ef89c..97224d06 100644 --- a/dags/queries/generate_avro/ttl.sql +++ b/dags/queries/generate_avro/ttl.sql @@ -14,4 +14,3 @@ as ( and closed_at < '{next_batch_run_date}' order by closed_at asc ) - From 67061c5ac38cbcc9d087cddcae79013dbab71122 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 13:13:03 -0400 Subject: [PATCH 04/13] Add missing variables --- dags/generate_avro_files_dag.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 5ea5891b..664d9cc7 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -29,6 +29,10 @@ sla_miss_callback=alert_sla_miss, ) +public_project = "{{ var.value.public_project }}" +public_dataset = "{{ var.value.public_dataset }}" +gcs_bucket = "{{ var.value.avro_gcs_bucket }}" + # Wait on ingestion DAGs wait_on_history_table = build_cross_deps( dag, "wait_on_ledgers_txs", "history_table_export" From d38f6f891e0da69da3f828c386007683e5ddc141 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 13:16:25 -0400 Subject: [PATCH 05/13] lint --- dags/generate_avro_files_dag.py | 4 ++-- dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 664d9cc7..78ade559 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -2,10 +2,10 @@ from airflow import DAG from kubernetes.client import models as k8s -from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_bq_generate_avro_job_task import ( - build_bq_generate_avro_job + build_bq_generate_avro_job, ) +from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.default import ( alert_sla_miss, get_default_dag_args, diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index 7fb06258..f5d05242 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -62,4 +62,3 @@ def build_bq_generate_avro_job( ] ), ) - From 25c33b3155385f480ca7d62ceca6f5a3b32baa96 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 14:04:39 -0400 Subject: [PATCH 06/13] fix bugs --- airflow_variables_dev.json | 5 ++++- dags/generate_avro_files_dag.py | 1 - .../build_bq_generate_avro_job_task.py | 10 ++++++---- dags/stellar_etl_airflow/macros.py | 4 ++++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 367e3228..78585f6a 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -1,5 +1,6 @@ { "api_key_path": "/home/airflow/gcs/data/apiKey.json", + "avro_gcs_bucket": "test_dune_bucket_sdf", "bq_dataset": "test_crypto_stellar_internal", "bq_dataset_audit_log": "audit_log", "bq_project": "test-hubble-319619", @@ -339,6 +340,7 @@ "build_export_task": 840, "build_gcs_to_bq_task": 960, "build_time_task": 480, + "build_bq_generate_avro_job": 600, "cleanup_metadata": 60, "create_sandbox": 2400, "current_state": 720, @@ -373,7 +375,8 @@ "build_delete_data_task": 180, "build_export_task": 420, "build_gcs_to_bq_task": 300, - "build_time_task": 480 + "build_time_task": 480, + "build_bq_generate_avro_job": 600 }, "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers", "use_captive_core": "False", diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 78ade559..a6aa95cf 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -25,7 +25,6 @@ }, max_active_runs=5, catchup=True, - tags=["dbt-enriched-base-tables"], sla_miss_callback=alert_sla_miss, ) diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index f5d05242..007dd388 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -29,8 +29,10 @@ def build_bq_generate_avro_job( next_batch_run_date = ( "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}" ) - batch_run_time = f"{batch_run_date.hour}:{batch_run_date.minute}:{batch_run_date.second}": - uri = f"gs://{gcs_bucket}/avro/{table}/{batch_run_date.year}/{batch_run_date.month}/{batch_run_date.day}/{batch_run_time}/*.avro" + uri_datetime = ( + "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}" + ) + uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro" sql_params = { "project_id": project, "dataset_id": dataset, @@ -51,14 +53,14 @@ def build_bq_generate_avro_job( task_id=f"generate_avro_{table}", execution_timeout=timedelta( seconds=Variable.get("task_timeout", deserialize_json=True)[ - build_bq_insert_job.__name__ + build_bq_generate_avro_job.__name__ ] ), on_failure_callback=alert_after_max_retries, configuration=configuration, sla=timedelta( seconds=Variable.get("task_sla", deserialize_json=True)[ - build_bq_insert_job.__name__ + build_bq_generate_avro_job.__name__ ] ), ) diff --git a/dags/stellar_etl_airflow/macros.py b/dags/stellar_etl_airflow/macros.py index fdd23c89..55501d76 100644 --- a/dags/stellar_etl_airflow/macros.py +++ b/dags/stellar_etl_airflow/macros.py @@ -9,3 +9,7 @@ def batch_run_date_as_datetime_string(dag, start_time): def get_batch_id(): return "{}-{}".format("{{ run_id }}", "{{ params.alias }}") + +def batch_run_date_as_directory_string(dag, start_time): + time = subtract_data_interval(dag, start_time) + return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}" From e40ddd524d16b607931b53d82f2e992c8349659c Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 22:33:42 -0400 Subject: [PATCH 07/13] update quereies and vars --- airflow_variables_prod.json | 5 ++- dags/generate_avro_files_dag.py | 32 ++++++++++++------- dags/queries/generate_avro/accounts.sql | 2 ++ .../queries/generate_avro/history_effects.sql | 2 ++ .../generate_avro/history_operations.sql | 4 ++- dags/queries/generate_avro/history_trades.sql | 4 +-- .../generate_avro/history_transactions.sql | 2 ++ .../queries/generate_avro/liquidity_pools.sql | 2 ++ dags/queries/generate_avro/offers.sql | 2 ++ dags/queries/generate_avro/trust_lines.sql | 2 ++ .../build_bq_generate_avro_job_task.py | 16 +++------- dags/stellar_etl_airflow/macros.py | 1 + 12 files changed, 47 insertions(+), 27 deletions(-) diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 53347235..d26cd16e 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -1,5 +1,6 @@ { "api_key_path": "/home/airflow/gcs/data/apiKey.json", + "avro_gcs_bucket": "dune_bucket_sdf", "bq_dataset": "crypto_stellar_internal_2", "bq_dataset_audit_log": "audit_log", "bq_project": "hubble-261722", @@ -337,6 +338,7 @@ "build_export_task": 600, "build_gcs_to_bq_task": 660, "build_time_task": 300, + "build_bq_generate_avro_job": 600, "cleanup_metadata": 60, "create_sandbox": 1020, "current_state": 1200, @@ -371,7 +373,8 @@ "build_delete_data_task": 180, "build_export_task": 300, "build_gcs_to_bq_task": 300, - "build_time_task": 360 + "build_time_task": 360, + "build_bq_generate_avro_job": 600 }, "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers", "use_captive_core": "False", diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index a6aa95cf..b8914112 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -1,11 +1,12 @@ from datetime import datetime from airflow import DAG -from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_bq_generate_avro_job_task import ( build_bq_generate_avro_job, ) -from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps +from airflow.operators.dummy import DummyOperator from stellar_etl_airflow.default import ( alert_sla_miss, get_default_dag_args, @@ -15,16 +16,18 @@ init_sentry() dag = DAG( - "generate_avro_files", + "generate_avro", default_args=get_default_dag_args(), - start_date=datetime(2024, 10, 1, 0, 0), + start_date=datetime(2024, 10, 1, 1, 0), + catchup=True, description="This DAG generates AVRO files from BQ tables", - schedule_interval="0 * * * *", # Runs every hour - user_defined_filters={ - "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + schedule_interval="0 * * * *", + render_template_as_native_obj=True, + user_defined_macros={ + "subtract_data_interval": macros.subtract_data_interval, + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + "batch_run_date_as_directory_string": macros.batch_run_date_as_directory_string, }, - max_active_runs=5, - catchup=True, sla_miss_callback=alert_sla_miss, ) @@ -32,12 +35,15 @@ public_dataset = "{{ var.value.public_dataset }}" gcs_bucket = "{{ var.value.avro_gcs_bucket }}" + # Wait on ingestion DAGs wait_on_history_table = build_cross_deps( dag, "wait_on_ledgers_txs", "history_table_export" ) wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export") +dummy_task = DummyOperator(task_id='dummy_task', dag=dag) + # Generate AVRO files avro_tables = [ "accounts", @@ -55,7 +61,7 @@ ] for table in avro_tables: - task = build_bq_generate_avro_job( + avro_task = build_bq_generate_avro_job( dag=dag, project=public_project, dataset=public_dataset, @@ -63,5 +69,7 @@ gcs_bucket=gcs_bucket, ) - wait_on_history_table >> task - wait_on_state_table >> task + dummy_task >> avro_task + wait_on_history_table >> avro_task + wait_on_state_table >> avro_task + diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql index ee25193c..5280a2bd 100644 --- a/dags/queries/generate_avro/accounts.sql +++ b/dags/queries/generate_avro/accounts.sql @@ -11,6 +11,8 @@ as ( sequence_ledger as account_sequence_last_modified_ledger from {project_id}.{dataset_id}.accounts where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql index 54309e0b..b2d53c1c 100644 --- a/dags/queries/generate_avro/history_effects.sql +++ b/dags/queries/generate_avro/history_effects.sql @@ -12,6 +12,8 @@ as ( except(predicate) from {project_id}.{dataset_id}.history_effects where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql index 1a22ccb4..bf2fd99d 100644 --- a/dags/queries/generate_avro/history_operations.sql +++ b/dags/queries/generate_avro/history_operations.sql @@ -10,9 +10,11 @@ as ( except(details, details_json, batch_id, batch_insert_ts, batch_run_date), details.* except(claimants, type), - details.type as details_type + details.type as soroban_operation_type from {project_id}.{dataset_id}.history_operations where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql index 5ddb6c1f..151192f9 100644 --- a/dags/queries/generate_avro/history_trades.sql +++ b/dags/queries/generate_avro/history_trades.sql @@ -11,7 +11,7 @@ as ( ledger_closed_at as closed_at from {project_id}.{dataset_id}.history_trades where true - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' + and ledger_closed_at >= '{batch_run_date}' + and ledger_closed_at < '{next_batch_run_date}' order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql index 6c134c88..84e0d096 100644 --- a/dags/queries/generate_avro/history_transactions.sql +++ b/dags/queries/generate_avro/history_transactions.sql @@ -10,6 +10,8 @@ as ( except(batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.history_transactions where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql index ac92c690..cb74e81f 100644 --- a/dags/queries/generate_avro/liquidity_pools.sql +++ b/dags/queries/generate_avro/liquidity_pools.sql @@ -10,6 +10,8 @@ as ( except(batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.liquidity_pools where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql index bb3077d8..d6c6f4a2 100644 --- a/dags/queries/generate_avro/offers.sql +++ b/dags/queries/generate_avro/offers.sql @@ -10,6 +10,8 @@ as ( except(batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.offers where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql index 32917d22..47c1a076 100644 --- a/dags/queries/generate_avro/trust_lines.sql +++ b/dags/queries/generate_avro/trust_lines.sql @@ -10,6 +10,8 @@ as ( except(batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.trust_lines where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index 007dd388..ebf81901 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -3,16 +3,14 @@ from airflow.models import Variable from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from stellar_etl_airflow.build_bq_insert_job_task import ( + get_query_filepath, + file_to_string, +) from stellar_etl_airflow import macros -from stellar_etl_airflow.build_bq_insert_job_task import file_to_string from stellar_etl_airflow.default import alert_after_max_retries -def get_query_filepath(query_name): - root = os.path.dirname(os.path.dirname(__file__)) - return os.path.join(root, f"queries/generate_avro/{query_name}.sql") - - def build_bq_generate_avro_job( dag, project, @@ -20,12 +18,9 @@ def build_bq_generate_avro_job( table, gcs_bucket, ): - query_path = get_query_filepath(table) + query_path = get_query_filepath(f"generate_avro/{table}") query = file_to_string(query_path) batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" - prev_batch_run_date = ( - "{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}" - ) next_batch_run_date = ( "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}" ) @@ -37,7 +32,6 @@ def build_bq_generate_avro_job( "project_id": project, "dataset_id": dataset, "batch_run_date": batch_run_date, - "prev_batch_run_date": prev_batch_run_date, "next_batch_run_date": next_batch_run_date, "uri": uri, } diff --git a/dags/stellar_etl_airflow/macros.py b/dags/stellar_etl_airflow/macros.py index 55501d76..79ded807 100644 --- a/dags/stellar_etl_airflow/macros.py +++ b/dags/stellar_etl_airflow/macros.py @@ -10,6 +10,7 @@ def batch_run_date_as_datetime_string(dag, start_time): def get_batch_id(): return "{}-{}".format("{{ run_id }}", "{{ params.alias }}") + def batch_run_date_as_directory_string(dag, start_time): time = subtract_data_interval(dag, start_time) return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}" From df4aae3a09c84700cfb4fc27eb5accf8cba655fd Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 22:43:11 -0400 Subject: [PATCH 08/13] lint --- airflow_variables_dev.json | 6 +++--- airflow_variables_prod.json | 6 +++--- dags/generate_avro_files_dag.py | 6 +++--- .../build_bq_generate_avro_job_task.py | 9 +++------ dags/stellar_etl_airflow/macros.py | 4 +++- 5 files changed, 15 insertions(+), 16 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 78585f6a..85468664 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -334,13 +334,13 @@ "task_sla": { "asset_stats": 720, "build_batch_stats": 840, + "build_bq_generate_avro_job": 600, "build_bq_insert_job": 1080, "build_del_ins_from_gcs_to_bq_task": 2000, "build_delete_data_task": 1020, "build_export_task": 840, "build_gcs_to_bq_task": 960, "build_time_task": 480, - "build_bq_generate_avro_job": 600, "cleanup_metadata": 60, "create_sandbox": 2400, "current_state": 720, @@ -368,6 +368,7 @@ }, "task_timeout": { "build_batch_stats": 180, + "build_bq_generate_avro_job": 600, "build_bq_insert_job": 180, "build_copy_table": 180, "build_dbt_task": 960, @@ -375,8 +376,7 @@ "build_delete_data_task": 180, "build_export_task": 420, "build_gcs_to_bq_task": 300, - "build_time_task": 480, - "build_bq_generate_avro_job": 600 + "build_time_task": 480 }, "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers", "use_captive_core": "False", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index d26cd16e..98bdb3d2 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -332,13 +332,13 @@ "task_sla": { "asset_stats": 420, "build_batch_stats": 600, + "build_bq_generate_avro_job": 600, "build_bq_insert_job": 840, "build_del_ins_from_gcs_to_bq_task": 2000, "build_delete_data_task": 780, "build_export_task": 600, "build_gcs_to_bq_task": 660, "build_time_task": 300, - "build_bq_generate_avro_job": 600, "cleanup_metadata": 60, "create_sandbox": 1020, "current_state": 1200, @@ -366,6 +366,7 @@ }, "task_timeout": { "build_batch_stats": 180, + "build_bq_generate_avro_job": 600, "build_bq_insert_job": 180, "build_copy_table": 180, "build_dbt_task": 1800, @@ -373,8 +374,7 @@ "build_delete_data_task": 180, "build_export_task": 300, "build_gcs_to_bq_task": 300, - "build_time_task": 360, - "build_bq_generate_avro_job": 600 + "build_time_task": 360 }, "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers", "use_captive_core": "False", diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index b8914112..37b6cd44 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -1,12 +1,12 @@ from datetime import datetime from airflow import DAG +from airflow.operators.dummy import DummyOperator from stellar_etl_airflow import macros -from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_bq_generate_avro_job_task import ( build_bq_generate_avro_job, ) -from airflow.operators.dummy import DummyOperator +from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.default import ( alert_sla_miss, get_default_dag_args, @@ -42,7 +42,7 @@ ) wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export") -dummy_task = DummyOperator(task_id='dummy_task', dag=dag) +dummy_task = DummyOperator(task_id="dummy_task", dag=dag) # Generate AVRO files avro_tables = [ diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index ebf81901..6769c469 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -1,13 +1,12 @@ -import os from datetime import timedelta from airflow.models import Variable from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from stellar_etl_airflow import macros from stellar_etl_airflow.build_bq_insert_job_task import ( - get_query_filepath, file_to_string, + get_query_filepath, ) -from stellar_etl_airflow import macros from stellar_etl_airflow.default import alert_after_max_retries @@ -24,9 +23,7 @@ def build_bq_generate_avro_job( next_batch_run_date = ( "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}" ) - uri_datetime = ( - "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}" - ) + uri_datetime = ("{{ batch_run_date_as_directory_string(dag, data_interval_start) }}") uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro" sql_params = { "project_id": project, diff --git a/dags/stellar_etl_airflow/macros.py b/dags/stellar_etl_airflow/macros.py index 79ded807..55f5d0af 100644 --- a/dags/stellar_etl_airflow/macros.py +++ b/dags/stellar_etl_airflow/macros.py @@ -13,4 +13,6 @@ def get_batch_id(): def batch_run_date_as_directory_string(dag, start_time): time = subtract_data_interval(dag, start_time) - return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}" + return ( + f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}" + ) From 36e800dd079627ba37424447807e5ca76e2658c6 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 22:54:24 -0400 Subject: [PATCH 09/13] lint --- .sqlfluff | 1 + dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.sqlfluff b/.sqlfluff index f38f966b..bbf99671 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -81,6 +81,7 @@ batch_id = batch_id batch_run_date = batch_run_date prev_batch_run_date = prev_batch_run_date next_batch_run_date = next_batch_run_date +uri = uri # Some rules can be configured directly from the config common to other rules [sqlfluff:rules] diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py index 6769c469..1f4495fa 100644 --- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py @@ -23,7 +23,7 @@ def build_bq_generate_avro_job( next_batch_run_date = ( "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}" ) - uri_datetime = ("{{ batch_run_date_as_directory_string(dag, data_interval_start) }}") + uri_datetime = "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}" uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro" sql_params = { "project_id": project, From 2fbfed5b94f0d8e799e635aa6e22bb29626fa9a5 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 23:02:33 -0400 Subject: [PATCH 10/13] lint --- dags/queries/generate_avro/accounts.sql | 32 ++++++++--------- dags/queries/generate_avro/contract_data.sql | 25 +++++++------ .../generate_avro/history_contract_events.sql | 25 +++++++------ .../queries/generate_avro/history_effects.sql | 34 +++++++++--------- .../queries/generate_avro/history_ledgers.sql | 25 +++++++------ .../generate_avro/history_operations.sql | 36 +++++++++---------- dags/queries/generate_avro/history_trades.sql | 28 +++++++-------- .../generate_avro/history_transactions.sql | 29 ++++++++------- .../queries/generate_avro/liquidity_pools.sql | 29 ++++++++------- dags/queries/generate_avro/offers.sql | 29 ++++++++------- dags/queries/generate_avro/trust_lines.sql | 29 ++++++++------- dags/queries/generate_avro/ttl.sql | 25 +++++++------ 12 files changed, 169 insertions(+), 177 deletions(-) diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql index 5280a2bd..2b71ddfe 100644 --- a/dags/queries/generate_avro/accounts.sql +++ b/dags/queries/generate_avro/accounts.sql @@ -1,19 +1,19 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date), - sequence_ledger as account_sequence_last_modified_ledger - from {project_id}.{dataset_id}.accounts - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select + * + except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date) + , sequence_ledger as account_sequence_last_modified_ledger + from {project_id}.{dataset_id}.accounts + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/contract_data.sql b/dags/queries/generate_avro/contract_data.sql index 92b872ab..fedbe6c6 100644 --- a/dags/queries/generate_avro/contract_data.sql +++ b/dags/queries/generate_avro/contract_data.sql @@ -1,16 +1,15 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.contract_data - where true - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.contract_data + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_contract_events.sql b/dags/queries/generate_avro/history_contract_events.sql index 0d0806a5..c6d0a987 100644 --- a/dags/queries/generate_avro/history_contract_events.sql +++ b/dags/queries/generate_avro/history_contract_events.sql @@ -1,16 +1,15 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.history_contract_events - where true - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_contract_events + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql index b2d53c1c..d4b7c954 100644 --- a/dags/queries/generate_avro/history_effects.sql +++ b/dags/queries/generate_avro/history_effects.sql @@ -1,20 +1,20 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(details, batch_id, batch_insert_ts, batch_run_date), - details.* - except(predicate) - from {project_id}.{dataset_id}.history_effects - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select + * + except(details, batch_id, batch_insert_ts, batch_run_date) + , details.* + except(predicate) + from {project_id}.{dataset_id}.history_effects + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_ledgers.sql b/dags/queries/generate_avro/history_ledgers.sql index ff78c883..d216da5c 100644 --- a/dags/queries/generate_avro/history_ledgers.sql +++ b/dags/queries/generate_avro/history_ledgers.sql @@ -1,16 +1,15 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.history_ledgers - where true - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_ledgers + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql index bf2fd99d..c365c1ab 100644 --- a/dags/queries/generate_avro/history_operations.sql +++ b/dags/queries/generate_avro/history_operations.sql @@ -1,21 +1,21 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(details, details_json, batch_id, batch_insert_ts, batch_run_date), - details.* - except(claimants, type), - details.type as soroban_operation_type - from {project_id}.{dataset_id}.history_operations - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select + * + except(details, details_json, batch_id, batch_insert_ts, batch_run_date) + , details.* + except(claimants, type) + , details.type as soroban_operation_type + from {project_id}.{dataset_id}.history_operations + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql index 151192f9..809cd838 100644 --- a/dags/queries/generate_avro/history_trades.sql +++ b/dags/queries/generate_avro/history_trades.sql @@ -1,17 +1,17 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date), - ledger_closed_at as closed_at - from {project_id}.{dataset_id}.history_trades - where true - and ledger_closed_at >= '{batch_run_date}' - and ledger_closed_at < '{next_batch_run_date}' - order by closed_at asc + select + * + except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date) + , ledger_closed_at as closed_at + from {project_id}.{dataset_id}.history_trades + where true + and ledger_closed_at >= '{batch_run_date}' + and ledger_closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql index 84e0d096..03795ade 100644 --- a/dags/queries/generate_avro/history_transactions.sql +++ b/dags/queries/generate_avro/history_transactions.sql @@ -1,18 +1,17 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.history_transactions - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.history_transactions + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql index cb74e81f..b4dbdee0 100644 --- a/dags/queries/generate_avro/liquidity_pools.sql +++ b/dags/queries/generate_avro/liquidity_pools.sql @@ -1,18 +1,17 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.liquidity_pools - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.liquidity_pools + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql index d6c6f4a2..31ef9e18 100644 --- a/dags/queries/generate_avro/offers.sql +++ b/dags/queries/generate_avro/offers.sql @@ -1,18 +1,17 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.offers - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.offers + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql index 47c1a076..287a2236 100644 --- a/dags/queries/generate_avro/trust_lines.sql +++ b/dags/queries/generate_avro/trust_lines.sql @@ -1,18 +1,17 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.trust_lines - where true - and batch_run_date >= '{batch_run_date}' - and batch_run_date < '{next_batch_run_date}' - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.trust_lines + where true + and batch_run_date >= '{batch_run_date}' + and batch_run_date < '{next_batch_run_date}' + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) diff --git a/dags/queries/generate_avro/ttl.sql b/dags/queries/generate_avro/ttl.sql index 97224d06..919cdb96 100644 --- a/dags/queries/generate_avro/ttl.sql +++ b/dags/queries/generate_avro/ttl.sql @@ -1,16 +1,15 @@ export data - options ( - uri = '{uri}', - format = 'avro', - overwrite=true - ) +options ( + uri = '{uri}' + , format = 'avro' + , overwrite = true +) as ( - select - * - except(batch_id, batch_insert_ts, batch_run_date) - from {project_id}.{dataset_id}.ttl - where true - and closed_at >= '{batch_run_date}' - and closed_at < '{next_batch_run_date}' - order by closed_at asc + select * + except(batch_id, batch_insert_ts, batch_run_date) + from {project_id}.{dataset_id}.ttl + where true + and closed_at >= '{batch_run_date}' + and closed_at < '{next_batch_run_date}' + order by closed_at asc ) From 7e19c9d19c0d4f4701cdc832bb6fce5b84e371f9 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 23:12:14 -0400 Subject: [PATCH 11/13] lint --- dags/queries/generate_avro/accounts.sql | 5 +++-- dags/queries/generate_avro/contract_data.sql | 8 +++++--- dags/queries/generate_avro/history_contract_events.sql | 8 +++++--- dags/queries/generate_avro/history_effects.sql | 7 ++++--- dags/queries/generate_avro/history_ledgers.sql | 8 +++++--- dags/queries/generate_avro/history_operations.sql | 7 ++++--- dags/queries/generate_avro/history_trades.sql | 5 +++-- dags/queries/generate_avro/history_transactions.sql | 8 +++++--- dags/queries/generate_avro/liquidity_pools.sql | 8 +++++--- dags/queries/generate_avro/offers.sql | 8 +++++--- dags/queries/generate_avro/trust_lines.sql | 8 +++++--- dags/queries/generate_avro/ttl.sql | 8 +++++--- 12 files changed, 54 insertions(+), 34 deletions(-) diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql index 2b71ddfe..28f080f5 100644 --- a/dags/queries/generate_avro/accounts.sql +++ b/dags/queries/generate_avro/accounts.sql @@ -7,10 +7,11 @@ options ( as ( select * - except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date) + except (sequence_ledger, batch_id, batch_insert_ts, batch_run_date) , sequence_ledger as account_sequence_last_modified_ledger from {project_id}.{dataset_id}.accounts - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/contract_data.sql b/dags/queries/generate_avro/contract_data.sql index fedbe6c6..9fb7e60d 100644 --- a/dags/queries/generate_avro/contract_data.sql +++ b/dags/queries/generate_avro/contract_data.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.contract_data - where true + where + true and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_contract_events.sql b/dags/queries/generate_avro/history_contract_events.sql index c6d0a987..84050cb5 100644 --- a/dags/queries/generate_avro/history_contract_events.sql +++ b/dags/queries/generate_avro/history_contract_events.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.history_contract_events - where true + where + true and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql index d4b7c954..81498e20 100644 --- a/dags/queries/generate_avro/history_effects.sql +++ b/dags/queries/generate_avro/history_effects.sql @@ -7,11 +7,12 @@ options ( as ( select * - except(details, batch_id, batch_insert_ts, batch_run_date) + except (details, batch_id, batch_insert_ts, batch_run_date) , details.* - except(predicate) + except (predicate) from {project_id}.{dataset_id}.history_effects - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/history_ledgers.sql b/dags/queries/generate_avro/history_ledgers.sql index d216da5c..3ef04582 100644 --- a/dags/queries/generate_avro/history_ledgers.sql +++ b/dags/queries/generate_avro/history_ledgers.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.history_ledgers - where true + where + true and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql index c365c1ab..dbe4258c 100644 --- a/dags/queries/generate_avro/history_operations.sql +++ b/dags/queries/generate_avro/history_operations.sql @@ -7,12 +7,13 @@ options ( as ( select * - except(details, details_json, batch_id, batch_insert_ts, batch_run_date) + except (details, details_json, batch_id, batch_insert_ts, batch_run_date) , details.* - except(claimants, type) + except (claimants, type) , details.type as soroban_operation_type from {project_id}.{dataset_id}.history_operations - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql index 809cd838..28941a1b 100644 --- a/dags/queries/generate_avro/history_trades.sql +++ b/dags/queries/generate_avro/history_trades.sql @@ -7,10 +7,11 @@ options ( as ( select * - except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date) + except (ledger_closed_at, batch_id, batch_insert_ts, batch_run_date) , ledger_closed_at as closed_at from {project_id}.{dataset_id}.history_trades - where true + where + true and ledger_closed_at >= '{batch_run_date}' and ledger_closed_at < '{next_batch_run_date}' order by closed_at asc diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql index 03795ade..437e3243 100644 --- a/dags/queries/generate_avro/history_transactions.sql +++ b/dags/queries/generate_avro/history_transactions.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.history_transactions - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql index b4dbdee0..c4b4c2b4 100644 --- a/dags/queries/generate_avro/liquidity_pools.sql +++ b/dags/queries/generate_avro/liquidity_pools.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.liquidity_pools - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql index 31ef9e18..58c699fe 100644 --- a/dags/queries/generate_avro/offers.sql +++ b/dags/queries/generate_avro/offers.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.offers - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql index 287a2236..fbdd9976 100644 --- a/dags/queries/generate_avro/trust_lines.sql +++ b/dags/queries/generate_avro/trust_lines.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.trust_lines - where true + where + true and batch_run_date >= '{batch_run_date}' and batch_run_date < '{next_batch_run_date}' and closed_at >= '{batch_run_date}' diff --git a/dags/queries/generate_avro/ttl.sql b/dags/queries/generate_avro/ttl.sql index 919cdb96..80c7636a 100644 --- a/dags/queries/generate_avro/ttl.sql +++ b/dags/queries/generate_avro/ttl.sql @@ -5,10 +5,12 @@ options ( , overwrite = true ) as ( - select * - except(batch_id, batch_insert_ts, batch_run_date) + select + * + except (batch_id, batch_insert_ts, batch_run_date) from {project_id}.{dataset_id}.ttl - where true + where + true and closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}' order by closed_at asc From 9f9bda80b49c04692c755867d9358abc05c9f094 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 23:13:53 -0400 Subject: [PATCH 12/13] lint --- dags/generate_avro_files_dag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 37b6cd44..265fbb06 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -72,4 +72,3 @@ dummy_task >> avro_task wait_on_history_table >> avro_task wait_on_state_table >> avro_task - From 878dac3d4127efa46569f05ca6092961f63dfa10 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 7 Oct 2024 23:36:21 -0400 Subject: [PATCH 13/13] Add comment --- dags/generate_avro_files_dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py index 265fbb06..d13c27ef 100644 --- a/dags/generate_avro_files_dag.py +++ b/dags/generate_avro_files_dag.py @@ -42,6 +42,7 @@ ) wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export") +# Add dummy_task so DAG generates the avro_tables loop and dependency graph correctly dummy_task = DummyOperator(task_id="dummy_task", dag=dag) # Generate AVRO files