Skip to content

Commit

Permalink
support metrics-only mode in ResponsiveKafkaStreams (#189)
Browse files Browse the repository at this point in the history
agavra authored Nov 28, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 285b525 commit 6d30a89
Showing 9 changed files with 406 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -18,14 +18,14 @@

import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_ID_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_SECRET_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE;
import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE;

import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.kafka.internal.config.ConfigUtils;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.config.ResponsiveStreamsConfig;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
@@ -41,7 +41,6 @@
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -174,18 +173,20 @@ protected ResponsiveKafkaStreams(final Params params) {
super(
params.topology,
propsWithOverrides(
params.responsiveConfig.originals(),
params.responsiveConfig,
params.sessionClients,
params.storeRegistry,
params.topology.describe()),
params.responsiveKafkaClientSupplier,
params.time
);

try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
} catch (final ConfigException e) {
throw new StreamsException("Configuration error, please check your properties");
if (params.compatibilityMode == CompatibilityMode.FULL) {
try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
} catch (final ConfigException e) {
throw new StreamsException("Configuration error, please check your properties", e);
}
}

this.responsiveMetrics = params.metrics;
@@ -236,21 +237,25 @@ private static ResponsiveMetrics createMetrics(final StreamsConfig config) {
* before these get finalized as a {@link StreamsConfig} object
*/
private static Properties propsWithOverrides(
final Map<?, ?> configs,
final ResponsiveConfig configs,
final SessionClients sessionClients,
final ResponsiveStoreRegistry storeRegistry,
final TopologyDescription topologyDescription
) {
final Properties propsWithOverrides = new Properties();

propsWithOverrides.putAll(configs);
propsWithOverrides.putAll(configs.originals());
propsWithOverrides.putAll(new InternalSessionConfigs.Builder()
.withSessionClients(sessionClients)
.withStoreRegistry(storeRegistry)
.withTopologyDescription(topologyDescription)
.build());

final Object o = configs.get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS);
if (ConfigUtils.compatibilityMode(configs) == CompatibilityMode.METRICS_ONLY) {
return propsWithOverrides;
}

final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS);
if (o == null) {
propsWithOverrides.put(
InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
@@ -353,6 +358,7 @@ protected static class Params {
final StreamsConfig streamsConfig;
final ResponsiveMetrics metrics;
final ResponsiveStoreRegistry storeRegistry;
final CompatibilityMode compatibilityMode;

// can be set during construction
private Time time = Time.SYSTEM;
@@ -372,6 +378,7 @@ public Params(final Topology topology, final Map<?, ?> configs) {

this.metrics = createMetrics(streamsConfig);
this.storeRegistry = new ResponsiveStoreRegistry();
this.compatibilityMode = ConfigUtils.compatibilityMode(responsiveConfig);
}

public Params withClientSupplier(final KafkaClientSupplier clientSupplier) {
@@ -397,15 +404,17 @@ public Params build() {
clientSupplier,
streamsConfig,
storeRegistry,
metrics
metrics,
compatibilityMode
);
final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals());

final var backendType = StorageBackend.valueOf(responsiveConfig
.getString(STORAGE_BACKEND_TYPE_CONFIG)
.toUpperCase(Locale.ROOT)
);
final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals());
if (compatibilityMode == CompatibilityMode.METRICS_ONLY) {
sessionClients = new SessionClients(Optional.empty(), Optional.empty(), admin);
return this;
}

final var backendType = ConfigUtils.storageBackend(responsiveConfig);
switch (backendType) {
case CASSANDRA:
final var cqlSession = cassandraFactory.createCqlSession(responsiveConfig);
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.api.config;

import java.util.Arrays;

public enum CompatibilityMode {

FULL,
METRICS_ONLY;

public static String[] names() {
return Arrays.stream(values()).map(Enum::name).toArray(String[]::new);
}

}
Original file line number Diff line number Diff line change
@@ -37,6 +37,11 @@ public class ResponsiveConfig extends AbstractConfig {

// ------------------ connection configurations -----------------------------

public static final String COMPATIBILITY_MODE_CONFIG = "responsive.compatibility.mode";
private static final String COMPATIBILITY_MODE_DOC = "This configuration enables running Responsive "
+ "in compatibility mode, disabling certain features.";
private static final CompatibilityMode COMPATIBILITY_MODE_DEFAULT = CompatibilityMode.FULL;

public static final String STORAGE_HOSTNAME_CONFIG = "responsive.storage.hostname";
private static final String STORAGE_HOSTNAME_DOC = "The hostname of the storage server.";

@@ -138,6 +143,14 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String TASK_ASSIGNOR_CLASS_OVERRIDE = StickyTaskAssignor.class.getName();

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(
COMPATIBILITY_MODE_CONFIG,
Type.STRING,
COMPATIBILITY_MODE_DEFAULT.name(),
ConfigDef.CaseInsensitiveValidString.in(CompatibilityMode.names()),
Importance.MEDIUM,
COMPATIBILITY_MODE_DOC
)
.define(
STORAGE_HOSTNAME_CONFIG,
Type.STRING,
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;

import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.internal.metrics.EndOffsetsPoller;
import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
@@ -29,13 +30,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
@@ -62,32 +66,40 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier
private final EndOffsetsPoller endOffsetsPoller;
private final String applicationId;
private final boolean eos;
private final CompatibilityMode compatibilityMode;

public ResponsiveKafkaClientSupplier(
final KafkaClientSupplier clientSupplier,
final StreamsConfig configs,
final ResponsiveStoreRegistry storeRegistry,
final ResponsiveMetrics metrics
final ResponsiveMetrics metrics,
final CompatibilityMode compatibilityMode
) {
this(new Factories() {}, clientSupplier, configs, storeRegistry, metrics);
this(new Factories() {}, clientSupplier, configs, storeRegistry, metrics, compatibilityMode);
}

ResponsiveKafkaClientSupplier(
final Factories factories,
final KafkaClientSupplier wrapped,
final StreamsConfig configs,
final ResponsiveStoreRegistry storeRegistry,
final ResponsiveMetrics metrics
final ResponsiveMetrics metrics,
final CompatibilityMode compatibilityMode
) {
this.factories = factories;
this.wrapped = wrapped;
this.storeRegistry = storeRegistry;
this.metrics = metrics;
this.compatibilityMode = compatibilityMode;

eos = !(AT_LEAST_ONCE.equals(
configs.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));

endOffsetsPoller = factories.createEndOffsetPoller(configs.originals(), metrics);
endOffsetsPoller = factories.createEndOffsetPoller(
configs.originals(),
metrics,
this
);
applicationId = configs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

@@ -153,26 +165,35 @@ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {

@Override
public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
if (compatibilityMode == CompatibilityMode.METRICS_ONLY) {
return wrapped.getRestoreConsumer(config);
}

final String clientId = (String) config.get(ConsumerConfig.CLIENT_ID_CONFIG);
LOG.info("Creating responsive restore consumer: {}", clientId);
return new ResponsiveRestoreConsumer<>(
return factories.createRestoreConsumer(
clientId,
wrapped.getRestoreConsumer(config),
storeRegistry::getCommittedOffset
);

}

@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
if (compatibilityMode == CompatibilityMode.METRICS_ONLY) {
return wrapped.getGlobalConsumer(config);
}

LOG.info("Creating responsive global consumer");

config.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId + "-global");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

return new ResponsiveGlobalConsumer(
return factories.createGlobalConsumer(
config,
wrapped.getGlobalConsumer(config),
wrapped,
getAdmin(config)
);
}
@@ -330,9 +351,10 @@ private T getVal() {
interface Factories {
default EndOffsetsPoller createEndOffsetPoller(
final Map<String, ?> config,
final ResponsiveMetrics metrics
final ResponsiveMetrics metrics,
final KafkaClientSupplier clientSupplier
) {
return new EndOffsetsPoller(config, metrics);
return new EndOffsetsPoller(config, metrics, clientSupplier::getAdmin);
}

default <K, V> ResponsiveProducer<K, V> createResponsiveProducer(
@@ -350,6 +372,18 @@ default <K, V> ResponsiveConsumer<K, V> createResponsiveConsumer(
return new ResponsiveConsumer<>(clientId, wrapped, listeners);
}

default <K, V> ResponsiveGlobalConsumer createGlobalConsumer(
final Map<String, Object> config,
final KafkaClientSupplier wrapped,
final Admin admin
) {
return new ResponsiveGlobalConsumer(
config,
wrapped.getGlobalConsumer(config),
admin
);
}

default OffsetRecorder createOffsetRecorder(boolean eos) {
return new OffsetRecorder(eos);
}
@@ -365,5 +399,17 @@ default MetricPublishingCommitListener createMetricsPublishingCommitListener(
offsetRecorder
);
}

default ResponsiveRestoreConsumer<byte[], byte[]> createRestoreConsumer(
final String clientId,
final Consumer<byte[], byte[]> restoreConsumer,
final Function<TopicPartition, OptionalLong> getCommittedOffset
) {
return new ResponsiveRestoreConsumer<>(
clientId,
restoreConsumer,
getCommittedOffset
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.config;

import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import java.util.Locale;

/**
* Internal utility to make it easier to extract some values from {@link ResponsiveConfig}
* without making those methods part of our public API.
*
* @implNote please keep this in the {@code .internal} package
*/
public class ConfigUtils {

private ConfigUtils() {
/* Empty constructor for public class */
}

public static StorageBackend storageBackend(final ResponsiveConfig config) {
final var backend = config
.getString(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG)
.toUpperCase(Locale.ROOT);

return StorageBackend.valueOf(backend);
}

public static CompatibilityMode compatibilityMode(final ResponsiveConfig config) {
final var backend = config
.getString(ResponsiveConfig.COMPATIBILITY_MODE_CONFIG)
.toUpperCase(Locale.ROOT);

return CompatibilityMode.valueOf(backend);
}

}
Loading

0 comments on commit 6d30a89

Please sign in to comment.