Fix indexOutOfBoundException
sudeshwasnik committed Dec 21, 2024
1 parent bf4ff0a commit c2ef317
Showing 3 changed files with 61 additions and 73 deletions.
Changed file 1 of 3
@@ -641,10 +641,10 @@ public InsertRowsResponse get() throws Throwable {
      "Invoking insertRows API for channel:{}, streamingBuffer:{}",
      this.channel.getFullyQualifiedName(),
      this.insertRowsStreamingBuffer);
- Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
+ Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
      this.insertRowsStreamingBuffer.getData();
- List<Map<String, Object>> records = recordsAndOffsets.getKey();
- List<Long> offsets = recordsAndOffsets.getValue();
+ List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
+ List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
  InsertValidationResponse finalResponse = new InsertValidationResponse();
  boolean needToResetOffset = false;
  if (!enableSchemaEvolution) {
@@ -658,16 +658,19 @@ public InsertRowsResponse get() throws Throwable {
  // For schema evolution, we need to call the insertRows API row by row in order to
  // preserve the original order, for anything after the first schema mismatch error we will
  // retry after the evolution
- InsertValidationResponse response =
-     this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
+ SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
+ InsertValidationResponse response = this.channel.insertRow(
+     records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
+ );
  if (response.hasErrors()) {
    InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
    SchemaEvolutionTargetItems schemaEvolutionTargetItems =
        insertErrorMapper.mapToSchemaEvolutionItems(
            insertError, this.channel.getTableName());

+   // TODO : originalSinkRecordIdx can be replaced by idx
    long originalSinkRecordIdx =
-       offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
+       originalSinkRecord.kafkaOffset() - this.insertRowsStreamingBuffer.getFirstOffset();

    if (!schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
      InsertValidationResponse.InsertError newInsertError =
Expand All @@ -684,7 +687,7 @@ public InsertRowsResponse get() throws Throwable {
LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
originalSinkRecord,
channel.getTableSchema());
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
@@ -1282,7 +1285,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
   * before calling insertRows API.
   */
  @VisibleForTesting
- class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
+ class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
    // Records coming from Kafka
    private final List<SinkRecord> sinkRecords;

@@ -1316,9 +1319,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
   * @return A pair that contains the records and their corresponding offsets
   */
  @Override
- public Pair<List<Map<String, Object>>, List<Long>> getData() {
+ public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
    final List<Map<String, Object>> records = new ArrayList<>();
-   final List<Long> offsets = new ArrayList<>();
+   final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();

    for (SinkRecord kafkaSinkRecord : sinkRecords) {
      SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);
@@ -1345,7 +1348,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
    Map<String, Object> tableRow =
        recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
    records.add(tableRow);
-   offsets.add(snowflakeRecord.kafkaOffset());
+   filteredOriginalSinkRecords.add(kafkaSinkRecord);
  } catch (JsonProcessingException e) {
    LOGGER.warn(
        "Record has JsonProcessingException offset:{}, topic:{}",
@@ -1371,7 +1374,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
      getBufferSizeBytes(),
      getFirstOffset(),
      getLastOffset());
- return new Pair<>(records, offsets);
+ return new Pair<>(records, filteredOriginalSinkRecords);
  }

  @Override
Changed file 2 of 3
@@ -716,22 +716,44 @@ public static List<SinkRecord> createJsonStringSinkRecords(
    return records;
  }

+ /* Generate (noOfRecords - startOffset) blank records for a given topic and partition. */
+ public static List<SinkRecord> createBlankJsonSinkRecords(
+     final long startOffset,
+     final long noOfRecords,
+     final String topicName,
+     final int partitionNo) {
+   return createJsonRecords(
+       startOffset, noOfRecords, topicName, partitionNo, null,
+       Collections.singletonMap("schemas.enable", Boolean.toString(false))
+   );
+ }
+
+ /* Generate (noOfRecords - startOffset) for a given topic and partition. */
  public static List<SinkRecord> createNativeJsonSinkRecords(
      final long startOffset,
      final long noOfRecords,
      final String topicName,
      final int partitionNo) {
-   ArrayList<SinkRecord> records = new ArrayList<>();
+   return createJsonRecords(
+       startOffset, noOfRecords, topicName, partitionNo,
+       TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
+       Collections.singletonMap("schemas.enable", Boolean.toString(true))
+   );
+ }
+
+ private static List<SinkRecord> createJsonRecords(
+     final long startOffset,
+     final long noOfRecords,
+     final String topicName,
+     final int partitionNo,
+     byte[] value,
+     Map<String, String> converterConfig
+ ) {
    JsonConverter converter = new JsonConverter();
-   HashMap<String, String> converterConfig = new HashMap<>();
-   converterConfig.put("schemas.enable", "true");
    converter.configure(converterConfig, false);
-   SchemaAndValue schemaInputValue =
-       converter.toConnectData(
-           "test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
+   SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

+   ArrayList<SinkRecord> records = new ArrayList<>();
    for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
      records.add(
          new SinkRecord(
Changed file 3 of 3
@@ -1,17 +1,16 @@
  package com.snowflake.kafka.connector.internal.streaming;

- import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
- import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
- import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
-
  import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
  import com.snowflake.kafka.connector.Utils;
  import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
  import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
  import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
  import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
  import com.snowflake.kafka.connector.internal.TestUtils;
+ import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
+ import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
  import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
+ import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
  import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
  import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
  import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2;
@@ -23,9 +22,6 @@
  import net.snowflake.ingest.streaming.OpenChannelRequest;
  import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
  import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.connect.data.Schema;
- import org.apache.kafka.connect.data.SchemaAndValue;
- import org.apache.kafka.connect.json.JsonConverter;
  import org.apache.kafka.connect.sink.SinkRecord;
  import org.junit.jupiter.api.AfterAll;
  import org.junit.jupiter.api.AfterEach;
@@ -510,25 +506,7 @@ public void testPartialBatchChannelInvalidationIngestion_schematization(boolean
  final long secondBatchCount = 500;

  // create 18 blank records that do not kick off schematization
- JsonConverter converter = new JsonConverter();
- HashMap<String, String> converterConfig = new HashMap<>();
- converterConfig.put("schemas.enable", "false");
- converter.configure(converterConfig, false);
- SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
-
- List<SinkRecord> firstBatch = new ArrayList<>();
- for (int i = 0; i < firstBatchCount; i++) {
-   firstBatch.add(
-       new SinkRecord(
-           topic,
-           PARTITION,
-           Schema.STRING_SCHEMA,
-           "test",
-           schemaInputValue.schema(),
-           schemaInputValue.value(),
-           i));
- }
-
+ List<SinkRecord> firstBatch = TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);
  service.insert(firstBatch);

  // send batch with 500, should kick off a record based flush and schematization on record 19,
@@ -759,53 +737,38 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
  SnowflakeSinkConnectorConfig.setDefaultValues(config);
  config.put(
      SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
-     Boolean.toString(withSchematization));
+     Boolean.toString(withSchematization)
+ );

  // create tpChannel
  SnowflakeSinkService service =
      SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-         .setRecordNumber(1)
+         .setRecordNumber(4)
          .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
          .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
          .addTask(testTableName, topicPartition)
          .build();

  // insert blank records that do not evolve schema: 0, 1
- JsonConverter converter = new JsonConverter();
- HashMap<String, String> converterConfig = new HashMap<>();
- converterConfig.put("schemas.enable", "false");
- converter.configure(converterConfig, false);
- SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
- List<SinkRecord> blankRecords = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
-   blankRecords.add(
-       new SinkRecord(
-           topic,
-           PARTITION,
-           Schema.STRING_SCHEMA,
-           "test",
-           schemaInputValue.schema(),
-           schemaInputValue.value(),
-           i));
- }
-
- service.insert(blankRecords);
- TestUtils.assertWithRetry(
-     () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
+ List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);

- // Insert another two records with offset gap that requires evolution: 3, 4
- List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION);
- gapRecords.remove(0);
- service.insert(gapRecords);
+ // Insert another two records with offset gap that requires evolution: 300, 301
+ List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION);

+ List<SinkRecord> mergedList = new ArrayList<>(blankRecords);
+ mergedList.addAll(gapRecords);
+ // mergedList' offsets -> [0, 1, 300, 301]
+ service.insert(mergedList);
  // With schematization, we need to resend a new batch should succeed even if there is an offset
  // gap from the previous committed offset
  if (withSchematization) {
-   service.insert(gapRecords);
+   service.insert(mergedList);
  }

  TestUtils.assertWithRetry(
-     () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 5, 20, 5);
+     () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302,
+     20, 5
+ );

  assert TestUtils.tableSize(testTableName) == 4
      : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);