This repository has been archived by the owner on Apr 15, 2022. It is now read-only.

Merge pull request #101 from splicemachine/DBAAS-4984
Dbaas 4984
Ben Epstein authored Jan 15, 2021
2 parents 90f874f + 0c59d72 commit 55dda8d
Showing 9 changed files with 285 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -46,7 +46,7 @@
'private-members':True,
'inherited-members':True,
'undoc-members': False,
'exclude-members': '_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main'
'exclude-members': '_retrieve_model_data_sets,_retrieve_training_set_metadata_from_deployment,_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists,_generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches,main'
}

# Add any paths that contain templates here, relative to this directory.
20 changes: 20 additions & 0 deletions splicemachine/features/constants.py
@@ -191,6 +191,26 @@ class SQL:
SELECT {feature_names} FROM {feature_sets} WHERE
"""

get_deployment_metadata = f"""
SELECT tv.name, d.training_set_start_ts, d.training_set_end_ts,
string_agg(f.name,',') features
FROM {FEATURE_STORE_SCHEMA}.deployment d
INNER JOIN {FEATURE_STORE_SCHEMA}.training_set ts ON d.training_set_id=ts.training_set_id
INNER JOIN {FEATURE_STORE_SCHEMA}.training_set_feature tsf ON tsf.training_set_id=d.training_set_id
LEFT OUTER JOIN {FEATURE_STORE_SCHEMA}.training_view tv ON tv.view_id = ts.view_id
INNER JOIN {FEATURE_STORE_SCHEMA}.feature f ON tsf.feature_id=f.feature_id
WHERE d.model_schema_name = '{{schema_name}}'
AND d.model_table_name = '{{table_name}}'
GROUP BY 1,2,3
"""

get_model_predictions = """
SELECT EVAL_TIME,
PREDICTION
FROM {schema_name}.{table_name} WHERE EVAL_TIME>='{start_time}' AND EVAL_TIME<'{end_time}'
ORDER BY EVAL_TIME
"""

class Columns:
feature = ['feature_id', 'feature_set_id', 'name', 'description', 'feature_data_type', 'feature_type',
'tags', 'compliance_level', 'last_update_ts', 'last_update_username']
2 changes: 1 addition & 1 deletion splicemachine/features/feature.py
@@ -2,7 +2,7 @@

class Feature:
def __init__(self, *, name, description, feature_data_type, feature_type, tags, feature_set_id=None, feature_id=None, **kwargs):
self.name = name
self.name = name.upper()
self.description = description
self.feature_data_type = feature_data_type
self.feature_type = feature_type
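
A quick illustrative sketch of the new .upper() call (the constructor arguments are placeholders):

# Hypothetical example: names are normalized to upper case at construction,
# matching how the database stores object names.
f = Feature(name='total_spend', description='demo', feature_data_type='DOUBLE',
            feature_type='C', tags=[])
assert f.name == 'TOTAL_SPEND'
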
5 changes: 3 additions & 2 deletions splicemachine/features/feature_set.py
@@ -12,8 +12,8 @@ def __init__(self, *, splice_ctx: PySpliceContext, table_name, schema_name, desc
primary_keys: Dict[str, str], feature_set_id=None, deployed: bool = False, **kwargs):
self.splice_ctx = splice_ctx

self.table_name = table_name
self.schema_name = schema_name
self.table_name = table_name.upper()
self.schema_name = schema_name.upper()
self.description = description
self.primary_keys = primary_keys
self.feature_set_id = feature_set_id
@@ -91,6 +91,7 @@ def __update_deployment_status(self, status: bool):
"""
self.splice_ctx.execute(SQL.update_fset_deployment_status.format(status=int(status),
feature_set_id=self.feature_set_id))
self.deployed = status


def deploy(self, verbose=False):
138 changes: 132 additions & 6 deletions splicemachine/features/feature_store.py
@@ -7,6 +7,7 @@
from pandas import DataFrame as PandasDF

from pyspark.sql.dataframe import DataFrame as SparkDF
import pyspark.sql.functions as psf
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
@@ -16,8 +17,9 @@
from splicemachine.spark import PySpliceContext
from splicemachine.features import Feature, FeatureSet
from .training_set import TrainingSet
from .utils import (dict_to_lower, _generate_training_set_history_sql,
_generate_training_set_sql, _create_temp_training_view)
from .utils.drift_utils import (add_feature_plot, remove_outliers, datetime_range_split, build_feature_drift_plot, build_model_drift_plot)
from .utils.training_utils import (dict_to_lower, _generate_training_set_history_sql,
_generate_training_set_sql, _create_temp_training_view)
from .constants import SQL, FeatureType
from .training_view import TrainingView

@@ -317,7 +319,6 @@ def get_training_set(self, features: Union[List[Feature], List[str]], current_va
temp_vw = _create_temp_training_view(features, fsets)
sql = _generate_training_set_history_sql(temp_vw, features, fsets, start_time=start_time, end_time=end_time)


# Here we create a null training view and pass it into the training set. We do this because this special kind
# of training set isn't standard. It's not based on a training view, on primary key columns, a label column,
or a timestamp column. This is simply a joined set of features from different feature sets.
@@ -335,7 +336,7 @@ def get_training_set(self, features: Union[List[Feature], List[str]], current_va
ts.start_time = ts.end_time

if self.mlflow_ctx and not return_sql:
self.mlflow_ctx._active_training_set: TrainingSet = ts
self.mlflow_ctx._active_training_set = ts
ts._register_metadata(self.mlflow_ctx)
return sql if return_sql else self.splice_ctx.df(sql)

@@ -412,13 +413,17 @@ def list_training_sets(self) -> Dict[str, Optional[str]]:
"""
raise NotImplementedError("To see available training views, run fs.describe_training_views()")

def _validate_feature_set(self, schema_name, table_name):
def _validate_feature_set(self, schema_name: str, table_name: str):
"""
Asserts a feature set doesn't already exist in the database
:param schema_name: schema name of the feature set
:param table_name: table name of the feature set
:return: None
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

str = f'Feature Set {schema_name}.{table_name} already exists. Use a different schema and/or table name.'
# Validate Table
assert not self.splice_ctx.tableExists(schema_name, table_name=table_name), str
@@ -436,6 +441,10 @@ def create_feature_set(self, schema_name: str, table_name: str, primary_keys: Di
:param desc: The (optional) description
:return: FeatureSet
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

self._validate_feature_set(schema_name, table_name)
fset = FeatureSet(splice_ctx=self.splice_ctx, schema_name=schema_name, table_name=table_name,
primary_keys=primary_keys,
@@ -494,6 +503,10 @@ def create_feature(self, schema_name: str, table_name: str, name: str, feature_d
:return: Feature created
"""
self.__validate_feature_data_type(feature_data_type)
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

if self.splice_ctx.tableExists(schema_name, table_name):
raise SpliceMachineException(f"Feature Set {schema_name}.{table_name} is already deployed. You cannot "
f"add features to a deployed feature set.")
@@ -604,7 +617,7 @@ def _process_features(self, features: List[Union[Feature, str]]) -> List[Feature
" a feature name (string) or a Feature object"
return all_features

def deploy_feature_set(self, schema_name, table_name):
def deploy_feature_set(self, schema_name: str, table_name: str):
"""
Deploys a feature set to the database. This persists the feature set's existence.
As of now, once deployed you cannot delete the feature set or add/delete features.
@@ -614,6 +627,9 @@ def deploy_feature_set(self, schema_name, table_name):
:param table_name: The table of the created feature set
"""
try:
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()
fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name})[0]
except:
raise SpliceMachineException(
@@ -642,6 +658,10 @@ def describe_feature_set(self, schema_name: str, table_name: str) -> None:
:param table_name: feature set table name
:return: None
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name})
if not fset: raise SpliceMachineException(
f"Feature Set {schema_name}.{table_name} not found. Check name and try again.")
@@ -689,6 +709,111 @@ def describe_training_view(self, training_view: str) -> None:
def set_feature_description(self):
raise NotImplementedError

def get_training_set_from_deployment(self, schema_name: str, table_name: str):
"""
Reads Feature Store metadata to rebuild the original training set used for the given deployed model.
:param schema_name: model schema name
:param table_name: model table name
:return: Spark dataframe containing the original training set
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

metadata = self._retrieve_training_set_metadata_from_deployment(schema_name, table_name)
features = metadata['FEATURES'].split(',')
tv_name = metadata['NAME']
start_time = metadata['TRAINING_SET_START_TS']
end_time = metadata['TRAINING_SET_END_TS']
if tv_name:
training_set_df = self.get_training_set_from_view(training_view=tv_name, start_time=start_time,
end_time=end_time, features=features)
else:
training_set_df = self.get_training_set(features=features, start_time=start_time, end_time=end_time)
return training_set_df
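
A hedged usage sketch of this method (assumes `fs` is a FeatureStore wired to a PySpliceContext; the schema and table names are placeholders):

# Rebuild the exact training set behind a deployed model table.
df = fs.get_training_set_from_deployment('myschema', 'my_model_tbl')
df.show(5)  # Spark dataframe with the original training rows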

def _retrieve_model_data_sets(self, schema_name: str, table_name: str):
"""
Returns the training set dataframe and model table dataframe for a given deployed model.
:param schema_name: model schema name
:param table_name: model table name
:return: tuple of training set dataframe and model table dataframe
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

training_set_df = self.get_training_set_from_deployment(schema_name, table_name)
model_table_df = self.splice_ctx.df(f'SELECT * FROM {schema_name}.{table_name}')
return training_set_df, model_table_df

def _retrieve_training_set_metadata_from_deployment(self, schema_name: str, table_name: str):
"""
Reads Feature Store metadata to retrieve the definition of the training set used to train the specified model.
:param schema_name: model schema name
:param table_name: model table name
:return: metadata row if exactly one matching deployment exists, else None
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

sql = SQL.get_deployment_metadata.format(schema_name=schema_name, table_name=table_name)
deploy_df = self.splice_ctx.df(sql).collect()
if len(deploy_df) == 1:
    return deploy_df[0]
return None  # zero or multiple matching deployments; callers treat this as "not found"

def display_model_feature_drift(self, schema_name: str, table_name: str):
"""
Displays a feature-by-feature comparison between the training set of the deployed model and the input
feature values used with the model since deployment.
:param schema_name: name of database schema where model table is deployed
:param table_name: name of the model table
:return: None
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

metadata = self._retrieve_training_set_metadata_from_deployment(schema_name, table_name)
if not metadata:
raise SpliceMachineException(f"Could not find deployment for model table {schema_name}.{table_name}") from None
training_set_df, model_table_df = self._retrieve_model_data_sets(schema_name, table_name)
features = metadata['FEATURES'].split(',')
build_feature_drift_plot(features, training_set_df, model_table_df)
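
An illustrative call (same assumptions as above: `fs` is an initialized FeatureStore; the names are placeholders):

# Renders one distribution comparison plot per feature: training-set values
# vs. the feature values the deployed model has received since deployment.
fs.display_model_feature_drift('myschema', 'my_model_tbl')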


def display_model_drift(self, schema_name: str, table_name: str, time_intervals: int,
start_time: datetime = None, end_time: datetime = None):
"""
Displays up to 'time_intervals' plots, each showing the distribution of the model's predictions within
one time period. The periods are equal-length intervals spanning the predictions present in the model
table 'schema_name'.'table_name'. Predictions are first filtered to those occurring after 'start_time'
and before 'end_time', when specified.
:param schema_name: schema where the model table resides
:param table_name: name of the model table
:param time_intervals: number of time intervals to plot
:param start_time: if specified, filters to only show predictions occurring after this date/time
:param end_time: if specified, filters to only show predictions occurring before this date/time
:return: None
"""
# database stores object names in upper case
schema_name = schema_name.upper()
table_name = table_name.upper()

# set default timeframe if not specified
if not start_time:
start_time = datetime(1900, 1, 1, 0, 0, 0)
if not end_time:
end_time = datetime.now()
# retrieve predictions the model has made over time
sql = SQL.get_model_predictions.format(schema_name=schema_name, table_name=table_name,
start_time=start_time, end_time=end_time)
model_table_df = self.splice_ctx.df(sql)
build_model_drift_plot(model_table_df, time_intervals)
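
And a matching sketch for prediction drift (same assumptions; the date range is arbitrary):

from datetime import datetime

# Plot the prediction distribution across 10 equal time intervals, restricted
# to predictions logged in the first half of 2021.
fs.display_model_drift('myschema', 'my_model_tbl', time_intervals=10,
                       start_time=datetime(2021, 1, 1),
                       end_time=datetime(2021, 7, 1))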


def __get_pipeline(self, df, features, label, model_type):
"""
Creates a Pipeline with preprocessing steps (StringIndexer, VectorAssembler) for each feature depending
@@ -736,6 +861,7 @@ def __log_mlflow_results(self, name, rounds, mlflow_results):
:param name: MLflow run name
:param rounds: Number of rounds of feature elimination that were run
:param mlflow_results: The params / metrics to log
:return: None
"""
try:
if self.mlflow_ctx.active_run():
Empty file.