From 718413c4abdabdb792fd5b49df752cef88084eb4 Mon Sep 17 00:00:00 2001 From: Neil Avery Date: Thu, 7 Sep 2023 10:04:31 +0100 Subject: [PATCH] #186 - support ACL perms via tags (#197) --- README.md | 26 ++- .../main/java/io/specmesh/kafka/Exporter.java | 2 + .../java/io/specmesh/kafka/KafkaApiSpec.java | 55 ++++- .../io/specmesh/kafka/KafkaAPISpecTest.java | 2 +- .../KafkaAPISpecWithGrantAccessAclsTest.java | 206 ++++++++++++++++++ ...datalondon-api-with-grant-access-acls.yaml | 143 ++++++++++++ .../io/specmesh/apiparser/model/ApiSpec.java | 21 +- .../specmesh/apiparser/model/Operation.java | 4 +- 8 files changed, 444 insertions(+), 15 deletions(-) create mode 100644 kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java create mode 100644 kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml diff --git a/README.md b/README.md index d07ae9b4..5eaf4a59 100644 --- a/README.md +++ b/README.md @@ -105,9 +105,33 @@ channels: {"simple.spec_demo._public.user_signed_up":{"id":"some.other.app","members":[{"id":"console-consumer-7f9d23c7-a627-41cd-ade9-3919164bc363","clientId":"console-consumer","host":"/172.30.0.3","partitions":[{"id":0,"topic":"simple.spec_demo._public.user_signed_up","offset":57,"timestamp":-1}]}],"offsetTotal":57}} ``` +## ACLs / Permissions +Notice how `_private`, `_public` or `_protected` is prefixed to the channel. This keyword can be altered in the following ways: +- it can be changed by passing system properties as follows: `-Dspecmesh.public=everyone -Dspecmesh.protected=some -Dspecmesh.private=mine` +- instead of 'inlining' the permission on the channel name, for example `_public.myTopic` - the permission can be controlled via channel.operation.tags; see below for an example. 
- +```yaml +channels: + # protected + retail.subway.food.purchase: + bindings: + kafka: + publish: + tags: [ + name: "grant-access:some.other.domain.root" + ] +``` +```yaml +channels: + # public + attendee: + bindings: + publish: + tags: [ + name: "grant-access:_public" + ] +``` # Developer Notes 1. Install the intellij checkstyle plugin and load the config from config/checkstyle.xml diff --git a/kafka/src/main/java/io/specmesh/kafka/Exporter.java b/kafka/src/main/java/io/specmesh/kafka/Exporter.java index 44a26373..e0d4d175 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Exporter.java +++ b/kafka/src/main/java/io/specmesh/kafka/Exporter.java @@ -20,6 +20,7 @@ import io.specmesh.apiparser.model.Bindings; import io.specmesh.apiparser.model.Channel; import io.specmesh.apiparser.model.KafkaBinding; +import io.specmesh.apiparser.model.Operation; import io.specmesh.kafka.provision.TopicProvisioner.Topic; import io.specmesh.kafka.provision.TopicReaders; import io.specmesh.kafka.provision.TopicReaders.TopicsReaderBuilder; @@ -85,6 +86,7 @@ private static TopicReaders.TopicReader reader( private static Channel channel(final Topic topic) { return Channel.builder() .bindings(Bindings.builder().kafka(kafkaBindings(topic)).build()) + .publish(Operation.builder().build()) .build(); } diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 45e614ee..e0001db9 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -100,7 +100,8 @@ public List listDomainOwnedTopics() { /** * Create an ACL for the domain-id principal that allows writing to any topic prefixed with the - * Id Prevent non ACL'd ones from writing to it (somehow) + * `id` and prevent non ACL'd ones from writing to it (deny access when no acl found property + * must be set) * * @return Acl bindings for owned topics * @deprecated use {@link #requiredAcls()} @@ -139,6 
+140,7 @@ public Set requiredAcls() { final Set acls = new HashSet<>(); acls.addAll(ownGroupAcls()); acls.addAll(listACLsForDomainOwnedTopics()); + acls.addAll(grantAccessControlUsingGrantTagOnly()); return acls; } @@ -165,7 +167,7 @@ public SchemaInfo schemaInfoForTopic(final String topicName) { * @return the principal */ public static String formatPrincipal(final String domainIdAsUsername) { - return "User:" + domainIdAsUsername; + return domainIdAsUsername.equals(PUBLIC) ? "User:*" : "User:" + domainIdAsUsername; } private void validateTopicConfig() { @@ -227,6 +229,55 @@ private List protectedTopicAcls() { .collect(Collectors.toList()); } + /** + * Uses the alternative grant approach - rather than relying on _public, _protected, _private in + * the path it returns ACLs based upon the `grant-access` notation whereby access is + * protected/public using the following: - protected/restricted --> grant-access:domain.something + * - public --> grant-access:_public + * + * @return ACLs according to the grant-access tags + */ + @SuppressWarnings("checkstyle:BooleanExpressionComplexity") + private List grantAccessControlUsingGrantTagOnly() { + return apiSpec.channels().entrySet().stream() + .filter( + e -> + e.getValue().publish() != null + && !isUsingPathPerms(e.getKey()) + && e.getValue() + .publish() + .tags() + .toString() + .contains(GRANT_ACCESS_TAG)) + .flatMap( + e -> + e.getValue().publish().tags().stream() + .filter(tag -> tag.name().startsWith(GRANT_ACCESS_TAG)) + .map(tag -> tag.name().substring(GRANT_ACCESS_TAG.length())) + .map( + user -> + literalAcls( + TOPIC, + e.getKey(), + formatPrincipal(user), + DESCRIBE, + READ)) + .flatMap(Collection::stream)) + .collect(Collectors.toList()); + } + + /** + * Checks whether the channel path uses explicit _public/_private/_protected based access + * control. + * + * @param key the channel name + * @return true if the path encodes permissions + */ + private boolean isUsingPathPerms(final String key) { + return key.startsWith(id() + DELIMITER + PRIVATE + DELIMITER) + || key.startsWith(id() + DELIMITER 
+ PROTECTED + DELIMITER) + || key.startsWith(id() + DELIMITER + PUBLIC + DELIMITER); + } + private Set privateTopicAcls() { return prefixedAcls(TOPIC, id() + DELIMITER + PRIVATE, principal(), CREATE); } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java index 1c12eb15..70be8391 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java @@ -37,7 +37,7 @@ public class KafkaAPISpecTest { private static final KafkaApiSpec API_SPEC = TestSpecLoader.loadFromClassPath("bigdatalondon-api.yaml"); - private enum ExpectedAcl { + enum ExpectedAcl { READ_PUBLIC_TOPICS( "(pattern=ResourcePattern(resourceType=TOPIC," + " name=london.hammersmith.olympia.bigdatalondon._public," diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java new file mode 100644 index 00000000..fa61c399 --- /dev/null +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.specmesh.kafka; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import io.specmesh.apiparser.model.SchemaInfo; +import io.specmesh.test.TestSpecLoader; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.acl.AclBinding; +import org.junit.jupiter.api.Test; + +public class KafkaAPISpecWithGrantAccessAclsTest { + + private static final KafkaApiSpec API_SPEC = + TestSpecLoader.loadFromClassPath("bigdatalondon-api-with-grant-access-acls.yaml"); + + private enum ExpectedAcl { + READ_PUBLIC_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon.attendee," + + " patternType=LITERAL), entry=(principal=User:*, host=*," + + " operation=READ, permissionType=ALLOW))"), + DESCRIBE_PUBLIC_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon.attendee," + + " patternType=LITERAL), entry=(principal=User:*, host=*," + + " operation=DESCRIBE, permissionType=ALLOW))"), + READ_PROTECTED_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon.retail.subway.food.purchase," + + " patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*," + + " operation=READ, permissionType=ALLOW))"), + DESCRIBE_PROTECTED_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon.retail.subway.food.purchase," + + " patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*," + + " operation=DESCRIBE, permissionType=ALLOW))"), + DESCRIBE_OWN_TOPICS( + 
"(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=DESCRIBE, permissionType=ALLOW))"), + READ_OWN_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=READ, permissionType=ALLOW))"), + WRITE_OWN_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=WRITE, permissionType=ALLOW))"), + CREATE_OWN_PRIVATE_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._private," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=CREATE, permissionType=ALLOW))"), + READ_OWN_GROUPS( + "(pattern=ResourcePattern(resourceType=GROUP," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=READ, permissionType=ALLOW))"), + WRITE_OWN_TX_IDS( + "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID," + + " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=WRITE, permissionType=ALLOW))"), + DESCRIBE_OWN_TX_IDS( + "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID," + + " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=DESCRIBE, permissionType=ALLOW))"), + 
OWN_IDEMPOTENT_WRITE( + "(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=IDEMPOTENT_WRITE, permissionType=ALLOW))"); + + final String text; + + ExpectedAcl(final String text) { + this.text = text; + } + } + + @Test + public void shouldListAppOwnedTopics() { + final List newTopics = API_SPEC.listDomainOwnedTopics(); + assertThat(newTopics, hasSize(3)); + } + + @Test + public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems( + ExpectedAcl.READ_PUBLIC_TOPICS.text, + ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text)); + } + + @Test + public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems( + ExpectedAcl.READ_PROTECTED_TOPICS.text, + ExpectedAcl.DESCRIBE_PROTECTED_TOPICS.text)); + } + + @Test + public void shouldGenerateAclToAllowControlOfOwnPrivateTopics() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems( + ExpectedAcl.DESCRIBE_OWN_TOPICS.text, + ExpectedAcl.READ_OWN_TOPICS.text, + ExpectedAcl.WRITE_OWN_TOPICS.text, + ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text)); + } + + @Test + public void shouldGenerateAclsToAllowToUseOwnConsumerGroups() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems(ExpectedAcl.READ_OWN_GROUPS.text)); + } + + @Test + public void shouldGenerateAclsToAllowToUseOwnTransactionId() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + 
hasItems(ExpectedAcl.WRITE_OWN_TX_IDS.text, ExpectedAcl.DESCRIBE_OWN_TX_IDS.text)); + } + + @Test + public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text)); + } + + @Test + void shouldNotHaveAnyAdditionalAcls() { + final Set testAcls = + new HashSet<>( + Arrays.stream(ExpectedAcl.values()) + .map(e -> e.text) + .collect(Collectors.toSet())); + + // need to support `_public` access prefixes + testAcls.add(KafkaAPISpecTest.ExpectedAcl.READ_PUBLIC_TOPICS.text); + testAcls.add(KafkaAPISpecTest.ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text); + + final Set specAcls = + API_SPEC.requiredAcls().stream().map(Object::toString).collect(Collectors.toSet()); + assertThat(specAcls, containsInAnyOrder(testAcls.toArray())); + } + + @Test + public void shouldGetSchemaInfoForOwnedTopics() { + final List newTopics = API_SPEC.listDomainOwnedTopics(); + final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name()); + assertThat(schemaInfo.schemaIdLocation(), is("header")); + } +} diff --git a/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml new file mode 100644 index 00000000..0c0a153c --- /dev/null +++ b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml @@ -0,0 +1,143 @@ +asyncapi: '2.5.0' +id: 'urn:london.hammersmith.olympia.bigdatalondon' + +info: + title: BigDataLondon API + version: '1.0.0' + description: | + Simple model of BigDataLondon as a Data Product + license: + name: Apache 2.0 + url: 'https://www.apache.org/licenses/LICENSE-2.0' + +servers: + test: + url: test.mykafkacluster.org:8092 + protocol: kafka-secure + description: Test broker + +channels: + # public + attendee: + # publish bindings to instruct topic configuration per environment + bindings: + 
kafka: + envs: + - staging + - prod + partitions: 10 + replicas: 1 + configs: + cleanup.policy: delete + publish: + summary: Humans arriving + operationId: onHumansArriving + tags: [ + name: "grant-access:_public" + ] + message: + name: Human + tags: + - name: "human" + description: "eats food" + - name: "big data london" + description: "eats food" + bindings: + kafka: + key: + type: string + enum: [ 'myKey' ] + schemaIdLocation: 'header' + bindingVersion: '0.3.0' + payload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the human + age: + type: integer + minimum: 0 + description: Age of the human + # protected + retail.subway.food.purchase: + bindings: + kafka: + partitions: 3 + + publish: + summary: Humans purchasing food + tags: [ + name: "grant-access:some.other.domain.root" + ] + message: + name: Food Item + tags: [ + name: "human", + name: "purchase" + ] + payload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the food + cost: + type: number + minimum: 0 + description: GBP cost of the food item + human_id: + type: integer + minimum: 0 + description: Id of the human purchasing the food + + # private + retail.subway.customers: + bindings: + kafka: + partitions: 3 + + publish: + summary: Humans customers + message: + name: Food Item + tags: [ + name: "human", + name: "customer" + ] + payload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the food + cost: + type: number + minimum: 0 + description: GBP cost of the food item + human_id: + type: integer + minimum: 0 + description: Id of the human purchasing the food + + + # subscribing to another domains channel + paris.hammersmith.transport._public.tube: + subscribe: + summary: Humans arriving in the borough + message: + name: Human + payload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the human + age: + type: integer + minimum: 0 + description: Age of the 
human \ No newline at end of file diff --git a/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java b/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java index 09f08269..68197b5a 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java @@ -72,30 +72,31 @@ private String validate(final String id) { } /** - * @return channels + * @return channels with names formatted using canonical path */ public Map channels() { return channels.entrySet().stream() .collect( Collectors.toMap( - e -> getCanonical(id(), e.getKey()), + e -> getCanonical(id(), e.getKey(), e.getValue().publish() != null), Map.Entry::getValue, (k, v) -> k, LinkedHashMap::new)); } - private String getCanonical(final String id, final String channelName) { + private String getCanonical(final String id, final String channelName, final boolean publish) { + // legacy and deprecated if (channelName.startsWith("/")) { return channelName.substring(1).replace('/', DELIMITER); - } else if (!(channelName.startsWith(PRIVATE) - || channelName.startsWith(PROTECTED) - || channelName.startsWith(PUBLIC))) { - return channelName; - } else if (channelName.startsWith(id)) { + } + // should not occur unless subdomains are being used + if (channelName.startsWith(id)) { return channelName; - - } else { + } + if (publish) { return id + DELIMITER + channelName.replace('/', DELIMITER); + } else { + return channelName; } } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java index 24dde071..227f4c63 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java @@ -89,6 +89,8 @@ public void validate() { } public boolean isSchemaRequired() { - return this.message().schemaRef() != null && this.message().schemaRef().length() > 0; + return this.message() != null 
+ && this.message().schemaRef() != null + && this.message().schemaRef().length() > 0; } }