diff --git a/edk2toollib/database/__init__.py b/edk2toollib/database/__init__.py index 180888c38..34ebb4ae7 100644 --- a/edk2toollib/database/__init__.py +++ b/edk2toollib/database/__init__.py @@ -7,8 +7,4 @@ # SPDX-License-Identifier: BSD-2-Clause-Patent ## """Core classes and methods used to interact with the database module inside edk2-pytool-library.""" - -from tinydb import Query, where # noqa: F401 -from tinyrecord import transaction # noqa: F401 - -from .edk2_db import AdvancedQuery, Edk2DB, TableGenerator # noqa: F401 +from .edk2_db import Edk2DB # noqa: F401 diff --git a/edk2toollib/database/edk2_db.py b/edk2toollib/database/edk2_db.py index fd63f245c..c1ea32916 100644 --- a/edk2toollib/database/edk2_db.py +++ b/edk2toollib/database/edk2_db.py @@ -7,32 +7,29 @@ ## """A class for interacting with a database implemented using json.""" import logging +import sqlite3 import time -from typing import Any, List +from typing import Any -from tinydb import TinyDB -from tinydb.middlewares import CachingMiddleware -from tinydb.storages import JSONStorage, MemoryStorage -from tinydb.table import Document +from edk2toollib.uefi.edk2.path_utilities import Edk2Path +from edk2toollib.database.tables.base_table import TableGenerator -class Edk2DB(TinyDB): - """A subclass of TinyDB providing advanced queries and parser management. +CREATE_JUNCTION_TABLE = """ +CREATE TABLE IF NOT EXISTS junction ( + table1 TEXT, + key1 TEXT, + table2 TEXT, + key2 TEXT +) +""" + +class Edk2DB: + """A SQLite3 database manager for a EDKII workspace. This class provides the ability to register parsers that will create / update tables in the database while also providing the ability to run queries on the database. - Edk2DB can be run in three modes: - - 1. File Read/Write: A database will be loaded or created at the specified path. Any changes made will be written - to the database file. This is the slowest of the three modes. Specify with Edk2DB.FILE_RW - - 2. File Read Only: A database will be loaded at the specific path. Attempting to change the database will result - in an error. This is the middle of the three in terms of performance. Specify with Edk2DB.FILE_RO - - 3. In-Memory Read/Write: A database will be created in memory. Any changes made will only exist for the lifetime - of the database object. This is the fastest of the three modes. Specify with Edk2DB.MEM_RW - Edk2DB can, and should, be used as a context manager to ensure that the database is closed properly. If not using as a context manager, the `close()` method must be used to ensure that the database is closed properly and any changes are saved. @@ -41,73 +38,34 @@ class Edk2DB(TinyDB): not appending to the database, the entire database will be dropped before parsing. ```python - # Run using File storage from edk2toollib.database.parsers import * - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=Path("path/to/db.db")) as db: + with Edk2DB(Path("path/to/db.db"), edk2path) as db: db.register(Parser1(), Parser2(), Parser3()) db.parse() - - # Run using Memory storage - from edk2toollib.database.parsers import * - with Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) as db: - db.register(Parser1(), Parser2(), Parser3()) - db.parse() - - # Run some parsers in clear mode and some in append mode - from edk2toollib.database.parsers import * - with Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) as db: - db.register(Parser1()) - db.parse() - db.clear_parsers() - - db.register(Parser2(), Parser3()) - for env in env_list: - db.parse(env=env, append=True) - - # Run Queries on specific tables or on the database - from edk2toollib.database.queries import * - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=Path("path/to/db.db")) as db: - # Run a tinydb Query - # https://tinydb.readthedocs.io/en/latest/usage.html#queries - query_results = db.table("TABLENAME").search(Query().table_field == "value") - - # Run an advanced query - query_results = db.search(AdvancedQuerySubclass(config1 = "x", config2 = "y")) - """ - FILE_RW = 1 # Mode: File storage, Read & Write - FILE_RO = 2 # Mode: File storage, Read Only - MEM_RW = 3 # Mode: Memory storage, Read & Write - - def __init__(self, mode: int, **kwargs: dict[str,Any]): + """ + def __init__(self, db_path: str, pathobj: Edk2Path, **kwargs: dict[str,Any]): """Initializes the database. Args: - mode: The mode you are opening the database with Edk2DB.FILE_RW, Edk2DB.FILE_RO, Edk2DB.MEM_RW + db_path: Path to create or load the database from + pathobj: Edk2Path object for the workspace **kwargs: see Keyword Arguments Keyword Arguments: - db_path (str): Path to create or load the database from - pathobj (Edk2Path): Edk2Path object for the workspace - - !!! note - needing db_path or pathobj depends on the mode you are opening the database with. + None """ - self.pathobj = None + self.pathobj = pathobj self._parsers = [] + self.connection = sqlite3.connect(db_path) - if mode == Edk2DB.FILE_RW: - logging.debug("Database running in File Read/Write mode.") - super().__init__(kwargs.pop("db_path"), access_mode='r+', storage=CachingMiddleware(JSONStorage)) - self.pathobj = kwargs.pop("pathobj") - elif mode == Edk2DB.FILE_RO: - logging.debug("Database running in File ReadOnly mode.") - super().__init__(kwargs.pop("db_path"), access_mode='r', storage=CachingMiddleware(JSONStorage)) - elif mode == Edk2DB.MEM_RW: - logging.debug("Database running in In-Memory Read/Write mode.") - super().__init__(storage=MemoryStorage) - self.pathobj = kwargs.pop("pathobj") - else: - raise ValueError("Unknown Database mode.") + def __enter__(self): + """Enables the use of the `with` statement.""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Enables the use of the `with` statement.""" + self.connection.commit() + self.connection.close() def register(self, *parsers: 'TableGenerator') -> None: """Registers a one or more table generators. @@ -120,98 +78,20 @@ def register(self, *parsers: 'TableGenerator') -> None: def clear_parsers(self) -> None: """Empties the list of registered table generators.""" - self._parsers.clear() + self._parsers = [] - def parse(self, append: bool=False) -> None: - """Runs all registered table parsers against the database. + def parse(self) -> None: + """Runs all registered table parsers against the database.""" + # Create the junction table + self.connection.execute(CREATE_JUNCTION_TABLE) - Args: - append: Whether to append to the database or clear it first - """ - if not append: - self.drop_tables() + # Create all tables + for parser in self._parsers: + parser.create_tables(self.connection.cursor()) + # Fill all tables for parser in self._parsers: logging.debug(f"[{parser.__class__.__name__}] starting...") - try: - t = time.time() - parser.parse(self) - except Exception as e: - logging.error(f"[{parser.__class__.__name__}] failed.") - logging.error(str(e)) - finally: - logging.debug(f"[{parser.__class__.__name__}] finished in {time.time() - t:.2f}s") - - def search(self, advanced_query: 'AdvancedQuery') -> List[Document]: - """Runs an advanced query against the database. - - Args: - advanced_query: The query to run - """ - return advanced_query.run(self) - - -class AdvancedQuery: - """An interface for an advanced query. - - One of TinyDB's limitations is that it does not support relationships between tables (i.e. Primary Key / Foreign - Key and JOINs). This means these types of queries are more complicated and require additional steps. An advanced - Query is a conceptual way to grouping these extra steps in a single place and providing a single line interface - to execute the more advanced query. - - ```python - # An example of a simple query, an interface provided by TinyDB to run a single query against a single table - db.table('table_name').search(Query().field == 'value' & Query().field2 == 'value2') - - # An example of an advanced query, which is run at the database level instead of the table level and can - # run multiple queries - db.query(MyAdvancedQuery(config1 = "a", config2 = "b")) - ``` - """ - def __init__(self, *args, **kwargs) -> None: - """Initialize the query with the specific settings.""" - - def run(self, db: Edk2DB) -> any: - """Run the query against the database.""" - raise NotImplementedError - - def columns(self, column_list: list[str], documents: list[Document], ): - """Given a list of Documents, return it with only the specified columns.""" - filtered_list = [] - for document in documents: - filtered_dict = {k: v for k, v in document.items() if k in column_list} - filtered_list.append(Document(filtered_dict, document.doc_id)) - return filtered_list - - -class TableGenerator: - """An interface for a parser that Generates an Edk2DB table. - - Allows you to parse a workspace, file, etc, and load the contents into the database as rows in a table. - - As Edk2DB is a subclass of TinyDB, it uses the same interface to interact with the database. This documentation - can be found here: https://tinydb.readthedocs.io/en/latest/usage.html#handling-data. While TinyDB provides a - default table to write to, it is suggested that a table be created for each parser using `db.table('table_name')` - - Common commands: - - `table = db.table('table_name')` Get or create a table from the database - - `table.insert(dict)` Insert a new entry into the table - - `table.insert_multiple([dict1, dict2, ...])` Insert multiple entries into the table - - !!! warning - Inserting many large entries into the database is slow! If you need to insert many entries, use tinyrecord's - transaction method which uses a record-first then execute architecture that minimizes the time we are in a - threadlock. This has been seen to cut insertion times by 90% for typical purposes. - - ```python - from tinyrecord import transaction - with transaction(table) as tr: - tr.insert_multiple - ``` - """ - def __init__(self, *args, **kwargs): - """Initialize the query with the specific settings.""" - - def parse(self, db: Edk2DB) -> None: - """Execute the parser and update the database.""" - raise NotImplementedError + time.time() + parser.parse(self.connection.cursor(), self.pathobj) + self.connection.commit() diff --git a/edk2toollib/database/queries/__init__.py b/edk2toollib/database/queries/__init__.py deleted file mode 100644 index 19ce077ce..000000000 --- a/edk2toollib/database/queries/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -## -# Copyright (c) Microsoft Corporation -# -# SPDX-License-Identifier: BSD-2-Clause-Patent -## -"""This file exists to satisfy pythons packaging requirements. - -Read more: https://docs.python.org/3/reference/import.html#regular-packages -""" - -from .component_query import ComponentQuery # noqa: F401 -from .library_query import LibraryQuery # noqa: F401 -from .license_query import LicenseQuery # noqa: F401 -from .unused_component_query import UnusedComponentQuery # noqa: F401 diff --git a/edk2toollib/database/queries/component_query.py b/edk2toollib/database/queries/component_query.py deleted file mode 100644 index 543e37dc0..000000000 --- a/edk2toollib/database/queries/component_query.py +++ /dev/null @@ -1,48 +0,0 @@ -# @file component_query.py -# A Query that reads the database and returns information about an instanced component. -## -# Copyright (c) Microsoft Corporation -# -# SPDX-License-Identifier: BSD-2-Clause-Patent -## -"""A Query that reads the database and returns information about an instanced component.""" -from edk2toollib.database import AdvancedQuery, Edk2DB, Query - - -class ComponentQuery(AdvancedQuery): - """A query that provides information about an instanced Component.""" - def __init__(self, *args, **kwargs) -> None: - """Initializes the ComponentQuery with the specified kwargs. - - !!! note - If the database stores multiple builds of data with different environments, - that environment information should be stored in a `environment` table, and - that should be linked in the `instanced_inf` via a ENV column. - - Arguments: - args (any): non-keyword arguments - kwargs (any): keyword arguments expanded below. - - Keyword Arguments: - component (string): The component to get info on; returns all components if empty - env_idx (int): The index in the `environment` table that represents - the environment used for parsing. - - """ - self.component = kwargs.get('component', "") - self.env_id = kwargs.get('env_id', None) - - def run(self, db: Edk2DB): - """Runs the query.""" - table_name = "instanced_inf" - table = db.table(table_name) - - if self.env_id is not None: - entries = table.search((Query().PATH.search(self.component)) - & ~(Query().COMPONENT.exists()) - & (Query().ENVIRONMENT_ID == self.env_id)) - else: - entries = table.search((Query().PATH.search(self.component)) - & ~(Query().COMPONENT.exists())) - - return self.columns(["NAME", "MODULE_TYPE", "ARCH", "LIBRARIES_USED"], entries) diff --git a/edk2toollib/database/queries/library_query.py b/edk2toollib/database/queries/library_query.py deleted file mode 100644 index b0acbfb53..000000000 --- a/edk2toollib/database/queries/library_query.py +++ /dev/null @@ -1,32 +0,0 @@ -# @file library_query.py -# A Query that reads the database and returns all instances of a given LIBRARY_CLASS -## -# Copyright (c) Microsoft Corporation -# -# SPDX-License-Identifier: BSD-2-Clause-Patent -## -"""A Query that reads the database and returns all instances of a given LIBRARY_CLASS.""" -from edk2toollib.database import AdvancedQuery, Edk2DB, Query - - -class LibraryQuery(AdvancedQuery): - """A query that generates a list of library instances for a given library.""" - def __init__(self, *args, **kwargs) -> None: - """Initializes the LibraryQuery with the specified kwargs. - - Arguments: - args (any): non-keyword arguments - kwargs (any): keyword arguments expanded below. - - Keyword Arguments: - library (string): The library to get all instances of. - """ - self.library = kwargs.get('library', "") - - def run(self, db: Edk2DB): - """Runs the query.""" - table_name = "inf" - table = db.table(table_name) - - result = table.search((Query().LIBRARY_CLASS != "") & (Query().LIBRARY_CLASS.matches(self.library))) - return self.columns(["LIBRARY_CLASS", "PATH"], result) diff --git a/edk2toollib/database/queries/license_query.py b/edk2toollib/database/queries/license_query.py deleted file mode 100644 index 408da3a18..000000000 --- a/edk2toollib/database/queries/license_query.py +++ /dev/null @@ -1,46 +0,0 @@ -# @file license_query.py -# A Query that reads the database and returns files missing a license identifier. -## -# Copyright (c) Microsoft Corporation -# -# SPDX-License-Identifier: BSD-2-Clause-Patent -## -# ruff: noqa: F811 -"""A Query that reads the database and returns files missing a license identifier.""" -from edk2toollib.database import AdvancedQuery, Edk2DB, Query - - -class LicenseQuery(AdvancedQuery): - """A Query that reads the database and returns files missing a license identifier.""" - def __init__(self, *args, **kwargs) -> None: - """Initializes the LicenseQuery with the specified kwargs. - - Arguments: - args (any): non-keyword arguments - kwargs (any): keyword arguments expanded below. - - Keyword Arguments: - include (list[str]): A list of strings to search for in the file path name - exclude (list[str]): A list of strings to exclude in the file path name - """ - self.include = kwargs.get('include', None) - self.exclude = kwargs.get('exclude', None) - - if isinstance(self.include, str): - self.include = [self.include] - if isinstance(self.exclude, str): - self.exclude = [self.exclude] - - def run(self, db: Edk2DB): - """Runs the query.""" - table = db.table("source") - - regex = "^" - if self.include: - regex += f"(?=.*({'|'.join(self.include)}))" - if self.exclude: - regex += f"(?!.*({'|'.join(self.exclude)})).*$" - - result = table.search((Query().LICENSE == "") & (Query().PATH.search(regex))) - - return self.columns(['PATH', 'LICENSE'], result) diff --git a/edk2toollib/database/queries/unused_component_query.py b/edk2toollib/database/queries/unused_component_query.py deleted file mode 100644 index d39494cb1..000000000 --- a/edk2toollib/database/queries/unused_component_query.py +++ /dev/null @@ -1,94 +0,0 @@ -# @file unused_component_query.py -# A Query that reads the database and returns all components and libraries defined in the DSC but unused in the FDF. -## -# Copyright (c) Microsoft Corporation -# -# SPDX-License-Identifier: BSD-2-Clause-Patent -## -"""A Query that reads the database and returns all components / libraries defined in the DSC but unused in the FDF.""" -from typing import Union - -from edk2toollib.database import AdvancedQuery, Edk2DB, Query - - -class UnusedComponentQuery(AdvancedQuery): - """A query that returns any unused components for a specific build.""" - def __init__(self, *args, **kwargs) -> None: - """Initializes the UnusedComponentQuery with the specified kwargs. - - !!! note - If the database stores multiple builds of data with different environments, - that environment information should be stored in a `environment` table, and - that should be linked in the `instanced_inf` via a ENV column. - - Arguments: - args (any): non-keyword arguments - kwargs (any): keyword arguments expanded below. - - Keyword Arguments: - ignore_app (bool): Whether to ingore UEFI_APPLICATIONs or not - env_idx (int): The index in the `environment` table that represents - the environment to use for parsing. - """ - self.ignore_app = kwargs.get('ignore_app', False) - self.env_id = kwargs.get('env_id', None) - - def run(self, db: Edk2DB) -> Union[str, str]: - """Returns (unused_components, unused_libraries).""" - dsc_infs = db.table("instanced_inf") - fdf_fvs = db.table("instanced_fv") - - dsc_components = [] - fdf_components = [] - - if self.env_id is not None: - dsc_rows = dsc_infs.search((~Query().COMPONENT.exists()) & (Query().ENVIRONMENT_ID == self.env_id)) - fv_rows = fdf_fvs.search(Query().ENVIRONMENT_ID == self.env_id) - else: - dsc_rows = dsc_infs.search(~Query().COMPONENT.exists()) - fv_rows = fdf_fvs.all() - - # Grab all components in the DSC - for entry in dsc_rows: - if self.ignore_app and entry["MODULE_TYPE"] == "UEFI_APPLICATION": - continue - dsc_components.append(entry["PATH"]) - - # Grab all components from the fdf - for fv in fv_rows: - fdf_components.extend(fv["INF_LIST"]) - - unused_components = set(dsc_components) - set(fdf_components) - used_components = set(fdf_components) - - unused_library_list = [] - used_library_list = [] - - # Grab all libraries used by unused_components - for component in unused_components: - self._recurse_inf(component, dsc_infs, unused_library_list) - - # Grab all libraries used by used_components - for component in used_components: - self._recurse_inf(component, dsc_infs, used_library_list) - - unused_libraries = set(unused_library_list) - set(used_library_list) - - return (list(unused_components), list(unused_libraries)) - - def _recurse_inf(self, inf, table, library_list): - if inf in library_list: - return - - if self.env_id is not None: - search_results = table.search((Query().PATH == inf) & (Query().ENVIRONMENT_ID == self.env_id)) - else: - search_results = table.search(Query().PATH == inf) - - for result in search_results: - # Only mark a inf as visited if it is a library - if "COMPONENT" in result: - library_list.append(inf) - - for inf in result["LIBRARIES_USED"]: - self._recurse_inf(inf[1], table, library_list) diff --git a/edk2toollib/database/tables/__init__.py b/edk2toollib/database/tables/__init__.py index 5984a9555..67bd7c293 100644 --- a/edk2toollib/database/tables/__init__.py +++ b/edk2toollib/database/tables/__init__.py @@ -4,9 +4,9 @@ # SPDX-License-Identifier: BSD-2-Clause-Patent ## """A collection of table generators that run against the workspace.""" - from .environment_table import EnvironmentTable # noqa: F401 from .inf_table import InfTable # noqa: F401 from .instanced_fv_table import InstancedFvTable # noqa: F401 from .instanced_inf_table import InstancedInfTable # noqa: F401 from .source_table import SourceTable # noqa: F401 +from .package_table import PackageTable # noqa: F401 diff --git a/edk2toollib/database/tables/base_table.py b/edk2toollib/database/tables/base_table.py new file mode 100644 index 000000000..62ed7d996 --- /dev/null +++ b/edk2toollib/database/tables/base_table.py @@ -0,0 +1,24 @@ +import sqlite3 + +from edk2toollib.uefi.edk2.path_utilities import Edk2Path + + +class TableGenerator: + """An interface for a parser that Generates an Edk2DB table. + + Allows you to parse a workspace, file, etc, and load the contents into the database as rows in a table. + + Edk2Db provides a connection to a sqlite3 database and will commit any changes made during `parse` once + the parser has finished executing and has returned. Review sqlite3 documentation for more information on + how to interact with the database. + """ + def __init__(self, *args, **kwargs): + """Initialize the query with the specific settings.""" + + def create_tables(self, db_cursor: sqlite3.Cursor) -> None: + """Create the tables necessary for this parser.""" + raise NotImplementedError + + def parse(self, db_cursor: sqlite3.Cursor, pathobj: Edk2Path) -> None: + """Execute the parser and update the database.""" + raise NotImplementedError diff --git a/edk2toollib/database/tables/environment_table.py b/edk2toollib/database/tables/environment_table.py index 4eea69d68..210191282 100644 --- a/edk2toollib/database/tables/environment_table.py +++ b/edk2toollib/database/tables/environment_table.py @@ -6,46 +6,56 @@ # SPDX-License-Identifier: BSD-2-Clause-Patent ## """A module to run a table generator that creates or appends to a table with environment information.""" -from datetime import date - -from tinyrecord import transaction - -from edk2toollib.database import Edk2DB, TableGenerator - +import datetime +import sqlite3 +import uuid + +import git + +from edk2toollib.database.tables.base_table import TableGenerator +from edk2toollib.uefi.edk2.path_utilities import Edk2Path + +CREATE_ENV_TABLE_COMMAND = ''' +CREATE TABLE IF NOT EXISTS environment ( + id TEXT PRIMARY KEY, + date TEXT, + version TEXT +); +''' + +CREATE_ENV_VALUES_TABLE_COMMAND = ''' +CREATE TABLE IF NOT EXISTS environment_values ( + id TEXT, + key TEXT, + value TEXT, + FOREIGN KEY (id) REFERENCES environment(id) +); +''' class EnvironmentTable(TableGenerator): - """A Workspace parser that records import environment information for a given parsing execution. - - Generates a table with the following schema: - - - ``` py - table_name = "environment" - |--------------------------------------| - | DATE | VERSION | ENV | PACKAGES_PATH | - |--------------------------------------| - ``` - """ # noqa: E501 + """A Workspace parser that records import environment information for a given parsing execution.""" # noqa: E501 def __init__(self, *args, **kwargs): """Initialize the query with the specific settings.""" - self.env = kwargs.pop("env") + self.env = kwargs.pop("env", {}) + + def create_tables(self, db_cursor: sqlite3.Cursor) -> None: + """Create the tables necessary for this parser.""" + db_cursor.execute(CREATE_ENV_VALUES_TABLE_COMMAND) + db_cursor.execute(CREATE_ENV_TABLE_COMMAND) - def parse(self, db: Edk2DB) -> None: + def parse(self, db_cursor: sqlite3.Cursor, pathobj: Edk2Path) -> None: """Parses the environment and adds the data to the table.""" - table_name = 'environment' - table = db.table(table_name, cache_size=None) - today = date.today() - - # Pull out commonly used environment variables as their own entry rather than in the dict. - version = self.env.pop('VERSION', "UNKNOWN") - pp = self.env.pop('PACKAGES_PATH', []) - - entry = { - "DATE": str(today), - "VERSION": version, - "ENV": self.env, - "PACKAGES_PATH": pp, - } - - with transaction(table) as tr: - tr.insert(entry) + dtime = datetime.datetime.now() + + try: + version = git.Repo(pathobj.WorkspacePath).head.commit.hexsha + except git.InvalidGitRepositoryError: + version = "UNKNOWN" + + # Insert into environment table + entry = (str(uuid.uuid4().hex),str(dtime),version,) + db_cursor.execute("INSERT INTO environment (id, date,version) VALUES (?, ?, ?)", entry) + + # Insert into environment_values table + data = [(db_cursor.lastrowid, key, value) for key, value in self.env.items()] + db_cursor.executemany("INSERT INTO environment_values VALUES (?, ?, ?)", data) diff --git a/edk2toollib/database/tables/inf_table.py b/edk2toollib/database/tables/inf_table.py index a79353282..b3019836a 100644 --- a/edk2toollib/database/tables/inf_table.py +++ b/edk2toollib/database/tables/inf_table.py @@ -10,26 +10,42 @@ import logging import time from pathlib import Path +from sqlite3 import Cursor from joblib import Parallel, delayed -from tinyrecord import transaction -from edk2toollib.database import Edk2DB, TableGenerator +from edk2toollib.database.tables.base_table import TableGenerator from edk2toollib.uefi.edk2.parsers.inf_parser import InfParser as InfP +from edk2toollib.uefi.edk2.path_utilities import Edk2Path +CREATE_INF_TABLE = ''' +CREATE TABLE IF NOT EXISTS inf ( + path TEXT PRIMARY KEY, + guid TEXT, + library_class TEXT, + package TEXT +); +''' -class InfTable(TableGenerator): - """A Table Generator that parses all INF files in the workspace and generates a table. +CREATE_LIBRARY_CLASS_TABLE = ''' +CREATE TABLE IF NOT EXISTS library_class ( + class TEXT +) +''' + +INSERT_JUNCTION_ROW = ''' +INSERT INTO junction (table1, key1, table2, key2) +VALUES (?, ?, ?, ?) +''' - Generates a table with the following schema: +INSERT_INF_ROW = ''' +INSERT OR REPLACE INTO inf (path, guid, library_class, package) +VALUES (?, ?, ?, ?) +''' - ``` py - table_name = "inf" - |----------------------------------------------------------------------------------------------------------------------------| - | GUID | LIBRARY_CLASS | PATH | PHASES | SOURCES_USED | LIBRARIES_USED | PROTOCOLS_USED | GUIDS_USED | PPIS_USED | PCDS_USED | - |----------------------------------------------------------------------------------------------------------------------------| - ``` - """ # noqa: E501 +class InfTable(TableGenerator): + """A Table Generator that parses all INF files in the workspace and generates a table.""" + # TODO: Add phase, protocol, guid, ppi, pcd tables and associations once necessary def __init__(self, *args, **kwargs): """Initializes the INF Table Parser. @@ -42,24 +58,36 @@ def __init__(self, *args, **kwargs): """ self.n_jobs = kwargs.get("n_jobs", -1) - def parse(self, db: Edk2DB) -> None: + def create_tables(self, db_cursor: Cursor) -> None: + """Create the tables necessary for this parser.""" + db_cursor.execute(CREATE_INF_TABLE) + db_cursor.execute(CREATE_LIBRARY_CLASS_TABLE) + + def parse(self, db_cursor: Cursor, pathobj: Edk2Path) -> None: """Parse the workspace and update the database.""" - ws = Path(db.pathobj.WorkspacePath) - inf_table = db.table("inf", cache_size=None) + ws = Path(pathobj.WorkspacePath) inf_entries = [] start = time.time() files = list(ws.glob("**/*.inf")) files = [file for file in files if not file.is_relative_to(ws / "Build")] - inf_entries = Parallel(n_jobs=self.n_jobs)(delayed(self._parse_file)(ws, fname, db.pathobj) for fname in files) + inf_entries = Parallel(n_jobs=self.n_jobs)(delayed(self._parse_file)(fname, pathobj) for fname in files) logging.debug( f"{self.__class__.__name__}: Parsed {len(inf_entries)} .inf files took; " f"{round(time.time() - start, 2)} seconds.") - with transaction(inf_table) as tr: - tr.insert_multiple(inf_entries) + for inf_entry in inf_entries: + db_cursor.execute( + INSERT_INF_ROW, + (inf_entry["PATH"], inf_entry["GUID"], inf_entry["LIBRARY_CLASS"], inf_entry["PACKAGE"]) + ) + for library in inf_entry["LIBRARIES_USED"]: + db_cursor.execute(INSERT_JUNCTION_ROW, ("inf", inf_entry["PATH"], "library_class", library)) + for source in inf_entry["SOURCES_USED"]: + source_path = pathobj.GetEdk2RelativePathFromAbsolutePath((Path(inf_entry["PATH"]).parent / source).resolve().as_posix()) + db_cursor.execute(INSERT_JUNCTION_ROW, ("inf", inf_entry["PATH"], "source", source_path)) - def _parse_file(self, ws, filename, pathobj) -> dict: + def _parse_file(self, filename, pathobj) -> dict: inf_parser = InfP().SetEdk2Path(pathobj) inf_parser.ParseFile(filename) @@ -79,5 +107,6 @@ def _parse_file(self, ws, filename, pathobj) -> dict: data["GUIDS_USED"] = inf_parser.GuidsUsed data["PPIS_USED"] = inf_parser.PpisUsed data["PCDS_USED"] = inf_parser.PcdsUsed + data["PACKAGE"] = pkg return data diff --git a/edk2toollib/database/tables/instanced_fv_table.py b/edk2toollib/database/tables/instanced_fv_table.py index 4f2534890..f0bcac1cc 100644 --- a/edk2toollib/database/tables/instanced_fv_table.py +++ b/edk2toollib/database/tables/instanced_fv_table.py @@ -8,26 +8,35 @@ ## """A module to generate a table containing fv information.""" import re +import sqlite3 from pathlib import Path -from tinyrecord import transaction - -from edk2toollib.database import Edk2DB, TableGenerator +from edk2toollib.database.tables.base_table import TableGenerator from edk2toollib.uefi.edk2.parsers.fdf_parser import FdfParser as FdfP +from edk2toollib.uefi.edk2.path_utilities import Edk2Path +CREATE_INSTANCED_FV_TABLE = """ +CREATE TABLE IF NOT EXISTS instanced_fv ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + env INTEGER, + fv_name TEXT, + fdf TEXT, + path TEXT +) +""" -class InstancedFvTable(TableGenerator): - """A Table Generator that parses a single FDF file and generates a table containing FV information. +INSERT_INSTANCED_FV_ROW = """ +INSERT INTO instanced_fv (env, fv_name, fdf, path) +VALUES (?, ?, ?, ?) +""" - Generates a table with the following schema: +INSERT_JUNCTION_ROW = ''' +INSERT INTO junction (table1, key1, table2, key2) +VALUES (?, ?, ?, ?) +''' - ``` py - table_name = "instanced_fv" - |------------------------------------------------------| - | FV_NAME | FDF | PATH | TARGET | INF_LIST | FILE_LIST | - |------------------------------------------------------| - ``` - """ # noqa: E501 +class InstancedFvTable(TableGenerator): + """A Table Generator that parses a single FDF file and generates a table containing FV information.""" # noqa: E501 RULEOVERRIDE = re.compile(r'RuleOverride\s*=.+\s+(.+\.inf)', re.IGNORECASE) @@ -39,20 +48,21 @@ def __init__(self, *args, **kwargs): self.arch = self.env["TARGET_ARCH"].split(" ") self.target = self.env["TARGET"] - def parse(self, db: Edk2DB) -> None: + def create_tables(self, db_cursor: sqlite3.Cursor) -> None: + """Create the tables necessary for this parser.""" + db_cursor.execute(CREATE_INSTANCED_FV_TABLE) + + def parse(self, db_cursor: sqlite3.Cursor, pathobj: Edk2Path) -> None: """Parse the workspace and update the database.""" - self.pathobj = db.pathobj + self.pathobj = pathobj self.ws = Path(self.pathobj.WorkspacePath) # Our DscParser subclass can now parse components, their scope, and their overrides fdfp = FdfP().SetEdk2Path(self.pathobj) fdfp.SetInputVars(self.env) fdfp.ParseFile(self.fdf) - - table_name = 'instanced_fv' - table = db.table(table_name, cache_size=None) - - entry_list = [] + count = 0 + env = db_cursor.execute("SELECT id FROM environment ORDER BY date DESC LIMIT 1").fetchone()[0] for fv in fdfp.FVs: inf_list = [] # Some INF's start with RuleOverride. We only need the INF @@ -63,13 +73,10 @@ def parse(self, db: Edk2DB) -> None: inf = str(Path(self.pathobj.GetEdk2RelativePathFromAbsolutePath(inf))) inf_list.append(Path(inf).as_posix()) - entry_list.append({ - "FV_NAME": fv, - "FDF": Path(self.fdf).name, - "PATH": self.fdf, - "INF_LIST": inf_list, - "FILE_LIST": fdfp.FVs[fv]["Files"] - }) - - with transaction(table) as tr: - tr.insert_multiple(entry_list) + db_cursor.execute(INSERT_INSTANCED_FV_ROW, (env, fv, Path(self.fdf).name, self.fdf)) + fv_id = db_cursor.lastrowid + count += len(inf_list) + for inf in inf_list: + print(inf) + # db_cursor.execute(INSERT_JUNCTION_ROW, ("instanced_fv", fv_id, "inf", inf)) + print(count) diff --git a/edk2toollib/database/tables/instanced_inf_table.py b/edk2toollib/database/tables/instanced_inf_table.py index 2d3a9eae0..f9a98a31a 100644 --- a/edk2toollib/database/tables/instanced_inf_table.py +++ b/edk2toollib/database/tables/instanced_inf_table.py @@ -10,26 +10,45 @@ import logging import re from pathlib import Path +from sqlite3 import Cursor -from tinyrecord import transaction - -from edk2toollib.database.edk2_db import Edk2DB, TableGenerator +from edk2toollib.database.tables.base_table import TableGenerator from edk2toollib.uefi.edk2.parsers.dsc_parser import DscParser as DscP from edk2toollib.uefi.edk2.parsers.inf_parser import InfParser as InfP - +from edk2toollib.uefi.edk2.path_utilities import Edk2Path + +CREATE_INSTANCED_INF_TABLE = ''' +CREATE TABLE IF NOT EXISTS instanced_inf ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + env INTEGER, + path TEXT, + class TEXT, + name TEXT, + arch TEXT, + dsc TEXT, + component TEXT, + FOREIGN KEY(env) REFERENCES environment(env) +) +''' + +INSERT_INSTANCED_INF_ROW = ''' +INSERT INTO instanced_inf (env, path, class, name, arch, dsc, component) +VALUES (?, ?, ?, ?, ?, ?, ?) +''' + +INSERT_JUNCTION_ROW = ''' +INSERT INTO junction (table1, key1, table2, key2) +VALUES (?, ?, ?, ?) +''' + +GET_ROW_ID = ''' +SELECT id FROM instanced_inf +WHERE env = ? and path = ? and dsc = ? and (class = ? OR class IS NULL) +LIMIT 1 +''' class InstancedInfTable(TableGenerator): - """A Table Generator that parses a single DSC file and generates a table. - - Generates a table with the following schema: - - ``` py - table_name = "instanced_inf" - |----------------------------------------------------------------------------------------------------------------------------------------------| - | DSC | PATH | NAME | LIBRARY_CLASS | COMPONENT | MODULE_TYPE | ARCH | SOURCES_USED | LIBRARIES_USED | PROTOCOLS_USED | GUIDS_USED | PCDS_USED | - |----------------------------------------------------------------------------------------------------------------------------------------------| - ``` - """ # noqa: E501 + """A Table Generator that parses a single DSC file and generates a table.""" SECTION_LIBRARY = "LibraryClasses" SECTION_COMPONENT = "Components" SECTION_REGEX = re.compile(r"\[(.*)\]") @@ -43,6 +62,10 @@ def __init__(self, *args, **kwargs): self.arch = self.env["TARGET_ARCH"].split(" ") # REQUIRED self.target = self.env["TARGET"] # REQUIRED + def create_tables(self, db_cursor: Cursor) -> None: + """Create the tables necessary for this parser.""" + db_cursor.execute(CREATE_INSTANCED_INF_TABLE) + # Prevent parsing the same INF multiple times self._parsed_infs = {} @@ -56,9 +79,9 @@ def inf(self, inf: str) -> InfP: self._parsed_infs[inf] = infp return infp - def parse(self, db: Edk2DB) -> None: + def parse(self, db_cursor: Cursor, pathobj: Edk2Path) -> None: """Parse the workspace and update the database.""" - self.pathobj = db.pathobj + self.pathobj = pathobj self.ws = Path(self.pathobj.WorkspacePath) # Our DscParser subclass can now parse components, their scope, and their overrides @@ -83,10 +106,26 @@ def parse(self, db: Edk2DB) -> None: if Path(entry["PATH"]).is_absolute(): entry["PATH"] = self.pathobj.GetEdk2RelativePathFromAbsolutePath(entry["PATH"]) - table_name = 'instanced_inf' - table = db.table(table_name, cache_size=None) - with transaction(table) as tr: - tr.insert_multiple(inf_entries) + env = db_cursor.execute("SELECT id FROM environment ORDER BY date DESC LIMIT 1").fetchone()[0] + + # add instanced_inf entries + for entry in inf_entries: + db_cursor.execute( + INSERT_INSTANCED_INF_ROW, + (env, entry["PATH"], entry["LIBRARY_CLASS"], entry["NAME"], entry["ARCH"], entry["DSC"], entry["COMPONENT"]) + ) + + # # add junction entries + for entry in inf_entries: + inf_id = db_cursor.execute(GET_ROW_ID, (env, entry["PATH"], entry["DSC"], entry["LIBRARY_CLASS"])).fetchone()[0] + for source in entry["SOURCES_USED"]: + db_cursor.execute(INSERT_JUNCTION_ROW, ("instanced_inf", inf_id, "source", source)) + for cls, instance in entry["LIBRARIES_USED"]: + if instance is None: + used_inf_id = None + else: + used_inf_id = db_cursor.execute(GET_ROW_ID, (env, instance, entry["DSC"], cls)).fetchone()[0] + db_cursor.execute(INSERT_JUNCTION_ROW, ("instanced_inf", inf_id, "instanced_inf", used_inf_id)) def _build_inf_table(self, dscp: DscP): @@ -104,17 +143,24 @@ def _build_inf_table(self, dscp: DscP): if "MODULE_TYPE" in infp.Dict: scope += f".{infp.Dict['MODULE_TYPE']}".lower() - inf_entries += self._parse_inf_recursively(inf, inf, dscp.ScopedLibraryDict, overrides, scope, []) + inf_entries += self._parse_inf_recursively(inf, None, inf, dscp.ScopedLibraryDict, overrides, scope, []) - # Move entries to correct table - for entry in inf_entries: - if entry["PATH"] == entry["COMPONENT"]: - del entry["COMPONENT"] + # # Move entries to correct table + # for entry in inf_entries: + # if entry["PATH"] == entry["COMPONENT"]: + # entry["COMPONENT"] = None return inf_entries def _parse_inf_recursively( - self, inf: str, component: str, library_dict: dict, override_dict: dict, scope: str, visited): + self, + inf: str, + lib_cls: str, + component: str, + library_dict: dict, + override_dict: dict, + scope: str, + visited: list[str]): """Recurses down all libraries starting from a single INF. Will immediately return if the INF has already been visited. @@ -147,14 +193,19 @@ def _parse_inf_recursively( library_instance_list.append(null_lib) library_class_list.append("NULL") - to_parse = list(filter(lambda lib: lib is not None, library_instance_list)) + # to_parse = list(filter(lambda lib: lib is not None, library_instance_list)) - # Time to visit in libraries that we have not visited yet. - to_return = [] - for library in filter(lambda lib: lib not in visited, to_parse): - to_return += self._parse_inf_recursively(library, component, - library_dict, override_dict, scope, visited) + # # Time to visit in libraries that we have not visited yet. + # to_return = [] + # for library in filter(lambda lib: lib not in visited, to_parse): + # to_return += self._parse_inf_recursively(library, component, + # library_dict, override_dict, scope, visited) + to_return = [] + for cls, instance in zip(library_class_list, library_instance_list): + if instance is None or instance in visited: + continue + to_return += self._parse_inf_recursively(instance, cls, component, library_dict, override_dict, scope, visited) # Transform path to edk2 relative form (POSIX) def to_posix(path): if path is None: @@ -166,8 +217,9 @@ def to_posix(path): to_return.append({ "DSC": Path(self.dsc).name, "PATH": Path(inf).as_posix(), + "GUID": infp.Dict.get("FILE_GUID", ""), "NAME": infp.Dict["BASE_NAME"], - "LIBRARY_CLASS": infp.LibraryClass, + "LIBRARY_CLASS": lib_cls, "COMPONENT": Path(component).as_posix(), "MODULE_TYPE": infp.Dict["MODULE_TYPE"], "ARCH": scope.split(".")[0].upper(), diff --git a/edk2toollib/database/tables/package_table.py b/edk2toollib/database/tables/package_table.py new file mode 100644 index 000000000..a2daffb35 --- /dev/null +++ b/edk2toollib/database/tables/package_table.py @@ -0,0 +1,59 @@ +# @file repository_table.py +# A module to associate the packages in a workspace with the repositories they come from. +## +# Copyright (c) Microsoft Corporation +# +# SPDX-License-Identifier: BSD-2-Clause-Patent +## +"""A module to generate a table containing information about a package.""" +from pathlib import Path +from edk2toollib.database.tables.base_table import TableGenerator +from edk2toollib.uefi.edk2.path_utilities import Edk2Path +from sqlite3 import Cursor +import git + +CREATE_PACKAGE_TABLE = """ +CREATE TABLE IF NOT EXISTS package ( + name TEXT PRIMARY KEY, + repository TEXT +) +""" + +INSERT_PACKAGE_ROW = """ +INSERT OR REPLACE INTO package (name, repository) +VALUES (?, ?) +""" +class PackageTable(TableGenerator): + """A Table Generator that associates packages with their repositories.""" + def __init__(self, *args, **kwargs): + """Initializes the Repository Table Parser. + + Args: + args (any): non-keyword arguments + kwargs (any): keyword arguments described below + + Keyword Arguments: + None + """ + + def create_tables(self, db_cursor: Cursor) -> None: + """Create the table necessary for this parser.""" + db_cursor.execute(CREATE_PACKAGE_TABLE) + + def parse(self, db_cursor: Cursor, pathobj: Edk2Path) -> None: + """Glob for packages and insert them into the table.""" + + try: + repo = git.Repo(pathobj.WorkspacePath) + except: + return + + for file in Path(pathobj.WorkspacePath).rglob("*.dec"): + pkg = pathobj.GetContainingPackage(str(file)) + containing_repo = "BASE" + if repo: + for submodule in repo.submodules: + if submodule.abspath in str(file): + containing_repo = submodule.name + break + db_cursor.execute(INSERT_PACKAGE_ROW, (pkg, containing_repo)) diff --git a/edk2toollib/database/tables/source_table.py b/edk2toollib/database/tables/source_table.py index 691f9b004..ad012b253 100644 --- a/edk2toollib/database/tables/source_table.py +++ b/edk2toollib/database/tables/source_table.py @@ -10,27 +10,35 @@ import re import time from pathlib import Path +from sqlite3 import Cursor from joblib import Parallel, delayed -from tinyrecord import transaction -from edk2toollib.database import Edk2DB, TableGenerator +from edk2toollib.database.tables.base_table import TableGenerator +from edk2toollib.uefi.edk2.path_utilities import Edk2Path SOURCE_FILES = ["*.c", "*.h", "*.cpp", "*.asm", "*.s", "*.nasm", "*.masm", "*.rs"] +CREATE_SOURCE_TABLE = ''' +CREATE TABLE IF NOT EXISTS source ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT UNIQUE, + license TEXT, + total_lines INTEGER, + code_lines INTEGER, + comment_lines INTEGER, + blank_lines INTEGER +) +''' -class SourceTable(TableGenerator): - """A Table Generator that parses all c and h files in the workspace. +INSERT_SOURCE_ROW = ''' +INSERT OR REPLACE INTO source (path, license, total_lines, code_lines, comment_lines, blank_lines) +VALUES (?, ?, ?, ?, ?, ?) +''' - Generates a table with the following schema: - ``` py - table_name = "source" - |-------------------------------------------------------------------------| - | PATH | LICENSE | TOTAL_LINES | CODE_LINES | COMMENT_LINES | BLANK_LINES | - |-------------------------------------------------------------------------| - ``` - """ # noqa: E501 +class SourceTable(TableGenerator): + """A Table Generator that parses all c and h files in the workspace.""" def __init__(self, *args, **kwargs): """Initializes the Source Table Parser. @@ -43,10 +51,14 @@ def __init__(self, *args, **kwargs): """ self.n_jobs = kwargs.get("n_jobs", -1) - def parse(self, db: Edk2DB) -> None: + def create_tables(self, db_cursor: Cursor) -> None: + """Create the tables necessary for this parser.""" + db_cursor.execute(CREATE_SOURCE_TABLE) + + def parse(self, db_cursor: Cursor, pathobj: Edk2Path) -> None: """Parse the workspace and update the database.""" - ws = Path(db.pathobj.WorkspacePath) - src_table = db.table("source", cache_size=None) + ws = Path(pathobj.WorkspacePath) + self.pathobj = pathobj start = time.time() files = [] @@ -58,8 +70,7 @@ def parse(self, db: Edk2DB) -> None: f"{self.__class__.__name__}: Parsed {len(src_entries)} files; " f"took {round(time.time() - start, 2)} seconds.") - with transaction(src_table) as tr: - tr.insert_multiple(src_entries) + db_cursor.executemany(INSERT_SOURCE_ROW, src_entries) def _parse_file(self, ws, filename: Path) -> dict: """Parse a C file and return the results.""" @@ -69,12 +80,11 @@ def _parse_file(self, ws, filename: Path) -> dict: match = re.search(r"SPDX-License-Identifier:\s*(.*)$", line) # TODO: This is not a standard format. if match: license = match.group(1) - - return { - "PATH": filename.relative_to(ws).as_posix(), - "LICENSE": license, - "TOTAL_LINES": 0, - "CODE_LINES": 0, - "COMMENT_LINES": 0, - "BLANK_LINES": 0, - } + return ( + self.pathobj.GetEdk2RelativePathFromAbsolutePath(filename.as_posix()), # path + license or "Unknown", # license + 0, # total_lines + 0, # code_lines + 0, # comment_lines + 0, # blank_lines + ) diff --git a/pyproject.toml b/pyproject.toml index 035fad821..e2eb4f060 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,9 +13,7 @@ dependencies = [ "pyasn1 >= 0.4.8", "pyasn1-modules >= 0.2.8", "cryptography >= 39.0.1", - "tinydb == 4.8.0", "joblib == 1.3.2", - "tinyrecord == 0.2.0", ] classifiers=[ "Programming Language :: Python :: 3", diff --git a/tests.unit/database/common.py b/tests.unit/database/common.py index 7b32c57a5..5a26ac08a 100644 --- a/tests.unit/database/common.py +++ b/tests.unit/database/common.py @@ -10,19 +10,9 @@ from pathlib import Path import pytest -from edk2toollib.database import Edk2DB, Query, transaction from edk2toollib.uefi.edk2.path_utilities import Edk2Path -def correlate_env(db: Edk2DB): - """Correlates the environment table with the other tables.""" - idx = len(db.table("environment")) - 1 - for table in filter(lambda table: table != "environment", db.tables()): - table = db.table(table) - - with transaction(table) as tr: - tr.update({'ENVIRONMENT_ID': idx}, ~Query().ENVIRONMENT_ID.exists()) - def write_file(file, contents): """Writes contents to a file.""" file.write_text(contents) @@ -44,7 +34,7 @@ def make_edk2_cfg_file(*args, **kwargs)->str: out += "[Defines]\n" for key, value in kwargs["defines"].items(): # Must exist - out += f' {key} = {value}\n' + out += f' {key.split(" ")[0]} = {value}\n' for key, values in kwargs.items(): if key == "defines": diff --git a/tests.unit/database/test_component_query.py b/tests.unit/database/test_component_query.py deleted file mode 100644 index d761fcc94..000000000 --- a/tests.unit/database/test_component_query.py +++ /dev/null @@ -1,77 +0,0 @@ -## -# unittest for the ComponentQuery query -# -# Copyright (c) Microsoft Corporation -# -# Spdx-License-Identifier: BSD-2-Clause-Patent -## -# ruff: noqa: F811 -"""Unittest for the ComponentQuery query.""" -from common import Tree, correlate_env, empty_tree # noqa: F401 -from edk2toollib.database import Edk2DB -from edk2toollib.database.queries import ComponentQuery -from edk2toollib.database.tables import EnvironmentTable, InstancedInfTable -from edk2toollib.uefi.edk2.path_utilities import Edk2Path - - -def test_simple_component(empty_tree: Tree): - """Tests that components are detected.""" - lib1 = empty_tree.create_library("TestLib1", "TestCls") - lib2 = empty_tree.create_library("TestLib2", "TestCls") - lib3 = empty_tree.create_library("TestLib3", "TestNullCls") - - comp1 = empty_tree.create_component( - "TestDriver1", "DXE_DRIVER", - libraryclasses = ["TestCls"] - ) - comp2 = empty_tree.create_component( - "TestDriver2", "DXE_DRIVER", - libraryclasses = ["TestCls"] - ) - - dsc = empty_tree.create_dsc( - libraryclasses = [ - f'TestCls|{lib1}' - ], - components = [ - f'{comp2}', - f'{comp1} {{', - '', - '!if $(TARGET) == "DEBUG"', - f'TestCls|{lib2}', - f'NULL|{lib3}', - '!endif', - '}', - ] - ) - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj = edk2path) - env = { - "ACTIVE_PLATFORM": dsc, - "TARGET_ARCH": "IA32", - "TARGET": "DEBUG", - } - db.register(InstancedInfTable(env = env), EnvironmentTable(env = env)) - db.parse() - correlate_env(db) - - # Ensure that a component query with an invalid env id returns nothing and does not crash - result = db.search(ComponentQuery(env_id = 1)) - assert len(result) == 0 - - # ensure that a component query with a valid env id returns the correct result - result = db.search(ComponentQuery(env_id = 0)) - assert len(result) == 2 - - # ensure that a component query without an env id returns the correct result - result = db.search(ComponentQuery()) - assert len(result) == 2 - - result = db.search(ComponentQuery(component = "TestDriver1")) - assert len(result) == 1 - - assert sorted(result[0]['LIBRARIES_USED']) == sorted([('TestCls','TestPkg/Library/TestLib2.inf'), ('NULL','TestPkg/Library/TestLib3.inf')]) - - result = db.search(ComponentQuery(component = "NonExistantDriver")) - assert len(result) == 0 diff --git a/tests.unit/database/test_edk2_db.py b/tests.unit/database/test_edk2_db.py index 1dcd65a7e..d767b8922 100644 --- a/tests.unit/database/test_edk2_db.py +++ b/tests.unit/database/test_edk2_db.py @@ -7,33 +7,15 @@ ## # ruff: noqa: F811 """Unittest for the Edk2DB class.""" -import logging import pytest -from common import Tree, correlate_env, empty_tree # noqa: F401 -from edk2toollib.database import AdvancedQuery, Edk2DB, TableGenerator -from edk2toollib.database.queries import LibraryQuery +from common import Tree, empty_tree # noqa: F401 +from edk2toollib.database import Edk2DB from edk2toollib.database.tables import InfTable +from edk2toollib.database.tables.base_table import TableGenerator from edk2toollib.uefi.edk2.path_utilities import Edk2Path -def test_load_each_db_mode(empty_tree: Tree): - edk2path = Edk2Path(str(empty_tree.ws), []) - db_path = empty_tree.ws / "test.db" - - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=db_path): - pass - - with Edk2DB(Edk2DB.FILE_RO, db_path=db_path): - pass - - with Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path): - pass - - with pytest.raises(ValueError, match = "Unknown Database mode."): - with Edk2DB(5): - pass - def test_load_existing_db(empty_tree: Tree): """Test that we can create a json database and load it later.""" empty_tree.create_library("TestLib1", "TestCls") @@ -42,42 +24,35 @@ def test_load_existing_db(empty_tree: Tree): db_path = empty_tree.ws / "test.db" assert db_path.exists() is False - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=db_path) as db: + with Edk2DB(db_path, pathobj=edk2path) as db: db.register(InfTable()) - db.parse(edk2path) - assert len(db.search(LibraryQuery())) == 1 + db.parse() + result = db.connection.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("inf",)).fetchone() + assert result is not None assert db_path.exists() # Ensure we can load an existing database - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=db_path) as db: - assert len(db.search(LibraryQuery())) == 1 + with Edk2DB(db_path, pathobj=edk2path) as db: + result = db.connection.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("inf",)).fetchone() + assert result is not None -def test_catch_bad_parser_and_query(empty_tree: Tree, caplog): +def test_catch_bad_parser_and_query(empty_tree: Tree): """Test that a bad parser will be caught and logged.""" - caplog.set_level(logging.ERROR) edk2path = Edk2Path(str(empty_tree.ws), []) db_path = empty_tree.ws / "test.db" assert db_path.exists() is False - with Edk2DB(Edk2DB.FILE_RW, pathobj=edk2path, db_path=db_path) as db: - db.register(TableGenerator()) # Not implemented, will throw an error. Caught and logged. - db.parse(edk2path) - - with pytest.raises(NotImplementedError): - db.search(AdvancedQuery()) # Not implemented, will throw an error - - for message in [r.message for r in caplog.records]: - if "failed." in message: - break - else: - pytest.fail("No error message was logged for a failed parser.") + with pytest.raises(NotImplementedError): + with Edk2DB(db_path, pathobj=edk2path) as db: + db.register(TableGenerator()) + db.parse() def test_clear_parsers(empty_tree: Tree): """Test that we can clear all parsers.""" edk2path = Edk2Path(str(empty_tree.ws), []) - with Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) as db: + with Edk2DB(empty_tree.ws / "test.db", pathobj=edk2path) as db: db.register(TableGenerator()) assert len(db._parsers) == 1 diff --git a/tests.unit/database/test_environment_table.py b/tests.unit/database/test_environment_table.py index c46d5c1b2..6f18dc6a0 100644 --- a/tests.unit/database/test_environment_table.py +++ b/tests.unit/database/test_environment_table.py @@ -7,62 +7,79 @@ ## # ruff: noqa: F811 """Tests for build an inf file table.""" -import os from datetime import date -from edk2toollib.database.tables import EnvironmentTable + from edk2toollib.database import Edk2DB +from edk2toollib.database.tables import EnvironmentTable from edk2toollib.uefi.edk2.path_utilities import Edk2Path -def test_environment_no_version(): + +def test_environment_no_version(tmp_path): """Test that version is set if not found in the environment variables.""" - edk2path = Edk2Path(os.getcwd(), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - env_table = EnvironmentTable(env={}) + edk2path = Edk2Path(str(tmp_path), []) + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(EnvironmentTable(env={})) - env_table.parse(db) - table = db.table("environment") + db.parse() - assert len(table) == 1 - row = table.all()[0] + rows = list(db.connection.cursor().execute("SELECT * FROM environment")) - assert row['VERSION'] == 'UNKNOWN' - assert row["DATE"] == str(date.today()) - assert row["ENV"] == {} + assert len(rows) == 1 + _, actual_date, actual_version = rows[0] -def test_environment_version(): + assert actual_version == 'UNKNOWN' + assert actual_date.split(" ")[0] == str(date.today()) + + rows = list(db.connection.cursor().execute("SELECT key, value FROM environment_values")) + assert len(rows) == 0 + +def test_environment_version(tmp_path): """Test that version is detected out of environment variables.""" - edk2path = Edk2Path(os.getcwd(), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - env = {"VERSION": "abcdef1"} - env_table = EnvironmentTable(env=env) + edk2path = Edk2Path(str(tmp_path), []) + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(EnvironmentTable(env={})) + + db.parse() - env_table.parse(db) - table = db.table("environment") + rows = list(db.connection.cursor().execute("SELECT * FROM environment")) - assert len(table) == 1 - row = table.all()[0] + assert len(rows) == 1 + _, actual_date, actual_version = rows[0] - assert row['VERSION'] == 'abcdef1' - assert row["DATE"] == str(date.today()) - assert row["ENV"] == {} + assert actual_date.split(" ")[0] == str(date.today()) + assert actual_version == 'UNKNOWN' + rows = list(db.connection.cursor().execute("SELECT key, value FROM environment_values")) + assert len(rows) == 0 -def test_environment_with_vars(): + +def test_environment_with_vars(tmp_path): """Tests that environment variables are recorded.""" - edk2path = Edk2Path(os.getcwd(), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) env = { "ACTIVE_PLATFORM": "TestPkg/TestPkg.dsc", "TARGET_ARCH": "X64", "TOOL_CHAIN_TAG": "VS2019", "FLASH_DEFINITION": "TestPkg/TestPkg.fdf", } - env_table = EnvironmentTable(env = env) - env_table.parse(db) + edk2path = Edk2Path(str(tmp_path), []) + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(EnvironmentTable(env=env)) + + db.parse() + + rows = list(db.connection.cursor().execute("SELECT * FROM environment")) + + assert len(rows) == 1 + _, actual_date, actual_version = rows[0] + + assert actual_date.split(" ")[0] == str(date.today()) + assert actual_version == 'UNKNOWN' + + rows = list(db.connection.cursor().execute("SELECT key, value FROM environment_values WHERE id = 1")) + assert len(rows) == 4 + + db.parse() - assert len(db.table("environment")) == 1 - row = db.table("environment").all()[0] + rows = list(db.connection.cursor().execute("SELECT * FROM environment")) - assert row['VERSION'] == 'UNKNOWN' - assert row["DATE"] == str(date.today()) - assert row["ENV"] == env + assert len(rows) == 2 diff --git a/tests.unit/database/test_inf_table.py b/tests.unit/database/test_inf_table.py index 9072aee78..cab347308 100644 --- a/tests.unit/database/test_inf_table.py +++ b/tests.unit/database/test_inf_table.py @@ -16,8 +16,8 @@ def test_valid_inf(empty_tree: Tree): """Tests that a valid Inf with typical settings is properly parsed.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - inf_table = InfTable(n_jobs = 1) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(InfTable(n_jobs = 1)) # Configure inf libs = ["TestLib2", "TestLib3"] @@ -36,14 +36,25 @@ def test_valid_inf(empty_tree: Tree): sources_ia32 = sources_ia32, sources_x64 = sources_x64, ) - inf_table.parse(db) - table = db.table("inf") + lib2 = empty_tree.create_library( + "TestLib2", "TestCls", + libraryclasses = libs, + protocols = protocols, + guids = guids, + sources = sources, + sources_ia32 = sources_ia32, + sources_x64 = sources_x64, + ) + db.parse() + + rows = list(db.connection.cursor().execute("SELECT path, library_class FROM inf")) + assert len(rows) == 2 + + for path, library_class in rows: + assert path in [lib1, lib2] + assert library_class == "TestCls" - assert len(table) == 1 - row = table.all()[0] + for inf in [lib1, lib2]: + rows = db.connection.execute("SELECT * FROM junction WHERE key1 = ? AND table2 = 'source'", (inf,)).fetchall() + assert len(rows) == 3 - assert row['PATH'] in (empty_tree.ws / lib1).as_posix() - assert row['LIBRARIES_USED'] == libs - assert row['PROTOCOLS_USED'] == protocols - assert row['GUIDS_USED'] == guids - assert sorted(row['SOURCES_USED']) == sorted(sources + sources_ia32 + sources_x64) diff --git a/tests.unit/database/test_instanced_fv_table.py b/tests.unit/database/test_instanced_fv_table.py index fbb13506a..8069bf869 100644 --- a/tests.unit/database/test_instanced_fv_table.py +++ b/tests.unit/database/test_instanced_fv_table.py @@ -11,14 +11,20 @@ import pytest from common import Tree, empty_tree # noqa: F401 from edk2toollib.database import Edk2DB -from edk2toollib.database.tables import InstancedFvTable +from edk2toollib.database.tables import EnvironmentTable, InstancedFvTable from edk2toollib.uefi.edk2.path_utilities import Edk2Path +GET_INF_LIST_QUERY = """ +SELECT i.path +FROM inf AS i +JOIN junction AS j ON ? = j.key1 and j.table2 = "inf" +""" def test_valid_fdf(empty_tree: Tree): # noqa: F811 """Tests that a typical fdf can be properly parsed.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) # raise exception if the Table generator is missing required information to # Generate the table. @@ -51,19 +57,11 @@ def test_valid_fdf(empty_tree: Tree): # noqa: F811 "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) - # Parse the FDF - fv_table.parse(db) + db.register(fv_table) + db.parse() - # Ensure tests pass for expected output - for fv in db.table("instanced_fv").all(): + fv_id = db.connection.execute("SELECT id FROM instanced_fv WHERE fv_name = 'infformat'").fetchone()[0] + rows = db.connection.execute("SELECT key2 FROM junction where key1 == ?", (fv_id,)).fetchall() - # Test INF's were parsed correctly. Paths should be posix as - # That is the EDK2 standard - if fv['FV_NAME'] == "infformat": - assert sorted(fv['INF_LIST']) == sorted([ - Path(comp1).as_posix(), - Path(comp2).as_posix(), - Path(comp3).as_posix(), - Path(comp5).as_posix(), - Path(comp4).as_posix(), - ]) + assert len(rows) == 5 + assert sorted(rows) == sorted([(comp1,), (comp2,), (comp3,), (comp4,), (comp5,)]) diff --git a/tests.unit/database/test_instanced_inf_table.py b/tests.unit/database/test_instanced_inf_table.py index 4469fd31b..659f621fb 100644 --- a/tests.unit/database/test_instanced_inf_table.py +++ b/tests.unit/database/test_instanced_inf_table.py @@ -11,16 +11,28 @@ from pathlib import Path import pytest -from common import Tree, create_inf_file, empty_tree # noqa: F401 -from edk2toollib.database import Edk2DB, Query -from edk2toollib.database.tables import InstancedInfTable +from common import Tree, empty_tree # noqa: F401 +from edk2toollib.database import Edk2DB +from edk2toollib.database.tables import EnvironmentTable, InstancedInfTable from edk2toollib.uefi.edk2.path_utilities import Edk2Path +GET_USED_LIBRARIES_QUERY = """ +SELECT i.path +FROM instanced_inf AS i +JOIN junction AS j ON i.id = j.key2 and j.table2 = "instanced_inf" +WHERE j.key1 = ( + SELECT id + FROM instanced_inf + WHERE name = ? AND arch = ? + LIMIT 1 +); +""" def test_valid_dsc(empty_tree: Tree): """Tests that a typical dsc can be correctly parsed.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) comp1 = empty_tree.create_component("TestComponent1", "DXE_DRIVER") lib1 = empty_tree.create_library("TestLib1", "TestCls") @@ -34,18 +46,19 @@ def test_valid_dsc(empty_tree: Tree): "TARGET_ARCH": "IA32", "TARGET": "DEBUG", }) - inf_table.parse(db) + db.register(inf_table) + db.parse() - # Check that only 1 component is picked up, as libraries in the component section are ignored - assert len(db.table("instanced_inf")) == 1 - entry = db.table("instanced_inf").all()[0] - assert entry["NAME"] == Path(comp1).stem + rows = db.connection.cursor().execute("SELECT * FROM instanced_inf").fetchall() + assert len(rows) == 1 + assert rows[0][4] == Path(comp1).stem def test_no_active_platform(empty_tree: Tree, caplog): """Tests that the dsc table returns immediately when no ACTIVE_PLATFORM is defined.""" caplog.set_level(logging.DEBUG) edk2path = Edk2Path(str(empty_tree.ws), []) - Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) # Test 1: raise error for missing ACTIVE_PLATFORM with pytest.raises(KeyError, match = "ACTIVE_PLATFORM"): @@ -67,7 +80,8 @@ def test_no_active_platform(empty_tree: Tree, caplog): def test_dsc_with_conditional(empty_tree: Tree): """Tests that conditionals inside a DSC works as expected.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) empty_tree.create_library("TestLib", "SortLib") comp1 = empty_tree.create_component('TestComponent1', 'DXE_DRIVER') @@ -84,15 +98,16 @@ def test_dsc_with_conditional(empty_tree: Tree): "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) + db.register(inf_table) + db.parse() - inf_table.parse(db) - - assert len(db.table("instanced_inf")) == 0 + assert db.connection.cursor().execute("SELECT * FROM instanced_inf").fetchall() == [] def test_library_override(empty_tree: Tree): """Tests that overrides and null library overrides can be parsed as expected.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) lib1 = empty_tree.create_library("TestLib1", "TestCls") lib2 = empty_tree.create_library("TestLib2", "TestCls") @@ -123,16 +138,13 @@ def test_library_override(empty_tree: Tree): "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) - inf_table.parse(db) + db.register(inf_table) + db.parse() + db.connection.execute("SELECT * FROM junction").fetchall() + library_list = db.connection.cursor().execute(GET_USED_LIBRARIES_QUERY, ("TestDriver1", "IA32")) - # Ensure the Test Driver is using TestLib2 from the override and the NULL library was added - for row in db.table("instanced_inf").all(): - if (row["NAME"] == Path(comp1).stem - and ("TestCls", Path(lib2).as_posix()) in row["LIBRARIES_USED"] - and ("NULL", Path(lib3).as_posix()) in row["LIBRARIES_USED"]): - break - else: - assert False + for path, in library_list: + assert path in [lib2, lib3] def test_scoped_libraries1(empty_tree: Tree): """Ensure that the correct libraries in regards to scoping. @@ -143,7 +155,8 @@ def test_scoped_libraries1(empty_tree: Tree): 2. $(ARCH) """ edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) lib1 = empty_tree.create_library("TestLib1", "TestCls") lib2 = empty_tree.create_library("TestLib2", "TestCls") @@ -167,12 +180,13 @@ def test_scoped_libraries1(empty_tree: Tree): "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) - inf_table.parse(db) + db.register(inf_table) + db.parse() - # For each driver, verify that the the driver number (1, 2, 3) uses the corresponding lib number (1, 2, 3) - for row in db.table("instanced_inf").all(): - if "COMPONENT" not in row: # Only care about looking at drivers, which do not have a component - assert row["NAME"].replace("Driver", "Lib") in row["LIBRARIES_USED"][0][1] + for arch in ["IA32", "X64"]: + for component, in db.connection.execute("SELECT name FROM instanced_inf WHERE component IS NULL and arch is ?;", (arch,)): + component_lib = db.connection.execute(GET_USED_LIBRARIES_QUERY, (component, arch)).fetchone()[0] + assert component.replace("Driver", "Lib") in component_lib def test_scoped_libraries2(empty_tree: Tree): """Ensure that the correct libraries in regards to scoping. @@ -183,7 +197,8 @@ def test_scoped_libraries2(empty_tree: Tree): 2. common """ edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) lib1 = empty_tree.create_library("TestLib1", "TestCls") lib2 = empty_tree.create_library("TestLib2", "TestCls") @@ -203,16 +218,19 @@ def test_scoped_libraries2(empty_tree: Tree): "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) - inf_table.parse(db) + db.register(inf_table) + db.parse() - for row in db.table("instanced_inf").all(): - if "COMPONENT" not in row: - assert row["NAME"].replace("Driver", "Lib") in row["LIBRARIES_USED"][0][1] + for arch in ["IA32", "X64"]: + for component, in db.connection.execute("SELECT name FROM instanced_inf WHERE component IS NULL and arch is ?;", (arch,)): + component_lib = db.connection.execute(GET_USED_LIBRARIES_QUERY, (component, arch)).fetchone()[0] + assert component.replace("Driver", "Lib") in component_lib -def test_missing_library(empty_tree: Tree, caplog): +def test_missing_library(empty_tree: Tree): """Test when a library is missing.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) comp1 = empty_tree.create_component("TestDriver1", "PEIM", libraryclasses = ["TestCls"]) @@ -227,71 +245,53 @@ def test_missing_library(empty_tree: Tree, caplog): "TARGET_ARCH": "IA32 X64", "TARGET": "DEBUG", }) - - with caplog.at_level(logging.WARNING): - inf_table.parse(db) - - assert len(caplog.records) == 1 - assert 'testcls' in caplog.records[0].message - -def test_skip_library_with_unsupported_module_type(empty_tree: Tree): - """Library class INFs can specify what module types they support. - - In situations where a library class is in the [LibraryClasses] section, it may not necessarily - support all module types as the LIBRARY_CLASS section of the INF may limit its supported - module types. This test ensures that a library instance is ignored if the library instance - itself states it does not support a module type. - - i.e. LIBRARY_CLASS = TestCls| PEIM only supports PEIM's, even if it is in the [LibraryClasses] section. - """ + db.register(inf_table) + db.parse() + key2 = db.connection.execute("SELECT key2 FROM junction").fetchone()[0] + assert key2 is None # This library class does not have an instance available, so key2 should be None + +# TODO +def test_multiple_library_class(empty_tree: Tree): + """Test that a library INF that has multiple library class definitions is handled correctly.""" edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - - testlib1 = empty_tree.create_library("TestLib1", "TestCls", - defines = { - "LIBRARY_CLASS": "TestCls| PEIM" - } - ) - testlib2 = empty_tree.create_library("TestLib2", "TestCls") - comp1 = empty_tree.create_component("TestDriver1", "DXE_DRIVER", libraryclasses = ["TestCls"]) - - # Generate the DSC with testlib1 first - dsc1 = empty_tree.create_dsc( - libraryclasses = [ - f'TestCls|{testlib1}', - f'TestCls|{testlib2}', + db = Edk2DB(empty_tree.ws / "db.db", pathobj=edk2path) + db.register(EnvironmentTable()) + + lib1 = empty_tree.create_library("TestLib", "TestCls", default = { + "MODULE_TYPE": "BASE", + "BASE_NAME": "TestLib1", + "LIBRARY_CLASS 1": "TestCls1", + "LIBRARY_CLASS 2": "TestCls2", + }) - ], - components = [comp1], - ) + comp1 = empty_tree.create_component("TestDriver1", "DXE_RUNTIME_DRIVER", libraryclasses = ["TestCls1"]) + comp2 = empty_tree.create_component("TestDriver2", "DXE_DRIVER", libraryclasses = ["TestCls2"]) - # Generate the DSC with testlib2 first - dsc2 = empty_tree.create_dsc( + dsc = empty_tree.create_dsc( libraryclasses = [ - f'TestCls|{testlib2}', - f'TestCls|{testlib1}', - + f'TestCls1|{lib1}', + f'TestCls2|{lib1}' ], - components = [comp1], + components = [comp1, comp2], ) inf_table = InstancedInfTable(env = { - "ACTIVE_PLATFORM": dsc1, + "ACTIVE_PLATFORM": dsc, "TARGET_ARCH": "X64", "TARGET": "DEBUG", }) - inf_table.parse(db) + db.register(inf_table) + db.parse() - inf_table = InstancedInfTable(env = { - "ACTIVE_PLATFORM": dsc2, - "TARGET_ARCH": "X64", - "TARGET": "DEBUG", - }) + results = db.connection.execute("SELECT key1, key2 FROM junction").fetchall() - inf_table.parse(db) + # Verify that TestDrver1 uses TestLib acting as TestCls1 + assert results[0] == ('2','1') # idx 2 is TestDriver1, idx1 is TestLib1 acting as TestCsl1 + assert ("TestLib", "TestCls1") == db.connection.execute("SELECT name, class FROM instanced_inf where id = 1").fetchone() + assert ("TestDriver1", None) == db.connection.execute("SELECT name, component FROM instanced_inf where id = 2").fetchone() - component_list = db.table("instanced_inf").search(~Query().COMPONENT.exists()) - assert len(component_list) == 2 - for component in component_list: - assert component["LIBRARIES_USED"][0] == ("TestCls", Path(testlib2).as_posix()) + # Verify that TestDriver2 uses TestLib acting as TestCls2 + assert results[1] == ('4', '3') # idx 4 is TestDriver2, idx 3 is TestLib1 acting as TestCls2 + assert ("TestLib", "TestCls2") == db.connection.execute("SELECT name, class FROM instanced_inf where id = 3").fetchone() + assert ("TestDriver2", None) == db.connection.execute("SELECT name, component FROM instanced_inf where id = 4").fetchone() diff --git a/tests.unit/database/test_library_query.py b/tests.unit/database/test_library_query.py deleted file mode 100644 index b77811cf3..000000000 --- a/tests.unit/database/test_library_query.py +++ /dev/null @@ -1,35 +0,0 @@ -## -# unittest for the LibraryQuery query -# -# Copyright (c) Microsoft Corporation -# -# Spdx-License-Identifier: BSD-2-Clause-Patent -## -# ruff: noqa: F811 -"""Unittest for the LibraryQuery query.""" -from common import Tree, empty_tree # noqa: F401 -from edk2toollib.database import Edk2DB -from edk2toollib.database.queries import LibraryQuery -from edk2toollib.database.tables import InfTable -from edk2toollib.uefi.edk2.path_utilities import Edk2Path - - -def test_simple_library_query(empty_tree: Tree): - """Tests that libraries are detected.""" - empty_tree.create_library("TestLib1", "TestCls") - empty_tree.create_library("TestLib2", "TestCls") - empty_tree.create_library("TestLib3", "TestOtherCls") - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - db.register(InfTable()) - db.parse() - - result = db.search(LibraryQuery(library = "TestCls")) - assert len(result) == 2 - - result = db.search(LibraryQuery(library = "TestOtherCls")) - assert len(result) == 1 - - result = db.search(LibraryQuery()) - assert len(result) == 3 diff --git a/tests.unit/database/test_license_query.py b/tests.unit/database/test_license_query.py deleted file mode 100644 index d5bc34cd0..000000000 --- a/tests.unit/database/test_license_query.py +++ /dev/null @@ -1,75 +0,0 @@ -## -# unittest for the LicenseQuery query -# -# Copyright (c) Microsoft Corporation -# -# Spdx-License-Identifier: BSD-2-Clause-Patent -## -"""Unittest for the LicenseQuery query.""" -from common import Tree, empty_tree # noqa: F401 -from edk2toollib.database import Edk2DB -from edk2toollib.database.queries import LicenseQuery -from edk2toollib.database.tables import SourceTable -from edk2toollib.uefi.edk2.path_utilities import Edk2Path - - -def test_simple_license(empty_tree: Tree): - """Tests that missing licenses are detected.""" - f1 = empty_tree.library_folder / "File.c" - f1.touch() - - with open(f1, 'a+') as f: - f.writelines([ - '/**' - ' Nothing to see here!' - '**/' - ]) - - f2 = empty_tree.library_folder / "File2.c" - f2.touch() - - with open(f2, 'a+') as f: - f.writelines([ - '/**' - ' SPDX-License-Identifier: Fake-License' - '**/' - ]) - - f3 = empty_tree.component_folder / "File3.c" - f3.touch() - - with open(f3, 'a+') as f: - f.writelines([ - '/**' - ' SPDX-License-Identifier: BSD-2-Clause-Patent' - '**/' - ]) - - f4 = empty_tree.component_folder / "File4.c" - f4.touch() - - with open(f4, 'a+') as f: - f.writelines([ - '/**' - ' Nothing to see here!' - '**/' - ]) - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - db.register(SourceTable()) - db.parse() - - # Test with no filters - result = db.search(LicenseQuery()) - assert len(result) == 2 - - # Test with include filter - result = db.search(LicenseQuery(include = "Library")) - assert len(result) == 1 - assert "Library" in result[0]["PATH"] - - # Test with exclude filter - result = db.search(LicenseQuery(exclude = "Library")) - assert len(result) == 1 - assert "Driver" in result[0]["PATH"] diff --git a/tests.unit/database/test_source_table.py b/tests.unit/database/test_source_table.py index f31cff231..fa2673e71 100644 --- a/tests.unit/database/test_source_table.py +++ b/tests.unit/database/test_source_table.py @@ -7,7 +7,7 @@ ## """Tests for building a source file table.""" from common import write_file -from edk2toollib.database import Edk2DB +from edk2toollib.database.edk2_db import Edk2DB from edk2toollib.database.tables import SourceTable from edk2toollib.uefi.edk2.path_utilities import Edk2Path @@ -32,54 +32,46 @@ def test_source_with_license(tmp_path): """Tests that a source with a license is detected and the license is set.""" edk2path = Edk2Path(str(tmp_path), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - source_table = SourceTable(n_jobs = 1) + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(SourceTable(n_jobs = 1)) # Verify we detect c and h files for file in ["file.c", "file.h", "file.asm", "file.cpp"]: write_file(tmp_path / file, SOURCE_LICENSE) - source_table.parse(db) - table = db.table("source") - assert len(table) == 1 - row = table.all()[0] - assert row["PATH"] == (tmp_path / file).relative_to(tmp_path).as_posix() - assert row["LICENSE"] == "BSD-2-Clause-Patent" + db.parse() - db.drop_table("source") - (tmp_path / file).unlink() + rows = list(db.connection.cursor().execute("SELECT license FROM source")) + assert len(rows) == 4 + for license, in rows: + assert license == "BSD-2-Clause-Patent" - # Ensure we don't catch a file that isnt a c / h file. - write_file(tmp_path / "file1.py", SOURCE_LICENSE) - source_table.parse(db) - table = db.table("source") - assert len(table) == 0 - def test_source_without_license(tmp_path): """Tests that a source without a license is detected.""" edk2path = Edk2Path(str(tmp_path), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - source_table = SourceTable(n_jobs = 1) - + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(SourceTable(n_jobs = 1)) # Verify we detect c and h files for file in ["file.c", "file.h"]: write_file(tmp_path / file, SOURCE_NO_LICENSE) - source_table.parse(db) - table = db.table("source") - assert len(table) == 1 - row = table.all()[0] - assert row["PATH"] == (tmp_path / file).relative_to(tmp_path).as_posix() - assert row["LICENSE"] == "" + db.parse() - db.drop_table("source") - (tmp_path / file).unlink() + rows = list(db.connection.cursor().execute("SELECT license FROM source")) + assert len(rows) == 2 + for license, in rows: + assert license == "Unknown" +def test_invalid_filetype(tmp_path): + """Tests that a source file that is not of the valid type is skipped.""" + edk2path = Edk2Path(str(tmp_path), []) + db = Edk2DB(tmp_path / "db.db", pathobj=edk2path) + db.register(SourceTable(n_jobs = 1)) # Ensure we don't catch a file that isnt a c / h file. write_file(tmp_path / "file1.py", SOURCE_LICENSE) - source_table.parse(db) - table = db.table("source") - assert len(table) == 0 + db.parse() + rows = list(db.connection.cursor().execute("SELECT license FROM source")) + assert len(rows) == 0 diff --git a/tests.unit/database/test_unused_component_query.py b/tests.unit/database/test_unused_component_query.py deleted file mode 100644 index 19434e623..000000000 --- a/tests.unit/database/test_unused_component_query.py +++ /dev/null @@ -1,192 +0,0 @@ -## -# unittest for the UnusedComponentQuery query -# -# Copyright (c) Microsoft Corporation -# -# Spdx-License-Identifier: BSD-2-Clause-Patent -## -# ruff: noqa: F811 -"""Unittest for the ComponentQuery query.""" -from pathlib import Path - -from common import Tree, correlate_env, empty_tree # noqa: F401 -from edk2toollib.database import Edk2DB -from edk2toollib.database.queries import UnusedComponentQuery -from edk2toollib.database.tables import EnvironmentTable, InstancedFvTable, InstancedInfTable -from edk2toollib.uefi.edk2.path_utilities import Edk2Path - - -def test_simple_unused_component(empty_tree: Tree): - """Tests that unused components are detected.""" - lib1 = empty_tree.create_library("TestLib1", "TestCls1") - lib2 = empty_tree.create_library("TestLib2", "TestCls2") - empty_tree.create_library("TestLib3", "TestCls3") - - comp1 = empty_tree.create_component( - "TestDriver1", "DXE_DRIVER", - libraryclasses = ["TestCls1"] - ) - comp2 = empty_tree.create_component( - "TestDriver2", "DXE_DRIVER", - libraryclasses = ["TestCls2"] - ) - - dsc = empty_tree.create_dsc( - libraryclasses = [ - f'TestCls1|{lib1}', - f'TestCls2|{lib2}', - ], - components = [ - comp1, - comp2, - ] - ) - - fdf = empty_tree.create_fdf( - fv_testfv = [ - f"INF {comp1}" - ] - ) - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - env = { - "ACTIVE_PLATFORM": dsc, - "FLASH_DEFINITION": fdf, - "TARGET_ARCH": "IA32 X64", - "TARGET": "DEBUG", - } - db.register(InstancedFvTable(env=env), InstancedInfTable(env=env)) - db.parse() - comps, libs = db.search(UnusedComponentQuery()) - - assert len(comps) == 1 and comps[0] == Path(comp2).as_posix() - assert len(libs) == 1 and libs[0] == Path(lib2).as_posix() - -def test_env_unused_component(empty_tree: Tree): - """Tests that unused components are detected for different runs.""" - lib1 = empty_tree.create_library("TestLib1", "TestCls") - lib2 = empty_tree.create_library("TestLib2", "TestCls") - lib3 = empty_tree.create_library("TestLib3", "TestCls2") - lib4 = empty_tree.create_library("TestLib4", "TestCls2") - - comp1 = empty_tree.create_component( - "TestDriver1", "DXE_DRIVER", - libraryclasses = ["TestCls"] - ) - comp2 = empty_tree.create_component( - "TestDriver2", "DXE_DRIVER", - libraryclasses = ["TestCls2"] - ) - - dsc = empty_tree.create_dsc( - libraryclasses_ia32 = [ - f'TestCls|{lib1}', - f'TestCls2|{lib3}', - ], - libraryclasses_x64 = [ - f'TestCls|{lib2}', - f'TestCls2|{lib4}', - ], - components = [ - comp1, - comp2, - ] - ) - - fdf = empty_tree.create_fdf( - fv_testfv = [ - f"INF {comp1}" - ] - ) - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - - env = { - "ACTIVE_PLATFORM": dsc, - "FLASH_DEFINITION": fdf, - "TARGET_ARCH": "IA32", - "TARGET": "DEBUG", - } - db.register( - InstancedFvTable(env=env), - InstancedInfTable(env=env), - EnvironmentTable(env=env), - ) - db.parse() - correlate_env(db) - - db.clear_parsers() - env = { - "ACTIVE_PLATFORM": dsc, - "FLASH_DEFINITION": fdf, - "TARGET_ARCH": "X64", - "TARGET": "DEBUG", - } - db.register( - InstancedFvTable(env=env), - InstancedInfTable(env=env), - EnvironmentTable(env=env), - ) - db.parse(append=True) - correlate_env(db) - - # Driver 2 goes unused as it's not in the FDF - # Driver 2 is built with Lib3 as our first scan is for ARCH IA32 - comps, libs = db.search(UnusedComponentQuery(env_id = 0)) - assert len(comps) == 1 and comps[0] == Path(comp2).as_posix() - assert len(libs) == 1 and libs[0] == Path(lib3).as_posix() - - # Driver 2 goes unused as it's not in the FDF - # Driver 2 is built with Lib2 as our second scan is for ARCH X64 - comps, libs = db.search(UnusedComponentQuery(env_id = 1)) - assert len(comps) == 1 and comps[0] == Path(comp2).as_posix() - assert len(libs) == 1 and libs[0] == Path(lib4).as_posix() - - # Driver 2 goes unused twice (It is built for IA32 and X64) as it's not in the FDF - # Driver 2 uses Lib3 for once instance, and Lib4 for the other, so both are considered unused - comps, libs = db.search(UnusedComponentQuery()) - assert len(comps) == 1 and comps[0] == Path(comp2).as_posix() - assert len(libs) == 2 and sorted(libs) == sorted([Path(lib3).as_posix(), Path(lib4).as_posix()]) - -def test_ignore_uefi_application(empty_tree: Tree): - """Tests that UEFI_APPLICATION components are ignored.""" - lib1 = empty_tree.create_library("TestLib1", "TestCls1") - - comp1 = empty_tree.create_component( - "TestDriver1", "UEFI_APPLICATION", - libraryclasses = ["TestCls1"] - ) - - dsc = empty_tree.create_dsc( - libraryclasses = [ - f'TestCls1|{lib1}', - ], - components = [ - comp1, - ] - ) - - fdf = empty_tree.create_fdf( - fv_testfv = [] - ) - - edk2path = Edk2Path(str(empty_tree.ws), []) - db = Edk2DB(Edk2DB.MEM_RW, pathobj=edk2path) - env = { - "ACTIVE_PLATFORM": dsc, - "FLASH_DEFINITION": fdf, - "TARGET_ARCH": "IA32 X64", - "TARGET": "DEBUG", - } - db.register(InstancedFvTable(env=env), InstancedInfTable(env=env)) - db.parse(env) - comps, libs = db.search(UnusedComponentQuery()) - - assert len(comps) == 1 and comps[0] == Path(comp1).as_posix() - assert len(libs) == 1 and libs[0] == Path(lib1).as_posix() - - comps, libs = db.search(UnusedComponentQuery(ignore_app = True)) - assert len(comps) == 0 - assert len(libs) == 0