diff --git a/.gitignore b/.gitignore index df60084c..7a607e3b 100644 --- a/.gitignore +++ b/.gitignore @@ -73,6 +73,7 @@ celerybeat-schedule # dotenv .env +.envrc # virtualenv venv/ @@ -99,4 +100,3 @@ webpack_cache # Editor setting file .vscode/ - diff --git a/bamboo_engine/eri/interfaces.py b/bamboo_engine/eri/interfaces.py index 8ac6fb23..0ddbbd17 100644 --- a/bamboo_engine/eri/interfaces.py +++ b/bamboo_engine/eri/interfaces.py @@ -598,6 +598,15 @@ class TaskMixin: 引擎任务派发相关接口 """ + def pre_execute(self, root_pipeline_id: str, node_id: str): + """ + 执行节点前需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + @abstractmethod def execute( self, @@ -625,6 +634,24 @@ def execute( :type headers: Optional[dict] """ + def post_execute(self, root_pipeline_id: str, node_id: str): + """ + 执行节点后需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + + def pre_schedule(self, root_pipeline_id: str, node_id: str): + """ + 派发调度任务前需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + @abstractmethod def schedule( self, @@ -652,6 +679,15 @@ def schedule( :type headers: Optional[dict] """ + def post_schedule(self, root_pipeline_id: str, node_id: str): + """ + 派发调度任务后需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + @abstractmethod def set_next_schedule( self, diff --git a/bamboo_engine/eri/models/event.py b/bamboo_engine/eri/models/event.py index efd8730f..a453bc43 100644 --- a/bamboo_engine/eri/models/event.py +++ b/bamboo_engine/eri/models/event.py @@ -54,6 +54,14 @@ class HookType(Enum): EXECUTE = "execute" # 节点 schedule SCHEDULE = "schedule" + # 节点 execute 前 + PRE_EXECUTE = "pre_execute" + # 节点 execute 后 + POST_EXECUTE = "post_execute" + # 节点 schedule 前 + PRE_SCHEDULE = "pre_schedule" + # 节点 schedule 后 + POST_SCHEDULE = "post_schedule" class InterruptEvent: diff --git a/bamboo_engine/handlers/service_activity.py b/bamboo_engine/handlers/service_activity.py index ed1d93ef..c84d523c 100644 --- a/bamboo_engine/handlers/service_activity.py +++ b/bamboo_engine/handlers/service_activity.py @@ -275,7 +275,23 @@ def execute( root_pipeline_data=root_pipeline_data, ) try: + self.runtime.pre_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id) + self.hook_dispatch( + hook=HookType.PRE_EXECUTE, + root_pipeline_id=root_pipeline_id, + service=service, + service_data=service_data, + root_pipeline_data=root_pipeline_data, + ) execute_success = service.execute(data=service_data, root_pipeline_data=root_pipeline_data) + self.runtime.post_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id) + self.hook_dispatch( + hook=HookType.POST_EXECUTE, + root_pipeline_id=root_pipeline_id, + service=service, + service_data=service_data, + root_pipeline_data=root_pipeline_data, + ) except Exception: ENGINE_EXECUTE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc() ex_data = traceback.format_exc() @@ -512,12 +528,28 @@ def schedule( ) else: try: + self.runtime.pre_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id) + self.hook_dispatch( + hook=HookType.PRE_SCHEDULE, + root_pipeline_id=root_pipeline_id, + service=service, + service_data=service_data, + root_pipeline_data=root_pipeline_data, + ) schedule_success = service.schedule( schedule=schedule, data=service_data, root_pipeline_data=root_pipeline_data, callback_data=callback_data, ) + self.runtime.post_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id) + self.hook_dispatch( + hook=HookType.POST_SCHEDULE, + root_pipeline_id=root_pipeline_id, + service=service, + service_data=service_data, + root_pipeline_data=root_pipeline_data, + ) except Exception: ENGINE_SCHEDULE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc() service_data.outputs.ex_data = traceback.format_exc() diff --git a/runtime/bamboo-pipeline/pipeline/__init__.py b/runtime/bamboo-pipeline/pipeline/__init__.py index fc88d19b..023c5a7f 100644 --- a/runtime/bamboo-pipeline/pipeline/__init__.py +++ b/runtime/bamboo-pipeline/pipeline/__init__.py @@ -13,4 +13,4 @@ default_app_config = "pipeline.apps.PipelineConfig" -__version__ = "3.29.0" +__version__ = "3.29.1" diff --git a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py index d0076b4b..cb05b276 100644 --- a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py +++ b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py @@ -193,6 +193,22 @@ def node_finish(self, data, parent_data): """节点执行结束""" return True + def pre_execute(self, data, parent_data): + """节点执行前""" + return True + + def post_execute(self, data, parent_data): + """节点执行后""" + return True + + def pre_schedule(self, data, parent_data): + """节点调度前""" + return True + + def post_schedule(self, data, parent_data): + """节点调度后""" + return True + class ServiceActivity(Activity): result_bit = "_result" diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py index 23904fbd..647db5e6 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py @@ -506,3 +506,59 @@ def node_finish(self, root_pipeline_id: str, node_id: str): event_type=self.node_finish.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id} ) ) + + def pre_execute(self, root_pipeline_id: str, node_id: str): + """ + 节点执行前需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + self._send_event( + PipelineEvent( + event_type=self.pre_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id} + ) + ) + + def post_execute(self, root_pipeline_id: str, node_id: str): + """ + 节点执行后需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + self._send_event( + PipelineEvent( + event_type=self.post_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id} + ) + ) + + def pre_schedule(self, root_pipeline_id: str, node_id: str): + """ + 节点调度前需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + self._send_event( + PipelineEvent( + event_type=self.pre_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id} + ) + ) + + def post_schedule(self, root_pipeline_id: str, node_id: str): + """ + 节点调度后需要执行的钩子 + :param root_pipeline_id: 任务ID + :type node_id: str + :param node_id: 节点ID + :type node_id: str + """ + self._send_event( + PipelineEvent( + event_type=self.post_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id} + ) + ) diff --git a/runtime/bamboo-pipeline/poetry.lock b/runtime/bamboo-pipeline/poetry.lock index 92e44db8..763b430f 100644 --- a/runtime/bamboo-pipeline/poetry.lock +++ b/runtime/bamboo-pipeline/poetry.lock @@ -57,7 +57,7 @@ tests_no_zope = ["hypothesis", "pympler", "pytest (>=4.3.0)", "pytest-xdist", "c [[package]] name = "bamboo-engine" -version = "2.10.0" +version = "2.10.1" description = "Bamboo-engine is a general-purpose workflow engine" category = "main" optional = false @@ -746,7 +746,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes [metadata] lock-version = "1.1" python-versions = ">= 3.6, < 4" -content-hash = "686a5768c4985d082d282929f05253c1f234b39b61259cdb30812da79c8efa23" +content-hash = "7374f226043a63c6b79889cffe98b434ad8b737e7d9a2cc33f6f3243e164e6d5" [metadata.files] amqp = [] diff --git a/runtime/bamboo-pipeline/pyproject.toml b/runtime/bamboo-pipeline/pyproject.toml index 2cf3f21e..6145574f 100644 --- a/runtime/bamboo-pipeline/pyproject.toml +++ b/runtime/bamboo-pipeline/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bamboo-pipeline" -version = "3.29.0" +version = "3.29.1" description = "runtime for bamboo-engine base on Django and Celery" authors = ["homholueng "] license = "MIT" @@ -16,7 +16,7 @@ requests = "^2.22.0" django-celery-beat = "^2.1.0" Mako = "^1.1.4" pytz = "2019.3" -bamboo-engine = "2.10.0" +bamboo-engine = "^2.10.1" jsonschema = "^2.5.1" ujson = "4.1.*" pyparsing = "^2.2.0" diff --git a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/hook/test_node.py b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/hook/test_node.py index 88afd6ab..8ad29a52 100755 --- a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/hook/test_node.py +++ b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/hook/test_node.py @@ -4,10 +4,23 @@ import pytest from pipeline.eri.runtime import BambooDjangoRuntime -from bamboo_engine.builder import * # noqa +from bamboo_engine.builder import ( # noqa + EmptyEndEvent, + EmptyStartEvent, + ServiceActivity, + Var, + build_tree, + builder, +) from bamboo_engine.engine import Engine -from ..utils import * # noqa +from ..utils import ( + assert_all_failed, + assert_all_finish, + assert_exec_data_equal, + assert_schedule_finish, + runtime, +) def test_execution(): @@ -32,11 +45,24 @@ def test_execution(): "_inner_loop": 1, "_loop": 1, "_result": True, - "hook_call_order": ["node_enter", "execute", "schedule", "node_finish"], + "hook_call_order": [ + "node_enter", + "pre_execute", + "execute", + "post_execute", + "pre_schedule", + "schedule", + "post_schedule", + "node_finish", + ], + "pre_execute": 1, "execute": 1, + "post_execute": 1, "node_enter": 1, "node_finish": 1, + "pre_schedule": 1, "schedule": 1, + "post_schedule": 1, }, }, } @@ -51,28 +77,46 @@ def test_execution(): True, False, False, - ["node_enter", "execute", "node_execute_exception", "node_execute_fail"], + ["node_enter", "pre_execute", "execute", "node_execute_exception", "node_execute_fail"], id="execute raise and not ignore", ), pytest.param( True, False, True, - ["node_enter", "execute", "node_execute_exception", "node_finish"], + ["node_enter", "pre_execute", "execute", "node_execute_exception", "node_finish"], id="execute raise and ignore", ), pytest.param( False, True, False, - ["node_enter", "execute", "schedule", "node_schedule_exception", "node_schedule_fail"], + [ + "node_enter", + "pre_execute", + "execute", + "post_execute", + "pre_schedule", + "schedule", + "node_schedule_exception", + "node_schedule_fail", + ], id="schedule raise and not ignore", ), pytest.param( False, True, True, - ["node_enter", "execute", "schedule", "node_schedule_exception", "node_finish"], + [ + "node_enter", + "pre_execute", + "execute", + "post_execute", + "pre_schedule", + "schedule", + "node_schedule_exception", + "node_finish", + ], id="schedule raise and ignore", ), ], diff --git a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py index 3d7e8ed3..eb9cd4dd 100755 --- a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py +++ b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py @@ -81,8 +81,10 @@ ) REDIS = { - "host": "localhost", - "port": 6379, + "host": os.getenv("PIPELINE_TEST_REDIS_HOST", "localhost"), + "port": int(os.getenv("PIPELINE_TEST_REDIS_PORT") or 6379), + "password": os.getenv("PIPELINE_TEST_REDIS_PASSWORD"), + "db": int(os.getenv("PIPELINE_TEST_REDIS_DB") or 0), } TEMPLATES = [ @@ -117,8 +119,8 @@ "NAME": os.getenv("PIPELINE_TEST_DB_NAME"), "USER": os.getenv("PIPELINE_TEST_DB_USER"), "PASSWORD": os.getenv("PIPELINE_TEST_DB_PWD"), - "HOST": "localhost", - "PORT": 3306, + "HOST": os.getenv("PIPELINE_TEST_DB_HOST", "localhost"), + "PORT": int(os.getenv("PIPELINE_TEST_DB_PORT", 3306)), "TEST": {"CHARSET": "utf8", "COLLATION": "utf8_general_ci"}, } } @@ -160,7 +162,7 @@ BROKER_VHOST = "/" -BROKER_URL = "amqp://guest:guest@localhost:5672//" +BROKER_URL = os.getenv("PIPELINE_TEST_BROKER_URL", "amqp://guest:guest@localhost:5672//") # BROKER_URL = "redis://localhost:6379/0" diff --git a/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py b/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py index b2ab9dc2..08ce02c0 100755 --- a/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py +++ b/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py @@ -34,14 +34,30 @@ def recorder(self, hook: HookType, data, parent_data, callback_data=None): logger.info("hook_debug_node hook(%s) output data %s ", hook.value, pprint.pformat(data.outputs)) return True + def pre_execute(self, data, parent_data): + """节点执行前""" + return self.recorder(HookType.PRE_EXECUTE, data, parent_data) + def execute(self, data, parent_data): self.recorder(hook=HookType.EXECUTE, data=data, parent_data=parent_data) return super().execute(data, parent_data) + def post_execute(self, data, parent_data): + """节点执行后""" + return self.recorder(HookType.POST_EXECUTE, data, parent_data) + + def pre_schedule(self, data, parent_data): + """节点调度前""" + return self.recorder(HookType.PRE_SCHEDULE, data, parent_data) + def schedule(self, data, parent_data, callback_data=None): self.recorder(hook=HookType.SCHEDULE, data=data, parent_data=parent_data, callback_data=callback_data) return super().schedule(data, parent_data, callback_data) + def post_schedule(self, data, parent_data): + """节点调度后""" + return self.recorder(HookType.POST_SCHEDULE, data, parent_data) + def pre_resume_node(self, data, parent_data): """节点继续操作前""" return self.recorder(HookType.PRE_RESUME_NODE, data, parent_data)