Skip to content

Commit

Permalink
Check if put.getSchemaId is positive, if not default to 1. Current re…
Browse files Browse the repository at this point in the history
…cord is a chunk. We only write to the storage engine for fully assembled records.
  • Loading branch information
Koorous Vargha committed Jan 10, 2024
1 parent 9fe91d7 commit f26e01d
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3083,7 +3083,8 @@ private int processKafkaDataMessage(
// update checksum for this PUT message if needed.
partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes, put);

int putSchemaId = put.getSchemaId();
// Check if put.getSchemaId is positive, if not default to 1
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

// Do transformation recompute key, value and partition
if (recordTransformer != null) {
Expand All @@ -3101,6 +3102,11 @@ private int processKafkaDataMessage(
putSchemaId,
compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic));

// Current record is a chunk. We only write to the storage engine for fully assembled records
if (assembledObject == null) {
break;
}

SchemaEntry keySchema = schemaRepository.getKeySchema(storeName);
Lazy<Object> lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema));
Lazy<Object> lazyValue = Lazy.of(() -> assembledObject);
Expand Down

0 comments on commit f26e01d

Please sign in to comment.