Add log statements to repository and event sub
TheByronHimes committed Feb 7, 2024
1 parent 7cc9eac commit d373505
Showing 2 changed files with 92 additions and 18 deletions.
7 changes: 7 additions & 0 deletions src/wps/adapters/inbound/event_sub.py
@@ -16,6 +16,7 @@
 
 """KafkaEventSubscriber receiving events that announce datasets"""
 
+import logging
 from contextlib import suppress
 
 from ghga_event_schemas import pydantic_ as event_schemas
@@ -30,6 +31,8 @@
 
 __all__ = ["EventSubTranslatorConfig", "EventSubTranslator"]
 
+log = logging.getLogger(__name__)
+
 
 class EventSubTranslatorConfig(BaseSettings):
     """Config for dataset creation related events."""
@@ -83,6 +86,10 @@ async def _handle_upsertion(self, payload: JsonObject):
             stage = WorkType[validated_payload.stage.name]
         except KeyError:
             # stage does not correspond to a work type, ignore event
+            log.info(
+                "Ignoring dataset event with unknown stage %s",
+                validated_payload.stage.name,
+            )
            return
 
        files = [
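The changes above follow the stock library pattern for module-level logging: one logger named after the module via logging.getLogger(__name__), and %-style lazy formatting so the message is only interpolated if a handler actually emits the record. A minimal, self-contained sketch of the same pattern (the function, stage names, and flow are hypothetical, not taken from the WPS code):

import logging

# Module-level logger, named after the defining module as in the diff above.
log = logging.getLogger(__name__)


def handle_dataset_event(stage_name: str) -> None:
    """Toy stand-in for the subscriber's upsert handler."""
    known_stages = {"DOWNLOAD", "UPLOAD"}  # hypothetical work types
    if stage_name not in known_stages:
        # Lazy %s formatting: the string is only built if INFO is enabled.
        log.info("Ignoring dataset event with unknown stage %s", stage_name)
        return
    # ... process the event ...


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    handle_dataset_event("METADATA")  # logs the info message and returns

Using logging.getLogger(__name__) rather than the root logger lets operators tune verbosity per module, which matters here because the subscriber logs ignored events at INFO while the repository below logs access failures at ERROR.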
103 changes: 85 additions & 18 deletions src/wps/core/repository.py
@@ -16,6 +16,7 @@
 
 """A repository for work packages."""
 
+import logging
 from datetime import timedelta
 from typing import Optional
 
@@ -48,6 +49,8 @@
     WorkPackageDaoPort,
 )
 
+log = logging.getLogger(__name__)
+
 
 class WorkPackageConfig(BaseSettings):
     """Config parameters needed for the WorkPackageRepository."""
@@ -88,7 +91,9 @@ def __init__(
             config.work_package_signing_key.get_secret_value()
         )
         if not self._signing_key.has_private:
-            raise KeyError("No private work order signing key found.")
+            key_error = KeyError("No private work order signing key found.")
+            log.error(key_error)
+            raise key_error
         self._access = access_check
         self._dataset_dao = dataset_dao
         self._dao = work_package_dao
@@ -107,30 +112,49 @@ async def create(
         """
         user_id = auth_context.id
         if user_id is None:
-            raise self.WorkPackageAccessError("No internal user specified")
+            access_error = self.WorkPackageAccessError("No internal user specified")
+            log.error(access_error)
+            raise access_error
 
         dataset_id = creation_data.dataset_id
         work_type = creation_data.type
 
+        extra = {  # only used for logging
+            "user_id": user_id,
+            "dataset_id": dataset_id,
+            "work_type": work_type,
+        }
+
         if work_type == WorkType.DOWNLOAD:
             if not await self._access.check_download_access(user_id, dataset_id):
-                raise self.WorkPackageAccessError("Missing dataset access permission")
+                access_error = self.WorkPackageAccessError(
+                    "Missing dataset access permission"
+                )
+                log.error(access_error, extra=extra)
+                raise access_error
         else:
-            raise self.WorkPackageAccessError("Unsupported work type")
+            access_error = self.WorkPackageAccessError("Unsupported work type")
+            log.error(access_error, extra=extra)
+            raise access_error
 
         try:
             dataset = await self.get_dataset(dataset_id)
         except self.DatasetNotFoundError as error:
-            raise self.WorkPackageAccessError(
-                "Cannot determine dataset files"
-            ) from error
+            access_error = self.WorkPackageAccessError("Cannot determine dataset files")
+            log.error(access_error, extra=extra)
+            raise access_error from error
 
         file_ids = [file.id for file in dataset.files]
         if creation_data.file_ids is not None:
             # if file_ids is not passed as None, restrict the file set
             file_id_set = set(creation_data.file_ids)
             file_ids = [file_id for file_id in file_ids if file_id in file_id_set]
         if not file_ids:
-            raise self.WorkPackageAccessError("No existing files have been specified")
+            access_error = self.WorkPackageAccessError(
+                "No existing files have been specified"
+            )
+            log.error(access_error, extra=extra)
+            raise access_error
 
         file_id_set = set(file_ids)
         files = {
@@ -178,24 +202,44 @@ async def get(
         - if a work_package_access_token is specified and it does not match
           the token hash that is stored in the work package
         """
+        extra = {"work_package_id": work_package_id}  # only used for logging
+
         try:
             work_package = await self._dao.get_by_id(work_package_id)
         except ResourceNotFoundError as error:
-            raise self.WorkPackageAccessError("Work package not found") from error
+            access_error = self.WorkPackageAccessError("Work package not found")
+            log.error(access_error, extra=extra)
+            raise access_error from error
 
         if work_package_access_token and work_package.token_hash != hash_token(
             work_package_access_token
         ):
-            raise self.WorkPackageAccessError("Invalid work package access token")
+            access_error = self.WorkPackageAccessError(
+                "Invalid work package access token"
+            )
+            log.error(access_error, extra=extra)
+            raise access_error
 
         if check_valid:
             if not work_package.created <= now_as_utc() <= work_package.expires:
-                raise self.WorkPackageAccessError("Work package has expired")
+                access_error = self.WorkPackageAccessError("Work package has expired")
+                log.error(access_error, extra=extra)
+                raise access_error
 
             if work_package.type == WorkType.DOWNLOAD:
                 if not await self._access.check_download_access(
                     work_package.user_id, work_package.dataset_id
                 ):
-                    raise self.WorkPackageAccessError("Access has been revoked")
+                    access_error = self.WorkPackageAccessError(
+                        "Access has been revoked"
+                    )
+                    log.error(access_error, extra=extra)
+                    raise access_error
             else:
-                raise self.WorkPackageAccessError("Unsupported work type")
+                access_error = self.WorkPackageAccessError("Unsupported work type")
+                log.error(access_error, extra=extra)
+                raise access_error
 
         return work_package
 
     async def work_order_token(
@@ -215,13 +259,25 @@ async def work_order_token(
         - if a work_package_access_token is specified and it does not match
           the token hash that is stored in the work package
         """
+        extra = {  # only used for logging
+            "work_package_id": work_package_id,
+            "file_id": file_id,
+            "check_valid": check_valid,
+        }
+
         work_package = await self.get(
             work_package_id,
             check_valid=check_valid,
             work_package_access_token=work_package_access_token,
         )
+
         if file_id not in work_package.files:
-            raise self.WorkPackageAccessError("File is not contained in work package")
+            access_error = self.WorkPackageAccessError(
+                "File is not contained in work package"
+            )
+            log.error(access_error, extra=extra)
+            raise access_error
 
         user_public_crypt4gh_key = work_package.user_public_crypt4gh_key
         wot = WorkOrderToken(
             type=work_package.type,
@@ -246,7 +302,9 @@ async def delete_dataset(self, dataset_id: str) -> None:
         try:
             await self._dataset_dao.delete(id_=dataset_id)
         except ResourceNotFoundError as error:
-            raise self.DatasetNotFoundError("Dataset not found") from error
+            dataset_not_found_error = self.DatasetNotFoundError("Dataset not found")
+            log.error(dataset_not_found_error, extra={"dataset_id": dataset_id})
+            raise dataset_not_found_error from error
 
     async def get_dataset(self, dataset_id: str) -> Dataset:
         """Get a registered dataset using the given ID.
@@ -256,7 +314,9 @@ async def get_dataset(self, dataset_id: str) -> Dataset:
         try:
             return await self._dataset_dao.get_by_id(dataset_id)
         except ResourceNotFoundError as error:
-            raise self.DatasetNotFoundError("Dataset not found") from error
+            dataset_not_found_error = self.DatasetNotFoundError("Dataset not found")
+            log.error(dataset_not_found_error, extra={"dataset_id": dataset_id})
+            raise dataset_not_found_error from error
 
     async def get_datasets(
         self, *, auth_context: AuthContext, work_type: Optional[WorkType] = None
@@ -270,15 +330,22 @@ async def get_datasets(
         """
         user_id = auth_context.id
         if user_id is None:
-            raise self.WorkPackageAccessError("No internal user specified")
+            access_error = self.WorkPackageAccessError("No internal user specified")
+            log.error(access_error)
+            raise access_error
 
         if work_type is not None and work_type != WorkType.DOWNLOAD:
-            raise self.WorkPackageAccessError("Unsupported work type")
+            access_error = self.WorkPackageAccessError("Unsupported work type")
+            log.error(access_error, extra={"work_type": work_type})
+            raise access_error
 
         dataset_ids = await self._access.get_datasets_with_download_access(user_id)
         datasets: list[Dataset] = []
         for dataset_id in dataset_ids:
             try:
                 dataset = await self.get_dataset(dataset_id)
             except self.DatasetNotFoundError:
+                log.debug("Dataset '%s' not found, continuing...", dataset_id)
                 continue
             datasets.append(dataset)
         return datasets
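Two details of the repository changes deserve a note. First, the exception instance itself is passed as the log message; the logging module calls str() on non-string messages at format time, so log.error(access_error) emits the error's text. Second, extra attaches its keys as attributes on the LogRecord; the default formatter never prints them, so they only become visible through a formatter (or a structured-logging handler) that references them explicitly. A runnable sketch of both behaviors, assuming nothing about the WPS codebase beyond what the diff shows (all names here are illustrative):

import logging
from typing import Optional

log = logging.getLogger("repository_demo")  # hypothetical logger name


class WorkPackageAccessError(RuntimeError):
    """Stand-in for the repository's access error."""


def check_user(user_id: Optional[str]) -> None:
    """Mimics the log-then-raise pattern used throughout the diff."""
    if user_id is None:
        access_error = WorkPackageAccessError("No internal user specified")
        # str(access_error) becomes the message; "user_id" lands on the record.
        log.error(access_error, extra={"user_id": user_id})
        raise access_error


if __name__ == "__main__":
    handler = logging.StreamHandler()
    # The extra field must be referenced explicitly to appear in the output.
    fmt = "%(levelname)s %(message)s (user_id=%(user_id)s)"
    handler.setFormatter(logging.Formatter(fmt))
    log.addHandler(handler)
    try:
        check_user(None)  # prints: ERROR No internal user specified (user_id=None)
    except WorkPackageAccessError:
        pass

One constraint of this approach: keys in extra must not collide with built-in LogRecord attributes such as message or asctime, or logging raises a KeyError when building the record; that is presumably why the diff sticks to domain-specific keys like work_package_id and dataset_id.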
