Support key schemas and types (#319)
* fixes: #301

BREAKING CHANGE

With Kafka, different parts of a message, i.e. the Kafka record, can have type and schema information associated with them. Most people think of the record value, but there is also the key to consider and, in the future, the headers.
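
A minimal sketch of the record parts a client works with (illustrative only; the `String` value stands in for a schema-backed payload, and the topic name is borrowed from the README example):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public final class RecordPartsExample {
    public static void main(final String[] args) {
        // The key and the value are independently typed; in a real producer each
        // would have its own serializer and, where applicable, its own schema.
        final ProducerRecord<Long, String> record =
                new ProducerRecord<>(
                        "simple.schema_demo._public.user_signed_up", 42L, "{\"name\":\"alice\"}");
        System.out.println(record.key() + " -> " + record.value());
    }
}
```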

This change adds support for providing type & schema information for the key of each Kafka record in a topic. The key information is taken from the channel's `message.bindings.kafka.key` field.

For example, the following defines a Kafka topic with a key that contains a complex type:

```yaml
  message:
    bindings:
      kafka:
        key:
          $ref: "/schema/key.avsc"
    payload:
      $ref: "/schema/value.avsc"
```

The key, or indeed the value, can also be defined as one of the inbuilt Kafka types, i.e. types for which most Kafka clients provide serializers out of the box, for example:

```yaml
  message:
    bindings:
      kafka:
        key:
          type: string
    payload:
      $ref: "/schema/value.avsc"
```
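
On the client side an inbuilt key type simply means using a stock serializer. A rough sketch (not part of this change; the broker address and the Confluent Avro serializer for the value are illustrative assumptions):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public final class InbuiltKeyTypeExample {
    public static void main(final String[] args) {
        // A `string` key needs no registered schema; the out-of-the-box
        // StringSerializer handles it. The Avro value would typically use a
        // schema-registry-aware serializer instead.
        final Map<String, Object> producerProps = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```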

Inline schemas for both the key and the value can still be declared in the spec; however, provisioning of inline schemas remains unsupported.
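
For illustration only, an inline key schema might look like the following (field names are hypothetical; the parser accepts such schemas, but the provisioner will not register them):

```yaml
  message:
    bindings:
      kafka:
        key:
          type: object
          properties:
            userId:
              type: string
    payload:
      $ref: "/schema/value.avsc"
```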

* Review comments

* Simplify API

---------

Co-authored-by: Andrew Coates <[email protected]>
big-andy-coates and Andrew Coates authored May 9, 2024
1 parent b640f4e commit a4c31b8
Showing 37 changed files with 1,199 additions and 195 deletions.
README.md: 10 additions & 0 deletions
@@ -43,12 +43,20 @@ channels:

    publish:
      message:
        bindings:
          kafka:
            key:
              type: long
        payload:
          $ref: "/schema/simple.schema_demo._public.user_signed_up.avsc"

  _private.user_checkout:
    publish:
      message:
        bindings:
          kafka:
            key:
              $ref: "/schema/simple.schema_demo._public.user_checkout_key.yml"
        payload:
          $ref: "/schema/simple.schema_demo._public.user_checkout.yml"

@@ -76,6 +84,8 @@ channels:
2.2. Schema published:
- /schema/simple.schema_demo._public.user_signed_up.avsc
- /schema/simple.schema_demo._public.user_checkout_key.yml
- /schema/simple.schema_demo._public.user_checkout.yml
2.3. ACLs created:
- "name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=READ, permissionType=ALLOW))",
cli/resources/simple_schema_demo-api.yaml: 6 additions & 0 deletions
@@ -35,6 +35,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
        contentType: "application/octet-stream"
@@ -60,6 +62,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/json;version=1.9.0"
        contentType: "application/json"
@@ -87,6 +91,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/json;version=1.9.0"
        contentType: "application/json"
cli/src/main/java/io/specmesh/cli/Export.java: 2 additions & 2 deletions
@@ -20,10 +20,10 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.parse.SpecMapper;
import io.specmesh.kafka.Clients;
import io.specmesh.kafka.Exporter;
import java.util.Map;
Expand Down Expand Up @@ -92,7 +92,7 @@ public Integer call() throws Exception {
try (Admin adminClient = Clients.adminClient(brokerUrl, username, secret)) {
final var apiSpec = Exporter.export(aggid, adminClient);
final var mapper =
new ObjectMapper()
SpecMapper.mapper()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
System.out.println(mapper.writeValueAsString(apiSpec));
cli/src/main/java/io/specmesh/cli/Flatten.java: 2 additions & 3 deletions
@@ -20,10 +20,9 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.apiparser.parse.SpecMapper;
import io.specmesh.kafka.KafkaApiSpec;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
@@ -73,7 +72,7 @@ public Integer call() throws Exception {
final var apiSpec = apiSpec1.apiSpec();

final var mapper =
new ObjectMapper(new YAMLFactory())
SpecMapper.mapper()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
final var channels = apiSpec.channels();
cli/src/test/resources/simple_schema_demo-api.yaml: 6 additions & 0 deletions
@@ -33,6 +33,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
        contentType: "application/octet-stream"
@@ -58,6 +60,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/json;version=1.9.0"
        contentType: "application/json"
@@ -84,6 +88,8 @@ channels:
      message:
        bindings:
          kafka:
            key:
              type: long
            schemaIdLocation: "payload"
        schemaFormat: "application/json;version=1.9.0"
        contentType: "application/json"
kafka/src/main/java/io/specmesh/kafka/Exporter.java: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ private static TopicReaders.TopicReader reader(
}

/**
* Extract the Channel - todo Produce/Consume info
* Extract the Channel
*
* @param topic - kafka topic config map
* @return decorated channel
kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java: 2 additions & 3 deletions
@@ -17,9 +17,8 @@
package io.specmesh.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.parse.SpecMapper;

/** Export the Spec object to its yaml representation */
public class ExporterYamlWriter implements ExporterWriter {
@@ -34,7 +33,7 @@ public class ExporterYamlWriter implements ExporterWriter {
@Override
public String export(final ApiSpec apiSpec) throws Exporter.ExporterException {
try {
return new ObjectMapper(new YAMLFactory()).writeValueAsString(apiSpec);
return SpecMapper.mapper().writeValueAsString(apiSpec);
} catch (JsonProcessingException e) {
throw new Exporter.ExporterException("Failed to convert to YAML", e);
}
kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java: 44 additions & 6 deletions
@@ -34,6 +34,8 @@

import io.specmesh.apiparser.AsyncApiParser;
import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.model.Channel;
import io.specmesh.apiparser.model.Operation;
import io.specmesh.apiparser.model.SchemaInfo;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -42,8 +44,11 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
@@ -151,13 +156,46 @@ public Set<AclBinding> requiredAcls() {
* @return the schema info.
*/
public SchemaInfo schemaInfoForTopic(final String topicName) {
final List<NewTopic> myTopics = listDomainOwnedTopics();
myTopics.stream()
.filter(topic -> topic.name().equals(topicName))
.findFirst()
.orElseThrow(() -> new APIException("Not a domain topic:" + topicName));
return ownedTopicSchemas(topicName)
.orElseThrow(() -> new APIException("No schema defined for topic: " + topicName));
}

/**
* Get info about schemas that are conceptually owned by the supplied {@code topicName}.
*
* <p>This differs from {@link #topicSchemas} in that it only returns schemas that should be
* registered when provisioning the topic.
*
* @param topicName the name of the topic
* @return the schema info, if any.
*/
public Optional<SchemaInfo> ownedTopicSchemas(final String topicName) {
final Channel channel = apiSpec.channels().get(topicName);
if (channel == null) {
throw new APIException("Unknown topic:" + topicName);
}

return Optional.ofNullable(channel.publish()).map(Operation::schemaInfo);
}

/**
* Get schema info for the supplied {@code topicName}
*
* <p>This differs from {@link #ownedTopicSchemas} in that it returns all known schemas
* associated with the topic.
*
* @param topicName the name of the topic
* @return stream of the schema info.
*/
public Stream<SchemaInfo> topicSchemas(final String topicName) {
final Channel channel = apiSpec.channels().get(topicName);
if (channel == null) {
throw new APIException("Unknown topic:" + topicName);
}

return apiSpec.channels().get(topicName).publish().schemaInfo();
return Stream.of(channel.publish(), channel.subscribe())
.filter(Objects::nonNull)
.map(Operation::schemaInfo);
}

/**
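
A rough usage sketch of the new `KafkaApiSpec` accessors (assumes an already-loaded spec; how `apiSpec` is constructed is elided, and the class and method names below are just for the sketch):

```java
import io.specmesh.apiparser.model.SchemaInfo;
import io.specmesh.kafka.KafkaApiSpec;
import java.util.Optional;
import java.util.stream.Stream;

public final class TopicSchemaLookup {

    /** Prints the schema info exposed by the new accessors for one topic. */
    static void printSchemas(final KafkaApiSpec apiSpec, final String topicName) {
        // Schemas conceptually owned by the topic, i.e. those registered on provisioning:
        final Optional<SchemaInfo> owned = apiSpec.ownedTopicSchemas(topicName);
        owned.ifPresent(si -> System.out.println("owned: " + si));

        // Every schema associated with the topic, covering both publish and subscribe:
        final Stream<SchemaInfo> all = apiSpec.topicSchemas(topicName);
        all.forEach(si -> System.out.println("known: " + si));
    }
}
```
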
@@ -23,17 +23,21 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.specmesh.apiparser.model.RecordPart;
import io.specmesh.apiparser.model.SchemaInfo;
import io.specmesh.kafka.KafkaApiSpec;
import io.specmesh.kafka.provision.ExceptionWrapper;
import io.specmesh.kafka.provision.Provisioner;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.WithState;
import io.specmesh.kafka.provision.schema.SchemaReaders.SchemaReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -116,39 +120,62 @@ private static SchemaChangeSetCalculators.ChangeSetCalculator calculator(
private static List<Schema> requiredSchemas(
final KafkaApiSpec apiSpec, final String baseResourcePath) {
return apiSpec.listDomainOwnedTopics().stream()
.filter(
topic ->
apiSpec.apiSpec()
.channels()
.get(topic.name())
.publish()
.isSchemaRequired())
.map(
(topic -> {
final var schema = Schema.builder();
try {
final var schemaInfo = apiSpec.schemaInfoForTopic(topic.name());
final var schemaRef = schemaInfo.schemaRef();
final var schemaPath = Paths.get(baseResourcePath, schemaRef);
final var schemas =
new SchemaReaders.FileSystemSchemaReader()
.readLocal(schemaPath.toString());
schema.schemas(schemas)
.type(schemaInfo.schemaRef())
.subject(
resolveSubjectName(
topic.name(), schemas, schemaInfo));
schema.state(CREATE);

} catch (Provisioner.ProvisioningException ex) {
schema.state(FAILED);
schema.exception(ex);
}
return schema.build();
}))
.flatMap(topic -> topicSchemas(apiSpec, baseResourcePath, topic.name()))
.collect(Collectors.toList());
}

private static Stream<Schema> topicSchemas(
final KafkaApiSpec apiSpec, final String baseResourcePath, final String topicName) {
return apiSpec.ownedTopicSchemas(topicName).stream()
.flatMap(
si ->
Stream.of(
si.key()
.flatMap(RecordPart::schemaRef)
.map(
schemaRef ->
partSchema(
"key",
schemaRef,
si,
baseResourcePath,
topicName)),
si.value()
.schemaRef()
.map(
schemaRef ->
partSchema(
"value",
schemaRef,
si,
baseResourcePath,
topicName))))
.flatMap(Optional::stream);
}

private static Schema partSchema(
final String partName,
final String schemaRef,
final SchemaInfo si,
final String baseResourcePath,
final String topicName) {
final Schema.SchemaBuilder builder = Schema.builder();
try {
final Path schemaPath = Paths.get(baseResourcePath, schemaRef);
final Collection<ParsedSchema> schemas =
new SchemaReaders.FileSystemSchemaReader().readLocal(schemaPath);

builder.schemas(schemas)
.type(schemaRef)
.subject(resolveSubjectName(topicName, schemas, si, partName));
builder.state(CREATE);
} catch (Provisioner.ProvisioningException ex) {
builder.state(FAILED);
builder.exception(ex);
}
return builder.build();
}

/**
* Follow these guidelines for Confluent SR and APICurio
* https://docs.confluent.io/platform/6.2/schema-registry/serdes-develop/index.html#referenced-schemas
@@ -175,8 +202,10 @@ private static List<Schema> requiredSchemas(
private static String resolveSubjectName(
final String topicName,
final Collection<ParsedSchema> schemas,
final SchemaInfo schemaInfo) {
final var lookup = schemaInfo.schemaLookupStrategy();
final SchemaInfo schemaInfo,
final String partName) {
final String lookup = schemaInfo.schemaLookupStrategy().orElse("");

if (lookup.equalsIgnoreCase("SimpleTopicIdStrategy")) {
return topicName;
}
Expand All @@ -202,7 +231,7 @@ private static String resolveSubjectName(

return topicName + "-" + ((AvroSchema) next).rawSchema().getFullName();
}
return topicName + "-value";
return topicName + "-" + partName;
}

private static boolean isAvro(final ParsedSchema schema) {