diff --git a/CveXplore/VERSION b/CveXplore/VERSION index c074496f..ed63cdf2 100644 --- a/CveXplore/VERSION +++ b/CveXplore/VERSION @@ -1 +1 @@ -0.3.8.dev3 \ No newline at end of file +0.3.9 \ No newline at end of file diff --git a/CveXplore/cli.py b/CveXplore/cli.py index 3c7122c7..3dac9dcb 100644 --- a/CveXplore/cli.py +++ b/CveXplore/cli.py @@ -4,9 +4,9 @@ import click_completion.core from CveXplore.cli_cmds.capec_cmds import commands as group6 -from CveXplore.cli_cmds.cwe_cmds import commands as group7 from CveXplore.cli_cmds.cpe_cmds import commands as group5 from CveXplore.cli_cmds.cve_cmds import commands as group2 +from CveXplore.cli_cmds.cwe_cmds import commands as group7 from CveXplore.cli_cmds.db_cmds import commands as group4 from CveXplore.cli_cmds.find_cmds import commands as group1 from CveXplore.cli_cmds.stats_cmds import commands as group3 diff --git a/CveXplore/cli_cmds/capec_cmds/commands.py b/CveXplore/cli_cmds/capec_cmds/commands.py index a77c3547..5d572ee9 100644 --- a/CveXplore/cli_cmds/capec_cmds/commands.py +++ b/CveXplore/cli_cmds/capec_cmds/commands.py @@ -21,8 +21,6 @@ def capec_cmd(ctx): "--capec", help="Search for CAPEC's (could be multiple)", multiple=True, - cls=Mutex, - not_required_if=["field_list"], ) @click.option( "-f", @@ -36,10 +34,9 @@ def capec_cmd(ctx): "-fl", "--field_list", help="Return a field list for this collection", - multiple=True, is_flag=True, cls=Mutex, - not_required_if=["field", "capec"], + not_required_if=["field"], ) @click.option( "-o", @@ -56,18 +53,18 @@ def search_cmd( field_list, output, ): - if capec: + if capec and not field_list: ret_list = getattr(ctx.obj["data_source"], "capec").mget_by_id(*capec) - elif field_list: - ret_list = getattr(ctx.obj["data_source"], "capec").field_list() + elif capec and field_list: + ret_list = getattr(ctx.obj["data_source"], "capec").field_list(*capec) else: click.echo(search_cmd.get_help(ctx)) return - if isinstance(ret_list, list): + if isinstance(ret_list, list) and not field_list: result = [result.to_dict(*field) for result in ret_list] - elif isinstance(ret_list, set): - result = sorted([result for result in ret_list]) + elif field_list: + result = [result for result in ret_list] else: result = [] diff --git a/CveXplore/cli_cmds/cpe_cmds/commands.py b/CveXplore/cli_cmds/cpe_cmds/commands.py index c4cd154f..d107d8d0 100644 --- a/CveXplore/cli_cmds/cpe_cmds/commands.py +++ b/CveXplore/cli_cmds/cpe_cmds/commands.py @@ -50,7 +50,7 @@ def cpe_cmd(ctx): help="Search for CPE's (could be multiple) by id", multiple=True, cls=Mutex, - not_required_if=["field_list", "name", "title", "vendor"], + not_required_if=["name", "title", "vendor"], ) @click.option( "-f", @@ -64,10 +64,9 @@ def cpe_cmd(ctx): "-fl", "--field_list", help="Return a field list for this collection", - multiple=True, is_flag=True, cls=Mutex, - not_required_if=["field", "cpe", "name", "title", "vendor"], + not_required_if=["field", "name", "title", "vendor"], ) @click.option( "-m", @@ -84,7 +83,7 @@ def cpe_cmd(ctx): not_required_if=["match"], ) @click.option("-d", "--deprecated", is_flag=True, help="Filter deprecated cpe's") -@click.option("-c", "--cve", is_flag=True, help="Add related CVE's") +@click.option("-rc", "--related_cve", is_flag=True, help="Add related CVE's") @click.option( "-p", "--product_search", @@ -113,7 +112,7 @@ def search_cmd( match, regex, deprecated, - cve, + related_cve, product_search, limit, sort, @@ -155,20 +154,20 @@ def search_cmd( .limit(limit) .sort(search_by, sorting) ) - elif cpe: + elif cpe and not field_list: ret_list = getattr(ctx.obj["data_source"], "cpe").mget_by_id(*cpe) - elif field_list: - ret_list = getattr(ctx.obj["data_source"], "cpe").field_list() + elif cpe and field_list: + ret_list = getattr(ctx.obj["data_source"], "cpe").field_list(*cpe) else: click.echo(search_cmd.get_help(ctx)) return - if cve: + if related_cve: result = [result.to_cve_summary(product_search) for result in ret_list] - elif cpe: + elif cpe and not field_list: result = [result.to_dict(*field) for result in ret_list] - elif isinstance(ret_list, set): - result = sorted([result for result in ret_list]) + elif field_list: + result = [result for result in ret_list] else: result = [result.to_dict() for result in ret_list] diff --git a/CveXplore/cli_cmds/cve_cmds/commands.py b/CveXplore/cli_cmds/cve_cmds/commands.py index 60f3e274..3baf03ed 100644 --- a/CveXplore/cli_cmds/cve_cmds/commands.py +++ b/CveXplore/cli_cmds/cve_cmds/commands.py @@ -21,8 +21,6 @@ def cve_cmd(ctx): "--cve", help="Search for CVE's (could be multiple)", multiple=True, - cls=Mutex, - not_required_if=["field_list"], ) @click.option( "-f", @@ -36,10 +34,9 @@ def cve_cmd(ctx): "-fl", "--field_list", help="Return a field list for this collection", - multiple=True, is_flag=True, cls=Mutex, - not_required_if=["field", "cve"], + not_required_if=["field"], ) @click.option( "-o", @@ -56,18 +53,18 @@ def search_cmd( field_list, output, ): - if cve: + if cve and not field_list: ret_list = getattr(ctx.obj["data_source"], "cves").mget_by_id(*cve) - elif field_list: - ret_list = getattr(ctx.obj["data_source"], "cves").field_list() + elif cve and field_list: + ret_list = getattr(ctx.obj["data_source"], "cves").field_list(*cve) else: click.echo(search_cmd.get_help(ctx)) return - if isinstance(ret_list, list): + if isinstance(ret_list, list) and not field_list: result = [result.to_dict(*field) for result in ret_list] - elif isinstance(ret_list, set): - result = sorted([result for result in ret_list]) + elif field_list: + result = [result for result in ret_list] else: result = [] diff --git a/CveXplore/cli_cmds/cwe_cmds/commands.py b/CveXplore/cli_cmds/cwe_cmds/commands.py index 5864f25c..d87ff497 100644 --- a/CveXplore/cli_cmds/cwe_cmds/commands.py +++ b/CveXplore/cli_cmds/cwe_cmds/commands.py @@ -21,8 +21,6 @@ def cwe_cmd(ctx): "--cwe", help="Search for CWE's (could be multiple)", multiple=True, - cls=Mutex, - not_required_if=["field_list"], ) @click.option( "-f", @@ -36,10 +34,9 @@ def cwe_cmd(ctx): "-fl", "--field_list", help="Return a field list for this collection", - multiple=True, is_flag=True, cls=Mutex, - not_required_if=["field", "cwe"], + not_required_if=["field"], ) @click.option( "-o", @@ -56,18 +53,18 @@ def search_cmd( field_list, output, ): - if cwe: + if cwe and not field_list: ret_list = getattr(ctx.obj["data_source"], "cwe").mget_by_id(*cwe) - elif field_list: - ret_list = getattr(ctx.obj["data_source"], "cwe").field_list() + elif cwe and field_list: + ret_list = getattr(ctx.obj["data_source"], "cwe").field_list(*cwe) else: click.echo(search_cmd.get_help(ctx)) return - if isinstance(ret_list, list): + if isinstance(ret_list, list) and not field_list: result = [result.to_dict(*field) for result in ret_list] - elif isinstance(ret_list, set): - result = sorted([result for result in ret_list]) + elif field_list: + result = [result for result in ret_list] else: result = [] diff --git a/CveXplore/database/helpers/generic_db.py b/CveXplore/database/helpers/generic_db.py index 8b588341..d8f20d06 100644 --- a/CveXplore/database/helpers/generic_db.py +++ b/CveXplore/database/helpers/generic_db.py @@ -94,19 +94,38 @@ def mget_by_id(self, *doc_ids: str) -> Union[Iterable[CveXploreObject], Iterable else: return ret_data - def field_list(self) -> set: + def _field_list(self, doc_id: str) -> list: """ Method to fetch all field names from a specific collection """ - return reduce( - lambda all_keys, rec_keys: all_keys | set(rec_keys), - map( - lambda d: d.to_dict(), - [self._datasource_collection_connection.find_one()], - ), - set(), + return sorted( + list( + reduce( + lambda all_keys, rec_keys: all_keys | set(rec_keys), + map( + lambda d: d.to_dict(), + [ + self._datasource_collection_connection.find_one( + {"id": doc_id} + ) + ], + ), + set(), + ) + ) ) + def field_list(self, *doc_ids: str) -> list | dict: + """ + Method to fetch all field names from a specific collection + """ + ret_data = [] + for doc_id in sorted(doc_ids): + data = self._field_list(doc_id=doc_id) + ret_data.append({doc_id: data, "field_count": len(data)}) + + return ret_data + def __repr__(self): """String representation of object""" return "<< GenericDatabaseFactory:{} >>".format(self._collection) diff --git a/CveXplore/database/helpers/specific_db.py b/CveXplore/database/helpers/specific_db.py index 9d60c302..4702486d 100644 --- a/CveXplore/database/helpers/specific_db.py +++ b/CveXplore/database/helpers/specific_db.py @@ -3,7 +3,8 @@ =========================== """ import re -from typing import List, Union, Iterable +from functools import reduce +from typing import List from pymongo import DESCENDING @@ -39,11 +40,7 @@ def get_cves_for_vendor( else: return None - def get_by_id(self, doc_id: str): - """ - Method to retrieve a single CVE from the database by its CVE ID number. - The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001. - """ + def __parse_cve_id(self, doc_id: str) -> str: # first try to match full cve number format reg_match = re.compile(r"[cC][vV][eE]-\d{4}-\d{4,10}") if reg_match.match(doc_id) is not None: @@ -57,6 +54,14 @@ def get_by_id(self, doc_id: str): "Could not validate the CVE number. The number format should be either " "CVE-2000-0001, cve-2000-0001 or 2000-0001." ) + return doc_id + + def get_by_id(self, doc_id: str): + """ + Method to retrieve a single CVE from the database by its CVE ID number. + The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001. + """ + doc_id = self.__parse_cve_id(doc_id) if not isinstance(doc_id, str): try: @@ -66,6 +71,29 @@ def get_by_id(self, doc_id: str): return self._datasource_collection_connection.find_one({"id": doc_id}) + def _field_list(self, doc_id: str) -> list: + """ + Method to fetch all field names from a specific collection + """ + doc_id = self.__parse_cve_id(doc_id) + + return sorted( + list( + reduce( + lambda all_keys, rec_keys: all_keys | set(rec_keys), + map( + lambda d: d.to_dict(), + [ + self._datasource_collection_connection.find_one( + {"id": doc_id} + ) + ], + ), + set(), + ) + ) + ) + def __repr__(self): """String representation of object""" return "<< CvesDatabaseFunctions:{} >>".format(self._collection)