Skip to content

Commit

Permalink
Incorporate package name into manifest table as optional argument
Browse files Browse the repository at this point in the history
  • Loading branch information
emielver committed Aug 16, 2023
1 parent 2e1cdb3 commit c2ee334
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions macros/incremental_hooks/update_incremental_manifest_table.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
{# Updates the incremental manifest table at the run end with the latest tstamp consumed per model #}
{% macro update_incremental_manifest_table(manifest_table, base_events_table, models) -%}
{% macro update_incremental_manifest_table(manifest_table, base_events_table, models, package_name=none) -%}

{{ return(adapter.dispatch('update_incremental_manifest_table', 'snowplow_utils')(manifest_table, base_events_table, models)) }}
{{ return(adapter.dispatch('update_incremental_manifest_table', 'snowplow_utils')(manifest_table, base_events_table, models, package_name)) }}

{% endmacro %}

{% macro default__update_incremental_manifest_table(manifest_table, base_events_table, models) -%}
{% macro default__update_incremental_manifest_table(manifest_table, base_events_table, models, package_name) -%}

{% if models %}

{% set last_success_query %}
select
b.model,
a.last_success

from
select
b.model,
a.last_success
{% if package_name %}
, '{{ package_name }}' as package
{% endif %}

from
(select max(collector_tstamp) as last_success from {{ base_events_table }}) a,
({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b

Expand All @@ -24,6 +27,9 @@
merge into {{ manifest_table }} m
using ( {{ last_success_query }} ) s
on m.model = s.model
{% if package_name %}
and s.package = m.package
{% endif %}
when matched then
update set last_success = greatest(m.last_success, s.last_success)
when not matched then
Expand All @@ -32,12 +38,12 @@
{% if target.type == 'snowflake' %}
commit;
{% endif %}

{% endif %}

{%- endmacro %}

{% macro postgres__update_incremental_manifest_table(manifest_table, base_events_table, models) -%}
{% macro postgres__update_incremental_manifest_table(manifest_table, base_events_table, models, package_name) -%}

{% if models %}

Expand All @@ -48,12 +54,18 @@
select
a.model,
greatest(a.last_success, b.last_success) as last_success
{% if package_name %}
, a.package
{% endif %}

from (

select
model,
last_success
{% if package_name %}
, package
{% endif %}

from
(select max(collector_tstamp) as last_success from {{ base_events_table }}) as ls,
Expand All @@ -64,6 +76,9 @@
) a
left join {{ manifest_table }} b
on a.model = b.model
{% if package_name %}
and a.package = b.package
{% endif %}
);

delete from {{ manifest_table }} where model in (select model from snowplow_models_last_success);
Expand All @@ -72,7 +87,7 @@
end transaction;

drop table snowplow_models_last_success;

{% endif %}

{%- endmacro %}

0 comments on commit c2ee334

Please sign in to comment.