Skip to content

Commit

Permalink
[improve][broker] PIP-379: Snapshot hash range assignments only in AU…
Browse files Browse the repository at this point in the history
…TO_SPLIT ordered mode
  • Loading branch information
lhotari committed Oct 8, 2024
1 parent 4d6dee4 commit 7209b21
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -44,21 +45,32 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons

private final int numberOfPoints;
private final Range keyHashRange;
private final boolean addOrRemoveReturnsImpactedConsumersResult;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this(numberOfPoints, DEFAULT_RANGE_SIZE - 1);
this(numberOfPoints, false);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeMaxValue) {
public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints,
boolean addOrRemoveReturnsImpactedConsumersResult) {
this(numberOfPoints, addOrRemoveReturnsImpactedConsumersResult, DEFAULT_RANGE_SIZE - 1);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints,
boolean addOrRemoveReturnsImpactedConsumersResult,
int rangeMaxValue) {
this.addOrRemoveReturnsImpactedConsumersResult = addOrRemoveReturnsImpactedConsumersResult;
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
this.keyHashRange = Range.of(STICKY_KEY_HASH_NOT_SET + 1, rangeMaxValue);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
this.consumerHashAssignmentsSnapshot = addOrRemoveReturnsImpactedConsumersResult
? ConsumerHashAssignmentsSnapshot.empty()
: null;
}

@Override
public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -76,11 +88,14 @@ public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer)
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
return CompletableFuture.completedFuture(Optional.of(impactedConsumers));
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -103,7 +118,7 @@ private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerName
}

@Override
public ImpactedConsumersResult removeConsumer(Consumer consumer) {
public Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -117,11 +132,14 @@ public ImpactedConsumersResult removeConsumer(Consumer consumer) {
}
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return Optional.empty();
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.of(impactedConsumers);
} finally {
rwLock.writeLock().unlock();
}
Expand Down Expand Up @@ -155,7 +173,8 @@ public Range getKeyHashRange() {
public ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
rwLock.readLock().lock();
try {
return consumerHashAssignmentsSnapshot;
return consumerHashAssignmentsSnapshot != null ? consumerHashAssignmentsSnapshot
: internalGetConsumerHashAssignmentsSnapshot();
} finally {
rwLock.readLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
Expand Down Expand Up @@ -59,13 +60,20 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
private final Range keyHashRange;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
private final Map<Consumer, Integer> consumerRange;
private final boolean addOrRemoveReturnsImpactedConsumersResult;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public HashRangeAutoSplitStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
this(false);
}

public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
public HashRangeAutoSplitStickyKeyConsumerSelector(boolean addOrRemoveReturnsImpactedConsumersResult) {
this(DEFAULT_RANGE_SIZE, addOrRemoveReturnsImpactedConsumersResult);
}

public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize,
boolean addOrRemoveReturnsImpactedConsumersResult) {
this.addOrRemoveReturnsImpactedConsumersResult = addOrRemoveReturnsImpactedConsumersResult;
if (rangeSize < 2) {
throw new IllegalArgumentException("range size must greater than 2");
}
Expand All @@ -76,11 +84,12 @@ public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
this.consumerRange = new HashMap<>();
this.rangeSize = rangeSize;
this.keyHashRange = Range.of(0, rangeSize - 1);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
this.consumerHashAssignmentsSnapshot = addOrRemoveReturnsImpactedConsumersResult
? ConsumerHashAssignmentsSnapshot.empty() : null;
}

@Override
public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public synchronized CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
if (rangeMap.isEmpty()) {
rangeMap.put(rangeSize, consumer);
consumerRange.put(consumer, rangeSize);
Expand All @@ -91,15 +100,18 @@ public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consu
return CompletableFuture.failedFuture(e);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
return CompletableFuture.completedFuture(Optional.of(impactedConsumers));
}

@Override
public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
public synchronized Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
Integer removeRange = consumerRange.remove(consumer);
if (removeRange != null) {
if (removeRange == rangeSize && rangeMap.size() > 1) {
Expand All @@ -111,11 +123,14 @@ public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
rangeMap.remove(removeRange);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return Optional.empty();
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.of(impactedConsumers);
}

@Override
Expand All @@ -134,7 +149,8 @@ public Range getKeyHashRange() {

@Override
public synchronized ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
return consumerHashAssignmentsSnapshot;
return consumerHashAssignmentsSnapshot != null ? consumerHashAssignmentsSnapshot
: internalGetConsumerHashAssignmentsSnapshot();
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.client.api.Range;
Expand All @@ -38,7 +39,6 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
private final int rangeSize;
private final Range keyHashRange;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public HashRangeExclusiveStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
Expand All @@ -52,11 +52,10 @@ public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
this.rangeSize = rangeSize;
this.keyHashRange = Range.of(0, rangeSize - 1);
this.rangeMap = new ConcurrentSkipListMap<>();
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
}

@Override
public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public synchronized CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
return validateKeySharedMeta(consumer).thenApply(__ -> {
try {
return internalAddConsumer(consumer);
Expand All @@ -66,7 +65,7 @@ public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consu
});
}

private synchronized ImpactedConsumersResult internalAddConsumer(Consumer consumer)
private synchronized Optional<ImpactedConsumersResult> internalAddConsumer(Consumer consumer)
throws BrokerServiceException.ConsumerAssignException {
Consumer conflictingConsumer = findConflictingConsumer(consumer.getKeySharedMeta().getHashRangesList());
if (conflictingConsumer != null) {
Expand All @@ -77,29 +76,17 @@ private synchronized ImpactedConsumersResult internalAddConsumer(Consumer consum
rangeMap.put(intRange.getStart(), consumer);
rangeMap.put(intRange.getEnd(), consumer);
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.empty();
}

@Override
public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
public synchronized Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.empty();
}

@Override
public synchronized ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
return consumerHashAssignmentsSnapshot;
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
List<HashRangeAssignment> result = new ArrayList<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Range;

Expand Down Expand Up @@ -50,7 +51,7 @@ public interface StickyKeyConsumerSelector {
* The result contains information about the existing consumers whose hash ranges were affected
* by the addition of the new consumer.
*/
CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer);
CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer);

/**
* Remove the consumer.
Expand All @@ -59,7 +60,7 @@ public interface StickyKeyConsumerSelector {
* @return the result of impacted consumers. The result contains information about the existing consumers
* whose hash ranges were affected by the removal of the consumer.
*/
ImpactedConsumersResult removeConsumer(Consumer consumer);
Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer);

/**
* Select a consumer by sticky key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints(), drainingHashesRequired);
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
selector = new HashRangeAutoSplitStickyKeyConsumerSelector(drainingHashesRequired);
}
break;
case STICKY:
Expand Down Expand Up @@ -155,7 +155,7 @@ public void endBatch() {
drainingHashesTracker.endBatch();
}
});
registerDrainingHashes(consumer, impactedConsumers);
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
}
}).exceptionally(ex -> {
internalRemoveConsumer(consumer);
Expand Down Expand Up @@ -184,13 +184,13 @@ private synchronized void registerDrainingHashes(Consumer skipConsumer,
@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
ImpactedConsumersResult impactedConsumers = selector.removeConsumer(consumer);
Optional<ImpactedConsumersResult> impactedConsumers = selector.removeConsumer(consumer);
super.removeConsumer(consumer);
if (drainingHashesRequired) {
// register draining hashes for the impacted consumers and ranges, in case a hash switched from one
// consumer to another. This will handle the case where a hash gets switched from an existing
// consumer to another existing consumer during removal.
registerDrainingHashes(consumer, impactedConsumers);
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {

@Test
public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
final ConsistentHashingStickyKeyConsumerSelector selector =
new ConsistentHashingStickyKeyConsumerSelector(100, true);
final String consumerName = "consumer";
final int numOfInitialConsumers = 10;
List<Consumer> consumers = new ArrayList<>();
Expand Down Expand Up @@ -563,7 +564,8 @@ public void testPerformanceOfAdding1000ConsumersWith100Points() {
// test that adding 1000 consumers with 100 points runs in a reasonable time.
// This takes about 1 second on Apple M3
// this unit test can be used for basic profiling
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
final ConsistentHashingStickyKeyConsumerSelector selector =
new ConsistentHashingStickyKeyConsumerSelector(100, true);
for (int i = 0; i < 1000; i++) {
// use real class to avoid Mockito over head
final Consumer consumer = new Consumer("consumer" + i, 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {

@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
HashRangeAutoSplitStickyKeyConsumerSelector selector =
new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5, false);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Expand All @@ -61,7 +62,8 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
HashRangeAutoSplitStickyKeyConsumerSelector selector =
new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5, false);
final String consumerName = "My-consumer";
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Expand Down

0 comments on commit 7209b21

Please sign in to comment.