Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes + check for dupes contexts in postgres/redshift #139

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ vars:
snowplow__event_limits: 'snowplow_base_new_event_limits_actual'
snowplow__incremental_manifest: 'snowplow_incremental_manifest_actual'
snowplow__entities_or_sdes: null
snowplow__custom_entities_or_sdes: [{"name" : "contexts_com_snowplowanalytics_custom_entity_1_0_0", "prefix": "custom", "single_entity": true}]
snowplow__custom_entities_or_sdes: [{"schema" : "contexts_com_snowplowanalytics_custom_entity_1_0_0", "prefix": "custom", "single_entity": true}]
snowplow__bigquery_custom_sql: 'cast(contexts_com_snowplowanalytics_custom_entity_1_0_0[safe_offset(0)].contents as STRING) as custom_contents'
snowplow__databricks_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1_0_0[0].contents as custom_contents'
snowplow__snowflake_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1_0_0[0].contents::TEXT as custom_contents'
Expand Down
66 changes: 49 additions & 17 deletions macros/base/base_create_snowplow_events_this_run.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,36 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
'start_tstamp',
'end_tstamp') %}


{# Get all the session and user contexts extracted and ready to join later #}
{% set unique_session_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #}

{% if session_identifiers %}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_session_identifiers %}
{% do unique_session_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% if identifier['schema'] in unique_session_identifiers.keys() %}
{% if identifier['alias'] != unique_session_identifiers[identifier['schema']]['alias'] or identifier['prefix'] != unique_session_identifiers[identifier['schema']]['prefix'] %}
{% do exceptions.warn("Snowplow Warning: Duplicate context ( " ~ identifier['schema'] ~" ) detected for session identifiers, using first alias and prefix provided ( " ~ unique_session_identifiers[identifier['schema']] ~ " ) in base events this run.") %}
{% endif %}
{% endif %}
{% endfor %}
{% endif %}

{# check uniqueness of entity/sde names provided, warn those also in session identifiers #}
{% if entities_or_sdes %}
-- check uniqueness of entity/sde names provided
{% set ent_sde_names = [] %}
{% for ent_or_sde in entities_or_sdes %}
{% do ent_sde_names.append(ent_or_sde['name']) %}
{% do ent_sde_names.append(ent_or_sde['schema']) %}
{% if ent_or_sde['schema'] in unique_session_identifiers.keys() %}
{% if ent_or_sde['alias'] != unique_session_identifiers[ent_or_sde['schema']]['alias'] or ent_or_sde['prefix'] != unique_session_identifiers[ent_or_sde['schema']]['prefix'] %}
{% do exceptions.warn("Snowplow Warning: Context or SDE ( " ~ ent_or_sde['schema'] ~ " ) used for session_identifier is being included, using alias and prefix from session_identifier ( " ~ unique_session_identifiers[ent_or_sde['schema']] ~ " ).") %}
{% endif %}
{% endif %}
{% endfor %}
{% if ent_sde_names | unique | list | length != entities_or_sdes | length %}
{% do exceptions.raise_compiler_error("There are duplicate names in your provided `entities_or_sdes` list. Please correct this before proceeding.")%}
{% do exceptions.raise_compiler_error("There are duplicate schema names in your provided `entities_or_sdes` list. Please correct this before proceeding.")%}
{% endif %}
{% endif %}

Expand All @@ -93,23 +115,25 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% set events_this_run_query %}
with

{% if session_identifiers -%}
{% for identifier in session_identifiers %}
{# Extract the session identifier contexts into CTEs #}
{% if unique_session_identifiers -%}
{% for identifier in unique_session_identifiers.values() %}
{% if identifier['schema']|lower != 'atomic' %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{%- endif -%}
{% endfor %}
{% endif %}

{# Extract the entitity/sde contexts into CTEs UNLESS they are in the session already #}
{%- if entities_or_sdes -%}
{%- for ent_or_sde in entities_or_sdes -%}
{%- set name = none -%}
{%- set prefix = none -%}
{%- set single_entity = true -%}
{%- if ent_or_sde['name'] -%}
{%- set name = ent_or_sde['name'] -%}
{%- if ent_or_sde['schema'] -%}
{%- set name = ent_or_sde['schema'] -%}
{%- else -%}
{%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%}
{%- do exceptions.raise_compiler_error("Need to specify the schema name of your Entity or SDE using the {'schema'} attribute in a key-value map.") -%}
{%- endif -%}
{%- if ent_or_sde['prefix'] -%}
{%- set prefix = ent_or_sde['prefix'] -%}
Expand All @@ -119,7 +143,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- if ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }},
{% if ent_or_sde['schema'] not in unique_session_identifiers.keys() %} {# Exclude any that we have already made above #}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }},
{% endif %}
{% endfor -%}
{%- endif %}

Expand All @@ -131,7 +157,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_session_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] }}{% else %}{{ uniq_iden['schema']}}{% endif %}_{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -146,8 +174,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- endif %}

from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{% if unique_session_identifiers|length > 0 %}
{% for identifier in unique_session_identifiers.values() %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
{% endif -%}
Expand Down Expand Up @@ -183,21 +211,25 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{%- set prefix = none -%}
{%- set single_entity = true -%}
{%- set alias = none -%}
{%- if ent_or_sde['name'] -%}
{%- set name = ent_or_sde['name'] -%}
{%- if ent_or_sde['schema'] -%}
{%- set name = ent_or_sde['schema'] -%}
{%- else -%}
{%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%}
{%- do exceptions.raise_compiler_error("Need to specify the schema name of your Entity or SDE using the {'schema'} attribute in a key-value map.") -%}
{%- endif -%}
{%- if ent_or_sde['prefix'] -%}
{%- if ent_or_sde['prefix'] and name not in unique_session_identifiers.keys() -%}
{%- set prefix = ent_or_sde['prefix'] -%}
{%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('prefix') -%}
{%- set prefix = unique_session_identifiers[name]['prefix'] -%}
{%- else -%}
{%- set prefix = name -%}
{%- endif -%}
{%- if ent_or_sde['single_entity'] and ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif -%}
{%- if ent_or_sde['alias'] -%}
{%- if ent_or_sde['alias'] and name not in unique_session_identifiers.keys() -%}
{%- set alias = ent_or_sde['alias'] -%}
{%- elif name in unique_session_identifiers.keys() and unique_session_identifiers.get(name, {}).get('alias') -%}
{%- set alias = unique_session_identifiers[name] -%}
{%- endif %}
left join {{name}} {% if alias -%} as {{ alias }} {%- endif %} on e.event_id = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__id
and e.collector_tstamp = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__tstamp
Expand Down
34 changes: 20 additions & 14 deletions macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,28 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% set sessions_lifecycle_manifest_query %}

with
{# Get all the session and user contexts extracted and ready to join later #}
{% set unique_identifiers = dict() %} {# need to avoid duplicate contexts when values come from the same one, so just use the first of that context #}

{% if session_identifiers %}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{% do unique_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% endfor %}
{% endif %}

{% if user_identifiers%}
{% for identifier in user_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{% if identifier['schema']|lower != 'atomic' and identifier['schema'] not in unique_identifiers %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{% do unique_identifiers.update({identifier['schema']: identifier}) %}
{%- endif -%}
{% endfor %}
{% endif %}

{# Produce the core session and single user identifier for sessions with new events #}
new_events_session_ids_init as (
select
{% if session_sql %}
Expand All @@ -169,7 +176,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -187,7 +196,9 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
COALESCE(
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{# Use the parsed version of the context to ensure we have the right alias and prefix #}
{% set uniq_iden = unique_identifiers[identifier['schema']] %}
{% if uniq_iden['alias'] %}{{uniq_iden['alias']}}{% else %}{{uniq_iden['schema']}}{% endif %}.{% if uniq_iden['prefix'] %}{{ uniq_iden['prefix'] ~ '_' }}{% endif %}{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
Expand All @@ -203,17 +214,10 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
max({{ session_timestamp }}) as end_tstamp

from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{% if unique_identifiers|length > 0 %}
{% for identifier in unique_identifiers.values() %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
{% if session_identifiers|length > 0 %}
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{identifier['prefix']}}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{{ identifier['prefix'] }}__tstamp
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__id and e.collector_tstamp = {% if identifier['alias'] %}{{ identifier['alias']}}{% else %}{{ identifier['schema'] }}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
Expand All @@ -229,6 +233,8 @@ You may obtain a copy of the Snowplow Community License Version 1.0 at https://d
{% endif %}

group by 1

{# Exclude quarantined sessions #}
), new_events_session_ids as (
select *
from new_events_session_ids_init e
Expand Down