From 96f2c3362388db8f473e708f4dde99ec32be3aa1 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Wed, 6 Nov 2024 23:33:15 +0100
Subject: [PATCH] Fix iterator

Iterate the soft-deleted URN stream once instead of re-querying it in a
`while` loop, and stop submitting deletions when either the entity limit
or the optional runtime limit is reached.

---
Review notes (ignored by `git am`):
 * `runtime_limit_seconds` defaults to None. The runtime check is therefore
   guarded the same way as the `limit_entities_delete` check; an unguarded
   `float > None` comparison raises TypeError on the first URN.
 * Hunk/diffstat counts below account for the guard and its comment.

 .../source/gc/soft_deleted_entity_cleanup.py  | 110 ++++++++++--------
 1 file changed, 64 insertions(+), 46 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index 9ccad559f9eac..b6779b86c77c0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -59,10 +59,15 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
         default=None,
         description="Query to filter entities",
     )
-    limit_entities_delete: int = Field(
+    limit_entities_delete: Optional[int] = Field(
         10000, description="Max number of entities to delete."
     )
 
+    runtime_limit_seconds: Optional[int] = Field(
+        None,
+        description="Runtime limit in seconds",
+    )
+
 
 @dataclass
 class SoftDeletedEntitiesReport(SourceReport):
@@ -152,52 +157,65 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
 
     def cleanup_soft_deleted_entities(self) -> None:
         assert self.ctx.graph
+        start_time = time.time()
 
         deleted_count_retention = 0
-        while (
-            self.report.num_soft_deleted_entity_removed
-            <= self.config.limit_entities_delete
-        ):
-            urns = list(
-                self.ctx.graph.get_urns_by_filter(
-                    entity_types=self.config.entity_types,
-                    platform=self.config.platform,
-                    env=self.config.env,
-                    query=self.config.query,
-                    status=RemovedStatusFilter.ONLY_SOFT_DELETED,
-                    batch_size=self.config.batch_size,
-                )
-            )
-            if len(urns) == 0:
-                logger.info("No more urns found")
-                return
-
-            futures = {}
-            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
-                for urn in urns:
-                    future = executor.submit(self.delete_soft_deleted_entity, urn)
-                    futures[future] = urn
-
-            if not futures:
-                return
-            for future in as_completed(futures):
-                if future.exception():
-                    logger.error(
-                        f"Failed to delete entity {futures[future]}: {future.exception()}"
-                    )
-                    self.report.failure(
-                        f"Failed to delete entity {futures[future]}",
-                        exc=future.exception(),
-                    )
-                deleted_count_retention += 1
+        urns = self.ctx.graph.get_urns_by_filter(
+            entity_types=self.config.entity_types,
+            platform=self.config.platform,
+            env=self.config.env,
+            query=self.config.query,
+            status=RemovedStatusFilter.ONLY_SOFT_DELETED,
+            batch_size=self.config.batch_size,
+        )
 
-                if deleted_count_retention % self.config.batch_size == 0:
-                    logger.info(
-                        f"Processed {deleted_count_retention} soft deleted entity and deleted {self.report.num_soft_deleted_entity_removed} entities so far"
+        futures = {}
+        with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
+            num_urns_submitted = 0
+            for urn in urns:
+                num_urns_submitted += 1
+                if (
+                    self.config.limit_entities_delete
+                    and num_urns_submitted > self.config.limit_entities_delete
+                ):
+                    logger.info(
+                        f"Limit of {self.config.limit_entities_delete} entities reached. Stopping"
+                    )
+                    break
+                # runtime_limit_seconds defaults to None (no limit). Guard the
+                # comparison: `float > None` raises TypeError in Python 3.
+                if (
+                    self.config.runtime_limit_seconds
+                    and time.time() - start_time > self.config.runtime_limit_seconds
+                ):
+                    logger.info(
+                        f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Stopping"
+                    )
+                    break
+
+                future = executor.submit(self.delete_soft_deleted_entity, urn)
+                futures[future] = urn
+
+        if not futures:
+            return
+        for future in as_completed(futures):
+            if future.exception():
+                logger.error(
+                    f"Failed to delete entity {futures[future]}: {future.exception()}"
+                )
+                self.report.failure(
+                    f"Failed to delete entity {futures[future]}",
+                    exc=future.exception(),
+                )
+            deleted_count_retention += 1
+
+            if deleted_count_retention % self.config.batch_size == 0:
+                logger.info(
+                    f"Processed {deleted_count_retention} soft deleted entity and deleted {self.report.num_soft_deleted_entity_removed} entities so far"
+                )
+
+                if self.config.delay:
+                    logger.debug(
+                        f"Sleeping for {self.config.delay} seconds before getting next batch"
                     )
-
-                    if self.config.delay:
-                        logger.debug(
-                            f"Sleeping for {self.config.delay} seconds before getting next batch"
-                        )
-                        time.sleep(self.config.delay)
+                    time.sleep(self.config.delay)