Skip to content

Commit

Permalink
Prep work for registering schemas (#410)
Browse files Browse the repository at this point in the history
* Prep work for registering schemas

Wire in the ability for a serde provider to 'ensure' the schemas of owned topics are registered. This is called during service initialisation.

This will be used when JSON schema serde is added. (#25).
  • Loading branch information
big-andy-coates authored Nov 22, 2023
1 parent 841360a commit 343d934
Show file tree
Hide file tree
Showing 23 changed files with 585 additions and 187 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jobs:
run: |
./gradlew -Dgradle.publish.key="$GRADLE_PUBLISH_KEY" -Dgradle.publish.secret="$GRADLE_PUBLISH_SECRET" publishPlugins
# Until Creek fully supports Windows, minimal check:
build_windows:
runs-on: windows-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions client-extension/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
exports org.creekservice.api.kafka.extension;
exports org.creekservice.api.kafka.extension.client;
exports org.creekservice.api.kafka.extension.config;
exports org.creekservice.api.kafka.extension.logging;
exports org.creekservice.api.kafka.extension.resource;
exports org.creekservice.internal.kafka.extension to
creek.kafka.streams.extension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.platform.metadata.ComponentDescriptor;
import org.creekservice.api.service.extension.CreekExtensionProvider;
import org.creekservice.api.service.extension.CreekService;
Expand All @@ -47,12 +48,16 @@ public final class KafkaClientsExtensionProvider

/** Constructor */
public KafkaClientsExtensionProvider() {
this(KafkaSerdeProviders.create());
}

private KafkaClientsExtensionProvider(final KafkaSerdeProviders serdeProviders) {
this(
new KafkaResourceValidator(),
new ClustersPropertiesFactory(),
KafkaTopicClient::new,
props -> new KafkaTopicClient(props, serdeProviders),
ClientsExtension::new,
new ResourceRegistryFactory());
new ResourceRegistryFactory(serdeProviders));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 Creek Contributors (https://github.com/creek-service)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.creekservice.api.kafka.extension.logging;

/** Common logging fields. */
public enum LoggingField {
partitions,
topicIds,
topicId
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.stream.Collectors.toList;
import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -36,34 +37,48 @@
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.client.TopicClient;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.observability.logging.structured.LogEntryCustomizer;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;

/** Implementation of {@link TopicClient} */
/**
* Implementation of {@link TopicClient}.
*
* <p>Responsible for ensuring topics are created, along with any associated resources, e.g.
* schemas.
*/
public final class KafkaTopicClient implements TopicClient {

private final StructuredLogger logger;
private final ClustersProperties clusterProps;
private final KafkaSerdeProviders serdeProviders;
private final Function<Map<String, Object>, Admin> adminFactory;

/**
* @param clusterProps props
* @param serdeProviders all know serde providers
*/
public KafkaTopicClient(final ClustersProperties clusterProps) {
public KafkaTopicClient(
final ClustersProperties clusterProps, final KafkaSerdeProviders serdeProviders) {
this(
clusterProps,
serdeProviders,
Admin::create,
StructuredLoggerFactory.internalLogger(KafkaTopicClient.class));
}

@VisibleForTesting
KafkaTopicClient(
final ClustersProperties clusterProps,
final KafkaSerdeProviders serdeProviders,
final Function<Map<String, Object>, Admin> adminFactory,
final StructuredLogger logger) {
this.clusterProps = requireNonNull(clusterProps, "clusterProps");
this.serdeProviders = requireNonNull(serdeProviders, "serdeProviders");
this.adminFactory = requireNonNull(adminFactory, "adminFactory");
this.logger = requireNonNull(logger, "logger");
}
Expand All @@ -76,11 +91,25 @@ public void ensure(final List<? extends CreatableKafkaTopic<?, ?>> topics) {
}

private void ensure(final String cluster, final List<CreatableKafkaTopic<?, ?>> topics) {
logger.info(
topics.forEach(this::ensureTopicResources);
ensureTopics(cluster, topics);
}

private void ensureTopicResources(final CreatableKafkaTopic<?, ?> topic) {
logger.debug("Ensuring topic resources", log -> log.with(LoggingField.topicId, topic.id()));

final Map<String, Object> props = clusterProps.get(topic.cluster());
serdeProviders.get(topic.key().format()).ensureTopicPartResources(topic.key(), props);

serdeProviders.get(topic.value().format()).ensureTopicPartResources(topic.value(), props);
}

private void ensureTopics(final String cluster, final List<CreatableKafkaTopic<?, ?>> topics) {
logger.debug(
"Ensuring topics",
log ->
log.with(
"topic-ids",
LoggingField.topicIds,
topics.stream().map(CreatableKafkaTopic::id).collect(toList())));

try (Admin admin = adminFactory.apply(clusterProps.get(cluster))) {
Expand All @@ -104,6 +133,7 @@ private void create(
final Consumer<Map.Entry<String, KafkaFuture<Void>>> throwOnFailure =
e -> {
final String topic = e.getKey();
final URI topicId = KafkaTopicDescriptor.resourceId(cluster, topic);
try {
e.getValue().get();

Expand All @@ -117,20 +147,21 @@ private void create(
"Created topic",
log -> {
final LogEntryCustomizer configNs =
log.with("cluster", cluster)
.with("name", topic)
.with("partitions", partitions)
log.with(LoggingField.topicId, topicId)
.with(LoggingField.partitions, partitions)
.ns("config");

config.forEach(c -> configNs.with(c.name(), c.value()));
});
} catch (ExecutionException ex) {
if (!(ex.getCause() instanceof TopicExistsException)) {
throw new CreateTopicException(topic, cluster, ex.getCause());
throw new CreateTopicException(topicId, ex.getCause());
}
logger.debug("Topic already exists", log -> log.with("nane", topic));
logger.debug(
"Topic already exists",
log -> log.with(LoggingField.topicId, topicId));
} catch (Exception ex) {
throw new CreateTopicException(topic, cluster, ex);
throw new CreateTopicException(topicId, ex);
}
};

Expand All @@ -146,11 +177,8 @@ private static NewTopic toNewTopic(final CreatableKafkaTopic<?, ?> descriptor) {
}

private static final class CreateTopicException extends RuntimeException {
CreateTopicException(
final String topicName, final String clusterName, final Throwable cause) {
super(
"Failed to create topic. topic: " + topicName + ", cluster: " + clusterName,
cause);
CreateTopicException(final URI topicId, final Throwable cause) {
super("Failed to create topic. " + LoggingField.topicId + ": " + topicId, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.kafka.common.serialization.Serde;
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.logging.LoggingField;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor;
import org.creekservice.api.kafka.metadata.SerializationFormat;
import org.creekservice.api.kafka.metadata.KafkaTopicDescriptor.PartDescriptor;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.platform.metadata.ComponentDescriptor;
Expand All @@ -42,9 +43,13 @@ public final class ResourceRegistryFactory {
private final RegistryFactory registryFactory;
private final TopicFactory topicFactory;

/** Constructor. */
public ResourceRegistryFactory() {
this(KafkaSerdeProviders.create(), new TopicCollector(), ResourceRegistry::new, Topic::new);
/**
* Constructor.
*
* @param serdeProviders all known serde providers.
*/
public ResourceRegistryFactory(final KafkaSerdeProviders serdeProviders) {
this(serdeProviders, new TopicCollector(), ResourceRegistry::new, Topic::new);
}

@VisibleForTesting
Expand Down Expand Up @@ -97,31 +102,25 @@ public ResourceRegistry create(
private <K, V> Topic<K, V> createTopicResource(
final KafkaTopicDescriptor<K, V> def, final ClustersProperties allProperties) {
final Map<String, Object> properties = allProperties.get(def.cluster());
final Serde<K> keySerde = serde(def.key(), def.name(), true, properties);
final Serde<V> valueSerde = serde(def.value(), def.name(), false, properties);
final Serde<K> keySerde = serde(def.key(), properties);
final Serde<V> valueSerde = serde(def.value(), properties);
return topicFactory.create(def, keySerde, valueSerde);
}

private <T> Serde<T> serde(
final KafkaTopicDescriptor.PartDescriptor<T> part,
final String topicName,
final boolean isKey,
final Map<String, Object> clusterProperties) {
final KafkaSerdeProvider provider = provider(part, topicName, isKey);
final PartDescriptor<T> part, final Map<String, Object> clusterProperties) {
final KafkaSerdeProvider provider = provider(part);

final Serde<T> serde = provider.create(part);
serde.configure(clusterProperties, isKey);
serde.configure(clusterProperties, part.part().isKey());
return serde;
}

private <T> KafkaSerdeProvider provider(
final KafkaTopicDescriptor.PartDescriptor<T> part,
final String topicName,
final boolean isKey) {
private <T> KafkaSerdeProvider provider(final PartDescriptor<T> part) {
try {
return serdeProviders.get(part.format());
} catch (final Exception e) {
throw new UnknownSerializationFormatException(part.format(), topicName, isKey, e);
throw new UnknownSerializationFormatException(part, e);
}
}

Expand All @@ -137,19 +136,17 @@ interface RegistryFactory {
}

private static final class UnknownSerializationFormatException extends RuntimeException {
UnknownSerializationFormatException(
final SerializationFormat format,
final String topicName,
final boolean isKey,
final Throwable cause) {
UnknownSerializationFormatException(final PartDescriptor<?> part, final Throwable cause) {
super(
"Unknown "
+ (isKey ? "key" : "value")
+ part.part()
+ " serialization format encountered."
+ " format="
+ format
+ ", topic="
+ topicName,
+ part.format()
+ ", "
+ LoggingField.topicId
+ "="
+ part.topic().id(),
cause);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@SuppressWarnings("resource")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class KafkaClientsExtensionProviderTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.api.kafka.test.service.TestServiceDescriptor;
import org.creekservice.api.kafka.test.service.UpstreamAggregateDescriptor;
import org.creekservice.internal.kafka.extension.resource.ResourceRegistry;
Expand Down Expand Up @@ -83,7 +84,7 @@ void setUp() {
.build(Set.of());

final ResourceRegistry registry =
new ResourceRegistryFactory()
new ResourceRegistryFactory(KafkaSerdeProviders.create())
.create(List.of(new TestServiceDescriptor()), clustersProperties);

try (Admin admin = Admin.create(clustersProperties.get(DEFAULT_CLUSTER_NAME))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.creekservice.api.kafka.extension.config.ClustersProperties;
import org.creekservice.api.kafka.metadata.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicOutput;
import org.creekservice.api.kafka.serde.provider.KafkaSerdeProviders;
import org.creekservice.test.TopicConfigBuilder;
import org.creekservice.test.TopicDescriptors;
import org.hamcrest.Description;
Expand Down Expand Up @@ -97,7 +98,7 @@ class KafkaTopicClientFunctionalTest {

@BeforeEach
void setUp() {
client = new KafkaTopicClient(clustersProperties);
client = new KafkaTopicClient(clustersProperties, KafkaSerdeProviders.create());

final Map<String, Object> defaultClusterProps =
Map.of(BOOTSTRAP_SERVERS_CONFIG, DEFAULT_CLUSTER.getBootstrapServers());
Expand Down
Loading

0 comments on commit 343d934

Please sign in to comment.