diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index f5743e0d..9f35227b 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -13,6 +13,7 @@ Thanks to the contributors who helped on this project apart from the authors * [Jagadapi Sivanaga Krishnam Raja Reddy](www.linkedin.com/in/jskrajareddy/) * [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15) * [Nishant Singh](https://www.linkedin.com/in/singh-nishant/) +* [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680) # Honorary Mentions Thanks to the team below for invaluable insights and support throughout the initial release of this project diff --git a/docs/api/zoom_plugin.md b/docs/api/zoom_plugin.md new file mode 100644 index 00000000..e03a6385 --- /dev/null +++ b/docs/api/zoom_plugin.md @@ -0,0 +1,11 @@ +--- +search: + exclude: true +--- + +::: spark_expectations.notifications.plugins.zoom + handler: python + options: + filters: + - "!^_[^_]" + - "!^__[^__]" \ No newline at end of file diff --git a/docs/se_diagrams/features.png b/docs/se_diagrams/features.png index b8bae348..53245a68 100644 Binary files a/docs/se_diagrams/features.png and b/docs/se_diagrams/features.png differ diff --git a/docs/se_diagrams/spark_expectations_flow_and_feature.pptx b/docs/se_diagrams/spark_expectations_flow_and_feature.pptx index e3a433f5..ba59b6d0 100644 Binary files a/docs/se_diagrams/spark_expectations_flow_and_feature.pptx and b/docs/se_diagrams/spark_expectations_flow_and_feature.pptx differ diff --git a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py index feb53229..701e9d80 100644 --- a/spark_expectations/config/user_config.py +++ b/spark_expectations/config/user_config.py @@ -29,6 +29,13 @@ class Constants: "spark.expectations.notifications.teams.webhook_url" ) + # declare const user config variables for zoom notification + se_notifications_enable_zoom = "spark.expectations.notifications.zoom.enabled" + se_notifications_zoom_webhook_url = ( + "spark.expectations.notifications.zoom.webhook_url" + ) + se_notifications_zoom_token = "spark.expectations.notifications.zoom.token" + se_notifications_on_start = "spark.expectations.notifications.on_start" se_notifications_on_completion = "spark.expectations.notifications.on.completion" se_notifications_on_fail = "spark.expectations.notifications.on.fail" diff --git a/spark_expectations/core/context.py b/spark_expectations/core/context.py index 7ee6d159..78f1fb34 100644 --- a/spark_expectations/core/context.py +++ b/spark_expectations/core/context.py @@ -59,6 +59,10 @@ def __post_init__(self) -> None: self._enable_teams: bool = False self._teams_webhook_url: Optional[str] = None + self._enable_zoom: bool = False + self._zoom_webhook_url: Optional[str] = None + self._zoom_token: Optional[str] = None + self._table_name: Optional[str] = None self._input_count: int = 0 self._error_count: int = 0 @@ -622,6 +626,74 @@ def get_teams_webhook_url(self) -> str: accessing it""" ) + # def set_enable_zoom(self, enable_zoom: bool, zoom_token: str) -> None: + def set_enable_zoom(self, enable_zoom: bool) -> None: + """ + Set whether to enable Zoom notification and its token. + + Args: + enable_zoom (bool): Whether to enable Zoom notification or not. + """ + self._enable_zoom = enable_zoom + + @property + def get_enable_zoom(self) -> bool: + """ + Get whether Zoom notification is enabled. + + Returns: + bool: Whether Zoom notification is enabled or not. + """ + return self._enable_zoom + + def set_zoom_webhook_url(self, zoom_webhook_url: str) -> None: + """ + Set the Zoom webhook URL. + + Args: + zoom_webhook_url (str): The webhook URL for Zoom notification. + """ + self._zoom_webhook_url = zoom_webhook_url + + @property + def get_zoom_webhook_url(self) -> str: + """ + Get the Zoom webhook URL. + + Returns: + str: The Zoom webhook URL. + """ + if self._zoom_webhook_url: + return self._zoom_webhook_url + raise SparkExpectationsMiscException( + """The spark expectations context is not set completely, please assign '_zoom_webhook_url' before + accessing it""" + ) + + def set_zoom_token(self, zoom_token: str) -> None: + """ + Set the Zoom webhook token. + + Args: + zoom_token (str): The token for Zoom notification. + """ + self._zoom_token = zoom_token + + @property + def get_zoom_token(self) -> str: + """ + Get the Zoom token. + + Returns: + str: The Zoom token. + """ + if self._zoom_token: + return self._zoom_token + raise SparkExpectationsMiscException( + """The spark expectations context is not set completely, please assign '_zoom_token' before + accessing it""" + ) + def set_table_name(self, table_name: str) -> None: self._table_name = table_name diff --git a/spark_expectations/core/exceptions.py b/spark_expectations/core/exceptions.py index 1edb6c36..df709c51 100644 --- a/spark_expectations/core/exceptions.py +++ b/spark_expectations/core/exceptions.py @@ -57,6 +57,14 @@ class SparkExpectationsTeamsNotificationException(Exception): pass +class SparkExpectationsZoomNotificationException(Exception): + """ + Throw this exception when spark expectations encounters exceptions while sending Zoom notifications + """ + + pass + + class SparkExpectationsEmailException(Exception): """ Throw this exception when spark expectations encounters exceptions while sending email notifications diff --git a/spark_expectations/notifications/__init__.py b/spark_expectations/notifications/__init__.py index 65a7cf4b..90a62661 100644 --- a/spark_expectations/notifications/__init__.py +++ b/spark_expectations/notifications/__init__.py @@ -15,12 +15,15 @@ from spark_expectations.notifications.plugins.teams import ( SparkExpectationsTeamsPluginImpl, ) +from spark_expectations.notifications.plugins.zoom import ( + SparkExpectationsZoomPluginImpl, # Import Zoom plugin +) @functools.lru_cache def get_notifications_hook() -> pluggy.PluginManager: """ - function provides pluggy hook manger to send email and slack notification + function provides pluggy hook manger to send email, slack and zoom notification Returns: PluginManager: pluggy Manager object @@ -36,6 +39,10 @@ def get_notifications_hook() -> pluggy.PluginManager: pm.register( SparkExpectationsTeamsPluginImpl(), "spark_expectations_teams_notification" ) + pm.register( + SparkExpectationsZoomPluginImpl(), + "spark_expectations_zoom_notification", # Register Zoom plugin + ) for name, plugin_instance in pm.list_name_plugin(): _log.info( "Loaded plugin with name: %s and class: %s", diff --git a/spark_expectations/notifications/plugins/zoom.py b/spark_expectations/notifications/plugins/zoom.py new file mode 100644 index 00000000..30a21ee6 --- /dev/null +++ b/spark_expectations/notifications/plugins/zoom.py @@ -0,0 +1,68 @@ +from typing import Dict, Union +import requests +from spark_expectations import _log +from spark_expectations.notifications.plugins.base_notification import ( + SparkExpectationsNotification, + spark_expectations_notification_impl, +) +from spark_expectations.core.exceptions import ( + SparkExpectationsZoomNotificationException, +) +from spark_expectations.core.context import SparkExpectationsContext + + +class SparkExpectationsZoomPluginImpl(SparkExpectationsNotification): + """ + This class implements/supports functionality to send Zoom notification + """ + + @spark_expectations_notification_impl + def send_notification( + self, + _context: SparkExpectationsContext, + _config_args: Dict[str, Union[str, bool]], + ) -> None: + """ + function to send the Zoom notification + Args: + _context: SparkExpectationsContext class object + _config_args: dict + + Returns: None + + """ + try: + if _context.get_enable_zoom is True: + message = _config_args.get("message") + + # Format Message for Zoom + if isinstance(message, str): + message = message.replace("\n", "\n\n").replace(" ", "") + + payload = { + "title": "SE Notification", + "themeColor": "008000", + "text": message, + } + headers = { + "Authorization": f"Bearer {_context.get_zoom_token}", # Use get_zoom_token to retrieve token. + "Content-Type": "application/json", + } + response = requests.post( + _context.get_zoom_webhook_url, + json=payload, + headers=headers, + timeout=10, + ) + + # Check the response for success or failure + if response.status_code == 200: + _log.info("Message posted successfully!") + else: + _log.info("Failed to post message") + raise SparkExpectationsZoomNotificationException( + "error occurred while sending Zoom notification from spark expectations project" + ) + + except Exception as e: + raise SparkExpectationsZoomNotificationException(e) diff --git a/spark_expectations/utils/reader.py b/spark_expectations/utils/reader.py index 471e9197..38519cde 100644 --- a/spark_expectations/utils/reader.py +++ b/spark_expectations/utils/reader.py @@ -44,6 +44,9 @@ def set_notification_param( user_config.se_notifications_slack_webhook_url: "", user_config.se_notifications_enable_teams: False, user_config.se_notifications_teams_webhook_url: "", + user_config.se_notifications_enable_zoom: False, + user_config.se_notifications_zoom_webhook_url: "", + user_config.se_notifications_zoom_token: "", } _notification_dict: Dict[str, Union[str, int, bool]] = ( @@ -132,6 +135,30 @@ def set_notification_param( "All params/variables required for slack notification is not configured or supplied" ) + if _notification_dict[user_config.se_notifications_enable_zoom] is True: + if _notification_dict[ + user_config.se_notifications_zoom_webhook_url + ]: + self._context.set_enable_zoom(True) + self._context.set_zoom_webhook_url( + str( + _notification_dict[ + user_config.se_notifications_zoom_webhook_url + ] + ) + ) + self._context.set_zoom_token( + str( + _notification_dict[ + user_config.se_notifications_zoom_token + ] + ) + ) + else: + raise SparkExpectationsMiscException( + "All params/variables required for zoom notification is not configured or supplied" + ) + except Exception as e: raise SparkExpectationsMiscException( f"error occurred while reading notification configurations {e}" diff --git a/tests/config/test_user_config.py b/tests/config/test_user_config.py index c9d3536b..07de013c 100644 --- a/tests/config/test_user_config.py +++ b/tests/config/test_user_config.py @@ -15,6 +15,12 @@ def test_constants(): assert user_config.se_notifications_slack_webhook_url == "spark.expectations.notifications.slack.webhook_url" + assert user_config.se_notifications_enable_zoom == "spark.expectations.notifications.zoom.enabled" + + assert user_config.se_notifications_zoom_webhook_url == "spark.expectations.notifications.zoom.webhook_url" + + assert user_config.se_notifications_zoom_token == "spark.expectations.notifications.zoom.token" + assert user_config.se_notifications_on_start == "spark.expectations.notifications.on_start" assert user_config.se_notifications_on_completion == "spark.expectations.notifications.on.completion" diff --git a/tests/core/test_context.py b/tests/core/test_context.py index f85e91f6..2c4ba15f 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -59,6 +59,9 @@ def test_context_properties(): context._slack_webhook_url = "abcedfghi" context._enable_teams = True context._teams_webhook_url = "abcedfghi" + context._enable_zoom = True + context._zoom_webhook_url = "abcedfghi" + context._zoom_token = "abcedfghi" context._table_name = "test_table" context._input_count = 100 context._error_count = 10 @@ -173,6 +176,9 @@ def test_context_properties(): assert context._slack_webhook_url == "abcedfghi" assert context._enable_teams is True assert context._teams_webhook_url == "abcedfghi" + assert context._enable_zoom is True + assert context._zoom_webhook_url == "abcedfghi" + assert context._zoom_token == "abcedfghi" assert context._table_name == "test_table" assert context._input_count == 100 assert context._error_count == 10 @@ -535,6 +541,27 @@ def test_set_teams_webhook_url(): assert context.get_teams_webhook_url == "abcdefghi" +def test_set_enable_zoom(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_enable_zoom(True) + assert context._enable_zoom is True + assert context.get_enable_zoom is True + + +def test_set_zoom_webhook_url(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_zoom_webhook_url("abcdefghi") + assert context._zoom_webhook_url == "abcdefghi" + assert context.get_zoom_webhook_url == "abcdefghi" + + +def test_set_zoom_token(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_zoom_token("abcdefghi") + assert context._zoom_token == "abcdefghi" + assert context.get_zoom_token == "abcdefghi" + + def test_table_name(): context = SparkExpectationsContext(product_id="product1", spark=spark) context.set_table_name("test_table") @@ -709,6 +736,28 @@ def test_get_teams_webhook_url_exception(): context.get_teams_webhook_url +def test_get_zoom_webhook_url_exception(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context._zoom_webhook_url = False + with pytest.raises( + SparkExpectationsMiscException, + match="The spark expectations context is not set completely, please assign " + "'_zoom_webhook_url' before \n accessing it", + ): + context.get_zoom_webhook_url + + +def test_get_zoom_token(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context._zoom_token = False + with pytest.raises( + SparkExpectationsMiscException, + match="The spark expectations context is not set completely, please assign " + "'_zoom_token' before \n accessing it", + ): + context.get_zoom_token + + def test_get_table_name_expection(): context = SparkExpectationsContext(product_id="product1", spark=spark) context._table_name = "" diff --git a/tests/notification/plugins/test_zoom.py b/tests/notification/plugins/test_zoom.py new file mode 100644 index 00000000..a0d1584f --- /dev/null +++ b/tests/notification/plugins/test_zoom.py @@ -0,0 +1,69 @@ +from unittest.mock import patch, Mock +import pytest +import requests +from spark_expectations.core.exceptions import SparkExpectationsZoomNotificationException +from spark_expectations.notifications.plugins.zoom import SparkExpectationsZoomPluginImpl + + +@patch('spark_expectations.notifications.plugins.zoom.SparkExpectationsContext', autospec=True, spec_set=True) +def test_send_notification_success(_mock_context): + # Arrange + zoom_handler = SparkExpectationsZoomPluginImpl() + _mock_context.get_enable_zoom = True + _mock_context.get_zoom_webhook_url = "http://test_webhook_url" + _mock_context.get_zoom_token = "abcdefghi" + + _config_args = { + "title": "SE Notification", + "themeColor": "008000", "message": "test message"} + + # Mock requests.post to return a response with status code 200 + with patch.object(requests, "post") as mock_post: + mock_response = Mock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + # Act + zoom_handler.send_notification(_context=_mock_context, _config_args=_config_args) + + # Assert + mock_post.assert_called_once_with(_mock_context.get_zoom_webhook_url, json={ + "title": "SE Notification", + "themeColor": "008000", "text": "test message"}, headers={"Authorization": "Bearer abcdefghi", "Content-Type": "application/json"}, timeout=10) + + + +@patch('spark_expectations.notifications.plugins.zoom.SparkExpectationsContext', autospec=True, spec_set=True) +def test_send_notification_exception(_mock_context): + # Arrange + zoom_handler = SparkExpectationsZoomPluginImpl() + _mock_context.get_enable_zoom = True + _mock_context.get_zoom_webhook_url = "http://test_webhook_url" + _mock_context.get_zoom_token = "abcdefghi" + _config_args = {"message": "test message"} + + # Mock requests.post to return a response with status code 404 + with patch.object(requests, "post") as mock_post: + mock_response = Mock() + mock_response.status_code = 404 + mock_post.return_value = mock_response + + # Act and Assert + with pytest.raises(SparkExpectationsZoomNotificationException): + zoom_handler.send_notification(_context=_mock_context, _config_args=_config_args) + + +@patch('spark_expectations.notifications.plugins.zoom.SparkExpectationsContext', autospec=True, spec_set=True) +def test_send_notification_zoom_disabled(_mock_context): + # Arrange + zoom_handler = SparkExpectationsZoomPluginImpl() + _mock_context.get_enable_zoom = False + _mock_context.get_zoom_webhook_url = "http://test_webhook_url" + _mock_context.get_zoom_token = "abcdefghi" + _config_args = {"message": "test message"} + + with patch.object(requests, "post") as mock_post: + # Act + zoom_handler.send_notification(_context=_mock_context, _config_args=_config_args) + + mock_post.post.assert_not_called() diff --git a/tests/notification/test__init__.py b/tests/notification/test__init__.py index 53197bfa..e29f6676 100644 --- a/tests/notification/test__init__.py +++ b/tests/notification/test__init__.py @@ -9,6 +9,9 @@ from spark_expectations.notifications.plugins.teams import ( SparkExpectationsTeamsPluginImpl, ) +from spark_expectations.notifications.plugins.zoom import ( + SparkExpectationsZoomPluginImpl, +) def test_notifications_hook(): @@ -19,13 +22,16 @@ def test_notifications_hook(): email_plugin = pm.get_plugin("spark_expectations_email_notification") slack_plugin = pm.get_plugin("spark_expectations_slack_notification") teams_plugin = pm.get_plugin("spark_expectations_teams_notification") + zoom_plugin = pm.get_plugin("spark_expectations_zoom_notification") # Check that the correct number of plugins have been registered - assert len(pm.list_name_plugin()) == 3 + assert len(pm.list_name_plugin()) == 4 # assert assert isinstance(pm, pluggy.PluginManager) assert email_plugin is not None assert slack_plugin is not None assert teams_plugin is not None + assert zoom_plugin is not None assert isinstance(email_plugin, SparkExpectationsEmailPluginImpl) assert isinstance(slack_plugin, SparkExpectationsSlackPluginImpl) assert isinstance(teams_plugin, SparkExpectationsTeamsPluginImpl) + assert isinstance(zoom_plugin, SparkExpectationsZoomPluginImpl)