diff --git a/.devcontainer/.dev_config.yaml b/.devcontainer/.dev_config.yaml index 1dfb23c..ba4dfcb 100644 --- a/.devcontainer/.dev_config.yaml +++ b/.devcontainer/.dev_config.yaml @@ -16,8 +16,9 @@ kafka_servers: ["kafka:9092"] download_access_url: "http://127.0.0.1:8080/download-access" -dataset_overview_event_topic: metadata -dataset_overview_event_type: metadata_dataset_overview +dataset_change_event_topic: metadata_datasets +dataset_upsertion_event_type: dataset_created +dataset_deletion_event_type: dataset_deleted # the default keys are invalid but set for creating the example specs auth_key: "{}" diff --git a/.github/workflows/cd.yaml b/.github/workflows/cd.yaml index 63f9f67..786220f 100644 --- a/.github/workflows/cd.yaml +++ b/.github/workflows/cd.yaml @@ -32,7 +32,7 @@ jobs: name: Verify tag format # format must be compatible with semantic versioning run: | - SEMVER_REGEX="^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$" + SEMVER_REGEX="^(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(?:-((?:0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$" if echo "${{ steps.get_version_tag.outputs.version }}" | grep -Eq "$SEMVER_REGEX"; then echo "Tag format is valid" else diff --git a/.static_files b/.static_files index a2f8ff5..3ebfa18 100644 --- a/.static_files +++ b/.static_files @@ -15,6 +15,8 @@ scripts/script_utils/__init__.py scripts/script_utils/cli.py +scripts/__init__.py +scripts/update_all.py scripts/license_checker.py scripts/get_package_name.py scripts/update_config_docs.py diff --git a/README.md b/README.md index 10b33fd..8f08494 100644 --- a/README.md +++ b/README.md @@ -52,13 +52,13 @@ We recommend using the provided Docker container. A pre-build version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/work-package-service): ```bash -docker pull ghga/work-package-service:0.1.3 +docker pull ghga/work-package-service:0.1.4 ``` Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile): ```bash # Execute in the repo's root dir: -docker build -t ghga/work-package-service:0.1.3 . +docker build -t ghga/work-package-service:0.1.4 . ``` For production-ready deployment, we recommend using Kubernetes, however, @@ -66,7 +66,7 @@ for simple use cases, you could execute the service using docker on a single server: ```bash # The entrypoint is preconfigured: -docker run -p 8080:8080 ghga/work-package-service:0.1.3 --help +docker run -p 8080:8080 ghga/work-package-service:0.1.4 --help ``` If you prefer not to use containers, you may install the service from source: @@ -102,9 +102,11 @@ The service requires the following configuration parameters: - **Items** *(string)* -- **`dataset_overview_event_topic`** *(string)*: Name of the topic for events that inform about datasets. +- **`dataset_change_event_topic`** *(string)*: Name of the topic for events that inform about datasets. -- **`dataset_overview_event_type`** *(string)*: The type to use for events that inform about datasets. +- **`dataset_upsertion_event_type`** *(string)*: The type of events that inform about new and changed datasets. + +- **`dataset_deletion_event_type`** *(string)*: The type of events that inform about deleted datasets. - **`download_access_url`** *(string)*: URL pointing to the internal download access API. 
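
For orientation, here is a minimal configuration sketch for the renamed event settings, using the example values that appear in `example_config.yaml` and `.devcontainer/.dev_config.yaml` in this diff (all other required settings are omitted):

```yaml
# excerpt only; db_connection_str, auth_key, etc. are still required
kafka_servers: ["kafka:9092"]
dataset_change_event_topic: metadata_datasets
dataset_upsertion_event_type: dataset_created
dataset_deletion_event_type: dataset_deleted
download_access_url: "http://127.0.0.1:8080/download-access"
```

As with the other settings, each value can also be supplied through the environment variable listed in `config_schema.json` (e.g. `wps_dataset_change_event_topic`).
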
diff --git a/config_schema.json b/config_schema.json index 762552c..606c799 100644 --- a/config_schema.json +++ b/config_schema.json @@ -91,21 +91,30 @@ "type": "string" } }, - "dataset_overview_event_topic": { - "title": "Dataset Overview Event Topic", + "dataset_change_event_topic": { + "title": "Dataset Change Event Topic", "description": "Name of the topic for events that inform about datasets.", - "example": "metadata", + "example": "metadata_datasets", "env_names": [ - "wps_dataset_overview_event_topic" + "wps_dataset_change_event_topic" ], "type": "string" }, - "dataset_overview_event_type": { - "title": "Dataset Overview Event Type", - "description": "The type to use for events that inform about datasets.", - "example": "metadata_dataset_overview", + "dataset_upsertion_event_type": { + "title": "Dataset Upsertion Event Type", + "description": "The type of events that inform about new and changed datasets.", + "example": "dataset_created", "env_names": [ - "wps_dataset_overview_event_type" + "wps_dataset_upsertion_event_type" + ], + "type": "string" + }, + "dataset_deletion_event_type": { + "title": "Dataset Deletion Event Type", + "description": "The type of events that inform about deleted datasets.", + "example": "dataset_deleted", + "env_names": [ + "wps_dataset_deletion_event_type" ], "type": "string" }, @@ -306,8 +315,9 @@ "db_connection_str", "service_instance_id", "kafka_servers", - "dataset_overview_event_topic", - "dataset_overview_event_type", + "dataset_change_event_topic", + "dataset_upsertion_event_type", + "dataset_deletion_event_type", "download_access_url", "auth_key" ], diff --git a/example_config.yaml b/example_config.yaml index 38e3432..67b4d16 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -13,8 +13,9 @@ cors_allow_credentials: false cors_allowed_headers: [] cors_allowed_methods: [] cors_allowed_origins: [] -dataset_overview_event_topic: metadata -dataset_overview_event_type: metadata_dataset_overview +dataset_change_event_topic: metadata_datasets +dataset_deletion_event_type: dataset_deleted +dataset_upsertion_event_type: dataset_created datasets_collection: datasets db_connection_str: '**********' db_name: dev-db diff --git a/openapi.yaml b/openapi.yaml index 5081a2b..2d26acd 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -138,7 +138,7 @@ components: info: description: A service managing work packages for the GHGA CLI title: Work Package Service - version: 0.1.3 + version: 0.1.4 openapi: 3.0.2 paths: /health: diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..6222ab0 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Scripts and utils used during development or in CI pipelines.""" diff --git a/scripts/update_all.py b/scripts/update_all.py new file mode 100755 index 0000000..78854df --- /dev/null +++ b/scripts/update_all.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Run all update scripts that are present in the repository in the correct order""" + +try: + from scripts.update_template_files import main as update_template +except ImportError: + pass +else: + print("Pulling in updates from template repository") + update_template() + +try: + from scripts.update_config_docs import main as update_config +except ImportError: + pass +else: + print("Updating config docs") + update_config() + +try: + from scripts.update_openapi_docs import main as update_openapi +except ImportError: + pass +else: + print("Updating OpenAPI docs") + update_openapi() + +try: + from scripts.update_readme import main as update_readme +except ImportError: + pass +else: + print("Updating README") + update_readme() diff --git a/scripts/update_template_files.py b/scripts/update_template_files.py index 7ff6e22..952fe2c 100755 --- a/scripts/update_template_files.py +++ b/scripts/update_template_files.py @@ -33,7 +33,7 @@ try: from script_utils.cli import echo_failure, echo_success, run except ImportError: - echo_failure = echo_success = print # type: ignore + echo_failure = echo_success = print def run(main_fn): """Run main function without cli tools (typer).""" diff --git a/setup.cfg b/setup.cfg index b72edba..70e2a20 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,9 +35,9 @@ zip_safe = False include_package_data = True packages = find: install_requires = - ghga-service-commons[api,auth,crypt]==0.4.1 - ghga-event-schemas==0.13.2 - hexkit[akafka,mongodb]==0.10.0 + ghga-service-commons[api,auth,crypt]==0.5.0 + ghga-event-schemas==0.13.4 + hexkit[akafka,mongodb]==0.10.2 httpx==0.23.3 typer==0.7.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8dee8e9 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,32 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Shared fixtures""" + +import pytest +from hexkit.providers.akafka.testutils import get_kafka_fixture +from hexkit.providers.mongodb.testutils import MongoDbFixture, get_mongodb_fixture +from hexkit.providers.testing.utils import get_event_loop + +event_loop = get_event_loop("session") +kafka_fixture = get_kafka_fixture("session") +mongodb_fixture = get_mongodb_fixture("session") + + +@pytest.fixture(autouse=True) +def reset_db(mongodb_fixture: MongoDbFixture): # noqa: F811 + """Clear the database before tests.""" + mongodb_fixture.empty_collections() diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py index 86f5560..1f494a4 100644 --- a/tests/fixtures/__init__.py +++ b/tests/fixtures/__init__.py @@ -41,7 +41,7 @@ ) from .access import AccessCheckMock -from .datasets import DATASET_OVERVIEW_EVENT +from .datasets import DATASET_UPSERTION_EVENT __all__ = [ "AUTH_CLAIMS", @@ -139,15 +139,17 @@ async def fixture_container( # publish an event announcing a dataset async with get_container(config=config) as container: await kafka_fixture.publish_event( - payload=DATASET_OVERVIEW_EVENT.dict(), - type_="metadata_dataset_overview", - key="test_key", - topic="metadata", + payload=DATASET_UPSERTION_EVENT.dict(), + topic=config.dataset_change_event_topic, + type_=config.dataset_upsertion_event_type, + key="test-key-fixture", ) # populate database with published dataset event_subscriber = await container.event_subscriber() + # wait for event to be submitted and processed await asyncio.wait_for(event_subscriber.run(forever=False), timeout=10) + await asyncio.sleep(0.25) # return the configured and wired container yield container diff --git a/tests/fixtures/datasets.py b/tests/fixtures/datasets.py index 4a6366f..2df4268 100644 --- a/tests/fixtures/datasets.py +++ b/tests/fixtures/datasets.py @@ -18,13 +18,14 @@ from ghga_event_schemas.pydantic_ import ( MetadataDatasetFile, + MetadataDatasetID, MetadataDatasetOverview, MetadataDatasetStage, ) from wps.core.models import Dataset, DatasetFile, WorkType -__all__ = ["DATASET", "DATASET_OVERVIEW_EVENT"] +__all__ = ["DATASET", "DATASET_UPSERTION_EVENT", "DATASET_DELETION_EVENT"] DATASET = Dataset( @@ -40,7 +41,7 @@ ) -DATASET_OVERVIEW_EVENT = MetadataDatasetOverview( +DATASET_UPSERTION_EVENT = MetadataDatasetOverview( accession="some-dataset-id", stage=MetadataDatasetStage.DOWNLOAD, title="Test dataset 1", @@ -63,3 +64,8 @@ ), ], ) + + +DATASET_DELETION_EVENT = MetadataDatasetID( + accession="some-dataset-id", +) diff --git a/tests/test_api.py b/tests/test_api.py index a2fac66..c52990b 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -19,12 +19,6 @@ from fastapi import status from ghga_service_commons.api.testing import AsyncTestClient from ghga_service_commons.utils.jwt_helpers import decode_and_validate_token -from hexkit.providers.akafka.testutils import ( # noqa: F401 # pylint: disable=unused-import - kafka_fixture, -) -from hexkit.providers.mongodb.testutils import ( # noqa: F401 # pylint: disable=unused-import - mongodb_fixture, -) from pytest import mark from pytest_httpx import HTTPXMock @@ -47,12 +41,14 @@ "user_public_crypt4gh_key": user_public_crypt4gh_key, } +TIMEOUT = 5 + @mark.asyncio async def test_health_check(client: AsyncTestClient): """Test that the health check endpoint works.""" - response = await client.get("/health") + response = await client.get("/health", timeout=TIMEOUT) assert response.status_code == status.HTTP_200_OK assert response.json() == {"status": "OK"} diff --git a/tests/test_events.py 
b/tests/test_events.py index f1f479f..e2c9771 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -16,18 +16,22 @@ """Test that the work package service consumes and processes events properly.""" -from hexkit.providers.akafka.testutils import ( # noqa: F401 # pylint: disable=unused-import - kafka_fixture, -) -from hexkit.providers.mongodb.testutils import ( # noqa: F401 # pylint: disable=unused-import - mongodb_fixture, -) +import asyncio + +from hexkit.base import InboundProviderBase +from hexkit.providers.akafka.testutils import KafkaFixture from pytest import mark, raises +from wps.config import Config from wps.container import Container +from wps.core.repository import WorkPackageRepository from .fixtures import fixture_container # noqa: F401 # pylint: disable=unused-import -from .fixtures.datasets import DATASET +from .fixtures.datasets import DATASET, DATASET_DELETION_EVENT, DATASET_UPSERTION_EVENT + +TIMEOUT = 5 +RETRY_INTERVAL = 0.05 +RETRIES = round(TIMEOUT / RETRY_INTERVAL) @mark.asyncio @@ -41,3 +45,90 @@ async def test_dataset_registration(container: Container): with raises(repository.DatasetNotFoundError): await repository.get_dataset("another-dataset-id") + + +@mark.asyncio +async def test_dataset_insert_update_delete( + container: Container, kafka_fixture: KafkaFixture +): + """Test the whole lifecycle of a dataset announced as an event.""" + + config: Config = container.config() + repository: WorkPackageRepository = await container.work_package_repository() + event_subscriber: InboundProviderBase = await container.event_subscriber() + + accession = "another-dataset-id" + with raises(repository.DatasetNotFoundError): + await repository.get_dataset(accession) + key = "test-key-crud" + + # insert a dataset + + inserted_dataset = DATASET_UPSERTION_EVENT + inserted_dataset = inserted_dataset.copy(update={"accession": accession}) + await kafka_fixture.publish_event( + payload=inserted_dataset.dict(), + topic=config.dataset_change_event_topic, + type_=config.dataset_upsertion_event_type, + key=key, + ) + await asyncio.wait_for(event_subscriber.run(forever=False), timeout=TIMEOUT) + + # wait until dataset is stored + dataset = None + for _ in range(RETRIES): + await asyncio.sleep(RETRY_INTERVAL) + try: + dataset = await repository.get_dataset(accession) + except repository.DatasetNotFoundError: + pass + else: + assert dataset.title == "Test dataset 1" + break + else: + assert False, "dataset not created" + + # update the dataset + + updated_dataset = DATASET_UPSERTION_EVENT + updated_dataset = inserted_dataset.copy( + update={"accession": accession, "title": "Changed dataset 1"} + ) + await kafka_fixture.publish_event( + payload=updated_dataset.dict(), + topic=config.dataset_change_event_topic, + type_=config.dataset_upsertion_event_type, + key=key, + ) + await asyncio.wait_for(event_subscriber.run(forever=False), timeout=TIMEOUT) + # wait until dataset is updated + dataset = None + for _ in range(RETRIES): + await asyncio.sleep(RETRY_INTERVAL) + dataset = await repository.get_dataset(accession) + if dataset.title == "Changed dataset 1": + break + else: + assert False, "dataset title not changed" + + # delete the dataset again + + deleted_dataset = DATASET_DELETION_EVENT + deleted_dataset = deleted_dataset.copy(update={"accession": accession}) + await kafka_fixture.publish_event( + payload=deleted_dataset.dict(), + topic=config.dataset_change_event_topic, + type_=config.dataset_deletion_event_type, + key=key, + ) + await 
asyncio.wait_for(event_subscriber.run(forever=False), timeout=TIMEOUT) + + # wait until dataset is deleted + for _ in range(RETRIES): + await asyncio.sleep(RETRY_INTERVAL) + try: + await repository.get_dataset(accession) + except repository.DatasetNotFoundError: + break + else: + assert False, "dataset not deleted" diff --git a/tests/test_repository.py b/tests/test_repository.py index d8a71b8..593fafc 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -18,10 +18,6 @@ from ghga_service_commons.auth.ghga import AuthContext from ghga_service_commons.utils.jwt_helpers import decode_and_validate_token -from hexkit.providers.mongodb.testutils import ( # noqa: F401; pylint: disable=unused-import - MongoDbFixture, - mongodb_fixture, -) from pytest import mark, raises from wps.core.models import ( @@ -220,3 +216,21 @@ async def test_checking_accessible_datasets( assert await repository.get_dataset("some-dataset-id") == DATASET assert await repository.get_datasets(auth_context=auth_context) == [DATASET] + + +@mark.asyncio +async def test_deletion_of_datasets( + repository: WorkPackageRepository, auth_context: AuthContext +): + """Test deletion of existing datasets""" + + with raises(repository.DatasetNotFoundError): + await repository.delete_dataset(DATASET.id) + + await repository.register_dataset(DATASET) + assert await repository.get_dataset(DATASET.id) == DATASET + + await repository.delete_dataset(DATASET.id) + + with raises(repository.DatasetNotFoundError): + await repository.delete_dataset(DATASET.id) diff --git a/wps/__init__.py b/wps/__init__.py index 2c96b5e..0af59ea 100644 --- a/wps/__init__.py +++ b/wps/__init__.py @@ -15,4 +15,4 @@ """Work Package Service""" -__version__ = "0.1.3" +__version__ = "0.1.4" diff --git a/wps/adapters/inbound/event_sub.py b/wps/adapters/inbound/event_sub.py index 4e32c03..a070085 100644 --- a/wps/adapters/inbound/event_sub.py +++ b/wps/adapters/inbound/event_sub.py @@ -31,15 +31,20 @@ class EventSubTranslatorConfig(BaseSettings): """Config for dataset creation related events.""" - dataset_overview_event_topic: str = Field( + dataset_change_event_topic: str = Field( ..., description="Name of the topic for events that inform about datasets.", - example="metadata", + example="metadata_datasets", ) - dataset_overview_event_type: str = Field( + dataset_upsertion_event_type: str = Field( ..., - description="The type to use for events that inform about datasets.", - example="metadata_dataset_overview", + description="The type of events that inform about new and changed datasets.", + example="dataset_created", + ) + dataset_deletion_event_type: str = Field( + ..., + description="The type of events that inform about deleted datasets.", + example="dataset_deleted", ) @@ -54,25 +59,18 @@ def __init__( ): """Initialize with config parameters and core dependencies.""" self.topics_of_interest = [ - config.dataset_overview_event_topic, + config.dataset_change_event_topic, ] self.types_of_interest = [ - config.dataset_overview_event_type, + config.dataset_upsertion_event_type, + config.dataset_deletion_event_type, ] + self._dataset_upsertion_event_type = config.dataset_upsertion_event_type + self._dataset_deletion_event_type = config.dataset_deletion_event_type self._repository = work_package_repository - async def _consume_validated( # pylint: disable=unused-argument - self, *, payload: JsonObject, type_: Ascii, topic: Ascii - ) -> None: - """ - Receive and process an event with already validated topic and type. 
- - Args: - payload (JsonObject): The data/payload to send with the event. - type_ (str): The type of the event. - topic (str): Name of the topic the event was published to. - """ - + async def _handle_upsertion(self, payload: JsonObject): + """Handle event for new or changed datasets.""" validated_payload = get_validated_payload( payload=payload, schema=event_schemas.MetadataDatasetOverview, @@ -100,3 +98,29 @@ async def _consume_validated( # pylint: disable=unused-argument ) await self._repository.register_dataset(dataset) + + async def _handle_deletion(self, payload: JsonObject): + """Handle event for deleted datasets.""" + validated_payload = get_validated_payload( + payload=payload, schema=event_schemas.MetadataDatasetID + ) + try: + await self._repository.delete_dataset(validated_payload.accession) + except self._repository.DatasetNotFoundError: + pass # already deleted + + async def _consume_validated( # pylint: disable=unused-argument + self, *, payload: JsonObject, type_: Ascii, topic: Ascii + ) -> None: + """ + Receive and process an event with already validated topic and type. + + Args: + payload (JsonObject): The data/payload to send with the event. + type_ (str): The type of the event. + topic (str): Name of the topic the event was published to. + """ + if type_ == self._dataset_upsertion_event_type: + await self._handle_upsertion(payload) + elif type_ == self._dataset_deletion_event_type: + await self._handle_deletion(payload) diff --git a/wps/core/repository.py b/wps/core/repository.py index 9057cb0..a508cc3 100644 --- a/wps/core/repository.py +++ b/wps/core/repository.py @@ -239,6 +239,16 @@ async def register_dataset(self, dataset: Dataset) -> None: """Register a dataset with all of its files.""" await self._dataset_dao.upsert(dataset) + async def delete_dataset(self, dataset_id: str) -> None: + """Delete a dataset with all of its files. + + If the dataset does not exist, a DatasetNotFoundError will be raised. + """ + try: + await self._dataset_dao.delete(id_=dataset_id) + except ResourceNotFoundError as error: + raise self.DatasetNotFoundError("Dataset not found") from error + async def get_dataset(self, dataset_id: str) -> Dataset: """Get a registered dataset using the given ID. diff --git a/wps/ports/inbound/repository.py b/wps/ports/inbound/repository.py index 882154c..5247be2 100644 --- a/wps/ports/inbound/repository.py +++ b/wps/ports/inbound/repository.py @@ -89,6 +89,11 @@ async def register_dataset(self, dataset: Dataset) -> None: """Register a dataset with all of its files.""" ... + @abstractmethod + async def delete_dataset(self, dataset_id: str) -> None: + """Delete a dataset with all of its files.""" + ... + @abstractmethod async def get_dataset(self, dataset_id: str) -> Dataset: """Get a registered dataset using the given ID.
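
Taken together, these changes let Kafka events drive the full dataset lifecycle: events of the upsertion type carry a `MetadataDatasetOverview` and are routed to `_handle_upsertion` / `register_dataset`, while events of the deletion type carry only a `MetadataDatasetID` and are routed to `_handle_deletion` / `delete_dataset`. The following condensed sketch mirrors `tests/test_events.py` and assumes the `container` and `kafka_fixture` fixtures from `tests/fixtures`; it introduces no API beyond what the diff already uses:

```python
from pytest import mark, raises

from .fixtures import fixture_container  # noqa: F401  # provides the `container` fixture
from .fixtures.datasets import DATASET_DELETION_EVENT, DATASET_UPSERTION_EVENT


@mark.asyncio
async def test_event_driven_lifecycle(container, kafka_fixture):
    """Sketch: announce a dataset via an upsertion event, then retract it again."""
    config = container.config()
    repository = await container.work_package_repository()
    event_subscriber = await container.event_subscriber()

    # announce (or update) a dataset
    await kafka_fixture.publish_event(
        payload=DATASET_UPSERTION_EVENT.dict(),
        topic=config.dataset_change_event_topic,
        type_=config.dataset_upsertion_event_type,
        key="sketch-key",
    )
    await event_subscriber.run(forever=False)  # consume one event
    dataset = await repository.get_dataset(DATASET_UPSERTION_EVENT.accession)
    assert dataset.title == DATASET_UPSERTION_EVENT.title

    # retract the dataset again
    await kafka_fixture.publish_event(
        payload=DATASET_DELETION_EVENT.dict(),
        topic=config.dataset_change_event_topic,
        type_=config.dataset_deletion_event_type,
        key="sketch-key",
    )
    await event_subscriber.run(forever=False)
    with raises(repository.DatasetNotFoundError):
        await repository.get_dataset(DATASET_DELETION_EVENT.accession)
```

The real tests additionally wrap each `run(forever=False)` call in `asyncio.wait_for` and poll the database before asserting, since the write may not have completed immediately after consumption; the sketch drops that retry logic for readability.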