Skip to content

Commit

Permalink
Addressing comments and fixing some proto messages
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed Mar 25, 2024
1 parent 0ab5704 commit 7d1d117
Show file tree
Hide file tree
Showing 71 changed files with 343 additions and 693 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @opensearch.api
*/
@ExperimentalApi
public interface ProtobufWriteable {
public interface BytesWriteable {

/**
* Write this into the {@linkplain OutputStream}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

package org.opensearch.core.transport;

import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.BytesWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.transport.TransportAddress;
Expand All @@ -44,7 +44,7 @@
*
* @opensearch.internal
*/
public abstract class TransportMessage implements Writeable, ProtobufWriteable {
public abstract class TransportMessage implements Writeable, BytesWriteable {

private TransportAddress remoteAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -561,12 +560,6 @@ public String executor() {
public TestResponse read(StreamInput in) throws IOException {
return new TestResponse(in);
}

@Override
public TestResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
Expand Down Expand Up @@ -90,10 +89,4 @@ public Response read(StreamInput in) throws IOException {
public String toString() {
return super.toString() + "/" + listener;
}

@Override
public Response read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.action;

import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.BytesWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -29,20 +29,20 @@
public class ProtobufActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
private final ProtobufWriteable.Reader<Response> reader;
private final BytesWriteable.Reader<Response> reader;
private final String executor;

public ProtobufActionListenerResponseHandler(
ActionListener<? super Response> listener,
ProtobufWriteable.Reader<Response> reader,
BytesWriteable.Reader<Response> reader,
String executor
) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

public ProtobufActionListenerResponseHandler(ActionListener<? super Response> listener, ProtobufWriteable.Reader<Response> reader) {
public ProtobufActionListenerResponseHandler(ActionListener<? super Response> listener, BytesWriteable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -340,12 +339,6 @@ public void handleResponse(final FieldCapabilitiesIndexResponse response) {
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}

@Override
public FieldCapabilitiesIndexResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -254,12 +253,6 @@ public void handleResponse(ResyncReplicationResponse response) {
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public ResyncReplicationResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -203,12 +202,6 @@ public String executor() {
public GetAllPitNodesResponse read(StreamInput in) throws IOException {
return new GetAllPitNodesResponse(in);
}

@Override
public GetAllPitNodesResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
Expand Down Expand Up @@ -114,7 +115,11 @@ public QueryPhaseResultConsumer(

SearchSourceBuilder source = request.source();
this.hasTopDocs = source == null || source.size() != 0;
this.hasAggs = source != null && source.aggregations() != null;
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) {
this.hasAggs = false;
} else {
this.hasAggs = source != null && source.aggregations() != null;
}
int batchReduceSize = (hasAggs || hasTopDocs) ? Math.min(request.getBatchedReduceSize(), expectedResultSize) : expectedResultSize;
this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo());
}
Expand Down Expand Up @@ -320,7 +325,7 @@ synchronized long addEstimateAndMaybeBreak(long estimatedSize) {
* provided {@link QuerySearchResult}.
*/
long ramBytesUsedQueryResult(QuerySearchResult result) {
if (hasAggs == false) {
if (hasAggs == false || FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) {
return 0;
}
return result.aggregations().asSerialized(InternalAggregations::readFrom, namedWriteableRegistry).ramBytesUsed();
Expand Down Expand Up @@ -489,7 +494,7 @@ public synchronized List<TopDocs> consumeTopDocs() {
}

public synchronized List<InternalAggregations> consumeAggs() {
if (hasAggs == false) {
if (hasAggs == false || FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) {
return Collections.emptyList();
}
List<InternalAggregations> aggsList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.BytesWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -248,7 +248,7 @@ public void sendExecuteQuery(
final ActionListener handler = responseWrapper.apply(connection, listener);
TransportResponseHandler transportResponseHandler;
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) {
ProtobufWriteable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
BytesWriteable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
transportResponseHandler = new ProtobufConnectionCountingHandler<>(
handler,
reader,
Expand Down Expand Up @@ -797,7 +797,7 @@ final class ProtobufConnectionCountingHandler<Response extends TransportResponse

ProtobufConnectionCountingHandler(
final ActionListener<? super Response> listener,
final ProtobufWriteable.Reader<Response> responseReader,
final BytesWriteable.Reader<Response> responseReader,
final Map<String, Long> clientConnections,
final String nodeId
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -230,12 +229,6 @@ public void handleResponse(ShardResponse response) {
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e);
}

@Override
public ShardResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -393,13 +392,6 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TransportBroadcastByNodeAction<Request, Response, ShardOperationResult>.NodeResponse read(InputStream in)
throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -287,12 +286,6 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public NodeResponse read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1170,12 +1169,6 @@ public void handleException(TransportException exp) {
finishWithUnexpectedFailure(e);
}
}

@Override
public Response read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;

import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;

Expand Down Expand Up @@ -246,12 +245,6 @@ public void handleException(TransportException exp) {
listener.onFailure(exp);
}
}

@Override
public Response read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;

import static org.opensearch.action.support.TransportActions.isShardNotAvailableException;

Expand Down Expand Up @@ -226,12 +225,6 @@ public void handleResponse(final Response response) {
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public Response read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} else {
Expand Down Expand Up @@ -309,12 +302,6 @@ public void handleResponse(final Response response) {
public void handleException(TransportException exp) {
onFailure(finalShardRouting, exp);
}

@Override
public Response read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -317,14 +316,6 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public
TransportTasksAction<OperationTask, TasksRequest, TasksResponse, TaskResponse>.NodeTasksResponse
read(InputStream in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Loading

0 comments on commit 7d1d117

Please sign in to comment.