Skip to content

Commit

Permalink
Add tests to confirm that a mis-configured MultiPortModulesStage will…
Browse files Browse the repository at this point in the history
… raise an exception rather than segfaulting (#1829)

* MRC PR nv-morpheus/MRC#402 fixed this issue, this PR adds some unittests to ensure this.
* Update `Pipeline.build_and_start` to re-raise any exceptions raised during build, this prevents `Pipeline.join` from being called which avoids a failed assert in `join` since the expected future was not created.

Closes #953 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1829
  • Loading branch information
dagardner-nv authored Aug 2, 2024
1 parent 179afcd commit ad9249c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 18 deletions.
2 changes: 1 addition & 1 deletion morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ async def build_and_start(self):
self.build()
except Exception:
logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True)
return
raise

await self._start()

Expand Down
62 changes: 46 additions & 16 deletions tests/test_multi_port_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,45 @@

import pytest

import cudf

# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
import modules.multiplexer # noqa: F401 # pylint: disable=unused-import
from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage


def _run_pipeline(config: Config, source_df: cudf.DataFrame, module_conf: dict,
stage_input_ports: list[str]) -> InMemorySinkStage:
pipe = Pipeline(config)

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, module_conf, input_ports=stage_input_ports, output_ports=["output"]))

for x in range(len(stage_input_ports)):
source_stage = pipe.add_stage(InMemorySourceStage(config, [source_df.copy(deep=True)]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(mux_stage, sink_stage)

pipe.run()

return sink_stage


@pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)])
def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, expected_count):
def test_multi_port_pipeline(config: Config, dataset_cudf: DatasetManager, source_count, expected_count):

filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe = Pipeline(config)

input_ports = []
for x in range(source_count):
input_port = f"input_{x}"
input_ports.append(input_port)
input_ports = [f"input_{x}" for x in range(source_count)]

multiplexer_module_conf = {
"module_id": "multiplexer",
Expand All @@ -46,17 +64,29 @@ def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count,
"output_port": "output"
}

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, multiplexer_module_conf, input_ports=input_ports, output_ports=["output"]))
sink_stage = _run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=input_ports)
assert len(sink_stage.get_messages()) == expected_count

for x in range(source_count):
source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))
def test_multi_port_pipeline_mis_config(config: Config, dataset_cudf: DatasetManager):
config_input_ports = ["input_0", "input_1"]
stage_input_ports = ["input_0", "input_1", "input_2"]

pipe.add_edge(mux_stage, sink_stage)
filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe.run()
multiplexer_module_conf = {
"module_id": "multiplexer",
"namespace": "morpheus_test",
"module_name": "multiplexer",
"input_ports": config_input_ports,
"output_port": "output"
}

assert len(sink_stage.get_messages()) == expected_count
with pytest.raises(ValueError):
_run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=stage_input_ports)
1 change: 0 additions & 1 deletion tests/test_multi_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def test_linear_boundary_stages(config, filter_probs_df):
assert_results(comp_stage.get_results())


@pytest.mark.skip(reason="Skipping due to MRC issue #360")
@pytest.mark.use_cudf
def test_multi_segment_bad_data_type(config, filter_probs_df):
with pytest.raises(RuntimeError):
Expand Down

0 comments on commit ad9249c

Please sign in to comment.