From 32878ab2e1faf3d5df42e05d0653c0855a47f8ff Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Thu, 7 Nov 2024 04:38:13 +0530
Subject: [PATCH] fix(ingest/gc): add limit, add actual loop for iterating over batches (#11809)

Co-authored-by: treff7es
---
 .../source/gc/soft_deleted_entity_cleanup.py | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index 4da23c13659a7..7ec7bb7e589d6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -59,6 +59,14 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
         default=None,
         description="Query to filter entities",
     )
+    limit_entities_delete: Optional[int] = Field(
+        10000, description="Max number of entities to delete."
+    )
+
+    runtime_limit_seconds: Optional[int] = Field(
+        None,
+        description="Runtime limit in seconds",
+    )
 
 
 @dataclass
@@ -122,6 +130,10 @@ def delete_entity(self, urn: str) -> None:
             return
 
         self.ctx.graph.delete_entity(urn=urn, hard=True)
+        self.ctx.graph.delete_references_to_urn(
+            urn=urn,
+            dry_run=False,
+        )
 
     def delete_soft_deleted_entity(self, urn: str) -> None:
         assert self.ctx.graph
@@ -145,6 +157,7 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
 
     def cleanup_soft_deleted_entities(self) -> None:
         assert self.ctx.graph
+        start_time = time.time()
 
         deleted_count_retention = 0
         urns = self.ctx.graph.get_urns_by_filter(
@@ -158,7 +171,26 @@ def cleanup_soft_deleted_entities(self) -> None:
 
         futures = {}
         with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
+            num_urns_submitted = 0
             for urn in urns:
+                num_urns_submitted += 1
+                if (
+                    self.config.limit_entities_delete
+                    and num_urns_submitted > self.config.limit_entities_delete
+                ):
+                    logger.info(
+                        f"Limit of {self.config.limit_entities_delete} entities reached. Stopping"
+                    )
+                    break
+                if (
+                    self.config.runtime_limit_seconds
+                    and time.time() - start_time > self.config.runtime_limit_seconds
+                ):
+                    logger.info(
+                        f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Stopping"
+                    )
+                    break
+
                 future = executor.submit(self.delete_soft_deleted_entity, urn)
                 futures[future] = urn
 