Skip to content

Commit

Permalink
Fix repo initialisation in remote routing table
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jul 10, 2024
1 parent bce40bb commit 329930e
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,26 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final Compressor compressor;
private final RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private Compressor compressor;
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;
private final String clusterName;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadpool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadpool;
this.compressor = compressor;
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand Down Expand Up @@ -211,6 +203,16 @@ protected void doStart() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
compressor = blobStoreRepository.getCompressor();

this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.compress.Compressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.util.function.Supplier;
Expand All @@ -38,22 +35,10 @@ public static RemoteRoutingTableService getService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Compressor compressor,
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName
) {
if (isRemoteRoutingTableEnabled(settings)) {
return new InternalRemoteRoutingTableService(
repositoriesService,
settings,
clusterSettings,
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
clusterName
);
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
}
return new NoopRemoteRoutingTableService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ public RemoteClusterStateService(
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down Expand Up @@ -891,16 +899,6 @@ public void start() {
String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();

blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
clusterSettings,
threadpool,
blobStoreRepository.getCompressor(),
blobStoreTransferService,
blobStoreRepository,
ClusterName.CLUSTER_NAME_SETTING.get(settings).value()
);

remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(
clusterSettings,
Expand Down Expand Up @@ -932,7 +930,6 @@ public void start() {
namedWriteableRegistry,
threadpool
);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);

remoteClusterStateCleanupManager.start();
remoteRoutingTableService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand All @@ -26,7 +22,6 @@

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.Mockito.mock;

public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase {

Expand All @@ -41,17 +36,11 @@ public void teardown() throws Exception {

public void testGetServiceWhenRemoteRoutingDisabled() {
Settings settings = Settings.builder().build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof NoopRemoteRoutingTableService);
Expand All @@ -62,19 +51,13 @@ public void testGetServiceWhenRemoteRoutingEnabled() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof InternalRemoteRoutingTableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ public void setup() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);

Expand All @@ -149,9 +146,6 @@ public void testFailInitializationWhenRemoteRoutingDisabled() {
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
)
);
Expand Down

0 comments on commit 329930e

Please sign in to comment.