Skip to content

Commit

Permalink
Fix #284
Browse files: browse the repository at this point in the history
  • Loading branch information
cmutel committed Nov 27, 2024
1 parent 14619ec commit 388cf86
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 14 deletions.
30 changes: 22 additions & 8 deletions bw2io/extractors/ecospold2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import math
import multiprocessing
import os
from pathlib import Path

from lxml import objectify
from stats_arrays.distributions import (
Expand Down Expand Up @@ -66,7 +67,7 @@ def getattr2(obj, attr):

class Ecospold2DataExtractor(object):
@classmethod
def extract_technosphere_metadata(cls, dirpath):
def extract_technosphere_metadata(cls, dirpath: Path):
"""
Extract technosphere metadata from ecospold2 directory.
Expand All @@ -82,15 +83,27 @@ def extract_technosphere_metadata(cls, dirpath):
"""

def extract_metadata(o):
return {"name": o.name.text, "unit": o.unitName.text, "id": o.get("id")}
dct = {"name": o.name.text, "unit": o.unitName.text, "id": o.get("id")}
if hasattr(o, "productInformation"):
dct["product_information"] = " ".join(
[child.text for child in o.productInformation.iterchildren()]
)
else:
dct["product_information"] = ""
return dct

fp = os.path.join(dirpath, "IntermediateExchanges.xml")
assert os.path.exists(fp), "Can't find IntermediateExchanges.xml"
fp = dirpath / "IntermediateExchanges.xml"
assert fp.exists(), "Can't find IntermediateExchanges.xml"
root = objectify.parse(open(fp, encoding="utf-8")).getroot()
return [extract_metadata(ds) for ds in root.iterchildren()]

@classmethod
def extract(cls, dirpath, db_name, use_mp=True):
def extract(
cls,
dirpath: Path,
db_name: str,
use_mp: bool = True,
):
"""
Extract data from all ecospold2 files in a directory.
Expand All @@ -114,15 +127,16 @@ def extract(cls, dirpath, db_name, use_mp=True):
If no .spold files are found in the directory.
"""
assert os.path.exists(dirpath)
if os.path.isdir(dirpath):
dirpath = Path(dirpath)
assert dirpath.exists()
if dirpath.is_dir():
filelist = [
filename
for filename in os.listdir(dirpath)
if os.path.isfile(os.path.join(dirpath, filename))
and filename.split(".")[-1].lower() == "spold"
]
elif os.path.isfile(dirpath):
elif dirpath.is_file():
filelist = [dirpath]
else:
raise OSError("Can't understand path {}".format(dirpath))
Expand Down
30 changes: 25 additions & 5 deletions bw2io/importers/ecospold2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Optional

from bw2data import Database, config
from bw2data.logs import stdout_feedback_logger

from ..errors import MultiprocessingError
from ..extractors import Ecospold2DataExtractor
Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(
use_mp: bool = True,
signal: Any = None,
reparametrize_lognormals: bool = False,
add_product_information: bool = True,
):
"""
Initializes the SingleOutputEcospold2Importer class instance.
Expand All @@ -79,12 +81,15 @@ def __init__(
Flag to indicate if lognormal distributions for exchanges should be reparametrized
such that the mean value of the resulting distribution meets the amount
defined for the exchange.
add_product_information: bool
Add the `productInformation` text from `MasterData/IntermediateExchanges.xml` to
`product_information`.
"""

self.dirpath = dirpath
self.dirpath = Path(dirpath)

if not Path(dirpath).is_dir():
raise ValueError(f"`dirpath` value was not a directory: {dirpath}")
if not self.dirpath.is_dir():
raise ValueError(f"`dirpath` value was not a directory: {self.dirpath}")

self.db_name = db_name
self.signal = signal
Expand Down Expand Up @@ -125,13 +130,28 @@ def __init__(

start = time()
try:
self.data = extractor.extract(dirpath, db_name, use_mp=use_mp)
self.data = extractor.extract(self.dirpath, db_name, use_mp=use_mp)
except RuntimeError as e:
raise MultiprocessingError(
"Multiprocessing error; re-run using `use_mp=False`"
).with_traceback(e.__traceback__)
print(
stdout_feedback_logger.info(
"Extracted {} datasets in {:.2f} seconds".format(
len(self.data), time() - start
)
)
if add_product_information:
tm_dirpath = self.dirpath.parent / "MasterData"
if not tm_dirpath.is_dir():
stdout_feedback_logger.warning(
"Skipping product information as `MasterData` directory not found"
)
else:
technosphere_metadata = {
obj["id"]: obj["product_information"]
for obj in extractor.extract_technosphere_metadata(tm_dirpath)
}
for ds in self.data:
ds["product_information"] = technosphere_metadata[
ds["filename"].replace(".spold", "").split("_")[1]
]
3 changes: 2 additions & 1 deletion tests/ecospold2/ecospold2_importer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from pathlib import Path

import pytest
from bw2data import Database
Expand All @@ -24,7 +25,7 @@ def extract(self, *args, **kwargs):

imp = SingleOutputEcospold2Importer(FIXTURES, "ei", extractor=ext)
assert imp.data == []
assert ext.data == [(FIXTURES, "ei")]
assert ext.data == [(Path(FIXTURES), "ei")]


@bw2test
Expand Down

0 comments on commit 388cf86

Please sign in to comment.