Skip to content

Commit

Permalink
Fix repo initialisation in remote routing table
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jul 11, 2024
1 parent bce40bb commit c01bb1b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,26 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final Compressor compressor;
private final RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private Compressor compressor;
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;
private final String clusterName;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadpool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadpool;
this.compressor = compressor;
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand Down Expand Up @@ -211,6 +203,16 @@ protected void doStart() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
compressor = blobStoreRepository.getCompressor();

this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.compress.Compressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.Supplier;
Expand All @@ -38,22 +35,10 @@ public static RemoteRoutingTableService getService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
if (isRemoteRoutingTableEnabled(settings)) {
return new InternalRemoteRoutingTableService(
repositoriesService,
settings,
clusterSettings,
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
clusterName
);
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
}
return new NoopRemoteRoutingTableService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ public RemoteClusterStateService(
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down Expand Up @@ -891,16 +899,6 @@ public void start() {
String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();

blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
blobStoreRepository.getCompressor(),
blobStoreTransferService,
blobStoreRepository,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);

remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(
clusterSettings,
Expand Down Expand Up @@ -932,7 +930,6 @@ public void start() {
namedWriteableRegistry,
threadpool
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);

remoteClusterStateCleanupManager.start();
remoteRoutingTableService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand All @@ -26,7 +22,6 @@

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.Mockito.mock;

public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase {

Expand All @@ -41,17 +36,11 @@ public void teardown() throws Exception {

public void testGetServiceWhenRemoteRoutingDisabled() {
Settings settings = Settings.builder().build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof NoopRemoteRoutingTableService);
Expand All @@ -62,19 +51,13 @@ public void testGetServiceWhenRemoteRoutingEnabled() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof InternalRemoteRoutingTableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ public void setup() {
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(repositoriesService.repository(anyString())).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
compressor = new NoneCompressor();
Expand All @@ -125,12 +126,9 @@ public void setup() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);

remoteRoutingTableService.doStart();
}

@After
Expand All @@ -149,9 +147,6 @@ public void testFailInitializationWhenRemoteRoutingDisabled() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
)
);
Expand Down Expand Up @@ -532,7 +527,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName);
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
when(blobContainer.readBlob(indexName)).thenReturn(
INDEX_ROUTING_TABLE_FORMAT.serialize(
clusterState.getRoutingTable().getIndicesRouting().get(indexName),
uploadedFileName,
Expand Down

0 comments on commit c01bb1b

Please sign in to comment.