From b17c9849f7e7dceb50452e908cf34de71beb127b Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Tue, 12 Jan 2021 17:45:03 -0800 Subject: [PATCH 01/13] added feature and model drift API --- splicemachine/features/constants.py | 20 +++ splicemachine/features/feature_store.py | 172 ++++++++++++++++++++++-- 2 files changed, 183 insertions(+), 9 deletions(-) diff --git a/splicemachine/features/constants.py b/splicemachine/features/constants.py index 7f7bdeb..b0eea9c 100644 --- a/splicemachine/features/constants.py +++ b/splicemachine/features/constants.py @@ -191,6 +191,26 @@ class SQL: SELECT {feature_names} FROM {feature_sets} WHERE """ + get_deployment_metadata = f""" + SELECT tv.name, d.training_set_start_ts, d.training_set_end_ts, + string_agg(f.name,',') features + FROM featurestore.deployment d + INNER JOIN {FEATURE_STORE_SCHEMA}.training_set ts ON d.training_set_id=ts.training_set_id + INNER JOIN {FEATURE_STORE_SCHEMA}.training_set_feature tsf ON tsf.training_set_id=d.training_set_id + LEFT OUTER JOIN {FEATURE_STORE_SCHEMA}.training_view tv ON tv.view_id = ts.view_id + INNER JOIN {FEATURE_STORE_SCHEMA}.feature f ON tsf.feature_id=f.feature_id + WHERE d.model_schema_name = '{{schema_name}}' + AND d.model_table_name = '{{table_name}}' + GROUP BY 1,2,3 + """ + + get_model_predictions = """ + SELECT EVAL_TIME, + PREDICTION + FROM {schema_name}.{table_name} WHERE EVAL_TIME>='{start_time}' AND EVAL_TIME<'{end_time}' + ORDER BY EVAL_TIME + """ + class Columns: feature = ['feature_id', 'feature_set_id', 'name', 'description', 'feature_data_type', 'feature_type', 'tags', 'compliance_level', 'last_update_ts', 'last_update_username'] diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 28781e2..66b1dc0 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -25,7 +25,6 @@ class FeatureStore: def __init__(self, splice_ctx: PySpliceContext) -> None: self.splice_ctx = splice_ctx - self.mlflow_ctx = None self.feature_sets = [] # Cache of newly created feature sets def register_splice_context(self, splice_ctx: PySpliceContext) -> None: @@ -167,7 +166,7 @@ def get_feature_vector(self, features: List[Union[str, Feature]], Gets a feature vector given a list of Features and primary key values for their corresponding Feature Sets :param features: List of str Feature names or Features - :param join_key_values: (dict) join key values to get the proper Feature values formatted as {join_key_column_name: join_key_value} + :param join_key_values: (dict) join key vals to get the proper Feature values formatted as {join_key_column_name: join_key_value} :param return_sql: Whether to return the SQL needed to get the vector or the values themselves. Default False :return: Pandas Dataframe or str (SQL statement) """ @@ -317,7 +316,6 @@ def get_training_set(self, features: Union[List[Feature], List[str]], current_va temp_vw = _create_temp_training_view(features, fsets) sql = _generate_training_set_history_sql(temp_vw, features, fsets, start_time=start_time, end_time=end_time) - # Here we create a null training view and pass it into the training set. We do this because this special kind # of training set isn't standard. It's not based on a training view, on primary key columns, a label column, # or a timestamp column . This is simply a joined set of features from different feature sets. 
@@ -689,6 +687,166 @@ def describe_training_view(self, training_view: str) -> None: def set_feature_description(self): raise NotImplementedError + def _retrieve_training_set_from_deployment(self, schema_name, table_name): + metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) + features = metadata['FEATURES'].split(',') + tv_name = metadata['NAME'] + start_time = metadata['TRAINING_SET_START_TS'] + end_time = metadata['TRAINING_SET_END_TS'] + if (tv_name): + training_set_df = self.get_training_set_from_view(training_view=tv_name, start_time=start_time, + end_time=end_time, features=features) + else: + training_set_df = self.get_training_set(features=features, start_time=start_time, end_time=end_time) + return training_set_df + + def _retrieve_model_data_sets(self, schema_name, table_name): + training_set_df = self._retrieve_training_set_from_deployment(schema_name, table_name) + model_table_df = self.splice_ctx.df(f'SELECT * FROM {schema_name}.{table_name}') + return training_set_df, model_table_df + + def _retrieve_training_set_metadata_from_deployement(self, schema_name: str, table_name: str): + sql = SQL.get_deployment_metadata.format(schema_name=schema_name, table_name=table_name) + deploy_df = self.splice_ctx.df(sql).collect() + cnt = len(deploy_df) + if (cnt == 1): + return deploy_df[0] + + def _calculate_bounds(self, df, column_name): + """ + Calculates outlier bounds based on interquartile range of distribution of values in column 'column_name' + from data set in data frame 'df'. + :param df: data frame containing data to be analyzed + :param column_name: column name to analyze + :return: dictionary with keys min, max, q1 and q3 keys and corresponding values for outlier minimum, maximum + and 25th and 75th percentile values (q1,q3) + """ + bounds = dict(zip(["q1", "q3"], df.approxQuantile(column_name, [0.25, 0.75], 0))) + iqr = bounds['q3'] - bounds['q1'] + bounds['min'] = bounds['q1'] - (iqr * 1.5) + bounds['max'] = bounds['q3'] + (iqr * 1.5) + return bounds + + def _remove_outliers(self, df, column_name): + ''' + Calculates outlier bounds no distribution of 'column_name' values and returns a filtered data frame without + outliers in the specified column. + :param df: data frame with data to remove outliers from + :param column_name: name of column to remove outliers from + :return: input data frame filtered to remove outliers + ''' + import pyspark.sql.functions as f + bounds = self._calculate_bounds(df, column_name) + return df.filter((f.col(column_name) >= bounds['min']) & (f.col(column_name) <= bounds['max'])) + + def _add_feature_plot(self, ax, train_df, model_df, feature, n_bins): + ''' + Adds a distplot of the outlier free feature values from both train_df and model_df data frames which both + contain the feature. 
+ :param ax: target subplot for chart + :param train_df: training data containing feature of interest + :param model_df: model input data also containing feature of interest + :param feature: name of feature to display in distribution histogram + :param n_bins: number of bins to use in histogram plot + :return: None + ''' + from pyspark_dist_explore import distplot + import pyspark.sql.functions as f + distplot(ax, [self._remove_outliers(train_df.select(f.col(feature).alias('training')), 'training'), + self._remove_outliers(model_df.select(f.col(feature).alias('model')), 'model')], bins=n_bins) + ax.set_title(feature) + ax.legend() + + def display_model_feature_drift(self, schema_name, table_name): + """ + Displays feature by feature comparison between the training set of the deployed model and the input feature + values used with the model since deployment. + :param schema_name: name of database schema where model table is deployed + :param table_name: name of the model table + :return: None + """ + from matplotlib.pyplot import show, subplots + metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) + if metadata: + features = metadata['FEATURES'].split(',') + training_set_df, model_table_df = self._retrieve_model_data_sets(schema_name, table_name) + final_features = [f for f in features if f in model_table_df.columns] + # prep plot area + n_bins = 15 + num_features = len(final_features) + n_rows = int(num_features / 5) + fig, axes = subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) + axes = axes.flatten() + # calculate combined plots for each feature + for plot, f in enumerate(final_features): + self._add_feature_plot(axes[plot], training_set_df, model_table_df, f, n_bins) + show() + else: + print(f"Could not find deployment for model table {schema_name}.{table_name}") + + def _datetime_range(self, start: datetime, end: datetime, number: int): + """ + Subdivides the time frame defined by 'start' and 'end' parameters into 'number' equal time frames. + :param start: start date time + :param end: end date time + :param number: number of time frames to split into + :return: list of start/end date times + """ + from datetime import datetime + from itertools import count, islice + start_secs = (start - datetime(1970, 1, 1)).total_seconds() + end_secs = (end - datetime(1970, 1, 1)).total_seconds() + dates = [datetime.fromtimestamp(el) for el in + islice(count(start_secs, (end_secs - start_secs) / number), number + 1)] + return zip(dates, dates[1:]) + + def display_model_drift(self, schema_name: str, table_name: str, time_intervals: int, + start_time: datetime = None, end_time: datetime = None): + """ + Displays as many as 'time_intervals' plots showing the distribution of the model prediction within each time + period. Time periods are equal periods of time where predictions are present in the model table + 'schema_name'.'table_name'. Model predictions are first filtered to only those occurring after 'start_time' if + specified and before 'end_time' if specified. 
+ :param schema_name: schema where the model table resides + :param table_name: name of the model table + :param time_intervals: number of time intervals to plot + :param start_time: if specified, filters to only show predictions occurring after this date/time + :param end_time: if specified, filters to only show predictions occurring before this date/time + :return: None + """ + from datetime import datetime + import matplotlib.pyplot as plt + from pyspark_dist_explore import distplot + import pyspark.sql.functions as f + + # set default timeframe if not specified + if not start_time: + start_time = datetime(1900, 1, 1, 0, 0, 0) + if not end_time: + end_time = datetime.now() + # retrieve predictions the model has made over time + sql = SQL.get_model_predictions.format(schema_name=schema_name, table_name=table_name, start_time=start_time, + end_time=end_time) + model_table_df = self.splice_ctx.df(sql) + min_ts = model_table_df.first()['EVAL_TIME'] + max_ts = model_table_df.orderBy(f.col("EVAL_TIME").desc()).first()['EVAL_TIME'] + + if max_ts > min_ts: + intervals = self._datetime_range(min_ts, max_ts, time_intervals) + n_rows = int(time_intervals / 5) + fig, axes = plt.subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) + axes = axes.flatten() + for i, time_int in enumerate(intervals): + df = model_table_df.filter((f.col('EVAL_TIME') >= time_int[0]) & (f.col('EVAL_TIME') < time_int[1])) + distplot(axes[i], [self._remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + axes[i].set_title(f"{time_int[0]}") + axes[i].legend() + else: + fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 10)) + distplot(axes, [self._remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + axes.set_title(f"Predictions at {min_ts}") + axes.legend() + def __get_pipeline(self, df, features, label, model_type): """ Creates a Pipeline with preprocessing steps (StringINdexer, VectorAssembler) for each feature depending @@ -736,16 +894,12 @@ def __log_mlflow_results(self, name, rounds, mlflow_results): :param name: MLflow run name :param rounds: Number of rounds of feature elimination that were run :param mlflow_results: The params / metrics to log + :return: """ - try: - if self.mlflow_ctx.active_run(): - self.mlflow_ctx.start_run(run_name=name) + with self.mlflow_ctx.start_run(run_name=name): for r in range(rounds): with self.mlflow_ctx.start_run(run_name=f'Round {r}', nested=True): self.mlflow_ctx.log_metrics(mlflow_results[r]) - finally: - self.mlflow_ctx.end_run() - def __prune_features_for_elimination(self, features) -> List[Feature]: """ From d19eb63781064c5b7cbb74731fac003280e82bcd Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Wed, 13 Jan 2021 08:42:43 -0800 Subject: [PATCH 02/13] adjusted subplot rows to account for extra plots --- splicemachine/features/feature_store.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 66b1dc0..11d435a 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -775,6 +775,8 @@ def display_model_feature_drift(self, schema_name, table_name): n_bins = 15 num_features = len(final_features) n_rows = int(num_features / 5) + if num_features % 5 > 0: + n_rows = n_rows + 1 fig, axes = subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) axes = axes.flatten() # calculate combined plots for each feature @@ -834,6 +836,8 @@ def display_model_drift(self, schema_name: str, table_name: 
str, time_intervals: if max_ts > min_ts: intervals = self._datetime_range(min_ts, max_ts, time_intervals) n_rows = int(time_intervals / 5) + if time_intervals % 5 > 0: + n_rows = n_rows + 1 fig, axes = plt.subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) axes = axes.flatten() for i, time_int in enumerate(intervals): From 54b505ced44cb5367a3fe2929049414e97d7b23f Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Wed, 13 Jan 2021 10:31:35 -0700 Subject: [PATCH 03/13] better utilities --- splicemachine/features/feature_store.py | 4 ++-- splicemachine/features/utils/drift_utils.py | 3 +++ .../features/{utils.py => utils/training_utils.py} | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 splicemachine/features/utils/drift_utils.py rename splicemachine/features/{utils.py => utils/training_utils.py} (98%) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 11d435a..468b9dc 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -16,8 +16,8 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet -from .utils import (dict_to_lower, _generate_training_set_history_sql, - _generate_training_set_sql, _create_temp_training_view) +from .utils.training_utils import (dict_to_lower, _generate_training_set_history_sql, + _generate_training_set_sql, _create_temp_training_view) from .constants import SQL, FeatureType from .training_view import TrainingView diff --git a/splicemachine/features/utils/drift_utils.py b/splicemachine/features/utils/drift_utils.py new file mode 100644 index 0000000..13a609f --- /dev/null +++ b/splicemachine/features/utils/drift_utils.py @@ -0,0 +1,3 @@ +""" +A set of utility functions for calculating drift of deployed models +""" diff --git a/splicemachine/features/utils.py b/splicemachine/features/utils/training_utils.py similarity index 98% rename from splicemachine/features/utils.py rename to splicemachine/features/utils/training_utils.py index 2f4a0b5..6f27cde 100644 --- a/splicemachine/features/utils.py +++ b/splicemachine/features/utils/training_utils.py @@ -1,9 +1,11 @@ from splicemachine import SpliceMachineException from typing import List -from .feature import Feature -from .feature_set import FeatureSet +from splicemachine.features import Feature, FeatureSet from splicemachine.features.training_view import TrainingView +""" +A set of utility functions for creating Training Set SQL +""" def clean_df(df, cols): for old, new in zip(df.columns, cols): From 789be6bc8b8bac4d840383ae13a75152f904facd Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Wed, 13 Jan 2021 13:01:14 -0800 Subject: [PATCH 04/13] refactor --- splicemachine/features/feature_store.py | 77 +++------------------ splicemachine/features/utils/drift_utils.py | 65 +++++++++++++++++ 2 files changed, 74 insertions(+), 68 deletions(-) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 468b9dc..6872985 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -16,6 +16,7 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet +from .utils.drift_utils import add_feature_plot, remove_outliers, datetime_range_split from .utils.training_utils import (dict_to_lower, _generate_training_set_history_sql, 
_generate_training_set_sql, _create_temp_training_view) from .constants import SQL, FeatureType @@ -687,20 +688,20 @@ def describe_training_view(self, training_view: str) -> None: def set_feature_description(self): raise NotImplementedError - def _retrieve_training_set_from_deployment(self, schema_name, table_name): + def _retrieve_training_set_from_deployment( self, schema_name, table_name): metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) features = metadata['FEATURES'].split(',') tv_name = metadata['NAME'] start_time = metadata['TRAINING_SET_START_TS'] end_time = metadata['TRAINING_SET_END_TS'] - if (tv_name): + if tv_name: training_set_df = self.get_training_set_from_view(training_view=tv_name, start_time=start_time, end_time=end_time, features=features) else: training_set_df = self.get_training_set(features=features, start_time=start_time, end_time=end_time) return training_set_df - def _retrieve_model_data_sets(self, schema_name, table_name): + def _retrieve_model_data_sets( self, schema_name, table_name): training_set_df = self._retrieve_training_set_from_deployment(schema_name, table_name) model_table_df = self.splice_ctx.df(f'SELECT * FROM {schema_name}.{table_name}') return training_set_df, model_table_df @@ -709,54 +710,9 @@ def _retrieve_training_set_metadata_from_deployement(self, schema_name: str, tab sql = SQL.get_deployment_metadata.format(schema_name=schema_name, table_name=table_name) deploy_df = self.splice_ctx.df(sql).collect() cnt = len(deploy_df) - if (cnt == 1): + if cnt == 1: return deploy_df[0] - def _calculate_bounds(self, df, column_name): - """ - Calculates outlier bounds based on interquartile range of distribution of values in column 'column_name' - from data set in data frame 'df'. - :param df: data frame containing data to be analyzed - :param column_name: column name to analyze - :return: dictionary with keys min, max, q1 and q3 keys and corresponding values for outlier minimum, maximum - and 25th and 75th percentile values (q1,q3) - """ - bounds = dict(zip(["q1", "q3"], df.approxQuantile(column_name, [0.25, 0.75], 0))) - iqr = bounds['q3'] - bounds['q1'] - bounds['min'] = bounds['q1'] - (iqr * 1.5) - bounds['max'] = bounds['q3'] + (iqr * 1.5) - return bounds - - def _remove_outliers(self, df, column_name): - ''' - Calculates outlier bounds no distribution of 'column_name' values and returns a filtered data frame without - outliers in the specified column. - :param df: data frame with data to remove outliers from - :param column_name: name of column to remove outliers from - :return: input data frame filtered to remove outliers - ''' - import pyspark.sql.functions as f - bounds = self._calculate_bounds(df, column_name) - return df.filter((f.col(column_name) >= bounds['min']) & (f.col(column_name) <= bounds['max'])) - - def _add_feature_plot(self, ax, train_df, model_df, feature, n_bins): - ''' - Adds a distplot of the outlier free feature values from both train_df and model_df data frames which both - contain the feature. 
- :param ax: target subplot for chart - :param train_df: training data containing feature of interest - :param model_df: model input data also containing feature of interest - :param feature: name of feature to display in distribution histogram - :param n_bins: number of bins to use in histogram plot - :return: None - ''' - from pyspark_dist_explore import distplot - import pyspark.sql.functions as f - distplot(ax, [self._remove_outliers(train_df.select(f.col(feature).alias('training')), 'training'), - self._remove_outliers(model_df.select(f.col(feature).alias('model')), 'model')], bins=n_bins) - ax.set_title(feature) - ax.legend() - def display_model_feature_drift(self, schema_name, table_name): """ Displays feature by feature comparison between the training set of the deployed model and the input feature @@ -781,26 +737,11 @@ def display_model_feature_drift(self, schema_name, table_name): axes = axes.flatten() # calculate combined plots for each feature for plot, f in enumerate(final_features): - self._add_feature_plot(axes[plot], training_set_df, model_table_df, f, n_bins) + add_feature_plot(axes[plot], training_set_df, model_table_df, f, n_bins) show() else: print(f"Could not find deployment for model table {schema_name}.{table_name}") - def _datetime_range(self, start: datetime, end: datetime, number: int): - """ - Subdivides the time frame defined by 'start' and 'end' parameters into 'number' equal time frames. - :param start: start date time - :param end: end date time - :param number: number of time frames to split into - :return: list of start/end date times - """ - from datetime import datetime - from itertools import count, islice - start_secs = (start - datetime(1970, 1, 1)).total_seconds() - end_secs = (end - datetime(1970, 1, 1)).total_seconds() - dates = [datetime.fromtimestamp(el) for el in - islice(count(start_secs, (end_secs - start_secs) / number), number + 1)] - return zip(dates, dates[1:]) def display_model_drift(self, schema_name: str, table_name: str, time_intervals: int, start_time: datetime = None, end_time: datetime = None): @@ -834,7 +775,7 @@ def display_model_drift(self, schema_name: str, table_name: str, time_intervals: max_ts = model_table_df.orderBy(f.col("EVAL_TIME").desc()).first()['EVAL_TIME'] if max_ts > min_ts: - intervals = self._datetime_range(min_ts, max_ts, time_intervals) + intervals = datetime_range_split(min_ts, max_ts, time_intervals) n_rows = int(time_intervals / 5) if time_intervals % 5 > 0: n_rows = n_rows + 1 @@ -842,12 +783,12 @@ def display_model_drift(self, schema_name: str, table_name: str, time_intervals: axes = axes.flatten() for i, time_int in enumerate(intervals): df = model_table_df.filter((f.col('EVAL_TIME') >= time_int[0]) & (f.col('EVAL_TIME') < time_int[1])) - distplot(axes[i], [self._remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + distplot(axes[i], [remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) axes[i].set_title(f"{time_int[0]}") axes[i].legend() else: fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 10)) - distplot(axes, [self._remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + distplot(axes, [remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) axes.set_title(f"Predictions at {min_ts}") axes.legend() diff --git a/splicemachine/features/utils/drift_utils.py b/splicemachine/features/utils/drift_utils.py index 13a609f..b50ebba 100644 --- a/splicemachine/features/utils/drift_utils.py +++ 
b/splicemachine/features/utils/drift_utils.py @@ -1,3 +1,68 @@ """ A set of utility functions for calculating drift of deployed models """ +import datetime as datetime + + +def calculate_outlier_bounds(df, column_name): + """ + Calculates outlier bounds based on interquartile range of distribution of values in column 'column_name' + from data set in data frame 'df'. + :param df: data frame containing data to be analyzed + :param column_name: column name to analyze + :return: dictionary with keys min, max, q1 and q3 keys and corresponding values for outlier minimum, maximum + and 25th and 75th percentile values (q1,q3) + """ + bounds = dict(zip(["q1", "q3"], df.approxQuantile(column_name, [0.25, 0.75], 0))) + iqr = bounds['q3'] - bounds['q1'] + bounds['min'] = bounds['q1'] - (iqr * 1.5) + bounds['max'] = bounds['q3'] + (iqr * 1.5) + return bounds + + +def remove_outliers(df, column_name): + """ + Calculates outlier bounds no distribution of 'column_name' values and returns a filtered data frame without + outliers in the specified column. + :param df: data frame with data to remove outliers from + :param column_name: name of column to remove outliers from + :return: input data frame filtered to remove outliers + """ + import pyspark.sql.functions as f + bounds = calculate_outlier_bounds(df, column_name) + return df.filter((f.col(column_name) >= bounds['min']) & (f.col(column_name) <= bounds['max'])) + + +def add_feature_plot(ax, train_df, model_df, feature, n_bins): + """ + Adds a distplot of the outlier free feature values from both train_df and model_df data frames which both + contain the feature. + :param ax: target subplot for chart + :param train_df: training data containing feature of interest + :param model_df: model input data also containing feature of interest + :param feature: name of feature to display in distribution histogram + :param n_bins: number of bins to use in histogram plot + :return: None + """ + from pyspark_dist_explore import distplot + import pyspark.sql.functions as f + distplot(ax, [remove_outliers(train_df.select(f.col(feature).alias('training')), 'training'), + remove_outliers(model_df.select(f.col(feature).alias('model')), 'model')], bins=n_bins) + ax.set_title(feature) + ax.legend() + + +def datetime_range_split( start: datetime, end: datetime, number: int): + """ + Subdivides the time frame defined by 'start' and 'end' parameters into 'number' equal time frames. 
+ :param start: start date time + :param end: end date time + :param number: number of time frames to split into + :return: list of start/end date times + """ + from itertools import count, islice + start_secs = (start - datetime(1970, 1, 1)).total_seconds() + end_secs = (end - datetime(1970, 1, 1)).total_seconds() + dates = [datetime.fromtimestamp(el) for el in + islice(count(start_secs, (end_secs - start_secs) / number), number + 1)] + return zip(dates, dates[1:]) \ No newline at end of file From 70abcbd86b3c4a315b8775e3a407032f0788f78f Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Wed, 13 Jan 2021 14:09:44 -0800 Subject: [PATCH 05/13] added init file --- splicemachine/features/feature_store.py | 2 +- splicemachine/features/utils/__init__.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 splicemachine/features/utils/__init__.py diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 6872985..2269fab 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -16,7 +16,7 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet -from .utils.drift_utils import add_feature_plot, remove_outliers, datetime_range_split +from .utils.drift_utils import (add_feature_plot, remove_outliers, datetime_range_split) from .utils.training_utils import (dict_to_lower, _generate_training_set_history_sql, _generate_training_set_sql, _create_temp_training_view) from .constants import SQL, FeatureType diff --git a/splicemachine/features/utils/__init__.py b/splicemachine/features/utils/__init__.py new file mode 100644 index 0000000..e69de29 From 61bb6dc933c14db1bd641d57c197b0cc84564d83 Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Wed, 13 Jan 2021 16:23:06 -0800 Subject: [PATCH 06/13] more refactoring and documentation --- docs/conf.py | 2 +- splicemachine/features/feature_store.py | 84 ++++++++------------- splicemachine/features/utils/drift_utils.py | 55 +++++++++++++- 3 files changed, 87 insertions(+), 54 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 68bca17..26226fb 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -46,7 +46,7 @@ 'private-members':True, 'inherited-members':True, 'undoc-members': False, - 'exclude-members': '_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main' + 'exclude-members': '_retrieve_model_data_sets,_retrieve_training_set_metadata_from_deployement,_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main' } # Add any paths that contain templates here, relative to this directory. 
diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 2269fab..415af38 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -7,6 +7,7 @@ from pandas import DataFrame as PandasDF from pyspark.sql.dataframe import DataFrame as SparkDF +import pyspark.sql.functions as psf from pyspark.ml import Pipeline from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.regression import RandomForestRegressor @@ -16,7 +17,7 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet -from .utils.drift_utils import (add_feature_plot, remove_outliers, datetime_range_split) +from .utils.drift_utils import (add_feature_plot, remove_outliers, datetime_range_split, build_feature_drift_plot, build_model_drift_plot) from .utils.training_utils import (dict_to_lower, _generate_training_set_history_sql, _generate_training_set_sql, _create_temp_training_view) from .constants import SQL, FeatureType @@ -688,7 +689,13 @@ def describe_training_view(self, training_view: str) -> None: def set_feature_description(self): raise NotImplementedError - def _retrieve_training_set_from_deployment( self, schema_name, table_name): + def get_training_set_from_deployment(self, schema_name, table_name): + """ + Reads Feature Store metadata to rebuild orginal training data set used for the given deployed model. + :param schema_name: model schema name + :param table_name: model table name + :return: + """ metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) features = metadata['FEATURES'].split(',') tv_name = metadata['NAME'] @@ -701,19 +708,31 @@ def _retrieve_training_set_from_deployment( self, schema_name, table_name): training_set_df = self.get_training_set(features=features, start_time=start_time, end_time=end_time) return training_set_df - def _retrieve_model_data_sets( self, schema_name, table_name): - training_set_df = self._retrieve_training_set_from_deployment(schema_name, table_name) + def _retrieve_model_data_sets(self, schema_name, table_name): + """ + Returns the training set dataframe and model table dataframe for a given deployed model. + :param schema_name: model schema name + :param table_name: model table name + :return: + """ + training_set_df = self.get_training_set_from_deployment(schema_name, table_name) model_table_df = self.splice_ctx.df(f'SELECT * FROM {schema_name}.{table_name}') return training_set_df, model_table_df def _retrieve_training_set_metadata_from_deployement(self, schema_name: str, table_name: str): + """ + Reads Feature Store metadata to retrieve definition of training set used to train the specified model. + :param schema_name: model schema name + :param table_name: model table name + :return: + """ sql = SQL.get_deployment_metadata.format(schema_name=schema_name, table_name=table_name) deploy_df = self.splice_ctx.df(sql).collect() cnt = len(deploy_df) if cnt == 1: return deploy_df[0] - def display_model_feature_drift(self, schema_name, table_name): + def display_model_feature_drift(self, schema_name: str, table_name: str): """ Displays feature by feature comparison between the training set of the deployed model and the input feature values used with the model since deployment. 
@@ -721,26 +740,12 @@ def display_model_feature_drift(self, schema_name, table_name): :param table_name: name of the model table :return: None """ - from matplotlib.pyplot import show, subplots metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) - if metadata: - features = metadata['FEATURES'].split(',') - training_set_df, model_table_df = self._retrieve_model_data_sets(schema_name, table_name) - final_features = [f for f in features if f in model_table_df.columns] - # prep plot area - n_bins = 15 - num_features = len(final_features) - n_rows = int(num_features / 5) - if num_features % 5 > 0: - n_rows = n_rows + 1 - fig, axes = subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) - axes = axes.flatten() - # calculate combined plots for each feature - for plot, f in enumerate(final_features): - add_feature_plot(axes[plot], training_set_df, model_table_df, f, n_bins) - show() - else: - print(f"Could not find deployment for model table {schema_name}.{table_name}") + if not metadata: + raise SpliceMachineException(f"Could not find deployment for model table {schema_name}.{table_name}") from None + training_set_df, model_table_df = self._retrieve_model_data_sets(schema_name, table_name) + features = metadata['FEATURES'].split(',') + build_feature_drift_plot(features, training_set_df, model_table_df) def display_model_drift(self, schema_name: str, table_name: str, time_intervals: int, @@ -757,40 +762,17 @@ def display_model_drift(self, schema_name: str, table_name: str, time_intervals: :param end_time: if specified, filters to only show predictions occurring before this date/time :return: None """ - from datetime import datetime - import matplotlib.pyplot as plt - from pyspark_dist_explore import distplot - import pyspark.sql.functions as f - # set default timeframe if not specified if not start_time: start_time = datetime(1900, 1, 1, 0, 0, 0) if not end_time: end_time = datetime.now() # retrieve predictions the model has made over time - sql = SQL.get_model_predictions.format(schema_name=schema_name, table_name=table_name, start_time=start_time, - end_time=end_time) + sql = SQL.get_model_predictions.format(schema_name=schema_name, table_name=table_name, + start_time=start_time, end_time=end_time) model_table_df = self.splice_ctx.df(sql) - min_ts = model_table_df.first()['EVAL_TIME'] - max_ts = model_table_df.orderBy(f.col("EVAL_TIME").desc()).first()['EVAL_TIME'] - - if max_ts > min_ts: - intervals = datetime_range_split(min_ts, max_ts, time_intervals) - n_rows = int(time_intervals / 5) - if time_intervals % 5 > 0: - n_rows = n_rows + 1 - fig, axes = plt.subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) - axes = axes.flatten() - for i, time_int in enumerate(intervals): - df = model_table_df.filter((f.col('EVAL_TIME') >= time_int[0]) & (f.col('EVAL_TIME') < time_int[1])) - distplot(axes[i], [remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) - axes[i].set_title(f"{time_int[0]}") - axes[i].legend() - else: - fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 10)) - distplot(axes, [remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) - axes.set_title(f"Predictions at {min_ts}") - axes.legend() + build_model_drift_plot(model_table_df, time_intervals) + def __get_pipeline(self, df, features, label, model_type): """ diff --git a/splicemachine/features/utils/drift_utils.py b/splicemachine/features/utils/drift_utils.py index b50ebba..052be2d 100644 --- 
a/splicemachine/features/utils/drift_utils.py +++ b/splicemachine/features/utils/drift_utils.py @@ -2,7 +2,9 @@ A set of utility functions for calculating drift of deployed models """ import datetime as datetime - +import matplotlib.pyplot as plt +from datetime import datetime +import pyspark.sql.functions as f def calculate_outlier_bounds(df, column_name): """ @@ -65,4 +67,53 @@ def datetime_range_split( start: datetime, end: datetime, number: int): end_secs = (end - datetime(1970, 1, 1)).total_seconds() dates = [datetime.fromtimestamp(el) for el in islice(count(start_secs, (end_secs - start_secs) / number), number + 1)] - return zip(dates, dates[1:]) \ No newline at end of file + return zip(dates, dates[1:]) + +def build_feature_drift_plot(features, training_set_df, model_table_df): + """ + Displays feature by feature comparison of distributions between the training set and the model inputs. + :param features: list of features to analyze + :param training_set_df: the dataframe used for training the model that contains all the features to analyze + :param model_table_df: the dataframe with the content of the model table containing all input features + :return: None + """ + final_features = [f for f in features if f in model_table_df.columns] + # prep plot area + n_bins = 15 + num_features = len(final_features) + n_rows = int(num_features / 5) + if num_features % 5 > 0: + n_rows = n_rows + 1 + fig, axes = plt.subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) + axes = axes.flatten() + # calculate combined plots for each feature + for plot, f in enumerate(final_features): + add_feature_plot(axes[plot], training_set_df, model_table_df, f, n_bins) + plt.show() + +def build_model_drift_plot( model_table_df, time_intervals): + """ + Displays model prediction distribution plots split into multiple time intervals. 
+ :param model_table_df: dataframe containing columns EVAL_TIME and PREDICTION + :param time_intervals: number of time intervals to display + :return: + """ + min_ts = model_table_df.first()['EVAL_TIME'] + max_ts = model_table_df.orderBy(f.col("EVAL_TIME").desc()).first()['EVAL_TIME'] + if max_ts > min_ts: + intervals = datetime_range_split(min_ts, max_ts, time_intervals) + n_rows = int(time_intervals / 5) + if time_intervals % 5 > 0: + n_rows = n_rows + 1 + fig, axes = plt.subplots(nrows=n_rows, ncols=5, figsize=(30, 10 * n_rows)) + axes = axes.flatten() + for i, time_int in enumerate(intervals): + df = model_table_df.filter((f.col('EVAL_TIME') >= time_int[0]) & (f.col('EVAL_TIME') < time_int[1])) + plt.distplot(axes[i], [remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + axes[i].set_title(f"{time_int[0]}") + axes[i].legend() + else: + fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 10)) + plt.distplot(axes, [remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + axes.set_title(f"Predictions at {min_ts}") + axes.legend() From 3622da07fb7e99b0171552e40383b59dad02c8a2 Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Wed, 13 Jan 2021 16:31:10 -0800 Subject: [PATCH 07/13] more refactoring and documentation --- splicemachine/features/utils/drift_utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/splicemachine/features/utils/drift_utils.py b/splicemachine/features/utils/drift_utils.py index 052be2d..2a86ce4 100644 --- a/splicemachine/features/utils/drift_utils.py +++ b/splicemachine/features/utils/drift_utils.py @@ -5,6 +5,9 @@ import matplotlib.pyplot as plt from datetime import datetime import pyspark.sql.functions as f +from pyspark_dist_explore import distplot +from itertools import count, islice + def calculate_outlier_bounds(df, column_name): """ @@ -46,8 +49,6 @@ def add_feature_plot(ax, train_df, model_df, feature, n_bins): :param n_bins: number of bins to use in histogram plot :return: None """ - from pyspark_dist_explore import distplot - import pyspark.sql.functions as f distplot(ax, [remove_outliers(train_df.select(f.col(feature).alias('training')), 'training'), remove_outliers(model_df.select(f.col(feature).alias('model')), 'model')], bins=n_bins) ax.set_title(feature) @@ -62,7 +63,6 @@ def datetime_range_split( start: datetime, end: datetime, number: int): :param number: number of time frames to split into :return: list of start/end date times """ - from itertools import count, islice start_secs = (start - datetime(1970, 1, 1)).total_seconds() end_secs = (end - datetime(1970, 1, 1)).total_seconds() dates = [datetime.fromtimestamp(el) for el in @@ -109,11 +109,11 @@ def build_model_drift_plot( model_table_df, time_intervals): axes = axes.flatten() for i, time_int in enumerate(intervals): df = model_table_df.filter((f.col('EVAL_TIME') >= time_int[0]) & (f.col('EVAL_TIME') < time_int[1])) - plt.distplot(axes[i], [remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + distplot(axes[i], [remove_outliers(df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) axes[i].set_title(f"{time_int[0]}") axes[i].legend() else: fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 10)) - plt.distplot(axes, [remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) + distplot(axes, [remove_outliers(model_table_df.select(f.col('PREDICTION')), 'PREDICTION')], bins=15) axes.set_title(f"Predictions at {min_ts}") axes.legend() From 
999ccff67b58322e69813a8fa29123f87ba67b2b Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Thu, 14 Jan 2021 15:33:02 -0700 Subject: [PATCH 08/13] merge master --- splicemachine/features/feature_store.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index d849863..9f1a196 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -27,6 +27,7 @@ class FeatureStore: def __init__(self, splice_ctx: PySpliceContext) -> None: self.splice_ctx = splice_ctx + self.mlflow_ctx = None self.feature_sets = [] # Cache of newly created feature sets def register_splice_context(self, splice_ctx: PySpliceContext) -> None: @@ -168,7 +169,7 @@ def get_feature_vector(self, features: List[Union[str, Feature]], Gets a feature vector given a list of Features and primary key values for their corresponding Feature Sets :param features: List of str Feature names or Features - :param join_key_values: (dict) join key vals to get the proper Feature values formatted as {join_key_column_name: join_key_value} + :param join_key_values: (dict) join key values to get the proper Feature values formatted as {join_key_column_name: join_key_value} :param return_sql: Whether to return the SQL needed to get the vector or the values themselves. Default False :return: Pandas Dataframe or str (SQL statement) """ @@ -823,10 +824,15 @@ def __log_mlflow_results(self, name, rounds, mlflow_results): :param mlflow_results: The params / metrics to log :return: """ - with self.mlflow_ctx.start_run(run_name=name): + try: + if self.mlflow_ctx.active_run(): + self.mlflow_ctx.start_run(run_name=name) for r in range(rounds): with self.mlflow_ctx.start_run(run_name=f'Round {r}', nested=True): self.mlflow_ctx.log_metrics(mlflow_results[r]) + finally: + self.mlflow_ctx.end_run() + def __prune_features_for_elimination(self, features) -> List[Feature]: """ From 59ccb39954654d57b89c6582d8baceac122a7ae7 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Thu, 14 Jan 2021 15:34:28 -0700 Subject: [PATCH 09/13] merge master --- splicemachine/features/feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 9f1a196..4b395dc 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -336,7 +336,7 @@ def get_training_set(self, features: Union[List[Feature], List[str]], current_va ts.start_time = ts.end_time if self.mlflow_ctx and not return_sql: - self.mlflow_ctx._active_training_set: TrainingSet = ts + self.mlflow_ctx._active_training_set = ts ts._register_metadata(self.mlflow_ctx) return sql if return_sql else self.splice_ctx.df(sql) From 96aaad82524ae68ad517810040185a12e0e2415b Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Thu, 14 Jan 2021 16:45:05 -0700 Subject: [PATCH 10/13] case sensitivity --- splicemachine/features/feature.py | 2 +- splicemachine/features/feature_set.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/splicemachine/features/feature.py b/splicemachine/features/feature.py index 9d8a327..cccf7a3 100644 --- a/splicemachine/features/feature.py +++ b/splicemachine/features/feature.py @@ -2,7 +2,7 @@ class Feature: def __init__(self, *, name, description, feature_data_type, feature_type, tags, feature_set_id=None, feature_id=None, **kwargs): - self.name = name + self.name = name.upper() self.description = 
description self.feature_data_type = feature_data_type self.feature_type = feature_type diff --git a/splicemachine/features/feature_set.py b/splicemachine/features/feature_set.py index faa5571..6989459 100644 --- a/splicemachine/features/feature_set.py +++ b/splicemachine/features/feature_set.py @@ -12,8 +12,8 @@ def __init__(self, *, splice_ctx: PySpliceContext, table_name, schema_name, desc primary_keys: Dict[str, str], feature_set_id=None, deployed: bool = False, **kwargs): self.splice_ctx = splice_ctx - self.table_name = table_name - self.schema_name = schema_name + self.table_name = table_name.upper() + self.schema_name = schema_name.upper() self.description = description self.primary_keys = primary_keys self.feature_set_id = feature_set_id @@ -91,6 +91,7 @@ def __update_deployment_status(self, status: bool): """ self.splice_ctx.execute(SQL.update_fset_deployment_status.format(status=int(status), feature_set_id=self.feature_set_id)) + self.deployed = True def deploy(self, verbose=False): From e2740bf76572fdb1c162d41bd865686617c08db0 Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Thu, 14 Jan 2021 18:12:19 -0800 Subject: [PATCH 11/13] upper case for all schema name table name parameters to address DB case --- splicemachine/features/feature_store.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 4b395dc..6c18891 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -437,6 +437,10 @@ def create_feature_set(self, schema_name: str, table_name: str, primary_keys: Di :param desc: The (optional) description :return: FeatureSet """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + self._validate_feature_set(schema_name, table_name) fset = FeatureSet(splice_ctx=self.splice_ctx, schema_name=schema_name, table_name=table_name, primary_keys=primary_keys, @@ -495,6 +499,9 @@ def create_feature(self, schema_name: str, table_name: str, name: str, feature_d :return: Feature created """ self.__validate_feature_data_type(feature_data_type) + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() if self.splice_ctx.tableExists(schema_name, table_name): raise SpliceMachineException(f"Feature Set {schema_name}.{table_name} is already deployed. You cannot " f"add features to a deployed feature set.") @@ -643,6 +650,10 @@ def describe_feature_set(self, schema_name: str, table_name: str) -> None: :param table_name: feature set table name :return: None """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name}) if not fset: raise SpliceMachineException( f"Feature Set {schema_name}.{table_name} not found. 
Check name and try again.") @@ -727,6 +738,10 @@ def _retrieve_training_set_metadata_from_deployement(self, schema_name: str, tab :param table_name: model table name :return: """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + sql = SQL.get_deployment_metadata.format(schema_name=schema_name, table_name=table_name) deploy_df = self.splice_ctx.df(sql).collect() cnt = len(deploy_df) @@ -741,6 +756,10 @@ def display_model_feature_drift(self, schema_name: str, table_name: str): :param table_name: name of the model table :return: None """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) if not metadata: raise SpliceMachineException(f"Could not find deployment for model table {schema_name}.{table_name}") from None @@ -763,6 +782,9 @@ def display_model_drift(self, schema_name: str, table_name: str, time_intervals: :param end_time: if specified, filters to only show predictions occurring before this date/time :return: None """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() # set default timeframe if not specified if not start_time: start_time = datetime(1900, 1, 1, 0, 0, 0) From b36aacf6c51a3fb657186c2861a94b4e202bcd70 Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Thu, 14 Jan 2021 18:27:01 -0800 Subject: [PATCH 12/13] upper case for all schema name table name parameters to address DB case (2nd pass) --- splicemachine/features/feature_store.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index 6c18891..6bec430 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -413,13 +413,17 @@ def list_training_sets(self) -> Dict[str, Optional[str]]: """ raise NotImplementedError("To see available training views, run fs.describe_training_views()") - def _validate_feature_set(self, schema_name, table_name): + def _validate_feature_set(self, schema_name: str, table_name: str): """ Asserts a feature set doesn't already exist in the database :param schema_name: schema name of the feature set :param table_name: table name of the feature set :return: None """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + str = f'Feature Set {schema_name}.{table_name} already exists. Use a different schema and/or table name.' # Validate Table assert not self.splice_ctx.tableExists(schema_name, table_name=table_name), str @@ -502,6 +506,7 @@ def create_feature(self, schema_name: str, table_name: str, name: str, feature_d # database stores object names in upper case schema_name = schema_name.upper() table_name = table_name.upper() + if self.splice_ctx.tableExists(schema_name, table_name): raise SpliceMachineException(f"Feature Set {schema_name}.{table_name} is already deployed. You cannot " f"add features to a deployed feature set.") @@ -612,7 +617,7 @@ def _process_features(self, features: List[Union[Feature, str]]) -> List[Feature " a feature name (string) or a Feature object" return all_features - def deploy_feature_set(self, schema_name, table_name): + def deploy_feature_set(self, schema_name: str, table_name: str): """ Deploys a feature set to the database. 
This persists the feature stores existence. As of now, once deployed you cannot delete the feature set or add/delete features. @@ -622,6 +627,9 @@ def deploy_feature_set(self, schema_name, table_name): :param table_name: The table of the created feature set """ try: + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name})[0] except: raise SpliceMachineException( @@ -701,13 +709,17 @@ def describe_training_view(self, training_view: str) -> None: def set_feature_description(self): raise NotImplementedError - def get_training_set_from_deployment(self, schema_name, table_name): + def get_training_set_from_deployment(self, schema_name: str, table_name: str): """ Reads Feature Store metadata to rebuild orginal training data set used for the given deployed model. :param schema_name: model schema name :param table_name: model table name :return: """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + metadata = self._retrieve_training_set_metadata_from_deployement(schema_name, table_name) features = metadata['FEATURES'].split(',') tv_name = metadata['NAME'] @@ -720,13 +732,17 @@ def get_training_set_from_deployment(self, schema_name, table_name): training_set_df = self.get_training_set(features=features, start_time=start_time, end_time=end_time) return training_set_df - def _retrieve_model_data_sets(self, schema_name, table_name): + def _retrieve_model_data_sets(self, schema_name: str, table_name: str): """ Returns the training set dataframe and model table dataframe for a given deployed model. :param schema_name: model schema name :param table_name: model table name :return: """ + # database stores object names in upper case + schema_name = schema_name.upper() + table_name = table_name.upper() + training_set_df = self.get_training_set_from_deployment(schema_name, table_name) model_table_df = self.splice_ctx.df(f'SELECT * FROM {schema_name}.{table_name}') return training_set_df, model_table_df @@ -785,6 +801,7 @@ def display_model_drift(self, schema_name: str, table_name: str, time_intervals: # database stores object names in upper case schema_name = schema_name.upper() table_name = table_name.upper() + # set default timeframe if not specified if not start_time: start_time = datetime(1900, 1, 1, 0, 0, 0) From 0c59d72db104be8257ff618286634514d1aea758 Mon Sep 17 00:00:00 2001 From: sergioferragut Date: Thu, 14 Jan 2021 19:54:35 -0800 Subject: [PATCH 13/13] upper case for all schema name table name parameters to address DB case (3rd pass) --- splicemachine/mlflow_support/mlflow_support.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/splicemachine/mlflow_support/mlflow_support.py b/splicemachine/mlflow_support/mlflow_support.py index 294e019..bdc8ce3 100644 --- a/splicemachine/mlflow_support/mlflow_support.py +++ b/splicemachine/mlflow_support/mlflow_support.py @@ -883,6 +883,11 @@ def _deploy_db(db_schema_name: str, _check_for_splice_ctx() print("Deploying model to database...") + # database converts all object names to upper case, so we need to as well in our metadata + db_schema_name=db_schema_name.upper() + db_table_name=db_table_name.upper() + + # ~ Backwards Compatability ~ if verbose: print("Deprecated Parameter 'verbose'. Use mlflow.watch_job() or mlflow.fetch_logs() to get"
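Example usage of the feature and model drift API introduced by this patch series. This is an illustrative sketch only, not part of the patches: the schema and table names below are placeholders, connection configuration for PySpliceContext is omitted, and an active SparkSession named `spark` is assumed.

    from splicemachine.spark import PySpliceContext
    from splicemachine.features import FeatureStore

    splice = PySpliceContext(spark)   # assumes an active SparkSession; connection details omitted
    fs = FeatureStore(splice)

    # Rebuild the training set that was used for a deployed model
    # (object names are upper-cased internally per the later patches, so case does not matter here)
    train_df = fs.get_training_set_from_deployment('retail_fs', 'churn_model_table')

    # Compare training-time feature distributions against the feature values
    # the deployed model has received since deployment
    fs.display_model_feature_drift('retail_fs', 'churn_model_table')

    # Plot the distribution of the model's predictions across 10 equal time windows
    fs.display_model_drift('retail_fs', 'churn_model_table', time_intervals=10)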