Skip to content

Commit

Permalink
Adding implementation of Router Nodes (#1963)
Browse files Browse the repository at this point in the history
Requires changes from MRC PR: nv-morpheus/MRC#502

Closes #1519

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: #1963
  • Loading branch information
mdemoret-nv authored Oct 31, 2024
1 parent f5d8191 commit 8e29e9f
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 14 deletions.
2 changes: 1 addition & 1 deletion ci/iwyu/mappings.imp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
{ "include": [ "<google/protobuf/repeated_ptr_field.h>", private, "<google/protobuf/repeated_field.h>", "public" ] },

# pybind11
{ "include": [ "<pybind11/detail/common.h>", "private", "<pybind11/pytypes.h>", "public" ] },
{ "include": [ "@<pybind11/detail/.*>", "private", "<pybind11/pybind11.h>", "public" ] },
{ "include": [ "<pybind11/cast.h>", "private", "<pybind11/pybind11.h>", "public" ] },

# rxcpp
Expand Down
3 changes: 1 addition & 2 deletions python/morpheus/morpheus/_lib/messages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* limitations under the License.
*/

#include "pymrc/utilities/object_wrappers.hpp"

#include "morpheus/io/data_loader_registry.hpp"
#include "morpheus/messages/control.hpp"
#include "morpheus/messages/memory/inference_memory.hpp"
Expand All @@ -43,6 +41,7 @@
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pymrc/node.hpp> // IWYU pragma: keep
#include <pymrc/port_builders.hpp>
#include <pymrc/utilities/object_wrappers.hpp>
#include <pymrc/utils.hpp> // for pymrc::import
#include <rxcpp/rx.hpp>

Expand Down
5 changes: 2 additions & 3 deletions python/morpheus/morpheus/_lib/src/stages/file_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "morpheus/stages/file_source.hpp"

#include "mrc/segment/object.hpp"
#include "pymrc/node.hpp"

#include "morpheus/io/deserializers.hpp"
#include "morpheus/objects/file_types.hpp"
#include "morpheus/objects/table_info.hpp"
Expand All @@ -29,10 +26,12 @@
#include <cudf/types.hpp>
#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pybind11/cast.h> // IWYU pragma: keep
#include <pybind11/gil.h>
#include <pybind11/pybind11.h> // for str_attr_accessor
#include <pybind11/pytypes.h> // for pybind11::int_
#include <pymrc/node.hpp>

#include <filesystem>
#include <memory>
Expand Down
5 changes: 2 additions & 3 deletions python/morpheus/morpheus/_lib/src/stages/kafka_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include "morpheus/stages/kafka_source.hpp"

#include "mrc/segment/object.hpp"
#include "pymrc/utilities/function_wrappers.hpp" // for PyFuncWrapper

#include "morpheus/messages/meta.hpp"
#include "morpheus/utilities/stage_util.hpp"
#include "morpheus/utilities/string_util.hpp"
Expand All @@ -31,11 +28,13 @@
#include <librdkafka/rdkafkacpp.h>
#include <mrc/runnable/context.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp> // for SharedFuture
#include <nlohmann/json.hpp>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>
#include <pymrc/utilities/function_wrappers.hpp> // for PyFuncWrapper

#include <algorithm> // for find, min, transform
#include <chrono>
Expand Down
8 changes: 4 additions & 4 deletions python/morpheus/morpheus/_lib/src/stages/write_to_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

#include "morpheus/stages/write_to_file.hpp" // IWYU pragma: associated

#include "mrc/segment/builder.hpp"
#include "mrc/segment/object.hpp"
#include "pymrc/node.hpp"

#include "morpheus/io/serializers.hpp"
#include "morpheus/utilities/string_util.hpp"

#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pymrc/node.hpp>

#include <exception>
#include <memory>
#include <sstream>
Expand Down
2 changes: 2 additions & 0 deletions python/morpheus/morpheus/_lib/stages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
#include <mrc/segment/object.hpp> // for Object, ObjectProperties
#include <mrc/utils/string_utils.hpp> // for MRC_CONCAT_STR
#include <pybind11/attr.h> // for multiple_inheritance
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h> // for arg, init, class_, module_, overload_cast, overload_...
#include <pybind11/pytypes.h> // for none, dict, str_attr
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pybind11/stl/filesystem.h> // IWYU pragma: keep
#include <pymrc/utils.hpp> // for from_import, import
#include <rxcpp/rx.hpp> // for trace_activity, decay_t
Expand Down
108 changes: 108 additions & 0 deletions python/morpheus/morpheus/stages/general/router_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing

import mrc
import mrc.core.segment

import morpheus.pipeline as _pipeline # pylint: disable=cyclic-import
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin

logger = logging.getLogger(__name__)


@register_stage("router")
class RouterStage(GpuAndCpuMixin, _pipeline.Stage):
"""
Buffer results.
The input messages are buffered by this stage class for faster access to downstream stages. Allows upstream stages
to run faster than downstream stages.
Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
keys : `list[str]`
List of keys to route the messages.
key_fn : `typing.Callable[[object], str]`
Function to determine the key for the message. The function should take a message as input and return a key. The
key should be one of the keys in the `keys` list.
processing_engines : `int`
Number of processing engines to use for the router. If set to 0, the router will use the thread from the
upstream node for processing. In this situation, slow downstream nodes can block which can prevent routing to
other, non-blocked downstream nodes. To resolve this, set the `processing_engines` parameter to a value greater
than 0. This will create separate engines (similar to a thread) which can continue routing even if one gets
blocked. Higher values of `processing_engines` can prevent blocking at the expense of additional threads.
"""

def __init__(self,
c: Config,
*,
keys: list[str],
key_fn: typing.Callable[[object], str],
processing_engines: int = 0) -> None:
super().__init__(c)

self._keys = keys
self._key_fn = key_fn
self._processing_engines = processing_engines

if (self._processing_engines < 0):
raise ValueError("Invalid number of processing engines. Must be greater than or equal to 0.")

if (len(keys) == 0):
raise ValueError("Router stage must have at least one key.")

self._router: mrc.core.segment.SegmentObject | None = None

self._create_ports(1, len(keys))

@property
def name(self) -> str:
return "router"

def supports_cpp_node(self):
return True

def compute_schema(self, schema: _pipeline.StageSchema):

# Get the input type
input_type = schema.input_type

for port_idx in range(len(self._keys)):
schema.output_schemas[port_idx].set_type(input_type)

def _build(self, builder: mrc.Builder, input_nodes: list[mrc.SegmentObject]) -> list[mrc.SegmentObject]:

assert len(input_nodes) == 1, "Router stage should have exactly one input node"

from mrc.core.node import Router
from mrc.core.node import RouterComponent

if (self._processing_engines > 0):
self._router = Router(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn)

self._router.launch_options.engines_per_pe = self._processing_engines
else:
self._router = RouterComponent(builder, self.unique_name, router_keys=self._keys, key_fn=self._key_fn)

builder.make_edge(input_nodes[0], self._router)

return [self._router.get_child(k) for k in self._keys]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include "morpheus_llm/llm/llm_context.hpp" // for LLMContext
#include "morpheus_llm/llm/llm_node_base.hpp"
#include "pymrc/coro.hpp"

#include "morpheus/utilities/json_types.hpp" // for cast_from_json, cast_from_pyobject
#include "morpheus/utilities/string_util.hpp"
Expand All @@ -29,6 +28,7 @@
#include <pybind11/gil.h> // for PyGILState_Check, gil_scoped_acquire, gil_scoped_release
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pymrc/coro.hpp>
#include <pymrc/coro.hpp> // IWYU pragma: keep
#include <pymrc/types.hpp>

Expand Down
127 changes: 127 additions & 0 deletions tests/morpheus/stages/test_router_stage_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

import pytest

from morpheus.config import ExecutionMode
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline import Pipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.general.router_stage import RouterStage
from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


@pytest.mark.parametrize("processing_engines", [0, 4])
def test_router_stage_pipe(config, filter_probs_df, processing_engines: bool):

keys = ["odd", "even"]

count = 0

def determine_route_fn(_: ControlMessage):
nonlocal count
count += 1
return keys[count % len(keys)]

pipe = Pipeline(config)
source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5))
deserialize = pipe.add_stage(DeserializeStage(config))
router_stage = pipe.add_stage(
RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=processing_engines))
sink1 = pipe.add_stage(InMemorySinkStage(config))
sink2 = pipe.add_stage(InMemorySinkStage(config))

# Connect the stages
pipe.add_edge(source, deserialize)
pipe.add_edge(deserialize, router_stage)
pipe.add_edge(router_stage.output_ports[0], sink1)
pipe.add_edge(router_stage.output_ports[1], sink2)

pipe.run()

assert len(sink1.get_messages()) == 2, "Expected 2 messages in sink1"
assert len(sink2.get_messages()) == 3, "Expected 3 messages in sink2"


def test_router_stage_backpressure_pipe(config, filter_probs_df):

# This test simulates a slow single consumer by blocking the second output port of the router stage The router stage
# will buffer the messages and block the source stage from sending more data When run as a component, less threads
# will be used but this system will eventually block. With a runnable, this should be able to run to completion

# Set the edge buffer size to trigger blocking
config.edge_buffer_size = 4

keys = ["odd", "even"]

count = 0

release_event = threading.Event()

def source_fn():

for i in range(20):
cm = ControlMessage()
cm.set_metadata("index", i)
cm.payload(MessageMeta(filter_probs_df))
yield cm

# Release the event to allow the pipeline to continue
release_event.set()

# Send more data
for i in range(20, 30):
cm = ControlMessage()
cm.set_metadata("index", i)
cm.payload(MessageMeta(filter_probs_df))
yield cm

def determine_route_fn(_: ControlMessage):
nonlocal count
count += 1
return keys[count % len(keys)]

pipe = Pipeline(config)

source = pipe.add_stage(InMemoryDataGenStage(config, data_source=source_fn, output_data_type=ControlMessage))
router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=10))
sink1 = pipe.add_stage(InMemorySinkStage(config))
sink2 = pipe.add_stage(InMemorySinkStage(config))

@stage(execution_modes=[ExecutionMode.CPU, ExecutionMode.GPU])
def blocking_stage(data: ControlMessage) -> ControlMessage:

release_event.wait()

return data

blocking = pipe.add_stage(blocking_stage(config))

# Connect the stages
pipe.add_edge(source, router_stage)
pipe.add_edge(router_stage.output_ports[0], sink1)
pipe.add_edge(router_stage.output_ports[1], blocking)
pipe.add_edge(blocking, sink2)

pipe.run()

assert len(sink1.get_messages()) == 15, "Expected 15 messages in sink1"
assert len(sink2.get_messages()) == 15, "Expected 15 messages in sink2"

0 comments on commit 8e29e9f

Please sign in to comment.