Skip to content

Commit

Permalink
restructured field list command
Browse files Browse the repository at this point in the history
  • Loading branch information
P-T-I committed Oct 17, 2023
1 parent 3fe6b61 commit 6cbe624
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CveXplore/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.8.dev3
0.3.9
2 changes: 1 addition & 1 deletion CveXplore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import click_completion.core

from CveXplore.cli_cmds.capec_cmds import commands as group6
from CveXplore.cli_cmds.cwe_cmds import commands as group7
from CveXplore.cli_cmds.cpe_cmds import commands as group5
from CveXplore.cli_cmds.cve_cmds import commands as group2
from CveXplore.cli_cmds.cwe_cmds import commands as group7
from CveXplore.cli_cmds.db_cmds import commands as group4
from CveXplore.cli_cmds.find_cmds import commands as group1
from CveXplore.cli_cmds.stats_cmds import commands as group3
Expand Down
17 changes: 7 additions & 10 deletions CveXplore/cli_cmds/capec_cmds/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ def capec_cmd(ctx):
"--capec",
help="Search for CAPEC's (could be multiple)",
multiple=True,
cls=Mutex,
not_required_if=["field_list"],
)
@click.option(
"-f",
Expand All @@ -36,10 +34,9 @@ def capec_cmd(ctx):
"-fl",
"--field_list",
help="Return a field list for this collection",
multiple=True,
is_flag=True,
cls=Mutex,
not_required_if=["field", "capec"],
not_required_if=["field"],
)
@click.option(
"-o",
Expand All @@ -56,18 +53,18 @@ def search_cmd(
field_list,
output,
):
if capec:
if capec and not field_list:
ret_list = getattr(ctx.obj["data_source"], "capec").mget_by_id(*capec)
elif field_list:
ret_list = getattr(ctx.obj["data_source"], "capec").field_list()
elif capec and field_list:
ret_list = getattr(ctx.obj["data_source"], "capec").field_list(*capec)
else:
click.echo(search_cmd.get_help(ctx))
return

if isinstance(ret_list, list):
if isinstance(ret_list, list) and not field_list:
result = [result.to_dict(*field) for result in ret_list]
elif isinstance(ret_list, set):
result = sorted([result for result in ret_list])
elif field_list:
result = [result for result in ret_list]
else:
result = []

Expand Down
23 changes: 11 additions & 12 deletions CveXplore/cli_cmds/cpe_cmds/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def cpe_cmd(ctx):
help="Search for CPE's (could be multiple) by id",
multiple=True,
cls=Mutex,
not_required_if=["field_list", "name", "title", "vendor"],
not_required_if=["name", "title", "vendor"],
)
@click.option(
"-f",
Expand All @@ -64,10 +64,9 @@ def cpe_cmd(ctx):
"-fl",
"--field_list",
help="Return a field list for this collection",
multiple=True,
is_flag=True,
cls=Mutex,
not_required_if=["field", "cpe", "name", "title", "vendor"],
not_required_if=["field", "name", "title", "vendor"],
)
@click.option(
"-m",
Expand All @@ -84,7 +83,7 @@ def cpe_cmd(ctx):
not_required_if=["match"],
)
@click.option("-d", "--deprecated", is_flag=True, help="Filter deprecated cpe's")
@click.option("-c", "--cve", is_flag=True, help="Add related CVE's")
@click.option("-rc", "--related_cve", is_flag=True, help="Add related CVE's")
@click.option(
"-p",
"--product_search",
Expand Down Expand Up @@ -113,7 +112,7 @@ def search_cmd(
match,
regex,
deprecated,
cve,
related_cve,
product_search,
limit,
sort,
Expand Down Expand Up @@ -155,20 +154,20 @@ def search_cmd(
.limit(limit)
.sort(search_by, sorting)
)
elif cpe:
elif cpe and not field_list:
ret_list = getattr(ctx.obj["data_source"], "cpe").mget_by_id(*cpe)
elif field_list:
ret_list = getattr(ctx.obj["data_source"], "cpe").field_list()
elif cpe and field_list:
ret_list = getattr(ctx.obj["data_source"], "cpe").field_list(*cpe)
else:
click.echo(search_cmd.get_help(ctx))
return

if cve:
if related_cve:
result = [result.to_cve_summary(product_search) for result in ret_list]
elif cpe:
elif cpe and not field_list:
result = [result.to_dict(*field) for result in ret_list]
elif isinstance(ret_list, set):
result = sorted([result for result in ret_list])
elif field_list:
result = [result for result in ret_list]
else:
result = [result.to_dict() for result in ret_list]

Expand Down
17 changes: 7 additions & 10 deletions CveXplore/cli_cmds/cve_cmds/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ def cve_cmd(ctx):
"--cve",
help="Search for CVE's (could be multiple)",
multiple=True,
cls=Mutex,
not_required_if=["field_list"],
)
@click.option(
"-f",
Expand All @@ -36,10 +34,9 @@ def cve_cmd(ctx):
"-fl",
"--field_list",
help="Return a field list for this collection",
multiple=True,
is_flag=True,
cls=Mutex,
not_required_if=["field", "cve"],
not_required_if=["field"],
)
@click.option(
"-o",
Expand All @@ -56,18 +53,18 @@ def search_cmd(
field_list,
output,
):
if cve:
if cve and not field_list:
ret_list = getattr(ctx.obj["data_source"], "cves").mget_by_id(*cve)
elif field_list:
ret_list = getattr(ctx.obj["data_source"], "cves").field_list()
elif cve and field_list:
ret_list = getattr(ctx.obj["data_source"], "cves").field_list(*cve)
else:
click.echo(search_cmd.get_help(ctx))
return

if isinstance(ret_list, list):
if isinstance(ret_list, list) and not field_list:
result = [result.to_dict(*field) for result in ret_list]
elif isinstance(ret_list, set):
result = sorted([result for result in ret_list])
elif field_list:
result = [result for result in ret_list]
else:
result = []

Expand Down
17 changes: 7 additions & 10 deletions CveXplore/cli_cmds/cwe_cmds/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ def cwe_cmd(ctx):
"--cwe",
help="Search for CWE's (could be multiple)",
multiple=True,
cls=Mutex,
not_required_if=["field_list"],
)
@click.option(
"-f",
Expand All @@ -36,10 +34,9 @@ def cwe_cmd(ctx):
"-fl",
"--field_list",
help="Return a field list for this collection",
multiple=True,
is_flag=True,
cls=Mutex,
not_required_if=["field", "cwe"],
not_required_if=["field"],
)
@click.option(
"-o",
Expand All @@ -56,18 +53,18 @@ def search_cmd(
field_list,
output,
):
if cwe:
if cwe and not field_list:
ret_list = getattr(ctx.obj["data_source"], "cwe").mget_by_id(*cwe)
elif field_list:
ret_list = getattr(ctx.obj["data_source"], "cwe").field_list()
elif cwe and field_list:
ret_list = getattr(ctx.obj["data_source"], "cwe").field_list(*cwe)
else:
click.echo(search_cmd.get_help(ctx))
return

if isinstance(ret_list, list):
if isinstance(ret_list, list) and not field_list:
result = [result.to_dict(*field) for result in ret_list]
elif isinstance(ret_list, set):
result = sorted([result for result in ret_list])
elif field_list:
result = [result for result in ret_list]
else:
result = []

Expand Down
35 changes: 27 additions & 8 deletions CveXplore/database/helpers/generic_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,38 @@ def mget_by_id(self, *doc_ids: str) -> Union[Iterable[CveXploreObject], Iterable
else:
return ret_data

def field_list(self) -> set:
def _field_list(self, doc_id: str) -> list:
"""
Method to fetch all field names from a specific collection
"""
return reduce(
lambda all_keys, rec_keys: all_keys | set(rec_keys),
map(
lambda d: d.to_dict(),
[self._datasource_collection_connection.find_one()],
),
set(),
return sorted(
list(
reduce(
lambda all_keys, rec_keys: all_keys | set(rec_keys),
map(
lambda d: d.to_dict(),
[
self._datasource_collection_connection.find_one(
{"id": doc_id}
)
],
),
set(),
)
)
)

def field_list(self, *doc_ids: str) -> list | dict:
"""
Method to fetch all field names from a specific collection
"""
ret_data = []
for doc_id in sorted(doc_ids):
data = self._field_list(doc_id=doc_id)
ret_data.append({doc_id: data, "field_count": len(data)})

return ret_data

def __repr__(self):
"""String representation of object"""
return "<< GenericDatabaseFactory:{} >>".format(self._collection)
Expand Down
40 changes: 34 additions & 6 deletions CveXplore/database/helpers/specific_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
===========================
"""
import re
from typing import List, Union, Iterable
from functools import reduce
from typing import List

from pymongo import DESCENDING

Expand Down Expand Up @@ -39,11 +40,7 @@ def get_cves_for_vendor(
else:
return None

def get_by_id(self, doc_id: str):
"""
Method to retrieve a single CVE from the database by its CVE ID number.
The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001.
"""
def __parse_cve_id(self, doc_id: str) -> str:
# first try to match full cve number format
reg_match = re.compile(r"[cC][vV][eE]-\d{4}-\d{4,10}")
if reg_match.match(doc_id) is not None:
Expand All @@ -57,6 +54,14 @@ def get_by_id(self, doc_id: str):
"Could not validate the CVE number. The number format should be either "
"CVE-2000-0001, cve-2000-0001 or 2000-0001."
)
return doc_id

def get_by_id(self, doc_id: str):
"""
Method to retrieve a single CVE from the database by its CVE ID number.
The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001.
"""
doc_id = self.__parse_cve_id(doc_id)

if not isinstance(doc_id, str):
try:
Expand All @@ -66,6 +71,29 @@ def get_by_id(self, doc_id: str):

return self._datasource_collection_connection.find_one({"id": doc_id})

def _field_list(self, doc_id: str) -> list:
"""
Method to fetch all field names from a specific collection
"""
doc_id = self.__parse_cve_id(doc_id)

return sorted(
list(
reduce(
lambda all_keys, rec_keys: all_keys | set(rec_keys),
map(
lambda d: d.to_dict(),
[
self._datasource_collection_connection.find_one(
{"id": doc_id}
)
],
),
set(),
)
)
)

def __repr__(self):
"""String representation of object"""
return "<< CvesDatabaseFunctions:{} >>".format(self._collection)
Expand Down

0 comments on commit 6cbe624

Please sign in to comment.