diff --git a/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3TypeProvider.java b/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3TypeProvider.java index d631962..c95c4bc 100644 --- a/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3TypeProvider.java +++ b/cassandra-3/src/main/java/io/debezium/connector/cassandra/Cassandra3TypeProvider.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) { public Function baseTypeForReversedType() { return abstractType -> ((AbstractType) abstractType).isReversed() ? ((ReversedType) abstractType).baseType : abstractType; } + + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } } diff --git a/cassandra-3/src/test/java/io/debezium/connector/cassandra/Cassandra3TestProvider.java b/cassandra-3/src/test/java/io/debezium/connector/cassandra/Cassandra3TestProvider.java index 5188cbb..f714d16 100644 --- a/cassandra-3/src/test/java/io/debezium/connector/cassandra/Cassandra3TestProvider.java +++ b/cassandra-3/src/test/java/io/debezium/connector/cassandra/Cassandra3TestProvider.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.file.Paths; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogReadHandler; import org.apache.cassandra.db.commitlog.CommitLogReader; @@ -23,6 +24,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext return new Cassandra3CommitLogProcessing(context, metrics); } + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } + private static class Cassandra3CommitLogProcessing implements CommitLogProcessing { private final CommitLogReadHandler commitLogReadHandler; diff --git a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4TypeProvider.java b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4TypeProvider.java index b29b0af..0cdd07e 100644 --- a/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4TypeProvider.java +++ b/cassandra-4/src/main/java/io/debezium/connector/cassandra/Cassandra4TypeProvider.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) { public Function baseTypeForReversedType() { return abstractType -> ((AbstractType) abstractType).isReversed() ? ((ReversedType) abstractType).baseType : abstractType; } + + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } } diff --git a/cassandra-4/src/test/java/io/debezium/connector/cassandra/Cassandra4TestProvider.java b/cassandra-4/src/test/java/io/debezium/connector/cassandra/Cassandra4TestProvider.java index e0c99a2..8228b62 100644 --- a/cassandra-4/src/test/java/io/debezium/connector/cassandra/Cassandra4TestProvider.java +++ b/cassandra-4/src/test/java/io/debezium/connector/cassandra/Cassandra4TestProvider.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.file.Paths; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogReadHandler; import org.apache.cassandra.db.commitlog.CommitLogReader; @@ -46,6 +47,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext return new Cassandra4CommitLogProcessing(context, metrics); } + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } + private static class Cassandra4CommitLogProcessing implements CommitLogProcessing { private final CommitLogReadHandler commitLogReadHandler; diff --git a/cassandra-5/src/main/java/io/debezium/connector/cassandra/Cassandra5TypeProvider.java b/cassandra-5/src/main/java/io/debezium/connector/cassandra/Cassandra5TypeProvider.java index d5ed00f..1dc07a9 100644 --- a/cassandra-5/src/main/java/io/debezium/connector/cassandra/Cassandra5TypeProvider.java +++ b/cassandra-5/src/main/java/io/debezium/connector/cassandra/Cassandra5TypeProvider.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) { public Function baseTypeForReversedType() { return abstractType -> ((AbstractType) abstractType).isReversed() ? ((ReversedType) abstractType).baseType : abstractType; } + + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } } diff --git a/cassandra-5/src/test/java/io/debezium/connector/cassandra/Cassandra5TestProvider.java b/cassandra-5/src/test/java/io/debezium/connector/cassandra/Cassandra5TestProvider.java index 8e2ce1f..2414a62 100644 --- a/cassandra-5/src/test/java/io/debezium/connector/cassandra/Cassandra5TestProvider.java +++ b/cassandra-5/src/test/java/io/debezium/connector/cassandra/Cassandra5TestProvider.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.file.Paths; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogReadHandler; import org.apache.cassandra.db.commitlog.CommitLogReader; @@ -46,6 +47,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext return new Cassandra5CommitLogProcessing(context, metrics); } + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } + private static class Cassandra5CommitLogProcessing implements CommitLogProcessing { private final CommitLogReadHandler commitLogReadHandler; diff --git a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java index 67b9198..77bb3fd 100644 --- a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java +++ b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java @@ -347,11 +347,6 @@ public static EventOrderGuaranteeMode parse(String value) { .withType(Type.STRING) .withDescription("The absolute path of the YAML config file used by a Cassandra node."); - public static final Field CASSANDRA_CLUSTER_NAME = Field.create("cassandra.cluster.name") - .withType(Type.STRING) - .withDescription("Name of Cassandra cluster.") - .withDefault("Test Cluster"); - public static final Field COMMIT_LOG_RELOCATION_DIR = Field.create("commit.log.relocation.dir") .withType(Type.STRING) .withValidation(Field::isRequired) @@ -604,10 +599,6 @@ public String cassandraConfig() { return this.getConfig().getString(CASSANDRA_CONFIG); } - public String clusterName() { - return this.getConfig().getString(CASSANDRA_CLUSTER_NAME); - } - public String commitLogRelocationDir() { return this.getConfig().getString(COMMIT_LOG_RELOCATION_DIR); } diff --git a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java index 34e3fae..f9416f1 100644 --- a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java +++ b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java @@ -32,6 +32,4 @@ public interface CassandraConnectorContext { SchemaHolder getSchemaHolder(); Set getErroneousCommitLogs(); - - String getClusterName(); } diff --git a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTaskTemplate.java b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTaskTemplate.java index 3cca512..b0d70ce 100644 --- a/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTaskTemplate.java +++ b/core/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTaskTemplate.java @@ -154,7 +154,7 @@ protected ProcessorGroup initProcessorGroup(CassandraConnectorContext taskContex processorGroup.addProcessor(processor); } - processorGroup.addProcessor(new SnapshotProcessor(taskContext, taskContext.getClusterName())); + processorGroup.addProcessor(new SnapshotProcessor(taskContext, deserializerProvider.getClusterName())); List> queues = taskContext.getQueues(); for (int i = 0; i < queues.size(); i++) { processorGroup.addProcessor(new QueueProcessor(taskContext, i, recordEmitter)); diff --git a/core/src/main/java/io/debezium/connector/cassandra/CassandraTypeProvider.java b/core/src/main/java/io/debezium/connector/cassandra/CassandraTypeProvider.java index 3efa93c..87eb820 100644 --- a/core/src/main/java/io/debezium/connector/cassandra/CassandraTypeProvider.java +++ b/core/src/main/java/io/debezium/connector/cassandra/CassandraTypeProvider.java @@ -16,4 +16,5 @@ public interface CassandraTypeProvider { Function baseTypeForReversedType(); + String getClusterName(); } diff --git a/core/src/main/java/io/debezium/connector/cassandra/DefaultCassandraConnectorContext.java b/core/src/main/java/io/debezium/connector/cassandra/DefaultCassandraConnectorContext.java index 0d6dac2..ff0d490 100644 --- a/core/src/main/java/io/debezium/connector/cassandra/DefaultCassandraConnectorContext.java +++ b/core/src/main/java/io/debezium/connector/cassandra/DefaultCassandraConnectorContext.java @@ -114,9 +114,4 @@ public SchemaHolder getSchemaHolder() { public Set getErroneousCommitLogs() { return erroneousCommitLogs; } - - @Override - public String getClusterName() { - return this.config.clusterName(); - } } diff --git a/dse/src/main/java/io/debezium/connector/dse/DseTypeProvider.java b/dse/src/main/java/io/debezium/connector/dse/DseTypeProvider.java index 4c12a6b..8678d79 100644 --- a/dse/src/main/java/io/debezium/connector/dse/DseTypeProvider.java +++ b/dse/src/main/java/io/debezium/connector/dse/DseTypeProvider.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -110,4 +111,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) { public Function baseTypeForReversedType() { return abstractType -> ((AbstractType) abstractType).isReversed() ? ((ReversedType) abstractType).baseType : abstractType; } + + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } } diff --git a/dse/src/test/java/io/debezium/connector/dse/DseTestProvider.java b/dse/src/test/java/io/debezium/connector/dse/DseTestProvider.java index 940ac6b..de4db40 100644 --- a/dse/src/test/java/io/debezium/connector/dse/DseTestProvider.java +++ b/dse/src/test/java/io/debezium/connector/dse/DseTestProvider.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.file.Paths; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogReadHandler; import org.apache.cassandra.db.commitlog.CommitLogReader; @@ -53,6 +54,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext return new DseCommitLogProcessing(context, metrics); } + @Override + public String getClusterName() { + return DatabaseDescriptor.getClusterName(); + } + private static class DseCommitLogProcessing implements CommitLogProcessing { private final CommitLogReadHandler commitLogReadHandler; diff --git a/tests/src/test/java/io/debezium/connector/cassandra/RowInsertionModificationCommitLogProcessingTest.java b/tests/src/test/java/io/debezium/connector/cassandra/RowInsertionModificationCommitLogProcessingTest.java index 647d941..307dea4 100644 --- a/tests/src/test/java/io/debezium/connector/cassandra/RowInsertionModificationCommitLogProcessingTest.java +++ b/tests/src/test/java/io/debezium/connector/cassandra/RowInsertionModificationCommitLogProcessingTest.java @@ -33,7 +33,7 @@ public void verifyEvents() throws Throwable { if (event instanceof Record) { Record record = (Record) event; assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT); - assertEquals(record.getSource().cluster, context.getCassandraConnectorConfig().clusterName()); + assertEquals(record.getSource().cluster, provider.getClusterName()); assertFalse(record.getSource().snapshot); assertEquals(record.getSource().keyspaceTable.name(), keyspaceTable(TEST_TABLE_NAME)); } diff --git a/tests/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/tests/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index de0aacc..c73421e 100644 --- a/tests/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/tests/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -51,7 +51,7 @@ public void beforeTest() { @Test public void testSnapshotTable() throws Throwable { context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap())); - SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; @@ -93,7 +93,7 @@ public void testSnapshotTable() throws Throwable { @Test public void testSnapshotSkipsNonCdcEnabledTable() throws Throwable { context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap())); - SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; @@ -115,7 +115,7 @@ public void testSnapshotSkipsNonCdcEnabledTable() throws Throwable { public void testSnapshotEmptyTable() throws Throwable { context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap())); AtomicBoolean globalTaskState = new AtomicBoolean(true); - SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); when(snapshotProcessor.isRunning()).thenReturn(true); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); @@ -146,7 +146,7 @@ public void testSnapshotModeAlways() throws Throwable { context = provider.provideContext(Configuration.from(configs)); - SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); doNothing().when(snapshotProcessorSpy).snapshot(); for (int i = 0; i < 5; i++) { @@ -161,7 +161,7 @@ public void testSnapshotModeInitial() throws Throwable { configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "initial"); configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0"); context = provider.provideContext(Configuration.from(configs)); - SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); doNothing().when(snapshotProcessorSpy).snapshot(); for (int i = 0; i < 5; i++) { @@ -176,7 +176,7 @@ public void testSnapshotModeNever() throws Throwable { configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "never"); configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0"); context = provider.provideContext(Configuration.from(configs)); - SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName())); + SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName())); doNothing().when(snapshotProcessorSpy).snapshot(); for (int i = 0; i < 5; i++) { diff --git a/tests/src/test/java/io/debezium/connector/cassandra/spi/CassandraTestProvider.java b/tests/src/test/java/io/debezium/connector/cassandra/spi/CassandraTestProvider.java index 96e1913..7939c18 100644 --- a/tests/src/test/java/io/debezium/connector/cassandra/spi/CassandraTestProvider.java +++ b/tests/src/test/java/io/debezium/connector/cassandra/spi/CassandraTestProvider.java @@ -16,4 +16,6 @@ public interface CassandraTestProvider { CassandraConnectorContext provideContextWithoutSchemaManagement(Configuration configuration); CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext context, CommitLogProcessorMetrics metrics); + + String getClusterName(); }