diff --git a/emmet-core/emmet/core/qc_tasks.py b/emmet-core/emmet/core/qc_tasks.py new file mode 100644 index 0000000000..88150a942b --- /dev/null +++ b/emmet-core/emmet/core/qc_tasks.py @@ -0,0 +1,597 @@ +# mypy: ignore-errors + +""" Core definition of a Q-Chem Task Document """ +from typing import Any, Dict, List, Optional +import logging +import os +import re +from collections import OrderedDict +from pydantic import BaseModel, Field +from custodian.qchem.jobs import QCJob +from pymatgen.core.structure import Molecule +from pymatgen.io.qchem.inputs import QCInput +from monty.serialization import loadfn +from typing import Type, TypeVar, Union +from emmet.core.structure import MoleculeMetadata +from pathlib import Path +from emmet.core.qchem.calc_types import ( + LevelOfTheory, + CalcType, + TaskType, +) +from emmet.core.qchem.calculation import Calculation, CalculationInput + +from emmet.core.qchem.task import QChemStatus + + +__author__ = ( + "Evan Spotte-Smith , Rishabh D. Guha " +) + +logger = logging.getLogger(__name__) +_T = TypeVar("_T", bound="TaskDoc") +# _DERIVATIVE_FILES = ("GRAD", "HESS") + + +class OutputDoc(BaseModel): + initial_molecule: Molecule = Field(None, description="Input Molecule object") + optimized_molecule: Optional[Molecule] = Field( + None, description="Optimized Molecule object" + ) + + # TODO: Discuss with Evan if these go here + # species_hash: str = Field( + # None, + # description="Weisfeiler Lehman (WL) graph hash using the atom species as the graph node attribute.", + # ) + # coord_hash: str = Field( + # None, + # description="Weisfeiler Lehman (WL) graph hash using the atom coordinates as the graph node attribute.", + # ) + + # last_updated: datetime = Field( + # None, + # description = "Timestamp for the most recent calculation for this QChem task document", + # ) + + final_energy: float = Field( + None, description="Final electronic energy for the calculation (units: Hartree)" + ) + enthalpy: Optional[float] = Field( + None, description="Total enthalpy of the molecule (units: kcal/mol)" + ) + entropy: Optional[float] = Field( + None, description="Total entropy of the molecule (units: cal/mol-K" + ) + dipoles: Optional[Dict[str, Any]] = Field( + None, description="Dipolar information from the output" + ) + mulliken: Optional[List[Any]] = Field( + None, description="Mulliken atomic partial charges and partial spins" + ) + resp: Optional[Union[List[float], List[Any]]] = Field( + None, + description="Restrained Electrostatic Potential (RESP) atomic partial charges", + ) + nbo: Optional[Dict[str, Any]] = Field( + None, description="Natural Bonding Orbital (NBO) output" + ) + + frequency_modes: Optional[Union[List, str]] = Field( + None, + description="The list of calculated frequency mode vectors if job type is freq (units: cm^-1)", + ) + + @classmethod + def from_qchem_calc_doc(cls, calc_doc: Calculation) -> "OutputDoc": + """ + Create a summary of QChem calculation outputs from a QChem calculation document. + + Parameters + ---------- + calc_doc + A QChem calculation document. + kwargs + Any other additional keyword arguments + + Returns + -------- + OutputDoc + The calculation output summary + """ + return cls( + initial_molecule=calc_doc.input.initial_molecule, + optimized_molecule=calc_doc.output.optimized_molecule, + # species_hash = self.species_hash, #The three entries post this needs to be checked again + # coord_hash = self.coord_hash, + # last_updated = self.last_updated, + final_energy=calc_doc.output.final_energy, + dipoles=calc_doc.output.dipoles, + enthalpy=calc_doc.output.enthalpy, + entropy=calc_doc.output.entropy, + mulliken=calc_doc.output.mulliken, + resp=calc_doc.output.resp, + nbo=calc_doc.output.nbo_data, + frequencies=calc_doc.output.frequencies, + ) + + +class InputDoc(BaseModel): + initial_molecule: Molecule = Field( + None, + title="Input Structure", + description="Input molecule and calc details for the QChem calculation", + ) + + prev_rem_params: Optional[Dict[str, Any]] = Field( + None, + description="Parameters from a previous qchem calculation in the series", + ) + + rem: Dict[str, Any] = Field( + None, + description="Parameters from the rem section of the current QChem calculation", + ) + + level_of_theory: Optional[Union[str, LevelOfTheory]] = Field( + None, description="Level of theory used in the qchem calculation" + ) + + task_type: Optional[Union[str, TaskType]] = Field( + None, + description="The type of the QChem calculation : optimization, single point ... etc.", + ) + + tags: Union[List[str], None] = Field( + [], title="tag", description="Metadata tagged to a given task." + ) + + solvation_lot_info: Optional[Union[Dict[str, Any], str]] = Field( + None, + description="Str or Dict representation of the solvent method used for the calculation", + ) + + special_run_type: Optional[str] = Field( + None, description="Special workflow name (if applicable)" + ) + + smiles: Optional[str] = Field( + None, + description="Simplified molecular-input line-entry system (SMILES) string for the molecule involved " + "in this calculation.", + ) + + calc_type: Optional[Union[str, CalcType]] = Field( + None, + description="A combined dictionary representation of the task type along with the level of theory used", + ) + + @classmethod + def from_qchem_calc_doc(cls, calc_doc: Calculation) -> "InputDoc": + """ + Create qchem calculation input summary from a qchem calculation document. + + Parameters + ---------- + calc_doc + A QChem calculation document. + + Returns + -------- + InputDoc + A summary of the input molecule and corresponding calculation parameters + """ + # TODO : modify this to get the different variables from the task doc. + return cls( + initial_molecule=calc_doc.input.initial_molecule, + rem=calc_doc.input.rem, + level_of_theory=calc_doc.level_of_theory.value, + task_type=calc_doc.task_type.value, + tags=calc_doc.input.tags, + solvation_lot_info=calc_doc.solvation_lot_info, + # special_run_type = calc_doc.input.special_run_type, + # smiles = calc_doc.input.smiles, + calc_type=calc_doc.calc_type.value, + ) + + +class CustodianDoc(BaseModel): + corrections: Optional[List[Any]] = Field( + None, + title="Custodian Corrections", + description="List of custodian correction data for calculation.", + ) + + job: Optional[Union[Dict[str, Any], QCJob]] = Field( + None, + title="Custodian Job Data", + description="Job data logged by custodian.", + ) + + +# AnalysisDoc? Is there a scope for AnalysisDoc in QChem? + + +class TaskDoc(MoleculeMetadata): + """ + Calculation-level details about QChem calculations that would eventually take over the TaskDocument implementation + """ + + dir_name: Optional[Union[str, Path]] = Field( + None, description="The directory for this QChem task" + ) + + state: Optional[QChemStatus] = Field( + None, description="State of this QChem calculation" + ) + + calcs_reversed: Optional[List[Calculation]] = Field( + None, + title="Calcs reversed data", + description="Detailed data for each QChem calculation contributing to the task document.", + ) + + task_type: Optional[Union[CalcType, TaskType]] = Field( + None, description="the type of QChem calculation" + ) + + orig_inputs: Optional[Union[CalculationInput, Dict[str, Any]]] = Field( + {}, description="Summary of the original Q-Chem inputs" + ) + + input: Optional[InputDoc] = Field( + None, + description="The input molecule and calc parameters used to generate the current task document.", + ) + + output: Optional[OutputDoc] = Field( + None, + description="The exact set of output parameters used to generate the current task document.", + ) + + # TODO: Implement entry dict + + custodian: Optional[List[CustodianDoc]] = Field( + None, + title="Calcs reversed data", + description="Detailed custodian data for each QChem calculation contributing to the task document.", + ) + + critic2: Optional[Dict[str, Any]] = Field( + None, description="Outputs from the critic2 calculation if performed" + ) + + custom_smd: Optional[Union[str, Dict[str, Any]]] = Field( + None, + description="The seven solvent parameters necessary to define a custom_smd model", + ) + + additional_fields: Optional[Dict[str, Any]] = Field( + None, description="Any miscellaneous fields passed to the pydantic model" + ) + + # TODO some sort of @validator s if necessary + + @classmethod + def from_directory( + cls: Type[_T], + dir_name: Union[Path, str], + store_additional_json: bool = True, + additional_fields: Dict[str, Any] = None, + **qchem_calculation_kwargs, + ) -> _T: + """ + Create a task document from a directory containing QChem files. + + Parameters + ---------- + dir_name + The path to the folder containing the calculation outputs. + store_additional_json + Whether to store additional json files in the calculation directory. + additional_fields + Dictionary of additional fields to add to output document. + **qchem_calculation_kwargs + Additional parsing options that will be passed to the + :obj:`.Calculation.from_qchem_files` function. + + Returns + ------- + QChemTaskDoc + A task document for the calculation + """ + logger.info(f"Getting task doc in: {dir_name}") + + additional_fields = {} if additional_fields is None else additional_fields + dir_name = Path(dir_name) + task_files = _find_qchem_files(dir_name) + + if len(task_files) == 0: + raise FileNotFoundError("No QChem files found!") + + critic2 = {} + custom_smd = {} + calcs_reversed = [] + for task_name, files in task_files.items(): + if task_name == "orig": + continue + else: + calc_doc = Calculation.from_qchem_files( + dir_name, task_name, **files, **qchem_calculation_kwargs + ) + calcs_reversed.append(calc_doc) + # all_qchem_objects.append(qchem_objects) + + # Lists need to be reversed so that newest calc is the first calc, all_qchem_objects are also reversed to match + calcs_reversed.reverse() + + # all_qchem_objects.reverse() + custodian = _parse_custodian(dir_name) + additional_json = None + if store_additional_json: + additional_json = _parse_additional_json(dir_name) + for key, _ in additional_json.items(): + if key == "processed_critic2": + critic2["processed"] = additional_json["processed_critic2"] + elif key == "cpreport": + critic2["cp"] = additional_json["cpreport"] + elif key == "YT": + critic2["yt"] = additional_json["yt"] + elif key == "bonding": + critic2["bonding"] = additional_json["bonding"] + elif key == "solvent_data": + custom_smd = additional_json["solvent_data"] + + orig_inputs = CalculationInput.from_qcinput(_parse_orig_inputs(dir_name)) + + dir_name = get_uri(dir_name) # convert to full path + + # only store objects from last calculation + # TODO: If vasp implementation makes this an option, change here as well + qchem_objects = None + included_objects = None + if qchem_objects: + included_objects = list(qchem_objects.keys()) + + # run_stats = _get_run_stats(calcs_reversed), Discuss whether this is something which is necessary in terms of QChem calcs + doc = cls.from_molecule( + meta_molecule=calcs_reversed[-1].input.initial_molecule, + dir_name=dir_name, + calcs_reversed=calcs_reversed, + custodian=custodian, + additional_json=additional_json, + additional_fields=additional_fields, + completed_at=calcs_reversed[0].completed_at, + orig_inputs=orig_inputs, + input=InputDoc.from_qchem_calc_doc(calcs_reversed[0]), + output=OutputDoc.from_qchem_calc_doc(calcs_reversed[0]), + state=_get_state(calcs_reversed), + qchem_objects=qchem_objects, + included_objects=included_objects, + critic2=critic2, + custom_smd=custom_smd, + task_type=calcs_reversed[0].task_type, + ) + + # doc = doc.copy(update=additional_fields) + doc = doc.model_copy(update=additional_fields) + return doc + + @staticmethod + def get_entry( + calcs_reversed: List[Calculation], task_id: Optional[str] = None + ) -> Dict: + """ + Get a computed entry from a list of QChem calculation documents. + + Parameters + ---------- + calcs_reversed + A list of QChem calculation documents in reverse order. + task_id + The job identifier + + Returns + -------- + Dict + A dict of computed entries + """ + + entry_dict = { + "entry_id": task_id, + "task_id": task_id, + "charge": calcs_reversed[0].output.molecule.charge, + "spin_multiplicity": calcs_reversed[0].output.molecule.spin_multiplicity, + "level_of_theory": calcs_reversed[-1].input.level_of_theory, + "solvent": calcs_reversed[-1].input.solv_spec, + "lot_solvent": calcs_reversed[-1].input.lot_solv_combo, + "custom_smd": calcs_reversed[-1].input.custom_smd, + "task_type": calcs_reversed[-1].input.task_spec, + "calc_type": calcs_reversed[-1].input.calc_spec, + "tags": calcs_reversed[-1].input.tags, + "molecule": calcs_reversed[0].output.molecule, + "composition": calcs_reversed[0].output.molecule.composition, + "formula": calcs_reversed[ + 0 + ].output.formula.composition.aplhabetical_formula, + "energy": calcs_reversed[0].output.final_energy, + "output": calcs_reversed[0].output.as_dict(), + "critic2": calcs_reversed[ + 0 + ].output.critic, # TODO: Unclear about orig_inputs + "last_updated": calcs_reversed[0].output.last_updated, + } + + return entry_dict + + +def get_uri(dir_name: Union[str, Path]) -> str: + """ + Return the URI path for a directory. + + This allows files hosted on different file servers to have distinct locations. + + Parameters + ---------- + dir_name : str or Path + A directory name. + + Returns + ------- + str + Full URI path, e.g., "fileserver.host.com:/full/payj/of/fir_name". + """ + import socket + + fullpath = Path(dir_name).absolute() + hostname = socket.gethostname() + try: + hostname = socket.gethostbyaddr(hostname)[0] + except (socket.gaierror, socket.herror): + pass + return f"{hostname}:{fullpath}" + + +def _parse_custodian(dir_name: Path) -> Optional[Dict]: + """ + Parse custodian.json file. + + Calculations done using custodian have a custodian.json file which tracks the makers + performed and any errors detected and fixed. + + Parameters + ---------- + dir_name + Path to calculation directory. + + Returns + -------- + Optional[Dict] + The information parsed from custodian.json file. + """ + filenames = tuple(dir_name.glob("custodian.json*")) + if len(filenames) >= 1: + return loadfn(filenames[0], cls=None) + return None + + +def _parse_orig_inputs( + dir_name: Path, +) -> Dict[str, Any]: + """ + Parse original input files. + + Calculations using custodian generate a *.orig file for the inputs. This is useful + to know how the calculation originally started. + + Parameters + ---------- + dir_name + Path to calculation directory. + + Returns + ------- + Dict[str, Any] + The original molecule, rem, solvent and other data. + """ + orig_inputs = {} + for filename in dir_name.glob("*.orig*"): + # orig_inputs = QCInput.from_file(os.path.join(dir_name, filename.pop("orig"))) + orig_inputs = QCInput.from_file(os.path.join(dir_name, filename)) + return orig_inputs + + +def _parse_additional_json(dir_name: Path) -> Dict[str, Any]: + """Parse additional json files in the directory.""" + additional_json = {} + for filename in dir_name.glob("*.json*"): + key = filename.name.split(".")[0] + if key not in ("custodian", "transformations"): + if key not in additional_json: + additional_json[key] = loadfn(filename, cls=None) + return additional_json + + +def _get_state(calcs_reversed: List[Calculation]) -> QChemStatus: + """Get state from calculation documents of QChem tasks.""" + all_calcs_completed = all( + [c.has_qchem_completed == QChemStatus.SUCCESS for c in calcs_reversed] + ) + if all_calcs_completed: + return QChemStatus.SUCCESS + return QChemStatus.FAILED + + +# def _get_run_stats(calcs_reversed: List[Calculation]) -> Dict[str, RunStatistics]: +# """Get summary of runtime statistics for each calculation in this task.""" + +# run_stats = {} +# total = dict( +# average_memory=0.0, +# max_memory=0.0, +# elapsed_time=0.0, +# system_time=0.0, +# user_time=0.0, +# total_time=0.0, +# cores=0, +# ) + + +def _find_qchem_files( + path: Union[str, Path], +) -> Dict[str, Any]: + """ + Find QChem files in a directory. + + Only the mol.qout file (or alternatively files + with the task name as an extension, e.g., mol.qout.opt_0.gz, mol.qout.freq_1.gz, or something like this...) + will be returned. + + Parameters + ---------- + path + Path to a directory to search. + + Returns + ------- + Dict[str, Any] + The filenames of the calculation outputs for each QChem task, given as a ordered dictionary of:: + + { + task_name:{ + "qchem_out_file": qcrun_filename, + }, + ... + } + If there is only 1 qout file task_name will be "standard" otherwise it will be the extension name like "opt_0" + """ + path = Path(path) + task_files = OrderedDict() + + in_file_pattern = re.compile(r"^(?Pmol\.qin(?:\..+)?)\.gz$") + + for file in path.iterdir(): + if file.is_file(): + in_match = in_file_pattern.match(file.name) + if in_match: + in_task_name = in_match.group("in_task_name").replace("mol.qin.", "") + if in_task_name == "orig": + task_files[in_task_name] = {"orig_input_file": file} + elif in_task_name == "mol.qin": + task_files["standard"] = { + "qcinput_file": file, + "qcoutput_file": Path("mol.qout.gz"), + } + else: + try: + task_files[in_task_name] = { + "qcinput_file": file, + "qcoutput_file": Path("mol.qout." + in_task_name + ".gz"), + } + except FileNotFoundError: + task_files[in_task_name] = { + "qcinput_file": file, + "qcoutput_file": "No qout files exist for this in file", + } + + return task_files diff --git a/emmet-core/emmet/core/qchem/calc_types/em_utils.py b/emmet-core/emmet/core/qchem/calc_types/em_utils.py new file mode 100644 index 0000000000..fa3afbebe1 --- /dev/null +++ b/emmet-core/emmet/core/qchem/calc_types/em_utils.py @@ -0,0 +1,209 @@ +""" Utilities to determine level of theory, task type, and calculation type for Q-Chem calculations in the pydantic Docs paradigm""" +from typing import Optional + +from emmet.core.qchem.calc_types import LevelOfTheory, CalcType, TaskType +from emmet.core.qchem.calculation import CalculationInput +from emmet.core.qchem.calc_types.calc_types import ( + FUNCTIONALS, + BASIS_SETS, +) + + +__author__ = ( + "Evan Spotte-Smith , Rishabh Debraj Guha " +) + + +functional_synonyms = { + "b97mv": "b97m-v", + "b97mrv": "b97m-rv", + "wb97xd": "wb97x-d", + "wb97xd3": "wb97x-d3", + "wb97xv": "wb97x-v", + "wb97mv": "wb97m-v", +} + +smd_synonyms = { + "DIELECTRIC=7,230;N=1,410;ALPHA=0,000;BETA=0,859;GAMMA=36,830;PHI=0,000;PSI=0,000": "diglyme", + "DIELECTRIC=18,500;N=1,415;ALPHA=0,000;BETA=0,735;GAMMA=20,200;PHI=0,000;PSI=0,000": "3:7 EC:EMC", +} + + +def level_of_theory(parameters: CalculationInput) -> LevelOfTheory: + """ + + Returns the level of theory for a calculation, + based on the input parameters given to Q-Chem + + Args: + parameters: Dict of Q-Chem input parameters + + """ + + funct_raw = parameters.rem.get("method") + basis_raw = parameters.rem.get("basis") + + if funct_raw is None or basis_raw is None: + raise ValueError( + 'Method and basis must be included in "rem" section ' "of parameters!" + ) + + disp_corr = parameters.rem.get("dft_d") + + if disp_corr is None: + funct_lower = funct_raw.lower() + funct_lower = functional_synonyms.get(funct_lower, funct_lower) + else: + # Replace Q-Chem terms for D3 tails with more common expressions + disp_corr = disp_corr.replace("_bj", "(bj)").replace("_zero", "(0)") + funct_lower = f"{funct_raw}-{disp_corr}" + + basis_lower = basis_raw.lower() + + functional = [f for f in FUNCTIONALS if f.lower() == funct_lower] + if not functional: + raise ValueError(f"Unexpected functional {funct_lower}!") + + functional = functional[0] + + basis = [b for b in BASIS_SETS if b.lower() == basis_lower] + if not basis: + raise ValueError(f"Unexpected basis set {basis_lower}!") + + basis = basis[0] + + solvent_method = parameters.rem.get("solvent_method", "").lower() + if solvent_method == "": + solvation = "VACUUM" + elif solvent_method in ["pcm", "cosmo"]: + solvation = "PCM" + # TODO: Add this once added into pymatgen and atomate + # elif solvent_method == "isosvp": + # if parameters.get("svp", {}).get("idefesr", 0): + # solvation = "CMIRS" + # else: + # solvation = "ISOSVP" + elif solvent_method == "smd": + solvation = "SMD" + else: + raise ValueError(f"Unexpected implicit solvent method {solvent_method}!") + + lot = f"{functional}/{basis}/{solvation}" + + return LevelOfTheory(lot) + + +def solvent(parameters: CalculationInput, custom_smd: Optional[str] = None) -> str: + """ + Returns the solvent used for this calculation. + + Args: + parameters: Dict of Q-Chem input parameters + custom_smd: (Optional) string representing SMD parameters for a + non-standard solvent + """ + + lot = level_of_theory(parameters) + solvation = lot.value.split("/")[-1] + + if solvation == "PCM": + dielectric = float(parameters.get("solvent", {}).get("dielectric", 78.39)) + dielectric_string = f"{dielectric:.2f}".replace(".", ",") + return f"DIELECTRIC={dielectric_string}" + # TODO: Add this once added into pymatgen and atomate + # elif solvation == "ISOSVP": + # dielectric = float(parameters.get("svp", {}).get("dielst", 78.39)) + # rho = float(parameters.get("svp", {}).get("rhoiso", 0.001)) + # return f"DIELECTRIC={round(dielectric, 2)},RHO={round(rho, 4)}" + # elif solvation == "CMIRS": + # dielectric = float(parameters.get("svp", {}).get("dielst", 78.39)) + # rho = float(parameters.get("svp", {}).get("rhoiso", 0.001)) + # a = parameters.get("pcm_nonels", {}).get("a") + # b = parameters.get("pcm_nonels", {}).get("b") + # c = parameters.get("pcm_nonels", {}).get("c") + # d = parameters.get("pcm_nonels", {}).get("d") + # solvrho = parameters.get("pcm_nonels", {}).get("solvrho") + # gamma = parameters.get("pcm_nonels", {}).get("gamma") + # + # string = f"DIELECTRIC={round(dielectric, 2)},RHO={round(rho, 4)}" + # for name, (piece, digits) in {"A": (a, 6), "B": (b, 6), "C": (c, 1), "D": (d, 3), + # "SOLVRHO": (solvrho, 2), "GAMMA": (gamma, 1)}.items(): + # if piece is None: + # piecestring = "NONE" + # else: + # piecestring = f"{name}={round(float(piece), digits)}" + # string += "," + piecestring + # return string + elif solvation == "SMD": + solvent = parameters.get("smx", {}).get("solvent", "water") + if solvent == "other": + if custom_smd is None: + raise ValueError( + "SMD calculation with solvent=other requires custom_smd!" + ) + + names = ["DIELECTRIC", "N", "ALPHA", "BETA", "GAMMA", "PHI", "PSI"] + numbers = [float(x) for x in custom_smd.split(",")] + + string = "" + for name, number in zip(names, numbers): + string += f"{name}={number:.3f};" + return string.rstrip(",").rstrip(";").replace(".", ",") + else: + return f"SOLVENT={solvent.upper()}" + else: + return "NONE" + + +def lot_solvent_string( + parameters: CalculationInput, custom_smd: Optional[str] = None +) -> str: + """ + Returns a string representation of the level of theory and solvent used for this calculation. + + Args: + parameters: Dict of Q-Chem input parameters + custom_smd: (Optional) string representing SMD parameters for a + non-standard solvent + """ + + lot = level_of_theory(parameters).value + solv = solvent(parameters, custom_smd=custom_smd) + return f"{lot}({solv})" + + +def task_type( + parameters: CalculationInput, special_run_type: Optional[str] = None +) -> TaskType: + if special_run_type == "frequency_flattener": + return TaskType("Frequency Flattening Geometry Optimization") + elif special_run_type == "ts_frequency_flattener": + return TaskType("Frequency Flattening Transition State Geometry Optimization") + + if parameters.job_type == "sp": + return TaskType("Single Point") + elif parameters.job_type == "force": + return TaskType("Force") + elif parameters.job_type == "opt": + return TaskType("Geometry Optimization") + elif parameters.job_type == "ts": + return TaskType("Transition State Geometry Optimization") + elif parameters.job_type == "freq": + return TaskType("Frequency Analysis") + + return TaskType("Unknown") + + +def calc_type( + parameters: CalculationInput, special_run_type: Optional[str] = None +) -> CalcType: + """ + Determines the calc type + + Args: + inputs: inputs dict with an incar, kpoints, potcar, and poscar dictionaries + parameters: Dictionary of VASP parameters from Vasprun.xml + """ + rt = level_of_theory(parameters).value + tt = task_type(parameters, special_run_type=special_run_type).value + return CalcType(f"{rt} {tt}") diff --git a/emmet-core/emmet/core/qchem/calculation.py b/emmet-core/emmet/core/qchem/calculation.py new file mode 100644 index 0000000000..f4af5d1484 --- /dev/null +++ b/emmet-core/emmet/core/qchem/calculation.py @@ -0,0 +1,659 @@ +"""Core definitions of a QChem calculation document.""" + +# mypy: ignore-errors + +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import numpy as np +from pydantic import field_validator, BaseModel, Field, ConfigDict +from datetime import datetime +from pymatgen.io.qchem.inputs import QCInput +from pymatgen.io.qchem.outputs import QCOutput +from pymatgen.core.structure import Molecule +from collections import OrderedDict +import re + +from emmet.core.qchem.calc_types import ( + LevelOfTheory, + CalcType, + TaskType, +) +from emmet.core.qchem.calc_types.calc_types import ( + FUNCTIONALS, + BASIS_SETS, +) + +# from emmet.core.qchem.calc_types.em_utils import ( +# level_of_theory, +# task_type, +# calc_type, +# ) + +from emmet.core.qchem.task import QChemStatus + +functional_synonyms = { + "b97mv": "b97m-v", + "b97mrv": "b97m-rv", + "wb97xd": "wb97x-d", + "wb97xd3": "wb97x-d3", + "wb97xv": "wb97x-v", + "wb97mv": "wb97m-v", +} + +smd_synonyms = { + "DIELECTRIC=7,230;N=1,410;ALPHA=0,000;BETA=0,859;GAMMA=36,830;PHI=0,000;PSI=0,000": "diglyme", + "DIELECTRIC=18,500;N=1,415;ALPHA=0,000;BETA=0,735;GAMMA=20,200;PHI=0,000;PSI=0,000": "3:7 EC:EMC", +} + +__author__ = "Rishabh D. Guha " +logger = logging.getLogger(__name__) + +# class QChemObject(ValueEnum): +# Not sure but can we have something like GRAD and HESS +# as QChem data objects + + +class CalculationInput(BaseModel): + """ + Document defining QChem calculation inputs. + """ + + initial_molecule: Optional[Molecule] = Field( + None, description="input molecule geometry before the QChem calculation" + ) + + # parameters: Dict[str, Any] = Field( + # None, description = "Parameters from a previous QChem calculation." + # ) + + charge: int = Field(None, description="The charge of the input molecule") + + rem: Optional[Dict[str, Any]] = Field( + None, + description="The rem dict of the input file which has all the input parameters", + ) + + job_type: str = Field( + None, description="The type of QChem calculation being performed" + ) + + opt: Optional[Dict[str, Any]] = Field( + None, + description="A dictionary of opt section. For instance atom constraints and fixed atoms. Go to QCInput definition for more details.", + ) + + pcm: Optional[Dict[str, Any]] = Field( + None, description="A dictionary for the PCM solvent details if used" + ) + + solvent: Optional[Dict[str, Any]] = Field( + None, + description="The solvent parameters used if the PCM solvent model has been employed", + ) + + smx: Optional[Dict[str, Any]] = Field( + None, + description="A dictionary for the solvent parameters if the SMD solvent method has been employed", + ) + + vdw_mode: Optional[str] = Field( + None, + description="Either atomic or sequential. Used when custon van der Waals radii are used to construct pcm cavity", + ) + + van_der_waals: Optional[Dict[str, Any]] = Field( + None, description="The dictionary of the custom van der Waals radii if used" + ) + + scan_variables: Optional[Dict[str, Any]] = Field( + None, + description="The dictionary of scan variables for torsions or bond stretches", + ) + + tags: Optional[Union[Dict[str, Any], str]] = Field( + None, description="Any tags associated with the QChem calculation." + ) + + @classmethod + def from_qcinput(cls, qcinput: QCInput) -> "CalculationInput": + """ + Create a QChem input document from a QCInout object. + + Parameters + ---------- + qcinput + A QCInput object. + + Returns + -------- + CalculationInput + The input document. + """ + + return cls( + initial_molecule=qcinput.molecule, + charge=int(qcinput.molecule.as_dict()["charge"]) + if qcinput.molecule.as_dict() + else None, + rem=qcinput.rem, + job_type=qcinput.rem.get("job_type", None), + opt=qcinput.opt, + pcm=qcinput.pcm, + solvent=qcinput.solvent, + smx=qcinput.smx, + vdw_mode=qcinput.vdw_mode, + scan_variables=qcinput.scan, + van_der_waals=qcinput.van_der_waals, + ) + + +class CalculationOutput(BaseModel): + """Document defining QChem calculation outputs.""" + + optimized_molecule: Optional[Molecule] = Field( + None, + description="optimized geometry of the molecule after calculation in case of opt, optimization or ts", + ) + + mulliken: Optional[Union[List, Dict[str, Any]]] = Field( + None, description="Calculate Mulliken charges on the atoms" + ) + + esp: Optional[Union[List, Dict[str, Any]]] = Field( + None, + description="Calculated charges on the atoms if esp calculation has been performed", + ) + + resp: Optional[Union[List, Dict[str, Any]]] = Field( + None, + description="Calculated charges on the atoms if resp calculation has been performed", + ) + + nbo_data: Optional[Dict[str, Any]] = Field( + None, description="NBO data if analysis has been performed." + ) + + frequencies: Optional[Dict[str, Any]] = Field( + None, + description="Calculated frequency modes if the job type is freq or frequency", + ) + + frequency_modes: Optional[Union[List, str]] = Field( + None, description="The list of calculated frequency mode vectors" + ) + + final_energy: Optional[Union[str, float]] = Field( + None, + description="The final energy of the molecule after the calculation is complete", + ) + + enthalpy: Optional[Union[str, float]] = Field( + None, + description="The total enthalpy correction if a frequency calculation has been performed", + ) + + entropy: Optional[Union[str, float]] = Field( + None, + description="The total entropy of the system if a frequency calculation has been performed", + ) + + scan_energies: Optional[Dict[str, Any]] = Field( + None, + description="A dictionary of the scan energies with their respective parameters", + ) + + scan_geometries: Optional[Union[List, Dict[str, Any]]] = Field( + None, description="optimized geometry of the molecules after the geometric scan" + ) + + scan_molecules: Optional[Union[List, Dict[str, Any], Molecule]] = Field( + None, + description="The constructed pymatgen molecules from the optimized scan geometries", + ) + + pcm_gradients: Optional[Union[Dict[str, Any], np.ndarray, List]] = Field( + None, + description="The parsed total gradients after adding the PCM contributions.", + ) + + @field_validator("pcm_gradients", mode="before") + @classmethod + def validate_pcm_gradients(cls, v): + if v is not None and not isinstance(v, (np.ndarray, Dict, List)): + raise ValueError( + "pcm_gradients must be a numpy array, a dict or a list or None." + ) + return v + + cds_gradients: Optional[Union[Dict[str, Any], np.ndarray, List]] = Field( + None, description="The parsed CDS gradients." + ) + + @field_validator("cds_gradients", mode="before") + @classmethod + def validate_cds_gradients(cls, v): + if v is not None and not isinstance(v, (np.ndarray, Dict, List)): + raise ValueError( + "cds_gradients must be a numpy array, a dict or a list or None." + ) + return v + + dipoles: Optional[Dict[str, Any]] = Field( + None, description="The associated dipoles for Mulliken/RESP/ESP charges" + ) + + gap_info: Optional[Dict[str, Any]] = Field( + None, description="The Kohn-Sham HOMO-LUMO gaps and associated Eigenvalues" + ) + + @classmethod + def from_qcoutput(cls, qcoutput: QCOutput) -> "CalculationOutput": + """ + Create a QChem output document from a QCOutput object. + + Parameters + ---------- + qcoutput + A QCOutput object. + + Returns + -------- + CalculationOutput + The output document. + """ + optimized_molecule = qcoutput.data.get("molecule_from_optimized_geometry", {}) + optimized_molecule = optimized_molecule if optimized_molecule else None + return cls( + optimized_molecule=optimized_molecule, + mulliken=qcoutput.data.get(["Mulliken"][-1], []), + esp=qcoutput.data.get(["ESP"][-1], []), + resp=qcoutput.data.get(["RESP"][-1], []), + nbo_data=qcoutput.data.get("nbo_data", {}), + frequencies=qcoutput.data.get("frequencies", {}), + frequency_modes=qcoutput.data.get("frequency_mode_vectors", []), + final_energy=qcoutput.data.get("final_energy", None), + enthalpy=qcoutput.data.get("enthalpy", None), + entropy=qcoutput.data.get("entropy", None), + scan_energies=qcoutput.data.get("scan_energies", {}), + scan_geometries=qcoutput.data.get("optimized_geometries", {}), + scan_molecules=qcoutput.data.get("molecules_from_optimized_geometries", {}), + pcm_gradients=qcoutput.data.get(["pcm_gradients"][0], None), + cds_gradients=qcoutput.data.get(["CDS_gradients"][0], None), + dipoles=qcoutput.data.get("dipoles", None), + gap_info=qcoutput.data.get("gap_info", None), + ) + + model_config = ConfigDict(arbitrary_types_allowed=True) + # TODO What can be done for the trajectories, also how will walltime and cputime be reconciled + + +class Calculation(BaseModel): + """Full QChem calculation inputs and outputs.""" + + dir_name: str = Field(None, description="The directory for this QChem calculation") + qchem_version: str = Field( + None, description="QChem version used to perform the current calculation" + ) + has_qchem_completed: Union[QChemStatus, bool] = Field( + None, description="Whether QChem calculated the calculation successfully" + ) + input: CalculationInput = Field( + None, description="QChem input settings for the calculation" + ) + output: CalculationOutput = Field( + None, description="The QChem calculation output document" + ) + completed_at: str = Field( + None, description="Timestamp for when the calculation was completed" + ) + task_name: str = Field( + None, + description="Name of task given by custodian (e.g. opt1, opt2, freq1, freq2)", + ) + output_file_paths: Dict[str, Union[str, Path, Dict[str, Path]]] = Field( + None, + description="Paths (relative to dir_name) of the QChem output files associated with this calculation", + ) + level_of_theory: LevelOfTheory = Field( + None, + description="Levels of theory used for the QChem calculation: For instance, B97-D/6-31g*", + ) + solvation_lot_info: Optional[str] = Field( + None, + description="A condensed string representation of the comboned LOT and Solvent info", + ) + task_type: TaskType = Field( + None, + description="Calculation task type like Single Point, Geometry Optimization. Frequency...", + ) + calc_type: CalcType = Field( + None, + description="Combination dict of LOT + TaskType: B97-D/6-31g*/VACUUM Geometry Optimization", + ) + + @classmethod + def from_qchem_files( + cls, + dir_name: Union[Path, str], + task_name: str, + qcinput_file: Union[Path, str], + qcoutput_file: Union[Path, str], + store_energy_trajectory: bool = False, + qcinput_kwargs: Optional[Dict] = None, + qcoutput_kwargs: Optional[Dict] = None, + ) -> "Calculation": + """ + Create a QChem calculation document from a directory and file paths. + + Parameters + ---------- + dir_name + The directory containing the QChem calculation outputs. + task_name + The task name. + qcinput_file + Path to the .in/qin file, relative to dir_name. + qcoutput_file + Path to the .out/.qout file, relative to dir_name. + store_energy_trajectory + Whether to store the energy trajectory during a QChem calculation #TODO: Revisit this- False for now. + qcinput_kwargs + Additional keyword arguments that will be passed to the qcinput file + qcoutput_kwargs + Additional keyword arguments that will be passed to the qcoutput file + + Returns + ------- + Calculation + A QChem calculation document. + """ + + dir_name = Path(dir_name) + qcinput_file = dir_name / qcinput_file + qcoutput_file = dir_name / qcoutput_file + + output_file_paths = _find_qchem_files(dir_name) + + qcinput_kwargs = qcinput_kwargs if qcinput_kwargs else {} + qcinput = QCInput.from_file(qcinput_file, **qcinput_kwargs) + + qcoutput_kwargs = qcoutput_kwargs if qcoutput_kwargs else {} + qcoutput = QCOutput(qcoutput_file, **qcoutput_kwargs) + + completed_at = str(datetime.fromtimestamp(qcoutput_file.stat().st_mtime)) + + input_doc = CalculationInput.from_qcinput(qcinput) + output_doc = CalculationOutput.from_qcoutput(qcoutput) + + has_qchem_completed = ( + QChemStatus.SUCCESS + if qcoutput.data.get("completion", []) + else QChemStatus.FAILED + ) + + if store_energy_trajectory: + print("Still have to figure the energy trajectory") + + return cls( + dir_name=str(dir_name), + task_name=task_name, + qchem_version=qcoutput.data["version"], + has_qchem_completed=has_qchem_completed, + completed_at=completed_at, + input=input_doc, + output=output_doc, + output_file_paths={ + k.lower(): Path(v) + if isinstance(v, str) + else {k2: Path(v2) for k2, v2 in v.items()} + for k, v in output_file_paths.items() + }, + level_of_theory=level_of_theory(input_doc), + solvation_lot_info=lot_solvent_string(input_doc), + task_type=task_type(input_doc), + calc_type=calc_type(input_doc), + ) + + +def _find_qchem_files( + path: Union[str, Path], +) -> Dict[str, Any]: + """ + Find QChem files in a directory. + + Only the mol.qout file (or alternatively files + with the task name as an extension, e.g., mol.qout.opt_0.gz, mol.qout.freq_1.gz, or something like this...) + will be returned. + + Parameters + ---------- + path + Path to a directory to search. + + Returns + ------- + Dict[str, Any] + The filenames of the calculation outputs for each QChem task, given as a ordered dictionary of:: + + { + task_name:{ + "qchem_out_file": qcrun_filename, + }, + ... + } + If there is only 1 qout file task_name will be "standard" otherwise it will be the extension name like "opt_0" + """ + path = Path(path) + task_files = OrderedDict() + + in_file_pattern = re.compile(r"^(?Pmol\.qin(?:\..+)?)\.gz$") + + for file in path.iterdir(): + if file.is_file(): + in_match = in_file_pattern.match(file.name) + if in_match: + in_task_name = in_match.group("in_task_name").replace("mol.qin.", "") + if in_task_name == "orig": + task_files[in_task_name] = {"orig_input_file": file} + elif in_task_name == "mol.qin": + task_files["standard"] = { + "qcinput_file": file, + "qcoutput_file": Path("mol.qout.gz"), + } + else: + try: + task_files[in_task_name] = { + "qcinput_file": file, + "qcoutput_file": Path("mol.qout." + in_task_name + ".gz"), + } + except FileNotFoundError: + task_files[in_task_name] = { + "qcinput_file": file, + "qcoutput_file": "No qout files exist for this in file", + } + + return task_files + + +def level_of_theory(parameters: CalculationInput) -> LevelOfTheory: + """ + + Returns the level of theory for a calculation, + based on the input parameters given to Q-Chem + + Args: + parameters: Dict of Q-Chem input parameters + + """ + + funct_raw = parameters.rem.get("method") + basis_raw = parameters.rem.get("basis") + + if funct_raw is None or basis_raw is None: + raise ValueError( + 'Method and basis must be included in "rem" section ' "of parameters!" + ) + + disp_corr = parameters.rem.get("dft_d") + + if disp_corr is None: + funct_lower = funct_raw.lower() + funct_lower = functional_synonyms.get(funct_lower, funct_lower) + else: + # Replace Q-Chem terms for D3 tails with more common expressions + disp_corr = disp_corr.replace("_bj", "(bj)").replace("_zero", "(0)") + funct_lower = f"{funct_raw}-{disp_corr}" + + basis_lower = basis_raw.lower() + + functional = [f for f in FUNCTIONALS if f.lower() == funct_lower] + if not functional: + raise ValueError(f"Unexpected functional {funct_lower}!") + + functional = functional[0] + + basis = [b for b in BASIS_SETS if b.lower() == basis_lower] + if not basis: + raise ValueError(f"Unexpected basis set {basis_lower}!") + + basis = basis[0] + + solvent_method = parameters.rem.get("solvent_method", "").lower() + if solvent_method == "": + solvation = "VACUUM" + elif solvent_method in ["pcm", "cosmo"]: + solvation = "PCM" + # TODO: Add this once added into pymatgen and atomate + # elif solvent_method == "isosvp": + # if parameters.get("svp", {}).get("idefesr", 0): + # solvation = "CMIRS" + # else: + # solvation = "ISOSVP" + elif solvent_method == "smd": + solvation = "SMD" + else: + raise ValueError(f"Unexpected implicit solvent method {solvent_method}!") + + lot = f"{functional}/{basis}/{solvation}" + + return LevelOfTheory(lot) + + +def solvent(parameters: CalculationInput, custom_smd: Optional[str] = None) -> str: + """ + Returns the solvent used for this calculation. + + Args: + parameters: Dict of Q-Chem input parameters + custom_smd: (Optional) string representing SMD parameters for a + non-standard solvent + """ + + lot = level_of_theory(parameters) + solvation = lot.value.split("/")[-1] + + if solvation == "PCM": + # dielectric = float(parameters.get("solvent", {}).get("dielectric", 78.39)) + # dielectric = float(parameters.get("solvent", {})) + # dielectric = getattr(parameters, "solvent", None) + # dielectric_string = f"{dielectric.get('dielectric', '0.0'):.2f}".replace(".", ",") + dielectric_string = getattr(parameters, "solvent", None) + return f"DIELECTRIC= {dielectric_string}" + # TODO: Add this once added into pymatgen and atomate + # elif solvation == "ISOSVP": + # dielectric = float(parameters.get("svp", {}).get("dielst", 78.39)) + # rho = float(parameters.get("svp", {}).get("rhoiso", 0.001)) + # return f"DIELECTRIC={round(dielectric, 2)},RHO={round(rho, 4)}" + # elif solvation == "CMIRS": + # dielectric = float(parameters.get("svp", {}).get("dielst", 78.39)) + # rho = float(parameters.get("svp", {}).get("rhoiso", 0.001)) + # a = parameters.get("pcm_nonels", {}).get("a") + # b = parameters.get("pcm_nonels", {}).get("b") + # c = parameters.get("pcm_nonels", {}).get("c") + # d = parameters.get("pcm_nonels", {}).get("d") + # solvrho = parameters.get("pcm_nonels", {}).get("solvrho") + # gamma = parameters.get("pcm_nonels", {}).get("gamma") + # + # string = f"DIELECTRIC={round(dielectric, 2)},RHO={round(rho, 4)}" + # for name, (piece, digits) in {"A": (a, 6), "B": (b, 6), "C": (c, 1), "D": (d, 3), + # "SOLVRHO": (solvrho, 2), "GAMMA": (gamma, 1)}.items(): + # if piece is None: + # piecestring = "NONE" + # else: + # piecestring = f"{name}={round(float(piece), digits)}" + # string += "," + piecestring + # return string + elif solvation == "SMD": + solvent = parameters.smx.get("solvent", "water") + if solvent == "other": + if custom_smd is None: + raise ValueError( + "SMD calculation with solvent=other requires custom_smd!" + ) + + names = ["DIELECTRIC", "N", "ALPHA", "BETA", "GAMMA", "PHI", "PSI"] + numbers = [float(x) for x in custom_smd.split(",")] + + string = "" + for name, number in zip(names, numbers): + string += f"{name}={number:.3f};" + return string.rstrip(",").rstrip(";").replace(".", ",") + else: + return f"SOLVENT={solvent.upper()}" + else: + return "NONE" + + +def lot_solvent_string( + parameters: CalculationInput, custom_smd: Optional[str] = None +) -> str: + """ + Returns a string representation of the level of theory and solvent used for this calculation. + + Args: + parameters: Dict of Q-Chem input parameters + custom_smd: (Optional) string representing SMD parameters for a + non-standard solvent + """ + + lot = level_of_theory(parameters).value + solv = solvent(parameters, custom_smd=custom_smd) + return f"{lot}({solv})" + + +def task_type( + parameters: CalculationInput, special_run_type: Optional[str] = None +) -> TaskType: + if special_run_type == "frequency_flattener": + return TaskType("Frequency Flattening Geometry Optimization") + elif special_run_type == "ts_frequency_flattener": + return TaskType("Frequency Flattening Transition State Geometry Optimization") + + if parameters.job_type == "sp": + return TaskType("Single Point") + elif parameters.job_type == "force": + return TaskType("Force") + elif parameters.job_type == "opt": + return TaskType("Geometry Optimization") + elif parameters.job_type == "ts": + return TaskType("Transition State Geometry Optimization") + elif parameters.job_type == "freq": + return TaskType("Frequency Analysis") + + return TaskType("Unknown") + + +def calc_type( + parameters: CalculationInput, special_run_type: Optional[str] = None +) -> CalcType: + """ + Determines the calc type + + Args: + parameters: CalculationInput parameters + """ + rt = level_of_theory(parameters).value + tt = task_type(parameters, special_run_type=special_run_type).value + return CalcType(f"{rt} {tt}") diff --git a/emmet-core/emmet/core/qchem/task.py b/emmet-core/emmet/core/qchem/task.py index 4a1fa2a6d3..d009208f08 100644 --- a/emmet-core/emmet/core/qchem/task.py +++ b/emmet-core/emmet/core/qchem/task.py @@ -29,7 +29,7 @@ class QChemStatus(ValueEnum): Q-Chem Calculation State """ - SUCESS = "successful" + SUCCESS = "successful" FAILED = "unsuccessful" diff --git a/emmet-core/tests/conftest_qchem.py b/emmet-core/tests/conftest_qchem.py new file mode 100644 index 0000000000..b98cb69188 --- /dev/null +++ b/emmet-core/tests/conftest_qchem.py @@ -0,0 +1,398 @@ +from pathlib import Path +import pytest +import numpy as np + + +@pytest.fixture(scope="session") +def test_dir(): + return Path(__file__).parent.parent.parent.joinpath("test_files").resolve() + + +def assert_schemas_equal(test_schema, valid_schema): + """ + Recursively test all items in valid_schema are present and equal in test_schema. + + While test_schema can be a pydantic schema or dictionary, the valid schema must + be a (nested) dictionary. This function automatically handles accessing the + attributes of classes in the test_schema. + + Args: + test_schema: A pydantic schema or dictionary of the schema. + valid_schema: A (nested) dictionary specifying the key and values that must be + present in test_schema. This is what the generated test_schema will be tested against + """ + from pydantic import BaseModel + + if isinstance(valid_schema, dict): + for key, sub_valid_schema in valid_schema.items(): + if isinstance(key, str) and hasattr(test_schema, key): + sub_test_schema = getattr(test_schema, key) + if key == "initial_molecule": + sub_test_schema = sub_test_schema.as_dict() + elif key == "optimized_molecule": + sub_test_schema = ( + sub_test_schema.as_dict() if sub_test_schema else {} + ) + elif not isinstance(test_schema, BaseModel): + sub_test_schema = test_schema[key] + else: + raise ValueError(f"{type(test_schema)} does not have field: {key}") + return assert_schemas_equal(sub_test_schema, sub_valid_schema) + + elif isinstance(valid_schema, list): + for i, sub_valid_schema in enumerate(valid_schema): + return assert_schemas_equal(test_schema[i], sub_valid_schema) + + elif isinstance(valid_schema, np.ndarray): + assert np.array_equal(test_schema, valid_schema) + + elif isinstance(valid_schema, float): + assert test_schema == pytest.approx(valid_schema) + else: + assert test_schema == valid_schema + + +class SchemaTestData: + """Dummy class to be used to contain all test data information""" + + +class SinglePointTest(SchemaTestData): + folder = "qchem_sp_test" + task_files = { + "standard": { + "qcinput_file": "mol.qin.gz", + "qcoutput_file": "mol.qout.gz", + } + } + + objects = {"standard": []} + task_doc = { + "calcs_reversed": [ + { + "output": { + "mulliken": [np.array([-0.713178, 0.357278, 0.3559])], + "resp": [np.array([-0.872759, 0.436379, 0.436379])], + "final_energy": -76.4493700739, + }, + "input": { + "charge": 0, + "rem": { + "job_type": "sp", + "basis": "def2-qzvppd", + "max_scf_cycles": "100", + "gen_scfman": "true", + "xc_grid": "3", + "thresh": "14", + "s2thresh": "16", + "scf_algorithm": "diis", + "resp_charges": "true", + "symmetry": "false", + "sym_ignore": "true", + "method": "wb97mv", + "solvent_method": "smd", + "ideriv": "1", + }, + "job_type": "sp", + }, + } + ], + "input": { + "initial_molecule": { + "@module": "pymatgen.core.structure", + "@class": "Molecule", + "charge": 0.0, + "spin_multiplicity": 1, + "sites": [ + { + "name": "O", + "species": [{"element": "O", "occu": 1}], + "xyz": [-0.80595, 2.22952, -0.01914], + "properties": {}, + "label": "O", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [0.18338, 2.20176, 0.01351], + "properties": {}, + "label": "H", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [-1.09531, 1.61602, 0.70231], + "properties": {}, + "label": "H", + }, + ], + }, + "rem": { + "job_type": "sp", + "basis": "def2-qzvppd", + "max_scf_cycles": "100", + "gen_scfman": "true", + "xc_grid": "3", + "thresh": "14", + "s2thresh": "16", + "scf_algorithm": "diis", + "resp_charges": "true", + "symmetry": "false", + "sym_ignore": "true", + "method": "wb97mv", + "solvent_method": "smd", + "ideriv": "1", + }, + "level_of_theory": "wB97M-V/def2-QZVPPD/SMD", + "task_type": "Single Point", + "calc_type": "wB97M-V/def2-QZVPPD/SMD Single Point", + "solvation_lot_nfo": "wB97M-V/def2-QZVPPD/SMD(SOLVENT=WATER)", + }, + "output": { + "mulliken": [np.array([-0.713178, 0.357278, 0.3559])], + "resp": [np.array([-0.872759, 0.436379, 0.436379])], + "final_energy": -76.4493700739, + "dipoles": { + "total": 2.4648, + "dipole": [np.array([1.4227, -1.3039, 1.5333])], + "RESP_total": 2.5411, + "RESP_dipole": [np.array([1.4672, -1.3441, 1.5806])], + }, + }, + "custodian": [ + { + "job": { + "@module": "custodian.qchem.jobs", + "@class": "QCJob", + "@version": "2022.5.26", + "qchem_command": ["qchem"], + "max_cores": "40", + "multimode": "openmp", + "input_file": "mol.qin", + "output_file": "mol.qout", + "qclog_file": "mol.qclog", + "suffix": "", + "calc_loc": "/tmp", + "nboexe": None, + "save_scratch": False, + "backup": "true", + }, + "corrections": [], + } + ], + } + + +class OptimizationTest(SchemaTestData): + folder = "qchem_opt_test" + task_files = { + "standard": { + "qcinput_file": "mol.qin.gz", + "qcoutput_file": "mol.qout.gz", + } + } + + objects = {"standard": []} + task_doc = { + "calcs_reversed": [ + { + "output": { + "optimized_molecule": { + "@module": "pymatgen.core.structure", + "@class": "Molecule", + "charge": 0, + "spin_multiplicity": 1, + "sites": [ + { + "name": "O", + "species": [{"element": "O", "occu": 1}], + "xyz": [-0.8008592596, 2.2248298937, -0.0136245943], + "properties": {}, + "label": "O", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [0.1637955748, 2.1962925542, 0.0199393927], + "properties": {}, + "label": "H", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [-1.0808163152, 1.6261775521, 0.6903652017], + "properties": {}, + "label": "H", + }, + ], + }, + "mulliken": [-0.373491, 0.186964, 0.186527], + "resp": [-0.89522, 0.44761, 0.44761], + "final_energy": -76.358341626913, + }, + "input": { + "charge": 0, + "rem": { + "job_type": "sp", + "basis": "def2-qzvppd", + "max_scf_cycles": "100", + "gen_scfman": "true", + "xc_grid": "3", + "thresh": "14", + "s2thresh": "16", + "scf_algorithm": "diis", + "resp_charges": "true", + "symmetry": "false", + "sym_ignore": "true", + "method": "wb97mv", + "solvent_method": "smd", + "ideriv": "1", + }, + "job_type": "sp", + }, + } + ], + "input": { + "initial_molecule": { + "@module": "pymatgen.core.structure", + "@class": "Molecule", + "charge": 0.0, + "spin_multiplicity": 1, + "sites": [ + { + "name": "O", + "species": [{"element": "O", "occu": 1}], + "xyz": [-0.80595, 2.22952, -0.01914], + "properties": {}, + "label": "O", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [0.18338, 2.20176, 0.01351], + "properties": {}, + "label": "H", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [-1.09531, 1.61602, 0.70231], + "properties": {}, + "label": "H", + }, + ], + }, + "rem": { + "job_type": "opt", + "basis": "def2-svpd", + "max_scf_cycles": "100", + "gen_scfman": "true", + "xc_grid": "3", + "thresh": "14", + "s2thresh": "16", + "scf_algorithm": "diis", + "resp_charges": "true", + "symmetry": "false", + "sym_ignore": "true", + "method": "wb97mv", + "solvent_method": "smd", + "ideriv": "1", + "geom_opt2": "3", + }, + "level_of_theory": "wB97M-V/def2-SVPD/SMD", + "task_type": "Geometry Optimization", + "calc_type": "wB97M-V/def2-SVPD/SMD Geometry Optimization", + "solvation_lot_nfo": "wB97M-V/def2-SVPD/SMD(SOLVENT=WATER)", + }, + "output": { + "initial_molecule": { + "@module": "pymatgen.core.structure", + "@class": "Molecule", + "charge": 0.0, + "spin_multiplicity": 1, + "sites": [ + { + "name": "O", + "species": [{"element": "O", "occu": 1}], + "xyz": [-0.80595, 2.22952, -0.01914], + "properties": {}, + "label": "O", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [0.18338, 2.20176, 0.01351], + "properties": {}, + "label": "H", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [-1.09531, 1.61602, 0.70231], + "properties": {}, + "label": "H", + }, + ], + }, + "optimized_molecule": { + "@module": "pymatgen.core.structure", + "@class": "Molecule", + "charge": 0, + "spin_multiplicity": 1, + "sites": [ + { + "name": "O", + "species": [{"element": "O", "occu": 1}], + "xyz": [-0.8008592596, 2.2248298937, -0.0136245943], + "properties": {}, + "label": "O", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [0.1637955748, 2.1962925542, 0.0199393927], + "properties": {}, + "label": "H", + }, + { + "name": "H", + "species": [{"element": "H", "occu": 1}], + "xyz": [-1.0808163152, 1.6261775521, 0.6903652017], + "properties": {}, + "label": "H", + }, + ], + }, + "mulliken": [-0.373491, 0.186964, 0.186527], + "resp": [-0.89522, 0.44761, 0.44761], + "final_energy": -76.358341626913, + }, + "custodian": [ + { + "job": { + "@module": "custodian.qchem.jobs", + "@class": "QCJob", + "@version": "2022.5.26", + "qchem_command": ["qchem"], + "max_cores": "40", + "multimode": "openmp", + "input_file": "mol.qin", + "output_file": "mol.qout", + "qclog_file": "mol.qclog", + "suffix": "", + "calc_loc": "/tmp", + "nboexe": "null", + "save_scratch": "false", + "backup": "true", + }, + "corrections": [], + } + ], + } + + +objects = {cls.__name__: cls for cls in SchemaTestData.__subclasses__()} + + +def get_test_object(object_name): + """Get the schema test data object from the class name.""" + return objects[object_name] diff --git a/emmet-core/tests/test_qc_task.py b/emmet-core/tests/test_qc_task.py new file mode 100644 index 0000000000..d47c41a23d --- /dev/null +++ b/emmet-core/tests/test_qc_task.py @@ -0,0 +1,90 @@ +import pytest +from tests.conftest_qchem import assert_schemas_equal, get_test_object + + +@pytest.mark.parametrize( + "object_name, task_name", + [ + pytest.param("SinglePointTest", "standard", id="SinglePointTest"), + pytest.param("OptimizationTest", "standard", id="OptimizationTest"), + ], +) # Can add more later, something like freq, pesscan, ts, +# FFOptTest, once we get flows working for qchem in atomate2 + + +def test_input_summary(test_dir, object_name, task_name): + from monty.json import MontyDecoder, jsanitize + from emmet.core.qc_tasks import InputDoc + from emmet.core.qchem.calculation import Calculation + + test_object = get_test_object(object_name) + dir_name = test_dir / "qchem" / test_object.folder + + files = test_object.task_files[task_name] + calc_doc = Calculation.from_qchem_files(dir_name, task_name, **files) + + test_doc = InputDoc.from_qchem_calc_doc(calc_doc) + valid_doc = test_object.task_doc["input"] + assert_schemas_equal(test_doc, valid_doc) + + # test document can be jsanitized + d = jsanitize(test_doc, strict=True, enum_values=True, allow_bson=True) + + # and decoded + MontyDecoder().process_decoded(d) + + +@pytest.mark.parametrize( + "object_name, task_name", + [ + pytest.param("SinglePointTest", "standard", id="SinglePointTest"), + pytest.param("OptimizationTest", "standard", id="OptimizationTest"), + ], +) +def test_output_summary(test_dir, object_name, task_name): + from monty.json import MontyDecoder, jsanitize + from emmet.core.qc_tasks import OutputDoc + from emmet.core.qchem.calculation import Calculation + + test_object = get_test_object(object_name) + dir_name = test_dir / "qchem" / test_object.folder + + files = test_object.task_files[task_name] + calc_doc = Calculation.from_qchem_files(dir_name, task_name, **files) + + test_doc = OutputDoc.from_qchem_calc_doc(calc_doc) + valid_doc = test_object.task_doc["output"] + assert_schemas_equal(test_doc, valid_doc) + + # test document can be janitized + d = jsanitize(test_doc, strict=True, enum_values=True, allow_bson=True) + + # and decoded + MontyDecoder().process_decoded(d) + + +@pytest.mark.parametrize( + "object_name", + [ + pytest.param("SinglePointTest", id="SinglePointTest"), + pytest.param("OptimizationTest", id="OptimizationTest"), + ], +) +def test_task_doc(test_dir, object_name): + from monty.json import MontyDecoder, jsanitize + from emmet.core.qc_tasks import TaskDoc + + test_object = get_test_object(object_name) + dir_name = test_dir / "qchem" / test_object.folder + test_doc = TaskDoc.from_directory(dir_name) + assert_schemas_equal(test_doc, test_object.task_doc) + + # test document can be jsanitized + d = jsanitize(test_doc, strict=True, enum_values=True, allow_bson=True) + + # and decoded + MontyDecoder().process_decoded(d) + + # Test that additional_fields works + test_doc = TaskDoc.from_directory(dir_name, additional_fields={"foo": "bar"}) + assert test_doc.model_dump()["additional_fields"] == {"foo": "bar"} diff --git a/test_files/qchem/qchem_opt_test/GRAD.gz b/test_files/qchem/qchem_opt_test/GRAD.gz new file mode 100644 index 0000000000..377e77672e Binary files /dev/null and b/test_files/qchem/qchem_opt_test/GRAD.gz differ diff --git a/test_files/qchem/qchem_opt_test/custodian.json.gz b/test_files/qchem/qchem_opt_test/custodian.json.gz new file mode 100644 index 0000000000..e5cda623be Binary files /dev/null and b/test_files/qchem/qchem_opt_test/custodian.json.gz differ diff --git a/test_files/qchem/qchem_opt_test/mol.qclog.gz b/test_files/qchem/qchem_opt_test/mol.qclog.gz new file mode 100644 index 0000000000..c34484d7b5 Binary files /dev/null and b/test_files/qchem/qchem_opt_test/mol.qclog.gz differ diff --git a/test_files/qchem/qchem_opt_test/mol.qin.gz b/test_files/qchem/qchem_opt_test/mol.qin.gz new file mode 100644 index 0000000000..92c0c24c86 Binary files /dev/null and b/test_files/qchem/qchem_opt_test/mol.qin.gz differ diff --git a/test_files/qchem/qchem_opt_test/mol.qin.orig.gz b/test_files/qchem/qchem_opt_test/mol.qin.orig.gz new file mode 100644 index 0000000000..e8d87b4810 Binary files /dev/null and b/test_files/qchem/qchem_opt_test/mol.qin.orig.gz differ diff --git a/test_files/qchem/qchem_opt_test/mol.qout.gz b/test_files/qchem/qchem_opt_test/mol.qout.gz new file mode 100644 index 0000000000..ad68b2d629 Binary files /dev/null and b/test_files/qchem/qchem_opt_test/mol.qout.gz differ diff --git a/test_files/qchem/qchem_sp_test/custodian.json.gz b/test_files/qchem/qchem_sp_test/custodian.json.gz new file mode 100644 index 0000000000..cf29c3973b Binary files /dev/null and b/test_files/qchem/qchem_sp_test/custodian.json.gz differ diff --git a/test_files/qchem/qchem_sp_test/mol.qclog.gz b/test_files/qchem/qchem_sp_test/mol.qclog.gz new file mode 100644 index 0000000000..ae7abcbd0e Binary files /dev/null and b/test_files/qchem/qchem_sp_test/mol.qclog.gz differ diff --git a/test_files/qchem/qchem_sp_test/mol.qin.gz b/test_files/qchem/qchem_sp_test/mol.qin.gz new file mode 100644 index 0000000000..5f32df9789 Binary files /dev/null and b/test_files/qchem/qchem_sp_test/mol.qin.gz differ diff --git a/test_files/qchem/qchem_sp_test/mol.qin.orig.gz b/test_files/qchem/qchem_sp_test/mol.qin.orig.gz new file mode 100644 index 0000000000..bd5096748b Binary files /dev/null and b/test_files/qchem/qchem_sp_test/mol.qin.orig.gz differ diff --git a/test_files/qchem/qchem_sp_test/mol.qout.gz b/test_files/qchem/qchem_sp_test/mol.qout.gz new file mode 100644 index 0000000000..78e514d980 Binary files /dev/null and b/test_files/qchem/qchem_sp_test/mol.qout.gz differ