From ef069721706595442dcf34d38b61acac0bd885db Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 10:54:44 -0700 Subject: [PATCH 01/29] Don't skip this test as its perfectly valid and passes, the referenced issue isn't relevent to this test --- cpp/mrc/tests/test_pipeline.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index c34731302..196ecfbc4 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -105,8 +105,6 @@ TEST_F(TestPipeline, DuplicateSegments) TEST_F(TestPipeline, TwoSegment) { - GTEST_SKIP() << "#185"; - std::atomic next_count = 0; std::atomic complete_count = 0; From 79d144b8b51b984518af3bb37d627646922f2c44 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:02:51 -0700 Subject: [PATCH 02/29] Test for issue #360 --- cpp/mrc/tests/test_pipeline.cpp | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 196ecfbc4..c0db4308f 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -160,6 +161,55 @@ TEST_F(TestPipeline, TwoSegment) LOG(INFO) << "Done" << std::endl; } +TEST_F(TestPipeline, InconsistentPipeline) +{ + // Test to reproduce issue #360 + auto pipeline = mrc::make_pipeline(); + + auto seg_1 = + pipeline->make_segment("seg_1", segment::EgressPorts({"float_port"}), [](segment::IBuilder& seg) { + auto rx_source = seg.make_source("rx_source", [](rxcpp::subscriber s) { + LOG(INFO) << "emit 1"; + s.on_next(1.0F); + LOG(INFO) << "emit 2"; + s.on_next(2.0F); + LOG(INFO) << "emit 3"; + s.on_next(3.0F); + LOG(INFO) << "issuing complete"; + s.on_completed(); + }); + + auto my_float_egress = seg.get_egress("float_port"); + + seg.make_edge(rx_source, my_float_egress); + }); + + auto seg_2 = + pipeline->make_segment("seg_2", segment::IngressPorts({"float_port"}), [&](segment::IBuilder& seg) { + auto my_float_ingress = seg.get_ingress("float_port"); + + auto rx_sink = seg.make_sink("rx_sink", + rxcpp::make_observer_dynamic( + [&](float x) { + DVLOG(1) << x << std::endl; + }, + [&]() { + DVLOG(1) << "Completed" << std::endl; + })); + + seg.make_edge(my_float_ingress, rx_sink); + throw std::runtime_error("Error in initializer"); + }); + + Executor exec(std::move(m_options)); + + exec.register_pipeline(std::move(pipeline)); + + exec.start(); + + EXPECT_THROW(exec.join(), std::runtime_error); +} + /* TEST_F(TestPipeline, TwoSegmentManualTag) { From 52968bf3d364f4ff2a32172339210d8413408d36 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:05:42 -0700 Subject: [PATCH 03/29] Remove unneeded copy/paste code --- cpp/mrc/tests/test_pipeline.cpp | 44 +++++++++++++++------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index c0db4308f..2a82b28ae 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -169,14 +169,7 @@ TEST_F(TestPipeline, InconsistentPipeline) auto seg_1 = pipeline->make_segment("seg_1", segment::EgressPorts({"float_port"}), [](segment::IBuilder& seg) { auto rx_source = seg.make_source("rx_source", [](rxcpp::subscriber s) { - LOG(INFO) << "emit 1"; - s.on_next(1.0F); - LOG(INFO) << "emit 2"; - s.on_next(2.0F); - LOG(INFO) << "emit 3"; - s.on_next(3.0F); - LOG(INFO) << "issuing complete"; - s.on_completed(); + FAIL() << "This should not be called"; }); auto my_float_egress = seg.get_egress("float_port"); @@ -184,22 +177,25 @@ TEST_F(TestPipeline, InconsistentPipeline) seg.make_edge(rx_source, my_float_egress); }); - auto seg_2 = - pipeline->make_segment("seg_2", segment::IngressPorts({"float_port"}), [&](segment::IBuilder& seg) { - auto my_float_ingress = seg.get_ingress("float_port"); - - auto rx_sink = seg.make_sink("rx_sink", - rxcpp::make_observer_dynamic( - [&](float x) { - DVLOG(1) << x << std::endl; - }, - [&]() { - DVLOG(1) << "Completed" << std::endl; - })); - - seg.make_edge(my_float_ingress, rx_sink); - throw std::runtime_error("Error in initializer"); - }); + auto seg_2 = pipeline->make_segment("seg_2", + segment::IngressPorts({"float_port"}), + [&](segment::IBuilder& seg) { + auto my_float_ingress = seg.get_ingress("float_port"); + + auto rx_sink = seg.make_sink("rx_sink", + rxcpp::make_observer_dynamic( + [&](float x) { + FAIL() << "This should not be " + "called"; + }, + [&]() { + FAIL() << "This should not be " + "called"; + })); + + seg.make_edge(my_float_ingress, rx_sink); + throw std::runtime_error("Error in initializer"); + }); Executor exec(std::move(m_options)); From 1178c029946e8f8a4c1941c4a11e78fe8ce58904 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:26:53 -0700 Subject: [PATCH 04/29] Revert added objects and ingress/egress ports on error in segment init function --- cpp/mrc/src/internal/segment/builder_definition.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index e631c3f1e..6633aa868 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -284,6 +284,10 @@ void BuilderDefinition::initialize() << ", Segment Rank: " << m_rank << ". Exception message:\n" << e.what(); + m_objects.clear(); + m_ingress_ports.clear(); + m_egress_ports.clear(); + // Rethrow after logging std::rethrow_exception(std::current_exception()); } From ac2853c26582cf3189665250fea73312080fcf3b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:27:03 -0700 Subject: [PATCH 05/29] Better test name --- cpp/mrc/tests/test_pipeline.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 2a82b28ae..adeb36804 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -161,7 +161,7 @@ TEST_F(TestPipeline, TwoSegment) LOG(INFO) << "Done" << std::endl; } -TEST_F(TestPipeline, InconsistentPipeline) +TEST_F(TestPipeline, SegmentInitErrorHandling) { // Test to reproduce issue #360 auto pipeline = mrc::make_pipeline(); From 6a26e4f690e7f8fc4c8bf9082c5558095826e325 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:43:02 -0700 Subject: [PATCH 06/29] wip --- external/utilities | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/utilities b/external/utilities index 899a46063..b3af2a350 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 899a4606388b6ad2e94662949cd65df8e8c9bdd0 +Subproject commit b3af2a3501b2357c3467b7abb295ae75151db186 From cccef95f54d5c6e6f773812f26bcdcb476f9f394 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 12:35:25 -0700 Subject: [PATCH 07/29] Only run iwyu on compilation units actually changed in PR --- ci/scripts/cpp_checks.sh | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/ci/scripts/cpp_checks.sh b/ci/scripts/cpp_checks.sh index 416c92167..b83df0727 100755 --- a/ci/scripts/cpp_checks.sh +++ b/ci/scripts/cpp_checks.sh @@ -80,9 +80,22 @@ if [[ -n "${MRC_MODIFIED_FILES}" ]]; then # Include What You Use if [[ "${SKIP_IWYU}" == "" ]]; then - IWYU_DIRS="cpp python" + # Remove .h, .hpp, and .cu files from the modified list + shopt -s extglob + IWYU_MODIFIED_FILES=( "${MRC_MODIFIED_FILES[@]/*.@(h|hpp|cu)/}" ) + + # Get the list of compiled files relative to this directory + WORKING_PREFIX="${PWD}/" + COMPILED_FILES=( $(jq -r .[].file ${BUILD_DIR}/compile_commands.json | sort -u ) ) + COMPILED_FILES=( "${COMPILED_FILES[@]/#$WORKING_PREFIX/}" ) + COMBINED_FILES=("${COMPILED_FILES[@]}") + COMBINED_FILES+=("${IWYU_MODIFIED_FILES[@]}") + + # Find the intersection between compiled files and modified files + IWYU_MODIFIED_FILES=( $(printf '%s\0' "${COMBINED_FILES[@]}" | sort -z | uniq -d -z | xargs -0n1) ) + NUM_PROC=$(get_num_proc) - IWYU_OUTPUT=`${IWYU_TOOL} -p ${BUILD_DIR} -j ${NUM_PROC} ${IWYU_DIRS} 2>&1` + IWYU_OUTPUT=`${IWYU_TOOL} -p ${BUILD_DIR} -j ${NUM_PROC} ${IWYU_MODIFIED_FILES[@]} 2>&1` IWYU_RETVAL=$? fi else From 6914da7aa4ce4d1e1d8f85ee508d00c7f783e3e8 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 12:58:35 -0700 Subject: [PATCH 08/29] Checks require clang --- ci/scripts/github/checks.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ci/scripts/github/checks.sh b/ci/scripts/github/checks.sh index 4ea5c5583..6a81e404d 100755 --- a/ci/scripts/github/checks.sh +++ b/ci/scripts/github/checks.sh @@ -24,7 +24,11 @@ update_conda_env rapids-logger "Configuring CMake" git submodule update --init --recursive -cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} . + +CMAKE_CACHE_FLAGS="-DCCACHE_PROGRAM_PATH=$(which sccache) -DMRC_USE_CCACHE=ON" +CMAKE_CLANG_OPTIONS="-DCMAKE_C_COMPILER:FILEPATH=$(which clang) -DCMAKE_CXX_COMPILER:FILEPATH=$(which clang++) -DCMAKE_CUDA_COMPILER:FILEPATH=$(which nvcc)" +CMAKE_FLAGS="${CMAKE_CLANG_OPTIONS} ${CMAKE_BUILD_ALL_FEATURES} ${CMAKE_CACHE_FLAGS}" +cmake -B build -G Ninja ${CMAKE_FLAGS} . rapids-logger "Building targets that generate source code" cmake --build build --target mrc_style_checks --parallel ${PARALLEL_LEVEL} From dbeeedc1d711a0acafff1d0fc32434bc42584f70 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 13:12:44 -0700 Subject: [PATCH 09/29] Revert "Checks require clang" This reverts commit 6914da7aa4ce4d1e1d8f85ee508d00c7f783e3e8. --- ci/scripts/github/checks.sh | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ci/scripts/github/checks.sh b/ci/scripts/github/checks.sh index 6a81e404d..4ea5c5583 100755 --- a/ci/scripts/github/checks.sh +++ b/ci/scripts/github/checks.sh @@ -24,11 +24,7 @@ update_conda_env rapids-logger "Configuring CMake" git submodule update --init --recursive - -CMAKE_CACHE_FLAGS="-DCCACHE_PROGRAM_PATH=$(which sccache) -DMRC_USE_CCACHE=ON" -CMAKE_CLANG_OPTIONS="-DCMAKE_C_COMPILER:FILEPATH=$(which clang) -DCMAKE_CXX_COMPILER:FILEPATH=$(which clang++) -DCMAKE_CUDA_COMPILER:FILEPATH=$(which nvcc)" -CMAKE_FLAGS="${CMAKE_CLANG_OPTIONS} ${CMAKE_BUILD_ALL_FEATURES} ${CMAKE_CACHE_FLAGS}" -cmake -B build -G Ninja ${CMAKE_FLAGS} . +cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} . rapids-logger "Building targets that generate source code" cmake --build build --target mrc_style_checks --parallel ${PARALLEL_LEVEL} From faf233c851e87575c9c3c6c0aa921900d308ab46 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 14:12:49 -0700 Subject: [PATCH 10/29] test [no ci] --- ci/conda/environments/clang_env.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ci/conda/environments/clang_env.yml b/ci/conda/environments/clang_env.yml index 9c8867ae4..64873cb36 100644 --- a/ci/conda/environments/clang_env.yml +++ b/ci/conda/environments/clang_env.yml @@ -19,11 +19,11 @@ name: mrc channels: - conda-forge dependencies: - - clang=15 - - clang-tools=15 - - clangdev=15 - - clangxx=15 - - libclang=15 - - libclang-cpp=15 - - llvmdev=15 + - clang=15.0.6 + - clang-tools=15.0.6 + - clangdev=15.0.6 + - clangxx=15.0.6 + - libclang=15.0.6 + - libclang-cpp=15.0.6 + - llvmdev=15.0.6 - include-what-you-use=0.19 From 9fa907c25eb980095136048d3a6c92c504cf2e66 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 14:26:28 -0700 Subject: [PATCH 11/29] Don't fetch base branch if it's already set --- ci/scripts/github/common.sh | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index 02684da2f..667c55993 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -106,16 +106,18 @@ function update_conda_env() { print_env_vars function fetch_base_branch() { - rapids-logger "Retrieving base branch from GitHub API" - [[ -n "$GH_TOKEN" ]] && CURL_HEADERS=('-H' "Authorization: token ${GH_TOKEN}") - RESP=$( - curl -s \ - -H "Accept: application/vnd.github.v3+json" \ - "${CURL_HEADERS[@]}" \ - "${GITHUB_API_URL}/repos/${ORG_NAME}/${REPO_NAME}/pulls/${PR_NUM}" - ) - - BASE_BRANCH=$(echo "${RESP}" | jq -r '.base.ref') + if [[ "${BASE_BRANCH}" == "" ]]; then + rapids-logger "Retrieving base branch from GitHub API" + [[ -n "$GH_TOKEN" ]] && CURL_HEADERS=('-H' "Authorization: token ${GH_TOKEN}") + RESP=$( + curl -s \ + -H "Accept: application/vnd.github.v3+json" \ + "${CURL_HEADERS[@]}" \ + "${GITHUB_API_URL}/repos/${ORG_NAME}/${REPO_NAME}/pulls/${PR_NUM}" + ) + + BASE_BRANCH=$(echo "${RESP}" | jq -r '.base.ref') + fi # Change target is the branch name we are merging into but due to the weird way jenkins does # the checkout it isn't recognized by git without the origin/ prefix From 81d094f967af9febcd30c67b54351872170e0420 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 14:44:35 -0700 Subject: [PATCH 12/29] Add comment explaining version --- ci/conda/environments/clang_env.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/conda/environments/clang_env.yml b/ci/conda/environments/clang_env.yml index 64873cb36..4866f4a5a 100644 --- a/ci/conda/environments/clang_env.yml +++ b/ci/conda/environments/clang_env.yml @@ -19,6 +19,7 @@ name: mrc channels: - conda-forge dependencies: + # CI failing with clang 15.0.7 ha770c72_3 from conda-forge Temporarily locking off on 15.0.6 - clang=15.0.6 - clang-tools=15.0.6 - clangdev=15.0.6 From 9d5525952cf6422e4530c0126dc3065572daa2fe Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 14:58:01 -0700 Subject: [PATCH 13/29] Avoid 15.0.7 --- ci/conda/environments/clang_env.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ci/conda/environments/clang_env.yml b/ci/conda/environments/clang_env.yml index 4866f4a5a..a4ce8e759 100644 --- a/ci/conda/environments/clang_env.yml +++ b/ci/conda/environments/clang_env.yml @@ -19,12 +19,12 @@ name: mrc channels: - conda-forge dependencies: - # CI failing with clang 15.0.7 ha770c72_3 from conda-forge Temporarily locking off on 15.0.6 - - clang=15.0.6 - - clang-tools=15.0.6 - - clangdev=15.0.6 - - clangxx=15.0.6 - - libclang=15.0.6 - - libclang-cpp=15.0.6 - - llvmdev=15.0.6 + # CI failing with clang 15.0.7 ha770c72_3 from conda-forge + - clang=15.0.6|>15.0.7 + - clang-tools=15.0.6|>15.0.7 + - clangdev=15.0.6|>15.0.7 + - clangxx=15.0.6|>15.0.7 + - libclang=15.0.6|>15.0.7 + - libclang-cpp=15.0.6|>15.0.7 + - llvmdev=15.0.6|>15.0.7 - include-what-you-use=0.19 From 1eb89f7e9c8c2a0c8b1d9d071e2d7712621fcec5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 3 Aug 2023 08:53:08 -0700 Subject: [PATCH 14/29] revert clang version changes [no ci] --- ci/conda/environments/clang_env.yml | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/ci/conda/environments/clang_env.yml b/ci/conda/environments/clang_env.yml index a4ce8e759..25ea14a20 100644 --- a/ci/conda/environments/clang_env.yml +++ b/ci/conda/environments/clang_env.yml @@ -1,14 +1,14 @@ # SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # -# Licensed under the Apache License, Version 2.0 (the "License"); +# Licensed under the Apache License, Version 2.0 (the License); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, +# distributed under the License is distributed on an AS IS BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. @@ -19,12 +19,11 @@ name: mrc channels: - conda-forge dependencies: - # CI failing with clang 15.0.7 ha770c72_3 from conda-forge - - clang=15.0.6|>15.0.7 - - clang-tools=15.0.6|>15.0.7 - - clangdev=15.0.6|>15.0.7 - - clangxx=15.0.6|>15.0.7 - - libclang=15.0.6|>15.0.7 - - libclang-cpp=15.0.6|>15.0.7 - - llvmdev=15.0.6|>15.0.7 + - clang=15 + - clang-tools=15 + - clangdev=15 + - clangxx=15 + - libclang=15 + - libclang-cpp=15 + - llvmdev=15 - include-what-you-use=0.19 From c0f8a35dbac8b3a2a209dcf4179d1d4592b89de2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 3 Aug 2023 09:56:27 -0700 Subject: [PATCH 15/29] Adopt updated versions of boost and libhwlock this chooses different builds of llvm packages to fix CI --- ci/conda/environments/dev_env.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/conda/environments/dev_env.yml b/ci/conda/environments/dev_env.yml index 58d83d9a7..5af8a91c9 100644 --- a/ci/conda/environments/dev_env.yml +++ b/ci/conda/environments/dev_env.yml @@ -25,7 +25,7 @@ dependencies: - autoconf>=2.69 - bash-completion - benchmark=1.6.0 - - boost-cpp=1.74 + - boost-cpp=1.82 - ccache - cmake=3.24 - cuda-toolkit # Version comes from the channel above @@ -46,7 +46,7 @@ dependencies: - isort - jinja2=3.0 - lcov=1.15 - - libhwloc=2.5 + - libhwloc=2.9.2 - libprotobuf=3.21 - librmm=23.06 - libtool From 46ac7b514be01a889bcaaa0a0c5e95e2d5e7438c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 3 Aug 2023 10:20:07 -0700 Subject: [PATCH 16/29] Apply fix from mdemoret-nv:mdd_force-stubgen-dev --- ci/release/update-version.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index de1318c0c..31b541957 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -60,7 +60,10 @@ function sed_runner() { # .gitmodules git submodule set-branch -b branch-${NEXT_SHORT_TAG} morpheus_utils -git submodule update --remote +if [[ "$(git diff --name-only | grep .gitmodules)" != "" ]]; then + # Only update the submodules if setting the branch changed .gitmodules + git submodule update --remote +fi # Root CMakeLists.txt sed_runner 's/'"VERSION ${CURRENT_FULL_VERSION}.*"'/'"VERSION ${NEXT_FULL_VERSION}"'/g' CMakeLists.txt From 1cd28f6e9349c44db12668511f9f902d5970ca7d Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 25 Sep 2023 13:42:41 -0700 Subject: [PATCH 17/29] IWYU changes --- cpp/mrc/src/internal/segment/builder_definition.cpp | 2 +- cpp/mrc/tests/test_pipeline.cpp | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index 6633aa868..569d5cc94 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -28,9 +28,9 @@ #include "mrc/modules/properties/persistent.hpp" // IWYU pragma: keep #include "mrc/modules/segment_modules.hpp" #include "mrc/node/port_registry.hpp" +#include "mrc/runnable/launchable.hpp" // for Launchable #include "mrc/segment/egress_port.hpp" // IWYU pragma: keep #include "mrc/segment/ingress_port.hpp" // IWYU pragma: keep -#include "mrc/segment/initializers.hpp" #include "mrc/segment/object.hpp" #include "mrc/types.hpp" diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index adeb36804..9d6204419 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -16,7 +16,9 @@ */ #include "mrc/node/rx_sink.hpp" +#include "mrc/node/rx_sink_base.hpp" // for RxSinkBase #include "mrc/node/rx_source.hpp" +#include "mrc/node/rx_source_base.hpp" // for RxSourceBase #include "mrc/options/options.hpp" #include "mrc/options/topology.hpp" #include "mrc/pipeline/executor.hpp" @@ -33,13 +35,10 @@ #include #include -#include #include #include #include -#include #include -#include namespace mrc { From dfa066e7102ccb160bdd090a281e6c386b41ee06 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 26 Sep 2023 08:05:45 -0700 Subject: [PATCH 18/29] Python repro for issue #360 --- python/tests/test_pipeline.py | 44 ++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/python/tests/test_pipeline.py b/python/tests/test_pipeline.py index 7f70068e6..7b1abc4f0 100644 --- a/python/tests/test_pipeline.py +++ b/python/tests/test_pipeline.py @@ -19,7 +19,8 @@ # from functools import partial # import numpy as np -# import pytest + +import pytest import mrc import mrc.tests.test_edges_cpp as m @@ -445,9 +446,50 @@ def on_complete(): executor.join() +def test_segment_init_error(): + """ + Test for issue #360 + """ + + def gen_data(): + yield 1 + + def init1(builder: mrc.Builder): + source = builder.make_source("source", gen_data) + egress = builder.get_egress("b") + builder.make_edge(source, egress) + + def init2(builder: mrc.Builder): + + def on_next(input): + pass + + ingress = builder.get_ingress("b") + sink = builder.make_sink("sink", on_next) + + builder.make_edge(ingress, sink) + raise RuntimeError("Test for #360") + + pipe = mrc.Pipeline() + + pipe.make_segment("TestSegment11", [], [("b", int, False)], init1) + pipe.make_segment("TestSegment22", [("b", int, False)], [], init2) + + options = mrc.Options() + + executor = mrc.Executor(options) + executor.register_pipeline(pipe) + + with pytest.raises(RuntimeError): + executor.start() + + executor.join() + + if (__name__ in ("__main__", )): test_dynamic_port_creation_good() test_dynamic_port_creation_bad() test_ingress_egress_custom_type_construction() test_dynamic_port_get_ingress_egress() test_dynamic_port_with_type_get_ingress_egress() + test_segment_init_error() From 9e9a5f783576e79401ef6ba340789bbcf9c8cea2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 26 Sep 2023 08:24:25 -0700 Subject: [PATCH 19/29] Place executor.join under exception handler [no ci] --- python/tests/test_pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/tests/test_pipeline.py b/python/tests/test_pipeline.py index 7b1abc4f0..fc6a4998d 100644 --- a/python/tests/test_pipeline.py +++ b/python/tests/test_pipeline.py @@ -482,8 +482,7 @@ def on_next(input): with pytest.raises(RuntimeError): executor.start() - - executor.join() + executor.join() if (__name__ in ("__main__", )): From 38c0e15ea4bb2a1fe3033eb6042ef04ac9691f3c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 26 Sep 2023 10:31:28 -0700 Subject: [PATCH 20/29] Fix repro test, works if segment2 raises, but fails os segment1 raises [no ci] --- python/tests/test_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/tests/test_pipeline.py b/python/tests/test_pipeline.py index fc6a4998d..49b5a891f 100644 --- a/python/tests/test_pipeline.py +++ b/python/tests/test_pipeline.py @@ -458,6 +458,7 @@ def init1(builder: mrc.Builder): source = builder.make_source("source", gen_data) egress = builder.get_egress("b") builder.make_edge(source, egress) + raise RuntimeError("Test for #360") def init2(builder: mrc.Builder): @@ -468,12 +469,11 @@ def on_next(input): sink = builder.make_sink("sink", on_next) builder.make_edge(ingress, sink) - raise RuntimeError("Test for #360") pipe = mrc.Pipeline() - pipe.make_segment("TestSegment11", [], [("b", int, False)], init1) - pipe.make_segment("TestSegment22", [("b", int, False)], [], init2) + pipe.make_segment("TestSegment1", [], [("b", int, False)], init1) + pipe.make_segment("TestSegment2", [("b", int, False)], [], init2) options = mrc.Options() From 22220a269aacbf08f5f60579dc7f1d929760f20b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 26 Sep 2023 10:44:33 -0700 Subject: [PATCH 21/29] Add second test, with exception being thrown in both the first and second segments [no ci] --- cpp/mrc/tests/test_pipeline.cpp | 46 ++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 9d6204419..0315d7ab5 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -159,8 +159,52 @@ TEST_F(TestPipeline, TwoSegment) LOG(INFO) << "Done" << std::endl; } +TEST_F(TestPipeline, SegmentInitErrorHandlingFirstSeg) +{ + // Test to reproduce issue #360 + auto pipeline = mrc::make_pipeline(); + + auto seg_1 = + pipeline->make_segment("seg_1", segment::EgressPorts({"float_port"}), [](segment::IBuilder& seg) { + auto rx_source = seg.make_source("rx_source", [](rxcpp::subscriber s) { + FAIL() << "This should not be called"; + }); + + auto my_float_egress = seg.get_egress("float_port"); + + seg.make_edge(rx_source, my_float_egress); + throw std::runtime_error("Error in initializer"); + }); + + auto seg_2 = pipeline->make_segment("seg_2", + segment::IngressPorts({"float_port"}), + [&](segment::IBuilder& seg) { + auto my_float_ingress = seg.get_ingress("float_port"); + + auto rx_sink = seg.make_sink("rx_sink", + rxcpp::make_observer_dynamic( + [&](float x) { + FAIL() << "This should not be " + "called"; + }, + [&]() { + FAIL() << "This should not be " + "called"; + })); + + seg.make_edge(my_float_ingress, rx_sink); + }); + + Executor exec(std::move(m_options)); + + exec.register_pipeline(std::move(pipeline)); + + exec.start(); + + EXPECT_THROW(exec.join(), std::runtime_error); +} -TEST_F(TestPipeline, SegmentInitErrorHandling) +TEST_F(TestPipeline, SegmentInitErrorHandlingSecondSeg) { // Test to reproduce issue #360 auto pipeline = mrc::make_pipeline(); From 06d28a6f039d8ac806fa4770cd54035fd8991bf5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 27 Sep 2023 10:11:28 -0700 Subject: [PATCH 22/29] WIP: A bunch of bebug logging that needs to be removed, add a destroy method to ObjectProperties [no ci] --- cpp/mrc/include/mrc/node/rx_sink_base.hpp | 10 +++++- cpp/mrc/include/mrc/node/rx_source_base.hpp | 10 +++++- cpp/mrc/include/mrc/segment/builder.hpp | 15 ++++---- cpp/mrc/include/mrc/segment/component.hpp | 5 +++ cpp/mrc/include/mrc/segment/object.hpp | 2 ++ cpp/mrc/include/mrc/segment/runnable.hpp | 7 ++++ .../internal/segment/builder_definition.cpp | 25 ++++++++++++- cpp/mrc/tests/test_pipeline.cpp | 36 +++++++++++++++++++ 8 files changed, 101 insertions(+), 9 deletions(-) diff --git a/cpp/mrc/include/mrc/node/rx_sink_base.hpp b/cpp/mrc/include/mrc/node/rx_sink_base.hpp index 89d4a8c8f..f975c3d2d 100644 --- a/cpp/mrc/include/mrc/node/rx_sink_base.hpp +++ b/cpp/mrc/include/mrc/node/rx_sink_base.hpp @@ -48,7 +48,7 @@ class RxSinkBase : public WritableProvider, public ReadableAcceptor, publi protected: RxSinkBase(); - ~RxSinkBase() override = default; + ~RxSinkBase() override; const rxcpp::observable& observable() const; @@ -70,6 +70,14 @@ RxSinkBase::RxSinkBase() : this->set_channel(std::make_unique>()); } +template +RxSinkBase::~RxSinkBase() +{ + LOG(INFO) << "~RxSinkBase()"; + this->release_edge_connection(); + LOG(INFO) << "~RxSinkBase() - released"; +} + template const rxcpp::observable& RxSinkBase::observable() const { diff --git a/cpp/mrc/include/mrc/node/rx_source_base.hpp b/cpp/mrc/include/mrc/node/rx_source_base.hpp index 58876d3c6..1fe1cd196 100644 --- a/cpp/mrc/include/mrc/node/rx_source_base.hpp +++ b/cpp/mrc/include/mrc/node/rx_source_base.hpp @@ -56,7 +56,7 @@ class RxSourceBase : public ReadableProvider, protected: RxSourceBase(); - ~RxSourceBase() override = default; + ~RxSourceBase() override; const rxcpp::observer& observer() const; @@ -84,6 +84,14 @@ RxSourceBase::RxSourceBase() : this->set_channel(std::make_unique>()); } +template +RxSourceBase::~RxSourceBase() +{ + LOG(ERROR) << "RxSourceBase destructor called releasing edge connection"; + this->release_edge_connection(); + LOG(ERROR) << "RxSourceBase released"; +} + template const rxcpp::observer& RxSourceBase::observer() const { diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index a35f571c9..95e2da872 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -79,14 +79,14 @@ namespace { namespace hana = boost::hana; template -auto has_source_add_watcher = hana::is_valid( - [](auto&& thing) -> decltype(std::forward(thing).source_add_watcher( - std::declval>())) {}); +auto has_source_add_watcher = + hana::is_valid([](auto&& thing) -> decltype(std::forward(thing).source_add_watcher( + std::declval>())) {}); template -auto has_sink_add_watcher = hana::is_valid( - [](auto&& thing) -> decltype(std::forward(thing).sink_add_watcher( - std::declval>())) {}); +auto has_sink_add_watcher = + hana::is_valid([](auto&& thing) -> decltype(std::forward(thing).sink_add_watcher( + std::declval>())) {}); template void add_stats_watcher_if_rx_source(T& thing, std::string name) @@ -464,6 +464,8 @@ void IBuilder::make_edge(SourceObjectT source, SinkObjectT sink) auto& source_object = to_object_properties(source); auto& sink_object = to_object_properties(sink); + LOG(INFO) << "Creating edge from " << source_object.name() << " to " << sink_object.name(); + // If we can determine the type from the actual object, use that, then fall back to hints or defaults. using deduced_source_type_t = first_non_void_type_t> segment_o CHECK(segment_object->is_source()); using source_type_t = typename ObjectT::source_type_t; auto counter = this->make_throughput_counter(runnable->name()); + runnable->object().add_epilogue_tap([counter](const source_type_t& data) { counter(1); }); diff --git a/cpp/mrc/include/mrc/segment/component.hpp b/cpp/mrc/include/mrc/segment/component.hpp index 3e25f9b63..2f2a9921d 100644 --- a/cpp/mrc/include/mrc/segment/component.hpp +++ b/cpp/mrc/include/mrc/segment/component.hpp @@ -34,6 +34,11 @@ class Component final : public Object Component(std::unique_ptr resource) : m_resource(std::move(resource)) {} ~Component() final = default; + void destroy() final + { + m_resource.reset(); + } + private: ResourceT* get_object() const final { diff --git a/cpp/mrc/include/mrc/segment/object.hpp b/cpp/mrc/include/mrc/segment/object.hpp index 2ccc80094..b48bfcaaf 100644 --- a/cpp/mrc/include/mrc/segment/object.hpp +++ b/cpp/mrc/include/mrc/segment/object.hpp @@ -74,6 +74,8 @@ struct ObjectProperties virtual runnable::LaunchOptions& launch_options() = 0; virtual const runnable::LaunchOptions& launch_options() const = 0; + + virtual void destroy(){}; }; inline ObjectProperties::~ObjectProperties() = default; diff --git a/cpp/mrc/include/mrc/segment/runnable.hpp b/cpp/mrc/include/mrc/segment/runnable.hpp index ab5b590ca..ffc5a8121 100644 --- a/cpp/mrc/include/mrc/segment/runnable.hpp +++ b/cpp/mrc/include/mrc/segment/runnable.hpp @@ -46,6 +46,13 @@ class Runnable : public Object, public runnable::Launchable CHECK(m_node); } + void destroy() final + { + LOG(INFO) << "Destroying " << this->type_name() << " in segment"; + // m_node->release_edge_connection(); + m_node.reset(); + } + private: NodeT* get_object() const final; std::unique_ptr prepare_launcher(runnable::LaunchControl& launch_control) final; diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index 569d5cc94..a4023ef7d 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -28,6 +28,7 @@ #include "mrc/modules/properties/persistent.hpp" // IWYU pragma: keep #include "mrc/modules/segment_modules.hpp" #include "mrc/node/port_registry.hpp" +#include "mrc/node/rx_source.hpp" #include "mrc/runnable/launchable.hpp" // for Launchable #include "mrc/segment/egress_port.hpp" // IWYU pragma: keep #include "mrc/segment/ingress_port.hpp" // IWYU pragma: keep @@ -284,9 +285,28 @@ void BuilderDefinition::initialize() << ", Segment Rank: " << m_rank << ". Exception message:\n" << e.what(); - m_objects.clear(); + LOG(ERROR) << "********* <--- nodes=" << m_nodes.size() << ", ingress_ports=" << m_ingress_ports.size() + << ", egress_ports=" << m_egress_ports.size() << ", objects=" << m_objects.size() + << ", modules=" << m_modules.size() << " ---> *********"; + + for (auto& [name, obj_prop] : m_objects) + { + if (obj_prop->is_source() && !obj_prop->is_sink()) + { + LOG(ERROR) << "Destroying: " << name; + obj_prop->destroy(); + } + } + + LOG(ERROR) << "********* <--- 0"; m_ingress_ports.clear(); + LOG(ERROR) << "********* <--- 1"; m_egress_ports.clear(); + LOG(ERROR) << "********* <--- 2+"; + m_nodes.clear(); + LOG(ERROR) << "********* <--- 2.5"; + m_objects.clear(); + LOG(ERROR) << "********* <--- 3"; // Rethrow after logging std::rethrow_exception(std::current_exception()); @@ -349,6 +369,7 @@ void BuilderDefinition::add_object(const std::string& name, std::shared_ptr<::mr if (object->is_runnable()) { + LOG(ERROR) << "Adding runnable: " << name; auto launchable = std::dynamic_pointer_cast(object); CHECK(launchable) << "Invalid conversion. Object returned is_runnable() == true, but was not of type " @@ -360,6 +381,7 @@ void BuilderDefinition::add_object(const std::string& name, std::shared_ptr<::mr // Add to ingress ports list if it is the right type if (auto ingress_port = std::dynamic_pointer_cast(object)) { + LOG(ERROR) << "Adding ingress_port: " << name; // Save by the original name m_ingress_ports[local_name] = ingress_port; } @@ -367,6 +389,7 @@ void BuilderDefinition::add_object(const std::string& name, std::shared_ptr<::mr // Add to egress ports list if it is the right type if (auto egress_port = std::dynamic_pointer_cast(object)) { + LOG(ERROR) << "Adding egress_port: " << name; // Save by the original name m_egress_ports[local_name] = egress_port; } diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 0315d7ab5..9ded8d65a 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -159,6 +159,42 @@ TEST_F(TestPipeline, TwoSegment) LOG(INFO) << "Done" << std::endl; } + +TEST_F(TestPipeline, SegmentInitErrorHandling) +{ + // Test to reproduce issue #360 + auto pipeline = mrc::make_pipeline(); + + auto seg = pipeline->make_segment("seg_1", [](segment::IBuilder& seg) { + auto rx_source = seg.make_source("rx_source", [](rxcpp::subscriber s) { + FAIL() << "This should not be called"; + }); + + auto rx_sink = seg.make_sink("rx_sink", + rxcpp::make_observer_dynamic( + [&](float x) { + FAIL() << "This should not be " + "called"; + }, + [&]() { + FAIL() << "This should not be " + "called"; + })); + + seg.make_edge(rx_source, rx_sink); + + throw std::runtime_error("Error in initializer"); + }); + + Executor exec(std::move(m_options)); + + exec.register_pipeline(std::move(pipeline)); + + exec.start(); + + EXPECT_THROW(exec.join(), std::runtime_error); +} + TEST_F(TestPipeline, SegmentInitErrorHandlingFirstSeg) { // Test to reproduce issue #360 From 1d12c2fca9b4ca69f3330d82c9525dc1e765f7a9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 27 Sep 2023 16:00:56 -0700 Subject: [PATCH 23/29] WIP: Reset m_owned_edge from release_edge_connection --- cpp/mrc/include/mrc/edge/edge_holder.hpp | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cpp/mrc/include/mrc/edge/edge_holder.hpp b/cpp/mrc/include/mrc/edge/edge_holder.hpp index b3d801484..23591886f 100644 --- a/cpp/mrc/include/mrc/edge/edge_holder.hpp +++ b/cpp/mrc/include/mrc/edge/edge_holder.hpp @@ -53,12 +53,20 @@ class EdgeHolder EdgeHolder() = default; virtual ~EdgeHolder() { + auto p = m_connected_edge.get(); // Drop any edge connections before this object goes out of scope. This should execute any disconnectors m_connected_edge.reset(); if (this->check_active_connection(false)) { - LOG(FATAL) << "A node was destructed which still had dependent connections. Nodes must be kept alive while " + LOG(INFO) << "edge = " << p << " edge_holder=" << this; + if (p) + { + p->log_edges(); + } + LOG(FATAL) << this + << " A node was destructed which still had dependent connections. Nodes must be kept alive " + "while " "dependent connections are still active"; } } @@ -69,6 +77,7 @@ class EdgeHolder // Alive connection exists when the lock is true, lifetime is false or a connction object has been set if (m_owned_edge.lock() && !m_owned_edge_lifetime) { + LOG(INFO) << "check_active_connection = " << this << " m_owned_edge.lock() && !m_owned_edge_lifetime"; // Then someone is using this edge already, cant be changed if (do_throw) { @@ -80,6 +89,8 @@ class EdgeHolder // Check for set connections. Must be connected to throw error if (m_connected_edge && m_connected_edge->is_connected()) { + LOG(INFO) << "check_active_connection = " << this + << " m_connected_edge && m_connected_edge->is_connected()"; // Then someone is using this edge already, cant be changed if (do_throw) { @@ -152,8 +163,10 @@ class EdgeHolder void release_edge_connection() { + LOG(INFO) << "Releasing edge connection for edge_holder=" << this; m_owned_edge_lifetime.reset(); m_connected_edge.reset(); + m_owned_edge.reset(); } const std::shared_ptr>& get_connected_edge() const From 22205c0b5eb52f30e556f10cbd4a8d90c2920334 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 27 Sep 2023 16:02:43 -0700 Subject: [PATCH 24/29] WIP: more debug logging than you can shake a stick at [no ci] --- cpp/mrc/include/mrc/edge/edge.hpp | 15 +++++ cpp/mrc/include/mrc/edge/edge_builder.hpp | 6 ++ cpp/mrc/include/mrc/node/rx_sink_base.hpp | 8 +++ cpp/mrc/include/mrc/node/rx_source_base.hpp | 12 +++- cpp/mrc/include/mrc/segment/builder.hpp | 5 ++ cpp/mrc/include/mrc/segment/egress_port.hpp | 18 ++++++ cpp/mrc/include/mrc/segment/ingress_port.hpp | 18 ++++++ cpp/mrc/include/mrc/segment/runnable.hpp | 1 - .../internal/pipeline/pipeline_instance.cpp | 5 +- .../internal/segment/builder_definition.cpp | 59 +++++++++++++------ .../internal/segment/builder_definition.hpp | 3 + .../src/internal/segment/segment_instance.cpp | 6 ++ .../src/internal/segment/segment_instance.hpp | 1 + cpp/mrc/src/public/edge/edge_builder.cpp | 2 + 14 files changed, 138 insertions(+), 21 deletions(-) diff --git a/cpp/mrc/include/mrc/edge/edge.hpp b/cpp/mrc/include/mrc/edge/edge.hpp index ea2a35e07..5871819ff 100644 --- a/cpp/mrc/include/mrc/edge/edge.hpp +++ b/cpp/mrc/include/mrc/edge/edge.hpp @@ -109,6 +109,15 @@ class EdgeBase m_disconnectors.clear(); }; + void log_edges() + { + LOG(INFO) << "EdgeBase: " << this << " has " << m_linked_edges.size() << " linked edges"; + for (auto& linked_edge : m_linked_edges) + { + LOG(INFO) << " Linked edge: " << linked_edge.get(); + } + } + bool is_connected() const { return m_is_connected; @@ -193,6 +202,12 @@ class Edge : public virtual EdgeBase friend class EdgeHolder; template friend class MultiEdgeHolder; + + void log_edges() + { + LOG(INFO) << "Edge: " << this; + EdgeBase::log_edges(); + } }; class EdgeTypeInfo diff --git a/cpp/mrc/include/mrc/edge/edge_builder.hpp b/cpp/mrc/include/mrc/edge/edge_builder.hpp index 78e88b577..2652dbda6 100644 --- a/cpp/mrc/include/mrc/edge/edge_builder.hpp +++ b/cpp/mrc/include/mrc/edge/edge_builder.hpp @@ -85,6 +85,7 @@ struct EdgeBuilder final template static void make_edge_writable(IWritableAcceptor& source, IWritableProvider& sink) { + LOG(INFO) << "\t\tcreating writable edge"; constexpr bool IsConvertable = std::is_convertible_v; constexpr bool LessBits = sizeof(SourceT) > sizeof(SinkT); // Sink requires more bits than source. constexpr bool FloatToInt = std::is_floating_point_v && std::is_integral_v; // float -> int @@ -126,6 +127,7 @@ struct EdgeBuilder final LOG(FATAL) << "No dynamic lookup available for statically typed objects"; } + LOG(INFO) << "\t\twritable edge = " << edge.get() << "\t handle = " << edge->get_handle().get(); source.set_writable_edge_handle(edge); } @@ -173,6 +175,7 @@ struct EdgeBuilder final LOG(FATAL) << "No dynamic lookup available for statically typed objects"; } + LOG(INFO) << "\t\treadable edge = " << edge.get() << "\t handle = " << edge->get_handle().get(); sink.set_readable_edge_handle(edge); } @@ -227,6 +230,8 @@ struct EdgeBuilder final auto edge_handle = edge_holder.get_connected_edge(); edge_holder.release_edge_connection(); + LOG(INFO) << "Splicing edge " << edge_handle.get() << " into " << splice_writable_acceptor; + make_edge_writable(*writable_acceptor, *splice_writable_provider); make_edge_writable(*splice_writable_acceptor, sink); } @@ -463,6 +468,7 @@ void make_edge(SourceT& source, SinkT& sink) using source_full_t = SourceT; using sink_full_t = SinkT; + LOG(INFO) << "\tedge_builder::make_edge"; if constexpr (is_base_of_template::value && is_base_of_template::value) { diff --git a/cpp/mrc/include/mrc/node/rx_sink_base.hpp b/cpp/mrc/include/mrc/node/rx_sink_base.hpp index f975c3d2d..2b616e34d 100644 --- a/cpp/mrc/include/mrc/node/rx_sink_base.hpp +++ b/cpp/mrc/include/mrc/node/rx_sink_base.hpp @@ -45,6 +45,7 @@ class RxSinkBase : public WritableProvider, public ReadableAcceptor, publi public: void sink_add_watcher(std::shared_ptr watcher); void sink_remove_watcher(std::shared_ptr watcher); + void release_edge_connection(); protected: RxSinkBase(); @@ -78,6 +79,13 @@ RxSinkBase::~RxSinkBase() LOG(INFO) << "~RxSinkBase() - released"; } +template +void RxSinkBase::release_edge_connection() +{ + LOG(ERROR) << "RxSinkBase releasing edge connection"; + SinkChannelOwner::release_edge_connection(); +} + template const rxcpp::observable& RxSinkBase::observable() const { diff --git a/cpp/mrc/include/mrc/node/rx_source_base.hpp b/cpp/mrc/include/mrc/node/rx_source_base.hpp index 1fe1cd196..7e3cbd0a3 100644 --- a/cpp/mrc/include/mrc/node/rx_source_base.hpp +++ b/cpp/mrc/include/mrc/node/rx_source_base.hpp @@ -53,6 +53,7 @@ class RxSourceBase : public ReadableProvider, public: void source_add_watcher(std::shared_ptr watcher); void source_remove_watcher(std::shared_ptr watcher); + void release_edge_connection(); protected: RxSourceBase(); @@ -87,9 +88,16 @@ RxSourceBase::RxSourceBase() : template RxSourceBase::~RxSourceBase() { - LOG(ERROR) << "RxSourceBase destructor called releasing edge connection"; + LOG(ERROR) << "~RxSourceBase"; this->release_edge_connection(); - LOG(ERROR) << "RxSourceBase released"; + LOG(ERROR) << "~RxSourceBase - done"; +} + +template +void RxSourceBase::release_edge_connection() +{ + LOG(ERROR) << "RxSourceBase releasing edge connection"; + SourceChannelOwner::release_edge_connection(); } template diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index 95e2da872..69cdca1f6 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -481,13 +481,18 @@ void IBuilder::make_edge(SourceObjectT source, SinkObjectT sink) if (source_object.is_writable_acceptor() && sink_object.is_writable_provider()) { + LOG(INFO) << "\tCreating edge from WritableAcceptor to WritableProvider for " << source_object.name() << " to " + << sink_object.name(); mrc::make_edge(source_object.template writable_acceptor_typed(), sink_object.template writable_provider_typed()); + LOG(INFO) << "\tCreating edge from WritableAcceptor to WritableProvider for " << source_object.name() << " to " + << sink_object.name() << " - done"; return; } if (source_object.is_readable_provider() && sink_object.is_readable_acceptor()) { + LOG(INFO) << "\tCreating edge from ReadableProvider to ReadableAcceptor"; mrc::make_edge(source_object.template readable_provider_typed(), sink_object.template readable_acceptor_typed()); return; diff --git a/cpp/mrc/include/mrc/segment/egress_port.hpp b/cpp/mrc/include/mrc/segment/egress_port.hpp index 7fe52a5ce..7b14144bf 100644 --- a/cpp/mrc/include/mrc/segment/egress_port.hpp +++ b/cpp/mrc/include/mrc/segment/egress_port.hpp @@ -64,6 +64,24 @@ class EgressPort final : public Object>, m_sink(std::make_unique>()) {} + void destroy() final + { + LOG(INFO) << "Destroying EgressPort " << this->type_name() << " in segment"; + + std::lock_guard lock(m_mutex); + if (m_sink) + { + auto* sink_ptr = get_object(); + LOG(INFO) << "\tEP releasing edge connection"; + sink_ptr->release_edge_connection(); + m_sink.reset(); + } + else + { + LOG(INFO) << "\tEP sink is null"; + } + } + private: node::RxSinkBase* get_object() const final { diff --git a/cpp/mrc/include/mrc/segment/ingress_port.hpp b/cpp/mrc/include/mrc/segment/ingress_port.hpp index fec6d469e..6777943a8 100644 --- a/cpp/mrc/include/mrc/segment/ingress_port.hpp +++ b/cpp/mrc/include/mrc/segment/ingress_port.hpp @@ -58,6 +58,24 @@ class IngressPort : public Object>, public IngressPortBase m_source(std::make_unique>()) {} + void destroy() final + { + LOG(INFO) << "Destroying IngressPort " << this->type_name() << " in segment"; + + std::lock_guard lock(m_mutex); + if (m_source) + { + auto* src_ptr = get_object(); + // LOG(INFO) << "\tIP releasing edge connection"; + // src_ptr->release_edge_connection(); + // m_source.reset(); + } + else + { + LOG(INFO) << "\tIP source is null"; + } + } + private: node::RxSourceBase* get_object() const final { diff --git a/cpp/mrc/include/mrc/segment/runnable.hpp b/cpp/mrc/include/mrc/segment/runnable.hpp index ffc5a8121..c33baee11 100644 --- a/cpp/mrc/include/mrc/segment/runnable.hpp +++ b/cpp/mrc/include/mrc/segment/runnable.hpp @@ -49,7 +49,6 @@ class Runnable : public Object, public runnable::Launchable void destroy() final { LOG(INFO) << "Destroying " << this->type_name() << " in segment"; - // m_node->release_edge_connection(); m_node.reset(); } diff --git a/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp b/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp index dddd73a3c..97f886193 100644 --- a/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp +++ b/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp @@ -103,6 +103,7 @@ void PipelineInstance::stop_segment(const SegmentAddress& address) // manifold(name).drop_output(address); } + search->second->shutdown(); search->second->service_stop(); } @@ -227,7 +228,9 @@ void PipelineInstance::do_service_await_join() } if (first_exception) { - LOG(ERROR) << "pipeline::PipelineInstance - an exception was caught while awaiting on segments - rethrowing"; + LOG(ERROR) << "pipeline::PipelineInstance - an exception was caught while awaiting on segments - calling " + "service kill before rethrowing"; + do_service_kill(); std::rethrow_exception(std::move(first_exception)); } } diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index a4023ef7d..d40946727 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -289,28 +289,53 @@ void BuilderDefinition::initialize() << ", egress_ports=" << m_egress_ports.size() << ", objects=" << m_objects.size() << ", modules=" << m_modules.size() << " ---> *********"; - for (auto& [name, obj_prop] : m_objects) + shutdown(); + + // Rethrow after logging + std::rethrow_exception(std::current_exception()); + } +} + +void BuilderDefinition::shutdown() +{ + if (m_shutdown) + { + return; + } + + m_shutdown = true; + + LOG(INFO) << "Shutting down segment: " << this->name(); + for (auto& [name, obj_prop] : m_objects) + { + if (obj_prop->is_source() && !obj_prop->is_sink()) { - if (obj_prop->is_source() && !obj_prop->is_sink()) - { - LOG(ERROR) << "Destroying: " << name; - obj_prop->destroy(); - } + LOG(ERROR) << "Destroying: " << name; + obj_prop->destroy(); } + } - LOG(ERROR) << "********* <--- 0"; - m_ingress_ports.clear(); - LOG(ERROR) << "********* <--- 1"; - m_egress_ports.clear(); - LOG(ERROR) << "********* <--- 2+"; - m_nodes.clear(); - LOG(ERROR) << "********* <--- 2.5"; - m_objects.clear(); - LOG(ERROR) << "********* <--- 3"; + for (auto& [name, port] : m_ingress_ports) + { + LOG(ERROR) << "Destroying IP: " << name; + port->destroy(); + } - // Rethrow after logging - std::rethrow_exception(std::current_exception()); + for (auto& [name, port] : m_egress_ports) + { + LOG(ERROR) << "Destroying EP: " << name; + port->destroy(); } + + LOG(ERROR) << "********* <--- 0"; + m_ingress_ports.clear(); + LOG(ERROR) << "********* <--- 1"; + m_egress_ports.clear(); + LOG(ERROR) << "********* <--- 2+"; + m_nodes.clear(); + LOG(ERROR) << "********* <--- 2.5"; + m_objects.clear(); + LOG(ERROR) << "********* <--- 3"; } const std::map>& BuilderDefinition::nodes() const diff --git a/cpp/mrc/src/internal/segment/builder_definition.hpp b/cpp/mrc/src/internal/segment/builder_definition.hpp index aa0c96140..d26302988 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.hpp +++ b/cpp/mrc/src/internal/segment/builder_definition.hpp @@ -119,6 +119,7 @@ class BuilderDefinition : public IBuilder const SegmentDefinition& definition() const; void initialize(); + void shutdown(); const std::map>& nodes() const; const std::map>& egress_ports() const; @@ -163,6 +164,8 @@ class BuilderDefinition : public IBuilder // ingress/egress - these are also nodes/objects std::map> m_ingress_ports; std::map> m_egress_ports; + + bool m_shutdown{false}; }; } // namespace mrc::segment diff --git a/cpp/mrc/src/internal/segment/segment_instance.cpp b/cpp/mrc/src/internal/segment/segment_instance.cpp index 53f66b804..bebe1a969 100644 --- a/cpp/mrc/src/internal/segment/segment_instance.cpp +++ b/cpp/mrc/src/internal/segment/segment_instance.cpp @@ -331,4 +331,10 @@ std::shared_ptr SegmentInstance::create_manifold(const Port return nullptr; } +void SegmentInstance::shutdown() +{ + std::lock_guard lock(m_mutex); + m_builder->shutdown(); +} + } // namespace mrc::segment diff --git a/cpp/mrc/src/internal/segment/segment_instance.hpp b/cpp/mrc/src/internal/segment/segment_instance.hpp index addd38dd1..7d18fdd4e 100644 --- a/cpp/mrc/src/internal/segment/segment_instance.hpp +++ b/cpp/mrc/src/internal/segment/segment_instance.hpp @@ -56,6 +56,7 @@ class SegmentInstance final : public Service std::shared_ptr create_manifold(const PortName& name); void attach_manifold(std::shared_ptr manifold); + void shutdown(); protected: const std::string& info() const; diff --git a/cpp/mrc/src/public/edge/edge_builder.cpp b/cpp/mrc/src/public/edge/edge_builder.cpp index ff21e9a4a..8230364ea 100644 --- a/cpp/mrc/src/public/edge/edge_builder.cpp +++ b/cpp/mrc/src/public/edge/edge_builder.cpp @@ -40,6 +40,7 @@ void EdgeBuilder::make_edge_writable_typeless(IWritableAcceptorBase& source, auto ingress = sink.get_writable_edge_handle(); // Set to the source + LOG(INFO) << "\t\twritable typeless edge = " << ingress.get() << "\t handle = " << ingress->get_handle().get(); source.set_writable_edge_handle(ingress); } @@ -51,6 +52,7 @@ void EdgeBuilder::make_edge_readable_typeless(IReadableProviderBase& source, auto egress = source.get_readable_edge_handle(); // Set to the sink + LOG(INFO) << "\t\treadable typeless edge = " << egress.get() << "\t handle = " << egress->get_handle().get(); sink.set_readable_edge_handle(egress); } From 142e9b8b4f6a2da3d3bd2ad0bf5ce4dd2759f62c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 28 Sep 2023 11:17:37 -0700 Subject: [PATCH 25/29] WIP: tests passing [no ci] --- cpp/mrc/include/mrc/edge/edge_holder.hpp | 6 +++++- .../include/mrc/manifold/composite_manifold.hpp | 14 ++++++++++++++ cpp/mrc/include/mrc/manifold/egress.hpp | 7 +++++++ cpp/mrc/include/mrc/manifold/ingress.hpp | 7 +++++++ cpp/mrc/include/mrc/manifold/interface.hpp | 6 ++++-- cpp/mrc/include/mrc/manifold/manifold.hpp | 4 ++-- cpp/mrc/include/mrc/segment/forward.hpp | 3 +++ .../src/internal/pipeline/pipeline_instance.cpp | 3 +++ .../src/internal/segment/builder_definition.cpp | 5 +++-- .../src/internal/segment/segment_instance.cpp | 16 ++++++++++++++++ .../src/internal/segment/segment_instance.hpp | 1 + cpp/mrc/src/public/manifold/manifold.cpp | 2 ++ cpp/mrc/tests/test_pipeline.cpp | 9 +++++++++ 13 files changed, 76 insertions(+), 7 deletions(-) diff --git a/cpp/mrc/include/mrc/edge/edge_holder.hpp b/cpp/mrc/include/mrc/edge/edge_holder.hpp index 23591886f..6e0b2ab6f 100644 --- a/cpp/mrc/include/mrc/edge/edge_holder.hpp +++ b/cpp/mrc/include/mrc/edge/edge_holder.hpp @@ -50,7 +50,11 @@ template class EdgeHolder { public: - EdgeHolder() = default; + EdgeHolder() + { + LOG(INFO) << "EdgeHolder() = " << this; + }; + virtual ~EdgeHolder() { auto p = m_connected_edge.get(); diff --git a/cpp/mrc/include/mrc/manifold/composite_manifold.hpp b/cpp/mrc/include/mrc/manifold/composite_manifold.hpp index 974729468..4f508ebd6 100644 --- a/cpp/mrc/include/mrc/manifold/composite_manifold.hpp +++ b/cpp/mrc/include/mrc/manifold/composite_manifold.hpp @@ -59,6 +59,20 @@ class CompositeManifold : public Manifold mrc::make_edge(*m_ingress, *m_egress); } + ~CompositeManifold() override + { + LOG(INFO) << "CompositeManifold::~CompositeManifold(): " << info(); + shutdown(); + }; + + void shutdown() final + { + LOG(INFO) << "CompositeManifold::shutdown(): " << info(); + m_ingress->shutdown(); + m_egress->shutdown(); + LOG(INFO) << "CompositeManifold::shutdown(): done"; + } + protected: IngressT& ingress() { diff --git a/cpp/mrc/include/mrc/manifold/egress.hpp b/cpp/mrc/include/mrc/manifold/egress.hpp index 781122d61..33aae7e27 100644 --- a/cpp/mrc/include/mrc/manifold/egress.hpp +++ b/cpp/mrc/include/mrc/manifold/egress.hpp @@ -35,6 +35,7 @@ struct EgressDelegate { virtual ~EgressDelegate() = default; virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0; + virtual void shutdown(){}; }; template @@ -55,6 +56,12 @@ class TypedEgress : public EgressDelegate template class RoundRobinEgress : public node::Router, public TypedEgress { + public: + void shutdown() final + { + node::Router::release_edge_connections(); + } + protected: SegmentAddress determine_key_for_value(const T& t) override { diff --git a/cpp/mrc/include/mrc/manifold/ingress.hpp b/cpp/mrc/include/mrc/manifold/ingress.hpp index 060446b79..f37292f73 100644 --- a/cpp/mrc/include/mrc/manifold/ingress.hpp +++ b/cpp/mrc/include/mrc/manifold/ingress.hpp @@ -31,6 +31,7 @@ struct IngressDelegate { virtual ~IngressDelegate() = default; virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0; + virtual void shutdown(){}; }; template @@ -51,6 +52,12 @@ class TypedIngress : public IngressDelegate template class MuxedIngress : public node::Muxer, public TypedIngress { + public: + void shutdown() final + { + node::SourceProperties::release_edge_connection(); + } + protected: void do_add_input(const SegmentAddress& address, edge::IWritableAcceptor* source) final { diff --git a/cpp/mrc/include/mrc/manifold/interface.hpp b/cpp/mrc/include/mrc/manifold/interface.hpp index 5c3d28fa4..706487091 100644 --- a/cpp/mrc/include/mrc/manifold/interface.hpp +++ b/cpp/mrc/include/mrc/manifold/interface.hpp @@ -27,9 +27,11 @@ struct Interface virtual ~Interface() = default; virtual const PortName& port_name() const = 0; + virtual const std::string& info() const = 0; - virtual void start() = 0; - virtual void join() = 0; + virtual void start() = 0; + virtual void join() = 0; + virtual void shutdown() = 0; virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0; virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0; diff --git a/cpp/mrc/include/mrc/manifold/manifold.hpp b/cpp/mrc/include/mrc/manifold/manifold.hpp index 4cb567341..046934331 100644 --- a/cpp/mrc/include/mrc/manifold/manifold.hpp +++ b/cpp/mrc/include/mrc/manifold/manifold.hpp @@ -39,12 +39,12 @@ class Manifold : public Interface ~Manifold() override; const PortName& port_name() const final; + const std::string& info() const final; + void shutdown() override; protected: runnable::IRunnableResources& resources(); - const std::string& info() const; - private: void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) final; void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) final; diff --git a/cpp/mrc/include/mrc/segment/forward.hpp b/cpp/mrc/include/mrc/segment/forward.hpp index 7ce590512..0d5f0269d 100644 --- a/cpp/mrc/include/mrc/segment/forward.hpp +++ b/cpp/mrc/include/mrc/segment/forward.hpp @@ -30,6 +30,9 @@ class Object; template class Runnable; +template +class Component; + template class EgressPorts; diff --git a/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp b/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp index 97f886193..24a31f1f5 100644 --- a/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp +++ b/cpp/mrc/src/internal/pipeline/pipeline_instance.cpp @@ -201,11 +201,14 @@ void PipelineInstance::do_service_stop() void PipelineInstance::do_service_kill() { + LOG(INFO) << "PipelineInstance::do_service_kill"; mark_joinable(); for (auto& [id, segment] : m_segments) { + LOG(INFO) << "\tstopping: " << id; stop_segment(id); segment->service_kill(); + LOG(INFO) << "\tstopped: " << id; } } diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index d40946727..31250e467 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -262,7 +262,7 @@ void BuilderDefinition::initialize() // construct ingress ports for (const auto& [name, initializer] : this->definition().ingress_initializers()) { - DVLOG(10) << "constructing ingress_port: " << name; + LOG(INFO) << "constructing ingress_port: " << name; auto port = initializer(address); this->add_object(name, port); } @@ -270,7 +270,7 @@ void BuilderDefinition::initialize() // construct egress ports for (const auto& [name, initializer] : this->definition().egress_initializers()) { - DVLOG(10) << "constructing egress_port: " << name; + LOG(INFO) << "constructing egress_port: " << name; auto port = initializer(address); this->add_object(name, port); } @@ -292,6 +292,7 @@ void BuilderDefinition::initialize() shutdown(); // Rethrow after logging + LOG(ERROR) << "rethrowing exception"; std::rethrow_exception(std::current_exception()); } } diff --git a/cpp/mrc/src/internal/segment/segment_instance.cpp b/cpp/mrc/src/internal/segment/segment_instance.cpp index bebe1a969..e48abc9a3 100644 --- a/cpp/mrc/src/internal/segment/segment_instance.cpp +++ b/cpp/mrc/src/internal/segment/segment_instance.cpp @@ -286,6 +286,7 @@ void SegmentInstance::attach_manifold(std::shared_ptr manif { DVLOG(10) << info() << " attaching manifold for egress port " << port_name; search->second->connect_to_manifold(std::move(manifold)); + m_manifolds.push_back(manifold); return; } } @@ -296,6 +297,7 @@ void SegmentInstance::attach_manifold(std::shared_ptr manif { DVLOG(10) << info() << " attaching manifold for ingress port " << port_name; search->second->connect_to_manifold(std::move(manifold)); + m_manifolds.push_back(manifold); return; } } @@ -334,6 +336,20 @@ std::shared_ptr SegmentInstance::create_manifold(const Port void SegmentInstance::shutdown() { std::lock_guard lock(m_mutex); + LOG(INFO) << info() << " shutting down segment checking for manifolds: " << m_manifolds.size(); + for (const auto& weak_manifold : m_manifolds) + { + auto manifold = weak_manifold.lock(); + if (manifold) + { + LOG(INFO) << info() << " shutting down manifold: " << manifold->info(); + manifold->shutdown(); + } + else + { + LOG(INFO) << info() << " manifold has already been shutdown"; + } + } m_builder->shutdown(); } diff --git a/cpp/mrc/src/internal/segment/segment_instance.hpp b/cpp/mrc/src/internal/segment/segment_instance.hpp index 7d18fdd4e..4ae5abcea 100644 --- a/cpp/mrc/src/internal/segment/segment_instance.hpp +++ b/cpp/mrc/src/internal/segment/segment_instance.hpp @@ -83,6 +83,7 @@ class SegmentInstance final : public Service std::map> m_runners; std::map> m_egress_runners; std::map> m_ingress_runners; + std::vector> m_manifolds; mutable std::mutex m_mutex; }; diff --git a/cpp/mrc/src/public/manifold/manifold.cpp b/cpp/mrc/src/public/manifold/manifold.cpp index a1a3cca25..3b4aba3eb 100644 --- a/cpp/mrc/src/public/manifold/manifold.cpp +++ b/cpp/mrc/src/public/manifold/manifold.cpp @@ -50,6 +50,8 @@ const std::string& Manifold::info() const return m_info; } +void Manifold::shutdown() {} + void Manifold::add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) { DVLOG(3) << "manifold " << this->port_name() << ": connecting to upstream segment " << segment::info(address); diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 9ded8d65a..edd6c3b12 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -29,11 +29,14 @@ #include "mrc/segment/ingress_ports.hpp" #include "mrc/segment/object.hpp" +#include +#include #include #include #include #include +#include // todo remove #include #include #include @@ -42,6 +45,8 @@ namespace mrc { +using namespace std::literals; + class TestPipeline : public ::testing::Test { protected: @@ -208,7 +213,9 @@ TEST_F(TestPipeline, SegmentInitErrorHandlingFirstSeg) auto my_float_egress = seg.get_egress("float_port"); + LOG(INFO) << "\n\n********* Making edge"; seg.make_edge(rx_source, my_float_egress); + LOG(INFO) << "\n\n********* Throwing exception"; throw std::runtime_error("Error in initializer"); }); @@ -228,7 +235,9 @@ TEST_F(TestPipeline, SegmentInitErrorHandlingFirstSeg) "called"; })); + LOG(INFO) << "\n\n********* Seg 2 Making edge"; seg.make_edge(my_float_ingress, rx_sink); + LOG(INFO) << "\n\n********* Seg 2 Making edge - done"; }); Executor exec(std::move(m_options)); From a0f65fdc1566cde2b5e499612acd415ae496c479 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 28 Sep 2023 11:46:16 -0700 Subject: [PATCH 26/29] Release edge created in constructor [no ci] --- cpp/mrc/include/mrc/node/queue.hpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/mrc/include/mrc/node/queue.hpp b/cpp/mrc/include/mrc/node/queue.hpp index 81038feaf..e35a47366 100644 --- a/cpp/mrc/include/mrc/node/queue.hpp +++ b/cpp/mrc/include/mrc/node/queue.hpp @@ -33,7 +33,12 @@ class Queue : public WritableProvider, public ReadableProvider { this->set_channel(std::make_unique>()); } - ~Queue() override = default; + + ~Queue() override + { + SinkProperties::release_edge_connection(); + SourceProperties::release_edge_connection(); + }; void set_channel(std::unique_ptr> channel) { From 2289f42554f669375ce71499b4a37c296cb85017 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 28 Sep 2023 13:13:58 -0700 Subject: [PATCH 27/29] Remove debug logging --- cpp/mrc/src/internal/segment/builder_definition.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cpp/mrc/src/internal/segment/builder_definition.cpp b/cpp/mrc/src/internal/segment/builder_definition.cpp index 31250e467..56d261e73 100644 --- a/cpp/mrc/src/internal/segment/builder_definition.cpp +++ b/cpp/mrc/src/internal/segment/builder_definition.cpp @@ -328,15 +328,10 @@ void BuilderDefinition::shutdown() port->destroy(); } - LOG(ERROR) << "********* <--- 0"; m_ingress_ports.clear(); - LOG(ERROR) << "********* <--- 1"; m_egress_ports.clear(); - LOG(ERROR) << "********* <--- 2+"; m_nodes.clear(); - LOG(ERROR) << "********* <--- 2.5"; m_objects.clear(); - LOG(ERROR) << "********* <--- 3"; } const std::map>& BuilderDefinition::nodes() const From 5d0d6a9386671a008e03d838e74c1545d28bd0b7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 28 Sep 2023 13:17:35 -0700 Subject: [PATCH 28/29] WIP: DO NOT MERGE --- python/mrc/_pymrc/include/pymrc/node.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/mrc/_pymrc/include/pymrc/node.hpp b/python/mrc/_pymrc/include/pymrc/node.hpp index f5d72e7c3..663befc7f 100644 --- a/python/mrc/_pymrc/include/pymrc/node.hpp +++ b/python/mrc/_pymrc/include/pymrc/node.hpp @@ -397,6 +397,11 @@ class PythonSourceComponent : public node::LambdaSourceComponent, public: using base_t::base_t; + + ~PythonSourceComponent() + { + this->release_edge_connection(); + } }; class SegmentObjectProxy From 7d69c5d6258b464c476b219443b8964decba2da9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 28 Sep 2023 13:17:55 -0700 Subject: [PATCH 29/29] Enable logging in test [no ci] --- python/tests/test_edges.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/tests/test_edges.py b/python/tests/test_edges.py index 98ed11d0e..be4ab4735 100644 --- a/python/tests/test_edges.py +++ b/python/tests/test_edges.py @@ -14,6 +14,7 @@ # limitations under the License. import itertools +import logging import typing import pytest @@ -23,6 +24,9 @@ import mrc.core.operators as ops import mrc.tests.test_edges_cpp as m +mrc.logging.init_logging("test_edges") +mrc.logging.set_level(logging.INFO) + @pytest.fixture def ex_runner():