Skip to content

Commit

Permalink
Fixes + check for dupes contexts in postgres/redshift
Browse files Browse the repository at this point in the history
  • Loading branch information
rlh1994 committed Sep 12, 2023
1 parent dad8ccd commit 940db25
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
46 changes: 37 additions & 9 deletions macros/base/base_create_snowplow_events_this_run.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,29 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
'start_tstamp',
'end_tstamp') %}


{# Get all the session and user contexts extracted and ready to join later #}
{% set unique_session_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #}

{% if session_identifiers %}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_session_identifiers %}
{% do unique_session_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% if identifier['schema'] in unique_session_identifiers.keys() %}
{% do exceptions.warn("Snowplow Warning: Duplicate context ( " ~ identifier['schema'] ~" ) detected for session identifiers, using first alias and prefix provided ( " ~ unique_session_identifiers[identifier['schema']] ~ " ) in base events this run.") %}
{% endif %}
{% endfor %}
{% endif %}

{# check uniqueness of entity/sde names provided, warn those also in session identifiers #}
{% if entities_or_sdes %}
-- check uniqueness of entity/sde names provided
{% set ent_sde_names = [] %}
{% for ent_or_sde in entities_or_sdes %}
{% do ent_sde_names.append(ent_or_sde['name']) %}
{% if ent_or_sde['name'] in unique_session_identifiers.keys() %}
{% do exceptions.warn("Snowplow Warning: Context or SDE ( " ~ ent_or_sde['name'] ~ " ) used for session_identifier is being included, using alias and prefix from session_identifier ( " ~ unique_session_identifiers[ent_or_sde['name']] ~ " ).") %}
{% endif %}
{% endfor %}
{% if ent_sde_names | unique | list | length != entities_or_sdes | length %}
{% do exceptions.raise_compiler_error("There are duplicate names in your provided `entities_or_sdes` list. Please correct this before proceeding.")%}
Expand All @@ -93,14 +111,16 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% set events_this_run_query %}
with

{% if session_identifiers -%}
{% for identifier in session_identifiers %}
{# Extract the session identifier contexts into CTEs #}
{% if unique_session_identifiers -%}
{% for identifier in unique_session_identifiers.values() %}
{% if identifier['schema']|lower != 'atomic' %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{%- endif -%}
{% endfor %}
{% endif %}

{# Extract the entitity/sde contexts into CTEs UNLESS they are in the session already #}
{%- if entities_or_sdes -%}
{%- for ent_or_sde in entities_or_sdes -%}
{%- set name = none -%}
Expand All @@ -119,7 +139,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- if ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }},
{% if ent_or_sde['name'] not in unique_session_identifiers.keys() %} {# Exclude any that we have already made above #}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }},
{% endif %}
{% endfor -%}
{%- endif %}

Expand All @@ -131,7 +153,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_session_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] }}{% else %}{{ uniq_iden['schema']}}{% endif %}_{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -146,8 +170,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- endif %}

from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{% if unique_session_identifiers|length > 0 %}
{% for identifier in unique_session_identifiers.values() %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
{% endif -%}
Expand Down Expand Up @@ -188,16 +212,20 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- else -%}
{%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%}
{%- endif -%}
{%- if ent_or_sde['prefix'] -%}
{%- if ent_or_sde['prefix'] and name not in unique_session_identifiers.keys() -%}
{%- set prefix = ent_or_sde['prefix'] -%}
{%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('prefix') -%}
{%- set prefix = unique_session_identifiers[name]['prefix'] -%}
{%- else -%}
{%- set prefix = name -%}
{%- endif -%}
{%- if ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif -%}
{%- if ent_or_sde['alias'] -%}
{%- if ent_or_sde['alias'] and name not in unique_session_identifiers.keys() -%}
{%- set alias = ent_or_sde['alias'] -%}
{%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('alias') -%}
{%- set alias = unique_session_identifiers[name] -%}
{%- endif %}
left join {{name}} {% if alias -%} as {{ alias }} {%- endif %} on e.event_id = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__id
and e.collector_tstamp = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__tstamp
Expand Down
34 changes: 20 additions & 14 deletions macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,28 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% set sessions_lifecycle_manifest_query %}

with
{# Get all the session and user contexts extracted and ready to join later #}
{% set unique_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #}

{% if session_identifiers %}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{% do unique_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% endfor %}
{% endif %}

{% if user_identifiers%}
{% for identifier in user_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{% do unique_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% endfor %}
{% endif %}

{# Produce the core session and single user identifier for sessions with new events #}
new_events_session_ids_init as (
select
{% if session_sql %}
Expand All @@ -169,7 +176,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -187,7 +196,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -203,17 +214,10 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
max({{ session_timestamp }}) as end_tstamp

from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{% if unique_identifiers|length > 0 %}
{% for identifier in unique_identifiers.values() %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
{% if session_identifiers|length > 0 %}
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
Expand All @@ -229,6 +233,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% endif %}

group by 1

{# Exclude quarantined sessions #}
), new_events_session_ids as (
select *
from new_events_session_ids_init e
Expand Down

0 comments on commit 940db25

Please sign in to comment.