From 8cf3b68bbc423b2302fba299c92bb645b01e1efe Mon Sep 17 00:00:00 2001
From: Rubel
Date: Fri, 20 Dec 2024 13:05:29 +0100
Subject: [PATCH] PR review comments.

---
 src/nomad_measurements/utils.py      | 106 +++++++++++++++++++++------
 src/nomad_measurements/xrd/nx.py     |  75 -------------------
 src/nomad_measurements/xrd/schema.py |   2 +-
 3 files changed, 84 insertions(+), 99 deletions(-)

diff --git a/src/nomad_measurements/utils.py b/src/nomad_measurements/utils.py
index fd88ac5..f6f283e 100644
--- a/src/nomad_measurements/utils.py
+++ b/src/nomad_measurements/utils.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 #
 import collections
+import copy
 import os.path
 import re
 from typing import (
@@ -25,7 +26,6 @@
 )

 import h5py
-import copy
 import numpy as np
 import pint
 from nomad.datamodel.hdf5 import HDF5Reference
@@ -38,7 +38,7 @@
 from pynxtools.dataconverter.template import Template
 from pynxtools.dataconverter.writer import Writer as pynxtools_writer

-from nomad_measurements.xrd.nx import populate_nx_dataset_and_attribute
+from nomad_measurements.xrd.nx import CONCEPT_MAP

 if TYPE_CHECKING:
     from nomad.datamodel.data import (
@@ -347,9 +347,12 @@ def write_file(self):
         except Exception as e:
             self.nexus = False
             self.logger.warning(
-                f'Encountered "{e}" error while creating nexus file. '
-                'Creating h5 file instead.'
+                f'NeXusFileGenerationError: Encountered "{e}" error while '
+                'creating the NeXus file. Creating h5 file instead.'
             )
+            if self.archive.m_context.raw_path_exists(self.data_file):
+                os.remove(os.path.join(self.archive.m_context.raw_path(),
+                    self.data_file))
             self._write_hdf5_file()
         else:
             self._write_hdf5_file()
@@ -368,8 +371,8 @@ def _write_nx_file(self):
         generate_template_from_nxdl(nxdl_root, template)
         attr_dict = {}
         dataset_dict = {}
-        populate_nx_dataset_and_attribute(
-            archive=self.archive, attr_dict=attr_dict, dataset_dict=dataset_dict
+        self.populate_nx_dataset_and_attribute(
+            attr_dict=attr_dict, dataset_dict=dataset_dict
         )
         for nx_path, dset_ori in list(self._hdf5_datasets.items()) + list(
             dataset_dict.items()
@@ -394,7 +397,6 @@ def _write_nx_file(self):
         for nx_path, attr_d in list(self._hdf5_attributes.items()) + list(
             attr_dict.items()
         ):
-            # hdf5_path = self._remove_nexus_annotations(nx_path)
             for attr_k, attr_v in attr_d.items():
                 if attr_v != 'dimensionless' and attr_v:
                     try:
@@ -405,24 +407,18 @@ def _write_nx_file(self):
         nx_full_file_path = os.path.join(
             self.archive.m_context.raw_path(), self.data_file
         )
-        try:
-            if self.archive.m_context.raw_path_exists(self.data_file):
-                os.remove(nx_full_file_path)
-
-            pynxtools_writer(
-                data=template, nxdl_f_path=nxdl_f_path, output_path=nx_full_file_path
-            ).write()
-            entry_list = Entry.objects(
-                upload_id=self.archive.m_context.upload_id, mainfile=self.data_file
-            )
-            if not entry_list:
-                self.archive.m_context.process_updated_raw_file(self.data_file)
+        if self.archive.m_context.raw_path_exists(self.data_file):
+            os.remove(nx_full_file_path)
+        pynxtools_writer(
+            data=template, nxdl_f_path=nxdl_f_path, output_path=nx_full_file_path
+        ).write()

-        except NXFileGenerationError as exc:
-            if os.path.exists(nx_full_file_path):
-                os.remove(nx_full_file_path)
-            raise NXFileGenerationError('NeXus file can not be generated.') from exc
+        entry_list = Entry.objects(
+            upload_id=self.archive.m_context.upload_id, mainfile=self.data_file
+        )
+        if not entry_list:
+            self.archive.m_context.process_updated_raw_file(self.data_file)

     def _write_hdf5_file(self):  # noqa: PLR0912
         """
@@ -490,6 +486,70 @@ def _write_hdf5_file(self):  # noqa: PLR0912
         self._hdf5_datasets = collections.OrderedDict()
         self._hdf5_attributes = collections.OrderedDict()

+    @staticmethod
+    def walk_through_object(parent_obj, attr_chain):
+        """
+        Walk through the object until reaching the leaf.
+
+        Args:
+            parent_obj: The python object to start from, e.g. the archive.
+            attr_chain: Dot-separated attribute chain, e.g.
+                'archive.data.xrd_settings.source.xray_tube_material'.
+
+        Returns None if any attribute along the chain is missing.
+        """
+        if parent_obj is None:
+            return parent_obj
+
+        if isinstance(attr_chain, str) and attr_chain.startswith('archive.'):
+            parts = attr_chain.split('.')
+            child_obj = None
+            for part in parts[1:]:
+                child_nm = part
+                if '[' in child_nm:
+                    child_nm, index = child_nm.split('[')
+                    index = int(index[:-1])
+                    # section always exists
+                    child_obj = getattr(parent_obj, child_nm)[index]
+                else:
+                    child_obj = getattr(parent_obj, child_nm, None)
+                if child_obj is None:
+                    return None
+                parent_obj = child_obj
+
+            return child_obj
+
+    def populate_nx_dataset_and_attribute(
+        self, attr_dict: dict, dataset_dict: dict
+    ):
+        """Construct NeXus datasets and attributes and populate the given dicts."""
+
+        concept_map = copy.deepcopy(CONCEPT_MAP)
+        for nx_path, arch_path in concept_map.items():
+            if arch_path.startswith('archive.'):
+                data = self.walk_through_object(self.archive, arch_path)
+            else:
+                data = arch_path  # default value
+
+            dataset = DatasetModel(
+                data=data,
+            )
+
+            if (
+                isinstance(data, pint.Quantity)
+                and str(data.units) != 'dimensionless'
+                and str(data.units)
+            ):
+                attr_tmp = {nx_path: dict(units=str(data.units))}
+                attr_dict |= attr_tmp
+                dataset.data = data.magnitude
+
+            l_part, r_part = nx_path.split('/', 1)
+            if r_part.startswith('@'):
+                attr_dict[l_part] = {r_part.replace('@', ''): data}
+            else:
+                dataset_dict[nx_path] = dataset
+
     @staticmethod
     def _remove_nexus_annotations(path: str) -> str:
         """
diff --git a/src/nomad_measurements/xrd/nx.py b/src/nomad_measurements/xrd/nx.py
index b519ea9..201f423 100644
--- a/src/nomad_measurements/xrd/nx.py
+++ b/src/nomad_measurements/xrd/nx.py
@@ -16,16 +16,6 @@
 # limitations under the License.
 #

-from typing import TYPE_CHECKING, Any, Optional
-import pint
-
-import copy
-from pydantic import BaseModel, Field
-
-if TYPE_CHECKING:
-    from nomad.datamodel.datamodel import EntryArchive
-
-
 NEXUS_DATASET_PATHS = [
     '/ENTRY[entry]/experiment_result/intensity',
     '/ENTRY[entry]/experiment_result/two_theta',
@@ -65,68 +55,3 @@
     '/ENTRY[entry]/INSTRUMENT[instrument]/SOURCE[source]/kbeta':
         'archive.data.xrd_settings.source.kbeta',
 }
-
-def walk_through_object(parent_obj, attr_chain):
-    """
-    Walk though the object until reach the leaf.
-
-    Args:
-        parent_obj: This is a python obj.
-            e.g.Arvhive
-        attr_chain: Dot separated obj chain.
-            e.g. 'archive.data.xrd_settings.source.xray_tube_material'
-        default: A value to be returned by default, if not data is found.
-    """
-    if parent_obj is None:
-        return parent_obj
-
-    if isinstance(attr_chain, str) and attr_chain.startswith('archive.'):
-        parts = attr_chain.split('.')
-        child_obj = None
-        for part in parts[1:]:
-            child_nm = part
-            if '[' in child_nm:
-                child_nm, index = child_nm.split('[')
-                index = int(index[:-1])
-                # section always exists
-                child_obj = getattr(parent_obj, child_nm)[index]
-            else:
-                child_obj = getattr(parent_obj, child_nm, None)
-            if child_obj is None:
-                return None
-            parent_obj = child_obj
-
-        return child_obj
-
-
-def populate_nx_dataset_and_attribute(
-    archive: 'EntryArchive', attr_dict: dict, dataset_dict: dict
-):
-    """Construct datasets and attributes for nexus and populate."""
-    from nomad_measurements.utils import DatasetModel
-
-    concept_map = copy.deepcopy(CONCEPT_MAP)
-    for nx_path, arch_path in concept_map.items():
-        if arch_path.startswith('archive.'):
-            data = walk_through_object(archive, arch_path)
-        else:
-            data = arch_path  # default value
-
-        dataset = DatasetModel(
-            data=data,
-        )
-
-        if (
-            isinstance(data, pint.Quantity)
-            and str(data.units) != 'dimensionless'
-            and str(data.units)
-        ):
-            attr_tmp = {nx_path: dict(units=str(data.units))}
-            attr_dict |= attr_tmp
-            dataset.data = data.magnitude
-
-        l_part, r_part = nx_path.split('/', 1)
-        if r_part.startswith('@'):
-            attr_dict[l_part] = {r_part.replace('@', ''): data}
-        else:
-            dataset_dict[nx_path] = dataset
diff --git a/src/nomad_measurements/xrd/schema.py b/src/nomad_measurements/xrd/schema.py
index 42465da..870832c 100644
--- a/src/nomad_measurements/xrd/schema.py
+++ b/src/nomad_measurements/xrd/schema.py
@@ -1142,7 +1142,7 @@ class ELNXRayDiffraction(XRayDiffraction, EntryData):
     nexus_results = Quantity(
         type=ArchiveSection,
         description='Reference to the NeXus entry.',
-        a_eln = ELNAnnotation(component='ReferenceEditQuantity')
+        a_eln=ELNAnnotation(component=ELNComponentEnum.ReferenceEditQuantity),
     )

     def get_read_write_functions(self) -> tuple[Callable, Callable]:
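
For context, a minimal standalone sketch of the traversal that the relocated walk_through_object helper performs on CONCEPT_MAP paths (dot-separated attribute chains with optional '[index]' segments). The helper is renamed resolve_chain here to avoid confusion with the patch, and the SimpleNamespace archive and its attribute names below are hypothetical stand-ins for illustration, not the NOMAD API:

    # Illustrative sketch only (not part of the patch): resolving a chain such as
    # 'archive.data.results[0].scan_axis' against an archive-like object.
    from types import SimpleNamespace

    def resolve_chain(archive, attr_chain):
        """Follow a dot-separated chain; 'name[i]' indexes into a repeated sub-section."""
        obj = archive
        for part in attr_chain.split('.')[1:]:  # skip the leading 'archive'
            if '[' in part:
                name, index = part.split('[')
                obj = getattr(obj, name)[int(index[:-1])]
            else:
                obj = getattr(obj, part, None)
            if obj is None:
                return None
        return obj

    # Hypothetical archive structure; attribute names are placeholders.
    archive = SimpleNamespace(
        data=SimpleNamespace(
            xrd_settings=SimpleNamespace(
                source=SimpleNamespace(xray_tube_material='Cu')
            ),
            results=[SimpleNamespace(scan_axis='2theta')],
        )
    )

    assert resolve_chain(
        archive, 'archive.data.xrd_settings.source.xray_tube_material'
    ) == 'Cu'
    assert resolve_chain(archive, 'archive.data.results[0].scan_axis') == '2theta'

Because the traversal only needs the archive object handed to it, the helper can live as a @staticmethod on the handler class, which is what the patch does.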