diff --git a/macros/base/base_create_snowplow_events_this_run.sql b/macros/base/base_create_snowplow_events_this_run.sql index aee9571f..d61f5a4e 100644 --- a/macros/base/base_create_snowplow_events_this_run.sql +++ b/macros/base/base_create_snowplow_events_this_run.sql @@ -76,11 +76,29 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d 'start_tstamp', 'end_tstamp') %} + + {# Collect the distinct non-atomic session identifier contexts, keyed by schema, so they can be extracted and joined later #} + {% set unique_session_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #} + + {% if session_identifiers %} + {% for identifier in session_identifiers %} + {% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_session_identifiers %} + {% do unique_session_identifiers.update({identifier['schema']: identifier}) %} + {# Only a repeat of an already-registered context is a duplicate; the first registration must not warn #} + {% elif identifier['schema'] in unique_session_identifiers.keys() %} + {% do exceptions.warn("Snowplow Warning: Duplicate context ( " ~ identifier['schema'] ~" ) detected for session identifiers, using first alias and prefix provided ( " ~ unique_session_identifiers[identifier['schema']] ~ " ) in base events this run.") %} + {% endif %} + {% endfor %} + {% endif %} + + {# check uniqueness of entity/sde names provided, warn those also in session identifiers #} {% if entities_or_sdes %} - -- check uniqueness of entity/sde names provided {% set ent_sde_names = [] %} {% for ent_or_sde in entities_or_sdes %} {% do ent_sde_names.append(ent_or_sde['name']) %} + {% if ent_or_sde['name'] in unique_session_identifiers.keys() %} + {% do exceptions.warn("Snowplow Warning: Context or SDE ( " ~ ent_or_sde['name'] ~ " ) used for session_identifier is being included, using alias and prefix from session_identifier ( " ~ unique_session_identifiers[ent_or_sde['name']] ~ " ).") %} + {% endif %} {% endfor %} {% if ent_sde_names | unique | list | length != 
entities_or_sdes | length %} {% do exceptions.raise_compiler_error("There are duplicate names in your provided `entities_or_sdes` list. Please correct this before proceeding.")%} @@ -93,14 +111,16 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {% set events_this_run_query %} with - {% if session_identifiers -%} - {% for identifier in session_identifiers %} + {# Extract each distinct session identifier context into its own CTE #} + {% if unique_session_identifiers -%} + {% for identifier in unique_session_identifiers.values() %} {% if identifier['schema']|lower != 'atomic' %} {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }}, {%- endif -%} {% endfor %} {% endif %} + {# Extract the entity/sde contexts into CTEs UNLESS a CTE was already generated for them as a session identifier #} {%- if entities_or_sdes -%} {%- for ent_or_sde in entities_or_sdes -%} {%- set name = none -%} @@ -119,7 +139,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {%- if ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%} {%- set single_entity = ent_or_sde['single_entity'] -%} {%- endif %} - {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }}, + {% if ent_or_sde['name'] not in unique_session_identifiers.keys() %} {# Skip any context whose CTE was already generated for a session identifier above #} + {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }}, + {% endif %} {% endfor -%} {%- endif %} @@ -131,7 +153,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d COALESCE( {% for identifier in session_identifiers %} {%- if identifier['schema']|lower != 'atomic' %} - {% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ 
identifier['schema']}}{% endif %}_{{identifier['field']}} + {# Look up the stored definition for this context so the alias and prefix match the ones used for its join. NOTE(review): the prefix falls back to the schema name here, but the left join below interpolates identifier['prefix'] with no fallback - confirm the two agree when no prefix is set #} + {% set uniq_iden = unique_session_identifiers[identifier['schema']] %} + {% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] }}{% else %}{{ uniq_iden['schema']}}{% endif %}_{{identifier['field']}} {%- else %} e.{{identifier['field']}} {%- endif -%} @@ -146,8 +170,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {%- endif %} from {{ snowplow_events }} e - {% if session_identifiers|length > 0 %} - {% for identifier in session_identifiers %} + {% if unique_session_identifiers|length > 0 %} + {% for identifier in unique_session_identifiers.values() %} {%- if identifier['schema']|lower != 'atomic' -%} left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp {% endif -%} @@ -188,16 +212,20 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {%- else -%} {%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%} {%- endif -%} - {%- if ent_or_sde['prefix'] -%} + {%- if ent_or_sde['prefix'] and name not in unique_session_identifiers.keys() -%} {%- set prefix = ent_or_sde['prefix'] -%} + {%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('prefix') -%} + {%- set prefix = unique_session_identifiers[name]['prefix'] -%} {%- else -%} {%- set prefix = name -%} {%- endif -%} {%- if 
ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%} {%- set single_entity = ent_or_sde['single_entity'] -%} {%- endif -%} - {%- if ent_or_sde['alias'] -%} + {%- if ent_or_sde['alias'] and name not in unique_session_identifiers.keys() -%} {%- set alias = ent_or_sde['alias'] -%} + {%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('alias') -%} + {%- set alias = unique_session_identifiers[name]['alias'] -%} {# take the alias string, not the whole identifier dict, since it is rendered into the join below #} {%- endif %} left join {{name}} {% if alias -%} as {{ alias }} {%- endif %} on e.event_id = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__id and e.collector_tstamp = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__tstamp diff --git a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql index be52fc43..af4925e2 100644 --- a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql +++ b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql @@ -146,21 +146,28 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {% set sessions_lifecycle_manifest_query %} with + {# Get all the session and user contexts extracted and ready to join later #} + {% set unique_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #} + {% if session_identifiers %} {% for identifier in session_identifiers %} - {% if identifier['schema']|lower != 'atomic' %} + {% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %} {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }}, + {% do unique_identifiers.update({identifier['schema']: identifier}) %} {%- endif -%} {% endfor %} {% endif %} {% if user_identifiers%} {% for identifier in user_identifiers %} - {% if 
identifier['schema']|lower != 'atomic' %} + {% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %} {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }}, + {% do unique_identifiers.update({identifier['schema']: identifier}) %} {%- endif -%} {% endfor %} {% endif %} + + {# Produce the core session and single user identifier for sessions with new events #} new_events_session_ids_init as ( select {% if session_sql %} @@ -169,7 +176,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d COALESCE( {% for identifier in session_identifiers %} {%- if identifier['schema']|lower != 'atomic' -%} - {% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}} + {# Look up the first-registered definition of this context so the alias and prefix match the CTE and join generated for it. NOTE(review): with no prefix this emits the bare field name, while the left join below falls back to the schema name for __id/__tstamp - confirm both match the columns get_sde_or_context produces #} + {% set uniq_iden = unique_identifiers[identifier['schema']] %} + {% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}} {%- else -%} e.{{identifier['field']}} {%- endif -%} @@ -187,7 +196,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d COALESCE( {% for identifier in user_identifiers %} {%- if identifier['schema']|lower != 'atomic' %} - {% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}} + {# Same lookup for user identifiers: a context shared with a session identifier reuses the first-registered alias and prefix. NOTE(review): the no-prefix case emits the bare field name here as well - confirm against get_sde_or_context #} + {% set uniq_iden = unique_identifiers[identifier['schema']] %} + {% if uniq_iden['alias'] 
%}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}} {%- else %} e.{{identifier['field']}} {%- endif -%} @@ -203,17 +214,10 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d max({{ session_timestamp }}) as end_tstamp from {{ snowplow_events }} e - {% if session_identifiers|length > 0 %} - {% for identifier in session_identifiers %} + {% if unique_identifiers|length > 0 %} + {% for identifier in unique_identifiers.values() %} {%- if identifier['schema']|lower != 'atomic' -%} - left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp - {% endif -%} - {% endfor %} - {% endif %} - {% if session_identifiers|length > 0 %} - {% for identifier in user_identifiers %} - {%- if identifier['schema']|lower != 'atomic' -%} - left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp + {# One join per distinct session/user context; the __id/__tstamp column prefix falls back to the schema name when no prefix is configured #} left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__id and e.collector_tstamp = {% if 
identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__tstamp {% endif -%} {% endfor %} {% endif %} @@ -229,6 +233,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d {% endif %} group by 1 + + {# Exclude quarantined sessions #} ), new_events_session_ids as ( select * from new_events_session_ids_init e