Skip to content

Commit

Permalink
Add ability to shut down external transfers using client/cli scripts (#…
Browse files Browse the repository at this point in the history
…105)

* Add client ability to shut down transfers without configuration edits

* Actually add the sub-parser to CLI

* Add docs

* Apply suggestions from code review

Co-authored-by: Ioannis Paraskevakos <[email protected]>

* Add mutually exclusive group

---------

Co-authored-by: Ioannis Paraskevakos <[email protected]>
  • Loading branch information
JBorrow and iparask authored Nov 14, 2024
1 parent 140fc77 commit 3f7fec0
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 3 deletions.
29 changes: 29 additions & 0 deletions alembic/versions/1def8c988372_add_librarian_transfer_toggling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2017 the HERA Collaboration
# Licensed under the 2-clause BSD License.

"""Add librarian transfer toggling
Revision ID: 1def8c988372
Revises: 42f29c26ab0f
Create Date: 2024-11-11 15:09:12.578181
"""
import sqlalchemy as sa

from alembic import op

revision = "1def8c988372"
down_revision = "42f29c26ab0f"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("librarians") as batch_op:
batch_op.add_column(
sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True)
)


def downgrade():
op.drop_column("librarians", "transfers_enabled")
60 changes: 60 additions & 0 deletions docs/source/Globus.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,63 @@ fails because of network interruption. Here, the outgoing transfer
hypervisor will find it (as it will still be STAGED on A), call
up B, find that the instance exists, and register a remote instance
on A. The transfer is then marked as complete.


Enabling and Disabling Transfers
--------------------------------

There may be points in time when you want to shut down transfers to
specific machines. Whilst this can always be performed by editing the
configuration files and restarting the server, that is not always optimal.

Instead, you can use the following command-line script inside a container
(i.e. with direct access to the database):

.. code-block::
librarian-change-transfer-status [-h] --librarian LIBRARIAN [--enable] [--disable]
Change the status of an external librarian, to enable or disable transfers.
options:
-h, --help show this help message and exit
--librarian LIBRARIAN
Name of the librarian to change the status of.
--enable Enable the librarian.
--disable Disable the librarian.
Or using the client:

.. code-block::
librarian get-librarian-list [-h] [--ping] CONNECTION-NAME
Get a list of librarians known to the librarian.
positional arguments:
CONNECTION-NAME Which Librarian to talk to; as in ~/.hl_client.cfg.
options:
-h, --help show this help message and exit
--ping Ping the librarians to check they are up.
to find information about the connected librarians, and to set their properties:

.. code-block::
librarian set-librarian-transfer [-h] [--name NAME] [--enabled] [--disabled] CONNECTION-NAME
Set the transfer state of a librarian.
positional arguments:
CONNECTION-NAME Which Librarian to talk to; as in ~/.hl_client.cfg.
options:
-h, --help show this help message and exit
--name NAME The name of the librarian to set the transfer state of.
--enabled Set the librarian to enabled for transfers.
--disabled Set the librarian to disabled for transfers.
These client tools require an administrator account to use.
59 changes: 59 additions & 0 deletions hera_librarian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ def get_librarian_list(args):
print(
f"\033[1m{librarian.name}\033[0m ({librarian.url}:{librarian.port}) "
f"- {'Available' if librarian.available else 'Disabled' if librarian.available is not None else 'Unknown'}"
f"- {'Enabled' if librarian.enabled else 'Disabled'}"
)

return 0
Expand Down Expand Up @@ -771,6 +772,36 @@ def validate_file(args):
return 0


def set_librarian_transfer(args):
"""
Set the transfer status of a librarian.
"""

client = get_client(args.conn_name, admin=True)

if args.enabled and args.disabled:
die("Cannot set both enabled and disabled.")

if args.enabled:
transfer_status = True
elif args.disabled:
transfer_status = False
else:
die("You must choose to enable or disable the transfers.")

try:
client.set_librarian_transfer(
librarian_name=args.librarian_name,
transfer_status=transfer_status,
)
except LibrarianError as e:
die(f"Error setting librarian transfer status: {e}")
except LibrarianHTTPError as e:
die(f"Unexpected error communicating with the librarian server: {e.reason}")

return 0


# make the base parser
def generate_parser():
"""Make a librarian ArgumentParser.
Expand Down Expand Up @@ -821,6 +852,7 @@ def generate_parser():
config_get_librarian_list_subparser(sub_parsers)
config_add_librarian_subparser(sub_parsers)
config_remove_librarian_subparser(sub_parsers)
config_set_librarian_transfer_subparser(sub_parsers)
config_create_user_subparser(sub_parsers)
config_delete_user_subparser(sub_parsers)
config_validate_file_subparser(sub_parsers)
Expand Down Expand Up @@ -1705,6 +1737,33 @@ def config_validate_file_subparser(sub_parsers):
return


def config_set_librarian_transfer_subparser(sub_parsers):
doc = """Set the transfer state of a librarian.
"""
hlp = "Set the transfer state of a librarian"

# add sub parser
sp = sub_parsers.add_parser("set-librarian-transfer", description=doc, help=hlp)
sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help)
sp.add_argument(
"--name", help="The name of the librarian to set the transfer state of."
)

grp = sp.add_mutually_exclusive_group()
grp.add_argument(
"--enabled",
action="store_true",
help="Set the librarian to enabled for transfers.",
)
grp.add_argument(
"--disabled",
action="store_true",
help="Set the librarian to disabled for transfers.",
)
sp.set_defaults(func=set_librarian_transfer)


def main():
# make a parser and run the specified command
parser = generate_parser()
Expand Down
40 changes: 40 additions & 0 deletions hera_librarian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
from .models.admin import (
AdminAddLibrarianRequest,
AdminAddLibrarianResponse,
AdminChangeLibrarianTransferStatusRequest,
AdminCreateFileRequest,
AdminCreateFileResponse,
AdminDeleteInstanceRequest,
AdminDeleteInstanceResponse,
AdminLibrarianTransferStatusResponse,
AdminListLibrariansRequest,
AdminListLibrariansResponse,
AdminRemoveLibrarianRequest,
Expand Down Expand Up @@ -1319,3 +1321,41 @@ def remove_librarian(
raise e

return response.success, response.number_of_transfers_removed

def set_librarian_status(
self,
librarian_name: str,
transfers_enabled: bool,
) -> bool:
"""
Set the status of transfers to the librarian.
Parameters
----------
librarian_name : str
The name of the librarian to set the status of.
transfers_enabled : bool
Whether transfers to this librarian should be enabled.
Returns
-------
bool
The new status.
"""

try:
response = self.post(
endpoint="admin/librarians/transfer_status/set",
request=AdminChangeLibrarianTransferStatusRequest(
librarian_name=librarian_name,
transfers_enabled=transfers_enabled,
),
response=AdminLibrarianTransferStatusResponse,
)
except LibrarianHTTPError as e:
if e.status_code == 400 and "Librarian" in e.reason:
raise LibrarianError(e.reason)
else:
raise e

return response.transfers_enabled
28 changes: 28 additions & 0 deletions hera_librarian/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ class LibrarianListResponseItem(BaseModel):
available: bool | None
"Whether the librarian is available or not, only if ping is true."

enabled: bool
"Whether transfers the librarian is enabled or not."


class AdminListLibrariansResponse(BaseModel):
librarians: list[LibrarianListResponseItem]
Expand Down Expand Up @@ -238,3 +241,28 @@ class AdminDeleteInstanceResponse(BaseModel):

"The instance name of the instance that was changed."
instance_id: int


class AdminChangeLibrarianTransferStatusRequest(BaseModel):
"""
A request to change the transfer status of a librarian, either
to enable or disable outbound transfers.
"""

"The name of the librarian to change the transfer status of."
librarian_name: str

"Whether to enable or disable outbound transfers."
transfers_enabled: bool


class AdminLibrarianTransferStatusResponse(BaseModel):
"""
A response to a user change request.
"""

"The name of the librarian that was changed."
librarian_name: str

"Whether the librarian has outbound transfers enabled."
transfers_enabled: bool
27 changes: 26 additions & 1 deletion librarian_background/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from hera_librarian.exceptions import LibrarianError
from hera_librarian.transfer import TransferStatus
from librarian_server.database import get_session
from librarian_server.orm.librarian import Librarian
from librarian_server.orm.sendqueue import SendQueue
from librarian_server.settings import server_settings

Expand Down Expand Up @@ -145,10 +146,22 @@ def check_on_consumed(
# We are out of time.
return False

# Check if the librarian is enabled.
librarian = (
session.query(Librarian)
.filter_by(name=queue_item.destination)
.one_or_none()
)

if librarian is None or not librarian.transfers_enabled:
# We can't do anything with this librarian, but there may be other
# librarians that are enabled.
continue

logger.info(
"Handling queue item {q.id} with {q.retries} retries", q=queue_item
)

current_status = queue_item.async_transfer_manager.transfer_status(
settings=server_settings
)
Expand Down Expand Up @@ -238,6 +251,18 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool:
# Nothing to do!
return False

# Check if the librarian is enabled.
librarian = (
session.query(Librarian)
.filter_by(name=queue_item.destination)
.one_or_none()
)

if librarian is None or not librarian.transfers_enabled:
# We can't do anything with this librarian, but there may be other
# librarians that are enabled.
return True

# Now, check we don't have too much going on.
query_start = time.perf_counter()
stmt = (
Expand Down
6 changes: 6 additions & 0 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ def core(self, session: Session):
)
return CancelJob

if not librarian.transfers_enabled:
logger.warning(
f"Transfers to librarian {librarian.name} are temporarily disabled, skipping."
)
return

client: "LibrarianClient" = librarian.client()

try:
Expand Down
Loading

0 comments on commit 3f7fec0

Please sign in to comment.