diff --git a/scripts/build_feature_store.py b/scripts/build_feature_store.py
index 746c6e5..858950f 100644
--- a/scripts/build_feature_store.py
+++ b/scripts/build_feature_store.py
@@ -132,8 +132,6 @@
 # COMMAND ----------

 # DBTITLE 1,Helper functions
-
-
 @udf(returnType=DoubleType())
 def extract_percentage(value: str) -> float:
     """Extract percentage from string and divide by 100
@@ -540,11 +538,9 @@ def add_water_heater_features(df):
         .drop("wh_struct")
     )

-
 # COMMAND ----------

 # DBTITLE 1,Mapping Expressions
-
 # Make various mapping expressions
 def make_map_type_from_dict(mapping: Dict) -> Column:
     """
@@ -583,8 +579,6 @@ def make_map_type_from_dict(mapping: Dict) -> Column:
 # COMMAND ----------

 # DBTITLE 1,Building metadata feature transformation function
-
-
 def transform_building_features() -> DataFrame:
     """
     Read and transform subset of building_metadata features for single family homes.
@@ -657,6 +651,12 @@ def transform_building_features() -> DataFrame:
             .when(F.col("heating_appliance_type") == "", "None")
             .otherwise(F.trim(F.col("heating_appliance_type"))),
         )
+        .withColumn(
+            "heat_pump_sizing_methodology",
+            F.when(F.col("heating_appliance_type") == "ASHP", F.lit("ACCA")).otherwise(
+                "None"
+            ),
+        )
         .withColumn(
             "heating_efficiency_nominal_percentage",
             extract_heating_efficiency(F.col("hvac_heating_efficiency")),
         )
@@ -780,7 +780,7 @@ def transform_building_features() -> DataFrame:
                 F.col("geometry_building_number_units_sfa"),
                 F.col("geometry_building_number_units_mf"),
                 F.lit("1"),
-            ).cast('int'),
+            ).cast("int"),
         )
         .withColumn(
             "is_middle_unit",
@@ -872,6 +872,7 @@ def transform_building_features() -> DataFrame:
         # heating
         "heating_fuel",
         "heating_appliance_type",
+        "heat_pump_sizing_methodology",
         "has_ducted_heating",
         "heating_efficiency_nominal_percentage",
         "heating_setpoint_degrees_f",
@@ -940,7 +941,6 @@ def transform_building_features() -> DataFrame:
     )
     return building_metadata_transformed

-
 # COMMAND ----------

 # DBTITLE 1,Transform building metadata
@@ -955,6 +955,8 @@ def transform_building_features() -> DataFrame:
 # COMMAND ----------

 # DBTITLE 1,Apply upgrade functions
+# TODO: put this in some kind of shared config that can be used across scripts/repos
+SUPPORTED_UPGRADES = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
 # Mapping of climate zone temperature -> threshold, insulation
 # where climate zone temperature is the first character in the ASHRAE IECC climate zone
 # ('1', 13, 30) means units in climate zones 1A (1-anything) with R13 insulation or less are upgraded to R30
@@ -976,6 +978,7 @@ def upgrade_to_hp(
     baseline_building_features: DataFrame,
     ducted_efficiency: str,
     non_ducted_efficiency: str,
+    heat_pump_sizing_methodology: str = "ACCA",
 ) -> DataFrame:
     """
     Upgrade the baseline building features to an air source heat pump (ASHP) with specified efficiencies.
@@ -1009,6 +1012,7 @@ def upgrade_to_hp(
         )
         .withColumn("ac_type", F.lit("Heat Pump"))
         .withColumn("cooled_space_percentage", F.lit(1.0))
+        .withColumn("heat_pump_sizing_methodology", F.lit(heat_pump_sizing_methodology))
     )


@@ -1026,7 +1030,7 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da
         DataFrame: building_features, augmented to reflect the upgrade.
""" # Raise an error if an unsupported upgrade ID is provided - if upgrade_id not in [0, 1, 3, 4, 6, 8.1, 8.2, 9]: + if upgrade_id not in SUPPORTED_UPGRADES: raise ValueError(f"Upgrade id={upgrade_id} is not yet supported") upgrade_building_features = ( @@ -1038,7 +1042,7 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da if upgrade_id == 0: # baseline: return as is pass - if upgrade_id in [1, 9]: # basic enclosure + if upgrade_id in [1, 9, 13.01]: # basic enclosure upgrade_building_features = ( upgrade_building_features # Upgrade insulation of ceiling @@ -1115,6 +1119,17 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da "Heat Pump, SEER 24, 13 HSPF", "Heat Pump, SEER 29.3, 14 HSPF", ) + if upgrade_id in [11.05, 13.01]: + upgrade_building_features = ( + upgrade_building_features.transform( + upgrade_to_hp, + "Heat Pump, SEER 18, 10 HSPF", + "Heat Pump, SEER 18, 10.5 HSPF", + "HERS", + ) + .withColumn("cooling_setpoint_offset_magnitude_degrees_f", F.lit(0.0)) + .withColumn("heating_setpoint_offset_magnitude_degrees_f", F.lit(0.0)) + ) if upgrade_id in [6, 9]: upgrade_building_features = upgrade_building_features.withColumn( "water_heater_efficiency", @@ -1172,19 +1187,17 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da return upgrade_building_features - # COMMAND ---------- # DBTITLE 1,Apply upgrade logic to baseline features # create a metadata df for baseline and each HVAC upgrade -upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0] building_metadata_upgrades = reduce( DataFrame.unionByName, [ apply_upgrades( baseline_building_features=building_metadata_transformed, upgrade_id=upgrade ) - for upgrade in upgrade_ids + for upgrade in SUPPORTED_UPGRADES ], ) @@ -1192,7 +1205,9 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da # DBTITLE 1,Drop rows where upgrade was not applied # read in outputs so that we can test applicability logic -annual_outputs = spark.table("ml.surrogate_model.building_simulation_outputs_annual") +annual_outputs = spark.table( + "ml.surrogate_model.building_simulation_outputs_annual_tmp" +).where(F.col("upgrade_id").isin(SUPPORTED_UPGRADES)) # drop upgrades that had no unchanged features and therefore weren't upgraded partition_cols = building_metadata_upgrades.drop("upgrade_id").columns @@ -1208,6 +1223,8 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da ) # test that the applicability logic matches between the features and targets +# we ignore 13.01 since they are all flagged as True in the output table +# even though many do not have the insulation upgrade applied and are therefore identical to 11.05 applicability_compare = building_metadata_upgrades_applicability_flag.alias( "features" ).join( @@ -1219,7 +1236,9 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da assert ( applicability_compare.where( F.col("features.applicability") != F.col("targets.applicability") - ).count() + ) + .where(F.col("upgrade_id") != 13.01) + .count() == 0 ) # #display mismatching cases if assert fails @@ -1287,8 +1306,6 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> Da # COMMAND ---------- # DBTITLE 1,Weather feature transformation function - - def transform_weather_features() -> DataFrame: """ Read and transform weather timeseries table. 
     Pivot from long format indexed by (weather_file_city, hour)
@@ -1309,7 +1326,6 @@ def transform_weather_features() -> DataFrame:
     )
     return weather_data_arrays

-
 # COMMAND ----------

 # DBTITLE 1,Transform weather features
@@ -1375,7 +1391,7 @@ def transform_weather_features() -> DataFrame:
 # COMMAND ----------

 # MAGIC %sql
-# MAGIC -- the following code will upsert. To overwrite, uncommnet this line to first drop the table
+# MAGIC -- the following code will upsert. To overwrite, uncomment this line to first drop the table
 # MAGIC -- DROP TABLE ml.surrogate_model.building_features

 # COMMAND ----------
@@ -1386,7 +1402,7 @@ def transform_weather_features() -> DataFrame:
 # COMMAND ----------

 # DBTITLE 1,Write out building metadata feature store
-table_name = "ml.surrogate_model.building_features"
+table_name = "ml.surrogate_model.building_features_tmp"
 df = building_metadata_applicable_upgrades_with_weather_file_city_index
 if spark.catalog.tableExists(table_name):
     fe.write_table(name=table_name, df=df, mode="merge")
diff --git a/src/datagen.py b/src/datagen.py
index c0f5386..9f1fab8 100644
--- a/src/datagen.py
+++ b/src/datagen.py
@@ -47,10 +47,11 @@ class DataGenerator(tf.keras.utils.Sequence):
     # init FeatureEngineering client
     fe = FeatureEngineeringClient()

-    # init all of the class attribute defaults
-    building_feature_table_name = "ml.surrogate_model.building_features"
+    # table names to pull from
+    building_feature_table_name = "ml.surrogate_model.building_features_tmp"
     weather_feature_table_name = "ml.surrogate_model.weather_features_hourly"

+    # init all of the class attribute defaults
     building_features = [
         # structure
         "n_bedrooms",
@@ -153,6 +154,9 @@ class DataGenerator(tf.keras.utils.Sequence):
         "propane": ["propane__total"],
     }

+    # TODO: put this in some kind of shared config that can be used across scripts/repos
+    supported_upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
+
     def __init__(
         self,
         train_data: DataFrame,
@@ -172,17 +176,9 @@ def __init__(
             - train_data (DataFrame): the training data containing the targets and keys to join to the feature tables.

         See class docstring for all other parameters.
         """
-        # self.upgrades = upgrade_ids or self.upgrade_ids
         self.building_features = building_features or self.building_features
         self.weather_features = weather_features or self.weather_features

-        self.building_feature_table_name = (
-            building_feature_table_name or self.building_feature_table_name
-        )
-        self.weather_feature_table_name = (
-            weather_feature_table_name or self.weather_feature_table_name
-        )
-
         self.consumption_group_dict = (
             consumption_group_dict or self.consumption_group_dict
         )
@@ -422,7 +418,7 @@ def on_epoch_end(self):
 def load_data(
     consumption_group_dict=DataGenerator.consumption_group_dict,
     building_feature_table_name=DataGenerator.building_feature_table_name,
-    upgrade_ids: List[str] = None,
+    upgrade_ids: List[float] = None,
     p_val=0.2,
     p_test=0.1,
     n_train=None,
@@ -440,6 +436,7 @@ def load_data(
             Default is DataGenerator.consumption_by_fuel_dict (too long to write out)
         building_feature_table_name (str): Name of the building feature table.
             Default is "ml.surrogate_model.building_features"
+        upgrade_ids (list): List of upgrade ids to use. If None (default), all supported upgrades are used.
         p_val (float): Proportion of data to use for validation. Default is 0.2.
         p_test (float): Proportion of data to use for testing. Default is 0.1.
         n_train (int): Number of training records to select, where the size of the val and test sets will be adjusted accordingly to
@@ -466,14 +463,14 @@ def load_data(
     data = spark.sql(
         f"""
         SELECT B.building_id, B.upgrade_id, B.weather_file_city, {sum_str}
-        FROM ml.surrogate_model.building_simulation_outputs_annual O
+        FROM ml.surrogate_model.building_simulation_outputs_annual_tmp O
         INNER JOIN {building_feature_table_name} B
             ON B.upgrade_id = O.upgrade_id AND B.building_id == O.building_id
-        --WHERE B.upgrade_id IN (0,1,3,4)
         """
     )
-    if upgrade_ids is not None:
-        data = data.where(F.col("upgrade_id").isin(upgrade_ids))
+    if upgrade_ids is None:
+        upgrade_ids = DataGenerator.supported_upgrade_ids
+    data = data.where(F.col("upgrade_id").isin(upgrade_ids))

     # get list of unique building ids, which will be the basis for the dataset split
     unique_building_ids = data.where(F.col("upgrade_id") == 0).select("building_id")
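
Both files carry the same TODO about hoisting the supported-upgrade list into a shared config. A minimal sketch of what that could look like, assuming a new module at src/globals.py (a hypothetical path) that both scripts/build_feature_store.py and src/datagen.py can import; the values are exactly the ones duplicated in the two lists above:

    # src/globals.py -- hypothetical shared config module
    # Single source of truth for the upgrade ids supported by both the
    # feature store build script and the DataGenerator.
    SUPPORTED_UPGRADE_IDS = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]

    # usage sketch in scripts/build_feature_store.py and src/datagen.py:
    from src.globals import SUPPORTED_UPGRADE_IDS

With one import in each file, the feature build, the applicability checks, and the training data loader all stay pinned to the same upgrade set, and adding a new upgrade becomes a one-line change.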