# Patch summary
# -------------
# Makes the activity-stream column handling configurable through three dbt
# project variables instead of one variable per column:
#
#   * integration_tests/.example.env       new template listing the CI_* variables, all empty.
#   * integration_tests/dbt_project.yml    sets the three new vars for the integration tests.
#   * macros/dataclasses/columns.sql       default__columns() reads the vars (with defaults),
#                                          checks the required alias keys and returns a namespace
#                                          with ts, customer, activity, activity_occurrence,
#                                          activity_repeated_at, primary_activity and
#                                          appended_activities.
#   * macros/dataset.sql                   dataset() loops over columns.primary_activity and
#                                          columns.appended_activities instead of a hard-coded
#                                          column list.
#
# Review changes relative to the submitted patch
# ----------------------------------------------
#   1. columns.sql: the required-key check compared two lists with `!=`, which is
#      order-sensitive; a project that declared all five keys in a different order
#      was rejected. Both sides are now sorted before comparison. Unknown extra keys
#      and missing keys are still rejected, exactly as before.
#   2. dataset.sql: the comma after a primary column is now omitted on the last
#      column when nothing is appended after it, so an empty `appended_activities`
#      (or an empty appended-column list) no longer renders `..., from`.
#
# Open questions (not changed here)
# ---------------------------------
#   * NOTE(review): `CI_SNOWFKALE_DBT_ROLE` in .example.env looks like a misspelled
#     duplicate of `CI_SNOWFLAKE_DBT_ROLE` - confirm against the profile that reads
#     these variables before renaming or removing it.
#   * NOTE(review): each appended column carries an `aggregation` setting, but
#     dataset() wraps every appended column in `activity.relationship.aggregation_func`
#     and never reads `col.aggregation` - confirm which of the two is intended.
#   * NOTE(review): this patch was recovered from a whitespace-collapsed copy.
#     Indentation and the position of blank context lines are reconstructed; hunk
#     line counts match the original headers.

diff --git a/integration_tests/.example.env b/integration_tests/.example.env
new file mode 100644
index 0000000..2140d77
--- /dev/null
+++ b/integration_tests/.example.env
@@ -0,0 +1,7 @@
+CI_SNOWFLAKE_DBT_USER=
+CI_SNOWFLAKE_DBT_WAREHOUSE=
+CI_SNOWFLAKE_DBT_PASS=
+CI_SNOWFLAKE_DBT_ACCOUNT=
+CI_SNOWFLAKE_DBT_DATABASE=
+CI_SNOWFLAKE_DBT_ROLE=
+CI_SNOWFKALE_DBT_ROLE=
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index b29bc3e..3e25add 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -20,6 +20,28 @@ models:
 
 vars:
   running_intergration_tests: true
+  activity_schema_required_column_aliases:
+    ts: ts
+    customer: customer
+    activity: activity
+    activity_occurrence: activity_occurrence
+    activity_repeated_at: activity_repeated_at
+  activity_schema_primary_activity_columns:
+    - customer
+    - ts
+    - activity_occurrence
+    - activity_repeated_at
+    - activity_id
+    - activity
+    - anonymous_customer_id
+    - feature_json
+    - revenue_impact
+    - link
+  activity_schema_appended_activities_columns:
+    - name: feature_json
+      aggregation: min
+    - name: ts
+      aggregation: min
 
 seeds:
   dbt_activity_schema_integration_tests:
diff --git a/macros/dataclasses/columns.sql b/macros/dataclasses/columns.sql
index e38dcba..3e6da5c 100644
--- a/macros/dataclasses/columns.sql
+++ b/macros/dataclasses/columns.sql
@@ -4,17 +4,68 @@
 
 {% macro default__columns() %}
 
+{% set activity_schema_required_column_aliases = var(
+    "activity_schema_required_column_aliases",
+    dict(
+        ts = "ts",
+        customer = "customer",
+        activity = "activity",
+        activity_occurrence = "activity_occurrence",
+        activity_repeated_at = "activity_repeated_at"
+    )
+) %}
+
+{% set required_columns = [
+    "ts",
+    "customer",
+    "activity",
+    "activity_occurrence",
+    "activity_repeated_at"
+] %}
+
+{% set required_columns_provided = activity_schema_required_column_aliases.keys() | list %}
+{% if (required_columns_provided | sort) != (required_columns | sort) %}
+    {% set message %}
+    "Project variable 'activity_schema_required_column_aliases' must contain the following keys: " {{ required_columns }},
+    "Got: " {{ required_columns_provided }}
+    {% endset %}
+    {{ exceptions.raise_compiler_error(message)}}
+{% endif %}
+
+{% set activity_schema_primary_activity_columns = var(
+    "activity_schema_primary_activity_columns",
+    [
+        "activity_id",
+        "activity",
+        "anonymous_customer_id",
+        "feature_json",
+        "revenue_impact",
+        "link"
+    ]
+) %}
+
+{% set activity_schema_appended_activities_columns = var(
+    "activity_schema_appended_activities_columns",
+    [
+        dict(
+            name = "feature_json",
+            aggregation = "min"
+        ),
+        dict(
+            name = "ts",
+            aggregation = "min"
+        )
+    ]
+) %}
+
 {% do return(namespace(
-    activity_id = var("activity_stream_activity_id_col_name", "activity_id"),
-    ts = var("activity_stream_ts_col_name", "ts"),
-    customer = var("activity_stream_customer_col_name", "customer"),
-    activity = var("activity_stream_activity_col_name", "activity"),
-    anonymous_customer_id = var("activity_stream_anonymous_customer_id_col_name", "anonymous_customer_id"),
-    feature_json = var("activity_stream_feature_json_col_name", "feature_json"),
-    revenue_impact = var("activity_stream_revenue_impact_col_name", "revenue_impact"),
-    link = var("activity_stream_link_col_name", "link"),
-    activity_occurrence = var("activity_stream_activity_occurrence_col_name", "activity_occurrence"),
-    activity_repeated_at = var("activity_stream_activity_repeated_at_col_name", "activity_repeated_at")
+    ts = activity_schema_required_column_aliases["ts"],
+    customer = activity_schema_required_column_aliases["customer"],
+    activity = activity_schema_required_column_aliases["activity"],
+    activity_occurrence = activity_schema_required_column_aliases["activity_occurrence"],
+    activity_repeated_at = activity_schema_required_column_aliases["activity_repeated_at"],
+    primary_activity = activity_schema_primary_activity_columns,
+    appended_activities = activity_schema_appended_activities_columns
 )) %}
 
 {% endmacro %}
diff --git a/macros/dataset.sql b/macros/dataset.sql
index 10ea241..82f1072 100644
--- a/macros/dataset.sql
+++ b/macros/dataset.sql
@@ -15,7 +15,7 @@
 ) %}
 
 {# Create a derived dataset using self-joins from an activity stream model.
-    
+
 params:
 
     activity_stream_ref: ref()
@@ -25,7 +25,7 @@ params:
 
     primary_activity: primary_activity (dataclass)
         The primary activity of the derived dataset.
-    
+
     appended_activities: List[append_activity (dataclass)]
         The list of appended activities to self-join to the primary activity.
 #}
@@ -34,38 +34,33 @@ params:
 {% set stream = dbt_activity_schema.globals().stream %}
 {% set alias = dbt_activity_schema.alias %}
 
-with 
+with
 join_appended_activities as (
     select
-        stream.{{- columns.activity_id }},
-        stream.{{- columns.customer }},
-        stream.{{- columns.ts }},
-        stream.{{- columns.activity }},
-        stream.{{- columns.anonymous_customer_id }},
-        stream.{{- columns.feature_json }},
-        stream.{{- columns.revenue_impact }},
-        stream.{{- columns.link }},
-        stream.{{- columns.activity_occurrence }},
-        stream.{{- columns.activity_repeated_at }},
-
-        {% for activity in appended_activities %}{% set i = loop.index %}
-
-            stream_{{ i }}.{{ columns.ts }} as {{ alias(activity, columns.ts)}},
-            stream_{{ i }}.{{ columns.feature_json }} as {{ alias(activity, columns.feature_json)}}
-
-        {%- if not loop.last -%},{% endif %}{% endfor %}
+        {% for col in columns.primary_activity %}
+        stream.{{- col }}{% if not loop.last or (appended_activities and columns.appended_activities) %},{% endif %}
+        {% endfor %}
+
+        {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
+            {% for col in columns.appended_activities %}
+
+            stream_{{ i }}.{{ col.name }} as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}
+
+            {% endfor %}
+        {% endfor %}
 
 
     from {{ activity_stream_ref }} as stream
 
     {% for activity in appended_activities %}{% set i = loop.index %}
 
-    left join {{ activity_stream_ref }} as stream_{{ i }}
-    on (
-        stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }}
-        and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}
-        and {{ activity.relationship.join_clause(i) }}
-    )
+        left join {{ activity_stream_ref }} as stream_{{ i }}
+            on (
+                stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }}
+                and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}
+                and {{ activity.relationship.join_clause(i) }}
+            )
+
     {% endfor %}
 
     where stream.{{ columns.activity }} = '{{ primary_activity.name }}'
@@ -74,43 +69,27 @@ join_appended_activities as (
 
 aggregate_appended_activities as (
     select
-        {{ columns.activity_id }},
-        {{ columns.customer }},
-        {{ columns.ts }},
-        {{ columns.activity }},
-        {{ columns.anonymous_customer_id }},
-        {{ columns.feature_json }},
-        {{ columns.revenue_impact }},
-        {{ columns.link }},
-        {{ columns.activity_occurrence }},
-        {{ columns.activity_repeated_at }},
-
-        {% for activity in appended_activities %}{% set i = loop.index %}
-
-        min(
-            {{- alias(activity, columns.feature_json) -}}
-        ) as {{- alias(activity, columns.feature_json) -}},
+        {% for col in columns.primary_activity %}
+        {{- col }}{% if not loop.last or (appended_activities and columns.appended_activities) %},{% endif %}
+        {% endfor %}
+
+        {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
+            {% for col in columns.appended_activities %}
 
         {{ activity.relationship.aggregation_func }}(
-            {{- alias(activity, columns.ts) -}}
-        ) as {{ alias(activity, columns.ts) }}
+            {{- alias(activity, col.name) -}}
+        ) as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}
 
-        {%- if not loop.last -%},{% endif %}{% endfor %}
+            {% endfor %}
+        {% endfor %}
 
     from join_appended_activities
 
     group by
-        {{ columns.activity_id }},
-        {{ columns.customer }},
-        {{ columns.ts }},
-        {{ columns.activity }},
-        {{ columns.anonymous_customer_id }},
-        {{ columns.feature_json }},
-        {{ columns.revenue_impact }},
-        {{ columns.link }},
-        {{ columns.activity_occurrence }},
-        {{ columns.activity_repeated_at }}
+        {% for col in columns.primary_activity %}
+        {{- col }}{% if not loop.last %},{% endif %}
+        {% endfor %}
 )
 
-select * from aggregate_appended_activities 
+select * from aggregate_appended_activities
 {% endmacro %}