Skip to content

Commit

Permalink
add python-coroutine-based test for asynciorunnable
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Nov 1, 2023
1 parent efd7fe9 commit e509bbb
Showing 1 changed file with 94 additions and 6 deletions.
100 changes: 94 additions & 6 deletions python/mrc/_pymrc/tests/test_asyncio_runnable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "pymrc/executor.hpp"
#include "pymrc/pipeline.hpp"
#include "pymrc/types.hpp"
#include "pymrc/utilities/object_wrappers.hpp"

#include "mrc/coroutines/async_generator.hpp"
#include "mrc/node/rx_sink.hpp"
Expand All @@ -34,6 +35,9 @@

#include <boost/fiber/policy.hpp>
#include <gtest/gtest.h>
#include <pybind11/embed.h>
#include <pybind11/eval.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rxcpp/rx.hpp>
Expand All @@ -55,30 +59,114 @@ class MyAsyncioRunnable : public pymrc::AsyncioRunnable<int, unsigned int>
mrc::coroutines::AsyncGenerator<unsigned int> on_data(int&& value) override
{
co_yield value;
co_yield value * 2;
};
};

TEST_F(TestAsyncioRunnable, Execute)
// TEST_F(TestAsyncioRunnable, YieldMultipleValues)
// {
// std::atomic<unsigned int> counter = 0;
// pymrc::Pipeline p;

// pybind11::module_::import("mrc.core.coro");

// auto init = [&counter](mrc::segment::IBuilder& seg) {
// auto src = seg.make_source<int>("src", [](rxcpp::subscriber<int>& s) {
// if (s.is_subscribed())
// {
// s.on_next(5);
// s.on_next(10);
// }

// s.on_completed();
// });

// auto internal = seg.construct_object<MyAsyncioRunnable>("internal");

// auto sink = seg.make_sink<unsigned int>("sink", [&counter](unsigned int x) {
// counter.fetch_add(x, std::memory_order_relaxed);
// });

// seg.make_edge(src, internal);
// seg.make_edge(internal, sink);
// };

// p.make_segment("seg1"s, init);
// p.make_segment("seg2"s, init);

// auto options = std::make_shared<mrc::Options>();
// options->topology().user_cpuset("0");
// // AsyncioRunnable only works with the Thread engine due to asyncio loops being thread-specific.
// options->engine_factories().set_default_engine_type(runnable::EngineType::Thread);

// pymrc::Executor exec{options};
// exec.register_pipeline(p);

// exec.start();
// exec.join();

// EXPECT_EQ(counter, 30);
// }

class PythonFlatmapAsyncioRunnable : public pymrc::AsyncioRunnable<int, int>
{
public:
PythonFlatmapAsyncioRunnable(pymrc::PyObjectHolder operation) : m_operation(std::move(operation)) {}

mrc::coroutines::AsyncGenerator<int> on_data(int&& value) override
{
py::gil_scoped_acquire acquire;

auto coroutine = m_operation(py::cast(value));

pymrc::PyObjectHolder result;
{
py::gil_scoped_release release;

result = co_await pymrc::coro::PyTaskToCppAwaitable(std::move(coroutine));
}

auto result_casted = py::cast<int>(result);

py::gil_scoped_release release;

co_yield result_casted;
};

private:
pymrc::PyObjectHolder m_operation;
};

TEST_F(TestAsyncioRunnable, UseAsyncioTasks)
{
py::object globals = py::globals();
py::exec(
"\
async def fn(value): \
return value \
",
globals);

pymrc::PyObjectHolder fn = static_cast<py::object>(globals["fn"]);

ASSERT_FALSE(fn.is_none());

std::atomic<unsigned int> counter = 0;
pymrc::Pipeline p;

pybind11::module_::import("mrc.core.coro");

auto init = [&counter](mrc::segment::IBuilder& seg) {
auto init = [&counter, &fn](mrc::segment::IBuilder& seg) {
auto src = seg.make_source<int>("src", [](rxcpp::subscriber<int>& s) {
if (s.is_subscribed())
{
s.on_next(5);
s.on_next(10);
s.on_next(7);
}

s.on_completed();
});

auto internal = seg.construct_object<MyAsyncioRunnable>("internal");
auto internal = seg.construct_object<PythonFlatmapAsyncioRunnable>("internal", fn);

auto sink = seg.make_sink<unsigned int>("sink", [&counter](unsigned int x) {
counter.fetch_add(x, std::memory_order_relaxed);
Expand All @@ -102,5 +190,5 @@ TEST_F(TestAsyncioRunnable, Execute)
exec.start();
exec.join();

EXPECT_EQ(counter, 132);
EXPECT_EQ(counter, 30);
}

0 comments on commit e509bbb

Please sign in to comment.