Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(entity-client): batch entity-client ingestProposals #11787

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions datahub-frontend/app/auth/AuthModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
Expand Down Expand Up @@ -213,11 +214,13 @@ protected SystemEntityClient provideEntityClient(

return new SystemRestliEntityClient(
buildRestliClient(),
new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient(),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)))
.retryCount(configs.getInt(ENTITY_CLIENT_NUM_RETRIES))
.batchGetV2Size(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE))
.batchGetV2Concurrency(2)
.build(),
configurationProvider.getCache().getClient().getEntityClient());
}

@Provides
Expand Down
2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -464,5 +464,7 @@ public class Constants {
public static final String MDC_ENTITY_TYPE = "entityType";
public static final String MDC_CHANGE_TYPE = "changeType";

public static final String RESTLI_SUCCESS = "success";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;

import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
Expand Down Expand Up @@ -36,6 +37,12 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests {

@Autowired private UpdateIndicesService updateIndicesService;

static {
PathSpecBasedSchemaAnnotationVisitor.class
.getClassLoader()
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
}

@Test
public void testHooks() {
MCLKafkaListenerRegistrar registrar =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.datahub.authentication.Authentication;
import com.datahub.metadata.ingestion.IngestionScheduler;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
Expand Down Expand Up @@ -58,6 +59,11 @@ public class MCLSpringCommonTestConfiguration {

@MockBean public IngestionScheduler ingestionScheduler;

/**
 * Exposes an {@link EntityClientConfig} bean for the test context.
 *
 * @return a config produced by the builder with no properties overridden
 */
@Bean
public EntityClientConfig entityClientConfig() {
  final EntityClientConfig untouchedBuilderConfig = EntityClientConfig.builder().build();
  return untouchedBuilderConfig;
}

@MockBean(name = "systemEntityClient")
public SystemEntityClient systemEntityClient;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.kafka;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
Expand Down Expand Up @@ -39,16 +40,25 @@ public class MceConsumerApplicationTestConfiguration {
@Bean
@Primary
public SystemEntityClient systemEntityClient(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
final EntityClientConfig entityClientConfig) {
String selfUri = restTemplate.getRootUri();
final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null);
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient(),
1,
2);
entityClientConfig,
configurationProvider.getCache().getClient().getEntityClient());
}

/**
 * Primary {@link EntityClientConfig} bean for the MCE consumer test application.
 *
 * <p>Uses an {@code ExponentialBackoff(1)} policy, a single retry, a batch-get size of 1 and a
 * batch-get concurrency of 2.
 *
 * @return the fixed test configuration
 */
@Bean
@Primary
public EntityClientConfig entityClientConfig() {
  final EntityClientConfig testConfig =
      EntityClientConfig.builder()
          .backoffPolicy(new ExponentialBackoff(1))
          .retryCount(1)
          .batchGetV2Size(1)
          .batchGetV2Concurrency(2)
          .build();
  return testConfig;
}

@MockBean public Database ebeanServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,19 @@ entityClient:
java:
get:
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
ingest:
batchSize: ${ENTITY_CLIENT_JAVA_INGEST_BATCH_SIZE:375}
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_GET_BATCH_THREAD_KEEP_ALIVE:60}
ingest:
batchSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50} # limited to prevent exceeding restli timeouts
batchConcurrency: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_THREAD_KEEP_ALIVE:60}

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -14,4 +17,30 @@ public EntityClientCacheConfig entityClientCacheConfig(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
return configurationProvider.getCache().getClient().getEntityClient();
}

/**
 * Assembles the shared {@link EntityClientConfig} from {@code entityClient.*} properties.
 *
 * <p>Only {@code retryInterval} and {@code numRetries} declare inline fallbacks (2 and 3); the
 * {@code entityClient.restli.get.*} and {@code entityClient.restli.ingest.*} placeholders have
 * no inline default and must be resolvable from the property sources.
 *
 * @param retryInterval value handed to the {@link ExponentialBackoff} constructor
 * @param numRetries retry count stored on the config
 * @param batchGetV2Size {@code entityClient.restli.get.batchSize}
 * @param batchGetV2Concurrency {@code entityClient.restli.get.batchConcurrency}
 * @param batchGetV2QueueSize {@code entityClient.restli.get.batchQueueSize}
 * @param batchGetV2KeepAlive {@code entityClient.restli.get.batchThreadKeepAlive}
 * @param batchIngestSize {@code entityClient.restli.ingest.batchSize}
 * @param batchIngestConcurrency {@code entityClient.restli.ingest.batchConcurrency}
 * @param batchIngestQueueSize {@code entityClient.restli.ingest.batchQueueSize}
 * @param batchIngestKeepAlive {@code entityClient.restli.ingest.batchThreadKeepAlive}
 * @return the populated configuration
 */
@Bean
public EntityClientConfig entityClientConfig(
    @Value("${entityClient.retryInterval:2}") final int retryInterval,
    @Value("${entityClient.numRetries:3}") final int numRetries,
    @Value("${entityClient.restli.get.batchSize}") final int batchGetV2Size,
    @Value("${entityClient.restli.get.batchConcurrency}") final int batchGetV2Concurrency,
    @Value("${entityClient.restli.get.batchQueueSize}") final int batchGetV2QueueSize,
    @Value("${entityClient.restli.get.batchThreadKeepAlive}") final int batchGetV2KeepAlive,
    @Value("${entityClient.restli.ingest.batchSize}") final int batchIngestSize,
    @Value("${entityClient.restli.ingest.batchConcurrency}") final int batchIngestConcurrency,
    @Value("${entityClient.restli.ingest.batchQueueSize}") final int batchIngestQueueSize,
    @Value("${entityClient.restli.ingest.batchThreadKeepAlive}")
        final int batchIngestKeepAlive) {
  final EntityClientConfig assembled =
      EntityClientConfig.builder()
          // Retry behaviour
          .backoffPolicy(new ExponentialBackoff(retryInterval))
          .retryCount(numRetries)
          // Batch get settings
          .batchGetV2Size(batchGetV2Size)
          .batchGetV2Concurrency(batchGetV2Concurrency)
          .batchGetV2QueueSize(batchGetV2QueueSize)
          .batchGetV2KeepAlive(batchGetV2KeepAlive)
          // Batch ingest settings
          .batchIngestSize(batchIngestSize)
          .batchIngestConcurrency(batchIngestConcurrency)
          .batchIngestQueueSize(batchIngestQueueSize)
          .batchIngestKeepAlive(batchIngestKeepAlive)
          .build();
  return assembled;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.client.SystemJavaEntityClient;
Expand All @@ -16,7 +17,6 @@
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import javax.inject.Singleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -39,7 +39,7 @@ public EntityClient entityClient(
final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService,
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new JavaEntityClient(
_entityService,
_deleteEntityService,
Expand All @@ -50,7 +50,7 @@ public EntityClient entityClient(
_timeseriesAspectService,
rollbackService,
_eventProducer,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}

@Bean("systemEntityClient")
Expand All @@ -67,7 +67,7 @@ public SystemEntityClient systemEntityClient(
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new SystemJavaEntityClient(
_entityService,
_deleteEntityService,
Expand All @@ -79,6 +79,6 @@ public SystemEntityClient systemEntityClient(
rollbackService,
_eventProducer,
entityClientCacheConfig,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import java.net.URI;
import javax.inject.Singleton;
Expand All @@ -28,23 +28,15 @@ public EntityClient entityClient(
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {
final Client restClient;
if (gmsUri != null) {
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
} else {
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new RestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
batchGetV2Size,
batchGetV2Concurrency);
return new RestliEntityClient(restClient, entityClientConfig);
}

@Bean("systemEntityClient")
Expand All @@ -55,11 +47,8 @@ public SystemEntityClient systemEntityClient(
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {

final Client restClient;
if (gmsUri != null) {
Expand All @@ -68,12 +57,6 @@ public SystemEntityClient systemEntityClient(
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
entityClientCacheConfig,
batchGetV2Size,
batchGetV2Concurrency);
return new SystemRestliEntityClient(restClient, entityClientConfig, entityClientCacheConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.datahub.authentication.Authentication;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.AbstractRequestBuilder;
import com.linkedin.restli.client.Client;
Expand All @@ -19,17 +19,15 @@
@Slf4j
public abstract class BaseClient implements AutoCloseable {

protected final Client _client;
protected final BackoffPolicy _backoffPolicy;
protected final int _retryCount;
protected final Client client;
protected final EntityClientConfig entityClientConfig;

protected static final Set<String> NON_RETRYABLE =
Set.of("com.linkedin.data.template.RequiredFieldNotPresentException");

protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) {
_client = Objects.requireNonNull(restliClient);
_backoffPolicy = backoffPolicy;
_retryCount = retryCount;
protected BaseClient(@Nonnull Client restliClient, EntityClientConfig entityClientConfig) {
client = Objects.requireNonNull(restliClient);
this.entityClientConfig = entityClientConfig;
}

protected <T> Response<T> sendClientRequest(
Expand All @@ -52,9 +50,9 @@ protected <T> Response<T> sendClientRequest(

int attemptCount = 0;

while (attemptCount < _retryCount + 1) {
while (attemptCount < entityClientConfig.getRetryCount() + 1) {
try {
return _client.sendRequest(requestBuilder.build()).getResponse();
return client.sendRequest(requestBuilder.build()).getResponse();
} catch (Throwable ex) {
MetricUtils.counter(
BaseClient.class,
Expand All @@ -66,12 +64,13 @@ protected <T> Response<T> sendClientRequest(
|| (ex.getCause() != null
&& NON_RETRYABLE.contains(ex.getCause().getClass().getCanonicalName()));

if (attemptCount == _retryCount || skipRetry) {
if (attemptCount == entityClientConfig.getRetryCount() || skipRetry) {
throw ex;
} else {
attemptCount = attemptCount + 1;
try {
Thread.sleep(_backoffPolicy.nextBackoff(attemptCount, ex) * 1000);
Thread.sleep(
entityClientConfig.getBackoffPolicy().nextBackoff(attemptCount, ex) * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -84,6 +83,6 @@ protected <T> Response<T> sendClientRequest(

@Override
public void close() {
_client.shutdown(new FutureCallback<>());
client.shutdown(new FutureCallback<>());
}
}
Loading
Loading