Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add VidClub source and Prefect tasks #1044

Open
wants to merge 21 commits into
base: 2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .supermetrics_to_adls import supermetrics_to_adls
from .transform import transform
from .transform_and_catalog import transform_and_catalog
from .vid_club_to_adls import vid_club_to_adls


__all__ = [
Expand Down Expand Up @@ -63,4 +64,5 @@
"supermetrics_to_adls",
"transform",
"transform_and_catalog",
"vid_club_to_adls",
]
98 changes: 98 additions & 0 deletions src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Download data from Vid CLub API and load it into Azure Data Lake Storage."""

from typing import Any, Literal

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df


@flow(
name="Vid CLub extraction to ADLS",
description="Extract data from Vid CLub and load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def vid_club_to_adls( # noqa: PLR0913
*args: list[Any],
endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
from_date: str = "2022-03-22",
to_date: str | None = None,
items_per_page: int = 100,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
days_interval: int = 30,
cols_to_drop: list[str] | None = None,
config_key: str | None = None,
azure_key_vault_secret: str | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
validate_df_dict: dict | None = None,
timeout: int = 3600,
**kwargs: dict[str, Any],
) -> None:
"""Flow for downloading data from the Vid Club via API to a CSV or Parquet file.

Then upload it to Azure Data Lake.

Args:
endpoint (Literal["jobs", "product", "company", "survey"], optional): The
endpoint source to be accessed. Defaults to None.
from_date (str, optional): Start date for the query, by default is the oldest
date in the data 2022-03-22.
to_date (str, optional): End date for the query. By default None,
which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
items_per_page (int, optional): Number of entries per page. Defaults to 100.
region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
filter for the query. Defaults to None (parameter is not used in url).
[December 2023 status: value 'all' does not work for company and jobs]
days_interval (int, optional): Days specified in date range per API call
(test showed that 30-40 is optimal for performance). Defaults to 30.
cols_to_drop (List[str], optional): List of columns to drop. Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret where credentials are stored. Defaults to None.
adls_config_key (Optional[str], optional): The key in the viadot config holding
relevant credentials. Defaults to None.
adls_azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (Optional[str], optional): Azure Data Lake destination file path.
Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to True.
validate_df_dict (dict, optional): A dictionary with optional list of tests
to verify the output
dataframe. If defined, triggers the `validate_df` task from task_utils.
Defaults to None.
timeout (int, optional): The time (in seconds) to wait while running this task
before a timeout occurs. Defaults to 3600.
"""
data_frame = vid_club_to_df(
args=args,
endpoint=endpoint,
from_date=from_date,
to_date=to_date,
items_per_page=items_per_page,
region=region,
days_interval=days_interval,
cols_to_drop=cols_to_drop,
config_key=config_key,
azure_key_vault_secret=azure_key_vault_secret,
validate_df_dict=validate_df_dict,
timeout=timeout,
kawrgs=kwargs,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
from .vid_club import vid_club_to_df


__all__ = [
Expand Down Expand Up @@ -62,5 +63,6 @@
"sharepoint_to_df",
"sql_server_query",
"sql_server_to_df",
"vid_club_to_df",
"supermetrics_to_df",
]
79 changes: 79 additions & 0 deletions src/viadot/orchestration/prefect/tasks/vid_club.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Task for downloading data from Vid Club Cloud API."""

from typing import Any, Literal

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import VidClub


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60)
def vid_club_to_df( # noqa: PLR0913
*args: list[Any],
endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
from_date: str = "2022-03-22",
to_date: str | None = None,
items_per_page: int = 100,
region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
days_interval: int = 30,
cols_to_drop: list[str] | None = None,
azure_key_vault_secret: str | None = None,
adls_config_key: str | None = None,
validate_df_dict: dict | None = None,
timeout: int = 3600,
**kwargs: dict[str, Any],
) -> pd.DataFrame:
"""Task to downloading data from Vid Club APIs to Pandas DataFrame.

Args:
endpoint (Literal["jobs", "product", "company", "survey"], optional):
The endpoint source to be accessed. Defaults to None.
from_date (str, optional): Start date for the query, by default is the oldest
date in the data 2022-03-22.
to_date (str, optional): End date for the query. By default None,
which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
items_per_page (int, optional): Number of entries per page. Defaults to 100.
region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
filter for the query. Defaults to None (parameter is not used in url).
[December 2023 status: value 'all' does not work for company and jobs]
days_interval (int, optional): Days specified in date range per API call
(test showed that 30-40 is optimal for performance). Defaults to 30.
cols_to_drop (List[str], optional): List of columns to drop. Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret where credentials are stored. Defaults to None.
validate_df_dict (dict, optional): A dictionary with optional list of tests
to verify the output
dataframe. If defined, triggers the `validate_df` task from task_utils.
Defaults to None.
timeout (int, optional): The time (in seconds) to wait while running this task
before a timeout occurs. Defaults to 3600.

Returns: Pandas DataFrame
"""
if not (azure_key_vault_secret or adls_config_key):
raise MissingSourceCredentialsError

if not adls_config_key:
credentials = get_credentials(azure_key_vault_secret)

vc_obj = VidClub(
args=args,
endpoint=endpoint,
from_date=from_date,
to_date=to_date,
items_per_page=items_per_page,
region=region,
days_interval=days_interval,
cols_to_drop=cols_to_drop,
vid_club_credentials=credentials,
validate_df_dict=validate_df_dict,
timeout=timeout,
kwargs=kwargs,
)

return vc_obj.to_df()
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .sql_server import SQLServer
from .supermetrics import Supermetrics, SupermetricsCredentials
from .uk_carbon_intensity import UKCarbonIntensity
from .vid_club import VidClub


__all__ = [
Expand All @@ -41,6 +42,7 @@
"SupermetricsCredentials", # pragma: allowlist-secret
"Trino",
"UKCarbonIntensity",
"VidClub",
]
if find_spec("adlfs"):
from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401
Expand Down
Loading