Skip to content

Commit

Permalink
Fix pagination for many columns (#2440) (#2441)
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Kulicke <[email protected]>
  • Loading branch information
andreaskulicke authored Apr 19, 2024
1 parent 0d8341f commit 3f53904
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,14 @@ public OpenSearchIndexScan() {}
public void readExternal(ObjectInput in) throws IOException {
int reqSize = in.readInt();
byte[] requestStream = new byte[reqSize];
in.read(requestStream);
int read = 0;
do {
int currentRead = in.read(requestStream, read, reqSize - read);
if (currentRead == -1) {
throw new IOException();
}
read += currentRead;
} while (read < reqSize);

var engine =
(OpenSearchStorageEngine)
Expand Down Expand Up @@ -137,8 +144,8 @@ public void writeExternal(ObjectOutput out) throws IOException {
var reqAsBytes = reqOut.bytes().toBytesRef().bytes;

// 3. Write out the byte[] to object output stream.
out.writeInt(reqAsBytes.length);
out.write(reqAsBytes);
out.writeInt(reqOut.size());
out.write(reqAsBytes, 0, reqOut.size());

out.writeInt(maxResponseSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -100,19 +107,25 @@ void throws_no_cursor_exception() {
}
}

@Test
@SneakyThrows
void serialize() {
@ParameterizedTest
@ValueSource(ints = {0, 150})
void serialize(Integer numberOfIncludes) {
var searchSourceBuilder = new SearchSourceBuilder().size(4);

var factory = mock(OpenSearchExprValueFactory.class);
var engine = mock(OpenSearchStorageEngine.class);
var index = mock(OpenSearchIndex.class);
when(engine.getClient()).thenReturn(client);
when(engine.getTable(any(), any())).thenReturn(index);
var includes =
Stream.iterate(1, i -> i + 1)
.limit(numberOfIncludes)
.map(i -> "column" + i)
.collect(Collectors.toList());
var request =
new OpenSearchScrollRequest(
INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory, List.of());
INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory, includes);
request.setScrollId("valid-id");
// make a response, so OpenSearchResponse::isEmpty would return true and unset needClean
var response = mock(SearchResponse.class);
Expand All @@ -131,6 +144,22 @@ void serialize() {
}
}

@SneakyThrows
@Test
void throws_io_exception_if_too_short() {
var request = mock(OpenSearchRequest.class);
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeInt(4);
objectOutput.flush();
ObjectInputStream objectInput =
new ObjectInputStream(new ByteArrayInputStream(output.toByteArray()));

try (var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, request)) {
assertThrows(IOException.class, () -> indexScan.readExternal(objectInput));
}
}

@Test
void plan_for_serialization() {
var request = mock(OpenSearchRequest.class);
Expand Down

0 comments on commit 3f53904

Please sign in to comment.