diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 6dab5e6f3..709947ffa 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -777,4 +777,51 @@ private void testInsertRowsWithGaps(boolean withSchematization) throws Exception
         : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
     service.closeAll();
   }
+
+  @Test
+  public void testInsertRowsWithGaps_schematization_indexOutOfBoundsException() {
+    // setup: streaming config with schematization enabled
+    Map<String, String> config = TestUtils.getConfForStreaming();
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    config.put(
+        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, Boolean.toString(true)
+    );
+
+    // create tpChannel
+    SnowflakeSinkService service =
+        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+            .setRecordNumber(4) // test must try inserting all 4 rows in one insert iteration.
+            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+            .addTask(testTableName, topicPartition)
+            .build();
+
+    // insert blank records that do not evolve schema: 0, 1
+    JsonConverter converter = new JsonConverter();
+    Map<String, String> converterConfig = new HashMap<>();
+    converterConfig.put("schemas.enable", "false");
+    converter.configure(converterConfig, false);
+    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
+    List<SinkRecord> records = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      records.add(
+          new SinkRecord(
+              topic,
+              PARTITION,
+              Schema.STRING_SCHEMA,
+              "test",
+              schemaInputValue.schema(),
+              schemaInputValue.value(),
+              i * 100
+          )
+      );
+    }
+
+    // add records with change in schema with extreme gap in offsets.
+    records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));
+
+    // records' offsets -> [0, 100, 300, 301]
+    service.insert(records); // will throw IndexOutOfBounds exception
+    service.closeAll();
+  }
 }