add 11.05 and 13.01 to feature table
mikivee committed Sep 27, 2024
1 parent 59037e9 commit 60e022c
Showing 2 changed files with 48 additions and 35 deletions.
56 changes: 36 additions & 20 deletions scripts/build_feature_store.py
@@ -132,8 +132,6 @@
# COMMAND ----------

# DBTITLE 1,Helper functions


@udf(returnType=DoubleType())
def extract_percentage(value: str) -> float:
"""Extract percentage from string and divide by 100
@@ -540,11 +538,9 @@ def add_water_heater_features(df):
.drop("wh_struct")
)


# COMMAND ----------

# DBTITLE 1,Mapping Expressions

# Make various mapping expressions
def make_map_type_from_dict(mapping: Dict) -> Column:
"""
@@ -583,8 +579,6 @@ def make_map_type_from_dict(mapping: Dict) -> Column:
# COMMAND ----------

# DBTITLE 1,Building metadata feature transformation function


def transform_building_features() -> DataFrame:
"""
Read and transform subset of building_metadata features for single family homes.
@@ -657,6 +651,12 @@ def transform_building_features() -> DataFrame:
.when(F.col("heating_appliance_type") == "", "None")
.otherwise(F.trim(F.col("heating_appliance_type"))),
)
.withColumn(
"heat_pump_sizing_methodology",
F.when(F.col("heating_appliance_type") == "ASHP", F.lit("ACCA")).otherwise(
"None"
),
)
.withColumn(
"heating_efficiency_nominal_percentage",
extract_heating_efficiency(F.col("hvac_heating_efficiency")),
@@ -780,7 +780,7 @@ def transform_building_features() -> DataFrame:
F.col("geometry_building_number_units_sfa"),
F.col("geometry_building_number_units_mf"),
F.lit("1"),
).cast('int'),
).cast("int"),
)
.withColumn(
"is_middle_unit",
@@ -872,6 +872,7 @@ def transform_building_features() -> DataFrame:
# heating
"heating_fuel",
"heating_appliance_type",
"heat_pump_sizing_methodology",
"has_ducted_heating",
"heating_efficiency_nominal_percentage",
"heating_setpoint_degrees_f",
@@ -940,7 +941,6 @@ def transform_building_features() -> DataFrame:
)
return building_metadata_transformed


# COMMAND ----------

# DBTITLE 1,Transform building metadata
@@ -955,6 +955,8 @@ def transform_building_features() -> DataFrame:
# COMMAND ----------

# DBTITLE 1,Apply upgrade functions
# TODO: put this in some kind of shared config that can be used across scripts/repos
SUPPORTED_UPGRADES = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
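# NOTE: until that shared config exists, this list is duplicated in
# src/datagen.py as DataGenerator.supported_upgrade_ids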
# Mapping of climate zone temperature -> threshold, insulation
# where climate zone temperature is the first character in the ASHRAE IECC climate zone
# ('1', 13, 30) means units in climate zones 1A (1-anything) with R13 insulation or less are upgraded to R30
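# For illustration, a mapping of the shape described above might look like the
# following (hypothetical name, and only the documented entry is real; the
# actual dict is collapsed in this diff view):
#
#   INSULATION_UPGRADE_THRESHOLDS = {
#       "1": (13, 30),  # zones 1A-1C: ceilings at R13 or below are upgraded to R30
#       ...
#   }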
@@ -976,6 +978,7 @@ def upgrade_to_hp(
baseline_building_features: DataFrame,
ducted_efficiency: str,
non_ducted_efficiency: str,
heat_pump_sizing_methodology: str = "ACCA",
) -> DataFrame:
"""
Upgrade the baseline building features to an air source heat pump (ASHP) with specified efficiencies.
@@ -1009,6 +1012,7 @@
)
.withColumn("ac_type", F.lit("Heat Pump"))
.withColumn("cooled_space_percentage", F.lit(1.0))
.withColumn("heat_pump_sizing_methodology", F.lit(heat_pump_sizing_methodology))
)


@@ -1026,7 +1030,7 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
DataFrame: building_features, augmented to reflect the upgrade.
"""
# Raise an error if an unsupported upgrade ID is provided
if upgrade_id not in [0, 1, 3, 4, 6, 8.1, 8.2, 9]:
if upgrade_id not in SUPPORTED_UPGRADES:
raise ValueError(f"Upgrade id={upgrade_id} is not yet supported")

upgrade_building_features = (
@@ -1038,7 +1042,7 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
if upgrade_id == 0: # baseline: return as is
pass

if upgrade_id in [1, 9]: # basic enclosure
if upgrade_id in [1, 9, 13.01]: # basic enclosure
upgrade_building_features = (
upgrade_building_features
# Upgrade insulation of ceiling
@@ -1115,6 +1119,17 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
"Heat Pump, SEER 24, 13 HSPF",
"Heat Pump, SEER 29.3, 14 HSPF",
)
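# upgrades 11.05 and 13.01 install a heat pump sized with the HERS
# methodology and zero out both thermostat setpoint offsets; 13.01
# additionally receives the basic enclosure package applied above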
if upgrade_id in [11.05, 13.01]:
upgrade_building_features = (
upgrade_building_features.transform(
upgrade_to_hp,
"Heat Pump, SEER 18, 10 HSPF",
"Heat Pump, SEER 18, 10.5 HSPF",
"HERS",
)
.withColumn("cooling_setpoint_offset_magnitude_degrees_f", F.lit(0.0))
.withColumn("heating_setpoint_offset_magnitude_degrees_f", F.lit(0.0))
)
if upgrade_id in [6, 9]:
upgrade_building_features = upgrade_building_features.withColumn(
"water_heater_efficiency",
@@ -1172,27 +1187,27 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:

return upgrade_building_features


# COMMAND ----------

# DBTITLE 1,Apply upgrade logic to baseline features
# create a metadata df for baseline and each supported upgrade
upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0]
building_metadata_upgrades = reduce(
DataFrame.unionByName,
[
apply_upgrades(
baseline_building_features=building_metadata_transformed, upgrade_id=upgrade
)
for upgrade in upgrade_ids
for upgrade in SUPPORTED_UPGRADES
],
)
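# note that DataFrame.unionByName aligns columns by name rather than position,
# so the per-upgrade frames only need to share a schema, not a column order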

# COMMAND ----------

# DBTITLE 1,Drop rows where upgrade was not applied
# read in outputs so that we can test applicability logic
annual_outputs = spark.table("ml.surrogate_model.building_simulation_outputs_annual")
annual_outputs = spark.table(
"ml.surrogate_model.building_simulation_outputs_annual_tmp"
).where(F.col("upgrade_id").isin(SUPPORTED_UPGRADES))

# drop upgrade rows where no features changed and the upgrade therefore wasn't applied
partition_cols = building_metadata_upgrades.drop("upgrade_id").columns
@@ -1208,6 +1223,8 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
)
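# A minimal sketch of the idea behind the collapsed logic above (an assumed
# implementation, not this commit's actual code): an upgrade row whose
# feature values are identical to another row in the same partition was left
# untouched by apply_upgrades(), so the upgrade did not apply to that building.
#
#   from pyspark.sql.window import Window
#
#   w = Window.partitionBy(*partition_cols)
#   building_metadata_upgrades_applicability_flag = (
#       building_metadata_upgrades.withColumn(
#           "applicability",
#           (F.count("*").over(w) == 1) | (F.col("upgrade_id") == 0),
#       )
#   )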

# test that the applicability logic matches between the features and targets.
# we ignore 13.01 since all of its samples are flagged as True in the output table,
# even though many do not have the insulation upgrade applied and are therefore identical to 11.05
applicability_compare = building_metadata_upgrades_applicability_flag.alias(
"features"
).join(
@@ -1219,7 +1236,9 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
assert (
applicability_compare.where(
F.col("features.applicability") != F.col("targets.applicability")
).count()
)
.where(F.col("upgrade_id") != 13.01)
.count()
== 0
)
# display mismatching cases if the assert fails
@@ -1287,8 +1306,6 @@ def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
# COMMAND ----------

# DBTITLE 1,Weather feature transformation function


def transform_weather_features() -> DataFrame:
"""
Read and transform weather timeseries table. Pivot from long format indexed by (weather_file_city, hour)
@@ -1309,7 +1326,6 @@
)
return weather_data_arrays
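# Sketch of the long-to-wide pivot the docstring describes (assumed column
# names; the actual implementation is collapsed in this diff view). Sorting
# the (hour, value) structs before extracting values guarantees the hourly
# ordering that collect_list alone does not:
#
#   exprs = [
#       F.expr(f"transform(array_sort(collect_list(struct(hour, {c}))), s -> s.{c})").alias(c)
#       for c in weather_feature_columns
#   ]
#   weather_data_arrays = weather_df.groupBy("weather_file_city").agg(*exprs)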


# COMMAND ----------

# DBTITLE 1,Transform weather features
@@ -1375,7 +1391,7 @@ def transform_weather_features() -> DataFrame:
# COMMAND ----------

# MAGIC %sql
# MAGIC -- the following code will upsert. To overwrite, uncommnet this line to first drop the table
# MAGIC -- the following code will upsert. To overwrite, uncomment this line to first drop the table
# MAGIC -- DROP TABLE ml.surrogate_model.building_features

# COMMAND ----------
@@ -1386,7 +1402,7 @@
# COMMAND ----------

# DBTITLE 1,Write out building metadata feature store
table_name = "ml.surrogate_model.building_features"
table_name = "ml.surrogate_model.building_features_tmp"
df = building_metadata_applicable_upgrades_with_weather_file_city_index
if spark.catalog.tableExists(table_name):
fe.write_table(name=table_name, df=df, mode="merge")
27 changes: 12 additions & 15 deletions src/datagen.py
@@ -47,10 +47,11 @@ class DataGenerator(tf.keras.utils.Sequence):
# init FeatureEngineering client
fe = FeatureEngineeringClient()

# init all of the class attribute defaults
building_feature_table_name = "ml.surrogate_model.building_features"
# table names to pull from
building_feature_table_name = "ml.surrogate_model.building_features_tmp"
weather_feature_table_name = "ml.surrogate_model.weather_features_hourly"

# init all of the class attribute defaults
building_features = [
# structure
"n_bedrooms",
@@ -153,6 +154,9 @@ class DataGenerator(tf.keras.utils.Sequence):
"propane": ["propane__total"],
}

# TODO: put this in some kind of shared config that can be used across scripts/repos
supported_upgrade_ids = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
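# One possible shape for the shared config the TODO above describes
# (hypothetical module, not part of this commit):
#
#   # src/shared_config.py
#   SUPPORTED_UPGRADE_IDS = [0.0, 1.0, 3.0, 4.0, 6.0, 9.0, 11.05, 13.01]
#
# which both this class and scripts/build_feature_store.py could then import.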

def __init__(
self,
train_data: DataFrame,
@@ -172,17 +176,9 @@ def __init__(
- train_data (DataFrame): the training data containing the targets and keys to join to the feature tables.
See class docstring for all other parameters.
"""
# self.upgrades = upgrade_ids or self.upgrade_ids
self.building_features = building_features or self.building_features
self.weather_features = weather_features or self.weather_features

self.building_feature_table_name = (
building_feature_table_name or self.building_feature_table_name
)
self.weather_feature_table_name = (
weather_feature_table_name or self.weather_feature_table_name
)

self.consumption_group_dict = (
consumption_group_dict or self.consumption_group_dict
)
@@ -422,7 +418,7 @@ def on_epoch_end(self):
def load_data(
consumption_group_dict=DataGenerator.consumption_group_dict,
building_feature_table_name=DataGenerator.building_feature_table_name,
upgrade_ids: List[str] = None,
upgrade_ids: List[float] = None,
p_val=0.2,
p_test=0.1,
n_train=None,
@@ -440,6 +436,7 @@ def load_data(
Default is DataGenerator.consumption_group_dict (too long to write out)
building_feature_table_name (str): Name of the building feature table.
Default is "ml.surrogate_model.building_features_tmp"
upgrade_ids (list): List of upgrade ids to use. If None (default), all supported upgrades are used.
p_val (float): Proportion of data to use for validation. Default is 0.2.
p_test (float): Proportion of data to use for testing. Default is 0.1.
n_train (int): Number of training records to select, where the size of the val and tests sets will be adjusted accordingly to
@@ -466,14 +463,14 @@ def load_data(
data = spark.sql(
f"""
SELECT B.building_id, B.upgrade_id, B.weather_file_city, {sum_str}
FROM ml.surrogate_model.building_simulation_outputs_annual O
FROM ml.surrogate_model.building_simulation_outputs_annual_tmp O
INNER JOIN {building_feature_table_name} B
ON B.upgrade_id = O.upgrade_id AND B.building_id == O.building_id
--WHERE B.upgrade_id IN (0,1,3,4)
"""
)
if upgrade_ids is not None:
data = data.where(F.col("upgrade_id").isin(upgrade_ids))
if upgrade_ids is None:
upgrade_ids = DataGenerator.supported_upgrade_ids
data = data.where(F.col("upgrade_id").isin(upgrade_ids))
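# with this default, passing upgrade_ids=None pulls every supported upgrade,
# including the 11.05 and 13.01 packages added in this commit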

# get list of unique building ids, which will be the basis for the dataset split
unique_building_ids = data.where(F.col("upgrade_id") == 0).select("building_id")
