Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #654] Support namespace for rocketmq-v5-client-spring-boot and rocketmq-spring-boot #655

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@
*/
String namespace() default "";

/**
* The namespace v2 version of consumer, it can not be used in combination with namespace.
*/
String namespaceV2() default "";

/**
* The property of "instanceName".
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";

/**
* The namespace of producer.
*/
String namespace() default "";

/**
* The namespace v2 version of producer, it can not be used in combination with namespace.
*/
String namespaceV2() default "";

/**
* The property of "instanceName".
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@
*/
String namespace() default "";

/**
* The namespace V2 version of listener, it can not be used in combination with namespace.
*/
String namespaceV2() default "";

/**
* Message consume retry strategy in concurrently mode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration
litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
String namespace = environment.resolvePlaceholders(annotation.namespace());
litePullConsumer.setNamespace(RocketMQUtil.getNamespace(namespace, consumerConfig.getNamespace()));
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
litePullConsumer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, consumerConfig.getNamespaceV2()));
litePullConsumer.setInstanceName(annotation.instanceName());
return litePullConsumer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
producer.setUseTLS(useTLS);
String namespace = environment.resolvePlaceholders(annotation.namespace());
producer.setNamespace(RocketMQUtil.getNamespace(namespace, producerConfig.getNamespace()));
String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
producer.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2, producerConfig.getNamespaceV2()));
producer.setInstanceName(annotation.instanceName());
return producer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties
if (StringUtils.hasText(producerConfig.getNamespace())) {
producer.setNamespace(producerConfig.getNamespace());
}
if (StringUtils.hasText(producerConfig.getNamespaceV2())) {
producer.setNamespaceV2(producerConfig.getNamespaceV2());
}
producer.setInstanceName(producerConfig.getInstanceName());
log.info("a producer ({}) init on namesrv {}", groupName, nameServer);
return producer;
Expand Down Expand Up @@ -152,6 +155,9 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket
if (StringUtils.hasText(consumerConfig.getNamespace())) {
litePullConsumer.setNamespace(consumerConfig.getNamespace());
}
if (StringUtils.hasText(consumerConfig.getNamespaceV2())) {
litePullConsumer.setNamespaceV2(consumerConfig.getNamespaceV2());
}
litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
log.info("a pull consumer({} sub {}) init on namesrv {}", groupName, topicName, nameServer);
return litePullConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public static class Producer {
*/
private String namespace;

/**
* The namespace v2 version of producer, it can not be used in combination with namespace.
*/
private String namespaceV2;

/**
* Millis of send message timeout.
*/
Expand Down Expand Up @@ -274,6 +279,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public String getInstanceName() {
return instanceName;
}
Expand All @@ -294,6 +307,11 @@ public static class PullConsumer {
*/
private String namespace;

/**
* The namespace v2 version of consumer, it can not be used in combination with namespace.
*/
private String namespaceV2;

/**
* Topic name of consumer.
*/
Expand Down Expand Up @@ -445,6 +463,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public String getInstanceName() {
return instanceName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private int replyTimeout;
private String tlsEnable;
private String namespace;
private String namespaceV2;
private long awaitTerminationMillisWhenShutdown;

private String instanceName;
Expand Down Expand Up @@ -246,6 +247,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) {
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
this.namespaceV2 = anno.namespaceV2();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
Expand Down Expand Up @@ -288,6 +290,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public DefaultMQPushConsumer getConsumer() {
return consumer;
}
Expand Down Expand Up @@ -394,6 +404,7 @@ public String toString() {
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
", namespace='" + namespace + '\'' +
", namespaceV2='" + namespaceV2 + '\'' +
", nameServer='" + nameServer + '\'' +
", topic='" + topic + '\'' +
", consumeMode=" + consumeMode +
Expand Down Expand Up @@ -631,6 +642,7 @@ private void initRocketMQPushConsumer() throws MQClientException {
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
consumer.setNamespaceV2(namespaceV2);

String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
String namespace = environment.resolvePlaceholders(annotation.namespace());
container.setNamespace(RocketMQUtil.getNamespace(namespace,
rocketMQProperties.getConsumer().getNamespace()));

String namespaceV2 = environment.resolvePlaceholders(annotation.namespaceV2());
container.setNamespaceV2(RocketMQUtil.getNamespace(namespaceV2,
rocketMQProperties.getConsumer().getNamespaceV2()));
return container;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ public void testSetRocketMQMessageListener() {
assertEquals(anno.selectorExpression(), container.getSelectorExpression());
assertEquals(anno.tlsEnable(), container.getTlsEnable());
assertEquals(anno.namespace(), container.getNamespace());
assertEquals(anno.namespaceV2(), container.getNamespaceV2());
assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
assertEquals(anno.instanceName(), container.getInstanceName());
Expand All @@ -264,6 +265,7 @@ public void testSetRocketMQMessageListener() {
selectorExpression = "selectorExpression",
tlsEnable = "tlsEnable",
namespace = "namespace",
namespaceV2 = "namespaceV2",
delayLevelWhenNextConsume = 1234,
suspendCurrentQueueTimeMillis = 2345,
instanceName = "instanceName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@
*/
int awaitDuration() default 0;

/**
* The namespace of consumer.
*/
String namespace() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@
*/
int maxAttempts() default 3;

/**
* The namespace of producer.
*/
String namespace() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,8 @@

int consumptionThreadCount() default 20;


/**
* The namespace of listener.
*/
String namespace() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotati
String accessKey = resolvePlaceholders(annotation.accessKey(), simpleConsumer.getAccessKey());
String secretKey = resolvePlaceholders(annotation.secretKey(), simpleConsumer.getSecretKey());
String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints());
String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace());
String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag());
String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType());
Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
Boolean sslEnabled = simpleConsumer.isSslEnabled();
Assert.hasText(topicName, "[topic] must not be null");
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled);
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace);
final ClientServiceProvider provider = ClientServiceProvider.loadService();
FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType);
Duration duration = Duration.ofSeconds(awaitDuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ private ProducerBuilder createProducer(ExtProducerResetConfiguration annotation)
accessKey = StringUtils.hasLength(accessKey) ? accessKey : producerConfig.getAccessKey();
String secretKey = environment.resolvePlaceholders(annotation.secretKey());
secretKey = StringUtils.hasLength(secretKey) ? secretKey : producerConfig.getSecretKey();
String namespace = environment.resolvePlaceholders(annotation.namespace());
namespace = StringUtils.hasLength(namespace) ? namespace : producerConfig.getNamespace();
int requestTimeout = annotation.requestTimeout();
Boolean sslEnabled = producerConfig.isSslEnabled();
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofSeconds(requestTimeout), sslEnabled);
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey,
endpoints, Duration.ofSeconds(requestTimeout), sslEnabled, namespace);
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ProducerBuilder producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob
container.setTag(environment.resolvePlaceholders(annotation.tag()));
container.setEndpoints(environment.resolvePlaceholders(annotation.endpoints()));
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setNamespace(environment.resolvePlaceholders(annotation.namespace()));
container.setRequestTimeout(Duration.ofSeconds(annotation.requestTimeout()));
container.setMaxCachedMessageCount(annotation.maxCachedMessageCount());
container.setConsumptionThreadCount(annotation.consumptionThreadCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public static class Producer {
*/
private int maxAttempts = 3;

private String namespace;

public String getAccessKey() {
return accessKey;
}
Expand Down Expand Up @@ -135,6 +137,14 @@ public void setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

@Override
public String toString() {
return "Producer{" +
Expand All @@ -144,6 +154,7 @@ public String toString() {
", topic='" + topic + '\'' +
", requestTimeout=" + requestTimeout +
", sslEnabled=" + sslEnabled +
", namespace='" + namespace + '\'' +
'}';
}
}
Expand Down Expand Up @@ -200,6 +211,8 @@ public static class SimpleConsumer {
*/
private boolean sslEnabled = true;

private String namespace = "";

public String getAccessKey() {
return accessKey;
}
Expand Down Expand Up @@ -280,6 +293,14 @@ public void setFilterExpressionType(String filterExpressionType) {
this.filterExpressionType = filterExpressionType;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

@Override
public String toString() {
return "SimpleConsumer{" +
Expand All @@ -293,6 +314,7 @@ public String toString() {
", requestTimeout=" + requestTimeout +
", filterExpressionType='" + filterExpressionType + '\'' +
", sslEnabled=" + sslEnabled +
", namespace='" + namespace + '\'' +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class DefaultListenerContainer implements InitializingBean,

Boolean sslEnabled;

String namespace;

public String getName() {
return name;
}
Expand Down Expand Up @@ -240,6 +242,14 @@ public void setSslEnabled(Boolean sslEnabled) {
this.sslEnabled = sslEnabled;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

private void initRocketMQPushConsumer() {
if (rocketMQMessageListener == null) {
throw new IllegalArgumentException("Property 'rocketMQMessageListener' is required");
Expand All @@ -253,7 +263,7 @@ private void initRocketMQPushConsumer() {
filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType());
}
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(),
this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled);
this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace);

PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration);
Expand Down Expand Up @@ -354,6 +364,7 @@ public String toString() {
", maxCachedMessageCount=" + maxCachedMessageCount +
", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes +
", consumptionThreadCount=" + consumptionThreadCount +
", namespace='" + namespace + '\'' +
'}';
}
}
Loading
Loading