Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add Supermetrics source and Prefect tasks #1054

Merged
merged 11 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
from .sharepoint_to_redshift_spectrum import sharepoint_to_redshift_spectrum
from .sharepoint_to_s3 import sharepoint_to_s3
from .sql_server_to_minio import sql_server_to_minio
from .sql_server_to_parquet import sql_server_to_parquet
from .transform import transform
from .transform_and_catalog import transform_and_catalog

from .sql_server_to_parquet import sql_server_to_parquet
from .supermetrics_to_adls import supermetrics_to_adls

__all__ = [
"supermetrics_to_adls",
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
"duckdb_to_parquet",
Expand Down
88 changes: 88 additions & 0 deletions src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import (
supermetrics_to_df,
df_to_adls,
)


@flow(
    name="Supermetrics extraction to ADLS",
    description="Extract data from Supermetrics and load it into Azure Data Lake Storage.",
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def supermetrics_to_adls(  # noqa: PLR0913
    # supermetrics
    query_params: dict[str, Any] | None = None,
    # ADLS
    adls_path: str | None = None,
    overwrite: bool = False,
    # Auth
    supermetrics_credentials_secret: str | None = None,
    supermetrics_config_key: str | None = None,
    adls_credentials_secret: str | None = None,
    adls_config_key: str | None = None,
    # An annotation on `**kwargs` describes each individual value, not the
    # mapping as a whole, so `Any` is the correct spelling for "arbitrary
    # extra keyword arguments".
    **kwargs: Any,
):
    """Extract data from Supermetrics and upload it to Azure Data Lake Storage.

    The flow runs two tasks in sequence: `supermetrics_to_df` downloads the data
    as a DataFrame, and `df_to_adls` uploads that DataFrame to ADLS.

    Args:
        query_params (dict[str, Any], optional): Parameters of the Supermetrics
            query. Passed through unchanged to `supermetrics_to_df`. Defaults to
            None.
        adls_path (str, optional): Destination path in ADLS, forwarded to
            `df_to_adls` as `path`. Defaults to None.
        overwrite (bool, optional): Forwarded to `df_to_adls`; intended to control
            whether an existing file at `adls_path` is replaced. Defaults to
            False.
        supermetrics_credentials_secret (str, optional): Name of the secret
            holding the Supermetrics credentials. Defaults to None.
        supermetrics_config_key (str, optional): Key in the viadot config holding
            the Supermetrics credentials. Defaults to None.
        adls_credentials_secret (str, optional): Name of the secret holding the
            ADLS credentials. Defaults to None.
        adls_config_key (str, optional): Key in the viadot config holding the
            ADLS credentials. Defaults to None.
        **kwargs (Any): Extra keyword arguments forwarded verbatim to
            `supermetrics_to_df`.

    Returns:
        Whatever the `df_to_adls` task returns.

    Raises:
        MissingSourceCredentialsError: Propagated from `supermetrics_to_df` when
            neither `supermetrics_credentials_secret` nor
            `supermetrics_config_key` is provided.
    """
    # NOTE(review): the `supermetrics_to_df` task introduced alongside this flow
    # only declares `query_params`, `config_key` and `credentials_secret`, so any
    # extra keyword forwarded here would be rejected by it - confirm which keys
    # (if any) are meant to be supported.
    df = supermetrics_to_df(
        query_params=query_params,
        credentials_secret=supermetrics_credentials_secret,
        config_key=supermetrics_config_key,
        **kwargs,
    )

    return df_to_adls(
        df=df,
        path=adls_path,
        credentials_secret=adls_credentials_secret,
        config_key=adls_config_key,
        overwrite=overwrite,
    )
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
sharepoint_to_df,
)
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df

from .supermetrics import supermetrics_to_df

__all__ = [
"adls_upload",
Expand All @@ -48,4 +48,5 @@
"create_sql_server_table",
"sql_server_query",
"sql_server_to_df",
"supermetrics_to_df",
]
61 changes: 61 additions & 0 deletions src/viadot/orchestration/prefect/tasks/supermetrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Any

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Supermetrics


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def supermetrics_to_df(
    query_params: dict,
    config_key: str | None = None,
    credentials_secret: str | None = None,
) -> pd.DataFrame:
    """Query Supermetrics and return the result as a pandas DataFrame.

    Credentials come from one of two places: a viadot config entry
    (`config_key`) or a secret (`credentials_secret`). At least one of them must
    be given. When both are given, `config_key` takes precedence and the secret
    is not read.

    Args:
        query_params (dict): Parameters of the Supermetrics query, handed to
            `Supermetrics.to_df()` unchanged.
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        credentials_secret (str, optional): Name of the secret that stores the
            Supermetrics credentials. Only consulted when `config_key` is not
            provided. Defaults to None.

    Returns:
        pd.DataFrame: The value returned by `Supermetrics.to_df()` for the given
            query parameters.

    Raises:
        MissingSourceCredentialsError: If neither `credentials_secret` nor
            `config_key` is provided.
    """
    if not credentials_secret and not config_key:
        raise MissingSourceCredentialsError

    # With a config key the source resolves its own credentials, so the secret
    # is only fetched when no config key was supplied.
    resolved_credentials = None if config_key else get_credentials(credentials_secret)

    return Supermetrics(
        credentials=resolved_credentials,
        config_key=config_key,
    ).to_df(query_params=query_params)

3 changes: 3 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from importlib.util import find_spec

from .supermetrics import Supermetrics, SupermetricsCredentials
from .cloud_for_customers import CloudForCustomers
from .duckdb import DuckDB
from .exchange_rates import ExchangeRates
Expand All @@ -26,6 +27,8 @@
"Trino",
"SQLServer",
"UKCarbonIntensity",
"Supermetrics",
"SupermetricsCredentials"
]

if find_spec("adlfs"):
Expand Down
Loading
Loading