Skip to content

Commit

Permalink
update ObservableProxy::pipe to support any number of operators
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Sep 17, 2023
1 parent 9153243 commit dfea856
Showing 1 changed file with 10 additions and 64 deletions.
74 changes: 10 additions & 64 deletions python/mrc/_pymrc/src/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ PySubscription ObservableProxy::subscribe(PyObjectObservable* self, PyObjectSubs
return self->subscribe(subscriber);
}

template <typename... OpsT>
PyObjectObservable pipe_ops(const PyObjectObservable* self, OpsT&&... ops)
{
return (*self | ... | ops);
}

PyObjectObservable ObservableProxy::pipe(const PyObjectObservable* self, py::args args)
{
std::vector<PyObjectOperateFn> operators;
Expand All @@ -150,66 +144,18 @@ PyObjectObservable ObservableProxy::pipe(const PyObjectObservable* self, py::arg
operators.emplace_back(op.get_operate_fn());
}

switch (operators.size())
if (operators.empty()) {
throw std::runtime_error("pipe() must be given at least one argument");
}

auto result = *self | operators[0];

for (auto i = 1 ; i < operators.size(); i++)
{
case 1:
return pipe_ops(self, operators[0]);
case 2:
return pipe_ops(self, operators[0], operators[1]);
case 3:
return pipe_ops(self, operators[0], operators[1], operators[2]);
case 4:
return pipe_ops(self, operators[0], operators[1], operators[2], operators[3]);
case 5:
return pipe_ops(self, operators[0], operators[1], operators[2], operators[3], operators[4]);
case 6:
return pipe_ops(self, operators[0], operators[1], operators[2], operators[3], operators[4], operators[5]);
case 7:
return pipe_ops(self,
operators[0],
operators[1],
operators[2],
operators[3],
operators[4],
operators[5],
operators[6]);
case 8:
return pipe_ops(self,
operators[0],
operators[1],
operators[2],
operators[3],
operators[4],
operators[5],
operators[6],
operators[7]);
case 9:
return pipe_ops(self,
operators[0],
operators[1],
operators[2],
operators[3],
operators[4],
operators[5],
operators[6],
operators[7],
operators[8]);
case 10:
return pipe_ops(self,
operators[0],
operators[1],
operators[2],
operators[3],
operators[4],
operators[5],
operators[6],
operators[7],
operators[8],
operators[9]);
default:
// Not supported error
throw std::runtime_error("pipe() only supports up 10 arguments. Please use another pipe() to use more");
result = result | operators[i];
}

return result;
}

} // namespace mrc::pymrc

0 comments on commit dfea856

Please sign in to comment.