Skip to content

Commit

Permalink
Add separate test class for remote segment directory with pinned time…
Browse files Browse the repository at this point in the history
…stamps

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 26, 2024
1 parent ff823f0 commit b331311
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.Index;
Expand All @@ -40,7 +38,6 @@
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -64,7 +61,6 @@

import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -1145,75 +1141,6 @@ public void testMetadataFileNameOrder() {
assertEquals(14, count);
}

private void setupRemotePinnedTimestampFeature(boolean enabled) {
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(
Settings.builder().put(CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), enabled).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
}

public void testInitializeToSpecificTimestampNoMetadataFiles() throws IOException {
setupRemotePinnedTimestampFeature(true);
when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(new ArrayList<>());
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
setupRemotePinnedTimestampFeature(false);
}

public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOException {
setupRemotePinnedTimestampFeature(true);
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(4000));

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
setupRemotePinnedTimestampFeature(false);
}

public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException {
setupRemotePinnedTimestampFeature(true);
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major);

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);

RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L);
assertNotNull(remoteSegmentMetadata);
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
assertEquals(2, uploadedSegments.size());
assertTrue(uploadedSegments.containsKey("_0.cfe"));
assertTrue(uploadedSegments.containsKey("_0.cfs"));
setupRemotePinnedTimestampFeature(false);
}

private static class WrapperIndexOutput extends IndexOutput {
public IndexOutput indexOutput;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteSegmentStoreDirectoryWithPinnedTimestampTests extends RemoteSegmentStoreDirectoryTests {

@Before
public void setupPinnedTimestamp() throws IOException {
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(
Settings.builder().put(CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

Supplier<RepositoriesService> repositoriesServiceSupplier = mock(Supplier.class);
Settings settings = Settings.builder()
.put(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote-repo")
.build();
RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
when(repositoriesService.repository("remote-repo")).thenReturn(blobStoreRepository);

when(threadPool.schedule(any(), any(), any())).then(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
runnable.run();
return null;
}).then(subsequentInvocationsOnMock -> null);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceSupplier,
settings,
threadPool,
clusterService
);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampServiceSpy = Mockito.spy(remoteStorePinnedTimestampService);

RemoteStorePinnedTimestampsBlobStore remoteStorePinnedTimestampsBlobStore = mock(RemoteStorePinnedTimestampsBlobStore.class);
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
when(remoteStorePinnedTimestampServiceSpy.pinnedTimestampsBlobStore()).thenReturn(remoteStorePinnedTimestampsBlobStore);
when(remoteStorePinnedTimestampServiceSpy.blobStoreTransferService()).thenReturn(blobStoreTransferService);

doAnswer(invocationOnMock -> {
ActionListener<List<BlobMetadata>> actionListener = invocationOnMock.getArgument(3);
actionListener.onResponse(new ArrayList<>());
return null;
}).when(blobStoreTransferService).listAllInSortedOrder(any(), any(), eq(1), any());

remoteStorePinnedTimestampServiceSpy.start();
}

public void testInitializeToSpecificTimestampNoMetadataFiles() throws IOException {
when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(new ArrayList<>());
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(4000));

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major);

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);

RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L);
assertNotNull(remoteSegmentMetadata);
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
assertEquals(2, uploadedSegments.size());
assertTrue(uploadedSegments.containsKey("_0.cfe"));
assertTrue(uploadedSegments.containsKey("_0.cfs"));
}
}

0 comments on commit b331311

Please sign in to comment.