diff --git a/CveXplore/core/database_maintenance/main_updater.py b/CveXplore/core/database_maintenance/main_updater.py index 653b139e..e7885e30 100644 --- a/CveXplore/core/database_maintenance/main_updater.py +++ b/CveXplore/core/database_maintenance/main_updater.py @@ -66,11 +66,11 @@ def reset_download_sources_to_default(self): return True - def update(self, update_source: str | list = None): + def update(self, update_source: str | list = None, manual_days: int = 0): """ Method used for updating the database """ - self.logger.info(f"Starting Database update....") + self.logger.info(f"Starting Database update...") start_time = time.time() if not self.do_initialize: @@ -80,42 +80,35 @@ def update(self, update_source: str | list = None): ) self.database_migrator.db_upgrade() - if update_source is not None: - if not isinstance(update_source, str | list): + if update_source is None: + # update all sources + update_source = [source["name"] for source in self.sources] + elif isinstance(update_source, str): + # update a single source + update_source = [update_source] + else: + # update list of sources + if not isinstance(update_source, list): raise ValueError("Wrong 'update_source' parameter type received!") - try: - if update_source is None: - for source in self.sources: - up = source["updater"]() - up.update() - - elif isinstance(update_source, list): - for source in update_source: - try: - update_this_source = [ - x for x in self.sources if x["name"] == source - ][0] - up = update_this_source["updater"]() - up.update() - except IndexError: - raise UpdateSourceNotFound( - f"Provided source: {source} could not be found...." + for source in update_source: + try: + update_this_source = [x for x in self.sources if x["name"] == source][0] + up = update_this_source["updater"]() + if manual_days > 0: + if update_this_source["name"] in ("cpe", "cve"): + up.update(manual_days=manual_days) + else: + self.logger.warning( + f"Update interval in days not supported by source {source}; ignoring" ) - else: - # single string then.... - try: - update_this_source = [ - x for x in self.sources if x["name"] == update_source - ][0] - up = update_this_source["updater"]() + up.update() + else: up.update() - except IndexError: - raise UpdateSourceNotFound( - f"Provided source: {update_source} could not be found...." - ) - except UpdateSourceNotFound: - raise + except IndexError: + raise UpdateSourceNotFound( + f"Provided source: {source} could not be found...." + ) self.database_indexer.create_indexes() diff --git a/CveXplore/core/database_maintenance/sources_process.py b/CveXplore/core/database_maintenance/sources_process.py index 3030ac16..cdec79d0 100644 --- a/CveXplore/core/database_maintenance/sources_process.py +++ b/CveXplore/core/database_maintenance/sources_process.py @@ -99,7 +99,7 @@ def process_the_item(self, item: dict = None): return cpe - def process_downloads(self, sites: list | None = None): + def process_downloads(self, sites: list | None = None, manual_days: int = 0): """ Method to download and process files """ @@ -156,29 +156,40 @@ def process_downloads(self, sites: list | None = None): f"Retrieval of api data on url: {data_list.args[0]} failed...." ) else: - last_mod_start_date = self.database[self.feed_type.lower()].find_one( - {}, {"lastModified": 1}, sort=[("lastModified", -1)] - ) + # Get datetime from runtime + last_mod_end_date = datetime.datetime.now() - if last_mod_start_date is not None: - if "lastModified" in last_mod_start_date: - last_mod_start_date = last_mod_start_date[ - "lastModified" - ] + datetime.timedelta( - 0, 1 - ) # add one second to prevent false results... - else: - raise KeyError( - "Missing field 'lastModified' from database query..." - ) - else: + # Use configured day interval or detect from the latest entry in the database + if manual_days > 120: self.logger.warning( - "No records found in the mongodb cpe collection.." + f"Update interval over 120 days not supported by the NVD API; ignoring" ) - return - - # Get datetime from runtime - last_mod_end_date = datetime.datetime.now() + if manual_days > 0 and manual_days <= 120: + last_mod_start_date = last_mod_end_date - datetime.timedelta( + days=manual_days + ) + else: + last_mod_start_date = self.database[ + self.feed_type.lower() + ].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)]) + + if last_mod_start_date is not None: + if "lastModified" in last_mod_start_date: + last_mod_start_date = last_mod_start_date[ + "lastModified" + ] + datetime.timedelta( + 0, 1 + ) # add one second to prevent false results... + else: + raise KeyError( + "Missing field 'lastModified' from database query..." + ) + else: + self.logger.warning( + "No records found in the mongodb cpe collection.." + ) + return + self.logger.info(f"Retrieving CPEs starting from {last_mod_start_date}") try: total_results = self.api_handler.get_count( @@ -231,10 +242,10 @@ def process_downloads(self, sites: list | None = None): f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}" ) - def update(self, **kwargs): + def update(self, manual_days: int = 0): self.logger.info("CPE database update started") - self.process_downloads() + self.process_downloads(manual_days=manual_days) # if collection is non-existent; assume it's not an update if self.feed_type.lower() not in self.getTableNames(): @@ -644,7 +655,7 @@ def process_the_item(self, item: dict = None): return cve - def process_downloads(self, sites: list = None): + def process_downloads(self, sites: list = None, manual_days: int = 0): """ Method to download and process files """ @@ -701,25 +712,40 @@ def process_downloads(self, sites: list = None): f"Retrieval of api data on url: {data_list.args[0]} failed...." ) else: - last_mod_start_date = self.database[self.feed_type.lower()].find_one( - {}, {"lastModified": 1}, sort=[("lastModified", -1)] - ) + # Get datetime from runtime + last_mod_end_date = datetime.datetime.now() - if last_mod_start_date is not None: - if "lastModified" in last_mod_start_date: - last_mod_start_date = last_mod_start_date["lastModified"] - else: - raise KeyError( - "Missing field 'lastModified' from database query..." - ) - else: + # Use configured day interval or detect from the latest entry in the database + if manual_days > 120: self.logger.warning( - "No records found in the mongodb cves collection.." + f"Update interval over 120 days not supported by the NVD API; ignoring" ) - return - - # Get datetime from runtime - last_mod_end_date = datetime.datetime.now() + if manual_days > 0 and manual_days <= 120: + last_mod_start_date = last_mod_end_date - datetime.timedelta( + days=manual_days + ) + else: + last_mod_start_date = self.database[ + self.feed_type.lower() + ].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)]) + + if last_mod_start_date is not None: + if "lastModified" in last_mod_start_date: + last_mod_start_date = last_mod_start_date[ + "lastModified" + ] + datetime.timedelta( + 0, 1 + ) # add one second to prevent false results... + else: + raise KeyError( + "Missing field 'lastModified' from database query..." + ) + else: + self.logger.warning( + "No records found in the mongodb cpe collection.." + ) + return + self.logger.info(f"Retrieving CVEs starting from {last_mod_start_date}") try: total_results = self.api_handler.get_count( @@ -772,10 +798,10 @@ def process_downloads(self, sites: list = None): f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}" ) - def update(self): + def update(self, manual_days: int = 0): self.logger.info("CVE database update started") - self.process_downloads() + self.process_downloads(manual_days=manual_days) # if collection is non-existent; assume it's not an update if self.feed_type.lower() not in self.getTableNames(): diff --git a/README.rst b/README.rst index 01ff0632..120aced1 100644 --- a/README.rst +++ b/README.rst @@ -111,6 +111,14 @@ You can add your `NIST API Key >> cvx.database.update(manual_days=7) + Package usage -------------