diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-test.yml index a1e08947..416ff193 100644 --- a/.github/workflows/pr-test.yml +++ b/.github/workflows/pr-test.yml @@ -42,6 +42,9 @@ jobs: - name: Spotbugs check run: mvn spotbugs:check + - name: amqp-impl tests + run: mvn test -DfailIfNoTests=false -pl amqp-impl + - name: test after build run: mvn test -DfailIfNoTests=false -pl tests diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java index c02472c6..8b12f32f 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java @@ -27,6 +27,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; /** @@ -39,12 +40,12 @@ public class PulsarServiceLookupHandler implements LookupHandler { private PulsarClientImpl pulsarClient; - private MetadataCache serviceLookupDataCache; + private MetadataCache serviceLookupDataCache; public PulsarServiceLookupHandler(PulsarService pulsarService, PulsarClientImpl pulsarClient) { this.pulsarService = pulsarService; this.pulsarClient = pulsarClient; - this.serviceLookupDataCache = pulsarService.getLocalMetadataStore().getMetadataCache(ServiceLookupData.class); + this.serviceLookupDataCache = pulsarService.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); } @Override @@ -73,7 +74,7 @@ public CompletableFuture> findBroker(TopicName topicName, } } - List>> futureList = new ArrayList<>(); + List>> futureList = new ArrayList<>(); for (String webService : matchWebUri) { String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + webService; futureList.add(serviceLookupDataCache.get(path)); @@ -109,8 +110,8 @@ public CompletableFuture> findBroker(TopicName topicName, return; } boolean match = false; - for (CompletableFuture> future : futureList) { - Optional optionalServiceLookupData = future.join(); + for (CompletableFuture> future : futureList) { + Optional optionalServiceLookupData = future.join(); if (!optionalServiceLookupData.isPresent()) { log.warn("Service lookup data is null."); continue; diff --git a/amqp-impl/src/main/resources/log4j2.xml b/amqp-impl/src/main/resources/log4j2.xml index d4e2ba01..b859997b 100644 --- a/amqp-impl/src/main/resources/log4j2.xml +++ b/amqp-impl/src/main/resources/log4j2.xml @@ -36,7 +36,7 @@ - + diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java index 55a62e41..7df5b660 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java @@ -144,6 +144,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Mockito.when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); Mockito.when(pulsarService.getOrderedExecutor()).thenReturn( OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build()); + Mockito.when(serviceConfiguration.getNumIOThreads()).thenReturn(2 * Runtime.getRuntime().availableProcessors()); } private void mockBrokerService() { diff --git a/tests/pom.xml b/tests/pom.xml index 32b7f93f..4550e324 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -71,6 +71,12 @@ qpid-test-utils ${qpid-protocol-plugin.version} test + + + ch.qos.logback + logback-classic + + diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 9a6b975f..412a6699 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -18,6 +18,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; +import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; @@ -33,8 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; - -import io.netty.channel.EventLoopGroup; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; @@ -346,12 +345,17 @@ public CompletableFuture create(String serverList, SessionType sessio private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override - public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, Optional> ensemblePlacementPolicyClass, Map ensemblePlacementPolicyProperties) throws IOException { + public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map ensemblePlacementPolicyProperties) throws IOException { return mockBookKeeper; } @Override - public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, Optional> ensemblePlacementPolicyClass, Map ensemblePlacementPolicyProperties, StatsLogger statsLogger) throws IOException { + public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map ensemblePlacementPolicyProperties, StatsLogger statsLogger) + throws IOException { return mockBookKeeper; } diff --git a/tests/src/test/resources/log4j2.xml b/tests/src/test/resources/log4j2.xml index d4e2ba01..b859997b 100644 --- a/tests/src/test/resources/log4j2.xml +++ b/tests/src/test/resources/log4j2.xml @@ -36,7 +36,7 @@ - +