Skip to content

Commit

Permalink
Add ability to limit the consumption of async tasks based on a maximu…
Browse files Browse the repository at this point in the history
…m number. (#79)
  • Loading branch information
JBorrow authored Jul 23, 2024
1 parent 0bf76e5 commit e88657d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
20 changes: 0 additions & 20 deletions hera_librarian/queues.py

This file was deleted.

14 changes: 13 additions & 1 deletion librarian_background/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Callable

from sqlalchemy import select
from sqlalchemy import func, select

from hera_librarian.exceptions import LibrarianError
from hera_librarian.transfer import TransferStatus
Expand Down Expand Up @@ -219,6 +219,18 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool:
# Nothing to do!
return False

# Now, check we don't have too much going on.
stmt = (
select(func.count(SendQueue.id))
.filter_by(consumed=True)
.filter_by(completed=False)
)
in_flight = session.execute(stmt).scalar()

if in_flight > server_settings.max_async_inflight_transfers:
# Too much to do!
return False

# Otherwise, we are free to consume this item.
transfer_list = [
(Path(x.source_path), Path(x.dest_path)) for x in queue_item.transfers
Expand Down
3 changes: 3 additions & 0 deletions librarian_server/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class ServerSettings(BaseSettings):

# Transfer constraints
max_async_send_retries: int = 3
# The maximum number of in-flight asynchronous transfers to
# a specific destination.
max_async_inflight_transfers: int = 64

# Slack integration; by default disable this. You will need a slack
# webhook url, and by default we raise all log_to_database alerts to slack too.
Expand Down

0 comments on commit e88657d

Please sign in to comment.