Skip to content

Commit

Permalink
Bump version to 2.8.0-rc-202106071430 (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamnativebot authored Jun 8, 2021
1 parent 78acf3a commit ad27561
Show file tree
Hide file tree
Showing 32 changed files with 381 additions and 210 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ jobs:
- name: Spotbugs check
run: mvn spotbugs:check

- name: amqp-impl tests
run: mvn test -DfailIfNoTests=false -pl amqp-impl

- name: test after build
run: mvn test -DfailIfNoTests=false -pl tests

Expand Down
2 changes: 1 addition & 1 deletion amqp-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>0.1.0-SNAPSHOT</version>
<version>2.8.0-rc-202106071430</version>
</parent>

<groupId>io.streamnative.pulsar.handlers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.MessageDestination;
Expand Down Expand Up @@ -270,13 +270,13 @@ private AmqpConsumer subscribe(String consumerTag, String queueName, Topic topic
try {
if (subscription == null) {
subscription = topic.createSubscription(defaultSubscription,
PulsarApi.CommandSubscribe.InitialPosition.Earliest, false).get();
CommandSubscribe.InitialPosition.Earliest, false).get();
}
consumer = new AmqpConsumer(queueContainer, subscription, exclusive
? PulsarApi.CommandSubscribe.SubType.Exclusive :
PulsarApi.CommandSubscribe.SubType.Shared, topic.getName(), 0, 0,
? CommandSubscribe.SubType.Exclusive :
CommandSubscribe.SubType.Shared, topic.getName(), 0, 0,
consumerTag, 0, connection.getServerCnx(), "", null,
false, PulsarApi.CommandSubscribe.InitialPosition.Latest,
false, CommandSubscribe.InitialPosition.Latest,
null, this, consumerTag, queueName, ack);
subscription.addConsumer(consumer);
consumer.handleFlow(DEFAULT_CONSUMER_PERMIT);
Expand Down Expand Up @@ -378,13 +378,13 @@ public void receiveBasicGet(AMQShortString queue, boolean noAck) {
try {
if (subscription == null) {
subscription = topic.createSubscription(defaultSubscription,
PulsarApi.CommandSubscribe.InitialPosition.Earliest, false).get();
CommandSubscribe.InitialPosition.Earliest, false).get();
}
consumer = new AmqpPullConsumer(queueContainer, subscription,
PulsarApi.CommandSubscribe.SubType.Shared,
CommandSubscribe.SubType.Shared,
topic.getName(), 0, 0, "", 0,
connection.getServerCnx(), "", null, false,
PulsarApi.CommandSubscribe.InitialPosition.Latest, null, this,
CommandSubscribe.InitialPosition.Latest, null, this,
"", queueName, noAck);
subscription.addConsumer(consumer);
consumer.handleFlow(DEFAULT_CONSUMER_PERMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache;

Expand Down Expand Up @@ -85,6 +86,8 @@ public AmqpBrokerDecoder getBrokerDecoder() {

// advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
// here we get the broker url, need to find related webServiceUrl.
MetadataCache<ServiceLookupData> serviceLookupDataCache =
pulsarService.getLocalMetadataStore().getMetadataCache(ServiceLookupData.class);
ZooKeeperCache zkCache = pulsarService.getLocalZkCache();
zkCache.getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT, zkCache)
.whenComplete((set, throwable) -> {
Expand Down Expand Up @@ -112,10 +115,8 @@ public AmqpBrokerDecoder getBrokerDecoder() {
// Get a list of ServiceLookupData for each matchBroker.
List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
.map(matchBroker ->
zkCache.getDataAsync(
String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker),
(ZooKeeperCache.Deserializer<ServiceLookupData>)
pulsarService.getLoadManager().get().getLoadReportDeserializer()))
serviceLookupDataCache.get(
String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
.collect(Collectors.toList());

FutureUtil.waitForAll(list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;

/**
Expand Down Expand Up @@ -70,11 +72,11 @@ public class AmqpConsumer extends Consumer {
private final int maxPermits = 1000;

public AmqpConsumer(QueueContainer queueContainer, Subscription subscription,
PulsarApi.CommandSubscribe.SubType subType, String topicName, long consumerId,
CommandSubscribe.SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName, int maxUnackedMessages, ServerCnx cnx,
String appId, Map<String, String> metadata, boolean readCompacted,
PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition,
PulsarApi.KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
CommandSubscribe.InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
boolean autoAck) throws BrokerServiceException {
super(subscription, subType, topicName, consumerId, priorityLevel, consumerName, maxUnackedMessages,
cnx, appId, metadata, readCompacted, subscriptionInitialPosition, keySharedMeta);
Expand Down Expand Up @@ -167,7 +169,7 @@ public void messagesAck(List<Position> position) {
incrementPermits(position.size());
ManagedCursor cursor = ((PersistentSubscription) getSubscription()).getCursor();
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
getSubscription().acknowledgeMessage(position, PulsarApi.CommandAck.AckType.Individual, Collections.EMPTY_MAP);
getSubscription().acknowledgeMessage(position, CommandAck.AckType.Individual, Collections.EMPTY_MAP);
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
asyncGetQueue().whenComplete((amqpQueue, throwable) -> {
if (throwable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;

/**
* Amqp exchange replicator, read entries from BookKeeper and process entries.
Expand Down Expand Up @@ -107,7 +107,7 @@ public void startReplicate() {
log.info("{} Replicator is starting.", name);

topic.getManagedLedger().asyncOpenCursor(cursorNamePre + persistentExchange.getName(),
PulsarApi.CommandSubscribe.InitialPosition.Earliest,
CommandSubscribe.InitialPosition.Earliest,
new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor managedCursor, Object o) {
Expand Down Expand Up @@ -161,7 +161,7 @@ private void readMoreEntries() {
if (log.isDebugEnabled()) {
log.debug("{} Schedule read of {} messages.", name, availablePermits);
}
cursor.asyncReadEntriesOrWait(availablePermits, defaultReadMaxSizeBytes, this, null);
cursor.asyncReadEntriesOrWait(availablePermits, defaultReadMaxSizeBytes, this, null, null);
} else {
if (log.isDebugEnabled()) {
log.debug("{} Not schedule read due to pending read. Messages to read {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
Expand Down Expand Up @@ -95,7 +94,8 @@ public void start(BrokerService service) {
proxyConfig.setAmqpMaxFrameSize(amqpConfig.getAmqpMaxFrameSize());
proxyConfig.setAmqpHeartBeat(amqpConfig.getAmqpHeartBeat());
proxyConfig.setAmqpProxyPort(amqpConfig.getAmqpProxyPort());
proxyConfig.setBrokerServiceURL("pulsar://" + PulsarService.advertisedAddress(amqpConfig) + ":"
proxyConfig.setBrokerServiceURL("pulsar://"
+ ServiceConfigurationUtils.getAppliedAdvertisedAddress(amqpConfig) + ":"
+ amqpConfig.getBrokerServicePort().get());
ProxyService proxyService = new ProxyService(proxyConfig, service.getPulsar());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;

/**
* Amqp consumer Used to return pull messages.
Expand All @@ -27,11 +28,11 @@
public class AmqpPullConsumer extends AmqpConsumer {

public AmqpPullConsumer(QueueContainer queueContainer, Subscription subscription,
PulsarApi.CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel,
CommandSubscribe.SubType subType, String topicName, long consumerId, int priorityLevel,
String consumerName, int maxUnackedMessages, ServerCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition,
PulsarApi.KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
CommandSubscribe.InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
boolean autoAck) throws BrokerServiceException {
super(queueContainer, subscription, subType, topicName, consumerId, priorityLevel, consumerName,
maxUnackedMessages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;

/**
* exchange topic and queue cursor manager.
Expand Down Expand Up @@ -71,7 +71,7 @@ public ManagedCursor getOrCreateCursor(String name) {
}
ManagedCursor newCursor;
try {
newCursor = ledger.openCursor(name, PulsarApi.CommandSubscribe.InitialPosition.Latest);
newCursor = ledger.openCursor(name, CommandSubscribe.InitialPosition.Latest);
} catch (ManagedLedgerException | InterruptedException e) {
log.error("Error new cursor for topic {} - {}. will cause fetch data error.",
topic.getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void readEntriesComplete(List<Entry> list, Object o) {
public void readEntriesFailed(ManagedLedgerException e, Object o) {
message.complete(null);
}
}, null);
}, null, null);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -86,7 +86,7 @@ public CompletableFuture<Void> readProcess(Entry entry) {
try {
MessageImpl<byte[]> message = MessageImpl.deserialize(entry.getDataBuffer());
props = message.getMessageBuilder().getPropertiesList().stream()
.collect(Collectors.toMap(PulsarApi.KeyValue::getKey, PulsarApi.KeyValue::getValue));
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
} catch (IOException e) {
log.error("Deserialize entry dataBuffer failed. exchangeName: {}", exchangeName, e);
return FutureUtil.failedFuture(e);
Expand Down Expand Up @@ -245,7 +245,7 @@ private ManagedCursor createCursorIfNotExists(String name) {
}
ManagedCursor newCursor;
try {
//newCursor = ledger.openCursor(name, PulsarApi.CommandSubscribe.InitialPosition.Latest);
//newCursor = ledger.openCursor(name, CommandSubscribe.InitialPosition.Latest);
newCursor = ledger.newNonDurableCursor(ledger.getLastConfirmedEntry(), name);
} catch (ManagedLedgerException e) {
log.error("Error new cursor for topic {} - {}. will cause fetch data error.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public ProxyService(ProxyConfiguration proxyConfig, PulsarService pulsarService)
this.proxyConfig = proxyConfig;
this.pulsarService = pulsarService;
this.tenant = this.proxyConfig.getAmqpTenant();
acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workerThreadFactory);
acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory);
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workerThreadFactory);
}

private void configValid(ProxyConfiguration proxyConfig) {
Expand Down
Loading

0 comments on commit ad27561

Please sign in to comment.