Skip to content

Commit

Permalink
0.0.3 (#3)
Browse files Browse the repository at this point in the history
* feat:

* fix: change var to `override_columns`

* feat: add export_env.sh, fix: models.yml test
  • Loading branch information
tnightengale authored Feb 3, 2023
1 parent 6b1f354 commit 090ca1d
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 119 deletions.
6 changes: 1 addition & 5 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
clean-targets:
- "target"
- "dbt_modules"

# Configured for the dbt_activity_schema.logger macro.
vars:
logging_level: INFO
23 changes: 7 additions & 16 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,20 @@ models:
+materialized: table

vars:
running_intergration_tests: true
activity_schema_required_column_aliases:
ts: ts
customer: customer
activity: activity
activity_occurrence: activity_occurrence
activity_repeated_at: activity_repeated_at
activity_schema_primary_activity_columns:
primary_activity_columns:
- activity_id
- customer
- ts
- activity_occurrence
- activity_repeated_at
- activity_id
- activity
- anonymous_customer_id
- feature_json
- revenue_impact
- link
activity_schema_appended_activities_columns:
- name: feature_json
aggregation: min
- name: ts
aggregation: min
- activity_occurrence
- activity_repeated_at
appended_activity_columns:
- feature_json
- ts

seeds:
dbt_activity_schema_integration_tests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
ref("input__first_after"),
dbt_activity_schema.primary_activity("All","visit page"),
[
dbt_activity_schema.append_activity("first_after", "bought something")
dbt_activity_schema.append_activity(
"first_after", "bought something")
]
)
}}
14 changes: 14 additions & 0 deletions integration_tests/models/first_after/dataset__first_after_3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{#-
    Integration-test model built on the `input__first_after` seed.

    Primary activity: "signed up" (occurrence selector "All").
    Appended activity: "visit page" via the "first_after" relationship.
    Only feature_json, activity_occurrence and ts are appended (overriding the
    project-level default column list), and the join additionally requires the
    feature_json key "type" to be equal on both sides.
-#}
{{
    dbt_activity_schema.dataset(
        ref("input__first_after"),
        dbt_activity_schema.primary_activity("All", "signed up"),
        [
            dbt_activity_schema.append_activity(
                "first_after",
                "visit page",
                override_appended_columns=["feature_json", "activity_occurrence", "ts"],
                feature_json_join_columns=["type"]
            )
        ]
    )
}}
7 changes: 6 additions & 1 deletion integration_tests/models/models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ models:
- dbt_utils.equality:
compare_model: ref("output__first_after_2")

- name: dataset__first_after_3
tests:
- dbt_utils.equality:
compare_model: ref("output__first_after_3")

- name: dataset__first_before
tests:
- dbt_utils.equality:
Expand All @@ -31,7 +36,7 @@ models:
tests:
- dbt_utils.equality:
compare_model: ref("output__last_before")

- name: dataset__last_ever
tests:
- dbt_utils.equality:
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/scripts/export_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh
#
# Export every variable assigned in ./.env (resolved against the current
# working directory) into the environment.
#
# This file is meant to be sourced: when it is executed as a standalone
# script the exports only exist in that child process and are lost on exit.
#
# .env is read as POSIX shell: one KEY=VALUE assignment per line, `#` comments
# are allowed, and values containing spaces must be quoted.

if [ ! -f .env ]; then
    echo "export_env.sh: no .env file found in $(pwd)" >&2
    # `return` only succeeds when sourced; fall back to `exit` when executed.
    return 1 2>/dev/null || exit 1
fi

# With allexport on, every variable assigned while sourcing is exported.
# Letting the shell parse the file keeps quoted values with spaces intact and
# never degenerates into a bare `export` (which would dump the environment)
# when .env holds no assignments.
set -a
. ./.env
set +a
24 changes: 12 additions & 12 deletions integration_tests/seeds/first_after/input__first_after.csv
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
activity_id,ts,customer,activity,anonymous_customer_id,feature_json,revenue_impact,link,activity_occurrence,activity_repeated_at
2,2022-01-01 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,1,2022-01-03 22:10:11
3,2022-01-02 22:10:11,1,signed up,,"[{""signed up"": 1}]",0,,1,
4,2022-01-03 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,2,2022-01-04 22:10:11
5,2022-01-04 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-06 22:10:11
6,2022-01-05 22:10:11,1,bought something,,"[{""bought something"": 1}]",100,,1,
7,2022-01-06 22:10:11,1,visit page,,"[{""visited page"": 1}]",0,,4,
8,2022-01-07 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,1,2022-01-09 22:10:11
9,2022-01-08 22:10:11,7,signed up,,"[{""signed up"": 1}]",0,,1,
10,2022-01-09 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,2,2022-01-10 22:10:11
11,2022-01-10 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,3,2022-01-12 22:10:11
12,2022-01-11 22:10:11,7,bought something,,"[{""bought something"": 1}]",100,,1,
13,2022-01-12 22:10:11,7,visit page,,"[{""visited page"": 1}]",0,,4,
2,2022-01-01 22:10:11,1,visit page,,"{""type"": 1}",0,,1,2022-01-03 22:10:11
3,2022-01-02 22:10:11,1,signed up,,"{""type"": 1}",0,,1,
4,2022-01-03 22:10:11,1,visit page,,"{""type"": 2}",0,,2,2022-01-04 22:10:11
5,2022-01-04 22:10:11,1,visit page,,"{""type"": 2}",0,,3,2022-01-06 22:10:11
6,2022-01-05 22:10:11,1,bought something,,"{""type"": 1}",100,,1,
7,2022-01-06 22:10:11,1,visit page,,"{""type"": 1}",0,,4,
8,2022-01-07 22:10:11,7,visit page,,"{""type"": 1}",0,,1,2022-01-09 22:10:11
9,2022-01-08 22:10:11,7,signed up,,"{""type"": 1}",0,,1,
10,2022-01-09 22:10:11,7,visit page,,"{""type"": 2}",0,,2,2022-01-10 22:10:11
11,2022-01-10 22:10:11,7,visit page,,"{""type"": 2}",0,,3,2022-01-12 22:10:11
12,2022-01-11 22:10:11,7,bought something,,"{""type"": 1}",100,,1,
13,2022-01-12 22:10:11,7,visit page,,"{""type"": 1}",0,,4,
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_BOUGHT_SOMETHING_FEATURE_JSON,FIRST_AFTER_BOUGHT_SOMETHING_TS
3,1,2022-01-02 22:10:11.000,signed up,,[{"signed up": 1}],0,,1,,[{"bought something": 1}],2022-01-05 22:10:11.000
9,7,2022-01-08 22:10:11.000,signed up,,[{"signed up": 1}],0,,1,,[{"bought something": 1}],2022-01-11 22:10:11.000
3,1,2022-01-02 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},2022-01-05 22:10:11.000
9,7,2022-01-08 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},2022-01-11 22:10:11.000
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_BOUGHT_SOMETHING_FEATURE_JSON,FIRST_AFTER_BOUGHT_SOMETHING_TS
8,7,2022-01-07 22:10:11.000,visit page,,[{"visited page": 1}],0,,1,2022-01-09 22:10:11.000,,
11,7,2022-01-10 22:10:11.000,visit page,,[{"visited page": 1}],0,,3,2022-01-12 22:10:11.000,[{"bought something": 1}],2022-01-11 22:10:11.000
2,1,2022-01-01 22:10:11.000,visit page,,[{"visited page": 1}],0,,1,2022-01-03 22:10:11.000,,
10,7,2022-01-09 22:10:11.000,visit page,,[{"visited page": 1}],0,,2,2022-01-10 22:10:11.000,,
5,1,2022-01-04 22:10:11.000,visit page,,[{"visited page": 1}],0,,3,2022-01-06 22:10:11.000,[{"bought something": 1}],2022-01-05 22:10:11.000
4,1,2022-01-03 22:10:11.000,visit page,,[{"visited page": 1}],0,,2,2022-01-04 22:10:11.000,,
7,1,2022-01-06 22:10:11.000,visit page,,[{"visited page": 1}],0,,4,,,
13,7,2022-01-12 22:10:11.000,visit page,,[{"visited page": 1}],0,,4,,,
7,1,2022-01-06 22:10:11.000,visit page,,{"type": 1},0,,4,,,
2,1,2022-01-01 22:10:11.000,visit page,,{"type": 1},0,,1,2022-01-03 22:10:11.000,,
4,1,2022-01-03 22:10:11.000,visit page,,{"type": 2},0,,2,2022-01-04 22:10:11.000,,
10,7,2022-01-09 22:10:11.000,visit page,,{"type": 2},0,,2,2022-01-10 22:10:11.000,,
8,7,2022-01-07 22:10:11.000,visit page,,{"type": 1},0,,1,2022-01-09 22:10:11.000,,
5,1,2022-01-04 22:10:11.000,visit page,,{"type": 2},0,,3,2022-01-06 22:10:11.000,{"type": 1},2022-01-05 22:10:11.000
11,7,2022-01-10 22:10:11.000,visit page,,{"type": 2},0,,3,2022-01-12 22:10:11.000,{"type": 1},2022-01-11 22:10:11.000
13,7,2022-01-12 22:10:11.000,visit page,,{"type": 1},0,,4,,,
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ACTIVITY_ID,CUSTOMER,TS,ACTIVITY,ANONYMOUS_CUSTOMER_ID,FEATURE_JSON,REVENUE_IMPACT,LINK,ACTIVITY_OCCURRENCE,ACTIVITY_REPEATED_AT,FIRST_AFTER_VISIT_PAGE_FEATURE_JSON,FIRST_AFTER_VISIT_PAGE_ACTIVITY_OCCURRENCE,FIRST_AFTER_VISIT_PAGE_TS
3,1,2022-01-02 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},4,2022-01-06 22:10:11.000
9,7,2022-01-08 22:10:11.000,signed up,,{"type": 1},0,,1,,{"type": 1},4,2022-01-12 22:10:11.000
25 changes: 19 additions & 6 deletions macros/dataclasses/append_activity.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{% macro append_activity(relationship_name, activity_name) %}
{{ return(adapter.dispatch("append_activity", "dbt_activity_schema")(relationship_name, activity_name))}}
{% endmacro %}

{% macro default__append_activity(relationship_name, activity_name) %}
{% macro append_activity(
relationship_name,
activity_name,
override_appended_columns=[],
feature_json_join_columns=[]
) %}

{# An activity to append to the `primary_activity`.

Expand All @@ -18,11 +19,21 @@ params:
6. "last_after"
7. "aggregate_after"
8. "aggregate_all_ever"

activity_name: str
The string identifier of the activity in the stream to append (join).

override_appended_columns: List[str]
List of columns to join to the primary activity. When left empty, falls back to the project var `appended_activity_columns` (or, if that var is unset, to every column name known to `columns()`).

feature_json_join_columns: List[str]
List of additional keys in the feature_json to extract and join on.
#}

{% set default_appended_activity_columns = dbt_activity_schema.columns().appended_activities %}

{% set columns_to_append = override_appended_columns if override_appended_columns != [] else default_appended_activity_columns %}

{% set relationship_factory = dict(
first_before = dbt_activity_schema.first_before(),
first_ever = dbt_activity_schema.first_ever(),
Expand All @@ -34,6 +45,8 @@ params:

{% do return(namespace(
name = activity_name,
columns_to_append = columns_to_append,
feature_json_join_columns = feature_json_join_columns,
relationship = relationship_factory[relationship_name]

)) %}
Expand Down
73 changes: 23 additions & 50 deletions macros/dataclasses/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,41 @@

{% macro default__columns() %}

{% set activity_schema_required_column_aliases = var(
"activity_schema_required_column_aliases",
{% set column_names =
dict(
activity_id = "activity_id",
ts = "ts",
customer = "customer",
anonymous_customer_id = "anonymous_customer_id",
activity = "activity",
activity_occurrence = "activity_occurrence",
activity_repeated_at = "activity_repeated_at"
activity_repeated_at = "activity_repeated_at",
feature_json = "feature_json",
revenue_impact = "revenue_impact",
link = "link"
)
) %}
%}

{% set required_columns = [
"ts",
"customer",
"activity",
"activity_occurrence",
"activity_repeated_at"
] %}
{% do column_names.update(var("override_columns", {})) %}

{% set required_columns_provided = activity_schema_required_column_aliases.keys() | list %}
{% if required_columns_provided != required_columns %}
{% set message %}
"Project variable 'activity_schema_required_column_aliases' must contain the following keys: " {{ required_columns }},
"Got: " {{ required_columns_provided }}
{% endset %}
{{ exceptions.raise_compiler_error(message)}}
{% endif %}

{% set activity_schema_primary_activity_columns = var(
"activity_schema_primary_activity_columns",
[
"activity_id",
"activity",
"anonymous_customer_id",
"feature_json",
"revenue_impact",
"link"
]
) %}
{% set primary_activity_columns = var("primary_activity_columns", column_names.values() | list) %}

{% set activity_schema_appended_activities_columns = var(
"activity_schema_appended_activities_columns",
[
dict(
name = "feature_json",
aggregation = "min"
),
dict(
name = "ts",
aggregation = "min"
)
]
) %}
{% set appended_activity_columns = var("appended_activity_columns", column_names.values() | list) %}

{% do return(namespace(
ts = activity_schema_required_column_aliases["ts"],
customer = activity_schema_required_column_aliases["customer"],
activity = activity_schema_required_column_aliases["activity"],
activity_occurrence = activity_schema_required_column_aliases["activity_occurrence"],
activity_repeated_at = activity_schema_required_column_aliases["activity_repeated_at"],
primary_activity = activity_schema_primary_activity_columns,
appended_activities = activity_schema_appended_activities_columns
activity_id = column_names["activity_id"],
ts = column_names["ts"],
customer = column_names["customer"],
anonymous_customer_id = column_names["anonymous_customer_id"],
activity = column_names["activity"],
activity_occurrence = column_names["activity_occurrence"],
activity_repeated_at = column_names["activity_repeated_at"],
feature_json = column_names["feature_json"],
revenue_impact = column_names["revenue_impact"],
link = column_names["link"],
primary_activity = primary_activity_columns,
appended_activities = appended_activity_columns
)) %}

{% endmacro %}
20 changes: 15 additions & 5 deletions macros/dataset.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ params:
{% set columns = dbt_activity_schema.columns() %}
{% set stream = dbt_activity_schema.globals().stream %}
{% set alias = dbt_activity_schema.alias %}
{% set json_unpack_key = dbt_activity_schema.json_unpack_key %}

with

Expand All @@ -43,9 +44,9 @@ join_appended_activities as (
{% endfor %}

{% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
{% for col in columns.appended_activities %}
{% for col in activity.columns_to_append %}

stream_{{ i }}.{{ col.name }} as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}
stream_{{ i }}.{{ col }} as {{ alias(activity, col) }}{% if not (last_outer_loop and loop.last) %},{% endif %}

{% endfor %}
{% endfor %}
Expand All @@ -58,6 +59,15 @@ join_appended_activities as (
on (
stream_{{ i }}.{{ columns.customer }} = stream.{{ columns.customer }}
and stream_{{ i -}}.{{- columns.activity }} = {{ dbt.string_literal(activity.name) }}

{% for col in activity.feature_json_join_columns %}
{% set _stream_i -%} stream_{{ i }}.{{ columns.feature_json }} {%- endset %}
{% set _stream -%} stream.{{ columns.feature_json }} {%- endset %}

and {{ json_unpack_key(_stream_i, col) }} = {{ json_unpack_key(_stream, col) }}

{% endfor %}

and {{ activity.relationship.join_clause(i) }}
)

Expand All @@ -74,11 +84,11 @@ aggregate_appended_activities as (
{% endfor %}

{% for activity in appended_activities %}{% set i = loop.index %}{% set last_outer_loop = loop.last %}
{% for col in columns.appended_activities %}
{% for col in activity.columns_to_append %}

{{ activity.relationship.aggregation_func }}(
{{- alias(activity, col.name) -}}
) as {{ alias(activity, col.name)}}{% if not (last_outer_loop and loop.last) %},{% endif %}
{{- alias(activity, col) -}}
) as {{ alias(activity, col)}}{% if not (last_outer_loop and loop.last) %},{% endif %}

{% endfor %}
{% endfor %}
Expand Down
File renamed without changes.
13 changes: 0 additions & 13 deletions macros/utils/factories/relationship.sql

This file was deleted.

23 changes: 23 additions & 0 deletions macros/utils/json_unpack_key.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{# Public entry point: looks up the adapter-specific `json_unpack_key`
   implementation in the `dbt_activity_schema` namespace and returns whatever
   that implementation produces for (json_col, key). #}
{% macro json_unpack_key(json_col, key) %}
    {% set implementation = adapter.dispatch("json_unpack_key", "dbt_activity_schema") %}
    {% do return(implementation(json_col, key)) %}
{% endmacro %}

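{# params

json_col: str
The SQL expression (typically a qualified column reference) that holds the JSON to unpack.

key: str
The name of the key to unpack from the activity schema feature_json column.
#}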

{# Default implementation: renders a `json_extract_path_text(...)` SQL call.

   Two invocation forms are supported:
   1. Plain call: emits json_extract_path_text(<json_col>, <key>), where the
      key is turned into a SQL string literal by dbt.string_literal.
   2. Call block: the rendered body of the call block is used verbatim as the
      argument list; json_col and key are not used in that case.

   NOTE(review): json_extract_path_text is not available on every warehouse;
   other adapters presumably need their own <adapter>__json_unpack_key. #}
{% macro default__json_unpack_key(json_col, key) -%}

{% if caller %}

{# `caller` is the call-block macro; it must be invoked to obtain its body. #}
json_extract_path_text({{ caller() }})

{% else %}

json_extract_path_text({{ json_col }}, {{ dbt.string_literal(key) }})

{% endif %}

{%- endmacro %}

0 comments on commit 090ca1d

Please sign in to comment.