-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add RemoteDirectory interface to copy segment files to/from remote st…
…ore (#3102) Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]>
- Loading branch information
1 parent
781fc6e
commit 0f587d2
Showing
6 changed files
with
696 additions
and
0 deletions.
There are no files selected for viewing
191 changes: 191 additions & 0 deletions
191
server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store; | ||
|
||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.apache.lucene.store.Lock; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.blobstore.BlobMetadata; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.nio.file.NoSuchFileException; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. | ||
* A remoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods in | ||
* the Directory interface. Currently, it contains implementation of methods which are used to copy files to/from | ||
* the remote store. Implementation of remaining methods will be added as remote store is integrated with | ||
* replication, peer recovery etc. | ||
*/ | ||
public class RemoteDirectory extends Directory { | ||
|
||
private final BlobContainer blobContainer; | ||
|
||
public RemoteDirectory(BlobContainer blobContainer) { | ||
this.blobContainer = blobContainer; | ||
} | ||
|
||
/** | ||
* Returns names of all files stored in this directory. The output must be in sorted (UTF-16, | ||
* java's {@link String#compareTo}) order. | ||
*/ | ||
@Override | ||
public String[] listAll() throws IOException { | ||
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); | ||
} | ||
|
||
/** | ||
* Removes an existing file in the directory. | ||
* | ||
* <p>This method will not throw an exception when the file doesn't exist and simply ignores this case. | ||
* This is a deviation from the {@code Directory} interface where it is expected to throw either | ||
* {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file. | ||
* | ||
* @param name the name of an existing file. | ||
* @throws IOException if the file exists but could not be deleted. | ||
*/ | ||
@Override | ||
public void deleteFile(String name) throws IOException { | ||
// ToDo: Add a check for file existence | ||
blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name)); | ||
} | ||
|
||
/** | ||
* Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote | ||
* store. | ||
* | ||
* <p> In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException} | ||
* if the file already exists in the remote store. As this method does not open a file, it does not throw the | ||
* exception. | ||
* | ||
* @param name the name of the file to copy to remote store. | ||
*/ | ||
@Override | ||
public IndexOutput createOutput(String name, IOContext context) { | ||
return new RemoteIndexOutput(name, blobContainer); | ||
} | ||
|
||
/** | ||
* Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. | ||
* | ||
* @param name the name of an existing file. | ||
* @throws IOException in case of I/O error | ||
* @throws NoSuchFileException if the file does not exist | ||
*/ | ||
@Override | ||
public IndexInput openInput(String name, IOContext context) throws IOException { | ||
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name)); | ||
} | ||
|
||
/** | ||
* Closes the directory by deleting all the files in this directory | ||
*/ | ||
@Override | ||
public void close() throws IOException { | ||
blobContainer.delete(); | ||
} | ||
|
||
/** | ||
* Returns the byte length of a file in the directory. | ||
* | ||
* @param name the name of an existing file. | ||
* @throws IOException in case of I/O error | ||
* @throws NoSuchFileException if the file does not exist | ||
*/ | ||
@Override | ||
public long fileLength(String name) throws IOException { | ||
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata | ||
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name); | ||
if (metadata.containsKey(name)) { | ||
return metadata.get(name).length(); | ||
} | ||
throw new NoSuchFileException(name); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* Once soft deleting is supported segment files in the remote store, this method will provide details of | ||
* number of files marked as deleted but not actually deleted from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public Set<String> getPendingDeletions() throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* Temporary IndexOutput is not required while working with Remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* Segment upload to the remote store will be permanent and does not require a separate sync API. | ||
* This may change in the future if segment upload to remote store happens via cache and we need sync API to write | ||
* the cache contents to the store permanently. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public void sync(Collection<String> names) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* Once metadata to be stored with each shard is finalized, syncMetaData method will be used to sync the directory | ||
* metadata to the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public void syncMetaData() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* As this method is used by IndexWriter to publish commits, the implementation of this method is required when | ||
* IndexWriter is backed by RemoteDirectory. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public void rename(String source, String dest) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
|
||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the directory unmodified. | ||
* Once locking segment files in remote store is supported, implementation of this method is required with | ||
* remote store specific LockFactory. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public Lock obtainLock(String name) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} |
83 changes: 83 additions & 0 deletions
83
server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store; | ||
|
||
import org.apache.lucene.store.IndexInput; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
/** | ||
* Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store. | ||
* Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are | ||
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, | ||
* peer recovery etc. | ||
* ToDo: Extend ChecksumIndexInput | ||
* @see RemoteDirectory | ||
*/ | ||
public class RemoteIndexInput extends IndexInput { | ||
|
||
private final InputStream inputStream; | ||
private final long size; | ||
|
||
public RemoteIndexInput(String name, InputStream inputStream, long size) { | ||
super(name); | ||
this.inputStream = inputStream; | ||
this.size = size; | ||
} | ||
|
||
@Override | ||
public byte readByte() throws IOException { | ||
byte[] buffer = new byte[1]; | ||
inputStream.read(buffer); | ||
return buffer[0]; | ||
} | ||
|
||
@Override | ||
public void readBytes(byte[] b, int offset, int len) throws IOException { | ||
inputStream.read(b, offset, len); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
inputStream.close(); | ||
} | ||
|
||
@Override | ||
public long length() { | ||
return size; | ||
} | ||
|
||
@Override | ||
public void seek(long pos) throws IOException { | ||
inputStream.skip(pos); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public long getFilePointer() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} |
97 changes: 97 additions & 0 deletions
97
server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store; | ||
|
||
import org.apache.lucene.store.DataInput; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.lucene.store.InputStreamIndexInput; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store. | ||
* Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are | ||
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, | ||
* peer recovery etc. | ||
* ToDo: Extend ChecksumIndexInput | ||
* @see RemoteDirectory | ||
*/ | ||
public class RemoteIndexOutput extends IndexOutput { | ||
|
||
private final BlobContainer blobContainer; | ||
|
||
public RemoteIndexOutput(String name, BlobContainer blobContainer) { | ||
super(name, name); | ||
this.blobContainer = blobContainer; | ||
} | ||
|
||
@Override | ||
public void copyBytes(DataInput input, long numBytes) throws IOException { | ||
assert input instanceof IndexInput : "input should be instance of IndexInput"; | ||
blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false); | ||
} | ||
|
||
/** | ||
* This is a no-op. Once segment file upload to the remote store is complete, we don't need to explicitly close | ||
* the stream. It is taken care by internal APIs of client of the remote store. | ||
*/ | ||
@Override | ||
public void close() throws IOException { | ||
// do nothing | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public void writeByte(byte b) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public void writeBytes(byte[] byteArray, int offset, int length) throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public long getFilePointer() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. | ||
* This method is not implemented as it is not directly used for the file transfer to/from the remote store. | ||
* But the checksum is important to verify integrity of the data and that means implementing this method will | ||
* be required for the segment upload as well. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public long getChecksum() throws IOException { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
} |
Oops, something went wrong.