From 49f2147ac790a7faa2e919ea28f969ff43027d5f Mon Sep 17 00:00:00 2001 From: Matteo Voges Date: Fri, 2 Feb 2024 14:06:23 +0100 Subject: [PATCH] feat: add omegaconf inventory core --- kapitan/inventory/__init__.py | 2 + kapitan/inventory/inv_omegaconf/__init__.py | 0 .../inventory/inv_omegaconf/inv_omegaconf.py | 274 ++++++++++++++++++ kapitan/inventory/inv_omegaconf/migrate.py | 77 +++++ kapitan/inventory/inv_omegaconf/resolvers.py | 272 +++++++++++++++++ kapitan/inventory/inventory.py | 6 + kapitan/resources.py | 4 + 7 files changed, 635 insertions(+) create mode 100644 kapitan/inventory/inv_omegaconf/__init__.py create mode 100644 kapitan/inventory/inv_omegaconf/inv_omegaconf.py create mode 100644 kapitan/inventory/inv_omegaconf/migrate.py create mode 100644 kapitan/inventory/inv_omegaconf/resolvers.py diff --git a/kapitan/inventory/__init__.py b/kapitan/inventory/__init__.py index c3c0f662d..cd16cb7f2 100644 --- a/kapitan/inventory/__init__.py +++ b/kapitan/inventory/__init__.py @@ -1,10 +1,12 @@ from typing import Type from .inv_reclass import ReclassInventory +from kapitan.inventory.inv_omegaconf.inv_omegaconf import OmegaConfInventory from .inventory import Inventory # Dict mapping values for command line flag `--inventory-backend` to the # associated `Inventory` subclass. AVAILABLE_BACKENDS: dict[str, Type[Inventory]] = { "reclass": ReclassInventory, + "omegaconf": OmegaConfInventory, } diff --git a/kapitan/inventory/inv_omegaconf/__init__.py b/kapitan/inventory/inv_omegaconf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kapitan/inventory/inv_omegaconf/inv_omegaconf.py b/kapitan/inventory/inv_omegaconf/inv_omegaconf.py new file mode 100644 index 000000000..146ece4ae --- /dev/null +++ b/kapitan/inventory/inv_omegaconf/inv_omegaconf.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +# Copyright 2019 The Kapitan Authors +# SPDX-FileCopyrightText: 2020 The Kapitan Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import multiprocessing as mp +import os +from copy import deepcopy +from time import time + +import yaml +from omegaconf import ListMergeMode, OmegaConf + +from kapitan import cached +from .migrate import migrate +from ..inventory import InventoryError, Inventory +from .resolvers import register_resolvers + +logger = logging.getLogger(__name__) + + +class InventoryTarget: + targets_path: str + logfile: str + + def __init__(self, target_name: str, target_path: str) -> None: + self.path = target_path + self.name = target_name + + # compose node name + self.composed_name = ( + os.path.splitext(target_path)[0].replace(self.targets_path + os.sep, "").replace("/", ".") + ) + + self.classes: list = [] + self.parameters: dict = {} + self.classes_redundancy_check: set = set() + + def _merge(self, class_parameters): + if not self.parameters: + self.parameters = class_parameters + else: + merged_parameters = OmegaConf.unsafe_merge( + class_parameters, + self.parameters, + list_merge_mode=ListMergeMode.EXTEND, + ) + + self.parameters = merged_parameters + + def _resolve(self): + escape_interpolation_strings = False + OmegaConf.resolve(self.parameters, escape_interpolation_strings) + + # remove specified keys + remove_location = "omegaconf.remove" + removed_keys = OmegaConf.select(self.parameters, remove_location, default=[]) + for key in removed_keys: + OmegaConf.update(self.parameters, key, {}, merge=False) + + # resolve second time and convert to object + # add throw_on_missing = True when resolving second time (--> wait for to_object support) + # reference: https://github.com/omry/omegaconf/pull/1113 + OmegaConf.resolve(self.parameters, escape_interpolation_strings) + self.parameters = OmegaConf.to_container(self.parameters) + + def add_metadata(self): + # append meta data (legacy: _reclass_) + _meta_ = { + "name": { + "full": self.name, + "parts": self.name.split("."), + "path": self.name.replace(".", "/"), + "short": self.name, + } + } + self.parameters["_meta_"] = _meta_ + self.parameters["_reclass_"] = _meta_ # legacy + + +class InventoryClass: + classes_path: str = "./inventory/classes" + + def __init__(self, class_path: str) -> None: + self.path = class_path + self.name = os.path.splitext(class_path)[0].replace(self.classes_path + os.sep, "").replace("/", ".") + self.parameters = {} + self.dependents = [] + + +class OmegaConfInventory(Inventory): + classes_cache: dict = {} + + # InventoryTarget.targets_path = self.targets_searchpath + # InventoryClass.classes_path = self.classes_searchpath + + def inventory(self): + register_resolvers(self.inventory_path) + selected_targets = self.get_selected_targets() + + # FEAT: add flag for multiprocessing + use_mp = True + + if not use_mp: + nodes = {} + # load targets one by one + for target in selected_targets: + try: + self.load_target(target) + nodes[target.name] = {"parameters": target.parameters} + except Exception as e: + raise InventoryError(f"{target.name}: {e}") + else: + # load targets parallel + manager = mp.Manager() # perf: bottleneck --> 90 % of the inventory time + + nodes = manager.dict() + mp.set_start_method("spawn", True) # platform independent + with mp.Pool(len(selected_targets)) as pool: + r = pool.map_async( + self.inventory_worker, [(self, target, nodes) for target in selected_targets] + ) + r.wait() + + # using nodes for reclass legacy code + nodes = dict(nodes) + + # using nodes for reclass legacy code + return {"nodes": nodes} + + @staticmethod + def inventory_worker(zipped_args): + start = time() + self, target, nodes = zipped_args + + try: + register_resolvers(self.inventory_path) + self.load_target(target) + nodes[target.name] = {"parameters": target.parameters} + except Exception as e: + logger.error(f"{target.name}: {e}") + return + + logger.info(f"Rendered {target.name} ({time()-start:.2f}s)") + + def migrate(self): + migrate(self.inventory_path) + + # ---------- + # private + # ---------- + def get_selected_targets(self): + selected_targets = [] + + # loop through targets searchpath and load all targets + for root, dirs, files in os.walk(self.targets_searchpath): + for target_file in files: + # split file extension and check if yml/yaml + target_path = os.path.join(root, target_file) + target_name, ext = os.path.splitext(target_file) + if ext not in (".yml", ".yaml"): + logger.debug(f"{target_file}: targets have to be .yml or .yaml files.") + continue + + # skip targets if they are not specified with -t flag + if self.targets and target_name not in self.targets: + continue + + # initialize target + target = InventoryTarget(target_name, target_path) + if self.compose_node_name: + target.name = target.composed_name + selected_targets.append(target) + + return selected_targets + + def load_target(self, target: InventoryTarget): + """ + load only one target with all its classes + """ + + # load the target parameters + target.classes, target.parameters = self.load_config(target.path) + + # load classes for targets + for class_name in target.classes: + inv_class = self.load_class(target, class_name) + if not inv_class: + # either redundantly defined or not found (with ignore_not_found: true) + continue + + params = deepcopy(inv_class.parameters) + target._merge(params) + target.classes += inv_class.dependents + + if not target.parameters: + # improve error msg + raise InventoryError("empty target") + + # resolve interpolations + target.add_metadata() + target._resolve() + + # obtain target name to insert in inv dict + vars_target_name = target.parameters.get("kapitan", {}).get("vars", {}).get("target") + if not vars_target_name: + # add hint to kapitan.vars.target + logger.warning(f"Could not resolve target name on target {target.name}") + + def load_class(self, target: InventoryTarget, class_name: str): + # resolve class path (has to be absolute) + class_path = os.path.join(self.classes_searchpath, *class_name.split(".")) + if class_path in target.classes_redundancy_check: + logger.debug(f"{class_path}: class {class_name} is redundantly defined") + return None + target.classes_redundancy_check.add(class_path) + + # search in inventory classes cache, otherwise load class + if class_name in self.classes_cache.keys(): + return self.classes_cache[class_name] + + # check if file exists + if os.path.isfile(class_path + ".yml"): + class_path += ".yml" + elif os.path.isdir(class_path): + # search for init file + init_path = os.path.join(self.classes_searchpath, *class_name.split("."), "init") + ".yml" + if os.path.isfile(init_path): + class_path = init_path + elif self.ignore_class_notfound: + logger.debug(f"Could not find {class_path}") + return None + else: + raise InventoryError(f"Class {class_name} not found.") + + # load classes recursively + classes, parameters = self.load_config(class_path) + + if not classes and not parameters: + return None + + # initialize inventory class + inv_class = InventoryClass(class_path) + inv_class.parameters = parameters + # resolve relative class names for new classes + for c in classes: + if c.startswith("."): + c = ".".join(class_name.split(".")[0:-1]) + c + inv_class.dependents.append(c) + + # add class to cache + self.classes_cache[class_name] = inv_class + + return inv_class + + def load_config(self, path: str): + with open(path, "r") as f: + f.seek(0) + config = yaml.load(f, yaml.SafeLoader) + + if not config: + logger.debug(f"{path}: file is empty") + return [], {} + classes = OmegaConf.create(config.get("classes", [])) + parameters = OmegaConf.create(config.get("parameters", {})) + + # add metadata to nodes + filename = os.path.splitext(os.path.split(path)[1])[0] + parameters._set_flag(["filename", "path"], [filename, path], recursive=True) + + return classes, parameters \ No newline at end of file diff --git a/kapitan/inventory/inv_omegaconf/migrate.py b/kapitan/inventory/inv_omegaconf/migrate.py new file mode 100644 index 000000000..1282b8b9c --- /dev/null +++ b/kapitan/inventory/inv_omegaconf/migrate.py @@ -0,0 +1,77 @@ +import os +import sys + +from regex import regex + +from kapitan.inventory.inventory import InventoryError + + +def migrate(inventory_path: str): + # FEAT: write migrations to temp dir and copy only if succeeded + + if os.path.exists(inventory_path): + if os.path.isdir(inventory_path): + migrate_dir(inventory_path) + elif os.path.isfile(inventory_path): + migrate_file(inventory_path) + else: + print(f"Error while migrating: inventory path at {inventory_path} does not exist") + + +def migrate_dir(path: str): + """migrates all .yml/.yaml files in the given path to omegaconfs syntax""" + + for root, _, files in os.walk(path): + for file in files: + file = os.path.join(root, file) + name, ext = os.path.splitext(file) + + if ext not in (".yml", ".yaml"): + continue + + try: + migrate_file(file) + except Exception as e: + InventoryError(f"{file}: error with migration: {e}") + + +def migrate_file(file: str): + with open(file, "r") as fp: + content = fp.read() + + updated_content = migrate_str(content) + + with open(file, "w") as fp: + fp.write(updated_content) + + +def migrate_str(content: str): + # FEAT: don't migrate custom resolvers + # FEAT: migrate interpolations with '.' in the keyname + + # search for interpolation pattern + # migrate path delimiter + # migrate meta data name + updated_content = regex.sub( + r"(?") + print(f"Migrating all .yml/.yaml files in {sys.argv[1]}") + migrate(sys.argv[1]) diff --git a/kapitan/inventory/inv_omegaconf/resolvers.py b/kapitan/inventory/inv_omegaconf/resolvers.py new file mode 100644 index 000000000..d2cb20ed6 --- /dev/null +++ b/kapitan/inventory/inv_omegaconf/resolvers.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 + +# Copyright 2019 The Kapitan Authors +# SPDX-FileCopyrightText: 2020 The Kapitan Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import copy +import logging +import os +import sys +from typing import Any + +from omegaconf import Container, ListMergeMode, Node, OmegaConf + +logger = logging.getLogger(__name__) + + +def key(_node_: Node): + """resolver function, that returns the name of its key""" + return _node_._key() + + +def parentkey(_parent_: Node): + """resolver function, that returns the name of its parent key""" + return _parent_._key() + + +def fullkey(_node_: Node): + """resolver function, that returns the full name of its key""" + return _node_._get_full_key("") + + +def access_key_with_dots(*key: str, _root_: Container): + """resolver function, that accesses a key with dots in it""" + value = _root_ + for part in key: + value = value[part] + + return value + + +def escape_interpolation(content: str): + """resolver function that escapes an interpolation for the next resolving step""" + return f"\\${{{content}}}" + + +def merge(*args): + """resolver function, that merges omegaconf objects""" + return OmegaConf.merge(*args, list_merge_mode=ListMergeMode.EXTEND) + + +def to_dict(input_list: list): + """resolver function, that converts an object to a dict""" + if type(input_list) is not list or OmegaConf.is_list(input_list): + # warning: input is not list + return input_list + + if not all([type(item) is list or OmegaConf.is_dict(item) for item in input_list]): + # warning: some items are not dict + return input_list + + return {dict_key: item[dict_key] for item in input_list for dict_key in item} + + +def to_list(input_obj: Any): + """resolver function, that converts an object to a list""" + if type(input_obj) is dict or OmegaConf.is_dict(input_obj): + return [{k: v} for k, v in input_obj.items()] + + return list(input_obj) + + +def default(*args): + output = "" + for arg in args[:-1]: + output += "${oc.select:" + str(arg) + "," + + output += str(args[-1]) + output += "}" * (len(args) - 1) + return output + + +def relpath(absolute_path: str, _node_: Node): + """ + resolver function, that translates an absolute yaml-path to its relative path + """ + + node_parts = [] + path_parts = absolute_path.split(".") + relative_path = "" + + i = 0 + node = _node_ + while node._key() is not None: + node_parts.append(node._key()) + node = node._get_parent() + i += 1 + + node_parts.reverse() + + for idx, (path_part, node_path) in enumerate(zip(path_parts, node_parts)): + if not path_part == node_path: + rel_prefix = "." * (i - idx) if idx != 0 else "" + relative_path = rel_prefix + ".".join(path_parts[idx:]) + break + + if not relative_path: + # warning: self reference + return "SELF REFERENCE DETECTED" + + relative_interpolation = "${" + relative_path + "}" + + return relative_interpolation + + +def write_to_key(destination: str, origin: str, _root_): + """ + resolver function to write any content to different place in the inventory + NOTE: Behavior for lists is not well-defined + """ + # fetch and resolve content + try: + content = OmegaConf.select(_root_, origin) + if not content: + # warning: origin could not be found / empty content + return "NOT FOUND" + + # resolve relative interpolations + try: + # replace with OC.to_object(), when it supports escaped interpolations (wip) + # reference: https://github.com/omry/omegaconf/pull/1113 + config = copy.deepcopy(content) + OmegaConf.resolve(config, True) + except Exception as e: + # resolver error + logger.warning(e) + return "ERROR WHILE RESOLVING" + + # remove when issue above is resolved + OmegaConf.set_readonly(config, False, recursive=True) + + # write resolved content back to _root_ + OmegaConf.update(_root_, destination, config, merge=True, force_add=True) + except Exception as e: + raise e + return "DONE" + + +def from_file(file_path: str): + if os.path.isfile(file_path): + with open(file_path, "r") as f: + return f.read() + else: + logger.error(f"from_file: file {file_path} does not exist") + return "FILE NOT EXISTS" + + +def filename(_node_: Node): + return _node_._get_flag("filename") + + +def parent_filename(_parent_: Node): + return _parent_._get_flag("filename") + + +def path(_node_: Node): + return _node_._get_flag("path") + + +def parent_path(_parent_: Node): + return _parent_._get_flag("path") + + +def condition_if(condition: str, config: dict): + if bool(condition): + return config + else: + return {} + + +def condition_if_else(condition: str, config_if: dict, config_else: dict): + if bool(condition): + return config_if + else: + return config_else + + +def condition_not(condition: str): + return not bool(condition) + + +def condition_and(*conditions: str): + return all(conditions) + + +def condition_or(*conditions: str): + return any(conditions) + + +def condition_equal(*configs): + return all(config == configs[0] for config in configs) + + +def register_resolvers(inventory_path: str) -> None: + """register pre-defined and user-defined resolvers""" + replace = True + + # yaml key utility functions + OmegaConf.register_new_resolver("key", key, replace=replace) + OmegaConf.register_new_resolver("parentkey", parentkey, replace=replace) + OmegaConf.register_new_resolver("fullkey", fullkey, replace=replace) + OmegaConf.register_new_resolver("relpath", relpath, replace=replace) + + # yaml object utility functions + OmegaConf.register_new_resolver("access", access_key_with_dots, replace=replace) + OmegaConf.register_new_resolver("escape", escape_interpolation, replace=replace) + OmegaConf.register_new_resolver("merge", merge, replace=replace) + OmegaConf.register_new_resolver("dict", to_dict, replace=replace) + OmegaConf.register_new_resolver("list", to_list, replace=replace) + OmegaConf.register_new_resolver("add", lambda x, y: x + y, replace=replace) + OmegaConf.register_new_resolver("default", default, replace=replace) + OmegaConf.register_new_resolver("write", write_to_key, replace=replace) + OmegaConf.register_new_resolver("from_file", from_file, replace=replace) + OmegaConf.register_new_resolver("filename", filename, replace=replace) + OmegaConf.register_new_resolver("parent_filename", parent_filename, replace=replace) + OmegaConf.register_new_resolver("path", path, replace=replace) + OmegaConf.register_new_resolver("parent_path", parent_path, replace=replace) + + # boolean algebra + OmegaConf.register_new_resolver("if", condition_if, replace=replace) + OmegaConf.register_new_resolver("ifelse", condition_if_else, replace=replace) + OmegaConf.register_new_resolver("and", condition_and, replace=replace) + OmegaConf.register_new_resolver("or", condition_or, replace=replace) + OmegaConf.register_new_resolver("not", condition_not, replace=replace) + OmegaConf.register_new_resolver("equal", condition_equal, replace=replace) + + # user defined resolvers + user_resolver_file = os.path.join(inventory_path, "resolvers.py") + if os.path.exists(user_resolver_file): + try: + register_user_resolvers(inventory_path) + except: + logger.warning(f"Couldn't import {os.path.join(inventory_path, 'resolvers.py')}") + + +def register_user_resolvers(inventory_path: str) -> None: + """import user resolvers specified in inventory/resolvers.py""" + try: + import_path = os.path.join(os.getcwd(), inventory_path) + sys.path.append(import_path) + from resolvers import pass_resolvers + + funcs = pass_resolvers() + except ImportError: + logger.warning("resolvers.py must contain function 'pass_resolvers()'") + return + except Exception as e: + logger.error(f"resolvers.py: {e}") + return + + if not isinstance(funcs, dict): + logger.warning("pass_resolvers() should return a dict") + return + + import resolvers + + for name, func in funcs.items(): + try: + OmegaConf.register_new_resolver(name, func, replace=True) + except: + logger.warning(f"Could not load resolver {name}") diff --git a/kapitan/inventory/inventory.py b/kapitan/inventory/inventory.py index c97d4d3e3..64d58411d 100644 --- a/kapitan/inventory/inventory.py +++ b/kapitan/inventory/inventory.py @@ -132,6 +132,12 @@ def render_targets(self, targets: list = None, ignore_class_notfound: bool = Fal """ raise NotImplementedError + def migrate(self): + """ + migrate the inventory, e.g. change interpolation syntax to new syntax + """ + pass + def __getitem__(self, key): return self.inventory[key] diff --git a/kapitan/resources.py b/kapitan/resources.py index 7079fa34d..ab4289ef0 100644 --- a/kapitan/resources.py +++ b/kapitan/resources.py @@ -327,6 +327,10 @@ def get_inventory(inventory_path) -> Inventory: logger.debug(f"Backend {backend_id} is unknown, falling back to reclass as inventory backend") inventory_backend = ReclassInventory(inventory_path) + # migrate inventory to selected inventory backend + if cached.args.get("migrate"): + inventory_backend.migrate() + inventory_backend.search_targets() cached.inv = inventory_backend