From 7660e391cb8797693131c471c8cf26649f8891f2 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Tue, 5 Jan 2021 09:44:04 -0500 Subject: [PATCH] Feature store api cleanup (#96) * cleanup of api * add context (primary) key to describe feature sets * optional verbose to print sql in create_training_context * added get_feature_dataset * comments * old code * i hate upppercase * commment * sql format * i still hate uppercase * null tx * sql format * sql format * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * docs * verbose * docs * docs * column ordering * feature param cleanup * training context features * removed clean_df * to_lower * docs * better logic * better logic * label column validation * refactor TrainingContext -> TrainingView, Feature Set Context Key -> Feature Set Join Key * missed one * exclude 2 more funcs * docs * as list * missed some more * hashable * pep * docs * docs * handleinvalid keep * feature_vector_sql * get-features_by_name requires names * exclude members * return Feature, docs fix --- .readthedocs.yaml | 2 +- docs/conf.py | 2 +- docs/getting-started.rst | 34 +- docs/splicemachine.features.rst | 24 +- requirements-docs.txt | 19 + requirements.txt | 1 - splicemachine.egg-info/SOURCES.txt | 4 +- splicemachine/features/__init__.py | 1 + splicemachine/features/constants.py | 65 +- splicemachine/features/feature.py | 23 +- splicemachine/features/feature_set.py | 31 +- splicemachine/features/feature_store.py | 585 ++++++++++++------ splicemachine/features/training_set.py | 12 +- .../{training_context.py => training_view.py} | 10 +- splicemachine/features/utils.py | 9 + .../mlflow_support/mlflow_support.py | 4 +- splicemachine/spark/constants.py | 7 +- splicemachine/spark/context.py | 18 +- 18 files changed, 610 insertions(+), 241 deletions(-) create mode 100644 requirements-docs.txt rename splicemachine/features/{training_context.py => training_view.py} (80%) diff --git a/.readthedocs.yaml b/.readthedocs.yaml index db36f18..46338ec 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -21,4 +21,4 @@ formats: python: version: 3.7 install: - - requirements: requirements.txt + - requirements: requirements-docs.txt diff --git a/docs/conf.py b/docs/conf.py index 3b1334e..68bca17 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -46,7 +46,7 @@ 'private-members':True, 'inherited-members':True, 'undoc-members': False, - 'exclude-members': '_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main' + 'exclude-members': '_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main' } # Add any paths that contain templates here, relative to this directory. 
diff --git a/docs/getting-started.rst b/docs/getting-started.rst index affc963..a29a588 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -13,11 +13,17 @@ If you are running inside of the Splice Machine Cloud Service in a Jupyter Noteb External Installation --------------------- -If you would like to install outside of the K8s cluster (and use the ExtPySpliceContext), you can install with +If you would like to install outside of the K8s cluster (and use the ExtPySpliceContext), you can install the stable build with .. code-block:: sh - sudo pip install pysplice + sudo pip install git+http://www.github.com/splicemachine/pysplice@2.3.0-k8 + +Or latest with + +.. code-block:: sh + + sudo pip install git+http://www.github.com/splicemachine/pysplice Usage ----- @@ -28,24 +34,40 @@ This section covers importing and instantiating the Native Spark DataSource .. tab:: Native Spark DataSource - To use the Native Spark DataSource inside of the cloud service, first create a Spark Session and then import your PySpliceContext + To use the Native Spark DataSource inside of the `cloud service`_., first create a Spark Session and then import your PySpliceContext .. code-block:: Python from pyspark.sql import SparkSession from splicemachine.spark import PySpliceContext + from splicemachine.mlflow_support import * # Connects your MLflow session automatically + from splicemachine.features import FeatureStore # Splice Machine Feature Store + spark = SparkSession.builder.getOrCreate() - splice = PySpliceContext(spark) + splice = PySpliceContext(spark) # The Native Spark Datasource (PySpliceContext) takes a Spark Session + fs = FeatureStore(splice) # Create your Feature Store + mlflow.register_splice_context(splice) # Gives mlflow native DB connection + mlflow.register_feature_store(fs) # Tracks Feature Store work in Mlflow automatically + .. tab:: External Native Spark DataSource - To use the External Native Spark DataSource, create a Spark Session with your external Jars configured. Then, import your ExtPySpliceContext and set the necessary parameters + To use the External Native Spark DataSource, create a Spark Session with your external Jars configured. Then, import your ExtPySpliceContext and set the necessary parameters. + Once created, the functionality is identical to the internal Native Spark Datasource (PySpliceContext) .. code-block:: Python from pyspark.sql import SparkSession from splicemachine.spark import ExtPySpliceContext + from splicemachine.mlflow_support import * # Connects your MLflow session automatically + from splicemachine.features import FeatureStore # Splice Machine Feature Store + spark = SparkSession.builder.config('spark.jars', '/path/to/splice_spark2-3.0.0.1962-SNAPSHOT-shaded.jar').config('spark.driver.extraClassPath', 'path/to/Splice/jars/dir/*').getOrCreate() JDBC_URL = '' #Set your JDBC URL here. You can get this from the Cloud Manager UI. 
Make sure to append ';user=;password=' after ';ssl=basic' so you can authenticate in - kafka_server = 'kafka-broker-0-' + JDBC_URL.split('jdbc:splice://jdbc-')[1].split(':1527')[0] + ':19092' # Formatting kafka URL from JDBC + # The ExtPySpliceContext communicates with the database via Kafka + kafka_server = 'kafka-broker-0-' + JDBC_URL.split('jdbc:splice://jdbc-')[1].split(':1527')[0] + ':19092' # Formatting kafka URL from JDBC splice = ExtPySpliceContext(spark, JDBC_URL=JDBC_URL, kafkaServers=kafka_server) + + fs = FeatureStore(splice) # Create your Feature Store + mlflow.register_splice_context(splice) # Gives mlflow native DB connection + mlflow.register_feature_store(fs) # Tracks Feature Store work in Mlflow automatically diff --git a/docs/splicemachine.features.rst b/docs/splicemachine.features.rst index e0e96cf..27cfe5d 100644 --- a/docs/splicemachine.features.rst +++ b/docs/splicemachine.features.rst @@ -19,7 +19,7 @@ Submodules :undoc-members: :show-inheritance: - .. automodule:: splicemachine.features.training_context + .. automodule:: splicemachine.features.training_view :members: :undoc-members: :show-inheritance: @@ -27,7 +27,7 @@ Submodules splicemachine.features.feature_store ---------------------------------- -This Module contains the classes adn APIs for interacting with the Splice Machine Feature Store. +This Module contains the classes and APIs for interacting with the Splice Machine Feature Store. .. automodule:: splicemachine.features.feature_store :members: @@ -37,30 +37,38 @@ This Module contains the classes adn APIs for interacting with the Splice Machin splicemachine.features.feature_set ---------------------------------- -This describes the Python representation of a Feature Set. A feature set is a database table that contains Features and their metadata +This describes the Python representation of a Feature Set. A feature set is a database table that contains Features and their metadata. +The Feature Set class is mostly used internally but can be used by the user to see the available Features in the given +Feature Set, to see the table and schema name it is deployed to (if it is deployed), and to deploy the feature set +(which can also be done directly through the Feature Store). Feature Sets are unique by their schema.table name, as they +exist in the Splice Machine database as a SQL table. They are case insensitive. +To see the full contents of your Feature Set, you can print, return, or .__dict__ your Feature Set object. .. automodule:: splicemachine.features.feature_set :members: - :undoc-members: :show-inheritance: splicemachine.features.Feature ---------------------------------- -This describes the Python representation of a Feature. A feature is a column of a table with particular metadata +This describes the Python representation of a Feature. A Feature is a column of a Feature Set table with particular metadata. +A Feature is the smallest unit in the Feature Store, and each Feature within a Feature Set is individually tracked for changes +to enable full time travel and point-in-time consistent training datasets. Features' names are unique and case insensitive. +To see the full contents of your Feature, you can print, return, or .__dict__ your Feature object. .. automodule:: splicemachine.features.feature :members: :undoc-members: :show-inheritance: -splicemachine.features.training_context +splicemachine.features.training_view ---------------------------------- -This describes the Python representation of a Training Context. 
A Training Context is a SQL statement defining an event of interest, and metadata around how to create a training dataset with that context +This describes the Python representation of a Training View. A Training View is a SQL statement defining an event of interest, and metadata around how to create a training dataset with that view. +To see the full contents of your Training View, you can print, return, or .__dict__ your Training View object. -.. automodule:: splicemachine.features.training_context +.. automodule:: splicemachine.features.training_view :members: :undoc-members: :show-inheritance: diff --git a/requirements-docs.txt b/requirements-docs.txt new file mode 100644 index 0000000..7cc5ffa --- /dev/null +++ b/requirements-docs.txt @@ -0,0 +1,19 @@ +py4j==0.10.7.0 +pytest +mlflow==1.8.0 +pyyaml==5.3.1 +mleap==0.15.0 +graphviz==0.13 +requests +gorilla==0.3.0 +tqdm==4.43.0 +pyspark-dist-explore==0.1.8 +numpy==1.18.2 +pandas==1.0.3 +scipy==1.4.1 +tensorflow==2.2.1 +pyspark +h2o-pysparkling-2.4==3.28.1.2-1 +sphinx-tabs +IPython +cloudpickle==1.6.0 diff --git a/requirements.txt b/requirements.txt index 7cc5ffa..4abddff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,5 @@ scipy==1.4.1 tensorflow==2.2.1 pyspark h2o-pysparkling-2.4==3.28.1.2-1 -sphinx-tabs IPython cloudpickle==1.6.0 diff --git a/splicemachine.egg-info/SOURCES.txt b/splicemachine.egg-info/SOURCES.txt index 0bf3b7e..49ee57c 100644 --- a/splicemachine.egg-info/SOURCES.txt +++ b/splicemachine.egg-info/SOURCES.txt @@ -14,7 +14,7 @@ splicemachine/features/constants.py splicemachine/features/feature.py splicemachine/features/feature_set.py splicemachine/features/feature_store.py -splicemachine/features/training_context.py +splicemachine/features/training_view.py splicemachine/features/utils.py splicemachine/mlflow_support/__init__.py splicemachine/mlflow_support/constants.py @@ -25,4 +25,4 @@ splicemachine/spark/constants.py splicemachine/spark/context.py splicemachine/spark/test/__init__.py splicemachine/spark/test/context_it.py -splicemachine/spark/test/resources/__init__.py \ No newline at end of file +splicemachine/spark/test/resources/__init__.py diff --git a/splicemachine/features/__init__.py b/splicemachine/features/__init__.py index af14582..19a8dd9 100644 --- a/splicemachine/features/__init__.py +++ b/splicemachine/features/__init__.py @@ -1,3 +1,4 @@ from .feature import Feature from .feature_set import FeatureSet from .feature_store import FeatureStore +from .constants import FeatureType diff --git a/splicemachine/features/constants.py b/splicemachine/features/constants.py index f04fd6a..3f91b4f 100644 --- a/splicemachine/features/constants.py +++ b/splicemachine/features/constants.py @@ -54,8 +54,11 @@ class SQL: """ get_features_by_name = f""" - select feature_id,feature_set_id,Name,Description,feature_data_type, feature_type,Tags,compliance_level, - last_update_ts,last_update_username from {FEATURE_STORE_SCHEMA}.feature where Name in ({{feature_names}}) + select fset.schema_name,fset.table_name,f.Name,f.Description,f.feature_data_type,f.feature_type,f.Tags, + f.compliance_level,f.last_update_ts,f.last_update_username,f.feature_id,f.feature_set_id + from {FEATURE_STORE_SCHEMA}.feature f + join {FEATURE_STORE_SCHEMA}.feature_set fset on f.feature_set_id=fset.feature_set_id + where {{where}} """ get_features_in_feature_set = f""" @@ -72,21 +75,33 @@ class SQL: FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1) p ON fset.feature_set_id=p.feature_set_id """ - get_training_contexts = f""" - SELECT 
tc.context_id, tc.Name, tc.Description, CAST(SQL_text AS VARCHAR(1000)) context_sql, + + get_training_views = f""" + SELECT tc.view_id, tc.Name, tc.Description, CAST(SQL_text AS VARCHAR(1000)) view_sql, p.pk_columns, ts_column, label_column, - c.context_columns - FROM {FEATURE_STORE_SCHEMA}.training_context tc + c.join_columns + FROM {FEATURE_STORE_SCHEMA}.training_view tc INNER JOIN - (SELECT context_id, STRING_AGG(key_column_name,',') pk_columns FROM {FEATURE_STORE_SCHEMA}.training_context_key WHERE key_type='P' GROUP BY 1) p ON tc.context_id=p.context_id + (SELECT view_id, STRING_AGG(key_column_name,',') pk_columns FROM {FEATURE_STORE_SCHEMA}.training_view_key WHERE key_type='P' GROUP BY 1) p ON tc.view_id=p.view_id INNER JOIN - (SELECT context_id, STRING_AGG(key_column_name,',') context_columns FROM {FEATURE_STORE_SCHEMA}.training_context_key WHERE key_type='C' GROUP BY 1) c ON tc.context_id=c.context_id + (SELECT view_id, STRING_AGG(key_column_name,',') join_columns FROM {FEATURE_STORE_SCHEMA}.training_view_key WHERE key_type='J' GROUP BY 1) c ON tc.view_id=c.view_id + """ + + get_feature_set_join_keys = f""" + SELECT fset.feature_set_id, schema_name, table_name, pk_columns FROM {FEATURE_STORE_SCHEMA}.feature_set fset + INNER JOIN + ( + SELECT feature_set_id, STRING_AGG(key_column_name,'|') pk_columns, STRING_AGG(key_column_data_type,'|') pk_types + FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1 + ) p + ON fset.feature_set_id=p.feature_set_id + where fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} ) """ get_all_features = f"SELECT NAME FROM {FEATURE_STORE_SCHEMA}.feature WHERE Name='{{name}}'" - get_available_features = f""" + get_training_view_features = f""" SELECT f.feature_id, f.feature_set_id, f.NAME, f.DESCRIPTION, f.feature_data_type, f.feature_type, f.TAGS, f.compliance_level, f.last_update_ts, f.last_update_username FROM {FEATURE_STORE_SCHEMA}.Feature f WHERE feature_id IN @@ -96,11 +111,11 @@ class SQL: ( SELECT feature_id FROM ( - SELECT f.feature_id, fsk.KeyCount, count(distinct fsk.key_column_name) ContextKeyMatchCount + SELECT f.feature_id, fsk.KeyCount, count(distinct fsk.key_column_name) JoinKeyMatchCount FROM - {FEATURE_STORE_SCHEMA}.training_context tc + {FEATURE_STORE_SCHEMA}.training_view tc INNER JOIN - {FEATURE_STORE_SCHEMA}.training_context_key c ON c.context_id=tc.context_id AND c.key_type='C' + {FEATURE_STORE_SCHEMA}.training_view_key c ON c.view_id=tc.view_id AND c.key_type='J' INNER JOIN ( SELECT feature_set_id, key_column_name, count(*) OVER (PARTITION BY feature_set_id) KeyCount @@ -111,27 +126,27 @@ class SQL: WHERE {{where}} GROUP BY 1,2 )match_keys - WHERE ContextKeyMatchCount = KeyCount + WHERE JoinKeyMatchCount = KeyCount )fl ) """ - training_context = f""" - INSERT INTO {FEATURE_STORE_SCHEMA}.training_context (Name, Description, SQL_text, ts_column, label_column) + training_view = f""" + INSERT INTO {FEATURE_STORE_SCHEMA}.training_view (Name, Description, SQL_text, ts_column, label_column) VALUES ('{{name}}', '{{desc}}', '{{sql_text}}', '{{ts_col}}', {{label_col}}) """ - get_training_context_id = f""" - SELECT context_id from {FEATURE_STORE_SCHEMA}.Training_Context where Name='{{name}}' + get_training_view_id = f""" + SELECT view_id from {FEATURE_STORE_SCHEMA}.training_view where Name='{{name}}' """ get_fset_primary_keys = f""" select distinct key_column_name from {FEATURE_STORE_SCHEMA}.Feature_Set_Key """ - training_context_keys = f""" - INSERT INTO 
{FEATURE_STORE_SCHEMA}.training_context_key (Context_ID, Key_Column_Name, Key_Type) - VALUES ({{context_id}}, '{{key_column}}', '{{key_type}}' ) + training_view_keys = f""" + INSERT INTO {FEATURE_STORE_SCHEMA}.training_view_key (View_ID, Key_Column_Name, Key_Type) + VALUES ({{view_id}}, '{{key_column}}', '{{key_type}}' ) """ update_fset_deployment_status = f""" @@ -139,8 +154,8 @@ class SQL: """ training_set = f""" - INSERT INTO {FEATURE_STORE_SCHEMA}.training_set (name, context_id ) - VALUES ('{{name}}', {{context_id}}) + INSERT INTO {FEATURE_STORE_SCHEMA}.training_set (name, view_id ) + VALUES ('{{name}}', {{view_id}}) """ get_training_set_id = f""" @@ -171,10 +186,14 @@ class SQL: ({{model_schema_name}}, {{model_table_name}}, {{model_start_ts}}, {{model_end_ts}}, {{feature_id}}, {{feature_cardinality}}, {{feature_histogram}}, {{feature_mean}}, {{feature_median}}, {{feature_count}}, {{feature_stddev}}) """ + get_feature_vector = """ + SELECT {feature_names} FROM {feature_sets} WHERE + """ + class Columns: feature = ['feature_id', 'feature_set_id', 'name', 'description', 'feature_data_type', 'feature_type', 'tags', 'compliance_level', 'last_update_ts', 'last_update_username'] - training_context = ['context_id','name','description','context_sql','pk_columns','ts_column','label_column','context_columns'] + training_view = ['view_id','name','description','view_sql','pk_columns','ts_column','label_column','join_columns'] feature_set = ['feature_set_id', 'table_name', 'schema_name', 'description', 'pk_columns', 'pk_types', 'deployed'] history_table_pk = ['ASOF_TS','UNTIL_TS'] diff --git a/splicemachine/features/feature.py b/splicemachine/features/feature.py index 87dd1e0..9d8a327 100644 --- a/splicemachine/features/feature.py +++ b/splicemachine/features/feature.py @@ -13,23 +13,31 @@ def __init__(self, *, name, description, feature_data_type, feature_type, tags, self.__dict__.update(args) def is_categorical(self): + """ + Returns if the type of this feature is categorical + """ return self.feature_type == FeatureType.categorical def is_continuous(self): + """ + Returns if the type of this feature is continuous + """ return self.feature_type == FeatureType.continuous def is_ordinal(self): + """ + Returns if the type of this feature is ordinal + """ return self.feature_type == FeatureType.ordinal def _register_metadata(self, splice): """ Registers the feature's existence in the feature store - :return: None """ feature_sql = SQL.feature_metadata.format( feature_set_id=self.feature_set_id, name=self.name, desc=self.description, feature_data_type=self.feature_data_type, - feature_type=self.feature_type, tags=','.join(self.tags) + feature_type=self.feature_type, tags=','.join(self.tags) if isinstance(self.tags, list) else self.tags ) splice.execute(feature_sql) @@ -43,8 +51,19 @@ def __eq__(self, other): def __repr__(self): return self.__str__() + def __str__(self): return f'Feature(FeatureID={self.__dict__.get("feature_id","None")}, ' \ f'FeatureSetID={self.__dict__.get("feature_set_id","None")}, Name={self.name}, \n' \ f'Description={self.description}, FeatureDataType={self.feature_data_type}, ' \ f'FeatureType={self.feature_type}, Tags={self.tags})\n' + + def __hash__(self): + return hash(repr(self)) + + def __lt__(self, other): + if isinstance(other, str): + return self.name < other + elif isinstance(other, Feature): + return self.name < other.name + raise TypeError(f"< not supported between instances of Feature and {type(other)}") diff --git a/splicemachine/features/feature_set.py 
b/splicemachine/features/feature_set.py index f3babbd..e74c25a 100644 --- a/splicemachine/features/feature_set.py +++ b/splicemachine/features/feature_set.py @@ -27,6 +27,11 @@ def __init__(self, *, splice_ctx: PySpliceContext, table_name, schema_name, desc self.pk_columns = list(primary_keys.keys()) def get_features(self) -> List[Feature]: + """ + Get's all of the features from this featureset as a list of splicemachine.features.Feature + + :return: List[Feature] + """ features = [] if self.feature_set_id: features_df = self.splice_ctx.df(SQL.get_features_in_feature_set.format(feature_set_id=self.feature_set_id)) @@ -36,9 +41,20 @@ def get_features(self) -> List[Feature]: features.append(Feature(**f)) return features + def is_deployed(self): + """ + Returns whether or not this Feature Set has been deployed (the schema.table has been created in the database) + :return: (bool) True if the Feature Set is deployed + """ + return self.deployed + def remove_feature(self, feature: Feature or str): - #TODO - pass + """ + Removes a Feature from the Feature Set. This is not yet implemented + + :param feature: The feature to remove + """ + raise NotImplementedError def get_pk_schema_str(self): return ','.join([f'\n\t{k} {self.primary_keys[k]}' for k in self.primary_keys]) @@ -78,7 +94,10 @@ def __update_deployment_status(self, status: bool): feature_set_id=self.feature_set_id)) - def deploy(self): + def deploy(self, verbose=False): + """ + Deploys the current feature set. Equivalent to calling fs.deploy(schema_name, table_name) + """ old_pk_cols = ','.join(f'OLDW.{p}' for p in self.pk_columns) old_feature_cols = ','.join(f'OLDW.{f.name}' for f in self.get_features()) @@ -98,15 +117,15 @@ def deploy(self): feature_list=self.get_feature_column_str(), old_pk_cols=old_pk_cols, old_feature_cols=old_feature_cols) print('Creating Feature Set...', end=' ') - print('\n', feature_set_sql, '\n') + if verbose: print('\n', feature_set_sql, '\n') self.splice_ctx.execute(feature_set_sql) print('Done.') print('Creating Feature Set History...', end=' ') - print('\n', history_sql, '\n') + if verbose: print('\n', history_sql, '\n') self.splice_ctx.execute(history_sql) print('Done.') print('Creating Historian Trigger...', end=' ') - print('\n', trigger_sql, '\n') + if verbose: print('\n', trigger_sql, '\n') self.splice_ctx.execute(trigger_sql) print('Done.') print('Updating Metadata...') diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index ff650b6..e9c121b 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -4,6 +4,7 @@ from IPython.display import display import pandas as pd +from pandas import DataFrame as PandasDF from pyspark.sql.dataframe import DataFrame as SparkDF from pyspark.ml import Pipeline @@ -15,10 +16,10 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet +from .utils import dict_to_lower +from .constants import SQL, FeatureType +from .training_view import TrainingView -from .constants import SQL, Columns, FeatureType -from .training_context import TrainingContext -from .utils import clean_df class FeatureStore: def __init__(self, splice_ctx: PySpliceContext) -> None: @@ -35,7 +36,7 @@ def get_feature_sets(self, feature_set_ids: List[int] = None, _filter: Dict[str, :param feature_set_ids: A list of feature set IDs. If none will return all FeatureSets :param _filter: Dictionary of filters to apply to the query. 
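As an aside, the Feature class above is now hashable and orderable, and FeatureSet gains get_features and is_deployed helpers. A minimal sketch of both, assuming the keyword-only Feature constructor shown in this diff (all feature, schema and table names here are hypothetical):

.. code-block:: Python

    from splicemachine.features import Feature, FeatureType

    # Features can now be sorted and de-duplicated (hypothetical examples)
    f1 = Feature(name='TOTAL_SPEND', description='Lifetime customer spend',
                 feature_data_type='DOUBLE', feature_type=FeatureType.continuous, tags=['finance'])
    f2 = Feature(name='AGE_BAND', description='Customer age band',
                 feature_data_type='VARCHAR(10)', feature_type=FeatureType.categorical, tags=[])
    ordered = sorted([f1, f2])      # __lt__ compares Features by name
    unique = set([f1, f2, f1])      # __hash__ allows Features to live in sets

    # FeatureSet helpers, assuming `fs` is a FeatureStore (see getting-started above)
    fset = fs.get_feature_sets(_filter={'schema_name': 'RETAIL', 'table_name': 'CUSTOMER_FEATURES'})[0]
    print(fset.is_deployed())       # True once the schema.table exists in the database
    for feat in fset.get_features():
        print(feat.name, feat.feature_data_type)
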
This filter can be on any attribute of FeatureSets. If None, will return all FeatureSets - :return: List[FeatureSet] + :return: List[FeatureSet] the list of Feature Sets """ feature_sets = [] feature_set_ids = feature_set_ids or [] @@ -53,9 +54,7 @@ def get_feature_sets(self, feature_set_ids: List[int] = None, _filter: Dict[str, sql += f" fset.{fl}='{_filter[fl]}' AND" sql = sql.rstrip('AND') - feature_set_rows = self.splice_ctx.df(sql) - cols = Columns.feature_set - feature_set_rows = clean_df(feature_set_rows, cols) + feature_set_rows = self.splice_ctx.df(sql, to_lower=True) for fs in feature_set_rows.collect(): d = fs.asDict() @@ -65,26 +64,106 @@ def get_feature_sets(self, feature_set_ids: List[int] = None, _filter: Dict[str, feature_sets.append(FeatureSet(splice_ctx=self.splice_ctx, **d)) return feature_sets - def get_training_context(self, training_context: str) -> TrainingContext: + def get_training_set(self, features: Union[List[Feature], List[str]]) -> SparkDF: """ - Gets a training context by name + Gets a set of feature values across feature sets that is not time dependent (ie for non time series clustering). + This feature dataset will be treated and tracked implicitly the same way a training_dataset is tracked from + :py:meth:`features.FeatureStore.get_training_set` . The dataset's metadata and features used will be tracked in mlflow automatically (see + get_training_set for more details). - :param training_context: Training context name - :return: TrainingContext + :param features: List of Features or strings of feature names + + :NOTE: + .. code-block:: text + + The Features Sets which the list of Features come from must have common join keys, + otherwise the function will fail. If there is no common join key, it is recommended to + create a Training View to specify the join conditions. + + :return: Spark DF + """ + features = self._process_features(features) + + sql = SQL.get_feature_set_join_keys.format(names=tuple([f.name for f in features])) + fset_keys: pd.DataFrame = self.splice_ctx.df(sql).toPandas() + # Get max number of pk (join) columns from all feature sets + fset_keys['PK_COLUMNS_COUNT'] = fset_keys['PK_COLUMNS'].apply(lambda x: len(x.split('|'))) + # Get "anchor" feature set. The one we will use to try to join to all others + ind = fset_keys['PK_COLUMNS_COUNT'].idxmax() + anchor_series = fset_keys.iloc[ind] + # Remove that from the list + fset_keys.drop(index=ind, inplace=True) + all_pk_cols = anchor_series.PK_COLUMNS.split('|') + # For each feature set, assert that all join keys exist in our "anchor" feature set + fset_keys['can_join'] = fset_keys['PK_COLUMNS'].map(lambda x: set(x.split('|')).issubset(all_pk_cols)) + if not fset_keys['can_join'].all(): + bad_feature_set_ids = [t.FEATURE_SET_ID for _, t in fset_keys[fset_keys['can_join'] != True].iterrows()] + bad_features = [f.name for f in features if f.feature_set_id in bad_feature_set_ids] + raise SpliceMachineException(f"The provided features do not have a common join key." 
+ f"Remove features {bad_features} from your request") + + # SELECT clause + sql = 'SELECT ' + + sql += ','.join([f'fset{feature.feature_set_id}.{feature.name}' for feature in features]) + + alias = f'fset{anchor_series.FEATURE_SET_ID}' # We use this a lot for joins + sql += f'\nFROM {anchor_series.SCHEMA_NAME}.{anchor_series.TABLE_NAME} {alias} ' + + # JOIN clause + for _, fset in fset_keys.iterrows(): + # Join Feature Set + sql += f'\nLEFT OUTER JOIN {fset.SCHEMA_NAME}.{fset.TABLE_NAME} fset{fset.FEATURE_SET_ID} \n\tON ' + for ind, pkcol in enumerate(fset.PK_COLUMNS.split('|')): + if ind > 0: sql += ' AND ' # In case of multiple columns + sql += f'fset{fset.FEATURE_SET_ID}.{pkcol}={alias}.{pkcol}' + + # Link this to mlflow for model deployment + # Here we create a null training view and pass it into the training set. We do this because this special kind + # of training set isn't standard. It's not based on a training view, on primary key columns, a label column, + # or a timestamp column . This is simply a joined set of features from different feature sets. + # But we still want to track this in mlflow as a user may build and deploy a model based on this. So we pass in + # a null training view that can be tracked with a "name" (although the name is None). This is a likely case + # for (non time based) clustering use cases. + null_tx = TrainingView(pk_columns=[], ts_column=None, label_column=None, view_sql=None, name=None, + description=None) + ts = TrainingSet(training_view=null_tx, features=features) + if hasattr(self, 'mlflow_ctx'): + self.mlflow_ctx._active_training_set: TrainingSet = ts + ts._register_metadata(self.mlflow_ctx) + return self.splice_ctx.df(sql) + + def remove_training_view(self, override=False): + """ + Note: This function is not yet implemented. + Removes a training view. This will run 2 checks. + 1. See if the training view is being used by a model in a deployment. If this is the case, the function will fail, always. + 2. See if the training view is being used in any mlflow runs (non-deployed models). This will fail and return + a warning Telling the user that this training view is being used in mlflow runs (and the run_ids) and that + they will need to "override" this function to forcefully remove the training view. 
""" - return self.get_training_contexts(_filter={'name': training_context})[0] + raise NotImplementedError - def get_training_contexts(self, _filter: Dict[str, Union[int, str]] = None) -> List[TrainingContext]: + def get_training_view(self, training_view: str) -> TrainingView: """ - Returns a list of all available training contexts with an optional filter + Gets a training view by name + + :param training_view: Training view name + :return: TrainingView + """ + return self.get_training_views(_filter={'name': training_view})[0] + + def get_training_views(self, _filter: Dict[str, Union[int, str]] = None) -> List[TrainingView]: + """ + Returns a list of all available training views with an optional filter :param _filter: Dictionary container the filter keyword (label, description etc) and the value to filter on - If None, will return all TrainingContexts - :return: List[TrainingContext] + If None, will return all TrainingViews + :return: List[TrainingView] """ - training_contexts = [] + training_views = [] - sql = SQL.get_training_contexts + sql = SQL.get_training_views if _filter: sql += ' WHERE ' @@ -92,41 +171,43 @@ def get_training_contexts(self, _filter: Dict[str, Union[int, str]] = None) -> L sql += f"tc.{k}='{_filter[k]}' and" sql = sql.rstrip('and') - training_context_rows = self.splice_ctx.df(sql) + training_view_rows = self.splice_ctx.df(sql, to_lower=True) - cols = Columns.training_context - - training_context_rows = clean_df(training_context_rows, cols) - - for tc in training_context_rows.collect(): + for tc in training_view_rows.collect(): t = tc.asDict() # DB doesn't support lists so it stores , separated vals in a string t['pk_columns'] = t.pop('pk_columns').split(',') - training_contexts.append(TrainingContext(**t)) - return training_contexts + training_views.append(TrainingView(**t)) + return training_views - def get_training_context_id(self, name: str) -> int: + def get_training_view_id(self, name: str) -> int: """ - Returns the unique context ID from a name + Returns the unique view ID from a name - :param name: The training context name - :return: The training context id + :param name: The training view name + :return: The training view id """ - return self.splice_ctx.df(SQL.get_training_context_id.format(name=name)).collect()[0][0] + return self.splice_ctx.df(SQL.get_training_view_id.format(name=name)).collect()[0][0] - def get_features_by_name(self, names: List[str]) -> List[Feature]: + def get_features_by_name(self, names: Optional[List[str]], as_list=False) -> Union[List[Feature], SparkDF]: """ - Returns a list of features whose names are provided + Returns a dataframe or list of features whose names are provided :param names: The list of feature names - :return: The list of features + :param as_list: Whether or not to return a list of features. Default False + :return: SparkDF or List[Feature] The list of Feature objects or Spark Dataframe of features and their metadata. Note, this is not the Feature + values, simply the describing metadata about the features. 
To create a training dataset with Feature values, see + :py:meth:`features.FeatureStore.get_training_set` or :py:meth:`features.FeatureStore.get_feature_dataset` """ - # Format feature names into quotes strings for search - df = self.splice_ctx.df(SQL.get_features_by_name.format(feature_names=",".join([f"'{i.upper()}'" for i in names]))) - df = clean_df(df, Columns.feature) + # If they don't pass in feature names, get all features + where_clause = "name in (" + ",".join([f"'{i.upper()}'" for i in names]) + ")" + df = self.splice_ctx.df(SQL.get_features_by_name.format(where=where_clause), to_lower=True) + if not as_list: return df + features = [] for feat in df.collect(): f = feat.asDict() + f = dict((k.lower(), v) for k, v in f.items()) # DB returns uppercase column names features.append(Feature(**f)) return features @@ -134,22 +215,72 @@ def remove_feature_set(self): # TODO raise NotImplementedError - def get_feature_vector_sql(self, training_context: str, features: List[Feature], + def _validate_feature_vector_keys(self, join_key_values, feature_sets) -> None: + """ + Validates that all necessary primary keys are provided when requesting a feature vector + + :param join_key_values: dict The primary (join) key columns and values provided by the user + :param feature_sets: List[FeatureSet] the list of Feature Sets derived from the requested Features + :return: None. Raise Exception on bad validation + """ + + feature_set_key_columns = {fkey.lower() for fset in feature_sets for fkey in fset.primary_keys.keys()} + missing_keys = feature_set_key_columns - join_key_values.keys() + assert not missing_keys, f"The following keys were not provided and must be: {missing_keys}" + + def get_feature_vector(self, features: List[Union[str,Feature]], + join_key_values: Dict[str,str], return_sql=False) -> Union[str, PandasDF]: + """ + Gets a feature vector given a list of Features and primary key values for their corresponding Feature Sets + + :param features: List of str Feature names or Features + :param join_key_values: (dict) join key vals to get the proper Feature values formatted as {join_key_column_name: join_key_value} + :param return_sql: Whether to return the SQL needed to get the vector or the values themselves. 
Default False + :return: Pandas Dataframe or str (SQL statement) + """ + feats: List[Feature] = self._process_features(features) + # Match the case of the keys + join_keys = dict_to_lower(join_key_values) + + # Get the feature sets and their primary key column names + feature_sets = self.get_feature_sets([f.feature_set_id for f in feats]) + self._validate_feature_vector_keys(join_keys, feature_sets) + + + feature_names = ','.join([f.name for f in feats]) + fset_tables = ','.join([f'{fset.schema_name}.{fset.table_name} fset{fset.feature_set_id}' for fset in feature_sets]) + sql = "SELECT {feature_names} FROM {fset_tables} ".format(feature_names=feature_names, fset_tables=fset_tables) + + # For each Feature Set, for each primary key in the given feature set, get primary key value from the user provided dictionary + pk_conditions = [f"fset{fset.feature_set_id}.{pk_col} = {join_keys[pk_col.lower()]}" + for fset in feature_sets for pk_col in fset.primary_keys] + pk_conditions = ' AND '.join(pk_conditions) + + sql += f"WHERE {pk_conditions}" + + return sql if return_sql else self.splice_ctx.df(sql).toPandas() + + def get_feature_vector_sql_from_training_view(self, training_view: str, features: List[Feature], include_insert: Optional[bool] = True) -> str: """ Returns the parameterized feature retrieval SQL used for online model serving. - :param training_context_id: (str) The name of the registered training context + :param training_view: (str) The name of the registered training view :param features: (List[str]) the list of features from the feature store to be included in the training - * NOTE: This function will error if the context SQL is missing a context key required to retrieve the\ - desired features + + :NOTE: + .. code-block:: text + + This function will error if the view SQL is missing a view key required to retrieve the\ + desired features + :param include_insert: (Optional[bool]) determines whether insert into model table is included in the SQL statement - :return : (str) + :return: (str) the parameterized feature vector SQL """ - # Get training context information (ctx primary key column(s), ctx primary key inference ts column, ) - cid = self.get_training_context_id(training_context) - tctx = self.get_training_contexts(_filter={'context_id': cid})[0] + # Get training view information (ctx primary key column(s), ctx primary key inference ts column, ) + vid = self.get_training_view_id(training_view) + tctx = self.get_training_views(_filter={'view_id': vid})[0] # optional INSERT prefix if (include_insert): @@ -190,27 +321,26 @@ def get_feature_vector_sql(self, training_context: str, features: List[Feature], return sql - def get_feature_context_keys(self, features: List[str]) -> Dict[str, List[str]]: + def get_feature_primary_keys(self, features: List[str]) -> Dict[str, List[str]]: """ - Returns a dictionary mapping each individual feature to its primary key(s) + Returns a dictionary mapping each individual feature to its primary key(s). This function is not yet implemented. 
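A hedged sketch of get_feature_vector above, used for online serving; the feature names, join key column, and key value are hypothetical:

.. code-block:: Python

    vector_df = fs.get_feature_vector(
        features=['TOTAL_SPEND', 'AGE_BAND'],
        join_key_values={'CUSTOMER_ID': '48745'})   # one row of feature values, as a Pandas DataFrame

    vector_sql = fs.get_feature_vector(
        features=['TOTAL_SPEND', 'AGE_BAND'],
        join_key_values={'CUSTOMER_ID': '48745'},
        return_sql=True)                            # or just the generated SELECT statement
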
:param features: (List[str]) The list of features to get primary keys for - :return: Dict[str, List[str]] + :return: Dict[str, List[str]] A mapping of {feature name: [pk1, pk2, etc]} """ pass - def get_available_features(self, training_context: str) -> List[Feature]: + def get_training_view_features(self, training_view: str) -> List[Feature]: """ - Returns the available features for the given a training context name + Returns the available features for the given a training view name - :param training_context: The name of the training context - :return: A list of available features + :param training_view: The name of the training view + :return: A list of available Feature objects """ - where = f"tc.Name='{training_context}'" + where = f"tc.Name='{training_view}'" - df = self.splice_ctx.df(SQL.get_available_features.format(where=where)) + df = self.splice_ctx.df(SQL.get_training_view_features.format(where=where), to_lower=True) - df = clean_df(df, Columns.feature) features = [] for feat in df.collect(): f = feat.asDict() @@ -218,34 +348,58 @@ def get_available_features(self, training_context: str) -> List[Feature]: return features def get_feature_description(self): - #TODO + # TODO raise NotImplementedError - def get_training_set(self, training_context: str, features: Union[List[Feature],List[str]], start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, return_sql: bool = False) -> SparkDF or str: - """ - Returns the training set as a Spark Dataframe - - :param training_context: (str) The name of the registered training context + def get_training_set_from_view(self, training_view: str, features: Union[List[Feature], List[str]] = None, + start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, + return_sql: bool = False) -> SparkDF or str: + """ + Returns the training set as a Spark Dataframe from a Training View. When a user calls this function (assuming they have registered + the feature store with mlflow using :py:meth:`~mlflow.register_feature_store` ) + the training dataset's metadata will be tracked in mlflow automatically. The following will be tracked: + including: + * Training View + * Selected features + * Start time + * End time + This tracking will occur in the current run (if there is an active run) + or in the next run that is started after calling this function (if no run is currently active). + + :param training_view: (str) The name of the registered training view :param features: (List[str] OR List[Feature]) the list of features from the feature store to be included in the training. - If a list of strings is passed in it will be converted to a list of Feature - * NOTE: This function will error if the context SQL is missing a context key required to retrieve the\ - desired features + If a list of strings is passed in it will be converted to a list of Feature. If not provided will return all available features. + + :NOTE: + .. code-block:: text + + This function will error if the view SQL is missing a join key required to retrieve the + desired features + :param start_time: (Optional[datetime]) The start time of the query (how far back in the data to start). Default None - * NOTE: If start_time is None, query will start from beginning of history - :param end_time: (Optional[datetime]) The end time of the query (how far recent in the data to get). Default None + + :NOTE: + .. 
code-block:: text + + If start_time is None, query will start from beginning of history + :param end_time: (Optional[datetime]) The end time of the query (how far recent in the data to get). Default None - * NOTE: If end_time is None, query will get most recently available data + + :NOTE: + .. code-block:: text + + If end_time is None, query will get most recently available data + :param return_sql: (Optional[bool]) Return the SQL statement (str) instead of the Spark DF. Defaults False - :return: Optional[SparkDF, str] + :return: Optional[SparkDF, str] The Spark dataframe of the training set or the SQL that is used to generate it (for debugging) """ - features = self.get_features_by_name(features) if all([isinstance(i,str) for i in features]) else features + features = self._process_features(features) if features else self.get_training_view_features(training_view) # DB-9556 loss of column names on complex sql for NSDS cols = [] - # Get training context information (ctx primary key column(s), ctx primary key inference ts column, ) - tctx = self.get_training_context(training_context) + # Get training view information (view primary key column(s), inference ts column, ) + tctx = self.get_training_view(training_view) # SELECT clause sql = 'SELECT ' for pkcol in tctx.pk_columns: # Select primary key column(s) @@ -265,7 +419,7 @@ def get_training_set(self, training_context: str, features: Union[List[Feature], if tctx.label_column: cols.append(tctx.label_column) # FROM clause - sql += f'\nFROM ({tctx.context_sql}) ctx ' + sql += f'\nFROM ({tctx.view_sql}) ctx ' # JOIN clause feature_set_ids = list({f.feature_set_id for f in features}) # Distinct set of IDs @@ -293,13 +447,13 @@ def get_training_set(self, training_context: str, features: Union[List[Feature], sql = sql.rstrip('AND') # Link this to mlflow for model deployment - if hasattr(self, 'mlflow_ctx'): - ts = TrainingSet(training_context=tctx, features=features, - start_time=start_time, end_time=end_time) + if hasattr(self, 'mlflow_ctx') and not return_sql: + ts = TrainingSet(training_view=tctx, features=features, + start_time=start_time, end_time=end_time) self.mlflow_ctx._active_training_set: TrainingSet = ts ts._register_metadata(self.mlflow_ctx) - return sql if return_sql else clean_df(self.splice_ctx.df(sql), cols) + return sql if return_sql else self.splice_ctx.df(sql) def list_training_sets(self) -> Dict[str, Optional[str]]: """ @@ -308,45 +462,45 @@ def list_training_sets(self) -> Dict[str, Optional[str]]: :return: Dict[str, Optional[str]] """ - pass + raise NotImplementedError("To see available training views, run fs.describe_training_views()") - def _validate_feature_set(self, schema, table): + def _validate_feature_set(self, schema_name, table_name): """ Asserts a feature set doesn't already exist in the database - :param schema: schema name of the feature set - :param table: table name of the feature set + :param schema_name: schema name of the feature set + :param table_name: table name of the feature set :return: None """ - str = f'Feature Set {schema}.{table} already exists. Use a different schema and/or table name.' + str = f'Feature Set {schema_name}.{table_name} already exists. Use a different schema and/or table name.' 
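A hedged sketch of get_training_set_from_view described above; the view name and feature names are hypothetical:

.. code-block:: Python

    from datetime import datetime

    train_df = fs.get_training_set_from_view(
        training_view='CUSTOMER_PURCHASES',
        features=['TOTAL_SPEND', 'AGE_BAND'],   # omit to include every feature joinable to the view
        start_time=datetime(2020, 1, 1),
        end_time=datetime(2020, 6, 30))

    sql = fs.get_training_set_from_view('CUSTOMER_PURCHASES', return_sql=True)   # inspect the generated SQL instead
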
# Validate Table - assert not self.splice_ctx.tableExists(schema, table_name=table), str + assert not self.splice_ctx.tableExists(schema_name, table_name=table_name), str # Validate metadata - assert len(self.get_feature_sets(_filter={'table_name': table, 'schema_name': schema})) == 0, str + assert len(self.get_feature_sets(_filter={'table_name': table_name, 'schema_name': schema_name})) == 0, str - def create_feature_set(self, schema: str, table: str, primary_keys: Dict[str, str], + def create_feature_set(self, schema_name: str, table_name: str, primary_keys: Dict[str, str], desc: Optional[str] = None) -> FeatureSet: """ Creates and returns a new feature set - :param schema: - :param name: - :param pk_columns: - :param feature_column: - :param desc: + :param schema_name: The schema under which to create the feature set table + :param table_name: The table name for this feature set + :param primary_keys: The primary key column(s) of this feature set + :param desc: The (optional) description :return: FeatureSet """ - self._validate_feature_set(schema, table) - fset = FeatureSet(splice_ctx=self.splice_ctx, schema_name=schema, table_name=table, primary_keys=primary_keys, + self._validate_feature_set(schema_name, table_name) + fset = FeatureSet(splice_ctx=self.splice_ctx, schema_name=schema_name, table_name=table_name, + primary_keys=primary_keys, description=desc) self.feature_sets.append(fset) - print(f'Registering feature set {schema}.{table} in Feature Store') + print(f'Registering feature set {schema_name}.{table_name} in Feature Store') fset._register_metadata() return fset def _validate_feature(self, name): """ Ensures that the feature doesn't exist as all features have unique names - :param name: + :param name: the Feature name :return: """ # TODO: Capitalization of feature name column @@ -355,112 +509,168 @@ def _validate_feature(self, name): l = self.splice_ctx.df(SQL.get_all_features.format(name=name.upper())).count() assert l == 0, str - if not re.match('^[A-Za-z][A-Za-z0-9_]*$', name): + if not re.match('^[A-Za-z][A-Za-z0-9_]*$', name, re.IGNORECASE): raise SpliceMachineException('Feature name does not conform. Must start with an alphabetic character, ' 'and can only contains letters, numbers and underscores') - def create_feature(self, *, feature_set_schema: str, feature_set_table: str, - name: str, description: str, feature_data_type: str, feature_type: FeatureType, tags: List[str]): + def __validate_feature_data_type(self, feature_data_type: str): + """ + Validated that the provided feature data type is a valid SQL data type + :param feature_data_type: the feature data type + :return: None + """ + from splicemachine.spark.constants import SQL_TYPES + if not feature_data_type.split('(')[0] in SQL_TYPES: + raise SpliceMachineException(f"The datatype you've passed in, {feature_data_type} is not a valid SQL type. " + f"Valid types are {SQL_TYPES}") + + def create_feature(self, schema_name: str, table_name: str, name: str, feature_data_type: str, + feature_type: FeatureType, desc: str = None, tags: List[str] = None): """ Add a feature to a feature set - :param feature_set_schema: The feature set schema - :param feature_set_table: The feature set table name to add the feature to + + :param schema_name: The feature set schema + :param table_name: The feature set table name to add the feature to :param name: The feature name - :param description: The feature description :param feature_data_type: The datatype of the feature. 
Must be a valid SQL datatype - :param feature_type: FeatureType of the feature. Available are FeatureType.[categorical, ordinal, continuous] - :param tags: List of (str) tag words - :return: + :param feature_type: splicemachine.features.FeatureType of the feature. The available types are from the FeatureType class: FeatureType.[categorical, ordinal, continuous]. + You can see available feature types by running + + .. code-block:: python + + from splicemachine.features import FeatureType + print(FeatureType.get_valid()) + + :param desc: The (optional) feature description (default None) + :param tags: (optional) List of (str) tag words (default None) + :return: Feature created """ - fset: FeatureSet = \ - self.get_feature_sets(_filter={'table_name': feature_set_table, 'schema_name': feature_set_schema})[0] + self.__validate_feature_data_type(feature_data_type) + if self.splice_ctx.tableExists(schema_name, table_name): + raise SpliceMachineException(f"Feature Set {schema_name}.{table_name} is already deployed. You cannot " + f"add features to a deployed feature set.") + fset: FeatureSet = self.get_feature_sets(_filter={'table_name': table_name, 'schema_name': schema_name})[0] self._validate_feature(name) - f = Feature(name=name, description=description, feature_data_type=feature_data_type, - feature_type=feature_type, tags=tags, feature_set_id=fset.feature_set_id) + f = Feature(name=name, description=desc or '', feature_data_type=feature_data_type, + feature_type=feature_type, tags=tags or [], feature_set_id=fset.feature_set_id) print(f'Registering feature {f.name} in Feature Store') f._register_metadata(self.splice_ctx) + return f # TODO: Backfill the feature - def _validate_training_context(self, name, sql, context_keys): + def _validate_training_view(self, name, sql, join_keys, label_col=None): """ - Validates that the training context doesn't already exist. - #TODO: Validate the SQL for the training_sql (if possible?) - #TODO: Validate context_keys, primary_key etc... - :param name: The training context name - :return: None + Validates that the training view doesn't already exist. + + :param name: The training view name + :param sql: The training view provided SQL + :param join_keys: The provided join keys when creating the training view + :param label_col: The label column + :return: """ # Validate name doesn't exist - assert len(self.get_training_contexts(_filter={'name': name})) == 0, f"Training context {name} already exists!" + assert len(self.get_training_views(_filter={'name': name})) == 0, f"Training View {name} already exists!" # Column comparison - # Lazily evaluate sql resultset, ensure that the result contains all columns matching pks, context_keys, tscol and label_col + # Lazily evaluate sql resultset, ensure that the result contains all columns matching pks, join_keys, tscol and label_col from py4j.protocol import Py4JJavaError try: - context_sql_df = self.splice_ctx.df(sql) + valid_df = self.splice_ctx.df(sql) except Py4JJavaError as e: if 'SQLSyntaxErrorException' in str(e.java_exception): raise SpliceMachineException(f'The provided SQL is incorrect. 
The following error was raised during ' f'validation:\n\n{str(e.java_exception)}') from None raise e - # FIXME: We cannot move forward here until https://splicemachine.atlassian.net/browse/DB-9556 - # Confirm that all context_keys provided correspond to primary keys of created feature sets + # Ensure the label column specified is in the output of the SQL + if label_col: assert label_col in valid_df.columns, f"Provided label column {label_col} is not available in the provided SQL" + # Confirm that all join_keys provided correspond to primary keys of created feature sets pks = set(i[0].upper() for i in self.splice_ctx.df(SQL.get_fset_primary_keys).collect()) - missing_keys = set(i.upper() for i in context_keys) - pks - assert not missing_keys, f"Not all provided context keys exist. Remove {missing_keys} or " \ + missing_keys = set(i.upper() for i in join_keys) - pks + assert not missing_keys, f"Not all provided join keys exist. Remove {missing_keys} or " \ f"create a feature set that uses the missing keys" - def create_training_context(self, *, name: str, sql: str, primary_keys: List[str], context_keys: List[str], - ts_col: str, label_col: Optional[str] = None, replace: Optional[bool] = False, - desc: Optional[str] = None) -> None: + def create_training_view(self, name: str, sql: str, primary_keys: List[str], join_keys: List[str], + ts_col: str, label_col: Optional[str] = None, replace: Optional[bool] = False, + desc: Optional[str] = None, verbose=False) -> None: """ - Registers a training context for use in generating training SQL + Registers a training view for use in generating training SQL :param name: The training set name. This must be unique to other existing training sets unless replace is True :param sql: (str) a SELECT statement that includes: * the primary key column(s) - uniquely identifying a training row/case * the inference timestamp column - timestamp column with which to join features (temporal join timestamp) - * context key(s) - the references to the other feature tables' primary keys (ie customer_id, location_id) + * join key(s) - the references to the other feature tables' primary keys (ie customer_id, location_id) * (optionally) the label expression - defining what the training set is trying to predict :param primary_keys: (List[str]) The list of columns from the training SQL that identify the training row - :param ts_col: (Optional[str]) The timestamp column of the training SQL that identifies the inference timestamp + :param ts_col: The timestamp column of the training SQL that identifies the inference timestamp :param label_col: (Optional[str]) The optional label column from the training SQL. - :param replace: (Optional[bool]) Whether to replace an existing training set - :param context_keys: (List[str]) A list of context keys in the sql that are used to get the desired features in + :param replace: (Optional[bool]) Whether to replace an existing training view + :param join_keys: (List[str]) A list of join keys in the sql that are used to get the desired features in get_training_set :param desc: (Optional[str]) An optional description of the training set + :param verbose: Whether or not to print the SQL before execution (default False) :return: """ - self._validate_training_context(name, sql, context_keys) - # register_training_context() + assert name != "None", "Name of training view cannot be None!" 
+ self._validate_training_view(name, sql, join_keys, label_col) + # register_training_view() label_col = f"'{label_col}'" if label_col else "NULL" # Formatting incase NULL - train_sql = SQL.training_context.format(name=name, desc=desc or 'None Provided', sql_text=sql, ts_col=ts_col, - label_col=label_col) + train_sql = SQL.training_view.format(name=name, desc=desc or 'None Provided', sql_text=sql, ts_col=ts_col, + label_col=label_col) print('Building training sql...') - print('\t', train_sql) + if verbose: print('\t', train_sql) self.splice_ctx.execute(train_sql) print('Done.') - # Get generated context ID - cid = self.get_training_context_id(name) + # Get generated view ID + vid = self.get_training_view_id(name) - print('Creating Context Keys') - for i in context_keys: - key_sql = SQL.training_context_keys.format(context_id=cid, key_column=i.upper(), key_type='C') - print(f'\tCreating Context Key {i}...') - print('\t', key_sql) + print('Creating Join Keys') + for i in join_keys: + key_sql = SQL.training_view_keys.format(view_id=vid, key_column=i.upper(), key_type='J') + print(f'\tCreating Join Key {i}...') + if verbose: print('\t', key_sql) self.splice_ctx.execute(key_sql) print('Done.') - print('Creating Primary Keys') + print('Creating Training View Primary Keys') for i in primary_keys: - key_sql = SQL.training_context_keys.format(context_id=cid, key_column=i.upper(), key_type='P') + key_sql = SQL.training_view_keys.format(view_id=vid, key_column=i.upper(), key_type='P') print(f'\tCreating Primary Key {i}...') - print('\t', key_sql) + if verbose: print('\t', key_sql) self.splice_ctx.execute(key_sql) print('Done.') - def deploy_feature_set(self, feature_set_schema, feature_set_table): - fset = self.get_feature_sets(_filter={'schema_name': feature_set_schema, 'table_name': feature_set_table})[0] + def _process_features(self, features: List[Union[Feature, str]]) -> List[Feature]: + """ + Process a list of Features parameter. If the list is strings, it converts them to Features, else returns itself + + :param features: The list of Feature names or Feature objects + :return: List[Feature] + """ + feat_str = [f for f in features if isinstance(f, str)] + str_to_feat = self.get_features_by_name(names=feat_str, as_list=True) if feat_str else [] + all_features = str_to_feat + [f for f in features if not isinstance(f, str)] + assert all( + [isinstance(i, Feature) for i in all_features]), "It seems you've passed in Features that are neither" \ + " a feature name (string) or a Feature object" + return all_features + + def deploy_feature_set(self, schema_name, table_name): + """ + Deploys a feature set to the database. This persists the feature stores existence. + As of now, once deployed you cannot delete the feature set or add/delete features. + The feature set must have already been created with :py:meth:`~features.FeatureStore.create_feature_set` + + :param schema_name: The schema of the created feature set + :param table_name: The table of the created feature set + """ + try: + fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name})[0] + except: + raise SpliceMachineException( + f"Cannot find feature set {schema_name}.{table_name}. 
Ensure you've created this " + f"feature set using fs.create_feature_set before deploying.") fset.deploy() def describe_feature_sets(self) -> None: @@ -472,56 +682,61 @@ def describe_feature_sets(self) -> None: """ print('Available feature sets') for fset in self.get_feature_sets(): - print('-' * 200) + print('-' * 23) self.describe_feature_set(fset.schema_name, fset.table_name) - def describe_feature_set(self, feature_set_schema: str, feature_set_table: str) -> None: + def describe_feature_set(self, schema_name: str, table_name: str) -> None: """ Prints out a description of a given feature set, with all features in the feature set and whether the feature set is deployed - :param feature_set_schema: feature set schema name - :param feature_set_table: feature set table name + :param schema_name: feature set schema name + :param table_name: feature set table name :return: None """ - fset = self.get_feature_sets(_filter={'schema_name': feature_set_schema, 'table_name': feature_set_table}) - if not fset: raise SpliceMachineException(f"Feature Set {feature_set_schema}.{feature_set_table} not found. Check name and try again.") + fset = self.get_feature_sets(_filter={'schema_name': schema_name, 'table_name': table_name}) + if not fset: raise SpliceMachineException( + f"Feature Set {schema_name}.{table_name} not found. Check name and try again.") fset = fset[0] print(f'{fset.schema_name}.{fset.table_name} - {fset.description}') - print('\n\tAvailable features:') + print('Primary keys:', fset.primary_keys) + print('\nAvailable features:') display(pd.DataFrame(f.__dict__ for f in fset.get_features())) - def describe_training_contexts(self) -> None: + def describe_training_views(self) -> None: """ - Prints out a description of all training contexts, the ID, name, description and optional label + Prints out a description of all training views, the ID, name, description and optional label - :param training_context: The training context name + :param training_view: The training view name :return: None """ - print('Available training contexts') - for tcx in self.get_training_contexts(): - print('-' * 200) - self.describe_training_context(tcx.name) + print('Available training views') + for tcx in self.get_training_views(): + print('-' * 23) + self.describe_training_view(tcx.name) - def describe_training_context(self, training_context: str) -> None: + def describe_training_view(self, training_view: str) -> None: """ - Prints out a description of a given training context, the ID, name, description and optional label + Prints out a description of a given training view, the ID, name, description and optional label - :param training_context: The training context name + :param training_view: The training view name :return: None """ - tcx = self.get_training_contexts(_filter={'name': training_context}) - if not tcx: raise SpliceMachineException(f"Training context {training_context} not found. Check name and try again.") + tcx = self.get_training_views(_filter={'name': training_view}) + if not tcx: raise SpliceMachineException(f"Training view {training_view} not found. 
Check name and try again.") tcx = tcx[0] - print(f'ID({tcx.context_id}) {tcx.name} - {tcx.description} - LABEL: {tcx.label_column}') + print(f'ID({tcx.view_id}) {tcx.name} - {tcx.description} - LABEL: {tcx.label_column}') print(f'Available features in {tcx.name}:') - feats: List[Feature] = self.get_available_features(tcx.name) + feats: List[Feature] = self.get_training_view_features(tcx.name) # Grab the feature set info and their corresponding names (schema.table) for the display table feat_sets: List[FeatureSet] = self.get_feature_sets(feature_set_ids=[f.feature_set_id for f in feats]) - feat_sets: Dict[int,str] = {fset.feature_set_id: f'{fset.schema_name}.{fset.table_name}' for fset in feat_sets} + feat_sets: Dict[int, str] = {fset.feature_set_id: f'{fset.schema_name}.{fset.table_name}' for fset in feat_sets} for f in feats: f.feature_set_name = feat_sets[f.feature_set_id] - display(pd.DataFrame(f.__dict__ for f in feats)) + col_order = ['name', 'description', 'feature_data_type', 'feature_set_name', 'feature_type', 'tags', + 'last_update_ts', + 'last_update_username', 'compliance_level', 'feature_set_id', 'feature_id'] + display(pd.DataFrame(f.__dict__ for f in feats)[col_order]) def set_feature_description(self): raise NotImplementedError @@ -544,7 +759,7 @@ def __get_pipeline(self, df, features, label, model_type): si = [StringIndexer(inputCol=n, outputCol=f'{n}_index', handleInvalid='keep') for n in categorical_features] all_features = numeric_features + indexed_features - v = VectorAssembler(inputCols=all_features, outputCol='features') + v = VectorAssembler(inputCols=all_features, outputCol='features', handleInvalid='keep') if model_type == 'classification': si += [StringIndexer(inputCol=label, outputCol=f'{label}_index', handleInvalid='keep')] clf = RandomForestClassifier(labelCol=f'{label}_index') @@ -580,18 +795,36 @@ def __log_mlflow_results(self, name, rounds, mlflow_results): with self.mlflow_ctx.start_run(run_name=f'Round {r}', nested=True): self.mlflow_ctx.log_metrics(mlflow_results[r]) - def run_feature_elimination(self, df, features, label: str = 'label', n: int = 10, verbose: int = 0, - model_type: str = 'classification', step: int = 1, log_mlflow: bool = False, - mlflow_run_name: str = None, return_importances: bool = False): + def __prune_features_for_elimination(self, features) -> List[Feature]: + """ + Removes features from the provided list that are not compatible with SparkML modeling + + :param features: List[Feature] the provided list + :return: List[Feature] the pruned list + """ + from splicemachine.spark.constants import SQL_MODELING_TYPES + invalid_features = {f for f in features if f.feature_data_type not in SQL_MODELING_TYPES} + valid_features = list(set(features) - invalid_features) + if invalid_features: print('The following features are invalid for modeling based on their Data Types:\n') + for f in invalid_features: + print(f.name, f.feature_data_type) + return valid_features + + def run_feature_elimination(self, df, features: List[Union[str, Feature]], label: str = 'label', n: int = 10, + verbose: int = 0, model_type: str = 'classification', step: int = 1, + log_mlflow: bool = False, mlflow_run_name: str = None, + return_importances: bool = False): """ Runs feature elimination using a Spark decision tree on the dataframe passed in. 
Optionally logs results to mlflow :param df: The dataframe with features and label - :param label: the label column + :param features: The list of feature names (or Feature objects) to run elimination on + :param label: the label column name :param n: The number of features desired. Default 10 :param verbose: The level of verbosity. 0 indicated no printing. 1 indicates printing remaining features after each round. 2 indicates print features and relative importances after each round. Default 0 + :param model_type: Whether the model to test with will be a regression or classification model. Default classification :param log_mlflow: Whether or not to log results to mlflow as nested runs. Default false :param mlflow_run_name: The name of the parent run under which all subsequent runs will live. The children run names will be {mlflow_run_name}_{num_features}_features. ie testrun_5_features, testrun_4_features etc @@ -599,7 +832,8 @@ def run_feature_elimination(self, df, features, label: str = 'label', n: int = 1 """ train_df = df - remaining_features = features + features = self._process_features(features) + remaining_features = self.__prune_features_for_elimination(features) rnd = 0 mlflow_results = [] assert len( @@ -612,7 +846,8 @@ def run_feature_elimination(self, df, features, label: str = 'label', n: int = 1 model = self.__get_pipeline(train_df, remaining_features, label, model_type) print('Getting feature importance') feature_importances = self.__get_feature_importance(model.stages[-1].featureImportances, - model.transform(train_df), "features").head(num_features) + model.transform(train_df), "features").head( + num_features) remaining_features_and_label = list(feature_importances['name'].values) + [label] train_df = train_df.select(*remaining_features_and_label) remaining_features = [f for f in remaining_features if f.name in feature_importances['name'].values] diff --git a/splicemachine/features/training_set.py b/splicemachine/features/training_set.py index 4b3ea1f..131eb4e 100644 --- a/splicemachine/features/training_set.py +++ b/splicemachine/features/training_set.py @@ -1,4 +1,4 @@ -from .training_context import TrainingContext +from .training_view import TrainingView from .feature import Feature from typing import List, Optional from datetime import datetime @@ -12,12 +12,12 @@ class TrainingSet: """ def __init__(self, *, - training_context: TrainingContext, + training_view: TrainingView, features: List[Feature], - start_time: Optional[datetime], - end_time: Optional[datetime] + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None ): - self.training_context = training_context + self.training_view = training_view self.features = features self.start_time = start_time or datetime.min self.end_time = end_time or datetime.today() @@ -25,7 +25,7 @@ def __init__(self, def _register_metadata(self, mlflow_ctx): if mlflow_ctx.active_run(): print("There is an active mlflow run, your training set will be logged to that run.") - mlflow_ctx.lp("splice.feature_store.training_set",self.training_context.name) + mlflow_ctx.lp("splice.feature_store.training_set",self.training_view.name) mlflow_ctx.lp("splice.feature_store.training_set_start_time",str(self.start_time)) mlflow_ctx.lp("splice.feature_store.training_set_end_time",str(self.end_time)) mlflow_ctx.lp("splice.feature_store.training_set_num_features", len(self.features)) diff --git a/splicemachine/features/training_context.py b/splicemachine/features/training_view.py similarity index 80% rename from 
splicemachine/features/training_context.py rename to splicemachine/features/training_view.py index ae9134e..6b857d8 100644 --- a/splicemachine/features/training_context.py +++ b/splicemachine/features/training_view.py @@ -1,11 +1,11 @@ from typing import List -class TrainingContext: - def __init__(self, *, pk_columns: List[str], ts_column, label_column, context_sql, name, description, **kwargs): +class TrainingView: + def __init__(self, *, pk_columns: List[str], ts_column, label_column, view_sql, name, description, **kwargs): self.pk_columns = pk_columns self.ts_column = ts_column self.label_column = label_column - self.context_sql = context_sql + self.view_sql = view_sql self.name = name self.description = description args = {k.lower(): kwargs[k] for k in kwargs} # Make all keys lowercase @@ -14,11 +14,11 @@ def __init__(self, *, pk_columns: List[str], ts_column, label_column, context_sq def __repr__(self): - return f'TrainingContext(' \ + return f'TrainingView(' \ f'PKColumns={self.pk_columns}, ' \ f'TSColumn={self.ts_column}, ' \ f'LabelColumn={self.label_column}, \n' \ - f'ContextSQL={self.context_sql}' + f'ViewSQL={self.view_sql}' def __str__(self): return self.__repr__() diff --git a/splicemachine/features/utils.py b/splicemachine/features/utils.py index 03dbaea..32a6254 100644 --- a/splicemachine/features/utils.py +++ b/splicemachine/features/utils.py @@ -2,3 +2,12 @@ def clean_df(df, cols): for old,new in zip(df.columns, cols): df = df.withColumnRenamed(old,new) return df + +def dict_to_lower(dict): + """ + Converts a dictionary to all lowercase keys + + :param dict: The dictionary + :return: The lowercased dictionary + """ + return {i.lower():dict[i] for i in dict} diff --git a/splicemachine/mlflow_support/mlflow_support.py b/splicemachine/mlflow_support/mlflow_support.py index dcf2c0b..294e019 100644 --- a/splicemachine/mlflow_support/mlflow_support.py +++ b/splicemachine/mlflow_support/mlflow_support.py @@ -775,11 +775,11 @@ def _deploy_kubernetes(run_id: str, service_port: int = 80, :param service_port: (default 80) the port that the prediction service runs on internally in the cluster :param autoscaling_enabled: (default False) whether or not to provision a Horizontal Pod Autoscaler to provision pods dynamically - :param max_replicas (default 2) [USED IF AUTOSCALING ENABLED] max number of pods to scale up to + :param max_replicas: (default 2) [USED IF AUTOSCALING ENABLED] max number of pods to scale up to :param target_cpu_utilization: (default 50) [USED IF AUTOSCALING ENABLED] the cpu utilization to scale up to new pods on :param disable_nginx: (default False) disable nginx inside of the pod (recommended) - :param gunicorn_workers: (default 1) [MUST BE 1 FOR SPARK TO PREVENT OOM] Number of web workers. + :param gunicorn_workers: (default 1) [MUST BE 1 FOR SPARK ML models TO PREVENT OOM] Number of web workers. 
:param resource_requests_enabled: (default False) whether or not to enable Kubernetes resource requests :param resource_limits_enabled: (default False) whether or not to enable Kubernetes resource limits :param cpu_request: (default 0.5) [USED IF RESOURCE REQUESTS ENABLED] number of CPU to request diff --git a/splicemachine/spark/constants.py b/splicemachine/spark/constants.py index 4dcdbf4..e43bb0f 100644 --- a/splicemachine/spark/constants.py +++ b/splicemachine/spark/constants.py @@ -16,4 +16,9 @@ } SQL_TYPES = ['CHAR', 'LONG VARCHAR', 'VARCHAR', 'DATE', 'TIME', 'TIMESTAMP', 'BLOB', 'CLOB', 'TEXT', 'BIGINT', - 'DECIMAL', 'DOUBLE', 'DOUBLE PRECISION', 'INTEGER', 'NUMERIC', 'REAL', 'SMALLINT', 'TINYINT', 'BOOLEAN', 'INT'] + 'DECIMAL', 'DOUBLE', 'DOUBLE PRECISION', 'INTEGER', 'NUMERIC', 'REAL', 'SMALLINT', 'TINYINT', 'BOOLEAN', + 'INT'] + +# SQL types that are compatible with SparkML modeling +SQL_MODELING_TYPES = {'CHAR', 'LONG VARCHAR', 'VARCHAR', 'CLOB', 'TEXT', 'BIGINT', 'DECIMAL', 'DOUBLE', 'INTEGER', + 'DOUBLE PRECISION', 'NUMERIC', 'REAL', 'SMALLINT', 'TINYINT', 'BOOLEAN', 'INT'} diff --git a/splicemachine/spark/context.py b/splicemachine/spark/context.py index 8b7e003..a715afe 100755 --- a/splicemachine/spark/context.py +++ b/splicemachine/spark/context.py @@ -97,6 +97,18 @@ def toUpper(self, dataframe): # You need to re-generate the dataframe for the capital letters to take effect return dataframe.rdd.toDF(dataframe.schema) + def toLower(self, dataframe): + """ + Returns a dataframe with all of the columns in lowercase + + :param dataframe: (Dataframe) The dataframe to convert to lowercase + """ + for s in dataframe.schema: + s.name = s.name.lower() + # You need to re-generate the dataframe for the lowercase letters to take effect + return dataframe.rdd.toDF(dataframe.schema) + + def replaceDataframeSchema(self, dataframe, schema_table_name): """ Returns a dataframe with all column names replaced with the proper string case from the DB table @@ -208,7 +220,7 @@ def dropTable(self, schema_and_or_table_name, table_name=None): else: self.context.dropTable(schema_and_or_table_name) - def df(self, sql): + def df(self, sql, to_lower=False): """ Return a Spark Dataframe from the results of a Splice Machine SQL Query :Example: .. code-block:: python df = splice.df('SELECT * FROM MYSCHEMA.TABLE1 WHERE COL2 > 3') :param sql: (str) SQL Query (eg. SELECT * FROM table1 WHERE col2 > 3) + :param to_lower: Whether or not to convert column names from the dataframe to lowercase :return: (Dataframe) A Spark DataFrame containing the results """ - return DataFrame(self.context.df(sql), self.spark_sql_context) + df = DataFrame(self.context.df(sql), self.spark_sql_context) + return self.toLower(df) if to_lower else df def insert(self, dataframe, schema_table_name, to_upper=True, create_table=False): """
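Taken together, the new to_lower option on df(), the string handling in run_feature_elimination, and deploy_feature_set might be used as in the sketch below. The schema, table, feature, and label names are illustrative only and assume an existing FeatureStore instance (fs) and PySpliceContext (splice).

    # Persist a previously created feature set
    fs.deploy_feature_set('RETAIL', 'CUSTOMER_FEATURES')

    # Pull a training frame with lowercase column names via the new to_lower flag
    df = splice.df('SELECT * FROM RETAIL.CHURN_TRAINING', to_lower=True)

    # Feature elimination now accepts feature names (strings) as well as Feature objects;
    # features with non-modelable data types are pruned via SQL_MODELING_TYPES
    importances = fs.run_feature_elimination(
        df,
        features=['total_spend', 'visit_count', 'avg_basket_size'],  # hypothetical feature names
        label='label',
        n=2,                          # stop once 2 features remain
        model_type='classification',
        log_mlflow=True,              # one nested mlflow run per elimination round
        mlflow_run_name='churn_elimination',
        return_importances=True
    )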