Reproduce IndexOutOfBoundsException #1036

Draft · wants to merge 1 commit into master
Conversation

@sudeshwasnik (Contributor) commented Dec 21, 2024

The test testInsertRowsWithGaps_schematization fails with:

21-12-2024 15:02:46 main INFO  SnowflakeSinkConnectorConfig:46 - [SF_KAFKA_CONNECTOR] buffer.count.records set to default 10000 
21-12-2024 15:02:46 main INFO  SnowflakeSinkConnectorConfig:46 - [SF_KAFKA_CONNECTOR] buffer.size.bytes set to default 5000000 bytes
21-12-2024 15:02:46 main INFO  SnowflakeSinkConnectorConfig:46 - [SF_KAFKA_CONNECTOR] buffer.flush.time set to default 120 seconds
21-12-2024 15:02:46 main INFO  StreamingClientProperties:46 - [SF_KAFKA_CONNECTOR] Streaming Client Config is overridden for max_client_lag=1 second
21-12-2024 15:02:46 main INFO  DirectStreamingClientHandler:46 - [SF_KAFKA_CONNECTOR] Initializing Streaming Client...
21-12-2024 15:02:46 main INFO  RequestBuilder:360 - Default user agent SnowpipeJavaSDK/3.0.0 (Mac OS X 13.7.1 x86_64) JAVA/1.8.0_382
21-12-2024 15:02:46 main INFO  SecurityManager:150 - Successfully created new JWT
21-12-2024 15:02:46 main INFO  RequestBuilder:319 - Creating a RequestBuilder with arguments : Account : CONFLUENT_PARTNER, User : CONNECTSYSTEMTEST, Scheme : https, Host : confluent_partner.snowflakecomputing.com, Port : 443, userAgentSuffix: null
21-12-2024 15:02:46 main INFO  SnowflakeStreamingIngestClientInternal:58 - [SF_INGEST] Using KEYPAIR_JWT for authorization
21-12-2024 15:02:48 main INFO  FlushService:58 - [SF_INGEST] Create 30 threads for build/upload blobs for client=KC_CLIENT_TEST_CONNECTOR_0, total available processors=10
21-12-2024 15:02:48 main INFO  SnowflakeStreamingIngestClientInternal:58 - [SF_INGEST] Client created, name=KC_CLIENT_TEST_CONNECTOR_0, account=confluent_partner. isTestMode=false, parameters=ParameterProvider{parameterMap={max_client_lag=1 second}}
21-12-2024 15:02:48 main INFO  DirectStreamingClientHandler:46 - [SF_KAFKA_CONNECTOR] Successfully initialized Streaming Client:KC_CLIENT_TEST_CONNECTOR with properties [user=connectsystemtest, url=https://confluent_partner.snowflakecomputing.com:443, role=PUBLIC]
21-12-2024 15:02:48 main INFO  StreamingClientProvider:46 - [SF_KAFKA_CONNECTOR] Streaming client optimization is enabled per worker node, KC will reuse valid clients when possible. Returning client with name: KC_CLIENT_TEST_CONNECTOR_0
21-12-2024 15:02:48 main INFO  SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder:46 - [SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2 created
21-12-2024 15:02:48 main INFO  SnowflakeSinkServiceV2:46 - [SF_KAFKA_CONNECTOR] Set number of records for buffer threshold to 4
21-12-2024 15:02:48 main INFO  SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder:46 - [SF_KAFKA_CONNECTOR] record number is limited to 4
21-12-2024 15:02:48 main INFO  SnowflakeSinkServiceV2:46 - [SF_KAFKA_CONNECTOR] Creating new table kafka_connector_test_table_5835479904459452811.
21-12-2024 15:02:50 main INFO  SnowflakeConnectionServiceV1:46 - [SF_KAFKA_CONNECTOR] Created table kafka_connector_test_table_5835479904459452811 with only RECORD_METADATA column
21-12-2024 15:02:50 main INFO  SnowflakeConnectionServiceV1:46 - [SF_KAFKA_CONNECTOR] Checking schema evolution permission for table kafka_connector_test_table_5835479904459452811
21-12-2024 15:02:51 main INFO  SnowflakeConnectionServiceV1:46 - [SF_KAFKA_CONNECTOR] Table: kafka_connector_test_table_5835479904459452811 has schema evolution permission: true
21-12-2024 15:02:51 main INFO  SnowflakeSinkServiceV2:46 - [SF_KAFKA_CONNECTOR] [SCHEMA_EVOLUTION_CACHE] Setting true for table kafka_connector_test_table_5835479904459452811
21-12-2024 15:02:52 main INFO  SnowflakeConnectionServiceV1:46 - [SF_KAFKA_CONNECTOR] Migrate OffsetToken response for table:kafka_connector_test_table_5835479904459452811, sourceChannel:TEST_CONNECTOR_kafka_connector_test_table_5835479904459452811_0, destinationChannel:kafka_connector_test_table_5835479904459452811_0 is:ChannelMigrateOffsetTokenResponseDTO{responseCode=51, responseMessage='Source Channel does not exist for Offset Migration'}
21-12-2024 15:02:52 main INFO  BufferedTopicPartitionChannel:46 - [SF_KAFKA_CONNECTOR] Opening a channel with name:kafka_connector_test_table_5835479904459452811_0 for table name:kafka_connector_test_table_5835479904459452811
21-12-2024 15:02:52 main INFO  SnowflakeStreamingIngestClientInternal:58 - [SF_INGEST] Open channel request succeeded, channel=kafka_connector_test_table_5835479904459452811_0, table=TEST_DB_NEW.PUBLIC.kafka_connector_test_table_5835479904459452811, clientSequencer=0, rowSequencer=0, client=KC_CLIENT_TEST_CONNECTOR_0
21-12-2024 15:02:52 main INFO  SnowflakeStreamingIngestChannelInternal:58 - [SF_INGEST] Channel=KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811_0 created for table=KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811
21-12-2024 15:02:53 main INFO  BufferedTopicPartitionChannel:46 - [SF_KAFKA_CONNECTOR] Fetched offsetToken for channelName:TEST_DB_NEW.PUBLIC.KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811.KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811_0, offset:null
21-12-2024 15:02:53 main INFO  BufferedTopicPartitionChannel:46 - [SF_KAFKA_CONNECTOR] TopicPartitionChannel:TEST_DB_NEW.PUBLIC.KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811.KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811_0, offset token is NULL, will rely on Kafka to send us the correct offset instead
21-12-2024 15:02:53 main INFO  SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder:46 - [SF_KAFKA_CONNECTOR] create new task in com.snowflake.kafka.connector.internal.SnowflakeSinkService - table: kafka_connector_test_table_5835479904459452811, topicPartition: kafka_connector_test_table_5835479904459452811-0
21-12-2024 15:02:53 main INFO  SnowflakeSinkServiceFactory$SnowflakeSinkServiceBuilder:46 - [SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkService created
21-12-2024 15:02:53 main INFO  BufferedTopicPartitionChannel:46 - [SF_KAFKA_CONNECTOR] Triggering schema evolution. Items: SchemaEvolutionTargetItems{tableName='KAFKA_CONNECTOR_TEST_TABLE_5835479904459452811', nonNullableColumns=[], extraColNames=["GENDER", "REGIONID"]}
Dec 21, 2024 3:02:53 PM net.snowflake.client.jdbc.SnowflakeConnectionV1 initConnectionWithImpl
INFO: Initializing new connection
Dec 21, 2024 3:02:53 PM net.snowflake.client.core.SFSession open
INFO: Opening session with server: https://confluent_partner.snowflakecomputing.com:443/, account: confluent_partner, user: connectsystemtest, password is not provided, role: null, database: TEST_DB_NEW, schema: PUBLIC, warehouse: DEMO_WH, validate default parameters: null, authenticator: snowflake_jwt, ocsp mode: FAIL_OPEN, passcode in password: null, passcode is not provided, private key is provided, disable socks proxy: null, application: null, app id: JDBC, app version: 3.20.0, login timeout: null, retry timeout: null, network timeout: null, query timeout: null, connection timeout: null, socket timeout: null, tracing: null, private key file: null, private key pwd is not provided, enable_diagnostics: not provided, diagnostics_allowlist_path: null, session parameters: client store temporary credential: null, gzip disabled: null, browser response timeout: null
Dec 21, 2024 3:02:53 PM net.snowflake.client.core.SFBaseSession logHttpClientInitInfo
INFO: Driver OCSP mode: FAIL_OPEN, gzip disabled: false and no proxy
Dec 21, 2024 3:02:53 PM net.snowflake.client.core.SFSession open
INFO: Connecting to GLOBAL Snowflake domain
Dec 21, 2024 3:02:53 PM net.snowflake.client.core.SFSession open
INFO: Session 2804867217548378 opened in 381 ms.
Dec 21, 2024 3:02:53 PM net.snowflake.client.jdbc.SnowflakeConnectionV1 initConnectionWithImpl
INFO: Connection initialized successfully in 395 ms. Session id: 2804867217548378
21-12-2024 15:02:53 ingest-flush-thread INFO  FlushService:58 - [SF_INGEST] buildAndUpload task added for client=KC_CLIENT_TEST_CONNECTOR_0, blob=net.snowflake.ingest.streaming.internal.BlobPath@28154b51, buildUploadWorkers stats=java.util.concurrent.ThreadPoolExecutor@4977836e[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
21-12-2024 15:02:53 ingest-build-upload-thread-0 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21-12-2024 15:02:54 ingest-build-upload-thread-0 INFO  CodecPool:153 - Got brand-new compressor [.gz]

java.lang.IndexOutOfBoundsException: Index: 301, Size: 4

	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel$StreamingBuffer.getSinkRecord(BufferedTopicPartitionChannel.java:1383)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel$InsertRowsApiResponseSupplier.get(BufferedTopicPartitionChannel.java:687)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel$InsertRowsApiResponseSupplier.get(BufferedTopicPartitionChannel.java:609)
	at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:187)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:376)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:112)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel.insertRowsWithFallback(BufferedTopicPartitionChannel.java:599)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel.insertRecords(BufferedTopicPartitionChannel.java:525)
	at com.snowflake.kafka.connector.internal.streaming.BufferedTopicPartitionChannel.insertRecord(BufferedTopicPartitionChannel.java:418)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:410)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:377)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannelIT.testInsertRowsWithGaps(TopicPartitionChannelIT.java:801)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannelIT.testInsertRowsWithGaps_schematization(TopicPartitionChannelIT.java:746)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
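The trace shows `StreamingBuffer.getSinkRecord` calling `ArrayList.get` with index 301 on a list of size 4. That pattern is consistent with an index derived from Kafka offsets being applied to a positional list: once the offsets in a buffer have gaps, offset arithmetic overshoots the list. Below is a minimal sketch of that suspected failure mode, assuming the lookup subtracts the first buffered offset from the reported row offset; the names are hypothetical, not the connector's actual classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical repro of the suspected indexing bug (not the connector's code).
public class GappedOffsetRepro {

  public static void main(String[] args) {
    // Four buffered "records", identified only by their Kafka offsets.
    // Note the gap: offsets jump from 1 to 300.
    List<Long> bufferedOffsets = Arrays.asList(0L, 1L, 300L, 301L);

    long errorRowOffset = 301L;                 // offset reported for a failed row
    long firstOffset = bufferedOffsets.get(0);  // 0

    // Offset arithmetic assumes contiguous offsets, so 301 - 0 = 301 is used
    // as a positional index into a 4-element list, throwing:
    // java.lang.IndexOutOfBoundsException: Index: 301, Size: 4
    Long record = bufferedOffsets.get((int) (errorRowOffset - firstOffset));
    System.out.println(record);
  }
}
```

Run as-is, this throws the same `Index: 301, Size: 4` seen in the test output above.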

Originally, this was the exception trace observed on the production connector:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:657)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:348)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IndexOutOfBoundsException: Index 4293 out of bounds for length 3159
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:100)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
at java.base/java.util.Objects.checkIndex(Objects.java:385)
at java.base/java.util.ArrayList.get(ArrayList.java:427)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$StreamingBuffer.getSinkRecord(TopicPartitionChannel.java:1341)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:697)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:625)
at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)
at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)
at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:619)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:545)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRecordToBuffer(TopicPartitionChannel.java:423)
at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:325)
at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:292)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:313)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:623)
... 11 more
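The production trace fails the same way (`Index 4293 out of bounds for length 3159`): the buffer held fewer records than the span of offsets it covered. One defensive alternative, sketched below under the assumption that the buffer can key records by their Kafka offset, is to resolve records through an offset map instead of positional arithmetic; `OffsetKeyedBuffer` is a hypothetical name, not part of the connector or a proposed patch.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch: key buffered records by Kafka offset so gaps in the
// offset sequence cannot produce out-of-range positional indices.
final class OffsetKeyedBuffer {
  private final Map<Long, SinkRecord> byOffset = new HashMap<>();

  void add(SinkRecord record) {
    byOffset.put(record.kafkaOffset(), record);
  }

  // Returns null (caller decides how to handle) instead of throwing when the
  // requested offset falls into a gap or outside the buffer.
  SinkRecord getByOffset(long offset) {
    return byOffset.get(offset);
  }
}
```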
