diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java index 64a261f3..2d6b2733 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java @@ -116,6 +116,11 @@ */ String namespace() default ""; + /** + * The namespace v2 version of consumer, it can not be used in combination with namespace. + */ + String namespaceV2() default ""; + /** * The property of "instanceName". */ diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java index 504d5c0f..a8ae90df 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java @@ -89,11 +89,17 @@ * The property of "tlsEnable" default false. */ String tlsEnable() default "false"; + /** * The namespace of producer. */ String namespace() default ""; + /** + * The namespace v2 version of producer, it can not be used in combination with namespace. + */ + String namespaceV2() default ""; + /** * The property of "instanceName". */ diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java index 302d276c..1a5843e0 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java @@ -142,6 +142,11 @@ */ String namespace() default ""; + /** + * The namespace V2 version of listener, it can not be used in combination with namespace. + */ + String namespaceV2() default ""; + /** * Message consume retry strategy in concurrently mode. * diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java index 178285d7..efe69ec2 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java @@ -132,6 +132,8 @@ private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic())); String namespace = environment.resolvePlaceholders(annotation.namespace()); litePullConsumer.setNamespace(RocketMQUtil.getNamespace(namespace, consumerConfig.getNamespace())); + String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2()); + litePullConsumer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, consumerConfig.getNamespaceV2())); litePullConsumer.setInstanceName(annotation.instanceName()); return litePullConsumer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index 35aee2a2..1db3ae26 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -129,6 +129,8 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota producer.setUseTLS(useTLS); String namespace = environment.resolvePlaceholders(annotation.namespace()); producer.setNamespace(RocketMQUtil.getNamespace(namespace, producerConfig.getNamespace())); + String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2()); + producer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, producerConfig.getNamespaceV2())); producer.setInstanceName(annotation.instanceName()); return producer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java index 1bf9764c..45d26642 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java @@ -118,6 +118,9 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties if (StringUtils.hasText(producerConfig.getNamespace())) { producer.setNamespace(producerConfig.getNamespace()); } + if (StringUtils.hasText(producerConfig.getNamespaceV2())) { + producer.setNamespaceV2(producerConfig.getNamespaceV2()); + } producer.setInstanceName(producerConfig.getInstanceName()); log.info("a producer ({}) init on namesrv {}", groupName, nameServer); return producer; @@ -152,6 +155,9 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket if (StringUtils.hasText(consumerConfig.getNamespace())) { litePullConsumer.setNamespace(consumerConfig.getNamespace()); } + if (StringUtils.hasText(consumerConfig.getNamespaceV2())) { + litePullConsumer.setNamespaceV2(consumerConfig.getNamespaceV2()); + } litePullConsumer.setInstanceName(consumerConfig.getInstanceName()); log.info("a pull consumer({} sub {}) init on namesrv {}", groupName, topicName, nameServer); return litePullConsumer; diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java index e1b26a76..41752ae5 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java @@ -108,6 +108,11 @@ public static class Producer { */ private String namespace; + /** + * The namespace v2 version of producer, it can not be used in combination with namespace. + */ + private String namespaceV2; + /** * Millis of send message timeout. */ @@ -274,6 +279,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public String getInstanceName() { return instanceName; } @@ -294,6 +307,11 @@ public static class PullConsumer { */ private String namespace; + /** + * The namespace v2 version of consumer, it can not be used in combination with namespace. + */ + private String namespaceV2; + /** * Topic name of consumer. */ @@ -445,6 +463,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public String getInstanceName() { return instanceName; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index fb7762e9..ae095d0a 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private int replyTimeout; private String tlsEnable; private String namespace; + private String namespaceV2; private long awaitTerminationMillisWhenShutdown; private String instanceName; @@ -246,6 +247,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.replyTimeout = anno.replyTimeout(); this.tlsEnable = anno.tlsEnable(); this.namespace = anno.namespace(); + this.namespaceV2 = anno.namespaceV2(); this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume(); this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis(); this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown()); @@ -288,6 +290,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public DefaultMQPushConsumer getConsumer() { return consumer; } @@ -394,6 +404,7 @@ public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", namespace='" + namespace + '\'' + + ", namespaceV2='" + namespaceV2 + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + @@ -631,6 +642,7 @@ private void initRocketMQPushConsumer() throws MQClientException { resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } consumer.setNamespace(namespace); + consumer.setNamespaceV2(namespaceV2); String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java index 11cdcd92..a27c187f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java @@ -151,6 +151,10 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String String namespace = environment.resolvePlaceholders(annotation.namespace()); container.setNamespace(RocketMQUtil.getNamespace(namespace, rocketMQProperties.getConsumer().getNamespace())); + + String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2()); + container.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, + rocketMQProperties.getConsumer().getNamespaceV2())); return container; } diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java index de15fcdf..182d2fa3 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java @@ -251,6 +251,7 @@ public void testSetRocketMQMessageListener() { assertEquals(anno.selectorExpression(), container.getSelectorExpression()); assertEquals(anno.tlsEnable(), container.getTlsEnable()); assertEquals(anno.namespace(), container.getNamespace()); + assertEquals(anno.namespaceV2(), container.getNamespaceV2()); assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume()); assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis()); assertEquals(anno.instanceName(), container.getInstanceName()); @@ -264,6 +265,7 @@ public void testSetRocketMQMessageListener() { selectorExpression = "selectorExpression", tlsEnable = "tlsEnable", namespace = "namespace", + namespaceV2 = "namespaceV2", delayLevelWhenNextConsume = 1234, suspendCurrentQueueTimeMillis = 2345, instanceName = "instanceName" diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index 8615e2b6..20ea5aec 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -87,4 +87,8 @@ */ int awaitDuration() default 0; + /** + * The namespace of consumer. + */ + String namespace() default ""; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java index 8849d0fc..c6ab5052 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java @@ -75,4 +75,8 @@ */ int maxAttempts() default 3; + /** + * The namespace of producer. + */ + String namespace() default ""; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 89107aa1..24d303be 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -82,5 +82,8 @@ int consumptionThreadCount() default 20; - + /** + * The namespace of listener. + */ + String namespace() default ""; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index 23b70af8..6e854a01 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -113,13 +113,14 @@ private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotati String accessKey = resolvePlaceholders(annotation.accessKey(), simpleConsumer.getAccessKey()); String secretKey = resolvePlaceholders(annotation.secretKey(), simpleConsumer.getSecretKey()); String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); + String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); Assert.hasText(topicName, "[topic] must not be null"); - ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); + ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace); final ClientServiceProvider provider = ClientServiceProvider.loadService(); FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); Duration duration = Duration.ofSeconds(awaitDuration); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java index 11ddf854..19737524 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java @@ -110,9 +110,12 @@ private ProducerBuilder createProducer(ExtProducerResetConfiguration annotation) accessKey = StringUtils.hasLength(accessKey) ? accessKey : producerConfig.getAccessKey(); String secretKey = environment.resolvePlaceholders(annotation.secretKey()); secretKey = StringUtils.hasLength(secretKey) ? secretKey : producerConfig.getSecretKey(); + String namespace = environment.resolvePlaceholders(annotation.namespace()); + namespace = StringUtils.hasLength(namespace) ? namespace : producerConfig.getNamespace(); int requestTimeout = annotation.requestTimeout(); Boolean sslEnabled = producerConfig.isSslEnabled(); - ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofSeconds(requestTimeout), sslEnabled); + ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, + endpoints, Duration.ofSeconds(requestTimeout), sslEnabled, namespace); final ClientServiceProvider provider = ClientServiceProvider.loadService(); ProducerBuilder producerBuilder = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts()) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index bfbb7f9a..450d846e 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -102,6 +102,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setTag(environment.resolvePlaceholders(annotation.tag())); container.setEndpoints(environment.resolvePlaceholders(annotation.endpoints())); container.setTopic(environment.resolvePlaceholders(annotation.topic())); + container.setNamespace(environment.resolvePlaceholders(annotation.namespace())); container.setRequestTimeout(Duration.ofSeconds(annotation.requestTimeout())); container.setMaxCachedMessageCount(annotation.maxCachedMessageCount()); container.setConsumptionThreadCount(annotation.consumptionThreadCount()); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 64c9a6a1..23f0ad49 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -79,6 +79,8 @@ public static class Producer { */ private int maxAttempts = 3; + private String namespace; + public String getAccessKey() { return accessKey; } @@ -135,6 +137,14 @@ public void setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + @Override public String toString() { return "Producer{" + @@ -144,6 +154,7 @@ public String toString() { ", topic='" + topic + '\'' + ", requestTimeout=" + requestTimeout + ", sslEnabled=" + sslEnabled + + ", namespace='" + namespace + '\'' + '}'; } } @@ -200,6 +211,8 @@ public static class SimpleConsumer { */ private boolean sslEnabled = true; + private String namespace = ""; + public String getAccessKey() { return accessKey; } @@ -280,6 +293,14 @@ public void setFilterExpressionType(String filterExpressionType) { this.filterExpressionType = filterExpressionType; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + @Override public String toString() { return "SimpleConsumer{" + @@ -293,6 +314,7 @@ public String toString() { ", requestTimeout=" + requestTimeout + ", filterExpressionType='" + filterExpressionType + '\'' + ", sslEnabled=" + sslEnabled + + ", namespace='" + namespace + '\'' + '}'; } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 45cdf7eb..69bbe60e 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -87,6 +87,8 @@ public class DefaultListenerContainer implements InitializingBean, Boolean sslEnabled; + String namespace; + public String getName() { return name; } @@ -240,6 +242,14 @@ public void setSslEnabled(Boolean sslEnabled) { this.sslEnabled = sslEnabled; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + private void initRocketMQPushConsumer() { if (rocketMQMessageListener == null) { throw new IllegalArgumentException("Property 'rocketMQMessageListener' is required"); @@ -253,7 +263,7 @@ private void initRocketMQPushConsumer() { filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType()); } ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), - this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled); + this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace); PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration); @@ -354,6 +364,7 @@ public String toString() { ", maxCachedMessageCount=" + maxCachedMessageCount + ", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes + ", consumptionThreadCount=" + consumptionThreadCount + + ", namespace='" + namespace + '\'' + '}'; } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 9104828e..4e95bf46 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -122,7 +122,8 @@ public static ClientConfiguration createProducerClientConfiguration(RocketMQProp String endPoints = rocketMQProducer.getEndpoints(); Duration requestTimeout = Duration.ofSeconds(rocketMQProducer.getRequestTimeout()); boolean sslEnabled = rocketMQProducer.isSslEnabled(); - return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); + String namespace = rocketMQProducer.getNamespace(); + return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace); } public static ClientConfiguration createConsumerClientConfiguration(RocketMQProperties.SimpleConsumer simpleConsumer) { @@ -131,12 +132,13 @@ public static ClientConfiguration createConsumerClientConfiguration(RocketMQProp String endPoints = simpleConsumer.getEndpoints(); Duration requestTimeout = Duration.ofSeconds(simpleConsumer.getRequestTimeout()); boolean sslEnabled = simpleConsumer.isSslEnabled(); - return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); + String namespace = simpleConsumer.getNamespace(); + return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace); } public static ClientConfiguration createClientConfiguration(String accessKey, String secretKey, String endPoints, - Duration requestTimeout, Boolean sslEnabled) { + Duration requestTimeout, Boolean sslEnabled, String namespace) { SessionCredentialsProvider sessionCredentialsProvider = null; if (StringUtils.hasLength(accessKey) && StringUtils.hasLength(secretKey)) { @@ -154,6 +156,9 @@ public static ClientConfiguration createClientConfiguration(String accessKey, St if (Objects.nonNull(sslEnabled)) { clientConfigurationBuilder.enableSsl(sslEnabled); } + if (StringUtils.hasLength(namespace)) { + clientConfigurationBuilder.setNamespace(namespace); + } return clientConfigurationBuilder.build(); }