Generate avro files from bq tables #507

Merged · 13 commits · Oct 8, 2024

Changes from 6 commits
5 changes: 4 additions & 1 deletion airflow_variables_dev.json
@@ -1,5 +1,6 @@
 {
   "api_key_path": "/home/airflow/gcs/data/apiKey.json",
+  "avro_gcs_bucket": "test_dune_bucket_sdf",
   "bq_dataset": "test_crypto_stellar_internal",
   "bq_dataset_audit_log": "audit_log",
   "bq_project": "test-hubble-319619",
@@ -339,6 +340,7 @@
"build_export_task": 840,
"build_gcs_to_bq_task": 960,
"build_time_task": 480,
"build_bq_generate_avro_job": 600,
"cleanup_metadata": 60,
"create_sandbox": 2400,
"current_state": 720,
@@ -373,7 +375,8 @@
"build_delete_data_task": 180,
"build_export_task": 420,
"build_gcs_to_bq_task": 300,
"build_time_task": 480
"build_time_task": 480,
"build_bq_generate_avro_job": 600
},
"txmeta_datastore_path": "sdf-ledger-close-meta/ledgers",
"use_captive_core": "False",
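Note: these timeout and SLA keys are looked up by function name in the new task factory (build_bq_generate_avro_job_task.py, below), so the "build_bq_generate_avro_job" entry has to exist in both maps. A minimal sketch of that lookup, using the standard Airflow Variable API:

from airflow.models import Variable

# With the dev variables above, this returns 600 (seconds).
timeout_seconds = Variable.get("task_timeout", deserialize_json=True)["build_bq_generate_avro_job"]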
67 changes: 67 additions & 0 deletions dags/generate_avro_files_dag.py
@@ -0,0 +1,67 @@
from datetime import datetime

from airflow import DAG
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_bq_generate_avro_job_task import (
    build_bq_generate_avro_job,
)
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.default import (
    alert_sla_miss,
    get_default_dag_args,
    init_sentry,
)

init_sentry()

dag = DAG(
    "generate_avro_files",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 10, 1, 0, 0),
Review comment (Contributor Author):
AVRO files for < 10/1/24 are already generated

Reply (@chowbao, Contributor Author, Oct 7, 2024):
Hmm we can. I wonder if that would be confusing for Dune though. Like 10/1 wouldn't be the full month but instead there would be 10/1 and then another folder for 10/8, 10/9, 10/10, etc...

Any concerns about catching the last 7 days up?

Nope

    description="This DAG generates AVRO files from BQ tables",
    schedule_interval="0 * * * *",  # Runs every hour
    user_defined_filters={
        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
    },
    max_active_runs=5,
    catchup=True,
    sla_miss_callback=alert_sla_miss,
)

public_project = "{{ var.value.public_project }}"
public_dataset = "{{ var.value.public_dataset }}"
gcs_bucket = "{{ var.value.avro_gcs_bucket }}"

# Wait on ingestion DAGs
wait_on_history_table = build_cross_deps(
    dag, "wait_on_ledgers_txs", "history_table_export"
)
wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export")

# Generate AVRO files
avro_tables = [
    "accounts",
    "contract_data",
    "history_contract_events",
    "history_ledgers",
    "history_trades",
    "history_transactions",
    "liquidity_pools",
    "offers",
    "trust_lines",
    "ttl",
    # "history_effects",
    # "history_operations",
]

for table in avro_tables:
    task = build_bq_generate_avro_job(
        dag=dag,
        project=public_project,
        dataset=public_dataset,
        table=table,
        gcs_bucket=gcs_bucket,
    )

    wait_on_history_table >> task
    wait_on_state_table >> task
17 changes: 17 additions & 0 deletions dags/queries/generate_avro/accounts.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date),
        sequence_ledger as account_sequence_last_modified_ledger
    from {project_id}.{dataset_id}.accounts
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
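For illustration, a sketch of how the task factory (further down in this PR) fills this template with str.format; every value below is hypothetical, and the date strings are shown in one plausible rendering of batch_run_date_as_datetime_string:

params = {
    "project_id": "my-project",  # hypothetical
    "dataset_id": "crypto_stellar",  # hypothetical
    "batch_run_date": "2024-10-01T00:00:00",
    "next_batch_run_date": "2024-10-01T01:00:00",
    "uri": "gs://my-avro-bucket/avro/accounts/2024/10/1/0:0:0/*.avro",  # hypothetical bucket
}

with open("dags/queries/generate_avro/accounts.sql") as f:
    print(f.read().format(**params))  # the EXPORT DATA statement sent to BigQuery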
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/contract_data.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.contract_data
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/history_contract_events.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_contract_events
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
18 changes: 18 additions & 0 deletions dags/queries/generate_avro/history_effects.sql
@@ -0,0 +1,18 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(details, batch_id, batch_insert_ts, batch_run_date),
        details.* except(predicate)
    from {project_id}.{dataset_id}.history_effects
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/history_ledgers.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_ledgers
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
19 changes: 19 additions & 0 deletions dags/queries/generate_avro/history_operations.sql
@@ -0,0 +1,19 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(details, details_json, batch_id, batch_insert_ts, batch_run_date),
        details.* except(claimants, type),
        details.type as details_type
Review comment (Contributor):
is this soroban_operation_type? Do you think we need to rename this column to be clearer? Might be worth discussing with Andre

Reply (@chowbao, Contributor Author, Oct 7, 2024):
> is this soroban_operation_type?

Yes it is

> Do you think we need to rename this column to be clearer?

Yeah we can. I'll regenerate the files and mention it to Andre

    from {project_id}.{dataset_id}.history_operations
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
17 changes: 17 additions & 0 deletions dags/queries/generate_avro/history_trades.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date),
        ledger_closed_at as closed_at
    from {project_id}.{dataset_id}.history_trades
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
Review comment (Contributor):
closed_at is not on the table, i don't think this will run

Reply (Contributor Author):
Updated to ledger_closed_at

    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/history_transactions.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_transactions
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/liquidity_pools.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.liquidity_pools
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/offers.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.offers
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/trust_lines.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.trust_lines
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
16 changes: 16 additions & 0 deletions dags/queries/generate_avro/ttl.sql
@@ -0,0 +1,16 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite=true
)
as (
    select
        * except(batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.ttl
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
66 changes: 66 additions & 0 deletions dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -0,0 +1,66 @@
import os
from datetime import timedelta

from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_bq_insert_job_task import file_to_string
from stellar_etl_airflow.default import alert_after_max_retries


def get_query_filepath(query_name):
    root = os.path.dirname(os.path.dirname(__file__))
    return os.path.join(root, f"queries/generate_avro/{query_name}.sql")


def build_bq_generate_avro_job(
    dag,
    project,
    dataset,
    table,
    gcs_bucket,
):
    query_path = get_query_filepath(table)
    query = file_to_string(query_path)
    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
    prev_batch_run_date = (
        "{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}"
    )
    next_batch_run_date = (
        "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
    )
    uri_datetime = (
        "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}"
    )
    uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro"
    sql_params = {
        "project_id": project,
        "dataset_id": dataset,
        "batch_run_date": batch_run_date,
        "prev_batch_run_date": prev_batch_run_date,
Review comment (Contributor):
Is this used in the SQL query? I didn't see the parameter anywhere in the SQLs

Reply (Contributor Author):
prev_batch_run_date is not. I'll remove it

"next_batch_run_date": next_batch_run_date,
"uri": uri,
}
query = query.format(**sql_params)
configuration = {
"query": {
"query": query,
"useLegacySql": False,
}
}

return BigQueryInsertJobOperator(
task_id=f"generate_avro_{table}",
execution_timeout=timedelta(
seconds=Variable.get("task_timeout", deserialize_json=True)[
build_bq_generate_avro_job.__name__
]
),
on_failure_callback=alert_after_max_retries,
configuration=configuration,
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
build_bq_generate_avro_job.__name__
]
),
)
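Worth noting: the query is templated twice. str.format fills {project_id}, {dataset_id}, {uri}, and the date placeholders at DAG parse time, but the date values it inserts are themselves Jinja strings that Airflow renders at run time (configuration is a templated field of BigQueryInsertJobOperator). A quick local sanity check, hypothetical and not part of the PR, that renders every template with dummy params to catch stray placeholders:

import glob

dummy = {
    "project_id": "p",
    "dataset_id": "d",
    "batch_run_date": "2024-10-01T00:00:00",
    "prev_batch_run_date": "2024-09-30T23:00:00",
    "next_batch_run_date": "2024-10-01T01:00:00",
    "uri": "gs://bucket/avro/t/2024/10/1/0:0:0/*.avro",
}

for path in sorted(glob.glob("dags/queries/generate_avro/*.sql")):
    with open(path) as f:
        f.read().format(**dummy)  # raises KeyError on any unknown placeholder
    print(f"{path}: ok")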
4 changes: 4 additions & 0 deletions dags/stellar_etl_airflow/macros.py
@@ -9,3 +9,7 @@ def batch_run_date_as_datetime_string(dag, start_time):

def get_batch_id():
    return "{}-{}".format("{{ run_id }}", "{{ params.alias }}")

def batch_run_date_as_directory_string(dag, start_time):
    time = subtract_data_interval(dag, start_time)
    return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}"