This repository has been archived by the owner on Apr 15, 2022. It is now read-only.

Dbaas 4951 (#98)
* cleanup of api

* add context (primary) key to describe feature sets

* optional verbose to print sql in create_training_context

* added get_feature_dataset

* comments

* old code

* i hate uppercase

* comment

* sql format

* i still hate uppercase

* null tx

* sql format

* sql format

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* verbose

* docs

* docs

* column ordering

* feature param cleanup

* training context features

* removed clean_df

* to_lower

* docs

* better logic

* better logic

* label column validation

* refactor TrainingContext -> TrainingView, Feature Set Context Key -> Feature Set Join Key

* missed one

* exclude 2 more funcs

* docs

* as list

* missed some more

* hashable

* pep

* docs

* docs

* handleinvalid keep

* feature_vector_sql

* get-features_by_name requires names

* exclude members

* return Feature, docs fix

* history for get_training_set without TrainingView

* removed clean_df

* missing collect

* current values

* 2 froms

* add pk cols

* join with itself

* better line

* merge master

* remove include_insert

* myles as codeowner
Ben Epstein authored Jan 7, 2021
2 parents 7660e39 + b5b679a commit 5ddcf9b
Showing 6 changed files with 245 additions and 144 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
@@ -1,4 +1,4 @@
#Global-Reviewers

#* @splicemachine/splice-cloudops
* @bklo94 @Ben-Epstein @edriggers @jhoule-splice @jramineni @njnygaard @patricksplice @splicemaahs @troysplice @abaveja313
* @bklo94 @Ben-Epstein @edriggers @jramineni @myles-novick @splicemaahs @troysplice @abaveja313
3 changes: 2 additions & 1 deletion splicemachine/features/constants.py
@@ -96,7 +96,8 @@ class SQL:
FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1
) p
ON fset.feature_set_id=p.feature_set_id
where fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} )
WHERE fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} )
ORDER BY schema_name, table_name
"""

get_all_features = f"SELECT NAME FROM {FEATURE_STORE_SCHEMA}.feature WHERE Name='{{name}}'"
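A note on the ORDER BY added above: it makes the join-key lookup deterministic, which backs the alphabetical tie-break used when picking an "anchor" Feature Set in get_training_set (see feature_store.py below). A minimal sketch of how this query is presumably consumed (the constant is presumably SQL.get_feature_set_join_keys, matching its use in feature_store.py; the feature names, SparkSession, and PySpliceContext construction are assumptions):

from splicemachine.spark import PySpliceContext
from splicemachine.features.constants import SQL

splice_ctx = PySpliceContext(spark)  # assumes an existing SparkSession named `spark`
feature_names = ('total_spend', 'visit_count')  # hypothetical feature names
sql = SQL.get_feature_set_join_keys.format(names=feature_names)
# One row per owning Feature Set, now ordered by (schema_name, table_name), so the
# anchor choice is stable when primary-key counts tie
fset_keys = splice_ctx.df(sql).toPandas()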
8 changes: 4 additions & 4 deletions splicemachine/features/feature_set.py
@@ -1,6 +1,5 @@
from splicemachine.features import Feature
from .constants import SQL, Columns
from .utils import clean_df
from splicemachine.spark import PySpliceContext
from typing import List, Dict

@@ -34,8 +33,8 @@ def get_features(self) -> List[Feature]:
"""
features = []
if self.feature_set_id:
features_df = self.splice_ctx.df(SQL.get_features_in_feature_set.format(feature_set_id=self.feature_set_id))
features_df = clean_df(features_df, Columns.feature).collect()
features_df = self.splice_ctx.df(SQL.get_features_in_feature_set.format(feature_set_id=self.feature_set_id),
to_lower=True).collect()
for f in features_df:
f = f.asDict()
features.append(Feature(**f))
@@ -134,7 +133,8 @@ def deploy(self, verbose=False):

def __eq__(self, other):
if isinstance(other, FeatureSet):
return self.table_name == other.table_name and self.schema_name == other.schema_name
return self.table_name.lower() == other.table_name.lower() and \
self.schema_name.lower() == other.schema_name.lower()
return False

def __repr__(self):
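The case-insensitive __eq__ above reflects that unquoted SQL identifiers are case-insensitive, so two FeatureSet handles differing only in casing point at the same table. A quick illustrative sketch; the constructor arguments shown are assumptions:

from splicemachine.features import FeatureSet

fs_a = FeatureSet(splice_ctx=None, schema_name='RETAIL_FS', table_name='CUSTOMER_RFM')
fs_b = FeatureSet(splice_ctx=None, schema_name='retail_fs', table_name='customer_rfm')
assert fs_a == fs_b  # same table regardless of identifier casing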
217 changes: 82 additions & 135 deletions splicemachine/features/feature_store.py
@@ -16,7 +16,8 @@
from splicemachine.spark import PySpliceContext
from splicemachine.features import Feature, FeatureSet
from .training_set import TrainingSet
from .utils import dict_to_lower
from .utils import (dict_to_lower, _generate_training_set_history_sql,
_generate_training_set_sql, _create_temp_training_view)
from .constants import SQL, FeatureType
from .training_view import TrainingView

@@ -64,75 +65,6 @@ def get_feature_sets(self, feature_set_ids: List[int] = None, _filter: Dict[str,
feature_sets.append(FeatureSet(splice_ctx=self.splice_ctx, **d))
return feature_sets

def get_training_set(self, features: Union[List[Feature], List[str]]) -> SparkDF:
"""
Gets a set of feature values across feature sets that is not time dependent (ie for non time series clustering).
This feature dataset will be treated and tracked implicitly the same way a training_dataset is tracked from
:py:meth:`features.FeatureStore.get_training_set` . The dataset's metadata and features used will be tracked in mlflow automatically (see
get_training_set for more details).
:param features: List of Features or strings of feature names
:NOTE:
.. code-block:: text
The Features Sets which the list of Features come from must have common join keys,
otherwise the function will fail. If there is no common join key, it is recommended to
create a Training View to specify the join conditions.
:return: Spark DF
"""
features = self._process_features(features)

sql = SQL.get_feature_set_join_keys.format(names=tuple([f.name for f in features]))
fset_keys: pd.DataFrame = self.splice_ctx.df(sql).toPandas()
# Get max number of pk (join) columns from all feature sets
fset_keys['PK_COLUMNS_COUNT'] = fset_keys['PK_COLUMNS'].apply(lambda x: len(x.split('|')))
# Get "anchor" feature set. The one we will use to try to join to all others
ind = fset_keys['PK_COLUMNS_COUNT'].idxmax()
anchor_series = fset_keys.iloc[ind]
# Remove that from the list
fset_keys.drop(index=ind, inplace=True)
all_pk_cols = anchor_series.PK_COLUMNS.split('|')
# For each feature set, assert that all join keys exist in our "anchor" feature set
fset_keys['can_join'] = fset_keys['PK_COLUMNS'].map(lambda x: set(x.split('|')).issubset(all_pk_cols))
if not fset_keys['can_join'].all():
bad_feature_set_ids = [t.FEATURE_SET_ID for _, t in fset_keys[fset_keys['can_join'] != True].iterrows()]
bad_features = [f.name for f in features if f.feature_set_id in bad_feature_set_ids]
raise SpliceMachineException(f"The provided features do not have a common join key."
f"Remove features {bad_features} from your request")

# SELECT clause
sql = 'SELECT '

sql += ','.join([f'fset{feature.feature_set_id}.{feature.name}' for feature in features])

alias = f'fset{anchor_series.FEATURE_SET_ID}' # We use this a lot for joins
sql += f'\nFROM {anchor_series.SCHEMA_NAME}.{anchor_series.TABLE_NAME} {alias} '

# JOIN clause
for _, fset in fset_keys.iterrows():
# Join Feature Set
sql += f'\nLEFT OUTER JOIN {fset.SCHEMA_NAME}.{fset.TABLE_NAME} fset{fset.FEATURE_SET_ID} \n\tON '
for ind, pkcol in enumerate(fset.PK_COLUMNS.split('|')):
if ind > 0: sql += ' AND ' # In case of multiple columns
sql += f'fset{fset.FEATURE_SET_ID}.{pkcol}={alias}.{pkcol}'

# Link this to mlflow for model deployment
# Here we create a null training view and pass it into the training set. We do this because this special kind
# of training set isn't standard. It's not based on a training view, on primary key columns, a label column,
# or a timestamp column . This is simply a joined set of features from different feature sets.
# But we still want to track this in mlflow as a user may build and deploy a model based on this. So we pass in
# a null training view that can be tracked with a "name" (although the name is None). This is a likely case
# for (non time based) clustering use cases.
null_tx = TrainingView(pk_columns=[], ts_column=None, label_column=None, view_sql=None, name=None,
description=None)
ts = TrainingSet(training_view=null_tx, features=features)
if hasattr(self, 'mlflow_ctx'):
self.mlflow_ctx._active_training_set: TrainingSet = ts
ts._register_metadata(self.mlflow_ctx)
return self.splice_ctx.df(sql)

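The removed block above is where the anchor-selection idea originated, and the new get_training_set added further down keeps it (its docstring spells out the rule): the anchor is the Feature Set whose primary keys form a superset of every other Feature Set's join keys. Distilled as a standalone sketch over a hypothetical join-key frame:

import pandas as pd

# Hypothetical result of the feature-set join-key query: one row per Feature Set
fset_keys = pd.DataFrame({
    'FEATURE_SET_ID': [1, 2],
    'PK_COLUMNS': ['customer_id|store_id', 'customer_id'],
})
fset_keys['PK_COLUMNS_COUNT'] = fset_keys['PK_COLUMNS'].apply(lambda x: len(x.split('|')))

# The anchor is the Feature Set with the most primary-key (join) columns ...
anchor = fset_keys.loc[fset_keys['PK_COLUMNS_COUNT'].idxmax()]
anchor_pks = set(anchor.PK_COLUMNS.split('|'))

# ... and every other Feature Set must join on a subset of the anchor's keys
others = fset_keys.drop(index=anchor.name)
can_join = others['PK_COLUMNS'].map(lambda x: set(x.split('|')).issubset(anchor_pks))
assert can_join.all(), "features without a common join key need a Training View"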
def remove_training_view(self, override=False):
"""
Note: This function is not yet implemented.
@@ -228,8 +160,8 @@ def _validate_feature_vector_keys(self, join_key_values, feature_sets) -> None:
missing_keys = feature_set_key_columns - join_key_values.keys()
assert not missing_keys, f"The following keys were not provided and must be: {missing_keys}"

def get_feature_vector(self, features: List[Union[str,Feature]],
join_key_values: Dict[str,str], return_sql=False) -> Union[str, PandasDF]:
def get_feature_vector(self, features: List[Union[str, Feature]],
join_key_values: Dict[str, str], return_sql=False) -> Union[str, PandasDF]:
"""
Gets a feature vector given a list of Features and primary key values for their corresponding Feature Sets
@@ -246,9 +178,9 @@ def get_feature_vector(self, features: List[Union[str,Feature]],
feature_sets = self.get_feature_sets([f.feature_set_id for f in feats])
self._validate_feature_vector_keys(join_keys, feature_sets)


feature_names = ','.join([f.name for f in feats])
fset_tables = ','.join([f'{fset.schema_name}.{fset.table_name} fset{fset.feature_set_id}' for fset in feature_sets])
fset_tables = ','.join(
[f'{fset.schema_name}.{fset.table_name} fset{fset.feature_set_id}' for fset in feature_sets])
sql = "SELECT {feature_names} FROM {fset_tables} ".format(feature_names=feature_names, fset_tables=fset_tables)

# For each Feature Set, for each primary key in the given feature set, get primary key value from the user provided dictionary
@@ -260,8 +192,7 @@ def get_feature_vector(self, features: List[Union[str,Feature]],

return sql if return_sql else self.splice_ctx.df(sql).toPandas()

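For reference, a hedged usage sketch of get_feature_vector; the FeatureStore import path and constructor, the feature names, and the join-key values are all assumptions:

from splicemachine.features import FeatureStore
from splicemachine.spark import PySpliceContext

splice = PySpliceContext(spark)        # assumes an existing SparkSession `spark`
fs = FeatureStore(splice_ctx=splice)   # constructor arguments are an assumption

vector = fs.get_feature_vector(
    features=['total_spend', 'visit_count'],   # hypothetical feature names
    join_key_values={'customer_id': '10042'},  # must cover every Feature Set's primary keys
)
# Passing return_sql=True returns the generated SELECT instead of executing it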
def get_feature_vector_sql_from_training_view(self, training_view: str, features: List[Feature],
include_insert: Optional[bool] = True) -> str:
def get_feature_vector_sql_from_training_view(self, training_view: str, features: List[Feature]) -> str:
"""
Returns the parameterized feature retrieval SQL used for online model serving.
@@ -274,25 +205,14 @@ def get_feature_vector_sql_from_training_view(self, training_view: str, features
This function will error if the view SQL is missing a view key required to retrieve the\
desired features
:param include_insert: (Optional[bool]) determines whether insert into model table is included in the SQL statement
:return: (str) the parameterized feature vector SQL
"""

# Get training view information (ctx primary key column(s), ctx primary key inference ts column, )
vid = self.get_training_view_id(training_view)
tctx = self.get_training_views(_filter={'view_id': vid})[0]

# optional INSERT prefix
if (include_insert):
sql = 'INSERT INTO {target_model_table} ('
for pkcol in tctx.pk_columns: # Select primary key column(s)
sql += f'{pkcol}, '
for feature in features:
sql += f'{feature.name}, ' # Collect all features over time
sql = sql.rstrip(', ')
sql += ')\nSELECT '
else:
sql = 'SELECT '
sql = 'SELECT '

# SELECT expressions
for pkcol in tctx.pk_columns: # Select primary key column(s)
@@ -351,6 +271,73 @@ def get_feature_description(self):
# TODO
raise NotImplementedError

def get_training_set(self, features: Union[List[Feature], List[str]], current_values_only: bool = False,
start_time: datetime = None, end_time: datetime = None, return_sql: bool = False) -> SparkDF:
"""
Gets a set of feature values across feature sets that is not time dependent (ie for non time series clustering).
This feature dataset will be treated and tracked implicitly the same way a training_dataset is tracked from
:py:meth:`features.FeatureStore.get_training_set` . The dataset's metadata and features used will be tracked in mlflow automatically (see
get_training_set for more details).
The way point-in-time correctness is guaranteed here is by choosing one of the Feature Sets as the "anchor" dataset.
This means that the points in time that the query is based off of will be the points in time in which the anchor
Feature Set recorded changes. The anchor Feature Set is the Feature Set that contains the superset of all primary key
columns across all Feature Sets from all Features provided. If more than 1 Feature Set has the superset of
all Feature Sets, the Feature Set with the most primary keys is selected. If more than 1 Feature Set has the same
maximum number of primary keys, the Feature Set is chosen by alphabetical order (schema_name, table_name).
:param features: List of Features or strings of feature names
:NOTE:
.. code-block:: text
The Feature Sets which the list of Features come from must have common join keys,
otherwise the function will fail. If there is no common join key, it is recommended to
create a Training View to specify the join conditions.
:param current_values_only: If you only want the most recent values of the features, set this to true. Otherwise, all history will be returned. Default False
:param start_time: How far back in history you want Feature values. If not specified (and current_values_only is False), all history will be returned.
This parameter only takes effect if current_values_only is False.
:param end_time: The most recent values for each selected Feature. This will be the cutoff time, such that any Feature values that
were updated after this point in time won't be selected. If not specified (and current_values_only is False),
Feature values up to the moment in time you call the function (now) will be retrieved. This parameter
only takes effect if current_values_only is False.
:return: Spark DF
"""
# Get List[Feature]
features = self._process_features(features)

# Get the Feature Sets
fsets = self.get_feature_sets(list({f.feature_set_id for f in features}))

if current_values_only:
sql = _generate_training_set_sql(features, fsets)
else:
temp_vw = _create_temp_training_view(features, fsets)
sql = _generate_training_set_history_sql(temp_vw, features, fsets, start_time=start_time, end_time=end_time)


# Here we create a null training view and pass it into the training set. We do this because this special kind
# of training set isn't standard. It's not based on a training view, on primary key columns, a label column,
# or a timestamp column . This is simply a joined set of features from different feature sets.
# But we still want to track this in mlflow as a user may build and deploy a model based on this. So we pass in
# a null training view that can be tracked with a "name" (although the name is None). This is a likely case
# for (non time based) clustering use cases.
null_tvw = TrainingView(pk_columns=[], ts_column=None, label_column=None, view_sql=None, name=None,
description=None)
ts = TrainingSet(training_view=null_tvw, features=features, start_time=start_time, end_time=end_time)

# If the user isn't getting historical values, that means there isn't really a start_time, as the user simply
# wants the most up to date values of each feature. So we set start_time to end_time (which is datetime.today)
# For metadata purposes
if current_values_only:
ts.start_time = ts.end_time

if hasattr(self, 'mlflow_ctx'):
self.mlflow_ctx._active_training_set: TrainingSet = ts
ts._register_metadata(self.mlflow_ctx)
return sql if return_sql else self.splice_ctx.df(sql)

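A hedged usage sketch of the new get_training_set, reusing the hypothetical FeatureStore handle fs from the get_feature_vector sketch above; the feature names and time window are assumptions:

from datetime import datetime

# Most recent value of each feature only (start_time is collapsed to end_time for metadata)
snapshot_df = fs.get_training_set(
    features=['total_spend', 'visit_count'],
    current_values_only=True,
)

# Full point-in-time history within a window, anchored on the Feature Set holding the
# superset of primary keys (ties broken alphabetically by schema_name, table_name)
history_df = fs.get_training_set(
    features=['total_spend', 'visit_count'],
    start_time=datetime(2020, 1, 1),
    end_time=datetime(2020, 12, 31),
)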
def get_training_set_from_view(self, training_view: str, features: Union[List[Feature], List[str]] = None,
start_time: Optional[datetime] = None, end_time: Optional[datetime] = None,
return_sql: bool = False) -> SparkDF or str:
@@ -394,61 +381,21 @@ def get_training_set_from_view(self, training_view: str, features: Union[List[Fe
:return: Optional[SparkDF, str] The Spark dataframe of the training set or the SQL that is used to generate it (for debugging)
"""

# Get features as list of Features
features = self._process_features(features) if features else self.get_training_view_features(training_view)
# DB-9556 loss of column names on complex sql for NSDS
cols = []

# Get training view information (view primary key column(s), inference ts column, )
tctx = self.get_training_view(training_view)
# SELECT clause
sql = 'SELECT '
for pkcol in tctx.pk_columns: # Select primary key column(s)
sql += f'\n\tctx.{pkcol},'
cols.append(pkcol)

sql += f'\n\tctx.{tctx.ts_column}, ' # Select timestamp column
cols.append(tctx.ts_column)

# TODO: ensure these features exist and fail gracefully if not
for feature in features:
sql += f'\n\tCOALESCE(fset{feature.feature_set_id}.{feature.name},fset{feature.feature_set_id}h.{feature.name}) {feature.name},' # Collect all features over time
cols.append(feature.name)

sql = sql + f'\n\tctx.{tctx.label_column}' if tctx.label_column else sql.rstrip(
',') # Select the optional label col
if tctx.label_column: cols.append(tctx.label_column)

# FROM clause
sql += f'\nFROM ({tctx.view_sql}) ctx '

# JOIN clause
# Get List of necessary Feature Sets
feature_set_ids = list({f.feature_set_id for f in features}) # Distinct set of IDs
feature_sets = self.get_feature_sets(feature_set_ids)
for fset in feature_sets:
# Join Feature Set
sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name} fset{fset.feature_set_id} \n\tON '
for pkcol in fset.pk_columns:
sql += f'fset{fset.feature_set_id}.{pkcol}=ctx.{pkcol} AND '
sql += f' ctx.{tctx.ts_column} >= fset{fset.feature_set_id}.LAST_UPDATE_TS '

# Join Feature Set History
sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name}_history fset{fset.feature_set_id}h \n\tON '
for pkcol in fset.pk_columns:
sql += f' fset{fset.feature_set_id}h.{pkcol}=ctx.{pkcol} AND '
sql += f' ctx.{tctx.ts_column} >= fset{fset.feature_set_id}h.ASOF_TS AND ctx.{tctx.ts_column} < fset{fset.feature_set_id}h.UNTIL_TS'

# WHERE clause on optional start and end times
if start_time or end_time:
sql += '\nWHERE '
if start_time:
sql += f"\n\tctx.{tctx.ts_column} >= '{str(start_time)}' AND"
if end_time:
sql += f"\n\tctx.{tctx.ts_column} <= '{str(end_time)}'"
sql = sql.rstrip('AND')
# Get training view information (view primary key column(s), inference ts column, )
tvw = self.get_training_view(training_view)
# Generate the SQL needed to create the dataset
sql = _generate_training_set_history_sql(tvw, features, feature_sets, start_time=start_time, end_time=end_time)

# Link this to mlflow for model deployment
if hasattr(self, 'mlflow_ctx') and not return_sql:
ts = TrainingSet(training_view=tctx, features=features,
ts = TrainingSet(training_view=tvw, features=features,
start_time=start_time, end_time=end_time)
self.mlflow_ctx._active_training_set: TrainingSet = ts
ts._register_metadata(self.mlflow_ctx)
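A companion sketch for get_training_set_from_view as refactored above, again reusing the hypothetical fs handle; the view name, features, and window are assumptions:

from datetime import datetime

training_df = fs.get_training_set_from_view(
    training_view='customer_churn_view',       # hypothetical Training View name
    features=['total_spend', 'visit_count'],
    start_time=datetime(2020, 1, 1),
    end_time=datetime(2020, 12, 31),
)
# With return_sql=True the generated point-in-time SQL is returned instead, and no
# TrainingSet metadata is registered with mlflow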
7 changes: 7 additions & 0 deletions splicemachine/features/training_set.py
@@ -23,6 +23,13 @@ def __init__(self,
self.end_time = end_time or datetime.today()

def _register_metadata(self, mlflow_ctx):
"""
Registers training set with mlflow if the user has registered the feature store in their mlflow session,
and has called either get_training_set or get_training_set_from_view before or during an mlflow run
:param mlflow_ctx: the mlflow context
:return: None
"""
if mlflow_ctx.active_run():
print("There is an active mlflow run, your training set will be logged to that run.")
mlflow_ctx.lp("splice.feature_store.training_set",self.training_view.name)
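Per the docstring above, _register_metadata only logs the training set when the feature store has been registered with the user's mlflow session and a training set is fetched before or during a run. A heavily hedged sketch of that flow; the mlflow_support import path and the register_feature_store call are assumed names, not confirmed by this diff:

from splicemachine.mlflow_support import mlflow  # import path is an assumption

mlflow.register_feature_store(fs)  # assumed registration hook; fs is the FeatureStore from the sketches above
with mlflow.start_run(run_name='churn-model'):
    df = fs.get_training_set_from_view('customer_churn_view')
    # _register_metadata fires internally and logs
    # "splice.feature_store.training_set" = <view name> to the active run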
(The diff for the remaining changed file, presumably splicemachine/features/utils.py, did not load on this page.)
