Skip to content

Commit

Permalink
Fix repo initialisation in remote routing table
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jul 11, 2024
1 parent 8105d30 commit e607370
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
- Create SystemIndexRegistry with helper method matchesSystemIndex ([#14415](https://github.com/opensearch-project/OpenSearch/pull/14415))
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,26 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final Compressor compressor;
private final RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private Compressor compressor;
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;
private final String clusterName;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadpool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadpool;
this.compressor = compressor;
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand Down Expand Up @@ -211,6 +203,16 @@ protected void doStart() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
compressor = blobStoreRepository.getCompressor();

this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<>() {
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.compress.Compressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.Supplier;
Expand All @@ -38,22 +35,10 @@ public static RemoteRoutingTableService getService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
if (isRemoteRoutingTableEnabled(settings)) {
return new InternalRemoteRoutingTableService(
repositoriesService,
settings,
clusterSettings,
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
clusterName
);
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
}
return new NoopRemoteRoutingTableService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ public BlobPathParameters(final List<String> pathTokens, final String filePrefix
this.filePrefix = filePrefix;
}

public BlobPathParameters(final List<String> pathTokens) {
this(pathTokens, null);
}

public List<String> getPathTokens() {
return pathTokens;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -201,6 +200,14 @@ public RemoteClusterStateService(
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down Expand Up @@ -657,7 +664,7 @@ UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()),
INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
remoteRoutingTableService.getAsyncIndexRoutingWriteAction(
clusterState.metadata().clusterUUID(),
clusterState.term(),
Expand Down Expand Up @@ -889,18 +896,7 @@ public void start() {
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();

blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
blobStoreRepository.getCompressor(),
blobStoreTransferService,
blobStoreRepository,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);

remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(
clusterSettings,
Expand Down Expand Up @@ -932,10 +928,9 @@ public void start() {
namedWriteableRegistry,
threadpool
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);

remoteClusterStateCleanupManager.start();
remoteRoutingTableService.start();
remoteClusterStateCleanupManager.start();
}

private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand All @@ -26,7 +22,6 @@

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.Mockito.mock;

public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase {

Expand All @@ -41,17 +36,11 @@ public void teardown() throws Exception {

public void testGetServiceWhenRemoteRoutingDisabled() {
Settings settings = Settings.builder().build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof NoopRemoteRoutingTableService);
Expand All @@ -62,19 +51,13 @@ public void testGetServiceWhenRemoteRoutingEnabled() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof InternalRemoteRoutingTableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ public void setup() {
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(repositoriesService.repository(anyString())).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
compressor = new NoneCompressor();
Expand All @@ -125,12 +126,9 @@ public void setup() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);

remoteRoutingTableService.doStart();
}

@After
Expand All @@ -149,9 +147,6 @@ public void testFailInitializationWhenRemoteRoutingDisabled() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
)
);
Expand Down Expand Up @@ -532,7 +527,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName);
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
when(blobContainer.readBlob(indexName)).thenReturn(
INDEX_ROUTING_TABLE_FORMAT.serialize(
clusterState.getRoutingTable().getIndicesRouting().get(indexName),
uploadedFileName,
Expand Down

0 comments on commit e607370

Please sign in to comment.