
Commit

update common utilities
mikivee committed Nov 19, 2024
1 parent 386882c commit e2b652f
Showing 12 changed files with 123 additions and 246 deletions.
15 changes: 8 additions & 7 deletions scripts/build_feature_store.py
@@ -43,7 +43,8 @@
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType

-from src.dmutils import sumo, qa_utils
+from src.dmutils import qa_utils
+from src import feature_utils

# COMMAND ----------

@@ -59,7 +60,7 @@
# COMMAND ----------

# DBTITLE 1,Transform building metadata
-baseline_building_metadata_transformed = sumo.transform_building_features('ml.surrogate_model.building_metadata')
+baseline_building_metadata_transformed = feature_utils.transform_building_features('ml.surrogate_model.building_metadata')

# COMMAND ----------

@@ -70,12 +71,12 @@
# COMMAND ----------

# DBTITLE 1,Build metadata table for all samples and upgrades
-building_metadata_upgrades = sumo.build_upgrade_metadata_table(baseline_building_metadata_transformed)
+building_metadata_upgrades = feature_utils.build_upgrade_metadata_table(baseline_building_metadata_transformed)

# COMMAND ----------

# DBTITLE 1,Drop rows where upgrade was not applied
-building_metadata_applicable_upgrades = sumo.drop_non_upgraded_samples(
+building_metadata_applicable_upgrades = feature_utils.drop_non_upgraded_samples(
    building_metadata_upgrades,
    check_applicability_logic=True)

@@ -152,13 +153,13 @@ def transform_weather_features() -> DataFrame:

# DBTITLE 1,Add weather file city index
# fit the string indexer on the weather feature df
-weather_file_city_indexer = sumo.fit_weather_city_index(df_to_fit=weather_features)
+weather_file_city_indexer = feature_utils.fit_weather_city_index(df_to_fit=weather_features)
# apply indexer to weather feature df to get a weather_file_city_index column
-weather_features_indexed = sumo.transform_weather_city_index(
+weather_features_indexed = feature_utils.transform_weather_city_index(
    df_to_transform=weather_features,
    weather_file_city_indexer=weather_file_city_indexer)
# apply indexer to building metadata feature df to get a weather_file_city_index column
-building_metadata_with_weather_index = sumo.transform_weather_city_index(
+building_metadata_with_weather_index = feature_utils.transform_weather_city_index(
    df_to_transform=building_metadata_applicable_upgrades,
    weather_file_city_indexer=weather_file_city_indexer)

6 changes: 3 additions & 3 deletions scripts/extract_data.py
@@ -35,14 +35,14 @@
# COMMAND ----------

# DBTITLE 1,Imports
import os
from typing import List

from cloudpathlib import CloudPath
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

-from src.dmutils import bsb, data_cleaning, sumo
+from src.dmutils import bsb, data_cleaning
+from src import feature_utils

# COMMAND ----------

@@ -223,7 +223,7 @@ def extract_hourly_weather_data() -> DataFrame:

# DBTITLE 1,Extract building metadata
raw_building_metadata = spark.read.parquet(BUILDING_METADATA_PARQUET_PATH)
-building_metadata = sumo.clean_building_metadata(raw_building_metadata)
+building_metadata = feature_utils.clean_building_metadata(raw_building_metadata)

# COMMAND ----------

2 changes: 1 addition & 1 deletion scripts/megastock/data_prep_01.py
@@ -15,7 +15,7 @@
from pyspark.sql.types import StringType

sys.path.append("../../src")
-from dmutilslocal import sumo
+from src import feature_utils

# COMMAND ----------

12 changes: 6 additions & 6 deletions scripts/megastock/feature_extract_02.py
@@ -8,8 +8,8 @@
from databricks.feature_engineering import FeatureEngineeringClient
import pyspark.sql.functions as F

sys.path.append("../../src")
from dmutilslocal import sumo, qa_utils
from dmutils import qa_utils
from src import feature_utils

# COMMAND ----------

@@ -22,7 +22,7 @@

# COMMAND ----------

-baseline_building_metadata_transformed = sumo.transform_building_features(f"ml.megastock.building_metadata_{N_SAMPLE_TAG}")
+baseline_building_metadata_transformed = feature_utils.transform_building_features(f"ml.megastock.building_metadata_{N_SAMPLE_TAG}")

# COMMAND ----------

@@ -38,15 +38,15 @@

# COMMAND ----------

-building_metadata_upgrades = sumo.build_upgrade_metadata_table(baseline_building_metadata_transformed)
+building_metadata_upgrades = feature_utils.build_upgrade_metadata_table(baseline_building_metadata_transformed)

# COMMAND ----------

-building_metadata_upgrades = sumo.add_weather_city_index(building_metadata_upgrades)
+building_metadata_upgrades = feature_utils.add_weather_city_index(building_metadata_upgrades)

# COMMAND ----------

-building_metadata_applicable_upgrades = sumo.drop_non_upgraded_samples(
+building_metadata_applicable_upgrades = feature_utils.drop_non_upgraded_samples(
    building_metadata_upgrades, check_applicability_logic=False
)

2 changes: 1 addition & 1 deletion src/dmutils/constants.py
@@ -36,7 +36,7 @@
EER2_TO_EER = 1.04
"""
Conversion of EER2 to EER (Energy Efficiency Ratio)
-for Packaged Air Conditioner and Heat Pump.
+for Packaged Air Conditioner and Heat Pump.
The same conversion can be used for SEER2 to SEER.
See https://www.marathonhvac.com/seer-to-seer2
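As a quick worked example of applying this conversion factor (the ratings below are invented for illustration, not values from the repo):

# Worked example of the constant above; the input ratings are made up.
EER2_TO_EER = 1.04

eer2 = 10.0
eer = eer2 * EER2_TO_EER    # 10.4

# Per the note above, the same factor converts SEER2 to SEER:
seer2 = 14.3
seer = seer2 * EER2_TO_EER  # ~14.87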
3 changes: 2 additions & 1 deletion src/dmutils/data_cleaning.py
@@ -29,7 +29,8 @@ def edit_columns(
in format {to_replace: replace_value}.
replace_period_character (str, optional): Character to replace '.' with. Defaults to '__'.
-    Returns:
+    Returns
+    -------
DataFrame: Cleaned DataFrame
"""
# replace these with an empty string
51 changes: 51 additions & 0 deletions src/dmutils/qa_utils.py
@@ -1,3 +1,5 @@
+# TODO: move to dmutils

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

@@ -20,3 +22,52 @@ def check_for_null_values(df: DataFrame) -> None:
null_count_items = {k: v for k, v in null_counts_dict.items() if v > 0}
if len(null_count_items) > 0:
raise ValueError(f"Columns with null values: {null_count_items}")


+def compare_dataframes_string_values(df1: DataFrame, df2: DataFrame) -> dict:
+    """
+    Identify differences in the possible values of string columns in two PySpark DataFrames.
+    Args:
+        df1 (DataFrame): The first dataframe to compare.
+        df2 (DataFrame): The second dataframe to compare.
+    Returns:
+        dict: A dictionary of differences between the two DataFrames. The keys are the string columns that
+        have differences, and the values are dictionaries containing the differences for each column.
+    """
+
+    # Initialize a dictionary to store the results
+    comparison_dict = {}
+
+    # Get string columns
+    string_cols_df1 = [field.name for field in df1.schema.fields if field.dataType.simpleString() == "string"]
+    string_cols_df2 = [field.name for field in df2.schema.fields if field.dataType.simpleString() == "string"]
+
+    # Find common string columns
+    common_string_cols = set(string_cols_df1).intersection(set(string_cols_df2))
+
+    for col in common_string_cols:
+        # Get unique values as lists
+        unique_df1 = df1.select(col).distinct().rdd.flatMap(lambda x: x).collect()
+        unique_df2 = df2.select(col).distinct().rdd.flatMap(lambda x: x).collect()
+
+        unique_set1 = set(unique_df1)
+        unique_set2 = set(unique_df2)
+
+        differences = {}
+
+        # Find values unique to df1
+        only_in_df1 = unique_set1 - unique_set2
+        if only_in_df1:
+            differences["df1 only"] = only_in_df1
+
+        # Find values unique to df2
+        only_in_df2 = unique_set2 - unique_set1
+        if only_in_df2:
+            differences["df2 only"] = only_in_df2
+
+        if differences:
+            comparison_dict[col] = differences
+
+    return comparison_dict
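
To illustrate the new helper, a minimal usage sketch (the toy column names and values are hypothetical, and spark is assumed to be an active session, as in a Databricks notebook):

# Hypothetical example for compare_dataframes_string_values; toy data only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_old = spark.createDataFrame(
    [("Electricity", "Occupied"), ("Natural Gas", "Occupied")],
    ["heating_fuel", "vacancy_status"],
)
df_new = spark.createDataFrame(
    [("Electricity", "Occupied"), ("Other Fuel", "Occupied")],
    ["heating_fuel", "vacancy_status"],
)

print(compare_dataframes_string_values(df_old, df_new))
# {'heating_fuel': {'df1 only': {'Natural Gas'}, 'df2 only': {'Other Fuel'}}}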
83 changes: 50 additions & 33 deletions src/dmutils/sumo.py
@@ -1,3 +1,6 @@
+# copied from surrogate_modeling repo, branch mev/refactor_feature_building
+# src/dmutils/sumo.py
+
# TODO: Move this into dmutils
# TODO: Run tests using doctest

@@ -111,7 +114,31 @@ def clean_building_metadata(raw_resstock_metadata_df: DataFrame) -> DataFrame:
],
)

-    return building_metadata_cleaned
+    # Filter to homes of interest: occupied sf homes with modeled fuels and without shared HVAC systems
+    filtered_building_metadata = (
+        building_metadata_cleaned
+        # only single family, mobile home, or multifam with < 5 units
+        .where(
+            F.col("geometry_building_type_acs").isin(
+                [
+                    "Single-Family Detached",
+                    "Single-Family Attached",
+                    "Mobile Home",
+                    "2 Unit",
+                    "3 or 4 Unit",
+                ]
+            )
+        )
+        # other fuels are not modeled in resstock,
+        # and this filter is sufficient to remove units that have other fuels for any appliance
+        .where(F.col("heating_fuel") != "Other Fuel").where(F.col("water_heater_fuel") != "Other Fuel")
+        # filter out vacant homes
+        .where(F.col("vacancy_status") == "Occupied")
+        # filter out homes with shared HVAC or water heating systems
+        .where((F.col("hvac_has_shared_system") == "None") & (F.col("water_heater_in_unit") == "Yes"))
+    )
+
+    return filtered_building_metadata


# -- feature transformation udfs -- #
@@ -157,9 +184,12 @@ def extract_r_value(construction_type: str, set_none_to_inf: bool = False) -> in
Assumption: all baseline walls have similar R-value of ~4.
The returned value is for additional insulation only. Examples:
Uninsulated brick, 3w, 12": ~4 (https://ncma.org/resource/rvalues-of-multi-wythe-concrete-masonry-walls/)
Uninsulated wood studs: ~4 (assuming 2x4 studs and 1.25/inch (air gap has higher R-value than wood), 3.5*1.25=4.375)
Hollow Concrete Masonry Unit, Uninsulated: ~4 per 6" (https://ncma.org/resource/rvalues-ufactors-of-single-wythe-concrete-masonry-walls/)
Uninsulated brick, 3w, 12": ~4
(https://ncma.org/resource/rvalues-of-multi-wythe-concrete-masonry-walls/)
Uninsulated wood studs: ~4
(assuming 2x4 studs and 1.25/inch (air gap has higher R-value than wood), 3.5*1.25=4.375)
Hollow Concrete Masonry Unit, Uninsulated: ~4 per 6"
(https://ncma.org/resource/rvalues-ufactors-of-single-wythe-concrete-masonry-walls/)
>>> extract_r_value('Finished, R-13')
13
@@ -317,7 +347,7 @@ def extract_energy_factor(ef_string: str) -> int:
>>> extract_energy_factor("None")
99.
"""
if ef_string == "None":
if "None" in ef_string:
return 99.0
return float(ef_string.split(",")[0][3:])

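The guard above was widened from an exact match to a substring check. A short sketch of the difference, using illustrative option strings rather than verbatim ResStock names:

# Behavior of the new guard (inputs are illustrative):
extract_energy_factor("None")                 # 99.0, same as before
extract_energy_factor("EF 19.8, 100% Usage")  # float("19.8") -> 19.8
# A composite label such as "None, 100% Usage" raised ValueError under the
# old exact-match guard (it fell through to float("None"[3:]), i.e. float("e"))
# but now returns the 99.0 sentinel.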
@@ -398,6 +428,7 @@ def get_water_heater_capacity_ashrae(n_bedrooms: int, n_bathrooms: float, is_ele
]
)


# pulled from options.tsv
@udf(wh_schema)
def get_water_heater_specs(name: str) -> StructType:
@@ -538,6 +569,7 @@ def make_map_type_from_dict(mapping: Dict) -> Column:
}
)


# -- function to apply all the baseline transformations -- #
def transform_building_features(building_metadata_table_name) -> DataFrame:
"""
@@ -550,28 +582,7 @@ def transform_building_features(building_metadata_table_name) -> DataFrame:
Dataframe: dataframe of building metadata features
"""
building_metadata_features = (
spark.read.table("ml.surrogate_model.building_metadata")
# -- filter to occupied sf homes with modeled fuels and without shared HVAC systems -- #
# sf homes only
.where(
F.col("geometry_building_type_acs").isin(
[
"Single-Family Detached",
"Single-Family Attached",
"Mobile Home",
"2 Unit",
"3 or 4 Unit",
]
)
)
# other fuels are not modeled in resstock,
# and this filter is sufficienct to remove units that have other fuels for any applaince
.where(F.col("heating_fuel") != "Other Fuel")
.where(F.col("water_heater_fuel") != "Other Fuel")
# filter out vacant homes
.where(F.col("vacancy_status") == "Occupied")
# filter out homes with shared HVAC or water heating systems
.where((F.col("hvac_has_shared_system") == "None") & (F.col("water_heater_in_unit") == "Yes"))
spark.read.table(building_metadata_table_name)
# -- structure transformations -- #
.withColumn("n_bedrooms", F.col("bedrooms").cast("int"))
.withColumn("n_bathrooms", F.col("n_bedrooms") / 2 + 0.5) # based on docs
@@ -780,7 +791,7 @@ def transform_building_features(building_metadata_table_name) -> DataFrame:
"n_bedrooms",
"n_bathrooms",
F.col("geometry_attic_type").alias("attic_type"),
"sqft",
F.col("sqft").cast("double"),
F.col("geometry_foundation_type").alias("foundation_type"),
"garage_size_n_car",
F.col("geometry_stories").cast("int").alias("n_stories"),
@@ -889,7 +900,8 @@ def upgrade_to_hp(
) -> DataFrame:
"""
Upgrade the baseline building features to an air source heat pump (ASHP) with specified efficiencies.
-Note that all baseline hps in Resstock are lower efficiency than specified upgrade thresholds (<=SEER 15; <=HSPF 8.5)
+Note that all baseline hps in Resstock are lower efficiency than specified upgrade thresholds
+(<=SEER 15; <=HSPF 8.5)
Args:
baseline_building_features (DataFrame): The baseline building features.
Expand Down Expand Up @@ -926,7 +938,8 @@ def upgrade_to_hp(
def apply_upgrades(baseline_building_features: DataFrame, upgrade_id: int) -> DataFrame:
"""
Modify building features to reflect the upgrade. Source:
-https://oedi-data-lake.s3.amazonaws.com/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2022/EUSS_ResRound1_Technical_Documentation.pdf
+https://oedi-data-lake.s3.amazonaws.com/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock
+/2022/EUSS_ResRound1_Technical_Documentation.pdf
In case of contradictions, consult: https://github.com/NREL/resstock/blob/run/euss/EUSS-project-file_2018_10k.yml.
Args:
@@ -1082,7 +1095,8 @@ def build_upgrade_metadata_table(baseline_building_features: DataFrame) -> DataF
to create a comprehensive DataFrame that includes the baseline and all upgrades.
Args:
-baseline_building_features (DataFrame): A Spark DataFrame containing baseline building metadata for a set of building samples.
+baseline_building_features (DataFrame): A Spark DataFrame containing baseline building metadata
+for a set of building samples.
Returns:
DataFrame: A Spark DataFrame containing building metadata for each upgrade including baseline.
"""
Expand All @@ -1109,7 +1123,8 @@ def drop_non_upgraded_samples(building_features: DataFrame, check_applicability_
Args:
building_metadata_upgrades (DataFrame): The DataFrame containing building metadata upgrades.
check_applicability_logic (bool, optional): Flag indicating whether to check whether the applicability logic
-matches between the metadata (i.e., non-unique set of metadata) and the applicability flag output by the simulation. Should only be passed if running on Resstock EUSS data. Defaults to False.
+matches between the metadata (i.e., non-unique set of metadata) and the applicability flag output by the
+simulation. Should only be passed if running on Resstock EUSS data. Defaults to False.
Returns:
DataFrame: The DataFrame with non-upgraded samples dropped.
@@ -1149,7 +1164,8 @@ def drop_non_upgraded_samples(building_features: DataFrame, check_applicability_
if mismatch_count > 0:
applicability_compare.where(F.col("features.applicability") != F.col("targets.applicability")).display()
raise ValueError(
f"{mismatch_count} cases where applicability based on metadata and simulation applicability flag do not match"
f"{mismatch_count} cases where applicability based on metadata and simulation applicability flag\
do not match"
)

# drop feature rows where upgrade was not applied
Expand All @@ -1168,6 +1184,7 @@ def fit_weather_city_index(df_to_fit: Optional[DataFrame] = None):
inputCol="weather_file_city",
outputCol="weather_file_city_index",
stringOrderType="alphabetAsc",
handleInvalid="skip",
)
return indexer.fit(df_to_fit)

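For reference, a small sketch of what the added handleInvalid="skip" option does: rows whose weather_file_city was never seen when fitting the indexer are dropped at transform time instead of raising an error (toy city names for illustration):

# Toy demonstration of StringIndexer's handleInvalid="skip"; data is hypothetical.
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.getOrCreate()

fit_df = spark.createDataFrame([("Boston",), ("Denver",)], ["weather_file_city"])
new_df = spark.createDataFrame([("Boston",), ("Tucson",)], ["weather_file_city"])

indexer = StringIndexer(
    inputCol="weather_file_city",
    outputCol="weather_file_city_index",
    stringOrderType="alphabetAsc",
    handleInvalid="skip",
)
model = indexer.fit(fit_df)

# "Tucson" is not among the fitted labels, so its row is skipped:
model.transform(new_df).show()  # only the "Boston" row survives, with index 0.0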
