Skip to content

Commit

Permalink
Add ability to kill off a store once we have generated a manifest.
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Feb 28, 2024
1 parent 9f1351e commit 84edbb3
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 14 deletions.
16 changes: 16 additions & 0 deletions hera_librarian/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,30 @@ class ManifestEntry(BaseModel):
instance_create_time: datetime
"The time the instance was created."
instance_available: bool
"Whether the instance is available or not. If not, no outgoing transfer is created."

outgoing_transfer_id: int
"The ID of the outgoing transfer, if it exists."


class AdminStoreManifestRequest(BaseModel):
store_name: str
"The name of the store to get the manifest for."

create_outgoing_transfers: bool = False
"Whether to create outgoing transfers for the files in the manifest."

destination_librarian: str = ""
"The name of the librarian to send the files to, if create_outgoing_transfers is true."

disable_store: bool = False
"Whether to disable the store after creating the outgoing transfers."


class AdminStoreManifestResponse(BaseModel):
librarian_name: str
"The name of the librarian that generated this manifest."

store_name: str
"The name of the store."

Expand Down
117 changes: 103 additions & 14 deletions librarian_server/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
)

from ..database import yield_session
from ..orm import File, Instance, StoreMetadata
from ..logger import log
from ..orm import File, Instance, Librarian, OutgoingTransfer, StoreMetadata
from ..stores import InvertedStoreNames, StoreNames
from .auth import AdminUserDependency

Expand Down Expand Up @@ -193,36 +194,124 @@ def store_manifest(
the librarian, thus completing the 'sneakernet' process.
"""

log.debug(f"Recieved manifest request from {user.username}: {request}.")

# First, get the store.
store = (
session.query(StoreMetadata).filter_by(name=request.store_name).one_or_none()
)

if store is None:
response.status_code = status.HTTP_400_BAD_REQUEST

log.warning(f"Store {request.store_name} does not exist. Returning 400.")

return AdminRequestFailedResponse(
reason=f"Store {request.store_name} does not exist.",
suggested_remedy="Create the store first. Maybe you need to run DB migration?",
)

# If we are going to create outgoing transfers, we need to make sure
# that the destination librarian exists.

outgoing_librarian: Librarian | None = None

if request.create_outgoing_transfers:
outgoing_librarian = (
session.query(Librarian)
.filter_by(name=request.destination_librarian)
.one_or_none()
)

if outgoing_librarian is None:
response.status_code = status.HTTP_400_BAD_REQUEST

log.warning(
f"Librarian {request.destination_librarian} does not exist, and "
"user requested transfers to be created. Returning 400."
)

return AdminRequestFailedResponse(
reason=f"Librarian {request.destination_librarian} does not exist.",
suggested_remedy="Create the librarian first in the database.",
)

# Get the list of instances.
instances = session.query(Instance).filter_by(store=store).all()

return AdminStoreManifestResponse(
def create_manifest_entry(instance: Instance) -> ManifestEntry:
"Create a linked transfer and manifest entry based on logic."

if request.create_outgoing_transfers:
transfer = OutgoingTransfer.new_transfer(
destination=outgoing_librarian.name,
instance=instance,
file=instance.file,
)

session.add(transfer)
session.commit()

transfer_id = transfer.id
else:
transfer_id = -1

return ManifestEntry(
name=instance.file.name,
create_time=instance.file.create_time,
size=instance.file.size,
checksum=instance.file.checksum,
uploader=instance.file.uploader,
source=instance.file.source,
instance_path=instance.path,
deletion_policy=instance.deletion_policy,
instance_create_time=instance.created_time,
instance_available=instance.available,
outgoing_transfer_id=transfer_id,
)

response = AdminStoreManifestResponse(
store_name=store.name,
store_files=[
ManifestEntry(
name=instance.file.name,
create_time=instance.file.create_time,
size=instance.file.size,
checksum=instance.file.checksum,
uploader=instance.file.uploader,
source=instance.file.source,
instance_path=instance.path,
deletion_policy=instance.deletion_policy,
instance_create_time=instance.created_time,
instance_available=instance.available,
)
create_manifest_entry(instance)
for instance in instances
if instance.available
],
)

if request.disable_store:
store.enabled = False

try:
session.commit()
except Exception as e:
log.error(
f"Failed to disable store {store.name}: {e}. Returning 500, but before "
"that, we need to kill off all these transfers we just created."
)

# Kill off all the transfers we just created.
if request.create_outgoing_transfers:
successfully_killed = 0
for file in response.store_files:
if file.outgoing_transfer_id != -1:
transfer = session.query(OutgoingTransfer).get(
file.outgoing_transfer_id
)
transfer.fail_transfer(session)
successfully_killed += 1

log.error(
f"Killed off {successfully_killed}/{len(response.store_files)} transfers "
"we just created."
)

response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return AdminRequestFailedResponse(
reason="Failed to disable store.",
suggested_remedy="Check the logs for more information.",
)

log.info(f"Disabled store {store.name}.")

return response

0 comments on commit 84edbb3

Please sign in to comment.