diff --git a/CODEOWNERS b/CODEOWNERS index 3483813..e5e0824 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,4 +1,4 @@ #Global-Reviewers #* @splicemachine/splice-cloudops -* @bklo94 @Ben-Epstein @edriggers @jhoule-splice @jramineni @njnygaard @patricksplice @splicemaahs @troysplice @abaveja313 +* @bklo94 @Ben-Epstein @edriggers @jramineni @myles-novick @splicemaahs @troysplice @abaveja313 diff --git a/splicemachine/features/constants.py b/splicemachine/features/constants.py index 3f91b4f..7f7bdeb 100644 --- a/splicemachine/features/constants.py +++ b/splicemachine/features/constants.py @@ -96,7 +96,8 @@ class SQL: FROM {FEATURE_STORE_SCHEMA}.feature_set_key GROUP BY 1 ) p ON fset.feature_set_id=p.feature_set_id - where fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} ) + WHERE fset.feature_set_id in (select feature_set_id from {FEATURE_STORE_SCHEMA}.feature where name in {{names}} ) + ORDER BY schema_name, table_name """ get_all_features = f"SELECT NAME FROM {FEATURE_STORE_SCHEMA}.feature WHERE Name='{{name}}'" diff --git a/splicemachine/features/feature_set.py b/splicemachine/features/feature_set.py index e74c25a..faa5571 100644 --- a/splicemachine/features/feature_set.py +++ b/splicemachine/features/feature_set.py @@ -1,6 +1,5 @@ from splicemachine.features import Feature from .constants import SQL, Columns -from .utils import clean_df from splicemachine.spark import PySpliceContext from typing import List, Dict @@ -34,8 +33,8 @@ def get_features(self) -> List[Feature]: """ features = [] if self.feature_set_id: - features_df = self.splice_ctx.df(SQL.get_features_in_feature_set.format(feature_set_id=self.feature_set_id)) - features_df = clean_df(features_df, Columns.feature).collect() + features_df = self.splice_ctx.df(SQL.get_features_in_feature_set.format(feature_set_id=self.feature_set_id), + to_lower=True).collect() for f in features_df: f = f.asDict() features.append(Feature(**f)) @@ -134,7 +133,8 @@ def deploy(self, verbose=False): def __eq__(self, other): if isinstance(other, FeatureSet): - return self.table_name == other.table_name and self.schema_name == other.schema_name + return self.table_name.lower() == other.table_name.lower() and \ + self.schema_name.lower() == other.schema_name.lower() return False def __repr__(self): diff --git a/splicemachine/features/feature_store.py b/splicemachine/features/feature_store.py index e9c121b..90bb081 100644 --- a/splicemachine/features/feature_store.py +++ b/splicemachine/features/feature_store.py @@ -16,7 +16,8 @@ from splicemachine.spark import PySpliceContext from splicemachine.features import Feature, FeatureSet from .training_set import TrainingSet -from .utils import dict_to_lower +from .utils import (dict_to_lower, _generate_training_set_history_sql, + _generate_training_set_sql, _create_temp_training_view) from .constants import SQL, FeatureType from .training_view import TrainingView @@ -64,75 +65,6 @@ def get_feature_sets(self, feature_set_ids: List[int] = None, _filter: Dict[str, feature_sets.append(FeatureSet(splice_ctx=self.splice_ctx, **d)) return feature_sets - def get_training_set(self, features: Union[List[Feature], List[str]]) -> SparkDF: - """ - Gets a set of feature values across feature sets that is not time dependent (ie for non time series clustering). - This feature dataset will be treated and tracked implicitly the same way a training_dataset is tracked from - :py:meth:`features.FeatureStore.get_training_set` . 
The dataset's metadata and features used will be tracked in mlflow automatically (see - get_training_set for more details). - - :param features: List of Features or strings of feature names - - :NOTE: - .. code-block:: text - - The Features Sets which the list of Features come from must have common join keys, - otherwise the function will fail. If there is no common join key, it is recommended to - create a Training View to specify the join conditions. - - :return: Spark DF - """ - features = self._process_features(features) - - sql = SQL.get_feature_set_join_keys.format(names=tuple([f.name for f in features])) - fset_keys: pd.DataFrame = self.splice_ctx.df(sql).toPandas() - # Get max number of pk (join) columns from all feature sets - fset_keys['PK_COLUMNS_COUNT'] = fset_keys['PK_COLUMNS'].apply(lambda x: len(x.split('|'))) - # Get "anchor" feature set. The one we will use to try to join to all others - ind = fset_keys['PK_COLUMNS_COUNT'].idxmax() - anchor_series = fset_keys.iloc[ind] - # Remove that from the list - fset_keys.drop(index=ind, inplace=True) - all_pk_cols = anchor_series.PK_COLUMNS.split('|') - # For each feature set, assert that all join keys exist in our "anchor" feature set - fset_keys['can_join'] = fset_keys['PK_COLUMNS'].map(lambda x: set(x.split('|')).issubset(all_pk_cols)) - if not fset_keys['can_join'].all(): - bad_feature_set_ids = [t.FEATURE_SET_ID for _, t in fset_keys[fset_keys['can_join'] != True].iterrows()] - bad_features = [f.name for f in features if f.feature_set_id in bad_feature_set_ids] - raise SpliceMachineException(f"The provided features do not have a common join key." - f"Remove features {bad_features} from your request") - - # SELECT clause - sql = 'SELECT ' - - sql += ','.join([f'fset{feature.feature_set_id}.{feature.name}' for feature in features]) - - alias = f'fset{anchor_series.FEATURE_SET_ID}' # We use this a lot for joins - sql += f'\nFROM {anchor_series.SCHEMA_NAME}.{anchor_series.TABLE_NAME} {alias} ' - - # JOIN clause - for _, fset in fset_keys.iterrows(): - # Join Feature Set - sql += f'\nLEFT OUTER JOIN {fset.SCHEMA_NAME}.{fset.TABLE_NAME} fset{fset.FEATURE_SET_ID} \n\tON ' - for ind, pkcol in enumerate(fset.PK_COLUMNS.split('|')): - if ind > 0: sql += ' AND ' # In case of multiple columns - sql += f'fset{fset.FEATURE_SET_ID}.{pkcol}={alias}.{pkcol}' - - # Link this to mlflow for model deployment - # Here we create a null training view and pass it into the training set. We do this because this special kind - # of training set isn't standard. It's not based on a training view, on primary key columns, a label column, - # or a timestamp column . This is simply a joined set of features from different feature sets. - # But we still want to track this in mlflow as a user may build and deploy a model based on this. So we pass in - # a null training view that can be tracked with a "name" (although the name is None). This is a likely case - # for (non time based) clustering use cases. - null_tx = TrainingView(pk_columns=[], ts_column=None, label_column=None, view_sql=None, name=None, - description=None) - ts = TrainingSet(training_view=null_tx, features=features) - if hasattr(self, 'mlflow_ctx'): - self.mlflow_ctx._active_training_set: TrainingSet = ts - ts._register_metadata(self.mlflow_ctx) - return self.splice_ctx.df(sql) - def remove_training_view(self, override=False): """ Note: This function is not yet implemented. 
@@ -228,8 +160,8 @@ def _validate_feature_vector_keys(self, join_key_values, feature_sets) -> None: missing_keys = feature_set_key_columns - join_key_values.keys() assert not missing_keys, f"The following keys were not provided and must be: {missing_keys}" - def get_feature_vector(self, features: List[Union[str,Feature]], - join_key_values: Dict[str,str], return_sql=False) -> Union[str, PandasDF]: + def get_feature_vector(self, features: List[Union[str, Feature]], + join_key_values: Dict[str, str], return_sql=False) -> Union[str, PandasDF]: """ Gets a feature vector given a list of Features and primary key values for their corresponding Feature Sets @@ -246,9 +178,9 @@ def get_feature_vector(self, features: List[Union[str,Feature]], feature_sets = self.get_feature_sets([f.feature_set_id for f in feats]) self._validate_feature_vector_keys(join_keys, feature_sets) - feature_names = ','.join([f.name for f in feats]) - fset_tables = ','.join([f'{fset.schema_name}.{fset.table_name} fset{fset.feature_set_id}' for fset in feature_sets]) + fset_tables = ','.join( + [f'{fset.schema_name}.{fset.table_name} fset{fset.feature_set_id}' for fset in feature_sets]) sql = "SELECT {feature_names} FROM {fset_tables} ".format(feature_names=feature_names, fset_tables=fset_tables) # For each Feature Set, for each primary key in the given feature set, get primary key value from the user provided dictionary @@ -260,8 +192,7 @@ def get_feature_vector(self, features: List[Union[str,Feature]], return sql if return_sql else self.splice_ctx.df(sql).toPandas() - def get_feature_vector_sql_from_training_view(self, training_view: str, features: List[Feature], - include_insert: Optional[bool] = True) -> str: + def get_feature_vector_sql_from_training_view(self, training_view: str, features: List[Feature]) -> str: """ Returns the parameterized feature retrieval SQL used for online model serving. @@ -274,7 +205,6 @@ def get_feature_vector_sql_from_training_view(self, training_view: str, features This function will error if the view SQL is missing a view key required to retrieve the\ desired features - :param include_insert: (Optional[bool]) determines whether insert into model table is included in the SQL statement :return: (str) the parameterized feature vector SQL """ @@ -282,17 +212,7 @@ def get_feature_vector_sql_from_training_view(self, training_view: str, features vid = self.get_training_view_id(training_view) tctx = self.get_training_views(_filter={'view_id': vid})[0] - # optional INSERT prefix - if (include_insert): - sql = 'INSERT INTO {target_model_table} (' - for pkcol in tctx.pk_columns: # Select primary key column(s) - sql += f'{pkcol}, ' - for feature in features: - sql += f'{feature.name}, ' # Collect all features over time - sql = sql.rstrip(', ') - sql += ')\nSELECT ' - else: - sql = 'SELECT ' + sql = 'SELECT ' # SELECT expressions for pkcol in tctx.pk_columns: # Select primary key column(s) @@ -351,6 +271,73 @@ def get_feature_description(self): # TODO raise NotImplementedError + def get_training_set(self, features: Union[List[Feature], List[str]], current_values_only: bool = False, + start_time: datetime = None, end_time: datetime = None, return_sql: bool = False) -> SparkDF: + """ + Gets a set of feature values across feature sets that is not time dependent (ie for non time series clustering). + This feature dataset will be treated and tracked implicitly the same way a training_dataset is tracked from + :py:meth:`features.FeatureStore.get_training_set` . 
The dataset's metadata and features used will be tracked in mlflow automatically (see
+        get_training_set for more details).
+
+        The way point-in-time correctness is guaranteed here is by choosing one of the Feature Sets as the "anchor" dataset.
+        This means that the points in time that the query is based off of will be the points in time in which the anchor
+        Feature Set recorded changes. The anchor Feature Set is the Feature Set that contains the superset of all primary key
+        columns across all Feature Sets from all Features provided. If more than one Feature Set contains that superset of
+        primary key columns, the Feature Set with the most primary key columns is selected. If more than one Feature Set has the same
+        maximum number of primary keys, the Feature Set is chosen by alphabetical order (schema_name, table_name).
+
+        :param features: List of Features or strings of feature names
+
+        :NOTE:
+            .. code-block:: text
+
+                The Feature Sets which the list of Features come from must have common join keys,
+                otherwise the function will fail. If there is no common join key, it is recommended to
+                create a Training View to specify the join conditions.
+
+        :param current_values_only: If you only want the most recent values of the features, set this to True. Otherwise, all history will be returned. Default False
+        :param start_time: How far back in history you want Feature values. If not specified (and current_values_only is False), all history will be returned.
+            This parameter only takes effect if current_values_only is False.
+        :param end_time: The most recent values for each selected Feature. This will be the cutoff time, such that any Feature values that
+            were updated after this point in time won't be selected. If not specified (and current_values_only is False),
+            Feature values up to the moment in time you call the function (now) will be retrieved. This parameter
+            only takes effect if current_values_only is False.
+        :return: Spark DF
+        """
+        # Get List[Feature]
+        features = self._process_features(features)
+
+        # Get the Feature Sets
+        fsets = self.get_feature_sets(list({f.feature_set_id for f in features}))
+
+        if current_values_only:
+            sql = _generate_training_set_sql(features, fsets)
+        else:
+            temp_vw = _create_temp_training_view(features, fsets)
+            sql = _generate_training_set_history_sql(temp_vw, features, fsets, start_time=start_time, end_time=end_time)
+
+
+        # Here we create a null training view and pass it into the training set. We do this because this special kind
+        # of training set isn't standard. It's not based on a training view, on primary key columns, a label column,
+        # or a timestamp column. This is simply a joined set of features from different feature sets.
+        # But we still want to track this in mlflow as a user may build and deploy a model based on this. So we pass in
+        # a null training view that can be tracked with a "name" (although the name is None). This is a likely case
+        # for (non time based) clustering use cases.
+        null_tvw = TrainingView(pk_columns=[], ts_column=None, label_column=None, view_sql=None, name=None,
+                                description=None)
+        ts = TrainingSet(training_view=null_tvw, features=features, start_time=start_time, end_time=end_time)
+
+        # If the user isn't getting historical values, that means there isn't really a start_time, as the user simply
+        # wants the most up to date values of each feature.
So we set start_time to end_time (which is datetime.today) + # For metadata purposes + if current_values_only: + ts.start_time = ts.end_time + + if hasattr(self, 'mlflow_ctx'): + self.mlflow_ctx._active_training_set: TrainingSet = ts + ts._register_metadata(self.mlflow_ctx) + return sql if return_sql else self.splice_ctx.df(sql) + def get_training_set_from_view(self, training_view: str, features: Union[List[Feature], List[str]] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, return_sql: bool = False) -> SparkDF or str: @@ -394,61 +381,21 @@ def get_training_set_from_view(self, training_view: str, features: Union[List[Fe :return: Optional[SparkDF, str] The Spark dataframe of the training set or the SQL that is used to generate it (for debugging) """ + # Get features as list of Features features = self._process_features(features) if features else self.get_training_view_features(training_view) - # DB-9556 loss of column names on complex sql for NSDS - cols = [] - - # Get training view information (view primary key column(s), inference ts column, ) - tctx = self.get_training_view(training_view) - # SELECT clause - sql = 'SELECT ' - for pkcol in tctx.pk_columns: # Select primary key column(s) - sql += f'\n\tctx.{pkcol},' - cols.append(pkcol) - - sql += f'\n\tctx.{tctx.ts_column}, ' # Select timestamp column - cols.append(tctx.ts_column) - # TODO: ensure these features exist and fail gracefully if not - for feature in features: - sql += f'\n\tCOALESCE(fset{feature.feature_set_id}.{feature.name},fset{feature.feature_set_id}h.{feature.name}) {feature.name},' # Collect all features over time - cols.append(feature.name) - - sql = sql + f'\n\tctx.{tctx.label_column}' if tctx.label_column else sql.rstrip( - ',') # Select the optional label col - if tctx.label_column: cols.append(tctx.label_column) - - # FROM clause - sql += f'\nFROM ({tctx.view_sql}) ctx ' - - # JOIN clause + # Get List of necessary Feature Sets feature_set_ids = list({f.feature_set_id for f in features}) # Distinct set of IDs feature_sets = self.get_feature_sets(feature_set_ids) - for fset in feature_sets: - # Join Feature Set - sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name} fset{fset.feature_set_id} \n\tON ' - for pkcol in fset.pk_columns: - sql += f'fset{fset.feature_set_id}.{pkcol}=ctx.{pkcol} AND ' - sql += f' ctx.{tctx.ts_column} >= fset{fset.feature_set_id}.LAST_UPDATE_TS ' - # Join Feature Set History - sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name}_history fset{fset.feature_set_id}h \n\tON ' - for pkcol in fset.pk_columns: - sql += f' fset{fset.feature_set_id}h.{pkcol}=ctx.{pkcol} AND ' - sql += f' ctx.{tctx.ts_column} >= fset{fset.feature_set_id}h.ASOF_TS AND ctx.{tctx.ts_column} < fset{fset.feature_set_id}h.UNTIL_TS' - - # WHERE clause on optional start and end times - if start_time or end_time: - sql += '\nWHERE ' - if start_time: - sql += f"\n\tctx.{tctx.ts_column} >= '{str(start_time)}' AND" - if end_time: - sql += f"\n\tctx.{tctx.ts_column} <= '{str(end_time)}'" - sql = sql.rstrip('AND') + # Get training view information (view primary key column(s), inference ts column, ) + tvw = self.get_training_view(training_view) + # Generate the SQL needed to create the dataset + sql = _generate_training_set_history_sql(tvw, features, feature_sets, start_time=start_time, end_time=end_time) # Link this to mlflow for model deployment if hasattr(self, 'mlflow_ctx') and not return_sql: - ts = TrainingSet(training_view=tctx, features=features, + ts = 
TrainingSet(training_view=tvw, features=features, start_time=start_time, end_time=end_time) self.mlflow_ctx._active_training_set: TrainingSet = ts ts._register_metadata(self.mlflow_ctx) diff --git a/splicemachine/features/training_set.py b/splicemachine/features/training_set.py index 131eb4e..80bda62 100644 --- a/splicemachine/features/training_set.py +++ b/splicemachine/features/training_set.py @@ -23,6 +23,13 @@ def __init__(self, self.end_time = end_time or datetime.today() def _register_metadata(self, mlflow_ctx): + """ + Registers training set with mlflow if the user has registered the feature store in their mlflow session, + and has called either get_training_set or get_training_set_from_view before or during an mlflow run + + :param mlflow_ctx: the mlflow context + :return: None + """ if mlflow_ctx.active_run(): print("There is an active mlflow run, your training set will be logged to that run.") mlflow_ctx.lp("splice.feature_store.training_set",self.training_view.name) diff --git a/splicemachine/features/utils.py b/splicemachine/features/utils.py index 32a6254..2f4a0b5 100644 --- a/splicemachine/features/utils.py +++ b/splicemachine/features/utils.py @@ -1,6 +1,13 @@ +from splicemachine import SpliceMachineException +from typing import List +from .feature import Feature +from .feature_set import FeatureSet +from splicemachine.features.training_view import TrainingView + + def clean_df(df, cols): - for old,new in zip(df.columns, cols): - df = df.withColumnRenamed(old,new) + for old, new in zip(df.columns, cols): + df = df.withColumnRenamed(old, new) return df def dict_to_lower(dict): @@ -10,4 +17,143 @@ def dict_to_lower(dict): :param dict: The dictionary :return: The lowercased dictionary """ - return {i.lower():dict[i] for i in dict} + return {i.lower(): dict[i] for i in dict} + + +def _get_anchor_feature_set(features: List[Feature], feature_sets: List[FeatureSet]) -> FeatureSet: + """ + From a dataframe of feature set rows, where each row has columns feature_set_id, schema_name, table_name + and pk_cols where pk_cols is a pipe delimited string of Primary Key column names, + this function finds which row has the superset of all primary key columns, raising an exception if none exist + + :param fset_keys: Pandas Dataframe containing FEATURE_SET_ID, SCHEMA_NAME, TABLE_NAME, and PK_COLUMNS, which + is a | delimited string of column names + :return: FeatureSet + :raise: SpliceMachineException + """ + # Get the Feature Set with the maximum number of primary key columns as the anchor + anchor_fset = feature_sets[0] + + for fset in feature_sets: + if len(fset.pk_columns) > len(anchor_fset.pk_columns): + anchor_fset = fset + + # If Features are requested that come from Feature Sets that cannot be joined to our anchor, we will raise an + # Exception and let the user know + bad_features = [] + all_pk_cols = set(anchor_fset.pk_columns) + for fset in feature_sets: + if not set(fset.pk_columns).issubset(all_pk_cols): + bad_features += [f.name for f in features if f.feature_set_id == fset.feature_set_id] + + if bad_features: + raise SpliceMachineException(f"The provided features do not have a common join key." + f"Remove features {bad_features} from your request") + + return anchor_fset + + +def _generate_training_set_history_sql(tvw: TrainingView, features: List[Feature], + feature_sets: List[FeatureSet], start_time=None, end_time=None) -> str: + """ + Generates the SQL query for creating a training set from a TrainingView and a List of Features. 
+ This performs the coalesces necessary to aggregate Features over time in a point-in-time consistent way + + :param tvw: The TrainingView + :param features: List[Feature] The group of Features desired to be returned + :param feature_sets: List[FeatureSets] the group of all Feature Sets of which Features are being selected + :return: str the SQL necessary to execute + """ + # SELECT clause + sql = 'SELECT ' + for pkcol in tvw.pk_columns: # Select primary key column(s) + sql += f'\n\tctx.{pkcol},' + + sql += f'\n\tctx.{tvw.ts_column}, ' # Select timestamp column + + # TODO: ensure these features exist and fail gracefully if not + for feature in features: + sql += f'\n\tCOALESCE(fset{feature.feature_set_id}.{feature.name},fset{feature.feature_set_id}h.{feature.name}) {feature.name},' # Collect all features over time + + # Select the optional label col + if tvw.label_column: + sql += f'\n\tctx.{tvw.label_column}' + else: + sql = sql.rstrip(',') + + # FROM clause + sql += f'\nFROM ({tvw.view_sql}) ctx ' + + # JOIN clause + for fset in feature_sets: + # Join Feature Set + sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name} fset{fset.feature_set_id} \n\tON ' + for pkcol in fset.pk_columns: + sql += f'fset{fset.feature_set_id}.{pkcol}=ctx.{pkcol} AND ' + sql += f' ctx.{tvw.ts_column} >= fset{fset.feature_set_id}.LAST_UPDATE_TS ' + + # Join Feature Set History + sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name}_history fset{fset.feature_set_id}h \n\tON ' + for pkcol in fset.pk_columns: + sql += f' fset{fset.feature_set_id}h.{pkcol}=ctx.{pkcol} AND ' + sql += f' ctx.{tvw.ts_column} >= fset{fset.feature_set_id}h.ASOF_TS AND ctx.{tvw.ts_column} < fset{fset.feature_set_id}h.UNTIL_TS' + + # WHERE clause on optional start and end times + if start_time or end_time: + sql += '\nWHERE ' + if start_time: + sql += f"\n\tctx.{tvw.ts_column} >= '{str(start_time)}' AND" + if end_time: + sql += f"\n\tctx.{tvw.ts_column} <= '{str(end_time)}'" + sql = sql.rstrip('AND') + return sql + + +def _generate_training_set_sql(features: List[Feature], feature_sets: List[FeatureSet]) -> str: + """ + Generates the SQL query for creating a training set from a List of Features (NO TrainingView). 
+
+    :param features: List[Feature] The group of Features desired to be returned
+    :param feature_sets: List of Feature Sets
+    :return: str the SQL necessary to execute
+    """
+    anchor_fset: FeatureSet = _get_anchor_feature_set(features, feature_sets)
+    alias = f'fset{anchor_fset.feature_set_id}'  # We use this a lot for joins
+    anchor_fset_schema = f'{anchor_fset.schema_name}.{anchor_fset.table_name} {alias} '
+    remaining_fsets = [fset for fset in feature_sets if fset != anchor_fset]
+
+    # SELECT clause
+    feature_names = ','.join([f'fset{feature.feature_set_id}.{feature.name}' for feature in features])
+    # Include the pk columns of the anchor feature set
+    pk_cols = ','.join([f'{alias}.{pk}' for pk in anchor_fset.pk_columns])
+    all_feature_columns = feature_names + ',' + pk_cols
+
+    sql = f'SELECT {all_feature_columns} \nFROM {anchor_fset_schema}'
+
+    # JOIN clause
+    for fset in remaining_fsets:
+        # Join Feature Set
+        sql += f'\nLEFT OUTER JOIN {fset.schema_name}.{fset.table_name} fset{fset.feature_set_id} \n\tON '
+        for ind, pkcol in enumerate(fset.pk_columns):
+            if ind > 0: sql += ' AND '  # In case of multiple columns
+            sql += f'fset{fset.feature_set_id}.{pkcol}={alias}.{pkcol}'
+    return sql
+
+
+def _create_temp_training_view(features: List[Feature], feature_sets: List[FeatureSet]) -> TrainingView:
+    """
+    Internal function to create a temporary Training View for training set retrieval using a Feature Set. This is
+    used when a user requests a training set directly from a list of Features, without an explicit Training View.
+
+    :param features: List[Feature]
+    :param feature_sets: List[FeatureSet]
+    :return: Generated Training View
+    """
+    anchor_fset = _get_anchor_feature_set(features, feature_sets)
+    anchor_pk_column_sql = ','.join(anchor_fset.pk_columns)
+    ts_col = 'LAST_UPDATE_TS'
+    schema_table_name = f'{anchor_fset.schema_name}.{anchor_fset.table_name}_history'
+    view_sql = f'SELECT {anchor_pk_column_sql}, ASOF_TS as {ts_col} FROM {schema_table_name}'
+    return TrainingView(pk_columns=anchor_fset.pk_columns, ts_column=ts_col, view_sql=view_sql,
+                        description=None, name=None, label_column=None)
+
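A rough usage sketch of the `get_training_set` entry point added in this change. The parameter names (current_values_only, start_time, end_time, return_sql) come from the diff; the FeatureStore import path and constructor, the SparkSession handle `spark`, and the feature names are illustrative assumptions only, not confirmed by this patch.

.. code-block:: python

    from datetime import datetime

    from splicemachine.spark import PySpliceContext
    from splicemachine.features import FeatureStore  # assumed import path

    splice = PySpliceContext(spark)  # assumes an active SparkSession named `spark`
    fs = FeatureStore(splice)        # assumed constructor taking the Splice context

    # Most recent value of each requested Feature, joined via the anchor Feature Set's primary keys
    current_df = fs.get_training_set(['TOTAL_SPEND', 'DRIVER_AGE'], current_values_only=True)

    # Point-in-time history limited to a window; return_sql=True returns the generated query
    # string instead of a Spark DataFrame, which is handy for inspecting the joins
    historical_sql = fs.get_training_set(
        ['TOTAL_SPEND', 'DRIVER_AGE'],
        start_time=datetime(2020, 1, 1),
        end_time=datetime(2020, 6, 30),
        return_sql=True,
    )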