diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp index 719f92b90..18000aa67 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp @@ -103,40 +103,6 @@ class BoostFutureAwaitableOperation std::function m_fn; }; -template -class BoostFutureReader -{ - public: - template - BoostFutureReader(FuncT&& fn) : m_awaiter(std::forward(fn)) - {} - - coroutines::Task async_read(T& value) - { - co_return co_await m_awaiter(std::ref(value)); - } - - private: - BoostFutureAwaitableOperation m_awaiter; -}; - -template -class BoostFutureWriter -{ - public: - template - BoostFutureWriter(FuncT&& fn) : m_awaiter(std::forward(fn)) - {} - - coroutines::Task async_write(T&& value) - { - co_return co_await m_awaiter(std::move(value)); - } - - private: - BoostFutureAwaitableOperation m_awaiter; -}; - template class CoroutineRunnableSink : public mrc::node::WritableProvider, public mrc::node::ReadableAcceptor, @@ -144,7 +110,7 @@ class CoroutineRunnableSink : public mrc::node::WritableProvider, { protected: CoroutineRunnableSink() : - m_reader([this](T& value) { + m_read_async([this](T& value) { return this->get_readable_edge()->await_read(value); }) { @@ -152,13 +118,13 @@ class CoroutineRunnableSink : public mrc::node::WritableProvider, this->set_channel(std::make_unique>()); } - coroutines::Task async_read(T& value) + coroutines::Task read_async(T& value) { - co_return co_await m_reader.async_read(std::ref(value)); + co_return co_await m_read_async(std::ref(value)); } private: - BoostFutureReader m_reader; + BoostFutureAwaitableOperation m_read_async; }; template @@ -168,7 +134,7 @@ class CoroutineRunnableSource : public mrc::node::WritableAcceptor, { protected: CoroutineRunnableSource() : - m_writer([this](T&& value) { + m_write_async([this](T&& value) { return this->get_writable_edge()->await_write(std::move(value)); }) { @@ -176,13 +142,13 @@ class CoroutineRunnableSource : public mrc::node::WritableAcceptor, this->set_channel(std::make_unique>()); } - coroutines::Task async_write(T&& value) + coroutines::Task write_async(T&& value) { - co_return co_await m_writer.async_write(std::move(value)); + co_return co_await m_write_async(std::move(value)); } private: - BoostFutureWriter m_writer; + BoostFutureAwaitableOperation m_write_async; }; template @@ -245,7 +211,7 @@ coroutines::Task<> AsyncioRunnable::main_task(std::shared_ptrasync_read(data); + auto read_status = co_await this->read_async(data); if (read_status != mrc::channel::Status::success) { @@ -289,7 +255,7 @@ coroutines::Task<> AsyncioRunnable::process_one(InputT&& value, // Weird bug, cant directly move the value into the async_write call auto data = std::move(*iter); - co_await this->async_write(std::move(data)); + co_await this->write_async(std::move(data)); // Advance the iterator co_await ++iter;