Skip to content

Commit

Permalink
handle get_storage_block
Browse files Browse the repository at this point in the history
  • Loading branch information
rphamle committed Nov 21, 2024
1 parent d90ee62 commit e55ecb1
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions block_cascade/prefect/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ async def _fetch_block(block_id: str) -> Optional[BlockDocument]:
async with get_client() as client:
return await client.read_block_document(block_id)

async def _fetch_block_by_name(block_name: str, block_type_slug: str = "gcs-bucket") -> Optional[BlockDocument]:
async with get_client() as client:
return await client.read_block_document_by_name(
name=block_name,
block_type_slug=block_type_slug,
)

def get_from_prefect_context(attr: str, default: str = "") -> str:
flow_context = FlowRunContext.get()
Expand Down Expand Up @@ -80,8 +86,13 @@ def get_storage_block() -> Optional[BlockDocument]:

global _CACHED_STORAGE # noqa: PLW0603
if not _CACHED_STORAGE:
_CACHED_STORAGE = run_async(
_fetch_block(current_deployment.storage_document_id)
if current_deployment.pull_steps:
_CACHED_STORAGE = run_async(
_fetch_block_by_name(block_name=current_deployment.pull_steps[0]["prefect.deployments.steps.pull_with_block"]["block_document_name"])
)
else:
_CACHED_STORAGE = run_async(
_fetch_block(block_id=current_deployment.storage_document_id)
)
return _CACHED_STORAGE

Expand Down

0 comments on commit e55ecb1

Please sign in to comment.