From 3f7fec08824179377a8967acd88ce6b8d684eb06 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 14 Nov 2024 10:38:21 -0500 Subject: [PATCH] Add ability to shut down external transfers using client/cli scripts (#105) * Add client ability to shut down transfers without configuration edits * Actually add the sub-parser to CLI * Add docs * Apply suggestions from code review Co-authored-by: Ioannis Paraskevakos * Add mutually exclusive group --------- Co-authored-by: Ioannis Paraskevakos --- ...c988372_add_librarian_transfer_toggling.py | 29 +++++++++ docs/source/Globus.rst | 60 +++++++++++++++++++ hera_librarian/cli.py | 59 ++++++++++++++++++ hera_librarian/client.py | 40 +++++++++++++ hera_librarian/models/admin.py | 28 +++++++++ librarian_background/queues.py | 27 ++++++++- librarian_background/send_clone.py | 6 ++ librarian_server/api/admin.py | 39 ++++++++++++ librarian_server/orm/librarian.py | 1 + librarian_server/settings.py | 4 +- .../librarian_change_transfer_status.py | 58 ++++++++++++++++++ pyproject.toml | 1 + tests/server_unit_test/test_admin.py | 15 +++++ 13 files changed, 364 insertions(+), 3 deletions(-) create mode 100644 alembic/versions/1def8c988372_add_librarian_transfer_toggling.py create mode 100644 librarian_server_scripts/librarian_change_transfer_status.py diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py new file mode 100644 index 0000000..c993cc0 --- /dev/null +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -0,0 +1,29 @@ +# Copyright 2017 the HERA Collaboration +# Licensed under the 2-clause BSD License. + +"""Add librarian transfer toggling + +Revision ID: 1def8c988372 +Revises: 42f29c26ab0f +Create Date: 2024-11-11 15:09:12.578181 + +""" +import sqlalchemy as sa + +from alembic import op + +revision = "1def8c988372" +down_revision = "42f29c26ab0f" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("librarians") as batch_op: + batch_op.add_column( + sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True) + ) + + +def downgrade(): + op.drop_column("librarians", "transfers_enabled") diff --git a/docs/source/Globus.rst b/docs/source/Globus.rst index b308efb..95396f0 100644 --- a/docs/source/Globus.rst +++ b/docs/source/Globus.rst @@ -123,3 +123,63 @@ fails because of network interruption. Here, the outgoing transfer hypervisor will find it (as it will still be STAGED on A), call up B, find that the instance exists, and register a remote instance on A. The transfer is then marked as complete. + + +Enabling and Disabling Transfers +-------------------------------- + +There may be points in time when you want to shut down transfers to +specific machines. Whilst this can always be performed by editing the +configuration files and restarting the server, that is not always optimal. + +Instead, you can use the following command-line script inside a container +(i.e. with direct access to the database): + +.. code-block:: + + librarian-change-transfer-status [-h] --librarian LIBRARIAN [--enable] [--disable] + + Change the status of an external librarian, to enable or disable transfers. + + options: + -h, --help show this help message and exit + --librarian LIBRARIAN + Name of the librarian to change the status of. + --enable Enable the librarian. + --disable Disable the librarian. + +Or using the client: + +.. code-block:: + + librarian get-librarian-list [-h] [--ping] CONNECTION-NAME + + Get a list of librarians known to the librarian. + + positional arguments: + CONNECTION-NAME Which Librarian to talk to; as in ~/.hl_client.cfg. + + options: + -h, --help show this help message and exit + --ping Ping the librarians to check they are up. + + +to find information about the connected librarians, and to set their properties: + +.. code-block:: + + librarian set-librarian-transfer [-h] [--name NAME] [--enabled] [--disabled] CONNECTION-NAME + + Set the transfer state of a librarian. + + positional arguments: + CONNECTION-NAME Which Librarian to talk to; as in ~/.hl_client.cfg. + + options: + -h, --help show this help message and exit + --name NAME The name of the librarian to set the transfer state of. + --enabled Set the librarian to enabled for transfers. + --disabled Set the librarian to disabled for transfers. + + +These client tools require an administrator account to use. \ No newline at end of file diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index d696b8b..4590f0d 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -641,6 +641,7 @@ def get_librarian_list(args): print( f"\033[1m{librarian.name}\033[0m ({librarian.url}:{librarian.port}) " f"- {'Available' if librarian.available else 'Disabled' if librarian.available is not None else 'Unknown'}" + f"- {'Enabled' if librarian.enabled else 'Disabled'}" ) return 0 @@ -771,6 +772,36 @@ def validate_file(args): return 0 +def set_librarian_transfer(args): + """ + Set the transfer status of a librarian. + """ + + client = get_client(args.conn_name, admin=True) + + if args.enabled and args.disabled: + die("Cannot set both enabled and disabled.") + + if args.enabled: + transfer_status = True + elif args.disabled: + transfer_status = False + else: + die("You must choose to enable or disable the transfers.") + + try: + client.set_librarian_transfer( + librarian_name=args.librarian_name, + transfer_status=transfer_status, + ) + except LibrarianError as e: + die(f"Error setting librarian transfer status: {e}") + except LibrarianHTTPError as e: + die(f"Unexpected error communicating with the librarian server: {e.reason}") + + return 0 + + # make the base parser def generate_parser(): """Make a librarian ArgumentParser. @@ -821,6 +852,7 @@ def generate_parser(): config_get_librarian_list_subparser(sub_parsers) config_add_librarian_subparser(sub_parsers) config_remove_librarian_subparser(sub_parsers) + config_set_librarian_transfer_subparser(sub_parsers) config_create_user_subparser(sub_parsers) config_delete_user_subparser(sub_parsers) config_validate_file_subparser(sub_parsers) @@ -1705,6 +1737,33 @@ def config_validate_file_subparser(sub_parsers): return +def config_set_librarian_transfer_subparser(sub_parsers): + doc = """Set the transfer state of a librarian. + + """ + hlp = "Set the transfer state of a librarian" + + # add sub parser + sp = sub_parsers.add_parser("set-librarian-transfer", description=doc, help=hlp) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "--name", help="The name of the librarian to set the transfer state of." + ) + + grp = sp.add_mutually_exclusive_group() + grp.add_argument( + "--enabled", + action="store_true", + help="Set the librarian to enabled for transfers.", + ) + grp.add_argument( + "--disabled", + action="store_true", + help="Set the librarian to disabled for transfers.", + ) + sp.set_defaults(func=set_librarian_transfer) + + def main(): # make a parser and run the specified command parser = generate_parser() diff --git a/hera_librarian/client.py b/hera_librarian/client.py index b6ceafa..35ba864 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -29,10 +29,12 @@ from .models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminDeleteInstanceRequest, AdminDeleteInstanceResponse, + AdminLibrarianTransferStatusResponse, AdminListLibrariansRequest, AdminListLibrariansResponse, AdminRemoveLibrarianRequest, @@ -1319,3 +1321,41 @@ def remove_librarian( raise e return response.success, response.number_of_transfers_removed + + def set_librarian_status( + self, + librarian_name: str, + transfers_enabled: bool, + ) -> bool: + """ + Set the status of transfers to the librarian. + + Parameters + ---------- + librarian_name : str + The name of the librarian to set the status of. + transfers_enabled : bool + Whether transfers to this librarian should be enabled. + + Returns + ------- + bool + The new status. + """ + + try: + response = self.post( + endpoint="admin/librarians/transfer_status/set", + request=AdminChangeLibrarianTransferStatusRequest( + librarian_name=librarian_name, + transfers_enabled=transfers_enabled, + ), + response=AdminLibrarianTransferStatusResponse, + ) + except LibrarianHTTPError as e: + if e.status_code == 400 and "Librarian" in e.reason: + raise LibrarianError(e.reason) + else: + raise e + + return response.transfers_enabled diff --git a/hera_librarian/models/admin.py b/hera_librarian/models/admin.py index 817946a..066ace8 100644 --- a/hera_librarian/models/admin.py +++ b/hera_librarian/models/admin.py @@ -167,6 +167,9 @@ class LibrarianListResponseItem(BaseModel): available: bool | None "Whether the librarian is available or not, only if ping is true." + enabled: bool + "Whether transfers the librarian is enabled or not." + class AdminListLibrariansResponse(BaseModel): librarians: list[LibrarianListResponseItem] @@ -238,3 +241,28 @@ class AdminDeleteInstanceResponse(BaseModel): "The instance name of the instance that was changed." instance_id: int + + +class AdminChangeLibrarianTransferStatusRequest(BaseModel): + """ + A request to change the transfer status of a librarian, either + to enable or disable outbound transfers. + """ + + "The name of the librarian to change the transfer status of." + librarian_name: str + + "Whether to enable or disable outbound transfers." + transfers_enabled: bool + + +class AdminLibrarianTransferStatusResponse(BaseModel): + """ + A response to a user change request. + """ + + "The name of the librarian that was changed." + librarian_name: str + + "Whether the librarian has outbound transfers enabled." + transfers_enabled: bool diff --git a/librarian_background/queues.py b/librarian_background/queues.py index 22f06d9..9b34cfc 100644 --- a/librarian_background/queues.py +++ b/librarian_background/queues.py @@ -20,6 +20,7 @@ from hera_librarian.exceptions import LibrarianError from hera_librarian.transfer import TransferStatus from librarian_server.database import get_session +from librarian_server.orm.librarian import Librarian from librarian_server.orm.sendqueue import SendQueue from librarian_server.settings import server_settings @@ -145,10 +146,22 @@ def check_on_consumed( # We are out of time. return False + # Check if the librarian is enabled. + librarian = ( + session.query(Librarian) + .filter_by(name=queue_item.destination) + .one_or_none() + ) + + if librarian is None or not librarian.transfers_enabled: + # We can't do anything with this librarian, but there may be other + # librarians that are enabled. + continue + logger.info( "Handling queue item {q.id} with {q.retries} retries", q=queue_item ) - + current_status = queue_item.async_transfer_manager.transfer_status( settings=server_settings ) @@ -238,6 +251,18 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool: # Nothing to do! return False + # Check if the librarian is enabled. + librarian = ( + session.query(Librarian) + .filter_by(name=queue_item.destination) + .one_or_none() + ) + + if librarian is None or not librarian.transfers_enabled: + # We can't do anything with this librarian, but there may be other + # librarians that are enabled. + return True + # Now, check we don't have too much going on. query_start = time.perf_counter() stmt = ( diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 8cdbac0..08ecbf6 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -488,6 +488,12 @@ def core(self, session: Session): ) return CancelJob + if not librarian.transfers_enabled: + logger.warning( + f"Transfers to librarian {librarian.name} are temporarily disabled, skipping." + ) + return + client: "LibrarianClient" = librarian.client() try: diff --git a/librarian_server/api/admin.py b/librarian_server/api/admin.py index 603e46f..d22a38c 100644 --- a/librarian_server/api/admin.py +++ b/librarian_server/api/admin.py @@ -16,10 +16,12 @@ from hera_librarian.models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminDeleteInstanceRequest, AdminDeleteInstanceResponse, + AdminLibrarianTransferStatusResponse, AdminListLibrariansRequest, AdminListLibrariansResponse, AdminRemoveLibrarianRequest, @@ -388,6 +390,7 @@ def list_librarians( url=librarian.url, port=librarian.port, available=ping if request.ping else None, + enabled=librarian.transfers_enabled, ) ) @@ -581,3 +584,39 @@ def delete_local_instance( store.store_manager.delete(Path(instance.path)) return AdminDeleteInstanceResponse(success=True, instance_id=request.instance_id) + + +@router.post( + path="/librarians/transfer_status/change", + response_model=AdminLibrarianTransferStatusResponse, +) +def change_librarian_transfer_status( + request: AdminChangeLibrarianTransferStatusRequest, + user: AdminUserDependency, + response: Response, + session: Session = Depends(yield_session), +) -> AdminLibrarianTransferStatusResponse: + """ + Change the transfer status of a librarian. This will enable or disable + outbound transfers to the librarian. + """ + + librarian = ( + session.query(Librarian).filter_by(name=request.librarian_name).one_or_none() + ) + + if librarian is None: + response.status_code = status.HTTP_400_BAD_REQUEST + return AdminRequestFailedResponse( + reason=f"Librarian {request.librarian_name} does not exist", + suggested_remedy="Please verify that the requested librarian exists", + ) + + librarian.transfers_enabled = request.transfers_enabled + + session.commit() + + return AdminLibrarianTransferStatusResponse( + librarian_name=librarian.name, + transfers_enabled=librarian.transfers_enabled, + ) diff --git a/librarian_server/orm/librarian.py b/librarian_server/orm/librarian.py index 23273dd..0667839 100644 --- a/librarian_server/orm/librarian.py +++ b/librarian_server/orm/librarian.py @@ -36,6 +36,7 @@ class Librarian(db.Base): "The port of this librarian." authenticator = db.Column(db.String(256), nullable=False) "The authenticator so we can connect this librarian. This is encrypted." + transfers_enabled = db.Column(db.Boolean, nullable=False, default=True) last_seen = db.Column(db.DateTime, nullable=False) "The last time we connected to and verified this librarian exists." diff --git a/librarian_server/settings.py b/librarian_server/settings.py index d31848f..f70df34 100644 --- a/librarian_server/settings.py +++ b/librarian_server/settings.py @@ -136,10 +136,10 @@ class ServerSettings(BaseSettings): # Host and port to bind to. host: str = "0.0.0.0" - port: int + port: int = 8080 # Stores that the librarian should add or migrate - add_stores: list[StoreSettings] + add_stores: list[StoreSettings] = [] # Database migration settings alembic_config_path: str = "." diff --git a/librarian_server_scripts/librarian_change_transfer_status.py b/librarian_server_scripts/librarian_change_transfer_status.py new file mode 100644 index 0000000..3e7aa93 --- /dev/null +++ b/librarian_server_scripts/librarian_change_transfer_status.py @@ -0,0 +1,58 @@ +""" +A command-line script, for use in containers, to change the behaviour +of transfers to external librarians. +""" + +import argparse as ap + +parser = ap.ArgumentParser( + description=( + "Change the status of an external librarian, to enable or disable transfers." + ) +) + +parser.add_argument( + "--librarian", + help="Name of the librarian to change the status of.", + type=str, + required=True, +) + +parser.add_argument( + "--enable", + help="Enable the librarian.", + action="store_true", +) + +parser.add_argument( + "--disable", + help="Disable the librarian.", + action="store_true", +) + + +def main(): + args = parser.parse_args() + + if args.enable and args.disable: + raise ValueError("Cannot enable and disable at the same time.") + + if not args.enable and not args.disable: + raise ValueError("Must enable or disable.") + + from librarian_server.database import get_session + from librarian_server.orm import Librarian + + with get_session() as session: + librarian = ( + session.query(Librarian).filter_by(name=args.librarian).one_or_none() + ) + if librarian is None: + raise ValueError(f"Librarian {args.librarian} does not exist.") + + if args.enable: + librarian.transfers_enabled = True + elif args.disable: + librarian.transfers_enabled = False + + session.commit() diff --git a/pyproject.toml b/pyproject.toml index 9321158..24ec063 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ librarian-server-setup = "librarian_server_scripts.librarian_server_setup:main" librarian = "hera_librarian.cli:main" librarian-server-rebuild-database = "librarian_server_scripts.librarian_server_rebuild_database:main" librarian-server-repair-database = "librarian_server_scripts.librarian_server_repair_database:main" +librarian-change-transfer-status = "librarian_server_scripts.librarian_change_transfer_status:main" [project.urls] Homepage = "https://github.com/simonsobs/librarian" diff --git a/tests/server_unit_test/test_admin.py b/tests/server_unit_test/test_admin.py index 7e262ea..dbe90c7 100644 --- a/tests/server_unit_test/test_admin.py +++ b/tests/server_unit_test/test_admin.py @@ -8,6 +8,7 @@ from hera_librarian.models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminListLibrariansRequest, @@ -352,6 +353,19 @@ def test_add_librarians(test_client, test_server_with_valid_file, test_orm): assert response.success == False assert response.already_exists == True + # Now disable this guy! + disable_request = AdminChangeLibrarianTransferStatusRequest( + librarian_name="our_closest_friend", + transfers_enabled=False, + ) + + response = test_client.post_with_auth( + "/api/v2/admin/librarians/transfer_status/change", + content=disable_request.model_dump_json(), + ) + + assert response.status_code == 200 + # Now we can try the search endpoint. search_request = AdminListLibrariansRequest( ping=False, @@ -371,6 +385,7 @@ def test_add_librarians(test_client, test_server_with_valid_file, test_orm): assert librarian.url == "http://localhost" assert librarian.port == 80 assert librarian.available == None + assert librarian.enabled == False found = True assert found