diff --git a/scripts/build_feature_store.py b/scripts/build_feature_store.py
index 858950f..2f38680 100644
--- a/scripts/build_feature_store.py
+++ b/scripts/build_feature_store.py
@@ -1206,7 +1206,7 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da
 # DBTITLE 1,Drop rows where upgrade was not applied
 # read in outputs so that we can test applicability logic
 annual_outputs = spark.table(
-    "ml.surrogate_model.building_simulation_outputs_annual_tmp"
+    "ml.surrogate_model.building_simulation_outputs_annual"
 ).where(F.col("upgrade_id").isin(SUPPORTED_UPGRADES))
 
 # drop upgrades that had no unchanged features and therefore weren't upgraded
@@ -1402,8 +1402,11 @@ def transform_weather_features() -> DataFrame:
 # COMMAND ----------
 
 # DBTITLE 1,Write out building metadata feature store
-table_name = "ml.surrogate_model.building_features_tmp"
-df = building_metadata_applicable_upgrades_with_weather_file_city_index
+table_name = "ml.surrogate_model.building_features"
+# TODO: remove this drop statement before retraining -- this is a temporary workaround to avoid breaking dohyo downstream
+df = building_metadata_applicable_upgrades_with_weather_file_city_index.drop(
+    "heat_pump_sizing_methodology"
+)
 if spark.catalog.tableExists(table_name):
     fe.write_table(name=table_name, df=df, mode="merge")
 else:
diff --git a/scripts/extract_data.py b/scripts/extract_data.py
index c136477..a6c0b51 100644
--- a/scripts/extract_data.py
+++ b/scripts/extract_data.py
@@ -157,16 +157,14 @@ def extract_rastock_annual_outputs() -> DataFrame:
     # 1. get annual outputs for all RAStock upgrades and apply common post-processing
     rastock_outputs = util.get_clean_rastock_df()
 
-    # 2. apply custom sumo post-processing to align with ResStock outputs 
+    # 2. apply custom sumo post-processing to align with ResStock outputs
     # cast pkeys to the right type
-    rastock_outputs = (
-        rastock_outputs
-        .withColumn("building_id", F.col("building_id").cast("int"))
-        .withColumn("upgrade_id", F.col("upgrade_id").cast("double"))
-    )
+    rastock_outputs = rastock_outputs.withColumn(
+        "building_id", F.col("building_id").cast("int")
+    ).withColumn("upgrade_id", F.col("upgrade_id").cast("double"))
     # remove irrelevant columns and rename to align with resstock
-    # first do some prep: 
-    # construct the regex pattern of columns to remove: 
+    # first do some prep:
+    # construct the regex pattern of columns to remove:
     # match all columns except for:
     # * pkeys
     # * those prefixed with "out_" followed by a modeled fuel
@@ -182,11 +180,11 @@
     r_pkey = "".join([f"(?!{k}$)" for k in pkey_cols])
     columns_to_remove_match_pattern = rf"^(?!out_({r_fuels})){r_pkey}.*"
     # construct the the substring replacement dict to align colnames with ResStock
-    replace_column_substrings_dict={
-        **{f + "_": f + "__" for f in modeled_fuel_types},
-        **{"natural_gas": "methane_gas", "permanent_spa": "hot_tub"},
-    }
-    # apply reformatting to match ResStock 
+    replace_column_substrings_dict = {
+        **{f + "_": f + "__" for f in modeled_fuel_types},
+        **{"natural_gas": "methane_gas", "permanent_spa": "hot_tub"},
+    }
+    # apply reformatting to match ResStock
     rastock_outputs_cleaned = util.clean_columns(
         df=rastock_outputs,
         remove_columns_with_substrings=[columns_to_remove_match_pattern],
@@ -195,7 +193,9 @@
     )
 
     # RAStock only includes sims when upgrades are applicable, so this column is missing
-    rastock_outputs_cleaned = rastock_outputs_cleaned.withColumn("applicability", F.lit(True))
+    rastock_outputs_cleaned = rastock_outputs_cleaned.withColumn(
+        "applicability", F.lit(True)
+    )
 
     return rastock_outputs_cleaned
 
@@ -248,7 +248,6 @@ def extract_hourly_weather_data() -> DataFrame:
     )
     return weather_data
 
-
 # COMMAND ----------
 
 # DBTITLE 1,Extract building metadata
@@ -292,9 +291,7 @@ def extract_hourly_weather_data() -> DataFrame:
 # COMMAND ----------
 
 # DBTITLE 1,Write out annual outputs
-# TODO: move this back to the original once testing is complete
-table_name = "ml.surrogate_model.building_simulation_outputs_annual_tmp"
-# table_name = "ml.surrogate_model.building_simulation_outputs_annual"
+table_name = "ml.surrogate_model.building_simulation_outputs_annual"
 annual_outputs.write.saveAsTable(
     table_name, mode="overwrite", overwriteSchema=True, partitionBy=["upgrade_id"]
 )
diff --git a/scripts/model_training.py b/scripts/model_training.py
index 8718e73..47f944e 100644
--- a/scripts/model_training.py
+++ b/scripts/model_training.py
@@ -117,8 +117,6 @@
 # COMMAND ----------
 
 # DBTITLE 1,Define wrapper class for processing at inference time
-
-
 class SurrogateModelingWrapper(mlflow.pyfunc.PythonModel):
     """
     A wrapper class that applies the pre/post processing to the data at inference time,
@@ -218,7 +216,6 @@ def convert_feature_dataframe_to_dict(
             for col in self.building_features + ["weather_file_city_index"]
         }
 
-
 # COMMAND ----------
 
 # DBTITLE 1,Initialize model
@@ -320,5 +317,3 @@ def convert_feature_dataframe_to_dict(
 # DBTITLE 1,Pass Run ID to next notebook if running in job
 if not DEBUG:
     dbutils.jobs.taskValues.set(key="run_id", value=run_id)
-
-# COMMAND ----------
diff --git a/src/datagen.py b/src/datagen.py
index cbce261..bd92d8a 100644
--- a/src/datagen.py
+++ b/src/datagen.py
@@ -47,10 +47,11 @@ class DataGenerator(tf.keras.utils.Sequence):
     # init FeatureEngineering client
     fe = FeatureEngineeringClient()
 
     # table names to pull from
-    building_feature_table_name = "ml.surrogate_model.building_features_tmp"
+    building_feature_table_name = "ml.surrogate_model.building_features"
     weather_feature_table_name = "ml.surrogate_model.weather_features_hourly"
 
+    # TODO: put this in some kind of shared config that can be used across scripts/repos
     # init all of the class attribute defaults
     building_features = [
         # structure
@@ -154,8 +155,8 @@ class DataGenerator(tf.keras.utils.Sequence):
         "propane": ["propane__total"],
     }
 
-    # TODO: put this in some kind of shared config that can be used across srcipts/repos
-    supported_upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
+    # TODO: add 13.01 and 11.05 back before training a new model
+    supported_upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0]
 
     def __init__(
        self,
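
One note on the extract_data.py hunks above: the negative-lookahead pattern behind columns_to_remove_match_pattern is dense, so here is a minimal standalone sketch of what it selects. The fuel and pkey lists below are assumed example values for illustration only; the real definitions live elsewhere in scripts/extract_data.py.

import re

# Illustrative values only (assumptions); the real lists are defined in scripts/extract_data.py.
modeled_fuel_types = ["electricity", "natural_gas", "propane"]
pkey_cols = ["building_id", "upgrade_id"]

# Same construction as in extract_rastock_annual_outputs(): the pattern matches
# every column EXCEPT pkeys and "out_<modeled fuel>..." columns, so a matching
# column is one to remove.
r_fuels = "|".join(modeled_fuel_types)
r_pkey = "".join([f"(?!{k}$)" for k in pkey_cols])
columns_to_remove_match_pattern = rf"^(?!out_({r_fuels})){r_pkey}.*"

sample_columns = [
    "building_id",                     # pkey -> kept
    "out_electricity_cooling_energy",  # modeled fuel output -> kept
    "out_site_energy_total",           # "site" is not a modeled fuel -> removed
    "weight",                          # metadata -> removed
]
for col in sample_columns:
    removed = re.match(columns_to_remove_match_pattern, col) is not None
    print(f"{col}: {'removed' if removed else 'kept'}")

The companion replace_column_substrings_dict then renames the surviving columns, doubling the separator after each fuel prefix (e.g. electricity_ -> electricity__) and mapping natural_gas to methane_gas and permanent_spa to hot_tub to match the ResStock schema.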