Skip to content

Commit

Permalink
Add client ability to shut down transfers without configuration edits
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Nov 12, 2024
1 parent 8ce3ab2 commit 8e1235e
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 2 deletions.
29 changes: 29 additions & 0 deletions alembic/versions/1def8c988372_add_librarian_transfer_toggling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2017 the HERA Collaboration
# Licensed under the 2-clause BSD License.

"""Add librarian transfer toggling
Revision ID: 1def8c988372
Revises: 42f29c26ab0f
Create Date: 2024-11-11 15:09:12.578181
"""
import sqlalchemy as sa

from alembic import op

revision = "1def8c988372"
down_revision = "42f29c26ab0f"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("librarians") as batch_op:
batch_op.add_column(
sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True)
)


def downgrade():
op.drop_column("librarians", "transfers_enabled")
56 changes: 56 additions & 0 deletions hera_librarian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ def get_librarian_list(args):
print(
f"\033[1m{librarian.name}\033[0m ({librarian.url}:{librarian.port}) "
f"- {'Available' if librarian.available else 'Disabled' if librarian.available is not None else 'Unknown'}"
f"- {'Enabled' if librarian.enabled else 'Disabled'}"
)

return 0
Expand Down Expand Up @@ -771,6 +772,36 @@ def validate_file(args):
return 0


def set_librarian_transfer(args):
"""
Set the transfer status of a librarian.
"""

client = get_client(args.conn_name, admin=True)

if args.enabled and args.disabled:
die("Cannot set both enabled and disabled.")

if args.enabled:
transfer_status = True
elif args.disabled:
transfer_status = False
else:
die("You must choose to enable or disable the transfers.")

try:
client.set_librarian_transfer(
librarian_name=args.librarian_name,
transfer_status=transfer_status,
)
except LibrarianError as e:
die(f"Error setting librarian transfer status: {e}")
except LibrarianHTTPError as e:
die(f"Unexpected error communicating with the librarian server: {e.reason}")

return 0


# make the base parser
def generate_parser():
"""Make a librarian ArgumentParser.
Expand Down Expand Up @@ -1705,6 +1736,31 @@ def config_validate_file_subparser(sub_parsers):
return


def config_set_librarian_transfer_subparser(sub_parsers):
doc = """Set the transfer state of a librarian.
"""
hlp = "Set the transfer state of a librarian"

# add sub parser
sp = sub_parsers.add_parser("set-librarian-transfer", description=doc, help=hlp)
sp.add_argument("conn_name", metavar="CONNECTION-NAME", help=_conn_name_help)
sp.add_argument(
"--name", help="The name of the librarian to set the transfer state of."
)
sp.add_argument(
"--enabled",
action="store_true",
help="Set the librarian to enabled for transfers.",
)
sp.add_argument(
"--disabled",
action="store_true",
help="Set the librarian to disabled for transfers.",
)
sp.set_defaults(func=set_librarian_transfer)


def main():
# make a parser and run the specified command
parser = generate_parser()
Expand Down
40 changes: 40 additions & 0 deletions hera_librarian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
from .models.admin import (
AdminAddLibrarianRequest,
AdminAddLibrarianResponse,
AdminChangeLibrarianTransferStatusRequest,
AdminCreateFileRequest,
AdminCreateFileResponse,
AdminDeleteInstanceRequest,
AdminDeleteInstanceResponse,
AdminLibrarianTransferStatusResponse,
AdminListLibrariansRequest,
AdminListLibrariansResponse,
AdminRemoveLibrarianRequest,
Expand Down Expand Up @@ -1319,3 +1321,41 @@ def remove_librarian(
raise e

return response.success, response.number_of_transfers_removed

def set_librarian_status(
self,
librarian_name: str,
transfers_enabled: bool,
) -> bool:
"""
Set the status of transfers to the librarian.
Parameters
----------
librarian_name : str
The name of the librarian to set the status of.
transfers_enabled : bool
Whether transfers to this librarian should be enabled.
Returns
-------
bool
The new status.
"""

try:
response = self.post(
endpoint="admin/librarians/transfer_status/set",
request=AdminChangeLibrarianTransferStatusRequest(
librarian_name=librarian_name,
transfers_enabled=transfers_enabled,
),
response=AdminLibrarianTransferStatusResponse,
)
except LibrarianHTTPError as e:
if e.status_code == 400 and "Librarian" in e.reason:
raise LibrarianError(e.reason)
else:
raise e

return response.transfers_enabled
28 changes: 28 additions & 0 deletions hera_librarian/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ class LibrarianListResponseItem(BaseModel):
available: bool | None
"Whether the librarian is available or not, only if ping is true."

enabled: bool
"Whether transfers the librarian is enabled or not."


class AdminListLibrariansResponse(BaseModel):
librarians: list[LibrarianListResponseItem]
Expand Down Expand Up @@ -238,3 +241,28 @@ class AdminDeleteInstanceResponse(BaseModel):

"The instance name of the instance that was changed."
instance_id: int


class AdminChangeLibrarianTransferStatusRequest(BaseModel):
"""
A request to change the transfer status of a librarian, either
to enable or disable outbound transfers.
"""

"The name of the librarian to change the transfer status of."
librarian_name: str

"Whether to enable or disable outbound transfers."
transfers_enabled: bool


class AdminLibrarianTransferStatusResponse(BaseModel):
"""
A response to a user change request.
"""

"The name of the librarian that was changed."
librarian_name: str

"Whether the librarian has outbound transfers enabled."
transfers_enabled: bool
25 changes: 25 additions & 0 deletions librarian_background/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from hera_librarian.transfer import TransferStatus
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm.librarian import Librarian
from librarian_server.orm.sendqueue import SendQueue
from librarian_server.settings import server_settings

Expand Down Expand Up @@ -134,6 +135,18 @@ def check_on_consumed(
# We are out of time.
return False

# Check if the librarian is enabled.
librarian = (
session.query(Librarian)
.filter_by(name=queue_item.destination)
.one_or_none()
)

if librarian is None or not librarian.transfers_enabled:
# We can't do anything with this librarian, but there may be other
# librarians that are enabled.
continue

current_status = queue_item.async_transfer_manager.transfer_status(
settings=server_settings
)
Expand Down Expand Up @@ -219,6 +232,18 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool:
# Nothing to do!
return False

# Check if the librarian is enabled.
librarian = (
session.query(Librarian)
.filter_by(name=queue_item.destination)
.one_or_none()
)

if librarian is None or not librarian.transfers_enabled:
# We can't do anything with this librarian, but there may be other
# librarians that are enabled.
return True

# Now, check we don't have too much going on.
stmt = (
select(func.count(SendQueue.id))
Expand Down
6 changes: 6 additions & 0 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ def core(self, session: Session):
)
return CancelJob

if not librarian.transfers_enabled:
logger.warning(
f"Transfers to librarian {librarian.name} are temporarily disabled, skipping."
)
return

client: "LibrarianClient" = librarian.client()

try:
Expand Down
39 changes: 39 additions & 0 deletions librarian_server/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
from hera_librarian.models.admin import (
AdminAddLibrarianRequest,
AdminAddLibrarianResponse,
AdminChangeLibrarianTransferStatusRequest,
AdminCreateFileRequest,
AdminCreateFileResponse,
AdminDeleteInstanceRequest,
AdminDeleteInstanceResponse,
AdminLibrarianTransferStatusResponse,
AdminListLibrariansRequest,
AdminListLibrariansResponse,
AdminRemoveLibrarianRequest,
Expand Down Expand Up @@ -388,6 +390,7 @@ def list_librarians(
url=librarian.url,
port=librarian.port,
available=ping if request.ping else None,
enabled=librarian.transfers_enabled,
)
)

Expand Down Expand Up @@ -581,3 +584,39 @@ def delete_local_instance(
store.store_manager.delete(Path(instance.path))

return AdminDeleteInstanceResponse(success=True, instance_id=request.instance_id)


@router.post(
path="/librarians/transfer_status/change",
response_model=AdminLibrarianTransferStatusResponse,
)
def change_librarian_transfer_status(
request: AdminChangeLibrarianTransferStatusRequest,
user: AdminUserDependency,
response: Response,
session: Session = Depends(yield_session),
) -> AdminLibrarianTransferStatusResponse:
"""
Change the transfer status of a librarian. This will enable or disable
outbound transfers to the librarian.
"""

librarian = (
session.query(Librarian).filter_by(name=request.librarian_name).one_or_none()
)

if librarian is None:
response.status_code = status.HTTP_400_BAD_REQUEST
return AdminRequestFailedResponse(
reason=f"Librarian {request.librarian_name} does not exist",
suggested_remedy="Check for typos in your request",
)

librarian.transfers_enabled = request.transfers_enabled

session.commit()

return AdminLibrarianTransferStatusResponse(
librarian_name=librarian.name,
transfers_enabled=librarian.transfers_enabled,
)
1 change: 1 addition & 0 deletions librarian_server/orm/librarian.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Librarian(db.Base):
"The port of this librarian."
authenticator = db.Column(db.String(256), nullable=False)
"The authenticator so we can connect this librarian. This is encrypted."
transfers_enabled = db.Column(db.Boolean, nullable=False, default=True)

last_seen = db.Column(db.DateTime, nullable=False)
"The last time we connected to and verified this librarian exists."
Expand Down
4 changes: 2 additions & 2 deletions librarian_server/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ class ServerSettings(BaseSettings):

# Host and port to bind to.
host: str = "0.0.0.0"
port: int
port: int = 8080

# Stores that the librarian should add or migrate
add_stores: list[StoreSettings]
add_stores: list[StoreSettings] = []

# Database migration settings
alembic_config_path: str = "."
Expand Down
Loading

0 comments on commit 8e1235e

Please sign in to comment.