Skip to content

Commit

Permalink
Added new incoming transfer hypervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Aug 12, 2024
1 parent 72a6d94 commit 83760fa
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 6 deletions.
1 change: 1 addition & 0 deletions librarian_background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def background(run_once: bool = False):
+ background_settings.consume_queue
+ background_settings.check_consumed_queue
+ background_settings.outgoing_transfer_hypervisor
+ background_settings.incoming_transfer_hypervisor
)

for task in all_tasks:
Expand Down
189 changes: 189 additions & 0 deletions librarian_background/hypervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
from hera_librarian.models.checkin import CheckinStatusRequest, CheckinStatusResponse
from hera_librarian.utils import compare_checksums
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
Expand All @@ -22,6 +23,7 @@
RemoteInstance,
TransferStatus,
)
from librarian_server.orm.transfer import IncomingTransfer

from .task import Task

Expand Down Expand Up @@ -174,6 +176,160 @@ def handle_stale_outgoing_transfer(
return True


def handle_stale_incoming_transfer(
session: Session,
transfer: IncomingTransfer,
) -> bool:

upstream_librarian = (
session.query(Librarian).filter_by(name=transfer.source).one_or_none()
)

if not upstream_librarian:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"Upstream librarian {transfer.source} not found",
session=session,
)

transfer.fail_transfer(session=session, commit=True)
return False

# We have an upstream librarian. We can ask it about the status of its
# own OutgoingTransfer.
client = upstream_librarian.client()

status_request = CheckinStatusRequest(
source_transfer_ids=[transfer.source_transfer_id],
destination_transfer_ids=[],
)

try:
response: CheckinStatusResponse = client.post(
"checkin/status", request=status_request, response=CheckinStatusResponse
)

source_status = response.source_transfer_status[transfer.source_transfer_id]
except Exception as e:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.TRANSFER,
message=(
f"Unsuccessfully tried to contact {transfer.source} for information on "
f"transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}). "
f"Exception: {e}. We are failing {transfer.id}"
),
session=session,
)

# This implies that the transfer doesn't exist on the remote.
transfer.fail_transfer(session=session, commit=True)
return False

# Now we need to do some state matching.
# If remote transfer is 'dead', we can just cancel.
# If remote transfer is 'alive' and in same state as ours, we can
# just leave it... For now. It is upstream's job to cancel these kind
# of stale transfers.
# If remote transfer is 'alive' and in a different state to ours, there
# are two possibilities:
# a) Remote is more advanced than us; we should update ourselves to be
# aligned with this to try to progress with the transfer.
# b) Remote is less advanced than us; we should cancel the transfer - this
# is an explicitly bad state as we are a push-based system.

if source_status in [TransferStatus.COMPLETED]:
log_to_database(
severity=ErrorSeverity.CRITICAL,
category=ErrorCategory.PROGRAMMING,
message=(
f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has "
f"COMPLETED status on remote but {transfer.status} on local. This is "
f"an unreachable state for file {transfer.upload_name}. Requires manual check"
),
session=session,
)

transfer.fail_transfer(session=session, commit=True)
return False

if source_status in [TransferStatus.CANCELLED, TransferStatus.FAILED]:
# This one's a gimmie.
transfer.fail_transfer(session=session, commit=True)
return False

if source_status == transfer.status:
# This is the remote's responsibility.
return True

# We only get here in annoying scenarios.
if transfer.status == TransferStatus.INITIATED:
# Remote more advanced.
log_to_database(
severity=ErrorSeverity.INFO,
category=ErrorCategory.TRANSFER,
message=(
f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has "
f"more advanced state on remote ({source_status} > {transfer.status}). Catching"
f"up our transfer."
),
session=session,
)

transfer.status = source_status
session.commit()
return True

if transfer.status == TransferStatus.STAGED:
# Uh, this should be picked up by a different task (recv_clone)
log_to_database(
severity=ErrorSeverity.CRITICAL,
category=ErrorCategory.CONFIGURATION,
message=(
f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has "
"status STAGED and is being picked up by the hypervisor task. This should not "
"occur; recommend manual check"
),
session=session,
)
return False

if transfer.status == TransferStatus.ONGOING:
if source_status == TransferStatus.INITIATED:
transfer.fail_transfer(session=session, commit=True)
return False
else:
assert source_status == TransferStatus.STAGED
# Remote more advanced (STAGED)
log_to_database(
severity=ErrorSeverity.INFO,
category=ErrorCategory.TRANSFER,
message=(
f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has "
f"more advanced state on remote ({source_status} > {transfer.status}). Catching"
f"up our transfer."
),
session=session,
)

transfer.status = source_status
session.commit()
return True

log_to_database(
severity=ErrorSeverity.CRITICAL,
category=ErrorCategory.PROGRAMMING,
message=(
f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has "
"fallen through the hypervisor. Recommend manual check"
),
session=session,
)

return True


class OutgoingTransferHypervisor(Task):
"""
Checks in on stale outgoing transfers.
Expand Down Expand Up @@ -205,3 +361,36 @@ def core(self, session):
handle_stale_outgoing_transfer(session, transfer)

return True


class IncomingTransferHypervisor(Task):
"""
Checks on stale incoming transfers.
"""

age_in_days: int
"The age in days of the incoming transfer before we consider it stale."

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session):
"""
Checks for stale incoming transfers and updates their status.
"""

start_time = datetime.datetime.now(datetime.timezone.utc)
end_time = start_time + self.soft_timeout

stale_transfers = get_stale_of_type(session, self.age_in_days, IncomingTransfer)

for transfer in stale_transfers:
current_time = datetime.datetime.now(datetime.timezone.utc)

if current_time > end_time:
return False

handle_stale_incoming_transfer(session, transfer)

return True
27 changes: 24 additions & 3 deletions librarian_background/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
from pydantic_settings import BaseSettings, SettingsConfigDict

from hera_librarian.deletion import DeletionPolicy
from librarian_background.hypervisor import OutgoingTransferHypervisor
from librarian_background.hypervisor import (
IncomingTransferHypervisor,
OutgoingTransferHypervisor,
)

from .check_integrity import CheckIntegrity
from .create_clone import CreateLocalClone
Expand Down Expand Up @@ -176,7 +179,7 @@ def task(self) -> CheckConsumedQueue:
)


class OutgoingTransferHypevrisorSettings(BackgroundTaskSettings):
class OutgoingTransferHypervisorSettings(BackgroundTaskSettings):
"""
Settings for the hypervisor task.
"""
Expand All @@ -193,6 +196,23 @@ def task(self) -> OutgoingTransferHypervisor:
)


class IncomingTransferHypervisorSettings(BackgroundTaskSettings):
"""
Settings for the hypervisor task.
"""

age_in_days: int
"The age of the items to check, in days."

@property
def task(self) -> IncomingTransferHypervisor:
return IncomingTransferHypervisor(
name=self.task_name,
age_in_days=self.age_in_days,
soft_timeout=self.soft_timeout,
)


class BackgroundSettings(BaseSettings):
"""
Background task settings, configurable.
Expand All @@ -218,7 +238,8 @@ class BackgroundSettings(BaseSettings):
check_consumed_queue: list[CheckConsumedQueueSettings] = []
"Settings for the check consumed queue task."

outgoing_transfer_hypervisor: list[OutgoingTransferHypevrisorSettings] = []
outgoing_transfer_hypervisor: list[OutgoingTransferHypervisorSettings] = []
incoming_transfer_hypervisor: list[IncomingTransferHypervisorSettings] = []

# Global settings:

Expand Down
6 changes: 6 additions & 0 deletions librarian_server/api/checkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ def modify_transfers_by_id(
or (transfer.uploader == user.username)
)

# OutgoingTransfers
try:
authorized = authorized or (transfer.destination == user.username)
except AttributeError:
pass

if not authorized:
unprocessed.append(transfer_id)
reasons.add("You are not authorized to modify the transfer.")
Expand Down
68 changes: 66 additions & 2 deletions librarian_server/orm/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"""

import datetime
from pathlib import Path
from typing import TYPE_CHECKING

from hera_librarian.models.checkin import CheckinUpdateRequest, CheckinUpdateResponse
from hera_librarian.models.clone import (
CloneFailRequest,
CloneFailResponse,
Expand Down Expand Up @@ -104,6 +106,68 @@ def new_transfer(
start_time=datetime.datetime.utcnow(),
)

def fail_transfer(self, session: "Session", commit: bool = True):
"""
Fail the transfer and (optionally) commit to the database.
"""

self.status = TransferStatus.FAILED
self.end_time = datetime.datetime.now(datetime.timezone.utc)

try:
self.store.store_manager.unstage(Path(self.staging_path))
except (FileNotFoundError, OSError, AttributeError):
pass

if commit:
session.commit()

if self.source_transfer_id is None:
# No remote transfer ID, so we can't do anything.
return

# Now here's the interesting part - we need to communicate to the
# remote librarian that the transfer failed!
librarian: Librarian = (
session.query(Librarian).filter_by(name=self.source).first()
)

if not librarian:
# Librarian doesn't exist. We can't do anything.
log.error(
"Remote librarian does not exist when trying to fail transfer. "
"This state should be entirely unreachable."
)
return

client = librarian.client()

request = CheckinUpdateRequest(
source_transfer_ids=[self.source_transfer_id],
destination_transfer_ids=[],
new_status=TransferStatus.FAILED,
)

try:
response: CheckinUpdateResponse = client.post(
"checkin/update",
request=request,
response=CheckinUpdateResponse,
)

if not response.success:
raise Exception(
"Remote librarian refused or failed to set transfer status to FAILED."
)
except Exception as e:
log.error(
f"Failed to communicate to remote librarian that transfer {self.id} "
f"failed with exception {e}. It is likely that there is a stale transfer "
f"on remote librarian {self.source} with id {self.source_transfer_id}."
)

return


class OutgoingTransfer(db.Base):
"""
Expand Down Expand Up @@ -224,7 +288,7 @@ def fail_transfer(self, session: "Session", commit: bool = True):

try:
response: CloneFailResponse = client.post(
path="/api/v2/clone/fail",
"clone/fail",
request=request,
response=CloneFailResponse,
)
Expand Down Expand Up @@ -275,7 +339,7 @@ def staged_transfer(self, session: "Session", commit: bool = True):

try:
response: CloneStagedResponse = client.post(
path="/api/v2/clone/staged",
"clone/staged",
request=request,
response=CloneStagedResponse,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def mocked_admin_client(test_client):

# Now need to replace post

def new_post(endpoint: str, request, response):
def new_post(endpoint: str, request, response=None):
endpoint = f"/api/v2/{endpoint}"
r = test_client.post_with_auth(endpoint, content=request.model_dump_json())

Expand Down
Loading

0 comments on commit 83760fa

Please sign in to comment.