Skip to content

Commit

Permalink
DBZ-8363 temporarily remove cluster name config
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed Nov 4, 2024
1 parent 584d390 commit f4d556b
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Function;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
Expand Down Expand Up @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) {
public Function<Object, Object> baseTypeForReversedType() {
return abstractType -> ((AbstractType<?>) abstractType).isReversed() ? ((ReversedType<?>) abstractType).baseType : abstractType;
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;

Expand All @@ -23,6 +24,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext
return new Cassandra3CommitLogProcessing(context, metrics);
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}

private static class Cassandra3CommitLogProcessing implements CommitLogProcessing {

private final CommitLogReadHandler commitLogReadHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Function;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
Expand Down Expand Up @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) {
public Function<Object, Object> baseTypeForReversedType() {
return abstractType -> ((AbstractType<?>) abstractType).isReversed() ? ((ReversedType<?>) abstractType).baseType : abstractType;
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;

Expand Down Expand Up @@ -46,6 +47,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext
return new Cassandra4CommitLogProcessing(context, metrics);
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}

private static class Cassandra4CommitLogProcessing implements CommitLogProcessing {

private final CommitLogReadHandler commitLogReadHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Function;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
Expand Down Expand Up @@ -109,4 +110,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) {
public Function<Object, Object> baseTypeForReversedType() {
return abstractType -> ((AbstractType<?>) abstractType).isReversed() ? ((ReversedType<?>) abstractType).baseType : abstractType;
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;

Expand Down Expand Up @@ -46,6 +47,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext
return new Cassandra5CommitLogProcessing(context, metrics);
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}

private static class Cassandra5CommitLogProcessing implements CommitLogProcessing {

private final CommitLogReadHandler commitLogReadHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,6 @@ public static EventOrderGuaranteeMode parse(String value) {
.withType(Type.STRING)
.withDescription("The absolute path of the YAML config file used by a Cassandra node.");

public static final Field CASSANDRA_CLUSTER_NAME = Field.create("cassandra.cluster.name")
.withType(Type.STRING)
.withDescription("Name of Cassandra cluster.")
.withDefault("Test Cluster");

public static final Field COMMIT_LOG_RELOCATION_DIR = Field.create("commit.log.relocation.dir")
.withType(Type.STRING)
.withValidation(Field::isRequired)
Expand Down Expand Up @@ -604,10 +599,6 @@ public String cassandraConfig() {
return this.getConfig().getString(CASSANDRA_CONFIG);
}

public String clusterName() {
return this.getConfig().getString(CASSANDRA_CLUSTER_NAME);
}

public String commitLogRelocationDir() {
return this.getConfig().getString(COMMIT_LOG_RELOCATION_DIR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,4 @@ public interface CassandraConnectorContext {
SchemaHolder getSchemaHolder();

Set<String> getErroneousCommitLogs();

String getClusterName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected ProcessorGroup initProcessorGroup(CassandraConnectorContext taskContex
processorGroup.addProcessor(processor);
}

processorGroup.addProcessor(new SnapshotProcessor(taskContext, taskContext.getClusterName()));
processorGroup.addProcessor(new SnapshotProcessor(taskContext, deserializerProvider.getClusterName()));
List<ChangeEventQueue<Event>> queues = taskContext.getQueues();
for (int i = 0; i < queues.size(); i++) {
processorGroup.addProcessor(new QueueProcessor(taskContext, i, recordEmitter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public interface CassandraTypeProvider {

Function<Object, Object> baseTypeForReversedType();

String getClusterName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,4 @@ public SchemaHolder getSchemaHolder() {
public Set<String> getErroneousCommitLogs() {
return erroneousCommitLogs;
}

@Override
public String getClusterName() {
return this.config.clusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Function;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType;
Expand Down Expand Up @@ -110,4 +111,9 @@ public Object deserialize(Object abstractType, ByteBuffer bb) {
public Function<Object, Object> baseTypeForReversedType() {
return abstractType -> ((AbstractType<?>) abstractType).isReversed() ? ((ReversedType<?>) abstractType).baseType : abstractType;
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;

Expand Down Expand Up @@ -53,6 +54,11 @@ public CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext
return new DseCommitLogProcessing(context, metrics);
}

@Override
public String getClusterName() {
return DatabaseDescriptor.getClusterName();
}

private static class DseCommitLogProcessing implements CommitLogProcessing {

private final CommitLogReadHandler commitLogReadHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void verifyEvents() throws Throwable {
if (event instanceof Record) {
Record record = (Record) event;
assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT);
assertEquals(record.getSource().cluster, context.getCassandraConnectorConfig().clusterName());
assertEquals(record.getSource().cluster, provider.getClusterName());
assertFalse(record.getSource().snapshot);
assertEquals(record.getSource().keyspaceTable.name(), keyspaceTable(TEST_TABLE_NAME));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void beforeTest() {
@Test
public void testSnapshotTable() throws Throwable {
context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
when(snapshotProcessor.isRunning()).thenReturn(true);

int tableSize = 5;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testSnapshotTable() throws Throwable {
@Test
public void testSnapshotSkipsNonCdcEnabledTable() throws Throwable {
context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
when(snapshotProcessor.isRunning()).thenReturn(true);

int tableSize = 5;
Expand All @@ -115,7 +115,7 @@ public void testSnapshotSkipsNonCdcEnabledTable() throws Throwable {
public void testSnapshotEmptyTable() throws Throwable {
context = provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
AtomicBoolean globalTaskState = new AtomicBoolean(true);
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
when(snapshotProcessor.isRunning()).thenReturn(true);

context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testSnapshotModeAlways() throws Throwable {

context = provider.provideContext(Configuration.from(configs));

SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
doNothing().when(snapshotProcessorSpy).snapshot();

for (int i = 0; i < 5; i++) {
Expand All @@ -161,7 +161,7 @@ public void testSnapshotModeInitial() throws Throwable {
configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "initial");
configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
context = provider.provideContext(Configuration.from(configs));
SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
doNothing().when(snapshotProcessorSpy).snapshot();

for (int i = 0; i < 5; i++) {
Expand All @@ -176,7 +176,7 @@ public void testSnapshotModeNever() throws Throwable {
configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "never");
configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
context = provider.provideContext(Configuration.from(configs));
SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, context.getClusterName()));
SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context, provider.getClusterName()));
doNothing().when(snapshotProcessorSpy).snapshot();

for (int i = 0; i < 5; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface CassandraTestProvider {
CassandraConnectorContext provideContextWithoutSchemaManagement(Configuration configuration);

CommitLogProcessing provideCommitLogProcessing(CassandraConnectorContext context, CommitLogProcessorMetrics metrics);

String getClusterName();
}

0 comments on commit f4d556b

Please sign in to comment.