Skip to content

Commit

Permalink
[ML] Refactoring inference HTTP Client to allow dynamic settings upda…
Browse files Browse the repository at this point in the history
…tes (elastic#100541)

* Refactoring to allow for settings updating

* Allowing requests to queue

* Testing getHttpClient
  • Loading branch information
jonathan-buttner authored Oct 10, 2023
1 parent 99f0b5c commit 07f6524
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.elasticsearch.xpack.inference.action.TransportGetInferenceModelAction;
import org.elasticsearch.xpack.inference.action.TransportInferenceAction;
import org.elasticsearch.xpack.inference.action.TransportPutInferenceModelAction;
import org.elasticsearch.xpack.inference.external.http.HttpClient;
import org.elasticsearch.xpack.inference.external.http.HttpClientManager;
import org.elasticsearch.xpack.inference.external.http.HttpSettings;
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
import org.elasticsearch.xpack.inference.rest.RestDeleteInferenceModelAction;
Expand All @@ -62,13 +62,16 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InferencePlugin extends Plugin implements ActionPlugin, InferenceServicePlugin, SystemIndexPlugin {

public static final String NAME = "inference";
public static final String UTILITY_THREAD_POOL_NAME = "inference_utility";
public static final String HTTP_CLIENT_SENDER_THREAD_POOL_NAME = "inference_http_client_sender";
private final Settings settings;
private final SetOnce<HttpClient> httpClient = new SetOnce<>();
private final SetOnce<HttpClientManager> httpClientManager = new SetOnce<>();

public InferencePlugin(Settings settings) {
this.settings = settings;
Expand Down Expand Up @@ -119,8 +122,7 @@ public Collection<Object> createComponents(
AllocationService allocationService,
IndicesService indicesService
) {
var httpSettings = new HttpSettings(settings, clusterService);
httpClient.set(HttpClient.create(httpSettings, threadPool));
httpClientManager.set(HttpClientManager.create(settings, threadPool, clusterService));

ModelRegistry modelRegistry = new ModelRegistry(client);
return List.of(modelRegistry);
Expand Down Expand Up @@ -154,22 +156,35 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings unused) {
ScalingExecutorBuilder utility = new ScalingExecutorBuilder(
UTILITY_THREAD_POOL_NAME,
0,
1,
TimeValue.timeValueMinutes(10),
false,
"xpack.inference.utility_thread_pool"
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
return List.of(
new ScalingExecutorBuilder(
UTILITY_THREAD_POOL_NAME,
0,
1,
TimeValue.timeValueMinutes(10),
false,
"xpack.inference.utility_thread_pool"
),
/*
* This executor is specifically for enqueuing requests to be sent. The underlying
* connection pool used by the http client will block if there are no available connections to lease.
* See here for more info: https://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/connmgmt.html
*/
new ScalingExecutorBuilder(
HTTP_CLIENT_SENDER_THREAD_POOL_NAME,
0,
1,
TimeValue.timeValueMinutes(10),
false,
"xpack.inference.http_client_sender_thread_pool"
)
);

return List.of(utility);
}

@Override
public List<Setting<?>> getSettings() {
return HttpSettings.getSettings();
return Stream.concat(HttpSettings.getSettings().stream(), HttpClientManager.getSettings().stream()).collect(Collectors.toList());
}

@Override
Expand All @@ -194,8 +209,8 @@ public List<NamedWriteableRegistry.Entry> getInferenceServiceNamedWriteables() {

@Override
public void close() {
if (httpClient.get() != null) {
IOUtils.closeWhileHandlingException(httpClient.get());
if (httpClientManager.get() != null) {
IOUtils.closeWhileHandlingException(httpClientManager.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -29,6 +26,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.inference.InferencePlugin.HTTP_CLIENT_SENDER_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;

public class HttpClient implements Closeable {
Expand All @@ -41,45 +39,19 @@ enum Status {
}

private final CloseableHttpAsyncClient client;
private final IdleConnectionEvictor connectionEvictor;
private final AtomicReference<Status> status = new AtomicReference<>(Status.CREATED);
private final ThreadPool threadPool;
private final HttpSettings settings;

public static HttpClient create(HttpSettings settings, ThreadPool threadPool) {
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager();
IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(
threadPool,
connectionManager,
settings.getEvictionInterval(),
settings.getEvictionMaxIdle()
);
public static HttpClient create(HttpSettings settings, ThreadPool threadPool, PoolingNHttpClientConnectionManager connectionManager) {
CloseableHttpAsyncClient client = createAsyncClient(connectionManager);

int maxConnections = settings.getMaxConnections();
CloseableHttpAsyncClient client = createAsyncClient(connectionManager, maxConnections);

return new HttpClient(settings, client, connectionEvictor, threadPool);
}

private static PoolingNHttpClientConnectionManager createConnectionManager() {
ConnectingIOReactor ioReactor;
try {
ioReactor = new DefaultConnectingIOReactor();
} catch (IOReactorException e) {
var message = "Failed to initialize the inference http client";
logger.error(message, e);
throw new ElasticsearchException(message, e);
}

return new PoolingNHttpClientConnectionManager(ioReactor);
return new HttpClient(settings, client, threadPool);
}

private static CloseableHttpAsyncClient createAsyncClient(PoolingNHttpClientConnectionManager connectionManager, int maxConnections) {
private static CloseableHttpAsyncClient createAsyncClient(PoolingNHttpClientConnectionManager connectionManager) {
HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create();

clientBuilder.setConnectionManager(connectionManager);
clientBuilder.setMaxConnPerRoute(maxConnections);
clientBuilder.setMaxConnTotal(maxConnections);
// The apache client will be shared across all connections because it can be expensive to create it
// so we don't want to support cookies to avoid accidental authentication for unauthorized users
clientBuilder.disableCookieManagement();
Expand All @@ -88,24 +60,32 @@ private static CloseableHttpAsyncClient createAsyncClient(PoolingNHttpClientConn
}

// Default for testing
HttpClient(HttpSettings settings, CloseableHttpAsyncClient asyncClient, IdleConnectionEvictor evictor, ThreadPool threadPool) {
HttpClient(HttpSettings settings, CloseableHttpAsyncClient asyncClient, ThreadPool threadPool) {
this.settings = settings;
this.threadPool = threadPool;
this.client = asyncClient;
this.connectionEvictor = evictor;
}

public void start() {
if (status.compareAndSet(Status.CREATED, Status.STARTED)) {
client.start();
connectionEvictor.start();
}
}

public void send(HttpUriRequest request, ActionListener<HttpResult> listener) throws IOException {
public void send(HttpUriRequest request, ActionListener<HttpResult> listener) {
// The caller must call start() first before attempting to send a request
assert status.get() == Status.STARTED;

threadPool.executor(HTTP_CLIENT_SENDER_THREAD_POOL_NAME).execute(() -> {
try {
doPrivilegedSend(request, listener);
} catch (IOException e) {
listener.onFailure(new ElasticsearchException(format("Failed to send request [%s]", request.getRequestLine()), e));
}
});
}

private void doPrivilegedSend(HttpUriRequest request, ActionListener<HttpResult> listener) throws IOException {
SocketAccess.doPrivileged(() -> client.execute(request, new FutureCallback<>() {
@Override
public void completed(HttpResponse response) {
Expand Down Expand Up @@ -144,6 +124,5 @@ private void failUsingUtilityThread(Exception exception, ActionListener<HttpResu
public void close() throws IOException {
status.set(Status.STOPPED);
client.close();
connectionEvictor.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.inference.external.http;

import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public class HttpClientManager implements Closeable {
private static final Logger logger = LogManager.getLogger(HttpClientManager.class);
/**
* From googling around the connection pools maxTotal value should be close to the number of available threads.
*
* https://stackoverflow.com/questions/30989637/how-to-decide-optimal-settings-for-setmaxtotal-and-setdefaultmaxperroute
*/
static final Setting<Integer> MAX_CONNECTIONS = Setting.intSetting(
"xpack.inference.http.max_connections",
// TODO pick a reasonable values here
20,
1,
1000,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final TimeValue DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME = TimeValue.timeValueSeconds(10);
static final Setting<TimeValue> CONNECTION_EVICTION_THREAD_INTERVAL_SETTING = Setting.timeSetting(
"xpack.inference.http.connection_eviction_interval",
DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final TimeValue DEFAULT_CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING = DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME;
static final Setting<TimeValue> CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING = Setting.timeSetting(
"xpack.inference.http.connection_eviction_max_idle_time",
DEFAULT_CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final ThreadPool threadPool;
private final PoolingNHttpClientConnectionManager connectionManager;
private EvictorSettings evictorSettings;
private IdleConnectionEvictor connectionEvictor;
private final HttpClient httpClient;

public static HttpClientManager create(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager();
return new HttpClientManager(settings, connectionManager, threadPool, clusterService);
}

// Default for testing
HttpClientManager(
Settings settings,
PoolingNHttpClientConnectionManager connectionManager,
ThreadPool threadPool,
ClusterService clusterService
) {
this.threadPool = threadPool;

this.connectionManager = connectionManager;
setMaxConnections(MAX_CONNECTIONS.get(settings));

this.httpClient = HttpClient.create(new HttpSettings(settings, clusterService), threadPool, connectionManager);

evictorSettings = new EvictorSettings(settings);
connectionEvictor = createConnectionEvictor();

this.addSettingsUpdateConsumers(clusterService);
}

private static PoolingNHttpClientConnectionManager createConnectionManager() {
ConnectingIOReactor ioReactor;
try {
ioReactor = new DefaultConnectingIOReactor();
} catch (IOReactorException e) {
var message = "Failed to initialize the inference http client manager";
logger.error(message, e);
throw new ElasticsearchException(message, e);
}

return new PoolingNHttpClientConnectionManager(ioReactor);
}

private void addSettingsUpdateConsumers(ClusterService clusterService) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_CONNECTIONS, this::setMaxConnections);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CONNECTION_EVICTION_THREAD_INTERVAL_SETTING, this::setEvictionInterval);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING, this::setEvictionMaxIdle);
}

private IdleConnectionEvictor createConnectionEvictor() {
return new IdleConnectionEvictor(threadPool, connectionManager, evictorSettings.evictionInterval, evictorSettings.evictionMaxIdle);
}

public static List<Setting<?>> getSettings() {
return List.of(MAX_CONNECTIONS, CONNECTION_EVICTION_THREAD_INTERVAL_SETTING, CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING);
}

public void start() {
httpClient.start();
connectionEvictor.start();
}

public HttpClient getHttpClient() {
return httpClient;
}

@Override
public void close() throws IOException {
httpClient.close();
connectionEvictor.stop();
}

private void setMaxConnections(int maxConnections) {
connectionManager.setMaxTotal(maxConnections);
connectionManager.setDefaultMaxPerRoute(maxConnections);
}

// default for testing
void setEvictionInterval(TimeValue evictionInterval) {
evictorSettings = new EvictorSettings(evictionInterval, evictorSettings.evictionMaxIdle);

connectionEvictor.stop();
connectionEvictor = createConnectionEvictor();
connectionEvictor.start();
}

void setEvictionMaxIdle(TimeValue evictionMaxIdle) {
evictorSettings = new EvictorSettings(evictorSettings.evictionInterval, evictionMaxIdle);

connectionEvictor.stop();
connectionEvictor = createConnectionEvictor();
connectionEvictor.start();
}

private static class EvictorSettings {
private final TimeValue evictionInterval;
private final TimeValue evictionMaxIdle;

EvictorSettings(Settings settings) {
this.evictionInterval = CONNECTION_EVICTION_THREAD_INTERVAL_SETTING.get(settings);
this.evictionMaxIdle = CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING.get(settings);
}

EvictorSettings(TimeValue evictionInterval, TimeValue evictionMaxIdle) {
this.evictionInterval = evictionInterval;
this.evictionMaxIdle = evictionMaxIdle;
}
}
}
Loading

0 comments on commit 07f6524

Please sign in to comment.