Support seek operation on a multi-topics consumer
### Motivation

See apache/pulsar-client-python#213

### Modifications

Add a new `forEachValue` overload to `SynchronizedHashMap` that lets
callers track the number of remaining running tasks through a
`SharedFuture`. Leverage this overload in seek operations when the
argument is a timestamp or a MessageId that represents earliest or
latest. When the argument is a MessageId whose `getTopicName()` method
returns a valid topic name, seek only on the internal consumer of that
topic.
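
The counter inside `SharedFuture` behaves like a latch shared by all
pending tasks: each `tryComplete()` call decrements it, and only the
final call returns true. A minimal sketch of these semantics (not part
of the patch; the include path assumes building from the repository
root):

```c++
#include <cassert>

#include "lib/SynchronizedHashMap.h"  // defines the new SharedFuture

int main() {
    pulsar::SharedFuture future{3};  // three pending tasks share one counter
    assert(!future.tryComplete());   // two tasks left
    assert(!future.tryComplete());   // one task left
    assert(future.tryComplete());    // only the last completion returns true
    return 0;
}
```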

Add `testMultiTopicsSeekAll` and `testMultiTopicsSeekSingle` to
`ConsumerSeekTest` to cover these cases.
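
For a user-facing illustration, below is a minimal sketch of what these
changes enable (the service URL, topic names, and subscription name are
hypothetical, and error handling is mostly omitted):

```c++
#include <pulsar/Client.h>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    Consumer consumer;
    // Subscribing to multiple topics creates a multi-topics consumer.
    client.subscribe({"topic-1", "topic-2"}, "sub", consumer);

    // Seek every internal consumer, either to the earliest available
    // message or to a publish timestamp (milliseconds since the epoch).
    consumer.seek(MessageId::earliest());
    consumer.seek(1716181200000ULL);

    // A MessageId obtained from a received message carries its topic name,
    // so this seeks only that topic's internal consumer.
    Message msg;
    if (consumer.receive(msg, 3000) == ResultOk) {
        consumer.seek(msg.getMessageId());
    }

    client.close();
    return 0;
}
```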
BewareMyPower committed May 20, 2024
1 parent 3f0b33b commit 07ac4a9
Showing 5 changed files with 371 additions and 149 deletions.
135 changes: 60 additions & 75 deletions lib/MultiTopicsConsumerImpl.cc
@@ -338,41 +338,23 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback)
     }
     state_ = Closing;
 
-    std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
     auto self = get_shared_this_ptr();
-    int numConsumers = 0;
     consumers_.forEachValue(
-        [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
-            numConsumers++;
-            consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) {
-                self->handleUnsubscribedAsync(result, consumerUnsubed, callback);
+        [this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) {
+            consumer->unsubscribeAsync([this, self, callback, future](Result result) {
+                if (result != ResultOk) {
+                    state_ = Failed;
+                    LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                              << result << " subscription - " << subscriptionName_);
+                }
+                if (future.tryComplete()) {
+                    LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - "
+                              << consumerStr_);
+                    callback((state_ != Failed) ? ResultOk : ResultUnknownError);
+                }
             });
-        });
-    if (numConsumers == 0) {
-        // No need to unsubscribe, since the list matching the regex was empty
-        callback(ResultOk);
-    }
-}
-
-void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
-                                                      std::shared_ptr<std::atomic<int>> consumerUnsubed,
-                                                      ResultCallback callback) {
-    (*consumerUnsubed)++;
-
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
-                  << result << " subscription - " << subscriptionName_);
-    }
-
-    if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
-        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
-        Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        // The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
-        // unsubscribe succeeds.
-        callback(result1);
-        return;
-    }
+        },
+        [callback] { callback(ResultOk); });
 }
 
 void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
@@ -841,44 +823,47 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
     Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
-    LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
     lock.unlock();
 
     size_t i = 0;
-    consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
-        size_t index = i++;
-        auto weakSelf = weak_from_this();
-        consumer->getBrokerConsumerStatsAsync(
-            [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
+    // TODO: fix the thread safety issue if numberTopicPartitions_ was changed here
+    consumers_.forEachValue(
+        [this, statsPtr, &i, callback](const ConsumerImplPtr& consumer, SharedFuture future) {
+            size_t index = i++;
+            auto weakSelf = weak_from_this();
+            consumer->getBrokerConsumerStatsAsync([this, weakSelf, future, statsPtr, index, callback](
                                                      Result result, BrokerConsumerStats stats) {
                 auto self = weakSelf.lock();
                 if (self) {
-                    handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+                    handleGetConsumerStats(result, stats, future, statsPtr, index, callback);
                 }
             });
-    });
+        },
+        [callback] { callback(ResultOk, BrokerConsumerStats{}); });
 }
 
 void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
     callback(ResultOperationNotSupported, GetLastMessageIdResponse());
 }
 
 void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
-                                                     LatchPtr latchPtr,
+                                                     SharedFuture future,
                                                      MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
                                                      BrokerConsumerStatsCallback callback) {
     Lock lock(mutex_);
+    bool completed = false;
     if (res == ResultOk) {
-        latchPtr->countdown();
+        completed = future.tryComplete();
         statsPtr->add(brokerConsumerStats, index);
     } else {
         lock.unlock();
         callback(res, BrokerConsumerStats());
         return;
     }
-    if (latchPtr->getCount() == 0) {
+    if (completed) {
         lock.unlock();
         callback(ResultOk, BrokerConsumerStats(statsPtr));
     }
@@ -899,50 +884,50 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v
     return topicNamePtr;
 }
 
-void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
-}
-
-void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    if (state_ != Ready) {
-        callback(ResultAlreadyClosed);
-        return;
-    }
-
+void MultiTopicsConsumerImpl::beforeSeek() {
     duringSeek_.store(true, std::memory_order_release);
     consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
     unAckedMessageTrackerPtr_->clear();
     incomingMessages_.clear();
     incomingMessagesSize_ = 0L;
+}
+
+void MultiTopicsConsumerImpl::afterSeek() {
+    duringSeek_.store(false, std::memory_order_release);
+    auto self = get_shared_this_ptr();
+    listenerExecutor_->postWork([this, self] {
+        consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
+    });
+}
+
+void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+    if (msgId == MessageId::earliest() || msgId == MessageId::latest()) {
+        return seekAllAsync(msgId, callback);
+    }
+
+    auto optConsumer = consumers_.find(msgId.getTopicName());
+    if (!optConsumer) {
+        callback(ResultOperationNotSupported);
+        return;
+    }
+
+    beforeSeek();
     auto weakSelf = weak_from_this();
-    auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
-    auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
+    optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) {
        auto self = weakSelf.lock();
-        if (PULSAR_UNLIKELY(!self)) {
-            callback(result);
-            return;
-        }
-        if (result != ResultOk) {
-            *numConsumersLeft = 0;  // skip the following callbacks
+        if (self) {
+            afterSeek();
            callback(result);
-            return;
-        }
-        if (--*numConsumersLeft > 0) {
-            return;
+        } else {
+            callback(ResultAlreadyClosed);
        }
-        duringSeek_.store(false, std::memory_order_release);
-        listenerExecutor_->postWork([this, self] {
-            consumers_.forEachValue(
-                [](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
-        });
-        callback(ResultOk);
-    };
-    consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
-        consumer->seekAsync(timestamp, wrappedCallback);
     });
 }
 
+void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
+    seekAllAsync(timestamp, callback);
+}
+
 void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
     consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) {
         consumer->setNegativeAcknowledgeEnabledForTesting(enabled);
54 changes: 49 additions & 5 deletions lib/MultiTopicsConsumerImpl.h
@@ -25,7 +25,7 @@
 #include <vector>
 
 #include "Commands.h"
-#include "ConsumerImplBase.h"
+#include "ConsumerImpl.h"
 #include "ConsumerInterceptors.h"
 #include "Future.h"
 #include "Latch.h"
@@ -38,7 +38,6 @@
 namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
 
-class ConsumerImpl;
 using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
@@ -99,7 +98,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     uint64_t getNumberOfConnectedConsumer() override;
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
 
-    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
+    void handleGetConsumerStats(Result, BrokerConsumerStats, SharedFuture, MultiTopicsBrokerConsumerStatsPtr,
                                 size_t, BrokerConsumerStatsCallback);
     // return first topic name when all topics name valid, or return null pointer
     static std::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
@@ -152,8 +151,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                      std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
                                      ConsumerSubResultPromisePtr topicSubResultPromise);
-    void handleUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
-                                 ResultCallback callback);
     void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                          int numberPartitions, TopicNamePtr topicNamePtr,
                                          std::string& topicPartitionName, ResultCallback callback);
@@ -179,6 +176,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
         return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
     }
 
+    template <typename SeekArg>
+#if __cplusplus >= 202002L
+        requires std::convertible_to<SeekArg, uint64_t> ||
+                 std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
+#endif
+    void seekAllAsync(const SeekArg& seekArg, ResultCallback callback);
+
+    void beforeSeek();
+    void afterSeek();
+
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
@@ -187,5 +194,42 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
 };
 
 typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
+
+template <typename SeekArg>
+#if __cplusplus >= 202002L
+    requires std::convertible_to<SeekArg, uint64_t> ||
+             std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
+#endif
+void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    beforeSeek();
+    auto weakSelf = weak_from_this();
+    auto failed = std::make_shared<std::atomic_bool>(false);
+    consumers_.forEachValue(
+        [this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) {
+            consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) {
+                auto self = weakSelf.lock();
+                if (!self || failed->load(std::memory_order_acquire)) {
+                    callback(result);
+                    return;
+                }
+                if (result != ResultOk) {
+                    failed->store(true, std::memory_order_release);  // skip the following callbacks
+                    afterSeek();
+                    callback(result);
+                    return;
+                }
+                if (future.tryComplete()) {
+                    afterSeek();
+                    callback(ResultOk);
+                }
+            });
+        },
+        [callback] { callback(ResultOk); });
+}
+
 } // namespace pulsar
 #endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
67 changes: 63 additions & 4 deletions lib/SynchronizedHashMap.h
@@ -18,15 +18,27 @@
  */
 #pragma once
 
+#include <atomic>
 #include <boost/optional.hpp>
 #include <functional>
 #include <memory>
 #include <mutex>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
 namespace pulsar {
 
+class SharedFuture {
+   public:
+    SharedFuture(size_t size) : count_(std::make_shared<std::atomic_size_t>(size)) {}
+
+    bool tryComplete() const { return --*count_ == 0; }
+
+   private:
+    std::shared_ptr<std::atomic_size_t> count_;
+};
+
 // V must be default constructible and copyable
 template <typename K, typename V>
 class SynchronizedHashMap {
@@ -60,10 +72,57 @@ class SynchronizedHashMap {
         }
     }
 
-    void forEachValue(std::function<void(const V&)> f) const {
-        Lock lock(mutex_);
-        for (const auto& kv : data_) {
-            f(kv.second);
+    template <typename ValueFunc>
+#if __cplusplus >= 202002L
+        requires requires(ValueFunc&& each, const V& value) {
+            each(value);
+        }
+#endif
+    void forEachValue(ValueFunc&& each) {
+        Lock lock{mutex_};
+        for (auto&& kv : data_) {
+            each(kv.second);
         }
     }
 
+    // This overload provides a convenient approach to execute tasks on each value concurrently and
+    // supports checking if all tasks are done in the `each` callback.
+    //
+    // Each map value will be passed as the 1st argument to the `each` function. The 2nd argument is a
+    // shared future whose `tryComplete` method marks this task as completed. If users want to check if
+    // all tasks are completed in the `each` function, this method must be called.
+    //
+    // For example, given a `SynchronizedHashMap<int, std::string>` object `m` and the following call:
+    //
+    // ```c++
+    // m.forEachValue([](const std::string& s, SharedFuture future) {
+    //     std::cout << s << std::endl;
+    //     if (future.tryComplete()) {
+    //         std::cout << "done" << std::endl;
+    //     }
+    // }, [] { std::cout << "empty map" << std::endl; });
+    // ```
+    //
+    // If the map is empty, only "empty map" will be printed. Otherwise, all values will be printed
+    // and "done" will be printed after that.
+    template <typename ValueFunc, typename EmptyFunc>
+#if __cplusplus >= 202002L
+        requires requires(ValueFunc&& each, const V& value, SharedFuture count, EmptyFunc emptyFunc) {
+            each(value, count);
+            emptyFunc();
+        }
+#endif
+    void forEachValue(ValueFunc&& each, EmptyFunc&& emptyFunc) {
+        std::unique_lock<MutexType> lock{mutex_};
+        if (data_.empty()) {
+            lock.unlock();
+            emptyFunc();
+            return;
+        }
+        SharedFuture future{data_.size()};
+        for (auto&& kv : data_) {
+            const auto& value = kv.second;
+            each(value, future);
+        }
+    }

(Diffs for the remaining two changed files are not shown here.)
