Skip to content

Commit

Permalink
improve log message on missing configurations (#406)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Jan 9, 2025
1 parent 5bc49de commit 9ce9b2b
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package dev.responsive.kafka.internal.config;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
Expand All @@ -27,21 +28,26 @@ public final class InternalSessionConfigs {
private static final Logger LOG = LoggerFactory.getLogger(InternalSessionConfigs.class);

private static final String INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG = "__internal.responsive.async.thread.pool.registry__";
private static final String INTERNAL_SESSION_CLIENTS_CONFIG = "__internal.responsive.cassandra.client__";
private static final String INTERNAL_SESSION_CLIENTS_CONFIG = "__internal.responsive.session.clients__";
private static final String INTERNAL_STORE_REGISTRY_CONFIG = "__internal.responsive.store.registry__";
private static final String INTERNAL_TOPOLOGY_DESCRIPTION_CONFIG = "__internal.responsive.topology.description__";
private static final String INTERNAL_METRICS_CONFIG = "__internal.responsive.metrics__";

private static final String NOT_RESPONSIVE_ERR_MSG =
"This can happen if you instantiated a Responsive state store with a KafkaStreams instance "
+ "that is not ResponsiveKafkaStreams.";

private static <T> T loadFromConfig(
final Map<String, Object> configs,
final String configName,
final Class<T> type,
final String name
final String name,
final Object details
) {
final Object o = configs.get(configName);
if (o == null) {
final IllegalStateException fatalException = new IllegalStateException(
String.format("Failed to load %s as %s was missing", name, configName)
String.format("Failed to load %s as %s was missing. %s", name, configName, details)
);
LOG.error(fatalException.getMessage(), fatalException);
throw fatalException;
Expand All @@ -66,7 +72,8 @@ public static TopologyDescription loadTopologyDescription(
config,
INTERNAL_TOPOLOGY_DESCRIPTION_CONFIG,
TopologyDescription.class,
"Topology description"
"Topology description",
NOT_RESPONSIVE_ERR_MSG
);
}

Expand All @@ -75,7 +82,10 @@ public static AsyncThreadPoolRegistry loadAsyncThreadPoolRegistry(final Map<Stri
configs,
InternalSessionConfigs.INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG,
AsyncThreadPoolRegistry.class,
"Async thread pool registry"
"Async thread pool registry",
"This can happen if " + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG
+ " is zero or an async store was registered with a KafkaStreams instance "
+ "that is not ResponsiveKafkaStreams"
);
}

Expand All @@ -84,7 +94,8 @@ public static ResponsiveMetrics loadMetrics(final Map<String, Object> configs) {
configs,
InternalSessionConfigs.INTERNAL_METRICS_CONFIG,
ResponsiveMetrics.class,
"Responsive Metrics"
"Responsive Metrics",
NOT_RESPONSIVE_ERR_MSG
);
}

Expand All @@ -93,7 +104,8 @@ public static SessionClients loadSessionClients(final Map<String, Object> config
configs,
InternalSessionConfigs.INTERNAL_SESSION_CLIENTS_CONFIG,
SessionClients.class,
"Shared session clients"
"Shared session clients",
NOT_RESPONSIVE_ERR_MSG
);
}

Expand All @@ -102,7 +114,8 @@ public static ResponsiveStoreRegistry loadStoreRegistry(final Map<String, Object
configs,
INTERNAL_STORE_REGISTRY_CONFIG,
ResponsiveStoreRegistry.class,
"Store registry"
"Store registry",
NOT_RESPONSIVE_ERR_MSG
);
}

Expand Down

0 comments on commit 9ce9b2b

Please sign in to comment.