Skip to content

Commit

Permalink
[ISSUE #622] Add config enableSsl (#623)
Browse files Browse the repository at this point in the history
  • Loading branch information
panzhi33 authored Feb 8, 2024
1 parent 361b1f7 commit 89896c5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotati
String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType());
Duration requestTimeout = Duration.ofDays(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
Boolean sslEnabled = simpleConsumer.isSslEnabled();
Assert.hasText(topicName, "[topic] must not be null");
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout);
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled);
final ClientServiceProvider provider = ClientServiceProvider.loadService();
FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType);
Duration duration = Duration.ofSeconds(awaitDuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private ProducerBuilder createProducer(ExtProducerResetConfiguration annotation)
String secretKey = environment.resolvePlaceholders(annotation.secretKey());
secretKey = StringUtils.hasLength(secretKey) ? secretKey : producerConfig.getSecretKey();
int requestTimeout = annotation.requestTimeout();
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofDays(requestTimeout));
Boolean sslEnabled = producerConfig.isSslEnabled();
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofDays(requestTimeout), sslEnabled);
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ProducerBuilder producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public ProducerBuilder producerBuilder(RocketMQProperties rocketMQProperties) {
ProducerBuilder producerBuilder;
producerBuilder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic
// route before message publishing.
.setTopics(rocketMQProducer.getTopic())
.setMaxAttempts(rocketMQProducer.getMaxAttempts());
if (StringUtils.hasLength(topic)) {
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic
// route before message publishing.
producerBuilder.setTopics(rocketMQProducer.getTopic());
}
log.info(String.format("a producer init on proxy %s", endPoints));
return producerBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class DefaultListenerContainer implements InitializingBean,

int consumptionThreadCount = 20;

Boolean sslEnabled;

public String getName() {
return name;
}
Expand Down Expand Up @@ -230,6 +232,14 @@ public void setType(String type) {
this.type = type;
}

public Boolean getSslEnabled() {
return sslEnabled;
}

public void setSslEnabled(Boolean sslEnabled) {
this.sslEnabled = sslEnabled;
}

private void initRocketMQPushConsumer() {
if (rocketMQMessageListener == null) {
throw new IllegalArgumentException("Property 'rocketMQMessageListener' is required");
Expand All @@ -242,7 +252,8 @@ private void initRocketMQPushConsumer() {
if (StringUtils.hasLength(this.getTag())) {
filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType());
}
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout());
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(),
this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled);

PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,22 @@ public static ClientConfiguration createProducerClientConfiguration(RocketMQProp
String secretKey = rocketMQProducer.getSecretKey();
String endPoints = rocketMQProducer.getEndpoints();
Duration requestTimeout = Duration.ofDays(rocketMQProducer.getRequestTimeout());
// boolean sslEnabled = rocketMQProducer.isSslEnabled();
return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout);
boolean sslEnabled = rocketMQProducer.isSslEnabled();
return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled);
}

public static ClientConfiguration createConsumerClientConfiguration(RocketMQProperties.SimpleConsumer simpleConsumer) {
String accessKey = simpleConsumer.getAccessKey();
String secretKey = simpleConsumer.getSecretKey();
String endPoints = simpleConsumer.getEndpoints();
Duration requestTimeout = Duration.ofDays(simpleConsumer.getRequestTimeout());
// boolean sslEnabled = rocketMQProducer.isSslEnabled();
return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout);
boolean sslEnabled = simpleConsumer.isSslEnabled();
return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled);

}

public static ClientConfiguration createClientConfiguration(String accessKey, String secretKey, String endPoints, Duration requestTimeout) {
public static ClientConfiguration createClientConfiguration(String accessKey, String secretKey, String endPoints,
Duration requestTimeout, Boolean sslEnabled) {

SessionCredentialsProvider sessionCredentialsProvider = null;
if (StringUtils.hasLength(accessKey) && StringUtils.hasLength(secretKey)) {
Expand All @@ -150,6 +151,9 @@ public static ClientConfiguration createClientConfiguration(String accessKey, St
if (Objects.nonNull(requestTimeout)) {
clientConfigurationBuilder.setRequestTimeout(requestTimeout);
}
if (Objects.nonNull(sslEnabled)) {
clientConfigurationBuilder.enableSsl(sslEnabled);
}
return clientConfigurationBuilder.build();
}

Expand Down

0 comments on commit 89896c5

Please sign in to comment.