diff --git a/ci/iwyu/mappings.imp b/ci/iwyu/mappings.imp index 72ec52aa2..2d4875e21 100644 --- a/ci/iwyu/mappings.imp +++ b/ci/iwyu/mappings.imp @@ -48,7 +48,7 @@ { "include": [ "", private, "", "public" ] }, # pybind11 -{ "include": [ "", "private", "", "public" ] }, +{ "include": [ "@", "private", "", "public" ] }, { "include": [ "", "private", "", "public" ] }, # rxcpp diff --git a/python/morpheus/morpheus/_lib/messages/module.cpp b/python/morpheus/morpheus/_lib/messages/module.cpp index 961c3187e..fed31a6d1 100644 --- a/python/morpheus/morpheus/_lib/messages/module.cpp +++ b/python/morpheus/morpheus/_lib/messages/module.cpp @@ -15,8 +15,6 @@ * limitations under the License. */ -#include "pymrc/utilities/object_wrappers.hpp" - #include "morpheus/io/data_loader_registry.hpp" #include "morpheus/messages/control.hpp" #include "morpheus/messages/memory/inference_memory.hpp" @@ -43,6 +41,7 @@ #include // IWYU pragma: keep #include // IWYU pragma: keep #include +#include #include // for pymrc::import #include diff --git a/python/morpheus/morpheus/_lib/src/stages/file_source.cpp b/python/morpheus/morpheus/_lib/src/stages/file_source.cpp index c3dce3369..82532b9af 100644 --- a/python/morpheus/morpheus/_lib/src/stages/file_source.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/file_source.cpp @@ -17,9 +17,6 @@ #include "morpheus/stages/file_source.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/node.hpp" - #include "morpheus/io/deserializers.hpp" #include "morpheus/objects/file_types.hpp" #include "morpheus/objects/table_info.hpp" @@ -29,10 +26,12 @@ #include #include #include +#include #include // IWYU pragma: keep #include #include // for str_attr_accessor #include // for pybind11::int_ +#include #include #include diff --git a/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp b/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp index 1bb6ea369..2aa02a259 100644 --- a/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp @@ -17,9 +17,6 @@ #include "morpheus/stages/kafka_source.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/utilities/function_wrappers.hpp" // for PyFuncWrapper - #include "morpheus/messages/meta.hpp" #include "morpheus/utilities/stage_util.hpp" #include "morpheus/utilities/string_util.hpp" @@ -31,11 +28,13 @@ #include #include #include +#include #include // for SharedFuture #include #include #include #include +#include // for PyFuncWrapper #include // for find, min, transform #include diff --git a/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp b/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp index 327c09df8..75157cf17 100644 --- a/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp @@ -17,13 +17,13 @@ #include "morpheus/stages/write_to_file.hpp" // IWYU pragma: associated -#include "mrc/segment/builder.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/node.hpp" - #include "morpheus/io/serializers.hpp" #include "morpheus/utilities/string_util.hpp" +#include +#include +#include + #include #include #include diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index f1bae4a08..6fb855e0c 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -38,8 +38,10 @@ #include // for Object, ObjectProperties #include // for MRC_CONCAT_STR #include // for multiple_inheritance +#include // IWYU pragma: keep #include // for arg, init, class_, module_, overload_cast, overload_... #include // for none, dict, str_attr +#include // IWYU pragma: keep #include // IWYU pragma: keep #include // for from_import, import #include // for trace_activity, decay_t diff --git a/python/morpheus/morpheus/stages/general/router_stage.py b/python/morpheus/morpheus/stages/general/router_stage.py new file mode 100644 index 000000000..d6be2a93e --- /dev/null +++ b/python/morpheus/morpheus/stages/general/router_stage.py @@ -0,0 +1,108 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing + +import mrc +import mrc.core.segment + +import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import +from morpheus.cli.register_stage import register_stage +from morpheus.config import Config +from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin + +logger = logging.getLogger(__name__) + + +@register_stage("router") +class RouterStage(GpuAndCpuMixin, _pipeline.Stage): + """ + Buffer results. + + The input messages are buffered by this stage class for faster access to downstream stages. Allows upstream stages + to run faster than downstream stages. + + Parameters + ---------- + c : `morpheus.config.Config` + Pipeline configuration instance. + keys : `list[str]` + List of keys to route the messages. + key_fn : `typing.Callable[[object], str]` + Function to determine the key for the message. The function should take a message as input and return a key. The + key should be one of the keys in the `keys` list. + processing_engines : `int` + Number of processing engines to use for the router. If set to 0, the router will use the thread from the + upstream node for processing. In this situation, slow downstream nodes can block which can prevent routing to + other, non-blocked downstream nodes. To resolve this, set the `processing_engines` parameter to a value greater + than 0. This will create separate engines (similar to a thread) which can continue routing even if one gets + blocked. Higher values of `processing_engines` can prevent blocking at the expense of additional threads. + + """ + + def __init__(self, + c: Config, + *, + keys: list[str], + key_fn: typing.Callable[[object], str], + processing_engines: int = 0) -> None: + super().__init__(c) + + self._keys = keys + self._key_fn = key_fn + self._processing_engines = processing_engines + + if (self._processing_engines < 0): + raise ValueError("Invalid number of processing engines. Must be greater than or equal to 0.") + + if (len(keys) == 0): + raise ValueError("Router stage must have at least one key.") + + self._router: mrc.core.segment.SegmentObject | None = None + + self._create_ports(1, len(keys)) + + @property + def name(self) -> str: + return "router" + + def supports_cpp_node(self): + return True + + def compute_schema(self, schema: _pipeline.StageSchema): + + # Get the input type + input_type = schema.input_type + + for port_idx in range(len(self._keys)): + schema.output_schemas[port_idx].set_type(input_type) + + def _build(self, builder: mrc.Builder, input_nodes: list[mrc.SegmentObject]) -> list[mrc.SegmentObject]: + + assert len(input_nodes) == 1, "Router stage should have exactly one input node" + + from mrc.core.node import Router + from mrc.core.node import RouterComponent + + if (self._processing_engines > 0): + self._router = Router(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn) + + self._router.launch_options.engines_per_pe = self._processing_engines + else: + self._router = RouterComponent(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn) + + builder.make_edge(input_nodes[0], self._router) + + return [self._router.get_child(k) for k in self._keys] diff --git a/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp b/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp index 2f4ce4b10..a06d14b00 100644 --- a/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp +++ b/python/morpheus_llm/morpheus_llm/_lib/llm/src/py_llm_lambda_node.cpp @@ -19,7 +19,6 @@ #include "morpheus_llm/llm/llm_context.hpp" // for LLMContext #include "morpheus_llm/llm/llm_node_base.hpp" -#include "pymrc/coro.hpp" #include "morpheus/utilities/json_types.hpp" // for cast_from_json, cast_from_pyobject #include "morpheus/utilities/string_util.hpp" @@ -29,6 +28,7 @@ #include // for PyGILState_Check, gil_scoped_acquire, gil_scoped_release #include #include +#include #include // IWYU pragma: keep #include diff --git a/tests/morpheus/stages/test_router_stage_pipe.py b/tests/morpheus/stages/test_router_stage_pipe.py new file mode 100644 index 000000000..861bb44b6 --- /dev/null +++ b/tests/morpheus/stages/test_router_stage_pipe.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import pytest + +from morpheus.config import ExecutionMode +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.pipeline import Pipeline +from morpheus.pipeline.stage_decorator import stage +from morpheus.stages.general.router_stage import RouterStage +from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + + +@pytest.mark.parametrize("processing_engines", [0, 4]) +def test_router_stage_pipe(config, filter_probs_df, processing_engines: bool): + + keys = ["odd", "even"] + + count = 0 + + def determine_route_fn(_: ControlMessage): + nonlocal count + count += 1 + return keys[count % len(keys)] + + pipe = Pipeline(config) + source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5)) + deserialize = pipe.add_stage(DeserializeStage(config)) + router_stage = pipe.add_stage( + RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=processing_engines)) + sink1 = pipe.add_stage(InMemorySinkStage(config)) + sink2 = pipe.add_stage(InMemorySinkStage(config)) + + # Connect the stages + pipe.add_edge(source, deserialize) + pipe.add_edge(deserialize, router_stage) + pipe.add_edge(router_stage.output_ports[0], sink1) + pipe.add_edge(router_stage.output_ports[1], sink2) + + pipe.run() + + assert len(sink1.get_messages()) == 2, "Expected 2 messages in sink1" + assert len(sink2.get_messages()) == 3, "Expected 3 messages in sink2" + + +def test_router_stage_backpressure_pipe(config, filter_probs_df): + + # This test simulates a slow single consumer by blocking the second output port of the router stage The router stage + # will buffer the messages and block the source stage from sending more data When run as a component, less threads + # will be used but this system will eventually block. With a runnable, this should be able to run to completion + + # Set the edge buffer size to trigger blocking + config.edge_buffer_size = 4 + + keys = ["odd", "even"] + + count = 0 + + release_event = threading.Event() + + def source_fn(): + + for i in range(20): + cm = ControlMessage() + cm.set_metadata("index", i) + cm.payload(MessageMeta(filter_probs_df)) + yield cm + + # Release the event to allow the pipeline to continue + release_event.set() + + # Send more data + for i in range(20, 30): + cm = ControlMessage() + cm.set_metadata("index", i) + cm.payload(MessageMeta(filter_probs_df)) + yield cm + + def determine_route_fn(_: ControlMessage): + nonlocal count + count += 1 + return keys[count % len(keys)] + + pipe = Pipeline(config) + + source = pipe.add_stage(InMemoryDataGenStage(config, data_source=source_fn, output_data_type=ControlMessage)) + router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=10)) + sink1 = pipe.add_stage(InMemorySinkStage(config)) + sink2 = pipe.add_stage(InMemorySinkStage(config)) + + @stage(execution_modes=[ExecutionMode.CPU, ExecutionMode.GPU]) + def blocking_stage(data: ControlMessage) -> ControlMessage: + + release_event.wait() + + return data + + blocking = pipe.add_stage(blocking_stage(config)) + + # Connect the stages + pipe.add_edge(source, router_stage) + pipe.add_edge(router_stage.output_ports[0], sink1) + pipe.add_edge(router_stage.output_ports[1], blocking) + pipe.add_edge(blocking, sink2) + + pipe.run() + + assert len(sink1.get_messages()) == 15, "Expected 15 messages in sink1" + assert len(sink2.get_messages()) == 15, "Expected 15 messages in sink2"