Skip to content

Commit

Permalink
simplify asyncio_runnable
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Nov 1, 2023
1 parent a0d136e commit e1da112
Showing 1 changed file with 10 additions and 44 deletions.
54 changes: 10 additions & 44 deletions python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,62 +103,28 @@ class BoostFutureAwaitableOperation
std::function<SignatureT> m_fn;
};

template <typename T>
class BoostFutureReader
{
public:
template <typename FuncT>
BoostFutureReader(FuncT&& fn) : m_awaiter(std::forward<FuncT>(fn))
{}

coroutines::Task<mrc::channel::Status> async_read(T& value)
{
co_return co_await m_awaiter(std::ref(value));
}

private:
BoostFutureAwaitableOperation<mrc::channel::Status(T&)> m_awaiter;
};

template <typename T>
class BoostFutureWriter
{
public:
template <typename FuncT>
BoostFutureWriter(FuncT&& fn) : m_awaiter(std::forward<FuncT>(fn))
{}

coroutines::Task<mrc::channel::Status> async_write(T&& value)
{
co_return co_await m_awaiter(std::move(value));
}

private:
BoostFutureAwaitableOperation<mrc::channel::Status(T&&)> m_awaiter;
};

template <typename T>
class CoroutineRunnableSink : public mrc::node::WritableProvider<T>,
public mrc::node::ReadableAcceptor<T>,
public mrc::node::SinkChannelOwner<T>
{
protected:
CoroutineRunnableSink() :
m_reader([this](T& value) {
m_read_async([this](T& value) {
return this->get_readable_edge()->await_read(value);
})
{
// Set the default channel
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}

coroutines::Task<mrc::channel::Status> async_read(T& value)
coroutines::Task<mrc::channel::Status> read_async(T& value)
{
co_return co_await m_reader.async_read(std::ref(value));
co_return co_await m_read_async(std::ref(value));
}

private:
BoostFutureReader<T> m_reader;
BoostFutureAwaitableOperation<mrc::channel::Status(T&)> m_read_async;
};

template <typename T>
Expand All @@ -168,21 +134,21 @@ class CoroutineRunnableSource : public mrc::node::WritableAcceptor<T>,
{
protected:
CoroutineRunnableSource() :
m_writer([this](T&& value) {
m_write_async([this](T&& value) {
return this->get_writable_edge()->await_write(std::move(value));
})
{
// Set the default channel
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}

coroutines::Task<mrc::channel::Status> async_write(T&& value)
coroutines::Task<mrc::channel::Status> write_async(T&& value)
{
co_return co_await m_writer.async_write(std::move(value));
co_return co_await m_write_async(std::move(value));
}

private:
BoostFutureWriter<T> m_writer;
BoostFutureAwaitableOperation<mrc::channel::Status(T&&)> m_write_async;
};

template <typename InputT, typename OutputT>
Expand Down Expand Up @@ -245,7 +211,7 @@ coroutines::Task<> AsyncioRunnable<InputT, OutputT>::main_task(std::shared_ptr<m
{
InputT data;

auto read_status = co_await this->async_read(data);
auto read_status = co_await this->read_async(data);

if (read_status != mrc::channel::Status::success)
{
Expand Down Expand Up @@ -289,7 +255,7 @@ coroutines::Task<> AsyncioRunnable<InputT, OutputT>::process_one(InputT&& value,
// Weird bug, cant directly move the value into the async_write call
auto data = std::move(*iter);

co_await this->async_write(std::move(data));
co_await this->write_async(std::move(data));

// Advance the iterator
co_await ++iter;
Expand Down

0 comments on commit e1da112

Please sign in to comment.