From fe0ce2b01403185e9f4a00bd27f13bf87474ccab Mon Sep 17 00:00:00 2001 From: Chris Mutel Date: Tue, 9 Jul 2024 08:13:37 +0200 Subject: [PATCH] Compatibility with simapro multifunctionality --- bw2io/__init__.py | 41 +++++----- bw2io/ecoinvent.py | 7 +- bw2io/extractors/ecospold1.py | 2 +- bw2io/importers/__init__.py | 2 +- bw2io/importers/base_lci.py | 101 ++++++++++++++++++------- bw2io/importers/simapro_block_csv.py | 87 +++++++++++++-------- bw2io/strategies/__init__.py | 10 ++- bw2io/strategies/simapro.py | 85 +++++++++++++++------ pyproject.toml | 2 + tests/strategies/simapro.py | 101 +++++++++++++++++++++++++ tests/strategies/simapro_allocation.py | 16 ++-- 11 files changed, 337 insertions(+), 117 deletions(-) diff --git a/bw2io/__init__.py b/bw2io/__init__.py index 52802a18..c63a549b 100644 --- a/bw2io/__init__.py +++ b/bw2io/__init__.py @@ -50,20 +50,12 @@ __version__ = "0.9.DEV28" -from .chemidplus import ChemIDPlus -from .package import BW2Package -from .export import ( - DatabaseToGEXF, - DatabaseSelectionToGEXF, - keyword_to_gephi_graph, - lci_matrices_to_excel, - lci_matrices_to_matlab, -) from .backup import ( backup_data_directory, backup_project_directory, restore_project_directory, ) +from .chemidplus import ChemIDPlus from .data import ( add_ecoinvent_33_biosphere_flows, add_ecoinvent_34_biosphere_flows, @@ -76,7 +68,13 @@ get_csv_example_filepath, get_xlsx_example_filepath, ) -from .migrations import migrations, Migration, create_core_migrations +from .export import ( + DatabaseSelectionToGEXF, + DatabaseToGEXF, + keyword_to_gephi_graph, + lci_matrices_to_excel, + lci_matrices_to_matlab, +) from .importers import ( CSVImporter, CSVLCIAImporter, @@ -92,10 +90,12 @@ SingleOutputEcospold1Importer, SingleOutputEcospold2Importer, ) +from .migrations import Migration, create_core_migrations, migrations +from .package import BW2Package +from .remote import install_project from .units import normalize_units -from .unlinked_data import unlinked_data, UnlinkedData +from .unlinked_data import UnlinkedData, unlinked_data from .utils import activity_hash, es2_activity_hash, load_json_data_file -from .remote import install_project try: from .ecoinvent import import_ecoinvent_release @@ -128,9 +128,10 @@ def create_default_lcia_methods( overwrite=False, rationalize_method_names=False, shortcut=True ): if shortcut: - import zipfile import json + import zipfile from pathlib import Path + from .importers.base_lcia import LCIAImporter fp = Path(__file__).parent.resolve() / "data" / "lcia" / "lcia_39_ecoinvent.zip" @@ -174,13 +175,14 @@ def useeio20(name="USEEIO-2.0", collapse_products=False, prune=False): print(f"{name} already present") return - from .importers.json_ld import JSONLDImporter - from .importers.json_ld_lcia import JSONLDLCIAImporter - from .strategies import remove_useeio_products, remove_random_exchanges - from .download_utils import download_with_progressbar - from pathlib import Path import tempfile import zipfile + from pathlib import Path + + from .download_utils import download_with_progressbar + from .importers.json_ld import JSONLDImporter + from .importers.json_ld_lcia import JSONLDLCIAImporter + from .strategies import remove_random_exchanges, remove_useeio_products with tempfile.TemporaryDirectory() as td: dp = Path(td) @@ -218,10 +220,11 @@ def exiobase_monetary( name=None, ignore_small_balancing_corrections=True, ): - from .download_utils import download_with_progressbar import tempfile from pathlib import Path + from .download_utils import download_with_progressbar + mapping = { (3, 8, 2): { "url": "https://zenodo.org/record/5589597/files/IOT_{year}_{system}.zip?download=1", diff --git a/bw2io/ecoinvent.py b/bw2io/ecoinvent.py index 7687aa1f..620a031f 100644 --- a/bw2io/ecoinvent.py +++ b/bw2io/ecoinvent.py @@ -313,7 +313,12 @@ def import_ecoinvent_release( for row in cfs: if lcia_prefix: - impact_category = (lcia_prefix, row["method"], row["category"], row["indicator"]) + impact_category = ( + lcia_prefix, + row["method"], + row["category"], + row["indicator"], + ) else: impact_category = (row["method"], row["category"], row["indicator"]) if row[cf_col_label] is None: diff --git a/bw2io/extractors/ecospold1.py b/bw2io/extractors/ecospold1.py index 94052cba..227187fa 100644 --- a/bw2io/extractors/ecospold1.py +++ b/bw2io/extractors/ecospold1.py @@ -3,7 +3,7 @@ import os from io import StringIO from pathlib import Path -from typing import Any, Union, Optional +from typing import Any, Optional, Union import numpy as np import pyecospold diff --git a/bw2io/importers/__init__.py b/bw2io/importers/__init__.py index 760cd677..bfe73c12 100644 --- a/bw2io/importers/__init__.py +++ b/bw2io/importers/__init__.py @@ -11,9 +11,9 @@ from .excel_lcia import CSVLCIAImporter, ExcelLCIAImporter from .exiobase3_hybrid import Exiobase3HybridImporter from .exiobase3_monetary import Exiobase3MonetaryImporter +from .simapro_block_csv import SimaProBlockCSVImporter from .simapro_csv import SimaProCSVImporter from .simapro_lcia_csv import SimaProLCIACSVImporter -from .simapro_block_csv import SimaProBlockCSVImporter """ This module provides classes for importing Life Cycle Impact Assessment (LCIA) data diff --git a/bw2io/importers/base_lci.py b/bw2io/importers/base_lci.py index b925a66e..b420d5ea 100644 --- a/bw2io/importers/base_lci.py +++ b/bw2io/importers/base_lci.py @@ -2,9 +2,10 @@ import functools import itertools import warnings -from typing import Optional +from typing import Optional, Tuple -from bw2data import Database, config, databases, parameters, labels +from bw2data import Database, config, databases, labels, parameters +from bw2data.data_store import ProcessedDataStore from bw2data.parameters import ( ActivityParameter, DatabaseParameter, @@ -56,7 +57,11 @@ def __init__(self, db_name): def all_linked(self): return self.statistics()[2] == 0 - def statistics(self, print_stats=True): + @property + def needs_multifunctional_database(self): + return any(ds.get("type") == "multifunctional" for ds in self.data) + + def statistics(self, print_stats: bool = True) -> Tuple[int, int, int, int]: num_datasets = len(self.data) num_exchanges = sum([len(ds.get("exchanges", [])) for ds in self.data]) num_unlinked = len( @@ -65,8 +70,12 @@ def statistics(self, print_stats=True): for ds in self.data for exc in ds.get("exchanges", []) if not exc.get("input") + and not (ds.get("type") == "multifunctional" and exc.get("functional")) ] ) + num_multifunctional = sum( + 1 for ds in self.data if ds.get("type") == "multifunctional" + ) if print_stats: unique_unlinked = collections.defaultdict(set) for ds in self.data: @@ -75,19 +84,28 @@ def statistics(self, print_stats=True): unique_unlinked = sorted( [(k, len(v)) for k, v in list(unique_unlinked.items())] ) - - print( - ( - "{} datasets\n{} exchanges\n{} unlinked exchanges\n " - + "\n ".join( - [ - "Type {}: {} unique unlinked exchanges".format(*o) - for o in unique_unlinked - ] - ) - ).format(num_datasets, num_exchanges, num_unlinked) + uu = "\n\t".join( + [ + "Type {}: {} unique unlinked exchanges".format(*o) + for o in unique_unlinked + ] ) - return num_datasets, num_exchanges, num_unlinked + + if num_multifunctional: + print( + f"""{num_datasets} datasets, including {num_multifunctional} multifunctional datasets +\t{num_exchanges} exchanges +\t{num_unlinked} unlinked exchanges ({len(unique_unlinked)} unique) +\t{uu}""" + ) + else: + print( + f"""{num_datasets} datasets +\t{num_exchanges} exchanges +\t{num_unlinked} unlinked exchanges ({len(unique_unlinked)} unique) +\t{uu}""" + ) + return num_datasets, num_exchanges, num_unlinked, num_multifunctional def write_project_parameters(self, data=None, delete_existing=True): """Write global parameters to ``ProjectParameter`` database table. @@ -201,6 +219,16 @@ def _write_activity_parameters(self, activity_parameters): for key in keys: parameters.add_exchanges_to_group(group, key) + def database_class( + self, db_name: str, requested_backend: str = "sqlite" + ) -> ProcessedDataStore: + from multifunctional import MultifunctionalDatabase + + if self.needs_multifunctional_database: + return MultifunctionalDatabase(db_name) + else: + return Database(db_name, backend=requested_backend) + def write_database( self, data: Optional[dict] = None, @@ -260,7 +288,7 @@ def write_database( if db_name in databases: # TODO: Raise error if unlinked exchanges? - db = Database(db_name) + db = self.database_class(db_name) if delete_existing: existing = {} else: @@ -269,9 +297,14 @@ def write_database( existing = {} if "format" not in self.metadata: self.metadata["format"] = self.format + if ( + self.needs_multifunctional_database + and "default_allocation" not in self.metadata + ): + self.metadata["default_allocation"] = "manual_allocation" with warnings.catch_warnings(): warnings.simplefilter("ignore") - db = Database(db_name, backend=backend) + db = self.database_class(db_name) db.register(**self.metadata) self.write_database_parameters(activate_parameters, delete_existing) @@ -346,27 +379,39 @@ def create_new_biosphere(self, biosphere_name: str): raise ValueError(f"{biosphere_name} database already exists") def reformat(exc): - return exc | {"type": labels.biosphere_node_default, "exchanges": [], "database": biosphere_name, "code": activity_hash(exc)} + return exc | { + "type": labels.biosphere_node_default, + "exchanges": [], + "database": biosphere_name, + "code": activity_hash(exc), + } - bio_data = {(flow["database"], flow["code"]): flow for flow in [ - reformat(exc) - for ds in self.data - for exc in ds.get("exchanges", []) - if exc["type"] in labels.biosphere_edge_types - and not exc.get("input") - ]} + bio_data = { + (flow["database"], flow["code"]): flow + for flow in [ + reformat(exc) + for ds in self.data + for exc in ds.get("exchanges", []) + if exc["type"] in labels.biosphere_edge_types and not exc.get("input") + ] + } if not bio_data: - print("Skipping biosphere database creation as all biosphere flows are linked") + print( + "Skipping biosphere database creation as all biosphere flows are linked" + ) return - print(f"Creating new biosphere database {biosphere_name} with {len(bio_data)} flows") + print( + f"Creating new biosphere database {biosphere_name} with {len(bio_data)} flows" + ) with warnings.catch_warnings(): warnings.simplefilter("ignore") new_bio = Database(biosphere_name) new_bio.register( - format=self.format, comment=f"Database for unlinked biosphere flows from {self.db_name}" + format=self.format, + comment=f"Database for unlinked biosphere flows from {self.db_name}", ) new_bio.write(bio_data) diff --git a/bw2io/importers/simapro_block_csv.py b/bw2io/importers/simapro_block_csv.py index bab20e48..4ac1c72f 100644 --- a/bw2io/importers/simapro_block_csv.py +++ b/bw2io/importers/simapro_block_csv.py @@ -1,14 +1,13 @@ import functools +import warnings from io import StringIO from pathlib import Path from typing import Optional -import warnings -from bw2data import Database, config, labels, databases +from bw2data import Database, config, databases, labels from bw_simapro_csv import SimaProCSV from ..strategies import ( - assign_only_functional_exchange_as_reference_product, change_electricity_unit_mj_to_kwh, convert_activity_parameters_to_list, drop_unspecified_subcategories, @@ -23,14 +22,15 @@ normalize_simapro_biosphere_categories, normalize_simapro_biosphere_names, normalize_units, + override_process_name_using_single_functional_exchange, set_code_by_activity_hash, - sp_allocate_functional_products, + set_metadata_using_single_functional_exchange, split_simapro_name_geo, strip_biosphere_exc_locations, update_ecoinvent_locations, ) -from .base_lci import LCIImporter from ..utils import activity_hash +from .base_lci import LCIImporter class SimaProBlockCSVImporter(LCIImporter): @@ -42,21 +42,19 @@ def __init__( database_name: Optional[str] = None, biosphere_database_name: Optional[str] = None, ): - spcsv = SimaProCSV( - path_or_stream=path_or_stream, database_name=database_name - ) + spcsv = SimaProCSV(path_or_stream=path_or_stream, database_name=database_name) data = spcsv.to_brightway() self.db_name = spcsv.database_name self.default_biosphere_database_name = biosphere_database_name - self.metadata = data['database'] - self.data = data['processes'] - self.database_parameters = data['database_parameters'] - self.project_parameters = data['project_parameters'] + self.metadata = data["database"] + self.data = data["processes"] + self.database_parameters = data["database_parameters"] + self.project_parameters = data["project_parameters"] self.strategies = [ - sp_allocate_functional_products, - assign_only_functional_exchange_as_reference_product, + set_metadata_using_single_functional_exchange, + override_process_name_using_single_functional_exchange, drop_unspecified_subcategories, split_simapro_name_geo, link_technosphere_based_on_name_unit_location, @@ -64,7 +62,7 @@ def __init__( link_iterable_by_fields, other=Database(biosphere_database_name or config.biosphere), kind=labels.biosphere_edge_types, - ) + ), ] def create_technosphere_placeholders(self, database_name: str): @@ -73,33 +71,46 @@ def create_technosphere_placeholders(self, database_name: str): raise ValueError(f"{database_name} database already exists") def reformat(exc): - new_exc = exc | {"type": labels.process_node_default, "exchanges": [], "database": database_name, "code": activity_hash(exc)} + new_exc = exc | { + "type": labels.process_node_default, + "exchanges": [], + "database": database_name, + "code": activity_hash(exc), + } if not new_exc.get("location"): # Also update original for correct linking # Location is required - exc['location'] = new_exc['location'] = 'GLO' + exc["location"] = new_exc["location"] = "GLO" return new_exc - proc_data = {(ds["database"], ds["code"]): ds for ds in [ - reformat(exc) - for ds in self.data - for exc in ds.get("exchanges", []) - if exc["type"] not in labels.biosphere_edge_types - and not exc.get("input") - and not exc.get("functional") - ]} + proc_data = { + (ds["database"], ds["code"]): ds + for ds in [ + reformat(exc) + for ds in self.data + for exc in ds.get("exchanges", []) + if exc["type"] not in labels.biosphere_edge_types + and not exc.get("input") + and not exc.get("functional") + ] + } if not proc_data: - print("Skipping placeholder database creation as all technosphere flows are linked") + print( + "Skipping placeholder database creation as all technosphere flows are linked" + ) return - print(f"Creating new placeholder database {database_name} with {len(proc_data)} processes") + print( + f"Creating new placeholder database {database_name} with {len(proc_data)} processes" + ) with warnings.catch_warnings(): warnings.simplefilter("ignore") new_db = Database(database_name) new_db.register( - format=self.format, comment=f"Database for unlinked technosphere flows from {self.db_name}" + format=self.format, + comment=f"Database for unlinked technosphere flows from {self.db_name}", ) new_db.write(proc_data) @@ -116,8 +127,7 @@ def reformat(exc): def use_ecoinvent_strategies(self) -> None: """Switch strategy selection to normalize data to ecoinvent flow lists""" self.strategies = [ - sp_allocate_products, - assign_only_functional_exchange_as_reference_product, + set_metadata_using_single_functional_exchange, drop_unspecified_subcategories, normalize_units, update_ecoinvent_locations, @@ -136,12 +146,23 @@ def use_ecoinvent_strategies(self) -> None: fix_localized_water_flows, functools.partial( link_iterable_by_fields, - other=Database(self.default_biosphere_database_name or config.biosphere), + other=Database( + self.default_biosphere_database_name or config.biosphere + ), kind="biosphere", ), ] - def write_database(self, backend: Optional[str] = None, activate_parameters: bool = True, searchable: bool = True) -> Database: + def write_database( + self, + backend: Optional[str] = None, + activate_parameters: bool = True, + searchable: bool = True, + ) -> Database: if activate_parameters: self.write_project_parameters(delete_existing=False) - return super().write_database(backend=backend, activate_parameters=activate_parameters, searchable=searchable) + return super().write_database( + backend=backend, + activate_parameters=activate_parameters, + searchable=searchable, + ) diff --git a/bw2io/strategies/__init__.py b/bw2io/strategies/__init__.py index 06165fe2..d1184ecb 100644 --- a/bw2io/strategies/__init__.py +++ b/bw2io/strategies/__init__.py @@ -2,7 +2,7 @@ "add_activity_hash_code", "add_cpc_classification_from_single_reference_product", "add_database_name", - "assign_only_functional_exchange_as_reference_product", + "set_metadata_using_single_functional_exchange", "assign_only_product_as_production", "assign_single_product_as_activity", "change_electricity_unit_mj_to_kwh", @@ -29,7 +29,6 @@ "fix_ecoinvent_flows_pre35", "fix_localized_water_flows", "fix_unreasonably_high_lognormal_uncertainties", - "reparametrize_lognormal_to_agree_with_static_amount", "fix_zero_allocation_products", "json_ld_add_activity_unit", "json_ld_add_products_as_activities", @@ -61,12 +60,14 @@ "normalize_simapro_biosphere_categories", "normalize_simapro_biosphere_names", "normalize_units", + "override_process_name_using_single_functional_exchange", "remove_random_exchanges", "remove_uncertainty_from_negative_loss_exchanges", "remove_unnamed_parameters", "remove_useeio_products", "remove_zero_amount_coproducts", "remove_zero_amount_inputs_with_no_activity", + "reparametrize_lognormal_to_agree_with_static_amount", "set_biosphere_type", "set_code_by_activity_hash", "set_lognormal_loc_value", @@ -169,15 +170,16 @@ from .locations import update_ecoinvent_locations from .migrations import migrate_datasets, migrate_exchanges from .simapro import ( - assign_only_functional_exchange_as_reference_product, change_electricity_unit_mj_to_kwh, fix_localized_water_flows, fix_zero_allocation_products, link_technosphere_based_on_name_unit_location, normalize_simapro_biosphere_categories, normalize_simapro_biosphere_names, + override_process_name_using_single_functional_exchange, + set_metadata_using_single_functional_exchange, + sp_allocate_functional_products, sp_allocate_products, split_simapro_name_geo, - sp_allocate_functional_products, ) from .useeio import remove_random_exchanges, remove_useeio_products diff --git a/bw2io/strategies/simapro.py b/bw2io/strategies/simapro.py index 643f9167..e6b9a8c6 100644 --- a/bw2io/strategies/simapro.py +++ b/bw2io/strategies/simapro.py @@ -1,9 +1,10 @@ import copy import re from numbers import Number +from typing import List -import numpy as np import bw2parameters +import numpy as np from bw2data import Database from stats_arrays import LognormalUncertainty @@ -96,11 +97,7 @@ def sp_allocate_functional_products(db): for ds in db: if not ds["type"] == "multifunctional": continue - products = [ - exc - for exc in ds.get("exchanges", []) - if functional(exc) - ] + products = [exc for exc in ds.get("exchanges", []) if functional(exc)] for product in products: if not isinstance(product.get("allocation"), Number): raise ValueError( @@ -120,7 +117,7 @@ def sp_allocate_functional_products(db): new = copy.deepcopy(ds) production_exc = copy.deepcopy(product) - del production_exc['allocation'] + del production_exc["allocation"] new["exchanges"] = [production_exc] + [ rescale_exchange(exc, allocation) for exc in new["exchanges"] @@ -213,7 +210,7 @@ def sp_allocate_products(db): ] for product in products: product = copy.deepcopy(product) - if (allocation := product.get("allocation")): + if allocation := product.get("allocation"): if isinstance(product["allocation"], str) and "parameters" in ds: ds["parameters"] = { k.lower(): v for k, v in ds["parameters"].items() @@ -832,17 +829,63 @@ def flip_sign_on_waste(db, other): return db -def assign_only_functional_exchange_as_reference_product(db): +def set_metadata_using_single_functional_exchange( + db: List[dict], missing_value: str = "(unknown)" +) -> List[dict]: """ - Assign only functional exchange as reference product. + Set `name`, `unit`, `production amount`, and `reference product` from the functional exchange. - Written initially for SimaPro imports, it overrides the process `names`. + Does not do anything unless these conditions are met: - If there is not `reference product`, and there is a single `functional` exchange, use that to set: + * There is only one functional exchange + * The keys or nor present, or are set to `missing_value` - * 'name' - name of reference product - * 'unit' - unit of reference product - * 'production amount' - amount of reference product + Parameters + ---------- + db : list + An list of dataset dictionaries. + + Returns + ------- + + The modified database list of dataset dictionaries. + + """ + missing = lambda x, y: not x.get(y) or (x.get(y) == missing_value) + + LABELS = [ + ("name", "name"), + ("reference product", "name"), + ("unit", "unit"), + ("production amount", "amount"), + ] + + for ds in db: + functional_edges = [x for x in ds.get("exchanges", []) if x.get("functional")] + if len(functional_edges) != 1: + continue + functional = functional_edges[0] + + for label1, label2 in LABELS: + if missing(ds, label1): + ds[label1] = functional.get(label2, missing_value) + return db + + +def override_process_name_using_single_functional_exchange( + db: List[dict], missing_value: str = "(unknown)" +) -> List[dict]: + """ + Set process dataset `name` from the single functional exchange. + + SimaPro exports *can* include process names, but as the manual states: + + "Under the Documentation tab, you can enter the process name. Please note that this is only for + your own reference and this name is not used anywhere. Processes are identified by the name + defined under the Input/Output tab in the product section. Therefore, if you want to search for a + certain process, you should use the product name defined in the Input/Output as the keyword." + + We therefore need to set the name to the same term being used as inputs elsewhere. Parameters ---------- @@ -856,12 +899,10 @@ def assign_only_functional_exchange_as_reference_product(db): """ for ds in db: - if ds.get("reference product"): + functional_edges = [x for x in ds.get("exchanges", []) if x.get("functional")] + if len(functional_edges) != 1: continue - functional = [x for x in ds.get("exchanges", []) if x.get("functional")] - if len(functional) == 1: - product = functional[0] - ds["name"] = ds["reference product"] = product["name"] - ds["production amount"] = product["amount"] - ds["unit"] = product.get("unit") or ds.get("unit") + if functional_edges[0].get("name") in (None, missing_value): + continue + ds["name"] = functional_edges[0]["name"] return db diff --git a/pyproject.toml b/pyproject.toml index 155f6e03..431837b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,8 +39,10 @@ dependencies = [ "bw2parameters>=1.1.0", "bw_migrations>=0.2", "bw_processing>=0.8.5", + "bw_simapro_csv>=0.1.9", "lxml", "mrio_common_metadata", + "multifunctional>=0.5", "numpy", "openpyxl", "platformdirs", diff --git a/tests/strategies/simapro.py b/tests/strategies/simapro.py index 2178a604..b8386552 100644 --- a/tests/strategies/simapro.py +++ b/tests/strategies/simapro.py @@ -202,3 +202,104 @@ def test_fix_zero_allocation_products(): }, ] assert fix_zero_allocation_products(given) == expected + + +def test_set_metadata_using_single_functional_exchange(): + given = [ + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + }, + { + "exchanges": [{"functional": True, "amount": 42}], + }, + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + "name": "(unknown)", + "reference product": "(unknown)", + "unit": "(unknown)", + }, + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + "name": "a", + "reference product": "b", + "unit": "c", + "production amount": 7, + }, + {"exchanges": [{"functional": True}, {"functional": True}]}, + ] + expected = [ + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + "name": "foo", + "reference product": "foo", + "unit": "kg", + "production amount": 42, + }, + { + "exchanges": [{"functional": True, "amount": 42}], + "production amount": 42, + "name": "(unknown)", + "reference product": "(unknown)", + "unit": "(unknown)", + }, + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + "name": "foo", + "reference product": "foo", + "unit": "kg", + "production amount": 42, + }, + { + "exchanges": [ + {"functional": True, "amount": 42, "name": "foo", "unit": "kg"} + ], + "name": "a", + "reference product": "b", + "unit": "c", + "production amount": 7, + }, + {"exchanges": [{"functional": True}, {"functional": True}]}, + ] + assert set_metadata_using_single_functional_exchange(given) == expected + + +def test_override_process_name_using_single_functional_exchange(): + given = [ + { + "name": "replace me", + "exchanges": [{"functional": True, "name": "foo"}], + }, + { + "name": "replace me", + "exchanges": [{"functional": True}], + }, + { + "name": "replace me", + "exchanges": [{"functional": True, "name": "(unknown)"}], + }, + ] + expected = [ + { + "name": "foo", + "exchanges": [{"functional": True, "name": "foo"}], + }, + { + "name": "replace me", + "exchanges": [{"functional": True}], + }, + { + "name": "replace me", + "exchanges": [{"functional": True, "name": "(unknown)"}], + }, + ] + assert override_process_name_using_single_functional_exchange(given) == expected diff --git a/tests/strategies/simapro_allocation.py b/tests/strategies/simapro_allocation.py index 4d704695..d84cfe56 100644 --- a/tests/strategies/simapro_allocation.py +++ b/tests/strategies/simapro_allocation.py @@ -113,18 +113,18 @@ def test_sp_allocate_functional_products(): "name": "Biowaste", "unit": "kg", "comment": "Manure", - "amount": 6., + "amount": 6.0, "uncertainty type": 0, - "loc": 6., + "loc": 6.0, "type": "production", "functional": False, }, { "name": "Quack", "unit": "p", - "amount": 3., + "amount": 3.0, "uncertainty type": 0, - "loc": 3., + "loc": 3.0, "type": "technosphere", }, ], @@ -147,18 +147,18 @@ def test_sp_allocate_functional_products(): "name": "Biowaste", "unit": "kg", "comment": "Manure", - "amount": 4., + "amount": 4.0, "uncertainty type": 0, - "loc": 4., + "loc": 4.0, "type": "production", "functional": False, }, { "name": "Quack", "unit": "p", - "amount": 2., + "amount": 2.0, "uncertainty type": 0, - "loc": 2., + "loc": 2.0, "type": "technosphere", }, ],