Kafka-connect-runtime: remove code duplications in integration tests (#11883)

Co-authored-by: Vova Kolmakov <[email protected]>
wombatu-kun and Vova Kolmakov authored Jan 4, 2025
1 parent dbfefb0 commit fcd5dd9
Showing 4 changed files with 123 additions and 200 deletions.
IntegrationDynamicTableTest.java
@@ -20,44 +20,22 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationDynamicTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "tbl1";
private static final String TEST_TABLE2 = "tbl2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
@@ -81,55 +59,27 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload");
}

@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1);
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2);
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
@Override
void dropTables() {
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
}
}
IntegrationMultiTableTest.java
@@ -20,44 +20,22 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationMultiTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "foobar1";
private static final String TEST_TABLE2 = "foobar2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
@@ -81,60 +59,31 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

// use a schema for one of the cases
if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2");
}

@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?");
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
@Override
void dropTables() {
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
}
}
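
The base-class side of this refactor (IntegrationTestBase and the other changed files) is collapsed in this view. What follows is a hypothetical reconstruction of the shared plumbing, pieced together from the duplicated code deleted above: the runTest(branch, useSchema, extraConfig, tableIdentifiers) call, the createCommonConfig/createConfig/sendEvents/dropTables names, and the config values all appear in the diff, while the modifiers, the Map/List parameter types, the placement of before()/after(), and the location of constants such as TEST_DB are educated guesses rather than the committed code.

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

abstract class IntegrationTestBase {

  // Pre-existing harness members (connectorName(), testTopic(), context(),
  // catalog(), flush(), createTopic(), deleteTopic(), TEST_DB,
  // TEST_TOPIC_PARTITIONS) are assumed to live here already, since the
  // removed code calls them; their definitions are omitted from this sketch.

  @BeforeEach
  public void before() {
    createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
    ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
  }

  @AfterEach
  public void after() {
    context().stopConnector(connectorName());
    deleteTopic(testTopic());
    dropTables(); // each subclass knows which tables it created
    ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
  }

  // Common connector settings formerly copy-pasted into each test's runTest().
  protected KafkaConnectUtils.Config createCommonConfig(boolean useSchema) {
    // set offset reset to earliest so we don't miss any test messages
    return new KafkaConnectUtils.Config(connectorName())
        .config("topics", testTopic())
        .config("connector.class", IcebergSinkConnector.class.getName())
        .config("tasks.max", 2)
        .config("consumer.override.auto.offset.reset", "earliest")
        .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
        .config("key.converter.schemas.enable", false)
        .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
        .config("value.converter.schemas.enable", useSchema)
        .config("iceberg.control.commit.interval-ms", 1000)
        .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
        .config("iceberg.kafka.auto.offset.reset", "earliest");
  }

  protected void runTest(
      String branch,
      boolean useSchema,
      Map<String, String> extraConfig,
      List<TableIdentifier> tableIdentifiers) {
    KafkaConnectUtils.Config connectorConfig = createConfig(useSchema);

    context().connectorCatalogProperties().forEach(connectorConfig::config);

    if (branch != null) {
      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
    }

    extraConfig.forEach(connectorConfig::config);

    context().startConnector(connectorConfig);

    sendEvents(useSchema);
    flush();

    // wait until every routed table has committed exactly one snapshot
    Awaitility.await()
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(
            () ->
                tableIdentifiers.forEach(
                    tableId ->
                        assertThat(catalog().loadTable(tableId).snapshots()).hasSize(1)));
  }

  // Hooks each concrete integration test overrides.
  protected abstract KafkaConnectUtils.Config createConfig(boolean useSchema);

  protected abstract void sendEvents(boolean useSchema);

  abstract void dropTables();
}

Under those assumptions, the shape is a straightforward template method: runTest() owns the connector lifecycle and the Awaitility wait, and each test contributes only its routing config, its events, and its cleanup, which is exactly what the two overriding classes above now do.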