diff --git a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java index 972caaec..42900e53 100644 --- a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java +++ b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java @@ -21,6 +21,7 @@ import static org.apache.kafka.common.acl.AclOperation.ALL; import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; import static org.apache.kafka.common.resource.PatternType.LITERAL; +import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME; import static org.apache.kafka.common.resource.ResourceType.CLUSTER; import static org.apache.kafka.common.resource.ResourceType.GROUP; import static org.apache.kafka.streams.kstream.Produced.with; @@ -353,7 +354,7 @@ private static Set aclsForOtherDomain() { final String principal = "User:" + DIFFERENT_USER; return Set.of( new AclBinding( - new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL), + new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL), new AccessControlEntry(principal, "*", ALL, ALLOW)), new AclBinding( new ResourcePattern(GROUP, DIFFERENT_USER, LITERAL), diff --git a/kafka/README.md b/kafka/README.md index 159cef20..2425a6ae 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -15,6 +15,15 @@ making it trivial to trace back the topic name to its owning domain. ## Authorisation +## Group authorisation + +The provisioner will set ACLs to allow the domain to use any consumer group prefixed with the domain id. +It is recommended that, in most cases, domain services use a consumer group name of `-`. + +## Transaction id authorisation + +The provisioner will set ACLs to allow the domain to use any transaction id prefixed with the domain id. + ## Topic authorisation The provisioner will set ACLs to enforce the following topic authorisation: diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 6a1e8c44..5c3a5625 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -22,8 +22,11 @@ import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; import static org.apache.kafka.common.acl.AclOperation.READ; import static org.apache.kafka.common.acl.AclOperation.WRITE; +import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME; +import static org.apache.kafka.common.resource.ResourceType.CLUSTER; import static org.apache.kafka.common.resource.ResourceType.GROUP; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; import io.specmesh.apiparser.model.ApiSpec; import io.specmesh.apiparser.model.SchemaInfo; @@ -101,10 +104,11 @@ public List listACLsForDomainOwnedTopics() { final List topicAcls = new ArrayList<>(); topicAcls.addAll(ownTopicAcls()); + topicAcls.addAll(ownTransactionIdsAcls()); topicAcls.addAll(publicTopicAcls()); topicAcls.addAll(protectedTopicAcls()); topicAcls.addAll(privateTopicAcls()); - topicAcls.addAll(prefixedAcls(TOPIC, id(), principal(), IDEMPOTENT_WRITE)); + topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(), IDEMPOTENT_WRITE)); return topicAcls; } @@ -119,6 +123,7 @@ public List listACLsForDomainOwnedTopics() { *
  • The spec's domain to be able to create ad-hoc private topics *
  • The spec's domain to produce and consume its topics *
  • The spec's domain to use its own consumer groups + *
  • The spec's domain to use its own transaction ids * * * @return returns the set of required acls. @@ -198,6 +203,10 @@ private Set ownTopicAcls() { return prefixedAcls(TOPIC, id(), principal(), DESCRIBE, READ, WRITE); } + private Set ownTransactionIdsAcls() { + return prefixedAcls(TRANSACTIONAL_ID, id(), principal(), DESCRIBE, WRITE); + } + private Set publicTopicAcls() { return prefixedAcls(TOPIC, id() + "._public", "User:*", DESCRIBE, READ); } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java index 5d72ff08..b1063282 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java @@ -22,8 +22,11 @@ import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; import static org.apache.kafka.common.resource.PatternType.LITERAL; +import static org.apache.kafka.common.resource.PatternType.PREFIXED; +import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME; import static org.apache.kafka.common.resource.ResourceType.CLUSTER; import static org.apache.kafka.common.resource.ResourceType.GROUP; +import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -42,12 +45,12 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.acl.AccessControlEntry; @@ -149,6 +152,8 @@ void shouldHaveCorrectProduceAndConsumeAcls( try (KafkaConsumer domainConsumer = domainConsumer(consumerDomain); KafkaProducer domainProducer = domainProducer(producerDomain)) { + domainProducer.initTransactions(); + domainConsumer.subscribe(List.of(topic.topicName)); final Executable poll = () -> domainConsumer.poll(Duration.ofMillis(500)); @@ -159,6 +164,8 @@ void shouldHaveCorrectProduceAndConsumeAcls( assertThat((Throwable) e, instanceOf(TopicAuthorizationException.class)); } + domainProducer.beginTransaction(); + final Executable send = () -> domainProducer @@ -167,9 +174,11 @@ void shouldHaveCorrectProduceAndConsumeAcls( if (canProduce) { send.execute(); + domainProducer.commitTransaction(); } else { final Throwable e = assertThrows(ExecutionException.class, send).getCause(); assertThat(e, instanceOf(TopicAuthorizationException.class)); + domainProducer.abortTransaction(); } if (canConsume && canProduce) { @@ -219,7 +228,8 @@ private Admin adminClient(final Domain domain) { private KafkaProducer domainProducer(final Domain domain) { final Map props = clientProperties(); props.putAll(saslAuthProperties(domain.domainId)); - props.put(AdminClientConfig.CLIENT_ID_CONFIG, domain.domainId + ".producer"); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, domain.domainId + ".txId"); return new KafkaProducer<>(props, Serdes.Long().serializer(), Serdes.String().serializer()); } @@ -229,8 +239,6 @@ private KafkaConsumer domainConsumer(final Domain domain) { props.putAll(saslAuthProperties(domain.domainId)); props.putAll( Map.of( - ConsumerConfig.CLIENT_ID_CONFIG, - domain.domainId + ".consumer", ConsumerConfig.GROUP_ID_CONFIG, domain.domainId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, @@ -272,10 +280,13 @@ private static Set aclsForOtherDomain(final Domain domain) { final String principal = "User:" + domain.domainId; return Set.of( new AclBinding( - new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL), + new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL), new AccessControlEntry(principal, "*", IDEMPOTENT_WRITE, ALLOW)), new AclBinding( new ResourcePattern(GROUP, domain.domainId, LITERAL), + new AccessControlEntry(principal, "*", ALL, ALLOW)), + new AclBinding( + new ResourcePattern(TRANSACTIONAL_ID, domain.domainId, PREFIXED), new AccessControlEntry(principal, "*", ALL, ALLOW))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java index 778f3871..7c96b576 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java @@ -17,12 +17,14 @@ package io.specmesh.kafka; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import io.specmesh.apiparser.model.SchemaInfo; import io.specmesh.test.TestSpecLoader; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -35,6 +37,80 @@ public class KafkaAPISpecTest { private static final KafkaApiSpec API_SPEC = TestSpecLoader.loadFromClassPath("bigdatalondon-api.yaml"); + private enum ExpectedAcl { + READ_PUBLIC_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._public," + + " patternType=PREFIXED), entry=(principal=User:*, host=*," + + " operation=READ, permissionType=ALLOW))"), + DESCRIBE_PUBLIC_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._public," + + " patternType=PREFIXED), entry=(principal=User:*, host=*," + + " operation=DESCRIBE, permissionType=ALLOW))"), + READ_PROTECTED_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase," + + " patternType=LITERAL), entry=(principal=User:.some.other.domain.root," + + " host=*, operation=READ, permissionType=ALLOW))"), + DESCRIBE_PROTECTED_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase," + + " patternType=LITERAL), entry=(principal=User:.some.other.domain.root," + + " host=*, operation=DESCRIBE, permissionType=ALLOW))"), + DESCRIBE_OWN_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=DESCRIBE, permissionType=ALLOW))"), + READ_OWN_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=READ, permissionType=ALLOW))"), + WRITE_OWN_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=WRITE, permissionType=ALLOW))"), + CREATE_OWN_PRIVATE_TOPICS( + "(pattern=ResourcePattern(resourceType=TOPIC," + + " name=london.hammersmith.olympia.bigdatalondon._private," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=CREATE, permissionType=ALLOW))"), + READ_OWN_GROUPS( + "(pattern=ResourcePattern(resourceType=GROUP," + + " name=london.hammersmith.olympia.bigdatalondon," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," + + " host=*, operation=READ, permissionType=ALLOW))"), + WRITE_OWN_TX_IDS( + "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID," + + " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=WRITE, permissionType=ALLOW))"), + DESCRIBE_OWN_TX_IDS( + "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID," + + " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=DESCRIBE, permissionType=ALLOW))"), + OWN_IDEMPOTENT_WRITE( + "(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster," + + " patternType=PREFIXED)," + + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*," + + " operation=IDEMPOTENT_WRITE, permissionType=ALLOW))"); + + final String text; + + ExpectedAcl(final String text) { + this.text = text; + } + } + @Test public void shouldListAppOwnedTopics() { final List newTopics = API_SPEC.listDomainOwnedTopics(); @@ -48,14 +124,8 @@ public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() { assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), hasItems( - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon._public," - + " patternType=PREFIXED), entry=(principal=User:*, host=*," - + " operation=READ, permissionType=ALLOW))", - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon._public," - + " patternType=PREFIXED), entry=(principal=User:*, host=*," - + " operation=DESCRIBE, permissionType=ALLOW))")); + ExpectedAcl.READ_PUBLIC_TOPICS.text, + ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text)); } @Test @@ -65,59 +135,57 @@ public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() { assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), hasItems( - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase," - + " patternType=LITERAL)," - + " entry=(principal=User:.some.other.domain.root, host=*," - + " operation=READ, permissionType=ALLOW))", - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase," - + " patternType=LITERAL)," - + " entry=(principal=User:.some.other.domain.root, host=*," - + " operation=DESCRIBE, permissionType=ALLOW))")); + ExpectedAcl.READ_PROTECTED_TOPICS.text, + ExpectedAcl.DESCRIBE_PROTECTED_TOPICS.text)); } @Test - public void shouldGenerateAclToAllowSelfControlOfPrivateTopics() { + public void shouldGenerateAclToAllowControlOfOwnPrivateTopics() { final Set acls = API_SPEC.requiredAcls(); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), hasItems( - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon," - + " patternType=PREFIXED)," - + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," - + " host=*, operation=DESCRIBE, permissionType=ALLOW))", - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon," - + " patternType=PREFIXED)," - + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," - + " host=*, operation=READ, permissionType=ALLOW))", - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon," - + " patternType=PREFIXED)," - + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," - + " host=*, operation=WRITE, permissionType=ALLOW))", - "(pattern=ResourcePattern(resourceType=TOPIC," - + " name=london.hammersmith.olympia.bigdatalondon._private," - + " patternType=PREFIXED)," - + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," - + " host=*, operation=CREATE, permissionType=ALLOW))")); + ExpectedAcl.DESCRIBE_OWN_TOPICS.text, + ExpectedAcl.READ_OWN_TOPICS.text, + ExpectedAcl.WRITE_OWN_TOPICS.text, + ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text)); } @Test - public void shouldGenerateAclsToAllowSelfToUseConsumerGroups() { + public void shouldGenerateAclsToAllowToUseOwnConsumerGroups() { final Set acls = API_SPEC.requiredAcls(); assertThat( acls.stream().map(Object::toString).collect(Collectors.toSet()), - hasItems( - "(pattern=ResourcePattern(resourceType=GROUP," - + " name=london.hammersmith.olympia.bigdatalondon," - + " patternType=PREFIXED)," - + " entry=(principal=User:london.hammersmith.olympia.bigdatalondon," - + " host=*, operation=READ, permissionType=ALLOW))")); + hasItems(ExpectedAcl.READ_OWN_GROUPS.text)); + } + + @Test + public void shouldGenerateAclsToAllowToUseOwnTransactionId() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems(ExpectedAcl.WRITE_OWN_TX_IDS.text, ExpectedAcl.DESCRIBE_OWN_TX_IDS.text)); + } + + @Test + public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + hasItems(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text)); + } + + @Test + void shouldNotHaveAnyAdditionalAcls() { + final Set acls = API_SPEC.requiredAcls(); + + assertThat( + acls.stream().map(Object::toString).collect(Collectors.toSet()), + containsInAnyOrder(Arrays.stream(ExpectedAcl.values()).map(e -> e.text).toArray())); } @Test