Skip to content

Commit

Permalink
🐛 fixed unmatching reports.
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego-H-S authored and trymzet committed Sep 30, 2024
1 parent 66db9e7 commit 0a08c9c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
7 changes: 4 additions & 3 deletions src/viadot/orchestration/prefect/flows/genesys_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def genesys_to_adls( # noqa: PLR0913
view_type: str | None = None,
view_type_time_sleep: int | None = None,
post_data_list: list[dict[str, Any]] | None = None,
time_between_api_call: float = 0.5,
normalization_sep: str = ".",
drop_duplicates: bool = False,
validate_df_dict: dict[str, Any] | None = None,
Expand Down Expand Up @@ -58,6 +59,8 @@ def genesys_to_adls( # noqa: PLR0913
from Genesys Cloud API. Defaults to None.
post_data_list (Optional[List[Dict[str, Any]]], optional): List of string
templates to generate json body in POST calls to the API. Defaults to None.
time_between_api_call (int, optional): The time, in seconds, to sleep the call
to the API. Defaults to 0.5.
normalization_sep (str, optional): Nested records will generate names separated
by sep. Defaults to ".".
drop_duplicates (bool, optional): Remove duplicates from the DataFrame.
Expand Down Expand Up @@ -96,14 +99,12 @@ def genesys_to_adls( # noqa: PLR0913
view_type=view_type,
view_type_time_sleep=view_type_time_sleep,
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
normalization_sep=normalization_sep,
drop_duplicates=drop_duplicates,
validate_df_dict=validate_df_dict,
)

# ???
time.sleep(0.5)

return df_to_adls(
df=data_frame,
path=adls_path,
Expand Down
4 changes: 4 additions & 0 deletions src/viadot/orchestration/prefect/tasks/genesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def genesys_to_df( # noqa: PLR0913
view_type: str | None = None,
view_type_time_sleep: int | None = None,
post_data_list: list[dict[str, Any]] | None = None,
time_between_api_call: float = 0.5,
normalization_sep: str = ".",
drop_duplicates: bool = False,
validate_df_dict: dict[str, Any] | None = None,
Expand All @@ -48,6 +49,8 @@ def genesys_to_df( # noqa: PLR0913
from Genesys Cloud API. Defaults to None.
post_data_list (Optional[List[Dict[str, Any]]], optional): List of string
templates to generate json body in POST calls to the API. Defaults to None.
time_between_api_call (int, optional): The time, in seconds, to sleep the call
to the API. Defaults to 0.5.
normalization_sep (str, optional): Nested records will generate names separated
by sep. Defaults to ".".
drop_duplicates (bool, optional): Remove duplicates from the DataFrame.
Expand Down Expand Up @@ -98,6 +101,7 @@ def genesys_to_df( # noqa: PLR0913
view_type=view_type,
view_type_time_sleep=view_type_time_sleep,
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
normalization_sep=normalization_sep,
)
logger.info("running `to_df` method:\n")
Expand Down
31 changes: 23 additions & 8 deletions src/viadot/sources/genesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import asyncio
import base64
from io import StringIO
import json
import time
from io import StringIO
from typing import Any

import aiohttp
from aiolimiter import AsyncLimiter
import numpy as np
import pandas as pd
from aiolimiter import AsyncLimiter
from pydantic import BaseModel

from viadot.config import get_source_credentials
Expand Down Expand Up @@ -175,7 +175,7 @@ def _api_call(
post_data_list: list[str],
method: str,
params: dict[str, Any] | None = None,
sleep_time: float = 0.5,
time_between_api_call: float = 0.5,
) -> dict[str, Any]:
"""General method to connect to Genesys Cloud API and generate the response.
Expand All @@ -185,8 +185,8 @@ def _api_call(
method (str): Type of connection to the API. Defaults to "POST".
params (Optional[Dict[str, Any]], optional): Parameters to be passed into
the POST call. Defaults to None.
sleep_time (int, optional): The time, in seconds, to sleep the call to the
API. Defaults to 0.5.
time_between_api_call (int, optional): The time, in seconds, to sleep the
call to the API. Defaults to 0.5.
Raises:
RuntimeError: There is no current event loop in asyncio thread.
Expand Down Expand Up @@ -238,7 +238,7 @@ async def generate_post():

semaphore.release()

await asyncio.sleep(sleep_time)
await asyncio.sleep(time_between_api_call)

try:
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -384,7 +384,9 @@ def _download_report(

return dataframe

def _merge_conversations(self, data_to_merge: list) -> pd.DataFrame: # noqa: C901, PLR0912
def _merge_conversations(
self, data_to_merge: list
) -> pd.DataFrame: # noqa: C901, PLR0912
"""Merge all the conversations data into a single data frame.
Args:
Expand Down Expand Up @@ -593,6 +595,7 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
view_type: str | None = None,
view_type_time_sleep: int = 10,
post_data_list: list[dict[str, Any]] | None = None,
time_between_api_call: float = 0.5,
normalization_sep: str = ".",
) -> None:
"""General method to connect to Genesys Cloud API and generate the response.
Expand All @@ -614,6 +617,8 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
post_data_list (Optional[List[Dict[str, Any]]], optional): List of string
templates to generate json body in POST calls to the API.
Defaults to None.
time_between_api_call (int, optional): The time, in seconds, to sleep the
call to the API. Defaults to 0.5.
normalization_sep (str, optional): Nested records will generate names
separated by sep. Defaults to ".".
Expand All @@ -632,6 +637,7 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
self._api_call(
endpoint=endpoint,
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
method="POST",
)

Expand All @@ -645,8 +651,14 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
request_json = self._load_reporting_exports()
entities = request_json["entities"]

if isinstance(entities, list) and len(entities) == len(post_data_list):
if isinstance(entities, list):
ids, urls = self._get_reporting_exports_url(entities)
if len(entities) != len(post_data_list):
self.logger.warning(
f"There are {len(entities)} available reports in Genesys, "
f"and where sent {len(post_data_list)} reports. "
"Unsed reports will be removed."
)
else:
APIError(
"There are no reports to be downloaded."
Expand Down Expand Up @@ -703,6 +715,7 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
report = self._api_call(
endpoint=endpoint,
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
method="POST",
)

Expand Down Expand Up @@ -742,6 +755,7 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
response = self._api_call(
endpoint=endpoint,
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
method="GET",
params=params,
)
Expand Down Expand Up @@ -773,6 +787,7 @@ def api_connection( # noqa: PLR0912, PLR0915, C901.
endpoint=f"routing/queues/{qid}/members",
params={"pageSize": 100, "pageNumber": page},
post_data_list=post_data_list,
time_between_api_call=time_between_api_call,
method="GET",
)

Expand Down

0 comments on commit 0a08c9c

Please sign in to comment.