Kafka-connect-runtime: remove code duplications in integration tests #11883

Merged
IntegrationDynamicTableTest.java
@@ -20,44 +20,22 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationDynamicTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "tbl1";
private static final String TEST_TABLE2 = "tbl2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
@@ -81,55 +59,27 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload");
}

@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1);
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2);
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
@Override
void dropTables() {
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
}
}
IntegrationMultiTableTest.java
@@ -20,44 +20,22 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationMultiTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "foobar1";
private static final String TEST_TABLE2 = "foobar2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
Expand All @@ -81,60 +59,31 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

// use a schema for one of the cases
if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2");
}

@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?");
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
@Override
void dropTables() {
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
}
}
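
Both diffs replace a copied runTest()/before()/after() block with overrides of hooks that the shared IntegrationTestBase now provides. The base class itself is not part of this excerpt, so the sketch below is only an inferred outline of those hooks: the hook names (createCommonConfig, createConfig, sendEvents, dropTables, runTest) and the runTest parameters are taken from the overrides above, while the abstract helper methods, the exact orchestration, and the assertion flow are assumptions reconstructed from the removed code, not the PR's actual implementation.

// Sketch only: an inferred outline of the shared base class, not the code merged in this PR.
// KafkaConnectUtils and IcebergSinkConnector come from the same test package
// (org.apache.iceberg.connect); the abstract helpers below stand in for the existing test
// harness and their names/signatures are assumptions.
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;

public abstract class IntegrationTestBaseSketch {

  // existing harness pieces this sketch relies on (assumed names and signatures)
  protected abstract String connectorName();

  protected abstract String testTopic();

  protected abstract Catalog catalog();

  protected abstract void startConnector(KafkaConnectUtils.Config connectorConfig);

  protected abstract void flush();

  // connector settings that both test classes previously duplicated in runTest()
  protected KafkaConnectUtils.Config createCommonConfig(boolean useSchema) {
    // set offset reset to earliest so we don't miss any test messages
    return new KafkaConnectUtils.Config(connectorName())
        .config("topics", testTopic())
        .config("connector.class", IcebergSinkConnector.class.getName())
        .config("tasks.max", 2)
        .config("consumer.override.auto.offset.reset", "earliest")
        .config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
        .config("key.converter.schemas.enable", false)
        .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
        .config("value.converter.schemas.enable", useSchema)
        .config("iceberg.control.commit.interval-ms", 1000)
        .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
        .config("iceberg.kafka.auto.offset.reset", "earliest");
  }

  // per-test hooks; each integration test supplies its table routing and test events
  protected abstract KafkaConnectUtils.Config createConfig(boolean useSchema);

  protected abstract void sendEvents(boolean useSchema);

  abstract void dropTables();

  // common flow: configure, start the connector, publish events, then wait until every
  // target table has exactly one snapshot
  protected void runTest(
      String branch,
      boolean useSchema,
      Map<String, String> extraConfig,
      List<TableIdentifier> tableIdentifiers) {
    KafkaConnectUtils.Config connectorConfig = createConfig(useSchema);
    extraConfig.forEach(connectorConfig::config);
    if (branch != null) {
      connectorConfig.config("iceberg.tables.default-commit-branch", branch);
    }
    startConnector(connectorConfig);
    sendEvents(useSchema);
    flush();
    Awaitility.await()
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() -> assertSnapshotAdded(tableIdentifiers));
  }

  private void assertSnapshotAdded(List<TableIdentifier> tableIdentifiers) {
    for (TableIdentifier tableIdentifier : tableIdentifiers) {
      assertThat(catalog().loadTable(tableIdentifier).snapshots()).hasSize(1);
    }
  }
}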