Skip to content

Commit

Permalink
#186 - support ACL perms via tags (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
bluemonk3y authored Sep 7, 2023
1 parent 7b5cc11 commit 718413c
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 15 deletions.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,33 @@ channels:
{"simple.spec_demo._public.user_signed_up":{"id":"some.other.app","members":[{"id":"console-consumer-7f9d23c7-a627-41cd-ade9-3919164bc363","clientId":"console-consumer","host":"/172.30.0.3","partitions":[{"id":0,"topic":"simple.spec_demo._public.user_signed_up","offset":57,"timestamp":-1}]}],"offsetTotal":57}}
```

## ACLs / Permissions

Notice how `_private`, `_public` or `_protected` is prefixed to the channel. This keyword can be altered in the following ways:
- it can be changed by passing the System.property as follows: `-Dspecmesh.public=everyone' -Dspecmesh.protected=some -Dspecmesh.private=mine`
- instead of 'inlining' the permission on the channel name, for example `_public.myTopic` - the permission can be controlled via channel.operation.tags see below for an example.


```yaml
channels:
# protected
retail.subway.food.purchase:
bindings:
kafka:
publish:
tags: [
name: "grant-access:some.other.domain.root"
]
```
```yaml
channels:
# public
attendee:
bindings:
publish:
tags: [
name: "grant-access:_public"
]
```
# Developer Notes

1. Install the intellij checkstyle plugin and load the config from config/checkstyle.xml
2 changes: 2 additions & 0 deletions kafka/src/main/java/io/specmesh/kafka/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.specmesh.apiparser.model.Bindings;
import io.specmesh.apiparser.model.Channel;
import io.specmesh.apiparser.model.KafkaBinding;
import io.specmesh.apiparser.model.Operation;
import io.specmesh.kafka.provision.TopicProvisioner.Topic;
import io.specmesh.kafka.provision.TopicReaders;
import io.specmesh.kafka.provision.TopicReaders.TopicsReaderBuilder;
Expand Down Expand Up @@ -85,6 +86,7 @@ private static TopicReaders.TopicReader reader(
private static Channel channel(final Topic topic) {
return Channel.builder()
.bindings(Bindings.builder().kafka(kafkaBindings(topic)).build())
.publish(Operation.builder().build())
.build();
}

Expand Down
55 changes: 53 additions & 2 deletions kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public List<NewTopic> listDomainOwnedTopics() {

/**
* Create an ACL for the domain-id principal that allows writing to any topic prefixed with the
* Id Prevent non ACL'd ones from writing to it (somehow)
* `id` and prevent non ACL'd ones from writing to it (deny access when no acl found property
* must be set)
*
* @return Acl bindings for owned topics
* @deprecated use {@link #requiredAcls()}
Expand Down Expand Up @@ -139,6 +140,7 @@ public Set<AclBinding> requiredAcls() {
final Set<AclBinding> acls = new HashSet<>();
acls.addAll(ownGroupAcls());
acls.addAll(listACLsForDomainOwnedTopics());
acls.addAll(grantAccessControlUsingGrantTagOnly());
return acls;
}

Expand All @@ -165,7 +167,7 @@ public SchemaInfo schemaInfoForTopic(final String topicName) {
* @return the principal
*/
public static String formatPrincipal(final String domainIdAsUsername) {
return "User:" + domainIdAsUsername;
return domainIdAsUsername.equals(PUBLIC) ? "User:*" : "User:" + domainIdAsUsername;
}

private void validateTopicConfig() {
Expand Down Expand Up @@ -227,6 +229,55 @@ private List<AclBinding> protectedTopicAcls() {
.collect(Collectors.toList());
}

/**
* Uses the alternative grant approach - rather than relying on _public, _protected, _private in
* the path it returns ACLs based upon the `grant-access` notation whereby access is
* protected/public using the following: - protected/retricted --> grant-acess:domain.something
* - public --> grant-access:_public
*
* @return ACLs according to the grant-access tags
*/
@SuppressWarnings("checkstyle:BooleanExpressionComplexity")
private List<AclBinding> grantAccessControlUsingGrantTagOnly() {
return apiSpec.channels().entrySet().stream()
.filter(
e ->
e.getValue().publish() != null
&& !isUsingPathPerms(e.getKey())
&& e.getValue()
.publish()
.tags()
.toString()
.contains(GRANT_ACCESS_TAG))
.flatMap(
e ->
e.getValue().publish().tags().stream()
.filter(tag -> tag.name().startsWith(GRANT_ACCESS_TAG))
.map(tag -> tag.name().substring(GRANT_ACCESS_TAG.length()))
.map(
user ->
literalAcls(
TOPIC,
e.getKey(),
formatPrincipal(user),
DESCRIBE,
READ))
.flatMap(Collection::stream))
.collect(Collectors.toList());
}

/**
* the path is using public,private explicit based control
*
* @param key
* @return true if it is
*/
private boolean isUsingPathPerms(final String key) {
return key.startsWith(id() + DELIMITER + PRIVATE + DELIMITER)
|| key.startsWith(id() + DELIMITER + PROTECTED + DELIMITER)
|| key.startsWith(id() + DELIMITER + PUBLIC + DELIMITER);
}

private Set<AclBinding> privateTopicAcls() {
return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(), CREATE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class KafkaAPISpecTest {
private static final KafkaApiSpec API_SPEC =
TestSpecLoader.loadFromClassPath("bigdatalondon-api.yaml");

private enum ExpectedAcl {
enum ExpectedAcl {
READ_PUBLIC_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._public,"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.specmesh.kafka;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.specmesh.apiparser.model.SchemaInfo;
import io.specmesh.test.TestSpecLoader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AclBinding;
import org.junit.jupiter.api.Test;

public class KafkaAPISpecWithGrantAccessAclsTest {

private static final KafkaApiSpec API_SPEC =
TestSpecLoader.loadFromClassPath("bigdatalondon-api-with-grant-access-acls.yaml");

private enum ExpectedAcl {
READ_PUBLIC_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon.attendee,"
+ " patternType=LITERAL), entry=(principal=User:*, host=*,"
+ " operation=READ, permissionType=ALLOW))"),
DESCRIBE_PUBLIC_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon.attendee,"
+ " patternType=LITERAL), entry=(principal=User:*, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"),
READ_PROTECTED_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon.retail.subway.food.purchase,"
+ " patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*,"
+ " operation=READ, permissionType=ALLOW))"),
DESCRIBE_PROTECTED_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon.retail.subway.food.purchase,"
+ " patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"),
DESCRIBE_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=DESCRIBE, permissionType=ALLOW))"),
READ_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))"),
WRITE_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=WRITE, permissionType=ALLOW))"),
CREATE_OWN_PRIVATE_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._private,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=CREATE, permissionType=ALLOW))"),
READ_OWN_GROUPS(
"(pattern=ResourcePattern(resourceType=GROUP,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))"),
WRITE_OWN_TX_IDS(
"(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID,"
+ " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=WRITE, permissionType=ALLOW))"),
DESCRIBE_OWN_TX_IDS(
"(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID,"
+ " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"),
OWN_IDEMPOTENT_WRITE(
"(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=IDEMPOTENT_WRITE, permissionType=ALLOW))");

final String text;

ExpectedAcl(final String text) {
this.text = text;
}
}

@Test
public void shouldListAppOwnedTopics() {
final List<NewTopic> newTopics = API_SPEC.listDomainOwnedTopics();
assertThat(newTopics, hasSize(3));
}

@Test
public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
ExpectedAcl.READ_PUBLIC_TOPICS.text,
ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text));
}

@Test
public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
ExpectedAcl.READ_PROTECTED_TOPICS.text,
ExpectedAcl.DESCRIBE_PROTECTED_TOPICS.text));
}

@Test
public void shouldGenerateAclToAllowControlOfOwnPrivateTopics() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
ExpectedAcl.DESCRIBE_OWN_TOPICS.text,
ExpectedAcl.READ_OWN_TOPICS.text,
ExpectedAcl.WRITE_OWN_TOPICS.text,
ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text));
}

@Test
public void shouldGenerateAclsToAllowToUseOwnConsumerGroups() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(ExpectedAcl.READ_OWN_GROUPS.text));
}

@Test
public void shouldGenerateAclsToAllowToUseOwnTransactionId() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(ExpectedAcl.WRITE_OWN_TX_IDS.text, ExpectedAcl.DESCRIBE_OWN_TX_IDS.text));
}

@Test
public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text));
}

@Test
void shouldNotHaveAnyAdditionalAcls() {
final Set<String> testAcls =
new HashSet<>(
Arrays.stream(ExpectedAcl.values())
.map(e -> e.text)
.collect(Collectors.toSet()));

// need to support `_public` access prefixes
testAcls.add(KafkaAPISpecTest.ExpectedAcl.READ_PUBLIC_TOPICS.text);
testAcls.add(KafkaAPISpecTest.ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text);

final Set<String> specAcls =
API_SPEC.requiredAcls().stream().map(Object::toString).collect(Collectors.toSet());
assertThat(specAcls, containsInAnyOrder(testAcls.toArray()));
}

@Test
public void shouldGetSchemaInfoForOwnedTopics() {
final List<NewTopic> newTopics = API_SPEC.listDomainOwnedTopics();
final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name());
assertThat(schemaInfo.schemaIdLocation(), is("header"));
}
}
Loading

0 comments on commit 718413c

Please sign in to comment.