diff --git a/dbt_project.yml b/dbt_project.yml index 1005f4a..a743476 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -17,10 +17,6 @@ macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" -clean-targets: +clean-targets: - "target" - "dbt_modules" - -# Configured for the dbt_activity_schema.logger macro. -vars: - logging_level: INFO diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 3e25add..5f4df38 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -19,29 +19,20 @@ models: +materialized: table vars: - running_intergration_tests: true - activity_schema_required_column_aliases: - ts: ts - customer: customer - activity: activity - activity_occurrence: activity_occurrence - activity_repeated_at: activity_repeated_at - activity_schema_primary_activity_columns: + primary_activity_columns: + - activity_id - customer - ts - - activity_occurrence - - activity_repeated_at - - activity_id - activity - anonymous_customer_id - feature_json - revenue_impact - link - activity_schema_appended_activities_columns: - - name: feature_json - aggregation: min - - name: ts - aggregation: min + - activity_occurrence + - activity_repeated_at + appended_activity_columns: + - feature_json + - ts seeds: dbt_activity_schema_integration_tests: diff --git a/integration_tests/models/first_after/dataset__first_after_2.sql b/integration_tests/models/first_after/dataset__first_after_2.sql index 1860e53..8f9eace 100644 --- a/integration_tests/models/first_after/dataset__first_after_2.sql +++ b/integration_tests/models/first_after/dataset__first_after_2.sql @@ -3,7 +3,8 @@ ref("input__first_after"), dbt_activity_schema.primary_activity("All","visit page"), [ - dbt_activity_schema.append_activity("first_after", "bought something") + dbt_activity_schema.append_activity( + "first_after", "bought something") ] ) }} diff --git a/integration_tests/models/first_after/dataset__first_after_3.sql b/integration_tests/models/first_after/dataset__first_after_3.sql new file mode 100644 index 0000000..ead5147 --- /dev/null +++ b/integration_tests/models/first_after/dataset__first_after_3.sql @@ -0,0 +1,14 @@ +{{ + dbt_activity_schema.dataset( + ref("input__first_after"), + dbt_activity_schema.primary_activity("All","signed up"), + [ + dbt_activity_schema.append_activity( + "first_after", + "visit page", + ["feature_json", "activity_occurrence", "ts"], + feature_json_join_columns=["type"] + ) + ] + ) +}} diff --git a/integration_tests/models/models.yml b/integration_tests/models/models.yml index 64c6c66..1810fea 100644 --- a/integration_tests/models/models.yml +++ b/integration_tests/models/models.yml @@ -12,6 +12,11 @@ models: - dbt_utils.equality: compare_model: ref("output__first_after_2") + - name: dataset__first_after_3 + tests: + - dbt_utils.equality: + compare_model: ref("output__first_after_3") + - name: dataset__first_before tests: - dbt_utils.equality: @@ -31,7 +36,7 @@ models: tests: - dbt_utils.equality: compare_model: ref("output__last_before") - + - name: dataset__last_ever tests: - dbt_utils.equality: diff --git a/integration_tests/scripts/export_env.sh b/integration_tests/scripts/export_env.sh new file mode 100644 index 0000000..e4512d9 --- /dev/null +++ b/integration_tests/scripts/export_env.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +export $(grep -v '^#' .env | xargs) diff --git a/integration_tests/seeds/first_after/input__first_after.csv b/integration_tests/seeds/first_after/input__first_after.csv index 9f72ae2..37b72a3 100644 --- a/integration_tests/seeds/first_after/input__first_after.csv +++ b/integration_tests/seeds/first_after/input__first_after.csv @@ -1,13 +1,13 @@ activity_id,ts,customer,activity,anonymous_customer_id,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at -2,2022-01-01 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,1,2022-01-03 22:10:11 -3,2022-01-02 22:10:11,1,signed up,,"[{""signed up"": 1}]",0,,1, -4,2022-01-03 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,2,2022-01-04 22:10:11 -5,2022-01-04 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-06 22:10:11 -6,2022-01-05 22:10:11,1,bought something,,"[{""bought something"": 1}]",100,,1, -7,2022-01-06 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,4, -8,2022-01-07 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,1,2022-01-09 22:10:11 -9,2022-01-08 22:10:11,7,signed up,,"[{""signed up"": 1}]",0,,1, -10,2022-01-09 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,2,2022-01-10 22:10:11 -11,2022-01-10 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-12 22:10:11 -12,2022-01-11 22:10:11,7,bought something,,"[{""bought something"": 1}]",100,,1, -13,2022-01-12 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,4, +2,2022-01-01 22:10:11,1,visit page,,"{""type"": 1}",0,,1,2022-01-03 22:10:11 +3,2022-01-02 22:10:11,1,signed up,,"{""type"": 1}",0,,1, +4,2022-01-03 22:10:11,1,visit page,,"{""type"": 2}",0,,2,2022-01-04 22:10:11 +5,2022-01-04 22:10:11,1,visit page,,"{""type"": 2}",0,,3,2022-01-06 22:10:11 +6,2022-01-05 22:10:11,1,bought something,,"{""type"": 1}",100,,1, +7,2022-01-06 22:10:11,1,visit page,,"{""type"": 1}",0,,4, +8,2022-01-07 22:10:11,7,visit page,,"{""type"": 1}",0,,1,2022-01-09 22:10:11 +9,2022-01-08 22:10:11,7,signed up,,"{""type"": 1}",0,,1, +10,2022-01-09 22:10:11,7,visit page,,"{""type"": 2}",0,,2,2022-01-10 22:10:11 +11,2022-01-10 22:10:11,7,visit page,,"{""type"": 2}",0,,3,2022-01-12 22:10:11 +12,2022-01-11 22:10:11,7,bought something,,"{""type"": 1}",100,,1, +13,2022-01-12 22:10:11,7,visit page,,"{""type"": 1}",0,,4, diff --git a/integration_tests/seeds/first_after/output/output__first_after_1.csv b/integration_tests/seeds/first_after/output/output__first_after_1.csv index 8c3b49d..90525b0 100644 --- a/integration_tests/seeds/first_after/output/output__first_after_1.csv +++ b/integration_tests/seeds/first_after/output/output__first_after_1.csv @@ -1,3 +1,3 @@ ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_BOUGHT_SOMETHING_FEATURE_JSON,FIRST_AFTER_BOUGHT_SOMETHING_TS -3,1,2022-01-02 22:10:11.000,signed up,,[{"signed up": 1}],0,,1,,[{"bought something": 1}],2022-01-05 22:10:11.000 -9,7,2022-01-08 22:10:11.000,signed up,,[{"signed up": 1}],0,,1,,[{"bought something": 1}],2022-01-11 22:10:11.000 +3,1,2022-01-02 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},2022-01-05 22:10:11.000 +9,7,2022-01-08 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},2022-01-11 22:10:11.000 diff --git a/integration_tests/seeds/first_after/output/output__first_after_2.csv b/integration_tests/seeds/first_after/output/output__first_after_2.csv index a84c4b3..3a15ea4 100644 --- a/integration_tests/seeds/first_after/output/output__first_after_2.csv +++ b/integration_tests/seeds/first_after/output/output__first_after_2.csv @@ -1,9 +1,9 @@ ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_BOUGHT_SOMETHING_FEATURE_JSON,FIRST_AFTER_BOUGHT_SOMETHING_TS -8,7,2022-01-07 22:10:11.000,visit page,,[{"visited page": 1}],0,,1,2022-01-09 22:10:11.000,, -11,7,2022-01-10 22:10:11.000,visit page,,[{"visited page": 1}],0,,3,2022-01-12 22:10:11.000,[{"bought something": 1}],2022-01-11 22:10:11.000 -2,1,2022-01-01 22:10:11.000,visit page,,[{"visited page": 1}],0,,1,2022-01-03 22:10:11.000,, -10,7,2022-01-09 22:10:11.000,visit page,,[{"visited page": 1}],0,,2,2022-01-10 22:10:11.000,, -5,1,2022-01-04 22:10:11.000,visit page,,[{"visited page": 1}],0,,3,2022-01-06 22:10:11.000,[{"bought something": 1}],2022-01-05 22:10:11.000 -4,1,2022-01-03 22:10:11.000,visit page,,[{"visited page": 1}],0,,2,2022-01-04 22:10:11.000,, -7,1,2022-01-06 22:10:11.000,visit page,,[{"visited page": 1}],0,,4,,, -13,7,2022-01-12 22:10:11.000,visit page,,[{"visited page": 1}],0,,4,,, +7,1,2022-01-06 22:10:11.000,visit page,,{"type": 1},0,,4,,, +2,1,2022-01-01 22:10:11.000,visit page,,{"type": 1},0,,1,2022-01-03 22:10:11.000,, +4,1,2022-01-03 22:10:11.000,visit page,,{"type": 2},0,,2,2022-01-04 22:10:11.000,, +10,7,2022-01-09 22:10:11.000,visit page,,{"type": 2},0,,2,2022-01-10 22:10:11.000,, +8,7,2022-01-07 22:10:11.000,visit page,,{"type": 1},0,,1,2022-01-09 22:10:11.000,, +5,1,2022-01-04 22:10:11.000,visit page,,{"type": 2},0,,3,2022-01-06 22:10:11.000,{"type": 1},2022-01-05 22:10:11.000 +11,7,2022-01-10 22:10:11.000,visit page,,{"type": 2},0,,3,2022-01-12 22:10:11.000,{"type": 1},2022-01-11 22:10:11.000 +13,7,2022-01-12 22:10:11.000,visit page,,{"type": 1},0,,4,,, diff --git a/integration_tests/seeds/first_after/output/output__first_after_3.csv b/integration_tests/seeds/first_after/output/output__first_after_3.csv new file mode 100644 index 0000000..f09ac9a --- /dev/null +++ b/integration_tests/seeds/first_after/output/output__first_after_3.csv @@ -0,0 +1,3 @@ +ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_VISIT_PAGE_FEATURE_JSON,FIRST_AFTER_VISIT_PAGE_ACTIVITY_OCCURRENCE,FIRST_AFTER_VISIT_PAGE_TS +3,1,2022-01-02 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},4,2022-01-06 22:10:11.000 +9,7,2022-01-08 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},4,2022-01-12 22:10:11.000 diff --git a/macros/dataclasses/append_activity.sql b/macros/dataclasses/append_activity.sql index 1b30f26..4409496 100644 --- a/macros/dataclasses/append_activity.sql +++ b/macros/dataclasses/append_activity.sql @@ -1,8 +1,9 @@ -{% macro append_activity(relationship_name, activity_name) %} - {{ return(adapter.dispatch("append_activity", "dbt_activity_schema")(relationship_name, activity_name))}} -{% endmacro %} - -{% macro default__append_activity(relationship_name, activity_name) %} +{% macro append_activity( + relationship_name, + activity_name, + override_appended_columns=[], + feature_json_join_columns=[] +) %} {# An activity to append to the `primary_activity`. @@ -18,11 +19,21 @@ params: 6. "last_after" 7. "aggregate_after" 8. "aggregate_all_ever" - + activity_name: str The string identifier of the activity in the stream to append (join). + + override_appended_columns: List[str] + List of columns to join to the primary activity, defaults to the project var `appended_activity_columns`. + + feature_json_join_columns: List[str] + List of additional keys in the feature_json to extract and join on. #} +{% set default_appended_activity_columns = dbt_activity_schema.columns().appended_activities %} + +{% set columns_to_append = override_appended_columns if override_appended_columns != [] else default_appended_activity_columns %} + {% set relationship_factory = dict( first_before = dbt_activity_schema.first_before(), first_ever = dbt_activity_schema.first_ever(), @@ -34,6 +45,8 @@ params: {% do return(namespace( name = activity_name, + columns_to_append = columns_to_append, + feature_json_join_columns = feature_json_join_columns, relationship = relationship_factory[relationship_name] )) %} diff --git a/macros/dataclasses/columns.sql b/macros/dataclasses/columns.sql index 3e6da5c..1e3b23a 100644 --- a/macros/dataclasses/columns.sql +++ b/macros/dataclasses/columns.sql @@ -4,68 +4,41 @@ {% macro default__columns() %} -{% set activity_schema_required_column_aliases = var( - "activity_schema_required_column_aliases", +{% set column_names = dict( + activity_id = "activity_id", ts = "ts", customer = "customer", + anonymous_customer_id = "anonymous_customer_id", activity = "activity", activity_occurrence = "activity_occurrence", - activity_repeated_at = "activity_repeated_at" + activity_repeated_at = "activity_repeated_at", + feature_json = "feature_json", + revenue_impact = "revenue_impact", + link = "link" ) -) %} +%} -{% set required_columns = [ - "ts", - "customer", - "activity", - "activity_occurrence", - "activity_repeated_at" -] %} +{% do column_names.update(var("override_columns", {})) %} -{% set required_columns_provided = activity_schema_required_column_aliases.keys() | list %} -{% if required_columns_provided != required_columns %} - {% set message %} - "Project variable 'activity_schema_required_column_aliases' must contain the following keys: " {{ required_columns }}, - "Got: " {{ required_columns_provided }} - {% endset %} - {{ exceptions.raise_compiler_error(message)}} -{% endif %} -{% set activity_schema_primary_activity_columns = var( - "activity_schema_primary_activity_columns", - [ - "activity_id", - "activity", - "anonymous_customer_id", - "feature_json", - "revenue_impact", - "link" - ] -) %} +{% set primary_activity_columns = var("primary_activity_columns", column_names.values() | list) %} -{% set activity_schema_appended_activities_columns = var( - "activity_schema_appended_activities_columns", - [ - dict( - name = "feature_json", - aggregation = "min" - ), - dict( - name = "ts", - aggregation = "min" - ) - ] -) %} +{% set appended_activity_columns = var("appended_activity_columns", column_names.values() | list) %} {% do return(namespace( - ts = activity_schema_required_column_aliases["ts"], - customer = activity_schema_required_column_aliases["customer"], - activity = activity_schema_required_column_aliases["activity"], - activity_occurrence = activity_schema_required_column_aliases["activity_occurrence"], - activity_repeated_at = activity_schema_required_column_aliases["activity_repeated_at"], - primary_activity = activity_schema_primary_activity_columns, - appended_activities = activity_schema_appended_activities_columns + activity_id = column_names["activity_id"], + ts = column_names["ts"], + customer = column_names["customer"], + anonymous_customer_id = column_names["anonymous_customer_id"], + activity = column_names["activity"], + activity_occurrence = column_names["activity_occurrence"], + activity_repeated_at = column_names["activity_repeated_at"], + feature_json = column_names["feature_json"], + revenue_impact = column_names["revenue_impact"], + link = column_names["link"], + primary_activity = primary_activity_columns, + appended_activities = appended_activity_columns )) %} {% endmacro %} diff --git a/macros/dataset.sql b/macros/dataset.sql index 82f1072..19b4235 100644 --- a/macros/dataset.sql +++ b/macros/dataset.sql @@ -33,6 +33,7 @@ params: {% set columns = dbt_activity_schema.columns() %} {% set stream = dbt_activity_schema.globals().stream %} {% set alias = dbt_activity_schema.alias %} +{% set json_unpack_key = dbt_activity_schema.json_unpack_key %} with @@ -43,9 +44,9 @@ join_appended_activities as ( {% endfor %} {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %} - {% for col in columns.appended_activities %} + {% for col in activity.columns_to_append %} - stream_{{ i }}.{{ col.name }} as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %} + stream_{{ i }}.{{ col }} as {{ alias(activity, col) }}{% if not (last_outer_loop and loop.last) %},{% endif %} {% endfor %} {% endfor %} @@ -58,6 +59,15 @@ join_appended_activities as ( on ( stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }} and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }} + + {% for col in activity.feature_json_join_columns %} + {% set _stream_i -%} stream_{{ i }}.{{ columns.feature_json }} {%- endset %} + {% set _stream -%} stream.{{ columns.feature_json }} {%- endset %} + + and {{ json_unpack_key(_stream_i, col) }} = {{ json_unpack_key(_stream, col) }} + + {% endfor %} + and {{ activity.relationship.join_clause(i) }} ) @@ -74,11 +84,11 @@ aggregate_appended_activities as ( {% endfor %} {% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %} - {% for col in columns.appended_activities %} + {% for col in activity.columns_to_append %} {{ activity.relationship.aggregation_func }}( - {{- alias(activity, col.name) -}} - ) as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %} + {{- alias(activity, col) -}} + ) as {{ alias(activity, col)}}{% if not (last_outer_loop and loop.last) %},{% endif %} {% endfor %} {% endfor %} diff --git a/macros/utils/appended_activity_alias.sql b/macros/utils/alias.sql similarity index 100% rename from macros/utils/appended_activity_alias.sql rename to macros/utils/alias.sql diff --git a/macros/utils/factories/relationship.sql b/macros/utils/factories/relationship.sql deleted file mode 100644 index 8ce5a28..0000000 --- a/macros/utils/factories/relationship.sql +++ /dev/null @@ -1,13 +0,0 @@ -{% macro relationship(option, name, i) %} - {{ return(adapter.dispatch("relationship", "dbt_activity_schema")(option, name, i))}} -{% endmacro %} - -{% macro default__relationship(option, name, i) %} - -{% set relationship_factory = dict( - first_ever=dbt_activity_schema.first_ever(option, i) -) %} - -{% do return(relationship_factory[name]) %} - -{% endmacro %} diff --git a/macros/utils/json_unpack_key.sql b/macros/utils/json_unpack_key.sql new file mode 100644 index 0000000..69f040b --- /dev/null +++ b/macros/utils/json_unpack_key.sql @@ -0,0 +1,23 @@ +{% macro json_unpack_key(json_col, key) %} + {{ return(adapter.dispatch("json_unpack_key", "dbt_activity_schema")(json_col, key))}} +{% endmacro %} + +{# params + +key: str + The name of the key to unpack from the activity schema feature_json column. +#} + +{% macro default__json_unpack_key(json_col, key) -%} + +{% if caller %} + +json_extract_path_text({{ caller }}) + +{% else %} + +json_extract_path_text({{ json_col }}, {{dbt.string_literal(key) }}) + +{% endif %} + +{%- endmacro %}