From 46b164e107fecb1faaa6b0db265791cbac66eacc Mon Sep 17 00:00:00 2001 From: Marc-Philipp Esser Date: Sat, 7 Oct 2023 17:04:09 +0200 Subject: [PATCH] return result with kwargs in async unsplash request function --- src/prefect/generic_tasks.py | 49 ++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/src/prefect/generic_tasks.py b/src/prefect/generic_tasks.py index 3594ef4..db19e18 100644 --- a/src/prefect/generic_tasks.py +++ b/src/prefect/generic_tasks.py @@ -16,7 +16,7 @@ from prefect import get_run_logger, task from prefect.blocks.system import Secret from prefect.tasks import task_input_hash -from src.etl.load import upload_blob_from_file +from src.etl.load import upload_blob_from_file, upload_blob_from_memory from src.utils import timer @@ -166,8 +166,8 @@ def request_unsplash_napi( @task( - retries=3, - retry_delay_seconds=3, + # retries=3, + # retry_delay_seconds=3, cache_key_fn=task_input_hash, cache_expiration=datetime.timedelta(hours=1), ) @@ -177,24 +177,53 @@ async def request_unsplash_napi_async( headers: dict = None, params: dict = None, base_url="https://unsplash.com/napi", + **kwargs, ): logger = get_run_logger() async with httpx.AsyncClient(proxies=proxies, verify=False) as client: URI = base_url + endpoint - # sleep_seconds = random.randint(1, 3) - # logger.info(f"Sleeping for {sleep_seconds} seconds...") - # await asyncio.sleep(sleep_seconds) - logger.info(f"Requesting URI: {URI}") response = await client.get(url=URI, params=params, headers=headers) - # logger.info(f"Request headers: \n {pformat(dict(response.request.headers))}") - # logger.info(f"Response headers: \n {pformat(dict(response.headers))}") response.raise_for_status() - return response + if len(kwargs) > 0: + kwargs["response"] = response + return kwargs + + else: + return response + + +@task( + retries=3, + retry_delay_seconds=3, + cache_key_fn=task_input_hash, + cache_expiration=datetime.timedelta(hours=1), +) +async def upload_file_to_gcs_async( + gcp_credential_block_name: str, + bucket_name: str, + contents, + file_name: str, + file_extension: str, + folder: str = None, +): + logger = get_run_logger() + + if folder is None: + blob_name = f"{file_name}.{file_extension}" + else: + blob_name = f"{folder}/{file_name}.{file_extension}" + blob = await upload_blob_from_memory( + bucket_name, contents, blob_name, gcp_credential_block_name + ) + + logger.info(f"Uploaded {blob}: {blob_name} to {bucket_name}") + + return blob @task(retries=3, retry_delay_seconds=10)