From 33f66ed57f5f5afba491cb222772c2830cda20fc Mon Sep 17 00:00:00 2001 From: Byron Himes Date: Mon, 4 Mar 2024 17:29:53 +0100 Subject: [PATCH] Idempotence (GSI-626) (#12) * Update template files and lock files * Add mongodb provider from hexkit * Add mongodb config * Add model for storing hash sums of notification events * Add DAO for notification records * Update Notifier to use new DAO * Update the joint fixture * Update pyproject config for ruff linting * Update existing tests to use notifier in joint fixture Also removes some outdated calls to stop the event loop * Make _controller loop stop call conditional * Update model field directly before update * Add tests for new functionality * Update readme for the config * Make _ensure_not_sent func more intuitive * Don't check email str existence * Remove unnecessary imports in test * Type cast the Notifier instead of using type ignores * Rename _check_if_sent to _has_been_sent --------- Co-authored-by: TheByronHimes --- .devcontainer/.dev_config.yaml | 2 + .github/workflows/ci_workflow_dispatch.yaml | 2 +- .pre-commit-config.yaml | 2 +- .pyproject_generation/pyproject_custom.toml | 6 +- .pyproject_generation/pyproject_template.toml | 24 +- README.md | 20 + config_schema.json | 20 + example_config.yaml | 2 + lock/requirements-dev-template.in | 4 +- lock/requirements-dev.txt | 725 ++++++++++-------- lock/requirements.txt | 524 +++++++------ pyproject.toml | 40 +- src/ns/adapters/outbound/dao.py | 33 + src/ns/config.py | 2 + src/ns/core/models.py | 31 + src/ns/core/notifier.py | 60 +- src/ns/inject.py | 12 +- src/ns/ports/outbound/dao.py | 27 + tests/conftest.py | 32 + tests/fixtures/joint.py | 30 +- tests/fixtures/server.py | 7 +- tests/fixtures/test_config.yaml | 2 + tests/test_basic.py | 202 +++-- 23 files changed, 1151 insertions(+), 658 deletions(-) create mode 100644 src/ns/adapters/outbound/dao.py create mode 100644 src/ns/core/models.py create mode 100644 src/ns/ports/outbound/dao.py create mode 100644 tests/conftest.py diff --git a/.devcontainer/.dev_config.yaml b/.devcontainer/.dev_config.yaml index f41bb23..2c902a4 100644 --- a/.devcontainer/.dev_config.yaml +++ b/.devcontainer/.dev_config.yaml @@ -12,3 +12,5 @@ login_user: "test@test.com" login_password: test from_address: "test@test.com" use_starttls: false +db_connection_str: "mongodb://mongodb:27017" +db_name: "dev_db" diff --git a/.github/workflows/ci_workflow_dispatch.yaml b/.github/workflows/ci_workflow_dispatch.yaml index 1452f7e..eab7d5a 100644 --- a/.github/workflows/ci_workflow_dispatch.yaml +++ b/.github/workflows/ci_workflow_dispatch.yaml @@ -12,7 +12,7 @@ on: jobs: fetch-tag: runs-on: ubuntu-latest - if: ( github.event.action == 'workflow_dispatch' || github.event.action != 'labeled' && contains(github.event.pull_request.labels.*.name, 'build') ) || ( github.event.action == 'labeled' && github.event.label.name == 'build' ) + if: github.event_name == 'workflow_dispatch' || ( github.event.action != 'labeled' && contains(github.event.pull_request.labels.*.name, 'build') ) || ( github.event.action == 'labeled' && github.event.label.name == 'build' ) steps: - id: fetch-tag uses: ghga-de/gh-action-fetch-tag@v1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0d50d4f..49ab325 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -48,7 +48,7 @@ repos: - id: no-commit-to-branch args: [--branch, dev, --branch, int, --branch, main] - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.13 + rev: v0.2.1 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] diff --git a/.pyproject_generation/pyproject_custom.toml b/.pyproject_generation/pyproject_custom.toml index 5b28f94..bffe26d 100644 --- a/.pyproject_generation/pyproject_custom.toml +++ b/.pyproject_generation/pyproject_custom.toml @@ -3,10 +3,10 @@ name = "ns" version = "1.1.0" description = "The Notification Service (NS) handles notification kafka events." dependencies = [ - "typer>=0.7.0", + "typer>=0.9.0", "ghga-event-schemas>=3.0.0, <4", - "ghga-service-commons>=1.2.0", - "hexkit[akafka]>=1.0.0", + "ghga-service-commons>=2.0.0", + "hexkit[akafka,mongodb]>=2.0.0", ] [project.urls] diff --git a/.pyproject_generation/pyproject_template.toml b/.pyproject_generation/pyproject_template.toml index c6bb5fc..a6201df 100644 --- a/.pyproject_generation/pyproject_template.toml +++ b/.pyproject_generation/pyproject_template.toml @@ -32,6 +32,16 @@ exclude = [ "build", "dist", ] +line-length = 88 +src = ["src", "tests", "examples", "scripts"] +target-version = "py39" + +[tool.ruff.lint] +fixable = [ + "UP", # e.g. List -> list + "I", # sort imports + "D", # pydocstyle +] ignore = [ "E", # pycodestyle errors "W", # pycodestyle warnings - pycodestyle covered by black @@ -49,7 +59,6 @@ ignore = [ "D206", # indent-with-spaces (ignored for formatter) "D300", # triple-single-quotes (ignored for formatter) ] -line-length = 88 select = [ "C90", # McCabe Complexity "F", # pyflakes codes @@ -63,25 +72,18 @@ select = [ "SIM", # flake8-simplify "D", # pydocstyle ] -fixable = [ - "UP", # e.g. List -> list - "I", # sort imports - "D", # pydocstyle -] -src = ["src", "tests", "examples", "scripts"] -target-version = "py39" -[tool.ruff.mccabe] +[tool.ruff.lint.mccabe] max-complexity = 10 -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "scripts/*" = ["PL", "S", "SIM", "D"] "tests/*" = ["S", "SIM", "PLR", "B011"] ".devcontainer/*" = ["S", "SIM", "D"] "examples/*" = ["S", "D"] "__init__.py" = ["D"] -[tool.ruff.pydocstyle] +[tool.ruff.lint.pydocstyle] convention = "pep257" [tool.mypy] diff --git a/README.md b/README.md index b32d60d..d302fbd 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,26 @@ ns --help ### Parameters The service requires the following configuration parameters: +- **`db_connection_str`** *(string, format: password)*: MongoDB connection string. Might include credentials. For more information see: https://naiveskill.com/mongodb-connection-string/. + + + Examples: + + ```json + "mongodb://localhost:27017" + ``` + + +- **`db_name`** *(string)*: Name of the database located on the MongoDB server. + + + Examples: + + ```json + "my-database" + ``` + + - **`log_level`** *(string)*: The minimum log level to capture. Must be one of: `["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"]`. Default: `"INFO"`. - **`service_name`** *(string)*: Default: `"ns"`. diff --git a/config_schema.json b/config_schema.json index cd6604c..66b5356 100644 --- a/config_schema.json +++ b/config_schema.json @@ -2,6 +2,24 @@ "additionalProperties": false, "description": "Modifies the orginal Settings class provided by the user", "properties": { + "db_connection_str": { + "description": "MongoDB connection string. Might include credentials. For more information see: https://naiveskill.com/mongodb-connection-string/", + "examples": [ + "mongodb://localhost:27017" + ], + "format": "password", + "title": "Db Connection Str", + "type": "string", + "writeOnly": true + }, + "db_name": { + "description": "Name of the database located on the MongoDB server.", + "examples": [ + "my-database" + ], + "title": "Db Name", + "type": "string" + }, "log_level": { "default": "INFO", "description": "The minimum log level to capture.", @@ -165,6 +183,8 @@ } }, "required": [ + "db_connection_str", + "db_name", "service_instance_id", "plaintext_email_template", "html_email_template", diff --git a/example_config.yaml b/example_config.yaml index a6ecf6d..966ab5d 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -1,3 +1,5 @@ +db_connection_str: '**********' +db_name: dev_db from_address: test@test.com generate_correlation_id: true html_email_template: '=0.7.0", + "typer>=0.9.0", "ghga-event-schemas>=3.0.0, <4", - "ghga-service-commons>=1.2.0", - "hexkit[akafka]>=1.0.0", + "ghga-service-commons>=2.0.0", + "hexkit[akafka,mongodb]>=2.0.0", ] [project.license] @@ -53,6 +53,21 @@ exclude = [ "build", "dist", ] +line-length = 88 +src = [ + "src", + "tests", + "examples", + "scripts", +] +target-version = "py39" + +[tool.ruff.lint] +fixable = [ + "UP", + "I", + "D", +] ignore = [ "E", "W", @@ -70,7 +85,6 @@ ignore = [ "D206", "D300", ] -line-length = 88 select = [ "C90", "F", @@ -84,23 +98,11 @@ select = [ "SIM", "D", ] -fixable = [ - "UP", - "I", - "D", -] -src = [ - "src", - "tests", - "examples", - "scripts", -] -target-version = "py39" -[tool.ruff.mccabe] +[tool.ruff.lint.mccabe] max-complexity = 10 -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "scripts/*" = [ "PL", "S", @@ -126,7 +128,7 @@ max-complexity = 10 "D", ] -[tool.ruff.pydocstyle] +[tool.ruff.lint.pydocstyle] convention = "pep257" [tool.mypy] diff --git a/src/ns/adapters/outbound/dao.py b/src/ns/adapters/outbound/dao.py new file mode 100644 index 0000000..a48a90a --- /dev/null +++ b/src/ns/adapters/outbound/dao.py @@ -0,0 +1,33 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""DAO translator constructor""" + +from hexkit.protocols.dao import DaoFactoryProtocol + +from ns.core import models +from ns.ports.outbound.dao import NotificationRecordDaoPort + + +async def notification_record_dao_factory( + *, dao_factory: DaoFactoryProtocol +) -> NotificationRecordDaoPort: + """Construct a NotificationRecordDaoPort from the provided dao_factory""" + return await dao_factory.get_dao( + name="notification_records", + dto_model=models.NotificationRecord, + id_field="hash_sum", + ) diff --git a/src/ns/config.py b/src/ns/config.py index c941616..5e3162c 100644 --- a/src/ns/config.py +++ b/src/ns/config.py @@ -18,6 +18,7 @@ from hexkit.config import config_from_yaml from hexkit.log import LoggingConfig from hexkit.providers.akafka import KafkaConfig +from hexkit.providers.mongodb.provider import MongoDbConfig from ns.adapters.inbound.akafka import EventSubTranslatorConfig from ns.adapters.outbound.smtp_client import SmtpClientConfig @@ -33,6 +34,7 @@ class Config( SmtpClientConfig, NotifierConfig, LoggingConfig, + MongoDbConfig, ): """Config parameters and their defaults.""" diff --git a/src/ns/core/models.py b/src/ns/core/models.py new file mode 100644 index 0000000..6e50d4f --- /dev/null +++ b/src/ns/core/models.py @@ -0,0 +1,31 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Contains models for the notification service.""" + +from pydantic import BaseModel, Field + + +class NotificationRecord(BaseModel): + """Model for tracking which notifications have been sent. + + The hash sum is used to identify the notification event content and the sent flag + indicates if the notification has been sent. + """ + + hash_sum: str = Field(..., description="Hash sum of notification event") + sent: bool = Field( + ..., description="Flag indicating if the notification has been sent" + ) diff --git a/src/ns/core/notifier.py b/src/ns/core/notifier.py index ba22708..6bd78a1 100644 --- a/src/ns/core/notifier.py +++ b/src/ns/core/notifier.py @@ -15,15 +15,19 @@ # """Contains the concrete implementation of a NotifierPort""" import logging +from contextlib import suppress from email.message import EmailMessage from enum import Enum +from hashlib import sha256 from string import Template from ghga_event_schemas import pydantic_ as event_schemas from pydantic import EmailStr, Field from pydantic_settings import BaseSettings +from ns.core import models from ns.ports.inbound.notifier import NotifierPort +from ns.ports.outbound.dao import NotificationRecordDaoPort, ResourceNotFoundError from ns.ports.outbound.smtp_client import SmtpClientPort log = logging.getLogger(__name__) @@ -51,16 +55,64 @@ class NotifierConfig(BaseSettings): class Notifier(NotifierPort): """Implementation of the Notifier Port""" - def __init__(self, *, config: NotifierConfig, smtp_client: SmtpClientPort): + def __init__( + self, + *, + config: NotifierConfig, + smtp_client: SmtpClientPort, + notification_record_dao: NotificationRecordDaoPort, + ): """Initialize the Notifier with configuration and smtp client""" self._config = config self._smtp_client = smtp_client + self._notification_record_dao = notification_record_dao + + def _create_notification_record( + self, *, notification: event_schemas.Notification + ) -> models.NotificationRecord: + """Creates a notification record from a notification event""" + hash_sum = sha256(notification.model_dump_json().encode("utf-8")).hexdigest() + return models.NotificationRecord(hash_sum=hash_sum, sent=False) + + async def _has_been_sent(self, *, hash_sum: str) -> bool: + """Check whether the notification has been sent already. + + Returns: + - `False` if the notification **has not** been sent yet. + - `True` if the notification **has** already been sent. + """ + with suppress(ResourceNotFoundError): + record = await self._notification_record_dao.get_by_id(id_=hash_sum) + return record.sent + return False + + async def _register_new_notification( + self, *, notification_record: models.NotificationRecord + ): + """Registers a new notification in the database""" + await self._notification_record_dao.upsert(dto=notification_record) async def send_notification(self, *, notification: event_schemas.Notification): """Sends notifications based on the channel info provided (e.g. email addresses)""" - if len(notification.recipient_email) > 0: - message = self._construct_email(notification=notification) - self._smtp_client.send_email_message(message) + # Generate sha-256 hash of the notification payload + notification_record = self._create_notification_record( + notification=notification + ) + + # Abort if the notification has been sent already + if await self._has_been_sent(hash_sum=notification_record.hash_sum): + log.info("Notification already sent, skipping.") + return + + # Add the notification to the database (with sent=False) + await self._register_new_notification(notification_record=notification_record) + + message = self._construct_email(notification=notification) + self._smtp_client.send_email_message(message) + + # update the notification record to show that the notification has been sent. + notification_record.sent = True + await self._notification_record_dao.update(dto=notification_record) def _construct_email( self, *, notification: event_schemas.Notification diff --git a/src/ns/inject.py b/src/ns/inject.py index 1fb4227..0f4e713 100644 --- a/src/ns/inject.py +++ b/src/ns/inject.py @@ -22,8 +22,10 @@ from ghga_service_commons.utils.context import asyncnullcontext from hexkit.providers.akafka.provider import KafkaEventSubscriber +from hexkit.providers.mongodb.provider import MongoDbDaoFactory from ns.adapters.inbound.akafka import EventSubTranslator +from ns.adapters.outbound.dao import notification_record_dao_factory from ns.adapters.outbound.smtp_client import SmtpClient from ns.config import Config from ns.core.notifier import Notifier @@ -34,7 +36,15 @@ async def prepare_core(*, config: Config) -> AsyncGenerator[NotifierPort, None]: """Constructs and initializes all core components and their outbound dependencies.""" smtp_client = SmtpClient(config=config) - notifier = Notifier(config=config, smtp_client=smtp_client) + dao_factory = MongoDbDaoFactory(config=config) + notification_record_dao = await notification_record_dao_factory( + dao_factory=dao_factory + ) + notifier = Notifier( + config=config, + smtp_client=smtp_client, + notification_record_dao=notification_record_dao, + ) yield notifier diff --git a/src/ns/ports/outbound/dao.py b/src/ns/ports/outbound/dao.py new file mode 100644 index 0000000..4abc307 --- /dev/null +++ b/src/ns/ports/outbound/dao.py @@ -0,0 +1,27 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Defines the NotificationRecordDao, which manages notification records.""" + +from hexkit.protocols.dao import ( # noqa: F401 + DaoNaturalId, + ResourceNotFoundError, +) +from typing_extensions import TypeAlias + +from ns.core import models + +NotificationRecordDaoPort: TypeAlias = DaoNaturalId[models.NotificationRecord] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..aa0117d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,32 @@ +# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Defines fixtures for the tests""" +import pytest +from hexkit.providers.akafka.testutils import KafkaFixture, get_kafka_fixture +from hexkit.providers.mongodb.testutils import MongoDbFixture, get_mongodb_fixture + +from tests.fixtures.joint import get_joint_fixture + +kafka_fixture = get_kafka_fixture(scope="session") +mongodb_fixture = get_mongodb_fixture(scope="session") +joint_fixture = get_joint_fixture(scope="session") + + +@pytest.fixture(autouse=True, scope="function") +def reset_state(mongodb_fixture: MongoDbFixture, kafka_fixture: KafkaFixture): + """Reset the state of the system before each test""" + mongodb_fixture.empty_collections() + kafka_fixture.clear_topics() diff --git a/tests/fixtures/joint.py b/tests/fixtures/joint.py index 810c834..7f63bb6 100644 --- a/tests/fixtures/joint.py +++ b/tests/fixtures/joint.py @@ -18,32 +18,39 @@ from dataclasses import dataclass import pytest_asyncio +from hexkit.custom_types import PytestScope from hexkit.providers.akafka import KafkaEventSubscriber -from hexkit.providers.akafka.testutils import KafkaFixture, kafka_fixture # noqa: F401 +from hexkit.providers.akafka.testutils import KafkaFixture +from hexkit.providers.mongodb.testutils import MongoDbFixture from ns.config import Config from ns.inject import prepare_core, prepare_event_subscriber -from tests.fixtures.config import get_config +from ns.ports.inbound.notifier import NotifierPort +from tests.fixtures.config import SMTP_TEST_CONFIG, get_config @dataclass class JointFixture: - """returned by joint_fixture""" + """Returned by joint_fixture_function""" config: Config kafka: KafkaFixture + mongodb: MongoDbFixture event_subscriber: KafkaEventSubscriber + notifier: NotifierPort -@pytest_asyncio.fixture -async def joint_fixture( - kafka_fixture: KafkaFixture, # noqa: F811 +async def joint_fixture_function( + kafka_fixture: KafkaFixture, + mongodb_fixture: MongoDbFixture, ) -> AsyncGenerator[JointFixture, None]: """A fixture that embeds all other fixtures for integration testing""" # merge configs from different sources with the default one: - config = get_config(sources=[kafka_fixture.config]) + config = get_config( + sources=[kafka_fixture.config, mongodb_fixture.config, SMTP_TEST_CONFIG] + ) - # create a DI container instance:translators + # prepare the core and the event subscriber async with prepare_core(config=config) as notifier: async with prepare_event_subscriber( config=config, notifier_override=notifier @@ -51,5 +58,12 @@ async def joint_fixture( yield JointFixture( config=config, kafka=kafka_fixture, + mongodb=mongodb_fixture, event_subscriber=event_subscriber, + notifier=notifier, ) + + +def get_joint_fixture(scope: PytestScope = "function"): + """Produce a joint fixture with desired scope""" + return pytest_asyncio.fixture(joint_fixture_function, scope=scope) diff --git a/tests/fixtures/server.py b/tests/fixtures/server.py index 1a6d58f..90d75b3 100644 --- a/tests/fixtures/server.py +++ b/tests/fixtures/server.py @@ -92,9 +92,10 @@ async def __aenter__(self): try: self._controller.start() - except RuntimeError as err: - self._controller.stop() - raise RuntimeError(err.args[0]) from err + except RuntimeError: + if self._controller.loop.is_running(): + self._controller.stop() + raise async def __aexit__(self, *args): """Async context manager exit method""" diff --git a/tests/fixtures/test_config.yaml b/tests/fixtures/test_config.yaml index bf34359..9084c70 100644 --- a/tests/fixtures/test_config.yaml +++ b/tests/fixtures/test_config.yaml @@ -26,3 +26,5 @@ login_user: "test@test.com" login_password: test use_starttls: false from_address: "test@test.com" +db_connection_str: "mongodb://mongodb:27017" +db_name: "dev_db" diff --git a/tests/test_basic.py b/tests/test_basic.py index 0e8c038..e857dfb 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -14,15 +14,15 @@ # limitations under the License. """Test basic event consumption""" -import asyncio +from hashlib import sha256 +from typing import cast import pytest -from hexkit.providers.akafka.testutils import kafka_fixture # noqa: F401 from ns.adapters.outbound.smtp_client import SmtpClient +from ns.core.models import NotificationRecord from ns.core.notifier import Notifier -from tests.fixtures.config import SMTP_TEST_CONFIG, get_config -from tests.fixtures.joint import JointFixture, joint_fixture # noqa: F401 +from tests.fixtures.joint import JointFixture from tests.fixtures.server import DummyServer from tests.fixtures.utils import make_notification @@ -40,78 +40,82 @@ "notification_details", [sample_notification], ) -def test_email_construction(notification_details): +@pytest.mark.asyncio(scope="session") +async def test_email_construction( + joint_fixture: JointFixture, + notification_details, +): """Verify that the email is getting constructed properly from the template.""" - config = get_config([SMTP_TEST_CONFIG]) + # Cast notifier type + joint_fixture.notifier = cast(Notifier, joint_fixture.notifier) + notification = make_notification(notification_details) - smtp_client = SmtpClient(config=config) - notifier = Notifier(config=config, smtp_client=smtp_client) - msg = notifier._construct_email(notification=notification) + + msg = joint_fixture.notifier._construct_email(notification=notification) assert msg is not None plaintext_body = msg.get_body(preferencelist="plain") assert plaintext_body is not None - plaintext_content = plaintext_body.get_content() # type: ignore[attr-defined] - expected_plaintext = "Dear Yolanda Martinez,\n\nWhere are you, where are you, Yolanda?\n\nWarm regards,\n\nThe GHGA Team" + plaintext_content = plaintext_body.get_content() # type: ignore + expected_plaintext = ( + "Dear Yolanda Martinez,\n\nWhere are you, where are you, Yolanda?\n" + + "\nWarm regards,\n\nThe GHGA Team" + ) assert plaintext_content.strip() == expected_plaintext html_body = msg.get_body(preferencelist="html") assert html_body is not None - html_content = html_body.get_content() # type: ignore[attr-defined] + html_content = html_body.get_content() # type: ignore assert html_content is not None - expected_html = '

Dear Yolanda Martinez,

Where are you, where are you, Yolanda?

Warm regards,

The GHGA Team

' + expected_html = ( + '

Dear Yolanda Martinez,

Where are you,' + + " where are you, Yolanda?

Warm regards,

The GHGA Team

" + + "" + ) assert html_content.strip() == expected_html -@pytest.mark.parametrize( - "notification_details", - [sample_notification], -) -@pytest.mark.asyncio -async def test_transmission(notification_details): - """Test that the email that the test server gets is what we expect""" - config = get_config([SMTP_TEST_CONFIG]) - notification = make_notification(notification_details) - - smtp_client = SmtpClient(config=config) - server = DummyServer(config=config) - - notifier = Notifier(config=config, smtp_client=smtp_client) - - # send the notification so it gets intercepted by the dummy client - expected_email = notifier._construct_email(notification=notification) - - # tell the smtp client to send the message and compare that with what is received - async with server.expect_email(expected_email=expected_email): - await notifier.send_notification(notification=notification) - asyncio.get_running_loop().stop() +@pytest.mark.asyncio(scope="session") +async def test_failed_authentication(joint_fixture: JointFixture): + """Change login credentials so authentication fails.""" + # Cast notifier type + joint_fixture.notifier = cast(Notifier, joint_fixture.notifier) + server = DummyServer(config=joint_fixture.config) -@pytest.mark.asyncio -async def test_failed_authentication(): - """Change login credentials so authentication fails.""" - config = get_config([SMTP_TEST_CONFIG]) - server = DummyServer(config=config) + # change the login credentials so that the authentication fails server.login = "bob@bobswebsite.com" server.password = "notCorrect" notification = make_notification(sample_notification) - smtp_client = SmtpClient(config=config) - notifier = Notifier(config=config, smtp_client=smtp_client) - expected_email = notifier._construct_email(notification=notification) + + expected_email = joint_fixture.notifier._construct_email( + notification=notification, + ) # send the notification so it gets intercepted by the dummy client with pytest.raises(SmtpClient.FailedLoginError): async with server.expect_email(expected_email=expected_email): - await notifier.send_notification(notification=notification) - asyncio.get_running_loop().stop() + await joint_fixture.notifier.send_notification(notification=notification) + + # verify that the email is in the database but not marked as sent + expected_record = joint_fixture.notifier._create_notification_record( + notification=notification + ) + record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id( + id_=expected_record.hash_sum + ) + + assert not record_in_db.sent -@pytest.mark.asyncio -async def test_consume_thru_send(joint_fixture: JointFixture): # noqa: F811 + +@pytest.mark.asyncio(scope="session") +async def test_consume_thru_send(joint_fixture: JointFixture): """Verify that the event is correctly translated into a basic email object""" await joint_fixture.kafka.publish_event( payload={ @@ -131,3 +135,107 @@ async def test_consume_thru_send(joint_fixture: JointFixture): # noqa: F811 # means that the consumer successfully passed the event through the notifier # and on to the client for emailing. await joint_fixture.event_subscriber.run(forever=False) + + +@pytest.mark.asyncio(scope="session") +async def test_helper_functions(joint_fixture: JointFixture): + """Unit test for the _has_been_sent function, _create_notification_record, + and _register_new_notification function. + """ + # Cast notifier type + joint_fixture.notifier = cast(Notifier, joint_fixture.notifier) + + # first, create a notification + notification = make_notification(sample_notification) + + # manually create a NotificationRecord for the notification + expected_record = NotificationRecord( + hash_sum=sha256(notification.model_dump_json().encode("utf-8")).hexdigest(), + sent=False, + ) + + # Now create the record using the notifier's function and compare + actual_record = joint_fixture.notifier._create_notification_record( + notification=notification + ) + + assert actual_record.model_dump() == expected_record.model_dump() + + # Now check the _has_been_sent function before the record has been inserted + assert not await joint_fixture.notifier._has_been_sent( + hash_sum=actual_record.hash_sum + ) + + # register the notification + await joint_fixture.notifier._register_new_notification( + notification_record=actual_record + ) + + # Verify the record is in the database + record_in_db = await joint_fixture.notifier._notification_record_dao.get_by_id( + id_=actual_record.hash_sum + ) + + # Extra sanity check to make sure they're the same + assert record_in_db.model_dump() == actual_record.model_dump() + + # Record still has not been sent, but now it's in the database. Do another check + assert not await joint_fixture.notifier._has_been_sent( + hash_sum=actual_record.hash_sum + ) + + # Now mark the record as sent + actual_record.sent = True + await joint_fixture.notifier._notification_record_dao.update(dto=actual_record) + + # Now the record has been marked as sent, so _has_been_sent should return False + assert await joint_fixture.notifier._has_been_sent(hash_sum=actual_record.hash_sum) + + +@pytest.mark.asyncio(scope="session") +async def test_idempotence_and_transmission(joint_fixture: JointFixture): + """Consume identical events and verify that only one email is sent.""" + # Cast notifier type + joint_fixture.notifier = cast(Notifier, joint_fixture.notifier) + + notification_event = make_notification(sample_notification) + await joint_fixture.kafka.publish_event( + payload=notification_event.model_dump(), + type_=joint_fixture.config.notification_event_type, + topic=joint_fixture.config.notification_event_topic, + ) + + # generate the hash sum for the notification + record = joint_fixture.notifier._create_notification_record( + notification=notification_event + ) + + # the record hasn't been sent, so this should return False + assert not await joint_fixture.notifier._has_been_sent(hash_sum=record.hash_sum) + + server = DummyServer(config=joint_fixture.config) + expected_email = joint_fixture.notifier._construct_email( + notification=notification_event, + ) + + # Intercept the email with a dummy server and check content upon receipt + async with server.expect_email(expected_email=expected_email): + # consume the event to send the notification email to the dummy server + await joint_fixture.event_subscriber.run(forever=False) + + # Verify that the notification has now been marked as sent + assert await joint_fixture.notifier._has_been_sent(hash_sum=record.hash_sum) + + # Now publish the same event again + await joint_fixture.kafka.publish_event( + payload=notification_event.model_dump(), + type_=joint_fixture.config.notification_event_type, + topic=joint_fixture.config.notification_event_topic, + ) + + # Consume the event, which should NOT send anything and create no ConnectionError + # For clarification for anyone visiting this after a while, the ConnectionError + # would be raised if it tried to connect to an actual server because the config + # is bogus. If no ConnectionError is raised, that means the Notifier didn't try + # to send an email. + await joint_fixture.event_subscriber.run(forever=False)