diff --git a/bw2io/importers/base_lci.py b/bw2io/importers/base_lci.py index 178bdc3..fb45b1a 100644 --- a/bw2io/importers/base_lci.py +++ b/bw2io/importers/base_lci.py @@ -2,10 +2,10 @@ import functools import itertools import warnings -from typing import Optional, Tuple, Callable, List +from typing import Optional, Tuple, Callable, List, Set, Union from pathlib import Path -from bw2data import Database, config, databases, labels, parameters, get_node +from bw2data import Database, config, databases, labels, parameters, get_node, projects from bw2data.data_store import ProcessedDataStore from bw2data.parameters import ( ActivityParameter, @@ -15,6 +15,7 @@ ) from bw2data.errors import UnknownObject import randonneur as rn +import randonneur_data as rd from ..errors import NonuniqueCode, StrategyError, WrongDatabase from ..export.excel import write_lci_matching @@ -48,6 +49,7 @@ "uncertainty type", "uncertainty_type", ) +DEFAULT_TARGET_FIELDS = ("name", "location", "reference product", "unit") def _reformat_biosphere_exc_as_new_node(exc: dict, db_name: str) -> dict: @@ -545,13 +547,15 @@ def get_key( get_key(exc, ffields) not in mapping and get_key(exc, ffields, False) in existence ): - new_data = _reformat_biosphere_exc_as_new_node(exc, placeholder_db_name) + new_data = _reformat_biosphere_exc_as_new_node( + exc, placeholder_db_name + ) try: - node = get_node(database=new_data['database'], code=new_data['code']) - except UnknownObject: - node = placeholder.new_node( - **new_data + node = get_node( + database=new_data["database"], code=new_data["code"] ) + except UnknownObject: + node = placeholder.new_node(**new_data) node.save() exc["input"] = node.key @@ -708,3 +712,86 @@ def add_unlinked_activities(self): act["database"] = self.db_name self.data.extend(new_activities) self.apply_strategy(functools.partial(link_iterable_by_fields, other=self.data)) + + def all_source_fields_in_unlinked_data(self) -> Set[str]: + """Return set of all field labels (dict keys) in unlinked edges.""" + found = set() + + for exc in self.unlinked: + found.update(set(exc)) + + return found + + # def read_randonneur_excel_template(self, filepath: Path, add_to_registry: Union[rd.Registry, bool] = False,) + + def create_randonneur_excel_template_for_unlinked( + self, + target_fields: List[str] = DEFAULT_TARGET_FIELDS, + source_fields: Optional[List[str]] = None, + edge_filter: Optional[Callable] = None, + filename: Optional[str] = None, + output_dir: Optional[Path] = None, + replace_existing: bool = False, + ) -> Path: + """ + Create Excel template with source data in the `randonneur` format for unlinked exchanges. + + Intended to be used with `read_randonneur_excel_template` to create a migration file, which + can then be applied to resolve unlinked data. + + Should *only* use string values - no conversion for numbers, booleans, etc. if made in + either direction. + + `target_fields` is a list of labels for the target fields, which must be filled by the + practitioner. Defaults to `["name", "location", "reference product", "unit"]` + + `source_fields` is a list of string labels to include when defining the matchings. Defaults + to all available fields except for fields in `EXCHANGE_SPECIFIC_KEYS`. Use + `.all_source_fields_in_unlinked_data()` to get a list of fields to select from. + + `edge_filter`: Optional function to reduce the number of unlinked edges to write to the + template. Takes the unlinked edge as input argument. + + `output_dir`: Where to write the template file. Defaults to `bw2data.projects.output_dir`. + + Returns the `Path` of the created file. + """ + if not source_fields: + source_fields = self.all_source_fields_in_unlinked_data().difference( + set(EXCHANGE_SPECIFIC_KEYS) + ) + + if edge_filter is None: + edge_filter = lambda x: True + + if not filename: + filename = f"randonneur-matching-template-{self.db_name}.xlsx" + filepath = Path(output_dir or projects.output_dir) / filename + if not filepath.suffix.lower() == ".xlsx": + filepath = filepath.with_suffix(".xlsx") + + data = [ + { + "source": {key: obj.get(key) for key in sorted(source_fields)}, + "target": {key: "" for key in sorted(target_fields)}, + } + for obj in self.unlinked + if edge_filter(obj) + ] + + # Need to check uniqueness again as the set of fields we consider is not necessarily + # the same as in `.unlinked`. + data_as_set = { + (tuple(ds["source"].values()), tuple(ds["target"].values())) for ds in data + } + data = [ + { + "source": dict(zip(source_fields, a)), + "target": dict(zip(source_fields, b)), + } + for a, b in sorted(data_as_set) + ] + + return rn.create_excel_template( + data=data, filepath=filepath, replace_existing=replace_existing + )