Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix NonDurable Subscription msgBackLog incorrect after … #23305

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
final boolean resetIncludeHead = subscribe.isResetIncludeHead();

if (log.isDebugEnabled()) {
log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
Expand Down Expand Up @@ -1356,6 +1357,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.resetIncludeHead(resetIncludeHead)
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return ignoreUnrecoverableBKException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class SubscriptionOption {
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
private SchemaType schemaType;
private boolean resetIncludeHead;

public static Optional<Map<String, String>> getPropertiesMap(List<KeyValue> list) {
if (list == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
option.getConsumerEpoch(), option.getSchemaType(), option.isResetIncludeHead());
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -901,7 +901,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
SchemaType schemaType) {
SchemaType schemaType,
boolean resetIncludeHead) {
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
Expand Down Expand Up @@ -984,7 +985,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);
startMessageRollbackDurationSec, readCompacted, subscriptionProperties, resetIncludeHead);

CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
Expand Down Expand Up @@ -1068,7 +1069,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null);
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, false);
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
Expand Down Expand Up @@ -1134,7 +1135,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {

private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
boolean isReadCompacted, Map<String, String> subscriptionProperties) {
boolean isReadCompacted, Map<String, String> subscriptionProperties, boolean resetIncludeHead) {
log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}",
topic, subscriptionName, startMessageId, subscriptionProperties);

Expand All @@ -1157,7 +1158,8 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
long entryId = msgId.getEntryId();
// Ensure that the start message id starts from a valid entry.
if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl) {
&& msgId instanceof BatchMessageIdImpl
&& (msgId.getBatchIndex() >= 0 || resetIncludeHead)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding resetIncludeHead should start a PIP. right now, we can only fix by adding msgId.getBatchIndex() >= 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we just add the msgId.getBatchIndex() >= 0, test of method ReaderBuilder#startMessageIdInclusive will fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will draft a pip to fix this issue later

// When the start message is relative to a batch, we need to take one step back on the previous
// message,
// because the "batch" might not have been consumed in its entirety.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,50 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType

}

@Test
public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception {
String topicName = "persistent://my-property/my-ns/nonDurable-sub-test";
String subName = "test-sub";

admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionMode(SubscriptionMode.NonDurable).subscribe();

// 1. send message
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10);

// 2. receive the message
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}

// 3. consumed all messages and the msgBacklog is 0
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0));

// 4. unload the topic
admin.topics().unload(topicName);

// 5. wait the consumer reconnect
Awaitility.await().until(() -> admin.topics().getStats(topicName).getSubscriptions() != null);

// 6. the backlog should still be 0
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).
getSubscriptions().get(subName).getMsgBacklog(), 0));
}

@Test
public void testFlowCountForMultiTopics() throws Exception {
String topicName = "persistent://my-property/my-ns/test-flow-count";
Expand Down Expand Up @@ -464,7 +508,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception {
// A middle ledger id, and entry id is "-1".
log.info("start test s8");
String s8 = "s8";
MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1);
MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), -1, -1);
Reader<String> reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8)
.receiverQueueSize(0).startMessageId(startMessageId8).create();
ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8);
Expand Down Expand Up @@ -497,7 +541,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception {
ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10);
log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition);
Position p10 = parseReadPosition(cursor10);
assertEquals(p10.getLedgerId(), ledgers.get(2));
assertEquals(p10.getLedgerId(), ledgers.get(3));
assertEquals(p10.getEntryId(), 0);
reader10.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), resetIncludeHead);

cnx.sendRequestWithId(request, requestId).thenRun(() -> {
synchronized (ConsumerImpl.this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,15 +588,15 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition,
startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH);
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, false);
}

public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
Map<String, String> subscriptionProperties, long consumerEpoch) {
Map<String, String> subscriptionProperties, long consumerEpoch, boolean resetIncludeHead) {
BaseCommand cmd = localCmd(Type.SUBSCRIBE);
CommandSubscribe subscribe = cmd.setSubscribe()
.setTopic(topic)
Expand All @@ -611,7 +611,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
.setInitialPosition(subscriptionInitialPosition)
.setReplicateSubscriptionState(isReplicated)
.setForceTopicCreation(createTopicIfDoesNotExist)
.setConsumerEpoch(consumerEpoch);
.setConsumerEpoch(consumerEpoch)
.setResetIncludeHead(resetIncludeHead);

if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) {
List<KeyValue> keyValues = new ArrayList<>();
Expand Down
2 changes: 2 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ message CommandSubscribe {

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

optional bool reset_include_head = 20 [default = false];
}

message CommandPartitionedTopicMetadata {
Expand Down
Loading