Skip to content

Commit

Permalink
Fix iterator
Browse files (browse the repository at this point in the history)
  • Loading branch information
treff7es committed Nov 6, 2024
1 parent a4541fd commit 96f2c33
Showing 1 changed file with 59 additions and 46 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -59,10 +59,15 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
default=None,
description="Query to filter entities",
)
limit_entities_delete: int = Field(
limit_entities_delete: Optional[int] = Field(
10000, description="Max number of entities to delete."
)

runtime_limit_seconds: Optional[int] = Field(
None,
description="Runtime limit in seconds",
)


@dataclass
class SoftDeletedEntitiesReport(SourceReport):
Expand Down Expand Up @@ -152,52 +157,60 @@ def delete_soft_deleted_entity(self, urn: str) -> None:

def cleanup_soft_deleted_entities(self) -> None:
assert self.ctx.graph
start_time = time.time()

deleted_count_retention = 0
while (
self.report.num_soft_deleted_entity_removed
<= self.config.limit_entities_delete
):
urns = list(
self.ctx.graph.get_urns_by_filter(
entity_types=self.config.entity_types,
platform=self.config.platform,
env=self.config.env,
query=self.config.query,
status=RemovedStatusFilter.ONLY_SOFT_DELETED,
batch_size=self.config.batch_size,
)
)
if len(urns) == 0:
logger.info("No more urns found")
return

futures = {}
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
for urn in urns:
future = executor.submit(self.delete_soft_deleted_entity, urn)
futures[future] = urn

if not futures:
return
for future in as_completed(futures):
if future.exception():
logger.error(
f"Failed to delete entity {futures[future]}: {future.exception()}"
)
self.report.failure(
f"Failed to delete entity {futures[future]}",
exc=future.exception(),
)
deleted_count_retention += 1
urns = self.ctx.graph.get_urns_by_filter(
entity_types=self.config.entity_types,
platform=self.config.platform,
env=self.config.env,
query=self.config.query,
status=RemovedStatusFilter.ONLY_SOFT_DELETED,
batch_size=self.config.batch_size,
)

if deleted_count_retention % self.config.batch_size == 0:
logger.info(
f"Processed {deleted_count_retention} soft deleted entity and deleted {self.report.num_soft_deleted_entity_removed} entities so far"
futures = {}
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
num_urns_submitted = 0
for urn in urns:
num_urns_submitted += 1
if (
self.config.limit_entities_delete
and num_urns_submitted > self.config.limit_entities_delete
):
logger.info(
f"Limit of {self.config.limit_entities_delete} entities reached. Stopping"
)
break
if time.time() - start_time > self.config.runtime_limit_seconds:
logger.info(
f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Stopping"
)
break

future = executor.submit(self.delete_soft_deleted_entity, urn)
futures[future] = urn

if not futures:
return
for future in as_completed(futures):
if future.exception():
logger.error(
f"Failed to delete entity {futures[future]}: {future.exception()}"
)
self.report.failure(
f"Failed to delete entity {futures[future]}",
exc=future.exception(),
)
deleted_count_retention += 1

if deleted_count_retention % self.config.batch_size == 0:
logger.info(
f"Processed {deleted_count_retention} soft deleted entity and deleted {self.report.num_soft_deleted_entity_removed} entities so far"
)

if self.config.delay:
logger.debug(
f"Sleeping for {self.config.delay} seconds before getting next batch"
)

if self.config.delay:
logger.debug(
f"Sleeping for {self.config.delay} seconds before getting next batch"
)
time.sleep(self.config.delay)
time.sleep(self.config.delay)

0 comments on commit 96f2c33

Please sign in to comment.