diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java new file mode 100644 index 0000000000000..ec9e98b1f81e0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -0,0 +1,191 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. + * A remoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods in + * the Directory interface. Currently, it contains implementation of methods which are used to copy files to/from + * the remote store. Implementation of remaining methods will be added as remote store is integrated with + * replication, peer recovery etc. + */ +public class RemoteDirectory extends Directory { + + private final BlobContainer blobContainer; + + public RemoteDirectory(BlobContainer blobContainer) { + this.blobContainer = blobContainer; + } + + /** + * Returns names of all files stored in this directory. The output must be in sorted (UTF-16, + * java's {@link String#compareTo}) order. + */ + @Override + public String[] listAll() throws IOException { + return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); + } + + /** + * Removes an existing file in the directory. + * + *

This method will not throw an exception when the file doesn't exist and simply ignores this case. + * This is a deviation from the {@code Directory} interface where it is expected to throw either + * {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file. + * + * @param name the name of an existing file. + * @throws IOException if the file exists but could not be deleted. + */ + @Override + public void deleteFile(String name) throws IOException { + // ToDo: Add a check for file existence + blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name)); + } + + /** + * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote + * store. + * + *

In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException} + * if the file already exists in the remote store. As this method does not open a file, it does not throw the + * exception. + * + * @param name the name of the file to copy to remote store. + */ + @Override + public IndexOutput createOutput(String name, IOContext context) { + return new RemoteIndexOutput(name, blobContainer); + } + + /** + * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name)); + } + + /** + * Closes the directory by deleting all the files in this directory + */ + @Override + public void close() throws IOException { + blobContainer.delete(); + } + + /** + * Returns the byte length of a file in the directory. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public long fileLength(String name) throws IOException { + // ToDo: Instead of calling remote store each time, keep a cache with segment metadata + Map metadata = blobContainer.listBlobsByPrefix(name); + if (metadata.containsKey(name)) { + return metadata.get(name).length(); + } + throw new NoSuchFileException(name); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once soft deleting is supported segment files in the remote store, this method will provide details of + * number of files marked as deleted but not actually deleted from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public Set getPendingDeletions() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Temporary IndexOutput is not required while working with Remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Segment upload to the remote store will be permanent and does not require a separate sync API. + * This may change in the future if segment upload to remote store happens via cache and we need sync API to write + * the cache contents to the store permanently. + * + * @throws UnsupportedOperationException always + */ + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once metadata to be stored with each shard is finalized, syncMetaData method will be used to sync the directory + * metadata to the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void syncMetaData() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * As this method is used by IndexWriter to publish commits, the implementation of this method is required when + * IndexWriter is backed by RemoteDirectory. + * + * @throws UnsupportedOperationException always + */ + @Override + public void rename(String source, String dest) throws IOException { + throw new UnsupportedOperationException(); + + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once locking segment files in remote store is supported, implementation of this method is required with + * remote store specific LockFactory. + * + * @throws UnsupportedOperationException always + */ + @Override + public Lock obtainLock(String name) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java new file mode 100644 index 0000000000000..bc5e5ca17c421 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store. + * Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. + * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + */ +public class RemoteIndexInput extends IndexInput { + + private final InputStream inputStream; + private final long size; + + public RemoteIndexInput(String name, InputStream inputStream, long size) { + super(name); + this.inputStream = inputStream; + this.size = size; + } + + @Override + public byte readByte() throws IOException { + byte[] buffer = new byte[1]; + inputStream.read(buffer); + return buffer[0]; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + inputStream.read(b, offset, len); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public long length() { + return size; + } + + @Override + public void seek(long pos) throws IOException { + inputStream.skip(pos); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java new file mode 100644 index 0000000000000..224290b3d06ef --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; + +/** + * Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store. + * Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. + * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + */ +public class RemoteIndexOutput extends IndexOutput { + + private final BlobContainer blobContainer; + + public RemoteIndexOutput(String name, BlobContainer blobContainer) { + super(name, name); + this.blobContainer = blobContainer; + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + assert input instanceof IndexInput : "input should be instance of IndexInput"; + blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false); + } + + /** + * This is a no-op. Once segment file upload to the remote store is complete, we don't need to explicitly close + * the stream. It is taken care by internal APIs of client of the remote store. + */ + @Override + public void close() throws IOException { + // do nothing + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeByte(byte b) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeBytes(byte[] byteArray, int offset, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not directly used for the file transfer to/from the remote store. + * But the checksum is important to verify integrity of the data and that means implementing this method will + * be required for the segment upload as well. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getChecksum() throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java new file mode 100644 index 0000000000000..c2c365d9140df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.*; + +public class RemoteDirectoryTests extends OpenSearchTestCase { + private BlobContainer blobContainer; + + private RemoteDirectory remoteDirectory; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteDirectory = new RemoteDirectory(blobContainer); + } + + public void testListAllEmpty() throws IOException { + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] {}; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAll() throws IOException { + Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + + when(blobContainer.listBlobs()).thenReturn(fileNames); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] { "abc", "jkl", "lmn", "pqr", "xyz" }; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAllException() throws IOException { + when(blobContainer.listBlobs()).thenThrow(new IOException("Error reading blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.listAll()); + } + + public void testDeleteFile() throws IOException { + remoteDirectory.deleteFile("segment_1"); + + verify(blobContainer).deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + } + + public void testDeleteFileException() throws IOException { + doThrow(new IOException("Error writing to blob store")).when(blobContainer) + .deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.deleteFile("segment_1")); + } + + public void testCreateOutput() { + IndexOutput indexOutput = remoteDirectory.createOutput("segment_1", IOContext.DEFAULT); + assertTrue(indexOutput instanceof RemoteIndexOutput); + assertEquals("segment_1", indexOutput.getName()); + } + + public void testOpenInput() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT); + assertTrue(indexInput instanceof RemoteIndexInput); + assertEquals(100, indexInput.length()); + } + + public void testOpenInputIOException() throws IOException { + when(blobContainer.readBlob("segment_1")).thenThrow(new IOException("Error while reading")); + + assertThrows(IOException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); + } + + public void testOpenInputNoSuchFileException() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(NoSuchFileException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); + } + + public void testClose() throws IOException { + remoteDirectory.close(); + + verify(blobContainer).delete(); + } + + public void testCloseIOException() throws IOException { + when(blobContainer.delete()).thenThrow(new IOException("Error while writing to blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.close()); + } + + public void testFileLength() throws IOException { + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + assertEquals(100, remoteDirectory.fileLength("segment_1")); + } + + public void testFileLengthIOException() throws IOException { + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1")); + } + + public void testGetPendingDeletions() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions()); + } + + public void testCreateTempOutput() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.createTempOutput("segment_1", "tmp", IOContext.DEFAULT)); + } + + public void testSync() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.sync(Collections.emptyList())); + } + + public void testRename() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.rename("segment_1", "segment_2")); + } + + public void testObtainLock() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1")); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java new file mode 100644 index 0000000000000..c2f81c035e424 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; + +import static org.mockito.Mockito.*; + +public class RemoteIndexInputTests extends OpenSearchTestCase { + + private static final String FILENAME = "segment_1"; + private static final long FILESIZE = 200; + + private InputStream inputStream; + private RemoteIndexInput remoteIndexInput; + + @Before + public void setup() { + inputStream = mock(InputStream.class); + remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE); + } + + public void testReadByte() throws IOException { + InputStream inputStream = spy(InputStream.class); + remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE); + + when(inputStream.read()).thenReturn(10); + + assertEquals(10, remoteIndexInput.readByte()); + + verify(inputStream).read(any()); + } + + public void testReadByteIOException() throws IOException { + when(inputStream.read(any())).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.readByte()); + } + + public void testReadBytes() throws IOException { + byte[] buffer = new byte[10]; + remoteIndexInput.readBytes(buffer, 10, 20); + + verify(inputStream).read(buffer, 10, 20); + } + + public void testReadBytesIOException() throws IOException { + byte[] buffer = new byte[10]; + when(inputStream.read(buffer, 10, 20)).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.readBytes(buffer, 10, 20)); + } + + public void testClose() throws IOException { + remoteIndexInput.close(); + + verify(inputStream).close(); + } + + public void testCloseIOException() throws IOException { + doThrow(new IOException("Error closing")).when(inputStream).close(); + + assertThrows(IOException.class, () -> remoteIndexInput.close()); + } + + public void testLength() { + assertEquals(FILESIZE, remoteIndexInput.length()); + } + + public void testSeek() throws IOException { + remoteIndexInput.seek(10); + + verify(inputStream).skip(10); + } + + public void testSeekIOException() throws IOException { + when(inputStream.skip(10)).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.seek(10)); + } + + public void testGetFilePointer() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.getFilePointer()); + } + + public void testSlice() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.slice("Slice middle", 50, 100)); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java new file mode 100644 index 0000000000000..64975f2ac4892 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class RemoteIndexOutputTests extends OpenSearchTestCase { + private static final String FILENAME = "segment_1"; + + private BlobContainer blobContainer; + + private RemoteIndexOutput remoteIndexOutput; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteIndexOutput = new RemoteIndexOutput(FILENAME, blobContainer); + } + + public void testCopyBytes() throws IOException { + IndexInput indexInput = mock(IndexInput.class); + remoteIndexOutput.copyBytes(indexInput, 100); + + verify(blobContainer).writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false)); + } + + public void testCopyBytesIOException() throws IOException { + doThrow(new IOException("Error writing")).when(blobContainer) + .writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false)); + + IndexInput indexInput = mock(IndexInput.class); + assertThrows(IOException.class, () -> remoteIndexOutput.copyBytes(indexInput, 100)); + } + + public void testWriteByte() { + byte b = 10; + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeByte(b)); + } + + public void testWriteBytes() { + byte[] buffer = new byte[10]; + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeBytes(buffer, 50, 60)); + } + + public void testGetFilePointer() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getFilePointer()); + } + + public void testGetChecksum() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getChecksum()); + } +}