Skip to content

Commit

Permalink
Add create_randonneur_excel_template_for_unlinked
Browse files Browse the repository at this point in the history
  • Loading branch information
cmutel committed Sep 4, 2024
1 parent 7b1f074 commit a9e9967
Showing 1 changed file with 94 additions and 7 deletions.
101 changes: 94 additions & 7 deletions bw2io/importers/base_lci.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import functools
import itertools
import warnings
from typing import Optional, Tuple, Callable, List
from typing import Optional, Tuple, Callable, List, Set, Union
from pathlib import Path

from bw2data import Database, config, databases, labels, parameters, get_node
from bw2data import Database, config, databases, labels, parameters, get_node, projects
from bw2data.data_store import ProcessedDataStore
from bw2data.parameters import (
ActivityParameter,
Expand All @@ -15,6 +15,7 @@
)
from bw2data.errors import UnknownObject
import randonneur as rn
import randonneur_data as rd

from ..errors import NonuniqueCode, StrategyError, WrongDatabase
from ..export.excel import write_lci_matching
Expand Down Expand Up @@ -48,6 +49,7 @@
"uncertainty type",
"uncertainty_type",
)
DEFAULT_TARGET_FIELDS = ("name", "location", "reference product", "unit")


def _reformat_biosphere_exc_as_new_node(exc: dict, db_name: str) -> dict:
Expand Down Expand Up @@ -545,13 +547,15 @@ def get_key(
get_key(exc, ffields) not in mapping
and get_key(exc, ffields, False) in existence
):
new_data = _reformat_biosphere_exc_as_new_node(exc, placeholder_db_name)
new_data = _reformat_biosphere_exc_as_new_node(
exc, placeholder_db_name
)
try:
node = get_node(database=new_data['database'], code=new_data['code'])
except UnknownObject:
node = placeholder.new_node(
**new_data
node = get_node(
database=new_data["database"], code=new_data["code"]
)
except UnknownObject:
node = placeholder.new_node(**new_data)
node.save()
exc["input"] = node.key

Expand Down Expand Up @@ -708,3 +712,86 @@ def add_unlinked_activities(self):
act["database"] = self.db_name
self.data.extend(new_activities)
self.apply_strategy(functools.partial(link_iterable_by_fields, other=self.data))

def all_source_fields_in_unlinked_data(self) -> Set[str]:
"""Return set of all field labels (dict keys) in unlinked edges."""
found = set()

for exc in self.unlinked:
found.update(set(exc))

return found

# def read_randonneur_excel_template(self, filepath: Path, add_to_registry: Union[rd.Registry, bool] = False,)

def create_randonneur_excel_template_for_unlinked(
self,
target_fields: List[str] = DEFAULT_TARGET_FIELDS,
source_fields: Optional[List[str]] = None,
edge_filter: Optional[Callable] = None,
filename: Optional[str] = None,
output_dir: Optional[Path] = None,
replace_existing: bool = False,
) -> Path:
"""
Create Excel template with source data in the `randonneur` format for unlinked exchanges.
Intended to be used with `read_randonneur_excel_template` to create a migration file, which
can then be applied to resolve unlinked data.
Should *only* use string values - no conversion for numbers, booleans, etc. if made in
either direction.
`target_fields` is a list of labels for the target fields, which must be filled by the
practitioner. Defaults to `["name", "location", "reference product", "unit"]`
`source_fields` is a list of string labels to include when defining the matchings. Defaults
to all available fields except for fields in `EXCHANGE_SPECIFIC_KEYS`. Use
`.all_source_fields_in_unlinked_data()` to get a list of fields to select from.
`edge_filter`: Optional function to reduce the number of unlinked edges to write to the
template. Takes the unlinked edge as input argument.
`output_dir`: Where to write the template file. Defaults to `bw2data.projects.output_dir`.
Returns the `Path` of the created file.
"""
if not source_fields:
source_fields = self.all_source_fields_in_unlinked_data().difference(
set(EXCHANGE_SPECIFIC_KEYS)
)

if edge_filter is None:
edge_filter = lambda x: True

if not filename:
filename = f"randonneur-matching-template-{self.db_name}.xlsx"
filepath = Path(output_dir or projects.output_dir) / filename
if not filepath.suffix.lower() == ".xlsx":
filepath = filepath.with_suffix(".xlsx")

data = [
{
"source": {key: obj.get(key) for key in sorted(source_fields)},
"target": {key: "" for key in sorted(target_fields)},
}
for obj in self.unlinked
if edge_filter(obj)
]

# Need to check uniqueness again as the set of fields we consider is not necessarily
# the same as in `.unlinked`.
data_as_set = {
(tuple(ds["source"].values()), tuple(ds["target"].values())) for ds in data
}
data = [
{
"source": dict(zip(source_fields, a)),
"target": dict(zip(source_fields, b)),
}
for a, b in sorted(data_as_set)
]

return rn.create_excel_template(
data=data, filepath=filepath, replace_existing=replace_existing
)

0 comments on commit a9e9967

Please sign in to comment.