diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index c908d7fba..09ffbf058 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -351,16 +351,16 @@ private static Properties propsWithOverrides( return propsWithOverrides; } - final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS); + final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); if (o == null) { propsWithOverrides.put( - InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TASK_ASSIGNOR_CLASS_OVERRIDE ); } else if (!TASK_ASSIGNOR_CLASS_OVERRIDE.equals(o.toString())) { final String errorMsg = String.format( "Invalid Streams configuration value for '%s': got %s, expected '%s'", - InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, o, TASK_ASSIGNOR_CLASS_OVERRIDE ); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 07bceaf43..3665b4636 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; +import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor; /** * Configurations for {@link ResponsiveKafkaStreams} diff --git a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java index fcf029543..7b7ee24a9 100644 --- a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java +++ b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java @@ -291,7 +291,8 @@ public void init(final ProcessorContext context) { public void process(final Record record) { global.put(record.key(), record.value()); } - } + }, + false ); final String baseDirectoryName = tempDir.getAbsolutePath(); diff --git a/responsive-spring/build.gradle.kts b/responsive-spring/build.gradle.kts index 8d25b1f15..71a1e0f87 100644 --- a/responsive-spring/build.gradle.kts +++ b/responsive-spring/build.gradle.kts @@ -44,7 +44,7 @@ version = project(":kafka-client").version dependencies { implementation(project(":kafka-client")) implementation("com.google.code.findbugs:jsr305:3.0.2") - implementation("org.springframework.kafka:spring-kafka:3.2.4") + implementation("org.springframework.kafka:spring-kafka:3.3.0") implementation("org.springframework.boot:spring-boot-starter:3.3.2") testImplementation(testlibs.bundles.base) diff --git a/responsive-spring/src/main/java/dev/responsive/spring/annotations/ResponsiveDefaultConfiguration.java b/responsive-spring/src/main/java/dev/responsive/spring/annotations/ResponsiveDefaultConfiguration.java index bb837416c..45efaa564 100644 --- a/responsive-spring/src/main/java/dev/responsive/spring/annotations/ResponsiveDefaultConfiguration.java +++ b/responsive-spring/src/main/java/dev/responsive/spring/annotations/ResponsiveDefaultConfiguration.java @@ -12,7 +12,7 @@ package dev.responsive.spring.annotations; -import dev.responsive.spring.config.ResponsiveFactoryBean; +import dev.responsive.spring.config.ResponsiveKafkaStreamsCustomizer; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.UnsatisfiedDependencyException; import org.springframework.beans.factory.annotation.Qualifier; @@ -35,8 +35,9 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder( ) { KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable(); if (streamsConfig != null) { - StreamsBuilderFactoryBean fb = new ResponsiveFactoryBean(streamsConfig); + StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig); configurerProvider.orderedStream().forEach(configurer -> configurer.configure(fb)); + fb.setKafkaStreamsCustomizer((ResponsiveKafkaStreamsCustomizer) kafkaStreams -> { }); return fb; } else { throw new UnsatisfiedDependencyException(KafkaStreamsDefaultConfiguration.class.getName(), diff --git a/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveFactoryBean.java b/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveFactoryBean.java deleted file mode 100644 index 50ff4789a..000000000 --- a/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveFactoryBean.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package dev.responsive.spring.config; - -import dev.responsive.kafka.api.ResponsiveKafkaStreams; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.logging.LogFactory; -import org.apache.kafka.streams.KafkaClientSupplier; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyConfig; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; -import org.apache.kafka.streams.processor.StateRestoreListener; -import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; -import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.KafkaException; -import org.springframework.kafka.config.KafkaStreamsConfiguration; -import org.springframework.kafka.config.KafkaStreamsCustomizer; -import org.springframework.kafka.config.KafkaStreamsInfrastructureCustomizer; -import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.core.CleanupConfig; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; - -public class ResponsiveFactoryBean extends StreamsBuilderFactoryBean { - - public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10); - - private static final LogAccessor LOGGER = - new LogAccessor(LogFactory.getLog(StreamsBuilderFactoryBean.class)); - - private final ReentrantLock lifecycleLock = new ReentrantLock(); - private final List listeners = new ArrayList<>(); - - private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); - private Properties properties; - private CleanupConfig cleanupConfig; - private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new InfraCustomizer(); - - private KafkaStreamsCustomizer kafkaStreamsCustomizer; - private KafkaStreams.StateListener stateListener; - private StateRestoreListener stateRestoreListener; - private StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler; - private boolean autoStartup = true; - private int phase = Integer.MAX_VALUE - 1000; // NOSONAR magic # - private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT; - private boolean leaveGroupOnClose = false; - private KafkaStreams kafkaStreams; - private volatile boolean running; - private Topology topology; - private String beanName; - - public ResponsiveFactoryBean() { - this.cleanupConfig = new CleanupConfig(); - } - - public ResponsiveFactoryBean(KafkaStreamsConfiguration streamsConfiguration) { - this(streamsConfiguration, new CleanupConfig()); - } - - public ResponsiveFactoryBean( - KafkaStreamsConfiguration streamsConfig, - CleanupConfig cleanupConfig - ) { - super(streamsConfig, cleanupConfig); - Assert.notNull(streamsConfig, "'streamsConfig' must not be null"); - Assert.notNull(cleanupConfig, "'cleanupConfig' must not be null"); - this.properties = streamsConfig.asProperties(); - this.cleanupConfig = cleanupConfig; - } - - @Override - public synchronized void setBeanName(final String name) { - this.beanName = name; - } - - @Override - public void setStreamsConfiguration(Properties streamsConfig) { - Assert.notNull(streamsConfig, "'streamsConfig' must not be null"); - this.properties = streamsConfig; - } - - @Override - @Nullable - public Properties getStreamsConfiguration() { - return properties; - } - - @Override - public void setClientSupplier(final KafkaClientSupplier clientSupplier) { - Assert.notNull(clientSupplier, "'clientSupplier' must not be null"); - this.clientSupplier = clientSupplier; - } - - @Override - public void setInfrastructureCustomizer( - final KafkaStreamsInfrastructureCustomizer infrastructureCustomizer - ) { - Assert.notNull(infrastructureCustomizer, "'infrastructureCustomizer' must not be null"); - this.infrastructureCustomizer = infrastructureCustomizer; - } - - @Override - public void setKafkaStreamsCustomizer(final KafkaStreamsCustomizer kafkaStreamsCustomizer) { - Assert.notNull(kafkaStreamsCustomizer, "'kafkaStreamsCustomizer' must not be null"); - this.kafkaStreamsCustomizer = kafkaStreamsCustomizer; - } - - @Override - public void setStateListener(final KafkaStreams.StateListener stateListener) { - this.stateListener = stateListener; - } - - @Override - public void setStreamsUncaughtExceptionHandler( - final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler - ) { - this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; - } - - @Override - public StreamsUncaughtExceptionHandler getStreamsUncaughtExceptionHandler() { - return streamsUncaughtExceptionHandler; - } - - @Override - public void setStateRestoreListener(final StateRestoreListener stateRestoreListener) { - this.stateRestoreListener = stateRestoreListener; - } - - @Override - public void setCloseTimeout(final int closeTimeout) { - this.closeTimeout = Duration.ofSeconds(closeTimeout); - } - - @Override - public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) { - this.leaveGroupOnClose = leaveGroupOnClose; - } - - @Override - public Topology getTopology() { - return topology; - } - - @Override - public Class getObjectType() { - return super.getObjectType(); - } - - @Override - public void setAutoStartup(final boolean autoStartup) { - this.autoStartup = autoStartup; - } - - @Override - public void setPhase(final int phase) { - this.phase = phase; - } - - @Override - public int getPhase() { - return this.phase; - } - - @Override - public void setCleanupConfig(final CleanupConfig cleanupConfig) { - this.cleanupConfig = cleanupConfig; - } - - @Override - @Nullable - public synchronized KafkaStreams getKafkaStreams() { - this.lifecycleLock.lock(); - try { - return this.kafkaStreams; - } finally { - this.lifecycleLock.unlock(); - } - } - - @Override - public List getListeners() { - return Collections.unmodifiableList(this.listeners); - } - - @Override - public void addListener(Listener listener) { - Assert.notNull(listener, "'listener' cannot be null"); - this.listeners.add(listener); - } - - @Override - public boolean removeListener(Listener listener) { - return this.listeners.remove(listener); - } - - @Override - protected StreamsBuilder createInstance() { - this.lifecycleLock.lock(); - try { - if (this.autoStartup) { - Assert.state( - this.properties != null, - "streams configuration properties must not be null" - ); - } - StreamsBuilder builder = createStreamBuilder(); - this.infrastructureCustomizer.configureBuilder(builder); - return builder; - } finally { - this.lifecycleLock.unlock(); - } - } - - @Override - public boolean isAutoStartup() { - return this.autoStartup; - } - - @Override - public void start() { - this.lifecycleLock.lock(); - try { - if (!this.running) { - try { - Assert.state( - this.properties != null, - "streams configuration properties must not be null" - ); - this.kafkaStreams = new ResponsiveKafkaStreams( - this.topology, - this.properties, - this.clientSupplier - ); - this.kafkaStreams.setStateListener(this.stateListener); - this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener); - if (this.streamsUncaughtExceptionHandler != null) { - this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler); - } - if (this.kafkaStreamsCustomizer != null) { - this.kafkaStreamsCustomizer.customize(this.kafkaStreams); - } - if (this.cleanupConfig.cleanupOnStart()) { - this.kafkaStreams.cleanUp(); - } - this.kafkaStreams.start(); - for (Listener listener : this.listeners) { - listener.streamsAdded(this.beanName, this.kafkaStreams); - } - this.running = true; - } catch (Exception e) { - throw new KafkaException("Could not start stream: ", e); - } - } - } finally { - this.lifecycleLock.unlock(); - } - } - - @Override - public void stop(Runnable callback) { - stop(); - if (callback != null) { - callback.run(); - } - } - - @Override - public void stop() { - this.lifecycleLock.lock(); - try { - if (this.running) { - try { - if (this.kafkaStreams != null) { - this.kafkaStreams.close(new KafkaStreams.CloseOptions() - .timeout(this.closeTimeout) - .leaveGroup(this.leaveGroupOnClose) - ); - if (this.cleanupConfig.cleanupOnStop()) { - this.kafkaStreams.cleanUp(); - } - for (Listener listener : this.listeners) { - listener.streamsRemoved(this.beanName, this.kafkaStreams); - } - this.kafkaStreams = null; - } - } catch (Exception e) { - LOGGER.error(e, "Failed to stop streams"); - } finally { - this.running = false; - } - } - } finally { - this.lifecycleLock.unlock(); - } - } - - @Override - public void afterSingletonsInstantiated() { - try { - this.topology = getObject().build(this.properties); - this.infrastructureCustomizer.configureTopology(this.topology); - LOGGER.debug(() -> this.topology.describe().toString()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean isRunning() { - this.lifecycleLock.lock(); - try { - return this.running; - } finally { - this.lifecycleLock.unlock(); - } - } - - private StreamsBuilder createStreamBuilder() { - if (this.properties == null) { - return new StreamsBuilder(); - } else { - StreamsConfig streamsConfig = new StreamsConfig(this.properties); - TopologyConfig topologyConfig = new TopologyConfig(streamsConfig); - return new StreamsBuilder(topologyConfig); - } - } - - private static class InfraCustomizer implements KafkaStreamsInfrastructureCustomizer { - } -} diff --git a/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveKafkaStreamsCustomizer.java b/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveKafkaStreamsCustomizer.java new file mode 100644 index 000000000..feb3dd0ca --- /dev/null +++ b/responsive-spring/src/main/java/dev/responsive/spring/config/ResponsiveKafkaStreamsCustomizer.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.spring.config; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import java.util.Properties; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; +import org.springframework.kafka.config.KafkaStreamsCustomizer; + +public interface ResponsiveKafkaStreamsCustomizer extends KafkaStreamsCustomizer { + @Override + default KafkaStreams initKafkaStreams( + Topology topology, + Properties properties, + KafkaClientSupplier clientSupplier + ) { + return new ResponsiveKafkaStreams(topology, properties, clientSupplier); + } +} diff --git a/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java b/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java index 16791cde4..6e74257e1 100644 --- a/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java +++ b/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java @@ -99,9 +99,6 @@ public NewTopic topic1() { @Bean public KStream magic(StreamsBuilder streamBuilder, KafkaAdmin admin) { - final Map streamingTopic1 = admin.describeTopics("streamingTopic1"); - System.out.println(streamingTopic1); - KStream stream = streamBuilder.stream("streamingTopic1"); stream .mapValues((ValueMapper) String::toUpperCase) diff --git a/settings.gradle.kts b/settings.gradle.kts index 72225c6d0..2817ed152 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -43,7 +43,7 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { version("jackson", "2.15.2") - version("kafka", "3.7.1") + version("kafka", "3.8.1") version("scylla", "4.15.0.0") version("javaoperatorsdk", "4.9.6") version("grpc", "1.52.1")