Skip to content

Commit

Permalink
Ele 1484 source freshness invocation (#1165)
Browse files Browse the repository at this point in the history
* fix typo

* add metadata

* validate invocation id

* renaming and refactor

* remove falsy condition
  • Loading branch information
NoyaArie authored Sep 19, 2023
1 parent 439d61c commit c9f9996
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% macro can_upload_source_freshness(invocation_id, days_back=14) %}
    {#
      Tell the caller whether source freshness results for `invocation_id`
      can still be uploaded.

      Args:
        invocation_id: invocation id to look up in dbt_source_freshness_results.
        days_back: only rows whose day difference between `generated_at` and
                   the current timestamp is below this value are searched
                   (default 14).

      Returns:
        true when no row with this invocation id is found in that window,
        none otherwise.
    #}
    {% set recent_invocation_count_query %}
        select count(*) as count
        from {{ ref("elementary", "dbt_source_freshness_results") }}
        where {{ elementary.edr_datediff(elementary.edr_cast_as_timestamp('generated_at'), elementary.edr_current_timestamp(), 'day') }} < {{ days_back }}
          and invocation_id = {{ elementary.edr_quote(invocation_id) }}
    {% endset %}

    {% set matching_rows = elementary.result_value(recent_invocation_count_query) %}

    {# Only an exact zero count allows the upload; every other outcome yields none. #}
    {% do return(true if matching_rows == 0 else none) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{#
  upload_source_freshness(results)

  Parses the JSON string `results` with fromjson and hands the parsed value to
  elementary.upload_artifacts_to_table for the relation
  ref('dbt_source_freshness_results'), passing
  elementary.flatten_source_freshness, append=True and should_commit=true.

  NOTE(review): this span is rendered from a diff hunk (-1,5 +1,5). The two
  "set ... = fromjson(results)" lines and the two "upload_artifacts_to_table"
  calls look like before/after pairs of a variable rename
  (source_freshness_results_dicts -> source_freshness_results), not four
  statements meant to run together. Confirm against the real file.
#}
{% macro upload_source_freshness(results) %}
{% set source_freshness_results_dicts = fromjson(results) %}
{% set source_freshness_results = fromjson(results) %}
{% set source_freshness_results_relation = ref('dbt_source_freshness_results') %}
{% do elementary.upload_artifacts_to_table(source_freshness_results_relation, source_freshness_results_dicts, elementary.flatten_source_freshness, append=True, should_commit=true) %}
{% do elementary.upload_artifacts_to_table(source_freshness_results_relation, source_freshness_results, elementary.flatten_source_freshness, append=True, should_commit=true) %}
{% endmacro %}
30 changes: 25 additions & 5 deletions elementary/operations/upload_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ def run(self):
raise click.ClickException(
"Path to dbt project is missing. Please run the command with `--project-dir <DBT_PROJECT_DIR>`."
)
results = self.get_results()
self.upload_results(results)
sources_file_contents = self.get_sources_file_contents()
results = sources_file_contents["results"]
metadata = sources_file_contents["metadata"]
self.upload_results(results, metadata)
click.echo("Uploaded source freshness results successfully.")

# Load `sources.json` from the directory returned by self.get_target_path().
#
# Raises click.ClickException when the file does not exist, telling the user
# to run `dbt source freshness` first.
#
# NOTE(review): this span is rendered from a diff. The two `def` lines and the
# two `return` lines appear to be before/after versions of one method:
# `get_results` returning only the "results" entry of the parsed JSON, and
# `get_sources_file_contents` returning the whole parsed document. Confirm
# against the real file which pair is live.
def get_results(self) -> dict:
def get_sources_file_contents(self) -> dict:
source_path = self.get_target_path() / "sources.json"
if not source_path.exists():
raise click.ClickException(
f"Could not find sources.json at {source_path}. "
"Please run `dbt source freshness` before running this command."
)
return json.loads(source_path.read_text())["results"]
return json.loads(source_path.read_text())

def upload_results(self, results: dict):
def upload_results(self, results: dict, metadata: dict):
dbt_runner = DbtRunner(
dbt_project_utils.PATH,
self.config.profiles_dir,
Expand All @@ -42,13 +44,31 @@ def upload_results(self, results: dict):
if not dbt_project_utils.is_dbt_package_up_to_date():
dbt_runner.deps()

invocation_id = metadata.get("invocation_id")
if not invocation_id:
raise click.ClickException("No invocation id found in sources.json.")

response = dbt_runner.run_operation(
"elementary_cli.can_upload_source_freshness",
macro_args={"invocation_id": invocation_id},
quiet=True,
)
if not response:
raise click.ClickException(
f"Source freshness for invocation id {invocation_id} were already uploaded."
)

chunk_size = 100
chunk_list = list(range(0, len(results), chunk_size))
upload_with_progress_bar = alive_it(
chunk_list, title="Uploading source freshness results"
)
for chunk in upload_with_progress_bar:
results_segment = results[chunk : chunk + chunk_size]

for result in results_segment:
result["metadata"] = metadata

dbt_runner.run_operation(
"elementary_cli.upload_source_freshness",
macro_args={"results": json.dumps(results_segment)},
Expand Down

0 comments on commit c9f9996

Please sign in to comment.