Skip to content

Commit

Permalink
wip: configure columns w. project vars (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
tnightengale authored Feb 2, 2023
1 parent b92aecb commit 6b1f354
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 66 deletions.
7 changes: 7 additions & 0 deletions integration_tests/.example.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CI_SNOWFLAKE_DBT_USER=
CI_SNOWFLAKE_DBT_WAREHOUSE=
CI_SNOWFLAKE_DBT_PASS=
CI_SNOWFLAKE_DBT_ACCOUNT=
CI_SNOWFLAKE_DBT_DATABASE=
CI_SNOWFLAKE_DBT_ROLE=
CI_SNOWFKALE_DBT_ROLE=
22 changes: 22 additions & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ models:

vars:
running_intergration_tests: true
activity_schema_required_column_aliases:
ts: ts
customer: customer
activity: activity
activity_occurrence: activity_occurrence
activity_repeated_at: activity_repeated_at
activity_schema_primary_activity_columns:
- customer
- ts
- activity_occurrence
- activity_repeated_at
- activity_id
- activity
- anonymous_customer_id
- feature_json
- revenue_impact
- link
activity_schema_appended_activities_columns:
- name: feature_json
aggregation: min
- name: ts
aggregation: min

seeds:
dbt_activity_schema_integration_tests:
Expand Down
71 changes: 61 additions & 10 deletions macros/dataclasses/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,68 @@

{% macro default__columns() %}

{% set activity_schema_required_column_aliases = var(
"activity_schema_required_column_aliases",
dict(
ts = "ts",
customer = "customer",
activity = "activity",
activity_occurrence = "activity_occurrence",
activity_repeated_at = "activity_repeated_at"
)
) %}

{% set required_columns = [
"ts",
"customer",
"activity",
"activity_occurrence",
"activity_repeated_at"
] %}

{% set required_columns_provided = activity_schema_required_column_aliases.keys() | list %}
{% if required_columns_provided != required_columns %}
{% set message %}
"Project variable 'activity_schema_required_column_aliases' must contain the following keys: " {{ required_columns }},
"Got: " {{ required_columns_provided }}
{% endset %}
{{ exceptions.raise_compiler_error(message)}}
{% endif %}

{% set activity_schema_primary_activity_columns = var(
"activity_schema_primary_activity_columns",
[
"activity_id",
"activity",
"anonymous_customer_id",
"feature_json",
"revenue_impact",
"link"
]
) %}

{% set activity_schema_appended_activities_columns = var(
"activity_schema_appended_activities_columns",
[
dict(
name = "feature_json",
aggregation = "min"
),
dict(
name = "ts",
aggregation = "min"
)
]
) %}

{% do return(namespace(
activity_id = var("activity_stream_activity_id_col_name", "activity_id"),
ts = var("activity_stream_ts_col_name", "ts"),
customer = var("activity_stream_customer_col_name", "customer"),
activity = var("activity_stream_activity_col_name", "activity"),
anonymous_customer_id = var("activity_stream_anonymous_customer_id_col_name", "anonymous_customer_id"),
feature_json = var("activity_stream_feature_json_col_name", "feature_json"),
revenue_impact = var("activity_stream_revenue_impact_col_name", "revenue_impact"),
link = var("activity_stream_link_col_name", "link"),
activity_occurrence = var("activity_stream_activity_occurrence_col_name", "activity_occurrence"),
activity_repeated_at = var("activity_stream_activity_repeated_at_col_name", "activity_repeated_at")
ts = activity_schema_required_column_aliases["ts"],
customer = activity_schema_required_column_aliases["customer"],
activity = activity_schema_required_column_aliases["activity"],
activity_occurrence = activity_schema_required_column_aliases["activity_occurrence"],
activity_repeated_at = activity_schema_required_column_aliases["activity_repeated_at"],
primary_activity = activity_schema_primary_activity_columns,
appended_activities = activity_schema_appended_activities_columns
)) %}

{% endmacro %}
91 changes: 35 additions & 56 deletions macros/dataset.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
) %}

{# Create a derived dataset using self-joins from an activity stream model.

params:

activity_stream_ref: ref()
Expand All @@ -25,7 +25,7 @@ params:

primary_activity: primary_activity (dataclass)
The primary activity of the derived dataset.

appended_activities: List[append_activity (dataclass)]
The list of appended activities to self-join to the primary activity.
#}
Expand All @@ -34,38 +34,33 @@ params:
{% set stream = dbt_activity_schema.globals().stream %}
{% set alias = dbt_activity_schema.alias %}

with
with

join_appended_activities as (
select
stream.{{- columns.activity_id }},
stream.{{- columns.customer }},
stream.{{- columns.ts }},
stream.{{- columns.activity }},
stream.{{- columns.anonymous_customer_id }},
stream.{{- columns.feature_json }},
stream.{{- columns.revenue_impact }},
stream.{{- columns.link }},
stream.{{- columns.activity_occurrence }},
stream.{{- columns.activity_repeated_at }},

{% for activity in appended_activities %}{% set i = loop.index %}

stream_{{ i }}.{{ columns.ts }} as {{ alias(activity, columns.ts)}},
stream_{{ i }}.{{ columns.feature_json }} as {{ alias(activity, columns.feature_json)}}

{%- if not loop.last -%},{% endif %}{% endfor %}
{% for col in columns.primary_activity %}
stream.{{- col }},
{% endfor %}

{% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
{% for col in columns.appended_activities %}

stream_{{ i }}.{{ col.name }} as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}

{% endfor %}
{% endfor %}

from {{ activity_stream_ref }} as stream

{% for activity in appended_activities %}{% set i = loop.index %}

left join {{ activity_stream_ref }} as stream_{{ i }}
on (
stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }}
and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}
and {{ activity.relationship.join_clause(i) }}
)
left join {{ activity_stream_ref }} as stream_{{ i }}
on (
stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }}
and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}
and {{ activity.relationship.join_clause(i) }}
)

{% endfor %}

where stream.{{ columns.activity }} = '{{ primary_activity.name }}'
Expand All @@ -74,43 +69,27 @@ join_appended_activities as (

aggregate_appended_activities as (
select
{{ columns.activity_id }},
{{ columns.customer }},
{{ columns.ts }},
{{ columns.activity }},
{{ columns.anonymous_customer_id }},
{{ columns.feature_json }},
{{ columns.revenue_impact }},
{{ columns.link }},
{{ columns.activity_occurrence }},
{{ columns.activity_repeated_at }},

{% for activity in appended_activities %}{% set i = loop.index %}

min(
{{- alias(activity, columns.feature_json) -}}
) as {{- alias(activity, columns.feature_json) -}},
{% for col in columns.primary_activity %}
{{- col }},
{% endfor %}

{% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
{% for col in columns.appended_activities %}

{{ activity.relationship.aggregation_func }}(
{{- alias(activity, columns.ts) -}}
) as {{ alias(activity, columns.ts) }}
{{- alias(activity, col.name) -}}
) as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}

{%- if not loop.last -%},{% endif %}{% endfor %}
{% endfor %}
{% endfor %}

from join_appended_activities
group by
{{ columns.activity_id }},
{{ columns.customer }},
{{ columns.ts }},
{{ columns.activity }},
{{ columns.anonymous_customer_id }},
{{ columns.feature_json }},
{{ columns.revenue_impact }},
{{ columns.link }},
{{ columns.activity_occurrence }},
{{ columns.activity_repeated_at }}
{% for col in columns.primary_activity %}
{{- col }}{% if not loop.last %},{% endif %}
{% endfor %}
)

select * from aggregate_appended_activities
select * from aggregate_appended_activities

{% endmacro %}

0 comments on commit 6b1f354

Please sign in to comment.