Skip to content

Commit

Permalink
Allow domain services to create arbitrary private topics. (#63)
Browse files Browse the repository at this point in the history
* Allow domain services to create arbitrary private topics.

fixes: #61
  • Loading branch information
big-andy-coates authored Feb 4, 2023
1 parent 0f6b98e commit d658132
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 86 deletions.
48 changes: 48 additions & 0 deletions kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Kafka SpecMesh extension

Extends SpecMesh with the concept of public, protected and private topics.

## Topic structure

Topic names in the spec come under the `channel` section. Their names either:

* start with `_public`, `_protected` or `_private` for topics _owned_ by the domain, or
* are the fully qualified topic name of a topic owned by some other domain.

For topics owned by the domain, the full topic name is the name used in the spec, prefixed with the spec's domain id,
i.e. `<domain-id>.<channel-name>`. In this way, all topics owned by the domain are also prefixed by the domain name,
making it trivial to trace a topic name back to its owning domain.

## Authorisation

### Topic authorisation

The provisioner will set ACLs to enforce the following topic authorisation:

### Public topics:

Only the domain itself can `WRITE` to public topics; Any domain can `READ` from them.

### Protected topics:

Only the domain itself can `WRITE` to protected topics; Any domain specifically tagged in the spec can `READ` from them.

Access to a protected topic can be granted to another domain by adding a `grant-access:` tag to the topic:

```yaml
tags: [
name: "grant-access:some.other.domain"
]
```

### Private topics:

Only the domain itself can `WRITE` to private topics; Only the domain itself can `READ` from private topics.

Additionally, the domain itself also has `CREATE` permissions for topics under its domain id.
This allows the domain's services to create any additional internal topics required, e.g. Kafka Streams
library creates repartition and changelog topics for stores automatically using the Admin client.

As services are free to make additional private topics, provisioning does _not_ remove existing private topics not in the spec.
This places the responsibility of cleaning up private topics on engineering teams. However, as these are private
topics, it is easy to determine if such topics are or are not actively in use by the domain's services.
157 changes: 80 additions & 77 deletions kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.specmesh.kafka;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
Expand All @@ -26,6 +27,7 @@

import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.model.SchemaInfo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
Expand All @@ -43,6 +45,9 @@

/** Kafka entity mappings from the AsyncAPISpec */
public class KafkaApiSpec {

private static final String GRANT_ACCESS_TAG = "grant-access:";

private final ApiSpec apiSpec;

/**
Expand Down Expand Up @@ -72,7 +77,7 @@ public String id() {
*/
public List<NewTopic> listDomainOwnedTopics() {
return apiSpec.channels().entrySet().stream()
.filter(e -> e.getKey().startsWith(apiSpec.id()))
.filter(e -> e.getKey().startsWith(id()))
.map(
e ->
new NewTopic(
Expand All @@ -83,24 +88,6 @@ public List<NewTopic> listDomainOwnedTopics() {
.collect(Collectors.toList());
}

private void validateTopicConfig() {
final String id = apiSpec.id();
apiSpec.channels()
.forEach(
(k, v) -> {
if (k.startsWith(id)
&& v.publish() != null
&& (v.bindings() == null || v.bindings().kafka() == null)) {
throw new IllegalStateException(
"Kafka bindings are missing from channel: ["
+ k
+ "] Domain owner: ["
+ id
+ "]");
}
});
}

/**
* Create an ACL for the domain-id principle that allows writing to any topic prefixed with the
* Id Prevent non ACL'd ones from writing to it (somehow)
Expand All @@ -112,73 +99,33 @@ private void validateTopicConfig() {
public List<AclBinding> listACLsForDomainOwnedTopics() {
validateTopicConfig();

final String id = apiSpec.id();
final String principal = formatPrincipal(apiSpec.id());

final List<AclBinding> topicAcls =
apiSpec.channels().entrySet().stream()
.filter(
e ->
e.getKey().startsWith(id + "._protected.")
&& e.getValue()
.publish()
.tags()
.toString()
.contains("grant-access:"))
.flatMap(
v ->
v.getValue().publish().tags().stream()
.filter(
tag ->
tag.name()
.startsWith(
"grant-access:"))
.map(
tag ->
tag.name()
.substring(
"grant-access:"
.length()))
.map(
user ->
literalAcls(
TOPIC,
v.getKey(),
formatPrincipal(user),
DESCRIBE,
READ))
.flatMap(Collection::stream))
.collect(Collectors.toList());

// Unrestricted access to all for public topics:
topicAcls.addAll(prefixedAcls(TOPIC, id + "._public", "User:*", DESCRIBE, READ));
// Produce & consume owned topics:
topicAcls.addAll(prefixedAcls(TOPIC, id, principal, DESCRIBE, READ, WRITE));

topicAcls.addAll(prefixedAcls(TOPIC, id, principal, IDEMPOTENT_WRITE));
final List<AclBinding> topicAcls = new ArrayList<>();
topicAcls.addAll(ownTopicAcls());
topicAcls.addAll(publicTopicAcls());
topicAcls.addAll(protectedTopicAcls());
topicAcls.addAll(privateTopicAcls());
topicAcls.addAll(prefixedAcls(TOPIC, id(), principal(), IDEMPOTENT_WRITE));
return topicAcls;
}

/**
* Get the set of required ACLs.
* Get the set of required ACLs for this domain spec.
*
* <p>This includes ACLs for:
* <p>This includes {@code ALLOW} ACLs for:
*
* <ul>
* <li>Acls for everyone to consume the spec's public topics
* <li>Acls for configured domains to consume the spec's protected topics
* <li>Acls for the spec's domain to produce and consume its topics
* <li>Acls for the spec's domain to use its consumer groups
* <li>Everyone to consume the spec's public topics
* <li>Specifically configured domains to consume the spec's protected topics
* <li>The spec's domain to be able to create ad-hoc private topics
* <li>The spec's domain to produce and consume its topics
* <li>The spec's domain to use its own consumer groups
* </ul>
*
* @return returns the set of required acls.
*/
public Set<AclBinding> requiredAcls() {
final String id = apiSpec.id();
final String principal = formatPrincipal(apiSpec.id());

final Set<AclBinding> acls = new HashSet<>();
acls.addAll(prefixedAcls(GROUP, id, principal, READ));
acls.addAll(ownGroupAcls());
acls.addAll(listACLsForDomainOwnedTopics());
return acls;
}
Expand All @@ -194,10 +141,7 @@ public SchemaInfo schemaInfoForTopic(final String topicName) {
myTopics.stream()
.filter(topic -> topic.name().equals(topicName))
.findFirst()
.orElseThrow(
() ->
new IllegalArgumentException(
"Could not find 'owned' topic for:" + topicName));
.orElseThrow(() -> new IllegalArgumentException("Not a domain topic:" + topicName));

return apiSpec.channels().get(topicName).publish().schemaInfo();
}
Expand All @@ -224,6 +168,65 @@ public static String formatPrincipal(final String domainIdAsUsername) {
return "User:" + domainIdAsUsername;
}

/**
 * Validates that every domain-owned channel with a publish operation declares Kafka bindings.
 *
 * @throws IllegalStateException if a published, domain-owned channel lacks Kafka bindings.
 */
private void validateTopicConfig() {
    final String domainId = id();
    apiSpec.channels()
            .forEach(
                    (channelName, channel) -> {
                        final boolean domainOwned = channelName.startsWith(domainId);
                        final boolean published = channel.publish() != null;
                        final boolean missingKafkaBindings =
                                channel.bindings() == null || channel.bindings().kafka() == null;
                        if (domainOwned && published && missingKafkaBindings) {
                            throw new IllegalStateException(
                                    "Kafka bindings are missing from channel: ["
                                            + channelName
                                            + "] Domain owner: ["
                                            + domainId
                                            + "]");
                        }
                    });
}

/**
 * The Kafka principal for this spec's domain, e.g. {@code User:<domain-id>}.
 *
 * @return the formatted principal name.
 */
private String principal() {
    final String domainId = id();
    return formatPrincipal(domainId);
}

/**
 * ACLs allowing the domain's services to use consumer groups prefixed with the domain id.
 *
 * @return the set of consumer-group ACL bindings.
 */
private Set<AclBinding> ownGroupAcls() {
    final Set<AclBinding> groupAcls = prefixedAcls(GROUP, id(), principal(), READ);
    return groupAcls;
}

/**
 * ACLs allowing the domain's services to describe, consume and produce its own topics.
 *
 * @return the set of topic ACL bindings for the owning domain.
 */
private Set<AclBinding> ownTopicAcls() {
    final Set<AclBinding> topicAcls =
            prefixedAcls(TOPIC, id(), principal(), DESCRIBE, READ, WRITE);
    return topicAcls;
}

/**
 * ACLs granting every user read access to the domain's public topics.
 *
 * @return the set of ACL bindings for public topics.
 */
private Set<AclBinding> publicTopicAcls() {
    final String publicPrefix = id() + "._public";
    return prefixedAcls(TOPIC, publicPrefix, "User:*", DESCRIBE, READ);
}

/**
 * ACLs granting specifically tagged domains read access to this domain's protected topics.
 *
 * <p>A protected channel grants another domain access via a {@code grant-access:<other-domain>}
 * tag on its publish operation. For each such tag, literal {@code DESCRIBE} and {@code READ}
 * ACLs are created for the tagged domain's principal.
 *
 * <p>Note: the previous implementation also pre-filtered channels via
 * {@code tags().toString().contains(...)}, which was brittle (it relied on the {@code toString}
 * format of the tag list) and redundant, as the per-tag {@code startsWith} filter below already
 * selects only grant-access tags.
 *
 * @return the list of ACL bindings for protected topics.
 */
private List<AclBinding> protectedTopicAcls() {
    return apiSpec.channels().entrySet().stream()
            .filter(e -> e.getKey().startsWith(id() + "._protected."))
            .flatMap(
                    e ->
                            e.getValue().publish().tags().stream()
                                    .filter(tag -> tag.name().startsWith(GRANT_ACCESS_TAG))
                                    .map(tag -> tag.name().substring(GRANT_ACCESS_TAG.length()))
                                    .map(
                                            user ->
                                                    literalAcls(
                                                            TOPIC,
                                                            e.getKey(),
                                                            formatPrincipal(user),
                                                            DESCRIBE,
                                                            READ))
                                    .flatMap(Collection::stream))
            .collect(Collectors.toList());
}

/**
 * ACLs allowing the domain's services to create ad-hoc topics under its private prefix,
 * e.g. Kafka Streams repartition and changelog topics.
 *
 * @return the set of ACL bindings for private topics.
 */
private Set<AclBinding> privateTopicAcls() {
    final String privatePrefix = id() + "._private";
    return prefixedAcls(TOPIC, privatePrefix, principal(), CREATE);
}

private static Set<AclBinding> literalAcls(
final ResourceType resourceType,
final String resourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
Expand All @@ -36,15 +37,19 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.TopicAuthorizationException;
Expand Down Expand Up @@ -146,12 +151,12 @@ void shouldHaveCorrectProduceAndConsumeAcls(

domainConsumer.subscribe(List.of(topic.topicName));

final Executable poll = () -> domainConsumer.poll(Duration.ofSeconds(1));
final Executable poll = () -> domainConsumer.poll(Duration.ofMillis(500));
if (canConsume) {
poll.execute();
} else {
final Exception e = assertThrows(TopicAuthorizationException.class, poll);
assertTopicAuthorizationException(topic, e);
assertThat((Throwable) e, instanceOf(TopicAuthorizationException.class));
}

final Executable send =
Expand All @@ -164,7 +169,7 @@ void shouldHaveCorrectProduceAndConsumeAcls(
send.execute();
} else {
final Throwable e = assertThrows(ExecutionException.class, send).getCause();
assertTopicAuthorizationException(topic, e);
assertThat(e, instanceOf(TopicAuthorizationException.class));
}

if (canConsume && canProduce) {
Expand All @@ -182,10 +187,33 @@ void shouldHaveCorrectProduceAndConsumeAcls(
}
}

private void assertTopicAuthorizationException(final Topic topic, final Throwable e) {
assertThat(e, instanceOf(TopicAuthorizationException.class));
assertThat(e.getMessage(), containsString("Not authorized to access topics"));
assertThat(e.getMessage(), containsString(topic.topicName));
@CartesianTest(name = "topic: {0}, domain: {1}")
void shouldHaveCorrectTopicCreationAcls(
        @CartesianTest.Enum final Topic topic, @CartesianTest.Enum final Domain domain)
        throws Throwable {

    // Only the owning domain may create ad-hoc topics, and only under its private prefix:
    final boolean creationAllowed = domain == Domain.SELF && topic == Topic.PRIVATE;

    // Random suffix avoids collisions between parameterized runs:
    final String adHocTopicName = topic.topicName + "." + UUID.randomUUID();
    final NewTopic adHocTopic = new NewTopic(adHocTopicName, 1, (short) 1);

    try (Admin adminClient = adminClient(domain)) {
        final KafkaFuture<Void> result = adminClient.createTopics(List.of(adHocTopic)).all();
        final Executable awaitCreation = () -> result.get(30, TimeUnit.SECONDS);

        if (creationAllowed) {
            awaitCreation.execute();
        } else {
            final Throwable cause =
                    assertThrows(ExecutionException.class, awaitCreation).getCause();
            assertThat(cause, is(instanceOf(TopicAuthorizationException.class)));
        }
    }
}

/**
 * Creates an Admin client authenticated as the supplied domain's principal.
 *
 * @param domain the domain whose SASL credentials to use.
 * @return a new Admin client; callers are responsible for closing it.
 */
private Admin adminClient(final Domain domain) {
    final Map<String, Object> config = clientProperties();
    config.putAll(saslAuthProperties(domain.domainId));
    return AdminClient.create(config);
}

private KafkaProducer<Long, String> domainProducer(final Domain domain) {
Expand Down Expand Up @@ -245,7 +273,7 @@ private static Set<AclBinding> aclsForOtherDomain(final Domain domain) {
return Set.of(
new AclBinding(
new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL),
new AccessControlEntry(principal, "*", ALL, ALLOW)),
new AccessControlEntry(principal, "*", IDEMPOTENT_WRITE, ALLOW)),
new AclBinding(
new ResourcePattern(GROUP, domain.domainId, LITERAL),
new AccessControlEntry(principal, "*", ALL, ALLOW)));
Expand Down
7 changes: 6 additions & 1 deletion kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ public void shouldGenerateAclToAllowSelfControlOfPrivateTopics() {
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=WRITE, permissionType=ALLOW))"));
+ " host=*, operation=WRITE, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._private,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=CREATE, permissionType=ALLOW))"));
}

@Test
Expand Down

0 comments on commit d658132

Please sign in to comment.