Skip to content

Commit

Permalink
Experiment work stealing pools
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Jan 11, 2024
1 parent 3dae6b7 commit 4b27763
Show file tree
Hide file tree
Showing 14 changed files with 1,441 additions and 20 deletions.
1 change: 1 addition & 0 deletions reactor-pool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ task japicmp(type: JapicmpTask) {
classExcludes = [
]
methodExcludes = [
"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, org.reactivestreams.Publisher, java.util.function.Function)"
]
}
check.dependsOn japicmp
Expand Down
24 changes: 17 additions & 7 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -390,7 +391,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();

final CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual;
final AbstractPool<POOLABLE> pool;
final AtomicReference<AbstractPool<POOLABLE>> pool;
final Duration pendingAcquireTimeout;

long pendingAcquireStart;
Expand All @@ -400,7 +401,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
AbstractPool<POOLABLE> pool,
Duration pendingAcquireTimeout) {
this.actual = actual;
this.pool = pool;
this.pool = new AtomicReference<>(pool);
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.timeoutTask = TIMEOUT_DISPOSED;
}
Expand All @@ -414,7 +415,7 @@ public void run() {
if (Borrower.this.compareAndSet(false, true)) {
// this is failure, a timeout was observed
stopPendingCountdown(false);
pool.cancelAcquire(Borrower.this);
pool().cancelAcquire(Borrower.this);
actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
}
}
Expand All @@ -423,13 +424,13 @@ public void run() {
public void request(long n) {
if (Operators.validate(n)) {
//start the countdown

AbstractPool<POOLABLE> pool = pool();
boolean noIdle = pool.idleSize() == 0;
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
timeoutTask = pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
//doAcquire should interrupt the countdown if there is either an available
//resource or the pool can allocate one
Expand All @@ -442,6 +443,7 @@ public void request(long n) {
*/
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
AbstractPool<POOLABLE> pool = pool();
if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
Expand All @@ -454,7 +456,7 @@ void stopPendingCountdown(boolean success) {
@Override
public void cancel() {
set(true);
pool.cancelAcquire(this);
pool().cancelAcquire(this);
stopPendingCountdown(true); // this is not failure, the subscription was canceled
}

Expand Down Expand Up @@ -493,6 +495,14 @@ void fail(Throwable error) {
public String toString() {
return get() ? "Borrower(cancelled)" : "Borrower";
}

AbstractPool<POOLABLE> pool() {
return pool.get();
}

void setPool(AbstractPool<POOLABLE> replace) {
pool.set(replace);
}
}

}
32 changes: 30 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,6 @@
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* A default {@link PoolConfig} that can be extended to bear more configuration options
Expand All @@ -50,6 +49,7 @@ public class DefaultPoolConfig<POOLABLE> implements PoolConfig<POOLABLE> {
protected final PoolMetricsRecorder metricsRecorder;
protected final Clock clock;
protected final boolean isIdleLRU;
protected final ResourceManager resourceManager;

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
Expand All @@ -64,6 +64,26 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU) {
this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, destroyHandler,
evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, acquisitionScheduler,
metricsRecorder, clock, isIdleLRU,
PoolBuilder.DEFAULT_RESOURCE_MANAGER);
}

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
int maxPending,
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer,
Function<POOLABLE, ? extends Publisher<Void>> releaseHandler,
Function<POOLABLE, ? extends Publisher<Void>> destroyHandler,
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate,
Duration evictInBackgroundInterval,
Scheduler evictInBackgroundScheduler,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU,
ResourceManager resourceManager) {
this.pendingAcquireTimer = pendingAcquireTimer;
this.allocator = allocator;
this.allocationStrategy = allocationStrategy;
Expand All @@ -77,6 +97,7 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this.metricsRecorder = metricsRecorder;
this.clock = clock;
this.isIdleLRU = isIdleLRU;
this.resourceManager = resourceManager;
}

/**
Expand All @@ -101,6 +122,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopyDpc.metricsRecorder;
this.clock = toCopyDpc.clock;
this.isIdleLRU = toCopyDpc.isIdleLRU;
this.resourceManager = toCopyDpc.resourceManager;
}
else {
this.allocator = toCopy.allocator();
Expand All @@ -116,6 +138,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopy.metricsRecorder();
this.clock = toCopy.clock();
this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder();
this.resourceManager = toCopy.resourceManager();
}
}

Expand Down Expand Up @@ -183,4 +206,9 @@ public Clock clock() {
public boolean reuseIdleResourcesInLruOrder() {
return isIdleLRU;
}

@Override
public ResourceManager resourceManager() {
return resourceManager;
}
}
12 changes: 11 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,16 @@ public interface InstrumentedPool<POOLABLE> extends Pool<POOLABLE> {
*/
PoolMetrics metrics();

/**
* Estimates if the pool can currently either reuse or create some resources
* @return true if the pool can currently either reuse or create some resources, false if no idles resources are
* currently available and no more resources can be currently created.
*/
default boolean hasAvailableResources() {
PoolMetrics pm = metrics();
return (pm.idleSize() + config().allocationStrategy().estimatePermitCount()) - pm.pendingAcquireSize() >= 0;
}

/**
* An object that can be used to get live information about a {@link Pool}, suitable
* for gauge metrics.
Expand Down
12 changes: 11 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/Pool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -178,4 +178,14 @@ default void dispose() {
* @return a Mono triggering the shutdown of the pool once subscribed.
*/
Mono<Void> disposeLater();

/**
* Transfer some pending borrowers from another pool into this pool.
*
* @param from another pool to steal resources from
* @return true if some borrowers have been moved from <code>from</code> into this pool instance
*/
default boolean transferBorrowersFrom(InstrumentedPool<POOLABLE> from) {
return false;
}
}
14 changes: 12 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,8 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
boolean idleLruOrder = true;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER;

ResourceManager resourceManager = DEFAULT_RESOURCE_MANAGER;

PoolBuilder(Mono<T> allocator, Function<PoolConfig<T>, CONF> configModifier) {
this.allocator = allocator;
this.configModifier = configModifier;
Expand Down Expand Up @@ -445,6 +447,11 @@ public PoolBuilder<T, CONF> idleResourceReuseOrder(boolean isLru) {
return this;
}

public PoolBuilder<T, CONF> resourceManager(ResourceManager resourceManager) {
this.resourceManager = resourceManager;
return this;
}

/**
* Add implementation-specific configuration, changing the type of {@link PoolConfig}
* passed to the {@link Pool} factory in {@link #build(Function)}.
Expand Down Expand Up @@ -508,7 +515,8 @@ CONF buildConfig() {
acquisitionScheduler,
metricsRecorder,
clock,
idleLruOrder);
idleLruOrder,
resourceManager);

return this.configModifier.apply(baseConfig);
}
Expand All @@ -531,4 +539,6 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);
static final int DEFAULT_WARMUP_PARALLELISM = 1;

static final ResourceManager DEFAULT_RESOURCE_MANAGER = () -> {};
}
6 changes: 5 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,4 +141,8 @@ default BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
return PoolBuilder.DEFAULT_PENDING_ACQUIRE_TIMER;
}

default ResourceManager resourceManager() {
return PoolBuilder.DEFAULT_RESOURCE_MANAGER;
}

}
39 changes: 39 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/PoolScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.pool;

import java.util.List;

/**
* A Pool that can schedule resource acquisition among multiple sub pools,
* each managing a portion of resources. Resource acquisitions will
* be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
*/
public interface PoolScheduler<T> extends InstrumentedPool<T> {
/**
* Get the number of borrowers steal count (only if the Scheduler supports work stealing).
*
* @return the number of Pool steal count, or -1
*/
long stealCount();

/**
* Returns the number of sub pools managed this this scheduler.
* @return the number of sub pools managed this this scheduler
*/
List<InstrumentedPool<T>> getPools();
}
30 changes: 30 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/ResourceManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.pool;

/**
* A resource manager utilized by concrete Pool implementations. This manager enables Pools to interact
* with its pool scheduler, if there is one enabled.
* Pools can access their resource manager via the {@link PoolConfig#resourceManager()} method.
*/
public interface ResourceManager {
/**
* Notifies the pool scheduler that some resources can be acquired from the current pool because either certain
* resources are currently estimated to be idle or available for allocation.
*/
void resourceAvailable();
}
Loading

0 comments on commit 4b27763

Please sign in to comment.