From 370f9c987206ad7d900f4b7149d3ce10e74845d2 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 26 Sep 2023 01:21:25 +0300 Subject: [PATCH 1/4] chore: add logback to test runtime --- build.gradle | 1 + src/test/resources/logback-test.xml | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 src/test/resources/logback-test.xml diff --git a/build.gradle b/build.gradle index 844974a57..1aad4f1a4 100644 --- a/build.gradle +++ b/build.gradle @@ -181,6 +181,7 @@ dependencies { testImplementation "io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + testRuntimeOnly "ch.qos.logback:logback-classic:1.4.11" } test { diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 000000000..f1d0b0cb6 --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + From 5d885df0164522815338a33fcd1edb180e00eadd Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 26 Sep 2023 01:21:55 +0300 Subject: [PATCH 2/4] fix: prevent npe when extracting key --- .../common/grouper/KeyAndTopicPartitionRecordGrouper.java | 2 +- .../io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java b/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java index b719a57d3..3a5c8f858 100644 --- a/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java +++ b/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java @@ -75,7 +75,7 @@ private String generateRecordKey(final SinkRecord record) { final Supplier setKey = () -> { if (record.key() == null) { return "null"; - } else if (record.keySchema().type() == Schema.Type.STRING) { + } else if (record.keySchema() != null && record.keySchema().type() == Schema.Type.STRING) { return (String) record.key(); } else { return record.key().toString(); diff --git a/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java b/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java index 19a946026..172276d16 100644 --- a/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java +++ b/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java @@ -72,7 +72,7 @@ private String generateRecordKey(final SinkRecord record) { final Supplier setKey = () -> { if (record.key() == null) { return "null"; - } else if (record.keySchema().type() == Schema.Type.STRING) { + } else if (record.keySchema() != null && record.keySchema().type() == Schema.Type.STRING) { return (String) record.key(); } else { return record.key().toString(); From e73fa2971fc7c24d2b009f12f0e0ecb5bdd4df41 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 22 Sep 2023 16:54:14 +0300 Subject: [PATCH 3/4] feat: add topic-partition-key record grouper --- .../common/grouper/RecordGrouperFactory.java | 86 ++- ...maBasedTopicPartitionKeyRecordGrouper.java | 130 +++++ .../TopicPartitionKeyRecordGrouper.java | 187 ++++++ .../grouper/RecordGrouperFactoryTest.java | 25 +- ...sedTopicPartitionKeyRecordGrouperTest.java | 162 ++++++ .../TopicPartitionKeyRecordGrouperTest.java | 543 ++++++++++++++++++ 6 files changed, 1103 insertions(+), 30 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java create mode 100644 src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java create mode 100644 src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java create mode 100644 src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java diff --git a/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java b/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java index b2309d8e9..2b131702e 100644 --- a/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java +++ b/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java @@ -17,10 +17,12 @@ package io.aiven.kafka.connect.common.grouper; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import io.aiven.kafka.connect.common.config.AivenCommonConfig; @@ -36,23 +38,31 @@ public final class RecordGrouperFactory { public static final String KEY_RECORD = KeyRecordGrouper.class.getName(); public static final String TOPIC_PARTITION_RECORD = TopicPartitionRecordGrouper.class.getName(); + public static final String TOPIC_PARTITION_KEY_RECORD = TopicPartitionKeyRecordGrouper.class.getName(); public static final String KEY_TOPIC_PARTITION_RECORD = KeyAndTopicPartitionRecordGrouper.class.getName(); public static final Map>> SUPPORTED_VARIABLES = new LinkedHashMap<>() {{ - put(TOPIC_PARTITION_RECORD, List.of( - Pair.of(FilenameTemplateVariable.TOPIC.name, true), - Pair.of(FilenameTemplateVariable.PARTITION.name, true), - Pair.of(FilenameTemplateVariable.START_OFFSET.name, true), - Pair.of(FilenameTemplateVariable.TIMESTAMP.name, false) - )); - put(KEY_RECORD, List.of(Pair.of(FilenameTemplateVariable.KEY.name, true))); - put(KEY_TOPIC_PARTITION_RECORD, List.of( - Pair.of(FilenameTemplateVariable.KEY.name, true), - Pair.of(FilenameTemplateVariable.TOPIC.name, false), - Pair.of(FilenameTemplateVariable.PARTITION.name, false) - )); - }}; + put(TOPIC_PARTITION_RECORD, List.of( + Pair.of(FilenameTemplateVariable.TOPIC.name, true), + Pair.of(FilenameTemplateVariable.PARTITION.name, true), + Pair.of(FilenameTemplateVariable.START_OFFSET.name, true), + Pair.of(FilenameTemplateVariable.TIMESTAMP.name, false) + )); + put(TOPIC_PARTITION_KEY_RECORD, List.of( + Pair.of(FilenameTemplateVariable.TOPIC.name, true), + Pair.of(FilenameTemplateVariable.PARTITION.name, true), + Pair.of(FilenameTemplateVariable.KEY.name, true), + Pair.of(FilenameTemplateVariable.START_OFFSET.name, true), + Pair.of(FilenameTemplateVariable.TIMESTAMP.name, false) + )); + put(KEY_RECORD, List.of(Pair.of(FilenameTemplateVariable.KEY.name, true))); + put(KEY_TOPIC_PARTITION_RECORD, List.of( + Pair.of(FilenameTemplateVariable.KEY.name, true), + Pair.of(FilenameTemplateVariable.TOPIC.name, false), + Pair.of(FilenameTemplateVariable.PARTITION.name, false) + )); + }}; public static final List ALL_SUPPORTED_VARIABLES = SUPPORTED_VARIABLES.values() @@ -73,6 +83,13 @@ public final class RecordGrouperFactory { .map(Pair::getLeft) .collect(Collectors.toSet()); + + private static final Set TOPIC_PARTITION_KEY_RECORD_REQUIRED_VARS = + SUPPORTED_VARIABLES.get(TOPIC_PARTITION_KEY_RECORD).stream() + .filter(Pair::getRight) + .map(Pair::getLeft) + .collect(Collectors.toSet()); + private static final Set KEY_TOPIC_PARTITION_RECORD_REQUIRED_VARS = SUPPORTED_VARIABLES.get(KEY_TOPIC_PARTITION_RECORD).stream() .filter(Pair::getRight) @@ -85,6 +102,12 @@ public final class RecordGrouperFactory { .map(Pair::getLeft) .collect(Collectors.toSet()); + private static final Set TOPIC_PARTITION_KEY_RECORD_OPT_VARS = + SUPPORTED_VARIABLES.get(TOPIC_PARTITION_KEY_RECORD).stream() + .filter(p -> !p.getRight()) + .map(Pair::getLeft) + .collect(Collectors.toSet()); + private static final Set KEY_TOPIC_PARTITION_RECORD_OPT_VARS = SUPPORTED_VARIABLES.get(KEY_TOPIC_PARTITION_RECORD).stream() .filter(p -> !p.getRight()) @@ -102,13 +125,15 @@ private RecordGrouperFactory() { } public static String resolveRecordGrouperType(final Template template) { - final Set variables = template.variablesSet(); - if (isByKeyRecord(variables)) { + final Supplier> variables = () -> new HashSet<>(template.variablesSet()); + if (isByTopicPartitionKeyRecord(variables.get())) { + return TOPIC_PARTITION_KEY_RECORD; + } else if (isByTopicPartitionRecord(variables.get())) { + return TOPIC_PARTITION_RECORD; + } else if (isByKeyRecord(variables.get())) { return KEY_RECORD; - } else if (isByTopicPartitionKeyRecord(variables)) { + } else if (isByKeyTopicPartitionRecord(variables.get())) { return KEY_TOPIC_PARTITION_RECORD; - } else if (isByTopicPartitionRecord(variables)) { - return TOPIC_PARTITION_RECORD; } else { throw new IllegalArgumentException( String.format( @@ -131,11 +156,19 @@ public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) { config.getMaxRecordsPerFile() != 0 ? config.getMaxRecordsPerFile() : null; - return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO + if (TOPIC_PARTITION_KEY_RECORD.equals(grType)) { + return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO + ? new SchemaBasedTopicPartitionKeyRecordGrouper( + fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()) + : new TopicPartitionKeyRecordGrouper( + fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()); + } else { + return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO ? new SchemaBasedTopicPartitionRecordGrouper( - fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()) + fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()) : new TopicPartitionRecordGrouper( - fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()); + fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()); + } } } @@ -155,6 +188,17 @@ private static boolean isByTopicPartitionRecord(final Set vars) { } private static boolean isByTopicPartitionKeyRecord(final Set vars) { + final Set requiredVars = + Sets.intersection(TOPIC_PARTITION_KEY_RECORD_REQUIRED_VARS, vars) + .immutableCopy(); + vars.removeAll(requiredVars); + final boolean containsRequiredVars = TOPIC_PARTITION_KEY_RECORD_REQUIRED_VARS.equals(requiredVars); + final boolean containsOptionalVars = + vars.isEmpty() || !Collections.disjoint(TOPIC_PARTITION_KEY_RECORD_OPT_VARS, vars); + return containsRequiredVars && containsOptionalVars; + } + + private static boolean isByKeyTopicPartitionRecord(final Set vars) { final Set requiredVars = Sets.intersection(KEY_TOPIC_PARTITION_RECORD_REQUIRED_VARS, vars) .immutableCopy(); diff --git a/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java b/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java new file mode 100644 index 000000000..a0dcf5832 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouper.java @@ -0,0 +1,130 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.SchemaProjectorException; +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; + +final class SchemaBasedTopicPartitionKeyRecordGrouper extends TopicPartitionKeyRecordGrouper { + + private final SchemaBasedRotator schemaBasedRotator = new SchemaBasedRotator(); + + SchemaBasedTopicPartitionKeyRecordGrouper(final Template filenameTemplate, + final Integer maxRecordsPerFile, + final TimestampSource tsSource) { + super(filenameTemplate, maxRecordsPerFile, tsSource); + } + + @Override + protected String resolveRecordKeyFor(final SinkRecord record) { + if (schemaBasedRotator.rotate(record)) { + return generateNewRecordKey(record); + } else { + return super.resolveRecordKeyFor(record); + } + } + + @Override + public void clear() { + schemaBasedRotator.clear(); + super.clear(); + } + + private static final class SchemaBasedRotator implements Rotator { + + private final Map keyValueSchemas = new HashMap<>(); + + @Override + public boolean rotate(final SinkRecord record) { + if (Objects.isNull(record.valueSchema()) || Objects.isNull(record.keySchema())) { + throw new SchemaProjectorException("Record must have schemas for key and value"); + } + final var key = recordKey(record); + final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); + final var keyValueVersion = + keyValueSchemas.computeIfAbsent(tpk, ignored -> new KeyValueSchema( + record.keySchema(), + record.valueSchema())); + final var schemaChanged = + !keyValueVersion.keySchema.equals(record.keySchema()) + || !keyValueVersion.valueSchema.equals(record.valueSchema()); + if (schemaChanged) { + keyValueSchemas.put(tpk, + new KeyValueSchema(record.keySchema(), record.valueSchema()) + ); + } + return schemaChanged; + } + + private String recordKey(final SinkRecord record) { + final String key; + if (record.key() == null) { + key = "null"; + } else if (record.keySchema().type() == Schema.Type.STRING) { + key = (String) record.key(); + } else { + key = record.key().toString(); + } + return key; + } + + + private static class KeyValueSchema { + + final Schema keySchema; + + final Schema valueSchema; + + KeyValueSchema(final Schema keySchema, final Schema valueSchema) { + this.keySchema = keySchema; + this.valueSchema = valueSchema; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final var that = (KeyValueSchema) o; + return keySchema.equals(that.keySchema) && valueSchema.equals(that.valueSchema); + } + + @Override + public int hashCode() { + return Objects.hash(keySchema, valueSchema); + } + } + + void clear() { + keyValueSchemas.clear(); + } + + } + +} diff --git a/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java new file mode 100644 index 000000000..8f975e74e --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java @@ -0,0 +1,187 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.common.config.FilenameTemplateVariable; +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; +import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; + +public class TopicPartitionKeyRecordGrouper implements RecordGrouper { + + private static final Map TIMESTAMP_FORMATTERS = + Map.of( + "yyyy", DateTimeFormatter.ofPattern("yyyy"), + "MM", DateTimeFormatter.ofPattern("MM"), + "dd", DateTimeFormatter.ofPattern("dd"), + "HH", DateTimeFormatter.ofPattern("HH") + ); + + private final Template filenameTemplate; + + private final Map currentHeadRecords = new HashMap<>(); + + private final Map> fileBuffers = new HashMap<>(); + + private final Function> setTimestampBasedOnRecord; + + private final Rotator> rotator; + + TopicPartitionKeyRecordGrouper(final Template filenameTemplate, + final Integer maxRecordsPerFile, + final TimestampSource tsSource) { + Objects.requireNonNull(filenameTemplate, "filenameTemplate cannot be null"); + Objects.requireNonNull(tsSource, "tsSource cannot be null"); + this.filenameTemplate = filenameTemplate; + + this.setTimestampBasedOnRecord = + record -> parameter -> tsSource.time(record).format(TIMESTAMP_FORMATTERS.get(parameter.value())); + + this.rotator = buffer -> { + final var unlimited = maxRecordsPerFile == null; + if (unlimited) { + return false; + } else { + return buffer == null || buffer.size() >= maxRecordsPerFile; + } + }; + } + + @Override + public void put(final SinkRecord record) { + Objects.requireNonNull(record, "record cannot be null"); + final String recordKey = resolveRecordKeyFor(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + } + + protected String resolveRecordKeyFor(final SinkRecord record) { + final var key = recordKey(record); + + final TopicPartitionKey tpk = + new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); + final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record); + String objectKey = generateObjectKey(tpk, currentHeadRecord, record); + if (rotator.rotate(fileBuffers.get(objectKey))) { + // Create new file using this record as the head record. + objectKey = generateNewRecordKey(record); + } + return objectKey; + } + + private String recordKey(final SinkRecord record) { + final String key; + if (record.key() == null) { + key = "null"; + } else if (record.keySchema() != null && record.keySchema().type() == Schema.Type.STRING) { + key = (String) record.key(); + } else { + key = record.key().toString(); + } + return key; + } + + public String generateObjectKey( + final TopicPartitionKey tpk, + final SinkRecord headRecord, + final SinkRecord currentRecord) { + final Function setKafkaOffset = + usePaddingParameter -> usePaddingParameter.asBoolean() + ? String.format("%020d", headRecord.kafkaOffset()) + : Long.toString(headRecord.kafkaOffset()); + final Function setKafkaPartition = + usePaddingParameter -> usePaddingParameter.asBoolean() + ? String.format("%010d", headRecord.kafkaPartition()) + : Long.toString(headRecord.kafkaPartition()); + + + return filenameTemplate.instance() + .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic) + .bindVariable( + FilenameTemplateVariable.PARTITION.name, + setKafkaPartition) + .bindVariable(FilenameTemplateVariable.KEY.name, tpk::key) + .bindVariable( + FilenameTemplateVariable.START_OFFSET.name, + setKafkaOffset) + .bindVariable( + FilenameTemplateVariable.TIMESTAMP.name, + setTimestampBasedOnRecord.apply(currentRecord)) + .render(); + } + + protected String generateNewRecordKey(final SinkRecord record) { + final var key = recordKey(record); + final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); + currentHeadRecords.put(tpk, record); + return generateObjectKey(tpk, record, record); + } + + @Override + public void clear() { + currentHeadRecords.clear(); + fileBuffers.clear(); + } + + @Override + public Map> records() { + return Collections.unmodifiableMap(fileBuffers); + } + + public static class TopicPartitionKey { + final TopicPartition topicPartition; + final String key; + + TopicPartitionKey(final TopicPartition topicPartition, final String key) { + this.topicPartition = topicPartition; + this.key = key; + } + + public String key() { + return key; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TopicPartitionKey that = (TopicPartitionKey) o; + return Objects.equals(topicPartition, that.topicPartition) && Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(topicPartition, key); + } + } +} diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java index b867736e0..66ca427e1 100644 --- a/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java @@ -24,6 +24,21 @@ final class RecordGrouperFactoryTest { + @Test + void topicPartition() { + final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{start_offset}}"); + final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); + System.out.println(grType); + assertThat(RecordGrouperFactory.TOPIC_PARTITION_RECORD).isEqualTo(grType); + } + + @Test + void topicPartitionAndKey() { + final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{key}}/{{start_offset}}"); + final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); + System.out.println(grType); + assertThat(RecordGrouperFactory.TOPIC_PARTITION_KEY_RECORD).isEqualTo(grType); + } @Test void keyOnly() { @@ -33,17 +48,9 @@ void keyOnly() { } @Test - void topicPartitionAndKey() { + void keyAndTopicPartition() { final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{key}}"); final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); assertThat(RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD).isEqualTo(grType); } - - @Test - void topicPartition() { - final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{start_offset}}"); - final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); - System.out.println(grType); - assertThat(RecordGrouperFactory.TOPIC_PARTITION_RECORD).isEqualTo(grType); - } } diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java new file mode 100644 index 000000000..1761f31aa --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.util.Lists.list; + +final class SchemaBasedTopicPartitionKeyRecordGrouperTest { + + static final SinkRecord KT0P0R0 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 0); + static final SinkRecord KT0P0R1 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(1).build(), "some_key", + SchemaBuilder.string().optional().version(1).build(), null, 1); + static final SinkRecord KT0P0R2 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 2); + static final SinkRecord KT0P0R3 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 3); + static final SinkRecord KT0P0R4 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(2).build(), "some_key", + SchemaBuilder.string().optional().version(1).build(), null, 4); + static final SinkRecord KT0P0R5 = new SinkRecord( + "topic0", 0, + SchemaBuilder.string().optional().version(2).build(), "some_key", + SchemaBuilder.string().optional().version(1).build(), null, 5); + + static final SinkRecord RT0P1R0 = new SinkRecord( + "topic0", 1, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 10); + static final SinkRecord RT0P1R1 = new SinkRecord( + "topic0", 1, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 11); + static final SinkRecord RT0P1R2 = new SinkRecord( + "topic0", 1, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(2).build(), null, 12); + static final SinkRecord RT0P1R3 = new SinkRecord( + "topic0", 1, + SchemaBuilder.string().optional().version(1).build(), "some_key", + SchemaBuilder.string().optional().version(2).build(), null, 13); + + static final SinkRecord KRT1P1R0 = new SinkRecord( + "topic1", 0, + SchemaBuilder.string().optional().version(1).build(), "some_key", + SchemaBuilder.string().optional().version(1).build(), null, 1000); + static final SinkRecord KRT1P1R1 = new SinkRecord( + "topic1", 0, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 1001); + static final SinkRecord KRT1P1R2 = new SinkRecord( + "topic1", 0, + SchemaBuilder.string().optional().version(1).build(), null, + SchemaBuilder.string().optional().version(1).build(), null, 1002); + static final SinkRecord KRT1P1R3 = new SinkRecord( + "topic1", 0, + SchemaBuilder.string().optional().version(2).build(), "some_key", + SchemaBuilder.string().optional().version(2).build(), null, 1003); + + static final TimestampSource DEFAULT_TS_SOURCE = + TimestampSource.of(TimestampSource.Type.WALLCLOCK); + + @Test + void rotateOnKeySchemaChanged() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final RecordGrouper grouper = + new SchemaBasedTopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(KT0P0R0); + grouper.put(KT0P0R1); + grouper.put(KT0P0R2); + grouper.put(KT0P0R3); + grouper.put(KT0P0R4); + grouper.put(KT0P0R5); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-0-null-0", list(KT0P0R0, KT0P0R2, KT0P0R3)), + entry("topic0-0-some_key-1", list(KT0P0R1)), + entry("topic0-0-some_key-4", list(KT0P0R4, KT0P0R5)) + ); + } + + @Test + void rotateOnValueSchemaChanged() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final RecordGrouper grouper = + new SchemaBasedTopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(RT0P1R0); + grouper.put(RT0P1R1); + grouper.put(RT0P1R2); + grouper.put(RT0P1R3); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-1-null-10", list(RT0P1R0, RT0P1R1)), + entry("topic0-1-null-12", list(RT0P1R2)), + entry("topic0-1-some_key-13", list(RT0P1R3)) + ); + } + + @Test + void rotateOnValueSchemaChangedAndButchSize() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final RecordGrouper grouper = + new SchemaBasedTopicPartitionKeyRecordGrouper( + filenameTemplate, 2, DEFAULT_TS_SOURCE); + + grouper.put(KRT1P1R0); + grouper.put(KRT1P1R1); + grouper.put(KRT1P1R2); + grouper.put(KRT1P1R3); + + final var records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic1-0-some_key-1000", list(KRT1P1R0)), + entry("topic1-0-null-1001", list(KRT1P1R1, KRT1P1R2)), + entry("topic1-0-some_key-1003", list(KRT1P1R3)) + ); + } +} diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java new file mode 100644 index 000000000..3393882c5 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java @@ -0,0 +1,543 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.util.Lists.list; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +final class TopicPartitionKeyRecordGrouperTest { + + private static final SinkRecord T0P0R0 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 0); + private static final SinkRecord T0P0R1 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 1); + private static final SinkRecord T0P0R2 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2); + private static final SinkRecord T0P0R3 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 3); + private static final SinkRecord T0P0R4 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 4); + private static final SinkRecord T0P0R5 = new SinkRecord( + "topic0", 0, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 5); + + private static final SinkRecord T0P1R0 = new SinkRecord( + "topic0", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 10); + private static final SinkRecord T0P1R1 = new SinkRecord( + "topic0", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 11); + private static final SinkRecord T0P1R2 = new SinkRecord( + "topic0", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 12); + private static final SinkRecord T0P1R3 = new SinkRecord( + "topic0", 1, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 13); + + private static final SinkRecord T1P1R0 = new SinkRecord( + "topic1", 1, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 1000); + private static final SinkRecord T1P1R1 = new SinkRecord( + "topic1", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 1001); + private static final SinkRecord T1P1R2 = new SinkRecord( + "topic1", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 1002); + private static final SinkRecord T1P1R3 = new SinkRecord( + "topic1", 1, Schema.OPTIONAL_STRING_SCHEMA, "some_key", null, null, 1003); + + private static final SinkRecord T2P1R0 = new SinkRecord( + "topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2000, 1635375104000L, + TimestampType.CREATE_TIME); + private static final SinkRecord T2P1R1 = new SinkRecord( + "topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2001, 1635461504000L, + TimestampType.CREATE_TIME); + private static final SinkRecord T2P1R2 = new SinkRecord( + "topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2002, 1635547904000L, + TimestampType.CREATE_TIME); + private static final SinkRecord T2P1R3 = new SinkRecord( + "topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2003, 1635547906000L, + TimestampType.CREATE_TIME); + + private static final TimestampSource DEFAULT_TS_SOURCE = + TimestampSource.of(TimestampSource.Type.WALLCLOCK); + + @Test + void withoutNecessaryParameters() { + assertThatThrownBy(() -> new TopicPartitionKeyRecordGrouper(null, 0, DEFAULT_TS_SOURCE)) + .isInstanceOf(NullPointerException.class) + .hasMessage("filenameTemplate cannot be null"); + + assertThatThrownBy(() -> new TopicPartitionKeyRecordGrouper(Template.of("{{topic}}"), 0, null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("tsSource cannot be null"); + } + + @ParameterizedTest + @NullSource + @ValueSource(ints = 10) + void empty(final Integer maxRecordsPerFile) { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper(filenameTemplate, maxRecordsPerFile, DEFAULT_TS_SOURCE); + assertThat(grouper.records()).isEmpty(); + } + + @Test + void unlimited() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(T0P1R0); + grouper.put(T0P0R0); + grouper.put(T0P1R1); + grouper.put(T0P0R1); + grouper.put(T0P0R2); + grouper.put(T0P1R2); + grouper.put(T0P0R3); + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + grouper.put(T0P1R3); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-0-null-0", list(T0P0R0, T0P0R2, T0P0R3)), + entry("topic0-0-some_key-1", list(T0P0R1, T0P0R4, T0P0R5)), + entry("topic0-1-null-10", list(T0P1R0, T0P1R1, T0P1R2)), + entry("topic0-1-some_key-13", list(T0P1R3)), + entry("topic1-1-null-1001", list(T1P1R1, T1P1R2)), + entry("topic1-1-some_key-1000", list(T1P1R0, T1P1R3)) + ); + } + + @Test + void limited() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, 2, DEFAULT_TS_SOURCE); + + grouper.put(T0P1R0); + grouper.put(T0P0R0); + grouper.put(T0P1R1); + grouper.put(T0P0R1); + grouper.put(T0P0R2); + grouper.put(T0P1R2); + grouper.put(T0P0R3); + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + grouper.put(T0P1R3); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-0-null-0", list(T0P0R0, T0P0R2)), + entry("topic0-0-null-3", list(T0P0R3)), + entry("topic0-0-some_key-1", list(T0P0R1, T0P0R4)), + entry("topic0-0-some_key-5", list(T0P0R5)), + entry("topic0-1-null-10", list(T0P1R0, T0P1R1)), + entry("topic0-1-null-12", list(T0P1R2)), + entry("topic0-1-some_key-13", list(T0P1R3)), + entry("topic1-1-null-1001", list(T1P1R1, T1P1R2)), + entry("topic1-1-some_key-1000", list(T1P1R0, T1P1R3)) + ); + } + + @Test + void clear() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(T0P1R0); + grouper.put(T0P0R0); + grouper.put(T0P1R1); + grouper.put(T0P0R1); + grouper.put(T0P0R2); + grouper.put(T0P1R2); + grouper.put(T0P0R3); + grouper.put(T0P1R3); + + grouper.clear(); + assertThat(grouper.records()).isEmpty(); + + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + + final Map> records = grouper.records(); + + assertThat(records) + .containsExactly( + entry("topic0-0-some_key-4", list(T0P0R4, T0P0R5)), + entry("topic1-1-some_key-1000", list(T1P1R0, T1P1R3)), + entry("topic1-1-null-1001", list(T1P1R1, T1P1R2)) + ); + } + + @Test + void setZeroPaddingForKafkaOffset() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition}}-{{key}}-{{start_offset:padding=true}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-0-some_key-00000000000000000004", list(T0P0R4, T0P0R5)), + entry("topic1-1-null-00000000000000001001", list(T1P1R1, T1P1R2)), + entry("topic1-1-some_key-00000000000000001000", list(T1P1R0, T1P1R3)) + ); + } + + @Test + void setZeroPaddingForKafkaPartition() { + final Template filenameTemplate = Template.of("{{topic}}-{{partition:padding=true}}-{{key}}-{{start_offset}}"); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, DEFAULT_TS_SOURCE); + + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + + final Map> records = grouper.records(); + assertThat(records) + .containsOnly( + entry("topic0-0000000000-some_key-4", list(T0P0R4, T0P0R5)), + entry("topic1-0000000001-some_key-1000", list(T1P1R0, T1P1R3)), + entry("topic1-0000000001-null-1001", list(T1P1R1, T1P1R2)) + ); + } + + @Test + void addTimeUnitsToTheFileNameUsingWallclockTimestampSource() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + + "{{timestamp:unit=dd}}" + ); + final ZonedDateTime t = TimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null); + final String expectedTs = + t.format(DateTimeFormatter.ofPattern("yyyy")) + + t.format(DateTimeFormatter.ofPattern("MM")) + + t.format(DateTimeFormatter.ofPattern("dd")); + + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, TimestampSource.of(TimestampSource.Type.WALLCLOCK)); + + grouper.put(T1P1R0); + grouper.put(T1P1R1); + grouper.put(T0P0R4); + grouper.put(T1P1R2); + grouper.put(T1P1R3); + grouper.put(T0P0R5); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic0-0-some_key-4-" + expectedTs, list(T0P0R4, T0P0R5)), + entry("topic1-1-some_key-1000-" + expectedTs, list(T1P1R0, T1P1R3)), + entry("topic1-1-null-1001-" + expectedTs, list(T1P1R1, T1P1R2)) + ); + } + + @Test + void rotateKeysHourly() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + + "{{timestamp:unit=dd}}" + + "{{timestamp:unit=HH}}" + ); + final TimestampSource timestampSourceMock = mock(TimestampSource.class); + + final ZonedDateTime firstHourTime = ZonedDateTime.now(); + final ZonedDateTime secondHourTime = firstHourTime.plusHours(1); + final String firstHourTs = + firstHourTime.format(DateTimeFormatter.ofPattern("yyyy")) + + firstHourTime.format(DateTimeFormatter.ofPattern("MM")) + + firstHourTime.format(DateTimeFormatter.ofPattern("dd")) + + firstHourTime.format(DateTimeFormatter.ofPattern("HH")); + final String secondHourTs = + secondHourTime.format(DateTimeFormatter.ofPattern("yyyy")) + + secondHourTime.format(DateTimeFormatter.ofPattern("MM")) + + secondHourTime.format(DateTimeFormatter.ofPattern("dd")) + + secondHourTime.format(DateTimeFormatter.ofPattern("HH")); + + when(timestampSourceMock.time(any())).thenReturn(firstHourTime); + + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, timestampSourceMock); + + grouper.put(T0P0R1); + grouper.put(T0P0R2); + grouper.put(T0P0R3); + + when(timestampSourceMock.time(any())).thenReturn(secondHourTime); + + grouper.put(T0P0R4); + grouper.put(T0P0R5); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic0-0-some_key-1-" + firstHourTs, list(T0P0R1)), + entry("topic0-0-null-2-" + firstHourTs, list(T0P0R2, T0P0R3)), + entry("topic0-0-some_key-1-" + secondHourTs, list(T0P0R4, T0P0R5)) + ); + } + + @Test + void rotateKeysDaily() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + + "{{timestamp:unit=dd}}" + ); + final TimestampSource timestampSourceMock = mock(TimestampSource.class); + + final ZonedDateTime firstDayTime = ZonedDateTime.now(); + final ZonedDateTime secondDayTime = firstDayTime.plusDays(1); + final String firstDayTs = + firstDayTime.format(DateTimeFormatter.ofPattern("yyyy")) + + firstDayTime.format(DateTimeFormatter.ofPattern("MM")) + + firstDayTime.format(DateTimeFormatter.ofPattern("dd")); + final String secondDayTs = + secondDayTime.format(DateTimeFormatter.ofPattern("yyyy")) + + secondDayTime.format(DateTimeFormatter.ofPattern("MM")) + + secondDayTime.format(DateTimeFormatter.ofPattern("dd")); + + when(timestampSourceMock.time(any())).thenReturn(firstDayTime); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, timestampSourceMock); + + grouper.put(T0P1R0); + grouper.put(T0P1R1); + grouper.put(T0P1R2); + + when(timestampSourceMock.time(any())).thenReturn(secondDayTime); + + grouper.put(T0P1R3); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic0-1-null-10-" + firstDayTs, list(T0P1R0, T0P1R1, T0P1R2)), + entry("topic0-1-some_key-13-" + secondDayTs, list(T0P1R3)) + ); + } + + @Test + void rotateKeysMonthly() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + ); + final TimestampSource timestampSourceMock = mock(TimestampSource.class); + + final ZonedDateTime firstMonthTime = ZonedDateTime.now().with(TemporalAdjusters.lastDayOfMonth()); + final ZonedDateTime secondMonth = firstMonthTime.plusDays(1); + final String firstMonthTs = + firstMonthTime.format(DateTimeFormatter.ofPattern("yyyy")) + + firstMonthTime.format(DateTimeFormatter.ofPattern("MM")); + final String secondMonthTs = + secondMonth.format(DateTimeFormatter.ofPattern("yyyy")) + + secondMonth.format(DateTimeFormatter.ofPattern("MM")); + + when(timestampSourceMock.time(any())).thenReturn(firstMonthTime); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, timestampSourceMock); + + grouper.put(T0P1R0); + grouper.put(T0P1R1); + grouper.put(T0P1R3); + + when(timestampSourceMock.time(any())).thenReturn(secondMonth); + + grouper.put(T0P1R2); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic0-1-null-10-" + firstMonthTs, list(T0P1R0, T0P1R1)), + entry("topic0-1-some_key-13-" + firstMonthTs, list(T0P1R3)), + entry("topic0-1-null-10-" + secondMonthTs, list(T0P1R2)) + ); + } + + @Test + void rotateKeysYearly() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + ); + final TimestampSource timestampSourceMock = mock(TimestampSource.class); + + final ZonedDateTime firstYearTime = ZonedDateTime.now(); + final ZonedDateTime secondYearMonth = firstYearTime.plusYears(1); + final String firstYearTs = + firstYearTime.format(DateTimeFormatter.ofPattern("yyyy")) + + firstYearTime.format(DateTimeFormatter.ofPattern("MM")); + final String secondYearTs = + secondYearMonth.format(DateTimeFormatter.ofPattern("yyyy")) + + secondYearMonth.format(DateTimeFormatter.ofPattern("MM")); + + when(timestampSourceMock.time(any())).thenReturn(firstYearTime); + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, timestampSourceMock); + + grouper.put(T0P1R0); + grouper.put(T0P1R1); + grouper.put(T0P1R2); + + when(timestampSourceMock.time(any())).thenReturn(secondYearMonth); + + grouper.put(T0P1R3); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic0-1-null-10-" + firstYearTs, list(T0P1R0, T0P1R1, T0P1R2)), + entry("topic0-1-some_key-13-" + secondYearTs, list(T0P1R3)) + ); + } + + @Test + void rotateDailyWithEventTimestampSource() { + final Template filenameTemplate = + Template.of( + "{{topic}}-" + + "{{partition}}-" + + "{{key}}-" + + "{{start_offset}}-" + + "{{timestamp:unit=yyyy}}" + + "{{timestamp:unit=MM}}" + + "{{timestamp:unit=dd}}" + ); + final ZonedDateTime t0 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0); + final ZonedDateTime t1 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1); + final ZonedDateTime t2 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2); + + final String expectedTs0 = + t0.format(DateTimeFormatter.ofPattern("yyyy")) + + t0.format(DateTimeFormatter.ofPattern("MM")) + + t0.format(DateTimeFormatter.ofPattern("dd")); + final String expectedTs1 = + t1.format(DateTimeFormatter.ofPattern("yyyy")) + + t1.format(DateTimeFormatter.ofPattern("MM")) + + t1.format(DateTimeFormatter.ofPattern("dd")); + final String expectedTs2 = + t2.format(DateTimeFormatter.ofPattern("yyyy")) + + t2.format(DateTimeFormatter.ofPattern("MM")) + + t2.format(DateTimeFormatter.ofPattern("dd")); + + final TopicPartitionKeyRecordGrouper grouper = + new TopicPartitionKeyRecordGrouper( + filenameTemplate, null, TimestampSource.of(TimestampSource.Type.EVENT)); + + grouper.put(T2P1R0); + grouper.put(T2P1R1); + grouper.put(T2P1R2); + grouper.put(T2P1R3); + + final Map> records = grouper.records(); + + assertThat(records) + .containsOnly( + entry("topic2-1-null-2000-" + expectedTs0, list(T2P1R0)), + entry("topic2-1-null-2000-" + expectedTs1, list(T2P1R1)), + entry("topic2-1-null-2000-" + expectedTs2, list(T2P1R2, T2P1R3)) + ); + } +} From c21025736f002eb7df4bfd011e0dcb4bfc7319fe Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 4 Oct 2023 12:09:03 +0300 Subject: [PATCH 4/4] fixup! feat: add topic-partition-key record grouper --- .../kafka/connect/common/grouper/RecordGrouperFactoryTest.java | 2 -- .../grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java | 2 +- .../common/grouper/TopicPartitionKeyRecordGrouperTest.java | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java index 66ca427e1..a4e9c7062 100644 --- a/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java @@ -28,7 +28,6 @@ final class RecordGrouperFactoryTest { void topicPartition() { final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{start_offset}}"); final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); - System.out.println(grType); assertThat(RecordGrouperFactory.TOPIC_PARTITION_RECORD).isEqualTo(grType); } @@ -36,7 +35,6 @@ void topicPartition() { void topicPartitionAndKey() { final Template filenameTemplate = Template.of("{{topic}}/{{partition}}/{{key}}/{{start_offset}}"); final String grType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); - System.out.println(grType); assertThat(RecordGrouperFactory.TOPIC_PARTITION_KEY_RECORD).isEqualTo(grType); } diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java index 1761f31aa..fab09dbd8 100644 --- a/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Aiven Oy + * Copyright 2023 Aiven Oy * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java b/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java index 3393882c5..f0eef9318 100644 --- a/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Aiven Oy + * Copyright 2023 Aiven Oy * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.