From 9c956c5ad725ab9a546253046e6f81111da4924a Mon Sep 17 00:00:00 2001 From: fdelgadodyvenia Date: Mon, 2 Sep 2024 09:28:08 +0100 Subject: [PATCH 01/10] supermetrics refactered and tested --- .../orchestration/prefect/flows/__init__.py | 5 +- .../prefect/flows/supermetrics_to_adls.py | 88 ++++++ .../orchestration/prefect/tasks/__init__.py | 3 +- .../prefect/tasks/supermetrics.py | 61 +++++ src/viadot/sources/__init__.py | 3 + src/viadot/sources/supermetrics.py | 258 ++++++++++++++++++ .../prefect/flows/test_supermetrics.py | 34 +++ tests/unit/test_supermetrics.py | 172 ++++++++++++ 8 files changed, 621 insertions(+), 3 deletions(-) create mode 100644 src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py create mode 100644 src/viadot/orchestration/prefect/tasks/supermetrics.py create mode 100644 src/viadot/sources/supermetrics.py create mode 100644 tests/integration/orchestration/prefect/flows/test_supermetrics.py create mode 100644 tests/unit/test_supermetrics.py diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index 0b314c819..6ff706f1f 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -18,12 +18,13 @@ from .sharepoint_to_redshift_spectrum import sharepoint_to_redshift_spectrum from .sharepoint_to_s3 import sharepoint_to_s3 from .sql_server_to_minio import sql_server_to_minio -from .sql_server_to_parquet import sql_server_to_parquet from .transform import transform from .transform_and_catalog import transform_and_catalog - +from .sql_server_to_parquet import sql_server_to_parquet +from .supermetrics_to_adls import supermetrics_to_adls __all__ = [ + "supermetrics_to_adls", "cloud_for_customers_to_adls", "cloud_for_customers_to_databricks", "duckdb_to_parquet", diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py new file mode 100644 index 000000000..73178f3f1 --- /dev/null +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -0,0 +1,88 @@ +from typing import Any + +from prefect import flow +from prefect.task_runners import ConcurrentTaskRunner + +from viadot.orchestration.prefect.tasks import ( + supermetrics_to_df, + df_to_adls, +) + + +@flow( + name="Supermetrics extraction to ADLS", + description="Extract data from Supermetrics and load it into Azure Data Lake Storage.", + retries=1, + retry_delay_seconds=60, + task_runner=ConcurrentTaskRunner, +) +def supermetrics_to_adls( # noqa: PLR0913 + # supermetrics + query_params: dict[str, Any] | None = None, + # ADLS + adls_path: str | None = None, + overwrite: bool = False, + # Auth + supermetrics_credentials_secret: str | None = None, + supermetrics_config_key: str | None = None, + adls_credentials_secret: str | None = None, + adls_config_key: str | None = None, + **kwargs: dict[str, Any] | None, +): + """ + Extract data from the Supermetrics API and save it to Azure Data Lake Storage (ADLS). + + This function queries data from the Supermetrics API using the provided query parameters + and saves the resulting DataFrame to Azure Data Lake Storage (ADLS) as a file. + + Args: + query_params (dict[str, Any], optional): + A dictionary of query parameters for the Supermetrics API. These parameters + specify the data to retrieve from Supermetrics. If not provided, the default + parameters from the Supermetrics configuration will be used. + + adls_path (str, optional): + The destination path in ADLS where the DataFrame will be saved. This should + include the file name and extension (e.g., 'myfolder/myfile.csv'). If not provided, + the function will use a default path from the configuration or raise an error. + + overwrite (bool, optional): + A flag indicating whether to overwrite the existing file in ADLS. If set to False + and the file exists, an error will be raised. Default is False. + + supermetrics_credentials_secret (str, optional): + The name of the secret in the secret management system containing the Supermetrics API credentials. + If not provided, the function will use credentials specified in the configuration. + + supermetrics_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. Defaults to None. + + adls_credentials_secret (str, optional): + The name of the secret in the secret management system containing the ADLS credentials. + If not provided, the function will use credentials specified in the configuration. + + adls_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. Defaults to None. + + **kwargs (dict[str, Any], optional): + Additional keyword arguments to pass to the `supermetrics_to_df` function for further customization + of the Supermetrics query. + + Raises: + ValueError: + If `adls_path` is not provided and cannot be determined from the configuration. + """ + df = supermetrics_to_df( + query_params = query_params, + credentials_secret=supermetrics_credentials_secret, + config_key=supermetrics_config_key, + **kwargs, + ) + + return df_to_adls( + df=df, + path=adls_path, + credentials_secret=adls_credentials_secret, + config_key=adls_config_key, + overwrite=overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index cbe8f7276..68c6f4aed 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -22,7 +22,7 @@ sharepoint_to_df, ) from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df - +from .supermetrics import supermetrics_to_df __all__ = [ "adls_upload", @@ -48,4 +48,5 @@ "create_sql_server_table", "sql_server_query", "sql_server_to_df", + "supermetrics_to_df", ] diff --git a/src/viadot/orchestration/prefect/tasks/supermetrics.py b/src/viadot/orchestration/prefect/tasks/supermetrics.py new file mode 100644 index 000000000..e79743bfd --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/supermetrics.py @@ -0,0 +1,61 @@ +from typing import Any + +import pandas as pd +from prefect import task + +from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError +from viadot.orchestration.prefect.utils import get_credentials +from viadot.sources import Supermetrics + + +@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60) +def supermetrics_to_df( + query_params: dict, + config_key: str | None = None, + credentials_secret: str | None = None, +) -> pd.DataFrame: + """ + Retrieves data from Supermetrics and returns it as a pandas DataFrame. + + This function queries the Supermetrics API using the provided query parameters and + returns the data as a pandas DataFrame. The function supports both configuration-based + and secret-based credentials. + + The function is decorated with a Prefect task, allowing it to handle retries, logging, + and timeout behavior. + + Args: + query_params (dict): + A dictionary containing the parameters for querying the Supermetrics API. + These parameters define what data to retrieve and how the query should be constructed. + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. + credentials_secret (str, optional): + The name of the secret in your secret management system that contains the Supermetrics API + credentials. If `config_key` is not provided, this secret is used to authenticate with + the Supermetrics API. + + Returns: + pd.DataFrame: + A pandas DataFrame containing the data retrieved from Supermetrics based on the provided + query parameters. + + Raises: + MissingSourceCredentialsError: + Raised if neither `credentials_secret` nor `config_key` is provided, indicating that no + valid credentials were supplied to access the Supermetrics API. + """ + if not (credentials_secret or config_key): + raise MissingSourceCredentialsError + + if not config_key: + credentials = get_credentials(credentials_secret) + else: + credentials = None + + supermetrics = Supermetrics( + credentials=credentials, + config_key=config_key, + ) + return supermetrics.to_df(query_params=query_params) + diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 0d77c1bf2..9d7c9b54d 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -2,6 +2,7 @@ from importlib.util import find_spec +from .supermetrics import Supermetrics, SupermetricsCredentials from .cloud_for_customers import CloudForCustomers from .duckdb import DuckDB from .exchange_rates import ExchangeRates @@ -26,6 +27,8 @@ "Trino", "SQLServer", "UKCarbonIntensity", + "Supermetrics", + "SupermetricsCredentials" ] if find_spec("adlfs"): diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py new file mode 100644 index 000000000..3dc299290 --- /dev/null +++ b/src/viadot/sources/supermetrics.py @@ -0,0 +1,258 @@ + +import json +from typing import Dict, Any, List +import pandas as pd +import numpy as np + +from pydantic import BaseModel + +from ..config import get_source_credentials +from ..exceptions import CredentialError +from ..utils import handle_api_response +from .base import Source +from ..utils import add_viadot_metadata_columns + + +class SupermetricsCredentials(BaseModel): + """ + Represents credentials for accessing the Supermetrics API. + + This class encapsulates the necessary credentials required to authenticate + and access the Supermetrics API. + + Attributes: + user (str): + The email account associated with the Supermetrics user. + api_key (str): + The API key that provides access to the Supermetrics API. + """ + user: str + api_key: str + + +class Supermetrics(Source): + """ + Implements methods for querying and interacting with the Supermetrics API. + + This class provides functionality to query data from the Supermetrics API, + which is a tool used for accessing data from various data sources. The API + documentation can be found at: + https://supermetrics.com/docs/product-api-getting-started/. For information + on usage limits, please refer to: + https://supermetrics.com/docs/product-api-usage-limits/. + + Args: + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials + for the API. Defaults to None. + credentials (dict of str to any, optional): + A dictionary containing the credentials needed for the API connection + configuration, specifically `api_key` and `user`. Defaults to None. + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. + These parameters define the specifics of the data request. Defaults to None. + For a full specification of possible parameters, see: + https://supermetrics.com/docs/product-api-get-data/. + """ + API_ENDPOINT = "https://api.supermetrics.com/enterprise/v2/query/data/json" + + def __init__( + self, + *args, + credentials: dict[str, Any] | None = None, + config_key: str = None, + query_params: Dict[str, Any] = None, + **kwargs, + ): + """ + Initializes the Supermetrics object. + + This constructor sets up the necessary components to interact with the + Supermetrics API, including the credentials and any query parameters. + + Args: + credentials (SupermetricsCredentials, optional): + An instance of `SupermetricsCredentials` containing the API key and user email + for authentication. Defaults to None. + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials + for the API. Defaults to None. + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. These + parameters define the specifics of the data request. Defaults to None. + """ + credentials = credentials or get_source_credentials(config_key) or None + + if credentials is None or not isinstance(credentials, dict): + msg = "Missing credentials." + raise CredentialError(msg) + self.credentials = dict(SupermetricsCredentials(**credentials)) + + super().__init__(*args, credentials=self.credentials, **kwargs) + + self.query_params = query_params + self.api_key = self.credentials["api_key"] + self.user = self.credentials["user"] + + def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: + """ + Downloads query results to a dictionary. + + This method executes the query against the Supermetrics API and retrieves + the results as a JSON dictionary. + + Args: + timeout (tuple of float, optional): + A tuple specifying the timeout values for the request. The first value + is the timeout for connection issues, and the second value is the timeout + for query execution. Defaults to (3.05, 1800), which provides a short + timeout for connection issues and a longer timeout for the query execution. + + Returns: + dict: + The response from the Supermetrics API, returned as a JSON dictionary. + + Raises: + ValueError: + Raised if the query parameters are not set before calling this method. + """ + if not self.query_params: + raise ValueError("Please build the query first") + + params = {"json": json.dumps(self.query_params)} + headers = {"Authorization": f"Bearer {self.api_key}"} + + response = handle_api_response( + url=self.API_ENDPOINT, params=params, headers=headers, timeout=timeout + ) + return response.json() + + @classmethod + def _get_col_names_google_analytics( + cls, + response: dict, + ) -> List[str]: + """ + Gets column names from Google Analytics data. + + This method extracts the column names from the JSON response received + from a Google Analytics API call. + + Args: + response (dict): + A dictionary containing the JSON response from the API call. + + Returns: + list of str: + A list of column names extracted from the Google Analytics data. + + Raises: + ValueError: + Raised if no data is returned in the response or if the column names + cannot be determined. + """ + is_pivoted = any( + field["field_split"] == "column" + for field in response["meta"]["query"]["fields"] + ) + + if is_pivoted: + if not response["data"]: + raise ValueError( + "Couldn't find column names as query returned no data." + ) + columns = response["data"][0] + else: + cols_meta = response["meta"]["query"]["fields"] + columns = [col_meta["field_name"] for col_meta in cols_meta] + return columns + + @classmethod + def _get_col_names_other(cls, response: dict) -> List[str]: + """ + Gets column names from non-Google Analytics data. + + This method extracts the column names from the JSON response received + from an API call that is not related to Google Analytics. + + Args: + response (dict): + A dictionary containing the JSON response from the API call. + + Returns: + list of str: + A list of column names extracted from the non-Google Analytics data. + """ + cols_meta = response["meta"]["query"]["fields"] + columns = [col_meta["field_name"] for col_meta in cols_meta] + return columns + + def _get_col_names(self) -> List[str]: + """ + Gets column names based on the data type. + + This method determines the appropriate column names for the data based + on its type, whether it's Google Analytics data or another type. + + Returns: + list of str: + A list of column names based on the data type. + + Raises: + ValueError: + Raised if the column names cannot be determined. + """ + response: dict = self.to_json() + if self.query_params["ds_id"] == "GA": + return Supermetrics._get_col_names_google_analytics(response) + + return Supermetrics._get_col_names_other(response) + + @add_viadot_metadata_columns + def to_df(self, if_empty: str = "warn", + query_params: Dict[str, Any] = None) -> pd.DataFrame: + """ + Downloads data into a pandas DataFrame. + + This method retrieves data from the Supermetrics API and loads it into + a pandas DataFrame. + + Args: + if_empty (str, optional): + Specifies the action to take if the query returns no data. + Options include "fail" to raise an error or "ignore" to return + an empty DataFrame. Defaults to "fail". + + Returns: + pd.DataFrame: + A pandas DataFrame containing the JSON data retrieved from the API. + + Raises: + ValueError: + Raised if the DataFrame is empty and `if_empty` is set to "fail". + """ + # Use provided query_params or default to the instance's query_params + if query_params is not None: + self.query_params = query_params + + if not self.query_params: + raise ValueError("Query parameters are required to fetch data.") + + self.query_params["api_key"] = self.api_key + + try: + columns = self._get_col_names() + except ValueError: + columns = None + + data = self.to_json()["data"] + + if data: + df = pd.DataFrame(data[1:], columns=columns).replace("", np.nan) + else: + df = pd.DataFrame(columns=columns) + + if df.empty: + self._handle_if_empty(if_empty) + + return df diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py new file mode 100644 index 000000000..b9e1f5e96 --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -0,0 +1,34 @@ +from viadot.config import get_source_config +from viadot.orchestration.prefect.flows import supermetrics_to_adls +import pytest + +@pytest.mark.parametrize("supermetrics_config_key,adls_credentials_secret", [ + ("supermetrics","app-azure-cr-datalakegen2-dev"), +]) + +def test_supermetrics_to_adls(supermetrics_config_key, adls_credentials_secret): + supermetrics_config = get_source_config(supermetrics_config_key) + google_ads_params = { + "ds_id": "AW", + "ds_user": supermetrics_config["credentials"].get("user"), + "ds_accounts": ["1007802423"], + "date_range_type": "last_month", + "fields": [ + "Date", + "Campaignname", + "Clicks", + ], + "max_rows": 1, + } + + state = supermetrics_to_adls( + query_params=google_ads_params, + supermetrics_config_key=supermetrics_config_key, + adls_credentials_secret=adls_credentials_secret, + overwrite=True, + adls_path="raw/supermetrics/.parquet", + ) + + all_successful = all(s.type == "COMPLETED" for s in state) + assert all_successful, "Not all tasks in the flow completed successfully." + diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py new file mode 100644 index 000000000..c4de1dc21 --- /dev/null +++ b/tests/unit/test_supermetrics.py @@ -0,0 +1,172 @@ +import pytest + +from viadot.sources import Supermetrics, SupermetricsCredentials +import pdb + + +@pytest.fixture(scope='function') +def supermetrics_credentials(): + return SupermetricsCredentials(user="test_user", api_key="test_key") + +@pytest.fixture(scope='function') +def mock_get_source_credentials(mocker, supermetrics_credentials): + return mocker.patch('viadot.config.get_source_credentials', return_value={ + "user": supermetrics_credentials.user, + "api_key": supermetrics_credentials.api_key + }) + +@pytest.fixture(scope='function') +def supermetrics(mocker, supermetrics_credentials, mock_get_source_credentials): + return Supermetrics( + credentials={ + "user": supermetrics_credentials.user, + "api_key": supermetrics_credentials.api_key + }, + query_params={"ds_id": "GA", "query": "test_query"} + ) + +def test_to_json(mocker, supermetrics): + # Mock the handle_api_response function to simulate an API response + mock_handle_api_response = mocker.patch('viadot.sources.supermetrics.handle_api_response') + mock_response = { + "data": [["value1", "value2"]], + "meta": { + "query": { + "fields": [ + {"field_name": "col1"}, + {"field_name": "col2"} + ] + } + } + } + # Set the mock to return the mock response object + mock_handle_api_response.return_value.json.return_value = mock_response + + # Call the method under test + response = supermetrics.to_json() + + # Assert that the response is as expected + assert response == { + "data": [["value1", "value2"]], + "meta": { + "query": { + "fields": [ + {"field_name": "col1"}, + {"field_name": "col2"} + ] + } + } + } + +def test_to_df_with_data(supermetrics, mocker): + # Mock the API response with some data + mock_response = { + "meta": { + "query": { + "ds_id": "GA", #Data source ID, e.g., Google Analytics + "fields": [ + { + "field_name": "date", + "field_type": "DIMENSION", + "field_split": "row" + }, + { + "field_name": "sessions", + "field_type": "METRIC", + "field_split": "row" + } + ], + "other_query_metadata": "..." + }, + "status": "success", # Status of the query + "execution_time": "0.456" # Time taken to execute the query + }, + "data": [ + ["2023-01-01", 100], # Example data rows + ["2023-01-02", 150], + ["2023-01-03", 120] + ], + "paging": { + "current_page": 1, # Current page number if pagination is used + "total_pages": 1, + "total_results": 3 + } + } + + mock_method = mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json') + mock_method.return_value = mock_response + mock_method = mocker.patch('viadot.sources.supermetrics.Supermetrics._get_col_names') + mock_method.return_value=["date", "sessions"] + + query_params = {"ds_id": "GA", "query": "test_query"} + df = supermetrics.to_df() + + assert not df.empty + assert list(df.columns) == ["date", "sessions","_viadot_source", "_viadot_downloaded_at_utc"] + +def test_get_col_names_google_analytics_pivoted(mocker, supermetrics): + mock_response = { + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "column"}, + {"field_name": "ga:sessions", "field_split": "row"} + ] + } + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] + } + columns = supermetrics._get_col_names_google_analytics(mock_response) + assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} + +def test_get_col_names_google_analytics_non_pivoted(mocker, supermetrics): + mock_response = { + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "row"}, + {"field_name": "ga:sessions", "field_split": "row"} + ] + } + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] + } + columns = supermetrics._get_col_names_google_analytics(mock_response) + assert columns == ["ga:date", "ga:sessions"] + +def test_to_df_metadata_columns(mocker, supermetrics): + # Mock the API response with some data + mock_response = { + "data": [["2023-01-01", 100]], + "meta": { + "query": { + "fields": [ + {"field_name": "date"}, + {"field_name": "sessions"} + ] + } + } + } + + mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json', return_value=mock_response) + mocker.patch('viadot.sources.supermetrics.Supermetrics._get_col_names', return_value=["date", "sessions"]) + + df = supermetrics.to_df() + + assert "_viadot_source" in df.columns + assert "_viadot_downloaded_at_utc" in df.columns + +def test_get_col_names_ga(mocker, supermetrics): + mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json', return_value={ + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "column"}, + {"field_name": "ga:sessions", "field_split": "row"} + ] + } + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] + }) + columns = supermetrics._get_col_names() + assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} \ No newline at end of file From 03413ad816dc0c296ad4b86e09518afe5dafbba7 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Wed, 25 Sep 2024 15:59:52 +0200 Subject: [PATCH 02/10] =?UTF-8?q?=F0=9F=8E=A8=20Change=20secret=20name?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../orchestration/prefect/flows/test_supermetrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py index b9e1f5e96..fba57fa01 100644 --- a/tests/integration/orchestration/prefect/flows/test_supermetrics.py +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -3,7 +3,7 @@ import pytest @pytest.mark.parametrize("supermetrics_config_key,adls_credentials_secret", [ - ("supermetrics","app-azure-cr-datalakegen2-dev"), + ("supermetrics","supermetrics"), ]) def test_supermetrics_to_adls(supermetrics_config_key, adls_credentials_secret): From fad267a223f7c5e4b224ea10bdda9f7a4ca80584 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 08:32:42 +0200 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=8E=A8=20Docstring=20formatting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../prefect/flows/supermetrics_to_adls.py | 72 +++--- .../prefect/tasks/supermetrics.py | 45 ++-- src/viadot/sources/supermetrics.py | 146 ++++++------- .../prefect/flows/test_supermetrics.py | 16 +- tests/unit/test_supermetrics.py | 205 +++++++++--------- 5 files changed, 239 insertions(+), 245 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py index 73178f3f1..b297449db 100644 --- a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -4,8 +4,8 @@ from prefect.task_runners import ConcurrentTaskRunner from viadot.orchestration.prefect.tasks import ( - supermetrics_to_df, df_to_adls, + supermetrics_to_df, ) @@ -29,51 +29,49 @@ def supermetrics_to_adls( # noqa: PLR0913 adls_config_key: str | None = None, **kwargs: dict[str, Any] | None, ): - """ - Extract data from the Supermetrics API and save it to Azure Data Lake Storage (ADLS). + """Flow to extract data from the Supermetrics API and save it to ADLS. - This function queries data from the Supermetrics API using the provided query parameters - and saves the resulting DataFrame to Azure Data Lake Storage (ADLS) as a file. + This function queries data from the Supermetrics API using the provided query + parameters and saves the resulting DataFrame to Azure Data Lake Storage (ADLS) + as a file. Args: - query_params (dict[str, Any], optional): - A dictionary of query parameters for the Supermetrics API. These parameters - specify the data to retrieve from Supermetrics. If not provided, the default + query_params (dict[str, Any], optional): + A dictionary of query parameters for the Supermetrics API. These parameters + specify the data to retrieve from Supermetrics. If not provided, the default parameters from the Supermetrics configuration will be used. - - adls_path (str, optional): - The destination path in ADLS where the DataFrame will be saved. This should - include the file name and extension (e.g., 'myfolder/myfile.csv'). If not provided, - the function will use a default path from the configuration or raise an error. - - overwrite (bool, optional): - A flag indicating whether to overwrite the existing file in ADLS. If set to False - and the file exists, an error will be raised. Default is False. - - supermetrics_credentials_secret (str, optional): - The name of the secret in the secret management system containing the Supermetrics API credentials. - If not provided, the function will use credentials specified in the configuration. - - supermetrics_config_key (str, optional): - The key in the viadot configuration holding relevant credentials. Defaults to None. - - adls_credentials_secret (str, optional): - The name of the secret in the secret management system containing the ADLS credentials. - If not provided, the function will use credentials specified in the configuration. - - adls_config_key (str, optional): - The key in the viadot configuration holding relevant credentials. Defaults to None. - - **kwargs (dict[str, Any], optional): - Additional keyword arguments to pass to the `supermetrics_to_df` function for further customization - of the Supermetrics query. + adls_path (str, optional): + The destination path in ADLS where the DataFrame will be saved. This should + include the file name and extension (e.g., 'myfolder/myfile.csv'). If not + provided, the function will use a default path from the configuration or raise + an error. + overwrite (bool, optional): + A flag indicating whether to overwrite the existing file in ADLS. If set + to Falseand the file exists, an error will be raised. Default is False. + supermetrics_credentials_secret (str, optional): + The name of the secret in the secret management system containing + the Supermetrics API credentials. If not provided, the function will use + credentials specified in the configuration. + supermetrics_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. + Defaults to None. + adls_credentials_secret (str, optional): + The name of the secret in the secret management system containing + the ADLS credentials. If not provided, the function will use credentials specified + in the configuration. + adls_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. + Defaults to None. + **kwargs (dict[str, Any], optional): + Additional keyword arguments to pass to the `supermetrics_to_df` function + for further customization of the Supermetrics query. Raises: - ValueError: + ValueError: If `adls_path` is not provided and cannot be determined from the configuration. """ df = supermetrics_to_df( - query_params = query_params, + query_params=query_params, credentials_secret=supermetrics_credentials_secret, config_key=supermetrics_config_key, **kwargs, diff --git a/src/viadot/orchestration/prefect/tasks/supermetrics.py b/src/viadot/orchestration/prefect/tasks/supermetrics.py index e79743bfd..0311b563c 100644 --- a/src/viadot/orchestration/prefect/tasks/supermetrics.py +++ b/src/viadot/orchestration/prefect/tasks/supermetrics.py @@ -1,5 +1,3 @@ -from typing import Any - import pandas as pd from prefect import task @@ -14,48 +12,45 @@ def supermetrics_to_df( config_key: str | None = None, credentials_secret: str | None = None, ) -> pd.DataFrame: - """ - Retrieves data from Supermetrics and returns it as a pandas DataFrame. + """Task to retrive data from Supermetrics and returns it as a pandas DataFrame. - This function queries the Supermetrics API using the provided query parameters and - returns the data as a pandas DataFrame. The function supports both configuration-based + This function queries the Supermetrics API using the provided query parameters and + returns the data as a pandas DataFrame. The function supports both configuration-based and secret-based credentials. - The function is decorated with a Prefect task, allowing it to handle retries, logging, + The function is decorated with a Prefect task, allowing it to handle retries, logging, and timeout behavior. Args: - query_params (dict): - A dictionary containing the parameters for querying the Supermetrics API. - These parameters define what data to retrieve and how the query should be constructed. + query_params (dict): + A dictionary containing the parameters for querying the Supermetrics API. + These parameters define what data to retrieve and how the query should + be constructed. config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. - credentials_secret (str, optional): - The name of the secret in your secret management system that contains the Supermetrics API - credentials. If `config_key` is not provided, this secret is used to authenticate with - the Supermetrics API. + credentials_secret (str, optional): + The name of the secret in your secret management system that contains + the Supermetrics API credentials. If `config_key` is not provided, + this secret is used to authenticate with the Supermetrics API. Returns: - pd.DataFrame: - A pandas DataFrame containing the data retrieved from Supermetrics based on the provided - query parameters. + pd.DataFrame: + A pandas DataFrame containing the data retrieved from Supermetrics based + on the provided query parameters. Raises: - MissingSourceCredentialsError: - Raised if neither `credentials_secret` nor `config_key` is provided, indicating that no - valid credentials were supplied to access the Supermetrics API. + MissingSourceCredentialsError: + Raised if neither `credentials_secret` nor `config_key` is provided, + indicating that no valid credentials were supplied to access + the Supermetrics API. """ if not (credentials_secret or config_key): raise MissingSourceCredentialsError - if not config_key: - credentials = get_credentials(credentials_secret) - else: - credentials = None + credentials = get_credentials(credentials_secret) if not config_key else None supermetrics = Supermetrics( credentials=credentials, config_key=config_key, ) return supermetrics.to_df(query_params=query_params) - diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index 3dc299290..37d07b781 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -1,59 +1,57 @@ - import json -from typing import Dict, Any, List -import pandas as pd -import numpy as np +from typing import Any, Dict, List +import numpy as np +import pandas as pd from pydantic import BaseModel from ..config import get_source_credentials from ..exceptions import CredentialError -from ..utils import handle_api_response +from ..utils import add_viadot_metadata_columns, handle_api_response from .base import Source -from ..utils import add_viadot_metadata_columns class SupermetricsCredentials(BaseModel): - """ - Represents credentials for accessing the Supermetrics API. + """Represents credentials for accessing the Supermetrics API. - This class encapsulates the necessary credentials required to authenticate + This class encapsulates the necessary credentials required to authenticate and access the Supermetrics API. Attributes: - user (str): + user (str): The email account associated with the Supermetrics user. - api_key (str): + api_key (str): The API key that provides access to the Supermetrics API. """ + user: str api_key: str class Supermetrics(Source): - """ - Implements methods for querying and interacting with the Supermetrics API. + """Implements methods for querying and interacting with the Supermetrics API. - This class provides functionality to query data from the Supermetrics API, - which is a tool used for accessing data from various data sources. The API - documentation can be found at: - https://supermetrics.com/docs/product-api-getting-started/. For information - on usage limits, please refer to: + This class provides functionality to query data from the Supermetrics API, + which is a tool used for accessing data from various data sources. The API + documentation can be found at: + https://supermetrics.com/docs/product-api-getting-started/. For information + on usage limits, please refer to: https://supermetrics.com/docs/product-api-usage-limits/. Args: - config_key (str, optional): - The key in the viadot configuration that holds the relevant credentials + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials for the API. Defaults to None. - credentials (dict of str to any, optional): - A dictionary containing the credentials needed for the API connection + credentials (dict of str to any, optional): + A dictionary containing the credentials needed for the API connection configuration, specifically `api_key` and `user`. Defaults to None. - query_params (dict of str to any, optional): - A dictionary containing the parameters to pass to the GET query. + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. These parameters define the specifics of the data request. Defaults to None. - For a full specification of possible parameters, see: + For a full specification of possible parameters, see: https://supermetrics.com/docs/product-api-get-data/. """ + API_ENDPOINT = "https://api.supermetrics.com/enterprise/v2/query/data/json" def __init__( @@ -63,22 +61,21 @@ def __init__( config_key: str = None, query_params: Dict[str, Any] = None, **kwargs, - ): - """ - Initializes the Supermetrics object. + ) -> None: + """Initializes the Supermetrics object. - This constructor sets up the necessary components to interact with the + This constructor sets up the necessary components to interact with the Supermetrics API, including the credentials and any query parameters. Args: - credentials (SupermetricsCredentials, optional): - An instance of `SupermetricsCredentials` containing the API key and user email + credentials (SupermetricsCredentials, optional): + An instance of `SupermetricsCredentials` containing the API key and user email for authentication. Defaults to None. - config_key (str, optional): - The key in the viadot configuration that holds the relevant credentials + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials for the API. Defaults to None. - query_params (dict of str to any, optional): - A dictionary containing the parameters to pass to the GET query. These + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. These parameters define the specifics of the data request. Defaults to None. """ credentials = credentials or get_source_credentials(config_key) or None @@ -95,25 +92,24 @@ def __init__( self.user = self.credentials["user"] def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: - """ - Downloads query results to a dictionary. + """Downloads query results to a dictionary. - This method executes the query against the Supermetrics API and retrieves + This method executes the query against the Supermetrics API and retrieves the results as a JSON dictionary. Args: - timeout (tuple of float, optional): - A tuple specifying the timeout values for the request. The first value - is the timeout for connection issues, and the second value is the timeout - for query execution. Defaults to (3.05, 1800), which provides a short + timeout (tuple of float, optional): + A tuple specifying the timeout values for the request. The first value + is the timeout for connection issues, and the second value is the timeout + for query execution. Defaults to (3.05, 1800), which provides a short timeout for connection issues and a longer timeout for the query execution. Returns: - dict: + dict: The response from the Supermetrics API, returned as a JSON dictionary. Raises: - ValueError: + ValueError: Raised if the query parameters are not set before calling this method. """ if not self.query_params: @@ -132,23 +128,22 @@ def _get_col_names_google_analytics( cls, response: dict, ) -> List[str]: - """ - Gets column names from Google Analytics data. + """Gets column names from Google Analytics data. - This method extracts the column names from the JSON response received + This method extracts the column names from the JSON response received from a Google Analytics API call. Args: - response (dict): + response (dict): A dictionary containing the JSON response from the API call. Returns: - list of str: + list of str: A list of column names extracted from the Google Analytics data. Raises: - ValueError: - Raised if no data is returned in the response or if the column names + ValueError: + Raised if no data is returned in the response or if the column names cannot be determined. """ is_pivoted = any( @@ -158,9 +153,8 @@ def _get_col_names_google_analytics( if is_pivoted: if not response["data"]: - raise ValueError( - "Couldn't find column names as query returned no data." - ) + msg = "Couldn't find column names as query returned no data." + raise ValueError(msg) columns = response["data"][0] else: cols_meta = response["meta"]["query"]["fields"] @@ -169,37 +163,34 @@ def _get_col_names_google_analytics( @classmethod def _get_col_names_other(cls, response: dict) -> List[str]: - """ - Gets column names from non-Google Analytics data. + """Gets column names from non-Google Analytics data. - This method extracts the column names from the JSON response received + This method extracts the column names from the JSON response received from an API call that is not related to Google Analytics. Args: - response (dict): + response (dict): A dictionary containing the JSON response from the API call. Returns: - list of str: + list of str: A list of column names extracted from the non-Google Analytics data. """ cols_meta = response["meta"]["query"]["fields"] - columns = [col_meta["field_name"] for col_meta in cols_meta] - return columns + return [col_meta["field_name"] for col_meta in cols_meta] def _get_col_names(self) -> List[str]: - """ - Gets column names based on the data type. + """Gets column names based on the data type. - This method determines the appropriate column names for the data based + This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. Returns: - list of str: + list of str: A list of column names based on the data type. Raises: - ValueError: + ValueError: Raised if the column names cannot be determined. """ response: dict = self.to_json() @@ -209,26 +200,26 @@ def _get_col_names(self) -> List[str]: return Supermetrics._get_col_names_other(response) @add_viadot_metadata_columns - def to_df(self, if_empty: str = "warn", - query_params: Dict[str, Any] = None) -> pd.DataFrame: - """ - Downloads data into a pandas DataFrame. + def to_df( + self, if_empty: str = "warn", query_params: Dict[str, Any] = None + ) -> pd.DataFrame: + """Downloads data into a pandas DataFrame. - This method retrieves data from the Supermetrics API and loads it into + This method retrieves data from the Supermetrics API and loads it into a pandas DataFrame. Args: - if_empty (str, optional): - Specifies the action to take if the query returns no data. - Options include "fail" to raise an error or "ignore" to return + if_empty (str, optional): + Specifies the action to take if the query returns no data. + Options include "fail" to raise an error or "ignore" to return an empty DataFrame. Defaults to "fail". Returns: - pd.DataFrame: + pd.DataFrame: A pandas DataFrame containing the JSON data retrieved from the API. Raises: - ValueError: + ValueError: Raised if the DataFrame is empty and `if_empty` is set to "fail". """ # Use provided query_params or default to the instance's query_params @@ -236,7 +227,8 @@ def to_df(self, if_empty: str = "warn", self.query_params = query_params if not self.query_params: - raise ValueError("Query parameters are required to fetch data.") + msg = "Query parameters are required to fetch data." + raise ValueError(msg) self.query_params["api_key"] = self.api_key diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py index fba57fa01..74948deee 100644 --- a/tests/integration/orchestration/prefect/flows/test_supermetrics.py +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -1,11 +1,14 @@ +import pytest from viadot.config import get_source_config from viadot.orchestration.prefect.flows import supermetrics_to_adls -import pytest -@pytest.mark.parametrize("supermetrics_config_key,adls_credentials_secret", [ - ("supermetrics","supermetrics"), -]) +@pytest.mark.parametrize( + "supermetrics_config_key,adls_credentials_secret", + [ + ("supermetrics", "supermetrics"), + ], +) def test_supermetrics_to_adls(supermetrics_config_key, adls_credentials_secret): supermetrics_config = get_source_config(supermetrics_config_key) google_ads_params = { @@ -25,10 +28,9 @@ def test_supermetrics_to_adls(supermetrics_config_key, adls_credentials_secret): query_params=google_ads_params, supermetrics_config_key=supermetrics_config_key, adls_credentials_secret=adls_credentials_secret, - overwrite=True, + overwrite=True, adls_path="raw/supermetrics/.parquet", ) - + all_successful = all(s.type == "COMPLETED" for s in state) assert all_successful, "Not all tasks in the flow completed successfully." - diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index c4de1dc21..c642d63c0 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -1,44 +1,45 @@ import pytest - from viadot.sources import Supermetrics, SupermetricsCredentials -import pdb -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def supermetrics_credentials(): return SupermetricsCredentials(user="test_user", api_key="test_key") -@pytest.fixture(scope='function') -def mock_get_source_credentials(mocker, supermetrics_credentials): - return mocker.patch('viadot.config.get_source_credentials', return_value={ - "user": supermetrics_credentials.user, - "api_key": supermetrics_credentials.api_key - }) -@pytest.fixture(scope='function') -def supermetrics(mocker, supermetrics_credentials, mock_get_source_credentials): +@pytest.fixture(scope="function") +def mock_get_source_credentials( + mocker, supermetrics_credentials: SupermetricsCredentials +): + return mocker.patch( + "viadot.config.get_source_credentials", + return_value={ + "user": supermetrics_credentials.user, + "api_key": supermetrics_credentials.api_key, + }, + ) + + +@pytest.fixture(scope="function") +def supermetrics(supermetrics_credentials: SupermetricsCredentials): return Supermetrics( credentials={ "user": supermetrics_credentials.user, - "api_key": supermetrics_credentials.api_key + "api_key": supermetrics_credentials.api_key, }, - query_params={"ds_id": "GA", "query": "test_query"} + query_params={"ds_id": "GA", "query": "test_query"}, ) -def test_to_json(mocker, supermetrics): + +def test_to_json(mocker, supermetrics: Supermetrics): # Mock the handle_api_response function to simulate an API response - mock_handle_api_response = mocker.patch('viadot.sources.supermetrics.handle_api_response') + mock_handle_api_response = mocker.patch( + "viadot.sources.supermetrics.handle_api_response" + ) mock_response = { - "data": [["value1", "value2"]], - "meta": { - "query": { - "fields": [ - {"field_name": "col1"}, - {"field_name": "col2"} - ] - } - } - } + "data": [["value1", "value2"]], + "meta": {"query": {"fields": [{"field_name": "col1"}, {"field_name": "col2"}]}}, + } # Set the mock to return the mock response object mock_handle_api_response.return_value.json.return_value = mock_response @@ -48,125 +49,131 @@ def test_to_json(mocker, supermetrics): # Assert that the response is as expected assert response == { "data": [["value1", "value2"]], - "meta": { - "query": { - "fields": [ - {"field_name": "col1"}, - {"field_name": "col2"} - ] - } - } + "meta": {"query": {"fields": [{"field_name": "col1"}, {"field_name": "col2"}]}}, } -def test_to_df_with_data(supermetrics, mocker): + +def test_to_df_with_data(supermetrics: Supermetrics, mocker): # Mock the API response with some data mock_response = { - "meta": { - "query": { - "ds_id": "GA", #Data source ID, e.g., Google Analytics - "fields": [ - { - "field_name": "date", - "field_type": "DIMENSION", - "field_split": "row" - }, - { - "field_name": "sessions", - "field_type": "METRIC", - "field_split": "row" - } - ], - "other_query_metadata": "..." + "meta": { + "query": { + "ds_id": "GA", # Data source ID, e.g., Google Analytics + "fields": [ + { + "field_name": "date", + "field_type": "DIMENSION", + "field_split": "row", + }, + { + "field_name": "sessions", + "field_type": "METRIC", + "field_split": "row", }, - "status": "success", # Status of the query - "execution_time": "0.456" # Time taken to execute the query - }, - "data": [ - ["2023-01-01", 100], # Example data rows - ["2023-01-02", 150], - ["2023-01-03", 120] ], - "paging": { - "current_page": 1, # Current page number if pagination is used - "total_pages": 1, - "total_results": 3 - } - } + "other_query_metadata": "...", + }, + "status": "success", # Status of the query + "execution_time": "0.456", # Time taken to execute the query + }, + "data": [ + ["2023-01-01", 100], # Example data rows + ["2023-01-02", 150], + ["2023-01-03", 120], + ], + "paging": { + "current_page": 1, # Current page number if pagination is used + "total_pages": 1, + "total_results": 3, + }, + } - mock_method = mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json') + mock_method = mocker.patch("viadot.sources.supermetrics.Supermetrics.to_json") mock_method.return_value = mock_response - mock_method = mocker.patch('viadot.sources.supermetrics.Supermetrics._get_col_names') - mock_method.return_value=["date", "sessions"] - - query_params = {"ds_id": "GA", "query": "test_query"} + mock_method = mocker.patch( + "viadot.sources.supermetrics.Supermetrics._get_col_names" + ) + mock_method.return_value = ["date", "sessions"] df = supermetrics.to_df() assert not df.empty - assert list(df.columns) == ["date", "sessions","_viadot_source", "_viadot_downloaded_at_utc"] - -def test_get_col_names_google_analytics_pivoted(mocker, supermetrics): + assert list(df.columns) == [ + "date", + "sessions", + "_viadot_source", + "_viadot_downloaded_at_utc", + ] + + +def test_get_col_names_google_analytics_pivoted(mocker, supermetrics: Supermetrics): mock_response = { "meta": { "query": { "fields": [ {"field_name": "ga:date", "field_split": "column"}, - {"field_name": "ga:sessions", "field_split": "row"} + {"field_name": "ga:sessions", "field_split": "row"}, ] } }, - "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], } columns = supermetrics._get_col_names_google_analytics(mock_response) assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} -def test_get_col_names_google_analytics_non_pivoted(mocker, supermetrics): + +def test_get_col_names_google_analytics_non_pivoted(mocker, supermetrics: Supermetrics): mock_response = { "meta": { "query": { "fields": [ {"field_name": "ga:date", "field_split": "row"}, - {"field_name": "ga:sessions", "field_split": "row"} + {"field_name": "ga:sessions", "field_split": "row"}, ] } }, - "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], } columns = supermetrics._get_col_names_google_analytics(mock_response) assert columns == ["ga:date", "ga:sessions"] - -def test_to_df_metadata_columns(mocker, supermetrics): + + +def test_to_df_metadata_columns(mocker, supermetrics: Supermetrics): # Mock the API response with some data mock_response = { "data": [["2023-01-01", 100]], "meta": { - "query": { - "fields": [ - {"field_name": "date"}, - {"field_name": "sessions"} - ] - } - } + "query": {"fields": [{"field_name": "date"}, {"field_name": "sessions"}]} + }, } - mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json', return_value=mock_response) - mocker.patch('viadot.sources.supermetrics.Supermetrics._get_col_names', return_value=["date", "sessions"]) + mocker.patch( + "viadot.sources.supermetrics.Supermetrics.to_json", return_value=mock_response + ) + mocker.patch( + "viadot.sources.supermetrics.Supermetrics._get_col_names", + return_value=["date", "sessions"], + ) df = supermetrics.to_df() assert "_viadot_source" in df.columns assert "_viadot_downloaded_at_utc" in df.columns -def test_get_col_names_ga(mocker, supermetrics): - mocker.patch('viadot.sources.supermetrics.Supermetrics.to_json', return_value={ - "meta": { - "query": { - "fields": [ - {"field_name": "ga:date", "field_split": "column"}, - {"field_name": "ga:sessions", "field_split": "row"} - ] - } + +def test_get_col_names_ga(mocker, supermetrics: Supermetrics): + mocker.patch( + "viadot.sources.supermetrics.Supermetrics.to_json", + return_value={ + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "column"}, + {"field_name": "ga:sessions", "field_split": "row"}, + ] + } + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], }, - "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}] - }) + ) columns = supermetrics._get_col_names() - assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} \ No newline at end of file + assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} From 543e3104576e9dfa6ae52b39b4a8ffa2cec75637 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 09:37:02 +0200 Subject: [PATCH 04/10] Fix formatting --- .../prefect/flows/supermetrics_to_adls.py | 3 ++ .../prefect/tasks/supermetrics.py | 4 ++ src/viadot/sources/supermetrics.py | 42 ++++++++++++++++--- tests/unit/test_supermetrics.py | 24 ++++++----- 4 files changed, 57 insertions(+), 16 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py index b297449db..e2e0db8fd 100644 --- a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -36,6 +36,7 @@ def supermetrics_to_adls( # noqa: PLR0913 as a file. Args: + ---- query_params (dict[str, Any], optional): A dictionary of query parameters for the Supermetrics API. These parameters specify the data to retrieve from Supermetrics. If not provided, the default @@ -67,8 +68,10 @@ def supermetrics_to_adls( # noqa: PLR0913 for further customization of the Supermetrics query. Raises: + ------ ValueError: If `adls_path` is not provided and cannot be determined from the configuration. + """ df = supermetrics_to_df( query_params=query_params, diff --git a/src/viadot/orchestration/prefect/tasks/supermetrics.py b/src/viadot/orchestration/prefect/tasks/supermetrics.py index 0311b563c..03dfc0286 100644 --- a/src/viadot/orchestration/prefect/tasks/supermetrics.py +++ b/src/viadot/orchestration/prefect/tasks/supermetrics.py @@ -22,6 +22,7 @@ def supermetrics_to_df( and timeout behavior. Args: + ---- query_params (dict): A dictionary containing the parameters for querying the Supermetrics API. These parameters define what data to retrieve and how the query should @@ -34,15 +35,18 @@ def supermetrics_to_df( this secret is used to authenticate with the Supermetrics API. Returns: + ------- pd.DataFrame: A pandas DataFrame containing the data retrieved from Supermetrics based on the provided query parameters. Raises: + ------ MissingSourceCredentialsError: Raised if neither `credentials_secret` nor `config_key` is provided, indicating that no valid credentials were supplied to access the Supermetrics API. + """ if not (credentials_secret or config_key): raise MissingSourceCredentialsError diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index 37d07b781..68dc134fb 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -17,11 +17,13 @@ class SupermetricsCredentials(BaseModel): This class encapsulates the necessary credentials required to authenticate and access the Supermetrics API. - Attributes: + Attributes + ---------- user (str): The email account associated with the Supermetrics user. api_key (str): The API key that provides access to the Supermetrics API. + """ user: str @@ -39,6 +41,7 @@ class Supermetrics(Source): https://supermetrics.com/docs/product-api-usage-limits/. Args: + ---- config_key (str, optional): The key in the viadot configuration that holds the relevant credentials for the API. Defaults to None. @@ -50,6 +53,7 @@ class Supermetrics(Source): These parameters define the specifics of the data request. Defaults to None. For a full specification of possible parameters, see: https://supermetrics.com/docs/product-api-get-data/. + """ API_ENDPOINT = "https://api.supermetrics.com/enterprise/v2/query/data/json" @@ -68,6 +72,7 @@ def __init__( Supermetrics API, including the credentials and any query parameters. Args: + ---- credentials (SupermetricsCredentials, optional): An instance of `SupermetricsCredentials` containing the API key and user email for authentication. Defaults to None. @@ -77,6 +82,7 @@ def __init__( query_params (dict of str to any, optional): A dictionary containing the parameters to pass to the GET query. These parameters define the specifics of the data request. Defaults to None. + """ credentials = credentials or get_source_credentials(config_key) or None @@ -98,6 +104,7 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: the results as a JSON dictionary. Args: + ---- timeout (tuple of float, optional): A tuple specifying the timeout values for the request. The first value is the timeout for connection issues, and the second value is the timeout @@ -105,12 +112,15 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: timeout for connection issues and a longer timeout for the query execution. Returns: + ------- dict: The response from the Supermetrics API, returned as a JSON dictionary. Raises: + ------ ValueError: Raised if the query parameters are not set before calling this method. + """ if not self.query_params: raise ValueError("Please build the query first") @@ -119,7 +129,10 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: headers = {"Authorization": f"Bearer {self.api_key}"} response = handle_api_response( - url=self.API_ENDPOINT, params=params, headers=headers, timeout=timeout + url=self.API_ENDPOINT, + params=params, + headers=headers, + timeout=timeout, ) return response.json() @@ -134,17 +147,21 @@ def _get_col_names_google_analytics( from a Google Analytics API call. Args: + ---- response (dict): A dictionary containing the JSON response from the API call. Returns: + ------- list of str: A list of column names extracted from the Google Analytics data. Raises: + ------ ValueError: Raised if no data is returned in the response or if the column names cannot be determined. + """ is_pivoted = any( field["field_split"] == "column" @@ -169,12 +186,15 @@ def _get_col_names_other(cls, response: dict) -> List[str]: from an API call that is not related to Google Analytics. Args: + ---- response (dict): A dictionary containing the JSON response from the API call. Returns: + ------- list of str: A list of column names extracted from the non-Google Analytics data. + """ cols_meta = response["meta"]["query"]["fields"] return [col_meta["field_name"] for col_meta in cols_meta] @@ -185,13 +205,19 @@ def _get_col_names(self) -> List[str]: This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. - Returns: + + + + Returns + ------- list of str: A list of column names based on the data type. - Raises: + Raises + ------ ValueError: Raised if the column names cannot be determined. + """ response: dict = self.to_json() if self.query_params["ds_id"] == "GA": @@ -201,7 +227,9 @@ def _get_col_names(self) -> List[str]: @add_viadot_metadata_columns def to_df( - self, if_empty: str = "warn", query_params: Dict[str, Any] = None + self, + if_empty: str = "warn", + query_params: Dict[str, Any] = None, ) -> pd.DataFrame: """Downloads data into a pandas DataFrame. @@ -209,18 +237,22 @@ def to_df( a pandas DataFrame. Args: + ---- if_empty (str, optional): Specifies the action to take if the query returns no data. Options include "fail" to raise an error or "ignore" to return an empty DataFrame. Defaults to "fail". Returns: + ------- pd.DataFrame: A pandas DataFrame containing the JSON data retrieved from the API. Raises: + ------ ValueError: Raised if the DataFrame is empty and `if_empty` is set to "fail". + """ # Use provided query_params or default to the instance's query_params if query_params is not None: diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index c642d63c0..39ca35839 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -9,7 +9,8 @@ def supermetrics_credentials(): @pytest.fixture(scope="function") def mock_get_source_credentials( - mocker, supermetrics_credentials: SupermetricsCredentials + mocker, + supermetrics_credentials: SupermetricsCredentials, ): return mocker.patch( "viadot.config.get_source_credentials", @@ -34,7 +35,7 @@ def supermetrics(supermetrics_credentials: SupermetricsCredentials): def test_to_json(mocker, supermetrics: Supermetrics): # Mock the handle_api_response function to simulate an API response mock_handle_api_response = mocker.patch( - "viadot.sources.supermetrics.handle_api_response" + "viadot.sources.supermetrics.handle_api_response", ) mock_response = { "data": [["value1", "value2"]], @@ -91,7 +92,7 @@ def test_to_df_with_data(supermetrics: Supermetrics, mocker): mock_method = mocker.patch("viadot.sources.supermetrics.Supermetrics.to_json") mock_method.return_value = mock_response mock_method = mocker.patch( - "viadot.sources.supermetrics.Supermetrics._get_col_names" + "viadot.sources.supermetrics.Supermetrics._get_col_names", ) mock_method.return_value = ["date", "sessions"] df = supermetrics.to_df() @@ -112,8 +113,8 @@ def test_get_col_names_google_analytics_pivoted(mocker, supermetrics: Supermetri "fields": [ {"field_name": "ga:date", "field_split": "column"}, {"field_name": "ga:sessions", "field_split": "row"}, - ] - } + ], + }, }, "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], } @@ -128,8 +129,8 @@ def test_get_col_names_google_analytics_non_pivoted(mocker, supermetrics: Superm "fields": [ {"field_name": "ga:date", "field_split": "row"}, {"field_name": "ga:sessions", "field_split": "row"}, - ] - } + ], + }, }, "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], } @@ -142,12 +143,13 @@ def test_to_df_metadata_columns(mocker, supermetrics: Supermetrics): mock_response = { "data": [["2023-01-01", 100]], "meta": { - "query": {"fields": [{"field_name": "date"}, {"field_name": "sessions"}]} + "query": {"fields": [{"field_name": "date"}, {"field_name": "sessions"}]}, }, } mocker.patch( - "viadot.sources.supermetrics.Supermetrics.to_json", return_value=mock_response + "viadot.sources.supermetrics.Supermetrics.to_json", + return_value=mock_response, ) mocker.patch( "viadot.sources.supermetrics.Supermetrics._get_col_names", @@ -169,8 +171,8 @@ def test_get_col_names_ga(mocker, supermetrics: Supermetrics): "fields": [ {"field_name": "ga:date", "field_split": "column"}, {"field_name": "ga:sessions", "field_split": "row"}, - ] - } + ], + }, }, "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], }, From 42d4cbf00ee23440a374db449e25de6038002917 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 09:42:12 +0200 Subject: [PATCH 05/10] Add pytest mocker --- pyproject.toml | 1 + requirements-dev.lock | 6 ++++-- requirements.lock | 13 ++++++++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 155ee859e..3472ea9c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "numpy>=1.23.4, <2.0", "defusedxml>=0.7.1", "aiohttp>=3.10.5", + "pytest-mock>=3.14.0", ] requires-python = ">=3.10" readme = "README.md" diff --git a/requirements-dev.lock b/requirements-dev.lock index c346b284f..e74fc29b2 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -7,7 +7,6 @@ # all-features: false # with-sources: false # generate-hashes: false -# universal: false -e file:. aiohappyeyeballs==2.4.0 @@ -433,7 +432,10 @@ pyparsing==3.1.2 # via mike pytest==8.3.2 # via pytest-asyncio + # via pytest-mock pytest-asyncio==0.23.8 +pytest-mock==3.14.0 + # via viadot2 python-dateutil==2.9.0.post0 # via botocore # via croniter @@ -529,7 +531,7 @@ ruamel-yaml==0.18.6 # via prefect ruamel-yaml-clib==0.2.8 # via ruamel-yaml -ruff==0.6.1 +ruff==0.6.7 s3transfer==0.10.2 # via boto3 scipy==1.14.0 diff --git a/requirements.lock b/requirements.lock index 32c422a17..1f2dc9fca 100644 --- a/requirements.lock +++ b/requirements.lock @@ -7,7 +7,6 @@ # all-features: false # with-sources: false # generate-hashes: false -# universal: false -e file:. aiohappyeyeballs==2.4.0 @@ -87,6 +86,7 @@ et-xmlfile==1.1.0 exceptiongroup==1.2.2 # via anyio # via prefect + # via pytest frozenlist==1.4.1 # via aiohttp # via aiosignal @@ -129,6 +129,8 @@ imagehash==4.3.1 # via viadot2 importlib-resources==6.1.3 # via prefect +iniconfig==2.0.0 + # via pytest itsdangerous==2.2.0 # via prefect jinja2==3.1.4 @@ -186,6 +188,7 @@ orjson==3.10.7 # via prefect packaging==24.1 # via prefect + # via pytest pandas==2.2.2 # via viadot2 # via visions @@ -195,6 +198,8 @@ pendulum==2.1.2 # via prefect pillow==10.4.0 # via imagehash +pluggy==1.5.0 + # via pytest prefect==2.20.2 # via prefect-github # via prefect-sqlalchemy @@ -226,6 +231,10 @@ pygments==2.18.0 # via rich pyodbc==5.1.0 # via viadot2 +pytest==8.3.3 + # via pytest-mock +pytest-mock==3.14.0 + # via viadot2 python-dateutil==2.9.0.post0 # via croniter # via dateparser @@ -330,6 +339,8 @@ text-unidecode==1.3 # via python-slugify toml==0.10.2 # via prefect +tomli==2.0.1 + # via pytest trino==0.328.0 # via viadot2 typer==0.12.4 From c0a772ba8177cc5b49079be8067544be4d1c6b2a Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 10:13:11 +0200 Subject: [PATCH 06/10] Format code --- src/viadot/__init__.py | 5 +-- .../orchestration/prefect/flows/__init__.py | 4 +- .../prefect/flows/supermetrics_to_adls.py | 10 ++--- .../prefect/tasks/supermetrics.py | 10 +++-- src/viadot/sources/__init__.py | 5 +-- src/viadot/sources/supermetrics.py | 45 ++++++++++--------- .../prefect/flows/test_supermetrics.py | 2 +- tests/unit/test_supermetrics.py | 6 +-- 8 files changed, 44 insertions(+), 43 deletions(-) diff --git a/src/viadot/__init__.py b/src/viadot/__init__.py index 6c5389e07..f7ff43fae 100644 --- a/src/viadot/__init__.py +++ b/src/viadot/__init__.py @@ -2,17 +2,16 @@ import logging - # Remove trash Azure INFO logs which contain low-level debugging information # but keep WARNING and higher ones in case something actually important happens. azure_clutter_logger_1 = logging.getLogger( - "azure.core.pipeline.policies.http_logging_policy" + "azure.core.pipeline.policies.http_logging_policy", ) azure_clutter_logger_1.setLevel(logging.WARNING) azure_clutter_logger_2 = logging.getLogger( - "azure.identity.aio._internal.get_token_mixin" + "azure.identity.aio._internal.get_token_mixin", ) azure_clutter_logger_2.setLevel(logging.WARNING) diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index 679f73be9..d3f84931a 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -20,10 +20,10 @@ from .sharepoint_to_redshift_spectrum import sharepoint_to_redshift_spectrum from .sharepoint_to_s3 import sharepoint_to_s3 from .sql_server_to_minio import sql_server_to_minio -from .transform import transform -from .transform_and_catalog import transform_and_catalog from .sql_server_to_parquet import sql_server_to_parquet from .supermetrics_to_adls import supermetrics_to_adls +from .transform import transform +from .transform_and_catalog import transform_and_catalog __all__ = [ "supermetrics_to_adls", diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py index e2e0db8fd..429f66873 100644 --- a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -28,7 +28,7 @@ def supermetrics_to_adls( # noqa: PLR0913 adls_credentials_secret: str | None = None, adls_config_key: str | None = None, **kwargs: dict[str, Any] | None, -): +) -> None: """Flow to extract data from the Supermetrics API and save it to ADLS. This function queries data from the Supermetrics API using the provided query @@ -44,8 +44,8 @@ def supermetrics_to_adls( # noqa: PLR0913 adls_path (str, optional): The destination path in ADLS where the DataFrame will be saved. This should include the file name and extension (e.g., 'myfolder/myfile.csv'). If not - provided, the function will use a default path from the configuration or raise - an error. + provided, the function will use a default path from the configuration + or raise an error. overwrite (bool, optional): A flag indicating whether to overwrite the existing file in ADLS. If set to Falseand the file exists, an error will be raised. Default is False. @@ -58,8 +58,8 @@ def supermetrics_to_adls( # noqa: PLR0913 Defaults to None. adls_credentials_secret (str, optional): The name of the secret in the secret management system containing - the ADLS credentials. If not provided, the function will use credentials specified - in the configuration. + the ADLS credentials. If not provided, the function will use credentials + specified in the configuration. adls_config_key (str, optional): The key in the viadot configuration holding relevant credentials. Defaults to None. diff --git a/src/viadot/orchestration/prefect/tasks/supermetrics.py b/src/viadot/orchestration/prefect/tasks/supermetrics.py index 03dfc0286..dd7874bda 100644 --- a/src/viadot/orchestration/prefect/tasks/supermetrics.py +++ b/src/viadot/orchestration/prefect/tasks/supermetrics.py @@ -1,3 +1,5 @@ +"""Task for connecting to Supermetrics API.""" + import pandas as pd from prefect import task @@ -15,11 +17,11 @@ def supermetrics_to_df( """Task to retrive data from Supermetrics and returns it as a pandas DataFrame. This function queries the Supermetrics API using the provided query parameters and - returns the data as a pandas DataFrame. The function supports both configuration-based - and secret-based credentials. + returns the data as a pandas DataFrame. The function supports both + configuration-based and secret-based credentials. - The function is decorated with a Prefect task, allowing it to handle retries, logging, - and timeout behavior. + The function is decorated with a Prefect task, allowing it to handle retries, + logging, and timeout behavior. Args: ---- diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 36e4638da..c86da2646 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -2,7 +2,6 @@ from importlib.util import find_spec -from .supermetrics import Supermetrics, SupermetricsCredentials from ._duckdb import DuckDB from ._trino import Trino from .cloud_for_customers import CloudForCustomers @@ -14,9 +13,9 @@ from .outlook import Outlook from .sharepoint import Sharepoint from .sql_server import SQLServer +from .supermetrics import Supermetrics, SupermetricsCredentials from .uk_carbon_intensity import UKCarbonIntensity - __all__ = [ "CloudForCustomers", "Epicor", @@ -30,7 +29,7 @@ "SQLServer", "UKCarbonIntensity", "Supermetrics", - "SupermetricsCredentials" + "SupermetricsCredentials", ] if find_spec("adlfs"): diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index 68dc134fb..e2755a87d 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -1,5 +1,7 @@ +"""Source for connecting to Supermetrics API.""" + import json -from typing import Any, Dict, List +from typing import Any import numpy as np import pandas as pd @@ -63,10 +65,10 @@ def __init__( *args, credentials: dict[str, Any] | None = None, config_key: str = None, - query_params: Dict[str, Any] = None, + query_params: dict[str, Any] = None, **kwargs, ) -> None: - """Initializes the Supermetrics object. + """Initialize the Supermetrics object. This constructor sets up the necessary components to interact with the Supermetrics API, including the credentials and any query parameters. @@ -74,8 +76,8 @@ def __init__( Args: ---- credentials (SupermetricsCredentials, optional): - An instance of `SupermetricsCredentials` containing the API key and user email - for authentication. Defaults to None. + An instance of `SupermetricsCredentials` containing the API key and + user email for authentication. Defaults to None. config_key (str, optional): The key in the viadot configuration that holds the relevant credentials for the API. Defaults to None. @@ -97,8 +99,8 @@ def __init__( self.api_key = self.credentials["api_key"] self.user = self.credentials["user"] - def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: - """Downloads query results to a dictionary. + def to_json(self, timeout=(3.05, 60 * 30)) -> dict[str, Any]: + """Download query results to a dictionary. This method executes the query against the Supermetrics API and retrieves the results as a JSON dictionary. @@ -107,9 +109,10 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: ---- timeout (tuple of float, optional): A tuple specifying the timeout values for the request. The first value - is the timeout for connection issues, and the second value is the timeout - for query execution. Defaults to (3.05, 1800), which provides a short - timeout for connection issues and a longer timeout for the query execution. + is the timeout for connection issues, and the second value is + the timeout for query execution. Defaults to (3.05, 1800), which + provides a short timeout for connection issues and a longer timeout + for the query execution. Returns: ------- @@ -123,7 +126,8 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: """ if not self.query_params: - raise ValueError("Please build the query first") + msg = "Please build the query first" + raise ValueError(msg) params = {"json": json.dumps(self.query_params)} headers = {"Authorization": f"Bearer {self.api_key}"} @@ -140,8 +144,8 @@ def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]: def _get_col_names_google_analytics( cls, response: dict, - ) -> List[str]: - """Gets column names from Google Analytics data. + ) -> list[str]: + """Get column names from Google Analytics data. This method extracts the column names from the JSON response received from a Google Analytics API call. @@ -179,8 +183,8 @@ def _get_col_names_google_analytics( return columns @classmethod - def _get_col_names_other(cls, response: dict) -> List[str]: - """Gets column names from non-Google Analytics data. + def _get_col_names_other(cls, response: dict) -> list[str]: + """Get column names from non-Google Analytics data. This method extracts the column names from the JSON response received from an API call that is not related to Google Analytics. @@ -199,15 +203,12 @@ def _get_col_names_other(cls, response: dict) -> List[str]: cols_meta = response["meta"]["query"]["fields"] return [col_meta["field_name"] for col_meta in cols_meta] - def _get_col_names(self) -> List[str]: - """Gets column names based on the data type. + def _get_col_names(self) -> list[str]: + """Get column names based on the data type. This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. - - - Returns ------- list of str: @@ -229,9 +230,9 @@ def _get_col_names(self) -> List[str]: def to_df( self, if_empty: str = "warn", - query_params: Dict[str, Any] = None, + query_params: dict[str, Any] = None, ) -> pd.DataFrame: - """Downloads data into a pandas DataFrame. + """Download data into a pandas DataFrame. This method retrieves data from the Supermetrics API and loads it into a pandas DataFrame. diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py index 74948deee..b574debe9 100644 --- a/tests/integration/orchestration/prefect/flows/test_supermetrics.py +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -4,7 +4,7 @@ @pytest.mark.parametrize( - "supermetrics_config_key,adls_credentials_secret", + ("supermetrics_config_key", "adls_credentials_secret"), [ ("supermetrics", "supermetrics"), ], diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 39ca35839..260341068 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -2,12 +2,12 @@ from viadot.sources import Supermetrics, SupermetricsCredentials -@pytest.fixture(scope="function") +@pytest.fixture() def supermetrics_credentials(): return SupermetricsCredentials(user="test_user", api_key="test_key") -@pytest.fixture(scope="function") +@pytest.fixture() def mock_get_source_credentials( mocker, supermetrics_credentials: SupermetricsCredentials, @@ -21,7 +21,7 @@ def mock_get_source_credentials( ) -@pytest.fixture(scope="function") +@pytest.fixture() def supermetrics(supermetrics_credentials: SupermetricsCredentials): return Supermetrics( credentials={ From 4a70b46763cc425a431e70eec232622dc57d75d2 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 10:23:35 +0200 Subject: [PATCH 07/10] format with ruff --- .../prefect/flows/supermetrics_to_adls.py | 6 ++++-- src/viadot/sources/__init__.py | 2 +- src/viadot/sources/supermetrics.py | 15 ++++++++------- tests/unit/test_supermetrics.py | 10 +++++----- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py index 429f66873..469ad32cc 100644 --- a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -1,3 +1,5 @@ +"""Flow for downloading the data from Superpetrics and uploading it to ADLS.""" + from typing import Any from prefect import flow @@ -11,12 +13,12 @@ @flow( name="Supermetrics extraction to ADLS", - description="Extract data from Supermetrics and load it into Azure Data Lake Storage.", + description="Extract data from Supermetrics and load it into ADLS.", retries=1, retry_delay_seconds=60, task_runner=ConcurrentTaskRunner, ) -def supermetrics_to_adls( # noqa: PLR0913 +def supermetrics_to_adls( # supermetrics query_params: dict[str, Any] | None = None, # ADLS diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index c86da2646..bf0965bf6 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -29,7 +29,7 @@ "SQLServer", "UKCarbonIntensity", "Supermetrics", - "SupermetricsCredentials", + "SupermetricsCredentials", # pragma: allowlist-secret ] if find_spec("adlfs"): diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index e2755a87d..ae0516d07 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -1,13 +1,14 @@ """Source for connecting to Supermetrics API.""" import json -from typing import Any +from typing import Any, Optional import numpy as np import pandas as pd from pydantic import BaseModel -from ..config import get_source_credentials +from viadot.config import get_source_credentials + from ..exceptions import CredentialError from ..utils import add_viadot_metadata_columns, handle_api_response from .base import Source @@ -19,7 +20,7 @@ class SupermetricsCredentials(BaseModel): This class encapsulates the necessary credentials required to authenticate and access the Supermetrics API. - Attributes + Attributes: ---------- user (str): The email account associated with the Supermetrics user. @@ -65,7 +66,7 @@ def __init__( *args, credentials: dict[str, Any] | None = None, config_key: str = None, - query_params: dict[str, Any] = None, + query_params: dict[str, Any] | None = None, **kwargs, ) -> None: """Initialize the Supermetrics object. @@ -209,12 +210,12 @@ def _get_col_names(self) -> list[str]: This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. - Returns + Returns: ------- list of str: A list of column names based on the data type. - Raises + Raises: ------ ValueError: Raised if the column names cannot be determined. @@ -230,7 +231,7 @@ def _get_col_names(self) -> list[str]: def to_df( self, if_empty: str = "warn", - query_params: dict[str, Any] = None, + query_params: dict[str, Any] | None = None, ) -> pd.DataFrame: """Download data into a pandas DataFrame. diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 260341068..8ea8f3518 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -2,12 +2,12 @@ from viadot.sources import Supermetrics, SupermetricsCredentials -@pytest.fixture() +@pytest.fixture def supermetrics_credentials(): return SupermetricsCredentials(user="test_user", api_key="test_key") -@pytest.fixture() +@pytest.fixture def mock_get_source_credentials( mocker, supermetrics_credentials: SupermetricsCredentials, @@ -21,7 +21,7 @@ def mock_get_source_credentials( ) -@pytest.fixture() +@pytest.fixture def supermetrics(supermetrics_credentials: SupermetricsCredentials): return Supermetrics( credentials={ @@ -106,7 +106,7 @@ def test_to_df_with_data(supermetrics: Supermetrics, mocker): ] -def test_get_col_names_google_analytics_pivoted(mocker, supermetrics: Supermetrics): +def test_get_col_names_google_analytics_pivoted(supermetrics: Supermetrics): mock_response = { "meta": { "query": { @@ -122,7 +122,7 @@ def test_get_col_names_google_analytics_pivoted(mocker, supermetrics: Supermetri assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} -def test_get_col_names_google_analytics_non_pivoted(mocker, supermetrics: Supermetrics): +def test_get_col_names_google_analytics_non_pivoted(supermetrics: Supermetrics): mock_response = { "meta": { "query": { From d2783c8478ecfa6c23f8a9e5cf36e71f676562a9 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 10:32:09 +0200 Subject: [PATCH 08/10] Format with ruff --- src/viadot/sources/__init__.py | 8 -------- src/viadot/sources/supermetrics.py | 4 ++-- tests/unit/test_supermetrics.py | 4 +++- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index bf0965bf6..2ea796ee2 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -31,38 +31,30 @@ "Supermetrics", "SupermetricsCredentials", # pragma: allowlist-secret ] - if find_spec("adlfs"): from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401 __all__.extend(["AzureDataLake"]) - if find_spec("duckdb"): from viadot.sources._duckdb import DuckDB # noqa: F401 __all__.extend(["DuckDB"]) - if find_spec("redshift_connector"): from viadot.sources.redshift_spectrum import RedshiftSpectrum # noqa: F401 __all__.extend(["RedshiftSpectrum"]) - if find_spec("s3fs"): from viadot.sources.s3 import S3 # noqa: F401 __all__.extend(["S3"]) - if find_spec("s3fs"): from viadot.sources.minio import MinIO # noqa: F401 __all__.extend(["MinIO"]) - - if find_spec("pyrfc"): from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401 __all__.extend(["SAPRFC", "SAPRFCV2"]) - if find_spec("pyspark"): from viadot.sources.databricks import Databricks # noqa: F401 diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index ae0516d07..80e9025e9 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -1,7 +1,7 @@ """Source for connecting to Supermetrics API.""" import json -from typing import Any, Optional +from typing import Any import numpy as np import pandas as pd @@ -65,7 +65,7 @@ def __init__( self, *args, credentials: dict[str, Any] | None = None, - config_key: str = None, + config_key: str | None = None, query_params: dict[str, Any] | None = None, **kwargs, ) -> None: diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 8ea8f3518..9d8aad617 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -4,7 +4,9 @@ @pytest.fixture def supermetrics_credentials(): - return SupermetricsCredentials(user="test_user", api_key="test_key") + return SupermetricsCredentials( + user="test_user", api_key="test_key" + ) # pragma: allowlist secret @pytest.fixture From 962baa8f7852923ca8485948367448d84b74185c Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 10:54:36 +0200 Subject: [PATCH 09/10] Organize imports --- src/viadot/orchestration/prefect/flows/__init__.py | 4 ++-- .../prefect/flows/supermetrics_to_adls.py | 3 ++- src/viadot/orchestration/prefect/tasks/__init__.py | 12 ++++++------ src/viadot/sources/__init__.py | 8 ++++---- src/viadot/sources/supermetrics.py | 8 ++++---- tests/unit/test_supermetrics.py | 5 +++-- 6 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index d3f84931a..9d2a6621f 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -26,16 +26,15 @@ from .transform_and_catalog import transform_and_catalog __all__ = [ - "supermetrics_to_adls", "cloud_for_customers_to_adls", "cloud_for_customers_to_databricks", "duckdb_to_parquet", "duckdb_to_sql_server", "duckdb_transform", "epicor_to_parquet", + "exchange_rates_api_to_redshift_spectrum", "exchange_rates_to_adls", "exchange_rates_to_databricks", - "exchange_rates_api_to_redshift_spectrum", "genesys_to_adls", "hubspot_to_adls", "mindful_to_adls", @@ -48,6 +47,7 @@ "sharepoint_to_s3", "sql_server_to_minio", "sql_server_to_parquet", + "supermetrics_to_adls", "transform", "transform_and_catalog", ] diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py index 469ad32cc..63d62a621 100644 --- a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -72,7 +72,8 @@ def supermetrics_to_adls( Raises: ------ ValueError: - If `adls_path` is not provided and cannot be determined from the configuration. + If `adls_path` is not provided and cannot be determined from + the configuration. """ df = supermetrics_to_df( diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 2c926b7b2..5467b90e8 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -27,27 +27,27 @@ __all__ = [ "adls_upload", - "df_to_adls", "bcp", + "clone_repo", "cloud_for_customers_to_df", - "df_to_databricks", + "create_sql_server_table", "dbt_task", + "df_to_adls", + "df_to_databricks", + "df_to_minio", + "df_to_redshift_spectrum", "duckdb_query", "epicor_to_df", "exchange_rates_to_df", "genesys_to_df", - "clone_repo", "hubspot_to_df", "luma_ingest_task", "mindful_to_df", - "df_to_minio", "outlook_to_df", - "df_to_redshift_spectrum", "s3_upload_file", "sap_rfc_to_df", "sharepoint_download_file", "sharepoint_to_df", - "create_sql_server_table", "sql_server_query", "sql_server_to_df", "supermetrics_to_df", diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 2ea796ee2..bbc1f0f92 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -21,15 +21,15 @@ "Epicor", "ExchangeRates", "Genesys", - "Outlook", "Hubspot", "Mindful", - "Sharepoint", - "Trino", + "Outlook", "SQLServer", - "UKCarbonIntensity", + "Sharepoint", "Supermetrics", "SupermetricsCredentials", # pragma: allowlist-secret + "Trino", + "UKCarbonIntensity", ] if find_spec("adlfs"): from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401 diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index 80e9025e9..5e41f9749 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -20,7 +20,7 @@ class SupermetricsCredentials(BaseModel): This class encapsulates the necessary credentials required to authenticate and access the Supermetrics API. - Attributes: + Attributes ---------- user (str): The email account associated with the Supermetrics user. @@ -100,7 +100,7 @@ def __init__( self.api_key = self.credentials["api_key"] self.user = self.credentials["user"] - def to_json(self, timeout=(3.05, 60 * 30)) -> dict[str, Any]: + def to_json(self, timeout: tuple = (3.05, 60 * 30)) -> dict[str, Any]: """Download query results to a dictionary. This method executes the query against the Supermetrics API and retrieves @@ -210,12 +210,12 @@ def _get_col_names(self) -> list[str]: This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. - Returns: + Returns ------- list of str: A list of column names based on the data type. - Raises: + Raises ------ ValueError: Raised if the column names cannot be determined. diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 9d8aad617..8b9d1ba39 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -5,8 +5,9 @@ @pytest.fixture def supermetrics_credentials(): return SupermetricsCredentials( - user="test_user", api_key="test_key" - ) # pragma: allowlist secret + user="test_user", + api_key="test_key", # pragma: allowlist secret + ) @pytest.fixture From 92d7678791edd3014d4cbad1ed8510df64c644e1 Mon Sep 17 00:00:00 2001 From: angelika233 Date: Thu, 26 Sep 2024 11:20:14 +0200 Subject: [PATCH 10/10] Organize imports --- src/viadot/__init__.py | 1 + src/viadot/orchestration/prefect/flows/__init__.py | 1 + src/viadot/orchestration/prefect/tasks/__init__.py | 1 + src/viadot/sources/__init__.py | 1 + src/viadot/sources/supermetrics.py | 6 +++--- .../orchestration/prefect/flows/test_supermetrics.py | 1 + tests/unit/test_supermetrics.py | 1 + 7 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/viadot/__init__.py b/src/viadot/__init__.py index f7ff43fae..6f31c5d9b 100644 --- a/src/viadot/__init__.py +++ b/src/viadot/__init__.py @@ -2,6 +2,7 @@ import logging + # Remove trash Azure INFO logs which contain low-level debugging information # but keep WARNING and higher ones in case something actually important happens. diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index 9d2a6621f..73cff365d 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -25,6 +25,7 @@ from .transform import transform from .transform_and_catalog import transform_and_catalog + __all__ = [ "cloud_for_customers_to_adls", "cloud_for_customers_to_databricks", diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 5467b90e8..965c4e11f 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -25,6 +25,7 @@ from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df from .supermetrics import supermetrics_to_df + __all__ = [ "adls_upload", "bcp", diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index bbc1f0f92..d05d91fbf 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -16,6 +16,7 @@ from .supermetrics import Supermetrics, SupermetricsCredentials from .uk_carbon_intensity import UKCarbonIntensity + __all__ = [ "CloudForCustomers", "Epicor", diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py index 5e41f9749..4010c9ed9 100644 --- a/src/viadot/sources/supermetrics.py +++ b/src/viadot/sources/supermetrics.py @@ -20,7 +20,7 @@ class SupermetricsCredentials(BaseModel): This class encapsulates the necessary credentials required to authenticate and access the Supermetrics API. - Attributes + Attributes: ---------- user (str): The email account associated with the Supermetrics user. @@ -210,12 +210,12 @@ def _get_col_names(self) -> list[str]: This method determines the appropriate column names for the data based on its type, whether it's Google Analytics data or another type. - Returns + Returns: ------- list of str: A list of column names based on the data type. - Raises + Raises: ------ ValueError: Raised if the column names cannot be determined. diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py index b574debe9..fb59da86e 100644 --- a/tests/integration/orchestration/prefect/flows/test_supermetrics.py +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -1,4 +1,5 @@ import pytest + from viadot.config import get_source_config from viadot.orchestration.prefect.flows import supermetrics_to_adls diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 8b9d1ba39..0cfa48cd8 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -1,4 +1,5 @@ import pytest + from viadot.sources import Supermetrics, SupermetricsCredentials