Skip to content

Commit

Permalink
Merge pull request #111 from kauxp/remove_unused_funcs
Browse files Browse the repository at this point in the history
Remove unused prefect-related functions
  • Loading branch information
fatchat authored Apr 19, 2024
2 parents 508ad8f + c652234 commit 1ce1d5a
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 1,300 deletions.
269 changes: 1 addition & 268 deletions proxy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,58 +10,41 @@
from proxy.service import (
get_airbyte_server_block_id,
create_airbyte_server_block,
get_airbyte_connection_block_id,
get_airbyte_connection_block,
create_airbyte_connection_block,
get_dbtcore_block_id,
create_dbt_core_block,
put_deployment,
put_deployment_v1,
update_postgres_credentials,
update_bigquery_credentials,
update_target_configs_schema,
get_shell_block_id,
create_shell_block,
post_deployment,
post_deployment_v1,
get_flow_runs_by_deployment_id,
get_deployments_by_filter,
get_flow_run_logs,
post_deployment_flow_run,
get_flow_runs_by_name,
post_filter_blocks,
set_deployment_schedule,
get_deployment,
get_flow_run,
create_secret_block,
_create_dbt_cli_profile,
update_dbt_cli_profile,
get_dbt_cli_profile,
get_secret_block_document,
)
from proxy.schemas import (
AirbyteServerCreate,
AirbyteConnectionCreate,
DeploymentUpdate,
PrefectShellSetup,
DbtCoreCreate,
DbtCoreCredentialUpdate,
DbtCoreSchemaUpdate,
RunFlow,
RunDbtCoreOperation,
RunShellOperation,
DeploymentCreate,
DeploymentCreate2,
DeploymentFetch,
FlowRunRequest,
PrefectBlocksDelete,
AirbyteConnectionBlocksFetch,
PrefectSecretBlockCreate,
DbtCliProfileBlockCreate,
DeploymentUpdate2,
DbtCliProfileBlockUpdate,
)
from proxy.flows import run_airbyte_connection_flow, run_dbtcore_flow
from proxy.flows import run_airbyte_connection_flow

from proxy.prefect_flows import run_shell_operation_flow, run_dbtcore_flow_v1

Expand Down Expand Up @@ -114,32 +97,6 @@ def airbytesync(block_name: str, flow_name: str, flow_run_name: str):
raise


def dbtrun(block_name: str, flow_name: str, flow_run_name: str):
"""Run a dbt core flow"""
if not isinstance(block_name, str):
raise TypeError("block_name must be a string")
if not isinstance(flow_name, str):
raise TypeError("flow_name must be a string")
if not isinstance(flow_run_name, str):
raise TypeError("flow_run_name must be a string")

logger.info("dbtrun %s %s %s", block_name, flow_name, flow_run_name)
flow = run_dbtcore_flow
if flow_name:
flow = flow.with_options(name=flow_name)
if flow_run_name:
flow = flow.with_options(flow_run_name=flow_run_name)

try:
result = flow(block_name)
return result
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to run dbt core flow"
) from error


def dbtrun_v1(task_config: RunDbtCoreOperation):
"""Run a dbt core flow"""

Expand Down Expand Up @@ -181,50 +138,6 @@ def shelloprun(task_config: RunShellOperation):
) from error


# =============================================================================
@app.post("/proxy/blocks/bulk/delete/")
async def post_bulk_delete_blocks(request: Request, payload: PrefectBlocksDelete):
"""Delete all airbyte connection blocks in the payload array"""
root = os.getenv("PREFECT_API_URL")
deleted_blockids = []
for block_id in payload.block_ids:
logger.info("deleting block_id : %s ", block_id)
res = requests.delete(f"{root}/block_documents/{block_id}", timeout=10)
try:
res.raise_for_status()
except requests.exceptions.HTTPError as error:
logger.error(
"something went wrong deleting block_id %s: %s", block_id, res.text
)
logger.exception(error)
continue
logger.info("deleted block with block_id : %s", block_id)
deleted_blockids.append(block_id)

return {"deleted_blockids": deleted_blockids}


# =============================================================================
@app.post("/proxy/blocks/airbyte/connection/filter")
def post_airbyte_connection_blocks(
request: Request, payload: AirbyteConnectionBlocksFetch
):
"""Filter the prefect blocks with parameters from payload"""
blocks = post_filter_blocks(payload.block_names)

# we only care about the block name and its connection id
response = []
for blk in blocks:
connection_id = ""
if "data" in blk and "connection_id" in blk["data"]:
connection_id = blk["data"]["connection_id"]
response.append(
{"name": blk["name"], "connectionId": connection_id, "id": blk["id"]}
)

return response


# =============================================================================
@app.get("/proxy/blocks/airbyte/server/{blockname}")
async def get_airbyte_server(request: Request, blockname: str):
Expand Down Expand Up @@ -268,94 +181,6 @@ async def post_airbyte_server(request: Request, payload: AirbyteServerCreate):


# =============================================================================
@app.get("/proxy/blocks/airbyte/connection/byblockname/{blockname}")
async def get_airbyte_connection_by_blockname(request: Request, blockname):
"""look up airbyte connection block by name and return block_id"""
block_id = await get_airbyte_connection_block_id(blockname)
if block_id is None:
logger.error("no airbyte connection block having name %s", blockname)
raise HTTPException(status_code=400, detail="no block having name " + blockname)
logger.info("blockname => blockid : %s => %s", blockname, block_id)
return {"block_id": block_id}


@app.get("/proxy/blocks/airbyte/connection/byblockid/{blockid}")
async def get_airbyte_connection_by_blockid(request: Request, blockid):
"""look up airbyte connection block by id and return block data"""
block = await get_airbyte_connection_block(blockid)
if block is None:
logger.error("no airbyte connection block having id %s", blockid)
raise HTTPException(status_code=400, detail="no block having id " + blockid)
logger.info("Found airbyte connection block by id: %s", block)
return block


@app.post("/proxy/blocks/airbyte/connection/")
async def post_airbyte_connection(request: Request, payload: AirbyteConnectionCreate):
"""
create a new airbyte connection block with this block name,
raise an exception if the name is already in use
"""
try:
block_id = await create_airbyte_connection_block(payload)
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to create airbyte connection block"
) from error
logger.info("Created new airbyte connection block with ID: %s", block_id)
return {"block_id": block_id}


# =============================================================================
@app.get("/proxy/blocks/shell/{blockname}")
async def get_shell(request: Request, blockname):
"""look up a shell operation block by name and return block_id"""
if not isinstance(blockname, str):
raise TypeError("blockname must be a string")
block_id = await get_shell_block_id(blockname)
if block_id is None:
logger.error("no shell block having name %s", blockname)
raise HTTPException(status_code=400, detail="no block having name " + blockname)
logger.info("blockname => blockid : %s => %s", blockname, block_id)
return {"block_id": block_id}


@app.post("/proxy/blocks/shell/")
async def post_shell(request: Request, payload: PrefectShellSetup):
"""
create a new shell block with this block name,
raise an exception if the name is already in use
"""
if not isinstance(payload, PrefectShellSetup):
raise TypeError("payload is invalid")
logger.info(payload)
try:
block_id, cleaned_blockname = await create_shell_block(payload)
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to create shell block"
) from error
logger.info("Created new shell block with ID: %s", block_id)
return {"block_id": block_id, "block_name": cleaned_blockname}


# =============================================================================
@app.get("/proxy/blocks/dbtcore/{blockname}")
async def get_dbtcore(request: Request, blockname):
"""look up a dbt core operation block by name and return block_id"""
if not isinstance(blockname, str):
raise TypeError("blockname must be a string")

block_id = await get_dbtcore_block_id(blockname)
if block_id is None:
logger.error("no dbt core block having name %s", blockname)
raise HTTPException(status_code=400, detail="no block having name " + blockname)
logger.info("blockname => blockid : %s => %s", blockname, block_id)
return {"block_id": block_id}


@app.post("/proxy/blocks/dbtcore/")
async def post_dbtcore(request: Request, payload: DbtCoreCreate):
"""
Expand Down Expand Up @@ -515,24 +340,6 @@ async def post_secret_block(request: Request, payload: PrefectSecretBlockCreate)
return {"block_id": block_id, "block_name": cleaned_blockname}


@app.get("/proxy/blocks/secret/{blockname}")
async def get_secret_block(request: Request, blockname: str):
"""fetch the secret block documents"""
try:
block_id, block_name = await get_secret_block_document(blockname)
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to fetch secret block document"
) from error
logger.info(
"Fetched the secret block with ID: %s and name: %s",
block_id,
block_name,
)
return {"block_id": block_id, "block_name": block_name}


# =============================================================================
@app.delete("/delete-a-block/{block_id}")
async def delete_block(request: Request, block_id):
Expand All @@ -550,44 +357,6 @@ async def delete_block(request: Request, block_id):


# =============================================================================
@app.post("/proxy/flows/airbyte/connection/sync/")
async def sync_airbyte_connection_flow(request: Request, payload: RunFlow):
"""Prefect flow to sync an airbyte connection"""
if not isinstance(payload, RunFlow):
raise TypeError("payload is invalid")
logger.info(payload)
if payload.blockName == "":
logger.error("received empty blockName")
raise HTTPException(status_code=400, detail="received empty blockName")
logger.info("Running airbyte connection sync flow")
try:
result = airbytesync(payload.blockName, payload.flowName, payload.flowRunName)
logger.info(result)
return result
except Exception as error:
logger.exception(error)
raise HTTPException(status_code=400, detail=str(error)) from error


@app.post("/proxy/flows/dbtcore/run/")
async def sync_dbtcore_flow(request: Request, payload: RunFlow):
"""Prefect flow to run dbt"""
logger.info(payload)
if not isinstance(payload, RunFlow):
raise TypeError("payload is invalid")
if payload.blockName == "":
logger.error("received empty blockName")
raise HTTPException(status_code=400, detail="received empty blockName")
logger.info("running dbtcore-run for dbt-core-op %s", payload.blockName)
try:
result = dbtrun(payload.blockName, payload.flowName, payload.flowRunName)
logger.info(result)
return {"status": "success", "result": result}
except Exception as error:
logger.exception(error)
raise HTTPException(status_code=400, detail=str(error)) from error


@app.post("/proxy/v1/flows/dbtcore/run/")
async def sync_dbtcore_flow_v1(request: Request, payload: RunDbtCoreOperation):
"""Prefect flow to run dbt"""
Expand Down Expand Up @@ -622,24 +391,6 @@ async def sync_shellop_flow(request: Request, payload: RunShellOperation):
raise HTTPException(status_code=400, detail=str(error)) from error


@app.post("/proxy/deployments/")
async def post_dataflow(request: Request, payload: DeploymentCreate):
"""Create a deployment from an existing flow"""
if not isinstance(payload, DeploymentCreate):
raise TypeError("payload is invalid")

logger.info(payload)
try:
deployment = await post_deployment(payload)
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to create deployment"
) from error
logger.info("Created new deployment: %s", deployment)
return {"deployment": deployment}


@app.post("/proxy/v1/deployments/")
async def post_dataflow_v1(request: Request, payload: DeploymentCreate2):
"""Create a deployment from an existing flow"""
Expand All @@ -658,24 +409,6 @@ async def post_dataflow_v1(request: Request, payload: DeploymentCreate2):
return {"deployment": deployment}


@app.put("/proxy/deployments/{deployment_id}")
def put_dataflow(request: Request, deployment_id, payload: DeploymentUpdate):
"""updates a deployment"""
if not isinstance(payload, DeploymentUpdate):
raise TypeError("payload is invalid")

logger.info(payload)
try:
put_deployment(deployment_id, payload)
except Exception as error:
logger.exception(error)
raise HTTPException(
status_code=400, detail="failed to update the deployment"
) from error
logger.info("Updated the deployment: %s", deployment_id)
return {"success": 1}


@app.put("/proxy/v1/deployments/{deployment_id}")
def put_dataflow_v1(request: Request, deployment_id, payload: DeploymentUpdate2):
"""updates a deployment"""
Expand Down
Loading

0 comments on commit 1ce1d5a

Please sign in to comment.