diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpTopicManager.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpTopicManager.java index 5f0f61d9..8a207fec 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpTopicManager.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpTopicManager.java @@ -16,6 +16,7 @@ import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.naming.TopicName; @@ -61,6 +62,8 @@ public static CompletableFuture getTopic(PulsarService pulsarService, Str try { if (topicOptional.isPresent()) { Topic topic = topicOptional.get(); + AbstractTopic abstractTopic = (AbstractTopic) topic; + abstractTopic.setDeleteWhileInactive(false); topicCompletableFuture.complete(topic); } else { log.error("Get empty topic for name {}", topicName); diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpTopicManagerTest.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpTopicManagerTest.java new file mode 100644 index 00000000..70ef5ee5 --- /dev/null +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpTopicManagerTest.java @@ -0,0 +1,35 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamnative.pulsar.handlers.amqp.test; + +import io.streamnative.pulsar.handlers.amqp.AmqpTopicManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractTopic; +import org.junit.Assert; +import org.testng.annotations.Test; + +/** + * Unit test for Pulsar Topic. + */ +public class AmqpTopicManagerTest extends AmqpProtocolTestBase { + + @Test + public void testDeleteWhileInactiveIsFalse() { + PulsarService pulsarService = connection.getPulsarService(); + AbstractTopic abstractTopic = (AbstractTopic) AmqpTopicManager.getOrCreateTopic(pulsarService, + "public/vhost1/test", true); + Assert.assertFalse(abstractTopic.isDeleteWhileInactive()); + } +} diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/mock/MockTopic.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/mock/MockTopic.java deleted file mode 100644 index 7254b6b2..00000000 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/mock/MockTopic.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.amqp.test.mock; - -import io.netty.buffer.ByteBuf; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.Producer; -import org.apache.pulsar.broker.service.Replicator; -import org.apache.pulsar.broker.service.ServerCnx; -import org.apache.pulsar.broker.service.Subscription; -import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; -import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; -import org.apache.pulsar.broker.stats.NamespaceStats; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.common.api.proto.PulsarApi; -import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.protocol.schema.SchemaData; -import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; -import org.apache.pulsar.utils.StatsOutputStream; - - -/** - * Mock topic. - */ -public class MockTopic implements Topic { - - ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap<>(); - - @Override - public void publishMessage(ByteBuf byteBuf, PublishContext publishContext) { - - } - - @Override - public void addProducer(Producer producer) throws BrokerServiceException { - - } - - @Override - public void removeProducer(Producer producer) { - - } - - @Override - public void recordAddLatency(long l, TimeUnit timeUnit) { - - } - - @Override - public CompletableFuture subscribe(ServerCnx serverCnx, String s, long l, - PulsarApi.CommandSubscribe.SubType subType, - int i, String s1, boolean b, - MessageId messageId, Map map, - boolean b1, - PulsarApi.CommandSubscribe.InitialPosition initialPosition, - long l1, boolean b2, PulsarApi.KeySharedMeta keySharedMeta) { - return null; - } - - @Override - public CompletableFuture createSubscription(String s, - PulsarApi.CommandSubscribe.InitialPosition initialPosition, boolean b) { - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(null); - return completableFuture; - } - - @Override - public CompletableFuture unsubscribe(String s) { - return null; - } - - @Override - public ConcurrentOpenHashMap getSubscriptions() { - return subscriptions; - } - - @Override - public CompletableFuture delete() { - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(null); - return completableFuture; - } - - @Override - public Map getProducers() { - return null; - } - - @Override - public String getName() { - return "persistent://public/topic"; - } - - @Override - public CompletableFuture checkReplication() { - return null; - } - - @Override - public CompletableFuture close(boolean b) { - return null; - } - - @Override - public void checkGC(int i, InactiveTopicDeleteMode inactiveTopicDeleteMode) { - - } - - - @Override - public void checkInactiveSubscriptions() { - - } - - @Override - public void checkBackloggedCursors() { - - } - - @Override - public void checkMessageExpiry() { - - } - - @Override - public void checkMessageDeduplicationInfo() { - - } - - @Override - public void checkTopicPublishThrottlingRate() { - - } - - @Override - public void incrementPublishCount(int i, long l) { - - } - - @Override - public void resetTopicPublishCountAndEnableReadIfRequired() { - - } - - @Override - public void resetBrokerPublishCountAndEnableReadIfRequired(boolean b) { - - } - - @Override - public boolean isPublishRateExceeded() { - return false; - } - - @Override - public boolean isTopicPublishRateExceeded(int i, int i1) { - return false; - } - - @Override - public boolean isBrokerPublishRateExceeded() { - return false; - } - - @Override - public void disableCnxAutoRead() { - - } - - @Override - public void enableCnxAutoRead() { - - } - - @Override - public CompletableFuture onPoliciesUpdate(Policies policies) { - return null; - } - - @Override - public boolean isBacklogQuotaExceeded(String s) { - return false; - } - - @Override - public boolean isEncryptionRequired() { - return false; - } - - @Override - public boolean getSchemaValidationEnforced() { - return false; - } - - @Override - public boolean isReplicated() { - return false; - } - - @Override - public BacklogQuota getBacklogQuota() { - return null; - } - - @Override - public void updateRates(NamespaceStats namespaceStats, - NamespaceBundleStats namespaceBundleStats, - StatsOutputStream statsOutputStream, - ClusterReplicationMetrics clusterReplicationMetrics, - String s, boolean b) { - - } - - @Override - public Subscription getSubscription(String s) { - Subscription subscription = subscriptions.get(s); - if (subscription == null) { - subscription = new MockSubscription(); - } - return subscription; - } - - @Override - public ConcurrentOpenHashMap getReplicators() { - return null; - } - - @Override - public TopicStats getStats(boolean b) { - return null; - } - - @Override - public PersistentTopicInternalStats getInternalStats() { - return null; - } - - @Override - public Position getLastPosition() { - return null; - } - - @Override - public CompletableFuture getLastMessageId() { - return null; - } - - @Override - public CompletableFuture hasSchema() { - return null; - } - - @Override - public CompletableFuture addSchema(SchemaData schemaData) { - return null; - } - - @Override - public CompletableFuture deleteSchema() { - return null; - } - - @Override - public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schemaData) { - return null; - } - - @Override - public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schemaData) { - return null; - } - - @Override - public CompletableFuture deleteForcefully() { - return null; - } - - @Override - public Optional getDispatchRateLimiter() { - return Optional.empty(); - } -} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java index 8cb6d82e..036cbb1e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java @@ -34,7 +34,6 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -73,8 +72,6 @@ public void setup() throws Exception { String ns = "public/" + vhost; if (!admin.namespaces().getNamespaces("public").contains(ns)) { admin.namespaces().createNamespace(ns, 1); - admin.namespaces().setRetention(ns, - new RetentionPolicies(60, 1000)); } } checkPulsarServiceState();