From d82e94f03e27f587abf9329858eb2abc5a0d34ca Mon Sep 17 00:00:00 2001
From: Vova Kolmakov
Date: Sat, 28 Dec 2024 15:34:36 +0700
Subject: [PATCH] Kafka-connect-runtime: remove code duplications in
 integration tests
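
Move the duplicated @BeforeEach/@AfterEach logic, the connector config
boilerplate, and the send/await/assert loop from the three integration
tests into IntegrationTestBase. The base class now drives each test run
(start the connector, send the events, wait for snapshots to commit),
while each test supplies only its own connector settings and test
events through two hooks:

  abstract KafkaConnectUtils.Config createConfig(boolean useSchema);

  abstract void sendEvents(boolean useSchema);

Shared connector properties move to createCommonConfig(), and runTest()
now also takes the extra config map and the identifiers of the tables
whose committed snapshots should be verified.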
---
 .../connect/IntegrationDynamicTableTest.java |  84 ++-----
 .../connect/IntegrationMultiTableTest.java   |  93 +++-----
 .../iceberg/connect/IntegrationTest.java     | 115 +++++-------
 .../iceberg/connect/IntegrationTestBase.java |  79 +++++++++-
 4 files changed, 133 insertions(+), 238 deletions(-)

diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java
index 5c458ad3fa78..6d1d213ecd82 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java
@@ -20,55 +20,27 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 public class IntegrationDynamicTableTest extends IntegrationTestBase {
 
-  private static final String TEST_DB = "test";
-  private static final String TEST_TABLE1 = "tbl1";
-  private static final String TEST_TABLE2 = "tbl2";
-  private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
-  private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
-
-  @BeforeEach
-  public void before() {
-    createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
-    ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
-  }
-
-  @AfterEach
-  public void after() {
-    context().stopConnector(connectorName());
-    deleteTopic(testTopic());
-    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
-    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
-    ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
-  }
-
   @ParameterizedTest
   @NullSource
   @ValueSource(strings = "test_branch")
   public void testIcebergSink(String branch) {
     // partitioned table
     catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
-    // unpartitioned table
+    // non-partitioned table
    catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);
 
     boolean useSchema = branch == null; // use a schema for one of the tests
-    runTest(branch, useSchema);
+    runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));
 
     List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     assertThat(files).hasSize(1);
@@ -81,36 +53,15 @@ public void testIcebergSink(String branch) {
     assertSnapshotProps(TABLE_IDENTIFIER2, branch);
   }
 
-  private void runTest(String branch, boolean useSchema) {
-    // set offset reset to earliest so we don't miss any test messages
-    KafkaConnectUtils.Config connectorConfig =
-        new KafkaConnectUtils.Config(connectorName())
-            .config("topics", testTopic())
-            .config("connector.class", IcebergSinkConnector.class.getName())
-            .config("tasks.max", 2)
-            .config("consumer.override.auto.offset.reset", "earliest")
-            .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("key.converter.schemas.enable", false)
-            .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("value.converter.schemas.enable", useSchema)
-            .config("iceberg.tables.dynamic-enabled", true)
-            .config("iceberg.tables.route-field", "payload")
-            .config("iceberg.control.commit.interval-ms", 1000)
-            .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
-            .config("iceberg.kafka.auto.offset.reset", "earliest");
-
-    context().connectorCatalogProperties().forEach(connectorConfig::config);
-
-    if (branch != null) {
-      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
-    }
-
-    if (!useSchema) {
-      connectorConfig.config("value.converter.schemas.enable", false);
-    }
-
-    context().startConnector(connectorConfig);
-
+  @Override
+  protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
+    return createCommonConfig(useSchema)
+        .config("iceberg.tables.dynamic-enabled", true)
+        .config("iceberg.tables.route-field", "payload");
+  }
+
+  @Override
+  protected void sendEvents(boolean useSchema) {
     TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1);
     TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2);
     TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3");
@@ -118,18 +69,5 @@ private void runTest(String branch, boolean useSchema) {
     send(testTopic(), event1, useSchema);
     send(testTopic(), event2, useSchema);
     send(testTopic(), event3, useSchema);
-    flush();
-
-    Awaitility.await()
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(this::assertSnapshotAdded);
-  }
-
-  private void assertSnapshotAdded() {
-    Table table = catalog().loadTable(TABLE_IDENTIFIER1);
-    assertThat(table.snapshots()).hasSize(1);
-    table = catalog().loadTable(TABLE_IDENTIFIER2);
-    assertThat(table.snapshots()).hasSize(1);
   }
 }
diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java
index 7cffbd8838b2..8e3a08cfac9f 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java
@@ -20,55 +20,27 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 public class IntegrationMultiTableTest extends IntegrationTestBase {
 
-  private static final String TEST_DB = "test";
-  private static final String TEST_TABLE1 = "foobar1";
-  private static final String TEST_TABLE2 = "foobar2";
-  private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
-  private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
-
-  @BeforeEach
-  public void before() {
-    createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
-    ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
-  }
-
-  @AfterEach
-  public void after() {
-    context().stopConnector(connectorName());
-    deleteTopic(testTopic());
-    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
-    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
-    ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
-  }
-
   @ParameterizedTest
   @NullSource
   @ValueSource(strings = "test_branch")
   public void testIcebergSink(String branch) {
     // partitioned table
     catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
-    // unpartitioned table
+    // non-partitioned table
     catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);
 
     boolean useSchema = branch == null; // use a schema for one of the tests
-    runTest(branch, useSchema);
+    runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));
 
     List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     assertThat(files).hasSize(1);
@@ -81,41 +53,19 @@ public void testIcebergSink(String branch) {
     assertSnapshotProps(TABLE_IDENTIFIER2, branch);
   }
 
-  private void runTest(String branch, boolean useSchema) {
-    // set offset reset to earliest so we don't miss any test messages
-    KafkaConnectUtils.Config connectorConfig =
-        new KafkaConnectUtils.Config(connectorName())
-            .config("topics", testTopic())
-            .config("connector.class", IcebergSinkConnector.class.getName())
-            .config("tasks.max", 2)
-            .config("consumer.override.auto.offset.reset", "earliest")
-            .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("key.converter.schemas.enable", false)
-            .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("value.converter.schemas.enable", useSchema)
-            .config(
-                "iceberg.tables",
-                String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
-            .config("iceberg.tables.route-field", "type")
-            .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
-            .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
-            .config("iceberg.control.commit.interval-ms", 1000)
-            .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
-            .config("iceberg.kafka.auto.offset.reset", "earliest");
-
-    context().connectorCatalogProperties().forEach(connectorConfig::config);
-
-    if (branch != null) {
-      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
-    }
-
-    // use a schema for one of the cases
-    if (!useSchema) {
-      connectorConfig.config("value.converter.schemas.enable", false);
-    }
-
-    context().startConnector(connectorConfig);
-
+  @Override
+  protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
+    return createCommonConfig(useSchema)
+        .config(
+            "iceberg.tables",
+            String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
+        .config("iceberg.tables.route-field", "type")
+        .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
+        .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2");
+  }
+
+  @Override
+  protected void sendEvents(boolean useSchema) {
     TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
     TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?");
     TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me");
@@ -123,18 +73,5 @@ private void runTest(String branch, boolean useSchema) {
     send(testTopic(), event1, useSchema);
     send(testTopic(), event2, useSchema);
     send(testTopic(), event3, useSchema);
-    flush();
-
-    Awaitility.await()
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(this::assertSnapshotAdded);
-  }
-
-  private void assertSnapshotAdded() {
-    Table table = catalog().loadTable(TABLE_IDENTIFIER1);
-    assertThat(table.snapshots()).hasSize(1);
-    table = catalog().loadTable(TABLE_IDENTIFIER2);
-    assertThat(table.snapshots()).hasSize(1);
   }
 }
diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java
index 80a74539311c..cc5cd210308b 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.connect;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -28,11 +27,6 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -41,64 +35,43 @@
 import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.StringType;
 import org.apache.iceberg.types.Types.TimestampType;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 public class IntegrationTest extends IntegrationTestBase {
 
-  private static final String TEST_DB = "test";
-  private static final String TEST_TABLE = "foobar";
-  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TEST_DB, TEST_TABLE);
-
-  @BeforeEach
-  public void before() {
-    createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
-    ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
-  }
-
-  @AfterEach
-  public void after() {
-    context().stopConnector(connectorName());
-    deleteTopic(testTopic());
-    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE));
-    ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
-  }
-
   @ParameterizedTest
   @NullSource
   @ValueSource(strings = "test_branch")
   public void testIcebergSinkPartitionedTable(String branch) {
-    catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
+    catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
 
     boolean useSchema = branch == null; // use a schema for one of the tests
-    runTest(branch, useSchema, ImmutableMap.of());
+    runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1));
 
-    List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
+    List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     // partition may involve 1 or 2 workers
     assertThat(files).hasSizeBetween(1, 2);
     assertThat(files.get(0).recordCount()).isEqualTo(1);
     assertThat(files.get(1).recordCount()).isEqualTo(1);
-    assertSnapshotProps(TABLE_IDENTIFIER, branch);
+    assertSnapshotProps(TABLE_IDENTIFIER1, branch);
   }
 
   @ParameterizedTest
   @NullSource
   @ValueSource(strings = "test_branch")
-  public void testIcebergSinkUnpartitionedTable(String branch) {
-    catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA);
+  public void testIcebergSinkNonPartitionedTable(String branch) {
+    catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA);
 
     boolean useSchema = branch == null; // use a schema for one of the tests
-    runTest(branch, useSchema, ImmutableMap.of());
+    runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1));
 
-    List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
+    List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     // may involve 1 or 2 workers
     assertThat(files).hasSizeBetween(1, 2);
     assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2);
-    assertSnapshotProps(TABLE_IDENTIFIER, branch);
+    assertSnapshotProps(TABLE_IDENTIFIER1, branch);
   }
 
   @ParameterizedTest
@@ -110,16 +83,20 @@ public void testIcebergSinkSchemaEvolution(String branch) {
             ImmutableList.of(
                 Types.NestedField.required(1, "id", Types.IntegerType.get()),
                 Types.NestedField.required(2, "type", Types.StringType.get())));
-    catalog().createTable(TABLE_IDENTIFIER, initialSchema);
+    catalog().createTable(TABLE_IDENTIFIER1, initialSchema);
 
     boolean useSchema = branch == null; // use a schema for one of the tests
-    runTest(branch, useSchema, ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true"));
+    runTest(
+        branch,
+        useSchema,
+        ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true"),
+        List.of(TABLE_IDENTIFIER1));
 
-    List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
+    List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     // may involve 1 or 2 workers
     assertThat(files).hasSizeBetween(1, 2);
     assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2);
-    assertSnapshotProps(TABLE_IDENTIFIER, branch);
+    assertSnapshotProps(TABLE_IDENTIFIER1, branch);
 
     // when not using a value schema, the ID data type will not be updated
     Class<? extends Type> expectedIdType =
@@ -141,22 +118,22 @@ public void testIcebergSinkAutoCreate(String branch) {
       extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)");
     }
 
-    runTest(branch, useSchema, extraConfig);
+    runTest(branch, useSchema, extraConfig, List.of(TABLE_IDENTIFIER1));
 
-    List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
+    List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
     // may involve 1 or 2 workers
     assertThat(files).hasSizeBetween(1, 2);
     assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2);
-    assertSnapshotProps(TABLE_IDENTIFIER, branch);
+    assertSnapshotProps(TABLE_IDENTIFIER1, branch);
 
     assertGeneratedSchema(useSchema, LongType.class);
 
-    PartitionSpec spec = catalog().loadTable(TABLE_IDENTIFIER).spec();
+    PartitionSpec spec = catalog().loadTable(TABLE_IDENTIFIER1).spec();
     assertThat(spec.isPartitioned()).isEqualTo(useSchema);
   }
 
   private void assertGeneratedSchema(boolean useSchema, Class<? extends Type> expectedIdType) {
-    Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER).schema();
+    Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER1).schema();
     assertThat(tableSchema.findField("id").type()).isInstanceOf(expectedIdType);
     assertThat(tableSchema.findField("type").type()).isInstanceOf(StringType.class);
     assertThat(tableSchema.findField("payload").type()).isInstanceOf(StringType.class);
@@ -172,33 +149,14 @@ private void assertGeneratedSchema(boolean useSchema, Class<? extends Type> expe
     }
   }
 
-  private void runTest(String branch, boolean useSchema, Map<String, String> extraConfig) {
-    // set offset reset to earliest so we don't miss any test messages
-    KafkaConnectUtils.Config connectorConfig =
-        new KafkaConnectUtils.Config(connectorName())
-            .config("topics", testTopic())
-            .config("connector.class", IcebergSinkConnector.class.getName())
-            .config("tasks.max", 2)
-            .config("consumer.override.auto.offset.reset", "earliest")
-            .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("key.converter.schemas.enable", false)
-            .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .config("value.converter.schemas.enable", useSchema)
-            .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE))
-            .config("iceberg.control.commit.interval-ms", 1000)
-            .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
-            .config("iceberg.kafka.auto.offset.reset", "earliest");
-
-    context().connectorCatalogProperties().forEach(connectorConfig::config);
-
-    if (branch != null) {
-      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
-    }
-
-    extraConfig.forEach(connectorConfig::config);
-
-    context().startConnector(connectorConfig);
-
+  @Override
+  protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
+    return createCommonConfig(useSchema)
+        .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE1));
+  }
+
+  @Override
+  protected void sendEvents(boolean useSchema) {
     TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
 
     Instant threeDaysAgo = Instant.now().minus(Duration.ofDays(3));
@@ -206,20 +164,5 @@ private void runTest(String branch, boolean useSchema, Map<String, String> extra
 
     send(testTopic(), event1, useSchema);
     send(testTopic(), event2, useSchema);
-    flush();
-
-    Awaitility.await()
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(this::assertSnapshotAdded);
-  }
-
-  private void assertSnapshotAdded() {
-    try {
-      Table table = catalog().loadTable(TABLE_IDENTIFIER);
-      assertThat(table.snapshots()).hasSize(1);
-    } catch (NoSuchTableException e) {
-      fail("Table should exist");
-    }
   }
 }
diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
index 247211edb01f..02d5c6a807ea 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java
@@ -19,7 +19,9 @@
 package org.apache.iceberg.connect;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,7 +33,10 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.kafka.clients.admin.Admin;
@@ -39,11 +44,12 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.assertj.core.api.Condition;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 
-public class IntegrationTestBase {
+public abstract class IntegrationTestBase {
 
   private static TestContext context;
 
@@ -55,6 +61,17 @@ public class IntegrationTestBase {
   private KafkaProducer<String, String> producer;
 
   protected static final int TEST_TOPIC_PARTITIONS = 2;
+  protected static final String TEST_DB = "test";
+  protected static final String TEST_TABLE1 = "foobar1";
+  protected static final String TEST_TABLE2 = "foobar2";
+  protected static final TableIdentifier TABLE_IDENTIFIER1 =
+      TableIdentifier.of(TEST_DB, TEST_TABLE1);
+  protected static final TableIdentifier TABLE_IDENTIFIER2 =
+      TableIdentifier.of(TEST_DB, TEST_TABLE2);
+
+  abstract KafkaConnectUtils.Config createConfig(boolean useSchema);
+
+  abstract void sendEvents(boolean useSchema);
 
   protected TestContext context() {
     return context;
@@ -84,10 +101,17 @@ public void baseBefore() {
     this.admin = context.initLocalAdmin();
     this.connectorName = "test_connector-" + UUID.randomUUID();
     this.testTopic = "test-topic-" + UUID.randomUUID();
+    createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
+    ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
   }
 
   @AfterEach
   public void baseAfter() {
+    context().stopConnector(connectorName());
+    deleteTopic(testTopic());
+    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
+    catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
+    ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
     try {
       if (catalog instanceof AutoCloseable) {
         ((AutoCloseable) catalog).close();
@@ -158,4 +182,57 @@ protected void send(String topicName, TestEvent event, boolean useSchema) {
   protected void flush() {
     producer.flush();
   }
+
+  protected KafkaConnectUtils.Config createCommonConfig(boolean useSchema) {
+    // set offset reset to the earliest, so we don't miss any test messages
+    return new KafkaConnectUtils.Config(connectorName())
+        .config("topics", testTopic())
+        .config("connector.class", IcebergSinkConnector.class.getName())
+        .config("tasks.max", 2)
+        .config("consumer.override.auto.offset.reset", "earliest")
+        .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
+        .config("key.converter.schemas.enable", false)
+        .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
+        .config("value.converter.schemas.enable", useSchema)
+        .config("iceberg.control.commit.interval-ms", 1000)
+        .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
+        .config("iceberg.kafka.auto.offset.reset", "earliest");
+  }
+
+  protected void runTest(
+      String branch,
+      boolean useSchema,
+      Map<String, String> extraConfig,
+      List<TableIdentifier> tableIdentifiers) {
+    KafkaConnectUtils.Config connectorConfig = createConfig(useSchema);
+
+    context().connectorCatalogProperties().forEach(connectorConfig::config);
+
+    if (branch != null) {
+      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
+    }
+
+    extraConfig.forEach(connectorConfig::config);
+
+    context().startConnector(connectorConfig);
+
+    sendEvents(useSchema);
+    flush();
+
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
+        .untilAsserted(() -> assertSnapshotAdded(tableIdentifiers));
+  }
+
+  protected void assertSnapshotAdded(List<TableIdentifier> tableIdentifiers) {
+    for (TableIdentifier tableId : tableIdentifiers) {
+      try {
+        Table table = catalog().loadTable(tableId);
+        assertThat(table.snapshots()).hasSize(1);
+      } catch (NoSuchTableException e) {
+        fail("Table should exist");
+      }
+    }
+  }
 }