Skip to content

Commit

Permalink
Kafka-connect-runtime: remove code duplication in integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Vova Kolmakov committed Dec 28, 2024
1 parent de54a08 commit d82e94f
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,27 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationDynamicTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "tbl1";
private static final String TEST_TABLE2 = "tbl2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

/** Creates the test topic and the {@code TEST_DB} namespace before each test. */
@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
// the cast assumes the catalog under test implements SupportsNamespaces
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

/**
 * Tears down what a test created: stops the connector, deletes the test topic, drops both test
 * tables and finally drops the {@code TEST_DB} namespace.
 */
@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
// same identifiers as TABLE_IDENTIFIER1 / TABLE_IDENTIFIER2
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
// tables are dropped first, then the namespace that held them
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
public void testIcebergSink(String branch) {
// partitioned table
catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
// unpartitioned table
// non-partitioned table
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
Expand All @@ -81,55 +53,21 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
/**
 * Builds the connector configuration for the dynamic-table test: the common configuration plus
 * dynamic table routing enabled, with {@code payload} as the route field.
 *
 * @param useSchema forwarded unchanged to {@code createCommonConfig}
 * @return the configuration returned by {@code createCommonConfig} with the two routing
 *     settings added
 */
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "payload");
}

/**
 * Sends three test events to the test topic and calls {@code flush()}.
 *
 * <p>The fourth constructor argument of each event is a {@code db.table} name: the first two
 * name {@code TEST_TABLE1} and {@code TEST_TABLE2}, the third names {@code tbl3}.
 * NOTE(review): presumably this argument is the {@code payload} route field and {@code tbl3} is
 * deliberately a table the test never creates; confirm against {@code TestEvent}.
 *
 * @param useSchema passed through to every {@code send} call
 */
@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1);
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2);
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

// poll once per second, for at most 30 seconds, until assertSnapshotAdded passes
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

/** Asserts that each of the two test tables, freshly loaded from the catalog, has exactly one snapshot. */
private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
// the local variable is reused for the second table
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,27 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

public class IntegrationMultiTableTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE1 = "foobar1";
private static final String TEST_TABLE2 = "foobar2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);

/** Creates the test topic and the {@code TEST_DB} namespace before each test. */
@BeforeEach
public void before() {
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
// the cast assumes the catalog under test implements SupportsNamespaces
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
}

/**
 * Tears down what a test created: stops the connector, deletes the test topic, drops both test
 * tables and finally drops the {@code TEST_DB} namespace.
 */
@AfterEach
public void after() {
context().stopConnector(connectorName());
deleteTopic(testTopic());
// same identifiers as TABLE_IDENTIFIER1 / TABLE_IDENTIFIER2
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
// tables are dropped first, then the namespace that held them
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
@ValueSource(strings = "test_branch")
public void testIcebergSink(String branch) {
// partitioned table
catalog().createTable(TABLE_IDENTIFIER1, TestEvent.TEST_SCHEMA, TestEvent.TEST_SPEC);
// unpartitioned table
// non-partitioned table
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));

List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
assertThat(files).hasSize(1);
Expand All @@ -81,60 +53,25 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

private void runTest(String branch, boolean useSchema) {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectUtils.Config connectorConfig =
new KafkaConnectUtils.Config(connectorName())
.config("topics", testTopic())
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");

context().connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

// use a schema for one of the cases
if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context().startConnector(connectorConfig);
/**
 * Builds the connector configuration for the multi-table test: the common configuration plus a
 * static list of the two test tables, {@code type} as the route field, and one route regex per
 * table ({@code type1} for the first table, {@code type2} for the second).
 *
 * @param useSchema forwarded unchanged to {@code createCommonConfig}
 * @return the configuration returned by {@code createCommonConfig} with the table list and
 *     routing settings added
 */
@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
.config(
"iceberg.tables",
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
.config("iceberg.tables.route-field", "type")
// per-table regex keys have the form iceberg.table.<db>.<table>.route-regex
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2");
}

/**
 * Sends three test events to the test topic and calls {@code flush()}.
 *
 * <p>The events carry the types {@code type1}, {@code type2} and {@code type3}. Only the first
 * two equal a route regex set in {@code createConfig}.
 * NOTE(review): presumably the {@code type3} event ("ignore me") is expected to be routed to
 * neither table; confirm against the sink's routing rules.
 *
 * @param useSchema passed through to every {@code send} call
 */
@Override
protected void sendEvents(boolean useSchema) {
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?");
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me");

send(testTopic(), event1, useSchema);
send(testTopic(), event2, useSchema);
send(testTopic(), event3, useSchema);
flush();

// poll once per second, for at most 30 seconds, until assertSnapshotAdded passes
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);
}

/** Asserts that each of the two test tables, freshly loaded from the catalog, has exactly one snapshot. */
private void assertSnapshotAdded() {
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
assertThat(table.snapshots()).hasSize(1);
// the local variable is reused for the second table
table = catalog().loadTable(TABLE_IDENTIFIER2);
assertThat(table.snapshots()).hasSize(1);
}
}
Loading

0 comments on commit d82e94f

Please sign in to comment.