Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for Snowflake #310

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,30 @@ seed-paths: ["seeds"]
macro-paths:
  - "macros"
snapshot-paths:
  - "snapshots"

# Connection profile. The trailing markers name the warehouse each profile
# targets (bq / sf); keep exactly one of the two lines active.
# profile: 'bq_ga4'  # bq
profile: 'sf_ga4'  # sf

# Directory which will store compiled SQL files.
target-path: "target"

# Directories to be removed by `dbt clean`.
clean-targets:
  - "target"
  - "dbt_packages"

vars:
  # Defines the earliest GA4 _TABLE_SUFFIX to load into base events model.
  start_date: "20230306"

  # Source location. The bq/sf markers show which warehouse each line is for.
  # source_project: "analytics"  # bq
  source_project: "analytics"  # sf
  source_schema: "schema"

  # NOTE(review): `id` looks like a placeholder; confirm the real property IDs.
  property_ids: [id]
  frequency: "daily"

  conversion_events:
    - "sign_up"
    - "video_complete"
    - "video_start"
    - "goalkeepers_download_report"

  # Number of days before the current date that are re-processed on each
  # incremental run (read by base_ga4__events via var('static_incremental_days')).
  static_incremental_days: 3

  page_view_custom_parameters:
    - name: "article_published_date"
      value_type: "string_value"
    - name: "days_since_published_date"
      value_type: "string_value"

  # Incremental strategy by warehouse: insert_overwrite on BigQuery,
  # delete+insert everywhere else. The previous if/elif form had no fallback
  # and produced an empty string on any adapter other than bigquery/snowflake.
  # NOTE(review): confirm that the dbt version in use renders Jinja inside
  # `vars`; if it does not, the consumer of this var must render it.
  ga4_incremental_strategy: "{% if target.type == 'bigquery' %}insert_overwrite{% else %}delete+insert{% endif %}"

models:
ga4:
+materialized: view
Expand Down
7 changes: 6 additions & 1 deletion macros/base_select.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
{% endmacro %}

{% macro default__base_select_source() %}
parse_date('%Y%m%d',event_date) as event_date_dt

{%- if target.type == 'bigquery' %}
parse_date('%Y%m%d', event_date) as event_date_dt
{%- elif target.type == 'snowflake' %}
event_date as event_date_dt
{%- endif %}
, event_timestamp
, event_name
, event_params
Expand Down
71 changes: 53 additions & 18 deletions models/staging/base/base_ga4__events.sql
Original file line number Diff line number Diff line change
@@ -1,21 +1,37 @@
{#
    Build the list of date expressions that an incremental run reloads:
    today plus the previous var('static_incremental_days') days.

    Each entry is a SQL expression string, not a literal date. Offsets use
    plain `current_date - N` arithmetic instead of BigQuery's `date_sub(...)`
    so the same list can be shared across warehouses
    (NOTE(review): confirm every target adapter accepts DATE - INTEGER).
#}
{% set partitions_to_replace = ['current_date'] %}
{% for i in range(var('static_incremental_days')) %}
    {#- `do` mutates the list in place; append() returns nothing worth assigning. -#}
    {% do partitions_to_replace.append('current_date - ' + (i+1)|string) %}
{% endfor %}

{#
    Model configuration, selected by warehouse.

    bigquery  : incremental insert_overwrite of the date partitions listed in
                partitions_to_replace, partitioned by event_date_dt and
                clustered by event_name.
    snowflake : incremental delete+insert keyed on event_date_dt, so every day
                present in the new batch is deleted from the target before the
                batch is inserted. Without a unique_key the delete step is
                skipped and reloaded days would be appended again.

    For any other target.type neither branch renders and no config() call is
    emitted.
    Both branches run ga4.combine_property_data() as a pre-hook only when the
    `combined_dataset` var is set.
#}
{%- if target.type == 'bigquery' -%}

{{
    config(
        pre_hook = "{{ ga4.combine_property_data() }}" if var('combined_dataset', false) else "",
        materialized = "incremental",
        incremental_strategy = "insert_overwrite",
        partition_by = {
            "field": "event_date_dt",
            "data_type": "date",
        },
        partitions = partitions_to_replace,
        cluster_by = ["event_name"]
    )
}}

{%- elif target.type == 'snowflake' -%}

{{
    config(
        pre_hook = "{{ ga4.combine_property_data() }}" if var('combined_dataset', false) else "",
        materialized = "incremental",
        incremental_strategy = "delete+insert",
        unique_key = "event_date_dt"
    )
}}

{%- endif -%}


with source as (
select
Expand All @@ -25,12 +41,31 @@ with source as (
{% if is_incremental() %}
and parse_date('%Y%m%d', left(replace(_table_suffix, 'intraday_', ''), 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
renamed as (
and cast(left(replace(_table_suffix, 'intraday_', ''), 8) as int64) = 20240305
)


{#-
    Final projection and de-duplication, continuing the `with source as (...)`
    statement above.

    Rows count as duplicates when they share event_date_dt, stream_id,
    user_pseudo_id, session_id, event_name, event_timestamp and the same set
    of event parameters; one row per group is kept.

    bigquery  : parameters are compared as a JSON string of event_params
                ordered by key.
    snowflake : `renamed` applies the shared column projection to `source`;
                `params_constructed` flattens event_params and re-aggregates
                the values in sorted order per input row (e.seq) so the
                array is comparable. row_number() carries an ORDER BY because
                Snowflake rejects the function without one.
                NOTE(review): flatten() without outer => true drops rows whose
                event_params is empty; confirm that is acceptable.
-#}
{%- if target.type == 'bigquery' -%}

select
    {{ ga4.base_select_renamed() }}
from source
qualify row_number() over(partition by event_date_dt, stream_id, user_pseudo_id, session_id, event_name, event_timestamp, to_json_string(ARRAY(SELECT params FROM UNNEST(event_params) AS params ORDER BY key))) = 1

{%- elif target.type == 'snowflake' -%}

, renamed as (

    select
        {{ ga4.base_select_renamed() }}
    from source

)

, params_constructed as (

    select distinct
        renamed.*
        , array_agg(e.value) within group (order by e.value) over (partition by e.seq) as event_params_constructed
    from renamed
    , lateral flatten(input => event_params) as e

)

select *
from params_constructed
qualify row_number() over (partition by event_date_dt, stream_id, user_pseudo_id, session_id, event_name, event_timestamp, event_params_constructed order by event_timestamp) = 1

{%- endif -%}
17 changes: 13 additions & 4 deletions models/staging/src_ga4.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ sources:
{%- else -%} {{var('source_project')}}
{%- endif -%}
schema: | # Resolution order: combined property dataset if set, else the explicit source_schema var if set, else the original GA4 property dataset (analytics_<first property id>)
  {%- if var('combined_dataset', false) != false -%}
    {{ var('combined_dataset') }}
  {%- elif var('source_schema', false) != false -%}
    {{ var('source_schema') }}
  {%- else -%}
    analytics_{{ var('property_ids')[0] }}
  {%- endif -%}
tables:
  - name: events
    description: Main events table exported by GA4. Sharded by date.
    # On BigQuery, scan across all sharded event tables (use the 'start_date'
    # variable to limit this scan). On any other adapter, read the single
    # relation named analytics_<first property id>__view.
    identifier: |
      {%- if target.type == 'bigquery' -%}
        events_*
      {%- else -%}
        analytics_{{ var('property_ids')[0] }}__view
      {%- endif -%}
4 changes: 4 additions & 0 deletions package-lock.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# NOTE(review): this appears to be the lock file written by `dbt deps`;
# regenerate it rather than editing the pinned values by hand.
packages:
  - package: dbt-labs/dbt_utils
    version: "1.1.1"
    sha1_hash: "8113526d71661bd1c3569bc0d5d9d9b58fbfc710"
Loading