Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jul 15, 2024
1 parent 1d35d5f commit c11bee0
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public class RemoteClusterStateService implements Closeable {
private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager;
private RemoteManifestManager remoteManifestManager;
private ClusterSettings clusterSettings;
private final ClusterService clusterService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]";
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
Expand Down Expand Up @@ -196,7 +195,6 @@ public RemoteClusterStateService(
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.clusterService = clusterService;
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ public String getClusterName() {
return clusterName;
}

public BlobPath getBasePath() {
return blobStoreRepository.basePath();
}

public BlobPath getBlobPathPrefix(String clusterUUID) {
return getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
return blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(getClusterName()))
.add(CLUSTER_STATE_PATH_TOKEN)
.add(clusterUUID);
}

public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,13 @@ private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashA
this.pathHashAlgo = pathHashAlgo;
}

// For testing only
public RemoteStoreEnums.PathType getPathTypeSetting() {
return pathType;
}

// For testing only
public RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() {
return pathHashAlgo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity<IndexRoutingTable> {

public static final String INDEX_ROUTING_TABLE = "index-routing";
public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
public static final String INDEX_ROUTING_FILE = "index_routing";
private IndexRoutingTable indexRoutingTable;
private final Index index;
private long term;
private long version;
private BlobPathParameters blobPathParameters;
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT =
new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom);

Expand Down Expand Up @@ -68,13 +68,12 @@ public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor c
this.blobName = blobName;
}

public Index getIndex() {
return index;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE);
if (blobPathParameters == null) {
blobPathParameters = new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE);
}
return blobPathParameters;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.ArgumentMatchers.anyIterable;
Expand Down Expand Up @@ -359,7 +358,7 @@ public void testGetAllUploadedIndicesRouting() {
"test-index",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);

List<ClusterMetadataManifest.UploadedIndexMetadata> allIndiceRoutingMetadata = remoteRoutingTableService
Expand All @@ -374,7 +373,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() {
"test-index",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
.indicesRouting(List.of(uploadedIndexMetadata))
Expand All @@ -392,7 +391,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() {
"test-index",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
.indicesRouting(List.of(uploadedIndexMetadata))
Expand All @@ -401,7 +400,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() {
"test-index2",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);

List<ClusterMetadataManifest.UploadedIndexMetadata> allIndiceRoutingMetadata = remoteRoutingTableService
Expand All @@ -417,13 +416,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() {
"test-index",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata(
"test-index2",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
.indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2))
Expand All @@ -441,13 +440,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() {
"test-index",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata(
"test-index2",
"index-uuid",
"index-filename",
INDEX_ROUTING_TABLE_PREFIX
INDEX_ROUTING_METADATA_PREFIX
);
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
.indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64;
import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteRoutingTableBlobStoreTests extends OpenSearchTestCase {

private RemoteRoutingTableBlobStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
ClusterSettings clusterSettings;
ThreadPool threadPool;

@Before
public void setup() {
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
BlobPath blobPath = new BlobPath().add("base-path");
when(blobStoreRepository.basePath()).thenReturn(blobPath);

threadPool = new TestThreadPool(getClass().getName());
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
"test-cluster",
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
}

@After
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdown();

}

public void testRemoteRoutingTablePathTypeSetting() {
// Assert the default is HASHED_PREFIX
assertEquals(HASHED_PREFIX.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString());

Settings newSettings = Settings.builder()
.put("cluster.remote_store.routing_table.path_type", RemoteStoreEnums.PathType.FIXED.toString())
.build();
clusterSettings.applySettings(newSettings);
assertEquals(RemoteStoreEnums.PathType.FIXED.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString());
}

public void testRemoteRoutingTableHashAlgoSetting() {
// Assert the default is FNV_1A_BASE64
assertEquals(FNV_1A_BASE64.toString(), remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString());

Settings newSettings = Settings.builder()
.put("cluster.remote_store.routing_table.path_hash_algo", RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString())
.build();
clusterSettings.applySettings(newSettings);
assertEquals(
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(),
remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString()
);
}

public void testGetBlobPathForUpload() {

Index index = new Index("test-idx", "index-uuid");
Settings idxSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
.build();

IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.build();

IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).initializeAsNew(indexMetadata).build();

RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable(
indexRoutingTable,
"cluster-uuid",
new DeflateCompressor(),
2L,
3L
);
BlobPath blobPath = remoteIndexRoutingTableStore.getBlobPathForUpload(remoteObjectForUpload);
BlobPath expectedPath = HASHED_PREFIX.path(
RemoteStorePathStrategy.PathInput.builder()
.basePath(
new BlobPath().add("base-path")
.add(RemoteClusterStateUtils.encodeString("test-cluster"))
.add(CLUSTER_STATE_PATH_TOKEN)
.add("cluster-uuid")
.add(INDEX_ROUTING_TABLE)
)
.indexUUID(index.getUUID())
.build(),
FNV_1A_BASE64
);
assertEquals(expectedPath, blobPath);
}
}

0 comments on commit c11bee0

Please sign in to comment.