Skip to content

Commit

Permalink
Merge pull request #153 from DalgoT4D/schema-change-updates
Browse files Browse the repository at this point in the history
proxy to run reset on selected streams
  • Loading branch information
Ishankoradia authored Aug 27, 2024
2 parents bf658a7 + 8f1752a commit fee6791
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
77 changes: 73 additions & 4 deletions proxy/prefect_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.states import State, StateType
from prefect_airbyte.flows import run_connection_sync, reset_connection
from prefect_airbyte.flows import (
run_connection_sync,
reset_connection,
reset_connection_streams,
update_connection_schema,
)
from prefect_airbyte import AirbyteConnection, AirbyteServer
from prefect_airbyte.connections import ResetStream
from prefect_dbt.cli.commands import DbtCoreOperation, ShellOperation
from prefect_dbt.cli import DbtCliProfile
from proxy.helpers import CustomLogger
Expand Down Expand Up @@ -57,7 +63,7 @@ def run_airbyte_connection_flow_v1(payload: dict):
# task config for an airbyte reset operation
# {
# type AIRBYTECONNECTION,
# slug: str
# slug: "airbyte-reset"
# airbyte_server_block: str
# connection_id: str
# timeout: int
Expand All @@ -82,6 +88,34 @@ def run_airbyte_conn_reset(payload: dict):
raise


# task config for an airbyte reset operation on selected streams
# (the streams to reset are passed separately as the `streams` argument)
# {
# type AIRBYTECONNECTION,
# slug: "airbyte-reset"
# airbyte_server_block: str
# connection_id: str
# timeout: int
# }
@flow
def run_airbyte_reset_streams_for_conn(payload: dict, streams: list[ResetStream]):
    """Reset only the given streams of an airbyte connection.

    Args:
        payload: task config. ``airbyte_server_block`` is the name of the
            AirbyteServer block to load, ``connection_id`` identifies the
            connection, and ``timeout`` is optional.
        streams: the streams to reset; handed unchanged to
            ``reset_connection_streams``.

    Returns:
        Whatever ``reset_connection_streams`` returns.

    Raises:
        Exception: any failure is logged and then re-raised unchanged.
    """
    try:
        serverblock = AirbyteServer.load(payload["airbyte_server_block"])
        connection_block = AirbyteConnection(
            airbyte_server=serverblock,
            connection_id=payload["connection_id"],
            # A missing, None or 0 timeout falls back to 15 instead of
            # raising KeyError when the key is absent.
            timeout=payload.get("timeout") or 15,
        )
        result = reset_connection_streams(connection_block, streams)
        logger.info("airbyte connection reset result=")
        logger.info(result)
        return result
    except Exception as error:  # skipcq PYL-W0703
        logger.error(str(error))  # "Job <num> failed."
        raise


@flow
def run_dbtcore_flow_v1(payload: dict):
# pylint: disable=broad-exception-caught
Expand All @@ -96,6 +130,25 @@ def run_shell_operation_flow(payload: dict):
return shellopjob(payload, payload["slug"])


@flow
def run_refresh_schema_flow(payload: dict, catalog_diff: dict):
    """Prefect flow to run refresh schema on an airbyte connection.

    Args:
        payload: task config. ``airbyte_server_block`` is the name of the
            AirbyteServer block to load, ``connection_id`` identifies the
            connection, and ``timeout`` is optional.
        catalog_diff: forwarded as the ``catalog_diff`` keyword argument of
            ``update_connection_schema``.

    Returns:
        True once ``update_connection_schema`` has returned without raising.

    Raises:
        Exception: any failure is logged and then re-raised unchanged.
    """
    # pylint: disable=broad-exception-caught
    try:
        serverblock = AirbyteServer.load(payload["airbyte_server_block"])
        connection_block = AirbyteConnection(
            airbyte_server=serverblock,
            connection_id=payload["connection_id"],
            # A missing, None or 0 timeout falls back to 15 instead of
            # raising KeyError when the key is absent.
            timeout=payload.get("timeout") or 15,
        )
        update_connection_schema(connection_block, catalog_diff=catalog_diff)
        return True
    except Exception as error:  # skipcq PYL-W0703
        logger.error(str(error))  # "Job <num> failed."
        raise


# =============================================================================
# tasks
# task config for a dbt core operation
Expand Down Expand Up @@ -198,7 +251,11 @@ def shellopjob(task_config: dict, task_slug: str): # pylint: disable=unused-arg
commands=task_config["commands"],
env=task_config["env"],
working_dir=task_config["working_dir"],
shell=task_config["env"]["shell"] if "shell" in task_config["env"] else "/bin/bash",
shell=(
task_config["env"]["shell"]
if "shell" in task_config["env"]
else "/bin/bash"
),
)
return shell_op.run()

Expand Down Expand Up @@ -241,10 +298,22 @@ def deployment_schedule_flow_v4(

elif task_config["type"] == AIRBYTECONNECTION:
if task_config["slug"] == "airbyte-reset":
run_airbyte_conn_reset(task_config)
if task_config.get("streams") and len(task_config["streams"]) > 0:
# run reset for streams
streams = [
ResetStream(**stream) for stream in task_config["streams"]
]
run_airbyte_reset_streams_for_conn(task_config, streams)
else:
# run full reset of all streams
run_airbyte_conn_reset(task_config)
elif task_config["slug"] == "airbyte-sync":
run_airbyte_connection_flow_v1(task_config)

elif task_config["slug"] == "update-schema":
run_refresh_schema_flow(
task_config, catalog_diff=task_config.get("catalog_diff", {})
)
else:
raise Exception(f"Unknown task type: {task_config['type']}")

Expand Down
12 changes: 12 additions & 0 deletions refresh-prefect-airbyte.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/sh

# Reinstall prefect_airbyte from the Ishankoradia/prefect-airbyte git repo.
#
# Usage: refresh-prefect-airbyte.sh [branch]
#   branch  optional git ref appended to the requirement URL as "@<branch>"

# Check if the branch name is provided as an argument
if [ -z "$1" ]; then
    BRANCH=""
else
    BRANCH="@$1"
fi

# -y answers the uninstall confirmation without piping `yes` into pip
pip uninstall -y prefect_airbyte
echo "Installing prefect_airbyte from branch $BRANCH. Defaults to main if no branch is provided."
# Quote the requirement so a branch name is never word-split or glob-expanded
pip install "prefect_airbyte@git+https://github.com/Ishankoradia/prefect-airbyte${BRANCH}"

0 comments on commit fee6791

Please sign in to comment.