Skip to content

Commit

Permalink
Add refactoring changes for index routing table stream
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed May 16, 2024
1 parent 54f1f3d commit e3f6da3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ public Builder diffManifest(ClusterDiffManifest diffManifest) {
public Builder() {
indices = new ArrayList<>();
customMetadataMap = new HashMap<>();
indicesRouting = new ArrayList<>();
}

public Builder(ClusterMetadataManifest manifest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws
this(indexRoutingTable, BUFFER_SIZE);
}

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size)
throws IOException {
public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException {
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName());
Expand All @@ -91,34 +90,35 @@ private void initialFill(int shardCount) throws IOException {
indexRoutingTableHeader.write(out);
out.writeVInt(shardCount);

System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length());
System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0, buf, 0, bytesStreamOutput.bytes().length());
count = bytesStreamOutput.bytes().length();
bytesStreamOutput.reset();
fill(buf);
}

private void fill(byte[] buf) throws IOException {
if (leftOverBuf != null) {
if(leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf.
if (leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in
// leftOverBuf.
System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count);
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count, tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
leftOverBuf = tempLeftOverBuffer;
count = buf.length - count;

} else {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
count += leftOverBuf.length;
count += leftOverBuf.length;
leftOverBuf = null;
}
}

if (count < buf.length && shardIter.hasNext()) {
IndexShardRoutingTable next = shardIter.next();
IndexShardRoutingTable.Builder.writeTo(next, out);
//Add checksum for the file after all shards are done
if(!shardIter.hasNext()) {
// Add checksum for the file after all shards are done
if (!shardIter.hasNext()) {
out.writeLong(out.getChecksum());
}
out.flush();
Expand All @@ -132,7 +132,7 @@ private void fill(byte[] buf) throws IOException {
} else {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count);
leftOverBuf = new byte[bytesRef.length() - (buf.length - count)];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count));
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count, leftOverBuf, 0, bytesRef.length() - (buf.length - count));
count = buf.length;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexRoutingTableInputStreamReader {

Expand All @@ -34,32 +28,31 @@ public class IndexRoutingTableInputStreamReader {
private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class);

public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException {
this.streamInput = new InputStreamStreamInput(inputStream);
streamInput = new InputStreamStreamInput(inputStream);
}

public Map<String, IndexShardRoutingTable> read() throws IOException {
public IndexRoutingTable readIndexRoutingTable(Index index) throws IOException {
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) {
// Read the Table Header first
IndexRoutingTableHeader.read(in);
int shards = in.readVInt();
logger.info("Number of Index Routing Table {}", shards);
Map<String, IndexShardRoutingTable> indicesRouting = new HashMap<String, IndexShardRoutingTable>(Collections.EMPTY_MAP);
for(int i=0; i<shards; i++)
{
// Read the Table Header first and confirm the index
IndexRoutingTableHeader indexRoutingTableHeader = IndexRoutingTableHeader.read(in);
assert indexRoutingTableHeader.getIndexName().equals(index.getName());

int numberOfShardRouting = in.readVInt();
logger.debug("Number of Index Routing Table {}", numberOfShardRouting);
IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index);
for (int idx = 0; idx < numberOfShardRouting; idx++) {
IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
logger.info("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRouting.put(indexShardRoutingTable.getShardId().getIndexName(), indexShardRoutingTable);
logger.debug("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRoutingTable.addIndexShard(indexShardRoutingTable);

}
verifyCheckSum(in);
// Return indices Routing table
return indicesRouting;
return indicesRoutingTable.build();
}
} catch (EOFException e) {
throw new IOException("Indices Routing table is corrupted", e);
}

}

private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,21 @@

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.seqno.ReplicationTrackerTestCase;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexRoutingTableInputStreamTests extends OpenSearchTestCase {

public void testRoutingTableInputStream(){
public void testRoutingTableInputStream() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
Expand All @@ -47,15 +34,40 @@ public void testRoutingTableInputStream(){
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read();
IndexRoutingTable indexRoutingTable = reader.readIndexRoutingTable(metadata.index("test").getIndex());

assertEquals(1, indexShardRoutingTableMap.size());
assertNotNull(indexShardRoutingTableMap.get("test"));
assertEquals(2,indexShardRoutingTableMap.get("test").shards().size());
assertEquals(1, indexRoutingTable.getShards().size());
assertEquals(indexRoutingTable.getIndex(), metadata.index("test").getIndex());
assertEquals(indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size(), 2);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

public void testRoutingTableInputStreamWithInvalidIndex() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
AtomicInteger assertionError = new AtomicInteger();
initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> {
try {
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
reader.readIndexRoutingTable(metadata.index("invalid-index").getIndex());

} catch (AssertionError e) {
assertionError.getAndIncrement();
} catch (IOException e) {
throw new RuntimeException(e);
}
});

assertEquals(1, assertionError.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.node.Node;
Expand All @@ -46,6 +45,8 @@

import org.mockito.Mockito;

import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException;
import static org.opensearch.index.remote.RemoteStoreEnums.PathType.FIXED;
import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_INFIX;
import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX;
Expand Down Expand Up @@ -276,7 +277,7 @@ public void testInterceptWithLatchAwaitTimeout() throws IOException {

Settings settings = Settings.builder()
.put(this.settings)
.put(RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING.getKey(), TimeValue.ZERO)
.put(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING.getKey(), TimeValue.ZERO)
.build();
clusterSettings.applySettings(settings);
SetOnce<Exception> exceptionSetOnce = new SetOnce<>();
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep
remoteIndexPathUploader.start();
Settings settings = Settings.builder()
.put(this.settings)
.put(RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
clusterSettings.applySettings(settings);
SetOnce<Exception> exceptionSetOnce = new SetOnce<>();
Expand Down

0 comments on commit e3f6da3

Please sign in to comment.