From e54e0040c2e07b29e055982ee9ea41f433c4bbed Mon Sep 17 00:00:00 2001 From: adrian-wojcik Date: Wed, 11 Sep 2024 10:33:34 +0200 Subject: [PATCH 01/17] =?UTF-8?q?=F0=9F=9A=80=20Add=20Vid=20Club=20connect?= =?UTF-8?q?or=20with=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../orchestration/prefect/flows/__init__.py | 2 + .../prefect/flows/vid_club_to_adls.py | 97 +++++ .../orchestration/prefect/tasks/__init__.py | 2 + .../orchestration/prefect/tasks/vid_club.py | 80 ++++ src/viadot/sources/__init__.py | 2 + src/viadot/sources/vid_club.py | 400 ++++++++++++++++++ .../prefect/flows/test_vid_club.py | 28 ++ .../prefect/tasks/test_vid_club.py | 51 +++ tests/unit/test_vid_club.py | 58 +++ 9 files changed, 720 insertions(+) create mode 100644 src/viadot/orchestration/prefect/flows/vid_club_to_adls.py create mode 100644 src/viadot/orchestration/prefect/tasks/vid_club.py create mode 100644 src/viadot/sources/vid_club.py create mode 100644 tests/integration/orchestration/prefect/flows/test_vid_club.py create mode 100644 tests/integration/orchestration/prefect/tasks/test_vid_club.py create mode 100644 tests/unit/test_vid_club.py diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index 0b314c819..d2a378430 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -21,6 +21,7 @@ from .sql_server_to_parquet import sql_server_to_parquet from .transform import transform from .transform_and_catalog import transform_and_catalog +from .vid_club_to_adls import vid_club_to_adls __all__ = [ @@ -45,4 +46,5 @@ "sql_server_to_parquet", "transform", "transform_and_catalog", + "vid_club_to_adls", ] diff --git a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py new file mode 100644 index 000000000..54874ba20 --- /dev/null +++ b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py @@ -0,0 +1,97 @@ +"""Download data from Vid CLub API and load it into Azure Data Lake Storage.""" + +from typing import Any, Dict, List, Literal +from prefect import flow +from prefect.task_runners import ConcurrentTaskRunner +from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df + + +@flow( + name="Vid CLub extraction to ADLS", + description="Extract data from Vid CLub and load it into Azure Data Lake Storage.", + retries=1, + retry_delay_seconds=60, + task_runner=ConcurrentTaskRunner, +) +def vid_club_to_adls( + *args: List[Any], + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + days_interval: int = 30, + cols_to_drop: List[str] = None, + vid_club_credentials: Dict[str, Any] = None, + vidclub_credentials_secret: str = "VIDCLUB", + adls_config_key: str | None = None, + adls_azure_key_vault_secret: str | None = None, + adls_path: str | None = None, + adls_path_overwrite: bool = False, + validate_df_dict: dict = None, + timeout: int = 3600, + **kwargs: Dict[str, Any] +) -> None: + """ + Flow for downloading data from the Vid Club via API to a CSV or Parquet file. + + Then upload it to Azure Data Lake. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint + source to be accessed. Defaults to None. 
+        from_date (str, optional): Start date for the query, by default is the oldest
+            date in the data 2022-03-22.
+        to_date (str, optional): End date for the query. By default None,
+            which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
+        items_per_page (int, optional): Number of entries per page. Defaults to 100.
+        region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
+            filter for the query. Defaults to None (parameter is not used in url).
+            [December 2023 status: value 'all' does not work for company and jobs]
+        days_interval (int, optional): Days specified in date range per API call
+            (test showed that 30-40 is optimal for performance). Defaults to 30.
+        cols_to_drop (List[str], optional): List of columns to drop. Defaults to None.
+        vid_club_credentials (Dict[str, Any], optional): Stores the credentials
+            information. Defaults to None.
+        vidclub_credentials_secret (str, optional): The name of the secret in
+            Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB".
+        adls_config_key (Optional[str], optional): The key in the viadot config holding
+            relevant credentials. Defaults to None.
+        adls_azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
+            Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
+            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
+            Defaults to None.
+        adls_path (Optional[str], optional): Azure Data Lake destination file path.
+            Defaults to None.
+        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
+            Defaults to False.
+        validate_df_dict (dict, optional): A dictionary with an optional list of
+            tests to verify the output dataframe. If defined, triggers the
+            `validate_df` task from task_utils. Defaults to None.
+        timeout (int, optional): The time (in seconds) to wait while running this task
+            before a timeout occurs. Defaults to 3600.
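+
+    Examples:
+        A minimal invocation sketch; the secret names and the ADLS path below are
+        illustrative placeholders, not real resources:
+
+        >>> vid_club_to_adls(
+        ...     source="jobs",
+        ...     from_date="2023-01-01",
+        ...     to_date="2023-01-31",
+        ...     adls_path="raw/vid_club/jobs.parquet",
+        ...     adls_azure_key_vault_secret="adls-credentials-secret",
+        ...     vidclub_credentials_secret="VIDCLUB",
+        ... )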
+ """ + data_frame = vid_club_to_df( + args=args, + source=source, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + days_interval=days_interval, + cols_to_drop=cols_to_drop, + vid_club_credentials=vid_club_credentials, + vidclub_credentials_secret=vidclub_credentials_secret, + validate_df_dict=validate_df_dict, + timeout=timeout, + kawrgs=kwargs + ) + + return df_to_adls( + df=data_frame, + path=adls_path, + credentials_secret=adls_azure_key_vault_secret, + config_key=adls_config_key, + overwrite=adls_path_overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index cbe8f7276..e5e515dff 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -22,6 +22,7 @@ sharepoint_to_df, ) from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df +from .vid_club import vid_club_to_df __all__ = [ @@ -48,4 +49,5 @@ "create_sql_server_table", "sql_server_query", "sql_server_to_df", + "vid_club_to_df", ] diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py new file mode 100644 index 000000000..d25b272e5 --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/vid_club.py @@ -0,0 +1,80 @@ +"""Task for downloading data from Vid Club Cloud API.""" + +from typing import Any, Dict, List, Literal +import pandas as pd +from prefect import task +from viadot.sources import VidClub +from viadot.orchestration.prefect.utils import get_credentials +from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError + + +@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60) +def vid_club_to_df( + *args: List[Any], + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + days_interval: int = 30, + cols_to_drop: List[str] = None, + vid_club_credentials: Dict[str, Any] = None, + vidclub_credentials_secret: str = "VIDCLUB", + validate_df_dict: dict = None, + timeout: int = 3600, + **kwargs: Dict[str, Any], +) -> pd.DataFrame: + """ + Task to downloading data from Vid Club APIs to Pandas DataFrame. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint + source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest + date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. Defaults to 100. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region + filter for the query. Defaults to None (parameter is not used in url). + [December 2023 status: value 'all' does not work for company and jobs] + days_interval (int, optional): Days specified in date range per API call + (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. + vid_club_credentials (Dict[str, Any], optional): Stores the credentials + information. Defaults to None. + vidclub_credentials_secret (str, optional): The name of the secret in + Azure Key Vault or Prefect or local_config file. 
Defaults to "VIDCLUB". + validate_df_dict (dict, optional): A dictionary with optional list of tests + to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. + Defaults to None. + timeout (int, optional): The time (in seconds) to wait while running this task + before a timeout occurs. Defaults to 3600. + + Returns: Pandas DataFrame + """ + if not vid_club_credentials: + vid_club_credentials = get_credentials(vidclub_credentials_secret) + + if not vid_club_credentials: + raise MissingSourceCredentialsError + + vc_obj = VidClub( + args=args, + source=source, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + days_interval=days_interval, + cols_to_drop=cols_to_drop, + vid_club_credentials=vid_club_credentials, + validate_df_dict=validate_df_dict, + timeout=timeout, + kwargs=kwargs + ) + + vc_dataframe = vc_obj.to_df() + + return vc_dataframe diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 0d77c1bf2..e4e3411f7 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -13,6 +13,7 @@ from .sql_server import SQLServer from .trino import Trino from .uk_carbon_intensity import UKCarbonIntensity +from .vid_club import VidClub __all__ = [ @@ -26,6 +27,7 @@ "Trino", "SQLServer", "UKCarbonIntensity", + "VidClub", ] if find_spec("adlfs"): diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py new file mode 100644 index 000000000..d1a3a2b25 --- /dev/null +++ b/src/viadot/sources/vid_club.py @@ -0,0 +1,400 @@ +"""Vid Club Cloud API connector.""" + +from datetime import datetime, timedelta +from typing import Any, Dict, List, Literal, Tuple +import pandas as pd + +from prefect import get_run_logger +from ..exceptions import ValidationError +from ..utils import handle_api_response +from .base import Source + + +logger = get_run_logger() + + +class VidClub(Source): + """ + A class implementing the Vid Club API. + + Documentation for this API is located at: https://evps01.envoo.net/vipapi/ + There are 4 endpoints where to get the data. + """ + + def __init__( + self, + *args, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + days_interval: int = 30, + cols_to_drop: List[str] = None, + vid_club_credentials: Dict[str, Any] = None, + validate_df_dict: dict = None, + timeout: int = 3600, + **kwargs + ): + """ + Create an instance of VidClub. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The + endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the + oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. Defaults to 100. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + days_interval (int, optional): Days specified in date range per API call + (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. + Defaults to None. 
+ vid_club_credentials (Dict[str, Any], optional): Stores the credentials + information. Defaults to None. + validate_df_dict (dict, optional): A dictionary with optional list of tests + to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. + Defaults to None. + timeout (int, optional): The time (in seconds) to wait while running this + task before a timeout occurs. Defaults to 3600. + """ + self.source = source + self.from_date = from_date + self.to_date = to_date + self.items_per_page = items_per_page + self.region = region + self.days_interval = days_interval + self.cols_to_drop = cols_to_drop + self.vid_club_credentials = vid_club_credentials + self.validate_df_dict = validate_df_dict + self.timeout = timeout + + self.headers = { + "Authorization": "Bearer " + vid_club_credentials["token"], + "Content-Type": "application/json", + } + + super().__init__(*args, **kwargs) + + def build_query( + self, + from_date: str, + to_date: str, + api_url: str, + items_per_page: int, + source: Literal["jobs", "product", "company", "survey"] = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + ) -> str: + """ + Builds the query from the inputs. + + Args: + from_date (str): Start date for the query. + to_date (str): End date for the query, if empty, will be executed as + datetime.today().strftime("%Y-%m-%d"). + api_url (str): Generic part of the URL to Vid Club API. + items_per_page (int): number of entries per page. + source (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + + Returns: + str: Final query with all filters added. + + Raises: + ValidationError: If any source different than the ones in the list are used. + """ + if source in ["jobs", "product", "company"]: + region_url_string = f"®ion={region}" if region else "" + url = f"""{api_url}{source}?from={from_date}&to={to_date} + {region_url_string}&limit={items_per_page}""" + elif source == "survey": + url = f"{api_url}{source}?language=en&type=question" + else: + raise ValidationError( + "Pick one these sources: jobs, product, company, survey" + ) + return url + + def intervals( + self, from_date: str, to_date: str, days_interval: int + ) -> Tuple[List[str], List[str]]: + """ + Breaks dates range into smaller by provided days interval. + + Args: + from_date (str): Start date for the query in "%Y-%m-%d" format. + to_date (str): End date for the query, if empty, will be executed as + datetime.today().strftime("%Y-%m-%d"). + days_interval (int): Days specified in date range per api call + (test showed that 30-40 is optimal for performance). + + Returns: + List[str], List[str]: Starts and Ends lists that contains information + about date ranges for specific period and time interval. + + Raises: + ValidationError: If the final date of the query is before the start date. 
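+
+        Example:
+            With days_interval=5, intervals("2023-01-01", "2023-01-15", 5)
+            yields starts ["2023-01-01", "2023-01-06", "2023-01-11"] and
+            ends ["2023-01-06", "2023-01-11", "2023-01-15"].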
+ """ + if to_date is None: + to_date = datetime.today().strftime("%Y-%m-%d") + + end_date = datetime.strptime(to_date, "%Y-%m-%d").date() + start_date = datetime.strptime(from_date, "%Y-%m-%d").date() + + from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") + + to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") + delta = to_date_obj - from_date_obj + + if delta.days < 0: + raise ValidationError("to_date cannot be earlier than from_date.") + + interval = timedelta(days=days_interval) + starts = [] + ends = [] + + period_start = start_date + while period_start < end_date: + period_end = min(period_start + interval, end_date) + starts.append(period_start.strftime("%Y-%m-%d")) + ends.append(period_end.strftime("%Y-%m-%d")) + period_start = period_end + if len(starts) == 0 and len(ends) == 0: + starts.append(from_date) + ends.append(to_date) + return starts, ends + + def check_connection( + self, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + url: str = None, + ) -> Tuple[Dict[str, Any], str]: + """ + Initiate first connection to API to retrieve piece of data. + + With information about type of pagination in API URL. + This option is added because type of pagination for endpoints is being changed + in the future from page number to 'next' id. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the + oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. + 100 entries by default. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + url (str, optional): Generic part of the URL to Vid Club API. + Defaults to None. + + Returns: + Tuple[Dict[str, Any], str]: Dictionary with first response from API with + JSON containing data and used URL string. + + Raises: + ValidationError: If from_date is earlier than 2022-03-22. + ValidationError: If to_date is earlier than from_date. + """ + if from_date < "2022-03-22": + raise ValidationError("from_date cannot be earlier than 2022-03-22.") + + if to_date < from_date: + raise ValidationError("to_date cannot be earlier than from_date.") + + if url is None: + url = self.credentials["url"] + + first_url = self.build_query( + source=source, + from_date=from_date, + to_date=to_date, + api_url=url, + items_per_page=items_per_page, + region=region, + ) + headers = self.headers + response = handle_api_response( + url=first_url, headers=headers, method="GET" + ) + response = response.json() + return (response, first_url) + + def get_response( + self, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + ) -> pd.DataFrame: + """ + Basing on the pagination type retrieved using check_connection function. + + It gets the response from the API queried and transforms it into DataFrame. 
+
+        Args:
+            source (Literal["jobs", "product", "company", "survey"], optional):
+                The endpoint source to be accessed. Defaults to None.
+            from_date (str, optional): Start date for the query, by default is the
+                oldest date in the data 2022-03-22.
+            to_date (str, optional): End date for the query. By default None,
+                which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
+            items_per_page (int, optional): Number of entries per page.
+                100 entries by default.
+            region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional):
+                Region filter for the query. Defaults to None
+                (parameter is not used in url). [December 2023 status: value 'all'
+                does not work for company and jobs]
+
+        Returns
+            pd.DataFrame: Table of the data carried in the response.
+
+        Raises:
+            ValidationError: If any source different than the ones in the list are used.
+        """
+        headers = self.headers
+        if source not in ["jobs", "product", "company", "survey"]:
+            raise ValidationError(
+                "The source has to be: jobs, product, company or survey"
+            )
+        if to_date is None:
+            to_date = datetime.today().strftime("%Y-%m-%d")
+
+        response, first_url = self.check_connection(
+            source=source,
+            from_date=from_date,
+            to_date=to_date,
+            items_per_page=items_per_page,
+            region=region,
+        )
+
+        if isinstance(response, dict):
+            keys_list = list(response.keys())
+        elif isinstance(response, list):
+            keys_list = list(response[0].keys())
+        else:
+            keys_list = []
+
+        ind = "next" in keys_list
+
+        if "data" in keys_list:
+            df = pd.json_normalize(response["data"])
+            df = pd.DataFrame(df)
+            length = df.shape[0]
+            page = 1
+
+            while length == items_per_page:
+                if ind is True:
+                    next_page = response["next"]
+                    url = f"{first_url}&next={next_page}"
+                else:
+                    page += 1
+                    url = f"{first_url}&page={page}"
+                response_api = handle_api_response(
+                    url=url, headers=headers, method="GET"
+                )
+                response = response_api.json()
+                df_page = pd.json_normalize(response["data"])
+                df_page = pd.DataFrame(df_page)
+                if source == "product":
+                    df_page = df_page.transpose()
+                length = df_page.shape[0]
+                df = pd.concat((df, df_page), axis=0)
+        else:
+            df = pd.DataFrame(response)
+
+        return df
+
+    def to_df(
+        self,
+        if_empty: str = "warn",
+    ) -> pd.DataFrame:
+        """
+        Looping get_response and iterating by date ranges defined in intervals.
+
+        Stores outputs as DataFrames in a list. At the end, dataframes are concatenated
+        in one and dropped duplicates that would appear when querying.
+
+        Args:
+            if_empty (str, optional): What to do if a fetch produces no data.
+                Defaults to "warn".
+
+        Returns:
+            pd.DataFrame: Dataframe of the concatenated data carried in the responses.
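+
+        Example:
+            A minimal sketch; the token value and the dates below are
+            illustrative placeholders:
+
+            >>> vc = VidClub(
+            ...     source="jobs",
+            ...     from_date="2023-01-01",
+            ...     to_date="2023-01-31",
+            ...     vid_club_credentials={"token": "<api-token>"},
+            ... )
+            >>> df = vc.to_df()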
+ """ + starts, ends = self.intervals( + from_date=self.from_date, + to_date=self.to_date, + days_interval=self.days_interval + ) + + dfs_list = [] + if len(starts) > 0 and len(ends) > 0: + for start, end in zip(starts, ends): + logger.info(f"ingesting data for dates [{start}]-[{end}]...") + df = self.get_response( + source=self.source, + from_date=start, + to_date=end, + items_per_page=self.items_per_page, + region=self.region, + ) + dfs_list.append(df) + if len(dfs_list) > 1: + df = pd.concat(dfs_list, axis=0, ignore_index=True) + else: + df = pd.DataFrame(dfs_list[0]) + else: + df = self.get_response( + source=self.source, + from_date=self.from_date, + to_date=self.to_date, + items_per_page=self.items_per_page, + region=self.region, + ) + list_columns = df.columns[ + df.map(lambda x: isinstance(x, list)).any() + ].tolist() + for i in list_columns: + df[i] = df[i].apply(lambda x: tuple(x) if isinstance(x, list) else x) + df.drop_duplicates(inplace=True) + + if self.cols_to_drop is not None: + if isinstance(self.cols_to_drop, list): + try: + logger.info(f"Dropping following columns: {self.cols_to_drop}...") + df.drop( + columns=self.cols_to_drop, inplace=True, errors="raise" + ) + except KeyError: + logger.error( + f"""Column(s): {self.cols_to_drop} don't exist in the DataFrame. + No columns were dropped. Returning full DataFrame...""" + ) + logger.info(f"Existing columns: {df.columns}") + else: + raise TypeError("Provide columns to drop in a List.") + + if df.empty: + logger.error("No data for this date range") + + return df diff --git a/tests/integration/orchestration/prefect/flows/test_vid_club.py b/tests/integration/orchestration/prefect/flows/test_vid_club.py new file mode 100644 index 000000000..0780f8601 --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_vid_club.py @@ -0,0 +1,28 @@ +from src.viadot.orchestration.prefect.flows import vid_club_to_adls +from src.viadot.sources import AzureDataLake + +TEST_FILE_PATH = "test/path/to/adls.parquet" +TEST_SOURCE = "jobs" +TEST_FROM_DATE = "2023-01-01" +TEST_TO_DATE = "2023-12-31" +ADLS_CREDENTIALS_SECRET = "test_adls_secret" +VIDCLUB_CREDENTIALS_SECRET = "test_vidclub_secret" + + +def test_vid_club_to_adls(): + lake = AzureDataLake(config_key="adls_test") + + assert not lake.exists(TEST_FILE_PATH) + + vid_club_to_adls( + source=TEST_SOURCE, + from_date=TEST_FROM_DATE, + to_date=TEST_TO_DATE, + adls_path=TEST_FILE_PATH, + adls_azure_key_vault_secret=ADLS_CREDENTIALS_SECRET, + vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET + ) + + assert lake.exists(TEST_FILE_PATH) + + lake.rm(TEST_FILE_PATH) diff --git a/tests/integration/orchestration/prefect/tasks/test_vid_club.py b/tests/integration/orchestration/prefect/tasks/test_vid_club.py new file mode 100644 index 000000000..67b48974f --- /dev/null +++ b/tests/integration/orchestration/prefect/tasks/test_vid_club.py @@ -0,0 +1,51 @@ +import pytest +import pandas as pd +from src.viadot.orchestration.prefect.tasks import vid_club_to_df +from src.viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError + + +EXPECTED_DF = pd.DataFrame({ + "id": [1, 2], + "name": ["Company A", "Company B"], + "region": ["pl", "ro"] +}) + + +class MockVidClub: + def __init__(self, *args, **kwargs): + pass + + def to_df(self): + return EXPECTED_DF + + +def test_vid_club_to_df(mocker): + mocker.patch('viadot.orchestration.prefect.tasks.VidClub', new=MockVidClub) + + + df = vid_club_to_df( + source="company", + from_date="2023-01-01", + to_date="2023-12-31", + 
items_per_page=100, + region="pl", + vidclub_credentials_secret="VIDCLUB" + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert df.equals(EXPECTED_DF) + + +def test_vid_club_to_df_missing_credentials(mocker): + mocker.patch('viadot.orchestration.prefect.tasks.get_credentials', return_value=None) + + with pytest.raises(MissingSourceCredentialsError): + vid_club_to_df( + source="company", + from_date="2023-01-01", + to_date="2023-12-31", + items_per_page=100, + region="pl", + vidclub_credentials_secret="VIDCLUB" + ) diff --git a/tests/unit/test_vid_club.py b/tests/unit/test_vid_club.py new file mode 100644 index 000000000..287139703 --- /dev/null +++ b/tests/unit/test_vid_club.py @@ -0,0 +1,58 @@ +import unittest +from datetime import datetime +from your_module import VidClub, ValidationError + +class TestVidClub(unittest.TestCase): + def setUp(self): + """Setup VidClub instance before each test.""" + # Sample input data for the constructor + self.vid_club = VidClub( + source="jobs", + vid_club_credentials={"token": "test-token"} + ) + + def test_build_query(self): + """Test correct URL generation for the 'jobs' endpoint.""" + # Sample input data for the build_query method + from_date = "2023-01-01" + to_date = "2023-01-31" + api_url = "https://example.com/api/" + items_per_page = 50 + source = "jobs" + region = "pl" + + # Expected result URL + expected_url = ( + "https://example.com/api/jobs?from=2023-01-01&to=2023-01-31®ion=pl&limit=50" + ) + + # Check if the method returns the correct URL + result_url = self.vid_club.build_query(from_date, to_date, api_url, items_per_page, source, region) + self.assertEqual(result_url.strip(), expected_url.strip()) + + def test_intervals(self): + """Test breaking date range into intervals based on the days_interval.""" + # Sample input data for the intervals method + from_date = "2023-01-01" + to_date = "2023-01-15" + days_interval = 5 + + # Expected starts and ends lists + expected_starts = ["2023-01-01", "2023-01-06", "2023-01-11"] + expected_ends = ["2023-01-06", "2023-01-11", "2023-01-15"] + + # Check if the method returns correct intervals + starts, ends = self.vid_club.intervals(from_date, to_date, days_interval) + self.assertEqual(starts, expected_starts) + self.assertEqual(ends, expected_ends) + + def test_intervals_invalid_date_range(self): + """Test that ValidationError is raised when to_date is before from_date.""" + # Sample input data where to_date is before from_date + from_date = "2023-01-15" + to_date = "2023-01-01" + days_interval = 5 + + # Check if ValidationError is raised + with self.assertRaises(ValidationError): + self.vid_club.intervals(from_date, to_date, days_interval) \ No newline at end of file From 900537422dc459a0e2ac64e26bcc669f6904bc22 Mon Sep 17 00:00:00 2001 From: fdelgadodyvenia Date: Sun, 22 Sep 2024 09:19:25 +0000 Subject: [PATCH 02/17] removed prefect dependency from source --- src/viadot/sources/vid_club.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index d1a3a2b25..f7adab828 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -4,13 +4,15 @@ from typing import Any, Dict, List, Literal, Tuple import pandas as pd -from prefect import get_run_logger +import logging from ..exceptions import ValidationError from ..utils import handle_api_response from .base import Source -logger = get_run_logger() +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = 
logging.getLogger(__name__) class VidClub(Source): From a5219db8b5297d78e56a656300fd709cd76731c2 Mon Sep 17 00:00:00 2001 From: fdelgadodyvenia Date: Mon, 23 Sep 2024 11:33:54 +0000 Subject: [PATCH 03/17] super init passing credentials --- src/viadot/sources/vid_club.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index f7adab828..0b8377d48 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -82,7 +82,7 @@ def __init__( "Content-Type": "application/json", } - super().__init__(*args, **kwargs) + super().__init__(credentials=vid_club_credentials,*args, **kwargs) def build_query( self, From 39e2e9848abbcf0071fb95cca100843d31554257 Mon Sep 17 00:00:00 2001 From: adrian-wojcik Date: Tue, 24 Sep 2024 11:46:03 +0200 Subject: [PATCH 04/17] =?UTF-8?q?=F0=9F=8E=A8=20Changed=20`source`=20to=20?= =?UTF-8?q?`endpoint`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../prefect/flows/vid_club_to_adls.py | 6 +-- .../orchestration/prefect/tasks/vid_club.py | 6 +-- src/viadot/sources/vid_club.py | 38 +++++++++---------- .../prefect/flows/test_vid_club.py | 2 +- .../prefect/tasks/test_vid_club.py | 4 +- tests/unit/test_vid_club.py | 6 +-- 6 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py index 54874ba20..2441c9aa3 100644 --- a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py @@ -15,7 +15,7 @@ ) def vid_club_to_adls( *args: List[Any], - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, @@ -38,7 +38,7 @@ def vid_club_to_adls( Then upload it to Azure Data Lake. Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. @@ -74,7 +74,7 @@ def vid_club_to_adls( """ data_frame = vid_club_to_df( args=args, - source=source, + endpoint=endpoint, from_date=from_date, to_date=to_date, items_per_page=items_per_page, diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py index d25b272e5..452130d3f 100644 --- a/src/viadot/orchestration/prefect/tasks/vid_club.py +++ b/src/viadot/orchestration/prefect/tasks/vid_club.py @@ -11,7 +11,7 @@ @task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60) def vid_club_to_df( *args: List[Any], - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, @@ -28,7 +28,7 @@ def vid_club_to_df( Task to downloading data from Vid Club APIs to Pandas DataFrame. Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. 
from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. @@ -62,7 +62,7 @@ def vid_club_to_df( vc_obj = VidClub( args=args, - source=source, + endpoint=endpoint, from_date=from_date, to_date=to_date, items_per_page=items_per_page, diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index 0b8377d48..499eed775 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -26,7 +26,7 @@ class VidClub(Source): def __init__( self, *args, - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, @@ -42,7 +42,7 @@ def __init__( Create an instance of VidClub. Args: - source (Literal["jobs", "product", "company", "survey"], optional): The + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. @@ -66,7 +66,7 @@ def __init__( timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ - self.source = source + self.endpoint = endpoint self.from_date = from_date self.to_date = to_date self.items_per_page = items_per_page @@ -90,7 +90,7 @@ def build_query( to_date: str, api_url: str, items_per_page: int, - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, ) -> str: """ @@ -102,7 +102,7 @@ def build_query( datetime.today().strftime("%Y-%m-%d"). api_url (str): Generic part of the URL to Vid Club API. items_per_page (int): number of entries per page. - source (Literal["jobs", "product", "company", "survey"], optional): + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None @@ -115,12 +115,12 @@ def build_query( Raises: ValidationError: If any source different than the ones in the list are used. """ - if source in ["jobs", "product", "company"]: + if endpoint in ["jobs", "product", "company"]: region_url_string = f"®ion={region}" if region else "" - url = f"""{api_url}{source}?from={from_date}&to={to_date} + url = f"""{api_url}{endpoint}?from={from_date}&to={to_date} {region_url_string}&limit={items_per_page}""" - elif source == "survey": - url = f"{api_url}{source}?language=en&type=question" + elif endpoint == "survey": + url = f"{api_url}{endpoint}?language=en&type=question" else: raise ValidationError( "Pick one these sources: jobs, product, company, survey" @@ -178,7 +178,7 @@ def intervals( def check_connection( self, - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, @@ -193,7 +193,7 @@ def check_connection( in the future from page number to 'next' id. Args: - source (Literal["jobs", "product", "company", "survey"], optional): + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. 
from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. @@ -226,7 +226,7 @@ def check_connection( url = self.credentials["url"] first_url = self.build_query( - source=source, + endpoint=endpoint, from_date=from_date, to_date=to_date, api_url=url, @@ -242,7 +242,7 @@ def check_connection( def get_response( self, - source: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, @@ -254,7 +254,7 @@ def get_response( It gets the response from the API queried and transforms it into DataFrame. Args: - source (Literal["jobs", "product", "company", "survey"], optional): + endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. @@ -274,7 +274,7 @@ def get_response( ValidationError: If any source different than the ones in the list are used. """ headers = self.headers - if source not in ["jobs", "product", "company", "survey"]: + if endpoint not in ["jobs", "product", "company", "survey"]: raise ValidationError( "The source has to be: jobs, product, company or survey" ) @@ -282,7 +282,7 @@ def get_response( to_date = datetime.today().strftime("%Y-%m-%d") response, first_url = self.check_connection( - source=source, + endpoint=endpoint, from_date=from_date, to_date=to_date, items_per_page=items_per_page, @@ -317,7 +317,7 @@ def get_response( response = response_api.json() df_page = pd.json_normalize(response["data"]) df_page = pd.DataFrame(df_page) - if source == "product": + if endpoint == "product": df_page = df_page.transpose() length = df_page.shape[0] df = pd.concat((df, df_page), axis=0) @@ -354,7 +354,7 @@ def to_df( for start, end in zip(starts, ends): logger.info(f"ingesting data for dates [{start}]-[{end}]...") df = self.get_response( - source=self.source, + endpoint=self.endpoint, from_date=start, to_date=end, items_per_page=self.items_per_page, @@ -367,7 +367,7 @@ def to_df( df = pd.DataFrame(dfs_list[0]) else: df = self.get_response( - source=self.source, + endpoint=self.endpoint, from_date=self.from_date, to_date=self.to_date, items_per_page=self.items_per_page, diff --git a/tests/integration/orchestration/prefect/flows/test_vid_club.py b/tests/integration/orchestration/prefect/flows/test_vid_club.py index 0780f8601..1efff2206 100644 --- a/tests/integration/orchestration/prefect/flows/test_vid_club.py +++ b/tests/integration/orchestration/prefect/flows/test_vid_club.py @@ -15,7 +15,7 @@ def test_vid_club_to_adls(): assert not lake.exists(TEST_FILE_PATH) vid_club_to_adls( - source=TEST_SOURCE, + endpoint=TEST_SOURCE, from_date=TEST_FROM_DATE, to_date=TEST_TO_DATE, adls_path=TEST_FILE_PATH, diff --git a/tests/integration/orchestration/prefect/tasks/test_vid_club.py b/tests/integration/orchestration/prefect/tasks/test_vid_club.py index 67b48974f..8a5d6001e 100644 --- a/tests/integration/orchestration/prefect/tasks/test_vid_club.py +++ b/tests/integration/orchestration/prefect/tasks/test_vid_club.py @@ -24,7 +24,7 @@ def test_vid_club_to_df(mocker): df = vid_club_to_df( - source="company", + endpoint="company", from_date="2023-01-01", to_date="2023-12-31", items_per_page=100, @@ -42,7 +42,7 @@ def test_vid_club_to_df_missing_credentials(mocker): with pytest.raises(MissingSourceCredentialsError): vid_club_to_df( - 
source="company", + endpoint="company", from_date="2023-01-01", to_date="2023-12-31", items_per_page=100, diff --git a/tests/unit/test_vid_club.py b/tests/unit/test_vid_club.py index 287139703..ccd411ad5 100644 --- a/tests/unit/test_vid_club.py +++ b/tests/unit/test_vid_club.py @@ -7,7 +7,7 @@ def setUp(self): """Setup VidClub instance before each test.""" # Sample input data for the constructor self.vid_club = VidClub( - source="jobs", + endpoint="jobs", vid_club_credentials={"token": "test-token"} ) @@ -18,7 +18,7 @@ def test_build_query(self): to_date = "2023-01-31" api_url = "https://example.com/api/" items_per_page = 50 - source = "jobs" + endpoint = "jobs" region = "pl" # Expected result URL @@ -27,7 +27,7 @@ def test_build_query(self): ) # Check if the method returns the correct URL - result_url = self.vid_club.build_query(from_date, to_date, api_url, items_per_page, source, region) + result_url = self.vid_club.build_query(from_date, to_date, api_url, items_per_page, endpoint, region) self.assertEqual(result_url.strip(), expected_url.strip()) def test_intervals(self): From 209bd3b7505022a52e4db2c5b1dd236e24e2be57 Mon Sep 17 00:00:00 2001 From: adrian-wojcik Date: Tue, 24 Sep 2024 11:47:18 +0200 Subject: [PATCH 05/17] =?UTF-8?q?=F0=9F=8E=A8=20Change=20imports=20structu?= =?UTF-8?q?re?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/vid_club.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index 499eed775..ee462b8de 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -5,9 +5,9 @@ import pandas as pd import logging -from ..exceptions import ValidationError -from ..utils import handle_api_response -from .base import Source +from viadot.exceptions import ValidationError +from viadot.utils import handle_api_response +from viadot.sources.base import Source # Configure logging From a3a9630b5ed93bfe0f12f40680bd53bad79cfb04 Mon Sep 17 00:00:00 2001 From: adrian-wojcik Date: Tue, 24 Sep 2024 11:48:56 +0200 Subject: [PATCH 06/17] =?UTF-8?q?=F0=9F=90=9B=20Fix=20import=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_vid_club.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_vid_club.py b/tests/unit/test_vid_club.py index ccd411ad5..a34acefc6 100644 --- a/tests/unit/test_vid_club.py +++ b/tests/unit/test_vid_club.py @@ -1,6 +1,6 @@ import unittest from datetime import datetime -from your_module import VidClub, ValidationError +from src.viadot.sources.vid_club import VidClub, ValidationError class TestVidClub(unittest.TestCase): def setUp(self): From e28d9abf8680da8341cb5413d93d61e369f68eb8 Mon Sep 17 00:00:00 2001 From: adrian-wojcik Date: Tue, 24 Sep 2024 11:56:56 +0200 Subject: [PATCH 07/17] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20credentia?= =?UTF-8?q?l=20passing=20method=20with=20returning=20data=20frame=20on=20t?= =?UTF-8?q?ask=20level?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../prefect/flows/vid_club_to_adls.py | 20 +++++++------- .../orchestration/prefect/tasks/vid_club.py | 26 +++++++++---------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py index 2441c9aa3..2d0a6013a 100644 --- 
a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py +++ b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py @@ -22,8 +22,8 @@ def vid_club_to_adls( region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, days_interval: int = 30, cols_to_drop: List[str] = None, - vid_club_credentials: Dict[str, Any] = None, - vidclub_credentials_secret: str = "VIDCLUB", + config_key: str | None = None, + azure_key_vault_secret: str | None = None, adls_config_key: str | None = None, adls_azure_key_vault_secret: str | None = None, adls_path: str | None = None, @@ -38,8 +38,8 @@ def vid_club_to_adls( Then upload it to Azure Data Lake. Args: - endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint - source to be accessed. Defaults to None. + endpoint (Literal["jobs", "product", "company", "survey"], optional): The + endpoint source to be accessed. Defaults to None. from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. to_date (str, optional): End date for the query. By default None, @@ -51,10 +51,10 @@ def vid_club_to_adls( days_interval (int, optional): Days specified in date range per API call (test showed that 30-40 is optimal for performance). Defaults to 30. cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. - vid_club_credentials (Dict[str, Any], optional): Stores the credentials - information. Defaults to None. - vidclub_credentials_secret (str, optional): The name of the secret in - Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. + azure_key_vault_secret (Optional[str], optional): The name of the Azure Key + Vault secret where credentials are stored. Defaults to None. adls_config_key (Optional[str], optional): The key in the viadot config holding relevant credentials. Defaults to None. adls_azure_key_vault_secret (Optional[str], optional): The name of the Azure Key @@ -81,8 +81,8 @@ def vid_club_to_adls( region=region, days_interval=days_interval, cols_to_drop=cols_to_drop, - vid_club_credentials=vid_club_credentials, - vidclub_credentials_secret=vidclub_credentials_secret, + config_key=config_key, + azure_key_vault_secret=azure_key_vault_secret, validate_df_dict=validate_df_dict, timeout=timeout, kawrgs=kwargs diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py index 452130d3f..bf587fd7d 100644 --- a/src/viadot/orchestration/prefect/tasks/vid_club.py +++ b/src/viadot/orchestration/prefect/tasks/vid_club.py @@ -18,8 +18,8 @@ def vid_club_to_df( region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, days_interval: int = 30, cols_to_drop: List[str] = None, - vid_club_credentials: Dict[str, Any] = None, - vidclub_credentials_secret: str = "VIDCLUB", + azure_key_vault_secret: str | None = None, + adls_config_key: str | None = None, validate_df_dict: dict = None, timeout: int = 3600, **kwargs: Dict[str, Any], @@ -41,10 +41,10 @@ def vid_club_to_df( days_interval (int, optional): Days specified in date range per API call (test showed that 30-40 is optimal for performance). Defaults to 30. cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. - vid_club_credentials (Dict[str, Any], optional): Stores the credentials - information. Defaults to None. 
- vidclub_credentials_secret (str, optional): The name of the secret in - Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. + azure_key_vault_secret (Optional[str], optional): The name of the Azure Key + Vault secret where credentials are stored. Defaults to None. validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. @@ -54,12 +54,12 @@ def vid_club_to_df( Returns: Pandas DataFrame """ - if not vid_club_credentials: - vid_club_credentials = get_credentials(vidclub_credentials_secret) - - if not vid_club_credentials: + if not (azure_key_vault_secret or adls_config_key): raise MissingSourceCredentialsError + if not adls_config_key: + credentials = get_credentials(azure_key_vault_secret) + vc_obj = VidClub( args=args, endpoint=endpoint, @@ -69,12 +69,10 @@ def vid_club_to_df( region=region, days_interval=days_interval, cols_to_drop=cols_to_drop, - vid_club_credentials=vid_club_credentials, + vid_club_credentials=credentials, validate_df_dict=validate_df_dict, timeout=timeout, kwargs=kwargs ) - vc_dataframe = vc_obj.to_df() - - return vc_dataframe + return vc_obj.to_df() From eeeda0235148006b4530df68ffb611dd4b0c3336 Mon Sep 17 00:00:00 2001 From: rziemianek Date: Mon, 30 Sep 2024 11:53:04 +0200 Subject: [PATCH 08/17] =?UTF-8?q?=F0=9F=8E=A8=20Improved=20code=20structur?= =?UTF-8?q?e=20for=20VidClub=20source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/vid_club.py | 114 +++++++++++++++------------------ 1 file changed, 52 insertions(+), 62 deletions(-) diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index ee462b8de..fbe8abc7a 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -1,23 +1,23 @@ """Vid Club Cloud API connector.""" from datetime import datetime, timedelta -from typing import Any, Dict, List, Literal, Tuple +import logging +from typing import Any, Literal + import pandas as pd -import logging -from viadot.exceptions import ValidationError -from viadot.utils import handle_api_response -from viadot.sources.base import Source +from viadot.exceptions import ValidationError +from viadot.sources.base import Source +from viadot.utils import handle_api_response # Configure logging -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VidClub(Source): - """ - A class implementing the Vid Club API. + """A class implementing the Vid Club API. Documentation for this API is located at: https://evps01.envoo.net/vipapi/ There are 4 endpoints where to get the data. 
@@ -26,20 +26,19 @@ class VidClub(Source): def __init__( self, *args, - endpoint: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, from_date: str = "2022-03-22", - to_date: str = None, + to_date: str | None = None, items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, days_interval: int = 30, - cols_to_drop: List[str] = None, - vid_club_credentials: Dict[str, Any] = None, - validate_df_dict: dict = None, + cols_to_drop: list[str] | None = None, + vid_club_credentials: dict[str, Any] | None = None, + validate_df_dict: dict | None = None, timeout: int = 3600, - **kwargs + **kwargs, ): - """ - Create an instance of VidClub. + """Create an instance of VidClub. Args: endpoint (Literal["jobs", "product", "company", "survey"], optional): The @@ -82,7 +81,7 @@ def __init__( "Content-Type": "application/json", } - super().__init__(credentials=vid_club_credentials,*args, **kwargs) + super().__init__(credentials=vid_club_credentials, *args, **kwargs) # noqa: B026 def build_query( self, @@ -90,11 +89,10 @@ def build_query( to_date: str, api_url: str, items_per_page: int, - endpoint: Literal["jobs", "product", "company", "survey"] = None, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, ) -> str: - """ - Builds the query from the inputs. + """Builds the query from the inputs. Args: from_date (str): Start date for the query. @@ -122,16 +120,14 @@ def build_query( elif endpoint == "survey": url = f"{api_url}{endpoint}?language=en&type=question" else: - raise ValidationError( - "Pick one these sources: jobs, product, company, survey" - ) + msg = "Pick one these sources: jobs, product, company, survey" + raise ValidationError(msg) return url def intervals( self, from_date: str, to_date: str, days_interval: int - ) -> Tuple[List[str], List[str]]: - """ - Breaks dates range into smaller by provided days interval. + ) -> tuple[list[str], list[str]]: + """Breaks dates range into smaller by provided days interval. Args: from_date (str): Start date for the query in "%Y-%m-%d" format. @@ -159,7 +155,8 @@ def intervals( delta = to_date_obj - from_date_obj if delta.days < 0: - raise ValidationError("to_date cannot be earlier than from_date.") + msg = "to_date cannot be earlier than from_date." + raise ValidationError(msg) interval = timedelta(days=days_interval) starts = [] @@ -178,15 +175,14 @@ def intervals( def check_connection( self, - endpoint: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, from_date: str = "2022-03-22", - to_date: str = None, + to_date: str | None = None, items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - url: str = None, - ) -> Tuple[Dict[str, Any], str]: - """ - Initiate first connection to API to retrieve piece of data. + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + url: str | None = None, + ) -> tuple[dict[str, Any], str]: + """Initiate first connection to API to retrieve piece of data. With information about type of pagination in API URL. 
This option is added because type of pagination for endpoints is being changed @@ -217,10 +213,12 @@ def check_connection( ValidationError: If to_date is earlier than from_date. """ if from_date < "2022-03-22": - raise ValidationError("from_date cannot be earlier than 2022-03-22.") + msg = "from_date cannot be earlier than 2022-03-22." + raise ValidationError(msg) if to_date < from_date: - raise ValidationError("to_date cannot be earlier than from_date.") + msg = "to_date cannot be earlier than from_date." + raise ValidationError(msg) if url is None: url = self.credentials["url"] @@ -234,22 +232,19 @@ def check_connection( region=region, ) headers = self.headers - response = handle_api_response( - url=first_url, headers=headers, method="GET" - ) + response = handle_api_response(url=first_url, headers=headers, method="GET") response = response.json() return (response, first_url) def get_response( self, - endpoint: Literal["jobs", "product", "company", "survey"] = None, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, from_date: str = "2022-03-22", - to_date: str = None, + to_date: str | None = None, items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, ) -> pd.DataFrame: - """ - Basing on the pagination type retrieved using check_connection function. + """Basing on the pagination type retrieved using check_connection function. It gets the response from the API queried and transforms it into DataFrame. @@ -267,7 +262,7 @@ def get_response( (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - Returns + Returns: pd.DataFrame: Table of the data carried in the response. Raises: @@ -275,9 +270,8 @@ def get_response( """ headers = self.headers if endpoint not in ["jobs", "product", "company", "survey"]: - raise ValidationError( - "The source has to be: jobs, product, company or survey" - ) + msg = "The source has to be: jobs, product, company or survey" + raise ValidationError(msg) if to_date is None: to_date = datetime.today().strftime("%Y-%m-%d") @@ -330,8 +324,7 @@ def to_df( self, if_empty: str = "warn", ) -> pd.DataFrame: - """ - Looping get_response and iterating by date ranges defined in intervals. + """Looping get_response and iterating by date ranges defined in intervals. Stores outputs as DataFrames in a list. At the end, daframes are concatenated in one and dropped duplicates that would appear when quering. 
@@ -346,12 +339,12 @@ def to_df(
         starts, ends = self.intervals(
             from_date=self.from_date,
             to_date=self.to_date,
-            days_interval=self.days_interval
+            days_interval=self.days_interval,
         )

         dfs_list = []
         if len(starts) > 0 and len(ends) > 0:
-            for start, end in zip(starts, ends):
+            for start, end in zip(starts, ends, strict=False):
                 logger.info(f"ingesting data for dates [{start}]-[{end}]...")
                 df = self.get_response(
                     endpoint=self.endpoint,
@@ -373,9 +366,7 @@ def to_df(
                     items_per_page=self.items_per_page,
                     region=self.region,
                 )
-        list_columns = df.columns[
-            df.map(lambda x: isinstance(x, list)).any()
-        ].tolist()
+        list_columns = df.columns[df.map(lambda x: isinstance(x, list)).any()].tolist()
         for i in list_columns:
             df[i] = df[i].apply(lambda x: tuple(x) if isinstance(x, list) else x)
         df.drop_duplicates(inplace=True)
@@ -384,17 +375,16 @@ def to_df(
         if isinstance(self.cols_to_drop, list):
             try:
                 logger.info(f"Dropping following columns: {self.cols_to_drop}...")
-                df.drop(
-                    columns=self.cols_to_drop, inplace=True, errors="raise"
-                )
+                df.drop(columns=self.cols_to_drop, inplace=True, errors="raise")
             except KeyError:
-                logger.error(
+                logger.exception(
                     f"""Column(s): {self.cols_to_drop} don't exist in the DataFrame.
                     No columns were dropped. Returning full DataFrame..."""
                 )
                 logger.info(f"Existing columns: {df.columns}")
         else:
-            raise TypeError("Provide columns to drop in a List.")
+            msg = "Provide columns to drop in a list."
+            raise TypeError(msg)

         if df.empty:
             logger.error("No data for this date range")

From 5c991e45430404f22c2d9fa8d544ccaa8941c96e Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Mon, 30 Sep 2024 11:59:56 +0200
Subject: [PATCH 09/17] =?UTF-8?q?=F0=9F=90=9B=20Modified=20building=20`url?=
 =?UTF-8?q?`?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/viadot/sources/vid_club.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py
index fbe8abc7a..143e92d8d 100644
--- a/src/viadot/sources/vid_club.py
+++ b/src/viadot/sources/vid_club.py
@@ -115,8 +115,10 @@ def build_query(
         """
         if endpoint in ["jobs", "product", "company"]:
             region_url_string = f"&region={region}" if region else ""
-            url = f"""{api_url}{endpoint}?from={from_date}&to={to_date}
-            {region_url_string}&limit={items_per_page}"""
+            url = (
+                f"""{api_url}{endpoint}?from={from_date}&to={to_date}"""
+                f"""{region_url_string}&limit={items_per_page}"""
+            )
         elif endpoint == "survey":
             url = f"{api_url}{endpoint}?language=en&type=question"
         else:

From b5c0405ad399f03768630c2926128a6e228bcff2 Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Mon, 30 Sep 2024 12:00:24 +0200
Subject: [PATCH 10/17] =?UTF-8?q?=E2=9C=85=20=20Improved=20tests=20code=20?=
 =?UTF-8?q?structure?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 tests/unit/test_vid_club.py | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/tests/unit/test_vid_club.py b/tests/unit/test_vid_club.py
index a34acefc6..db8b27b29 100644
--- a/tests/unit/test_vid_club.py
+++ b/tests/unit/test_vid_club.py
@@ -1,14 +1,16 @@
 import unittest
-from datetime import datetime
-from src.viadot.sources.vid_club import VidClub, ValidationError
+
+import pytest
+
+from viadot.sources.vid_club import ValidationError, VidClub
+

 class TestVidClub(unittest.TestCase):
     def setUp(self):
         """Set up a VidClub instance before each test."""
         # Sample input data for the constructor
         self.vid_club = VidClub(
-            endpoint="jobs",
-            vid_club_credentials={"token": "test-token"}
+            endpoint="jobs", vid_club_credentials={"token": "test-token"}
         )

     def test_build_query(self):
@@ -22,14 +24,14 @@ def test_build_query(self):
         region = "pl"

         # Expected result URL
-        expected_url = (
-            "https://example.com/api/jobs?from=2023-01-01&to=2023-01-31&region=pl&limit=50"
-        )
+        expected_url = "https://example.com/api/jobs?from=2023-01-01&to=2023-01-31&region=pl&limit=50"

         # Check if the method returns the correct URL
-        result_url = self.vid_club.build_query(from_date, to_date, api_url, items_per_page, endpoint, region)
-        self.assertEqual(result_url.strip(), expected_url.strip())
-
+        result_url = self.vid_club.build_query(
+            from_date, to_date, api_url, items_per_page, endpoint, region
+        )
+        assert result_url == expected_url
+
     def test_intervals(self):
         """Test breaking date range into intervals based on the days_interval."""
         # Sample input data for the intervals method
@@ -43,8 +45,8 @@ def test_intervals(self):

         # Check if the method returns correct intervals
         starts, ends = self.vid_club.intervals(from_date, to_date, days_interval)
-        self.assertEqual(starts, expected_starts)
-        self.assertEqual(ends, expected_ends)
+        assert starts == expected_starts
+        assert ends == expected_ends

     def test_intervals_invalid_date_range(self):
         """Test that ValidationError is raised when to_date is before from_date."""
@@ -54,5 +56,5 @@ def test_intervals_invalid_date_range(self):
         days_interval = 5

         # Check if ValidationError is raised
-        with self.assertRaises(ValidationError):
-            self.vid_club.intervals(from_date, to_date, days_interval)
\ No newline at end of file
+        with pytest.raises(ValidationError):
+            self.vid_club.intervals(from_date, to_date, days_interval)

From 781e6393b829eb2e083ed041c9a3a7874f2f4c37 Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Tue, 1 Oct 2024 09:33:18 +0200
Subject: [PATCH 11/17] =?UTF-8?q?=F0=9F=8E=A8=20Modified=20code=20structur?=
 =?UTF-8?q?e?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../prefect/flows/vid_club_to_adls.py         | 27 ++++++++--------
 .../orchestration/prefect/tasks/vid_club.py   | 31 ++++++++++---------
 .../prefect/flows/test_vid_club.py            | 11 ++++---
 .../prefect/tasks/test_vid_club.py            | 25 ++++++++-------
 4 files changed, 49 insertions(+), 45 deletions(-)

diff --git a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
index 2d0a6013a..8ba05ab25 100644
--- a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
+++ b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py
@@ -1,8 +1,10 @@
 """Download data from Vid Club API and load it into Azure Data Lake Storage."""

-from typing import Any, Dict, List, Literal
+from typing import Any, Literal
+
 from prefect import flow
 from prefect.task_runners import ConcurrentTaskRunner
+
 from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df


@@ -13,32 +15,31 @@
     retry_delay_seconds=60,
     task_runner=ConcurrentTaskRunner,
 )
-def vid_club_to_adls(
-    *args: List[Any],
-    endpoint: Literal["jobs", "product", "company", "survey"] = None,
+def vid_club_to_adls(  # noqa: PLR0913
+    *args: list[Any],
+    endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
     from_date: str = "2022-03-22",
-    to_date: str = None,
+    to_date: str | None = None,
     items_per_page: int = 100,
-    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
+    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
     days_interval: int = 30,
-    cols_to_drop: List[str] = None,
+    cols_to_drop: list[str] | None = None,
     config_key: str | None = None,
     azure_key_vault_secret: str | None = None,
     adls_config_key: str | None = None,
     adls_azure_key_vault_secret: str | None = None,
     adls_path: str | None = None,
     adls_path_overwrite: bool = False,
-    validate_df_dict: dict = None,
+    validate_df_dict: dict | None = None,
     timeout: int = 3600,
-    **kwargs: Dict[str, Any]
+    **kwargs: dict[str, Any],
 ) -> None:
-    """
-    Flow for downloading data from the Vid Club via API to a CSV or Parquet file.
+    """Flow for downloading data from the Vid Club via API to a CSV or Parquet file.

     Then upload it to Azure Data Lake.

     Args:
-        endpoint (Literal["jobs", "product", "company", "survey"], optional): The
+        endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint
             source to be accessed. Defaults to None.
@@ -85,7 +86,7 @@ def vid_club_to_adls(
         azure_key_vault_secret=azure_key_vault_secret,
         validate_df_dict=validate_df_dict,
         timeout=timeout,
-        kawrgs=kwargs
+        kwargs=kwargs,
     )

     return df_to_adls(
diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py
index bf587fd7d..39ff1b188 100644
--- a/src/viadot/orchestration/prefect/tasks/vid_club.py
+++ b/src/viadot/orchestration/prefect/tasks/vid_club.py
@@ -1,31 +1,32 @@
 """Task for downloading data from Vid Club Cloud API."""

-from typing import Any, Dict, List, Literal
+from typing import Any, Literal
+
 import pandas as pd
 from prefect import task
-from viadot.sources import VidClub
-from viadot.orchestration.prefect.utils import get_credentials
+
 from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
+from viadot.orchestration.prefect.utils import get_credentials
+from viadot.sources import VidClub


 @task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60)
-def vid_club_to_df(
-    *args: List[Any],
-    endpoint: Literal["jobs", "product", "company", "survey"] = None,
+def vid_club_to_df(  # noqa: PLR0913
+    *args: list[Any],
+    endpoint: Literal["jobs", "product", "company", "survey"] | None = None,
     from_date: str = "2022-03-22",
-    to_date: str = None,
+    to_date: str | None = None,
     items_per_page: int = 100,
-    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None,
+    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
     days_interval: int = 30,
-    cols_to_drop: List[str] = None,
+    cols_to_drop: list[str] | None = None,
     azure_key_vault_secret: str | None = None,
     adls_config_key: str | None = None,
-    validate_df_dict: dict = None,
+    validate_df_dict: dict | None = None,
     timeout: int = 3600,
-    **kwargs: Dict[str, Any],
+    **kwargs: dict[str, Any],
 ) -> pd.DataFrame:
-    """
-    Task to downloading data from Vid Club APIs to Pandas DataFrame.
+    """Task for downloading data from Vid Club APIs to a Pandas DataFrame.
     Args:
         endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint

@@ -72,7 +73,7 @@ def vid_club_to_df(
         vid_club_credentials=credentials,
         validate_df_dict=validate_df_dict,
         timeout=timeout,
-        kwargs=kwargs
-    )
+        kwargs=kwargs,
+    )

     return vc_obj.to_df()
diff --git a/tests/integration/orchestration/prefect/flows/test_vid_club.py b/tests/integration/orchestration/prefect/flows/test_vid_club.py
index 1efff2206..39cb43a16 100644
--- a/tests/integration/orchestration/prefect/flows/test_vid_club.py
+++ b/tests/integration/orchestration/prefect/flows/test_vid_club.py
@@ -1,17 +1,18 @@
 from src.viadot.orchestration.prefect.flows import vid_club_to_adls
 from src.viadot.sources import AzureDataLake

+
 TEST_FILE_PATH = "test/path/to/adls.parquet"
-TEST_SOURCE = "jobs"
+TEST_SOURCE = "jobs"
 TEST_FROM_DATE = "2023-01-01"
 TEST_TO_DATE = "2023-12-31"
-ADLS_CREDENTIALS_SECRET = "test_adls_secret"
-VIDCLUB_CREDENTIALS_SECRET = "test_vidclub_secret"
+ADLS_CREDENTIALS_SECRET = "test_adls_secret"  # noqa: S105
+VIDCLUB_CREDENTIALS_SECRET = "test_vidclub_secret"  # noqa: S105


 def test_vid_club_to_adls():
     lake = AzureDataLake(config_key="adls_test")
-    
+
     assert not lake.exists(TEST_FILE_PATH)

     vid_club_to_adls(
@@ -20,7 +21,7 @@ def test_vid_club_to_adls():
         to_date=TEST_TO_DATE,
         adls_path=TEST_FILE_PATH,
         adls_azure_key_vault_secret=ADLS_CREDENTIALS_SECRET,
-        vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET
+        vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET,
     )

     assert lake.exists(TEST_FILE_PATH)
diff --git a/tests/integration/orchestration/prefect/tasks/test_vid_club.py b/tests/integration/orchestration/prefect/tasks/test_vid_club.py
index 8a5d6001e..2d39001bc 100644
--- a/tests/integration/orchestration/prefect/tasks/test_vid_club.py
+++ b/tests/integration/orchestration/prefect/tasks/test_vid_club.py
@@ -1,18 +1,18 @@
-import pytest
 import pandas as pd
-from src.viadot.orchestration.prefect.tasks import vid_club_to_df
+import pytest
+
 from src.viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
+from src.viadot.orchestration.prefect.tasks import vid_club_to_df


-EXPECTED_DF = pd.DataFrame({
-    "id": [1, 2],
-    "name": ["Company A", "Company B"],
-    "region": ["pl", "ro"]
-})
+EXPECTED_DF = pd.DataFrame(
+    {"id": [1, 2], "name": ["Company A", "Company B"], "region": ["pl", "ro"]}
+)


 class MockVidClub:
     def __init__(self, *args, **kwargs):
+        """Init method."""
         pass

     def to_df(self):
@@ -20,8 +20,7 @@ def to_df(self):

 def test_vid_club_to_df(mocker):
-    mocker.patch('viadot.orchestration.prefect.tasks.VidClub', new=MockVidClub)
-
+    mocker.patch("viadot.orchestration.prefect.tasks.VidClub", new=MockVidClub)

     df = vid_club_to_df(
         endpoint="company",
@@ -29,7 +28,7 @@ def test_vid_club_to_df(mocker):
         to_date="2023-12-31",
         items_per_page=100,
         region="pl",
-        vidclub_credentials_secret="VIDCLUB"
+        vidclub_credentials_secret="VIDCLUB",  # pragma: allowlist secret # noqa: S106
     )

     assert isinstance(df, pd.DataFrame)


 def test_vid_club_to_df_missing_credentials(mocker):
-    mocker.patch('viadot.orchestration.prefect.tasks.get_credentials', return_value=None)
+    mocker.patch(
+        "viadot.orchestration.prefect.tasks.get_credentials", return_value=None
+    )

     with pytest.raises(MissingSourceCredentialsError):
         vid_club_to_df(
             endpoint="company",
             from_date="2023-01-01",
             to_date="2023-12-31",
             items_per_page=100,
             region="pl",
-            vidclub_credentials_secret="VIDCLUB"
+            vidclub_credentials_secret="VIDCLUB",  # pragma: allowlist secret # noqa: S106
         )

From 4185ce26b302f2e0dbebc7bef782459ba2de5a3f Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Tue, 1 Oct 2024 09:36:02 +0200
Subject: [PATCH 12/17] =?UTF-8?q?=F0=9F=8E=A8=20Modified=20code=20structur?=
 =?UTF-8?q?e?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/viadot/sources/vid_club.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py
index 143e92d8d..b3acfe7c4 100644
--- a/src/viadot/sources/vid_club.py
+++ b/src/viadot/sources/vid_club.py
@@ -42,9 +42,9 @@ def __init__(

         Args:
             endpoint (Literal["jobs", "product", "company", "survey"], optional): The
-            endpoint source to be accessed. Defaults to None.
+                endpoint source to be accessed. Defaults to None.
             from_date (str, optional): Start date for the query, by default is the
-            oldest date in the data 2022-03-22.
+                oldest date in the data 2022-03-22.
             to_date (str, optional): End date for the query. By default None,
                 which will be executed as datetime.today().strftime("%Y-%m-%d") in code.
             items_per_page (int, optional): Number of entries per page. Defaults to 100.
@@ -324,7 +324,6 @@ def get_response(

     def to_df(
         self,
-        if_empty: str = "warn",
     ) -> pd.DataFrame:
         """Loops get_response over the date ranges defined in intervals.

From 5e450e06d4a2cf879db36b478038e539f959dc7b Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Tue, 1 Oct 2024 09:36:31 +0200
Subject: [PATCH 13/17] =?UTF-8?q?=F0=9F=8E=A8=20Modified=20code=20structur?=
 =?UTF-8?q?e?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/viadot/orchestration/prefect/tasks/__init__.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py
index 72133b5f3..db43224a7 100644
--- a/src/viadot/orchestration/prefect/tasks/__init__.py
+++ b/src/viadot/orchestration/prefect/tasks/__init__.py
@@ -30,7 +30,6 @@
 from .vid_club import vid_club_to_df

-
 __all__ = [
     "adls_upload",
     "bcp",

From 910b7f2a0afb90da1ce012db086b8a903f42dbeb Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Tue, 1 Oct 2024 09:39:53 +0200
Subject: [PATCH 14/17] =?UTF-8?q?=F0=9F=8E=A8=20Moved=20description=20to?=
 =?UTF-8?q?=20the=20new=20line?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/viadot/orchestration/prefect/tasks/vid_club.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py
index 39ff1b188..b74380c7f 100644
--- a/src/viadot/orchestration/prefect/tasks/vid_club.py
+++ b/src/viadot/orchestration/prefect/tasks/vid_club.py
@@ -29,8 +29,8 @@ def vid_club_to_df(  # noqa: PLR0913
     """Task for downloading data from Vid Club APIs to a Pandas DataFrame.

     Args:
-        endpoint (Literal["jobs", "product", "company", "survey"], optional): The endpoint
-            source to be accessed. Defaults to None.
+        endpoint (Literal["jobs", "product", "company", "survey"], optional):
+            The endpoint source to be accessed. Defaults to None.
         from_date (str, optional): Start date for the query, by default is the
             oldest date in the data 2022-03-22.
         to_date (str, optional): End date for the query.
By default None,

From 76a76c54488e80fe5f763850a93b8ff139b022a3 Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Tue, 1 Oct 2024 09:41:09 +0200
Subject: [PATCH 15/17] =?UTF-8?q?=F0=9F=8E=A8=20Added=20`#=20pragma:=20all?=
 =?UTF-8?q?owlist=20secret`?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../orchestration/prefect/flows/test_vid_club.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/integration/orchestration/prefect/flows/test_vid_club.py b/tests/integration/orchestration/prefect/flows/test_vid_club.py
index 39cb43a16..7053e04f1 100644
--- a/tests/integration/orchestration/prefect/flows/test_vid_club.py
+++ b/tests/integration/orchestration/prefect/flows/test_vid_club.py
@@ -6,8 +6,10 @@
 TEST_SOURCE = "jobs"
 TEST_FROM_DATE = "2023-01-01"
 TEST_TO_DATE = "2023-12-31"
-ADLS_CREDENTIALS_SECRET = "test_adls_secret"  # noqa: S105
-VIDCLUB_CREDENTIALS_SECRET = "test_vidclub_secret"  # noqa: S105
+ADLS_CREDENTIALS_SECRET = "test_adls_secret"  # pragma: allowlist secret # noqa: S105
+VIDCLUB_CREDENTIALS_SECRET = (
+    "test_vidclub_secret"  # pragma: allowlist secret # noqa: S105
+)

From 59f59568e9cbe4377643b8f50630102f3e15bf30 Mon Sep 17 00:00:00 2001
From: rziemianek
Date: Wed, 2 Oct 2024 12:51:15 +0200
Subject: [PATCH 16/17] =?UTF-8?q?=F0=9F=8E=A8=20Modified=20loggers=20and?=
 =?UTF-8?q?=20added=20`if=5Fempty`=20param?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/viadot/sources/vid_club.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py
index b3acfe7c4..130cfb757 100644
--- a/src/viadot/sources/vid_club.py
+++ b/src/viadot/sources/vid_club.py
@@ -324,6 +324,7 @@ def get_response(

     def to_df(
         self,
+        if_empty: Literal["warn", "skip", "fail"] = "warn",
     ) -> pd.DataFrame:
         """Loops get_response over the date ranges defined in intervals.

@@ -331,8 +332,8 @@ def to_df(
         concatenated into one, and duplicates that would appear when querying are dropped.

         Args:
-            if_empty (str, optional): What to do if a fetch produce no data.
-                Defaults to "warn
+            if_empty (Literal["warn", "skip", "fail"], optional): What to do if a fetch
+                produces no data. Defaults to "warn".

         Returns:
             pd.DataFrame: DataFrame of the concatenated data carried in the responses.
@@ -346,7 +347,7 @@ def to_df(
         dfs_list = []
         if len(starts) > 0 and len(ends) > 0:
             for start, end in zip(starts, ends, strict=False):
-                logger.info(f"ingesting data for dates [{start}]-[{end}]...")
+                self.logger.info(f"ingesting data for dates [{start}]-[{end}]...")
                 df = self.get_response(
                     endpoint=self.endpoint,
                     from_date=start,
@@ -375,19 +376,21 @@ def to_df(
         if self.cols_to_drop is not None:
             if isinstance(self.cols_to_drop, list):
                 try:
-                    logger.info(f"Dropping following columns: {self.cols_to_drop}...")
+                    self.logger.info(
+                        f"Dropping following columns: {self.cols_to_drop}..."
+                    )
                     df.drop(columns=self.cols_to_drop, inplace=True, errors="raise")
                 except KeyError:
-                    logger.exception(
+                    self.logger.exception(
                         f"""Column(s): {self.cols_to_drop} don't exist in the DataFrame.
                         No columns were dropped. Returning full DataFrame..."""
                     )
-                    logger.info(f"Existing columns: {df.columns}")
+                    self.logger.info(f"Existing columns: {df.columns}")
         else:
             msg = "Provide columns to drop in a list."
raise TypeError(msg) if df.empty: - logger.error("No data for this date range") + self._handle_if_empty(if_empty=if_empty) return df From 0818597ef6536a736dacb244e42986a9857f8f27 Mon Sep 17 00:00:00 2001 From: rziemianek Date: Wed, 2 Oct 2024 15:47:33 +0200 Subject: [PATCH 17/17] =?UTF-8?q?=F0=9F=94=A5=20Removed=20logging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/vid_club.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py index 130cfb757..db0adca08 100644 --- a/src/viadot/sources/vid_club.py +++ b/src/viadot/sources/vid_club.py @@ -1,7 +1,6 @@ """Vid Club Cloud API connector.""" from datetime import datetime, timedelta -import logging from typing import Any, Literal import pandas as pd @@ -11,11 +10,6 @@ from viadot.utils import handle_api_response -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - class VidClub(Source): """A class implementing the Vid Club API.
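Taken together, the series leaves the connector usable roughly as follows. This is a sketch under the assumption that VidClub accepts the constructor arguments visible in the diffs above; the URL and token values are placeholders, not a documented credentials schema.

    from viadot.sources import VidClub

    # Placeholder credentials; in practice the dict comes from a secret store
    # or the viadot config, per the *_credentials_secret parameters above.
    vc = VidClub(
        endpoint="jobs",
        from_date="2023-01-01",
        to_date="2023-03-31",
        items_per_page=100,
        region="pl",
        days_interval=30,
        vid_club_credentials={"url": "https://example.com/api/", "token": "<token>"},
    )

    # Per PATCH 16, if_empty controls what happens when the date range yields
    # no rows: "warn", "skip", or "fail".
    df = vc.to_df(if_empty="warn")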