Skip to content

Commit

Permalink
return result with kwargs in async unsplash request function
Browse files Browse the repository at this point in the history
  • Loading branch information
m-p-esser committed Oct 7, 2023
1 parent 7647c1c commit 46b164e
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions src/prefect/generic_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from prefect import get_run_logger, task
from prefect.blocks.system import Secret
from prefect.tasks import task_input_hash
from src.etl.load import upload_blob_from_file
from src.etl.load import upload_blob_from_file, upload_blob_from_memory
from src.utils import timer


Expand Down Expand Up @@ -166,8 +166,8 @@ def request_unsplash_napi(


@task(
retries=3,
retry_delay_seconds=3,
# retries=3,
# retry_delay_seconds=3,
cache_key_fn=task_input_hash,
cache_expiration=datetime.timedelta(hours=1),
)
Expand All @@ -177,24 +177,53 @@ async def request_unsplash_napi_async(
headers: dict = None,
params: dict = None,
base_url="https://unsplash.com/napi",
**kwargs,
):
logger = get_run_logger()

async with httpx.AsyncClient(proxies=proxies, verify=False) as client:
URI = base_url + endpoint

# sleep_seconds = random.randint(1, 3)
# logger.info(f"Sleeping for {sleep_seconds} seconds...")
# await asyncio.sleep(sleep_seconds)

logger.info(f"Requesting URI: {URI}")
response = await client.get(url=URI, params=params, headers=headers)

# logger.info(f"Request headers: \n {pformat(dict(response.request.headers))}")
# logger.info(f"Response headers: \n {pformat(dict(response.headers))}")
response.raise_for_status()

return response
if len(kwargs) > 0:
kwargs["response"] = response
return kwargs

else:
return response


@task(
retries=3,
retry_delay_seconds=3,
cache_key_fn=task_input_hash,
cache_expiration=datetime.timedelta(hours=1),
)
async def upload_file_to_gcs_async(
gcp_credential_block_name: str,
bucket_name: str,
contents,
file_name: str,
file_extension: str,
folder: str = None,
):
logger = get_run_logger()

if folder is None:
blob_name = f"{file_name}.{file_extension}"
else:
blob_name = f"{folder}/{file_name}.{file_extension}"
blob = await upload_blob_from_memory(
bucket_name, contents, blob_name, gcp_credential_block_name
)

logger.info(f"Uploaded {blob}: {blob_name} to {bucket_name}")

return blob


@task(retries=3, retry_delay_seconds=10)
Expand Down

0 comments on commit 46b164e

Please sign in to comment.