Skip to content

Commit

Permalink
Upgrade to AK 3.8 and spring-kafka 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Dec 17, 2024
1 parent e13fc2b commit 319c093
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,16 +351,16 @@ private static Properties propsWithOverrides(
return propsWithOverrides;
}

final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS);
final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
if (o == null) {
propsWithOverrides.put(
InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
TASK_ASSIGNOR_CLASS_OVERRIDE
);
} else if (!TASK_ASSIGNOR_CLASS_OVERRIDE.equals(o.toString())) {
final String errorMsg = String.format(
"Invalid Streams configuration value for '%s': got %s, expected '%s'",
InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
o,
TASK_ASSIGNOR_CLASS_OVERRIDE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;

/**
* Configurations for {@link ResponsiveKafkaStreams}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public void init(final ProcessorContext<Void, Void> context) {
public void process(final Record<Object, Object> record) {
global.put(record.key(), record.value());
}
}
},
false
);

final String baseDirectoryName = tempDir.getAbsolutePath();
Expand Down
2 changes: 1 addition & 1 deletion responsive-spring/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ version = project(":kafka-client").version
dependencies {
implementation(project(":kafka-client"))
implementation("com.google.code.findbugs:jsr305:3.0.2")
implementation("org.springframework.kafka:spring-kafka:3.2.4")
implementation("org.springframework.kafka:spring-kafka:3.3.0")
implementation("org.springframework.boot:spring-boot-starter:3.3.2")

testImplementation(testlibs.bundles.base)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

package dev.responsive.spring.annotations;

import dev.responsive.spring.config.ResponsiveFactoryBean;
import dev.responsive.spring.config.ResponsiveKafkaStreamsCustomizer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.UnsatisfiedDependencyException;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -35,8 +35,9 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
) {
KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
if (streamsConfig != null) {
StreamsBuilderFactoryBean fb = new ResponsiveFactoryBean(streamsConfig);
StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
configurerProvider.orderedStream().forEach(configurer -> configurer.configure(fb));
fb.setKafkaStreamsCustomizer((ResponsiveKafkaStreamsCustomizer) kafkaStreams -> { });
return fb;
} else {
throw new UnsatisfiedDependencyException(KafkaStreamsDefaultConfiguration.class.getName(),
Expand Down
Loading

0 comments on commit 319c093

Please sign in to comment.