Skip to content

Commit

Permalink
check dsl suppliers
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Dec 10, 2024
1 parent 31fd7e7 commit 19ebdd9
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,11 @@ private static Properties propsWithOverrides(
propsWithOverrides.putAll(configs.originals());
propsWithOverrides.putAll(internalConfBuilder.build());

if (ConfigUtils.storageBackend(configs) == StorageBackend.NONE) {
final var backend = ConfigUtils.storageBackend(configs);

ResponsiveStreamsConfig.validateStreamsConfig(propsWithOverrides, backend);

if (backend == StorageBackend.NONE) {
return propsWithOverrides;
}

Expand Down Expand Up @@ -372,12 +376,6 @@ private static Properties propsWithOverrides(
ResponsiveDslStoreSuppliers.class.getName()
);

try {
ResponsiveStreamsConfig.validateStreamsConfig(propsWithOverrides);
} catch (final ConfigException e) {
throw new StreamsException("Configuration error, please check your properties", e);
}

return propsWithOverrides;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

import static dev.responsive.kafka.api.config.ResponsiveConfig.NUM_STANDBYS_OVERRIDE;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,10 +38,33 @@ public static ResponsiveStreamsConfig streamsConfig(final Map<?, ?> props) {
return new ResponsiveStreamsConfig(props, false);
}

public static void validateStreamsConfig(final Map<?, ?> props) {
public static void validateStreamsConfig(final Map<?, ?> props, final StorageBackend backend) {
final StreamsConfig streamsConfig = streamsConfig(props);
verifyNoStandbys(streamsConfig);
verifyNotEosV1(streamsConfig);

if (backend == StorageBackend.NONE) {
verifyNoResponsiveDslStoreSuppliers(streamsConfig);
} else {
verifyNoStandbys(streamsConfig);
verifyNotEosV1(streamsConfig);
}
}

static void verifyNoResponsiveDslStoreSuppliers(final StreamsConfig config)
throws ConfigException {
final Class<?> dslStoreSuppliers =
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG);
if (dslStoreSuppliers.getName().equals(ResponsiveDslStoreSuppliers.class.getName())) {
final String errorMsg = String.format(
"Invalid Streams configuration value for '%s': got %s, "
+ "incompatible with setting '%s' to %s",
StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
dslStoreSuppliers,
ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG,
StorageBackend.NONE
);
LOG.error(errorMsg);
throw new ConfigException(errorMsg);
}
}

static void verifyNoStandbys(final StreamsConfig config) throws ConfigException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -34,6 +35,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
import dev.responsive.kafka.internal.license.exception.LicenseAuthenticationException;
Expand Down Expand Up @@ -154,8 +156,8 @@ public void shouldInvalidateBadConfigs() {
builder.stream("foo").to("bar");

// Then:
final StreamsException e = assertThrows(
StreamsException.class,
final ConfigException e = assertThrows(
ConfigException.class,
() -> new IntegrationTestUtils.MockResponsiveKafkaStreams(
builder.build(),
properties,
Expand All @@ -164,7 +166,7 @@ public void shouldInvalidateBadConfigs() {
)
);
assertThat(
e.getCause().getMessage(),
e.getMessage(),
Matchers.containsString("Invalid Streams configuration value for 'num.standby.replicas'")
);
}
Expand All @@ -186,6 +188,34 @@ public void shouldCreateResponsiveKafkaStreamsInNonResponsiveStorageModeWithUnve
ks.close();
}

@Test
public void shouldThrowWhenUsingResponsiveDslStoreSuppliersInNonResponsiveStorageMode() {
// Given:
properties.put(STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.NONE.name());
properties.put(DSL_STORE_SUPPLIERS_CLASS_CONFIG, ResponsiveDslStoreSuppliers.class.getName());

// When:
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("foo").to("bar");

// then:
final ConfigException e = assertThrows(
ConfigException.class,
() -> {
final var ks = new ResponsiveKafkaStreams(builder.build(), properties, supplier);
ks.close();
}
);
assertThat(
e.getMessage(),
Matchers.containsString(
"Invalid Streams configuration value for 'dsl.store.suppliers.class': "
+ "got class dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers,"
+ " incompatible with setting 'responsive.storage.backend.type' to NONE"
)
);
}

@Test
public void shouldAcceptLicenseInLicenseFile() {
// given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORE_FLUSH_INTERVAL_TRIGGER_MS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait;
Expand Down Expand Up @@ -54,7 +53,6 @@
import com.google.common.base.Throwables;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.testutils.ResponsiveConfigParam;
import dev.responsive.kafka.testutils.ResponsiveExtension;
Expand Down Expand Up @@ -95,7 +93,6 @@
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down

0 comments on commit 19ebdd9

Please sign in to comment.