diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 9fd0518a054cc..a13b332e6eb5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -83,13 +83,6 @@ public BrokerRegistryImpl(PulsarService pulsar) { this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); - // The registered node is an ephemeral node that could be deleted when the metadata store client's session - // is expired. In this case, we should register again. - this.listeners.add((broker, notificationType) -> { - if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) { - registerAsync(); - } - }); this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); this.brokerLookupData = new BrokerLookupData( pulsar.getWebServiceAddress(), @@ -223,11 +216,16 @@ private void handleMetadataStoreNotification(Notification t) { if (log.isDebugEnabled()) { log.debug("Handle notification: [{}]", t); } + // The registered node is an ephemeral node that could be deleted when the metadata store client's session + // is expired. In this case, we should register again. + final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); + if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { + registerAsync(); + } if (listeners.isEmpty()) { return; } this.scheduler.submit(() -> { - String brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); for (BiConsumer listener : listeners) { listener.accept(brokerId, t.getType()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index d6615a8a5b49b..232088afb94fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -37,7 +37,7 @@ import org.testng.annotations.Test; @Slf4j -@Test(groups = "flaky") +@Test(groups = "broker") public class BrokerRegistryIntegrationTest { private static final String clusterName = "test"; @@ -63,13 +63,18 @@ protected void setup() throws Exception { @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { + final var startMs = System.currentTimeMillis(); if (pulsar != null) { pulsar.close(); } + final var elapsedMs = System.currentTimeMillis() - startMs; bk.stop(); + if (elapsedMs > 5000) { + throw new RuntimeException("Broker took " + elapsedMs + "ms to close"); + } } - @Test(enabled = false) + @Test public void testRecoverFromNodeDeletion() throws Exception { // Simulate the case that the node was somehow deleted (e.g. by session timeout) Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( @@ -88,7 +93,7 @@ public void testRecoverFromNodeDeletion() throws Exception { Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId())); } - @Test(enabled = false) + @Test public void testRegisterAgain() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); @@ -105,7 +110,7 @@ public void testRegisterAgain() throws Exception { }); } - private ServiceConfiguration brokerConfig() { + protected ServiceConfiguration brokerConfig() { final var config = new ServiceConfiguration(); config.setClusterName(clusterName); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryMetadataStoreIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryMetadataStoreIntegrationTest.java new file mode 100644 index 0000000000000..3e01b1fad0f21 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryMetadataStoreIntegrationTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRegistryMetadataStoreIntegrationTest extends BrokerRegistryIntegrationTest { + + @Override + protected ServiceConfiguration brokerConfig() { + final var config = super.brokerConfig(); + config.setLoadManagerServiceUnitStateTableViewClassName( + ServiceUnitStateMetadataStoreTableViewImpl.class.getName()); + return config; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 61ab4de8a3294..5448751160a9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -173,6 +173,9 @@ public void testRefreshAPI(int partition) throws Exception { TableView tv = pulsarClient.newTableView(Schema.BYTES) .topic(topic) .create(); + // Verify refresh can handle the case when the topic is empty + tv.refreshAsync().get(3, TimeUnit.SECONDS); + // 2. Add a listen action to provide the test environment. // The listen action will be triggered when there are incoming messages every time. // This is a sync operation, so sleep in the listen action can slow down the reading rate of messages. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 4f52060497864..17b49828eeced 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; @@ -259,7 +260,11 @@ private void handleMessage(Message msg) { @Override public CompletableFuture refreshAsync() { CompletableFuture completableFuture = new CompletableFuture<>(); - reader.thenCompose(reader -> getLastMessageIds(reader).thenAccept(lastMessageIds -> { + reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + completableFuture.complete(null); + return; + } // After get the response of lastMessageIds, put the future and result into `refreshMap` // and then filter out partitions that has been read to the lastMessageID. pendingRefreshRequests.put(completableFuture, lastMessageIds); @@ -291,8 +296,12 @@ private CompletableFuture readAllExistingMessages(Reader reader) { AtomicLong messagesRead = new AtomicLong(); CompletableFuture future = new CompletableFuture<>(); - getLastMessageIds(reader).thenAccept(maxMessageIds -> { - readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + future.complete(null); + return; + } + readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds); }).exceptionally(ex -> { future.completeExceptionally(ex); return null; @@ -300,13 +309,15 @@ private CompletableFuture readAllExistingMessages(Reader reader) { return future; } - private CompletableFuture> getLastMessageIds(Reader reader) { + private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { - Map maxMessageIds = new ConcurrentHashMap<>(); + Map lastMessageIdMap = new ConcurrentHashMap<>(); lastMessageIds.forEach(topicMessageId -> { - maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) { + lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId); + } // else: a negative entry id represents an empty topic so that we don't have to read messages from it }); - return maxMessageIds; + return lastMessageIdMap; }); }