From 8e1235e27512dd3890df81e6c9f60013d591367d Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 12 Nov 2024 08:41:09 -0500 Subject: [PATCH] Add client ability to shut down transfers without configuration edits --- ...c988372_add_librarian_transfer_toggling.py | 29 ++++++++++ hera_librarian/cli.py | 56 ++++++++++++++++++ hera_librarian/client.py | 40 +++++++++++++ hera_librarian/models/admin.py | 28 +++++++++ librarian_background/queues.py | 25 ++++++++ librarian_background/send_clone.py | 6 ++ librarian_server/api/admin.py | 39 +++++++++++++ librarian_server/orm/librarian.py | 1 + librarian_server/settings.py | 4 +- .../librarian_change_transfer_status.py | 58 +++++++++++++++++++ pyproject.toml | 1 + tests/server_unit_test/test_admin.py | 15 +++++ 12 files changed, 300 insertions(+), 2 deletions(-) create mode 100644 alembic/versions/1def8c988372_add_librarian_transfer_toggling.py create mode 100644 librarian_server_scripts/librarian_change_transfer_status.py diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py new file mode 100644 index 0000000..c993cc0 --- /dev/null +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -0,0 +1,29 @@ +# Copyright 2017 the HERA Collaboration +# Licensed under the 2-clause BSD License. + +"""Add librarian transfer toggling + +Revision ID: 1def8c988372 +Revises: 42f29c26ab0f +Create Date: 2024-11-11 15:09:12.578181 + +""" +import sqlalchemy as sa + +from alembic import op + +revision = "1def8c988372" +down_revision = "42f29c26ab0f" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("librarians") as batch_op: + batch_op.add_column( + sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True) + ) + + +def downgrade(): + op.drop_column("librarians", "transfers_enabled") diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index d696b8b..25aac1d 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -641,6 +641,7 @@ def get_librarian_list(args): print( f"\033[1m{librarian.name}\033[0m ({librarian.url}:{librarian.port}) " f"- {'Available' if librarian.available else 'Disabled' if librarian.available is not None else 'Unknown'}" + f"- {'Enabled' if librarian.enabled else 'Disabled'}" ) return 0 @@ -771,6 +772,36 @@ def validate_file(args): return 0 +def set_librarian_transfer(args): + """ + Set the transfer status of a librarian. + """ + + client = get_client(args.conn_name, admin=True) + + if args.enabled and args.disabled: + die("Cannot set both enabled and disabled.") + + if args.enabled: + transfer_status = True + elif args.disabled: + transfer_status = False + else: + die("You must choose to enable or disable the transfers.") + + try: + client.set_librarian_transfer( + librarian_name=args.librarian_name, + transfer_status=transfer_status, + ) + except LibrarianError as e: + die(f"Error setting librarian transfer status: {e}") + except LibrarianHTTPError as e: + die(f"Unexpected error communicating with the librarian server: {e.reason}") + + return 0 + + # make the base parser def generate_parser(): """Make a librarian ArgumentParser. @@ -1705,6 +1736,31 @@ def config_validate_file_subparser(sub_parsers): return +def config_set_librarian_transfer_subparser(sub_parsers): + doc = """Set the transfer state of a librarian. + + """ + hlp = "Set the transfer state of a librarian" + + # add sub parser + sp = sub_parsers.add_parser("set-librarian-transfer", description=doc, help=hlp) + sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help) + sp.add_argument( + "--name", help="The name of the librarian to set the transfer state of." + ) + sp.add_argument( + "--enabled", + action="store_true", + help="Set the librarian to enabled for transfers.", + ) + sp.add_argument( + "--disabled", + action="store_true", + help="Set the librarian to disabled for transfers.", + ) + sp.set_defaults(func=set_librarian_transfer) + + def main(): # make a parser and run the specified command parser = generate_parser() diff --git a/hera_librarian/client.py b/hera_librarian/client.py index b6ceafa..35ba864 100644 --- a/hera_librarian/client.py +++ b/hera_librarian/client.py @@ -29,10 +29,12 @@ from .models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminDeleteInstanceRequest, AdminDeleteInstanceResponse, + AdminLibrarianTransferStatusResponse, AdminListLibrariansRequest, AdminListLibrariansResponse, AdminRemoveLibrarianRequest, @@ -1319,3 +1321,41 @@ def remove_librarian( raise e return response.success, response.number_of_transfers_removed + + def set_librarian_status( + self, + librarian_name: str, + transfers_enabled: bool, + ) -> bool: + """ + Set the status of transfers to the librarian. + + Parameters + ---------- + librarian_name : str + The name of the librarian to set the status of. + transfers_enabled : bool + Whether transfers to this librarian should be enabled. + + Returns + ------- + bool + The new status. + """ + + try: + response = self.post( + endpoint="admin/librarians/transfer_status/set", + request=AdminChangeLibrarianTransferStatusRequest( + librarian_name=librarian_name, + transfers_enabled=transfers_enabled, + ), + response=AdminLibrarianTransferStatusResponse, + ) + except LibrarianHTTPError as e: + if e.status_code == 400 and "Librarian" in e.reason: + raise LibrarianError(e.reason) + else: + raise e + + return response.transfers_enabled diff --git a/hera_librarian/models/admin.py b/hera_librarian/models/admin.py index 817946a..066ace8 100644 --- a/hera_librarian/models/admin.py +++ b/hera_librarian/models/admin.py @@ -167,6 +167,9 @@ class LibrarianListResponseItem(BaseModel): available: bool | None "Whether the librarian is available or not, only if ping is true." + enabled: bool + "Whether transfers the librarian is enabled or not." + class AdminListLibrariansResponse(BaseModel): librarians: list[LibrarianListResponseItem] @@ -238,3 +241,28 @@ class AdminDeleteInstanceResponse(BaseModel): "The instance name of the instance that was changed." instance_id: int + + +class AdminChangeLibrarianTransferStatusRequest(BaseModel): + """ + A request to change the transfer status of a librarian, either + to enable or disable outbound transfers. + """ + + "The name of the librarian to change the transfer status of." + librarian_name: str + + "Whether to enable or disable outbound transfers." + transfers_enabled: bool + + +class AdminLibrarianTransferStatusResponse(BaseModel): + """ + A response to a user change request. + """ + + "The name of the librarian that was changed." + librarian_name: str + + "Whether the librarian has outbound transfers enabled." + transfers_enabled: bool diff --git a/librarian_background/queues.py b/librarian_background/queues.py index fb46c48..ae580aa 100644 --- a/librarian_background/queues.py +++ b/librarian_background/queues.py @@ -19,6 +19,7 @@ from hera_librarian.transfer import TransferStatus from librarian_server.database import get_session from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database +from librarian_server.orm.librarian import Librarian from librarian_server.orm.sendqueue import SendQueue from librarian_server.settings import server_settings @@ -134,6 +135,18 @@ def check_on_consumed( # We are out of time. return False + # Check if the librarian is enabled. + librarian = ( + session.query(Librarian) + .filter_by(name=queue_item.destination) + .one_or_none() + ) + + if librarian is None or not librarian.transfers_enabled: + # We can't do anything with this librarian, but there may be other + # librarians that are enabled. + continue + current_status = queue_item.async_transfer_manager.transfer_status( settings=server_settings ) @@ -219,6 +232,18 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool: # Nothing to do! return False + # Check if the librarian is enabled. + librarian = ( + session.query(Librarian) + .filter_by(name=queue_item.destination) + .one_or_none() + ) + + if librarian is None or not librarian.transfers_enabled: + # We can't do anything with this librarian, but there may be other + # librarians that are enabled. + return True + # Now, check we don't have too much going on. stmt = ( select(func.count(SendQueue.id)) diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index ca0eacf..4678b48 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -479,6 +479,12 @@ def core(self, session: Session): ) return CancelJob + if not librarian.transfers_enabled: + logger.warning( + f"Transfers to librarian {librarian.name} are temporarily disabled, skipping." + ) + return + client: "LibrarianClient" = librarian.client() try: diff --git a/librarian_server/api/admin.py b/librarian_server/api/admin.py index 603e46f..cb44309 100644 --- a/librarian_server/api/admin.py +++ b/librarian_server/api/admin.py @@ -16,10 +16,12 @@ from hera_librarian.models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminDeleteInstanceRequest, AdminDeleteInstanceResponse, + AdminLibrarianTransferStatusResponse, AdminListLibrariansRequest, AdminListLibrariansResponse, AdminRemoveLibrarianRequest, @@ -388,6 +390,7 @@ def list_librarians( url=librarian.url, port=librarian.port, available=ping if request.ping else None, + enabled=librarian.transfers_enabled, ) ) @@ -581,3 +584,39 @@ def delete_local_instance( store.store_manager.delete(Path(instance.path)) return AdminDeleteInstanceResponse(success=True, instance_id=request.instance_id) + + +@router.post( + path="/librarians/transfer_status/change", + response_model=AdminLibrarianTransferStatusResponse, +) +def change_librarian_transfer_status( + request: AdminChangeLibrarianTransferStatusRequest, + user: AdminUserDependency, + response: Response, + session: Session = Depends(yield_session), +) -> AdminLibrarianTransferStatusResponse: + """ + Change the transfer status of a librarian. This will enable or disable + outbound transfers to the librarian. + """ + + librarian = ( + session.query(Librarian).filter_by(name=request.librarian_name).one_or_none() + ) + + if librarian is None: + response.status_code = status.HTTP_400_BAD_REQUEST + return AdminRequestFailedResponse( + reason=f"Librarian {request.librarian_name} does not exist", + suggested_remedy="Check for typos in your request", + ) + + librarian.transfers_enabled = request.transfers_enabled + + session.commit() + + return AdminLibrarianTransferStatusResponse( + librarian_name=librarian.name, + transfers_enabled=librarian.transfers_enabled, + ) diff --git a/librarian_server/orm/librarian.py b/librarian_server/orm/librarian.py index 23273dd..0667839 100644 --- a/librarian_server/orm/librarian.py +++ b/librarian_server/orm/librarian.py @@ -36,6 +36,7 @@ class Librarian(db.Base): "The port of this librarian." authenticator = db.Column(db.String(256), nullable=False) "The authenticator so we can connect this librarian. This is encrypted." + transfers_enabled = db.Column(db.Boolean, nullable=False, default=True) last_seen = db.Column(db.DateTime, nullable=False) "The last time we connected to and verified this librarian exists." diff --git a/librarian_server/settings.py b/librarian_server/settings.py index ddcd9b5..2853c6e 100644 --- a/librarian_server/settings.py +++ b/librarian_server/settings.py @@ -87,10 +87,10 @@ class ServerSettings(BaseSettings): # Host and port to bind to. host: str = "0.0.0.0" - port: int + port: int = 8080 # Stores that the librarian should add or migrate - add_stores: list[StoreSettings] + add_stores: list[StoreSettings] = [] # Database migration settings alembic_config_path: str = "." diff --git a/librarian_server_scripts/librarian_change_transfer_status.py b/librarian_server_scripts/librarian_change_transfer_status.py new file mode 100644 index 0000000..3e7aa93 --- /dev/null +++ b/librarian_server_scripts/librarian_change_transfer_status.py @@ -0,0 +1,58 @@ +""" +A command-line script, for use in containers, to change the behaviour +of transfers to external librarians. +""" + +import argparse as ap + +parser = ap.ArgumentParser( + description=( + "Change the status of an external librarian, to enable or disable transfers." + ) +) + +parser.add_argument( + "--librarian", + help="Name of the librarian to change the status of.", + type=str, + required=True, +) + +parser.add_argument( + "--enable", + help="Enable the librarian.", + action="store_true", +) + +parser.add_argument( + "--disable", + help="Disable the librarian.", + action="store_true", +) + + +def main(): + args = parser.parse_args() + + if args.enable and args.disable: + raise ValueError("Cannot enable and disable at the same time.") + + if not args.enable and not args.disable: + raise ValueError("Must enable or disable.") + + from librarian_server.database import get_session + from librarian_server.orm import Librarian + + with get_session() as session: + librarian = ( + session.query(Librarian).filter_by(name=args.librarian).one_or_none() + ) + if librarian is None: + raise ValueError(f"Librarian {args.librarian} does not exist.") + + if args.enable: + librarian.transfers_enabled = True + elif args.disable: + librarian.transfers_enabled = False + + session.commit() diff --git a/pyproject.toml b/pyproject.toml index 1e6094d..798f332 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ librarian-server-setup = "librarian_server_scripts.librarian_server_setup:main" librarian = "hera_librarian.cli:main" librarian-server-rebuild-database = "librarian_server_scripts.librarian_server_rebuild_database:main" librarian-server-repair-database = "librarian_server_scripts.librarian_server_repair_database:main" +librarian-change-transfer-status = "librarian_server_scripts.librarian_change_transfer_status:main" [project.urls] Homepage = "https://github.com/simonsobs/librarian" diff --git a/tests/server_unit_test/test_admin.py b/tests/server_unit_test/test_admin.py index 7e262ea..dbe90c7 100644 --- a/tests/server_unit_test/test_admin.py +++ b/tests/server_unit_test/test_admin.py @@ -8,6 +8,7 @@ from hera_librarian.models.admin import ( AdminAddLibrarianRequest, AdminAddLibrarianResponse, + AdminChangeLibrarianTransferStatusRequest, AdminCreateFileRequest, AdminCreateFileResponse, AdminListLibrariansRequest, @@ -352,6 +353,19 @@ def test_add_librarians(test_client, test_server_with_valid_file, test_orm): assert response.success == False assert response.already_exists == True + # Now disable this guy! + disable_request = AdminChangeLibrarianTransferStatusRequest( + librarian_name="our_closest_friend", + transfers_enabled=False, + ) + + response = test_client.post_with_auth( + "/api/v2/admin/librarians/transfer_status/change", + content=disable_request.model_dump_json(), + ) + + assert response.status_code == 200 + # Now we can try the search endpoint. search_request = AdminListLibrariansRequest( ping=False, @@ -371,6 +385,7 @@ def test_add_librarians(test_client, test_server_with_valid_file, test_orm): assert librarian.url == "http://localhost" assert librarian.port == 80 assert librarian.available == None + assert librarian.enabled == False found = True assert found