Skip to content

Commit

Permalink
feature: 新增节点hook事件
Browse files Browse the repository at this point in the history
Signed-off-by: benero <[email protected]>
  • Loading branch information
hanshuaikang authored and benero committed Jan 3, 2024
2 parents fadfeb5 + ece60f7 commit 6910681
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ celerybeat-schedule

# dotenv
.env
.envrc

# virtualenv
venv/
Expand All @@ -99,4 +100,3 @@ webpack_cache

# Editor setting file
.vscode/

36 changes: 36 additions & 0 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,15 @@ class TaskMixin:
引擎任务派发相关接口
"""

def pre_execute(self, root_pipeline_id: str, node_id: str):
"""
执行节点前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def execute(
self,
Expand Down Expand Up @@ -625,6 +634,24 @@ def execute(
:type headers: Optional[dict]
"""

def post_execute(self, root_pipeline_id: str, node_id: str):
"""
执行节点后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

def pre_schedule(self, root_pipeline_id: str, node_id: str):
"""
派发调度任务前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def schedule(
self,
Expand Down Expand Up @@ -652,6 +679,15 @@ def schedule(
:type headers: Optional[dict]
"""

def post_schedule(self, root_pipeline_id: str, node_id: str):
"""
派发调度任务后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def set_next_schedule(
self,
Expand Down
8 changes: 8 additions & 0 deletions bamboo_engine/eri/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ class HookType(Enum):
EXECUTE = "execute"
# 节点 schedule
SCHEDULE = "schedule"
# 节点 execute 前
PRE_EXECUTE = "pre_execute"
# 节点 execute 后
POST_EXECUTE = "post_execute"
# 节点 schedule 前
PRE_SCHEDULE = "pre_schedule"
# 节点 schedule 后
POST_SCHEDULE = "post_schedule"


class InterruptEvent:
Expand Down
32 changes: 32 additions & 0 deletions bamboo_engine/handlers/service_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,23 @@ def execute(
root_pipeline_data=root_pipeline_data,
)
try:
self.runtime.pre_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.PRE_EXECUTE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
execute_success = service.execute(data=service_data, root_pipeline_data=root_pipeline_data)
self.runtime.post_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.POST_EXECUTE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
except Exception:
ENGINE_EXECUTE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc()
ex_data = traceback.format_exc()
Expand Down Expand Up @@ -512,12 +528,28 @@ def schedule(
)
else:
try:
self.runtime.pre_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.PRE_SCHEDULE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
schedule_success = service.schedule(
schedule=schedule,
data=service_data,
root_pipeline_data=root_pipeline_data,
callback_data=callback_data,
)
self.runtime.post_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.POST_SCHEDULE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
except Exception:
ENGINE_SCHEDULE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc()
service_data.outputs.ex_data = traceback.format_exc()
Expand Down
2 changes: 1 addition & 1 deletion runtime/bamboo-pipeline/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@

default_app_config = "pipeline.apps.PipelineConfig"

__version__ = "3.29.0"
__version__ = "3.29.1"
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ def node_finish(self, data, parent_data):
"""节点执行结束"""
return True

def pre_execute(self, data, parent_data):
"""节点执行前"""
return True

def post_execute(self, data, parent_data):
"""节点执行后"""
return True

def pre_schedule(self, data, parent_data):
"""节点调度前"""
return True

def post_schedule(self, data, parent_data):
"""节点调度后"""
return True


class ServiceActivity(Activity):
result_bit = "_result"
Expand Down
56 changes: 56 additions & 0 deletions runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,3 +506,59 @@ def node_finish(self, root_pipeline_id: str, node_id: str):
event_type=self.node_finish.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def pre_execute(self, root_pipeline_id: str, node_id: str):
"""
节点执行前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.pre_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def post_execute(self, root_pipeline_id: str, node_id: str):
"""
节点执行后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.post_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def pre_schedule(self, root_pipeline_id: str, node_id: str):
"""
节点调度前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.pre_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def post_schedule(self, root_pipeline_id: str, node_id: str):
"""
节点调度后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.post_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)
4 changes: 2 additions & 2 deletions runtime/bamboo-pipeline/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions runtime/bamboo-pipeline/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bamboo-pipeline"
version = "3.29.0"
version = "3.29.1"
description = "runtime for bamboo-engine base on Django and Celery"
authors = ["homholueng <[email protected]>"]
license = "MIT"
Expand All @@ -16,7 +16,7 @@ requests = "^2.22.0"
django-celery-beat = "^2.1.0"
Mako = "^1.1.4"
pytz = "2019.3"
bamboo-engine = "2.10.0"
bamboo-engine = "^2.10.1"
jsonschema = "^2.5.1"
ujson = "4.1.*"
pyparsing = "^2.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,23 @@
import pytest
from pipeline.eri.runtime import BambooDjangoRuntime

from bamboo_engine.builder import * # noqa
from bamboo_engine.builder import ( # noqa
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
Var,
build_tree,
builder,
)
from bamboo_engine.engine import Engine

from ..utils import * # noqa
from ..utils import (
assert_all_failed,
assert_all_finish,
assert_exec_data_equal,
assert_schedule_finish,
runtime,
)


def test_execution():
Expand All @@ -32,11 +45,24 @@ def test_execution():
"_inner_loop": 1,
"_loop": 1,
"_result": True,
"hook_call_order": ["node_enter", "execute", "schedule", "node_finish"],
"hook_call_order": [
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"post_schedule",
"node_finish",
],
"pre_execute": 1,
"execute": 1,
"post_execute": 1,
"node_enter": 1,
"node_finish": 1,
"pre_schedule": 1,
"schedule": 1,
"post_schedule": 1,
},
},
}
Expand All @@ -51,28 +77,46 @@ def test_execution():
True,
False,
False,
["node_enter", "execute", "node_execute_exception", "node_execute_fail"],
["node_enter", "pre_execute", "execute", "node_execute_exception", "node_execute_fail"],
id="execute raise and not ignore",
),
pytest.param(
True,
False,
True,
["node_enter", "execute", "node_execute_exception", "node_finish"],
["node_enter", "pre_execute", "execute", "node_execute_exception", "node_finish"],
id="execute raise and ignore",
),
pytest.param(
False,
True,
False,
["node_enter", "execute", "schedule", "node_schedule_exception", "node_schedule_fail"],
[
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"node_schedule_exception",
"node_schedule_fail",
],
id="schedule raise and not ignore",
),
pytest.param(
False,
True,
True,
["node_enter", "execute", "schedule", "node_schedule_exception", "node_finish"],
[
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"node_schedule_exception",
"node_finish",
],
id="schedule raise and ignore",
),
],
Expand Down
12 changes: 7 additions & 5 deletions runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@
)

REDIS = {
"host": "localhost",
"port": 6379,
"host": os.getenv("PIPELINE_TEST_REDIS_HOST", "localhost"),
"port": int(os.getenv("PIPELINE_TEST_REDIS_PORT") or 6379),
"password": os.getenv("PIPELINE_TEST_REDIS_PASSWORD"),
"db": int(os.getenv("PIPELINE_TEST_REDIS_DB") or 0),
}

TEMPLATES = [
Expand Down Expand Up @@ -117,8 +119,8 @@
"NAME": os.getenv("PIPELINE_TEST_DB_NAME"),
"USER": os.getenv("PIPELINE_TEST_DB_USER"),
"PASSWORD": os.getenv("PIPELINE_TEST_DB_PWD"),
"HOST": "localhost",
"PORT": 3306,
"HOST": os.getenv("PIPELINE_TEST_DB_HOST", "localhost"),
"PORT": int(os.getenv("PIPELINE_TEST_DB_PORT", 3306)),
"TEST": {"CHARSET": "utf8", "COLLATION": "utf8_general_ci"},
}
}
Expand Down Expand Up @@ -160,7 +162,7 @@

BROKER_VHOST = "/"

BROKER_URL = "amqp://guest:guest@localhost:5672//"
BROKER_URL = os.getenv("PIPELINE_TEST_BROKER_URL", "amqp://guest:guest@localhost:5672//")

# BROKER_URL = "redis://localhost:6379/0"

Expand Down
Loading

0 comments on commit 6910681

Please sign in to comment.