
Dbaas 5219 - Pipeline feature aggregation support (#125)
* initial code for source in feature store

* need to test

* working backfill and pipeline functions. Need to test for correctness

* name -> names

* delete source

* copied code from create_source by accident

* cleanup on docker-compose

* test feature store

* fixes from testing

* scope creep: fixes to routes and cleanup

* image tags

* source and pipeline source combined

* missing a max on timestamp for pipeline run

* removed egg build, added back fk constraint
Ben Epstein authored Apr 6, 2021
1 parent 00ec0bb commit 98c8b7c
Showing 14 changed files with 1,195 additions and 112 deletions.
@@ -354,7 +354,7 @@ def _register_training_set(self, key_vals: Dict[str,str]):
 
         # Create training set
         ts = TrainingSet(
-            view_id=view_id,
+            view_id=int(view_id),
             name=key_vals['splice.feature_store.training_set'],
             last_update_username=self.request_user
         )
@@ -443,7 +443,7 @@ def _register_model_deployment(self, ts: TrainingSet, key_vals: Dict[str,str]):
             DatabaseSQL.add_feature_store_deployment.format(**deployment)
         )
 
-    def _validate_training_view(self, view_id):
+    def _validate_training_view(self, view_id: int):
         """
         Validates that a particular training view exists. If a run in mlflow was trained using a training set from
         the feature store, and that training set was taken from a training view that no longer exists, we cannot
@@ -452,7 +452,7 @@ def _validate_training_view(self, view_id):
         :param view_id: The view ID
         """
         tvw: TrainingView = self.session.query(TrainingView)\
-            .filter_by(view_id=view_id).one_or_none()
+            .filter(TrainingView.view_id==view_id).first()
         if not tvw:
             raise Exception(f"The training view (id:{view_id}) used for this run has been deleted."
                             f" You cannot deploy a model that was trained using a training view that no longer exists.")
@@ -486,7 +486,7 @@ def register_feature_store_deployment(self):
         # We need to ensure this view hasn't been deleted since the time this run was created
         if eval(key_vals['splice.feature_store.training_view_id']): # returns 'None' not None so need eval
             self.logger.info('Validating that the Training View still exists')
-            self._validate_training_view(key_vals['splice.feature_store.training_view_id'])
+            self._validate_training_view(int(key_vals['splice.feature_store.training_view_id']))
 
         ts: TrainingSet = self._register_training_set(key_vals)
         self.logger.info(f"Done. Gathering individual features...", send_db=True)
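Both `int(...)` casts in this file exist because mlflow run tags round-trip as strings: a missing view id comes back as the literal string `'None'` and a real id as, say, `'42'`, which is why the code `eval`s the tag before validating it. A minimal sketch under that assumption (the tag dict below is a hypothetical stand-in):

```python
# Hypothetical tag dict as returned from an mlflow run; all values are strings.
key_vals = {'splice.feature_store.training_view_id': '42'}

raw = key_vals['splice.feature_store.training_view_id']
if eval(raw):           # 'None' -> None (falsy); '42' -> 42 (truthy)
    view_id = int(raw)  # cast before comparing against the INT view_id column
```

For untrusted tag values, `ast.literal_eval` would be a safer drop-in for `eval` with the same `'None'`/`'42'` behavior.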
11 changes: 7 additions & 4 deletions feature_store/Dockerfile
@@ -54,9 +54,6 @@ RUN \
     curl -kLs http://repository.splicemachine.com/nexus/content/groups/public/com/splicemachine/db-client/$sm_version/db-client-$sm_version.jar -o $SRC_HOME/lib/db-client.jar && \
     curl -kLs http://repository.splicemachine.com/nexus/content/groups/public/com/splicemachine/db-client/$sm_version/db-client-$sm_version-sources.jar -o $SRC_HOME/lib/db-client-sources.jar
 
-COPY shared /tmp/shared
-RUN pip3 install /tmp/shared/.
-
 ENV CLASSPATH=$SRC_HOME/lib/*:/usr/local/share/py4j/py4j0.10.8.1.jar
 
 RUN wget -q \
@@ -65,7 +62,7 @@ RUN wget -q \
     && bash Miniconda3-latest-Linux-x86_64.sh -b \
    && rm -f Miniconda3-latest-Linux-x86_64.sh
 
-ENV PATH="/root/miniconda3/bin:${PATH}"
+ENV PATH="${PATH}:/root/miniconda3/bin"
 # Install Python dependencies
 
 RUN conda install -c anaconda postgresql
@@ -74,6 +71,12 @@ RUN adduser postgres && \
     usermod -aG root postgres && \
     ln -s /root/miniconda3/bin/pg_ctl /usr/bin/pg_ctl
 
+RUN alias pip3=/usr/bin/pip3 && \
+    alias python3=/usr/bin/python3
+
+
+COPY shared /tmp/shared
+RUN pip3 install /tmp/shared/.
 
 # Copy Source into Container
 COPY feature_store/src $SRC_HOME
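The `ENV PATH` change moves miniconda from the front of `PATH` to the back, so the system `python3`/`pip3` in `/usr/bin` win command lookup, and the relocated `COPY shared` / `pip3 install` steps then run against that system interpreter. A minimal sketch of how lookup order flips (the paths are the ones from this Dockerfile; whether each binary actually exists depends on the image):

```python
import os
import shutil

base = os.environ.get('PATH', '/usr/bin')

# Miniconda first: its pip3 shadows the system one.
print(shutil.which('pip3', path='/root/miniconda3/bin:' + base))
# Miniconda last: /usr/bin/pip3 wins the lookup.
print(shutil.which('pip3', path=base + ':/root/miniconda3/bin'))
```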
8 changes: 4 additions & 4 deletions feature_store/docs/Feature Store Metadata.ndm2
@@ -5029,7 +5029,7 @@
         },
         {
           "objectType": "TableField_MYSQL",
-          "name": "feature_prefix_name",
+          "name": "feature_name_prefix",
           "type": "varchar",
           "length": 128,
           "decimals": 0,
@@ -5051,7 +5051,7 @@
           "isGeneratedAlways": false,
           "virtualExpr": "",
           "virtualType": "",
-          "oldName": "feature_prefix_name"
+          "oldName": "feature_name_prefix"
         },
         {
           "objectType": "TableField_MYSQL",
@@ -5257,7 +5257,7 @@
         },
         {
           "objectType": "IndexField_MYSQL",
-          "name": "feature_prefix_name",
+          "name": "feature_name_prefix",
           "keyLength": 0,
           "order": "",
           "oldName": ""
@@ -7555,4 +7555,4 @@
     "viewRelations": []
   }
 ]
-}
\ No newline at end of file
+}
27 changes: 25 additions & 2 deletions feature_store/src/rest_api/constants.py
@@ -25,6 +25,17 @@ class SQL:
     ({{model_schema_name}}, {{model_table_name}}, {{model_start_ts}}, {{model_end_ts}}, {{feature_id}}, {{feature_cardinality}}, {{feature_histogram}}, {{feature_mean}}, {{feature_median}}, {{feature_count}}, {{feature_stddev}})
     """
 
+    backfill_timestamps = """
+    SELECT ASOF_TS
+    FROM new "com.splicemachine.fs_functions.TimestampGeneratorVTI"('{backfill_start_time}','{pipeline_start_time}',{window_value},{window_length})
+    t (asof_ts TIMESTAMP, until_ts TIMESTAMP)
+    """
+
+    pipeline = f"""
+    INSERT INTO {FEATURE_STORE_SCHEMA}.pipeline(feature_set_id, source_id, pipeline_start_ts, pipeline_interval, backfill_start_ts, backfill_interval, pipeline_url)
+    VALUES ({{feature_set_id}}, {{source_id}}, '{{pipeline_start_ts}}', '{{pipeline_interval}}', '{{backfill_start_ts}}', '{{backfill_interval}}', '{{pipeline_url}}')
+    """
+
 class Columns:
     feature = ['feature_id', 'feature_set_id', 'name', 'description', 'feature_data_type', 'feature_type',
                'tags', 'compliance_level', 'last_update_ts', 'last_update_username']
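The two new SQL templates are filled in two stages: `pipeline` is an f-string, so `{FEATURE_STORE_SCHEMA}` is interpolated once at import time and the doubled braces collapse to single braces, leaving per-request placeholders for a later `.format(...)`; `backfill_timestamps` keeps single braces for that same deferred step. A minimal sketch of the mechanics, with the column list trimmed and the schema name and values as hypothetical stand-ins:

```python
FEATURE_STORE_SCHEMA = 'FeatureStore'  # hypothetical schema name

# First stage (f-string at import time): schema filled in, {{...}} -> {...}.
pipeline = f"""
INSERT INTO {FEATURE_STORE_SCHEMA}.pipeline(feature_set_id, source_id)
VALUES ({{feature_set_id}}, {{source_id}})
"""

# Second stage (per request): the remaining placeholders are filled.
print(pipeline.format(feature_set_id=12, source_id=3))
# INSERT INTO FeatureStore.pipeline(feature_set_id, source_id)
# VALUES (12, 3)
```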
@@ -35,5 +46,17 @@ class Columns:
 SQL_TYPES = ['CHAR', 'LONG VARCHAR', 'VARCHAR', 'DATE', 'TIME', 'TIMESTAMP', 'BLOB', 'CLOB', 'TEXT', 'BIGINT',
              'DECIMAL', 'DOUBLE', 'DOUBLE PRECISION', 'FLOAT', 'INTEGER', 'NUMERIC', 'REAL', 'SMALLINT', 'TINYINT', 'BOOLEAN',
              'INT']
-SQLALCHEMY_TYPES = dict(zip(SQL_TYPES, [CHAR, VARCHAR, VARCHAR, DATE, TIME, TIMESTAMP, BLOB, CLOB, TEXT, BIGINT,
-                                        DECIMAL, FLOAT, FLOAT, FLOAT, INTEGER, NUMERIC, REAL, SMALLINT, SMALLINT, BOOLEAN, INTEGER]))
+
+SQLALCHEMY_TYPES = [CHAR, VARCHAR, VARCHAR, DATE, TIME, TIMESTAMP, BLOB, CLOB, TEXT, BIGINT, DECIMAL, FLOAT, FLOAT,
+                    FLOAT, INTEGER, NUMERIC, REAL, SMALLINT, SMALLINT, BOOLEAN, INTEGER]
+
+# class SQLALCHEMY_TYPES:
+#     mapping = dict(zip(SQL_TYPES, [CHAR, VARCHAR, VARCHAR, DATE, TIME, TIMESTAMP, BLOB, CLOB, TEXT, BIGINT,
+#                                    DECIMAL, FLOAT, FLOAT, FLOAT, INTEGER, NUMERIC, REAL, SMALLINT, SMALLINT, BOOLEAN, INTEGER]))
+#     @staticmethod
+#     def _get(item: str):
+#         return SQLALCHEMY_TYPES.mapping[item.split('(')[0]]
+
+SQL_TO_SQLALCHEMY = dict(
+    zip(SQL_TYPES,SQLALCHEMY_TYPES)
+)
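Splitting `SQLALCHEMY_TYPES` into a list parallel to `SQL_TYPES` and zipping the two into `SQL_TO_SQLALCHEMY` makes the positional pairing explicit, and a plain dict lookup then maps a SQL type name to its SQLAlchemy class. A minimal sketch with a trimmed type list; stripping a length suffix like `VARCHAR(128)` by splitting on `'('` mirrors the commented-out `_get` above, while the case normalization is my addition:

```python
from sqlalchemy.types import INTEGER, VARCHAR

SQL_TYPES = ['VARCHAR', 'INTEGER']     # trimmed stand-in for the full list
SQLALCHEMY_TYPES = [VARCHAR, INTEGER]
SQL_TO_SQLALCHEMY = dict(zip(SQL_TYPES, SQLALCHEMY_TYPES))

def to_sqlalchemy(sql_type: str):
    """Map 'VARCHAR(128)' or 'integer' to its SQLAlchemy type class."""
    return SQL_TO_SQLALCHEMY[sql_type.split('(')[0].strip().upper()]

print(to_sqlalchemy('varchar(128)'))   # <class 'sqlalchemy.sql.sqltypes.VARCHAR'>
```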