Skip to content

Commit

Permalink
DBZ-8118 Remove unnecessary prefix; add test
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Aug 9, 2024
1 parent 58f7d46 commit d36d291
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionFactoryConfigurator;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
Expand Down Expand Up @@ -54,13 +59,13 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple
@ConfigProperty(name = PROP_STREAM)
Optional<String> stream;

@ConfigProperty(name = PROP_PREFIX + "stream.config.maxAge")
@ConfigProperty(name = PROP_PREFIX + "stream.maxAge")
Optional<Duration> streamMaxAge;

@ConfigProperty(name = PROP_PREFIX + "stream.config.maxLength")
@ConfigProperty(name = PROP_PREFIX + "stream.maxLength")
Optional<String> streamMaxLength;

@ConfigProperty(name = PROP_PREFIX + "stream.config.maxSegmentSize")
@ConfigProperty(name = PROP_PREFIX + "stream.maxSegmentSize")
Optional<String> streamMaxSegmentSize;

@ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000")
Expand All @@ -71,9 +76,10 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple

Environment environment;

Map<String, Producer> streamProducers = new HashMap<>();;
Map<String, Producer> streamProducers = new HashMap<>();

private void createStream(Environment env, String name) {
LOGGER.info("Creating stream '{}'", name);
StreamCreator stream = env.streamCreator().stream(name);

streamMaxAge.ifPresent(stream::maxAge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.List;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.stream.Address;
import jakarta.enterprise.event.Observes;

import org.awaitility.Awaitility;
Expand All @@ -24,6 +23,7 @@
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public RabbitMqTestConfigSource() {
String sinkType = System.getProperty("debezium.sink.type");
if ("rabbitmqstream".equals(sinkType)) {
rabbitmqConfig.put("debezium.sink.type", "rabbitmqstream");
rabbitmqConfig.put("debezium.sink.rabbitmqstream.stream", TOPIC_NAME);
}
else {
rabbitmqConfig.put("debezium.sink.type", "rabbitmq");
Expand Down

0 comments on commit d36d291

Please sign in to comment.