From c6522346750b9539420618a34f93fdd088f07c30 Mon Sep 17 00:00:00 2001 From: Paramjeet kaur Date: Thu, 11 Apr 2024 17:52:36 +0530 Subject: [PATCH] Remove unused functions --- proxy/main.py | 269 +---------------------- proxy/service.py | 180 +--------------- tests.py | 60 ------ tests/test_main.py | 484 ------------------------------------------ tests/test_service.py | 311 --------------------------- 5 files changed, 4 insertions(+), 1300 deletions(-) diff --git a/proxy/main.py b/proxy/main.py index 3f28912..0ac8fe4 100644 --- a/proxy/main.py +++ b/proxy/main.py @@ -10,26 +10,17 @@ from proxy.service import ( get_airbyte_server_block_id, create_airbyte_server_block, - get_airbyte_connection_block_id, - get_airbyte_connection_block, - create_airbyte_connection_block, - get_dbtcore_block_id, create_dbt_core_block, - put_deployment, put_deployment_v1, update_postgres_credentials, update_bigquery_credentials, update_target_configs_schema, - get_shell_block_id, - create_shell_block, - post_deployment, post_deployment_v1, get_flow_runs_by_deployment_id, get_deployments_by_filter, get_flow_run_logs, post_deployment_flow_run, get_flow_runs_by_name, - post_filter_blocks, set_deployment_schedule, get_deployment, get_flow_run, @@ -37,31 +28,23 @@ _create_dbt_cli_profile, update_dbt_cli_profile, get_dbt_cli_profile, - get_secret_block_document, ) from proxy.schemas import ( AirbyteServerCreate, - AirbyteConnectionCreate, - DeploymentUpdate, - PrefectShellSetup, DbtCoreCreate, DbtCoreCredentialUpdate, DbtCoreSchemaUpdate, - RunFlow, RunDbtCoreOperation, RunShellOperation, - DeploymentCreate, DeploymentCreate2, DeploymentFetch, FlowRunRequest, - PrefectBlocksDelete, - AirbyteConnectionBlocksFetch, PrefectSecretBlockCreate, DbtCliProfileBlockCreate, DeploymentUpdate2, DbtCliProfileBlockUpdate, ) -from proxy.flows import run_airbyte_connection_flow, run_dbtcore_flow +from proxy.flows import run_airbyte_connection_flow from proxy.prefect_flows import run_shell_operation_flow, run_dbtcore_flow_v1 @@ -114,32 +97,6 @@ def airbytesync(block_name: str, flow_name: str, flow_run_name: str): raise -def dbtrun(block_name: str, flow_name: str, flow_run_name: str): - """Run a dbt core flow""" - if not isinstance(block_name, str): - raise TypeError("block_name must be a string") - if not isinstance(flow_name, str): - raise TypeError("flow_name must be a string") - if not isinstance(flow_run_name, str): - raise TypeError("flow_run_name must be a string") - - logger.info("dbtrun %s %s %s", block_name, flow_name, flow_run_name) - flow = run_dbtcore_flow - if flow_name: - flow = flow.with_options(name=flow_name) - if flow_run_name: - flow = flow.with_options(flow_run_name=flow_run_name) - - try: - result = flow(block_name) - return result - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to run dbt core flow" - ) from error - - def dbtrun_v1(task_config: RunDbtCoreOperation): """Run a dbt core flow""" @@ -181,50 +138,6 @@ def shelloprun(task_config: RunShellOperation): ) from error -# ============================================================================= -@app.post("/proxy/blocks/bulk/delete/") -async def post_bulk_delete_blocks(request: Request, payload: PrefectBlocksDelete): - """Delete all airbyte connection blocks in the payload array""" - root = os.getenv("PREFECT_API_URL") - deleted_blockids = [] - for block_id in payload.block_ids: - logger.info("deleting block_id : %s ", block_id) - res = requests.delete(f"{root}/block_documents/{block_id}", timeout=10) - try: - res.raise_for_status() - except requests.exceptions.HTTPError as error: - logger.error( - "something went wrong deleting block_id %s: %s", block_id, res.text - ) - logger.exception(error) - continue - logger.info("deleted block with block_id : %s", block_id) - deleted_blockids.append(block_id) - - return {"deleted_blockids": deleted_blockids} - - -# ============================================================================= -@app.post("/proxy/blocks/airbyte/connection/filter") -def post_airbyte_connection_blocks( - request: Request, payload: AirbyteConnectionBlocksFetch -): - """Filter the prefect blocks with parameters from payload""" - blocks = post_filter_blocks(payload.block_names) - - # we only care about the block name and its connection id - response = [] - for blk in blocks: - connection_id = "" - if "data" in blk and "connection_id" in blk["data"]: - connection_id = blk["data"]["connection_id"] - response.append( - {"name": blk["name"], "connectionId": connection_id, "id": blk["id"]} - ) - - return response - - # ============================================================================= @app.get("/proxy/blocks/airbyte/server/{blockname}") async def get_airbyte_server(request: Request, blockname: str): @@ -268,94 +181,6 @@ async def post_airbyte_server(request: Request, payload: AirbyteServerCreate): # ============================================================================= -@app.get("/proxy/blocks/airbyte/connection/byblockname/{blockname}") -async def get_airbyte_connection_by_blockname(request: Request, blockname): - """look up airbyte connection block by name and return block_id""" - block_id = await get_airbyte_connection_block_id(blockname) - if block_id is None: - logger.error("no airbyte connection block having name %s", blockname) - raise HTTPException(status_code=400, detail="no block having name " + blockname) - logger.info("blockname => blockid : %s => %s", blockname, block_id) - return {"block_id": block_id} - - -@app.get("/proxy/blocks/airbyte/connection/byblockid/{blockid}") -async def get_airbyte_connection_by_blockid(request: Request, blockid): - """look up airbyte connection block by id and return block data""" - block = await get_airbyte_connection_block(blockid) - if block is None: - logger.error("no airbyte connection block having id %s", blockid) - raise HTTPException(status_code=400, detail="no block having id " + blockid) - logger.info("Found airbyte connection block by id: %s", block) - return block - - -@app.post("/proxy/blocks/airbyte/connection/") -async def post_airbyte_connection(request: Request, payload: AirbyteConnectionCreate): - """ - create a new airbyte connection block with this block name, - raise an exception if the name is already in use - """ - try: - block_id = await create_airbyte_connection_block(payload) - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to create airbyte connection block" - ) from error - logger.info("Created new airbyte connection block with ID: %s", block_id) - return {"block_id": block_id} - - -# ============================================================================= -@app.get("/proxy/blocks/shell/{blockname}") -async def get_shell(request: Request, blockname): - """look up a shell operation block by name and return block_id""" - if not isinstance(blockname, str): - raise TypeError("blockname must be a string") - block_id = await get_shell_block_id(blockname) - if block_id is None: - logger.error("no shell block having name %s", blockname) - raise HTTPException(status_code=400, detail="no block having name " + blockname) - logger.info("blockname => blockid : %s => %s", blockname, block_id) - return {"block_id": block_id} - - -@app.post("/proxy/blocks/shell/") -async def post_shell(request: Request, payload: PrefectShellSetup): - """ - create a new shell block with this block name, - raise an exception if the name is already in use - """ - if not isinstance(payload, PrefectShellSetup): - raise TypeError("payload is invalid") - logger.info(payload) - try: - block_id, cleaned_blockname = await create_shell_block(payload) - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to create shell block" - ) from error - logger.info("Created new shell block with ID: %s", block_id) - return {"block_id": block_id, "block_name": cleaned_blockname} - - -# ============================================================================= -@app.get("/proxy/blocks/dbtcore/{blockname}") -async def get_dbtcore(request: Request, blockname): - """look up a dbt core operation block by name and return block_id""" - if not isinstance(blockname, str): - raise TypeError("blockname must be a string") - - block_id = await get_dbtcore_block_id(blockname) - if block_id is None: - logger.error("no dbt core block having name %s", blockname) - raise HTTPException(status_code=400, detail="no block having name " + blockname) - logger.info("blockname => blockid : %s => %s", blockname, block_id) - return {"block_id": block_id} - - @app.post("/proxy/blocks/dbtcore/") async def post_dbtcore(request: Request, payload: DbtCoreCreate): """ @@ -515,24 +340,6 @@ async def post_secret_block(request: Request, payload: PrefectSecretBlockCreate) return {"block_id": block_id, "block_name": cleaned_blockname} -@app.get("/proxy/blocks/secret/{blockname}") -async def get_secret_block(request: Request, blockname: str): - """fetch the secret block documents""" - try: - block_id, block_name = await get_secret_block_document(blockname) - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to fetch secret block document" - ) from error - logger.info( - "Fetched the secret block with ID: %s and name: %s", - block_id, - block_name, - ) - return {"block_id": block_id, "block_name": block_name} - - # ============================================================================= @app.delete("/delete-a-block/{block_id}") async def delete_block(request: Request, block_id): @@ -550,44 +357,6 @@ async def delete_block(request: Request, block_id): # ============================================================================= -@app.post("/proxy/flows/airbyte/connection/sync/") -async def sync_airbyte_connection_flow(request: Request, payload: RunFlow): - """Prefect flow to sync an airbyte connection""" - if not isinstance(payload, RunFlow): - raise TypeError("payload is invalid") - logger.info(payload) - if payload.blockName == "": - logger.error("received empty blockName") - raise HTTPException(status_code=400, detail="received empty blockName") - logger.info("Running airbyte connection sync flow") - try: - result = airbytesync(payload.blockName, payload.flowName, payload.flowRunName) - logger.info(result) - return result - except Exception as error: - logger.exception(error) - raise HTTPException(status_code=400, detail=str(error)) from error - - -@app.post("/proxy/flows/dbtcore/run/") -async def sync_dbtcore_flow(request: Request, payload: RunFlow): - """Prefect flow to run dbt""" - logger.info(payload) - if not isinstance(payload, RunFlow): - raise TypeError("payload is invalid") - if payload.blockName == "": - logger.error("received empty blockName") - raise HTTPException(status_code=400, detail="received empty blockName") - logger.info("running dbtcore-run for dbt-core-op %s", payload.blockName) - try: - result = dbtrun(payload.blockName, payload.flowName, payload.flowRunName) - logger.info(result) - return {"status": "success", "result": result} - except Exception as error: - logger.exception(error) - raise HTTPException(status_code=400, detail=str(error)) from error - - @app.post("/proxy/v1/flows/dbtcore/run/") async def sync_dbtcore_flow_v1(request: Request, payload: RunDbtCoreOperation): """Prefect flow to run dbt""" @@ -622,24 +391,6 @@ async def sync_shellop_flow(request: Request, payload: RunShellOperation): raise HTTPException(status_code=400, detail=str(error)) from error -@app.post("/proxy/deployments/") -async def post_dataflow(request: Request, payload: DeploymentCreate): - """Create a deployment from an existing flow""" - if not isinstance(payload, DeploymentCreate): - raise TypeError("payload is invalid") - - logger.info(payload) - try: - deployment = await post_deployment(payload) - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to create deployment" - ) from error - logger.info("Created new deployment: %s", deployment) - return {"deployment": deployment} - - @app.post("/proxy/v1/deployments/") async def post_dataflow_v1(request: Request, payload: DeploymentCreate2): """Create a deployment from an existing flow""" @@ -658,24 +409,6 @@ async def post_dataflow_v1(request: Request, payload: DeploymentCreate2): return {"deployment": deployment} -@app.put("/proxy/deployments/{deployment_id}") -def put_dataflow(request: Request, deployment_id, payload: DeploymentUpdate): - """updates a deployment""" - if not isinstance(payload, DeploymentUpdate): - raise TypeError("payload is invalid") - - logger.info(payload) - try: - put_deployment(deployment_id, payload) - except Exception as error: - logger.exception(error) - raise HTTPException( - status_code=400, detail="failed to update the deployment" - ) from error - logger.info("Updated the deployment: %s", deployment_id) - return {"success": 1} - - @app.put("/proxy/v1/deployments/{deployment_id}") def put_dataflow_v1(request: Request, deployment_id, payload: DeploymentUpdate2): """updates a deployment""" diff --git a/proxy/service.py b/proxy/service.py index 3883300..4fbadf8 100644 --- a/proxy/service.py +++ b/proxy/service.py @@ -1,6 +1,5 @@ """interface with prefect's python client api""" -import asyncio import os from time import sleep import requests @@ -12,12 +11,12 @@ from prefect.blocks.system import Secret from prefect.blocks.core import Block from prefect.client import get_client -from prefect_airbyte import AirbyteConnection, AirbyteServer +from prefect_airbyte import AirbyteServer from prefect_gcp import GcpCredentials from prefect_dbt.cli.configs import TargetConfigs from prefect_dbt.cli.configs import BigQueryTargetConfigs -from prefect_dbt.cli.commands import DbtCoreOperation, ShellOperation +from prefect_dbt.cli.commands import DbtCoreOperation from prefect_dbt.cli import DbtCliProfile from dotenv import load_dotenv @@ -26,10 +25,7 @@ from proxy.exception import PrefectException from proxy.schemas import ( AirbyteServerCreate, - AirbyteConnectionCreate, - PrefectShellSetup, DbtCoreCreate, - DeploymentCreate, DeploymentCreate2, DeploymentUpdate, PrefectSecretBlockCreate, @@ -37,7 +33,6 @@ DbtCliProfileBlockUpdate, DeploymentUpdate2, ) -from proxy.flows import deployment_schedule_flow_v3 from proxy.prefect_flows import deployment_schedule_flow_v4 load_dotenv() @@ -174,25 +169,6 @@ def _block_name(block: Block) -> str: return str(block.dict()["_block_document_name"]) -# ================================================================================================ -def post_filter_blocks(block_names) -> dict: - """Filter and fetch prefect blocks based on the query parameter""" - try: - query = { - "block_documents": { - "operator": "and_", - "name": {"any_": []}, - } - } - if block_names: - query["block_documents"]["name"]["any_"] = block_names - - return prefect_post("block_documents/filter", query) - except Exception as err: - logger.exception(err) - raise PrefectException("failed to filter blocks") from err - - # ================================================================================================ async def get_airbyte_server_block_id(blockname: str) -> str | None: """look up an airbyte server block by name and return block_id""" @@ -246,75 +222,6 @@ def delete_airbyte_server_block(blockid: str): # ================================================================================================ -async def get_airbyte_connection_block_id(blockname: str) -> str | None: - """look up airbyte connection block by name and return block_id""" - if not isinstance(blockname, str): - raise TypeError("blockname must be a string") - try: - block = await AirbyteConnection.load(blockname) - logger.info( - "found airbyte connection block named %s", - blockname, - ) - return _block_id(block) - except ValueError: - logger.error( - "no airbyte connection block named %s", - blockname, - ) - # pylint: disable=raise-missing-from - raise HTTPException( - status_code=404, detail=f"No airbyte connection block named {blockname}" - ) - - -async def get_airbyte_connection_block(blockid: str) -> dict: - """look up and return block data for an airbyte connection""" - if not isinstance(blockid, str): - raise TypeError("blockid must be a string") - try: - result = prefect_get(f"block_documents/{blockid}") - logger.info("found airbyte connection block having id %s", blockid) - return result - except requests.exceptions.HTTPError: - logger.error("no airbyte connection block having id %s", blockid) - # pylint: disable=raise-missing-from - raise HTTPException( - status_code=404, detail=f"No airbyte connection block having id {blockid}" - ) - - -async def create_airbyte_connection_block( - conninfo: AirbyteConnectionCreate, -) -> str: - """Create airbyte connection block""" - if not isinstance(conninfo, AirbyteConnectionCreate): - raise TypeError("conninfo must be an AirbyteConnectionCreate") - logger.info(conninfo) - try: - serverblock = await AirbyteServer.load(conninfo.serverBlockName) - except ValueError as exc: - logger.exception(exc) - raise PrefectException( - f"could not find Airbyte Server block named {conninfo.serverBlockName}" - ) from exc - - connection_block = AirbyteConnection( - airbyte_server=serverblock, connection_id=conninfo.connectionId, timeout=15 - ) - try: - block_name_for_save = cleaned_name_for_prefectblock( - conninfo.connectionBlockName - ) - await connection_block.save(block_name_for_save) - except Exception as error: - logger.exception(error) - raise PrefectException( - f"failed to create airbyte connection block for connection {conninfo.connectionId}" - ) from error - logger.info("created airbyte connection block %s", conninfo.connectionBlockName) - - return _block_id(connection_block) def update_airbyte_connection_block(blockname: str): @@ -335,38 +242,7 @@ def delete_airbyte_connection_block(blockid: str) -> dict: return prefect_delete(f"block_documents/{blockid}") -# ================================================================================================ -async def get_shell_block_id(blockname: str) -> str | None: - """look up a shell operation block by name and return block_id""" - if not isinstance(blockname, str): - raise TypeError("blockname must be a string") - - try: - block = await ShellOperation.load(blockname) - return _block_id(block) - except ValueError: - # pylint: disable=raise-missing-from - raise HTTPException( - status_code=404, detail=f"No shell operation block named {blockname}" - ) - - -async def create_shell_block(shell: PrefectShellSetup): - """Create a prefect shell block""" - if not isinstance(shell, PrefectShellSetup): - raise TypeError("shell must be a PrefectShellSetup") - shell_operation_block = ShellOperation( - commands=shell.commands, env=shell.env, working_dir=shell.workingDir - ) - try: - block_name_for_save = cleaned_name_for_prefectblock(shell.blockName) - await shell_operation_block.save(block_name_for_save, overwrite=True) - except Exception as error: - logger.exception(error) - raise PrefectException("failed to create shell block") from error - logger.info("created shell operation block %s", shell.blockName) - return _block_id(shell_operation_block), block_name_for_save - +# =============================================================================================== def delete_shell_block(blockid: str) -> dict: """Delete a prefect shell block""" @@ -377,21 +253,6 @@ def delete_shell_block(blockid: str) -> dict: # ================================================================================================ -async def get_dbtcore_block_id(blockname: str) -> str | None: - """look up a dbt core operation block by name and return block_id""" - if not isinstance(blockname, str): - raise TypeError("blockname must be a string") - - try: - block = await DbtCoreOperation.load(blockname) - return _block_id(block) - except ValueError: - # pylint: disable=raise-missing-from - raise HTTPException( - status_code=404, detail=f"No dbt core operation block named {blockname}" - ) - - async def get_dbt_cli_profile(cli_profile_block_name: str) -> dict: """look up a dbt cli profile block by name and return block_id""" if not isinstance(cli_profile_block_name, str): @@ -576,16 +437,6 @@ async def create_secret_block(payload: PrefectSecretBlockCreate): return _block_id(secret_block), cleaned_blockname -async def get_secret_block_document(blockname: str): - """Get a prefect block of type secret""" - try: - secret_block = await Secret.load(blockname) - except Exception as error: - raise PrefectException("Could not fetch the secret block") from error - - return _block_id(secret_block), _block_name(secret_block) - - async def update_postgres_credentials(dbt_blockname, new_extras): """updates the database credentials inside a dbt postgres block""" try: @@ -690,31 +541,6 @@ async def update_target_configs_schema(dbt_blockname: str, target_configs_schema # ================================================================================================ -async def post_deployment(payload: DeploymentCreate) -> dict: - """create a deployment from a flow and a schedule""" - if not isinstance(payload, DeploymentCreate): - raise TypeError("payload must be a DeploymentCreate") - logger.info(payload) - - deployment = await Deployment.build_from_flow( - flow=deployment_schedule_flow_v3.with_options(name=payload.flow_name), - name=payload.deployment_name, - work_queue_name="ddp", - tags=[payload.org_slug], - ) - deployment.parameters = { - "airbyte_blocks": payload.connection_blocks, - "dbt_blocks": payload.dbt_blocks, - } - deployment.schedule = CronSchedule(cron=payload.cron) if payload.cron else None - try: - deployment_id = await deployment.apply() - except Exception as error: - logger.exception(error) - raise PrefectException("failed to create deployment") from error - return {"id": deployment_id, "name": deployment.name} - - async def post_deployment_v1(payload: DeploymentCreate2) -> dict: """ create a deployment from a flow and a schedule diff --git a/tests.py b/tests.py index 4be7d01..d30b69e 100644 --- a/tests.py +++ b/tests.py @@ -14,17 +14,13 @@ PostDeploymentResponse, ) from proxy.service import ( - create_airbyte_connection_block, create_airbyte_server_block, create_dbt_core_block, delete_airbyte_connection_block, delete_airbyte_server_block, delete_dbt_core_block, - get_airbyte_connection_block_id, get_airbyte_server_block_id, - get_dbtcore_block_id, get_flow_runs_by_deployment_id, - post_deployment, ) @@ -63,32 +59,6 @@ async def test_get_airbyte_server_block_id(self): class TestAirbyteConnection: block_id = None - @pytest.mark.asyncio - async def test_create_airbyte_connection_block(self): - payload = { - "serverBlockName": "airbyte1", - "connectionId": "6a791af6-eb58-11ed-a05b-0242ac120009", - "connectionBlockName": "block", - } - try: - validated_payload = AirbyteConnectionCreate(**payload) - except ValidationError as e: - raise ValueError(f"Response validation failed: {e.errors()}") - - try: - res = await create_airbyte_connection_block(validated_payload) - AirbyteConnectionBlockResponse(block_id=res) - TestAirbyteConnection.block_id = res - except ValidationError as e: - raise ValueError(f"Response validation failed: {e.errors()}") - - @pytest.mark.asyncio - async def test_get_airbyte_connection_block_id(self): - try: - res = await get_airbyte_connection_block_id(blockname="blockkkkk") - AirbyteServerBlockResponse(block_id=res) - except ValidationError as e: - raise ValueError(f"Response validation failed: {e.errors()}") def test_delete_airbyte_connection_block(self): try: @@ -135,14 +105,6 @@ async def test_create_dbt_core_block(self): except ValidationError as e: raise ValueError(f"Response validation failed: {e.errors()}") - @pytest.mark.asyncio - async def test_get_dbtcore_block_id(self): - try: - res = await get_dbtcore_block_id(blockname="test") - DbtCoreBlockResponse(block_id=res) - except ValidationError as e: - raise ValueError(f"Response validation failed: {e.errors()}") - def test_delete_dbt_core_block(self): try: delete_dbt_core_block(block_id=TestDbtConnection.block_id) @@ -151,28 +113,6 @@ def test_delete_dbt_core_block(self): class TestFlowDeployment: - @pytest.mark.asyncio - async def test_post_deployment(self): - payload = { - "flow_name": "test_flow", - "deployment_name": "test_deployment", - "org_slug": "test_org", - "connection_blocks": ["blockkkkk", "block2"], - "dbt_blocks": [], - "cron": "0 9 * * *", - } - try: - validated_payload = DeploymentCreate(**payload) - except ValidationError as e: - raise ValueError(f"Payload validation failed: {e.errors()}") - - try: - res = await post_deployment(validated_payload) - res["id"] = str(res["id"]) - PostDeploymentResponse(deployment=res) - TestFlowDeployment.deployment_id = res["id"] - except ValidationError as e: - raise ValueError(f"Response validation failed: {e.errors()}") def test_get_flow_runs_by_deployment_id(self): deployment_id = TestFlowDeployment.deployment_id diff --git a/tests/test_main.py b/tests/test_main.py index 51669ed..1c573b7 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -7,42 +7,28 @@ from proxy.main import ( airbytesync, app, - dbtrun, dbtrun_v1, shelloprun, delete_block, delete_deployment, - get_airbyte_connection_by_blockid, - get_airbyte_connection_by_blockname, get_airbyte_server, - get_dbtcore, get_dbtcli_profile, get_flow_run_logs_paginated, get_flow_runs, get_flowrun, get_read_deployment, - get_shell, - post_airbyte_connection, - post_airbyte_connection_blocks, post_airbyte_server, - post_bulk_delete_blocks, post_create_deployment_flow_run, - post_dataflow, post_dbtcore, post_dbtcli_profile, put_dbtcli_profile, put_dbtcore_postgres, put_dbtcore_bigquery, put_dbtcore_schema, - put_dataflow, get_flow_run_by_id, post_secret_block, - get_secret_block, post_deployment_set_schedule, post_deployments, - post_shell, - sync_airbyte_connection_flow, - sync_dbtcore_flow, sync_shellop_flow, sync_dbtcore_flow_v1, post_dataflow_v1, @@ -155,36 +141,6 @@ def test_airbyte_sync_invalid_flow_run_name(): airbytesync(block_name, flow_name, invalid_flow_run_name) -def test_dbtrun_success(): - block_name = "example_block" - flow_name = "example_flow" - flow_run_name = "example_flow_run" - expected_result = {"result": "example_result"} - - with patch("proxy.main.run_dbtcore_flow") as mock_run_dbtcore_flow: - with patch.object( - mock_run_dbtcore_flow.with_options.return_value, "with_options" - ) as mock_with_options: - mock_with_options.return_value = lambda x: expected_result - result = dbtrun(block_name, flow_name, flow_run_name) - assert result == expected_result - - -def test_dbtrun_failure(): - block_name = "example_block" - flow_name = "example_flow" - flow_run_name = "example_flow_run" - expected_result = {"result": "example_failure_result", "status": "failed"} - - with patch("proxy.main.run_dbtcore_flow") as mock_run_dbtcore_flow: - with patch.object( - mock_run_dbtcore_flow.with_options.return_value, "with_options" - ) as mock_with_options: - mock_with_options.return_value = lambda x: expected_result - result = dbtrun(block_name, flow_name, flow_run_name) - assert result == expected_result - - def test_dbtrun_v1(): """tests dbtrun_v1""" task_config = RunDbtCoreOperation( @@ -255,76 +211,6 @@ def test_airbyte_sync_invalid_payload_type(): shelloprun(task_config) -def test_run_dbt_exception(): - block_name = "example_block" - flow_name = "example_flow" - flow_run_name = "example_flow_run" - expected_result = {"result": "example_failure_result", "status": "failed"} - - def mock_with_options(*args, **kwargs): - raise HTTPException(status_code=400, detail="Job 12345 failed.") - - with patch("proxy.main.run_dbtcore_flow.with_options", new=mock_with_options): - try: - result = dbtrun(block_name, flow_name, flow_run_name) - assert result == expected_result - except HTTPException as e: - assert e.status_code == 400 - assert e.detail == "Job 12345 failed." - - -def test_dbtrun_invalid_block_name(): - block_name = None - flow_name = "example_flow" - flow_run_name = "example_flow_run" - - with pytest.raises(TypeError): - dbtrun(block_name, flow_name, flow_run_name) - - -def test_dbtrun_invalid_flow_name(): - block_name = "example_block" - flow_name = None - flow_run_name = "example_flow_run" - - with pytest.raises(TypeError): - dbtrun(block_name, flow_name, flow_run_name) - - -def test_dbtrun_invalid_flow_run_name(): - block_name = "example_block" - flow_name = "example_flow" - flow_run_name = None - - with pytest.raises(TypeError): - dbtrun(block_name, flow_name, flow_run_name) - - -@pytest.mark.asyncio -async def test_post_bulk_delete_blocks_success(): - payload = PrefectBlocksDelete(block_ids=["12345", "67890"]) - request = client.request("POST", "/") - with patch("proxy.main.requests.delete") as mock_delete: - mock_delete.return_value.raise_for_status.return_value = None - response = await post_bulk_delete_blocks(request, payload) - assert response == {"deleted_blockids": ["12345", "67890"]} - - -def test_post_airbyte_connection_blocks_success(): - payload = AirbyteConnectionBlocksFetch(block_names=["test_block"]) - request = client.request("POST", "/") - with patch( - "proxy.main.post_filter_blocks", - return_value=[ - {"name": "test_block", "data": {"connection_id": "12345"}, "id": "67890"} - ], - ): - response = post_airbyte_connection_blocks(request, payload) - assert response == [ - {"name": "test_block", "connectionId": "12345", "id": "67890"} - ] - - @pytest.mark.asyncio async def test_get_airbyte_server_success(): request = client.request("POST", "/") @@ -395,207 +281,6 @@ async def test_post_airbyte_server_with_invalid_payload(): assert excinfo.value.args[0] == "payload is invalid" -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockname_success(): - request = client.request("POST", "/") - with patch("proxy.main.get_airbyte_connection_block_id", return_value="12345"): - response = await get_airbyte_connection_by_blockname(request, "test_block") - assert response == {"block_id": "12345"} - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockname_failure(): - request = client.request("POST", "/") - with patch("proxy.main.get_airbyte_connection_block_id", return_value=None): - with pytest.raises(HTTPException) as excinfo: - await get_airbyte_connection_by_blockname(request, "test_block") - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "no block having name test_block" - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockname_invalid_blockname(): - request = client.request("POST", "/") - with patch( - "proxy.main.get_airbyte_connection_block_id" - ) as get_airbyte_connection_block_id_mock: - get_airbyte_connection_block_id_mock.return_value = None - blockname = "invalid_blockname" - with pytest.raises(HTTPException) as exc_info: - await get_airbyte_connection_by_blockname(request, blockname) - assert exc_info.value.status_code == 400 - assert exc_info.value.detail == "no block having name " + blockname - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockid_success(): - block_data = {"id": "12345", "name": "test_block", "url": "http://test-block.com"} - request = client.request("POST", "/") - with patch("proxy.main.get_airbyte_connection_block", return_value=block_data): - response = await get_airbyte_connection_by_blockid(request, "12345") - assert response == block_data - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockid_failure(): - request = client.request("POST", "/") - with patch("proxy.main.get_airbyte_connection_block", return_value=None): - with pytest.raises(HTTPException) as excinfo: - await get_airbyte_connection_by_blockid(request, "12345") - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "no block having id 12345" - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_by_blockid_invalid_blockid(): - request = client.request("POST", "/") - with patch( - "proxy.main.get_airbyte_connection_block" - ) as get_airbyte_connection_block_mock: - get_airbyte_connection_block_mock.return_value = None - blockid = "invalid_blockid" - with pytest.raises(HTTPException) as exc_info: - await get_airbyte_connection_by_blockid(request, blockid) - assert exc_info.value.status_code == 400 - assert exc_info.value.detail == "no block having id " + blockid - - -@pytest.mark.asyncio -async def test_post_airbyte_connection_success(): - payload = AirbyteConnectionCreate( - serverBlockName="testserver", - connectionId="12345", - connectionBlockName="test_connection", - ) - request = client.request("POST", "/") - with patch("proxy.main.create_airbyte_connection_block", return_value="12345"): - response = await post_airbyte_connection(request, payload) - assert response == {"block_id": "12345"} - - -@pytest.mark.asyncio -async def test_post_airbyte_connection_failure(): - request = client.request("POST", "/") - payload = AirbyteConnectionCreate( - serverBlockName="testserver", - connectionId="12345", - connectionBlockName="test_connection", - ) - with patch( - "proxy.main.create_airbyte_connection_block", - side_effect=Exception("test error"), - ): - with pytest.raises(HTTPException) as excinfo: - await post_airbyte_connection(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "failed to create airbyte connection block" - - -@pytest.mark.asyncio -async def test_post_airbyte_connection_with_invalid_payload(): - request = client.request("POST", "/") - payload = AirbyteConnectionCreate( - serverBlockName="testserver", - connectionId="12345", - connectionBlockName="test_connection", - ) - with pytest.raises(HTTPException) as excinfo: - await post_airbyte_connection(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "failed to create airbyte connection block" - - -@pytest.mark.asyncio -async def test_get_shell_success(): - request = client.request("POST", "/") - with patch("proxy.main.get_shell_block_id", return_value="12345"): - response = await get_shell(request, "test_block") - assert response == {"block_id": "12345"} - - -@pytest.mark.asyncio -async def test_get_shell_failure(): - request = client.request("POST", "/") - with patch("proxy.main.get_shell_block_id", return_value=None): - with pytest.raises(HTTPException) as excinfo: - await get_shell(request, "test_block") - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "no block having name test_block" - - -@pytest.mark.asyncio -async def test_get_shell_invalid_blockname(): - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await get_shell(request, None) - assert excinfo.value.args[0] == "blockname must be a string" - - -@pytest.mark.asyncio -async def test_post_shell_success(): - request = client.request("POST", "/") - payload = PrefectShellSetup( - blockName="test_shell", - commands=['echo "Hello, World!"'], - workingDir="test_dir", - env={"TEST_ENV": "test_value"}, - ) - with patch("proxy.main.create_shell_block", return_value=("12345", "test_shell")): - response = await post_shell(request, payload) - assert response == {"block_id": "12345", "block_name": "test_shell"} - - -@pytest.mark.asyncio -async def test_post_shell_failure(): - payload = PrefectShellSetup( - blockName="test_shell", - commands=['echo "Hello, World!"'], - workingDir="test_dir", - env={"TEST_ENV": "test_value"}, - ) - request = client.request("POST", "/") - with patch("proxy.main.create_shell_block", side_effect=Exception("test error")): - with pytest.raises(HTTPException) as excinfo: - await post_shell(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "failed to create shell block" - - -@pytest.mark.asyncio -async def test_post_shell_invalid_payload(): - payload = None - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await post_shell(request, payload) - assert excinfo.value.args[0] == "payload is invalid" - - -@pytest.mark.asyncio -async def test_get_dbtcore_success(): - request = client.request("POST", "/") - with patch("proxy.main.get_dbtcore_block_id", return_value="12345"): - response = await get_dbtcore(request, "test_block") - assert response == {"block_id": "12345"} - - -@pytest.mark.asyncio -async def test_get_dbtcore_failure(): - request = client.request("POST", "/") - with patch("proxy.main.get_dbtcore_block_id", return_value=None): - with pytest.raises(HTTPException) as excinfo: - await get_dbtcore(request, "test_block") - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "no block having name test_block" - - -@pytest.mark.asyncio -async def test_get_dbtcore_invalid_blockname(): - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await get_dbtcore(request, None) - assert excinfo.value.args[0] == "blockname must be a string" - - @pytest.mark.asyncio async def test_post_dbtcore_success(): request = client.request("POST", "/") @@ -887,27 +572,6 @@ async def test_post_secret_block_success(mock_create: AsyncMock): assert response == {"block_id": "block_id", "block_name": "cleaned_blockname"} -@pytest.mark.asyncio -@patch("proxy.main.get_secret_block_document") -async def test_get_secret_block_success(mock_get_secret_block_document: AsyncMock): - request = Mock() - - mock_get_secret_block_document.return_value = "block_id", "block_name" - response = await get_secret_block(request, "block-name") - assert response == {"block_id": "block_id", "block_name": "block_name"} - - -@pytest.mark.asyncio -@patch("proxy.main.get_secret_block_document") -async def test_get_secret_block_failure(mock_get_secret_block_document: AsyncMock): - request = Mock() - - mock_get_secret_block_document.side_effect = Exception() - with pytest.raises(HTTPException) as excinfo: - await get_secret_block(request, "block-name") - assert excinfo.value.detail == "failed to fetch secret block document" - - @pytest.mark.asyncio async def test_delete_block_success(): request = client.request("POST", "/") @@ -937,66 +601,6 @@ async def test_delete_block_invalid_blockid(): assert excinfo.value.args[0] == "block_id must be a string" -@pytest.mark.asyncio -async def test_sync_airbyte_connection_flow_success(): - request = client.request("POST", "/") - payload = RunFlow( - blockName="test_block", flowName="test_flow", flowRunName="test_flow_run" - ) - with patch("proxy.main.airbytesync", return_value="test result"): - response = await sync_airbyte_connection_flow(request, payload) - assert response == "test result" - - -@pytest.mark.asyncio -async def test_sync_airbyte_connection_flow_failure(): - payload = RunFlow(blockName="", flowName="test_flow", flowRunName="test_flow_run") - request = client.request("POST", "/") - with pytest.raises(HTTPException) as excinfo: - await sync_airbyte_connection_flow(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "received empty blockName" - - -@pytest.mark.asyncio -async def test_sync_airbyte_connection_flow_invalid_payload(): - payload = None - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await sync_airbyte_connection_flow(request, payload) - assert excinfo.value.args[0] == "payload is invalid" - - -@pytest.mark.asyncio -async def test_sync_dbtcore_flow_success(): - payload = RunFlow( - blockName="test_block", flowName="test_flow", flowRunName="test_flow_run" - ) - request = client.request("POST", "/") - with patch("proxy.main.dbtrun", return_value="test result"): - response = await sync_dbtcore_flow(request, payload) - assert response == {"status": "success", "result": "test result"} - - -@pytest.mark.asyncio -async def test_sync_dbtcore_flow_failure(): - payload = RunFlow(blockName="", flowName="test_flow", flowRunName="test_flow_run") - request = client.request("POST", "/") - with pytest.raises(HTTPException) as excinfo: - await sync_dbtcore_flow(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "received empty blockName" - - -@pytest.mark.asyncio -async def test_sync_dbtcore_flow_invalid_payload(): - payload = None - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await sync_dbtcore_flow(request, payload) - assert excinfo.value.args[0] == "payload is invalid" - - @pytest.mark.asyncio async def test_sync_shellop_flow_success(): payload = RunShellOperation( @@ -1045,77 +649,6 @@ async def test_sync_dbtcore_flow_v1(): assert result == {"status": "success", "result": "test result"} -@pytest.mark.asyncio -async def test_post_dataflow_success(): - payload = DeploymentCreate( - flow_name="test_block", - deployment_name="FULL", - org_slug="test_org", - connection_blocks=[{"name": "test_block"}], - dbt_blocks=[{"name": "test_block"}], - cron="* * * * *", - ) - request = client.request("POST", "/") - with patch("proxy.main.post_deployment", return_value={"id": "67890"}): - response = await post_dataflow(request, payload) - assert response == {"deployment": {"id": "67890"}} - - -@pytest.mark.asyncio -async def test_post_dataflow_failure(): - request = client.request("POST", "/") - payload = DeploymentCreate( - flow_name="test_block", - deployment_name="FULL", - org_slug="test_org", - connection_blocks=[{"name": "test_block"}], - dbt_blocks=[{"name": "test_block"}], - cron="* * * * *", - ) - with patch("proxy.main.post_deployment", side_effect=Exception("test error")): - with pytest.raises(HTTPException) as excinfo: - await post_dataflow(request, payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "failed to create deployment" - - -@pytest.mark.asyncio -async def test_post_dataflow_invalid_payload(): - payload = None - request = client.request("POST", "/") - with pytest.raises(TypeError) as excinfo: - await post_dataflow(request, payload) - assert excinfo.value.args[0] == "payload is invalid" - - -def test_put_dataflow_badparams(): - request = Mock() - with pytest.raises(TypeError) as excinfo: - put_dataflow(request, "deployment-id", 123) - assert str(excinfo.value) == "payload is invalid" - - -def test_put_dataflow_raises(): - """put_dataflow raises http exception""" - request = Mock() - with patch("proxy.main.put_deployment") as mock_put_dataflow: - mock_put_dataflow.side_effect = Exception() - payload = DeploymentUpdate(connection_blocks=[], dbt_blocks=[]) - with pytest.raises(HTTPException) as excinfo: - put_dataflow(request, "deployment-id", payload) - assert excinfo.value.status_code == 400 - assert excinfo.value.detail == "failed to update the deployment" - - -def test_put_dataflow_success(): - """put_dataflow raises http exception""" - request = Mock() - with patch("proxy.main.put_deployment"): - payload = DeploymentUpdate(connection_blocks=[], dbt_blocks=[]) - result = put_dataflow(request, "deployment-id", payload) - assert result == {"success": 1} - - def test_put_dataflow_v1_raises(): """put_dataflow_v1 raises http exception""" request = Mock() @@ -1137,23 +670,6 @@ def test_put_dataflow_v1_success(): assert result == {"success": 1} -@patch("proxy.main.put_deployment") -def test_put_data_flow_failure(mock_put: Mock): - request = Mock() - payload = DeploymentUpdate(cron="* * * * *") - mock_put.side_effect = Exception("exception") - with pytest.raises(HTTPException) as excinfo: - put_dataflow(request, "deployment-id", payload) - assert excinfo.value.detail == "failed to update the deployment" - - -@patch("proxy.main.put_deployment") -def test_put_data_flow_success(mock_put: Mock): - request = Mock() - payload = DeploymentUpdate(cron="* * * * *") - response = put_dataflow(request, "deployment-id", payload) - assert response == {"success": 1} - def test_get_flow_run_by_id_badparams(): request = Mock() diff --git a/tests/test_service.py b/tests/test_service.py index a0f7929..ab280ec 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -20,24 +20,18 @@ from proxy.service import ( _create_dbt_cli_profile, get_dbt_cli_profile, - create_airbyte_connection_block, create_airbyte_server_block, create_dbt_core_block, - create_shell_block, delete_airbyte_connection_block, delete_airbyte_server_block, delete_dbt_core_block, delete_shell_block, - get_airbyte_connection_block, - get_airbyte_connection_block_id, get_airbyte_server_block_id, - get_dbtcore_block_id, get_deployments_by_filter, get_flow_run, get_flow_run_logs, get_flow_runs_by_deployment_id, get_flow_runs_by_name, - get_shell_block_id, parse_log, prefect_delete, prefect_get, @@ -45,13 +39,11 @@ prefect_patch, set_deployment_schedule, traverse_flow_run_graph, - post_filter_blocks, update_airbyte_server_block, update_airbyte_connection_block, update_postgres_credentials, update_bigquery_credentials, update_target_configs_schema, - post_deployment, put_deployment, get_deployment, CronSchedule, @@ -268,31 +260,6 @@ def test_prefect_delete_success_204(mock_delete): assert response == {} -@patch("proxy.service.prefect_post") -def test_post_filter_blocks_failure(mock_prefect_post): - block_names = ["block_one", "block_two"] - mock_prefect_post.side_effect = PrefectException("failed to filter blocks") - with pytest.raises(PrefectException) as excinfo: - post_filter_blocks(block_names) - - assert str(excinfo.value) == "failed to filter blocks" - - -@patch("proxy.service.prefect_post") -def test_post_filter_blocks_success(mock_prefect_post): - block_names = ["block_one", "block_two"] - post_filter_blocks(block_names) - mock_prefect_post.assert_called_once_with( - "block_documents/filter", - { - "block_documents": { - "operator": "and_", - "name": {"any_": block_names}, - } - }, - ) - - class MockBlock: def dict(self): return {"_block_document_id": "expected_block_id"} @@ -401,79 +368,6 @@ def test_update_airbyte_server_block_not_implemented(): update_airbyte_server_block("blockname") assert str(excinfo.value) == "not implemented" - -# ================================================================================================= - - -@pytest.mark.asyncio -@patch("proxy.service.AirbyteConnection.load", new_callable=AsyncMock) -async def test_get_airbyte_connection_block_id_valid_blockname(mock_load): - class MockBlock: - def dict(self): - return {"_block_document_id": "expected_block_id"} - - mock_load.return_value = MockBlock() - blockname = "valid_blockname" - result = await get_airbyte_connection_block_id(blockname) - assert result == "expected_block_id" - mock_load.assert_called_once_with(blockname) - - -@pytest.mark.asyncio -@patch("proxy.service.AirbyteConnection.load", new_callable=AsyncMock) -async def test_get_airbyte_connection_block_id_invalid_blockname(mock_load): - mock_load.side_effect = ValueError( - "no airbyte connection block named invalid_blockname" - ) - blockname = "invalid_blockname" - with pytest.raises(HTTPException) as excinfo: - await get_airbyte_connection_block_id(blockname) - assert excinfo.value.status_code == 404 - assert excinfo.value.detail == f"No airbyte connection block named {blockname}" - mock_load.assert_called_once_with(blockname) - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_block_id_non_string_blockname(): - blockname = 1234 - with pytest.raises(TypeError) as excinfo: - await get_airbyte_connection_block_id(blockname) - assert str(excinfo.value) == "blockname must be a string" - - -# ================================================================================================= - - -@pytest.mark.asyncio -async def test_get_airbyte_connection_block_id_non_string_blockid(): - blockid = 1234 - with pytest.raises(TypeError) as excinfo: - await get_airbyte_connection_block(blockid) - assert str(excinfo.value) == "blockid must be a string" - - -@pytest.mark.asyncio -@patch("proxy.service.prefect_get") -async def test_get_airbyte_connection_block_valid_blockid(mock_prefect_get): - mock_prefect_get.return_value = {"key": "value"} - blockid = "valid_blockid" - result = await get_airbyte_connection_block(blockid) - assert result == {"key": "value"} - mock_prefect_get.assert_called_once_with(f"block_documents/{blockid}") - - -@pytest.mark.asyncio -@patch("proxy.service.prefect_get") -async def test_get_airbyte_connection_block_invalid_blockid(mock_prefect_get): - mock_prefect_get.side_effect = requests.exceptions.HTTPError() - blockid = "invalid_blockid" - with pytest.raises(HTTPException) as excinfo: - await get_airbyte_connection_block(blockid) - assert excinfo.value.status_code == 404 - assert excinfo.value.detail == f"No airbyte connection block having id {blockid}" - mock_prefect_get.assert_called_once_with(f"block_documents/{blockid}") - - # ================================================================================================= @@ -502,67 +396,6 @@ def dict(self): return {"_block_document_id": "expected_connection_block_id"} -@pytest.mark.asyncio -@patch("proxy.service.AirbyteConnection", new=MockAirbyteConnection) -@patch("proxy.service.AirbyteServer.load", new_callable=AsyncMock) -async def test_create_airbyte_connection_block(mock_load): - mock_load.return_value = MockAirbyteServer(None, None, None) - conninfo = AirbyteConnectionCreate( - serverBlockName="test_server_block", - connectionBlockName="test_connection_block", - connectionId="test_connection_id", - ) - result = await create_airbyte_connection_block(conninfo) - assert result == "expected_connection_block_id" - mock_load.assert_called_once_with("test_server_block") - - -@pytest.mark.asyncio -@patch("proxy.service.AirbyteConnection", new=MockAirbyteConnection) -@patch("proxy.service.AirbyteServer.load", new_callable=AsyncMock) -async def test_create_airbyte_connection_block_save_error(mock_load): - mock_load.return_value = MockAirbyteServer(None, None, None) - conninfo = AirbyteConnectionCreate( - serverBlockName="test_server_block", - connectionBlockName="test_connection_block", - connectionId="test_error_connection_id", - ) - with pytest.raises(PrefectException) as excinfo: - await create_airbyte_connection_block(conninfo) - assert ( - str(excinfo.value) - == f"failed to create airbyte connection block for connection {conninfo.connectionId}" - ) - - -@pytest.mark.asyncio -@patch("proxy.service.AirbyteServer.load", new_callable=AsyncMock) -async def test_create_airbyte_connection_block_invalid_server_block(mock_load): - mock_load.side_effect = ValueError( - "no airbyte server block named invalid_server_block" - ) - conninfo = AirbyteConnectionCreate( - serverBlockName="invalid_server_block", - connectionBlockName="test_connection_block", - connectionId="test_connection_id", - ) - with pytest.raises(PrefectException) as excinfo: - await create_airbyte_connection_block(conninfo) - assert ( - str(excinfo.value) - == "could not find Airbyte Server block named invalid_server_block" - ) - mock_load.assert_called_once_with("invalid_server_block") - - -@pytest.mark.asyncio -async def test_create_airbyte_connection_block_invalid_conninfo(): - conninfo = "invalid_conninfo" - with pytest.raises(TypeError) as excinfo: - await create_airbyte_connection_block(conninfo) - assert str(excinfo.value) == "conninfo must be an AirbyteConnectionCreate" - - # ================================================================================================= # ================================================================================================= def test_update_airbyte_connection_block_must_be_string(): @@ -608,79 +441,6 @@ def dict(self): return {"_block_document_id": "expected_block_id"} -@pytest.mark.asyncio -@patch("proxy.service.ShellOperation.load", new_callable=AsyncMock) -async def test_get_shell_block_id_valid_blockname(mock_load): - mock_load.return_value = MockShellOperation(None, None, None) - blockname = "valid_blockname" - result = await get_shell_block_id(blockname) - assert result == "expected_block_id" - mock_load.assert_called_once_with(blockname) - - -@pytest.mark.asyncio -@patch("proxy.service.ShellOperation.load", new_callable=AsyncMock) -async def test_get_shell_block_id_invalid_blockname(mock_load): - mock_load.side_effect = ValueError( - "no shell operation block named invalid_blockname" - ) - blockname = "invalid_blockname" - with pytest.raises(HTTPException) as excinfo: - await get_shell_block_id(blockname) - assert excinfo.value.status_code == 404 - assert excinfo.value.detail == f"No shell operation block named {blockname}" - mock_load.assert_called_once_with(blockname) - - -@pytest.mark.asyncio -async def test_get_shell_block_id_invalid_blockname_type(): - blockname = 123 - with pytest.raises(TypeError) as excinfo: - await get_shell_block_id(blockname) - assert str(excinfo.value) == "blockname must be a string" - - -@pytest.mark.asyncio -@patch("proxy.service.ShellOperation", new=MockShellOperation) -@patch("proxy.service.ShellOperation.load", new_callable=AsyncMock) -async def test_create_shell_block(mock_load): - mock_load.return_value = MockShellOperation(None, None, None) - shell = PrefectShellSetup( - blockName="test_block_name", - commands=["test_command"], - env={"test_key": "test_value"}, - workingDir="test_working_dir", - ) - result = await create_shell_block(shell) - assert result[0] == "expected_block_id" - - -@pytest.mark.asyncio -async def test_create_shell_block_invalid_shell(): - shell = "invalid_shell" - with pytest.raises(TypeError) as excinfo: - await create_shell_block(shell) - assert str(excinfo.value) == "shell must be a PrefectShellSetup" - - -@pytest.mark.asyncio -# @patch("proxy.service.ShellOperation", new=MockShellOperation) -@patch("proxy.service.ShellOperation.save", new_callable=AsyncMock) -async def test_create_shell_block_failure(mock_save): - mock_save.side_effect = Exception("save failed") - - shell = PrefectShellSetup( - blockName="test_block_name", - commands=["test_command"], - env={"test_key": "test_value"}, - workingDir="/tmp", - ) - - with pytest.raises(PrefectException) as excinfo: - await create_shell_block(shell) - assert str(excinfo.value) == "failed to create shell block" - - @patch("proxy.service.prefect_delete") def test_delete_shell_block(mock_prefect_delete): blockid = "test_blockid" @@ -703,36 +463,6 @@ def dict(self): return {"_block_document_id": "expected_block_id"} -@pytest.mark.asyncio -async def test_get_dbtcore_block_id_success(): - mock_block = MockBlock() - - with patch( - "proxy.service.DbtCoreOperation.load", new_callable=AsyncMock - ) as mock_load: - mock_load.return_value = mock_block - result = await get_dbtcore_block_id("test_block_name") - assert result == "expected_block_id" - - -@pytest.mark.asyncio -@patch("proxy.service.DbtCoreOperation.load", new_callable=AsyncMock) -async def test_get_dbtcore_block_id_failure(mock_load): - mock_load.side_effect = ValueError("load failed") - - with pytest.raises(HTTPException) as excinfo: - await get_dbtcore_block_id("test_block_name") - assert excinfo.value.status_code == 404 - assert excinfo.value.detail == "No dbt core operation block named test_block_name" - - -@pytest.mark.asyncio -async def test_get_dbtcore_block_id_invalid_blockname(): - with pytest.raises(TypeError) as excinfo: - await get_dbtcore_block_id(123) - assert str(excinfo.value) == "blockname must be a string" - - # @pytest.mark.asyncio # @patch("proxy.service.DbtCliProfile.save", new_callable=AsyncMock) # async def test_create_dbt_cli_profile(mock_save): @@ -1146,47 +876,6 @@ async def test_update_target_configs_schema(mock_load): assert dbt_coreop_block.commands[0] == "dbt run --target newtarget" -@pytest.mark.asyncio -async def test_post_deployment_bad_payload(): - with pytest.raises(TypeError) as excinfo: - await post_deployment(123) - assert str(excinfo.value) == "payload must be a DeploymentCreate" - - -@pytest.mark.asyncio -@patch("proxy.service.Deployment.build_from_flow", new_callable=AsyncMock) -@patch( - "proxy.service.deployment_schedule_flow_v3", - new_callable=Mock, -) -async def test_post_deployment(deployment_schedule_flow_v3, mock_build): - payload = DeploymentCreate( - flow_name="flow-name", - deployment_name="deployment-name", - org_slug="org-slug", - connection_blocks=[], - dbt_blocks=[], - cron=None, - ) - deployment = Mock( - apply=AsyncMock(return_value="deployment-id"), - ) - deployment.name = "deployment-name" - - mock_build.return_value = deployment - deployment_schedule_flow_v3.with_options = Mock(return_value="dsf") - - response = await post_deployment(payload) - assert response["id"] == "deployment-id" - assert response["name"] == "deployment-name" - mock_build.assert_called_once_with( - flow="dsf", - name=payload.deployment_name, - work_queue_name="ddp", - tags=[payload.org_slug], - ) - - def test_put_deployment_bad_param(): payload = 123 with pytest.raises(TypeError) as excinfo: