Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests to confirm that a mis-configured MultiPortModulesStage will raise an exception rather than segfaulting #1829

Merged
Merged
2 changes: 1 addition & 1 deletion morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ async def build_and_start(self):
self.build()
except Exception:
logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True)
return
raise

await self._start()

Expand Down
62 changes: 46 additions & 16 deletions tests/test_multi_port_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,45 @@

import pytest

import cudf

# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
import modules.multiplexer # noqa: F401 # pylint: disable=unused-import
from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage


def _run_pipeline(config: Config, source_df: cudf.DataFrame, module_conf: dict,
stage_input_ports: list[str]) -> InMemorySinkStage:
pipe = Pipeline(config)

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, module_conf, input_ports=stage_input_ports, output_ports=["output"]))

for x in range(len(stage_input_ports)):
source_stage = pipe.add_stage(InMemorySourceStage(config, [source_df.copy(deep=True)]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(mux_stage, sink_stage)

pipe.run()

return sink_stage


@pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)])
def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, expected_count):
def test_multi_port_pipeline(config: Config, dataset_cudf: DatasetManager, source_count, expected_count):

filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe = Pipeline(config)

input_ports = []
for x in range(source_count):
input_port = f"input_{x}"
input_ports.append(input_port)
input_ports = [f"input_{x}" for x in range(source_count)]

multiplexer_module_conf = {
"module_id": "multiplexer",
Expand All @@ -46,17 +64,29 @@ def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count,
"output_port": "output"
}

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, multiplexer_module_conf, input_ports=input_ports, output_ports=["output"]))
sink_stage = _run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=input_ports)
assert len(sink_stage.get_messages()) == expected_count

for x in range(source_count):
source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))
def test_multi_port_pipeline_mis_config(config: Config, dataset_cudf: DatasetManager):
config_input_ports = ["input_0", "input_1"]
stage_input_ports = ["input_0", "input_1", "input_2"]

pipe.add_edge(mux_stage, sink_stage)
filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe.run()
multiplexer_module_conf = {
"module_id": "multiplexer",
"namespace": "morpheus_test",
"module_name": "multiplexer",
"input_ports": config_input_ports,
"output_port": "output"
}

assert len(sink_stage.get_messages()) == expected_count
with pytest.raises(ValueError):
_run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=stage_input_ports)
1 change: 0 additions & 1 deletion tests/test_multi_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def test_linear_boundary_stages(config, filter_probs_df):
assert_results(comp_stage.get_results())


@pytest.mark.skip(reason="Skipping due to MRC issue #360")
@pytest.mark.use_cudf
def test_multi_segment_bad_data_type(config, filter_probs_df):
with pytest.raises(RuntimeError):
Expand Down
Loading