Skip to content

Commit

Permalink
Set update interval (1-120 days) manually for NVD API
Browse files Browse the repository at this point in the history
  • Loading branch information
oh2fih committed Jul 2, 2024
1 parent cb638fe commit 8dbc27e
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 45 deletions.
15 changes: 12 additions & 3 deletions CveXplore/core/database_maintenance/main_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ def reset_download_sources_to_default(self):

return True

def update(self, update_source: str | list = None):
def update(self, update_source: str | list = None, update_days: int = 0):
"""
Method used for updating the database
"""
self.logger.info(f"Starting Database update....")
self.logger.info(f"Starting Database update...")
start_time = time.time()

if not self.do_initialize:
Expand All @@ -88,7 +88,16 @@ def update(self, update_source: str | list = None):
if update_source is None:
for source in self.sources:
up = source["updater"]()
up.update()
if update_days > 0:
if source["name"] in ("cpe", "cve"):
up.update(update_days=update_days)
else:
self.logger.warning(
f"Update interval in days not supported by source {source}; ignoring"
)
up.update()
else:
up.update()

elif isinstance(update_source, list):
for source in update_source:
Expand Down
110 changes: 68 additions & 42 deletions CveXplore/core/database_maintenance/sources_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def process_the_item(self, item: dict = None):

return cpe

def process_downloads(self, sites: list | None = None):
def process_downloads(self, sites: list | None = None, update_days: int = 0):
"""
Method to download and process files
"""
Expand Down Expand Up @@ -156,29 +156,40 @@ def process_downloads(self, sites: list | None = None):
f"Retrieval of api data on url: {data_list.args[0]} failed...."
)
else:
last_mod_start_date = self.database[self.feed_type.lower()].find_one(
{}, {"lastModified": 1}, sort=[("lastModified", -1)]
)
# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
# Use configured day interval or detect from the latest entry in the database
if update_days > 120:
self.logger.warning(
"No records found in the mongodb cpe collection.."
f"Update interval over 120 days not supported by the NVD API; ignoring"
)
return

# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()
if update_days > 0 and update_days < 120:
last_mod_start_date = last_mod_end_date - datetime.timedelta(
days=update_days
)
else:
last_mod_start_date = self.database[
self.feed_type.lower()
].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)])

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
self.logger.warning(
"No records found in the mongodb cpe collection.."
)
return
self.logger.info(f"Retrieving CPEs starting from {last_mod_start_date}")

try:
total_results = self.api_handler.get_count(
Expand Down Expand Up @@ -231,10 +242,10 @@ def process_downloads(self, sites: list | None = None):
f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
)

def update(self, **kwargs):
def update(self, update_days: int = 0):
self.logger.info("CPE database update started")

self.process_downloads()
self.process_downloads(update_days=update_days)

# if collection is non-existent; assume it's not an update
if self.feed_type.lower() not in self.getTableNames():
Expand Down Expand Up @@ -644,7 +655,7 @@ def process_the_item(self, item: dict = None):

return cve

def process_downloads(self, sites: list = None):
def process_downloads(self, sites: list = None, update_days: int = 0):
"""
Method to download and process files
"""
Expand Down Expand Up @@ -701,25 +712,40 @@ def process_downloads(self, sites: list = None):
f"Retrieval of api data on url: {data_list.args[0]} failed...."
)
else:
last_mod_start_date = self.database[self.feed_type.lower()].find_one(
{}, {"lastModified": 1}, sort=[("lastModified", -1)]
)
# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date["lastModified"]
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
# Use configured day interval or detect from the latest entry in the database
if update_days > 120:
self.logger.warning(
"No records found in the mongodb cves collection.."
f"Update interval over 120 days not supported by the NVD API; ignoring"
)
return

# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()
if update_days > 0 and update_days < 120:
last_mod_start_date = last_mod_end_date - datetime.timedelta(
days=update_days
)
else:
last_mod_start_date = self.database[
self.feed_type.lower()
].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)])

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
self.logger.warning(
"No records found in the mongodb cpe collection.."
)
return
self.logger.info(f"Retrieving CVEs starting from {last_mod_start_date}")

try:
total_results = self.api_handler.get_count(
Expand Down Expand Up @@ -772,10 +798,10 @@ def process_downloads(self, sites: list = None):
f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
)

def update(self):
def update(self, update_days: int = 0):
self.logger.info("CVE database update started")

self.process_downloads()
self.process_downloads(update_days=update_days)

# if collection is non-existent; assume it's not an update
if self.feed_type.lower() not in self.getTableNames():
Expand Down
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ You can add your `NIST API Key <https://nvd.nist.gov/developers/request-an-api-k
:code:`NVD_NIST_API_KEY` (e.g., in the :code:`~/.cvexplore/.env` file). You can populate CveXplore without an API key,
but it will limit the amount of parallel requests made to the NIST API.

For the NVD API, the update starts from the last modified document in the database. In case of missing CPEs or CVEs
caused by failures during the regular updates you can manually update entries for 1–120 days. (If the period is longer
than 120 days you would need to re-populate the entire database.) Example:

.. code-block:: python
>>> cvx.database.update(update_days=7)
Package usage
-------------

Expand Down

0 comments on commit 8dbc27e

Please sign in to comment.