
Feature store api cleanup (#96)
* cleanup of api

* add context (primary) key to describe feature sets

* optional verbose to print sql in create_training_context

* added get_feature_dataset

* comments

* old code

* i hate upppercase

* commment

* sql format

* i still hate uppercase

* null tx

* sql format

* sql format

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* docs

* verbose

* docs

* docs

* column ordering

* feature param cleanup

* training context features

* removed clean_df

* to_lower

* docs

* better logic

* better logic

* label column validation

* refactor TrainingContext -> TrainingView, Feature Set Context Key -> Feature Set Join Key

* missed one

* exclude 2 more funcs

* docs

* as list

* missed some more

* hashable

* pep

* docs

* docs

* handleinvalid keep

* feature_vector_sql

* get-features_by_name requires names

* exclude members

* return Feature, docs fix
Ben Epstein authored Jan 5, 2021
1 parent 66d0ff9 commit 7660e39
Showing 18 changed files with 610 additions and 241 deletions.
2 changes: 1 addition & 1 deletion .readthedocs.yaml
@@ -21,4 +21,4 @@ formats:
python:
version: 3.7
install:
- requirements: requirements.txt
- requirements: requirements-docs.txt
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -46,7 +46,7 @@
'private-members':True,
'inherited-members':True,
'undoc-members': False,
'exclude-members': '_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main'
'exclude-members': '_validate_feature_vector_keys,_process_features,__prune_features_for_elimination,_register_metadata,_register_metadata,__update_deployment_status,__log_mlflow_results,__get_feature_importance,__get_pipeline,_validate_training_view,_validate_feature_set,_validate_feature,__validate_feature_data_type,_check_for_splice_ctx,_dropTableIfExists, _generateDBSchema,_getCreateTableSchema,_jstructtype,_spliceSparkPackagesName,_splicemachineContext,apply_patches, main'
}

# Add any paths that contain templates here, relative to this directory.
34 changes: 28 additions & 6 deletions docs/getting-started.rst
@@ -13,11 +13,17 @@ If you are running inside of the Splice Machine Cloud Service in a Jupyter Noteb
External Installation
---------------------

If you would like to install outside of the K8s cluster (and use the ExtPySpliceContext), you can install with
If you would like to install outside of the K8s cluster (and use the ExtPySpliceContext), you can install the stable build with

.. code-block:: sh
sudo pip install pysplice
sudo pip install git+http://www.github.com/splicemachine/[email protected]
Or latest with

.. code-block:: sh
sudo pip install git+http://www.github.com/splicemachine/pysplice
Usage
-----
@@ -28,24 +34,40 @@ This section covers importing and instantiating the Native Spark DataSource

.. tab:: Native Spark DataSource

To use the Native Spark DataSource inside of the cloud service, first create a Spark Session and then import your PySpliceContext
To use the Native Spark DataSource inside of the `cloud service <https://cloud.splicemachine.io/register?utm_source=pydocs&utm_medium=header&utm_campaign=sandbox>`_, first create a Spark Session and then import your PySpliceContext

.. code-block:: Python
from pyspark.sql import SparkSession
from splicemachine.spark import PySpliceContext
from splicemachine.mlflow_support import * # Connects your MLflow session automatically
from splicemachine.features import FeatureStore # Splice Machine Feature Store
spark = SparkSession.builder.getOrCreate()
splice = PySpliceContext(spark)
splice = PySpliceContext(spark) # The Native Spark Datasource (PySpliceContext) takes a Spark Session
fs = FeatureStore(splice) # Create your Feature Store
mlflow.register_splice_context(splice) # Gives mlflow native DB connection
mlflow.register_feature_store(fs) # Tracks Feature Store work in Mlflow automatically
.. tab:: External Native Spark DataSource

To use the External Native Spark DataSource, create a Spark Session with your external Jars configured. Then, import your ExtPySpliceContext and set the necessary parameters
To use the External Native Spark DataSource, create a Spark Session with your external Jars configured. Then, import your ExtPySpliceContext and set the necessary parameters.
Once created, the functionality is identical to the internal Native Spark Datasource (PySpliceContext)

.. code-block:: Python
from pyspark.sql import SparkSession
from splicemachine.spark import ExtPySpliceContext
from splicemachine.mlflow_support import * # Connects your MLflow session automatically
from splicemachine.features import FeatureStore # Splice Machine Feature Store
spark = SparkSession.builder.config('spark.jars', '/path/to/splice_spark2-3.0.0.1962-SNAPSHOT-shaded.jar').config('spark.driver.extraClassPath', 'path/to/Splice/jars/dir/*').getOrCreate()
JDBC_URL = '' #Set your JDBC URL here. You can get this from the Cloud Manager UI. Make sure to append ';user=<USERNAME>;password=<PASSWORD>' after ';ssl=basic' so you can authenticate in
kafka_server = 'kafka-broker-0-' + JDBC_URL.split('jdbc:splice://jdbc-')[1].split(':1527')[0] + ':19092' # Formatting kafka URL from JDBC
# The ExtPySpliceContext communicates with the database via Kafka
kafka_server = 'kafka-broker-0-' + JDBC_URL.split('jdbc:splice://jdbc-')[1].split(':1527')[0] + ':19092' # Formatting kafka URL from JDBC
splice = ExtPySpliceContext(spark, JDBC_URL=JDBC_URL, kafkaServers=kafka_server)
fs = FeatureStore(splice) # Create your Feature Store
mlflow.register_splice_context(splice) # Gives mlflow native DB connection
mlflow.register_feature_store(fs) # Tracks Feature Store work in Mlflow automatically
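
For context (not part of the diff): once the Feature Store is registered, Features can be looked up by name. The sketch below is hedged; ``get_features_by_name`` and its required ``names`` argument are inferred from this commit's messages, and the feature names are placeholders.

.. code-block:: Python

feats = fs.get_features_by_name(names=['AGE', 'TOTAL_SPEND'])  # names is now required per this commit
for f in feats:
    print(f)  # Feature(FeatureID=..., Name=..., FeatureDataType=..., FeatureType=..., Tags=...)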
24 changes: 16 additions & 8 deletions docs/splicemachine.features.rst
@@ -19,15 +19,15 @@ Submodules
:undoc-members:
:show-inheritance:

.. automodule:: splicemachine.features.training_context
.. automodule:: splicemachine.features.training_view
:members:
:undoc-members:
:show-inheritance:

splicemachine.features.feature_store
----------------------------------

This Module contains the classes adn APIs for interacting with the Splice Machine Feature Store.
This Module contains the classes and APIs for interacting with the Splice Machine Feature Store.

.. automodule:: splicemachine.features.feature_store
:members:
@@ -37,30 +37,38 @@ This Module contains the classes adn APIs for interacting with the Splice Machin
splicemachine.features.feature_set
----------------------------------

This describes the Python representation of a Feature Set. A feature set is a database table that contains Features and their metadata
This describes the Python representation of a Feature Set. A feature set is a database table that contains Features and their metadata.
The Feature Set class is mostly used internally but can be used by the user to see the available Features in the given
Feature Set, to see the table and schema name it is deployed to (if it is deployed), and to deploy the feature set
(which can also be done directly through the Feature Store). Feature Sets are unique by their schema.table name, as they
exist in the Splice Machine database as a SQL table. They are case insensitive.
To see the full contents of your Feature Set, you can print, return, or .__dict__ your Feature Set object.
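
For context (not part of the diff), a minimal sketch of inspecting a Feature Set; the ``get_feature_sets`` accessor is an assumption, not confirmed by this commit.

.. code-block:: Python

fset = fs.get_feature_sets()[0]  # 'fs' is a FeatureStore; accessor name is assumed
print(fset)                      # printable summary of the Feature Set
print(fset.__dict__)             # full metadata, e.g. schema_name, table_name, pk_columns, deployed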

.. automodule:: splicemachine.features.feature_set
:members:
:undoc-members:
:show-inheritance:


splicemachine.features.Feature
----------------------------------

This describes the Python representation of a Feature. A feature is a column of a table with particular metadata
This describes the Python representation of a Feature. A Feature is a column of a Feature Set table with particular metadata.
A Feature is the smallest unit in the Feature Store, and each Feature within a Feature Set is individually tracked for changes
to enable full time travel and point-in-time consistent training datasets. Features' names are unique and case insensitive.
To see the full contents of your Feature, you can print, return, or .__dict__ your Feature object.
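
A short hedged sketch of the Feature helpers shown in this commit's diff (the ``is_*`` checks, ordering, and hashing); retrieving the Features via ``get_features_by_name`` is assumed.

.. code-block:: Python

age, spend = fs.get_features_by_name(names=['AGE', 'TOTAL_SPEND'])  # assumed accessor
age.is_continuous()   # feature_type helpers: is_categorical / is_continuous / is_ordinal
sorted([spend, age])  # Features order by name via __lt__, giving stable column ordering
{age, spend}          # Features are hashable, so they can be deduplicated in a set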

.. automodule:: splicemachine.features.feature
:members:
:undoc-members:
:show-inheritance:

splicemachine.features.training_context
splicemachine.features.training_view
----------------------------------

This describes the Python representation of a Training Context. A Training Context is a SQL statement defining an event of interest, and metadata around how to create a training dataset with that context
This describes the Python representation of a Training View. A Training View is a SQL statement defining an event of interest, and metadata around how to create a training dataset with that view.
To see the full contents of your Training View, you can print, return, or .__dict__ your Training View object.
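
A hedged sketch of working with a Training View: ``get_feature_dataset`` appears in this commit's messages, but ``get_training_view`` and both signatures are assumptions.

.. code-block:: Python

tv = fs.get_training_view('customer_churn')  # hypothetical accessor and view name
print(tv.__dict__)   # view_id, view_sql, pk_columns, ts_column, label_column, join_columns
df = fs.get_feature_dataset(tv.name, features=['AGE', 'TOTAL_SPEND'])  # assumed signature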

.. automodule:: splicemachine.features.training_context
.. automodule:: splicemachine.features.training_view
:members:
:undoc-members:
:show-inheritance:
19 changes: 19 additions & 0 deletions requirements-docs.txt
@@ -0,0 +1,19 @@
py4j==0.10.7.0
pytest
mlflow==1.8.0
pyyaml==5.3.1
mleap==0.15.0
graphviz==0.13
requests
gorilla==0.3.0
tqdm==4.43.0
pyspark-dist-explore==0.1.8
numpy==1.18.2
pandas==1.0.3
scipy==1.4.1
tensorflow==2.2.1
pyspark
h2o-pysparkling-2.4==3.28.1.2-1
sphinx-tabs
IPython
cloudpickle==1.6.0
1 change: 0 additions & 1 deletion requirements.txt
@@ -14,6 +14,5 @@ scipy==1.4.1
tensorflow==2.2.1
pyspark
h2o-pysparkling-2.4==3.28.1.2-1
sphinx-tabs
IPython
cloudpickle==1.6.0
4 changes: 2 additions & 2 deletions splicemachine.egg-info/SOURCES.txt
@@ -14,7 +14,7 @@ splicemachine/features/constants.py
splicemachine/features/feature.py
splicemachine/features/feature_set.py
splicemachine/features/feature_store.py
splicemachine/features/training_context.py
splicemachine/features/training_view.py
splicemachine/features/utils.py
splicemachine/mlflow_support/__init__.py
splicemachine/mlflow_support/constants.py
@@ -25,4 +25,4 @@ splicemachine/spark/constants.py
splicemachine/spark/context.py
splicemachine/spark/test/__init__.py
splicemachine/spark/test/context_it.py
splicemachine/spark/test/resources/__init__.py
splicemachine/spark/test/resources/__init__.py
1 change: 1 addition & 0 deletions splicemachine/features/__init__.py
@@ -1,3 +1,4 @@
from .feature import Feature
from .feature_set import FeatureSet
from .feature_store import FeatureStore
from .constants import FeatureType
65 changes: 42 additions & 23 deletions splicemachine/features/constants.py
@@ -54,8 +54,11 @@ class SQL:
"""

get_features_by_name = f"""
select feature_id,feature_set_id,Name,Description,feature_data_type, feature_type,Tags,compliance_level,
last_update_ts,last_update_username from {FEATURE_STORE_SCHEMA}.feature where Name in ({{feature_names}})
select fset.schema_name,fset.table_name,f.Name,f.Description,f.feature_data_type,f.feature_type,f.Tags,
f.compliance_level,f.last_update_ts,f.last_update_username,f.feature_id,f.feature_set_id
from {FEATURE_STORE_SCHEMA}.feature f
join {FEATURE_STORE_SCHEMA}.feature_set fset on f.feature_set_id=fset.feature_set_id
where {{where}}
"""

get_features_in_feature_set = f"""
@@ -72,21 +75,33 @@ class SQL:
FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1) p
ON fset.feature_set_id=p.feature_set_id
"""
get_training_contexts = f"""
SELECT tc.context_id, tc.Name, tc.Description, CAST(SQL_text AS VARCHAR(1000)) context_sql,

get_training_views = f"""
SELECT tc.view_id, tc.Name, tc.Description, CAST(SQL_text AS VARCHAR(1000)) view_sql,
p.pk_columns,
ts_column, label_column,
c.context_columns
FROM {FEATURE_STORE_SCHEMA}.training_context tc
c.join_columns
FROM {FEATURE_STORE_SCHEMA}.training_view tc
INNER JOIN
(SELECT context_id, STRING_AGG(key_column_name,',') pk_columns FROM {FEATURE_STORE_SCHEMA}.training_context_key WHERE key_type='P' GROUP BY 1) p ON tc.context_id=p.context_id
(SELECT view_id, STRING_AGG(key_column_name,',') pk_columns FROM {FEATURE_STORE_SCHEMA}.training_view_key WHERE key_type='P' GROUP BY 1) p ON tc.view_id=p.view_id
INNER JOIN
(SELECT context_id, STRING_AGG(key_column_name,',') context_columns FROM {FEATURE_STORE_SCHEMA}.training_context_key WHERE key_type='C' GROUP BY 1) c ON tc.context_id=c.context_id
(SELECT view_id, STRING_AGG(key_column_name,',') join_columns FROM {FEATURE_STORE_SCHEMA}.training_view_key WHERE key_type='J' GROUP BY 1) c ON tc.view_id=c.view_id
"""

get_feature_set_join_keys = f"""
SELECT fset.feature_set_id, schema_name, table_name, pk_columns FROM {FEATURE_STORE_SCHEMA}.feature_set fset
INNER JOIN
(
SELECT feature_set_id, STRING_AGG(key_column_name,'|') pk_columns, STRING_AGG(key_column_data_type,'|') pk_types
FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1
) p
ON fset.feature_set_id=p.feature_set_id
where fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} )
"""

get_all_features = f"SELECT NAME FROM {FEATURE_STORE_SCHEMA}.feature WHERE Name='{{name}}'"

get_available_features = f"""
get_training_view_features = f"""
SELECT f.feature_id, f.feature_set_id, f.NAME, f.DESCRIPTION, f.feature_data_type, f.feature_type, f.TAGS, f.compliance_level, f.last_update_ts, f.last_update_username
FROM {FEATURE_STORE_SCHEMA}.Feature f
WHERE feature_id IN
@@ -96,11 +111,11 @@ class SQL:
(
SELECT feature_id FROM
(
SELECT f.feature_id, fsk.KeyCount, count(distinct fsk.key_column_name) ContextKeyMatchCount
SELECT f.feature_id, fsk.KeyCount, count(distinct fsk.key_column_name) JoinKeyMatchCount
FROM
{FEATURE_STORE_SCHEMA}.training_context tc
{FEATURE_STORE_SCHEMA}.training_view tc
INNER JOIN
{FEATURE_STORE_SCHEMA}.training_context_key c ON c.context_id=tc.context_id AND c.key_type='C'
{FEATURE_STORE_SCHEMA}.training_view_key c ON c.view_id=tc.view_id AND c.key_type='J'
INNER JOIN
(
SELECT feature_set_id, key_column_name, count(*) OVER (PARTITION BY feature_set_id) KeyCount
@@ -111,36 +126,36 @@ class SQL:
WHERE {{where}}
GROUP BY 1,2
)match_keys
WHERE ContextKeyMatchCount = KeyCount
WHERE JoinKeyMatchCount = KeyCount
)fl
)
"""

training_context = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.training_context (Name, Description, SQL_text, ts_column, label_column)
training_view = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.training_view (Name, Description, SQL_text, ts_column, label_column)
VALUES ('{{name}}', '{{desc}}', '{{sql_text}}', '{{ts_col}}', {{label_col}})
"""

get_training_context_id = f"""
SELECT context_id from {FEATURE_STORE_SCHEMA}.Training_Context where Name='{{name}}'
get_training_view_id = f"""
SELECT view_id from {FEATURE_STORE_SCHEMA}.training_view where Name='{{name}}'
"""

get_fset_primary_keys = f"""
select distinct key_column_name from {FEATURE_STORE_SCHEMA}.Feature_Set_Key
"""

training_context_keys = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.training_context_key (Context_ID, Key_Column_Name, Key_Type)
VALUES ({{context_id}}, '{{key_column}}', '{{key_type}}' )
training_view_keys = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.training_view_key (View_ID, Key_Column_Name, Key_Type)
VALUES ({{view_id}}, '{{key_column}}', '{{key_type}}' )
"""

update_fset_deployment_status = f"""
UPDATE {FEATURE_STORE_SCHEMA}.feature_set set deployed={{status}} where feature_set_id = {{feature_set_id}}
"""

training_set = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.training_set (name, context_id )
VALUES ('{{name}}', {{context_id}})
INSERT INTO {FEATURE_STORE_SCHEMA}.training_set (name, view_id )
VALUES ('{{name}}', {{view_id}})
"""

get_training_set_id = f"""
@@ -171,10 +186,14 @@ class SQL:
({{model_schema_name}}, {{model_table_name}}, {{model_start_ts}}, {{model_end_ts}}, {{feature_id}}, {{feature_cardinality}}, {{feature_histogram}}, {{feature_mean}}, {{feature_median}}, {{feature_count}}, {{feature_stddev}})
"""

get_feature_vector = """
SELECT {feature_names} FROM {feature_sets} WHERE
"""

class Columns:
feature = ['feature_id', 'feature_set_id', 'name', 'description', 'feature_data_type', 'feature_type',
'tags', 'compliance_level', 'last_update_ts', 'last_update_username']
training_context = ['context_id','name','description','context_sql','pk_columns','ts_column','label_column','context_columns']
training_view = ['view_id','name','description','view_sql','pk_columns','ts_column','label_column','join_columns']
feature_set = ['feature_set_id', 'table_name', 'schema_name', 'description', 'pk_columns', 'pk_types', 'deployed']
history_table_pk = ['ASOF_TS','UNTIL_TS']

23 changes: 21 additions & 2 deletions splicemachine/features/feature.py
@@ -13,23 +13,31 @@ def __init__(self, *, name, description, feature_data_type, feature_type, tags,
self.__dict__.update(args)

def is_categorical(self):
"""
Returns if the type of this feature is categorical
"""
return self.feature_type == FeatureType.categorical

def is_continuous(self):
"""
Returns if the type of this feature is continuous
"""
return self.feature_type == FeatureType.continuous

def is_ordinal(self):
"""
Returns if the type of this feature is ordinal
"""
return self.feature_type == FeatureType.ordinal

def _register_metadata(self, splice):
"""
Registers the feature's existence in the feature store
:return: None
"""
feature_sql = SQL.feature_metadata.format(
feature_set_id=self.feature_set_id, name=self.name, desc=self.description,
feature_data_type=self.feature_data_type,
feature_type=self.feature_type, tags=','.join(self.tags)
feature_type=self.feature_type, tags=','.join(self.tags) if isinstance(self.tags, list) else self.tags
)
splice.execute(feature_sql)

@@ -43,8 +51,19 @@ def __eq__(self, other):

def __repr__(self):
return self.__str__()

def __str__(self):
return f'Feature(FeatureID={self.__dict__.get("feature_id","None")}, ' \
f'FeatureSetID={self.__dict__.get("feature_set_id","None")}, Name={self.name}, \n' \
f'Description={self.description}, FeatureDataType={self.feature_data_type}, ' \
f'FeatureType={self.feature_type}, Tags={self.tags})\n'

def __hash__(self):
return hash(repr(self))

def __lt__(self, other):
if isinstance(other, str):
return self.name < other
elif isinstance(other, Feature):
return self.name < other.name
raise TypeError(f"< not supported between instances of Feature and {type(other)}")
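
Taken together with __eq__ and __hash__, __lt__ lets callers deduplicate Features and order them deterministically by name, which matches the "column ordering" commit message. A hedged sketch (not part of the file):

features = fs.get_features_by_name(names=['AGE', 'TOTAL_SPEND'])  # 'fs' is a FeatureStore; accessor assumed
unique_ordered = sorted(set(features))  # dedupe via __eq__/__hash__, deterministic order via __lt__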

