diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java index 5c458ad3fa78..1603b8927e62 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java @@ -20,44 +20,22 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.time.Duration; import java.time.Instant; import java.util.List; import org.apache.iceberg.DataFile; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; public class IntegrationDynamicTableTest extends IntegrationTestBase { - private static final String TEST_DB = "test"; private static final String TEST_TABLE1 = "tbl1"; private static final String TEST_TABLE2 = "tbl2"; private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); - @BeforeEach - public void before() { - createTopic(testTopic(), TEST_TOPIC_PARTITIONS); - ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); - } - - @AfterEach - public void after() { - context().stopConnector(connectorName()); - deleteTopic(testTopic()); - catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); - catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); - ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); - } - @ParameterizedTest @NullSource @ValueSource(strings = "test_branch") @@ -68,7 +46,7 @@ public void testIcebergSink(String branch) { catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); boolean useSchema = branch == null; // use a schema for one of the tests - runTest(branch, useSchema); + runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2)); List files = dataFiles(TABLE_IDENTIFIER1, branch); assertThat(files).hasSize(1); @@ -81,36 +59,15 @@ public void testIcebergSink(String branch) { assertSnapshotProps(TABLE_IDENTIFIER2, branch); } - private void runTest(String branch, boolean useSchema) { - // set offset reset to earliest so we don't miss any test messages - KafkaConnectUtils.Config connectorConfig = - new KafkaConnectUtils.Config(connectorName()) - .config("topics", testTopic()) - .config("connector.class", IcebergSinkConnector.class.getName()) - .config("tasks.max", 2) - .config("consumer.override.auto.offset.reset", "earliest") - .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("key.converter.schemas.enable", false) - .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("value.converter.schemas.enable", useSchema) - .config("iceberg.tables.dynamic-enabled", true) - .config("iceberg.tables.route-field", "payload") - .config("iceberg.control.commit.interval-ms", 1000) - .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) - .config("iceberg.kafka.auto.offset.reset", "earliest"); - - context().connectorCatalogProperties().forEach(connectorConfig::config); - - if (branch != null) { - connectorConfig.config("iceberg.tables.default-commit-branch", branch); - } - - if (!useSchema) { - connectorConfig.config("value.converter.schemas.enable", false); - } - - context().startConnector(connectorConfig); + @Override + protected KafkaConnectUtils.Config createConfig(boolean useSchema) { + return createCommonConfig(useSchema) + .config("iceberg.tables.dynamic-enabled", true) + .config("iceberg.tables.route-field", "payload"); + } + @Override + protected void sendEvents(boolean useSchema) { TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1); TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2); TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3"); @@ -118,18 +75,11 @@ private void runTest(String branch, boolean useSchema) { send(testTopic(), event1, useSchema); send(testTopic(), event2, useSchema); send(testTopic(), event3, useSchema); - flush(); - - Awaitility.await() - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(this::assertSnapshotAdded); } - private void assertSnapshotAdded() { - Table table = catalog().loadTable(TABLE_IDENTIFIER1); - assertThat(table.snapshots()).hasSize(1); - table = catalog().loadTable(TABLE_IDENTIFIER2); - assertThat(table.snapshots()).hasSize(1); + @Override + void dropTables() { + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); } } diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java index 7cffbd8838b2..d85514c3f92d 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java @@ -20,44 +20,22 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.time.Duration; import java.time.Instant; import java.util.List; import org.apache.iceberg.DataFile; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; public class IntegrationMultiTableTest extends IntegrationTestBase { - private static final String TEST_DB = "test"; private static final String TEST_TABLE1 = "foobar1"; private static final String TEST_TABLE2 = "foobar2"; private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1); private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2); - @BeforeEach - public void before() { - createTopic(testTopic(), TEST_TOPIC_PARTITIONS); - ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); - } - - @AfterEach - public void after() { - context().stopConnector(connectorName()); - deleteTopic(testTopic()); - catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); - catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); - ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); - } - @ParameterizedTest @NullSource @ValueSource(strings = "test_branch") @@ -68,7 +46,7 @@ public void testIcebergSink(String branch) { catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA); boolean useSchema = branch == null; // use a schema for one of the tests - runTest(branch, useSchema); + runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2)); List files = dataFiles(TABLE_IDENTIFIER1, branch); assertThat(files).hasSize(1); @@ -81,41 +59,19 @@ public void testIcebergSink(String branch) { assertSnapshotProps(TABLE_IDENTIFIER2, branch); } - private void runTest(String branch, boolean useSchema) { - // set offset reset to earliest so we don't miss any test messages - KafkaConnectUtils.Config connectorConfig = - new KafkaConnectUtils.Config(connectorName()) - .config("topics", testTopic()) - .config("connector.class", IcebergSinkConnector.class.getName()) - .config("tasks.max", 2) - .config("consumer.override.auto.offset.reset", "earliest") - .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("key.converter.schemas.enable", false) - .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("value.converter.schemas.enable", useSchema) - .config( - "iceberg.tables", - String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2)) - .config("iceberg.tables.route-field", "type") - .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1") - .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2") - .config("iceberg.control.commit.interval-ms", 1000) - .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) - .config("iceberg.kafka.auto.offset.reset", "earliest"); - - context().connectorCatalogProperties().forEach(connectorConfig::config); - - if (branch != null) { - connectorConfig.config("iceberg.tables.default-commit-branch", branch); - } - - // use a schema for one of the cases - if (!useSchema) { - connectorConfig.config("value.converter.schemas.enable", false); - } - - context().startConnector(connectorConfig); + @Override + protected KafkaConnectUtils.Config createConfig(boolean useSchema) { + return createCommonConfig(useSchema) + .config( + "iceberg.tables", + String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2)) + .config("iceberg.tables.route-field", "type") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1") + .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2"); + } + @Override + protected void sendEvents(boolean useSchema) { TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?"); TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me"); @@ -123,18 +79,11 @@ private void runTest(String branch, boolean useSchema) { send(testTopic(), event1, useSchema); send(testTopic(), event2, useSchema); send(testTopic(), event3, useSchema); - flush(); - - Awaitility.await() - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(this::assertSnapshotAdded); } - private void assertSnapshotAdded() { - Table table = catalog().loadTable(TABLE_IDENTIFIER1); - assertThat(table.snapshots()).hasSize(1); - table = catalog().loadTable(TABLE_IDENTIFIER2); - assertThat(table.snapshots()).hasSize(1); + @Override + void dropTables() { + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1)); + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2)); } } diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java index 80a74539311c..71a306e310bb 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTest.java @@ -19,7 +19,6 @@ package org.apache.iceberg.connect; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import java.time.Duration; import java.time.Instant; @@ -28,11 +27,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -41,33 +36,15 @@ import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.StringType; import org.apache.iceberg.types.Types.TimestampType; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; public class IntegrationTest extends IntegrationTestBase { - private static final String TEST_DB = "test"; private static final String TEST_TABLE = "foobar"; private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TEST_DB, TEST_TABLE); - @BeforeEach - public void before() { - createTopic(testTopic(), TEST_TOPIC_PARTITIONS); - ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); - } - - @AfterEach - public void after() { - context().stopConnector(connectorName()); - deleteTopic(testTopic()); - catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE)); - ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); - } - @ParameterizedTest @NullSource @ValueSource(strings = "test_branch") @@ -75,7 +52,7 @@ public void testIcebergSinkPartitionedTable(String branch) { catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC); boolean useSchema = branch == null; // use a schema for one of the tests - runTest(branch, useSchema, ImmutableMap.of()); + runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER)); List files = dataFiles(TABLE_IDENTIFIER, branch); // partition may involve 1 or 2 workers @@ -92,7 +69,7 @@ public void testIcebergSinkUnpartitionedTable(String branch) { catalog().createTable(TABLE_IDENTIFIER, TestEvent.TEST_SCHEMA); boolean useSchema = branch == null; // use a schema for one of the tests - runTest(branch, useSchema, ImmutableMap.of()); + runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER)); List files = dataFiles(TABLE_IDENTIFIER, branch); // may involve 1 or 2 workers @@ -113,7 +90,11 @@ public void testIcebergSinkSchemaEvolution(String branch) { catalog().createTable(TABLE_IDENTIFIER, initialSchema); boolean useSchema = branch == null; // use a schema for one of the tests - runTest(branch, useSchema, ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true")); + runTest( + branch, + useSchema, + ImmutableMap.of("iceberg.tables.evolve-schema-enabled", "true"), + List.of(TABLE_IDENTIFIER)); List files = dataFiles(TABLE_IDENTIFIER, branch); // may involve 1 or 2 workers @@ -141,7 +122,7 @@ public void testIcebergSinkAutoCreate(String branch) { extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)"); } - runTest(branch, useSchema, extraConfig); + runTest(branch, useSchema, extraConfig, List.of(TABLE_IDENTIFIER)); List files = dataFiles(TABLE_IDENTIFIER, branch); // may involve 1 or 2 workers @@ -172,33 +153,14 @@ private void assertGeneratedSchema(boolean useSchema, Class expe } } - private void runTest(String branch, boolean useSchema, Map extraConfig) { - // set offset reset to earliest so we don't miss any test messages - KafkaConnectUtils.Config connectorConfig = - new KafkaConnectUtils.Config(connectorName()) - .config("topics", testTopic()) - .config("connector.class", IcebergSinkConnector.class.getName()) - .config("tasks.max", 2) - .config("consumer.override.auto.offset.reset", "earliest") - .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("key.converter.schemas.enable", false) - .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") - .config("value.converter.schemas.enable", useSchema) - .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE)) - .config("iceberg.control.commit.interval-ms", 1000) - .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) - .config("iceberg.kafka.auto.offset.reset", "earliest"); - - context().connectorCatalogProperties().forEach(connectorConfig::config); - - if (branch != null) { - connectorConfig.config("iceberg.tables.default-commit-branch", branch); - } - - extraConfig.forEach(connectorConfig::config); - - context().startConnector(connectorConfig); + @Override + protected KafkaConnectUtils.Config createConfig(boolean useSchema) { + return createCommonConfig(useSchema) + .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE)); + } + @Override + protected void sendEvents(boolean useSchema) { TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!"); Instant threeDaysAgo = Instant.now().minus(Duration.ofDays(3)); @@ -206,20 +168,10 @@ private void runTest(String branch, boolean useSchema, Map extra send(testTopic(), event1, useSchema); send(testTopic(), event2, useSchema); - flush(); - - Awaitility.await() - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(this::assertSnapshotAdded); } - private void assertSnapshotAdded() { - try { - Table table = catalog().loadTable(TABLE_IDENTIFIER); - assertThat(table.snapshots()).hasSize(1); - } catch (NoSuchTableException e) { - fail("Table should exist"); - } + @Override + void dropTables() { + catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE)); } } diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java index 247211edb01f..532cbde6d315 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationTestBase.java @@ -19,7 +19,9 @@ package org.apache.iceberg.connect; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.UUID; @@ -31,7 +33,10 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.kafka.clients.admin.Admin; @@ -39,11 +44,12 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.assertj.core.api.Condition; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -public class IntegrationTestBase { +public abstract class IntegrationTestBase { private static TestContext context; @@ -55,6 +61,13 @@ public class IntegrationTestBase { private KafkaProducer producer; protected static final int TEST_TOPIC_PARTITIONS = 2; + protected static final String TEST_DB = "test"; + + abstract KafkaConnectUtils.Config createConfig(boolean useSchema); + + abstract void sendEvents(boolean useSchema); + + abstract void dropTables(); protected TestContext context() { return context; @@ -84,10 +97,16 @@ public void baseBefore() { this.admin = context.initLocalAdmin(); this.connectorName = "test_connector-" + UUID.randomUUID(); this.testTopic = "test-topic-" + UUID.randomUUID(); + createTopic(testTopic(), TEST_TOPIC_PARTITIONS); + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB)); } @AfterEach public void baseAfter() { + context().stopConnector(connectorName()); + deleteTopic(testTopic()); + dropTables(); + ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB)); try { if (catalog instanceof AutoCloseable) { ((AutoCloseable) catalog).close(); @@ -158,4 +177,57 @@ protected void send(String topicName, TestEvent event, boolean useSchema) { protected void flush() { producer.flush(); } + + protected KafkaConnectUtils.Config createCommonConfig(boolean useSchema) { + // set offset reset to the earliest, so we don't miss any test messages + return new KafkaConnectUtils.Config(connectorName()) + .config("topics", testTopic()) + .config("connector.class", IcebergSinkConnector.class.getName()) + .config("tasks.max", 2) + .config("consumer.override.auto.offset.reset", "earliest") + .config("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("key.converter.schemas.enable", false) + .config("value.converter", "org.apache.kafka.connect.json.JsonConverter") + .config("value.converter.schemas.enable", useSchema) + .config("iceberg.control.commit.interval-ms", 1000) + .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE) + .config("iceberg.kafka.auto.offset.reset", "earliest"); + } + + protected void runTest( + String branch, + boolean useSchema, + Map extraConfig, + List tableIdentifiers) { + KafkaConnectUtils.Config connectorConfig = createConfig(useSchema); + + context().connectorCatalogProperties().forEach(connectorConfig::config); + + if (branch != null) { + connectorConfig.config("iceberg.tables.default-commit-branch", branch); + } + + extraConfig.forEach(connectorConfig::config); + + context().startConnector(connectorConfig); + + sendEvents(useSchema); + flush(); + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> assertSnapshotAdded(tableIdentifiers)); + } + + protected void assertSnapshotAdded(List tableIdentifiers) { + for (TableIdentifier tableId : tableIdentifiers) { + try { + Table table = catalog().loadTable(tableId); + assertThat(table.snapshots()).hasSize(1); + } catch (NoSuchTableException e) { + fail("Table should exist"); + } + } + } }