Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #81]Fix RocketMQTemplate.syncSend collection type method signature #150

Merged
merged 10 commits into from
Oct 31, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public SendResult syncSend(String destination, Message<?> message, long timeout)
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Collection<Message<?>> messages, long timeout) {
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
if (Objects.isNull(messages) || messages.size() == 0) {
log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
throw new IllegalArgumentException("`messages` can not be empty");
Expand All @@ -144,7 +144,7 @@ public SendResult syncSend(String destination, Collection<Message<?>> messages,
long now = System.currentTimeMillis();
Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
org.apache.rocketmq.common.message.Message rocketMsg;
for (Message<?> msg:messages) {
for (Message msg:messages) {
if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
log.warn("Found a message empty in the batch, skip it");
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static org.springframework.messaging.Message convertToSpringMessage(

public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
ObjectMapper objectMapper, String charset,
String destination, org.springframework.messaging.Message<?> message) {
String destination, org.springframework.messaging.Message message) {
Object payloadObj = message.getPayload();
byte[] payloads;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
Expand All @@ -38,6 +40,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -176,6 +183,28 @@ public void testRocketMQTransactionListener() {

}

@Test
public void testBatchSendMessage() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq").
run((context) -> {
RocketMQTemplate rocketMQTemplate = context.getBean(RocketMQTemplate.class);
List<GenericMessage<String>> batchMessages = new ArrayList<GenericMessage<String>>();

String errorMsg = null;
try {
SendResult customSendResult = rocketMQTemplate.syncSend("test", batchMessages, 60000);
} catch (IllegalArgumentException e) {
// it will be throw IllegalArgumentException: `messages` can not be empty
errorMsg = e.getMessage();
}

// that means the rocketMQTemplate.syncSend is chosen the correct type method
Assert.assertEquals("`messages` can not be empty", errorMsg);
});

}

@Configuration
static class TestConfig {

Expand Down