Skip to content

Commit

Permalink
Merge branch 'main' into dbt_schema
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasgoetzweiss authored Jul 1, 2024
2 parents dc590ef + 6dc6c3b commit 814541d
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 52 deletions.
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ build/
dist/
*.egg-info
*.DS_Store
test_api_key.txt
.vscode/
.vscode/
.venv
.pytest_cache
__pycache__
53 changes: 26 additions & 27 deletions eppo_metrics_sync/eppo_metrics_sync.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import json, yaml, jsonschema, os, logging, requests
import json
import jsonschema
import os
import requests
import yaml

from eppo_metrics_sync.validation import (
unique_names,
valid_fact_references,
aggregation_is_valid,
unique_names,
valid_fact_references,
metric_aggregation_is_valid
)

from eppo_metrics_sync.dbt_model_parser import DbtModelParser

API_ENDPOINT = 'https://eppo.cloud/api/v1/metrics/sync'


class EppoMetricsSync:
def __init__(
self,
Expand All @@ -30,7 +34,6 @@ def __init__(
schema_path = os.path.join(package_root, 'schema', 'eppo_metric_schema.json')
with open(schema_path) as schema_file:
self.schema = json.load(schema_file)


def load_eppo_yaml(self, path):
with open(path, 'r') as yaml_file:
Expand All @@ -40,7 +43,6 @@ def load_eppo_yaml(self, path):
if 'metrics' in yaml_data:
self.metrics.extend(yaml_data['metrics'])


def load_dbt_yaml(self, path):
if not self.dbt_model_prefix:
raise ValueError('Must specify dbt_model_prefix when schema_type=dbt-model')
Expand All @@ -53,19 +55,20 @@ def load_dbt_yaml(self, path):
DbtModelParser(model, self.dbt_model_prefix).build()
)

def yaml_is_valid(self, yaml_path):
"""
Validate a single YAML file against the schema
def read_yaml_files(self):

# Validate a single YAML file against the schema
def yaml_is_valid(yaml_path):
with open(yaml_path, 'r') as yaml_file:
data = yaml.safe_load(yaml_file)
try:
jsonschema.validate(data, self.schema)
return {"passed": True}
except jsonschema.exceptions.ValidationError as e:
return {"passed": False, "error_message": e}
"""
with open(yaml_path, 'r') as yaml_file:
data = yaml.safe_load(yaml_file)
try:
jsonschema.validate(data, self.schema)
return {"passed": True}
except jsonschema.exceptions.ValidationError as e:
return {"passed": False, "error_message": e}

def read_yaml_files(self):
# Recursively scan the directory for YAML files and load valid ones
for root, _, files in os.walk(self.directory):
for file in files:
Expand All @@ -88,19 +91,16 @@ def yaml_is_valid(yaml_path):
else:
raise ValueError(f'Unexpected schema_type: {self.schema_type}')



if len(self.fact_sources) == 0 and len(self.metrics) == 0:
raise ValueError(
'No valid yaml files found. ' + ', '.join(self.validation_errors)
)


def validate(self):

if(len(self.fact_sources) == 0 and len(self.metrics) == 0):
if len(self.fact_sources) == 0 and len(self.metrics) == 0:
raise ValueError('No fact sources or metrics found, did you call eppo_metrics.read_yaml_files()?')

unique_names(self)
valid_fact_references(self)
metric_aggregation_is_valid(self)
Expand All @@ -113,24 +113,23 @@ def validate(self):

return True


def sync(self):
self.read_yaml_files()
self.validate()

api_key = os.getenv('EPPO_API_KEY')
if not api_key:
raise Exception('EPPO_API_KEY not set in environment variables. Please set and try again')

sync_tag = os.getenv('EPPO_SYNC_TAG')
if not api_key:
raise Exception('EPPO_SYNC_TAG not set in environment variables. Please set and try again')

headers = {"X-Eppo-Token": api_key}
payload = {
"sync_tag": sync_tag,
"fact_sources" : self.fact_sources,
"metrics" : self.metrics
"fact_sources": self.fact_sources,
"metrics": self.metrics
}

response = requests.post(API_ENDPOINT, json=payload, headers=headers)
Expand Down
8 changes: 4 additions & 4 deletions eppo_metrics_sync/schema/eppo_metric_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
},
"operation": {
"description": "Which aggregation to apply to the fact",
"enum": ["sum", "count", "distinct_entity", "threshold", "conversion", "retention"]
"enum": ["sum", "count", "distinct_entity", "threshold", "conversion", "retention", "count_distinct"]
},
"filters": {
"description": "Optional fact property filters to apply",
Expand Down Expand Up @@ -192,9 +192,9 @@
"description": "Only used if operation = conversion",
"type": "object",
"additionalProperties": false,
"required": ["comparions_operator", "aggregation_type", "breach_value"],
"required": ["comparison_operator", "aggregation_type", "breach_value"],
"properties": {
"comparions_operator": {
"comparison_operator": {
"description": "One of gt or gte",
"enum": ["gt", "gte"]
},
Expand Down Expand Up @@ -246,7 +246,7 @@
},
"operation": {
"description": "How to aggregate fact",
"enum": ["sum", "count", "distinct_entity"]
"enum": ["sum", "count", "distinct_entity", "count_distinct"]
},
"filters": {
"description": "Optional fact property filters to apply",
Expand Down
11 changes: 7 additions & 4 deletions eppo_metrics_sync/validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from collections import Counter

simple_aggregation_options = ['sum', 'count', 'distinct_entity']

advanced_aggregation_parameters = [
'retention_threshold_days',
'conversion_threshold_days',
Expand Down Expand Up @@ -115,6 +113,11 @@ def aggregation_is_valid(aggregation):

error_message = []

if aggregation['operation'] not in ['sum', 'count', 'count_distinct', 'distinct_entity', 'threshold', 'retention', 'conversion']:
error_message.append(
'Invalid aggregation operation: ' + aggregation['operation']
)

# can only winsorize sum or count metrics
if aggregation['operation'] not in ['sum', 'count']:
if [name for name in winsorization_parameters if name in aggregation]:
Expand All @@ -130,15 +133,15 @@ def aggregation_is_valid(aggregation):
)

# only set timeframe_parameters on a some operation types
if aggregation['operation'] not in simple_aggregation_options:
if aggregation['operation'] in ['conversion']:
matched = [p for p in timeframe_parameters if p in aggregation]
if matched:
error_message.append(
"Cannot specify " + matched[0] + " for operation " + aggregation['operation']
)

# can't specify advanced aggregation parameters for simple aggregation types
if aggregation['operation'] in simple_aggregation_options:
if aggregation['operation'] not in ['threshold', 'retention', 'conversion']:
matched = [p for p in advanced_aggregation_parameters if p in aggregation]
if matched:
error_message.append(
Expand Down
12 changes: 12 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
attrs==23.2.0
certifi==2024.2.2
charset-normalizer==3.3.2
idna==3.7
iniconfig==2.0.0
jsonschema==3.2.0
packaging==24.0
pluggy==1.5.0
pyrsistent==0.20.0
pytest==8.2.0
PyYAML==6.0.1
requests==2.31.0
setuptools==69.5.1
six==1.16.0
urllib3==2.2.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='eppo_metrics_sync',
version='0.0.0',
version='0.0.1',
packages=find_packages(),
install_requires=[
'PyYAML', 'jsonschema', 'requests'
Expand Down
15 changes: 12 additions & 3 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ def test_invalid_winsorization_operation():

def test_invalid_aggregation_for_timeframe():
test_agg = {
'operation': 'retention',
'operation': 'conversion',
'aggregation_timeframe_value': 1,
'aggregation_timeframe_unit': 'days',
'retention_threshold_days': 1
'conversion_threshold_days': 1
}

res = aggregation_is_valid(test_agg)
assert res == 'Cannot specify aggregation_timeframe_value for operation retention'
assert res == 'Cannot specify aggregation_timeframe_value for operation conversion'


def test_invalid_timeframe_parameters():
Expand Down Expand Up @@ -122,3 +122,12 @@ def test_extra_parameter_on_retention_metric():
res = aggregation_is_valid(test_agg)
assert res == 'Invalid parameter for retention aggregation: conversion_threshold_days'

def test_count_distinct():
    """A count_distinct aggregation with timeframe parameters is valid.

    aggregation_is_valid returns None on success (and an error message
    string on failure), so a passing validation asserts None.
    """
    test_agg = {
        'operation': 'count_distinct',
        'aggregation_timeframe_value': 1,
        'aggregation_timeframe_unit': 'days'
    }

    res = aggregation_is_valid(test_agg)
    # PEP 8: comparisons to None use identity, not equality (E711)
    assert res is None
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,4 @@ fact_sources:
column: user_id
facts:
- name: Net Subscriptions
column: event_value

metrics:
- name: Net subscriptions
description: Sum of upgrade and downgrade events
entity: User
numerator:
fact_name: Net Subscriptions
operation: sum
winsorization_lower_percentile: 0.01
winsorization_upper_percentile: 0.99
column: event_value
17 changes: 17 additions & 0 deletions tests/yaml/valid/global_kpis/net_subcriptions_metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
metrics:
- name: Net subscriptions
description: Sum of upgrade and downgrade events
entity: User
numerator:
fact_name: Net Subscriptions
operation: sum
winsorization_lower_percentile: 0.01
winsorization_upper_percentile: 0.99
- name: Net subscriptions over distinct subscriptions
entity: User
numerator:
fact_name: Net Subscriptions
operation: sum
denominator:
fact_name: Net Subscriptions
operation: count_distinct
1 change: 1 addition & 0 deletions tests/yaml/valid/purchases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fact_sources:
column: COUNTRY
- name: Browser
column: BROWSER
reference_url: https://github.com/Eppo-exp/eppo-metrics-sync
metrics:
- name: Unique Purchase by User
entity: User # it would be nice if this was optional if there is exactly 1 entity defined above
Expand Down

0 comments on commit 814541d

Please sign in to comment.