From c2ee33436592c75ec9deef3758c1267d0fc7fedc Mon Sep 17 00:00:00 2001 From: Emiel Date: Wed, 16 Aug 2023 18:17:10 +0200 Subject: [PATCH] Incorporate package name into manifest table as optional argument --- .../update_incremental_manifest_table.sql | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/macros/incremental_hooks/update_incremental_manifest_table.sql b/macros/incremental_hooks/update_incremental_manifest_table.sql index 876848f5..9b19e269 100644 --- a/macros/incremental_hooks/update_incremental_manifest_table.sql +++ b/macros/incremental_hooks/update_incremental_manifest_table.sql @@ -1,20 +1,23 @@ {# Updates the incremental manifest table at the run end with the latest tstamp consumed per model #} -{% macro update_incremental_manifest_table(manifest_table, base_events_table, models) -%} +{% macro update_incremental_manifest_table(manifest_table, base_events_table, models, package_name=none) -%} - {{ return(adapter.dispatch('update_incremental_manifest_table', 'snowplow_utils')(manifest_table, base_events_table, models)) }} + {{ return(adapter.dispatch('update_incremental_manifest_table', 'snowplow_utils')(manifest_table, base_events_table, models, package_name)) }} {% endmacro %} -{% macro default__update_incremental_manifest_table(manifest_table, base_events_table, models) -%} +{% macro default__update_incremental_manifest_table(manifest_table, base_events_table, models, package_name) -%} {% if models %} {% set last_success_query %} - select - b.model, - a.last_success - - from + select + b.model, + a.last_success + {% if package_name %} + , '{{ package_name }}' as package + {% endif %} + + from (select max(collector_tstamp) as last_success from {{ base_events_table }}) a, ({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b @@ -24,6 +27,9 @@ merge into {{ manifest_table }} m using ( {{ last_success_query }} ) s on m.model = s.model + {% if package_name %} + and s.package = m.package + {% endif %} when matched then update set last_success = greatest(m.last_success, s.last_success) when not matched then @@ -32,12 +38,12 @@ {% if target.type == 'snowflake' %} commit; {% endif %} - + {% endif %} {%- endmacro %} -{% macro postgres__update_incremental_manifest_table(manifest_table, base_events_table, models) -%} +{% macro postgres__update_incremental_manifest_table(manifest_table, base_events_table, models, package_name) -%} {% if models %} @@ -48,12 +54,18 @@ select a.model, greatest(a.last_success, b.last_success) as last_success + {% if package_name %} + , a.package + {% endif %} from ( select model, last_success + {% if package_name %} + , package + {% endif %} from (select max(collector_tstamp) as last_success from {{ base_events_table }}) as ls, @@ -64,6 +76,9 @@ ) a left join {{ manifest_table }} b on a.model = b.model + {% if package_name %} + and a.package = b.package + {% endif %} ); delete from {{ manifest_table }} where model in (select model from snowplow_models_last_success); @@ -72,7 +87,7 @@ end transaction; drop table snowplow_models_last_success; - + {% endif %} {%- endmacro %}